lastfm.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. import logging
  2. from datetime import datetime, timedelta
  3. import pylast
  4. import pytz
  5. from django.conf import settings
  6. from music.models import Track
  7. logger = logging.getLogger(__name__)
  8. PYLAST_ERRORS = tuple(
  9. getattr(pylast, exc_name)
  10. for exc_name in (
  11. "ScrobblingError",
  12. "NetworkError",
  13. "MalformedResponseError",
  14. "WSError",
  15. )
  16. if hasattr(pylast, exc_name)
  17. )
  18. class LastFM:
  19. def __init__(self, user):
  20. try:
  21. self.client = pylast.LastFMNetwork(
  22. api_key=getattr(settings, "LASTFM_API_KEY"),
  23. api_secret=getattr(settings, "LASTFM_SECRET_KEY"),
  24. username=user.profile.lastfm_username,
  25. password_hash=pylast.md5(user.profile.lastfm_password),
  26. )
  27. self.user = self.client.get_user(user.profile.lastfm_username)
  28. self.vrobbler_user = user
  29. except PYLAST_ERRORS as e:
  30. logger.error(f"Error during Last.fm setup: {e}")
  31. def import_from_lastfm(self, last_processed=None):
  32. """Given a last processed time, import all scrobbles from LastFM since then"""
  33. from scrobbles.models import Scrobble
  34. new_scrobbles = []
  35. source = "Last.fm"
  36. lastfm_scrobbles = self.get_last_scrobbles(time_from=last_processed)
  37. for lfm_scrobble in lastfm_scrobbles:
  38. track = Track.find_or_create(
  39. title=lfm_scrobble.get("title"),
  40. artist_name=lfm_scrobble.get("artist"),
  41. album_name=lfm_scrobble.get("album"),
  42. )
  43. timezone = settings.TIME_ZONE
  44. if self.vrobbler_user.profile:
  45. timezone = self.vrobbler_user.profile.timezone
  46. timestamp = lfm_scrobble.get("timestamp")
  47. new_scrobble = Scrobble(
  48. user=self.vrobbler_user,
  49. timestamp=timestamp,
  50. source=source,
  51. track=track,
  52. timezone=timezone,
  53. played_to_completion=True,
  54. in_progress=False,
  55. media_type=Scrobble.MediaType.TRACK,
  56. )
  57. # Vrobbler scrobbles on finish, LastFM scrobbles on start
  58. seconds_eariler = timestamp - timedelta(seconds=20)
  59. seconds_later = timestamp + timedelta(seconds=20)
  60. existing = Scrobble.objects.filter(
  61. created__gte=seconds_eariler,
  62. created__lte=seconds_later,
  63. track=track,
  64. ).first()
  65. if existing:
  66. logger.debug(f"Skipping existing scrobble {new_scrobble}")
  67. continue
  68. logger.debug(f"Queued scrobble {new_scrobble} for creation")
  69. new_scrobbles.append(new_scrobble)
  70. created = Scrobble.objects.bulk_create(new_scrobbles)
  71. # TODO Add a notification for users that their import is complete
  72. logger.info(
  73. f"Last.fm import fnished",
  74. extra={
  75. "scrobbles_created": len(created),
  76. "user_id": self.vrobbler_user,
  77. "lastfm_user": self.user,
  78. },
  79. )
  80. return created
  81. def get_last_scrobbles(self, time_from=None, time_to=None):
  82. """Given a user, Last.fm api key, and secret key, grab a list of scrobbled
  83. tracks"""
  84. lfm_params = {}
  85. scrobbles = []
  86. if time_from:
  87. lfm_params["time_from"] = int(time_from.timestamp())
  88. if time_to:
  89. lfm_params["time_to"] = int(time_to.timestamp())
  90. # if not time_from and not time_to:
  91. lfm_params["limit"] = None
  92. found_scrobbles = self.user.get_recent_tracks(**lfm_params)
  93. # TOOD spin this out into a celery task over certain threshold of found scrobbles?
  94. for scrobble in found_scrobbles:
  95. logger.debug(f"Processing {scrobble}")
  96. run_time = None
  97. mbid = None
  98. artist = None
  99. log_dict = {"scrobble": scrobble}
  100. try:
  101. run_time = int(scrobble.track.get_duration() / 1000)
  102. mbid = scrobble.track.get_mbid()
  103. artist = scrobble.track.get_artist().name
  104. log_dict["artist"] = artist
  105. log_dict["mbid"] = mbid
  106. log_dict["run_time"] = run_time
  107. except pylast.MalformedResponseError as e:
  108. logger.warning(e)
  109. except pylast.WSError as e:
  110. logger.info(
  111. "LastFM barfed trying to get the track for {scrobble.track}",
  112. extra=log_dict,
  113. )
  114. except pylast.NetworkError as e:
  115. logger.info(
  116. "LastFM barfed trying to get the track for {scrobble.track}",
  117. extra=log_dict,
  118. )
  119. if not artist:
  120. logger.info(
  121. f"Silly LastFM, no artist found for scrobble",
  122. extra=log_dict,
  123. )
  124. continue
  125. # TODO figure out if this will actually work
  126. # timestamp = datetime.fromtimestamp(int(scrobble.timestamp), UTC)
  127. timestamp = datetime.utcfromtimestamp(
  128. int(scrobble.timestamp)
  129. ).replace(tzinfo=pytz.utc)
  130. logger.info(
  131. f"Scrobble appended to list for bulk create", extra=log_dict
  132. )
  133. scrobbles.append(
  134. {
  135. "artist": artist,
  136. "album": scrobble.album,
  137. "title": scrobble.track.title,
  138. "mbid": mbid,
  139. "run_time_seconds": run_time,
  140. "timestamp": timestamp,
  141. }
  142. )
  143. return scrobbles