Parcourir la source

[boardgames] Start adding email scrobbling for board games

Colin Powell il y a 2 jours
Parent
commit
0fa831fa42

+ 39 - 0
vrobbler/apps/profiles/migrations/0024_userprofile_bgstat_id_userprofile_imap_auto_import_and_more.py

@@ -0,0 +1,39 @@
+# Generated by Django 4.2.19 on 2025-07-02 14:54
+
+from django.db import migrations, models
+import encrypted_field.fields
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ("profiles", "0023_alter_userprofile_timezone"),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name="userprofile",
+            name="bgstat_id",
+            field=models.CharField(blank=True, max_length=255, null=True),
+        ),
+        migrations.AddField(
+            model_name="userprofile",
+            name="imap_auto_import",
+            field=models.BooleanField(default=False),
+        ),
+        migrations.AddField(
+            model_name="userprofile",
+            name="imap_pass",
+            field=encrypted_field.fields.EncryptedField(blank=True, null=True),
+        ),
+        migrations.AddField(
+            model_name="userprofile",
+            name="imap_url",
+            field=models.CharField(blank=True, max_length=255, null=True),
+        ),
+        migrations.AddField(
+            model_name="userprofile",
+            name="imap_user",
+            field=models.CharField(blank=True, max_length=255, null=True),
+        ),
+    ]

+ 6 - 0
vrobbler/apps/profiles/models.py

@@ -31,6 +31,7 @@ class UserProfile(TimeStampedModel):
 
     task_context_tags_str = models.CharField(max_length=255, **BNULL)
 
+    bgstat_id = models.CharField(max_length=255, **BNULL)
     bgg_username = models.CharField(max_length=255, **BNULL)
     lichess_username = models.CharField(max_length=255, **BNULL)
 
@@ -43,6 +44,11 @@ class UserProfile(TimeStampedModel):
     webdav_pass = EncryptedField(**BNULL)
     webdav_auto_import = models.BooleanField(default=False)
 
+    imap_url = models.CharField(max_length=255, **BNULL)
+    imap_user = models.CharField(max_length=255, **BNULL)
+    imap_pass = EncryptedField(**BNULL)
+    imap_auto_import = models.BooleanField(default=False)
+
     ntfy_url = models.CharField(max_length=255, **BNULL)
     ntfy_enabled = models.BooleanField(default=False)
 

+ 98 - 0
vrobbler/apps/scrobbles/imap.py

@@ -0,0 +1,98 @@
+import json
+import imaplib
+import email
+from email.header import decode_header
+from django.core.files.base import ContentFile
+from profiles.models import UserProfile
+from scrobbles.models import Scrobble
+from scrobbles.scrobblers import email_scrobble_board_game
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+def process_scrobbles_from_imap() -> list[Scrobble] | None:
+    """For all user profiles with IMAP creds, check inbox for scrobbleable email attachments."""
+    scrobbles_to_create: list[Scrobble] = []
+
+    active_profiles = UserProfile.objects.filter(imap_auto_import=True)
+    for profile in active_profiles:
+        logger.info(
+            "Importing scrobbles from IMAP for user",
+            extra={"user_id": profile.user_id},
+        )
+        mail = imaplib.IMAP4_SSL(profile.imap_url)
+        mail.login(profile.imap_user, profile.imap_pass)
+        mail.select("INBOX")
+
+        # Search for unseen emails
+        status, messages = mail.search(None, "(UNSEEN)")
+        if status != "OK":
+            return
+
+        for uid in messages[0].split():
+            status, msg_data = mail.fetch(uid, "(RFC822)")
+            if status != "OK":
+                logger.info("IMAP status not OK", extra={"status": status})
+                continue
+
+            try:
+                msg = email.message_from_bytes(msg_data[0][1])
+                logger.info("Processing email message", extra={"msg": msg})
+            except IndexError:
+                logger.info("No email message data found")
+                return
+
+            for part in msg.walk():
+                if part.get_content_disposition() == "attachment":
+                    filename = part.get_filename()
+                    if filename:
+                        # Decode the filename if necessary
+                        decoded_name, encoding = decode_header(filename)[0]
+                        if isinstance(decoded_name, bytes):
+                            filename = decoded_name.decode(encoding or "utf-8")
+
+                        file_data = part.get_payload(decode=True)
+
+                        parsed_json = ""
+
+                        # Try parsing JSON if applicable
+                        if filename.lower().endswith(".bgsplay"):
+                            try:
+                                parsed_json = json.loads(
+                                    file_data.decode("utf-8")
+                                )
+                                scrobbles_to_create.append(
+                                    email_scrobble_board_game(
+                                        parsed_json, profile.user_id
+                                    )
+                                )
+
+                            except Exception as e:
+                                # You might want to log this
+                                print(
+                                    f"Failed to parse JSON from {filename}: {e}"
+                                )
+
+                        # Avoid duplicates
+                        if not EmailAttachment.objects.filter(
+                            email_uid=uid.decode(), filename=filename
+                        ).exists():
+                            attachment = EmailAttachment(
+                                email_uid=uid.decode(), filename=filename
+                            )
+                            attachment.file.save(
+                                filename, ContentFile(file_data)
+                            )
+                            attachment.save()
+
+        mail.logout()
+
+    if scrobbles_to_create:
+        logger.info(
+            f"Creating {len(scrobbles_to_create)} new scrobbles",
+            extra={"scrobbles_to_create": scrobbles_to_create},
+        )
+        created_scrobbles = Scrobble.objects.bulk_create(scrobbles_to_create)
+        return created_scrobbles
+    logger.info(f"No new scrobbles found in IMAP folders")

+ 30 - 4
vrobbler/apps/scrobbles/scrobblers.py

@@ -1,7 +1,7 @@
 import logging
 import re
 from datetime import datetime
-from typing import Optional
+from typing import Any, Optional
 
 import pendulum
 import pytz
@@ -285,7 +285,7 @@ def manual_scrobble_book(
 
 def manual_scrobble_board_game(
     bggeek_id: str, user_id: int, action: Optional[str] = None
-):
+) -> Scrobble | None:
     boardgame = BoardGame.find_or_create(bggeek_id)
 
     if not boardgame:
@@ -311,6 +311,28 @@ def manual_scrobble_board_game(
     return Scrobble.create_or_update(boardgame, user_id, scrobble_dict)
 
 
+def email_scrobble_board_game(
+    bgstat_data: dict[str, Any], user_id: int
+) -> Scrobble | None:
+    game_dict: dict[str, Any] = bgstat_data.get("games", [])[0]
+    if not game_dict.get("bggId", False):
+        logger.info(
+            "Data from BG Stats JSON had not BGG ID, not scrobbling",
+            extra={"bgstat_data": bgstat_data},
+        )
+        return
+
+    boardgame = BoardGame.find_or_create(game_dict.get("bggId"))
+    # TODO Enrich data from our bgstats data?
+    #
+    # TODO Build up board game meta data from the rest of the data
+    scrobble_dict = {}
+    players_list: list[dict[str, Any]] = bgstat_data.get("players", [])
+    location: dict[str, Any] = bgstat_data.get("locations", [])[0]
+
+    return Scrobble.create_or_update(boardgame, user_id, scrobble_dict)
+
+
 def manual_scrobble_from_url(
     url: str, user_id: int, action: Optional[str] = None
 ) -> Scrobble:
@@ -411,7 +433,9 @@ def todoist_scrobble_task(
     stopped: bool = False,
     user_context_list: list[str] = [],
 ) -> Scrobble:
-    title = get_title_from_labels(todoist_task.get("todoist_label_list", []), user_context_list)
+    title = get_title_from_labels(
+        todoist_task.get("todoist_label_list", []), user_context_list
+    )
     task = Task.find_or_create(title)
 
     timestamp = pendulum.parse(todoist_task.get("updated_at", timezone.now()))
@@ -536,7 +560,9 @@ def emacs_scrobble_task(
     user_context_list: list[str] = [],
 ) -> Scrobble | None:
     source_id = task_data.get("source_id")
-    title = get_title_from_labels(task_data.get("labels", []), user_context_list)
+    title = get_title_from_labels(
+        task_data.get("labels", []), user_context_list
+    )
 
     task = Task.find_or_create(title)