import hashlib
import logging
import re
from datetime import datetime, timedelta, tzinfo

import pytz
import requests
from django.apps import apps
from django.contrib.auth import get_user_model
from django.db import models
from django.utils import timezone
from profiles.models import UserProfile
from profiles.utils import now_user_timezone
from scrobbles.constants import LONG_PLAY_MEDIA
from scrobbles.tasks import (
    process_koreader_import,
    process_lastfm_import,
    process_retroarch_import,
)
from webdav.client import get_webdav_client

logger = logging.getLogger(__name__)

User = get_user_model()


def timestamp_user_tz_to_utc(timestamp: int, user_tz: tzinfo) -> datetime:
    """Interpret a Unix timestamp as wall-clock time in user_tz and convert it
    to UTC. Note that localize() is pytz-specific, so user_tz must be a pytz
    timezone."""
    return user_tz.localize(datetime.utcfromtimestamp(timestamp)).astimezone(
        pytz.utc
    )
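
# Illustrative usage (assumed values): 1672531200 is midnight UTC on
# 2023-01-01; reinterpreted as US/Eastern wall-clock time (UTC-5 in January),
# it lands five hours later in UTC.
#     >>> timestamp_user_tz_to_utc(1672531200, pytz.timezone("US/Eastern"))
#     datetime.datetime(2023, 1, 1, 5, 0, tzinfo=<UTC>)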


def convert_to_seconds(run_time: str) -> int:
    """Jellyfin sends run time as a 00:00:00 string. We want the run time to
    actually be in seconds, so we convert it.

    This is actually deprecated, as we now convert to seconds before saving.
    But for older videos, we'll leave this here.
    """
    run_time_int = 0
    if ":" in str(run_time):
        run_time_list = run_time.split(":")
        hours = int(run_time_list[0])
        minutes = int(run_time_list[1])
        seconds = int(run_time_list[2])
        run_time_int = (((hours * 60) + minutes) * 60) + seconds
    return run_time_int
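
# Illustrative conversion: "01:02:03" is 1 hour, 2 minutes and 3 seconds.
#     >>> convert_to_seconds("01:02:03")
#     3723
# Input without a ":" separator (e.g. an already-converted integer) falls
# through and returns 0.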


def get_scrobbles_for_media(media_obj, user: User) -> models.QuerySet:
    Scrobble = apps.get_model(app_label="scrobbles", model_name="Scrobble")

    media_query = None
    media_class = media_obj.__class__.__name__
    if media_class == "Book":
        media_query = models.Q(book=media_obj)
    if media_class == "VideoGame":
        media_query = models.Q(video_game=media_obj)
    if media_class == "Brickset":
        media_query = models.Q(brickset=media_obj)
    if media_class == "Task":
        media_query = models.Q(task=media_obj)

    if not media_query:
        logger.warning(f"Do not know about media {media_class} 🙍")
        return Scrobble.objects.none()
    return Scrobble.objects.filter(media_query, user=user)
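
# Illustrative usage (assumes an existing Book instance and user):
#     get_scrobbles_for_media(Book.objects.first(), request.user)
# returns that user's scrobbles for the book; unknown media classes log a
# warning and return an empty queryset.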


def get_recently_played_board_games(user: User) -> dict:
    ...


def get_long_plays_in_progress(user: User) -> dict:
    """Find all long play media where the last scrobble is not marked complete."""
    media_dict = {
        "active": [],
        "inactive": [],
    }
    now = now_user_timezone(user.profile)
    for app, model in LONG_PLAY_MEDIA.items():
        media_obj = apps.get_model(app_label=app, model_name=model)
        for media in media_obj.objects.all():
            last_scrobble = media.scrobble_set.filter(user=user).last()
            # long_play_complete may be null; only an explicit False marks an
            # in-progress play
            if last_scrobble and last_scrobble.long_play_complete is False:
                days_past = (now - last_scrobble.timestamp).days
                if days_past > 7:
                    media_dict["inactive"].append(media)
                else:
                    media_dict["active"].append(media)
    media_dict["active"].reverse()
    media_dict["inactive"].reverse()
    return media_dict
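
# Illustrative return shape (hypothetical objects): media scrobbled within the
# last 7 days lands in "active", anything idle longer in "inactive", both
# most-recent-first:
#     {"active": [<Book: Dune>], "inactive": [<VideoGame: Myst>]}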


def get_long_plays_completed(user: User) -> list:
    """Find all long play media where the last scrobble is marked complete."""
    media_list = []
    for app, model in LONG_PLAY_MEDIA.items():
        media_obj = apps.get_model(app_label=app, model_name=model)
        for media in media_obj.objects.all():
            last_scrobble = (
                media.scrobble_set.filter(user=user)
                .order_by("timestamp")
                .last()
            )
            # Guard against media with no scrobbles for this user, which
            # would otherwise raise AttributeError on None
            if last_scrobble and last_scrobble.long_play_complete:
                media_list.append(media)
    return media_list


def import_lastfm_for_all_users(restart=False):
    """Grab a list of all users with LastFM enabled and kick off imports for them"""
    LastFmImport = apps.get_model("scrobbles", "LastFMImport")
    lastfm_enabled_user_ids = UserProfile.objects.filter(
        lastfm_username__isnull=False,
        lastfm_password__isnull=False,
        lastfm_auto_import=True,
    ).values_list("user_id", flat=True)

    lastfm_import_count = 0
    for user_id in lastfm_enabled_user_ids:
        lfm_import, created = LastFmImport.objects.get_or_create(
            user_id=user_id, processed_finished__isnull=True
        )
        if not created and not restart:
            logger.info(
                f"Not resuming failed LastFM import {lfm_import.id} for user {user_id}, use restart=True to restart"
            )
            continue
        process_lastfm_import.delay(lfm_import.id)
        lastfm_import_count += 1
    return lastfm_import_count
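
# Illustrative usage, e.g. from a scheduled task or a Django shell:
#     count = import_lastfm_for_all_users(restart=True)
# Each enabled user keeps at most one unfinished import row; restart=True
# re-dispatches an existing unfinished import instead of skipping it.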


def import_retroarch_for_all_users(restart=False):
    """Grab a list of all users with Retroarch enabled and kick off imports for them"""
    RetroarchImport = apps.get_model("scrobbles", "RetroarchImport")
    retroarch_enabled_user_ids = UserProfile.objects.filter(
        retroarch_path__isnull=False,
        retroarch_auto_import=True,
    ).values_list("user_id", flat=True)

    retroarch_import_count = 0
    for user_id in retroarch_enabled_user_ids:
        retroarch_import, created = RetroarchImport.objects.get_or_create(
            user_id=user_id, processed_finished__isnull=True
        )
        if not created and not restart:
            logger.info(
                f"Not resuming failed Retroarch import {retroarch_import.id} for user {user_id}, use restart=True to restart"
            )
            continue
        process_retroarch_import.delay(retroarch_import.id)
        retroarch_import_count += 1
    return retroarch_import_count


def delete_zombie_scrobbles(dry_run=True):
    """Look for any scrobble over three days old that is not paused and still
    in progress and delete it"""
    Scrobble = apps.get_model("scrobbles", "Scrobble")
    now = timezone.now()
    three_days_ago = now - timedelta(days=3)

    # TODO This should be part of a custom manager
    zombie_scrobbles = Scrobble.objects.filter(
        timestamp__lte=three_days_ago,
        is_paused=False,
        played_to_completion=False,
    )

    zombies_found = zombie_scrobbles.count()

    if not dry_run:
        logger.info(f"Deleted {zombies_found} zombie scrobbles")
        zombie_scrobbles.delete()
        return zombies_found

    logger.info(
        f"Found {zombies_found} zombie scrobbles to delete, use dry_run=False to proceed"
    )
    return zombies_found
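
# Illustrative usage: preview with the default dry run, then actually delete.
#     delete_zombie_scrobbles()               # logs the count, deletes nothing
#     delete_zombie_scrobbles(dry_run=False)  # deletes and returns the count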


def import_from_webdav_for_all_users(restart=False):
    """Grab a list of all users with WebDAV enabled and kick off imports for them"""
    from scrobbles.models import KoReaderImport
    from books.koreader import fetch_file_from_webdav

    webdav_enabled_user_ids = UserProfile.objects.filter(
        webdav_url__isnull=False,
        webdav_user__isnull=False,
        webdav_pass__isnull=False,
        webdav_auto_import=True,
    ).values_list("user_id", flat=True)
    logger.info(
        f"start import of {webdav_enabled_user_ids.count()} webdav accounts"
    )

    koreader_import_count = 0
    for user_id in webdav_enabled_user_ids:
        webdav_client = get_webdav_client(user_id)

        try:
            webdav_client.info("var/koreader/statistics.sqlite3")
            koreader_found = True
        except Exception:
            koreader_found = False
            logger.info(
                "no koreader stats file found on webdav",
                extra={"user_id": user_id},
            )

        if koreader_found:
            last_import = (
                KoReaderImport.objects.filter(
                    user_id=user_id, processed_finished__isnull=False
                )
                .order_by("processed_finished")
                .last()
            )
            koreader_file_path = fetch_file_from_webdav(user_id)
            new_hash = get_file_md5_hash(koreader_file_path)
            old_hash = None
            if last_import:
                old_hash = last_import.file_md5_hash()

            if old_hash and new_hash == old_hash:
                logger.info(
                    "koreader stats file has not changed",
                    extra={
                        "user_id": user_id,
                        "new_hash": new_hash,
                        "old_hash": old_hash,
                        "last_import_id": last_import.id,
                    },
                )
                continue

            koreader_import, created = KoReaderImport.objects.get_or_create(
                user_id=user_id, processed_finished__isnull=True
            )

            if not created and not restart:
                logger.info(
                    f"Not resuming failed KoReader import {koreader_import.id} for user {user_id}, use restart=True to restart"
                )
                continue

            koreader_import.save_sqlite_file_to_self(koreader_file_path)
            process_koreader_import.delay(koreader_import.id)
            koreader_import_count += 1
    return koreader_import_count


def media_class_to_foreign_key(media_class: str) -> str:
    """Convert a CamelCase model name to its snake_case foreign key name."""
    return re.sub(r"(?<!^)(?=[A-Z])", "_", media_class).lower()
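
# Illustrative examples: the regex inserts "_" before each interior capital.
#     >>> media_class_to_foreign_key("VideoGame")
#     'video_game'
#     >>> media_class_to_foreign_key("Book")
#     'book'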


def get_file_md5_hash(file_path: str) -> str:
    """Compute the MD5 hex digest of a file, reading it in 8KB chunks."""
    with open(file_path, "rb") as f:
        file_hash = hashlib.md5()
        while chunk := f.read(8192):
            file_hash.update(chunk)
        return file_hash.hexdigest()
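
# Illustrative usage (hypothetical path): chunked reads keep memory use flat
# even for large sqlite files.
#     digest = get_file_md5_hash("/tmp/statistics.sqlite3")
#     assert len(digest) == 32  # hex-encoded 128-bit MD5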


def deduplicate_tracks():
    from music.models import Track

    # TODO This whole thing should iterate over users
    dups = []
    for t in Track.objects.all():
        # A track is a duplicate only if *another* row shares its title/artist
        if (
            Track.objects.filter(title=t.title, artist=t.artist)
            .exclude(id=t.id)
            .exists()
        ):
            dups.append(t)

    for b in dups:
        tracks = Track.objects.filter(artist=b.artist, title=b.title)
        first = tracks.first()
        for other in tracks.exclude(id=first.id):
            logger.info(f"moving scrobbles for {other.id} to {first.id}")
            other.scrobble_set.update(track=first)
            logger.info(f"deleting {other.id} - {other}")
            other.delete()


def send_notifications_for_scrobble(scrobble_id):
    from scrobbles.models import Scrobble

    scrobble = Scrobble.objects.get(id=scrobble_id)
    profile = scrobble.user.profile
    if profile and profile.ntfy_enabled and profile.ntfy_url:
        # TODO allow priority and tags to be configured in the profile
        notify_str = f"{scrobble.media_obj}"
        if scrobble.log and scrobble.log.get("description"):
            notify_str += f" - {scrobble.log.get('description')}"

        requests.post(
            profile.ntfy_url,
            data=notify_str.encode(encoding="utf-8"),
            headers={
                "Title": scrobble.media_obj.strings.verb,
                "Priority": scrobble.media_obj.strings.priority,
                "Tags": scrobble.media_obj.strings.tags,
            },
        )
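
# Illustrative equivalent of the ntfy request above, assuming a hypothetical
# topic URL and a book scrobble:
#     curl -d "Dune - finished part one" \
#          -H "Title: Reading" -H "Priority: default" -H "Tags: books" \
#          https://ntfy.sh/my-scrobbles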