tsv.py 2.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. import csv
  2. import logging
  3. from datetime import datetime
  4. import pytz
  5. from scrobbles.models import Scrobble
  6. from music.utils import (
  7. get_or_create_album,
  8. get_or_create_artist,
  9. get_or_create_track,
  10. )
  11. logger = logging.getLogger(__name__)
  12. def process_audioscrobbler_tsv_file(file_path, user_id, user_tz=None):
  13. """Takes a path to a file of TSV data and imports it as past scrobbles"""
  14. new_scrobbles = []
  15. if not user_tz:
  16. user_tz = pytz.utc
  17. with open(file_path) as infile:
  18. source = "Audioscrobbler File"
  19. rows = csv.reader(infile, delimiter="\t")
  20. source_id = ""
  21. for row_num, row in enumerate(rows):
  22. if row_num in [0, 1, 2]:
  23. if "Rockbox" in row[0]:
  24. source = "Rockbox"
  25. source_id += row[0] + "\n"
  26. continue
  27. if len(row) > 8:
  28. logger.warning(
  29. "Improper row length during Audioscrobbler import",
  30. extra={"row": row},
  31. )
  32. continue
  33. artist = get_or_create_artist(row[0])
  34. album = get_or_create_album(row[1], artist)
  35. track = get_or_create_track(
  36. title=row[2],
  37. mbid=row[7],
  38. artist=artist,
  39. album=album,
  40. run_time_seconds=int(row[4]),
  41. )
  42. timestamp = (
  43. datetime.utcfromtimestamp(int(row[6]))
  44. .replace(tzinfo=user_tz)
  45. .astimezone(pytz.utc)
  46. )
  47. new_scrobble = Scrobble(
  48. user_id=user_id,
  49. timestamp=timestamp,
  50. source=source,
  51. source_id=source_id,
  52. track=track,
  53. played_to_completion=True,
  54. in_progress=False,
  55. )
  56. existing = Scrobble.objects.filter(
  57. timestamp=timestamp, track=track
  58. ).first()
  59. if existing:
  60. logger.debug(f"Skipping existing scrobble {new_scrobble}")
  61. continue
  62. logger.debug(f"Queued scrobble {new_scrobble} for creation")
  63. new_scrobbles.append(new_scrobble)
  64. created = Scrobble.objects.bulk_create(new_scrobbles)
  65. logger.info(
  66. f"Created {len(created)} scrobbles",
  67. extra={"created_scrobbles": created},
  68. )
  69. return created