aggregators.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. from datetime import datetime, timedelta
  2. from django.apps import apps
  3. from django.db.models import Count, Q, QuerySet
  4. from django.utils import timezone
  5. from profiles.utils import now_user_timezone
  6. from scrobbles.models import Scrobble
  7. from videos.models import Video
  8. def scrobble_counts(user=None):
  9. now = timezone.now()
  10. user_filter = Q()
  11. if user and user.is_authenticated:
  12. now = now_user_timezone(user.profile)
  13. user_filter = Q(user=user)
  14. start_of_today = datetime.combine(
  15. now.date(), datetime.min.time(), now.tzinfo
  16. )
  17. starting_day_of_current_week = now.date() - timedelta(
  18. days=now.today().isoweekday() % 7
  19. )
  20. starting_day_of_current_month = now.date().replace(day=1)
  21. starting_day_of_current_year = now.date().replace(month=1, day=1)
  22. finished_scrobbles_qs = Scrobble.objects.filter(
  23. user_filter, played_to_completion=True
  24. )
  25. data = {}
  26. data['today'] = finished_scrobbles_qs.filter(
  27. timestamp__gte=start_of_today
  28. ).count()
  29. data['week'] = finished_scrobbles_qs.filter(
  30. timestamp__gte=starting_day_of_current_week
  31. ).count()
  32. data['month'] = finished_scrobbles_qs.filter(
  33. timestamp__gte=starting_day_of_current_month
  34. ).count()
  35. data['year'] = finished_scrobbles_qs.filter(
  36. timestamp__gte=starting_day_of_current_year
  37. ).count()
  38. data['alltime'] = finished_scrobbles_qs.count()
  39. return data
  40. def week_of_scrobbles(
  41. user=None, start=None, media: str = 'tracks'
  42. ) -> dict[str, int]:
  43. now = timezone.now()
  44. user_filter = Q()
  45. if user and user.is_authenticated:
  46. now = now_user_timezone(user.profile)
  47. user_filter = Q(user=user)
  48. if not start:
  49. start = datetime.combine(now.date(), datetime.min.time(), now.tzinfo)
  50. scrobble_day_dict = {}
  51. base_qs = Scrobble.objects.filter(user_filter, played_to_completion=True)
  52. media_filter = Q(track__isnull=False)
  53. if media == 'movies':
  54. media_filter = Q(video__video_type=Video.VideoType.MOVIE)
  55. if media == 'series':
  56. media_filter = Q(video__video_type=Video.VideoType.TV_EPISODE)
  57. for day in range(6, -1, -1):
  58. start_day = start - timedelta(days=day)
  59. end = datetime.combine(start_day, datetime.max.time(), now.tzinfo)
  60. day_of_week = start_day.strftime('%A')
  61. scrobble_day_dict[day_of_week] = base_qs.filter(
  62. media_filter,
  63. timestamp__gte=start_day,
  64. timestamp__lte=end,
  65. played_to_completion=True,
  66. ).count()
  67. return scrobble_day_dict
  68. def live_charts(
  69. user: "User",
  70. media_type: str = "Track",
  71. chart_period: str = "all",
  72. limit: int = 15,
  73. ) -> QuerySet:
  74. now = timezone.now()
  75. tzinfo = now.tzinfo
  76. now = now.date()
  77. if user.is_authenticated:
  78. now = now_user_timezone(user.profile)
  79. tzinfo = now.tzinfo
  80. start_of_today = datetime.combine(now, datetime.min.time(), tzinfo)
  81. start_day_of_week = now - timedelta(days=now.today().isoweekday() % 7)
  82. start_day_of_month = now.replace(day=1)
  83. start_day_of_year = now.replace(month=1, day=1)
  84. media_model = apps.get_model(app_label='music', model_name=media_type)
  85. period_queries = {
  86. 'today': {'scrobble__timestamp__gte': start_of_today},
  87. 'week': {'scrobble__timestamp__gte': start_day_of_week},
  88. 'month': {'scrobble__timestamp__gte': start_day_of_month},
  89. 'year': {'scrobble__timestamp__gte': start_day_of_year},
  90. 'all': {},
  91. }
  92. time_filter = Q()
  93. completion_filter = Q(
  94. scrobble__user=user, scrobble__played_to_completion=True
  95. )
  96. user_filter = Q(scrobble__user=user)
  97. count_field = "scrobble"
  98. if media_type == "Artist":
  99. for period, query_dict in period_queries.items():
  100. period_queries[period] = {
  101. "track__" + k: v for k, v in query_dict.items()
  102. }
  103. completion_filter = Q(
  104. track__scrobble__user=user,
  105. track__scrobble__played_to_completion=True,
  106. )
  107. count_field = "track__scrobble"
  108. user_filter = Q(track__scrobble__user=user)
  109. time_filter = Q(**period_queries[chart_period])
  110. return (
  111. media_model.objects.filter(user_filter, time_filter)
  112. .annotate(
  113. num_scrobbles=Count(
  114. count_field,
  115. filter=completion_filter,
  116. distinct=True,
  117. )
  118. )
  119. .order_by("-num_scrobbles")[:limit]
  120. )
  121. def artist_scrobble_count(artist_id: int, filter: str = "today") -> int:
  122. return Scrobble.objects.filter(track__artist=artist_id).count()