|
@@ -1,7 +1,9 @@
|
|
|
import logging
|
|
|
import os
|
|
|
+from typing import Any
|
|
|
from urllib.parse import unquote
|
|
|
|
|
|
+import feedparser
|
|
|
from dateutil.parser import ParserError, parse
|
|
|
from podcasts.models import PodcastEpisode
|
|
|
|
|
@@ -10,26 +12,80 @@ logger = logging.getLogger(__name__)
|
|
|
# TODO This should be configurable in settings or per deploy
|
|
|
PODCAST_DATE_FORMAT = "YYYY-MM-DD"
|
|
|
|
|
|
+def parse_duration(d):
|
|
|
+ if not d:
|
|
|
+ return None
|
|
|
+ if d.isdigit():
|
|
|
+ return int(d)
|
|
|
+ parts = [int(p) for p in d.split(":")]
|
|
|
+ while len(parts) < 3:
|
|
|
+ parts.insert(0, 0)
|
|
|
+ h, m, s = parts
|
|
|
+ return h * 3600 + m * 60 + s
|
|
|
+
|
|
|
+def fetch_metadata_from_rss(uri: str) -> dict[str, Any]:
|
|
|
+ log_context = {"mopidy_uri": uri, "media_type": "Podcast"}
|
|
|
+ podcast_data: dict[str, Any] = {}
|
|
|
+
|
|
|
+ try:
|
|
|
+ feed = feedparser.parse(uri.split("#")[0])
|
|
|
+ target_guid = uri.split("#")[1]
|
|
|
+ except IndexError:
|
|
|
+ logger.warning("Tried to parse uri as RSS feed, but no target found", extra=log_context)
|
|
|
+ return podcast_data
|
|
|
+
|
|
|
+ podcast_data = {
|
|
|
+ "podcast_name": feed.feed.get("title", "Unknown Podcast"),
|
|
|
+ "podcast_description": feed.feed.get("description", ""),
|
|
|
+ "podcast_link": feed.feed.get("link", ""),
|
|
|
+ }
|
|
|
+
|
|
|
+ for entry in feed.entries:
|
|
|
+ if target_guid in target_guid:
|
|
|
+ logger.info("🎧 Episode found in RSS feed", extra=log_context)
|
|
|
+ podcast_data["episode_name"] = entry.title
|
|
|
+ podcast_data["episode_num"] = entry.guid
|
|
|
+ podcast_data["episode_pub_date"] = entry.get("published", None)
|
|
|
+ podcast_data["episode_description"] = entry.get("description", None)
|
|
|
+ podcast_data["episode_url"] = entry.enclosures[0].href if entry.get("enclosures") else None
|
|
|
+ podcast_data["episode_runtime_seconds"] = parse_duration(entry.get("itunes_duration", None))
|
|
|
+ return podcast_data
|
|
|
+ else:
|
|
|
+ logger.info("Episode not found in RSS feed.")
|
|
|
+
|
|
|
+
|
|
|
+def parse_mopidy_uri(uri: str) -> dict[str, Any]:
|
|
|
+ podcast_data: dict[str, Any] = {}
|
|
|
|
|
|
-def parse_mopidy_uri(uri: str) -> dict:
|
|
|
logger.debug(f"Parsing URI: {uri}")
|
|
|
+ if "https://" in uri:
|
|
|
+ return fetch_metadata_from_rss(uri)
|
|
|
+
|
|
|
+
|
|
|
parsed_uri = os.path.splitext(unquote(uri))[0].split("/")
|
|
|
|
|
|
+ podcast_data = {
|
|
|
+ "episode_filename": parsed_uri[-1],
|
|
|
+ "episode_num": None,
|
|
|
+ "podcast_name": parsed_uri[-2].strip(),
|
|
|
+ "pub_date": None,
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
episode_str = parsed_uri[-1]
|
|
|
- podcast_name = parsed_uri[-2].strip()
|
|
|
episode_num = None
|
|
|
episode_num_pad = 0
|
|
|
|
|
|
try:
|
|
|
# Without episode numbers the date will lead
|
|
|
- pub_date = parse(episode_str[0:10])
|
|
|
+ podcast_data["pub_date"] = parse(episode_str[0:10])
|
|
|
except ParserError:
|
|
|
- episode_num = int(episode_str.split("-")[0])
|
|
|
- episode_num_pad = len(str(episode_num)) + 1
|
|
|
+ podcast_data["episode_num"] = int(episode_str.split("-")[0])
|
|
|
+ episode_num_pad = len(str(podcast_data["episode_num"])) + 1
|
|
|
|
|
|
try:
|
|
|
# Beacuse we have epsiode numbers on
|
|
|
- pub_date = parse(
|
|
|
+ podcast_data["pub_date"] = parse(
|
|
|
episode_str[
|
|
|
episode_num_pad : len(PODCAST_DATE_FORMAT)
|
|
|
+ episode_num_pad
|
|
@@ -39,22 +95,19 @@ def parse_mopidy_uri(uri: str) -> dict:
|
|
|
pub_date = ""
|
|
|
|
|
|
gap_to_strip = 0
|
|
|
- if pub_date:
|
|
|
+ if podcast_data["pub_date"]:
|
|
|
gap_to_strip += len(PODCAST_DATE_FORMAT)
|
|
|
- if episode_num:
|
|
|
+ if podcast_data["episode_num"]:
|
|
|
gap_to_strip += episode_num_pad
|
|
|
|
|
|
- episode_name = episode_str[gap_to_strip:].replace("-", " ").strip()
|
|
|
+ podcast_data["episode_name"] = episode_str[gap_to_strip:].replace("-", " ").strip()
|
|
|
|
|
|
- return {
|
|
|
- "episode_filename": episode_name,
|
|
|
- "episode_num": episode_num,
|
|
|
- "podcast_name": podcast_name,
|
|
|
- "pub_date": pub_date,
|
|
|
- }
|
|
|
+ return podcast_data
|
|
|
|
|
|
|
|
|
def get_or_create_podcast(post_data: dict) -> PodcastEpisode:
|
|
|
+ logger.info("Looking up podcast", extra={"post_data": post_data, "media_type": "Podcast"})
|
|
|
+
|
|
|
mopidy_uri = post_data.get("mopidy_uri", "")
|
|
|
parsed_data = parse_mopidy_uri(mopidy_uri)
|
|
|
|
|
@@ -66,9 +119,10 @@ def get_or_create_podcast(post_data: dict) -> PodcastEpisode:
|
|
|
podcast_dict = {"name": podcast_name}
|
|
|
|
|
|
episode_name = parsed_data.get("episode_filename")
|
|
|
+ run_time_seconds = parsed_data.get("episode_runtime_seconds", post_data.get("run_time", 2700))
|
|
|
episode_dict = {
|
|
|
"title": episode_name,
|
|
|
- "run_time_seconds": post_data.get("run_time"),
|
|
|
+ "run_time_seconds": run_time_seconds,
|
|
|
"number": parsed_data.get("episode_num"),
|
|
|
"pub_date": parsed_data.get("pub_date"),
|
|
|
"mopidy_uri": mopidy_uri,
|