main.py 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. # /// script
  2. # dependencies = [
  3. # "fastapi",
  4. # "uvicorn",
  5. # "python-dotenv",
  6. # "httpx",
  7. # ]
  8. # ///
  9. #
  10. from fastapi import FastAPI, Request, HTTPException, Depends
  11. from fastapi.responses import RedirectResponse, HTMLResponse
  12. from dotenv import load_dotenv
  13. import os
  14. from cocorev.database import get_db
  15. from sqlalchemy.ext.asyncio import AsyncSession
  16. from sqlalchemy.future import select
  17. from cocorev.utils import (
  18. fetch_companies_from_db,
  19. fetch_hubspot_companies,
  20. get_access_token,
  21. store_companies_in_db,
  22. store_access_token,
  23. )
  24. from cocorev.schemas import CompanySchema
  25. from cocorev.utils import (
  26. exchange_code_for_token,
  27. get_hubspot_contacts,
  28. get_hubspot_companies,
  29. )
  30. from cocorev.models import Company
  31. app = FastAPI()
  32. # Load environment variables
  33. load_dotenv()
  34. CLIENT_ID = os.getenv("HUBSPOT_CLIENT_ID")
  35. CLIENT_SECRET = os.getenv("HUBSPOT_CLIENT_SECRET")
  36. REDIRECT_URI = os.getenv("REDIRECT_URI")
  37. app = FastAPI()
  38. AUTH_URL = "https://app.hubspot.com/oauth/authorize/"
  39. @app.get("/", response_class=HTMLResponse)
  40. def home():
  41. auth_url = (
  42. f"{AUTH_URL}?client_id={CLIENT_ID}&redirect_uri={REDIRECT_URI}"
  43. f"&scope=crm.objects.companies.read&response_type=code"
  44. )
  45. return f'<a href="{auth_url}">Authorize with HubSpot</a>'
  46. @app.get("/oauth/callback")
  47. async def oauth_callback(request: Request, db: AsyncSession = Depends(get_db)):
  48. code = request.query_params.get("code")
  49. if not code:
  50. raise HTTPException(status_code=400, detail="Authorization failed.")
  51. token_data = await exchange_code_for_token(
  52. code, CLIENT_ID, CLIENT_SECRET, REDIRECT_URI
  53. )
  54. await store_access_token(db, token_data)
  55. return {"message": "Authorization successful, token stored!"}
  56. # Store the token data (in memory for now)
  57. access_token = token_data["access_token"]
  58. request.state.access_token = access_token
  59. return {"message": "Authorization successful", "access_token": access_token}
  60. @app.get("/contacts")
  61. async def contacts(request: Request):
  62. try:
  63. access_token = request.state.access_token
  64. except AttributeError:
  65. access_token = request.query_params.get("access_token")
  66. if not access_token:
  67. raise HTTPException(
  68. status_code=401, detail="Unauthorized. Please authenticate first."
  69. )
  70. contacts = await get_hubspot_contacts(access_token)
  71. return contacts
  72. @app.get("/sync-companies")
  73. async def sync_companies(db: AsyncSession = Depends(get_db)):
  74. access_token = await get_access_token(db)
  75. if not access_token:
  76. raise HTTPException(
  77. status_code=401, detail="No valid access token found. Please reauthorize."
  78. )
  79. companies = await fetch_hubspot_companies(access_token)
  80. await store_companies_in_db(db, companies)
  81. return {"message": "Companies synchronized successfully"}
  82. @app.get("/companies", response_model=list[CompanySchema])
  83. async def get_companies(db: AsyncSession = Depends(get_db)):
  84. """Fetch and return all companies in JSON format."""
  85. companies = await fetch_companies_from_db(db)
  86. return companies # FastAPI will serialize it correctly
  87. if __name__ == "__main__":
  88. app.run()