lastfm.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. import logging
  2. import time
  3. from datetime import datetime, timedelta
  4. import pylast
  5. import pytz
  6. from django.conf import settings
  7. from django.utils import timezone
  8. from music.utils import (
  9. get_or_create_album,
  10. get_or_create_artist,
  11. get_or_create_track,
  12. )
  13. logger = logging.getLogger(__name__)
  14. PYLAST_ERRORS = tuple(
  15. getattr(pylast, exc_name)
  16. for exc_name in (
  17. "ScrobblingError",
  18. "NetworkError",
  19. "MalformedResponseError",
  20. "WSError",
  21. )
  22. if hasattr(pylast, exc_name)
  23. )
  24. class LastFM:
  25. def __init__(self, user):
  26. try:
  27. self.client = pylast.LastFMNetwork(
  28. api_key=getattr(settings, "LASTFM_API_KEY"),
  29. api_secret=getattr(settings, "LASTFM_SECRET_KEY"),
  30. username=user.profile.lastfm_username,
  31. password_hash=pylast.md5(user.profile.lastfm_password),
  32. )
  33. self.user = self.client.get_user(user.profile.lastfm_username)
  34. self.vrobbler_user = user
  35. except PYLAST_ERRORS as e:
  36. logger.error(f"Error during Last.fm setup: {e}")
  37. def import_from_lastfm(self, last_processed=None):
  38. """Given a last processed time, import all scrobbles from LastFM since then"""
  39. from scrobbles.models import Scrobble
  40. new_scrobbles = []
  41. source = "Last.fm"
  42. lastfm_scrobbles = self.get_last_scrobbles(time_from=last_processed)
  43. for lfm_scrobble in lastfm_scrobbles:
  44. track = get_or_create_track(
  45. lfm_scrobble,
  46. {
  47. "TRACK_TITLE": "track",
  48. "ARTIST_NAME": "artist",
  49. "ALBUM_NAME": "album",
  50. "TIMESTAMP": "timestamp",
  51. },
  52. )
  53. timezone = settings.TIME_ZONE
  54. if self.vrobbler_user.profile:
  55. timezone = self.vrobbler_user.profile.timezone
  56. new_scrobble = Scrobble(
  57. user=self.vrobbler_user,
  58. timestamp=timestamp,
  59. source=source,
  60. track=track,
  61. timezone=timezone,
  62. played_to_completion=True,
  63. in_progress=False,
  64. media_type=Scrobble.MediaType.TRACK,
  65. )
  66. # Vrobbler scrobbles on finish, LastFM scrobbles on start
  67. seconds_eariler = timestamp - timedelta(seconds=20)
  68. seconds_later = timestamp + timedelta(seconds=20)
  69. existing = Scrobble.objects.filter(
  70. created__gte=seconds_eariler,
  71. created__lte=seconds_later,
  72. track=track,
  73. ).first()
  74. if existing:
  75. logger.debug(f"Skipping existing scrobble {new_scrobble}")
  76. continue
  77. logger.debug(f"Queued scrobble {new_scrobble} for creation")
  78. new_scrobbles.append(new_scrobble)
  79. created = Scrobble.objects.bulk_create(new_scrobbles)
  80. logger.info(
  81. f"Created {len(created)} scrobbles",
  82. extra={"created_scrobbles": created},
  83. )
  84. return created
  85. def get_last_scrobbles(self, time_from=None, time_to=None):
  86. """Given a user, Last.fm api key, and secret key, grab a list of scrobbled
  87. tracks"""
  88. lfm_params = {}
  89. scrobbles = []
  90. if time_from:
  91. lfm_params["time_from"] = int(time_from.timestamp())
  92. if time_to:
  93. lfm_params["time_to"] = int(time_to.timestamp())
  94. # if not time_from and not time_to:
  95. lfm_params["limit"] = None
  96. found_scrobbles = self.user.get_recent_tracks(**lfm_params)
  97. # TOOD spin this out into a celery task over certain threshold of found scrobbles?
  98. for scrobble in found_scrobbles:
  99. logger.debug(f"Processing {scrobble}")
  100. run_time = None
  101. mbid = None
  102. artist = None
  103. try:
  104. run_time = int(scrobble.track.get_duration() / 1000)
  105. mbid = scrobble.track.get_mbid()
  106. artist = scrobble.track.get_artist().name
  107. except pylast.MalformedResponseError as e:
  108. logger.warn(e)
  109. except pylast.WSError as e:
  110. logger.warn(
  111. "LastFM barfed trying to get the track for {scrobble.track}"
  112. )
  113. except pylast.NetworkError as e:
  114. logger.warn(
  115. "LastFM barfed trying to get the track for {scrobble.track}"
  116. )
  117. if not artist:
  118. logger.warn(f"Silly LastFM, no artist found for {scrobble}")
  119. continue
  120. timestamp = datetime.utcfromtimestamp(
  121. int(scrobble.timestamp)
  122. ).replace(tzinfo=pytz.utc)
  123. logger.info(f"{artist},{scrobble.track.title},{timestamp}")
  124. scrobbles.append(
  125. {
  126. "artist": artist,
  127. "album": scrobble.album,
  128. "title": scrobble.track.title,
  129. "mbid": mbid,
  130. "run_time_seconds": run_time,
  131. "timestamp": timestamp,
  132. }
  133. )
  134. return scrobbles