"""HubSpot integration helpers.

Covers the OAuth token exchange, fetching contacts and companies from the
HubSpot CRM API, and storing tokens and companies through an async
SQLAlchemy session.
"""
from datetime import datetime, timedelta, timezone

import httpx
from sqlalchemy import delete
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select

from cocorev.models import Token, Company
# HubSpot endpoints used by the helpers in this module.
# OAuth token endpoint (receives the authorization-code grant).
TOKEN_URL = "https://api.hubapi.com/oauth/v1/token"
# CRM v3 object collections.
CONTACTS_API_URL = "https://api.hubapi.com/crm/v3/objects/contacts"
COMPANIES_API_URL = "https://api.hubapi.com/crm/v3/objects/companies"
async def exchange_code_for_token(
    code, client_id, client_secret, redirect_uri
):
    """Exchange an OAuth authorization code for a token response.

    Sends an ``authorization_code`` grant to ``TOKEN_URL`` as form data.

    Args:
        code: Authorization code to exchange.
        client_id: OAuth client id sent with the grant.
        client_secret: OAuth client secret sent with the grant.
        redirect_uri: Redirect URI sent with the grant.

    Returns:
        The decoded JSON body of the token endpoint's response.

    Raises:
        httpx.HTTPStatusError: If the endpoint answers with an error status.
    """
    form = {
        "grant_type": "authorization_code",
        "client_id": client_id,
        "client_secret": client_secret,
        "redirect_uri": redirect_uri,
        "code": code,
    }
    async with httpx.AsyncClient() as client:
        response = await client.post(TOKEN_URL, data=form)
        response.raise_for_status()
        return response.json()
async def get_hubspot_contacts(access_token):
    """Fetch contacts from the HubSpot CRM contacts endpoint.

    Args:
        access_token: Bearer token sent in the ``Authorization`` header.

    Returns:
        The decoded JSON body of the response, unmodified.

    Raises:
        httpx.HTTPStatusError: If the API answers with an error status.
    """
    # NOTE(review): a single request is made with no query parameters, so
    # presumably only the first page of contacts comes back — confirm
    # whether callers need to follow pagination.
    async with httpx.AsyncClient() as client:
        response = await client.get(
            CONTACTS_API_URL,
            headers={"Authorization": f"Bearer {access_token}"},
        )
        response.raise_for_status()
        return response.json()
async def get_hubspot_companies(access_token):
    """Fetch companies from the HubSpot CRM companies endpoint.

    Args:
        access_token: Bearer token sent in the ``Authorization`` header.

    Returns:
        The decoded JSON body of the response, unmodified.

    Raises:
        httpx.HTTPStatusError: If the API answers with an error status.
    """
    # NOTE(review): a single request is made with no query parameters, so
    # presumably only the first page of companies comes back — confirm
    # whether callers need to follow pagination.
    async with httpx.AsyncClient() as client:
        response = await client.get(
            COMPANIES_API_URL,
            headers={"Authorization": f"Bearer {access_token}"},
        )
        response.raise_for_status()
        return response.json()
async def fetch_hubspot_companies(access_token):
    """Fetch companies from HubSpot and return only the ``results`` list.

    Issues the same request as ``get_hubspot_companies`` but unwraps the
    ``"results"`` entry of the JSON body instead of returning the whole
    payload.

    Args:
        access_token: Bearer token sent in the ``Authorization`` header.

    Returns:
        The value stored under ``"results"`` in the response body.

    Raises:
        httpx.HTTPStatusError: If the API answers with an error status.
        KeyError: If the response body has no ``"results"`` entry.
    """
    # NOTE(review): a single request is made with no query parameters, so
    # presumably only the first page of companies comes back — confirm
    # whether callers need to follow pagination.
    async with httpx.AsyncClient() as client:
        response = await client.get(
            COMPANIES_API_URL,
            headers={"Authorization": f"Bearer {access_token}"},
        )
        response.raise_for_status()
        return response.json()["results"]
async def store_companies_in_db(db: AsyncSession, companies):
    """Insert companies that do not yet have a ``Company`` row.

    Each item is looked up by its ``"id"`` against ``Company.hubspot_id``;
    items that already have a row are skipped. All new rows are committed
    together after every item has been processed.

    Args:
        db: Async SQLAlchemy session used for the lookups and inserts.
        companies: Iterable of company dicts, each holding an ``"id"`` and
            a ``"properties"`` mapping from which ``name`` and ``domain``
            are read.

    Raises:
        KeyError: If an item lacks ``"id"`` or ``"properties"``.
    """
    for company in companies:
        # Read the id once so the lookup and the insert cannot disagree.
        hubspot_id = company["id"]
        lookup = await db.execute(
            select(Company).where(Company.hubspot_id == hubspot_id)
        )
        if lookup.scalar() is not None:
            continue  # Skip if already in DB

        properties = company["properties"]
        db.add(
            Company(
                hubspot_id=hubspot_id,
                # ``or`` also covers a key that is present but null/empty,
                # which a plain ``.get("name", "Unknown")`` would pass
                # through unchanged.
                name=properties.get("name") or "Unknown",
                domain=properties.get("domain"),
            )
        )
    await db.commit()
async def store_access_token(db: AsyncSession, token_data: dict):
    """Replace all stored tokens with the one described by *token_data*.

    The new row is built before anything is deleted, and the delete and
    insert share one commit, so a malformed payload or a failed insert does
    not leave the table without a token.

    Args:
        db: Async SQLAlchemy session used for the delete and insert.
        token_data: Token payload. Must contain ``"access_token"`` and
            ``"expires_in"`` (a number of seconds); ``"refresh_token"`` is
            optional.

    Raises:
        KeyError: If ``"expires_in"`` or ``"access_token"`` is missing.
    """
    # Naive UTC timestamp, same value datetime.utcnow() produced, without
    # relying on the deprecated call.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    expires_at = now_utc + timedelta(seconds=token_data["expires_in"])
    new_token = Token(
        access_token=token_data["access_token"],
        refresh_token=token_data.get("refresh_token"),
        expires_at=expires_at,
    )

    # Remove existing tokens and add the replacement in one transaction.
    await db.execute(delete(Token))
    db.add(new_token)
    await db.commit()
async def get_access_token(db: AsyncSession):
    """Return the stored access token if it has not expired.

    Args:
        db: Async SQLAlchemy session used to load the token row.

    Returns:
        The ``access_token`` of the first ``Token`` row when its
        ``expires_at`` is still in the future, otherwise ``None`` (also
        when no token is stored).
    """
    result = await db.execute(select(Token))
    token = result.scalars().first()
    if token is None:
        return None

    # Naive UTC "now", same value datetime.utcnow() produced, without
    # relying on the deprecated call.
    # NOTE(review): assumes expires_at is stored as a naive UTC datetime —
    # confirm against the Token model.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    if token.expires_at > now_utc:
        return token.access_token
    return None
async def fetch_companies_from_db(db: AsyncSession):
    """Fetch all companies and return them as a list of dictionaries.

    Each dictionary is a copy of the loaded attributes of one ``Company``
    instance. Keys starting with ``_sa`` (SQLAlchemy's per-instance
    bookkeeping such as ``_sa_instance_state``) are left out, so the result
    holds only model data and does not alias the ORM objects' own
    ``__dict__``.

    Args:
        db: Async SQLAlchemy session used to run the query.

    Returns:
        A list with one dict per ``Company`` row.
    """
    result = await db.execute(select(Company))
    companies = result.scalars().all()
    return [
        {
            key: value
            for key, value in vars(company).items()
            if not key.startswith("_sa")
        }
        for company in companies
    ]