Преглед изворни кода

[music] Reorganize importer and fix lookups

Colin Powell пре 2 месеци
родитељ
комит
edf9fbd9c1

+ 5 - 5
vrobbler/apps/music/admin.py

@@ -50,17 +50,17 @@ class TrackAdmin(admin.ModelAdmin):
     date_hierarchy = "created"
     list_display = (
         "title",
-        "album",
+        "primary_album",
         "artist",
         "musicbrainz_id",
     )
-    raw_id_fields = (
-        "album",
-        "artist",
-    )
+    raw_id_fields = ("artist", "albums", "album")
     list_filter = ("album", "artist")
     search_fields = ("title",)
     ordering = ("-created",)
+    filter_horizontal = [
+        "albums",
+    ]
     inlines = [
         ScrobbleInline,
     ]

+ 0 - 162
vrobbler/apps/music/lastfm.py

@@ -1,162 +0,0 @@
-import logging
-from datetime import datetime, timedelta
-
-import pylast
-import pytz
-from django.conf import settings
-from music.models import Track
-
-logger = logging.getLogger(__name__)
-
-PYLAST_ERRORS = tuple(
-    getattr(pylast, exc_name)
-    for exc_name in (
-        "ScrobblingError",
-        "NetworkError",
-        "MalformedResponseError",
-        "WSError",
-    )
-    if hasattr(pylast, exc_name)
-)
-
-
-class LastFM:
-    def __init__(self, user):
-        try:
-            self.client = pylast.LastFMNetwork(
-                api_key=getattr(settings, "LASTFM_API_KEY"),
-                api_secret=getattr(settings, "LASTFM_SECRET_KEY"),
-                username=user.profile.lastfm_username,
-                password_hash=pylast.md5(user.profile.lastfm_password),
-            )
-            self.user = self.client.get_user(user.profile.lastfm_username)
-            self.vrobbler_user = user
-        except PYLAST_ERRORS as e:
-            logger.error(f"Error during Last.fm setup: {e}")
-
-    def import_from_lastfm(self, last_processed=None):
-        """Given a last processed time, import all scrobbles from LastFM since then"""
-        from scrobbles.models import Scrobble
-
-        new_scrobbles = []
-        source = "Last.fm"
-        lastfm_scrobbles = self.get_last_scrobbles(time_from=last_processed)
-
-        for lfm_scrobble in lastfm_scrobbles:
-            track = Track.find_or_create(
-                title=lfm_scrobble.get("title"),
-                artist_name=lfm_scrobble.get("artist"),
-                album_name=lfm_scrobble.get("album"),
-            )
-
-            timezone = settings.TIME_ZONE
-            if self.vrobbler_user.profile:
-                timezone = self.vrobbler_user.profile.timezone
-
-            timestamp = lfm_scrobble.get("timestamp")
-            new_scrobble = Scrobble(
-                user=self.vrobbler_user,
-                timestamp=timestamp,
-                source=source,
-                track=track,
-                timezone=timezone,
-                played_to_completion=True,
-                in_progress=False,
-                media_type=Scrobble.MediaType.TRACK,
-            )
-            # Vrobbler scrobbles on finish, LastFM scrobbles on start
-            seconds_eariler = timestamp - timedelta(seconds=20)
-            seconds_later = timestamp + timedelta(seconds=20)
-            existing = Scrobble.objects.filter(
-                created__gte=seconds_eariler,
-                created__lte=seconds_later,
-                track=track,
-            ).first()
-            if existing:
-                logger.debug(f"Skipping existing scrobble {new_scrobble}")
-                continue
-            logger.debug(f"Queued scrobble {new_scrobble} for creation")
-            new_scrobbles.append(new_scrobble)
-
-        created = Scrobble.objects.bulk_create(new_scrobbles)
-        # TODO Add a notification for users that their import is complete
-        logger.info(
-            f"Last.fm import fnished",
-            extra={
-                "scrobbles_created": len(created),
-                "user_id": self.vrobbler_user,
-                "lastfm_user": self.user,
-            },
-        )
-        return created
-
-    def get_last_scrobbles(self, time_from=None, time_to=None):
-        """Given a user, Last.fm api key, and secret key, grab a list of scrobbled
-        tracks"""
-        lfm_params = {}
-        scrobbles = []
-        if time_from:
-            lfm_params["time_from"] = int(time_from.timestamp())
-        if time_to:
-            lfm_params["time_to"] = int(time_to.timestamp())
-
-        # if not time_from and not time_to:
-        lfm_params["limit"] = None
-
-        found_scrobbles = self.user.get_recent_tracks(**lfm_params)
-        # TOOD spin this out into a celery task over certain threshold of found scrobbles?
-
-        for scrobble in found_scrobbles:
-            logger.debug(f"Processing {scrobble}")
-            run_time = None
-            mbid = None
-            artist = None
-
-            log_dict = {"scrobble": scrobble}
-            try:
-                run_time = int(scrobble.track.get_duration() / 1000)
-                mbid = scrobble.track.get_mbid()
-                artist = scrobble.track.get_artist().name
-                log_dict["artist"] = artist
-                log_dict["mbid"] = mbid
-                log_dict["run_time"] = run_time
-            except pylast.MalformedResponseError as e:
-                logger.warning(e)
-            except pylast.WSError as e:
-                logger.info(
-                    "LastFM barfed trying to get the track for {scrobble.track}",
-                    extra=log_dict,
-                )
-            except pylast.NetworkError as e:
-                logger.info(
-                    "LastFM barfed trying to get the track for {scrobble.track}",
-                    extra=log_dict,
-                )
-
-            if not artist:
-                logger.info(
-                    f"Silly LastFM, no artist found for scrobble",
-                    extra=log_dict,
-                )
-                continue
-
-            # TODO figure out if this will actually work
-            # timestamp = datetime.fromtimestamp(int(scrobble.timestamp), UTC)
-            timestamp = datetime.utcfromtimestamp(
-                int(scrobble.timestamp)
-            ).replace(tzinfo=pytz.utc)
-
-            logger.info(
-                f"Scrobble appended to list for bulk create", extra=log_dict
-            )
-            scrobbles.append(
-                {
-                    "artist": artist,
-                    "album": scrobble.album,
-                    "title": scrobble.track.title,
-                    "mbid": mbid,
-                    "run_time_seconds": run_time,
-                    "timestamp": timestamp,
-                }
-            )
-        return scrobbles

+ 188 - 145
vrobbler/apps/music/models.py

@@ -15,6 +15,11 @@ from imagekit.processors import ResizeToFit
 from music.allmusic import get_allmusic_slug, scrape_data_from_allmusic
 from music.bandcamp import get_bandcamp_slug
 from music.musicbrainz import (
+    get_album_metadata,
+    get_album_metadata_with_artist,
+    get_artist_metadata_extended,
+    get_recording_mbid_exact,
+    get_track_metadata_with_artist,
     lookup_album_dict_from_mb,
     lookup_album_from_mb,
     lookup_track_from_mb,
@@ -177,56 +182,75 @@ class Artist(TimeStampedModel):
 
     @classmethod
     def find_or_create(
-        cls, name: str = "", musicbrainz_id: str = ""
+        cls, name: str, album_name: str = "", track_name: str = ""
     ) -> "Artist":
-        keys = {}
-        if name:
-            name = clean_artist_name(name)
-            keys["name"] = name
-
-        if musicbrainz_id:
-            keys["musicbrainz_id"] = musicbrainz_id
-
-        if not keys:
-            raise Exception("Must have name, mb_id or both to lookup artist")
+        """The biggest challenge to finding artists is that the search often
+        fails miserably unless you can look it up along with an album or a track name.
 
-        artist = cls.objects.filter(**keys).first()
-
-        if not artist:
-            artist = cls.objects.filter(
-                models.Q(name=name) | models.Q(alt_names__icontains=name)
-            ).first()
-
-        # Does not exist, look it up from Musicbrainz
-        if not artist:
-            alt_name = None
-            try:
-                artist_dict = lookup_artist_from_mb(name)
-                musicbrainz_id = musicbrainz_id or artist_dict.get("id", "")
-                if name != artist_dict.get("name", ""):
-                    alt_name = name
-                    name = artist_dict.get("name", "")
-            except ValueError:
-                pass
-
-            if musicbrainz_id:
-                artist = cls.objects.filter(
-                    musicbrainz_id=musicbrainz_id
-                ).first()
-                if artist and alt_name:
-                    if not artist.alt_names:
-                        artist.alt_names = alt_name
-                    else:
-                        artist.alt_names += f"\\{alt_name}"
-                    artist.save(update_fields=["alt_names"])
+        Thus, when we find or create an artist, we should always provide an optional
+        album name or track name, but probably not both."""
+        if album_name:
+            logger.info(
+                f"Looking for artist with name {name} and album {album_name}"
+            )
+        if track_name:
+            logger.info(
+                f"Looking for artist with name {name} and track {track_name}"
+            )
+        keys = {}
 
+        name = clean_artist_name(name)
+        keys["name"] = name
+        artist = cls.objects.filter(name=name).first()
+
+        if artist:
+            return artist
+
+        # alt_name = None
+        artist_dict = {}
+        if album_name:
+            album_dict = get_album_metadata_with_artist(album_name, name)
+            if album_dict:
+                artist_dict = album_dict.get("primary_artist")
+        if track_name:
+            track_dict = get_track_metadata_with_artist(track_name, name)
+            if track_dict:
+                artist_dict = track_dict.get("primary_artist")
+
+        if not artist_dict:
+            artist, created = cls.objects.get_or_create(name=name)
+            if created:
+                artist.fix_metadata()
+            return artist
+
+        musicbrainz_id = artist_dict.get("mbid")
+        found_name = artist_dict.get("name", name)
+        if found_name and name != found_name:
+            alt_name = found_name
+
+        artist = cls.objects.filter(
+            name=name, musicbrainz_id=musicbrainz_id
+        ).first()
         if not artist:
             artist = cls.objects.create(
-                name=name, musicbrainz_id=musicbrainz_id, alt_names=alt_name
+                name=found_name,
+                musicbrainz_id=musicbrainz_id,
             )
-            # TODO maybe this should be spun off into an async task?
             artist.fix_metadata()
 
+        # TODO: See if this alt_names stuff actually works or causes hard to debug problems
+        # If we did find our artist, but the found name is slightly different, record that
+        # if artist and alt_name:
+        #    if not artist.alt_names:
+        #        artist.alt_names = alt_name
+        #    else:
+        #        artist.alt_names += f"\\{alt_name}"
+        #    logger.info(
+        #        f"Add alt_name {alt_name} to artist {artist}",
+        #        extra={"alt_name": alt_name, "artist_id": artist.id},
+        #    )
+        #    artist.save(update_fields=["alt_names"])
+
         return artist
 
 
@@ -319,7 +343,7 @@ class Album(TimeStampedModel):
             )
             return
 
-        if not self.allmusic_id or force:
+        if self.album_artist and (not self.allmusic_id or force):
             slug = get_allmusic_slug(self.album_artist.name, self.name)
             if not slug:
                 logger.info(
@@ -350,7 +374,12 @@ class Album(TimeStampedModel):
             logger.info(f"No data for {self} found in TheAudioDB")
             return
 
-        Album.objects.filter(pk=self.pk).update(**album_data)
+        try:
+            Album.objects.filter(pk=self.pk).update(**album_data)
+        except:
+            logger.info(
+                f"Could not save info for album {self} with data {album_data}"
+            )
 
     def scrape_bandcamp(self, force=False) -> None:
         if not self.bandcamp_id or force:
@@ -489,65 +518,75 @@ class Album(TimeStampedModel):
         return f"https://bandcamp.com/search?q={album} {artist}&item_type=a"
 
     @classmethod
-    def find_or_create(
-        cls, name: str, artist_name: str, musicbrainz_id: str = ""
-    ) -> "Album":
-        if not name or not artist_name:
-            raise Exception(
-                "Must have at least name and artist name to lookup album"
-            )
+    def find_or_create(cls, name: str, artist_name: str) -> "Album":
+        logger.info(
+            f"Looking for album with name {name} and artist_name {artist_name}"
+        )
+        artist = Artist.find_or_create(artist_name, album_name=name)
+        album_dict = get_album_metadata_with_artist(name, artist.name)
 
-        album = None
-        if musicbrainz_id:
-            album = cls.objects.filter(
-                musicbrainz_id=musicbrainz_id,
+        if not album_dict:
+            logger.info(
+                f"Could not find album {name} with artist {artist.name} on musicbrainz"
+            )
+            album, created = Album.objects.get_or_create(
                 name=name,
-                album_artist__name=artist_name,
-            ).first()
-        if not album and musicbrainz_id:
-            album = cls.objects.filter(
-                musicbrainz_id=musicbrainz_id,
-            ).first()
-        if not album:
-            album = cls.objects.filter(
-                models.Q(name=name) | models.Q(alt_names__icontains=name),
-                album_artist__name=artist_name,
-            ).first()
+            )
+            if created:
+                # album.fix_metadata()
+                # album.fetch_artwork()
+                ...
+            return album
 
-        if not album:
-            alt_name = None
-            try:
-                album_dict = lookup_album_dict_from_mb(
-                    name, artist_name=artist_name
-                )
-                musicbrainz_id = musicbrainz_id or album_dict.get("mb_id", "")
-                found_name = album_dict.get("title", "")
-                if found_name and name != found_name:
-                    alt_name = name
-                    name = found_name
-            except ValueError:
-                pass
-
-            if musicbrainz_id:
-                album = cls.objects.filter(
-                    musicbrainz_id=musicbrainz_id
+        if not artist:
+            artist_dict = album_dict.get("primary_artist", {})
+            if artist_dict:
+                artist = Artist.objects.filter(
+                    musicbrainz_id=artist_dict.get("mbid"),
                 ).first()
-                if album and alt_name:
-                    if not album.alt_names:
-                        album.alt_names = alt_name
-                    else:
-                        album.alt_names += f"\\{alt_name}"
-                    album.save(update_fields=["alt_names"])
-            if not album:
-                artist = Artist.find_or_create(name=artist_name)
-                album = cls.objects.create(
-                    name=name,
-                    album_artist=artist,
-                    musicbrainz_id=musicbrainz_id,
-                    alt_names=alt_name,
-                )
-                # TODO maybe do this in a separate process?
-                album.fix_metadata()
+                if not artist:
+                    artist = Artist.objects.create(
+                        musicbrainz_id=artist_dict.get("mbid"),
+                    )
+
+        extra_artists = []
+        if not artist and len(album_dict.get("all_artists")) > 1:
+            artist = Artist.objects.filter(name="Various Artists").first()
+            extra_artists.append(artist)
+
+        if not artist:
+            raise Exception("No album artist found, and not a compliation")
+
+        album = cls.objects.filter(
+            models.Q(name=name) | models.Q(alt_names__icontains=name),
+            album_artist=artist,
+        ).first()
+
+        alt_name = None
+        found_name = album_dict.get("album_title", name)
+        if found_name and name != found_name:
+            alt_name = name
+
+        album = Album.objects.filter(
+            name=found_name, musicbrainz_id=album_dict.get("mbid")
+        ).first()
+
+        if not album:
+            year = None
+            if album_dict.get("release_date"):
+                year = album_dict.get("release_date", "").split("-")[0]
+            album = Album.objects.create(
+                name=found_name,
+                musicbrainz_id=album_dict.get("mbid"),
+                musicbrainz_releasegroup_id=album_dict.get(
+                    "release_group_mbid"
+                ),
+                year=year,
+                album_artist=artist,
+                alt_names=alt_name,
+            )
+            album.artists.add(*extra_artists)
+            album.fetch_artwork()
 
         return album
 
@@ -568,6 +607,8 @@ class Track(ScrobblableMixin):
 
     @property
     def primary_album(self):
+        """Return ``self.album`` when it is set; otherwise the first entry of
+        ``self.albums`` ordered by ``year`` (``None`` if there are none)."""
+        if self.album:
+            return self.album
         return self.albums.order_by("year").first()
 
     def get_absolute_url(self):
@@ -594,8 +635,8 @@ class Track(ScrobblableMixin):
         url = ""
         if self.artist.thumbnail:
             url = self.artist.thumbnail_medium.url
-        if self.album and self.album.cover_image:
-            url = self.album.cover_image_medium.url
+        if self.primary_album and self.primary_album.cover_image:
+            url = self.primary_album.cover_image_medium.url
         return url
 
     @classmethod
@@ -603,9 +644,8 @@ class Track(ScrobblableMixin):
         cls,
         title: str = "",
         artist_name: str = "",
-        musicbrainz_id: str = "",
         album_name: str = "",
-        run_time_seconds: int = 900,
+        run_time_seconds: int | None = None,
         enrich: bool = False,
         commit: bool = True,
     ) -> "Track":
@@ -615,62 +655,65 @@ class Track(ScrobblableMixin):
         name
 
         Optionally, we can update any found artists with overwrite."""
-        created = False
-        if musicbrainz_id:
-            track = cls.objects.filter(musicbrainz_id=musicbrainz_id).first()
-            artist = track.artist
-            if not track and not (title and album_name):
-                raise Exception(
-                    "Cannot find track with musicbrainz_id and no track title or artist name provided."
-                )
+        album = None
+        if album_name:
+            logger.info("Looking up album for: {album_name}")
+            album = Album.find_or_create(
+                name=album_name, artist_name=artist_name
+            )
+            artist = album.album_artist
         else:
+            artist = Artist.find_or_create(artist_name, track_name=title)
+        if not artist:
             artist = Artist.find_or_create(artist_name)
-            track, created = cls.objects.get_or_create(
-                title=title, artist=artist
-            )
 
-        if not created:
+        lookup_keys = {"title": title, "artist": artist}
+        if run_time_seconds:
+            lookup_keys["run_time_seconds"] = run_time_seconds
+        logger.info(f"Looking up track using: {lookup_keys}")
+        track = cls.objects.filter(**lookup_keys).first()
+        if track:
             logger.info(
-                "Found exact match for track by name and artist",
+                "Found match for track by name and artist, not going to musicbrainz ",
                 extra={
+                    "track_id": track.id,
                     "title": title,
                     "artist_name": artist_name,
-                    "track_id": track.id,
+                    "run_time_seconds": run_time_seconds,
                 },
             )
+            return track
 
-            if track.album and album_name != track.album.name:
-                # TODO found track, but it's on a different album ... associations?
-                logger.info("Found track by artist, but album is different.")
-                album = Album.find_or_create()
-
-        if enrich:
-            album = None
-            if album_name:
-                album = Album.find_or_create(album_name)
-
-            if artist.musicbrainz_id:
-                track_dict = lookup_track_from_mb(title, artist.musicbrainz_id)
-                musicbrainz_id = musicbrainz_id or track_dict.get("id", "")
+        track = cls.objects.filter(title=title, artist=artist).first()
+        if not track:
+            track, _ = cls.objects.get_or_create(title=title, artist=artist)
 
-                found_title: bool = track_dict.get("name", False)
-                mismatched_title: bool = title != track_dict.get("name", "")
-                if found_title and mismatched_title:
-                    logger.warning(
-                        "Source track title and found title do not match",
-                        extra={"title": title, "track_dict": track_dict},
-                    )
+        if album:
+            track.albums.add(album)
 
-            if not run_time_seconds:
-                run_time_seconds = int(
-                    int(track_dict.get("length", 900000)) / 1000
+        if enrich or not track.run_time_seconds:
+            logger.info(
+                f"Enriching track {track}",
+                extra={
+                    "title": title,
+                    "artist_name": artist_name,
+                    "track_id": track.id,
+                },
+            )
+            try:
+                mbid, length = get_recording_mbid_exact(
+                    title, artist_name, album_name
                 )
-
-            track.album = album
-            track.artist = artist
-            track.run_time_seconds = run_time_seconds
+            except Exception:
+                print("No musicbrainz result found, cannot enrich")
+                return track
+            track.run_time_seconds = run_time_seconds or int(length / 1000)
+            track.musicbrainz_id = mbid
             if commit:
                 track.save()
-                # TODO Also set cover art and tags
 
         return track
+
+    def fix_metadata(self, force_update=False):
+        """Placeholder: the body is a bare ``...``, so calling this currently
+        does nothing and returns ``None``. ``force_update`` is accepted but
+        not used yet."""
+
+        ...

+ 353 - 6
vrobbler/apps/music/musicbrainz.py

@@ -1,16 +1,17 @@
+from datetime import datetime
 import logging
-from typing import Iterable
 
 import musicbrainzngs
 from dateutil.parser import parse
 
 logger = logging.getLogger(__name__)
 
+musicbrainzngs.set_useragent("Vrobbler", "1.0", "help@unbl.ink")
+
 
 def lookup_album_from_mb(musicbrainz_id: str) -> dict:
     release_dict = {}
 
-    musicbrainzngs.set_useragent("vrobbler", "0.3.0")
     release_data = musicbrainzngs.get_release_by_id(
         musicbrainz_id,
         includes=["artists", "release-groups", "recordings"],
@@ -51,7 +52,6 @@ def lookup_album_from_mb(musicbrainz_id: str) -> dict:
 
 
 def lookup_album_dict_from_mb(release_name: str, artist_name: str) -> dict:
-    musicbrainzngs.set_useragent("vrobbler", "0.3.0")
 
     top_result = {}
 
@@ -84,7 +84,6 @@ def lookup_album_dict_from_mb(release_name: str, artist_name: str) -> dict:
 
 
 def lookup_artist_from_mb(artist_name: str) -> dict:
-    musicbrainzngs.set_useragent("vrobbler", "0.3.0")
 
     try:
         top_result = musicbrainzngs.search_artists(artist=artist_name)[
@@ -104,7 +103,7 @@ def lookup_artist_from_mb(artist_name: str) -> dict:
 
 
 def lookup_track_from_mb(
-    track_name: str, artist_mb_id: str, album_mb_id: str
+    track_name: str, artist_mb_id: str, album_mb_id: str = ""
 ) -> dict:
     logger.info(
         "[lookup_track_from_mb] called",
@@ -114,7 +113,6 @@ def lookup_track_from_mb(
             "album_mb_id": album_mb_id,
         },
     )
-    musicbrainzngs.set_useragent("vrobbler", "0.3.0")
 
     try:
         results = musicbrainzngs.search_recordings(
@@ -138,3 +136,352 @@ def lookup_track_from_mb(
         return {}
 
     return top_result
+
+
+def get_album_metadata(album_name, artist_name, strict=True) -> dict:
+    """
+    Get detailed metadata for an album from MusicBrainz.
+
+    Asks MusicBrainz for up to five candidate releases and returns the first
+    acceptable one.
+
+    :param album_name: Name of the album
+    :param artist_name: Name of the artist
+    :param strict: If True, only exact matches on album and artist
+        (case-insensitive); if False, the first candidate that has a usable
+        artist credit is returned
+    :return: dict with ``album_title``, ``primary_artist`` (a name string),
+        ``all_artists``, ``mbid``, ``release_date`` and
+        ``release_group_mbid``; an empty dict when nothing matched or the
+        web service call failed
+    """
+    try:
+        result = musicbrainzngs.search_releases(
+            release=album_name, artist=artist_name, limit=5
+        )
+    except musicbrainzngs.WebServiceError as e:
+        # Best-effort lookup: report through the module logger, not stdout.
+        logger.warning("MusicBrainz error: %s", e)
+        return {}
+
+    wanted_title = album_name.lower()
+    wanted_artist = artist_name.lower()
+
+    for release in result.get("release-list", []):
+        # Only dict entries carrying an "artist" are real credits; anything
+        # else in the credit list (e.g. join-phrase strings) is skipped.
+        all_artists = [
+            ac["artist"]["name"]
+            for ac in release.get("artist-credit", [])
+            if isinstance(ac, dict) and "artist" in ac
+        ]
+        if not all_artists:
+            # Nothing to compare against or report as primary artist.
+            continue
+
+        title = release["title"]
+        primary_artist = all_artists[0]
+
+        title_match = title.lower() == wanted_title
+        artist_match = primary_artist.lower() == wanted_artist
+        if strict and not (title_match and artist_match):
+            continue
+
+        return {
+            "album_title": title,
+            "primary_artist": primary_artist,
+            "all_artists": all_artists,
+            "mbid": release["id"],
+            # May be partial (e.g., just year)
+            "release_date": release.get("date"),
+            "release_group_mbid": release.get("release-group", {}).get("id"),
+        }
+
+    return {}
+
+
+def get_recording_mbid_exact(
+    track_title: str, artist_name: str, album_name: str
+) -> tuple[str, int]:
+    """Find a recording by exact title on the best-matching release.
+
+    Searches MusicBrainz for a single release matching ``artist_name`` and
+    ``album_name``, fetches that release with its recordings and scans every
+    medium (disc) for a track whose recording title equals ``track_title``
+    case-insensitively.
+
+    :param track_title: Title of the track to find
+    :param artist_name: Artist name used for the release search
+    :param album_name: Release (album) name used for the release search
+    :return: ``(recording_mbid, length)`` where ``length`` is the integer
+        value MusicBrainz reports (the caller divides it by 1000 to get
+        seconds), or ``0`` when no length is reported
+    :raises Exception: when no release or no matching recording is found,
+        or when the MusicBrainz request fails
+    """
+    try:
+        result = musicbrainzngs.search_releases(
+            artist=artist_name, release=album_name, limit=1
+        )
+        releases = result.get("release-list", [])
+        if not releases:
+            raise Exception("No releases found")
+
+        release_data = musicbrainzngs.get_release_by_id(
+            releases[0]["id"], includes=["recordings"]
+        )
+    except musicbrainzngs.WebServiceError as e:
+        logger.warning("MusicBrainz error: %s", e)
+        # Keep the generic exception type callers already catch, but retain
+        # the original error as the cause for debugging.
+        raise Exception(e) from e
+
+    wanted_title = track_title.lower()
+    # Walk all media, not just the first, so tracks on later discs are found.
+    for medium in release_data["release"].get("medium-list", []):
+        for track in medium.get("track-list", []):
+            recording = track["recording"]
+            if recording["title"].lower() != wanted_title:
+                continue
+            # Length is optional; fall back to a track-level length if the
+            # response carries one (assumed key name - TODO confirm), else 0.
+            length = recording.get("length") or track.get("length") or 0
+            return recording["id"], int(length)
+
+    raise Exception("No recording found")
+
+
+def get_artist_metadata_extended(artist_name, strict=True):
+    """
+    Fetch artist metadata including MBID, name, origin, tags, and description.
+
+    Searches MusicBrainz for up to five artists, picks the first acceptable
+    one and then fetches its details (tags and URL relations).
+
+    :param artist_name: The artist's name
+    :param strict: If True, only return exact name match (case-insensitive);
+        if False, the first search result is used
+    :return: dict with ``mbid``, ``name``, ``disambiguation``,
+        ``begin_date``, ``area``, ``tags`` and ``description_url``; ``None``
+        if no artist matched or the web service call failed
+    """
+    try:
+        # Step 1: Search for artist
+        search_results = musicbrainzngs.search_artists(
+            artist=artist_name, limit=5
+        )
+        wanted_name = artist_name.lower()
+        for artist in search_results.get("artist-list", []):
+            if strict and artist["name"].lower() != wanted_name:
+                continue
+            mbid = artist["id"]
+
+            # Step 2: Get detailed info about the artist
+            details = musicbrainzngs.get_artist_by_id(
+                mbid, includes=["tags", "url-rels"]
+            )["artist"]
+
+            begin_date = details.get("life-span", {}).get("begin")
+            area = details.get("area", {}).get("name")
+            disambiguation = details.get("disambiguation")
+            tags = [t["name"] for t in details.get("tag-list", [])]
+
+            # Step 3: Prefer a Wikipedia link; otherwise keep the first
+            # Wikidata link seen instead of letting later ones replace it.
+            description_url = None
+            for rel in details.get("url-relation-list", []):
+                if rel["type"] == "wikipedia":
+                    description_url = rel["target"]
+                    break
+                if rel["type"] == "wikidata" and not description_url:
+                    description_url = rel["target"]
+
+            return {
+                "mbid": mbid,
+                "name": details["name"],
+                "disambiguation": disambiguation,
+                "begin_date": begin_date,
+                "area": area,
+                "tags": tags,
+                # user can fetch summary if needed
+                "description_url": description_url,
+            }
+
+        return None
+    except musicbrainzngs.WebServiceError as e:
+        # Report through the module logger rather than stdout.
+        logger.warning("MusicBrainz error: %s", e)
+        return None
+
+
+def get_artist_metadata_brief(artist_id):
+    """Look up one artist by MBID and return a compact metadata dict.
+
+    The result carries ``mbid``, ``name``, ``disambiguation``,
+    ``begin_date``, ``area``, ``tags`` and ``description_url``. ``None`` is
+    returned when MusicBrainz raises a ``WebServiceError``.
+    """
+    # NOTE: this module defines get_artist_metadata_brief a second time
+    # further down; the later definition is the one bound after import.
+    try:
+        response = musicbrainzngs.get_artist_by_id(
+            artist_id, includes=["tags", "aliases", "url-rels"]
+        )
+        info = response["artist"]
+
+        life_span = info.get("life-span", {})
+        home_area = info.get("area", {})
+        note = info.get("disambiguation")
+        tag_names = [tag["name"] for tag in info.get("tag-list", [])]
+
+        # A Wikipedia link wins outright; otherwise keep the first Wikidata
+        # link encountered.
+        link = None
+        for relation in info.get("url-relation-list", []):
+            relation_type = relation["type"]
+            if relation_type == "wikipedia":
+                link = relation["target"]
+                break
+            if relation_type == "wikidata" and not link:
+                link = relation["target"]
+
+        return {
+            "mbid": artist_id,
+            "name": info["name"],
+            "disambiguation": note,
+            "begin_date": life_span.get("begin"),
+            "area": home_area.get("name"),
+            "tags": tag_names,
+            "description_url": link,
+        }
+
+    except musicbrainzngs.WebServiceError as e:
+        print("MusicBrainz error (artist lookup):", e)
+        return None
+
+
+def parse_date(date_str):
+    """Turn a MusicBrainz date string into a ``datetime`` usable for sorting.
+
+    Full dates (``YYYY-MM-DD``), year-month (``YYYY-MM``) and bare years
+    (``YYYY``) are tried in that order. Empty input and strings matching
+    none of the formats give ``None``.
+    """
+    if not date_str:
+        return None
+    parsed = None
+    for pattern in ("%Y-%m-%d", "%Y-%m", "%Y"):
+        try:
+            parsed = datetime.strptime(date_str, pattern)
+        except ValueError:
+            # Wrong granularity for this pattern; try the next one.
+            continue
+        break
+    return parsed
+
+
+def get_album_metadata_with_artist(album_name, artist_name, strict=True):
+    """
+    Get metadata for the earliest release of an album and its primary artist.
+
+    Searches MusicBrainz for up to 100 candidate releases, keeps the
+    acceptable ones, picks the one with the earliest parseable date
+    (undated releases sort last) and looks up its first credited artist.
+
+    :param album_name: Album title
+    :param artist_name: Name of the artist
+    :param strict: If True, enforce exact match for album and artist
+        (whitespace-stripped, case-folded)
+    :return: dict with ``album_title``, ``primary_artist_name``,
+        ``all_artists``, ``mbid``, ``release_group_mbid``, ``release_date``
+        and ``primary_artist`` (the result of ``get_artist_metadata_brief``,
+        which may be ``None``); ``None`` when no release qualified or the
+        release search failed
+    """
+    try:
+        result = musicbrainzngs.search_releases(
+            release=album_name, artist=artist_name, limit=100
+        )
+    except musicbrainzngs.WebServiceError as e:
+        # Report through the module logger rather than stdout.
+        logger.warning("MusicBrainz error (album lookup): %s", e)
+        return None
+
+    query_album = album_name.strip().casefold()
+    query_artist = artist_name.strip().casefold()
+
+    candidates = []
+    for release in result.get("release-list", []):
+        # Require dict entries: a bare `"artist" in ac` is a substring test
+        # when ac is a string, and indexing such an entry would fail.
+        credited = [
+            ac["artist"]
+            for ac in release.get("artist-credit", [])
+            if isinstance(ac, dict) and "artist" in ac
+        ]
+        if not credited:
+            continue
+
+        if strict:
+            if release["title"].strip().casefold() != query_album:
+                continue
+            if credited[0]["name"].strip().casefold() != query_artist:
+                continue
+
+        candidates.append((parse_date(release.get("date")), release, credited))
+
+    if not candidates:
+        return None
+
+    # Earliest dated release wins; min() keeps the first one on ties.
+    _, release, credited = min(
+        candidates, key=lambda item: item[0] or datetime.max
+    )
+    primary_artist = credited[0]
+
+    return {
+        "album_title": release["title"],
+        "primary_artist_name": primary_artist["name"],
+        "all_artists": [artist["name"] for artist in credited],
+        "mbid": release["id"],
+        "release_group_mbid": release.get("release-group", {}).get("id"),
+        "release_date": release.get("date"),
+        "primary_artist": get_artist_metadata_brief(primary_artist["id"]),
+    }
+
+
+def get_artist_metadata_brief(artist_id):
+    """
+    Fetch a compact summary of a MusicBrainz artist.
+
+    :param artist_id: MusicBrainz artist id to look up
+    :return: dict with mbid, name, disambiguation, begin_date, area, tags
+        and description_url, or None on a MusicBrainz web-service error
+    """
+    try:
+        response = musicbrainzngs.get_artist_by_id(
+            artist_id, includes=["tags", "aliases", "url-rels"]
+        )
+        artist = response["artist"]
+
+        born_or_founded = artist.get("life-span", {}).get("begin")
+        home_area = artist.get("area", {}).get("name")
+        note = artist.get("disambiguation")
+        tag_names = []
+        for tag in artist.get("tag-list", []):
+            tag_names.append(tag["name"])
+
+        # A Wikipedia link wins outright; until one shows up, remember the
+        # first non-empty Wikidata link as a fallback.
+        link = None
+        for relation in artist.get("url-relation-list", []):
+            kind = relation["type"]
+            if kind == "wikipedia":
+                link = relation["target"]
+                break
+            if kind == "wikidata" and not link:
+                link = relation["target"]
+
+        return {
+            "mbid": artist_id,
+            "name": artist["name"],
+            "disambiguation": note,
+            "begin_date": born_or_founded,
+            "area": home_area,
+            "tags": tag_names,
+            "description_url": link,
+        }
+
+    except musicbrainzngs.WebServiceError as e:
+        print("MusicBrainz error (artist lookup):", e)
+        return None
+
+
+def get_track_metadata_with_artist(track_title, artist_name, strict=True):
+    """
+    Find the recording of a track that appears on the earliest dated release.
+
+    Searches recordings by title and artist, optionally keeps only exact
+    (whitespace-trimmed, case-folded) matches on title and first credited
+    artist, and considers only releases with a parseable date. The winning
+    recording is then fetched in full to build the result.
+
+    :param track_title: Track title to search for
+    :param artist_name: Artist name to search for
+    :param strict: If True, drop results that are not an exact match
+    :return: dict with track, release and artist metadata, or None when no
+        dated release qualifies or MusicBrainz raises a web-service error
+    """
+    try:
+        search_result = musicbrainzngs.search_recordings(
+            recording=track_title, artist=artist_name, limit=100
+        )
+
+        wanted_track = track_title.strip().casefold()
+        wanted_artist = artist_name.strip().casefold()
+
+        dated_candidates = []
+        for recording in search_result.get("recording-list", []):
+            title_key = recording["title"].strip().casefold()
+            credited = recording["artist-credit"][0]["artist"]
+            artist_key = credited["name"].strip().casefold()
+
+            if strict and (
+                title_key != wanted_track or artist_key != wanted_artist
+            ):
+                continue
+
+            # Recordings without releases contribute nothing; releases
+            # without a parseable date cannot compete for "earliest".
+            for release in recording.get("release-list", ()):
+                released_on = parse_date(release.get("date"))
+                if released_on:
+                    dated_candidates.append(
+                        (recording["id"], release, released_on)
+                    )
+
+        if not dated_candidates:
+            return None
+
+        # min() yields the first minimal entry, i.e. the head of a stable sort.
+        recording_id, release, _ = min(
+            dated_candidates, key=lambda candidate: candidate[2]
+        )
+
+        # The search hit is only a summary; pull the full recording.
+        recording_details = musicbrainzngs.get_recording_by_id(
+            recording_id, includes=["artists", "releases"]
+        )["recording"]
+
+        lead_credit = recording_details["artist-credit"][0]["artist"]
+        credited_names = []
+        for credit in recording_details["artist-credit"]:
+            if "artist" in credit:
+                credited_names.append(credit["artist"]["name"])
+        lead_details = get_artist_metadata_brief(lead_credit["id"])
+
+        return {
+            "track_title": recording_details["title"],
+            "length_ms": recording_details.get("length"),
+            "recording_mbid": recording_id,
+            "release_title": release["title"],
+            "release_date": release.get("date"),
+            "release_group_mbid": release["release-group"]["id"],
+            "primary_artist_name": lead_credit["name"],
+            "all_artists": credited_names,
+            "primary_artist": lead_details,
+        }
+
+    except musicbrainzngs.WebServiceError as e:
+        print("MusicBrainz error (track lookup):", e)
+        return None

+ 6 - 4
vrobbler/apps/music/utils.py

@@ -9,12 +9,14 @@ logger = logging.getLogger(__name__)
 
 def clean_artist_name(name: str) -> str:
-    """Remove featured names from artist string."""
-    if "feat." in name.lower():
-        name = re.split("feat.", name, flags=re.IGNORECASE)[0].strip()
-    if "featuring" in name.lower():
-        name = re.split("featuring", name, flags=re.IGNORECASE)[0].strip()
-    if "&" in name.lower():
-        name = re.split("&", name, flags=re.IGNORECASE)[0].strip()
+    """Strip guest-artist credits from an artist string.
+
+    Everything from the first " feat. ", " w. " or " featuring " separator
+    (matched case-insensitively) onwards is dropped, and the remainder is
+    whitespace-trimmed. Names without a separator are returned unchanged.
+
+    :param name: Raw artist string, e.g. "Artist feat. Guest"
+    :return: The artist string without the guest credit
+    """
+    # Split on exactly the separator that was detected, escaped so "." is
+    # literal. A bare "feat." pattern also matches inside words such as
+    # "Defeater", and the " w. " case previously split on "feat." and so
+    # never removed anything.
+    for separator in (" feat. ", " w. ", " featuring "):
+        if separator in name.lower():
+            name = re.split(
+                re.escape(separator), name, flags=re.IGNORECASE
+            )[0].strip()
+    # NOTE(review): splitting on "&" is left disabled, presumably because
+    # "&" is part of many legitimate artist names; confirm before reviving.
+    # if " & " in name.lower() and "of the wand" not in name.lower():
+    #    name = re.split("&", name, flags=re.IGNORECASE)[0].strip()
 
     return name
 

+ 163 - 0
vrobbler/apps/scrobbles/importers/lastfm.py

@@ -0,0 +1,163 @@
+import logging
+from datetime import datetime, timedelta
+
+import pylast
+import pytz
+from django.conf import settings
+from music.models import Track
+
+logger = logging.getLogger(__name__)
+
+# Exception classes used to guard Last.fm setup. Names that the installed
+# pylast does not define are skipped by the hasattr() check instead of
+# raising AttributeError at import time.
+PYLAST_ERRORS = tuple(
+    getattr(pylast, available_name)
+    for available_name in filter(
+        lambda exc_name: hasattr(pylast, exc_name),
+        (
+            "ScrobblingError",
+            "NetworkError",
+            "MalformedResponseError",
+            "WSError",
+        ),
+    )
+)
+
+
+class LastFM:
+    """Import a Vrobbler user's listening history from Last.fm via pylast.
+
+    Credentials come from ``user.profile`` (``lastfm_username`` and
+    ``lastfm_password``); the API key and secret come from Django settings.
+    """
+
+    def __init__(self, user):
+        """Build an authenticated pylast client for ``user``.
+
+        pylast errors raised during setup are logged, not re-raised, so on
+        failure ``client``, ``user`` and ``vrobbler_user`` may be unset.
+
+        :param user: Vrobbler user whose profile holds Last.fm credentials
+        """
+        try:
+            self.client = pylast.LastFMNetwork(
+                api_key=getattr(settings, "LASTFM_API_KEY"),
+                api_secret=getattr(settings, "LASTFM_SECRET_KEY"),
+                username=user.profile.lastfm_username,
+                password_hash=pylast.md5(user.profile.lastfm_password),
+            )
+            self.user = self.client.get_user(user.profile.lastfm_username)
+            self.vrobbler_user = user
+        except PYLAST_ERRORS as e:
+            logger.error(f"Error during Last.fm setup: {e}")
+
+    def import_from_lastfm(self, last_processed=None):
+        """Import all Last.fm scrobbles made since ``last_processed``.
+
+        Each Last.fm play is matched to (or creates) a ``Track``. Plays that
+        already have a scrobble for the same user and track within 20
+        seconds are skipped; the rest are inserted with one ``bulk_create``.
+
+        :param last_processed: Only fetch plays after this datetime; when
+            None, no lower bound is sent to Last.fm
+        :return: The list of ``Scrobble`` objects that were created
+        """
+        from scrobbles.models import Scrobble
+
+        new_scrobbles = []
+        source = "Last.fm"
+        lastfm_scrobbles = self.get_last_scrobbles(time_from=last_processed)
+
+        # The user's timezone is the same for every play; resolve it once.
+        timezone = settings.TIME_ZONE
+        if self.vrobbler_user.profile:
+            timezone = self.vrobbler_user.profile.timezone
+
+        for lfm_scrobble in lastfm_scrobbles:
+            track = Track.find_or_create(
+                title=lfm_scrobble.get("title"),
+                artist_name=lfm_scrobble.get("artist"),
+                album_name=lfm_scrobble.get("album"),
+                enrich=True,
+            )
+
+            timestamp = lfm_scrobble.get("timestamp")
+            new_scrobble = Scrobble(
+                user=self.vrobbler_user,
+                timestamp=timestamp,
+                source=source,
+                track=track,
+                timezone=timezone,
+                played_to_completion=True,
+                in_progress=False,
+                media_type=Scrobble.MediaType.TRACK,
+            )
+            # Vrobbler scrobbles on finish, LastFM scrobbles on start, so
+            # allow a small window on either side when looking for duplicates.
+            # NOTE(review): the window is checked against `created`, not
+            # `timestamp` -- confirm that is intended for historical imports.
+            seconds_earlier = timestamp - timedelta(seconds=20)
+            seconds_later = timestamp + timedelta(seconds=20)
+            # Only this user's scrobbles count as duplicates; another user's
+            # play of the same track must not suppress the import.
+            existing = Scrobble.objects.filter(
+                created__gte=seconds_earlier,
+                created__lte=seconds_later,
+                track=track,
+                user=self.vrobbler_user,
+            ).first()
+            if existing:
+                logger.debug(f"Skipping existing scrobble {new_scrobble}")
+                continue
+            logger.debug(f"Queued scrobble {new_scrobble} for creation")
+            new_scrobbles.append(new_scrobble)
+
+        created = Scrobble.objects.bulk_create(new_scrobbles)
+        # TODO Add a notification for users that their import is complete
+        logger.info(
+            "Last.fm import finished",
+            extra={
+                "scrobbles_created": len(created),
+                "user_id": self.vrobbler_user.id,
+                "lastfm_user": self.user,
+            },
+        )
+        return created
+
+    def get_last_scrobbles(self, time_from=None, time_to=None):
+        """Fetch the Last.fm user's recent plays as plain dicts.
+
+        Plays whose artist cannot be resolved (including any play for which
+        the track lookup raises a pylast error) are logged and skipped.
+
+        :param time_from: Optional datetime lower bound
+        :param time_to: Optional datetime upper bound
+        :return: list of dicts with keys artist, album, title, mbid,
+            run_time_seconds and timestamp (timezone-aware, UTC)
+        """
+        lfm_params = {}
+        scrobbles = []
+        if time_from:
+            lfm_params["time_from"] = int(time_from.timestamp())
+        if time_to:
+            lfm_params["time_to"] = int(time_to.timestamp())
+
+        # limit=None asks pylast for everything available in the range.
+        lfm_params["limit"] = None
+
+        found_scrobbles = self.user.get_recent_tracks(**lfm_params)
+        # TODO spin this out into a celery task over certain threshold of found scrobbles?
+
+        for scrobble in found_scrobbles:
+            logger.info(f"Processing {scrobble}")
+            run_time = None
+            mbid = None
+            artist = None
+
+            log_dict = {"scrobble": scrobble}
+            try:
+                run_time = int(scrobble.track.get_duration() / 1000)
+                mbid = scrobble.track.get_mbid()
+                artist = scrobble.track.get_artist().name
+                log_dict["artist"] = artist
+                log_dict["mbid"] = mbid
+                log_dict["run_time"] = run_time
+            except pylast.MalformedResponseError as e:
+                logger.warning(e)
+            except (pylast.WSError, pylast.NetworkError):
+                # The f-prefix matters here: without it the placeholder was
+                # logged literally instead of naming the failing track.
+                logger.info(
+                    f"LastFM barfed trying to get the track for {scrobble.track}",
+                    extra=log_dict,
+                )
+
+            if not artist:
+                logger.info(
+                    "Silly LastFM, no artist found for scrobble",
+                    extra=log_dict,
+                )
+                continue
+
+            # Builds the aware UTC datetime directly, avoiding the deprecated
+            # utcfromtimestamp() followed by replace(tzinfo=...).
+            timestamp = datetime.fromtimestamp(
+                int(scrobble.timestamp), tz=pytz.utc
+            )
+
+            logger.info(
+                "Scrobble appended to list for bulk create", extra=log_dict
+            )
+            scrobbles.append(
+                {
+                    "artist": artist,
+                    "album": scrobble.album,
+                    "title": scrobble.track.title,
+                    "mbid": mbid,
+                    "run_time_seconds": run_time,
+                    "timestamp": timestamp,
+                }
+            )
+        return scrobbles

+ 17 - 13
vrobbler/apps/scrobbles/importers/tsv.py

@@ -1,23 +1,21 @@
 import codecs
 import csv
 import logging
+from datetime import datetime, timedelta
 
-import pytz
 import requests
+from django.contrib.auth import get_user_model
 from music.models import Track
 from scrobbles.constants import AsTsvColumn
 from scrobbles.models import Scrobble
 
-from scrobbles.utils import timestamp_user_tz_to_utc
-
 logger = logging.getLogger(__name__)
 
 
-def process_audioscrobbler_tsv_file(file_path, user_id, user_tz=None):
+def import_audioscrobbler_tsv_file(file_path, user_id):
     """Takes a path to a file of TSV data and imports it as past scrobbles"""
     new_scrobbles = []
-    if not user_tz:
-        user_tz = pytz.utc
+    user = get_user_model().objects.get(id=user_id)
 
     is_os_file = "https://" not in file_path
 
@@ -44,11 +42,13 @@ def process_audioscrobbler_tsv_file(file_path, user_id, user_tz=None):
             )
             continue
 
+        album_name = row[AsTsvColumn["ALBUM_NAME"].value]
         track = Track.find_or_create(
             title=row[AsTsvColumn["TRACK_NAME"].value],
-            musicbrainz_id=row[AsTsvColumn["MB_ID"].value],
             artist_name=row[AsTsvColumn["ARTIST_NAME"].value],
-            album_name=row[AsTsvColumn["ALBUM_NAME"].value],
+            album_name=album_name,
+            run_time_seconds=int(row[AsTsvColumn["RUN_TIME_SECONDS"].value]),
+            enrich=True,
         )
 
         # TODO Set all this up as constants
@@ -62,22 +62,26 @@ def process_audioscrobbler_tsv_file(file_path, user_id, user_tz=None):
             )
             continue
 
-        timestamp = timestamp_user_tz_to_utc(
-            int(row[AsTsvColumn["TIMESTAMP"].value]), user_tz
+        timestamp = user.profile.get_timestamp_with_tz(
+            datetime.fromtimestamp(int(row[AsTsvColumn["TIMESTAMP"].value]))
         )
+        stop_timestamp = timestamp + timedelta(seconds=track.run_time_seconds)
 
         new_scrobble = Scrobble(
-            user_id=user_id,
+            user=user,
             timestamp=timestamp,
+            stop_timestamp=stop_timestamp,
             source=source,
-            log={"rockbox_info": rockbox_info},
+            log={"rockbox_info": rockbox_info, "album_name": album_name},
+            playback_position_seconds=track.run_time_seconds,
             track=track,
             played_to_completion=True,
             in_progress=False,
             media_type=Scrobble.MediaType.TRACK,
+            timezone=timestamp.tzinfo.name,
         )
         existing = Scrobble.objects.filter(
-            timestamp=timestamp, track=track
+            timestamp=timestamp, track=track, user=user
         ).first()
         if existing:
             logger.debug(f"Skipping existing scrobble {new_scrobble}")

+ 9 - 11
vrobbler/apps/scrobbles/models.py

@@ -6,6 +6,7 @@ from collections import defaultdict
 from typing import Optional
 from uuid import uuid4
 from zoneinfo import ZoneInfo
+
 import pendulum
 import pytz
 from beers.models import Beer
@@ -26,7 +27,6 @@ from imagekit.processors import ResizeToFit
 from lifeevents.models import LifeEvent
 from locations.models import GeoLocation
 from moods.models import Mood
-from music.lastfm import LastFM
 from music.models import Artist, Track
 from podcasts.models import PodcastEpisode
 from profiles.utils import (
@@ -40,6 +40,7 @@ from profiles.utils import (
 from puzzles.models import Puzzle
 from scrobbles import dataclasses as logdata
 from scrobbles.constants import LONG_PLAY_MEDIA, MEDIA_END_PADDING_SECONDS
+from scrobbles.importers.lastfm import LastFM
 from scrobbles.notifications import NtfyNotification
 from scrobbles.stats import build_charts
 from scrobbles.utils import get_file_md5_hash, media_class_to_foreign_key
@@ -247,7 +248,7 @@ class AudioScrobblerTSVImport(BaseFileImportMixin):
     tsv_file = models.FileField(upload_to=get_path, **BNULL)
 
     def process(self, force=False):
-        from scrobbles.tsv import process_audioscrobbler_tsv_file
+        from scrobbles.importers.tsv import import_audioscrobbler_tsv_file
 
         if self.processed_finished and not force:
             logger.info(
@@ -257,13 +258,8 @@ class AudioScrobblerTSVImport(BaseFileImportMixin):
 
         self.mark_started()
 
-        tz = None
-        user_id = None
-        if self.user:
-            user_id = self.user.id
-            tz = self.user.profile.tzinfo
-        scrobbles = process_audioscrobbler_tsv_file(
-            self.upload_file_path, user_id, user_tz=tz
+        scrobbles = import_audioscrobbler_tsv_file(
+            self.upload_file_path, self.user.id
         )
         self.record_log(scrobbles)
         self.mark_finished()
@@ -758,8 +754,10 @@ class Scrobble(TimeStampedModel):
 
     @property
     def local_stop_timestamp(self):
-        if self.stop_tiemstamp:
-            return timezone.localtime(self.stop_timestamp, timezone=self.tzinfo)
+        """Return ``stop_timestamp`` localized with ``self.tzinfo``.
+
+        Returns None when no stop timestamp is set.
+        """
+        stopped_at = self.stop_timestamp
+        if not stopped_at:
+            return None
+        return timezone.localtime(stopped_at, timezone=self.tzinfo)
 
     @property
     def scrobble_media_key(self) -> str:

+ 0 - 1
vrobbler/apps/scrobbles/scrobblers.py

@@ -132,7 +132,6 @@ def jellyfin_scrobble_media(
             run_time_seconds=convert_to_seconds(
                 post_data.get("RunTime", 900000)
             ),
-            musicbrainz_id=post_data.get("Provider_musicbrainztrack", ""),
         )
         # A hack because we don't worry about updating music ... we either finish it or we don't
         playback_position_seconds = 0

+ 3 - 2
vrobbler/apps/scrobbles/utils.py

@@ -1,8 +1,9 @@
 import hashlib
 import logging
 import re
-from datetime import datetime, timedelta, tzinfo
+from datetime import datetime, timedelta
 from urllib.parse import urlparse
+from zoneinfo import ZoneInfo
 
 import pytz
 from django.apps import apps
@@ -24,7 +25,7 @@ logger = logging.getLogger(__name__)
 User = get_user_model()
 
 
-def timestamp_user_tz_to_utc(timestamp: int, user_tz: tzinfo) -> datetime:
-    return user_tz.localize(datetime.utcfromtimestamp(timestamp)).astimezone(
-        pytz.utc
-    )
+def timestamp_user_tz_to_utc(timestamp: int, user_tz: ZoneInfo) -> datetime:
+    """Treat a timestamp's UTC wall-clock time as ``user_tz`` local time.
+
+    The timestamp is decoded to a naive datetime as if it were UTC, that
+    wall-clock value is then interpreted in ``user_tz``, and the result is
+    converted to real UTC.
+
+    :param timestamp: Seconds since the epoch
+    :param user_tz: The user's timezone; a ``zoneinfo.ZoneInfo`` or a pytz
+        timezone are both accepted
+    :return: Timezone-aware datetime in UTC
+    """
+    naive = datetime.utcfromtimestamp(timestamp)
+    if hasattr(user_tz, "localize"):
+        # pytz zones must be attached with localize() to get the right offset.
+        local = user_tz.localize(naive)
+    else:
+        # ZoneInfo has no localize(); replace(tzinfo=...) is the correct way
+        # to attach it.
+        local = naive.replace(tzinfo=user_tz)
+    return local.astimezone(pytz.utc)