utils.py 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
  1. import logging
  2. import os
  3. from datetime import datetime, timedelta, tzinfo
  4. from urllib.parse import unquote
  5. import pytz
  6. from dateutil.parser import ParserError, parse
  7. from django.apps import apps
  8. from django.contrib.auth import get_user_model
  9. from django.db import models
  10. from django.utils import timezone
  11. from profiles.models import UserProfile
  12. from profiles.utils import now_user_timezone
  13. from scrobbles.constants import LONG_PLAY_MEDIA
  14. from scrobbles.tasks import process_lastfm_import, process_retroarch_import
  # Module-level logger, named after this module's import path.
  logger = logging.getLogger(__name__)
  # User model class resolved through Django's get_user_model(); referenced
  # in the type annotations of the functions below.
  User = get_user_model()
  def timestamp_user_tz_to_utc(timestamp: int, user_tz: tzinfo) -> datetime:
      """Re-interpret an epoch timestamp as user-local wall time and return UTC.

      ``timestamp`` is first decoded as a UTC epoch value into a naive datetime.
      That naive wall-clock reading is then treated as local time in ``user_tz``
      and converted to UTC.

      Args:
          timestamp: Seconds since the epoch.
          user_tz: Timezone the wall-clock reading belongs to. pytz zones
              (anything exposing ``localize``) are handled exactly as before;
              any other ``tzinfo`` (``zoneinfo``, ``datetime.timezone``) is
              attached with ``replace``.

      Returns:
          A timezone-aware datetime in UTC.
      """
      # Local import: at module level the name ``timezone`` is bound to
      # django.utils.timezone, so the stdlib class needs its own alias.
      from datetime import timezone as dt_timezone

      # Same value the deprecated datetime.utcfromtimestamp() produced: a naive
      # datetime holding the UTC wall-clock reading of the timestamp.
      naive = datetime.fromtimestamp(timestamp, tz=dt_timezone.utc).replace(
          tzinfo=None
      )

      localize = getattr(user_tz, "localize", None)
      if localize is not None:
          # pytz zones must be attached through localize() so the correct
          # UTC offset / DST rule is picked for that wall-clock time.
          return localize(naive).astimezone(pytz.utc)

      # Non-pytz tzinfo objects resolve their offset lazily, so replace() works.
      return naive.replace(tzinfo=user_tz).astimezone(dt_timezone.utc)
  def convert_to_seconds(run_time: str) -> int:
      """Convert a Jellyfin ``HH:MM:SS`` run time into whole seconds.

      This is deprecated, as run times are now converted to seconds before
      saving; it is kept for older videos.

      Args:
          run_time: Colon-separated run time. It is passed through ``str()``
              first, so non-string values whose text form contains ":" are
              accepted too. A two-part value is read as ``MM:SS``; parts after
              the third are ignored.

      Returns:
          The run time in seconds, or 0 when the value contains no ":".

      Raises:
          ValueError: If one of the components is not an integer.
      """
      text = str(run_time)
      if ":" not in text:
          # NOTE(review): colon-less values (including plain numbers) yield 0,
          # exactly as before - confirm callers never pass seconds here.
          return 0

      parts = text.split(":")
      if len(parts) < 3:
          # Left-pad so "MM:SS" is read as "0:MM:SS" instead of raising.
          parts = ["0"] * (3 - len(parts)) + parts

      hours, minutes, seconds = (int(part) for part in parts[:3])
      return (hours * 60 + minutes) * 60 + seconds
  def parse_mopidy_uri(uri: str) -> dict:
      """Extract podcast episode details from a Mopidy file URI.

      The URI is unquoted, its extension removed and the result split on "/".
      The last segment is the episode string and the one before it the podcast
      name. The episode string is expected to look like either
      ``YYYY-MM-DD-episode-name`` or ``NNN-YYYY-MM-DD-episode-name``.

      Args:
          uri: The (possibly percent-encoded) URI reported by Mopidy.

      Returns:
          A dict with the keys ``episode_filename`` (episode name with dashes
          replaced by spaces), ``episode_num`` (int or ``None``),
          ``podcast_name`` and ``pub_date`` (the parsed date, or ``""`` when no
          date could be parsed).
      """
      logger.debug(f"Parsing URI: {uri}")
      parsed_uri = os.path.splitext(unquote(uri))[0].split("/")
      episode_str = parsed_uri[-1]
      # A URI without any "/" has no podcast segment; do not raise for it.
      podcast_name = parsed_uri[-2] if len(parsed_uri) > 1 else ""

      episode_num = None
      pub_date = ""
      gap_to_strip = 0

      try:
          # Without episode numbers the date will lead
          pub_date = parse(episode_str[0:10])
          gap_to_strip = 11  # "YYYY-MM-DD" plus the dash that follows it
      except (ParserError, OverflowError):
          # Otherwise expect a leading episode number, then the date.
          prefix, _, remainder = episode_str.partition("-")
          try:
              episode_num = int(prefix)
          except ValueError:
              episode_num = None

          if episode_num is not None:
              # Use the literal prefix width so leading zeros ("042-") and
              # episode 0 are stripped correctly.
              gap_to_strip = len(prefix) + 1
              try:
                  pub_date = parse(remainder[0:10])
                  gap_to_strip += 11
              except (ParserError, OverflowError):
                  pub_date = ""

      episode_name = episode_str[gap_to_strip:].replace("-", " ")

      return {
          "episode_filename": episode_name,
          "episode_num": episode_num,
          "podcast_name": podcast_name,
          "pub_date": pub_date,
      }
  def check_scrobble_for_finish(
      scrobble: "Scrobble", force_to_100=False, force_finish=False
  ) -> None:
      """Mark ``scrobble`` as finished once it passed its completion threshold.

      The threshold is ``scrobble.media_obj.COMPLETION_PERCENT``. When
      ``scrobble.percent_played`` reaches it, or ``force_finish`` is truthy, the
      playback position is set to the media's full run time, the in-progress
      and paused flags are cleared, ``played_to_completion`` is set and those
      four fields are saved. Otherwise only a log line is written.

      Args:
          scrobble: The scrobble to inspect and possibly update.
          force_to_100: Accepted but not used by this function.
          force_finish: Finish the scrobble regardless of ``percent_played``.
      """
      threshold = scrobble.media_obj.COMPLETION_PERCENT
      reached_threshold = scrobble.percent_played >= threshold

      if not (reached_threshold or force_finish):
          logger.info(
              f"{scrobble.id} not complete at {scrobble.percent_played}%"
          )
          return

      logger.info(f"{scrobble.id} finished at {scrobble.percent_played}")

      scrobble.playback_position_seconds = scrobble.media_obj.run_time_seconds
      scrobble.in_progress = False
      scrobble.is_paused = False
      scrobble.played_to_completion = True

      changed_fields = [
          "in_progress",
          "is_paused",
          "played_to_completion",
          "playback_position_seconds",
      ]
      scrobble.save(update_fields=changed_fields)
  def check_long_play_for_finish(scrobble):
      """Close out a long-play scrobble and accumulate its total play time.

      The playback position becomes the time elapsed since
      ``scrobble.timestamp``. ``long_play_seconds`` becomes that elapsed time
      plus the ``long_play_seconds`` of ``scrobble.previous`` (when there is
      one). The scrobble is then marked as played to completion and the three
      fields are saved.

      Args:
          scrobble: The long-play scrobble to finish.
      """
      # total_seconds() keeps the days component that timedelta.seconds
      # silently drops, so sessions longer than 24 hours no longer wrap
      # around. A timestamp in the future is clamped to 0.
      elapsed = timezone.now() - scrobble.timestamp
      seconds_elapsed = max(int(elapsed.total_seconds()), 0)

      # Set our playback seconds, and calc long play seconds
      scrobble.playback_position_seconds = seconds_elapsed

      past_seconds = 0
      previous = scrobble.previous
      if previous:
          # A previous scrobble without a recorded total counts as 0.
          past_seconds = previous.long_play_seconds or 0
      scrobble.long_play_seconds = past_seconds + seconds_elapsed

      # Long play scrobbles are always finished when we say they are
      scrobble.played_to_completion = True

      scrobble.save(
          update_fields=[
              "playback_position_seconds",
              "played_to_completion",
              "long_play_seconds",
          ]
      )
  def get_scrobbles_for_media(media_obj, user: User) -> models.QuerySet:
      """Return ``user``'s scrobbles of the given media object.

      Only ``Book`` and ``VideoGame`` instances are recognised, matched by
      class name. For any other class a warning is logged and an empty
      queryset is returned.

      Args:
          media_obj: The media instance to look up scrobbles for.
          user: The user whose scrobbles are wanted.

      Returns:
          A queryset of matching ``Scrobble`` rows (empty for unknown media).
      """
      Scrobble = apps.get_model(app_label="scrobbles", model_name="Scrobble")

      media_class = media_obj.__class__.__name__
      if media_class == "Book":
          media_query = models.Q(book=media_obj)
      elif media_class == "VideoGame":
          media_query = models.Q(video_game=media_obj)
      else:
          # f-prefix added so the class name is actually interpolated, and
          # warning() used instead of the deprecated warn() alias.
          logger.warning(f"Do not know about media {media_class} 🙍")
          # Empty queryset rather than a list, matching the return annotation.
          return Scrobble.objects.none()

      return Scrobble.objects.filter(media_query, user=user)
  def get_recently_played_board_games(user: User) -> dict:
      """Placeholder: not implemented yet.

      The body is only ``...``, so calling this currently returns ``None``
      rather than the ``dict`` named in the return annotation.
      """
      ...
  def get_long_plays_in_progress(user: User) -> dict:
      """Group the user's unfinished long-play media into active and inactive.

      For every model listed in ``LONG_PLAY_MEDIA``, a media object is reported
      when the user's latest scrobble of it (by timestamp) has
      ``long_play_complete`` equal to ``False``. It lands in ``"inactive"``
      when that scrobble is more than 7 whole days old, otherwise in
      ``"active"``.

      Args:
          user: The user whose scrobbles are inspected.

      Returns:
          A dict with the keys ``"active"`` and ``"inactive"``, each a list of
          media objects in reverse of the order they were found.
      """
      media_dict = {
          "active": [],
          "inactive": [],
      }
      now = now_user_timezone(user.profile)

      for app, model in LONG_PLAY_MEDIA.items():
          media_obj = apps.get_model(app_label=app, model_name=model)

          for media in media_obj.objects.all():
              # Order explicitly so "last" means latest by timestamp, the same
              # way get_long_plays_completed() picks its scrobble.
              last_scrobble = (
                  media.scrobble_set.filter(user=user)
                  .order_by("timestamp")
                  .last()
              )
              # Explicit comparison kept on purpose: a None value must not be
              # treated as "in progress".
              if last_scrobble and last_scrobble.long_play_complete == False:  # noqa: E712
                  days_past = (now - last_scrobble.timestamp).days
                  bucket = "inactive" if days_past > 7 else "active"
                  media_dict[bucket].append(media)

      media_dict["active"].reverse()
      media_dict["inactive"].reverse()
      return media_dict
  def get_long_plays_completed(user: User) -> list:
      """Find all long-play media whose last scrobble is marked complete.

      For every model listed in ``LONG_PLAY_MEDIA``, a media object is included
      when the user's latest scrobble of it (by timestamp) has
      ``long_play_complete`` equal to ``True``.

      Args:
          user: The user whose scrobbles are inspected.

      Returns:
          A list of media objects.
      """
      media_list = []

      for app, model in LONG_PLAY_MEDIA.items():
          media_obj = apps.get_model(app_label=app, model_name=model)

          for media in media_obj.objects.all():
              last_scrobble = (
                  media.scrobble_set.filter(user=user)
                  .order_by("timestamp")
                  .last()
              )
              # last() returns None when this user never scrobbled the media
              # (even if other users did); skip instead of raising.
              if last_scrobble is None:
                  continue
              # Explicit comparison kept so only a real True counts.
              if last_scrobble.long_play_complete == True:  # noqa: E712
                  media_list.append(media)

      return media_list
  def import_lastfm_for_all_users(restart=False):
      """Queue a LastFM import task for every user with auto-import enabled.

      Profiles qualify when they have a non-null LastFM username and password
      and ``lastfm_auto_import`` set. For each one an unfinished
      ``LastFMImport`` row is fetched or created. An already existing
      unfinished import is only re-queued when ``restart`` is truthy.

      Args:
          restart: Re-queue unfinished imports that already exist.

      Returns:
          The number of import tasks queued.
      """
      LastFmImport = apps.get_model("scrobbles", "LastFMImport")

      eligible_profiles = UserProfile.objects.filter(
          lastfm_username__isnull=False,
          lastfm_password__isnull=False,
          lastfm_auto_import=True,
      )

      queued = 0
      for user_id in eligible_profiles.values_list("user_id", flat=True):
          lfm_import, created = LastFmImport.objects.get_or_create(
              user_id=user_id, processed_finished__isnull=True
          )

          if created or restart:
              process_lastfm_import.delay(lfm_import.id)
              queued += 1
          else:
              logger.info(
                  f"Not resuming failed LastFM import {lfm_import.id} for user {user_id}, use restart=True to restart"
              )

      return queued
  def import_retroarch_for_all_users(restart=False):
      """Queue a Retroarch import task for every user with auto-import enabled.

      Profiles qualify when they have a non-null ``retroarch_path`` and
      ``retroarch_auto_import`` set. For each one an unfinished
      ``RetroarchImport`` row is fetched or created. An already existing
      unfinished import is only re-queued when ``restart`` is truthy.

      Args:
          restart: Re-queue unfinished imports that already exist.

      Returns:
          The number of import tasks queued.
      """
      RetroarchImport = apps.get_model("scrobbles", "RetroarchImport")
      retroarch_enabled_user_ids = UserProfile.objects.filter(
          retroarch_path__isnull=False,
          retroarch_auto_import=True,
      ).values_list("user_id", flat=True)

      retroarch_import_count = 0
      for user_id in retroarch_enabled_user_ids:
          retroarch_import, created = RetroarchImport.objects.get_or_create(
              user_id=user_id, processed_finished__isnull=True
          )
          if not created and not restart:
              # Message previously said "LastFM" (copy-paste slip).
              logger.info(
                  f"Not resuming failed Retroarch import {retroarch_import.id} for user {user_id}, use restart=True to restart"
              )
              continue

          process_retroarch_import.delay(retroarch_import.id)
          retroarch_import_count += 1

      return retroarch_import_count
  def delete_zombie_scrobbles(dry_run=True):
      """Find, and optionally delete, scrobbles that were never finished.

      A zombie is a scrobble whose timestamp is at least three days old, that
      is not paused and was not played to completion.

      Args:
          dry_run: When truthy (the default) only count and log the zombies;
              pass ``False`` to actually delete them.

      Returns:
          The number of zombie scrobbles found.
      """
      Scrobble = apps.get_model("scrobbles", "Scrobble")

      three_days_ago = timezone.now() - timedelta(days=3)

      # TODO This should be part of a custom manager
      zombie_scrobbles = Scrobble.objects.filter(
          timestamp__lte=three_days_ago,
          is_paused=False,
          played_to_completion=False,
      )

      # Count before deleting: the queryset matches nothing afterwards.
      zombies_found = zombie_scrobbles.count()

      if dry_run:
          logger.info(
              f"Found {zombies_found} zombie scrobbles to delete, use dry_run=False to proceed"
          )
          return zombies_found

      # Delete first, then log, so the message is only written on success.
      zombie_scrobbles.delete()
      logger.info(f"Deleted {zombies_found} zombie scrobbles")
      return zombies_found
  213. f"Found {zombies_found} zombie scrobbles to delete, use dry_run=False to proceed"
  214. )
  215. return zombies_found