123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118 |
- import logging
- import os
- from typing import Any
- from urllib.parse import unquote
- import feedparser
- from dateutil.parser import ParserError, parse
- from podcasts.models import PodcastEpisode
- logger = logging.getLogger(__name__)
- # TODO: Make this configurable in settings or per deploy.
- PODCAST_DATE_FORMAT = "YYYY-MM-DD"
def parse_duration(d):
    """Convert a duration string into a whole number of seconds.

    Accepted shapes are a plain number of seconds (``"3723"``) or up to three
    colon-separated integer fields read as ``SS``, ``MM:SS`` or ``HH:MM:SS``.

    Args:
        d: The raw duration value, or a falsy value when none is available.

    Returns:
        The duration in seconds, or ``None`` when ``d`` is falsy or does not
        match one of the accepted shapes (non-numeric text, empty fields, or
        more than three fields).
    """
    if not d:
        return None

    try:
        parts = [int(part) for part in str(d).strip().split(":")]
    except ValueError:
        # Non-numeric or empty field: report "unknown" rather than crash the caller.
        return None

    if len(parts) > 3:
        # More fields than hours/minutes/seconds; the value is not a duration we understand.
        return None

    # Fold left to right so missing leading fields (hours, minutes) count as zero.
    seconds = 0
    for part in parts:
        seconds = seconds * 60 + part
    return seconds
def fetch_metadata_from_rss(uri: str) -> dict[str, Any]:
    """Fetch podcast and episode metadata for a ``<feed-url>#<episode-guid>`` URI.

    The part before ``#`` is handed to ``feedparser``; the part after it is
    the GUID of the wanted episode, compared against each entry's ``guid``
    both as given and percent-decoded.

    Args:
        uri: Feed URL followed by ``#`` and the target episode GUID.

    Returns:
        A dict with ``podcast_name``, ``podcast_producer``, ``title``,
        ``episode_num`` (the entry GUID), ``pub_date`` and
        ``run_time_seconds`` when the episode is found. An empty dict when
        the URI carries no target GUID or no entry in the feed matches it.
    """
    log_context = {"mopidy_uri": uri, "media_type": "Podcast"}

    # Validate the URI before fetching so a target-less URI never hits the network.
    uri_parts = uri.split("#")
    if len(uri_parts) < 2 or not uri_parts[1]:
        logger.warning(
            "Tried to parse uri as RSS feed, but no target found",
            extra=log_context,
        )
        return {}
    feed_url, target_guid = uri_parts[0], uri_parts[1]

    feed = feedparser.parse(feed_url)
    channel = feed.feed

    podcast_publisher = channel.get("itunes_publisher")
    # Use .get() instead of attribute access so a feed lacking the owner
    # field yields None rather than raising.
    podcast_owner = channel.get("itunes_owner")
    if isinstance(podcast_owner, dict):
        podcast_owner = podcast_owner.get("name")
    podcast_other = channel.get("managingeditor") or channel.get("copyright")

    podcast_data: dict[str, Any] = {
        "podcast_name": channel.get("title", "Unknown Podcast"),
        "podcast_producer": podcast_publisher or podcast_owner or podcast_other,
    }

    # The fragment may be percent-encoded, so accept either spelling of the GUID.
    wanted_guids = {target_guid, unquote(target_guid)}
    for entry in feed.entries:
        entry_guid = entry.get("guid")
        if entry_guid not in wanted_guids:
            continue

        logger.info("🎧 Episode found in RSS feed", extra=log_context)
        podcast_data["title"] = entry.get("title")
        podcast_data["episode_num"] = entry_guid
        podcast_data["pub_date"] = entry.get("published")
        podcast_data["run_time_seconds"] = parse_duration(
            entry.get("itunes_duration")
        )
        return podcast_data

    # Mirror the "no target" path: an unresolved episode yields an empty dict
    # instead of an implicit None, which would break callers that unpack it.
    logger.info("Episode not found in RSS feed.", extra=log_context)
    return {}
def parse_mopidy_uri(uri: str) -> dict[str, Any]:
    """Derive podcast episode metadata from a Mopidy URI.

    URIs containing ``https://`` are delegated to ``fetch_metadata_from_rss``.
    Any other URI is treated as a path whose last segment is the episode file
    and whose second-to-last segment is the podcast name. The filename
    (extension removed) is expected to look like one of::

        <date>-<name>
        <episode number>-<date>-<name>
        <episode number>-<name>
        <name>

    where ``<date>`` occupies ``len(PODCAST_DATE_FORMAT)`` characters.

    Args:
        uri: The (possibly percent-encoded) Mopidy URI of the episode.

    Returns:
        For path-style URIs, a dict with ``episode_filename``,
        ``episode_num``, ``podcast_name``, ``pub_date`` and ``episode_name``;
        ``episode_num`` and ``pub_date`` are ``None`` when absent from the
        filename. For ``https://`` URIs, whatever
        ``fetch_metadata_from_rss`` returns.

    Raises:
        IndexError: If the decoded URI contains no ``/`` separator, so there
            is no segment to use as the podcast name.
    """
    logger.debug("Parsing URI: %s", uri)
    if "https://" in uri:
        return fetch_metadata_from_rss(uri)

    # Decode, drop the file extension, then split the path into segments.
    parsed_uri = os.path.splitext(unquote(uri))[0].split("/")
    episode_str = parsed_uri[-1]
    date_len = len(PODCAST_DATE_FORMAT)

    podcast_data: dict[str, Any] = {
        "episode_filename": episode_str,
        "episode_num": None,
        "podcast_name": parsed_uri[-2].strip(),
        "pub_date": None,
    }

    # Number of leading characters taken up by "<episode number>-".
    episode_num_pad = 0

    try:
        # Without an episode number, the date leads the filename.
        # NOTE(review): dateutil is lenient, so a short numeric prefix such as
        # "12-2020-01" may be accepted as a date here; confirm against real
        # filenames.
        podcast_data["pub_date"] = parse(episode_str[:date_len])
    except (ParserError, OverflowError):
        prefix = episode_str.split("-")[0]
        try:
            podcast_data["episode_num"] = int(prefix)
        except ValueError:
            # Neither a date nor a number leads the filename: the whole
            # string is the episode name.
            pass
        else:
            # Measure the raw prefix, not str(int(prefix)), so zero-padded
            # numbers such as "007" do not shift the date slice.
            episode_num_pad = len(prefix) + 1
            try:
                # Because an episode number leads, the date follows it.
                podcast_data["pub_date"] = parse(
                    episode_str[episode_num_pad : episode_num_pad + date_len]
                )
            except (ParserError, OverflowError):
                # No date after the episode number; leave pub_date as None.
                pass

    gap_to_strip = 0
    if podcast_data["pub_date"]:
        gap_to_strip += date_len
    # Compare against None so that episode number 0 is still stripped.
    if podcast_data["episode_num"] is not None:
        gap_to_strip += episode_num_pad

    podcast_data["episode_name"] = (
        episode_str[gap_to_strip:].replace("-", " ").strip()
    )

    return podcast_data
def get_or_create_podcast(post_data: dict) -> PodcastEpisode:
    """Resolve the podcast episode described by a posted payload.

    Reads ``mopidy_uri`` from ``post_data`` (empty string when missing),
    parses it with ``parse_mopidy_uri`` and forwards the parsed fields as
    keyword arguments to ``PodcastEpisode.find_or_create``.

    Args:
        post_data: Payload that may carry a ``mopidy_uri`` key.

    Returns:
        The result of ``PodcastEpisode.find_or_create``.
    """
    log_extra = {"post_data": post_data, "media_type": "Podcast"}
    logger.info("Looking up podcast", extra=log_extra)

    episode_fields = parse_mopidy_uri(post_data.get("mopidy_uri", ""))
    return PodcastEpisode.find_or_create(**episode_fields)
|