"""A python class to manage caching of data from Comic Vine""" # # Copyright 2012-2014 Anthony Beville # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from __future__ import annotations import datetime import json import logging import os import pathlib import sqlite3 as lite from typing import Any from comictalker.resulttypes import ComicIssue, ComicSeries logger = logging.getLogger(__name__) class ComicCacher: def __init__(self, cache_folder: pathlib.Path, version: str) -> None: self.cache_folder = cache_folder self.db_file = cache_folder / "comic_cache.db" self.version_file = cache_folder / "cache_version.txt" self.version = version # verify that cache is from same version as this one data = "" try: with open(self.version_file, "rb") as f: data = f.read().decode("utf-8") f.close() except Exception: pass if data != version: self.clear_cache() if not os.path.exists(self.db_file): self.create_cache_db() def clear_cache(self) -> None: try: os.unlink(self.db_file) except Exception: pass try: os.unlink(self.version_file) except Exception: pass def create_cache_db(self) -> None: # create the version file with open(self.version_file, "w", encoding="utf-8") as f: f.write(self.version) # this will wipe out any existing version open(self.db_file, "wb").close() con = lite.connect(self.db_file) # create tables with con: cur = con.cursor() # source_name,name,id,start_year,publisher,image,description,count_of_issues cur.execute( "CREATE TABLE SeriesSearchCache(" + "search_term TEXT," + "id INT NOT NULL," + "timestamp DATE DEFAULT (datetime('now','localtime'))," + "source_name TEXT NOT NULL)" ) cur.execute( "CREATE TABLE Series(" + "id INT NOT NULL," + "name TEXT," + "publisher TEXT," + "count_of_issues INT," + "start_year INT," + "image_url TEXT," + "aliases TEXT," # Newline separated + "description TEXT," + "timestamp DATE DEFAULT (datetime('now','localtime')), " + "source_name TEXT NOT NULL," + "PRIMARY KEY (id, source_name))" ) cur.execute( "CREATE TABLE Issues(" + "id INT NOT NULL," + "series_id INT," + "name TEXT," + "issue_number TEXT," + "image_url TEXT," + "thumb_url TEXT," + "cover_date TEXT," + "site_detail_url TEXT," + "description TEXT," + "timestamp DATE DEFAULT (datetime('now','localtime')), " + "source_name TEXT NOT NULL," + "aliases TEXT," # Newline separated + "alt_image_urls TEXT," # Newline separated URLs + "characters TEXT," # Newline separated + "locations TEXT," # Newline separated + "credits TEXT," # JSON: "{"name": "Bob Shakespeare", "role": "Writer"}" + "teams TEXT," # Newline separated + "story_arcs TEXT," # Newline separated + "complete BOOL," # Is the data complete? Includes characters, locations, credits. + "PRIMARY KEY (id, source_name))" ) def add_search_results(self, source_name: str, search_term: str, ct_search_results: list[ComicSeries]) -> None: con = lite.connect(self.db_file) with con: con.text_factory = str cur = con.cursor() # remove all previous entries with this search term cur.execute( "DELETE FROM SeriesSearchCache WHERE search_term = ? 
            # now add in new results
            for record in ct_search_results:
                cur.execute(
                    "INSERT INTO SeriesSearchCache " + "(source_name, search_term, id) " + "VALUES(?, ?, ?)",
                    (
                        source_name,
                        search_term.casefold(),
                        record["id"],
                    ),
                )

                data = {
                    "id": record["id"],
                    "source_name": source_name,
                    "name": record["name"],
                    "publisher": record.get("publisher", ""),
                    "count_of_issues": record.get("count_of_issues"),
                    "start_year": record.get("start_year"),
                    "image_url": record.get("image_url", ""),
                    "description": record.get("description", ""),
                    "timestamp": datetime.datetime.now(),
                    "aliases": "\n".join(record.get("aliases", [])),
                }
                self.upsert(cur, "series", data)

    def get_search_results(self, source_name: str, search_term: str) -> list[ComicSeries]:
        results = []
        con = lite.connect(self.db_file)
        with con:
            con.text_factory = str
            cur = con.cursor()

            cur.execute(
                "SELECT * FROM SeriesSearchCache INNER JOIN Series ON"
                " SeriesSearchCache.id=Series.id AND SeriesSearchCache.source_name=Series.source_name"
                " WHERE search_term=? AND SeriesSearchCache.source_name=?",
                [search_term.casefold(), source_name],
            )
            rows = cur.fetchall()

            # now process the results
            for record in rows:
                result = ComicSeries(
                    id=record[4],
                    name=record[5],
                    publisher=record[6],
                    count_of_issues=record[7],
                    start_year=record[8],
                    image_url=record[9],
                    aliases=record[10].strip().splitlines(),
                    description=record[11],
                )

                results.append(result)

        return results

    def add_series_info(self, source_name: str, series_record: ComicSeries) -> None:
        con = lite.connect(self.db_file)

        with con:
            cur = con.cursor()
            timestamp = datetime.datetime.now()

            data = {
                "id": series_record["id"],
                "source_name": source_name,
                "name": series_record["name"],
                "publisher": series_record.get("publisher", ""),
                "count_of_issues": series_record.get("count_of_issues"),
                "start_year": series_record.get("start_year"),
                "image_url": series_record.get("image_url", ""),
                "description": series_record.get("description", ""),
                "timestamp": timestamp,
                "aliases": "\n".join(series_record.get("aliases", [])),
            }
            self.upsert(cur, "series", data)

    def add_series_issues_info(self, source_name: str, series_issues: list[ComicIssue]) -> None:
        con = lite.connect(self.db_file)

        with con:
            cur = con.cursor()
            timestamp = datetime.datetime.now()

            # add in issues
            for issue in series_issues:
                data = {
                    "id": issue["id"],
                    "series_id": issue["series"]["id"],
                    "source_name": source_name,
                    "name": issue["name"],
                    "issue_number": issue["issue_number"],
                    "site_detail_url": issue.get("site_detail_url"),
                    "cover_date": issue.get("cover_date"),
                    "image_url": issue.get("image_url", ""),
                    "thumb_url": issue.get("image_thumb_url", ""),
                    "description": issue.get("description", ""),
                    "timestamp": timestamp,
                    "aliases": "\n".join(issue.get("aliases", [])),
                    "alt_image_urls": "\n".join(issue.get("alt_image_urls", [])),
                    "characters": "\n".join(issue.get("characters", [])),
                    "locations": "\n".join(issue.get("locations", [])),
                    "teams": "\n".join(issue.get("teams", [])),
                    "story_arcs": "\n".join(issue.get("story_arcs", [])),
                    "credits": json.dumps(issue.get("credits")),
                    "complete": issue["complete"],
                }
                self.upsert(cur, "issues", data)

    def get_series_info(self, series_id: int, source_name: str, purge: bool = True) -> ComicSeries | None:
        result: ComicSeries | None = None

        con = lite.connect(self.db_file)
        with con:
            cur = con.cursor()
            con.text_factory = str

            if purge:
                # purge stale series info
                a_week_ago = datetime.datetime.today() - datetime.timedelta(days=7)
                cur.execute("DELETE FROM Series WHERE timestamp < ?", [str(a_week_ago)])
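
            # With purge enabled, rows older than a week were just removed,
            # so any cached series returned below is at most seven days stale.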

            # fetch
            cur.execute(
                "SELECT * FROM Series WHERE id=? AND source_name=?",
                [series_id, source_name],
            )

            row = cur.fetchone()

            if row is None:
                return result

            # since ID is primary key, there is only one row
            result = ComicSeries(
                id=row[0],
                name=row[1],
                publisher=row[2],
                count_of_issues=row[3],
                start_year=row[4],
                image_url=row[5],
                aliases=row[6].strip().splitlines(),
                description=row[7],
            )

        return result

    def get_series_issues_info(self, series_id: int, source_name: str) -> list[ComicIssue]:
        # get_series_info should only fail if someone is doing something weird
        series = self.get_series_info(series_id, source_name, False) or ComicSeries(id=series_id, name="")
        con = lite.connect(self.db_file)
        with con:
            cur = con.cursor()
            con.text_factory = str

            # purge stale issue info - issue data probably won't change much
            a_week_ago = datetime.datetime.today() - datetime.timedelta(days=7)
            cur.execute("DELETE FROM Issues WHERE timestamp < ?", [str(a_week_ago)])

            # fetch
            results: list[ComicIssue] = []
            cur.execute(
                (
                    "SELECT source_name,id,name,issue_number,site_detail_url,cover_date,image_url,thumb_url,"
                    "description,aliases,alt_image_urls,characters,locations,credits,teams,story_arcs,complete"
                    " FROM Issues WHERE series_id=? AND source_name=?"
                ),
                [series_id, source_name],
            )
            rows = cur.fetchall()

            # now process the results
            for row in rows:
                record = ComicIssue(
                    id=row[1],
                    name=row[2],
                    issue_number=row[3],
                    site_detail_url=row[4],
                    cover_date=row[5],
                    image_url=row[6],
                    description=row[8],
                    series=series,
                    aliases=row[9].strip().splitlines(),
                    alt_image_urls=row[10].strip().splitlines(),
                    characters=row[11].strip().splitlines(),
                    locations=row[12].strip().splitlines(),
                    credits=json.loads(row[13]),
                    teams=row[14].strip().splitlines(),
                    story_arcs=row[15].strip().splitlines(),
                    complete=bool(row[16]),
                )

                results.append(record)

        return results

    def get_issue_info(self, issue_id: int, source_name: str) -> ComicIssue | None:
        con = lite.connect(self.db_file)
        with con:
            cur = con.cursor()
            con.text_factory = str

            # purge stale issue info - issue data probably won't change much
            a_week_ago = datetime.datetime.today() - datetime.timedelta(days=7)
            cur.execute("DELETE FROM Issues WHERE timestamp < ?", [str(a_week_ago)])

            cur.execute(
                (
                    "SELECT source_name,id,name,issue_number,site_detail_url,cover_date,image_url,thumb_url,"
                    "description,aliases,series_id,alt_image_urls,characters,locations,credits,teams,story_arcs,complete"
                    " FROM Issues WHERE id=? AND source_name=?"
                ),
                [issue_id, source_name],
            )
            row = cur.fetchone()

            record = None

            if row:
                # get_series_info should only fail if someone is doing something weird
                series = self.get_series_info(row[10], source_name, False) or ComicSeries(id=row[10], name="")

                # now process the results
                record = ComicIssue(
                    id=row[1],
                    name=row[2],
                    issue_number=row[3],
                    site_detail_url=row[4],
                    cover_date=row[5],
                    image_url=row[6],
                    image_thumb_url=row[7],
                    description=row[8],
                    series=series,
                    aliases=row[9].strip().splitlines(),
                    alt_image_urls=row[11].strip().splitlines(),
                    characters=row[12].strip().splitlines(),
                    locations=row[13].strip().splitlines(),
                    credits=json.loads(row[14]),
                    teams=row[15].strip().splitlines(),
                    story_arcs=row[16].strip().splitlines(),
                    complete=bool(row[17]),
                )

        return record

    def upsert(self, cur: lite.Cursor, tablename: str, data: dict[str, Any]) -> None:
        """This does an insert if the given PK doesn't exist, and an update if it does

        TODO: should the cursor be created here, and not up the stack?
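
        Illustrative call (the values here are hypothetical; any table created
        above works the same way):

            cacher.upsert(cur, "series", {"id": 1, "source_name": "comicvine", "name": "Example"})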
""" keys = "" vals = [] ins_slots = "" set_slots = "" for key in data: if data[key] is None: continue if keys != "": keys += ", " if ins_slots != "": ins_slots += ", " if set_slots != "": set_slots += ", " keys += key vals.append(data[key]) ins_slots += "?" set_slots += key + " = ?" sql_ins = f"INSERT OR REPLACE INTO {tablename} ({keys}) VALUES ({ins_slots})" cur.execute(sql_ins, vals)