utils.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. import logging
  2. import re
  3. from django.db import IntegrityError, models, transaction
  4. from music.constants import VARIOUS_ARTIST_DICT
  5. logger = logging.getLogger(__name__)
  6. def clean_artist_name(name: str) -> str:
  7. """Remove featured names from artist string."""
  8. if " feat. " in name.lower():
  9. name = re.split(" feat. ", name, flags=re.IGNORECASE)[0].strip()
  10. if " w. " in name.lower():
  11. name = re.split(" w. ", name, flags=re.IGNORECASE)[0].strip()
  12. if " featuring " in name.lower():
  13. name = re.split(" featuring ", name, flags=re.IGNORECASE)[0].strip()
  14. # if " & " in name.lower() and "of the wand" not in name.lower():
  15. # name = re.split("&", name, flags=re.IGNORECASE)[0].strip()
  16. return name
  17. def get_or_create_various_artists() -> "Artist":
  18. from music.models import Artist
  19. artist = Artist.objects.filter(name="Various Artists").first()
  20. if not artist:
  21. artist = Artist.objects.create(**VARIOUS_ARTIST_DICT)
  22. logger.info("Created Various Artists placeholder")
  23. return artist
  24. def deduplicate_tracks(commit=False) -> int:
  25. from music.models import Track
  26. duplicates = (
  27. Track.objects.values("artist", "title")
  28. .annotate(dup_count=models.Count("id"))
  29. .filter(dup_count__gt=1)
  30. )
  31. query = models.Q()
  32. for dup in duplicates:
  33. query |= models.Q(artist=dup["artist"], title=dup["title"])
  34. duplicate_tracks = Track.objects.filter(query)
  35. for b in duplicate_tracks:
  36. tracks = Track.objects.filter(artist=b.artist, title=b.title)
  37. first = tracks.first()
  38. for other in tracks.exclude(id=first.id):
  39. print("Moving scrobbles for", other.id, " to ", first.id)
  40. if commit:
  41. with transaction.atomic():
  42. other.scrobble_set.update(track=first)
  43. print("deleting ", other.id, " - ", other)
  44. try:
  45. other.delete()
  46. except IntegrityError as e:
  47. print(
  48. "could not delete ",
  49. other.id,
  50. f": IntegrityError {e}",
  51. )
  52. return len(duplicate_tracks)
  53. def condense_albums(commit: bool = False):
  54. from music.models import Track
  55. from scrobbles.models import Scrobble
  56. processed_ids = []
  57. for track in Track.objects.all():
  58. albums_to_add = []
  59. duplicates = (
  60. Track.objects.filter(title=track.title, artist=track.artist)
  61. .exclude(id=track.id)
  62. .exclude(id__in=processed_ids)
  63. )
  64. if commit and track.album:
  65. albums_to_add.append(track.album)
  66. for dup_track in duplicates:
  67. logger.info(f"Adding {dup_track.album} to {track} albums")
  68. if commit and dup_track.album:
  69. track.albums.add(dup_track.album)
  70. # Find out if this track appears more than once
  71. duplicates = Track.objects.filter(
  72. title=track.title, artist=track.artist
  73. )
  74. if duplicates.count() > 1:
  75. logger.info(f"Track appears more than once, condensing: {track}")
  76. albums_to_add.extend([d.album for d in duplicates])
  77. # Find all scrobbles
  78. duplicate_ids = duplicates.values_list("id", flat=True)
  79. scrobbles = Scrobble.objects.filter(track_id__in=duplicate_ids)
  80. logger.info(
  81. f"Found {scrobbles.count()} scrobbles to merge onto {track}"
  82. )
  83. if commit:
  84. scrobbles.update(track=track)
  85. track.albums.add(*list(set(albums_to_add)))
  86. processed_ids.extend(duplicate_ids)
  87. if commit:
  88. Track.objects.filter(scrobble__isnull=True).delete()
  89. return len(set(processed_ids))