retroarch.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. import json
  2. import logging
  3. import os
  4. from datetime import datetime, timedelta
  5. from typing import List
  6. import pytz
  7. from dateutil.parser import ParserError, parse
  8. from django.apps import apps
  9. from django.conf import settings
  10. from django.contrib.auth import get_user_model
  11. from scrobbles.utils import convert_to_seconds
  12. from videogames.models import VideoGame
  13. from videogames.scrapers import scrape_game_name_from_adb
  14. from videogames.utils import get_or_create_videogame
  15. from vrobbler.apps.scrobbles.exceptions import UserNotFound
  16. from vrobbler.apps.videogames.exceptions import GameNotFound
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# User model class as returned by Django's get_user_model().
User = get_user_model()
  19. def load_game_data(directory_path: str, user_tz=None) -> dict:
  20. """Given a path to a directory, cycle through each found lrtl file and
  21. generate game data.
  22. Example json file as follows:
  23. Name: "Sonic The Hedgehog 2 (World).lrtl"
  24. Contents:
  25. {
  26. "version": "1.0",
  27. "runtime": "0:20:19",
  28. "last_played": "2023-05-23 15:30:15"
  29. }
  30. """
  31. directory = os.fsencode(directory_path)
  32. games = {}
  33. if not user_tz:
  34. user_tz = settings.TIME_ZONE
  35. for file in os.listdir(directory):
  36. filename = os.fsdecode(file)
  37. if not filename.endswith("lrtl"):
  38. logger.info(f'Skipping "{filename}", not lrtl file')
  39. continue
  40. game_name = filename.split(".lrtl")[0].split(" (")[0]
  41. with open("".join([directory_path, filename])) as f:
  42. try:
  43. games[game_name] = json.load(f)
  44. except json.JSONDecodeError:
  45. logger.warn(
  46. f"Could not decode JSOn for {game_name} and file {filename}"
  47. )
  48. # Convert runtime to seconds
  49. games[game_name]["runtime"] = convert_to_seconds(
  50. games[game_name]["runtime"]
  51. )
  52. # Convert last_played to datetime in user timezone
  53. games[game_name]["last_played"] = parse(
  54. games[game_name]["last_played"]
  55. ).replace(tzinfo=user_tz)
  56. return games
  57. def import_retroarch_lrtl_files(playlog_path: str, user_id: int) -> List[dict]:
  58. """Given a path to Retroarch lrtl game log file data,
  59. gather
  60. For each found log file, we'll do:
  61. 1. Look up game, create if it doesn't exist
  62. 2. Check for existing scrobbles
  63. 3. Create new scrobble if last_played != last_scrobble.timestamp
  64. 4. Calculate scrobble time from runtime - last_scrobble.long_play_time
  65. """
  66. Scrobble = apps.get_model("scrobbles", "Scrobble")
  67. user = User.objects.filter(pk=user_id).first()
  68. if not user:
  69. logger.warning(f"User ID {user_id} is not valid, cannot scrobble")
  70. raise UserNotFound
  71. game_logs = load_game_data(
  72. playlog_path, pytz.timezone(user.profile.timezone)
  73. )
  74. found_game = None
  75. new_scrobbles = []
  76. for game_name, game_data in game_logs.items():
  77. # Use the retroarch name, because we can't change those but may want to
  78. # tweak the found game
  79. logger.info(f"Received name {game_name}")
  80. try:
  81. mame_name = scrape_game_name_from_adb(game_name)
  82. except GameNotFound as e:
  83. logger.warning(e)
  84. continue
  85. if mame_name:
  86. logger.info(f"Found name {game_name}")
  87. game_name = mame_name
  88. found_game = VideoGame.objects.filter(retroarch_name=game_name).first()
  89. if not found_game:
  90. try:
  91. found_game = get_or_create_videogame(game_name)
  92. except GameNotFound as e:
  93. logger.warning(f"Game not found for: {e}")
  94. continue
  95. if found_game:
  96. found_game.retroarch_name = game_name
  97. found_game.save(update_fields=["retroarch_name"])
  98. end_datetime = game_data.get("last_played")
  99. if found_game:
  100. found_scrobble = found_game.scrobble_set.filter(
  101. stop_timestamp=end_datetime
  102. )
  103. if found_scrobble:
  104. logger.info(f"Skipping scrobble for game {found_game.id}")
  105. continue
  106. last_scrobble = found_game.scrobble_set.last()
  107. # Default to 0 for delta, but if there's an past scrobble, use that
  108. delta_runtime = 0
  109. if last_scrobble:
  110. delta_runtime = last_scrobble.long_play_seconds
  111. playback_position_seconds = game_data["runtime"] - delta_runtime
  112. timestamp = end_datetime - timedelta(
  113. seconds=playback_position_seconds
  114. )
  115. if playback_position_seconds < 30:
  116. logger.info(
  117. f"Video game {found_game.id} played for less than 30 seconds, skipping"
  118. )
  119. new_scrobbles.append(
  120. Scrobble(
  121. video_game_id=found_game.id,
  122. timestamp=timestamp,
  123. stop_timestamp=game_data["last_played"],
  124. playback_position_seconds=playback_position_seconds,
  125. played_to_completion=True,
  126. in_progress=False,
  127. long_play_seconds=game_data["runtime"],
  128. user_id=user_id,
  129. source="Retroarch",
  130. source_id="Imported from Retroarch play log file",
  131. media_type=Scrobble.MediaType.VIDEO_GAME,
  132. )
  133. )
  134. created_scrobbles = Scrobble.objects.bulk_create(new_scrobbles)
  135. logger.info(f"Created {len(created_scrobbles)} scrobbles")
  136. return new_scrobbles