utils.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. import logging
  2. import os
  3. from typing import Any
  4. from urllib.parse import unquote
  5. import feedparser
  6. from dateutil.parser import ParserError, parse
  7. from podcasts.models import PodcastEpisode
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# TODO: This should be configurable in settings or per deploy.
# Template for the date prefix in local episode filenames. Only its length is
# used in this module (to slice the date out of a filename); it is not passed
# to any date parser as a format string.
PODCAST_DATE_FORMAT = "YYYY-MM-DD"
  11. def parse_duration(d):
  12. if not d:
  13. return None
  14. if d.isdigit():
  15. return int(d)
  16. parts = [int(p) for p in d.split(":")]
  17. while len(parts) < 3:
  18. parts.insert(0, 0)
  19. h, m, s = parts
  20. return h * 3600 + m * 60 + s
  21. def fetch_metadata_from_rss(uri: str) -> dict[str, Any]:
  22. log_context = {"mopidy_uri": uri, "media_type": "Podcast"}
  23. podcast_data: dict[str, Any] = {}
  24. try:
  25. feed = feedparser.parse(uri.split("#")[0])
  26. target_guid = uri.split("#")[1]
  27. except IndexError:
  28. logger.warning("Tried to parse uri as RSS feed, but no target found", extra=log_context)
  29. return podcast_data
  30. podcast_publisher = feed.feed.get("itunes_publisher")
  31. podcast_owner = feed.feed.itunes_owner.get("name") if isinstance(feed.feed.itunes_owner, dict) else feed.feed.itunes_owner
  32. podcast_other = feed.feed.get("managingeditor") or feed.feed.get("copyright")
  33. podcast_data = {
  34. "podcast_name": feed.feed.get("title", "Unknown Podcast"),
  35. # "podcast_description": feed.feed.get("description", ""),
  36. # "podcast_link": feed.feed.get("link", ""),
  37. "podcast_producer": podcast_publisher or podcast_owner or podcast_other
  38. }
  39. for entry in feed.entries:
  40. if target_guid in target_guid:
  41. logger.info("🎧 Episode found in RSS feed", extra=log_context)
  42. podcast_data["title"] = entry.title
  43. podcast_data["episode_num"] = entry.guid
  44. podcast_data["pub_date"] = entry.get("published", None)
  45. podcast_data["run_time_seconds"] = parse_duration(entry.get("itunes_duration", None))
  46. # podcast_data["description"] = entry.get("description", None)
  47. # podcast_data["episode_url"] = entry.enclosures[0].href if entry.get("enclosures") else None
  48. return podcast_data
  49. else:
  50. logger.info("Episode not found in RSS feed.")
  51. def parse_mopidy_uri(uri: str) -> dict[str, Any]:
  52. podcast_data: dict[str, Any] = {}
  53. logger.debug(f"Parsing URI: {uri}")
  54. if "https://" in uri:
  55. return fetch_metadata_from_rss(uri)
  56. parsed_uri = os.path.splitext(unquote(uri))[0].split("/")
  57. podcast_data = {
  58. "title": parsed_uri[-1],
  59. "episode_num": None,
  60. "podcast_name": parsed_uri[-2].strip(),
  61. "pub_date": None,
  62. }
  63. episode_str = parsed_uri[-1]
  64. episode_num = None
  65. episode_num_pad = 0
  66. try:
  67. # Without episode numbers the date will lead
  68. podcast_data["pub_date"] = parse(episode_str[0:10])
  69. except ParserError:
  70. podcast_data["episode_num"] = int(episode_str.split("-")[0])
  71. episode_num_pad = len(str(podcast_data["episode_num"])) + 1
  72. try:
  73. # Beacuse we have epsiode numbers on
  74. podcast_data["pub_date"] = parse(
  75. episode_str[
  76. episode_num_pad : len(PODCAST_DATE_FORMAT)
  77. + episode_num_pad
  78. ]
  79. )
  80. except ParserError:
  81. pub_date = ""
  82. gap_to_strip = 0
  83. if podcast_data["pub_date"]:
  84. gap_to_strip += len(PODCAST_DATE_FORMAT)
  85. if podcast_data["episode_num"]:
  86. gap_to_strip += episode_num_pad
  87. podcast_data["title"] = episode_str[gap_to_strip:].replace("-", " ").strip()
  88. return podcast_data
  89. def get_or_create_podcast(post_data: dict) -> PodcastEpisode:
  90. logger.info("Looking up podcast", extra={"post_data": post_data, "media_type": "Podcast"})
  91. mopidy_uri = post_data.get("mopidy_uri", "")
  92. parsed_data = parse_mopidy_uri(mopidy_uri)
  93. return PodcastEpisode.find_or_create(**parsed_data)