Quellcode durchsuchen

[video] Update how we get video metadata for YT add

Colin Powell vor 7 Monaten
Ursprung
Commit
c109ed79eb

+ 4 - 9
tests/videos_tests/test_imdb.py

@@ -1,11 +1,6 @@
-import pytest
+from videos.sources.imdb import lookup_video_from_imdb
 
-from videos.imdb import lookup_video_from_imdb
 
-
-@pytest.mark.skip(reason="Need to sort out third party API testing")
-def test_lookup_imdb_bad_id(caplog):
-    data = lookup_video_from_imdb("3409324")
-    assert data is None
-    assert caplog.records[0].levelname == "WARNING"
-    assert caplog.records[0].msg == "IMDB ID should begin with 'tt' 3409324"
+def test_lookup_imdb():
+    """Look up a known IMDb ID and check the title of the result.
+
+    NOTE(review): nothing here stubs the lookup, so this presumably needs
+    network access to IMDb -- confirm before relying on it in CI.
+    """
+    knives_out = lookup_video_from_imdb("8946378")
+    assert knives_out.title == "Knives Out"

+ 8 - 0
tests/videos_tests/test_youtube.py

@@ -0,0 +1,8 @@
+import pytest
+from videos.sources.youtube import lookup_video_from_youtube
+
+
+@pytest.mark.django_db
+def test_lookup_youtube_id():
+    """Look up a known YouTube ID and check the title of the result.
+
+    NOTE(review): nothing here stubs the lookup, so this presumably needs
+    network access to YouTube -- confirm before relying on it in CI.
+    """
+    expected_title = "No Pun Included's Board Game of the Year 2024"
+    video = lookup_video_from_youtube("RZxs9pAv99Y")
+    assert video.title == expected_title

+ 1 - 0
vrobbler/apps/scrobbles/constants.py

@@ -33,6 +33,7 @@ SCROBBLE_CONTENT_URLS = {
     "-u": "https://untappd.com/",
     "-b": "https://www.amazon.com/",
     "-t": "https://app.todoist.com/app/task/{id}",
+    "-i": "https://www.youtube.com/watch?v=",
 }
 
 EXCLUDE_FROM_NOW_PLAYING = ("GeoLocation",)

+ 17 - 10
vrobbler/apps/scrobbles/scrobblers.py

@@ -143,11 +143,11 @@ def web_scrobbler_scrobble_media(
     parsed = post_data.get("data").get("song").get("parsed")
     processed = post_data.get("data").get("song").get("processed")
     video, created = Video.objects.get_or_create(
-        video_type=Video.VideoType.YOUTUBE, 
+        video_type=Video.VideoType.YOUTUBE,
         youtube_url=parsed.get("originUrl"),
     )
     timestamp = datetime.utcfromtimestamp(
-        post_data.get("time", 0)/1000
+        post_data.get("time", 0) / 1000
     ).replace(tzinfo=pytz.utc)
 
     if created or event_name == "nowplaying":
@@ -156,7 +156,9 @@ def web_scrobbler_scrobble_media(
         if not processed.get("duration"):
             video.run_time_seconds = 1500
         # TODO maybe artist could be the series?
-        video.title = " - ".join([processed.get("artist"), processed.get("track")])
+        video.title = " - ".join(
+            [processed.get("artist"), processed.get("track")]
+        )
         video.save()
         return video.scrobble_for_user(
             user_id,
@@ -165,7 +167,9 @@ def web_scrobbler_scrobble_media(
             status=event_name,
         )
 
-    scrobble = Scrobble.objects.filter(user_id=user_id, video=video, in_progress=True).first()
+    scrobble = Scrobble.objects.filter(
+        user_id=user_id, video=video, in_progress=True
+    ).first()
     if not scrobble:
         return video.scrobble_for_user(
             user_id,
@@ -179,11 +183,13 @@ def web_scrobbler_scrobble_media(
         scrobble.resume()
     return scrobble
 
-def manual_scrobble_video(imdb_id: str, user_id: int):
-    if "tt" not in imdb_id:
-        imdb_id = "tt" + imdb_id
 
-    video = Video.find_or_create({JELLYFIN_POST_KEYS.get("IMDB_ID"): imdb_id})
+def manual_scrobble_video(video_id: str, user_id: int):
+    if "tt" in video_id:
+        video = Video.get_from_imdb_id(video_id)
+
+    else:
+        video = Video.get_from_youtube_id(video_id)
 
     # When manually scrobbling, try finding a source from the series
     source = "Vrobbler"
@@ -330,14 +336,15 @@ def manual_scrobble_from_url(url: str, user_id: int) -> Scrobble:
         content_key = "-w"
         item_id = url
 
+    # Try generic search for any URL with digit-based IDs
     if not item_id:
         try:
             item_id = re.findall("\d+", url)[0]
         except IndexError:
             pass
 
-    if content_key == "-t":
-        item_id = url
+    if content_key == "-i" and not item_id:
+        item_id = url.split("v=")[1].split("&")[0]
 
     scrobble_fn = MANUAL_SCROBBLE_FNS[content_key]
     return eval(scrobble_fn)(item_id, user_id)

+ 1 - 88
vrobbler/apps/videos/imdb.py

@@ -1,88 +1 @@
-import logging
-from typing import Optional
-
-from imdb import Cinemagoer, helpers
-
-imdb_client = Cinemagoer()
-
-logger = logging.getLogger(__name__)
-
-
-def lookup_video_from_imdb(
-    name_or_id: str, kind: str = "movie"
-) -> Optional[dict]:
-
-    # Very few video titles start with tt, but IMDB IDs often come in with it
-    if name_or_id.startswith("tt"):
-        name_or_id = name_or_id[2:]
-
-    imdb_id = None
-
-    try:
-        imdb_id = int(name_or_id)
-    except ValueError:
-        pass
-
-    video_metadata = None
-    if imdb_id:
-        imdb_result = imdb_client.get_movie(name_or_id)
-        imdb_client.update(imdb_result, info=["plot", "synopsis", "taglines"])
-        video_metadata = imdb_result
-
-    if not video_metadata:
-        imdb_results = imdb_client.search_movie(name_or_id)
-        if len(imdb_results) > 1:
-            for result in imdb_results:
-                if result["kind"] == kind:
-                    video_metadata = result
-                    break
-
-        if len(imdb_results) == 1:
-            video_metadata = imdb_results[0]
-        imdb_client.update(
-            video_metadata,
-            info=["plot", "synopsis", "taglines", "next_episode", "genres"],
-        )
-
-    if not video_metadata:
-        logger.info(
-            f"[lookup_video_from_imdb] no video found on imdb",
-            extra={"name_or_id": name_or_id},
-        )
-        return None
-
-    imdb_client.update(video_metadata)
-
-    cover_url = video_metadata.get("cover url")
-    if cover_url:
-        cover_url = helpers.resizeImage(cover_url, width=800)
-
-    from videos.models import Video
-
-    video_type = Video.VideoType.MOVIE
-    series_name = None
-    if video_metadata.get("kind") == "episode":
-        series_name = video_metadata.get("episode of", None).data.get(
-            "title", None
-        )
-        video_type = Video.VideoType.TV_EPISODE
-
-    run_time_seconds = 0
-    if video_metadata.get("runtimes"):
-        run_time_seconds = int(video_metadata.get("runtimes")[0]) * 60
-
-    return {
-        "title": video_metadata.get("title"),
-        "imdb_id": video_metadata.get("imdbID"),
-        "video_type": video_type,
-        "run_time_seconds": run_time_seconds,
-        "episode_number": video_metadata.get("episode", None),
-        "season_number": video_metadata.get("season", None),
-        "next_imdb_id": video_metadata.get("next episode", None),
-        "year": video_metadata.get("year", None),
-        "series_name": series_name,
-        "plot": video_metadata.get("plot outline"),
-        "imdb_rating": video_metadata.get("rating"),
-        "cover_url": cover_url,
-        "genres": video_metadata.get("genres"),
-    }
+#!/usr/bin/env python3

+ 18 - 0
vrobbler/apps/videos/migrations/0021_video_upload_date.py

@@ -0,0 +1,18 @@
+# Generated by Django 4.2.16 on 2025-01-20 04:16
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+    """Add a nullable, blank-able ``upload_date`` DateField to ``video``."""
+
+    # Runs after the previous "videos" migration, 0020_channel_youtube_id.
+    dependencies = [
+        ("videos", "0020_channel_youtube_id"),
+    ]
+
+    operations = [
+        # blank=True / null=True: existing rows need no default value.
+        migrations.AddField(
+            model_name="video",
+            name="upload_date",
+            field=models.DateField(blank=True, null=True),
+        ),
+    ]

+ 70 - 51
vrobbler/apps/videos/models.py

@@ -1,7 +1,8 @@
 import logging
-from typing import Dict, Optional
+from typing import Optional
 from uuid import uuid4
 
+import pendulum
 import requests
 from django.conf import settings
 from django.core.files.base import ContentFile
@@ -18,7 +19,13 @@ from scrobbles.mixins import (
     ScrobblableMixin,
 )
 from taggit.managers import TaggableManager
-from videos.imdb import lookup_video_from_imdb
+from videos.services.metadata import VideoMetadata
+from videos.sources.imdb import lookup_video_from_imdb
+from vrobbler.apps.videos.sources.youtube import lookup_video_from_youtube
+
+YOUTUBE_VIDEO_URL = "https://www.youtube.com/watch?v="
+YOUTUBE_CHANNEL_URL = "https://www.youtube.com/channel/"
+IMDB_VIDEO_URL = "https://www.imdb.com/title/tt"
 
 logger = logging.getLogger(__name__)
 BNULL = {"blank": True, "null": True}
@@ -43,14 +50,15 @@ class Channel(TimeStampedModel):
     youtube_id = models.CharField(max_length=255, **BNULL)
     genre = TaggableManager(through=ObjectWithGenres)
 
-    def __str__(self):
+    def __str__(self) -> str:
         return self.name
 
     def get_absolute_url(self):
         return reverse("videos:channel_detail", kwargs={"slug": self.uuid})
 
-    def youtube_link(self):
-        return f"https://www.youtube.com/user/t{self.yt_username}"
+    @property
+    def youtube_url(self) -> str:
+        """Return this channel's YouTube URL, or "" when no ID is stored.
+
+        ``youtube_id`` is declared nullable, so concatenating it unguarded
+        would raise ``TypeError`` for a channel without an ID.
+        """
+        if not self.youtube_id:
+            return ""
+        return YOUTUBE_CHANNEL_URL + self.youtube_id
 
     @property
     def primary_image_url(self) -> str:
@@ -109,8 +117,10 @@ class Series(TimeStampedModel):
     def get_absolute_url(self):
         return reverse("videos:series_detail", kwargs={"slug": self.uuid})
 
-    def imdb_link(self):
-        return f"https://www.imdb.com/title/tt{self.imdb_id}"
+    def imdb_link(self) -> str:
+        """Return the IMDb title URL for this series, or "" without an ID."""
+        return IMDB_VIDEO_URL + self.imdb_id if self.imdb_id else ""
 
     @property
     def primary_image_url(self) -> str:
@@ -150,16 +160,12 @@ class Series(TimeStampedModel):
         name_or_id = self.name
         if self.imdb_id:
             name_or_id = self.imdb_id
-        imdb_dict = lookup_video_from_imdb(name_or_id)
-        if not imdb_dict:
+        video_metadata: VideoMetadata = lookup_video_from_imdb(name_or_id)
+
+        if not video_metadata.title:
             logger.warning(f"No imdb data for {self}")
             return
 
-        self.imdb_id = imdb_dict.get("imdb_id")
-        self.imdb_rating = imdb_dict.get("imdb_rating")
-        self.plot = imdb_dict.get("plot")
-        self.save(update_fields=["imdb_id", "imdb_rating", "plot"])
-
         cover_url = imdb_dict.get("cover_url")
 
         if (not self.cover_image or force_update) and cover_url:
@@ -175,6 +181,7 @@ class Series(TimeStampedModel):
 class Video(ScrobblableMixin):
     COMPLETION_PERCENT = getattr(settings, "VIDEO_COMPLETION_PERCENT", 90)
     SECONDS_TO_STALE = getattr(settings, "VIDEO_SECONDS_TO_STALE", 14400)
+    METADATA_CLASS = VideoMetadata
 
     class VideoType(models.TextChoices):
         UNKNOWN = "U", _("Unknown")
@@ -218,12 +225,14 @@ class Video(ScrobblableMixin):
     tmdb_id = models.CharField(max_length=20, **BNULL)
     youtube_id = models.CharField(max_length=255, **BNULL)
     plot = models.TextField(**BNULL)
-    year = models.IntegerField(**BNULL)
+    upload_date = models.DateField(**BNULL)
 
     class Meta:
         unique_together = [["title", "imdb_id"]]
 
     def __str__(self):
+        if not self.title:
+            return self.youtube_id or self.imdb_id
         if self.video_type == self.VideoType.TV_EPISODE:
             return f"{self.title} / [S{self.season_number}E{self.episode_number}] {self.tv_series}"
         if self.video_type == self.VideoType.YOUTUBE:
@@ -248,12 +257,14 @@ class Video(ScrobblableMixin):
         return self.imdb_link
 
     @property
-    def link(self):
+    def link(self) -> str:
         return self.imdb_link
 
     @property
-    def youtube_link(self):
-        return f"https://www.youtube.com/watch?v={self.youtube_id}"
+    def youtube_link(self) -> str:
+        """Return the YouTube watch URL for this video, or "" without an ID."""
+        if self.youtube_id:
+            # YOUTUBE_VIDEO_URL is the watch-URL prefix defined at module
+            # level; this module defines no YOUTUBE_BASE_URL constant.
+            return YOUTUBE_VIDEO_URL + self.youtube_id
+        return ""
 
     @property
     def primary_image_url(self) -> str:
@@ -266,44 +277,52 @@ class Video(ScrobblableMixin):
     def strings(self) -> ScrobblableConstants:
         return ScrobblableConstants(verb="Watching", tags="movie_camera")
 
-    def fix_metadata(self, force_update=False):
-        imdb_dict = lookup_video_from_imdb(self.imdb_id)
-        if not imdb_dict:
-            logger.warn(f"No imdb data for {self}")
-            return
-        if imdb_dict.get("runtimes") and len(imdb_dict.get("runtimes")) > 0:
-            self.run_time_seconds = int(imdb_dict.get("runtimes")[0]) * 60
-        if (
-            imdb_dict.get("run_time_seconds")
-            and imdb_dict.get("run_time_seconds") > 0
-        ):
-            self.run_time_seconds = int(imdb_dict.get("run_time_seconds"))
-        self.imdb_rating = imdb_dict.get("imdb_rating")
-        self.plot = imdb_dict.get("plot")
-        self.year = imdb_dict.get("year")
-        self.save(
-            update_fields=["imdb_rating", "plot", "year", "run_time_seconds"]
-        )
-
-        cover_url = imdb_dict.get("cover_url")
-
-        if (not self.cover_image or force_update) and cover_url:
-            r = requests.get(cover_url)
+    def save_image_from_url(self, url: str, force_update: bool = False):
+        """Download ``url`` and store it as this video's cover image.
+
+        Nothing is fetched when ``url`` is empty, or when a cover image is
+        already present and ``force_update`` is false. A response with a
+        status other than 200 is ignored.
+        """
+        if not url:
+            # Without this guard a missing URL would reach requests.get().
+            return
+        if self.cover_image and not force_update:
+            return
+
+        r = requests.get(url)
+        if r.status_code == 200:
+            fname = f"{self.title}_{self.uuid}.jpg"
+            self.cover_image.save(fname, ContentFile(r.content), save=True)
 
-        if genres := imdb_dict.get("genres"):
-            self.genre.add(*genres)
+    @classmethod
+    def get_from_youtube_id(
+        cls, youtube_id: str, overwrite: bool = False
+    ) -> "Video":
+        """Return the video with this YouTube ID, creating it if needed.
+
+        An existing video is returned untouched unless ``overwrite`` is true.
+        A newly created video (or an overwritten one) has every field from
+        the YouTube lookup copied onto it, then its cover and genres saved.
+        If the lookup yields nothing, the video is returned as it stands.
+        """
+        video, created = cls.objects.get_or_create(youtube_id=youtube_id)
+        if not created and not overwrite:
+            return video
+
+        yt_metadata = lookup_video_from_youtube(youtube_id)
+        if yt_metadata is None:
+            # The lookup returns None when YouTube gives back no video.
+            logger.warning(f"No youtube data for {video}")
+            return video
+
+        vdict, cover, genres = yt_metadata.as_dict_with_cover_and_genres()
+        for k, v in vdict.items():
+            setattr(video, k, v)
+        video.save()
+
+        # Cover and genres are optional in the lookup result.
+        if cover:
+            video.save_image_from_url(cover)
+        if genres:
+            video.genre.add(*genres)
+        return video
 
-    def scrape_cover_from_url(
-        self, cover_url: str, force_update: bool = False
-    ):
-        if not self.cover_image or force_update:
-            r = requests.get(cover_url)
-            if r.status_code == 200:
-                fname = f"{self.title}_{self.uuid}.jpg"
-                self.cover_image.save(fname, ContentFile(r.content), save=True)
+    @classmethod
+    def get_from_imdb_id(
+        cls, imdb_id: str, overwrite: bool = False
+    ) -> "Video":
+        """Return the video with this IMDb ID, creating it if needed.
+
+        An existing video is returned untouched unless ``overwrite`` is true.
+        A newly created video (or an overwritten one) has every field from
+        the IMDb lookup copied onto it, then its cover and genres saved.
+        If the lookup yields nothing, the video is returned as it stands.
+        """
+        video, created = cls.objects.get_or_create(imdb_id=imdb_id)
+        if not created and not overwrite:
+            return video
+
+        imdb_metadata = lookup_video_from_imdb(imdb_id)
+        if imdb_metadata is None:
+            # The lookup returns None when nothing matches on IMDb.
+            logger.warning(f"No imdb data for {video}")
+            return video
+
+        vdict, cover, genres = imdb_metadata.as_dict_with_cover_and_genres()
+        for k, v in vdict.items():
+            if k == "imdb_id":
+                if not v:
+                    # Keep the ID the row was fetched or created with.
+                    continue
+                # Store IDs in "tt"-prefixed form without doubling a prefix.
+                v = str(v)
+                if not v.startswith("tt"):
+                    v = "tt" + v
+            setattr(video, k, v)
+        video.save()
+
+        # Cover and genres are optional in the lookup result.
+        if cover:
+            video.save_image_from_url(cover)
+        if genres:
+            video.genre.add(*genres)
+        return video
 
     @classmethod
     def find_or_create(

+ 65 - 0
vrobbler/apps/videos/services/metadata.py

@@ -0,0 +1,65 @@
+from enum import Enum
+from typing import Optional
+
+import pendulum
+from meta_yt import YouTube
+
+
+YOUTUBE_VIDEO_URL = "https://www.youtube.com/watch?v="
+IMDB_VIDEO_URL = "https://www.imdb.com/title/tt"
+
+
+class VideoType(Enum):
+    """Single-letter codes for the kinds of video metadata can describe."""
+
+    UNKNOWN = "U"
+    TV_EPISODE = "E"
+    MOVIE = "M"
+    SKATE_VIDEO = "S"
+    YOUTUBE = "Y"
+
+    @classmethod
+    def as_choices(cls) -> tuple:
+        """Return ``(name, value)`` pairs for all members, in order."""
+        pairs = []
+        for member in cls:
+            pairs.append((member.name, member.value))
+        return tuple(pairs)
+
+
+class VideoMetadata:
+    """Source-agnostic container for metadata gathered about one video.
+
+    Only ``imdb_id``, ``youtube_id`` and ``run_time_seconds`` are set by the
+    constructor. The other fields are bare annotations, so they exist on an
+    instance only once a lookup function has assigned them.
+    """
+
+    title: str
+    video_type: VideoType = VideoType.UNKNOWN
+    # Instances always get the constructor's value (900 by default), which
+    # shadows this class-level fallback.
+    run_time_seconds: int = (
+        60  # Silly default, but things break if this is 0 or null
+    )
+    imdb_id: Optional[str]
+    youtube_id: Optional[str]
+
+    # IMDB specific
+    episode_number: Optional[str]
+    season_number: Optional[str]
+    next_imdb_id: Optional[str]
+    year: Optional[int]
+    series_id: Optional[int]
+    plot: Optional[str]
+    imdb_rating: Optional[str]
+    overview: Optional[str]
+
+    # YouTube specific
+    channel_id: Optional[int]
+
+    # General
+    cover_url: Optional[str]
+    genres: list[str]
+
+    def __init__(
+        self,
+        imdb_id: Optional[str] = "",
+        youtube_id: Optional[str] = "",
+        run_time_seconds: int = 900,
+    ):
+        """Store the source IDs and an initial run time in seconds."""
+        self.imdb_id = imdb_id
+        self.youtube_id = youtube_id
+        self.run_time_seconds = run_time_seconds
+
+    def as_dict_with_cover_and_genres(self) -> tuple:
+        """Return ``(fields, cover_url, genres)`` for this metadata.
+
+        ``fields`` is a copy of the instance attributes without the cover
+        URL and genres, so the instance itself is left unchanged. A cover
+        that was never assigned comes back as ``None``, and missing or
+        ``None`` genres come back as an empty list.
+        """
+        # vars() returns the live __dict__; copy it before popping.
+        video_dict = dict(vars(self))
+        cover = video_dict.pop("cover_url", None)
+        genres = video_dict.pop("genres", None) or []
+        return video_dict, cover, genres

+ 87 - 0
vrobbler/apps/videos/sources/imdb.py

@@ -0,0 +1,87 @@
+import logging
+
+from imdb import Cinemagoer, helpers
+from videos.services import metadata
+
+imdb_client = Cinemagoer()
+
+logger = logging.getLogger(__name__)
+
+
+def lookup_video_from_imdb(
+    name_or_id: str, kind: str = "movie"
+) -> "metadata.VideoMetadata | None":
+    """Fetch metadata for a video from IMDb by numeric ID or by title.
+
+    A leading "tt" is stripped first. A value that parses as an integer is
+    fetched directly as an IMDb ID; anything else, or an ID fetch that comes
+    back empty, falls back to a title search. From several search hits the
+    first one whose kind equals ``kind`` is used.
+
+    For an episode, the parent series is looked up or created by name and
+    its primary key is stored on the result.
+
+    Returns the populated metadata, or ``None`` when IMDb has no match.
+    """
+    from videos.models import Series
+
+    # Very few video titles start with tt, but IMDB IDs often come in with it
+    if name_or_id.startswith("tt"):
+        name_or_id = name_or_id[2:]
+
+    imdb_id = None
+    try:
+        imdb_id = int(name_or_id)
+    except ValueError:
+        pass
+
+    video_metadata = metadata.VideoMetadata(imdb_id=imdb_id)
+    imdb_data = None
+
+    # Only a numeric value can be fetched by ID; titles go to the search.
+    if imdb_id is not None:
+        imdb_data = imdb_client.get_movie(name_or_id)
+        imdb_client.update(imdb_data, info=["plot", "synopsis", "taglines"])
+
+    if not imdb_data:
+        imdb_results = imdb_client.search_movie(name_or_id)
+        if len(imdb_results) > 1:
+            for result in imdb_results:
+                if result["kind"] == kind:
+                    imdb_data = result
+                    break
+
+        if len(imdb_results) == 1:
+            imdb_data = imdb_results[0]
+
+        # Nothing to enrich when the search found no usable result.
+        if imdb_data:
+            imdb_client.update(
+                imdb_data,
+                info=[
+                    "plot",
+                    "synopsis",
+                    "taglines",
+                    "next_episode",
+                    "genres",
+                ],
+            )
+
+    if not imdb_data:
+        logger.info(
+            "[lookup_video_from_imdb] no video found on imdb",
+            extra={"name_or_id": name_or_id},
+        )
+        return None
+
+    imdb_client.update(imdb_data)
+
+    video_metadata.cover_url = imdb_data.get("cover url")
+    if video_metadata.cover_url:
+        video_metadata.cover_url = helpers.resizeImage(
+            video_metadata.cover_url, width=800
+        )
+
+    video_metadata.video_type = metadata.VideoType.MOVIE
+    if imdb_data.get("kind") == "episode":
+        video_metadata.video_type = metadata.VideoType.TV_EPISODE
+        # "episode of" may be absent; only link a series that has a name.
+        parent = imdb_data.get("episode of", None)
+        series_name = None
+        if parent is not None:
+            series_name = parent.data.get("title", None)
+        if series_name:
+            series, _created = Series.objects.get_or_create(name=series_name)
+            video_metadata.series_id = series.id
+
+    if imdb_data.get("runtimes"):
+        # The first runtime is multiplied by 60 to store it as seconds.
+        video_metadata.run_time_seconds = (
+            int(imdb_data.get("runtimes")[0]) * 60
+        )
+
+    video_metadata.title = imdb_data.get("title", "")
+    video_metadata.imdb_id = imdb_data.get("imdbID")
+    video_metadata.episode_number = imdb_data.get("episode", None)
+    video_metadata.season_number = imdb_data.get("season", None)
+    video_metadata.next_imdb_id = imdb_data.get("next episode", None)
+    video_metadata.year = imdb_data.get("year", None)
+    video_metadata.plot = imdb_data.get("plot outline")
+    video_metadata.imdb_rating = imdb_data.get("rating")
+    # Callers unpack genres with *, so never hand back None.
+    video_metadata.genres = imdb_data.get("genres") or []
+
+    return video_metadata

+ 0 - 0
vrobbler/apps/videos/skatevideosite.py → vrobbler/apps/videos/sources/skatevideosite.py


+ 44 - 0
vrobbler/apps/videos/sources/youtube.py

@@ -0,0 +1,44 @@
+import pendulum
+from meta_yt import Video, YouTube
+from videos.services import metadata
+
+YOUTUBE_VIDEO_URL = "https://www.youtube.com/watch?v="
+YOUTUBE_CHANNEL_URL = "https://www.youtube.com/channel/"
+
+
+def lookup_video_from_youtube(
+    youtube_id: str,
+) -> "metadata.VideoMetadata | None":
+    """Fetch metadata for a YouTube video by its ID.
+
+    When the video names a channel, the matching ``Channel`` row is looked
+    up or created and its primary key is stored on the result.
+
+    Returns the populated metadata, or ``None`` when the lookup yields no
+    video.
+    """
+    from videos.models import Channel
+
+    yt_metadata = YouTube(YOUTUBE_VIDEO_URL + youtube_id).video
+    if not yt_metadata:
+        return None
+
+    video_metadata = metadata.VideoMetadata(youtube_id=youtube_id)
+
+    if yt_metadata.channel:
+        channel, _created = Channel.objects.get_or_create(
+            youtube_id=yt_metadata.channel_id,
+            name=yt_metadata.channel,
+        )
+        video_metadata.channel_id = channel.id
+
+    video_metadata.title = yt_metadata.title
+    # Keep the constructor's non-zero default if no duration is reported.
+    if yt_metadata.duration:
+        video_metadata.run_time_seconds = yt_metadata.duration
+    video_metadata.video_type = metadata.VideoType.YOUTUBE
+    video_metadata.youtube_id = yt_metadata.video_id
+    video_metadata.cover_url = yt_metadata.thumbnail
+    # Callers unpack genres with *, so never hand back None.
+    video_metadata.genres = yt_metadata.keywords or []
+
+    # Each nesting level of the raw payload is treated as optional.
+    raw = yt_metadata.metadata or {}
+    video_metadata.overview = (raw.get("videoDetails") or {}).get(
+        "shortDescription"
+    )
+
+    microformat = (raw.get("microformat") or {}).get(
+        "playerMicroformatRenderer"
+    ) or {}
+    date_str = microformat.get("uploadDate")
+    if date_str:
+        video_metadata.upload_date = pendulum.parse(date_str).date()
+        video_metadata.year = video_metadata.upload_date.year
+    return video_metadata

+ 1 - 1
vrobbler/apps/videos/utils.py

@@ -60,7 +60,7 @@ def get_or_create_video(data_dict: dict, post_keys: dict, force_update=False):
             video.genre.add(*genres)
 
         if cover_url := video_dict.pop("cover_url", None):
-            video.scrape_cover_from_url(cover_url)
+            video.save_image_from_url(cover_url)
 
         Video.objects.filter(pk=video.id).update(**video_dict)
         video.refresh_from_db()

+ 1 - 0
vrobbler/settings-testing.py

@@ -114,6 +114,7 @@ INSTALLED_APPS = [
     "tasks",
     "trails",
     "beers",
+    "foods",
     "lifeevents",
     "moods",
     "mathfilters",