checkworkmail.py 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. #!/usr/bin/env -S uv run --script
  2. #
  3. # /// script
  4. # dependencies = [
  5. # "requests",
  6. # "beautifulsoup4",
  7. # "imaplib2",
  8. # ]
  9. # ///
  10. import imaplib
  11. import email
  12. from email.header import decode_header
  13. import requests
  14. import time
  15. import os
  16. # ----------------------
  17. # CONFIGURATION
  18. # ----------------------
  19. GMAIL_USER = os.getenv("GMAIL_USER", "your-email@gmail.com")
  20. GMAIL_APP_PASSWORD = os.getenv("GMAIL_APP_PASSWORD", "") # Replace or use env vars
  21. NTFY_TOPIC = os.getenv("NTFY_TOPIC", "KKddGQxVm2LoP0JF")
  22. NTFY_SERVER = os.getenv("NTFY_SERVER", "https://ntfy.unbl.ink")
  23. CHECK_INTERVAL = int(os.getenv("CHECK_INTERVAL", "60")) # seconds
  24. def clean_subject(subject):
  25. """Decode and clean up the email subject"""
  26. decoded, encoding = decode_header(subject)[0]
  27. if isinstance(decoded, bytes):
  28. decoded = decoded.decode(encoding or 'utf-8', errors='ignore')
  29. # Remove problematic characters for HTTP headers
  30. return decoded.replace('\r', '').replace('\n', '').strip()
  31. def send_ntfy_notification(title, message):
  32. url = f"{NTFY_SERVER.rstrip('/')}/{NTFY_TOPIC}"
  33. requests.post(url, data=message.encode("utf-8"), headers={"Title": title})
  34. def check_github_notifications():
  35. print("Connecting to Gmail...")
  36. mail = imaplib.IMAP4_SSL("imap.gmail.com")
  37. mail.login(GMAIL_USER, GMAIL_APP_PASSWORD)
  38. mail.select("inbox")
  39. result, data = mail.search(None, '(UNSEEN FROM "notifications@github.com")')
  40. mail_ids = data[0].split()
  41. if not mail_ids:
  42. print("No new GitHub notifications.")
  43. mail.logout()
  44. return
  45. print(f"Found {len(mail_ids)} new GitHub notifications")
  46. for num in mail_ids:
  47. result, msg_data = mail.fetch(num, '(RFC822)')
  48. raw_email = msg_data[0][1]
  49. msg = email.message_from_bytes(raw_email)
  50. subject = clean_subject(msg["Subject"])
  51. from_email = msg.get("From")
  52. if msg.is_multipart():
  53. for part in msg.walk():
  54. if part.get_content_type() == "text/plain":
  55. body = part.get_payload(decode=True).decode(errors="ignore")
  56. break
  57. else:
  58. body = "(No plain text content found)"
  59. else:
  60. body = msg.get_payload(decode=True).decode(errors="ignore")
  61. print(f"Sending: {subject.replace('\n', '').replace('\r', '').strip()}")
  62. send_ntfy_notification(subject, body[:300])
  63. mail.logout()
  64. if __name__ == "__main__":
  65. while True:
  66. try:
  67. check_github_notifications()
  68. except Exception as e:
  69. print(f"Error: {e}")
  70. time.sleep(CHECK_INTERVAL)