utils.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. import httpx
  2. from datetime import datetime, timedelta
  3. from sqlalchemy.ext.asyncio import AsyncSession
  4. from sqlalchemy import delete
  5. from sqlalchemy.future import select
  6. from cocorev.models import Token, Company
# HubSpot OAuth endpoint; exchange_code_for_token() POSTs the
# authorization-code grant here.
TOKEN_URL = "https://api.hubapi.com/oauth/v1/token"
# CRM v3 object collections that the contact/company helpers in this
# module query with GET requests.
CONTACTS_API_URL = "https://api.hubapi.com/crm/v3/objects/contacts"
COMPANIES_API_URL = "https://api.hubapi.com/crm/v3/objects/companies"
  10. async def exchange_code_for_token(
  11. code, client_id, client_secret, redirect_uri
  12. ):
  13. async with httpx.AsyncClient() as client:
  14. response = await client.post(
  15. TOKEN_URL,
  16. data={
  17. "grant_type": "authorization_code",
  18. "client_id": client_id,
  19. "client_secret": client_secret,
  20. "redirect_uri": redirect_uri,
  21. "code": code,
  22. },
  23. )
  24. response.raise_for_status()
  25. return response.json()
  26. async def get_hubspot_contacts(access_token):
  27. async with httpx.AsyncClient() as client:
  28. response = await client.get(
  29. CONTACTS_API_URL,
  30. headers={"Authorization": f"Bearer {access_token}"},
  31. )
  32. response.raise_for_status()
  33. return response.json()
  34. async def get_hubspot_companies(access_token):
  35. async with httpx.AsyncClient() as client:
  36. response = await client.get(
  37. COMPANIES_API_URL,
  38. headers={"Authorization": f"Bearer {access_token}"},
  39. )
  40. response.raise_for_status()
  41. return response.json()
  42. async def fetch_hubspot_companies(access_token):
  43. async with httpx.AsyncClient() as client:
  44. response = await client.get(
  45. COMPANIES_API_URL,
  46. headers={"Authorization": f"Bearer {access_token}"},
  47. )
  48. response.raise_for_status()
  49. return response.json()["results"]
  50. async def store_companies_in_db(db: AsyncSession, companies):
  51. for company in companies:
  52. existing = await db.execute(
  53. select(Company).where(Company.hubspot_id == company.get("id"))
  54. )
  55. if existing.scalar():
  56. continue # Skip if already in DB
  57. new_company = Company(
  58. hubspot_id=company["id"],
  59. name=company["properties"].get("name", "Unknown"),
  60. domain=company["properties"].get("domain"),
  61. )
  62. db.add(new_company)
  63. await db.commit()
  64. async def store_access_token(db: AsyncSession, token_data: dict):
  65. expires_in = token_data["expires_in"]
  66. expires_at = datetime.utcnow() + timedelta(seconds=expires_in)
  67. # Remove existing tokens properly using ORM
  68. await db.execute(delete(Token))
  69. await db.commit()
  70. new_token = Token(
  71. access_token=token_data["access_token"],
  72. refresh_token=token_data.get("refresh_token"),
  73. expires_at=expires_at,
  74. )
  75. db.add(new_token)
  76. await db.commit()
  77. async def get_access_token(db: AsyncSession):
  78. result = await db.execute(select(Token))
  79. token = result.scalars().first()
  80. if token and token.expires_at > datetime.utcnow():
  81. return token.access_token
  82. return None
  83. async def fetch_companies_from_db(db: AsyncSession):
  84. """Fetches all companies and returns them as a list of dictionaries."""
  85. result = await db.execute(select(Company))
  86. companies = (
  87. result.scalars().all()
  88. ) # Correct way to fetch results in async SQLAlchemy
  89. return [
  90. company.__dict__ for company in companies
  91. ] # Convert ORM objects to dicts