12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000 |
- import logging
- import re
- from datetime import datetime, timedelta
- from typing import Any, Optional
- import pendulum
- import pytz
- from beers.models import Beer
- from boardgames.models import BoardGame, BoardGameDesigner, BoardGameLocation
- from books.models import Book
- from bricksets.models import BrickSet
- from dateutil.parser import parse
- from django.utils import timezone
- from locations.constants import LOCATION_PROVIDERS
- from locations.models import GeoLocation
- from music.constants import JELLYFIN_POST_KEYS, MOPIDY_POST_KEYS
- from music.models import Track
- from people.models import Person
- from podcasts.models import PodcastEpisode
- from podcasts.utils import parse_mopidy_uri
- from puzzles.models import Puzzle
- from scrobbles.constants import (
- JELLYFIN_AUDIO_ITEM_TYPES,
- MANUAL_SCROBBLE_FNS,
- SCROBBLE_CONTENT_URLS,
- )
- from scrobbles.models import Scrobble
- from scrobbles.utils import convert_to_seconds, extract_domain
- from sports.models import SportEvent
- from sports.thesportsdb import lookup_event_from_thesportsdb
- from tasks.models import Task
- from tasks.utils import get_title_from_labels
- from videogames.howlongtobeat import lookup_game_from_hltb
- from videogames.models import VideoGame
- from videos.models import Video
- from webpages.models import WebPage
- logger = logging.getLogger(__name__)
def mopidy_scrobble_media(post_data: dict, user_id: int) -> Scrobble:
    """Scrobble a track or podcast episode reported by a Mopidy webhook.

    The payload is treated as a podcast episode when its ``mopidy_uri``
    contains ``"podcast"``; otherwise it is treated as a music track.

    Args:
        post_data: Webhook payload posted by Mopidy.
        user_id: ID of the user the scrobble is recorded for.

    Returns:
        Whatever ``scrobble_for_user`` returns for the resolved media object.
    """
    # Read the URI once; a missing or null value is treated as an empty string.
    mopidy_uri = post_data.get("mopidy_uri") or ""

    media_type = Scrobble.MediaType.TRACK
    if "podcast" in mopidy_uri:
        media_type = Scrobble.MediaType.PODCAST_EPISODE

    logger.info(
        "[mopidy_webhook] called",
        extra={
            "user_id": user_id,
            "post_data": post_data,
            "media_type": media_type,
        },
    )

    if media_type == Scrobble.MediaType.PODCAST_EPISODE:
        parsed_data = parse_mopidy_uri(mopidy_uri)
        # Prefer the album sent by Mopidy over the name parsed from the URI.
        podcast_name = post_data.get(
            "album", parsed_data.get("podcast_name", "")
        )
        media_obj = PodcastEpisode.find_or_create(
            title=parsed_data.get("episode_filename", ""),
            podcast_name=podcast_name,
            producer_name=post_data.get("artist", ""),
            number=parsed_data.get("episode_num", ""),
            pub_date=parsed_data.get("pub_date", ""),
            mopidy_uri=mopidy_uri,
        )
    else:
        media_obj = Track.find_or_create(
            title=post_data.get("name", ""),
            artist_name=post_data.get("artist", ""),
            album_name=post_data.get("album", ""),
            run_time_seconds=post_data.get("run_time", 900000),
        )

    # str.split() always yields at least one element, so indexing [0] is safe
    # even for an empty URI (the source is then recorded as "").
    log = {"mopidy_source": mopidy_uri.split(":")[0]}

    return media_obj.scrobble_for_user(
        user_id,
        source="Mopidy",
        # The reported position is divided by 1000 to obtain whole seconds.
        playback_position_seconds=int(
            post_data.get(MOPIDY_POST_KEYS.get("PLAYBACK_POSITION_TICKS"), 1)
            / 1000
        ),
        status=post_data.get(MOPIDY_POST_KEYS.get("STATUS"), ""),
        log=log,
    )
def jellyfin_scrobble_media(
    post_data: dict, user_id: int
) -> Optional[Scrobble]:
    """Scrobble a video or music track reported by a Jellyfin webhook.

    Args:
        post_data: Webhook payload posted by Jellyfin. Its ``"ItemType"`` key
            is removed from the dict as a side effect of classifying the
            media.
        user_id: ID of the user the scrobble is recorded for.

    Returns:
        The result of ``scrobble_for_user``, or ``None`` when the event is
        ignored or no media object could be resolved.
    """
    media_type = Scrobble.MediaType.VIDEO
    if post_data.pop("ItemType", "") in JELLYFIN_AUDIO_ITEM_TYPES:
        media_type = Scrobble.MediaType.TRACK

    # Jellyfin has some race conditions with its webhooks; ignoring progress
    # events that carry a zero playback position works around some of them.
    null_position_on_progress = (
        post_data.get("PlaybackPosition") == "00:00:00"
        and post_data.get("NotificationType") == "PlaybackProgress"
    )
    if null_position_on_progress:
        logger.info(
            "[jellyfin_scrobble_media] no playback position tick, aborting",
            extra={"post_data": post_data},
        )
        return None

    # The reported tick count is divided by 10,000,000 to obtain seconds.
    playback_position_seconds = int(
        post_data.get(JELLYFIN_POST_KEYS.get("PLAYBACK_POSITION_TICKS"), 1)
        / 10000000
    )

    if media_type == Scrobble.MediaType.VIDEO:
        # The IMDB ID is passed without its "tt" prefix.
        media_obj = Video.get_from_imdb_id(
            post_data.get("Provider_imdb", "").replace("tt", "")
        )
    else:
        media_obj = Track.find_or_create(
            title=post_data.get("Name", ""),
            artist_name=post_data.get("Artist", ""),
            album_name=post_data.get("Album", ""),
            run_time_seconds=convert_to_seconds(
                post_data.get("RunTime", 900000)
            ),
            musicbrainz_id=post_data.get("Provider_musicbrainztrack", ""),
        )
        # A hack because we don't worry about updating music ... we either
        # finish it or we don't.
        playback_position_seconds = 0

    if not media_obj:
        logger.info(
            "[jellyfin_scrobble_media] no video found from POST data",
            extra={"post_data": post_data},
        )
        return None

    # Paused takes precedence over stopped; anything else counts as resumed.
    playback_status = "resumed"
    if post_data.get("IsPaused"):
        playback_status = "paused"
    elif post_data.get("NotificationType") == "PlaybackStop":
        playback_status = "stopped"

    return media_obj.scrobble_for_user(
        user_id,
        source=post_data.get(JELLYFIN_POST_KEYS.get("SOURCE")),
        playback_position_seconds=playback_position_seconds,
        status=playback_status,
    )
def web_scrobbler_scrobble_media(
    youtube_id: str, user_id: int, status: str = "started"
) -> Optional[Scrobble]:
    """Scrobble a YouTube video reported by the Web Scrobbler extension.

    Args:
        youtube_id: YouTube ID used to look up the video.
        user_id: ID of the user the scrobble is recorded for.
        status: Playback status forwarded to ``scrobble_for_user``.

    Returns:
        The result of ``scrobble_for_user``, or ``None`` when no video could
        be resolved for ``youtube_id``.
    """
    video = Video.get_from_youtube_id(youtube_id)
    if not video:
        # Honour the Optional return type instead of failing on None.
        logger.info(
            "[web_scrobbler_scrobble_media] no video found for YouTube ID",
            extra={"youtube_id": youtube_id, "user_id": user_id},
        )
        return None
    return video.scrobble_for_user(user_id, status, source="Web Scrobbler")
def manual_scrobble_video(
    video_id: str, user_id: int, action: Optional[str] = None
):
    """Manually scrobble a video identified by an IMDB or YouTube ID.

    Args:
        video_id: An IMDB ID of the form ``tt<digits>``; any other value is
            treated as a YouTube ID.
        user_id: ID of the user the scrobble is recorded for.
        action: When ``"stop"``, the scrobble is force-finished right away.

    Returns:
        The created or updated scrobble, or ``None`` when no video could be
        resolved for ``video_id``.
    """
    # Match the whole ID: a bare substring test would send any YouTube ID
    # that merely contains "tt" to the IMDB lookup.
    if re.fullmatch(r"tt\d+", video_id):
        video = Video.get_from_imdb_id(video_id)
    else:
        video = Video.get_from_youtube_id(video_id)

    if not video:
        logger.info(
            "[scrobblers] manual video scrobble found no video",
            extra={"video_id": video_id, "user_id": user_id},
        )
        return None

    # When manually scrobbling, try finding a source from the series
    source = "Vrobbler"
    if video.tv_series:
        source = video.tv_series.preferred_source

    scrobble_dict = {
        "user_id": user_id,
        "timestamp": timezone.now(),
        "playback_position_seconds": 0,
        "source": source,
    }
    logger.info(
        "[scrobblers] manual video scrobble request received",
        extra={
            "video_id": video.id,
            "user_id": user_id,
            "scrobble_dict": scrobble_dict,
            "media_type": Scrobble.MediaType.VIDEO,
        },
    )

    scrobble = Scrobble.create_or_update(video, user_id, scrobble_dict)
    if action == "stop":
        scrobble.stop(force_finish=True)
    return scrobble
def manual_scrobble_event(
    thesportsdb_id: str, user_id: int, action: Optional[str] = None
):
    """Manually scrobble a sport event looked up on TheSportsDB.

    Args:
        thesportsdb_id: Event ID passed to ``lookup_event_from_thesportsdb``.
        user_id: ID of the user the scrobble is recorded for.
        action: Accepted for signature parity with the other manual
            scrobblers; not used here.

    Returns:
        The result of ``Scrobble.create_or_update`` for the event.
    """
    event = SportEvent.find_or_create(
        lookup_event_from_thesportsdb(thesportsdb_id)
    )
    return Scrobble.create_or_update(
        event,
        user_id,
        {
            "user_id": user_id,
            "timestamp": timezone.now(),
            "playback_position_seconds": 0,
            "source": "TheSportsDB",
        },
    )
def manual_scrobble_video_game(
    hltb_id: str, user_id: int, action: Optional[str] = None
):
    """Manually scrobble a video game identified by its HowLongToBeat ID.

    A locally stored game is preferred; otherwise the game is looked up via
    ``lookup_game_from_hltb`` and created from the returned data.

    Args:
        hltb_id: HowLongToBeat ID of the game.
        user_id: ID of the user the scrobble is recorded for.
        action: Accepted for signature parity; not used here.

    Returns:
        The result of ``Scrobble.create_or_update``, or ``None`` when the
        lookup returned no data.
    """
    game = VideoGame.objects.filter(hltb_id=hltb_id).first()
    if not game:
        hltb_data = lookup_game_from_hltb(hltb_id)
        if not hltb_data:
            logger.info(
                "[manual_scrobble_video_game] game not found on hltb",
                extra={
                    "hltb_id": hltb_id,
                    "user_id": user_id,
                    "media_type": Scrobble.MediaType.VIDEO_GAME,
                },
            )
            return None
        game = VideoGame.find_or_create(hltb_data)

    scrobble_dict = dict(
        user_id=user_id,
        timestamp=timezone.now(),
        playback_position_seconds=0,
        source="Vrobbler",
        long_play_complete=False,
    )
    log_context = dict(
        videogame_id=game.id,
        user_id=user_id,
        scrobble_dict=scrobble_dict,
        media_type=Scrobble.MediaType.VIDEO_GAME,
    )
    logger.info(
        "[scrobblers] manual video game scrobble request received",
        extra=log_context,
    )
    return Scrobble.create_or_update(game, user_id, scrobble_dict)
def manual_scrobble_book(
    title: str, user_id: int, action: Optional[str] = None
):
    """Manually scrobble a book resolved through ``Book.get_from_google``.

    Args:
        title: Value handed to ``Book.get_from_google``.
        user_id: ID of the user the scrobble is recorded for.
        action: Accepted for signature parity; not used here.

    Returns:
        The result of ``Scrobble.create_or_update`` for the book.
    """
    book = Book.get_from_google(title)

    scrobble_dict = dict(
        user_id=user_id,
        timestamp=timezone.now(),
        playback_position_seconds=0,
        source="Vrobbler",
        long_play_complete=False,
    )
    log_context = dict(
        book_id=book.id,
        user_id=user_id,
        scrobble_dict=scrobble_dict,
        media_type=Scrobble.MediaType.BOOK,
    )
    logger.info(
        "[scrobblers] manual book scrobble request received",
        extra=log_context,
    )
    return Scrobble.create_or_update(book, user_id, scrobble_dict)
def manual_scrobble_board_game(
    bggeek_id: str, user_id: int, action: Optional[str] = None
) -> Scrobble | None:
    """Manually scrobble a board game resolved via ``BoardGame.find_or_create``.

    Args:
        bggeek_id: ID handed to ``BoardGame.find_or_create``.
        user_id: ID of the user the scrobble is recorded for.
        action: Accepted for signature parity; not used here.

    Returns:
        The result of ``Scrobble.create_or_update``, or ``None`` when no
        board game was found.
    """
    boardgame = BoardGame.find_or_create(bggeek_id)
    if boardgame:
        scrobble_dict = dict(
            user_id=user_id,
            timestamp=timezone.now(),
            playback_position_seconds=0,
            source="Vrobbler",
        )
        log_context = dict(
            boardgame_id=boardgame.id,
            user_id=user_id,
            scrobble_dict=scrobble_dict,
            media_type=Scrobble.MediaType.BOARD_GAME,
        )
        logger.info(
            "[vrobbler-scrobble] board game scrobble request received",
            extra=log_context,
        )
        return Scrobble.create_or_update(boardgame, user_id, scrobble_dict)

    logger.error(f"No board game found for ID {bggeek_id}")
    return None
def find_and_enrich_board_game_data(game_dict: dict) -> BoardGame | None:
    """Find or create a board game and enrich it from a game dictionary.

    TODO Move this to a utility somewhere

    Args:
        game_dict: Game entry; reads ``bggId``, ``cooperative``,
            ``highestWins``, ``noPoints``, ``useTeams``, ``rating`` and
            ``designers`` (a ``", "``-separated string of names).

    Returns:
        The enriched and saved game, or ``None`` (falsy) when
        ``BoardGame.find_or_create`` did not yield a game.
    """
    game = BoardGame.find_or_create(game_dict.get("bggId"))
    if not game:
        return game

    game.cooperative = game_dict.get("cooperative", False)
    game.highest_wins = game_dict.get("highestWins", True)
    game.no_points = game_dict.get("noPoints", False)
    game.uses_teams = game_dict.get("useTeams", False)

    # Only fill in a rating when none is stored yet, and only when the entry
    # actually carries one; dividing a missing (None) rating would raise.
    incoming_rating = game_dict.get("rating")
    if not game.rating and incoming_rating is not None:
        game.rating = incoming_rating / 10
    game.save()

    # Make sure every listed designer exists; they are not linked to the
    # game here.
    designers = game_dict.get("designers")
    if designers:
        for designer_name in designers.split(", "):
            BoardGameDesigner.objects.get_or_create(name=designer_name)

    return game
def email_scrobble_board_game(
    bgstat_data: dict[str, Any], user_id: int
) -> list[Scrobble]:
    """Create finished board game scrobbles from a BG Stats export.

    Players, games and locations in the export are resolved to local objects
    first; every entry under ``plays`` is then recorded as a completed
    scrobble for its game.

    Args:
        bgstat_data: Parsed BG Stats data; reads the ``games``, ``players``,
            ``locations`` and ``plays`` lists.
        user_id: ID of the user the scrobbles are recorded for.

    Returns:
        The scrobbles created, in play order. Empty when the export has no
        games. Plays whose game cannot be resolved are skipped.
    """
    game_list: list = bgstat_data.get("games", [])
    if not game_list:
        logger.info(
            "No game data from BG Stats, not scrobbling",
            extra={"bgstat_data": bgstat_data},
        )
        return []

    # Map export-local player ids to Person objects.
    player_dict = {}
    for player in bgstat_data.get("players", []):
        if player.get("isAnonymous"):
            person, _created = Person.objects.get_or_create(name="Anonymous")
        else:
            person, _created = Person.objects.get_or_create(
                bgstats_id=player.get("uuid")
            )
            if not person.name:
                person.name = player.get("name", "")
                person.save()
        player_dict[player.get("id")] = person

    # Map export-local game ids to enriched BoardGame objects.
    base_games = {}
    expansions = {}
    for game in game_list:
        logger.info(f"Finding and enriching {game.get('name')}")
        enriched_game = find_and_enrich_board_game_data(game)
        if not enriched_game:
            # Keep unresolved games out of the lookup tables so that plays
            # referencing them are skipped below instead of failing later.
            logger.info(
                "Could not find or create board game, skipping",
                extra={"game": game},
            )
            continue
        if game.get("isBaseGame"):
            base_games[game.get("id")] = enriched_game
        elif game.get("isExpansion"):
            expansions[game.get("id")] = enriched_game

    # Map export-local location ids to BoardGameLocation objects.
    locations = {}
    for location_dict in bgstat_data.get("locations", []):
        location, _created = BoardGameLocation.objects.get_or_create(
            bgstats_id=location_dict.get("uuid")
        )
        update_fields = []
        if not location.name:
            location.name = location_dict.get("name")
            update_fields.append("name")
        geoloc = GeoLocation.objects.filter(
            title__icontains=location.name
        ).first()
        if geoloc:
            location.geo_location = geoloc
            update_fields.append("geo_location")
        if update_fields:
            location.save(update_fields=update_fields)
        locations[location_dict.get("id")] = location

    scrobbles_created = []
    for play_dict in bgstat_data.get("plays", []):
        # A play may reference a base game or, failing that, an expansion.
        game_ref_id = play_dict.get("gameRefId")
        base_game = base_games.get(game_ref_id) or expansions.get(game_ref_id)
        if not base_game:
            logger.info(
                "Skipping scrobble of play, can't find game",
                extra={"play_dict": play_dict},
            )
            continue

        # Build a fresh log per play; a dict shared across iterations would
        # leak location/rounds/board values from one play into the next.
        log_data: dict[str, Any] = {"expansion_ids": []}

        for eplay in play_dict.get("expansionPlays", []):
            expansion = expansions.get(eplay.get("gameRefId"))
            if not expansion:
                logger.info(
                    "Skipping unknown expansion for play",
                    extra={"expansion_play": eplay},
                )
                continue
            expansion.expansion_for_boardgame = base_game
            expansion.save()
            log_data["expansion_ids"].append(expansion.id)

        location_ref_id = play_dict.get("locationRefId")
        if location_ref_id and location_ref_id in locations:
            log_data["location_id"] = locations[location_ref_id].id
        if play_dict.get("rounds", False):
            log_data["rounds"] = play_dict.get("rounds")
        if play_dict.get("board", False):
            log_data["board"] = play_dict.get("board")

        log_data["players"] = [
            {
                "person_id": player_dict[score_dict.get("playerRefId")].id,
                "new": score_dict.get("newPlayer"),
                "win": score_dict.get("winner"),
                "score": score_dict.get("score"),
                "rank": score_dict.get("rank"),
                "seat_order": score_dict.get("seatOrder"),
                "role": score_dict.get("role"),
            }
            for score_dict in play_dict.get("playerScores", [])
        ]

        start = parse(play_dict.get("playDate"))
        # Use the recorded duration when there is one; otherwise fall back to
        # the game's own run time. Missing values count as zero.
        duration_min = play_dict.get("durationMin") or 0
        if duration_min > 0:
            duration_seconds = duration_min * 60
        else:
            duration_seconds = base_game.run_time_seconds or 0
        stop = start + timedelta(seconds=duration_seconds)

        scrobble_dict = {
            "user_id": user_id,
            "timestamp": start,
            "playback_position_seconds": duration_seconds,
            "source": "BG Stats",
            "log": log_data,
        }
        logger.info(
            "Scrobbling BG Stats play",
            extra={"scrobble_dict": scrobble_dict},
        )

        # Plays are imported after the fact, so mark them finished right away.
        scrobble = Scrobble.create_or_update(base_game, user_id, scrobble_dict)
        scrobble.stop_timestamp = stop
        scrobble.in_progress = False
        scrobble.played_to_completion = True
        scrobble.save()
        scrobbles_created.append(scrobble)

    return scrobbles_created
def manual_scrobble_from_url(
    url: str, user_id: int, action: Optional[str] = None
) -> Optional[Scrobble]:
    """Route a URL to the manual scrobbler that handles its content type.

    The URL's domain is matched against ``SCROBBLE_CONTENT_URLS``. Known
    content is scrobbled by an ID extracted from the URL; any other URL
    falls back to the ``"-w"`` entry of ``MANUAL_SCROBBLE_FNS`` with the full
    URL as the item ID.

    Args:
        url: The URL to scrobble.
        user_id: ID of the user the scrobble is recorded for.
        action: Optional action forwarded to the selected scrobble function.

    Returns:
        Whatever the selected scrobble function returns.
    """
    content_key = ""
    domain = extract_domain(url)
    # An empty domain is a substring of every string and would match every
    # content URL, so only match when a domain was actually extracted.
    if domain:
        for key, content_urls in SCROBBLE_CONTENT_URLS.items():
            if any(domain in content_url for content_url in content_urls):
                # No early exit: the last matching key wins.
                content_key = key

    item_id = None
    if not content_key:
        content_key = "-w"
        item_id = url
    else:
        # Try generic search for any URL with digit-based IDs
        digit_runs = re.findall(r"\d+", url)
        if digit_runs:
            item_id = digit_runs[0]

    if content_key == "-i":
        if "v=" in url:
            # Take the value of the "v" query parameter.
            item_id = url.split("v=")[1].split("&")[0]
        else:
            # Take the ID straight from the ".../title/tt<digits>" path
            # segment; the first digit run anywhere in the URL could be
            # something else.
            imdb_match = re.search(r"title/(tt\d+)", url)
            if imdb_match:
                item_id = imdb_match.group(1)

    scrobble_fn = MANUAL_SCROBBLE_FNS[content_key]
    # NOTE(review): eval() resolves the function from a string held in the
    # MANUAL_SCROBBLE_FNS constant. That is safe only while the constant
    # never contains user-controlled text; an explicit name-to-callable
    # mapping would remove the eval altogether.
    return eval(scrobble_fn)(item_id, user_id, action=action)
def todoist_scrobble_task_finish(
    todoist_task: dict, user_id: int, timestamp: datetime
) -> Optional[Scrobble]:
    """Stop the unfinished, in-progress scrobble tied to a Todoist task.

    Args:
        todoist_task: Task data; its ``todoist_id`` selects the scrobble.
        user_id: ID of the user owning the scrobble.
        timestamp: Stop time handed to ``Scrobble.stop``.

    Returns:
        The stopped scrobble, or ``None`` when no matching scrobble exists.
    """
    open_scrobbles = Scrobble.objects.filter(
        user_id=user_id,
        log__todoist_id=todoist_task.get("todoist_id"),
        in_progress=True,
        played_to_completion=False,
    )
    scrobble = open_scrobbles.first()

    if scrobble:
        scrobble.stop(timestamp=timestamp, force_finish=True)
        return scrobble

    logger.info(
        "[todoist_scrobble_task_finish] todoist webhook finish called on missing task"
    )
    return None
def todoist_scrobble_update_task(
    todoist_note: dict, user_id: int
) -> Optional[Scrobble]:
    """Attach a Todoist note to the in-progress scrobble of its task.

    The note is stored under ``log["notes"]`` keyed by the note's
    ``todoist_id``.

    Args:
        todoist_note: Note data; reads ``task_id``, ``todoist_id`` and
            ``notes``.
        user_id: ID of the user owning the scrobble.

    Returns:
        The updated scrobble, or ``None`` when the task has no in-progress
        scrobble.
    """
    scrobble = Scrobble.objects.filter(
        in_progress=True,
        user_id=user_id,
        log__todoist_id=todoist_note.get("task_id"),
    ).first()

    log_context = {
        "todoist_note": todoist_note,
        "user_id": user_id,
        "media_type": Scrobble.MediaType.TASK,
    }

    if not scrobble:
        logger.info(
            "[todoist_scrobble_update_task] no task found",
            extra=log_context,
        )
        return None

    # Create the notes mapping on first use, then file the note under its id.
    scrobble.log.setdefault("notes", {})[
        todoist_note.get("todoist_id")
    ] = todoist_note.get("notes")
    scrobble.save(update_fields=["log"])

    logger.info(
        "[todoist_scrobble_update_task] todoist note added",
        extra=log_context,
    )
    return scrobble
def todoist_scrobble_task(
    todoist_task: dict,
    user_id: int,
    started: bool = False,
    stopped: bool = False,
    user_context_list: Optional[list[str]] = None,
) -> Optional[Scrobble]:
    """Start, finish or create a task scrobble from a Todoist event.

    Args:
        todoist_task: Task data; reads ``todoist_label_list``,
            ``updated_at``, ``todoist_id``, ``todoist_type`` and
            ``todoist_event``. It is stored as the log of a new scrobble.
        user_id: ID of the user the scrobble is recorded for.
        started: The event marks the task as started.
        stopped: The event marks the task as stopped.
        user_context_list: Context labels forwarded to
            ``get_title_from_labels``; defaults to an empty list.

    Returns:
        The existing, finished or newly created scrobble, or ``None`` when
        asked to stop a task that has no in-progress scrobble.
    """
    # Avoid a shared mutable default argument.
    if user_context_list is None:
        user_context_list = []

    title = get_title_from_labels(
        todoist_task.get("todoist_label_list", []), user_context_list
    )
    task = Task.find_or_create(title)

    # pendulum.parse() only accepts strings; fall back to "now" when the
    # event carries no update time.
    updated_at = todoist_task.get("updated_at")
    if isinstance(updated_at, str):
        timestamp = pendulum.parse(updated_at)
    else:
        timestamp = updated_at or timezone.now()

    in_progress_scrobble = Scrobble.objects.filter(
        user_id=user_id,
        in_progress=True,
        log__todoist_id=todoist_task.get("todoist_id"),
        task=task,
    ).last()

    # Logging context only; use .get() so a sparse payload cannot raise here.
    event_context = {
        "todoist_type": todoist_task.get("todoist_type"),
        "todoist_event": todoist_task.get("todoist_event"),
        "todoist_id": todoist_task.get("todoist_id"),
    }

    if not in_progress_scrobble and stopped:
        logger.info(
            "[todoist_scrobble_task] cannot stop already stopped task",
            extra=event_context,
        )
        return None

    if in_progress_scrobble and started:
        logger.info(
            "[todoist_scrobble_task] cannot start already started task",
            extra=event_context,
        )
        return in_progress_scrobble

    # Finish an in-progress scrobble
    if in_progress_scrobble and stopped:
        logger.info(
            "[todoist_scrobble_task] finishing",
            extra=event_context,
        )
        return todoist_scrobble_task_finish(todoist_task, user_id, timestamp)

    # Default: create a new scrobble for the task.
    scrobble_dict = {
        "user_id": user_id,
        "timestamp": timestamp,
        "playback_position_seconds": 0,
        "source": "Todoist",
        "log": todoist_task,
    }
    logger.info(
        "[todoist_scrobble_task] creating",
        extra={
            "task_id": task.id,
            "user_id": user_id,
            "scrobble_dict": scrobble_dict,
            "media_type": Scrobble.MediaType.TASK,
        },
    )
    return Scrobble.create_or_update(task, user_id, scrobble_dict)
def emacs_scrobble_update_task(
    emacs_id: str, emacs_notes: list[dict], user_id: int
) -> Optional[Scrobble]:
    """Append new Org-mode notes to the in-progress scrobble of a task.

    Notes are stored in ``log["notes"]`` as a list of single-entry dicts of
    the form ``{timestamp: content}``. A note is added only when its
    timestamp is not stored yet.

    Args:
        emacs_id: Source ID of the Org-mode task.
        emacs_notes: Iterable of notes, each providing ``timestamp`` and
            ``content``.
        user_id: ID of the user owning the scrobble.

    Returns:
        The scrobble (saved only if notes were added), or ``None`` when the
        task has no in-progress scrobble.
    """
    scrobble = Scrobble.objects.filter(
        in_progress=True,
        user_id=user_id,
        log__source_id=emacs_id,
        log__source="orgmode",
    ).first()

    if not scrobble:
        logger.info(
            "[emacs_scrobble_update_task] no task found",
            extra={
                "emacs_notes": emacs_notes,
                "user_id": user_id,
                "media_type": Scrobble.MediaType.TASK,
            },
        )
        return None

    # Keep the notes already stored; resetting the list here would discard
    # them.
    stored_notes = scrobble.log.get("notes") or []

    # Stored notes are keyed by timestamp, so the dict keys are the
    # timestamps already known.
    known_timestamps = set()
    for stored in stored_notes:
        if isinstance(stored, dict):
            known_timestamps.update(stored.keys())

    notes_updated = False
    for note in emacs_notes:
        note_timestamp = note.get("timestamp")
        if note_timestamp in known_timestamps:
            continue
        stored_notes.append({note_timestamp: note.get("content")})
        known_timestamps.add(note_timestamp)
        notes_updated = True

    if notes_updated:
        scrobble.log["notes"] = stored_notes
        scrobble.save(update_fields=["log"])

    logger.info(
        "[emacs_scrobble_update_task] emacs note added",
        extra={
            "emacs_note": emacs_notes,
            "user_id": user_id,
            "media_type": Scrobble.MediaType.TASK,
        },
    )
    return scrobble
def emacs_scrobble_task(
    task_data: dict,
    user_id: int,
    started: bool = False,
    stopped: bool = False,
    user_context_list: Optional[list[str]] = None,
) -> Scrobble | None:
    """Start, finish or create a task scrobble from an Org-mode event.

    Args:
        task_data: Task data; reads ``source_id``, ``labels``,
            ``updated_at`` and ``notes``. Its ``notes`` entry is rewritten in
            place into ``{timestamp: content}`` dicts, and the dict is stored
            as the log of a new scrobble.
        user_id: ID of the user the scrobble is recorded for.
        started: The event marks the task as started.
        stopped: The event marks the task as stopped.
        user_context_list: Context labels forwarded to
            ``get_title_from_labels``; defaults to an empty list.

    Returns:
        The existing, finished or newly created scrobble, or ``None`` when
        asked to stop a task that has no in-progress scrobble.
    """
    # Avoid a shared mutable default argument.
    if user_context_list is None:
        user_context_list = []

    source_id = task_data.get("source_id")
    title = get_title_from_labels(
        task_data.get("labels", []), user_context_list
    )
    task = Task.find_or_create(title)

    # pendulum.parse() only accepts strings; fall back to "now" when the
    # event carries no update time.
    updated_at = task_data.get("updated_at")
    if isinstance(updated_at, str):
        timestamp = pendulum.parse(updated_at)
    else:
        timestamp = updated_at or timezone.now()

    in_progress_scrobble = Scrobble.objects.filter(
        user_id=user_id,
        in_progress=True,
        log__source_id=source_id,
        log__source="orgmode",
        task=task,
    ).last()

    if in_progress_scrobble:
        if stopped and not started:
            # Finish an in-progress scrobble
            logger.info(
                "[emacs_scrobble_task] finishing",
                extra={"emacs_id": source_id},
            )
            in_progress_scrobble.stop(timestamp=timestamp, force_finish=True)
        elif started:
            logger.info(
                "[emacs_scrobble_task] cannot start already started task",
                extra={"emacs_id": source_id},
            )
        # In every case an existing in-progress scrobble is what we return.
        return in_progress_scrobble

    if stopped:
        logger.info(
            "[emacs_scrobble_task] cannot stop already stopped task",
            extra={"emacs_id": source_id},
        )
        return None

    # Normalise notes into {timestamp: content} entries; a missing "notes"
    # key is treated the same as no notes.
    notes = task_data.pop("notes", None)
    if notes:
        task_data["notes"] = [
            {note.get("timestamp"): note.get("content")} for note in notes
        ]

    scrobble_dict = {
        "user_id": user_id,
        "timestamp": timestamp,
        "playback_position_seconds": 0,
        "source": "Org-mode",
        "log": task_data,
    }
    logger.info(
        "[emacs_scrobble_task] creating",
        extra={
            "task_id": task.id,
            "user_id": user_id,
            "scrobble_dict": scrobble_dict,
            "media_type": Scrobble.MediaType.TASK,
        },
    )
    return Scrobble.create_or_update(task, user_id, scrobble_dict)
def manual_scrobble_task(url: str, user_id: int, action: Optional[str] = None):
    """Manually scrobble a task from its URL.

    Only Todoist URLs are supported. The source ID is the first run of
    digits in the URL, and the description is built from the last path
    segment with its final hyphen-separated part dropped.

    Args:
        url: URL of the task.
        user_id: ID of the user the scrobble is recorded for.
        action: Accepted for signature parity; not used here.

    Returns:
        The result of ``Scrobble.create_or_update``, or ``None`` when the URL
        is not a Todoist URL or contains no numeric ID.
    """
    digit_runs = re.findall(r"\d+", url)
    if "todoist" not in url or not digit_runs:
        # No other task source is handled, so the title and source would be
        # undefined below; bail out with a log entry.
        logger.info(
            "[manual_scrobble_task] unsupported task URL, not scrobbling",
            extra={"url": url, "user_id": user_id},
        )
        return None

    source_id = digit_runs[0]
    source = "Todoist"
    title = "Generic Todoist task"
    description = " ".join(url.split("/")[-1].split("-")[:-1]).capitalize()

    task = Task.find_or_create(title)
    scrobble_dict = {
        "user_id": user_id,
        "timestamp": timezone.now(),
        "playback_position_seconds": 0,
        "source": source,
        "log": {"description": description, "source_id": source_id},
    }
    logger.info(
        "[vrobbler-scrobble] task scrobble request received",
        extra={
            "task_id": task.id,
            "user_id": user_id,
            "scrobble_dict": scrobble_dict,
            "media_type": Scrobble.MediaType.TASK,
        },
    )
    return Scrobble.create_or_update(task, user_id, scrobble_dict)
def manual_scrobble_webpage(
    url: str, user_id: int, action: Optional[str] = None
):
    """Manually scrobble a web page and push it to ArchiveBox.

    Args:
        url: URL of the page, handed to ``WebPage.find_or_create``.
        user_id: ID of the user the scrobble is recorded for.
        action: Accepted for signature parity; not used here.

    Returns:
        The scrobble returned by ``Scrobble.create_or_update``.
    """
    webpage = WebPage.find_or_create({"url": url})

    scrobble_dict = dict(
        user_id=user_id,
        timestamp=timezone.now(),
        playback_position_seconds=0,
        source="Vrobbler",
    )
    log_context = dict(
        webpage_id=webpage.id,
        user_id=user_id,
        scrobble_dict=scrobble_dict,
        media_type=Scrobble.MediaType.WEBPAGE,
    )
    logger.info(
        "[vrobbler-scrobble] webpage scrobble request received",
        extra=log_context,
    )

    scrobble = Scrobble.create_or_update(webpage, user_id, scrobble_dict)
    # possibly async this?
    scrobble.push_to_archivebox()
    return scrobble
def gpslogger_scrobble_location(data_dict: dict, user_id: int) -> Scrobble:
    """Record a GPSLogger position update as a location scrobble.

    Every update is appended to ``log["gps_updates"]``, and the playback
    position is set to the time elapsed since the scrobble's timestamp.

    Args:
        data_dict: GPSLogger payload; reads ``time`` and ``prov`` and is
            also handed to ``GeoLocation.find_or_create``.
        user_id: ID of the user the scrobble is recorded for.

    Returns:
        The created or updated location scrobble.

    Raises:
        KeyError: If ``prov`` is not a key of ``LOCATION_PROVIDERS``.
    """
    location = GeoLocation.find_or_create(data_dict)

    # pendulum.parse() only accepts strings; fall back to "now" when the
    # payload carries no time.
    raw_time = data_dict.get("time")
    if isinstance(raw_time, str):
        timestamp = pendulum.parse(raw_time)
    else:
        timestamp = raw_time or timezone.now()

    extra_data = {
        "user_id": user_id,
        "timestamp": timestamp,
        "source": "GPSLogger",
        "media_type": Scrobble.MediaType.GEO_LOCATION,
    }
    scrobble = Scrobble.create_or_update_location(
        location,
        extra_data,
        user_id,
    )

    provider = LOCATION_PROVIDERS[data_dict.get("prov")]
    scrobble.log.setdefault("gps_updates", []).append(
        {
            "timestamp": raw_time,
            "position_provider": provider,
        }
    )

    if scrobble.timestamp:
        # total_seconds() covers the whole interval; the .seconds attribute
        # holds only the sub-day remainder and would wrap after 24 hours.
        elapsed = timezone.now() - scrobble.timestamp
        scrobble.playback_position_seconds = int(elapsed.total_seconds())

    scrobble.save(update_fields=["log", "playback_position_seconds"])

    logger.info(
        "[gpslogger_webhook] gpslogger scrobble request received",
        extra={
            "scrobble_id": scrobble.id,
            "provider": provider,
            "user_id": user_id,
            "timestamp": extra_data.get("timestamp"),
            "raw_timestamp": raw_time,
            "media_type": Scrobble.MediaType.GEO_LOCATION,
        },
    )
    return scrobble
def web_scrobbler_scrobble_video_or_song(
    data_dict: dict, user_id: Optional[int]
) -> Optional[Scrobble]:
    """Scrobble an already-known track reported by Web Scrobbler.

    We're not going to create music tracks, because the only time we'd hit
    this is if we're listening to a concert or something. Only a track that
    already exists for the given artist and title is scrobbled.

    Args:
        data_dict: Web Scrobbler payload; reads ``artist``, ``track``,
            ``playback_time_ticks`` and ``status``.
        user_id: ID of the user the scrobble is recorded for.

    Returns:
        The result of ``Scrobble.create_or_update`` for the matching track,
        or ``None`` when no track matches.
    """
    artist_name = data_dict.get("artist")
    track_name = data_dict.get("track")

    tracks = Track.objects.filter(artist__name=artist_name, title=track_name)
    if tracks.count() > 1:
        logger.warning(
            "Multiple tracks found for Web Scrobbler",
            extra={"artist": artist_name, "track": track_name},
        )
    track = tracks.first()

    if not track:
        # TODO: fall back to scrobbling a Video. That needs the YouTube ID,
        # and it is not clear from this function which payload key carries
        # it, so nothing is scrobbled for now.
        logger.info(
            "[scrobblers] web scrobbler found no track, not scrobbling",
            extra={
                "artist": artist_name,
                "track": track_name,
                "user_id": user_id,
            },
        )
        return None

    # Now we run off a scrobble
    scrobble_dict = {
        "user_id": user_id,
        "timestamp": timezone.now(),
        "playback_position_seconds": data_dict.get("playback_time_ticks"),
        "source": "Web Scrobbler",
        # NOTE(review): key name carried over unchanged; confirm that
        # Scrobble.create_or_update expects "mopidy_status" here.
        "mopidy_status": data_dict.get("status"),
    }
    logger.info(
        "[scrobblers] web scrobbler scrobble request received",
        extra={
            "track_id": track.id,
            "user_id": user_id,
            "scrobble_dict": scrobble_dict,
            "media_type": Scrobble.MediaType.TRACK,
        },
    )
    return Scrobble.create_or_update(track, user_id, scrobble_dict)
def manual_scrobble_beer(
    untappd_id: str, user_id: int, action: Optional[str] = None
):
    """Manually scrobble a beer resolved via ``Beer.find_or_create``.

    Args:
        untappd_id: Untappd ID handed to ``Beer.find_or_create``.
        user_id: ID of the user the scrobble is recorded for.
        action: Accepted for signature parity; not used here.

    Returns:
        The result of ``Scrobble.create_or_update``, or ``None`` when no beer
        was found.
    """
    beer = Beer.find_or_create(untappd_id)
    if beer:
        scrobble_dict = dict(
            user_id=user_id,
            timestamp=timezone.now(),
            playback_position_seconds=0,
            source="Vrobbler",
        )
        log_context = dict(
            beer_id=beer.id,
            user_id=user_id,
            scrobble_dict=scrobble_dict,
            media_type=Scrobble.MediaType.BEER,
        )
        logger.info(
            "[vrobbler-scrobble] beer scrobble request received",
            extra=log_context,
        )
        # TODO Kick out a process to enrich the media here, and in every
        # scrobble event
        return Scrobble.create_or_update(beer, user_id, scrobble_dict)

    logger.error(f"No beer found for Untappd ID {untappd_id}")
    return None
def manual_scrobble_puzzle(
    ipdb_id: str, user_id: int, action: Optional[str] = None
):
    """Manually scrobble a puzzle resolved via ``Puzzle.find_or_create``.

    Args:
        ipdb_id: IPDB ID handed to ``Puzzle.find_or_create``.
        user_id: ID of the user the scrobble is recorded for.
        action: Accepted for signature parity; not used here.

    Returns:
        The result of ``Scrobble.create_or_update``, or ``None`` when no
        puzzle was found.
    """
    puzzle = Puzzle.find_or_create(ipdb_id)
    if puzzle:
        scrobble_dict = dict(
            user_id=user_id,
            timestamp=timezone.now(),
            playback_position_seconds=0,
            source="Vrobbler",
        )
        log_context = dict(
            puzzle_id=puzzle.id,
            user_id=user_id,
            scrobble_dict=scrobble_dict,
            media_type=Scrobble.MediaType.PUZZLE,
        )
        logger.info(
            "[vrobbler-scrobble] puzzle scrobble request received",
            extra=log_context,
        )
        # TODO Kick out a process to enrich the media here, and in every
        # scrobble event
        return Scrobble.create_or_update(puzzle, user_id, scrobble_dict)

    logger.error(f"No puzzle found for IPDB ID {ipdb_id}")
    return None
def manual_scrobble_brickset(
    brickset_id: str, user_id: int, action: Optional[str] = None
):
    """Manually scrobble a brick set resolved via ``BrickSet.find_or_create``.

    The scrobble log starts with an empty ``serial_scrobble_id``.

    Args:
        brickset_id: Brickset ID handed to ``BrickSet.find_or_create``.
        user_id: ID of the user the scrobble is recorded for.
        action: Accepted for signature parity; not used here.

    Returns:
        The result of ``Scrobble.create_or_update``, or ``None`` when no
        brick set was found.
    """
    brickset = BrickSet.find_or_create(brickset_id)
    if brickset:
        scrobble_dict = dict(
            user_id=user_id,
            timestamp=timezone.now(),
            playback_position_seconds=0,
            source="Vrobbler",
            log={"serial_scrobble_id": ""},
        )
        log_context = dict(
            brickset_id=brickset.id,
            user_id=user_id,
            scrobble_dict=scrobble_dict,
            media_type=Scrobble.MediaType.BRICKSET,
        )
        logger.info(
            "[vrobbler-scrobble] brickset scrobble request received",
            extra=log_context,
        )
        # TODO Kick out a process to enrich the media here, and in every
        # scrobble event
        # TODO Need to check for past scrobbles and auto populate serial
        # scrobble id if possible
        return Scrobble.create_or_update(brickset, user_id, scrobble_dict)

    logger.error(f"No brickset found for Brickset ID {brickset_id}")
    return None
|