utils.py

import hashlib
import logging
import re
from datetime import datetime, timedelta
from urllib.parse import urlparse
from zoneinfo import ZoneInfo

import pytz
from django.apps import apps
from django.contrib.auth import get_user_model
from django.db import models
from django.utils import timezone

from profiles.models import UserProfile
from profiles.utils import now_user_timezone
from scrobbles.constants import LONG_PLAY_MEDIA
from scrobbles.notifications import NtfyNotification
from scrobbles.tasks import (
    process_koreader_import,
    process_lastfm_import,
    process_retroarch_import,
)
from webdav.client import get_webdav_client

logger = logging.getLogger(__name__)
User = get_user_model()


def timestamp_user_tz_to_utc(timestamp: int, user_tz: ZoneInfo) -> datetime:
    """Interpret a naive unix timestamp as wall-clock time in the user's
    timezone and convert it to UTC."""
    # ZoneInfo objects have no pytz-style .localize(); attach the tzinfo
    # directly instead, then convert to UTC.
    return (
        datetime.utcfromtimestamp(timestamp)
        .replace(tzinfo=user_tz)
        .astimezone(pytz.utc)
    )
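

# A minimal sketch of the conversion above; "America/New_York" is just an
# illustrative zone, not something this codebase assumes:
#
#   tz = ZoneInfo("America/New_York")
#   timestamp_user_tz_to_utc(0, tz)
#   # -> datetime.datetime(1970, 1, 1, 5, 0, tzinfo=<UTC>)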


def convert_to_seconds(run_time: str) -> int:
    """Jellyfin sends run time as a 00:00:00 string. We want the run time to
    actually be in seconds, so we convert it.

    This is actually deprecated, as we now convert to seconds before saving.
    But for older videos, we'll leave this here.
    """
    run_time_int = 0
    if ":" in str(run_time):
        hours, minutes, seconds = [int(part) for part in run_time.split(":")]
        run_time_int = ((hours * 60) + minutes) * 60 + seconds
    return run_time_int
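

# Worked example of the arithmetic above:
#
#   convert_to_seconds("01:02:03")
#   # -> 3723  (1 * 3600 + 2 * 60 + 3)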


def get_scrobbles_for_media(media_obj, user: User) -> models.QuerySet:
    Scrobble = apps.get_model(app_label="scrobbles", model_name="Scrobble")

    media_query = None
    media_class = media_obj.__class__.__name__
    if media_class == "Book":
        media_query = models.Q(book=media_obj)
    if media_class == "VideoGame":
        media_query = models.Q(video_game=media_obj)
    if media_class == "Brickset":
        media_query = models.Q(brickset=media_obj)
    if media_class == "Task":
        media_query = models.Q(task=media_obj)

    if not media_query:
        logger.warning(f"Do not know about media {media_class} 🙍")
        # Return an empty queryset rather than an undefined QuerySet()
        return Scrobble.objects.none()
    return Scrobble.objects.filter(media_query, user=user)
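

# Illustrative call, assuming `book` is a Book instance and `user` a User;
# unknown media classes log a warning and yield an empty queryset:
#
#   get_scrobbles_for_media(book, user)
#   # -> <QuerySet [<Scrobble: ...>, ...]> filtered by Q(book=book)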


def get_recently_played_board_games(user: User) -> dict:
    ...


def get_long_plays_in_progress(user: User) -> dict:
    """Find all long play media where the last scrobble is not marked complete"""
    media_dict = {
        "active": [],
        "inactive": [],
    }
    now = now_user_timezone(user.profile)
    for app, model in LONG_PLAY_MEDIA.items():
        media_model = apps.get_model(app_label=app, model_name=model)
        for media in media_model.objects.all():
            last_scrobble = media.scrobble_set.filter(user=user).last()
            if last_scrobble and last_scrobble.long_play_complete is False:
                days_past = (now - last_scrobble.timestamp).days
                if days_past > 7:
                    media_dict["inactive"].append(media)
                else:
                    media_dict["active"].append(media)
    media_dict["active"].reverse()
    media_dict["inactive"].reverse()
    return media_dict
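

# Shape of the returned dict, for reference: media scrobbled within the last
# week lands in "active", older unfinished media in "inactive".
#
#   get_long_plays_in_progress(user)
#   # -> {"active": [<Book: ...>], "inactive": [<VideoGame: ...>]}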


def get_long_plays_completed(user: User) -> list:
    """Find all long play media where the last scrobble is marked complete"""
    media_list = []
    for app, model in LONG_PLAY_MEDIA.items():
        media_model = apps.get_model(app_label=app, model_name=model)
        for media in media_model.objects.all():
            last_scrobble = (
                media.scrobble_set.filter(user=user)
                .order_by("timestamp")
                .last()
            )
            # Guard against media this user has never scrobbled, where
            # .last() returns None.
            if last_scrobble and last_scrobble.long_play_complete is True:
                media_list.append(media)
    return media_list


def import_lastfm_for_all_users(restart=False):
    """Grab a list of all users with LastFM enabled and kick off imports for them"""
    LastFmImport = apps.get_model("scrobbles", "LastFMImport")
    lastfm_enabled_user_ids = UserProfile.objects.filter(
        lastfm_username__isnull=False,
        lastfm_password__isnull=False,
        lastfm_auto_import=True,
    ).values_list("user_id", flat=True)

    lastfm_import_count = 0
    for user_id in lastfm_enabled_user_ids:
        lfm_import, created = LastFmImport.objects.get_or_create(
            user_id=user_id, processed_finished__isnull=True
        )
        if not created and not restart:
            logger.info(
                f"Not resuming failed LastFM import {lfm_import.id} for user {user_id}, use restart=True to restart"
            )
            continue
        process_lastfm_import.delay(lfm_import.id)
        lastfm_import_count += 1
    return lastfm_import_count


def import_retroarch_for_all_users(restart=False):
    """Grab a list of all users with Retroarch enabled and kick off imports for them"""
    RetroarchImport = apps.get_model("scrobbles", "RetroarchImport")
    retroarch_enabled_user_ids = UserProfile.objects.filter(
        retroarch_path__isnull=False,
        retroarch_auto_import=True,
    ).values_list("user_id", flat=True)

    retroarch_import_count = 0
    for user_id in retroarch_enabled_user_ids:
        retroarch_import, created = RetroarchImport.objects.get_or_create(
            user_id=user_id, processed_finished__isnull=True
        )
        if not created and not restart:
            logger.info(
                f"Not resuming failed Retroarch import {retroarch_import.id} for user {user_id}, use restart=True to restart"
            )
            continue
        process_retroarch_import.delay(retroarch_import.id)
        retroarch_import_count += 1
    return retroarch_import_count


def delete_zombie_scrobbles(dry_run=True):
    """Look for any scrobble more than three days old that is not paused and
    still in progress, and delete it."""
    Scrobble = apps.get_model("scrobbles", "Scrobble")
    now = timezone.now()
    three_days_ago = now - timedelta(days=3)

    # TODO This should be part of a custom manager
    zombie_scrobbles = Scrobble.objects.filter(
        timestamp__lte=three_days_ago,
        is_paused=False,
        played_to_completion=False,
    )
    zombies_found = zombie_scrobbles.count()

    if not dry_run:
        logger.info(f"Deleted {zombies_found} zombie scrobbles")
        zombie_scrobbles.delete()
        return zombies_found

    logger.info(
        f"Found {zombies_found} zombie scrobbles to delete, use dry_run=False to proceed"
    )
    return zombies_found
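

# Typical invocation: dry run first to see the count, then delete for real.
#
#   delete_zombie_scrobbles()               # logs the count, deletes nothing
#   delete_zombie_scrobbles(dry_run=False)  # actually deletes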


def import_from_webdav_for_all_users(restart=False):
    """Grab a list of all users with WebDAV enabled and kick off imports for them"""
    from books.koreader import fetch_file_from_webdav
    from scrobbles.models import KoReaderImport

    webdav_enabled_user_ids = UserProfile.objects.filter(
        webdav_url__isnull=False,
        webdav_user__isnull=False,
        webdav_pass__isnull=False,
        webdav_auto_import=True,
    ).values_list("user_id", flat=True)
    logger.info(
        f"start import of {webdav_enabled_user_ids.count()} webdav accounts"
    )

    koreader_import_count = 0
    for user_id in webdav_enabled_user_ids:
        webdav_client = get_webdav_client(user_id)

        try:
            webdav_client.info("var/koreader/statistics.sqlite3")
            koreader_found = True
        except Exception:
            # Stats file missing or WebDAV error; skip KoReader for this user
            koreader_found = False
            logger.info(
                "no koreader stats file found on webdav",
                extra={"user_id": user_id},
            )

        if koreader_found:
            last_import = (
                KoReaderImport.objects.filter(
                    user_id=user_id, processed_finished__isnull=False
                )
                .order_by("processed_finished")
                .last()
            )

            # Fetch the stats file for the user being imported, not a
            # hardcoded user id
            koreader_file_path = fetch_file_from_webdav(user_id)
            new_hash = get_file_md5_hash(koreader_file_path)
            old_hash = None
            if last_import:
                old_hash = last_import.file_md5_hash()

            if old_hash and new_hash == old_hash:
                logger.info(
                    "koreader stats file has not changed",
                    extra={
                        "user_id": user_id,
                        "new_hash": new_hash,
                        "old_hash": old_hash,
                        "last_import_id": last_import.id,
                    },
                )
                continue

            koreader_import, created = KoReaderImport.objects.get_or_create(
                user_id=user_id, processed_finished__isnull=True
            )
            if not created and not restart:
                logger.info(
                    f"Not resuming failed KoReader import {koreader_import.id} for user {user_id}, use restart=True to restart"
                )
                continue

            koreader_import.save_sqlite_file_to_self(koreader_file_path)
            process_koreader_import.delay(koreader_import.id)
            koreader_import_count += 1
    return koreader_import_count


def media_class_to_foreign_key(media_class: str) -> str:
    """Convert a CamelCase media class name to its snake_case foreign key name."""
    return re.sub(r"(?<!^)(?=[A-Z])", "_", media_class).lower()
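

# How the regex behaves: it inserts "_" before every capital letter that
# isn't at the start of the string, then lowercases.
#
#   media_class_to_foreign_key("VideoGame")  # -> "video_game"
#   media_class_to_foreign_key("Book")       # -> "book"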


def get_file_md5_hash(file_path: str) -> str:
    """Hash the file in 8KB chunks so large files aren't read into memory at once."""
    with open(file_path, "rb") as f:
        file_hash = hashlib.md5()
        while chunk := f.read(8192):
            file_hash.update(chunk)
        return file_hash.hexdigest()
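

# Illustrative usage; the path is hypothetical. An empty file hashes to the
# well-known empty-input MD5 digest:
#
#   get_file_md5_hash("/tmp/empty.sqlite3")
#   # -> "d41d8cd98f00b204e9800998ecf8427e"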


def send_stop_notifications_for_in_progress_scrobbles() -> int:
    """Get all in-progress scrobbles and check if they're past their media
    object's run time. If so, send out a notification offering to stop the
    scrobble."""
    from scrobbles.models import Scrobble

    scrobbles_in_progress_qs = Scrobble.objects.filter(
        played_to_completion=False, in_progress=True
    ).exclude(media_type=Scrobble.MediaType.GEO_LOCATION)

    notifications_sent = 0
    for scrobble in scrobbles_in_progress_qs:
        # timedelta.seconds only carries the remainder under one day;
        # total_seconds() gives the full elapsed time.
        elapsed_scrobble_seconds = (
            timezone.now() - scrobble.timestamp
        ).total_seconds()
        if elapsed_scrobble_seconds > scrobble.media_obj.run_time_seconds:
            NtfyNotification(scrobble, end=True).send()
            notifications_sent += 1
    return notifications_sent


def extract_domain(url: str) -> str:
    """Return the last two labels of the URL's hostname.

    Note this is naive: multi-part TLDs like .co.uk are truncated to the
    last two labels.
    """
    netloc = urlparse(url).netloc
    return ".".join(netloc.split(".")[-2:])
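

# Behaviour of the split above, including the multi-part TLD caveat:
#
#   extract_domain("https://www.example.com/path")  # -> "example.com"
#   extract_domain("https://news.bbc.co.uk/")       # -> "co.uk"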