123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246 |
- import re
- import codecs
- import logging
- import os
- import sqlite3
- from datetime import datetime
- from enum import Enum
- from typing import Iterable, List
- import pytz
- import requests
- from books.models import Author, Book, Page
- from pylast import httpx, tempfile
- from scrobbles.models import Scrobble
- from stream_sqlite import stream_sqlite
- from vrobbler.apps.books.openlibrary import get_author_openlibrary_id
- logger = logging.getLogger(__name__)
class KoReaderBookColumn(Enum):
    """Column indices for rows streamed from KOReader's ``book`` table.

    KOReader's statistics sqlite file is read positionally (``SELECT *``),
    so these members map each column name to its tuple index.
    """

    ID = 0
    TITLE = 1
    AUTHORS = 2
    NOTES = 3
    LAST_OPEN = 4
    HIGHLIGHTS = 5
    PAGES = 6
    SERIES = 7
    LANGUAGE = 8
    MD5 = 9
    TOTAL_READ_TIME = 10
    TOTAL_READ_PAGES = 11
class KoReaderPageStatColumn(Enum):
    """Column indices for rows streamed from KOReader's ``page_stat_data``
    table (one row per page-reading event), read positionally via
    ``SELECT *``.
    """

    ID_BOOK = 0
    PAGE = 1
    START_TIME = 2
    DURATION = 3
    TOTAL_PAGES = 4
def _sqlite_bytes(sqlite_url):
    """Lazily stream the sqlite file at *sqlite_url*, yielding 64 KiB chunks.

    Used as the byte source for :func:`stream_sqlite` so the whole file is
    never held in memory.
    """
    with httpx.stream("GET", sqlite_url) as response:
        for chunk in response.iter_bytes(chunk_size=65_536):
            yield chunk
def get_book_map_from_sqlite(rows: Iterable) -> dict:
    """Given an iterable of sqlite rows from the books table, look up existing
    books, create ones that don't exist, and return a mapping of KOReader IDs
    to primary key IDs for page creation.

    :param rows: positional rows from KOReader's ``book`` table, indexed via
        :class:`KoReaderBookColumn`.
    :returns: dict mapping KOReader book ID -> local ``Book`` primary key.
    """
    book_id_map = {}
    for book_row in rows:
        # Prefer the MD5 match; KOReader titles are not unique or stable.
        book = Book.objects.filter(
            koreader_md5=book_row[KoReaderBookColumn.MD5.value]
        ).first()
        if not book:
            book, created = Book.objects.get_or_create(
                title=book_row[KoReaderBookColumn.TITLE.value]
            )
            if created:
                total_pages = book_row[KoReaderBookColumn.PAGES.value]
                # Estimate total run time from the page count.
                run_time = total_pages * book.AVG_PAGE_READING_SECONDS

                # KOReader stores multiple authors newline-separated.
                ko_authors = book_row[
                    KoReaderBookColumn.AUTHORS.value
                ].replace("\n", ", ")
                # Strip middle initials, OpenLibrary often fails with these.
                # The dot must be escaped: an unescaped "." would match any
                # character and strip legitimate two-letter name tokens.
                ko_authors = re.sub(r" [A-Z]\. ", " ", ko_authors)

                book_dict = {
                    "title": book_row[KoReaderBookColumn.TITLE.value],
                    "pages": total_pages,
                    "koreader_md5": book_row[KoReaderBookColumn.MD5.value],
                    "koreader_id": int(book_row[KoReaderBookColumn.ID.value]),
                    "koreader_authors": ko_authors,
                    "run_time_seconds": run_time,
                }
                Book.objects.filter(pk=book.id).update(**book_dict)

                # Add authors
                for author_str in ko_authors.split(", "):
                    logger.debug(f"Looking up author {author_str}")
                    # KOReader uses "N/A" for books without author metadata.
                    if author_str == "N/A":
                        continue
                    author, author_created = Author.objects.get_or_create(
                        name=author_str
                    )
                    if author_created:
                        author.openlibrary_id = get_author_openlibrary_id(
                            author_str
                        )
                        author.save(update_fields=["openlibrary_id"])
                        author.fix_metadata()
                        logger.debug(f"Created author {author}")
                    book.authors.add(author)

                # This will try to fix metadata by looking it up on OL
                book.fix_metadata()

        # Refresh so the mapping reflects the .update() above.
        book.refresh_from_db()
        book_id_map[book_row[KoReaderBookColumn.ID.value]] = book.id
    return book_id_map
def build_scrobbles_from_pages(
    rows: Iterable, book_id_map: dict, user_id: int
) -> List[Scrobble]:
    """Create ``Page`` records for each page-stat row and return unsaved
    ``Scrobble`` instances for every page deemed scrobblable.

    :param rows: positional rows from KOReader's ``page_stat_data`` table,
        indexed via :class:`KoReaderPageStatColumn`.
    :param book_id_map: KOReader book ID -> local ``Book`` pk, from
        :func:`get_book_map_from_sqlite`.
    :param user_id: owner of the created pages and scrobbles.
    :returns: list of unsaved ``Scrobble`` objects (caller bulk-creates).
    """
    new_scrobbles = []
    pages_created = []
    for page_row in rows:
        koreader_id = page_row[KoReaderPageStatColumn.ID_BOOK.value]
        page_number = page_row[KoReaderPageStatColumn.PAGE.value]
        ts = page_row[KoReaderPageStatColumn.START_TIME.value]
        book_id = book_id_map[koreader_id]
        page, page_created = Page.objects.get_or_create(
            book_id=book_id, number=page_number, user_id=user_id
        )
        if page_created:
            # KOReader timestamps are UTC epoch seconds.
            page.start_time = datetime.fromtimestamp(ts, tz=pytz.utc)
            page.duration_seconds = page_row[
                KoReaderPageStatColumn.DURATION.value
            ]
            page.save(update_fields=["start_time", "duration_seconds"])
            pages_created.append(page)

    for page in pages_created:
        if page.is_scrobblable:
            # Page number is a placeholder, we'll re-process this after creation
            logger.debug(
                f"Queueing scrobble for {page.book}, page {page.number}"
            )
            new_scrobble = Scrobble(
                book_id=page.book_id,
                user_id=user_id,
                source="KOReader",
                timestamp=page.start_time,
                played_to_completion=True,
                in_progress=False,
                book_pages_read=page.number,
                long_play_complete=False,
            )
            new_scrobbles.append(new_scrobble)
    return new_scrobbles
def enrich_koreader_scrobbles(scrobbles: list) -> None:
    """Given a list of scrobbles, update pages read, long play seconds and check
    for media completion"""
    for scrobble in scrobbles:
        if scrobble.next:
            # Set pages read to the starting page of the next scrobble minus one, if it exists
            scrobble.book_pages_read = scrobble.next.book_pages_read - 1
            scrobble.save(update_fields=["book_pages_read"])
        else:
            # Set pages read to the last page we have
            scrobble.book_pages_read = scrobble.book.page_set.last().number
            # NOTE(review): "long_play_complete" appears in update_fields but is
            # never assigned in this function — confirm whether completion was
            # meant to be set here before the save.
            scrobble.save(update_fields=["book_pages_read", "long_play_complete"])
def process_koreader_sqlite_url(file_url, user_id) -> list:
    """Stream a KOReader statistics sqlite file from *file_url* and create
    scrobbles for *user_id* from its book and page tables.

    :returns: the list of created ``Scrobble`` instances (empty if none).
    """
    book_id_map = {}
    new_scrobbles = []
    # NOTE(review): assumes the "book" table is streamed before "page" so
    # book_id_map is populated first — confirm table ordering in the file.
    for table_name, pragma_table_info, rows in stream_sqlite(
        _sqlite_bytes(file_url), max_buffer_size=1_048_576
    ):
        if table_name == "book":
            book_id_map = get_book_map_from_sqlite(rows)
        if table_name == "page":
            new_scrobbles = build_scrobbles_from_pages(
                rows, book_id_map, user_id
            )
    created = []
    if new_scrobbles:
        created = Scrobble.objects.bulk_create(new_scrobbles)
        enrich_koreader_scrobbles(created)
        # Lazy %-style args: avoids formatting when INFO is disabled.
        logger.info(
            "Created %d scrobbles",
            len(created),
            extra={"created_scrobbles": created},
        )
    return created
def process_koreader_sqlite_file(file_path, user_id) -> list:
    """Given a sqlite file from KoReader, open the book table, iterate
    over rows creating scrobbles from each book found

    :returns: the list of created ``Scrobble`` instances (empty if none).
    """
    # Create a SQL connection to our SQLite database. Close it in a finally
    # block — the previous version leaked the connection on every call.
    con = sqlite3.connect(file_path)
    try:
        cur = con.cursor()
        book_id_map = get_book_map_from_sqlite(
            cur.execute("SELECT * FROM book")
        )
        # Both helpers fully consume their cursors before we close.
        new_scrobbles = build_scrobbles_from_pages(
            cur.execute("SELECT * from page_stat_data"), book_id_map, user_id
        )
    finally:
        con.close()

    created = []
    if new_scrobbles:
        created = Scrobble.objects.bulk_create(new_scrobbles)
        enrich_koreader_scrobbles(created)
        logger.info(
            "Created %d scrobbles",
            len(created),
            extra={"created_scrobbles": created},
        )
    return created
def process_koreader_sqlite(file_path: str, user_id: int) -> list:
    """Dispatch *file_path* to URL or local-file KOReader processing.

    The previous substring test (``"https://" not in file_path``) mis-routed
    ``http://`` URLs to the local-file path and could mis-route any local
    path that merely contained the text; checking the scheme prefix handles
    both http and https correctly.
    """
    if file_path.startswith(("http://", "https://")):
        return process_koreader_sqlite_url(file_path, user_id)
    return process_koreader_sqlite_file(file_path, user_id)
|