소스 검색

Start refactoring the koreader importer

Colin Powell 1 년 전
부모
커밋
3de2be50cf
5개의 변경된 파일485개의 추가작업 그리고 197개의 파일을 삭제
  1. 289 194
      vrobbler/apps/books/koreader.py
  2. 9 1
      vrobbler/apps/books/models.py
  3. 5 2
      vrobbler/apps/books/openlibrary.py
  4. 133 0
      vrobbler/apps/books/tests/conftest.py
  5. 49 0
      vrobbler/apps/books/tests/test_koreader.py

+ 289 - 194
vrobbler/apps/books/koreader.py

@@ -3,7 +3,7 @@ import logging
 import os
 import re
 import sqlite3
-from datetime import datetime
+from datetime import datetime, timedelta
 from enum import Enum
 from typing import Iterable, List
 
@@ -11,12 +11,15 @@ import pytz
 import requests
 from books.models import Author, Book, Page
 from books.openlibrary import get_author_openlibrary_id
+from django.contrib.auth import get_user_model
 from django.db.models import Sum
 from pylast import httpx, tempfile
 from scrobbles.models import Scrobble
+from scrobbles.utils import timestamp_user_tz_to_utc
 from stream_sqlite import stream_sqlite
 
 logger = logging.getLogger(__name__)
+User = get_user_model()
 
 
 class KoReaderBookColumn(Enum):
@@ -47,216 +50,308 @@ def _sqlite_bytes(sqlite_url):
         yield from r.iter_bytes(chunk_size=65_536)
 
 
-def get_book_map_from_sqlite(rows: Iterable) -> dict:
-    """Given an interable of sqlite rows from the books table, lookup existing
-    books, create ones that don't exist, and return a mapping of koreader IDs to
-    primary key IDs for page creation.
-
-    """
-    book_id_map = {}
-
-    for book_row in rows:
-        book = Book.objects.filter(
-            koreader_md5=book_row[KoReaderBookColumn.MD5.value]
-        ).first()
+# Grace period between page reads for it to be a new scrobble
+SESSION_GAP_SECONDS = 3600  # one hour
+
+
+class KoReaderImporter:
+    # Maps a KoReader book ID to the Book.id and total read time of the book in Django
+    # Example:
+    # {"KOREADER_DB_ID": {
+    #     "book_id": <int>,
+    #     "total_seconds": <int>,
+    #     "pages": {
+    #         <int>: {
+    #             "start_ts": <TIMESTAMP>,
+    #             "end_ts": <TIMESTAMP>,
+    #             "duration": <int>
+    #         }
+    #     }
+    # }
+    BOOK_MAP = dict()
+    SQLITE_FILE_URL = str
+    USER_ID = int
+
+    def __init__(self, sqlite_file_url: str, user_id: int):
+        # Map KoReader book IDs to
+        self.SQLITE_FILE_URL = sqlite_file_url
+        self.USER_ID = user_id
+        self.importing_user = User.objects.filter(id=user_id).first()
+
+    def _get_author_str_from_row(self, row):
+        """Given a the raw author string from KoReader, convert it to a single line and
+        strip the middle initials, as OpenLibrary lookup usually fails with those.
+        """
+        ko_authors = row[KoReaderBookColumn.AUTHORS.value].replace("\n", ", ")
+        # Strip middle initials, OpenLibrary often fails with these
+        return re.sub(" [A-Z]. ", " ", ko_authors)
+
+    def _lookup_or_create_authors_from_author_str(
+        self, ko_author_str: str
+    ) -> list:
+        author_str_list = ko_author_str.split(", ")
+        author_list = []
+        for author_str in author_str_list:
+            logger.debug(f"Looking up author {author_str}")
+            # KoReader gave us nothing, bail
+            if author_str == "N/A":
+                logger.warn(
+                    f"KoReader author string is N/A, no authors to find"
+                )
+                continue
 
-        if not book:
-            book, created = Book.objects.get_or_create(
-                title=book_row[KoReaderBookColumn.TITLE.value]
-            )
+            author = Author.objects.filter(name=author_str).first()
+            if not author:
+                author = Author.objects.create(
+                    name=author_str,
+                    openlibrary_id=get_author_openlibrary_id(author_str),
+                )
+                author.fix_metadata()
+                logger.debug(f"Created author {author}")
+            author_list.append(author)
+        return author_list
+
+    def get_or_create_books(self, rows):
+        """Given an interable of sqlite rows from the books table, lookup existing
+        books, create ones that don't exist, and return a mapping of koreader IDs to
+        primary key IDs for page creation.
+
+        """
+        book_id_map = {}
+
+        for book_row in rows:
+            book = Book.objects.filter(
+                koreader_md5=book_row[KoReaderBookColumn.MD5.value]
+            ).first()
 
-            if created:
+            if not book:
+                # No KoReader book yet, create it
+                author_str = self._get_author_str_from_row(book_row)
                 total_pages = book_row[KoReaderBookColumn.PAGES.value]
-                run_time = total_pages * book.AVG_PAGE_READING_SECONDS
-                ko_authors = book_row[
-                    KoReaderBookColumn.AUTHORS.value
-                ].replace("\n", ", ")
-                # Strip middle initials, OpenLibrary often fails with these
-                ko_authors = re.sub(" [A-Z]. ", " ", ko_authors)
-                book_dict = {
-                    "title": book_row[KoReaderBookColumn.TITLE.value],
-                    "pages": total_pages,
-                    "koreader_md5": book_row[KoReaderBookColumn.MD5.value],
-                    "koreader_id": int(book_row[KoReaderBookColumn.ID.value]),
-                    "koreader_authors": ko_authors,
-                    "run_time_seconds": run_time,
-                }
-                Book.objects.filter(pk=book.id).update(**book_dict)
-
-                # Add authors
-                authors = ko_authors.split(", ")
-                author_list = []
-                for author_str in authors:
-                    logger.debug(f"Looking up author {author_str}")
-                    if author_str == "N/A":
-                        continue
-
-                    author = Author.objects.filter(name=author_str).first()
-                    if not author:
-                        author = Author.objects.create(
-                            name=author_str,
-                            openlibrary_id=get_author_openlibrary_id(
-                                author_str
-                            ),
-                        )
-                        author.fix_metadata()
-                        logger.debug(f"Created author {author}")
-                    book.authors.add(author)
-
-                # This will try to fix metadata by looking it up on OL
+                run_time = total_pages * Book.AVG_PAGE_READING_SECONDS
+
+                book = Book.objects.create(
+                    koreader_md5=book_row[KoReaderBookColumn.MD5.value],
+                    title=book_row[KoReaderBookColumn.TITLE.value],
+                    koreader_id=book_row[KoReaderBookColumn.ID.value],
+                    koreader_authors=author_str,
+                    pages=total_pages,
+                    run_time_seconds=run_time,
+                )
                 book.fix_metadata()
 
-        book.refresh_from_db()
-        total_seconds = 0
-        if book_row[KoReaderBookColumn.TOTAL_READ_TIME.value]:
-            total_seconds = book_row[KoReaderBookColumn.TOTAL_READ_TIME.value]
-
-        book_id_map[book_row[KoReaderBookColumn.ID.value]] = (
-            book.id,
-            total_seconds,
-        )
-
-    return book_id_map
-
-
-def build_scrobbles_from_pages(
-    rows: Iterable, book_id_map: dict, user_id: int
-) -> List[Scrobble]:
-    new_scrobbles = []
-
-    new_scrobbles = []
-    pages_found = []
-    book_read_time_map = {}
-    for page_row in rows:
-        koreader_id = page_row[KoReaderPageStatColumn.ID_BOOK.value]
-        if koreader_id not in book_id_map.keys():
-            continue
-        page_number = page_row[KoReaderPageStatColumn.PAGE.value]
-        ts = page_row[KoReaderPageStatColumn.START_TIME.value]
-        book_id = book_id_map[koreader_id][0]
-        book_read_time_map[book_id] = book_id_map[koreader_id][1]
-
-        page, page_created = Page.objects.get_or_create(
-            book_id=book_id, number=page_number, user_id=user_id
-        )
-        if page_created:
-            page.start_time = datetime.utcfromtimestamp(ts).replace(
-                tzinfo=pytz.utc
-            )
-            page.duration_seconds = page_row[
-                KoReaderPageStatColumn.DURATION.value
-            ]
-            page.save(update_fields=["start_time", "duration_seconds"])
-        pages_found.append(page)
-
-    playback_position_seconds = 0
-    for page in set(pages_found):
-        # Add up page seconds to set the aggregate time of all pages to reading time
-        playback_position_seconds = (
-            playback_position_seconds + page.duration_seconds
-        )
-        if page.is_scrobblable:
-            # Check to see if a scrobble with this timestamp, book and user already exists
-            scrobble = Scrobble.objects.filter(
-                timestamp=page.start_time,
-                book_id=page.book_id,
-                user_id=user_id,
-            ).first()
-            if not scrobble:
-                logger.debug(
-                    f"Queueing scrobble for {page.book}, page {page.number}"
+                # Add authors
+                author_list = self._lookup_or_create_authors_from_author_str(
+                    author_str
                 )
-                new_scrobble = Scrobble(
-                    book_id=page.book_id,
-                    user_id=user_id,
-                    source="KOReader",
-                    media_type=Scrobble.MediaType.BOOK,
-                    timestamp=page.start_time,
-                    played_to_completion=True,
-                    playback_position_seconds=playback_position_seconds,
-                    in_progress=False,
-                    book_pages_read=page.number,
-                    long_play_complete=False,
+                if author_list:
+                    book.authors.add(*author_list)
+
+                # self._lookup_authors
+
+            book.refresh_from_db()
+            total_seconds = 0
+            if book_row[KoReaderBookColumn.TOTAL_READ_TIME.value]:
+                total_seconds = book_row[
+                    KoReaderBookColumn.TOTAL_READ_TIME.value
+                ]
+
+            book_id_map[book_row[KoReaderBookColumn.ID.value]] = {
+                "book_id": book.id,
+                "total_seconds": total_seconds,
+            }
+        self.BOOK_MAP = book_id_map
+
+    def load_page_data_to_map(self, rows: Iterable) -> List[Scrobble]:
+        """Given rows of page data from KoReader, parse each row and build
+        scrobbles for our user, loading the page data into the page_data
+        field on the scrobble instance.
+        """
+        for page_row in rows:
+            koreader_book_id = page_row[KoReaderPageStatColumn.ID_BOOK.value]
+            if "pages" not in self.BOOK_MAP[koreader_book_id].keys():
+                self.BOOK_MAP[koreader_book_id]["pages"] = {}
+
+            if koreader_book_id not in self.BOOK_MAP.keys():
+                logger.warn(
+                    f"Found a page without a corresponding book ID ({koreader_book_id}) in KoReader DB",
+                    {"page_row": page_row},
+                )
+                continue
+
+            page_number = page_row[KoReaderPageStatColumn.PAGE.value]
+            duration = page_row[KoReaderPageStatColumn.DURATION.value]
+            start_ts = page_row[KoReaderPageStatColumn.START_TIME.value]
+            if self.importing_user:
+                start_ts = timestamp_user_tz_to_utc(
+                    page_row[KoReaderPageStatColumn.START_TIME.value],
+                    self.importing_user.timezone,
                 )
-                new_scrobbles.append(new_scrobble)
-            # After setting a scrobblable page, reset our accumulator
-            playback_position_seconds = 0
-    return new_scrobbles
-
 
-def enrich_koreader_scrobbles(scrobbles: list) -> None:
-    """Given a list of scrobbles, update pages read, long play seconds and check
-    for media completion"""
+            self.BOOK_MAP[koreader_book_id]["pages"][page_number] = {
+                "duration": duration,
+                "start_ts": start_ts,
+                "end_ts": start_ts + duration,
+            }
 
-    for scrobble in scrobbles:
-        scrobble.book_pages_read = scrobble.book.page_set.last().number
-        # But if there's a next scrobble, set pages read to their starting page
-        #
-        if scrobble.next:
-            scrobble.book_pages_read = scrobble.next.book_pages_read - 1
-        scrobble.long_play_seconds = scrobble.book.page_set.filter(
-            number__lte=scrobble.book_pages_read
-        ).aggregate(Sum("duration_seconds"))["duration_seconds__sum"]
+    def build_scrobbles_from_pages(self) -> List[Scrobble]:
+        scrobbles_to_create = []
 
-        scrobble.save(update_fields=["book_pages_read", "long_play_seconds"])
+        for koreader_book_id, book_dict in self.BOOK_MAP.items():
+            book_id = book_dict["book_id"]
+            if "pages" not in book_dict.keys():
+                logger.warn(f"No page data found in book map for {book_id}")
+                continue
 
+            should_create_scrobble = False
+            scrobble_page_data = {}
+            playback_position_seconds = 0
+            prev_page_stats = {}
+
+            pages_processed = 0
+            total_pages = len(self.BOOK_MAP[koreader_book_id]["pages"])
+
+            for page_number, stats in self.BOOK_MAP[koreader_book_id][
+                "pages"
+            ].items():
+                pages_processed += 1
+                # Accumulate our page data for this scrobble
+                scrobble_page_data[page_number] = stats
+
+                seconds_from_last_page = 0
+                if prev_page_stats:
+                    seconds_from_last_page = stats.get(
+                        "end_ts"
+                    ) - prev_page_stats.get("start_ts")
+                playback_position_seconds = (
+                    playback_position_seconds + stats.get("duration")
+                )
 
-def process_koreader_sqlite_url(file_url, user_id) -> list:
-    book_id_map = {}
-    new_scrobbles = []
+                if (
+                    seconds_from_last_page > SESSION_GAP_SECONDS
+                    or pages_processed == total_pages
+                ):
+                    should_create_scrobble = True
 
-    for table_name, pragma_table_info, rows in stream_sqlite(
-        _sqlite_bytes(file_url), max_buffer_size=1_048_576
-    ):
-        logger.debug(f"Found table {table_name} - processing")
-        if table_name == "book":
-            book_id_map = get_book_map_from_sqlite(rows)
+                print(
+                    f"Seconds: {seconds_from_last_page} - {should_create_scrobble}"
+                )
+                if should_create_scrobble:
+                    first_page_in_scrobble = list(scrobble_page_data.keys())[0]
+                    timestamp = datetime.utcfromtimestamp(
+                        int(
+                            scrobble_page_data.get(first_page_in_scrobble).get(
+                                "start_ts"
+                            )
+                        )
+                    ).replace(tzinfo=pytz.utc)
+
+                    scrobble = Scrobble.objects.filter(
+                        timestamp=timestamp,
+                        book_id=book_id,
+                        # user_id=self.importing_user.id,
+                    ).first()
+                    if not scrobble:
+                        logger.info(
+                            f"Queueing scrobble for {book_id}, page {page_number}"
+                        )
+                        scrobbles_to_create.append(
+                            Scrobble(
+                                book_id=book_id,
+                                # user_id=self.importing_user.id,
+                                source="KOReader",
+                                media_type=Scrobble.MediaType.BOOK,
+                                timestamp=timestamp,
+                                played_to_completion=True,
+                                playback_position_seconds=playback_position_seconds,
+                                in_progress=False,
+                                book_page_data=scrobble_page_data,
+                                book_pages_read=page_number,
+                                long_play_complete=False,
+                            )
+                        )
+                        # Then start over
+                        should_create_scrobble = False
+                        playback_position_seconds = 0
+                        scrobble_page_data = {}
+
+                prev_page_stats = stats
+        return scrobbles_to_create
+
+    def _enrich_koreader_scrobbles(self, scrobbles: list) -> None:
+        """Given a list of scrobbles, update pages read, long play seconds and check
+        for media completion"""
+
+        for scrobble in scrobbles:
+            # But if there's a next scrobble, set pages read to their starting page
+            #
+            if scrobble.next:
+                scrobble.book_pages_read = scrobble.next.book_pages_read - 1
+            scrobble.long_play_seconds = scrobble.book.page_set.filter(
+                number__lte=scrobble.book_pages_read
+            ).aggregate(Sum("duration_seconds"))["duration_seconds__sum"]
+
+            scrobble.save(
+                update_fields=["book_pages_read", "long_play_seconds"]
+            )
 
-        if table_name == "page_stat_data":
-            new_scrobbles = build_scrobbles_from_pages(
-                rows, book_id_map, user_id
+    def process_file(self):
+        new_scrobbles = []
+
+        for table_name, pragma_table_info, rows in stream_sqlite(
+            _sqlite_bytes(self.FILE_URL), max_buffer_size=1_048_576
+        ):
+            logger.debug(f"Found table {table_name} - processing")
+            if table_name == "book":
+                self.get_or_create_books(rows)
+
+            if table_name == "page_stat_data":
+                self.build_scrobbles_from_page_data(rows)
+
+                # new_scrobbles = build_scrobbles_from_pages(
+                #    rows, book_id_map, user_id
+                # )
+                # logger.debug(f"Creating {len(new_scrobbles)} new scrobbles")
+
+        created = []
+        if new_scrobbles:
+            created = Scrobble.objects.bulk_create(new_scrobbles)
+            self._enrich_koreader_scrobbles(created)
+            logger.info(
+                f"Created {len(created)} scrobbles",
+                extra={"created_scrobbles": created},
             )
-            logger.debug(f"Creating {len(new_scrobbles)} new scrobbles")
-
-    created = []
-    if new_scrobbles:
-        created = Scrobble.objects.bulk_create(new_scrobbles)
-        enrich_koreader_scrobbles(created)
-        logger.info(
-            f"Created {len(created)} scrobbles",
-            extra={"created_scrobbles": created},
+        return created
+
+    def process_koreader_sqlite_file(self, file_path, user_id) -> list:
+        """Given a sqlite file from KoReader, open the book table, iterate
+        over rows creating scrobbles from each book found"""
+        # Create a SQL connection to our SQLite database
+        con = sqlite3.connect(file_path)
+        cur = con.cursor()
+
+        book_id_map = self.get_or_create_books(
+            cur.execute("SELECT * FROM book")
         )
-    return created
-
-
-def process_koreader_sqlite_file(file_path, user_id) -> list:
-    """Given a sqlite file from KoReader, open the book table, iterate
-    over rows creating scrobbles from each book found"""
-    # Create a SQL connection to our SQLite database
-    con = sqlite3.connect(file_path)
-    cur = con.cursor()
-
-    book_id_map = get_book_map_from_sqlite(cur.execute("SELECT * FROM book"))
-    new_scrobbles = build_scrobbles_from_pages(
-        cur.execute("SELECT * from page_stat_data"), book_id_map, user_id
-    )
-
-    created = []
-    if new_scrobbles:
-        created = Scrobble.objects.bulk_create(new_scrobbles)
-        enrich_koreader_scrobbles(created)
-        logger.info(
-            f"Created {len(created)} scrobbles",
-            extra={"created_scrobbles": created},
+        new_scrobbles = self.build_scrobbles_from_pages(
+            cur.execute("SELECT * from page_stat_data"), book_id_map, user_id
         )
-    return created
 
+        created = []
+        if new_scrobbles:
+            created = Scrobble.objects.bulk_create(new_scrobbles)
+            self._enrich_koreader_scrobbles(created)
+            logger.info(
+                f"Created {len(created)} scrobbles",
+                extra={"created_scrobbles": created},
+            )
+        return created
 
-def process_koreader_sqlite(file_path: str, user_id: int) -> list:
-    is_os_file = "https://" not in file_path
+    def process_koreader_sqlite(file_path: str, user_id: int) -> list:
+        is_os_file = "https://" not in file_path
 
-    if is_os_file:
-        created = process_koreader_sqlite_file(file_path, user_id)
-    else:
-        created = process_koreader_sqlite_url(file_path, user_id)
-    return created
+        if is_os_file:
+            created = process_koreader_sqlite_file(file_path, user_id)
+        else:
+            created = process_koreader_sqlite_url(file_path, user_id)
+        return created

+ 9 - 1
vrobbler/apps/books/models.py

@@ -274,6 +274,12 @@ class Book(LongPlayScrobblableMixin):
         if not book:
             data = lookup_book_from_openlibrary(lookup_id, author)
 
+            if not data:
+                logger.error(
+                    f"No book found on openlibrary, or in our database for {lookup_id}"
+                )
+                return book
+
             book, book_created = cls.objects.get_or_create(isbn=data["isbn"])
             if book_created:
                 book.fix_metadata(data=data)
@@ -292,9 +298,11 @@ class Book(LongPlayScrobblableMixin):
 
 
 class Page(TimeStampedModel):
-    user = models.ForeignKey(User, on_delete=models.CASCADE)
+    """DEPRECATED, we need to migrate pages into page_data on scrobbles and move on"""
+
     book = models.ForeignKey(Book, on_delete=models.CASCADE)
     number = models.IntegerField()
+    user = models.ForeignKey(User, on_delete=models.CASCADE)
     start_time = models.DateTimeField(**BNULL)
     end_time = models.DateTimeField(**BNULL)
     duration_seconds = models.IntegerField(**BNULL)

+ 5 - 2
vrobbler/apps/books/openlibrary.py

@@ -36,8 +36,11 @@ def get_author_openlibrary_id(name: str) -> str:
         logger.warn(f"No author results found from search for {name}")
         return ""
 
-    result = results.get("docs", [])
-    return result[0].get("key")
+    try:
+        result = results.get("docs", [])[0]
+    except IndexError:
+        result = {"key": ""}
+    return result.get("key")
 
 
 def lookup_author_from_openlibrary(olid: str) -> dict:

+ 133 - 0
vrobbler/apps/books/tests/conftest.py

@@ -0,0 +1,133 @@
+import hashlib
+import pytest
+import random
+
+from vrobbler.apps.books.koreader import (
+    KoReaderBookColumn,
+    KoReaderImporter,
+    KoReaderPageStatColumn,
+)
+
+ordinal = lambda n: "%d%s" % (
+    n,
+    "tsnrhtdd"[(n // 10 % 10 != 1) * (n % 10 < 4) * n % 10 :: 4],
+)
+AVERAGE_PAGE_READING_SECONDS = 60
+
+
+class DummyResponse:
+    status_code = 200
+
+    def status_code(self):
+        return self.status_code
+
+
+@pytest.fixture
+def valid_response():
+    return DummyResponse()
+
+
+class KoReaderBookRows:
+    id = 1
+    DEFAULT_STR = "N/A"
+    DEFAULT_INT = 0
+    DEFAULT_TIME = 1703800469
+    BOOK_ROWS = []
+    PAGE_STATS_ROWS = []
+
+    def _gen_random_row(self, i):
+        wiggle = random.randrange(15)
+        title = f"Memoirs, Volume {i}"
+        return [
+            i,
+            title,
+            f"Lord Beaverbrook the {ordinal(i)}",
+            self.DEFAULT_INT + wiggle / 10,
+            self.DEFAULT_TIME + i * wiggle,
+            0,
+            300 + wiggle,
+            self.DEFAULT_STR,
+            self.DEFAULT_STR,
+            hashlib.md5(title.encode()),
+            i * wiggle * 20,
+            120,
+        ]
+
+    def _generate_random_book_rows(self, book_count):
+        if book_count > 0:
+            for i in range(1, book_count + 1):
+                self.BOOK_ROWS.append(self._gen_random_row(i))
+
+    def _generate_custom_book_row(self, **kwargs):
+        title = kwargs.get("title", self.DEFAULT_STR)
+        if title and title != "N/A":
+            self.BOOK_ROWS.append(
+                [
+                    kwargs.get("id", self.id),
+                    kwargs.get("title", self.DEFAULT_STR),
+                    kwargs.get("authors", self.DEFAULT_STR),
+                    kwargs.get("notes", self.DEFAULT_INT),
+                    kwargs.get("last_open", self.DEFAULT_TIME),
+                    kwargs.get("highlights", self.DEFAULT_INT),
+                    kwargs.get("pages", self.DEFAULT_INT),
+                    kwargs.get("series", self.DEFAULT_STR),
+                    kwargs.get("language", self.DEFAULT_STR),
+                    hashlib.md5(title.encode()),
+                    kwargs.get("total_read_time", self.DEFAULT_INT),
+                    kwargs.get("total_read_pages", self.DEFAULT_INT),
+                ]
+            )
+
+    def _generate_random_page_stats_rows(self):
+        for book in self.BOOK_ROWS:
+            pages = book[KoReaderBookColumn.PAGES.value]
+            pages_per_session = 20
+
+            start_time = book[KoReaderBookColumn.LAST_OPEN.value]
+            end_session = False
+            for page_num in range(
+                1, book[KoReaderBookColumn.TOTAL_READ_PAGES.value] + 1
+            ):
+                wiggle = random.randrange(5)
+                self.PAGE_STATS_ROWS.append(
+                    [
+                        book[KoReaderBookColumn.ID.value],
+                        page_num,
+                        start_time,
+                        AVERAGE_PAGE_READING_SECONDS + wiggle,
+                        pages,
+                    ]
+                )
+                if end_session:
+                    start_time += 3600  # one second over an hour, marking a new reading session
+                    end_session = False
+                else:
+                    start_time += AVERAGE_PAGE_READING_SECONDS
+
+                if page_num % pages_per_session == 0:
+                    end_session = True
+
+    def __init__(self, book_count=0, **kwargs):
+        self._generate_random_book_rows(book_count)
+        self._generate_custom_book_row(**kwargs)
+        self._generate_random_page_stats_rows()
+
+
+@pytest.fixture
+def koreader_book_row():
+    return KoReaderBookRows(book_count=1).BOOK_ROWS[0]
+
+
+@pytest.fixture
+def koreader_book_rows():
+    return KoReaderBookRows(book_count=4).BOOK_ROWS
+
+
+@pytest.fixture
+def koreader_rows():
+    return KoReaderBookRows(book_count=1)
+
+
+@pytest.fixture
+def koreader_rows_for_pages():
+    return KoReaderBookRows(book_count=1)

+ 49 - 0
vrobbler/apps/books/tests/test_koreader.py

@@ -0,0 +1,49 @@
+import pytest
+from unittest import mock
+
+from books.koreader import KoReaderImporter, KoReaderBookColumn
+
+
+@pytest.mark.django_db
+@mock.patch("requests.get")
+def test_get_or_create_books(get_mock, koreader_book_rows, valid_response):
+    get_mock.return_value = valid_response
+    importer = KoReaderImporter("test.sqlite3", user_id=1)
+    importer.get_or_create_books(koreader_book_rows)
+    assert len(importer.BOOK_MAP) == 4
+
+
+@pytest.mark.django_db
+@mock.patch("requests.get")
+def test_load_page_data_to_map(get_mock, koreader_rows, valid_response):
+    get_mock.return_value = valid_response
+    importer = KoReaderImporter("test.sqlite3", user_id=1)
+    importer.get_or_create_books(koreader_rows.BOOK_ROWS)
+
+    importer.load_page_data_to_map(koreader_rows.PAGE_STATS_ROWS)
+    assert (
+        len(importer.BOOK_MAP[1]["pages"])
+        == koreader_rows.BOOK_ROWS[0][
+            KoReaderBookColumn.TOTAL_READ_PAGES.value
+        ]
+    )
+
+
+@pytest.mark.django_db
+@mock.patch("requests.get")
+def test_build_scrobbles_from_pages(
+    get_mock, koreader_rows_for_pages, valid_response
+):
+    get_mock.return_value = valid_response
+    importer = KoReaderImporter("test.sqlite3", user_id=1)
+    importer.get_or_create_books(koreader_rows.BOOK_ROWS)
+    importer.load_page_data_to_map(koreader_rows.PAGE_STATS_ROWS)
+    scrobbles = importer.build_scrobbles_from_pages()
+    # Corresponds to number of sessions per book ( 20 pages per session, 120 +/- 15 pages read )
+    assert len(scrobbles) == 6
+    assert len(scrobbles[0].book_page_data.keys()) == 22
+    assert len(scrobbles[1].book_page_data.keys()) == 20
+    assert len(scrobbles[2].book_page_data.keys()) == 20
+    assert len(scrobbles[3].book_page_data.keys()) == 20
+    assert len(scrobbles[4].book_page_data.keys()) == 20
+    assert len(scrobbles[5].book_page_data.keys()) == 18