lastfm.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. import logging
  2. import time
  3. from datetime import datetime, timedelta
  4. import pylast
  5. import pytz
  6. from django.conf import settings
  7. from music.utils import (
  8. get_or_create_album,
  9. get_or_create_artist,
  10. get_or_create_track,
  11. )
  12. logger = logging.getLogger(__name__)
  13. PYLAST_ERRORS = tuple(
  14. getattr(pylast, exc_name)
  15. for exc_name in (
  16. "ScrobblingError",
  17. "NetworkError",
  18. "MalformedResponseError",
  19. "WSError",
  20. )
  21. if hasattr(pylast, exc_name)
  22. )
  23. class LastFM:
  24. def __init__(self, user):
  25. try:
  26. self.client = pylast.LastFMNetwork(
  27. api_key=getattr(settings, "LASTFM_API_KEY"),
  28. api_secret=getattr(settings, "LASTFM_SECRET_KEY"),
  29. username=user.profile.lastfm_username,
  30. password_hash=pylast.md5(user.profile.lastfm_password),
  31. )
  32. self.user = self.client.get_user(user.profile.lastfm_username)
  33. self.vrobbler_user = user
  34. except PYLAST_ERRORS as e:
  35. logger.error(f"Error during Last.fm setup: {e}")
  36. def import_from_lastfm(self, last_processed=None):
  37. """Given a last processed time, import all scrobbles from LastFM since then"""
  38. from scrobbles.models import Scrobble
  39. new_scrobbles = []
  40. source = "Last.fm"
  41. source_id = ""
  42. latest_scrobbles = self.get_last_scrobbles(time_from=last_processed)
  43. for scrobble in latest_scrobbles:
  44. timestamp = scrobble.pop('timestamp')
  45. artist = get_or_create_artist(scrobble.pop('artist'))
  46. album = get_or_create_album(scrobble.pop('album'), artist)
  47. scrobble['artist'] = artist
  48. scrobble['album'] = album
  49. track = get_or_create_track(**scrobble)
  50. new_scrobble = Scrobble(
  51. user=self.vrobbler_user,
  52. timestamp=timestamp,
  53. source=source,
  54. source_id=source_id,
  55. track=track,
  56. played_to_completion=True,
  57. in_progress=False,
  58. )
  59. # Vrobbler scrobbles on finish, LastFM scrobbles on start
  60. ten_seconds_eariler = timestamp - timedelta(seconds=15)
  61. ten_seconds_later = timestamp + timedelta(seconds=15)
  62. existing = Scrobble.objects.filter(
  63. created__gte=ten_seconds_eariler,
  64. created__lte=ten_seconds_later,
  65. track=track,
  66. ).first()
  67. if existing:
  68. logger.debug(f"Skipping existing scrobble {new_scrobble}")
  69. continue
  70. logger.debug(f"Queued scrobble {new_scrobble} for creation")
  71. new_scrobbles.append(new_scrobble)
  72. created = Scrobble.objects.bulk_create(new_scrobbles)
  73. logger.info(
  74. f"Created {len(created)} scrobbles",
  75. extra={'created_scrobbles': created},
  76. )
  77. return created
  78. @staticmethod
  79. def undo_lastfm_import(process_log, dryrun=True):
  80. """Given a newline separated list of scrobbles, delete them"""
  81. from scrobbles.models import Scrobble
  82. if not process_log:
  83. logger.warning("No lines in process log found to undo")
  84. return
  85. for line in process_log.split('\n'):
  86. scrobble_id = line.split("\t")[0]
  87. scrobble = Scrobble.objects.filter(id=scrobble_id).first()
  88. if not scrobble:
  89. logger.warning(
  90. f"Could not find scrobble {scrobble_id} to undo"
  91. )
  92. continue
  93. logger.info(f"Removing scrobble {scrobble_id}")
  94. if not dryrun:
  95. scrobble.delete()
  96. def get_last_scrobbles(self, time_from=None, time_to=None):
  97. """Given a user, Last.fm api key, and secret key, grab a list of scrobbled
  98. tracks"""
  99. scrobbles = []
  100. if time_from:
  101. time_from = int(time_from.timestamp())
  102. if time_to:
  103. time_to = int(time_to.timestamp())
  104. if not time_from and not time_to:
  105. found_scrobbles = self.user.get_recent_tracks(limit=None)
  106. else:
  107. found_scrobbles = self.user.get_recent_tracks(
  108. time_from=time_from, time_to=time_to
  109. )
  110. for scrobble in found_scrobbles:
  111. run_time_ticks = scrobble.track.get_duration()
  112. run_time = run_time_ticks / 1000
  113. scrobbles.append(
  114. {
  115. "artist": scrobble.track.get_artist().name,
  116. "album": scrobble.album,
  117. "title": scrobble.track.title,
  118. "mbid": scrobble.track.get_mbid(),
  119. "run_time": int(run_time),
  120. "run_time_ticks": run_time_ticks,
  121. "timestamp": datetime.utcfromtimestamp(
  122. int(scrobble.timestamp)
  123. ).replace(tzinfo=pytz.utc),
  124. }
  125. )
  126. return scrobbles