123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162 |
- import logging
- from datetime import datetime, timedelta
- import pylast
- import pytz
- from django.conf import settings
- from music.models import Track
- logger = logging.getLogger(__name__)
- PYLAST_ERRORS = tuple(
- getattr(pylast, exc_name)
- for exc_name in (
- "ScrobblingError",
- "NetworkError",
- "MalformedResponseError",
- "WSError",
- )
- if hasattr(pylast, exc_name)
- )
- class LastFM:
- def __init__(self, user):
- try:
- self.client = pylast.LastFMNetwork(
- api_key=getattr(settings, "LASTFM_API_KEY"),
- api_secret=getattr(settings, "LASTFM_SECRET_KEY"),
- username=user.profile.lastfm_username,
- password_hash=pylast.md5(user.profile.lastfm_password),
- )
- self.user = self.client.get_user(user.profile.lastfm_username)
- self.vrobbler_user = user
- except PYLAST_ERRORS as e:
- logger.error(f"Error during Last.fm setup: {e}")
- def import_from_lastfm(self, last_processed=None):
- """Given a last processed time, import all scrobbles from LastFM since then"""
- from scrobbles.models import Scrobble
- new_scrobbles = []
- source = "Last.fm"
- lastfm_scrobbles = self.get_last_scrobbles(time_from=last_processed)
- for lfm_scrobble in lastfm_scrobbles:
- track = Track.find_or_create(
- title=lfm_scrobble.get("title"),
- artist_name=lfm_scrobble.get("artist"),
- album_name=lfm_scrobble.get("album"),
- )
- timezone = settings.TIME_ZONE
- if self.vrobbler_user.profile:
- timezone = self.vrobbler_user.profile.timezone
- timestamp = lfm_scrobble.get("timestamp")
- new_scrobble = Scrobble(
- user=self.vrobbler_user,
- timestamp=timestamp,
- source=source,
- track=track,
- timezone=timezone,
- played_to_completion=True,
- in_progress=False,
- media_type=Scrobble.MediaType.TRACK,
- )
- # Vrobbler scrobbles on finish, LastFM scrobbles on start
- seconds_eariler = timestamp - timedelta(seconds=20)
- seconds_later = timestamp + timedelta(seconds=20)
- existing = Scrobble.objects.filter(
- created__gte=seconds_eariler,
- created__lte=seconds_later,
- track=track,
- ).first()
- if existing:
- logger.debug(f"Skipping existing scrobble {new_scrobble}")
- continue
- logger.debug(f"Queued scrobble {new_scrobble} for creation")
- new_scrobbles.append(new_scrobble)
- created = Scrobble.objects.bulk_create(new_scrobbles)
- # TODO Add a notification for users that their import is complete
- logger.info(
- f"Last.fm import fnished",
- extra={
- "scrobbles_created": len(created),
- "user_id": self.vrobbler_user,
- "lastfm_user": self.user,
- },
- )
- return created
- def get_last_scrobbles(self, time_from=None, time_to=None):
- """Given a user, Last.fm api key, and secret key, grab a list of scrobbled
- tracks"""
- lfm_params = {}
- scrobbles = []
- if time_from:
- lfm_params["time_from"] = int(time_from.timestamp())
- if time_to:
- lfm_params["time_to"] = int(time_to.timestamp())
- # if not time_from and not time_to:
- lfm_params["limit"] = None
- found_scrobbles = self.user.get_recent_tracks(**lfm_params)
- # TOOD spin this out into a celery task over certain threshold of found scrobbles?
- for scrobble in found_scrobbles:
- logger.debug(f"Processing {scrobble}")
- run_time = None
- mbid = None
- artist = None
- log_dict = {"scrobble": scrobble}
- try:
- run_time = int(scrobble.track.get_duration() / 1000)
- mbid = scrobble.track.get_mbid()
- artist = scrobble.track.get_artist().name
- log_dict["artist"] = artist
- log_dict["mbid"] = mbid
- log_dict["run_time"] = run_time
- except pylast.MalformedResponseError as e:
- logger.warning(e)
- except pylast.WSError as e:
- logger.info(
- "LastFM barfed trying to get the track for {scrobble.track}",
- extra=log_dict,
- )
- except pylast.NetworkError as e:
- logger.info(
- "LastFM barfed trying to get the track for {scrobble.track}",
- extra=log_dict,
- )
- if not artist:
- logger.info(
- f"Silly LastFM, no artist found for scrobble",
- extra=log_dict,
- )
- continue
- # TODO figure out if this will actually work
- # timestamp = datetime.fromtimestamp(int(scrobble.timestamp), UTC)
- timestamp = datetime.utcfromtimestamp(
- int(scrobble.timestamp)
- ).replace(tzinfo=pytz.utc)
- logger.info(
- f"Scrobble appended to list for bulk create", extra=log_dict
- )
- scrobbles.append(
- {
- "artist": artist,
- "album": scrobble.album,
- "title": scrobble.track.title,
- "mbid": mbid,
- "run_time_seconds": run_time,
- "timestamp": timestamp,
- }
- )
- return scrobbles
|