123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113 |
- import logging
- import re
- from django.db import IntegrityError, models, transaction
- from music.constants import VARIOUS_ARTIST_DICT
- logger = logging.getLogger(__name__)
def clean_artist_name(name: str) -> str:
    """Remove featured names from artist string.

    The name is cut at the first case-insensitive occurrence of each of the
    markers ``" feat. "``, ``" w. "`` and ``" featuring "`` (applied in that
    order), and the remaining head is stripped of surrounding whitespace.
    A name containing none of the markers is returned unchanged.

    Args:
        name: Raw artist string, possibly including featured artists.

    Returns:
        The artist string with any featured-artist suffix removed.
    """
    # NOTE: splitting on " & " is deliberately not done here; names joined
    # with an ampersand are returned as-is.
    for marker in (" feat. ", " w. ", " featuring "):
        # re.escape keeps the "." literal. Unescaped it is a wildcard, so
        # " w. " would also match " we " and cut the name too early.
        parts = re.split(re.escape(marker), name, maxsplit=1, flags=re.IGNORECASE)
        if len(parts) > 1:
            name = parts[0].strip()
    return name
def get_or_create_various_artists() -> "Artist":
    """Return the "Various Artists" placeholder artist, creating it if absent.

    Looks up the first Artist named "Various Artists". When none exists, a
    new one is created from ``VARIOUS_ARTIST_DICT`` and the creation is
    logged.
    """
    # Imported here rather than at module level, as in the original.
    from music.models import Artist

    existing = Artist.objects.filter(name="Various Artists").first()
    if existing:
        return existing

    created = Artist.objects.create(**VARIOUS_ARTIST_DICT)
    logger.info("Created Various Artists placeholder")
    return created
def deduplicate_tracks(commit=False) -> int:
    """Merge Track rows that share the same artist and title.

    For every (artist, title) pair with more than one Track row, the row
    returned by ``.first()`` is kept. The scrobbles of each other row are
    re-pointed at the kept row and that other row is deleted.

    Args:
        commit: When false (the default) only print what would be moved;
            nothing is written to the database.

    Returns:
        The number of tracks that belong to a duplicated (artist, title)
        group, counting the kept track of each group; 0 when there are no
        duplicates.
    """
    from music.models import Track

    # Evaluate the aggregate once, before any rows are changed or deleted.
    duplicate_groups = list(
        Track.objects.values("artist", "title")
        .annotate(dup_count=models.Count("id"))
        .filter(dup_count__gt=1)
    )

    # Work group by group. OR-ing the groups into one Q() and filtering on
    # it matches *every* track when there are no duplicates (an empty Q is
    # no filter at all), and visits each group once per member.
    for group in duplicate_groups:
        tracks = Track.objects.filter(
            artist=group["artist"], title=group["title"]
        )
        first = tracks.first()
        if first is None:
            # The group vanished since the aggregate ran; nothing to merge.
            continue

        for other in tracks.exclude(id=first.id):
            print("Moving scrobbles for", other.id, " to ", first.id)
            if not commit:
                continue
            # Catch the error outside the atomic block so the scrobble move
            # and the delete are committed or rolled back together.
            try:
                with transaction.atomic():
                    other.scrobble_set.update(track=first)
                    print("deleting ", other.id, " - ", other)
                    other.delete()
            except IntegrityError as e:
                print(
                    "could not delete ",
                    other.id,
                    f": IntegrityError {e}",
                )

    return sum(group["dup_count"] for group in duplicate_groups)
def condense_albums(commit: bool = False):
    """Fold tracks sharing a title and artist onto a single track.

    For each group of tracks with the same title and artist, the first
    track encountered becomes the target: the albums of every track in the
    group are added to its ``albums`` relation and all scrobbles of the
    group are re-pointed at it. Finally, tracks left without any scrobble
    are deleted.

    Args:
        commit: When false (the default) only log what would be done;
            nothing is written to the database.

    Returns:
        The number of distinct track ids that belong to a duplicated
        (title, artist) group.
    """
    from music.models import Track
    from scrobbles.models import Scrobble

    processed_ids = set()
    for track in Track.objects.all():
        # A track already folded into an earlier one must not be processed
        # again, otherwise it would pull every scrobble back onto itself.
        if track.id in processed_ids:
            continue

        group = list(
            Track.objects.filter(title=track.title, artist=track.artist)
        )
        if len(group) <= 1:
            continue

        # Collect albums in a set, skipping tracks that have no album so
        # that None is never handed to the many-to-many add() below.
        albums_to_add = set()
        if track.album:
            albums_to_add.add(track.album)
        for dup_track in group:
            if dup_track.id == track.id:
                continue
            logger.info(f"Adding {dup_track.album} to {track} albums")
            if dup_track.album:
                albums_to_add.add(dup_track.album)

        logger.info(f"Track appears more than once, condensing: {track}")
        duplicate_ids = [member.id for member in group]
        scrobbles = Scrobble.objects.filter(track_id__in=duplicate_ids)
        logger.info(
            f"Found {scrobbles.count()} scrobbles to merge onto {track}"
        )
        if commit:
            scrobbles.update(track=track)
            track.albums.add(*albums_to_add)
        processed_ids.update(duplicate_ids)

    if commit:
        # NOTE: this removes every track without scrobbles, which includes
        # (but is not limited to) the duplicates emptied above.
        Track.objects.filter(scrobble__isnull=True).delete()
    return len(processed_ids)
|