imap.py 3.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. import json
  2. import imaplib
  3. import email
  4. from email.header import decode_header
  5. from profiles.models import UserProfile
  6. from scrobbles.models import Scrobble
  7. from scrobbles.scrobblers import email_scrobble_board_game
  8. import logging
  9. logger = logging.getLogger(__name__)
  10. def process_scrobbles_from_imap() -> list[Scrobble]:
  11. """For all user profiles with IMAP creds, check inbox for scrobbleable email attachments."""
  12. scrobbles_created: list[Scrobble] = []
  13. active_profiles = UserProfile.objects.filter(imap_auto_import=True)
  14. logger.info("Starting import of scrobbles from IMAP", extra={"active_profiles": active_profiles})
  15. for profile in active_profiles:
  16. logger.info(
  17. "Importing scrobbles from IMAP for user",
  18. extra={"user_id": profile.user_id},
  19. )
  20. mail = imaplib.IMAP4_SSL(profile.imap_url)
  21. mail.login(profile.imap_user, profile.imap_pass)
  22. mail.select("INBOX") # TODO configure this in profile
  23. # Search for unseen emails
  24. status, messages = mail.search(None, "UnSeen")
  25. if status != "OK":
  26. logger.info("IMAP status not OK", extra={"status": status})
  27. return
  28. for uid in messages[0].split():
  29. status, msg_data = mail.fetch(uid, "(RFC822)")
  30. if status != "OK":
  31. logger.info("IMAP status not OK", extra={"status": status})
  32. continue
  33. try:
  34. message = email.message_from_bytes(msg_data[0][1])
  35. logger.info(
  36. "Processing email message", extra={"email_msg": message}
  37. )
  38. except IndexError:
  39. logger.info("No email message data found")
  40. return
  41. for part in message.walk():
  42. if part.get_content_disposition() == "attachment":
  43. filename = part.get_filename()
  44. if filename:
  45. # Decode the filename if necessary
  46. decoded_name, encoding = decode_header(filename)[0]
  47. if isinstance(decoded_name, bytes):
  48. filename = decoded_name.decode(encoding or "utf-8")
  49. file_data = part.get_payload(decode=True)
  50. parsed_json = ""
  51. # Try parsing JSON if applicable
  52. parsed_json = {}
  53. if filename.lower().endswith(".bgsplay"):
  54. # TODO Pull this out into a parse_pgsplay function
  55. try:
  56. parsed_json = json.loads(
  57. file_data.decode("utf-8")
  58. )
  59. except Exception as e:
  60. logger.error("Failed to parse JSON file", extra={"filename": filename, "error": e})
  61. if not parsed_json:
  62. logger.info("No JSON found in BG Stats file", extra={"filename": filename})
  63. continue
  64. scrobble = email_scrobble_board_game(
  65. parsed_json, profile.user_id
  66. )
  67. if scrobble:
  68. scrobbles_created.append(scrobble)
  69. mail.logout()
  70. if scrobbles_created:
  71. logger.info(
  72. f"Creating {len(scrobbles_created)} new scrobbles",
  73. extra={"scrobbles_created": scrobbles_created},
  74. )
  75. return scrobbles_created
  76. logger.info(f"No new scrobbles found in IMAP folders")
  77. return []