1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192 |
- import json
- import imaplib
- import email
- from email.header import decode_header
- from profiles.models import UserProfile
- from scrobbles.models import Scrobble
- from scrobbles.scrobblers import email_scrobble_board_game
- import logging
- logger = logging.getLogger(__name__)
- def process_scrobbles_from_imap() -> list[Scrobble]:
- """For all user profiles with IMAP creds, check inbox for scrobbleable email attachments."""
- scrobbles_created: list[Scrobble] = []
- active_profiles = UserProfile.objects.filter(imap_auto_import=True)
- logger.info("Starting import of scrobbles from IMAP", extra={"active_profiles": active_profiles})
- for profile in active_profiles:
- logger.info(
- "Importing scrobbles from IMAP for user",
- extra={"user_id": profile.user_id},
- )
- mail = imaplib.IMAP4_SSL(profile.imap_url)
- mail.login(profile.imap_user, profile.imap_pass)
- mail.select("INBOX") # TODO configure this in profile
- # Search for unseen emails
- status, messages = mail.search(None, "UnSeen")
- if status != "OK":
- logger.info("IMAP status not OK", extra={"status": status})
- return
- for uid in messages[0].split():
- status, msg_data = mail.fetch(uid, "(RFC822)")
- if status != "OK":
- logger.info("IMAP status not OK", extra={"status": status})
- continue
- try:
- message = email.message_from_bytes(msg_data[0][1])
- logger.info(
- "Processing email message", extra={"email_msg": message}
- )
- except IndexError:
- logger.info("No email message data found")
- return
- for part in message.walk():
- if part.get_content_disposition() == "attachment":
- filename = part.get_filename()
- if filename:
- # Decode the filename if necessary
- decoded_name, encoding = decode_header(filename)[0]
- if isinstance(decoded_name, bytes):
- filename = decoded_name.decode(encoding or "utf-8")
- file_data = part.get_payload(decode=True)
- parsed_json = ""
- # Try parsing JSON if applicable
- parsed_json = {}
- if filename.lower().endswith(".bgsplay"):
- # TODO Pull this out into a parse_pgsplay function
- try:
- parsed_json = json.loads(
- file_data.decode("utf-8")
- )
- except Exception as e:
- logger.error("Failed to parse JSON file", extra={"filename": filename, "error": e})
- if not parsed_json:
- logger.info("No JSON found in BG Stats file", extra={"filename": filename})
- continue
- scrobble = email_scrobble_board_game(
- parsed_json, profile.user_id
- )
- if scrobble:
- scrobbles_created.append(scrobble)
- mail.logout()
- if scrobbles_created:
- logger.info(
- f"Creating {len(scrobbles_created)} new scrobbles",
- extra={"scrobbles_created": scrobbles_created},
- )
- return scrobbles_created
- logger.info(f"No new scrobbles found in IMAP folders")
- return []
|