tsv.py 2.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. import csv
  2. import logging
  3. from datetime import datetime
  4. import pytz
  5. from scrobbles.models import Scrobble
  6. from music.utils import (
  7. get_or_create_album,
  8. get_or_create_artist,
  9. get_or_create_track,
  10. )
  11. logger = logging.getLogger(__name__)
  12. def process_audioscrobbler_tsv_file(file_path, user_id, user_tz=None):
  13. """Takes a path to a file of TSV data and imports it as past scrobbles"""
  14. new_scrobbles = []
  15. if not user_tz:
  16. user_tz = pytz.utc
  17. with open(file_path) as infile:
  18. source = 'Audioscrobbler File'
  19. rows = csv.reader(infile, delimiter="\t")
  20. source_id = ""
  21. for row_num, row in enumerate(rows):
  22. if row_num in [0, 1, 2]:
  23. if "Rockbox" in row[0]:
  24. source = "Rockbox"
  25. source_id += row[0] + "\n"
  26. continue
  27. if len(row) > 8:
  28. logger.warning(
  29. 'Improper row length during Audioscrobbler import',
  30. extra={'row': row},
  31. )
  32. continue
  33. artist = get_or_create_artist(row[0])
  34. album = get_or_create_album(row[1], artist)
  35. track = get_or_create_track(
  36. title=row[2],
  37. mbid=row[7],
  38. artist=artist,
  39. album=album,
  40. run_time=row[4],
  41. run_time_ticks=int(row[4]) * 1000,
  42. )
  43. timestamp = (
  44. datetime.utcfromtimestamp(int(row[6]))
  45. .replace(tzinfo=user_tz)
  46. .astimezone(pytz.utc)
  47. )
  48. new_scrobble = Scrobble(
  49. user_id=user_id,
  50. timestamp=timestamp,
  51. source=source,
  52. source_id=source_id,
  53. track=track,
  54. played_to_completion=True,
  55. in_progress=False,
  56. )
  57. existing = Scrobble.objects.filter(
  58. timestamp=timestamp, track=track
  59. ).first()
  60. if existing:
  61. logger.debug(f"Skipping existing scrobble {new_scrobble}")
  62. continue
  63. logger.debug(f"Queued scrobble {new_scrobble} for creation")
  64. new_scrobbles.append(new_scrobble)
  65. created = Scrobble.objects.bulk_create(new_scrobbles)
  66. logger.info(
  67. f"Created {len(created)} scrobbles",
  68. extra={'created_scrobbles': created},
  69. )
  70. return created