123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114 |
- # /// script
- # dependencies = [
- # "fastapi",
- # "uvicorn",
- # "python-dotenv",
- # "httpx",
- # ]
- # ///
- #
- from fastapi import FastAPI, Request, HTTPException, Depends
- from fastapi.responses import RedirectResponse, HTMLResponse
- from dotenv import load_dotenv
- import os
- from cocorev.database import get_db
- from sqlalchemy.ext.asyncio import AsyncSession
- from sqlalchemy.future import select
- from cocorev.utils import (
- fetch_companies_from_db,
- fetch_hubspot_companies,
- get_access_token,
- store_companies_in_db,
- store_access_token,
- )
- from cocorev.schemas import CompanySchema
- from cocorev.utils import (
- exchange_code_for_token,
- get_hubspot_contacts,
- get_hubspot_companies,
- )
- from cocorev.models import Company
# Load environment variables before any of them are read below.
load_dotenv()

# HubSpot OAuth client settings taken from the environment.
# os.getenv returns None for any variable that is not set.
CLIENT_ID = os.getenv("HUBSPOT_CLIENT_ID")
CLIENT_SECRET = os.getenv("HUBSPOT_CLIENT_SECRET")
REDIRECT_URI = os.getenv("REDIRECT_URI")

# Base URL of the HubSpot authorization page that the home route links to.
AUTH_URL = "https://app.hubspot.com/oauth/authorize/"

# Single application instance; every route below is registered on it.
# (The module previously created this object twice and discarded the first.)
app = FastAPI()
@app.get("/", response_class=HTMLResponse)
def home():
    """Render a single link that starts the HubSpot OAuth authorization flow.

    The link targets ``AUTH_URL`` with the client id, redirect URI, the
    ``crm.objects.companies.read`` scope and ``response_type=code`` as query
    parameters.

    Returns:
        An HTML anchor element as a string.
    """
    # NOTE(review): values are interpolated verbatim, without URL-encoding, so
    # REDIRECT_URI must already be safe to embed in a query string.
    query = "&".join(
        (
            f"client_id={CLIENT_ID}",
            f"redirect_uri={REDIRECT_URI}",
            "scope=crm.objects.companies.read",
            "response_type=code",
        )
    )
    return f'<a href="{AUTH_URL}?{query}">Authorize with HubSpot</a>'
@app.get("/oauth/callback")
async def oauth_callback(request: Request, db: AsyncSession = Depends(get_db)):
    """Handle the OAuth redirect and persist the token data that results.

    Reads the ``code`` query parameter, exchanges it for token data through
    ``exchange_code_for_token`` and hands that data to ``store_access_token``.

    Args:
        request: Incoming request; the authorization code is read from its
            query string.
        db: Async database session supplied by the ``get_db`` dependency.

    Returns:
        A dict with a confirmation ``message``.

    Raises:
        HTTPException: 400 when the ``code`` query parameter is missing or
            empty.
    """
    code = request.query_params.get("code")
    if not code:
        raise HTTPException(status_code=400, detail="Authorization failed.")

    token_data = await exchange_code_for_token(
        code, CLIENT_ID, CLIENT_SECRET, REDIRECT_URI
    )
    await store_access_token(db, token_data)

    # The response carries no token material. The statements that used to
    # follow this return were unreachable and have been removed.
    return {"message": "Authorization successful, token stored!"}
@app.get("/contacts")
async def contacts(request: Request):
    """Return the result of ``get_hubspot_contacts`` for the caller's token.

    The token is read from ``request.state.access_token`` when that attribute
    exists; otherwise it is read from the ``access_token`` query parameter.

    Args:
        request: Incoming request carrying the token in its state or query
            string.

    Raises:
        HTTPException: 401 when the token obtained is missing or empty.
    """
    state = request.state
    if hasattr(state, "access_token"):
        token = state.access_token
    else:
        # No token attached to the request state: fall back to the query string.
        token = request.query_params.get("access_token")

    if token:
        hubspot_contacts = await get_hubspot_contacts(token)
        return hubspot_contacts

    raise HTTPException(
        status_code=401, detail="Unauthorized. Please authenticate first."
    )
@app.get("/sync-companies")
async def sync_companies(db: AsyncSession = Depends(get_db)):
    """Fetch companies with the stored access token and save them to the database.

    Args:
        db: Async database session supplied by the ``get_db`` dependency.

    Returns:
        A dict with a confirmation ``message``.

    Raises:
        HTTPException: 401 when ``get_access_token`` yields no usable token.
    """
    token = await get_access_token(db)
    if token:
        fetched = await fetch_hubspot_companies(token)
        await store_companies_in_db(db, fetched)
        return {"message": "Companies synchronized successfully"}

    # Nothing usable in storage: the caller has to run the OAuth flow again.
    raise HTTPException(
        status_code=401, detail="No valid access token found. Please reauthorize."
    )
@app.get("/companies", response_model=list[CompanySchema])
async def get_companies(db: AsyncSession = Depends(get_db)):
    """Return every company that ``fetch_companies_from_db`` yields.

    Args:
        db: Async database session supplied by the ``get_db`` dependency.

    Returns:
        The fetched companies; the route's ``response_model`` declares them as
        a list of ``CompanySchema``.
    """
    # Serialization to the declared response model is left to FastAPI.
    return await fetch_companies_from_db(db)
if __name__ == "__main__":
    # A FastAPI application has no ``run()`` method; it must be served by an
    # ASGI server. uvicorn is listed in this script's dependency header, so it
    # is imported here, where it is actually needed.
    import uvicorn

    uvicorn.run(app)
|