||
- import logging
- import re
- from datetime import datetime, timedelta
- from typing import Any, Optional
- import pendulum
- import pytz
- from beers.models import Beer
- from boardgames.models import BoardGame, BoardGameDesigner, BoardGameLocation
- from books.constants import READCOMICSONLINE_URL
- from books.models import Book, BookLogData, BookPageLogData
- from books.utils import parse_readcomicsonline_uri
- from bricksets.models import BrickSet
- from dateutil.parser import parse
- from django.utils import timezone
- from locations.constants import LOCATION_PROVIDERS
- from locations.models import GeoLocation
- from music.constants import JELLYFIN_POST_KEYS, MOPIDY_POST_KEYS
- from music.models import Track
- from people.models import Person
- from podcasts.models import PodcastEpisode
- from podcasts.utils import parse_mopidy_uri
- from profiles.models import UserProfile
- from puzzles.models import Puzzle
- from scrobbles.constants import (
- JELLYFIN_AUDIO_ITEM_TYPES,
- MANUAL_SCROBBLE_FNS,
- SCROBBLE_CONTENT_URLS,
- )
- from scrobbles.models import Scrobble
- from scrobbles.notifications import ScrobbleNtfyNotification
- from scrobbles.utils import (
- convert_to_seconds,
- extract_domain,
- remove_last_part,
- next_url_if_exists,
- )
- from sports.models import SportEvent
- from sports.thesportsdb import lookup_event_from_thesportsdb
- from tasks.models import Task
- from tasks.utils import get_title_from_labels
- from videogames.howlongtobeat import lookup_game_from_hltb
- from videogames.models import VideoGame
- from videos.models import Video
- from webpages.models import WebPage
- logger = logging.getLogger(__name__)
def mopidy_scrobble_media(post_data: dict, user_id: int) -> Scrobble:
    """Scrobble a Mopidy webhook payload as a track or a podcast episode.

    The payload is handled as a podcast episode when its ``mopidy_uri``
    contains ``"podcast"``; otherwise it is handled as a music track.

    Args:
        post_data: Webhook payload. Keys read here: ``mopidy_uri``, ``name``,
            ``artist``, ``album``, ``run_time`` and the playback-position and
            status keys named by ``MOPIDY_POST_KEYS``.
        user_id: ID of the user the scrobble is recorded for.

    Returns:
        The result of ``scrobble_for_user`` on the resolved media object, or a
        bare ``Scrobble()`` instance when a podcast URI could not be parsed.
    """
    mopidy_uri = post_data.get("mopidy_uri", "")

    media_type = Scrobble.MediaType.TRACK
    if "podcast" in mopidy_uri:
        media_type = Scrobble.MediaType.PODCAST_EPISODE

    logger.info(
        "[mopidy_webhook] called",
        extra={
            "user_id": user_id,
            "post_data": post_data,
            "media_type": media_type,
        },
    )

    if media_type == Scrobble.MediaType.PODCAST_EPISODE:
        parsed_data = parse_mopidy_uri(mopidy_uri)
        if not parsed_data:
            logger.warning(
                "Tried to scrobble podcast but no uri found",
                extra={"post_data": post_data},
            )
            return Scrobble()
        media_obj = PodcastEpisode.find_or_create(**parsed_data)
    else:
        media_obj = Track.find_or_create(
            title=post_data.get("name", ""),
            artist_name=post_data.get("artist", ""),
            album_name=post_data.get("album", ""),
            run_time_seconds=post_data.get("run_time", 900000),
        )

    # str.split() always returns at least one element, so taking [0] cannot
    # raise; the URI scheme (text before the first ":") is recorded as-is.
    log = {"mopidy_source": mopidy_uri.split(":")[0]}

    # A missing or null position counts as 0 instead of failing on
    # ``None / 1000``; the old default of 1 also truncated to 0.
    position_ticks = (
        post_data.get(MOPIDY_POST_KEYS.get("PLAYBACK_POSITION_TICKS")) or 0
    )

    return media_obj.scrobble_for_user(
        user_id,
        source="Mopidy",
        playback_position_seconds=int(position_ticks / 1000),
        status=post_data.get(MOPIDY_POST_KEYS.get("STATUS"), ""),
        log=log,
    )
def jellyfin_scrobble_media(
    post_data: dict, user_id: int
) -> Optional[Scrobble]:
    """Scrobble a Jellyfin webhook payload as a video or a music track.

    Args:
        post_data: Webhook payload. ``ItemType`` is popped from it (the dict
            is mutated); other keys read here are ``PlaybackPosition``,
            ``NotificationType``, ``Provider_imdb``, ``Name``, ``Artist``,
            ``Album``, ``RunTime``, ``IsPaused`` and the keys named by
            ``JELLYFIN_POST_KEYS``.
        user_id: ID of the user the scrobble is recorded for.

    Returns:
        The result of ``scrobble_for_user``, or ``None`` when the payload is
        a progress event without a position or no media object was found.
    """
    media_type = Scrobble.MediaType.VIDEO
    if post_data.pop("ItemType", "") in JELLYFIN_AUDIO_ITEM_TYPES:
        media_type = Scrobble.MediaType.TRACK

    null_position_on_progress = (
        post_data.get("PlaybackPosition") == "00:00:00"
        and post_data.get("NotificationType") == "PlaybackProgress"
    )

    # Jellyfin has some race conditions with its webhooks, these hacks fix
    # some of them
    if null_position_on_progress:
        logger.info(
            "[jellyfin_scrobble_media] no playback position tick, aborting",
            extra={"post_data": post_data},
        )
        return None

    # The payload timestamp used to be parsed here but was never used, and
    # parsing raised whenever the key was missing; it is no longer read.

    # A missing or null position counts as 0 instead of failing on
    # ``None / 10000000``; the old default of 1 also truncated to 0.
    position_ticks = (
        post_data.get(JELLYFIN_POST_KEYS.get("PLAYBACK_POSITION_TICKS")) or 0
    )
    playback_position_seconds = int(position_ticks / 10000000)

    if media_type == Scrobble.MediaType.VIDEO:
        media_obj = Video.get_from_imdb_id(
            post_data.get("Provider_imdb", "").replace("tt", "")
        )
    else:
        media_obj = Track.find_or_create(
            title=post_data.get("Name", ""),
            artist_name=post_data.get("Artist", ""),
            album_name=post_data.get("Album", ""),
            run_time_seconds=convert_to_seconds(
                post_data.get("RunTime", 900000)
            ),
        )
        # A hack because we don't worry about updating music ... we either
        # finish it or we don't
        playback_position_seconds = 0

    if not media_obj:
        logger.info(
            "[jellyfin_scrobble_media] no video found from POST data",
            extra={"post_data": post_data},
        )
        return None

    playback_status = "resumed"
    if post_data.get("IsPaused"):
        playback_status = "paused"
    elif post_data.get("NotificationType") == "PlaybackStop":
        playback_status = "stopped"

    return media_obj.scrobble_for_user(
        user_id,
        source=post_data.get(JELLYFIN_POST_KEYS.get("SOURCE")),
        playback_position_seconds=playback_position_seconds,
        status=playback_status,
    )
def web_scrobbler_scrobble_media(
    youtube_id: str, user_id: int, status: str = "started"
) -> Optional[Scrobble]:
    """Scrobble a YouTube video reported by Web Scrobbler.

    Args:
        youtube_id: YouTube ID used to look the video up.
        user_id: ID of the user the scrobble is recorded for.
        status: Playback status forwarded to ``scrobble_for_user``.

    Returns:
        The result of ``scrobble_for_user``, or ``None`` when no video could
        be resolved for ``youtube_id``.
    """
    video = Video.get_from_youtube_id(youtube_id)
    if not video:
        logger.info(
            "[web_scrobbler_scrobble_media] no video found for youtube id",
            extra={"youtube_id": youtube_id, "user_id": user_id},
        )
        return None

    # Pass ``status`` by keyword, like the other ``scrobble_for_user`` calls
    # in this module, so it cannot bind to a different positional parameter.
    return video.scrobble_for_user(
        user_id, status=status, source="Web Scrobbler"
    )
def manual_scrobble_video(
    video_id: str, user_id: int, action: Optional[str] = None
):
    """Manually scrobble a video identified by an IMDb or YouTube ID.

    Args:
        video_id: An IMDb ID (``tt`` followed by digits) or, for anything
            else, a YouTube ID.
        user_id: ID of the user the scrobble is recorded for.
        action: When ``"stop"``, the scrobble is force-finished right after
            it is created or updated.

    Returns:
        The scrobble, or ``None`` when no video could be resolved.
    """
    # Only a full "tt<digits>" value is an IMDb ID; a plain substring test
    # misrouted YouTube IDs that happen to contain "tt".
    if re.fullmatch(r"tt\d+", video_id):
        video = Video.get_from_imdb_id(video_id)
    else:
        video = Video.get_from_youtube_id(video_id)

    if not video:
        logger.info(
            "[scrobblers] manual video scrobble request failed, no video found",
            extra={
                "video_id": video_id,
                "user_id": user_id,
                "media_type": Scrobble.MediaType.VIDEO,
            },
        )
        return None

    # When manually scrobbling, try finding a source from the series
    source = "Vrobbler"
    if video.tv_series:
        source = video.tv_series.preferred_source

    scrobble_dict = {
        "user_id": user_id,
        "timestamp": timezone.now(),
        "playback_position_seconds": 0,
        "source": source,
    }

    logger.info(
        "[scrobblers] manual video scrobble request received",
        extra={
            "video_id": video.id,
            "user_id": user_id,
            "scrobble_dict": scrobble_dict,
            "media_type": Scrobble.MediaType.VIDEO,
        },
    )

    scrobble = Scrobble.create_or_update(video, user_id, scrobble_dict)

    if action == "stop":
        scrobble.stop(force_finish=True)

    return scrobble
def manual_scrobble_event(
    thesportsdb_id: str, user_id: int, action: Optional[str] = None
):
    """Manually scrobble a sport event looked up on TheSportsDB.

    Args:
        thesportsdb_id: Event ID passed to ``lookup_event_from_thesportsdb``.
        user_id: ID of the user the scrobble is recorded for.
        action: Accepted like in the other manual scrobblers; not used here.

    Returns:
        The result of ``Scrobble.create_or_update`` for the event.
    """
    event = SportEvent.find_or_create(
        lookup_event_from_thesportsdb(thesportsdb_id)
    )
    return Scrobble.create_or_update(
        event,
        user_id,
        dict(
            user_id=user_id,
            timestamp=timezone.now(),
            playback_position_seconds=0,
            source="TheSportsDB",
        ),
    )
def manual_scrobble_video_game(
    hltb_id: str, user_id: int, action: Optional[str] = None
):
    """Manually scrobble a video game identified by its HowLongToBeat ID.

    A locally stored game with this ``hltb_id`` is preferred; otherwise the
    game is looked up through ``lookup_game_from_hltb`` and created.

    Args:
        hltb_id: HowLongToBeat ID of the game.
        user_id: ID of the user the scrobble is recorded for.
        action: Accepted like in the other manual scrobblers; not used here.

    Returns:
        The result of ``Scrobble.create_or_update``, or ``None`` when the
        lookup returned nothing.
    """
    game = VideoGame.objects.filter(hltb_id=hltb_id).first()

    if not game:
        hltb_data = lookup_game_from_hltb(hltb_id)
        if hltb_data:
            game = VideoGame.find_or_create(hltb_data)
        else:
            not_found_extra = {
                "hltb_id": hltb_id,
                "user_id": user_id,
                "media_type": Scrobble.MediaType.VIDEO_GAME,
            }
            logger.info(
                "[manual_scrobble_video_game] game not found on hltb",
                extra=not_found_extra,
            )
            return None

    payload = dict(
        user_id=user_id,
        timestamp=timezone.now(),
        playback_position_seconds=0,
        source="Vrobbler",
        long_play_complete=False,
    )
    received_extra = {
        "videogame_id": game.id,
        "user_id": user_id,
        "scrobble_dict": payload,
        "media_type": Scrobble.MediaType.VIDEO_GAME,
    }
    logger.info(
        "[scrobblers] manual video game scrobble request received",
        extra=received_extra,
    )

    return Scrobble.create_or_update(game, user_id, payload)
def manual_scrobble_book(
    title: str, user_id: int, action: Optional[str] = None
):
    """Manually scrobble a book by title or by a readcomicsonline URL.

    When ``title`` contains ``READCOMICSONLINE_URL`` it is treated as a URL:
    title, volume and page are parsed out of it, and the scrobble source is
    set to that site's host.

    Args:
        title: Book title, or a readcomicsonline URL.
        user_id: ID of the user the scrobble is recorded for.
        action: When ``"stop"``, the scrobble is force-finished; for URL
            scrobbles the next page URL is first stored as ``resume_url``.

    Returns:
        The scrobble, or ``None`` when a URL was given but no title could be
        parsed from it.
    """
    source = "Vrobbler"
    page = None
    url = ""

    if READCOMICSONLINE_URL in title:
        url = title
        title, volume, page = parse_readcomicsonline_uri(title)
        if not title:
            logger.info(
                "[scrobblers] manual book scrobble request failed",
                extra={
                    "title": title,
                    "user_id": user_id,
                    "media_type": Scrobble.MediaType.BOOK,
                },
            )
            return None
        title = f"{title} - Issue {volume}"

        if not page:
            page = 1

        logger.info(
            "[scrobblers] Book page included in scrobble, should update!"
        )
        source = READCOMICSONLINE_URL.replace("https://", "")

    # TODO: Check for scrobble of this book already and if so, update the
    # page count

    book = Book.find_or_create(title, url=url, enrich=True)

    scrobble_dict = {
        "user_id": user_id,
        "timestamp": timezone.now(),
        "playback_position_seconds": 0,
        "source": source,
        "long_play_complete": False,
    }
    logger.info(
        "[scrobblers] manual book scrobble request received",
        extra={
            "book_id": book.id,
            "user_id": user_id,
            "scrobble_dict": scrobble_dict,
            "media_type": Scrobble.MediaType.BOOK,
        },
    )

    scrobble = Scrobble.create_or_update(
        book, user_id, scrobble_dict, read_log_page=page
    )

    if action == "stop":
        if url:
            resume_url = next_url_if_exists(url)
            # isinstance() needs the class itself; the previous string
            # "BookLogData" raised TypeError on every URL-based stop.
            if isinstance(scrobble.log, BookLogData):
                scrobble.log.resume_url = resume_url
            else:
                scrobble.log["resume_url"] = resume_url
            scrobble.save(update_fields=["log"])
        scrobble.stop(force_finish=True)

    return scrobble
def manual_scrobble_board_game(
    bggeek_id: str, user_id: int, action: Optional[str] = None
) -> Scrobble | None:
    """Manually scrobble a board game identified by ``bggeek_id``.

    Args:
        bggeek_id: ID handed to ``BoardGame.find_or_create``.
        user_id: ID of the user the scrobble is recorded for.
        action: Accepted like in the other manual scrobblers; not used here.

    Returns:
        The result of ``Scrobble.create_or_update``, or ``None`` when no
        board game was found.
    """
    found_game = BoardGame.find_or_create(bggeek_id)
    if not found_game:
        logger.error(f"No board game found for ID {bggeek_id}")
        return None

    payload = dict(
        user_id=user_id,
        timestamp=timezone.now(),
        playback_position_seconds=0,
        source="Vrobbler",
    )
    log_extra = dict(
        boardgame_id=found_game.id,
        user_id=user_id,
        scrobble_dict=payload,
        media_type=Scrobble.MediaType.BOARD_GAME,
    )
    logger.info(
        "[vrobbler-scrobble] board game scrobble request received",
        extra=log_extra,
    )

    return Scrobble.create_or_update(found_game, user_id, payload)
def find_and_enrich_board_game_data(game_dict: dict) -> BoardGame | None:
    """Find or create a board game from ``game_dict`` and copy its metadata.

    TODO Move this to a utility somewhere

    Args:
        game_dict: Game entry. Keys read here: ``bggId``, ``cooperative``,
            ``highestWins``, ``noPoints``, ``useTeams``, ``uuid``, ``rating``
            and ``designers`` (a ``", "``-separated string of names).

    Returns:
        The value returned by ``BoardGame.find_or_create``; when it is falsy
        nothing is updated and it is returned unchanged.
    """
    game = BoardGame.find_or_create(game_dict.get("bggId"))
    if not game:
        return game

    game.cooperative = game_dict.get("cooperative", False)
    game.highest_wins = game_dict.get("highestWins", True)
    game.no_points = game_dict.get("noPoints", False)
    game.uses_teams = game_dict.get("useTeams", False)
    game.bgstats_id = game_dict.get("uuid", None)

    # Only fill in a rating the game lacks, and only when the payload has
    # one; a missing "rating" used to raise TypeError on ``None / 10``.
    raw_rating = game_dict.get("rating")
    if not game.rating and raw_rating is not None:
        game.rating = raw_rating / 10
    game.save()

    designers = game_dict.get("designers")
    if designers:
        for designer_name in designers.split(", "):
            designer, _created = BoardGameDesigner.objects.get_or_create(
                name=designer_name
            )
            game.designers.add(designer.id)

    return game
# Matches "Start time: HH:MM" with optional ":SS" inside a play comment line.
_BGSTATS_START_TIME_RE = re.compile(
    r"Start time:\s*(\d{1,2}):(\d{1,2})(?::(\d{1,2}))?"
)


def _parse_bgstats_comments(
    comments: str,
) -> tuple[bool, Optional[tuple[int, int, int]]]:
    """Return the "learning" flag and manual start time found in comments.

    The start time is ``(hour, minute, second)`` or ``None``. When several
    lines carry a start time the last valid one wins; out-of-range values
    are ignored.
    """
    learning = False
    start_time = None
    for line in comments.split("\n"):
        if "Learning to play" in line:
            learning = True
        match = _BGSTATS_START_TIME_RE.search(line)
        if not match:
            continue
        hour, minute, second = (int(part or 0) for part in match.groups())
        if hour < 24 and minute < 60 and second < 60:
            start_time = (hour, minute, second)
    return learning, start_time


def email_scrobble_board_game(
    bgstat_data: dict[str, Any], user_id: int
) -> list[Scrobble]:
    """Create board game scrobbles from a BG Stats export.

    Players, games and locations in the export are found or created first;
    then one finished scrobble is created per entry in ``plays`` and a
    notification is sent for each.

    Args:
        bgstat_data: Export payload. Top-level keys read here: ``games``,
            ``players``, ``locations`` and ``plays``.
        user_id: ID of the user the scrobbles are recorded for.

    Returns:
        The scrobbles created by this call. Empty when the export has no
        games; plays whose game is unknown or whose scrobble already exists
        are skipped.
    """
    game_list: list = bgstat_data.get("games", [])
    if not game_list:
        logger.info(
            "No game data from BG Stats, not scrobbling",
            extra={"bgstat_data": bgstat_data},
        )
        return []

    player_dict = {}
    for player in bgstat_data.get("players", []):
        if player.get("isAnonymous"):
            person, _created = Person.objects.get_or_create(name="Anonymous")
        else:
            person, _created = Person.objects.get_or_create(
                bgstats_id=player.get("uuid")
            )
            if not person.name:
                person.name = player.get("name", "")
                person.save()
        player_dict[player.get("id")] = person

    base_games = {}
    expansions = {}
    for game in game_list:
        logger.info(f"Finding and enriching {game.get('name')}")
        enriched_game = find_and_enrich_board_game_data(game)
        if game.get("isBaseGame"):
            base_games[game.get("id")] = enriched_game
        if game.get("isExpansion"):
            expansions[game.get("id")] = enriched_game

    locations = {}
    for location_dict in bgstat_data.get("locations", []):
        location, _created = BoardGameLocation.objects.get_or_create(
            bgstats_id=location_dict.get("uuid")
        )
        update_fields = []
        if not location.name:
            location.name = location_dict.get("name")
            update_fields.append("name")
        # Skip the geo lookup when there is still no name to search for.
        geoloc = None
        if location.name:
            geoloc = GeoLocation.objects.filter(
                title__icontains=location.name
            ).first()
        if geoloc:
            location.geo_location = geoloc
            update_fields.append("geo_location")
        if update_fields:
            location.save(update_fields=update_fields)
        locations[location_dict.get("id")] = location

    # The profile does not change between plays, so fetch it once.
    profile = UserProfile.objects.filter(user_id=user_id).first()

    scrobbles_created = []
    for play_dict in bgstat_data.get("plays", []):
        # Build a fresh log for every play. The log used to be one dict
        # shared by all plays, so "learning", location, rounds, etc. leaked
        # from earlier plays and every scrobble got the same object.
        log_data: dict[str, Any] = {}

        start_time = None
        if "comments" in play_dict:
            comments = play_dict.get("comments")
            learning, start_time = _parse_bgstats_comments(comments or "")
            if learning:
                log_data["learning"] = True
            log_data["notes"] = [comments]

        # A play may reference a base game or, failing that, an expansion.
        game_ref_id = play_dict.get("gameRefId")
        base_game = base_games.get(game_ref_id) or expansions.get(game_ref_id)
        if not base_game:
            logger.info(
                "Skipping scrobble of play, can't find game",
                extra={"play_dict": play_dict},
            )
            continue

        expansion_ids = []
        for eplay in play_dict.get("expansionPlays", []):
            expansion = expansions.get(eplay.get("gameRefId"))
            if not expansion:
                logger.warning(
                    "Skipping unknown expansion for play",
                    extra={"expansion_play": eplay},
                )
                continue
            expansion.expansion_for_boardgame = base_game
            expansion.save()
            expansion_ids.append(expansion.id)
        if expansion_ids:
            log_data["expansion_ids"] = expansion_ids

        location = locations.get(play_dict.get("locationRefId"))
        if location:
            log_data["location_id"] = location.id
        if play_dict.get("rounds", False):
            log_data["rounds"] = play_dict.get("rounds")
        if play_dict.get("board", False):
            log_data["board"] = play_dict.get("board")

        log_data["players"] = [
            {
                "person_id": player_dict[score_dict.get("playerRefId")].id,
                "new": score_dict.get("newPlayer"),
                "win": score_dict.get("winner"),
                "score": score_dict.get("score"),
                "rank": score_dict.get("rank"),
                "seat_order": score_dict.get("seatOrder"),
                "role": score_dict.get("role"),
            }
            for score_dict in play_dict.get("playerScores", [])
        ]

        timestamp = parse(play_dict.get("playDate"))
        # Test for presence, not truthiness: the old ``if hour and minute``
        # ignored start times at hour 0 or minute 0 (e.g. "14:00").
        if start_time is not None:
            hour, minute, second = start_time
            logger.info(f"Scrobble playDate has manual start time {timestamp}")
            timestamp = timestamp.replace(
                hour=hour, minute=minute, second=second
            )
            logger.info(f"Update to {timestamp}")

        timestamp = profile.get_timestamp_with_tz(timestamp)

        # A missing duration falls back to the game's run time instead of
        # failing on ``None > 0``.
        duration_minutes = play_dict.get("durationMin") or 0
        if duration_minutes > 0:
            duration_seconds = duration_minutes * 60
        else:
            duration_seconds = base_game.run_time_seconds
        stop_timestamp = timestamp + timedelta(seconds=duration_seconds)

        logger.info(f"Creating scrobble for {base_game} at {timestamp}")
        scrobble_dict = {
            "user_id": user_id,
            "timestamp": timestamp,
            "playback_position_seconds": duration_seconds,
            "source": "BG Stats",
            "log": log_data,
        }

        # Only plays from 2024 onwards are checked for an existing scrobble
        # at the same timestamp.
        # NOTE(review): the message below is logged for exactly those recent
        # plays, which reads inverted; wording kept as-is pending review.
        scrobble = None
        if timestamp.year > 2023:
            logger.info(
                "Scrobbles older than 2024 likely have no time associated just create it"
            )
            scrobble = Scrobble.objects.filter(
                board_game=base_game, user_id=user_id, timestamp=timestamp
            ).first()

        if scrobble:
            logger.info(
                "Scrobble already exists, skipping",
                extra={"scrobble_dict": scrobble_dict, "user_id": user_id},
            )
            continue

        scrobble = Scrobble.create_or_update(
            base_game, user_id, scrobble_dict, skip_in_progress_check=True
        )
        scrobble.timezone = timestamp.tzinfo.name
        scrobble.stop_timestamp = stop_timestamp
        scrobble.in_progress = False
        scrobble.played_to_completion = True
        scrobble.save()
        scrobbles_created.append(scrobble)
        ScrobbleNtfyNotification(scrobble).send()

    return scrobbles_created
def manual_scrobble_from_url(
    url: str, user_id: int, action: Optional[str] = None
) -> Optional[Scrobble]:
    """Route a URL to the matching manual scrobbler.

    The URL's domain is matched against ``SCROBBLE_CONTENT_URLS`` to pick a
    content key. URLs that match nothing are scrobbled as a plain webpage
    (key ``"-w"``) with the URL itself as the item ID.

    Args:
        url: URL to scrobble.
        user_id: ID of the user the scrobble is recorded for.
        action: Forwarded unchanged to the selected scrobbler.

    Returns:
        Whatever the selected scrobbler returns (may be ``None``).

    Raises:
        KeyError: If the content key has no entry in ``MANUAL_SCROBBLE_FNS``.
    """
    content_key = ""
    domain = extract_domain(url)

    # No early exit: when several entries match, the last one wins.
    for key, content_urls in SCROBBLE_CONTENT_URLS.items():
        if any(domain in content_url for content_url in content_urls):
            content_key = key

    item_id = None
    if not content_key:
        content_key = "-w"
        item_id = url

    # Try generic search for any URL with digit-based IDs
    if not item_id:
        digit_runs = re.findall(r"\d+", url)
        if digit_runs:
            item_id = digit_runs[0]

    if content_key == "-i" and "v=" in url:
        item_id = url.split("v=")[1].split("&")[0]
    elif content_key == "-c" and "comics" in url:
        item_id = url
    elif content_key == "-i" and "title/tt" in url:
        # Take the ID from the "title/tt<digits>" segment itself; the first
        # digit run of the whole URL may belong to the host or path instead.
        imdb_match = re.search(r"title/(tt\d+)", url)
        item_id = imdb_match.group(1) if imdb_match else "tt" + str(item_id)

    scrobble_fn = MANUAL_SCROBBLE_FNS[content_key]
    # SECURITY NOTE(review): eval() resolves the function from a string held
    # in MANUAL_SCROBBLE_FNS. That is only safe while the mapping holds
    # trusted, hard-coded names; consider mapping keys to callables instead.
    return eval(scrobble_fn)(item_id, user_id, action=action)
def todoist_scrobble_task_finish(
    todoist_task: dict, user_id: int, timestamp: datetime
) -> Optional[Scrobble]:
    """Force-finish the in-progress scrobble of a Todoist task.

    Args:
        todoist_task: Task payload; only ``todoist_id`` is read.
        user_id: ID of the user owning the scrobble.
        timestamp: Stop time passed to ``Scrobble.stop``.

    Returns:
        The stopped scrobble, or ``None`` when no unfinished in-progress
        scrobble exists for that task and user.
    """
    open_scrobble = Scrobble.objects.filter(
        user_id=user_id,
        log__todoist_id=todoist_task.get("todoist_id"),
        in_progress=True,
        played_to_completion=False,
    ).first()

    if open_scrobble:
        open_scrobble.stop(timestamp=timestamp, force_finish=True)
        return open_scrobble

    logger.info(
        "[todoist_scrobble_task_finish] todoist webhook finish called on missing task"
    )
    return None
def todoist_scrobble_update_task(
    todoist_note: dict, user_id: int
) -> Optional[Scrobble]:
    """Append a Todoist note to the in-progress scrobble of its task.

    Args:
        todoist_note: Note payload; ``task_id`` selects the scrobble and
            ``notes`` is the value appended to the scrobble log.
        user_id: ID of the user owning the scrobble.

    Returns:
        The updated scrobble, or ``None`` when no in-progress scrobble
        matches the task.
    """
    target = Scrobble.objects.filter(
        in_progress=True,
        user_id=user_id,
        log__todoist_id=todoist_note.get("task_id"),
    ).first()

    # Both log lines below carry the same context.
    log_extra = {
        "todoist_note": todoist_note,
        "user_id": user_id,
        "media_type": Scrobble.MediaType.TASK,
    }

    if not target:
        logger.info(
            "[todoist_scrobble_update_task] no task found", extra=log_extra
        )
        return None

    # Start the notes list when it is missing or empty, then append.
    if not target.log.get("notes"):
        target.log["notes"] = []
    target.log["notes"].append(todoist_note.get("notes"))
    target.save(update_fields=["log"])

    logger.info(
        "[todoist_scrobble_update_task] todoist note added", extra=log_extra
    )
    return target
def todoist_scrobble_task(
    todoist_task: dict,
    user_id: int,
    started: bool = False,
    stopped: bool = False,
    user_context_list: Optional[list[str]] = None,
) -> Optional[Scrobble]:
    """Start or finish a task scrobble from a Todoist webhook payload.

    Args:
        todoist_task: Task payload. It is mutated: ``updated_at`` is popped,
            and on the create path ``description``, ``details``,
            ``todoist_label_list``, ``todoist_type`` and ``todoist_event``
            are popped or renamed before the dict is stored as the log.
        user_id: ID of the user the scrobble is recorded for.
        started: The webhook reports the task as started.
        stopped: The webhook reports the task as stopped.
        user_context_list: Passed to ``get_title_from_labels`` together with
            the task labels; defaults to an empty list.

    Returns:
        The existing in-progress scrobble when asked to start a task that is
        already running, the finished scrobble when stopping one, a new
        scrobble otherwise, or ``None`` when asked to stop a task that has
        no in-progress scrobble.

    Raises:
        KeyError: If the payload lacks ``todoist_type``, ``todoist_event``,
            ``todoist_id``, ``description`` or ``details`` where needed.
    """
    # Avoid a shared mutable default; None stands for "no contexts".
    if user_context_list is None:
        user_context_list = []

    title = get_title_from_labels(
        todoist_task.get("todoist_label_list", []), user_context_list
    )
    task = Task.find_or_create(title)

    # pendulum.parse() expects a string, so the old ``timezone.now()``
    # fallback could not be parsed; use the current time directly instead.
    updated_at = todoist_task.pop("updated_at", None)
    if isinstance(updated_at, str):
        timestamp = pendulum.parse(updated_at)
    else:
        timestamp = updated_at or timezone.now()

    in_progress_scrobble = Scrobble.objects.filter(
        user_id=user_id,
        in_progress=True,
        log__todoist_id=todoist_task.get("todoist_id"),
        task=task,
    ).last()

    if not in_progress_scrobble and stopped:
        logger.info(
            "[todoist_scrobble_task] cannot stop already stopped task",
            extra={
                "todoist_type": todoist_task["todoist_type"],
                "todoist_event": todoist_task["todoist_event"],
                "todoist_id": todoist_task["todoist_id"],
            },
        )
        return None

    if in_progress_scrobble and started:
        logger.info(
            "[todoist_scrobble_task] cannot start already started task",
            extra={
                "todoist_type": todoist_task["todoist_type"],
                "todoist_event": todoist_task["todoist_event"],
                "todoist_id": todoist_task["todoist_id"],
            },
        )
        return in_progress_scrobble

    # Finish an in-progress scrobble
    if in_progress_scrobble and stopped:
        logger.info(
            "[todoist_scrobble_task] finishing",
            extra={
                "todoist_type": todoist_task["todoist_type"],
                "todoist_event": todoist_task["todoist_event"],
                "todoist_id": todoist_task["todoist_id"],
            },
        )
        return todoist_scrobble_task_finish(todoist_task, user_id, timestamp)

    # Reshape the payload into the log stored on the scrobble.
    todoist_task["title"] = todoist_task.pop("description")
    todoist_task["description"] = todoist_task.pop("details")
    todoist_task["labels"] = todoist_task.pop("todoist_label_list", [])
    todoist_task.pop("todoist_type")
    todoist_task.pop("todoist_event")

    scrobble_dict = {
        "user_id": user_id,
        "timestamp": timestamp,
        "playback_position_seconds": 0,
        "source": "Todoist",
        "log": todoist_task,
    }

    logger.info(
        "[todoist_scrobble_task] creating",
        extra={
            "task_id": task.id,
            "user_id": user_id,
            "scrobble_dict": scrobble_dict,
            "media_type": Scrobble.MediaType.TASK,
        },
    )
    return Scrobble.create_or_update(task, user_id, scrobble_dict)
def emacs_scrobble_update_task(
    emacs_id: str, emacs_notes: list[dict], user_id: int
) -> Optional[Scrobble]:
    """Append new Org-mode notes to the in-progress scrobble of a task.

    Notes are stored in the scrobble log as ``{timestamp: content}`` and a
    note is skipped when its timestamp is already present.

    Args:
        emacs_id: Org-mode ID matched against ``log.orgmode_id``.
        emacs_notes: Iterable of note dicts with ``timestamp`` and
            ``content`` keys.
        user_id: ID of the user owning the scrobble.

    Returns:
        The scrobble (saved only when a note was added), or ``None`` when no
        in-progress Org-mode scrobble matches.
    """
    scrobble = Scrobble.objects.filter(
        in_progress=True,
        user_id=user_id,
        log__orgmode_id=emacs_id,
        source="Org-mode",
    ).first()

    if not scrobble:
        logger.info(
            "[emacs_scrobble_update_task] no task found",
            extra={
                "emacs_notes": emacs_notes,
                "user_id": user_id,
                "media_type": Scrobble.MediaType.TASK,
            },
        )
        return None

    # The old check used the mistyped key 'notes"', which was never present,
    # so the list was reset on every iteration and earlier notes were lost.
    existing_notes = scrobble.log.get("notes") or []

    # Collect known timestamps from both note shapes: {timestamp: content}
    # (written below) and dicts with an explicit "timestamp" key. Plain
    # string notes carry no timestamp and are ignored here.
    known_timestamps = []
    for existing in existing_notes:
        if isinstance(existing, dict):
            known_timestamps.extend(existing.keys())
            if "timestamp" in existing:
                known_timestamps.append(existing["timestamp"])

    new_notes = []
    for note in emacs_notes:
        note_timestamp = note.get("timestamp")
        if note_timestamp in known_timestamps:
            continue
        new_notes.append({note_timestamp: note.get("content")})
        known_timestamps.append(note_timestamp)

    if new_notes:
        scrobble.log["notes"] = list(existing_notes) + new_notes
        scrobble.save(update_fields=["log"])
        logger.info(
            "[emacs_scrobble_update_task] emacs note added",
            extra={
                "emacs_note": emacs_notes,
                "user_id": user_id,
                "media_type": Scrobble.MediaType.TASK,
            },
        )

    return scrobble
def emacs_scrobble_task(
    task_data: dict,
    user_id: int,
    started: bool = False,
    stopped: bool = False,
    user_context_list: Optional[list[str]] = None,
) -> Scrobble | None:
    """Start or finish a task scrobble from an Org-mode payload.

    Args:
        task_data: Task payload. It is mutated: ``updated_at`` is popped,
            and on the create path the remaining keys are renamed (for
            example ``source_id`` to ``orgmode_id``) before the dict is
            stored as the scrobble log.
        user_id: ID of the user the scrobble is recorded for.
        started: The payload reports the task as started.
        stopped: The payload reports the task as stopped.
        user_context_list: Passed to ``get_title_from_labels`` together with
            the task labels; defaults to an empty list.

    Returns:
        The existing in-progress scrobble (finished first when ``stopped``),
        a new scrobble when none is in progress, or ``None`` when asked to
        stop a task that has no in-progress scrobble.

    Raises:
        KeyError: If a new scrobble is created and the payload lacks
            ``description``, ``body``, ``source_id``, ``state``,
            ``properties``, ``drawers``, ``timestamps`` or ``source``.
    """
    # Avoid a shared mutable default; None stands for "no contexts".
    if user_context_list is None:
        user_context_list = []

    orgmode_id = task_data.get("source_id")
    title = get_title_from_labels(
        task_data.get("labels", []), user_context_list
    )
    task = Task.find_or_create(title)

    # pendulum.parse() expects a string, so the old ``timezone.now()``
    # fallback could not be parsed; use the current time directly instead.
    updated_at = task_data.pop("updated_at", None)
    if isinstance(updated_at, str):
        timestamp = pendulum.parse(updated_at)
    else:
        timestamp = updated_at or timezone.now()

    # Match on the scrobble's own source. "source" is popped from the log
    # before it is stored below, so the old ``log__source="orgmode"`` filter
    # had no such key to match on scrobbles created here.
    in_progress_scrobble = Scrobble.objects.filter(
        user_id=user_id,
        in_progress=True,
        log__orgmode_id=orgmode_id,
        source="Org-mode",
        task=task,
    ).last()

    if not in_progress_scrobble and stopped:
        logger.info(
            "[emacs_scrobble_task] cannot stop already stopped task",
            extra={
                "orgmode_id": orgmode_id,
            },
        )
        return None

    if in_progress_scrobble and started:
        logger.info(
            "[emacs_scrobble_task] cannot start already started task",
            extra={
                "orgmode_id": orgmode_id,
            },
        )
        return in_progress_scrobble

    # Finish an in-progress scrobble
    if in_progress_scrobble and stopped:
        logger.info(
            "[emacs_scrobble_task] finishing",
            extra={
                "orgmode_id": orgmode_id,
            },
        )
        in_progress_scrobble.stop(timestamp=timestamp, force_finish=True)
        return in_progress_scrobble

    if in_progress_scrobble:
        return in_progress_scrobble

    # Reshape the payload into the log stored on the scrobble. Notes are
    # reduced to their content; an empty or missing list drops the key.
    notes = task_data.pop("notes", None)
    if notes:
        task_data["notes"] = [note.get("content") for note in notes]
    task_data["title"] = task_data.pop("description")
    task_data["description"] = task_data.pop("body")
    task_data["labels"] = task_data.pop("labels", [])
    task_data["orgmode_id"] = task_data.pop("source_id")
    task_data["orgmode_state"] = task_data.pop("state")
    task_data["orgmode_properties"] = task_data.pop("properties")
    task_data["orgmode_drawers"] = task_data.pop("drawers")
    task_data["orgmode_timestamps"] = task_data.pop("timestamps")
    task_data.pop("source")

    scrobble_dict = {
        "user_id": user_id,
        "timestamp": timestamp,
        "playback_position_seconds": 0,
        "source": "Org-mode",
        "log": task_data,
    }

    logger.info(
        "[emacs_scrobble_task] creating",
        extra={
            "task_id": task.id,
            "user_id": user_id,
            "scrobble_dict": scrobble_dict,
            "media_type": Scrobble.MediaType.TASK,
        },
    )
    return Scrobble.create_or_update(task, user_id, scrobble_dict)
def manual_scrobble_task(url: str, user_id: int, action: Optional[str] = None):
    """Manually scrobble a task from its URL (only Todoist URLs are handled).

    The first run of digits in the URL is stored as ``source_id``; the
    description is built from the last path segment with its final
    ``-``-separated part dropped.

    Args:
        url: Task URL; must contain ``"todoist"``.
        user_id: ID of the user the scrobble is recorded for.
        action: Accepted like in the other manual scrobblers; not used here.

    Returns:
        The result of ``Scrobble.create_or_update``, or ``None`` when the URL
        is not a Todoist URL or carries no numeric ID.
    """
    # Non-Todoist URLs used to fall through with ``title``, ``source`` and
    # ``description`` unbound and crash; reject them explicitly.
    if "todoist" not in url:
        logger.warning(
            "[manual_scrobble_task] unsupported task URL",
            extra={"url": url, "user_id": user_id},
        )
        return None

    digit_runs = re.findall(r"\d+", url)
    if not digit_runs:
        logger.warning(
            "[manual_scrobble_task] no numeric ID found in task URL",
            extra={"url": url, "user_id": user_id},
        )
        return None
    source_id = digit_runs[0]

    source = "Todoist"
    title = "Generic Todoist task"
    description = " ".join(url.split("/")[-1].split("-")[:-1]).capitalize()

    task = Task.find_or_create(title)

    scrobble_dict = {
        "user_id": user_id,
        "timestamp": timezone.now(),
        "playback_position_seconds": 0,
        "source": source,
        "log": {"description": description, "source_id": source_id},
    }
    # The message and media type used to say "webpage" (copied from the
    # webpage scrobbler); this function scrobbles a task.
    logger.info(
        "[vrobbler-scrobble] task scrobble request received",
        extra={
            "task_id": task.id,
            "user_id": user_id,
            "scrobble_dict": scrobble_dict,
            "media_type": Scrobble.MediaType.TASK,
        },
    )

    return Scrobble.create_or_update(task, user_id, scrobble_dict)
def manual_scrobble_webpage(
    url: str, user_id: int, action: Optional[str] = None
):
    """Manually scrobble a webpage by URL.

    Args:
        url: Page URL, handed to ``WebPage.find_or_create`` as ``{"url": url}``.
        user_id: ID of the user the scrobble is recorded for.
        action: ``"stop"`` force-finishes the scrobble; any other value
            calls ``push_to_archivebox`` on it instead.

    Returns:
        The scrobble produced by ``Scrobble.create_or_update``.
    """
    page = WebPage.find_or_create({"url": url})

    payload = dict(
        user_id=user_id,
        timestamp=timezone.now(),
        playback_position_seconds=0,
        source="Vrobbler",
    )
    log_extra = dict(
        webpage_id=page.id,
        user_id=user_id,
        scrobble_dict=payload,
        media_type=Scrobble.MediaType.WEBPAGE,
    )
    logger.info(
        "[vrobbler-scrobble] webpage scrobble request received",
        extra=log_extra,
    )

    result = Scrobble.create_or_update(page, user_id, payload)

    if action != "stop":
        # possibly async this?
        result.push_to_archivebox()
    else:
        result.stop(force_finish=True)

    return result
def gpslogger_scrobble_location(data_dict: dict, user_id: int) -> Scrobble:
    """Record a GPSLogger position update as a location scrobble.

    Args:
        data_dict: GPSLogger payload. It is passed whole to
            ``GeoLocation.find_or_create``; ``time`` and ``prov`` are also
            read here.
        user_id: ID of the user the scrobble is recorded for.

    Returns:
        The location scrobble, with this update appended to
        ``log["gps_updates"]`` and its playback position refreshed.
    """
    location = GeoLocation.find_or_create(data_dict)

    # pendulum.parse() expects a string, so the old ``timezone.now()``
    # fallback could not be parsed; use the current time directly instead.
    raw_time = data_dict.get("time")
    if isinstance(raw_time, str):
        timestamp = pendulum.parse(raw_time)
    else:
        timestamp = raw_time or timezone.now()

    extra_data = {
        "user_id": user_id,
        "timestamp": timestamp,
        "source": "GPSLogger",
        "media_type": Scrobble.MediaType.GEO_LOCATION,
    }

    scrobble = Scrobble.create_or_update_location(
        location,
        extra_data,
        user_id,
    )

    # Fall back to the raw provider value for unknown providers; a KeyError
    # here used to abort after the scrobble had already been created.
    raw_provider = data_dict.get("prov")
    try:
        provider = LOCATION_PROVIDERS[raw_provider]
    except KeyError:
        provider = raw_provider

    if "gps_updates" not in scrobble.log:
        scrobble.log["gps_updates"] = []
    scrobble.log["gps_updates"].append(
        {
            "timestamp": data_dict.get("time"),
            "position_provider": provider,
        }
    )

    if scrobble.timestamp:
        # timedelta.seconds drops whole days and wraps after 24 hours;
        # total_seconds() gives the real elapsed time. Clamp at zero in case
        # the scrobble timestamp lies in the future.
        elapsed = timezone.now() - scrobble.timestamp
        scrobble.playback_position_seconds = max(
            int(elapsed.total_seconds()), 0
        )

    scrobble.save(update_fields=["log", "playback_position_seconds"])

    logger.info(
        "[gpslogger_webhook] gpslogger scrobble request received",
        extra={
            "scrobble_id": scrobble.id,
            "provider": provider,
            "user_id": user_id,
            "timestamp": extra_data.get("timestamp"),
            "raw_timestamp": data_dict.get("time"),
            "media_type": Scrobble.MediaType.GEO_LOCATION,
        },
    )

    return scrobble
def web_scrobbler_scrobble_video_or_song(
    data_dict: dict, user_id: Optional[int]
) -> Optional[Scrobble]:
    """Scrobble a Web Scrobbler payload as a known track or as a video.

    Music tracks are never created here: an existing track matching the
    payload's artist and title is scrobbled if there is one, otherwise the
    payload is treated as a video.

    Args:
        data_dict: Payload; keys read here are ``artist``, ``track``,
            ``playback_time_ticks``, ``status`` and ``youtube_id``.
        user_id: ID of the user the scrobble is recorded for.

    Returns:
        The scrobble, or ``None`` when neither a track nor a video could be
        resolved.
    """
    # We're not going to create music tracks, because the only time
    # we'd hit this is if we're listening to a concert or something.
    artist_name = data_dict.get("artist")
    track_name = data_dict.get("track")
    tracks = Track.objects.filter(artist__name=artist_name, title=track_name)
    if tracks.count() > 1:
        logger.warning(
            "Multiple tracks found for Web Scrobbler",
            extra={"artist": artist_name, "track": track_name},
        )

    media_obj = tracks.first()
    media_type = Scrobble.MediaType.TRACK

    # No track found, fall back to a Video
    if not media_obj:
        media_type = Scrobble.MediaType.VIDEO
        # NOTE(review): the old call passed no ID and discarded the result.
        # "youtube_id" is an assumed payload key - confirm against the caller.
        youtube_id = data_dict.get("youtube_id")
        if youtube_id:
            media_obj = Video.get_from_youtube_id(youtube_id)

    # Now we run off a scrobble
    scrobble_dict = {
        "user_id": user_id,
        "timestamp": timezone.now(),
        "playback_position_seconds": data_dict.get("playback_time_ticks"),
        # Was "Mopidy" (copied from the Mopidy scrobbler); this matches the
        # source used by web_scrobbler_scrobble_media.
        "source": "Web Scrobbler",
        # NOTE(review): key name kept from the old code; confirm whether
        # Scrobble.create_or_update reads "mopidy_status".
        "mopidy_status": data_dict.get("status"),
    }

    # The old code referenced an undefined ``episode`` here and always
    # raised NameError; log and scrobble the resolved media object instead.
    logger.info(
        "[scrobblers] webhook web scrobbler scrobble request received",
        extra={
            "media_id": media_obj.id if media_obj else None,
            "user_id": user_id,
            "scrobble_dict": scrobble_dict,
            "media_type": media_type,
        },
    )

    if not media_obj:
        return None
    return Scrobble.create_or_update(media_obj, user_id, scrobble_dict)
def manual_scrobble_beer(
    untappd_id: str, user_id: int, action: Optional[str] = None
):
    """Manually scrobble a beer identified by its Untappd ID.

    Args:
        untappd_id: ID handed to ``Beer.find_or_create``.
        user_id: ID of the user the scrobble is recorded for.
        action: Accepted like in the other manual scrobblers; not used here.

    Returns:
        The result of ``Scrobble.create_or_update``, or ``None`` when no
        beer was found.
    """
    found_beer = Beer.find_or_create(untappd_id)
    if not found_beer:
        logger.error(f"No beer found for Untappd ID {untappd_id}")
        return None

    payload = {
        "user_id": user_id,
        "timestamp": timezone.now(),
        "playback_position_seconds": 0,
        "source": "Vrobbler",
    }
    request_context = {
        "beer_id": found_beer.id,
        "user_id": user_id,
        "scrobble_dict": payload,
        "media_type": Scrobble.MediaType.BEER,
    }
    logger.info(
        "[vrobbler-scrobble] beer scrobble request received",
        extra=request_context,
    )

    # TODO Kick out a process to enrich the media here, and in every
    # scrobble event
    return Scrobble.create_or_update(found_beer, user_id, payload)
def manual_scrobble_puzzle(
    ipdb_id: str, user_id: int, action: Optional[str] = None
):
    """Manually scrobble a puzzle identified by its IPDB ID.

    Args:
        ipdb_id: ID handed to ``Puzzle.find_or_create``.
        user_id: ID of the user the scrobble is recorded for.
        action: Accepted like in the other manual scrobblers; not used here.

    Returns:
        The result of ``Scrobble.create_or_update``, or ``None`` when no
        puzzle was found.
    """
    resolved = Puzzle.find_or_create(ipdb_id)
    if not resolved:
        logger.error(f"No puzzle found for IPDB ID {ipdb_id}")
        return None

    new_scrobble_data = dict(
        user_id=user_id,
        timestamp=timezone.now(),
        playback_position_seconds=0,
        source="Vrobbler",
    )
    logger.info(
        "[vrobbler-scrobble] puzzle scrobble request received",
        extra=dict(
            puzzle_id=resolved.id,
            user_id=user_id,
            scrobble_dict=new_scrobble_data,
            media_type=Scrobble.MediaType.PUZZLE,
        ),
    )

    # TODO Kick out a process to enrich the media here, and in every
    # scrobble event
    return Scrobble.create_or_update(resolved, user_id, new_scrobble_data)
def manual_scrobble_brickset(
    brickset_id: str, user_id: int, action: Optional[str] = None
):
    """Manually scrobble a brick set identified by its Brickset ID.

    The scrobble log starts with an empty ``serial_scrobble_id``.

    Args:
        brickset_id: ID handed to ``BrickSet.find_or_create``.
        user_id: ID of the user the scrobble is recorded for.
        action: Accepted like in the other manual scrobblers; not used here.

    Returns:
        The result of ``Scrobble.create_or_update``, or ``None`` when no
        brick set was found.
    """
    the_set = BrickSet.find_or_create(brickset_id)
    if not the_set:
        logger.error(f"No brickset found for Brickset ID {brickset_id}")
        return None

    initial_log = {"serial_scrobble_id": ""}
    payload = {
        "user_id": user_id,
        "timestamp": timezone.now(),
        "playback_position_seconds": 0,
        "source": "Vrobbler",
        "log": initial_log,
    }
    context = {
        "brickset_id": the_set.id,
        "user_id": user_id,
        "scrobble_dict": payload,
        "media_type": Scrobble.MediaType.BRICKSET,
    }
    logger.info(
        "[vrobbler-scrobble] brickset scrobble request received",
        extra=context,
    )

    # TODO Kick out a process to enrich the media here, and in every
    # scrobble event
    # TODO Need to check for past scrobbles and auto populate serial
    # scrobble id if possible
    return Scrobble.create_or_update(the_set, user_id, payload)
|