# utils.py
  1. import hashlib
  2. import logging
  3. import re
  4. from datetime import datetime, timedelta, tzinfo
  5. import pytz
  6. import requests
  7. from django.apps import apps
  8. from django.contrib.auth import get_user_model
  9. from django.db import models
  10. from django.utils import timezone
  11. from profiles.models import UserProfile
  12. from profiles.utils import now_user_timezone
  13. from scrobbles.constants import LONG_PLAY_MEDIA
  14. from scrobbles.tasks import (
  15. process_koreader_import,
  16. process_lastfm_import,
  17. process_retroarch_import,
  18. )
  19. from webdav.client import get_webdav_client
  20. logger = logging.getLogger(__name__)
  21. User = get_user_model()
  22. def timestamp_user_tz_to_utc(timestamp: int, user_tz: tzinfo) -> datetime:
  23. return user_tz.localize(datetime.utcfromtimestamp(timestamp)).astimezone(
  24. pytz.utc
  25. )
  26. def convert_to_seconds(run_time: str) -> int:
  27. """Jellyfin sends run time as 00:00:00 string. We want the run time to
  28. actually be in seconds so we'll convert it"
  29. This is actually deprecated, as we now convert to seconds before saving.
  30. But for older videos, we'll leave this here.
  31. """
  32. run_time_int = 0
  33. if ":" in str(run_time):
  34. run_time_list = run_time.split(":")
  35. hours = int(run_time_list[0])
  36. minutes = int(run_time_list[1])
  37. seconds = int(run_time_list[2])
  38. run_time_int = int((((hours * 60) + minutes) * 60) + seconds)
  39. return run_time_int
  40. def get_scrobbles_for_media(media_obj, user: User) -> models.QuerySet:
  41. Scrobble = apps.get_model(app_label="scrobbles", model_name="Scrobble")
  42. media_query = None
  43. media_class = media_obj.__class__.__name__
  44. if media_class == "Book":
  45. media_query = models.Q(book=media_obj)
  46. if media_class == "VideoGame":
  47. media_query = models.Q(video_game=media_obj)
  48. if media_class == "Brickset":
  49. media_query = models.Q(brickset=media_obj)
  50. if media_class == "Task":
  51. media_query = models.Q(task=media_obj)
  52. if not media_query:
  53. logger.warn(f"Do not know about media {media_class} 🙍")
  54. return QuerySet()
  55. return Scrobble.objects.filter(media_query, user=user)
def get_recently_played_board_games(user: User) -> dict:
    """Stub — not implemented; body is only ``...`` so it returns None.

    NOTE(review): the signature promises a dict; callers should not rely
    on the return value until this is implemented.
    """
    ...
  58. def get_long_plays_in_progress(user: User) -> dict:
  59. """Find all books where the last scrobble is not marked complete"""
  60. media_dict = {
  61. "active": [],
  62. "inactive": [],
  63. }
  64. now = now_user_timezone(user.profile)
  65. for app, model in LONG_PLAY_MEDIA.items():
  66. media_obj = apps.get_model(app_label=app, model_name=model)
  67. for media in media_obj.objects.all():
  68. last_scrobble = media.scrobble_set.filter(user=user).last()
  69. if last_scrobble and last_scrobble.long_play_complete == False:
  70. days_past = (now - last_scrobble.timestamp).days
  71. if days_past > 7:
  72. media_dict["inactive"].append(media)
  73. else:
  74. media_dict["active"].append(media)
  75. media_dict["active"].reverse()
  76. media_dict["inactive"].reverse()
  77. return media_dict
  78. def get_long_plays_completed(user: User) -> list:
  79. """Find all books where the last scrobble is not marked complete"""
  80. media_list = []
  81. for app, model in LONG_PLAY_MEDIA.items():
  82. media_obj = apps.get_model(app_label=app, model_name=model)
  83. for media in media_obj.objects.all():
  84. if (
  85. media.scrobble_set.all()
  86. and media.scrobble_set.filter(user=user)
  87. .order_by("timestamp")
  88. .last()
  89. .long_play_complete
  90. == True
  91. ):
  92. media_list.append(media)
  93. return media_list
  94. def import_lastfm_for_all_users(restart=False):
  95. """Grab a list of all users with LastFM enabled and kickoff imports for them"""
  96. LastFmImport = apps.get_model("scrobbles", "LastFMImport")
  97. lastfm_enabled_user_ids = UserProfile.objects.filter(
  98. lastfm_username__isnull=False,
  99. lastfm_password__isnull=False,
  100. lastfm_auto_import=True,
  101. ).values_list("user_id", flat=True)
  102. lastfm_import_count = 0
  103. for user_id in lastfm_enabled_user_ids:
  104. lfm_import, created = LastFmImport.objects.get_or_create(
  105. user_id=user_id, processed_finished__isnull=True
  106. )
  107. if not created and not restart:
  108. logger.info(
  109. f"Not resuming failed LastFM import {lfm_import.id} for user {user_id}, use restart=True to restart"
  110. )
  111. continue
  112. process_lastfm_import.delay(lfm_import.id)
  113. lastfm_import_count += 1
  114. return lastfm_import_count
  115. def import_retroarch_for_all_users(restart=False):
  116. """Grab a list of all users with Retroarch enabled and kickoff imports for them"""
  117. RetroarchImport = apps.get_model("scrobbles", "RetroarchImport")
  118. retroarch_enabled_user_ids = UserProfile.objects.filter(
  119. retroarch_path__isnull=False,
  120. retroarch_auto_import=True,
  121. ).values_list("user_id", flat=True)
  122. retroarch_import_count = 0
  123. for user_id in retroarch_enabled_user_ids:
  124. retroarch_import, created = RetroarchImport.objects.get_or_create(
  125. user_id=user_id, processed_finished__isnull=True
  126. )
  127. if not created and not restart:
  128. logger.info(
  129. f"Not resuming failed LastFM import {retroarch_import.id} for user {user_id}, use restart=True to restart"
  130. )
  131. continue
  132. process_retroarch_import.delay(retroarch_import.id)
  133. retroarch_import_count += 1
  134. return retroarch_import_count
def delete_zombie_scrobbles(dry_run=True):
    """Find scrobbles over three days old that are neither paused nor
    played to completion ("zombies") and delete them.

    With dry_run=True (the default) only logs how many would be deleted.
    Returns the number of zombie scrobbles found.

    NOTE(review): the original docstring said "over a day old", but the
    cutoff below is three days.
    """
    Scrobble = apps.get_model("scrobbles", "Scrobble")
    now = timezone.now()
    three_days_ago = now - timedelta(days=3)
    # TODO This should be part of a custom manager
    zombie_scrobbles = Scrobble.objects.filter(
        timestamp__lte=three_days_ago,
        is_paused=False,
        played_to_completion=False,
    )
    zombies_found = zombie_scrobbles.count()
    if not dry_run:
        logger.info(f"Deleted {zombies_found} zombie scrobbles")
        zombie_scrobbles.delete()
        return zombies_found
    logger.info(
        f"Found {zombies_found} zombie scrobbles to delete, use dry_run=False to proceed"
    )
    return zombies_found
  155. def import_from_webdav_for_all_users(restart=False):
  156. """Grab a list of all users with WebDAV enabled and kickoff imports for them"""
  157. from scrobbles.models import KoReaderImport
  158. from books.koreader import fetch_file_from_webdav
  159. # LastFmImport = apps.get_model("scrobbles", "LastFMImport")
  160. webdav_enabled_user_ids = UserProfile.objects.filter(
  161. webdav_url__isnull=False,
  162. webdav_user__isnull=False,
  163. webdav_pass__isnull=False,
  164. webdav_auto_import=True,
  165. ).values_list("user_id", flat=True)
  166. logger.info(
  167. f"start import of {webdav_enabled_user_ids.count()} webdav accounts"
  168. )
  169. koreader_import_count = 0
  170. for user_id in webdav_enabled_user_ids:
  171. webdav_client = get_webdav_client(user_id)
  172. try:
  173. webdav_client.info("var/koreader/statistics.sqlite3")
  174. koreader_found = True
  175. except:
  176. koreader_found = False
  177. logger.info(
  178. "no koreader stats file found on webdav",
  179. extra={"user_id": user_id},
  180. )
  181. if koreader_found:
  182. last_import = (
  183. KoReaderImport.objects.filter(
  184. user_id=user_id, processed_finished__isnull=False
  185. )
  186. .order_by("processed_finished")
  187. .last()
  188. )
  189. koreader_file_path = fetch_file_from_webdav(1)
  190. new_hash = get_file_md5_hash(koreader_file_path)
  191. old_hash = None
  192. if last_import:
  193. old_hash = last_import.file_md5_hash()
  194. if old_hash and new_hash == old_hash:
  195. logger.info(
  196. "koreader stats file has not changed",
  197. extra={
  198. "user_id": user_id,
  199. "new_hash": new_hash,
  200. "old_hash": old_hash,
  201. "last_import_id": last_import.id,
  202. },
  203. )
  204. continue
  205. koreader_import, created = KoReaderImport.objects.get_or_create(
  206. user_id=user_id, processed_finished__isnull=True
  207. )
  208. if not created and not restart:
  209. logger.info(
  210. f"Not resuming failed KoReader import {koreader_import.id} for user {user_id}, use restart=True to restart"
  211. )
  212. continue
  213. koreader_import.save_sqlite_file_to_self(koreader_file_path)
  214. process_koreader_import.delay(koreader_import.id)
  215. koreader_import_count += 1
  216. return koreader_import_count
  217. def media_class_to_foreign_key(media_class: str) -> str:
  218. return re.sub(r"(?<!^)(?=[A-Z])", "_", media_class).lower()
  219. def get_file_md5_hash(file_path: str) -> str:
  220. with open(file_path, "rb") as f:
  221. file_hash = hashlib.md5()
  222. while chunk := f.read(8192):
  223. file_hash.update(chunk)
  224. return file_hash.hexdigest()
  225. def deduplicate_tracks():
  226. from music.models import Track
  227. # TODO This whole thing should iterate over users
  228. dups = []
  229. for t in Track.objects.all():
  230. if Track.objects.filter(title=t.title, artist=t.artist).exists():
  231. dups.append(t)
  232. for b in dups:
  233. tracks = Track.objects.filter(artist=b.artist, title=b.title)
  234. first = tracks.first()
  235. for other in tracks.exclude(id=first.id):
  236. print("moving scrobbles for ", other.id, " to ", first.id)
  237. other.scrobble_set.update(track=first)
  238. print("deleting ", other.id, " - ", other)
  239. other.delete()
  240. def send_notifications_for_scrobble(scrobble_id):
  241. from scrobbles.models import Scrobble
  242. scrobble = Scrobble.objects.get(id=scrobble_id)
  243. profile = scrobble.user.profile
  244. if profile and profile.ntfy_enabled and profile.ntfy_url:
  245. # TODO allow prority and tags to be configured in the profile
  246. notify_str = f"{scrobble.media_obj}"
  247. if scrobble.log and scrobble.log.get("description"):
  248. notify_str += f" - {scrobble.log.get('description')}"
  249. requests.post(
  250. profile.ntfy_url,
  251. data=notify_str.encode(encoding="utf-8"),
  252. headers={
  253. "Title": scrobble.media_obj.strings.verb,
  254. "Priority": scrobble.media_obj.strings.priority,
  255. "Tags": scrobble.media_obj.strings.tags,
  256. },
  257. )