#!/usr/bin/env -S uv run --script # # /// script # dependencies = [ # "requests", # "beautifulsoup4", # "imaplib2", # ] # /// import imaplib import email from email.header import decode_header from bs4 import BeautifulSoup import requests import time import os import re # ---------------------- # CONFIGURATION # ---------------------- GMAIL_USER = os.getenv("GMAIL_USER", "your-email@gmail.com") GMAIL_APP_PASSWORD = os.getenv("GMAIL_APP_PASSWORD", "your-app-password") NTFY_TOPIC = os.getenv("NTFY_TOPIC", "dev-notifications") NTFY_SERVER = os.getenv("NTFY_SERVER", "https://ntfy.sh") CHECK_INTERVAL = int(os.getenv("CHECK_INTERVAL", "60")) JIRA_SENDERS = ["jira@yourcompany.com", "jira@atlassian.net"] SLACK_SENDER = "notification@slack.com" GITHUB_SENDER = "notifications@github.com" JIRA_BASE_URL = os.getenv("JIRA_BASE_URL", "https://yourcompany.atlassian.net/browse") GH_STATE_AND_TITLE_TO_SKIP = ["New Relic Exporter", "cancelled", "skipped", "succeeded"] # ---------------------- # HELPERS # ---------------------- def sanitize_header(value: str) -> str: return re.sub(r'[\r\n]+', ' ', value).strip() def clean_subject(subject): decoded, encoding = decode_header(subject)[0] if isinstance(decoded, bytes): decoded = decoded.decode(encoding or 'utf-8', errors='ignore') return sanitize_header(decoded) def extract_body(msg): if msg.is_multipart(): for part in msg.walk(): if part.get_content_type() == "text/plain": return part.get_payload(decode=True).decode(errors="ignore") for part in msg.walk(): if part.get_content_type() == "text/html": html = part.get_payload(decode=True).decode(errors="ignore") return BeautifulSoup(html, "html.parser").get_text() else: payload = msg.get_payload(decode=True).decode(errors="ignore") if msg.get_content_type() == "text/html": return BeautifulSoup(payload, "html.parser").get_text() return payload return "(No readable body content found)" def clean_slack_body(body): # Remove intro like: "Hi Colin,\n\nYou have a new direct message from ..." cleaned = re.sub( r"Hi .*?,\s+You have a new direct message from.*?\(.*?\.slack\.com.*?\)\.\s+---", "", body, flags=re.DOTALL, ) return cleaned.strip() def clean_slack_body(body): # Remove intro like: "Hi Colin,\n\nYou have a new direct message from ..." cleaned = re.sub( r"Hi .*?,\s+You have a new direct message from.*?\(.*?\.slack\.com.*?\)\.\s+---", "", body, flags=re.DOTALL, ) return cleaned.strip() def clean_and_relocate_slack_footer(body): # Match the "View in the archives" block archive_pattern = re.compile( r"(@\w+)?\s*View in the archives: (https://.*?\.slack\.com/\S+)", re.IGNORECASE ) match = archive_pattern.search(body) if match: slack_url = match.group(2).strip() cleaned_body = archive_pattern.sub("", body).strip() # Format the link cleanly at the bottom footer = f"🔗 View in Slack: {slack_url}" return f"{cleaned_body}\n\n---\n{footer}" else: return body.strip() def extract_github_link(subject, body): pr_match = re.search(r'\(PR\s+#(\d+)\)', subject) repo_match = re.search(r'\[([^\]]+)\]', subject) # [owner/repo] if pr_match and repo_match: return f"https://github.com/{repo_match.group(1)}/pull/{pr_match.group(1)}" match = re.search(r'https://github\.com/\S+', body) return match.group(0) if match else None def extract_jira_link(subject): ticket_match = re.search(r'\b([A-Z]+-\d+)\b', subject) if ticket_match: return f"{JIRA_BASE_URL}/{ticket_match.group(1)}" return None def format_message(source, subject, body, link=None): return body.strip() or "" def send_ntfy_notification(title, message): url = f"{NTFY_SERVER.rstrip('/')}/{NTFY_TOPIC}" requests.post(url, data=message.encode("utf-8"), headers={"Title": sanitize_header(title)}) def is_github_email(from_email): return GITHUB_SENDER in from_email.lower() def is_jira_email(from_email, subject): return any(s in from_email.lower() for s in JIRA_SENDERS) or re.search(r'[A-Z]+-\d+', subject) def is_slack_email(from_email, subject): return SLACK_SENDER in from_email.lower() def archive_message(mail, email_id): try: if isinstance(email_id, bytes): email_id = email_id.decode() mail.store(email_id, '+X-GM-LABELS', 'processed') except Exception as e: print(f"Error archiving message {email_id}: {e}") # ---------------------- # MAIN LOOP # ---------------------- def check_notifications(): print("Checking inbox...") mail = imaplib.IMAP4_SSL("imap.gmail.com") mail.login(GMAIL_USER, GMAIL_APP_PASSWORD) mail.select("inbox") #result, data = mail.search(None, '(UNSEEN)') ok, data = mail.search(None, 'UNSEEN X-GM-LABELS', "inbox") #search_criteria = r'(UNSEEN X-GM-LABELS "\\Inbox")' #result, data = mail.uid('search', None, search_criteria) mail_ids = data[0].split() if not mail_ids: print("No new notifications.") mail.logout() return for num in mail_ids: result, msg_data = mail.fetch(num, '(RFC822)') raw_email = msg_data[0][1] msg = email.message_from_bytes(raw_email) subject = clean_subject(msg.get("Subject", "No Subject")) from_email = msg.get("From", "") body = extract_body(msg) if is_github_email(from_email): source = "GitHub" link = extract_github_link(subject, body) elif is_jira_email(from_email, subject): source = "Jira" link = extract_jira_link(subject) elif is_slack_email(from_email, subject): source = "Slack" link = "" body = clean_and_relocate_slack_footer(clean_slack_body(body)) else: print(f"Skipping non-matching email: {subject}") continue message = format_message(source, subject, body, link) send_notice = True for exclusion in GH_STATE_AND_TITLE_TO_SKIP: if exclusion in subject: print(f"Skipping notification: {subject}") send_notice = False if send_notice: print(f"Sending {source} notification: {subject}") send_ntfy_notification(subject, message) archive_message(mail, num) mail.expunge() mail.logout() # ---------------------- # RUNNER # ---------------------- if __name__ == "__main__": while True: try: check_notifications() except Exception as e: print(f"Error: {e}") time.sleep(CHECK_INTERVAL)