From 4151c0e113f4317001087aa45608fb91006f49e2 Mon Sep 17 00:00:00 2001 From: Timmy Welch Date: Fri, 28 Jul 2023 23:22:35 -0700 Subject: [PATCH] Cleanup sqlite Remove the import rename use sqlite3.Row allows retrieving value by name --- comictalker/comiccacher.py | 191 +++++++++++++++++++------------------ 1 file changed, 100 insertions(+), 91 deletions(-) diff --git a/comictalker/comiccacher.py b/comictalker/comiccacher.py index 51dc311..ebb3a5c 100644 --- a/comictalker/comiccacher.py +++ b/comictalker/comiccacher.py @@ -21,7 +21,7 @@ import json import logging import os import pathlib -import sqlite3 as lite +import sqlite3 from typing import Any from comictalker.resulttypes import ComicIssue, ComicSeries, Credit @@ -68,7 +68,8 @@ class ComicCacher: # this will wipe out any existing version open(self.db_file, "wb").close() - con = lite.connect(self.db_file) + con = sqlite3.connect(self.db_file) + con.row_factory = sqlite3.Row # create tables with con: @@ -76,15 +77,18 @@ class ComicCacher: # source_name,name,id,start_year,publisher,image,description,count_of_issues cur.execute( "CREATE TABLE SeriesSearchCache(" - + "search_term TEXT," - + "id TEXT NOT NULL," + "timestamp DATE DEFAULT (datetime('now','localtime'))," - + "source_name TEXT NOT NULL)" + + "id TEXT NOT NULL," + + "source_name TEXT NOT NULL," + + "search_term TEXT," + + "PRIMARY KEY (id, source_name, search_term))" ) cur.execute( "CREATE TABLE Series(" + + "timestamp DATE DEFAULT (datetime('now','localtime')), " + "id TEXT NOT NULL," + + "source_name TEXT NOT NULL," + "name TEXT," + "publisher TEXT," + "count_of_issues INT," @@ -95,14 +99,14 @@ class ComicCacher: + "description TEXT," + "genres TEXT," # Newline separated. For filtering etc. + "format TEXT," - + "timestamp DATE DEFAULT (datetime('now','localtime')), " - + "source_name TEXT NOT NULL," + "PRIMARY KEY (id, source_name))" ) cur.execute( "CREATE TABLE Issues(" + + "timestamp DATE DEFAULT (datetime('now','localtime')), " + "id TEXT NOT NULL," + + "source_name TEXT NOT NULL," + "series_id TEXT," + "name TEXT," + "issue_number TEXT," @@ -111,8 +115,6 @@ class ComicCacher: + "cover_date TEXT," + "site_detail_url TEXT," + "description TEXT," - + "timestamp DATE DEFAULT (datetime('now','localtime')), " - + "source_name TEXT NOT NULL," + "aliases TEXT," # Newline separated + "alt_image_urls TEXT," # Newline separated URLs + "characters TEXT," # Newline separated @@ -133,7 +135,8 @@ class ComicCacher: ) def add_search_results(self, source_name: str, search_term: str, ct_search_results: list[ComicSeries]) -> None: - con = lite.connect(self.db_file) + con = sqlite3.connect(self.db_file) + con.row_factory = sqlite3.Row with con: con.text_factory = str @@ -148,7 +151,7 @@ class ComicCacher: # now add in new results for record in ct_search_results: cur.execute( - "INSERT INTO SeriesSearchCache " + "(source_name, search_term, id) " + "VALUES(?, ?, ?)", + "INSERT INTO SeriesSearchCache (source_name, search_term, id) VALUES(?, ?, ?)", (source_name, search_term.casefold(), record.id), ) @@ -171,15 +174,16 @@ class ComicCacher: def get_search_results(self, source_name: str, search_term: str) -> list[ComicSeries]: results = [] - con = lite.connect(self.db_file) + con = sqlite3.connect(self.db_file) + con.row_factory = sqlite3.Row with con: con.text_factory = str cur = con.cursor() cur.execute( "SELECT * FROM SeriesSearchCache INNER JOIN Series on" - " SeriesSearchCache.id=Series.id AND SeriesSearchCache.source_name=Series.source_name" - " WHERE search_term=? AND SeriesSearchCache.source_name=?", + + " SeriesSearchCache.id=Series.id AND SeriesSearchCache.source_name=Series.source_name" + + " WHERE search_term=? AND SeriesSearchCache.source_name=?", [search_term.casefold(), source_name], ) @@ -187,17 +191,17 @@ class ComicCacher: # now process the results for record in rows: result = ComicSeries( - id=record[4], - name=record[5], - publisher=record[6], - count_of_issues=record[7], - count_of_volumes=record[8], - start_year=record[9], - image_url=record[10], - aliases=record[11].strip().splitlines(), - description=record[12], - genres=record[13].strip().splitlines(), - format=record[14], + id=record["id"], + name=record["name"], + publisher=record["publisher"], + count_of_issues=record["count_of_issues"], + count_of_volumes=record["count_of_volumes"], + start_year=record["start_year"], + image_url=record["image_url"], + aliases=record["aliases"].strip().splitlines(), + description=record["description"], + genres=record["genres"].strip().splitlines(), + format=record["format"], ) results.append(result) @@ -205,7 +209,8 @@ class ComicCacher: return results def add_series_info(self, source_name: str, series_record: ComicSeries) -> None: - con = lite.connect(self.db_file) + con = sqlite3.connect(self.db_file) + con.row_factory = sqlite3.Row with con: cur = con.cursor() @@ -230,7 +235,8 @@ class ComicCacher: self.upsert(cur, "series", data) def add_series_issues_info(self, source_name: str, series_issues: list[ComicIssue]) -> None: - con = lite.connect(self.db_file) + con = sqlite3.connect(self.db_file) + con.row_factory = sqlite3.Row with con: cur = con.cursor() @@ -273,7 +279,8 @@ class ComicCacher: def get_series_info(self, series_id: str, source_name: str, purge: bool = True) -> ComicSeries | None: result: ComicSeries | None = None - con = lite.connect(self.db_file) + con = sqlite3.connect(self.db_file) + con.row_factory = sqlite3.Row with con: cur = con.cursor() con.text_factory = str @@ -284,7 +291,7 @@ class ComicCacher: cur.execute("DELETE FROM Series WHERE timestamp < ?", [str(a_week_ago)]) # fetch - cur.execute("SELECT * FROM Series" " WHERE id=? AND source_name=?", [series_id, source_name]) + cur.execute("SELECT * FROM Series WHERE id=? AND source_name=?", [series_id, source_name]) row = cur.fetchone() @@ -293,17 +300,17 @@ class ComicCacher: # since ID is primary key, there is only one row result = ComicSeries( - id=row[0], - name=row[1], - publisher=row[2], - count_of_issues=row[3], - count_of_volumes=row[4], - start_year=row[5], - image_url=row[6], - aliases=row[7].strip().splitlines(), - description=row[8], - genres=row[9].strip().splitlines(), - format=row[10], + id=row["id"], + name=row["name"], + publisher=row["publisher"], + count_of_issues=row["count_of_issues"], + count_of_volumes=row["count_of_volumes"], + start_year=row["start_year"], + image_url=row["image_url"], + aliases=row["aliases"].strip().splitlines(), + description=row["description"], + genres=row["genres"].strip().splitlines(), + format=row["format"], ) return result @@ -323,7 +330,8 @@ class ComicCacher: count_of_volumes=None, format=None, ) - con = lite.connect(self.db_file) + con = sqlite3.connect(self.db_file) + con.row_factory = sqlite3.Row with con: cur = con.cursor() con.text_factory = str @@ -343,35 +351,35 @@ class ComicCacher: for row in rows: credits = [] try: - for credit in json.loads(row[15]): + for credit in json.loads(row["credits"]): credits.append(Credit(**credit)) except Exception: logger.exception("credits failed") record = ComicIssue( - id=row[0], - name=row[2], - issue_number=row[3], - volume=row[25], - site_detail_url=row[7], - cover_date=row[6], - image_url=row[4], - description=row[8], series=series, - aliases=row[11].strip().splitlines(), - alt_image_urls=row[12].strip().splitlines(), - characters=row[13].strip().splitlines(), - locations=row[14].strip().splitlines(), credits=credits, - teams=row[16].strip().splitlines(), - story_arcs=row[17].strip().splitlines(), - genres=row[18].strip().splitlines(), - tags=row[19].strip().splitlines(), - critical_rating=row[20], - manga=row[21], - maturity_rating=row[22], - language=row[23], - country=row[24], - complete=bool(row[26]), + id=row["id"], + name=row["name"], + issue_number=row["issue_number"], + image_url=row["image_url"], + cover_date=row["cover_date"], + site_detail_url=row["site_detail_url"], + description=row["description"], + aliases=row["aliases"].strip().splitlines(), + alt_image_urls=row["alt_image_urls"].strip().splitlines(), + characters=row["characters"].strip().splitlines(), + locations=row["locations"].strip().splitlines(), + teams=row["teams"].strip().splitlines(), + story_arcs=row["story_arcs"].strip().splitlines(), + genres=row["genres"].strip().splitlines(), + tags=row["tags"].strip().splitlines(), + critical_rating=row["critical_rating"], + manga=row["manga"], + maturity_rating=row["maturity_rating"], + language=row["language"], + country=row["country"], + volume=row["volume"], + complete=bool(["complete"]), ) results.append(record) @@ -379,7 +387,8 @@ class ComicCacher: return results def get_issue_info(self, issue_id: int, source_name: str) -> ComicIssue | None: - con = lite.connect(self.db_file) + con = sqlite3.connect(self.db_file) + con.row_factory = sqlite3.Row with con: cur = con.cursor() con.text_factory = str @@ -396,8 +405,8 @@ class ComicCacher: if row: # get_series_info should only fail if someone is doing something weird - series = self.get_series_info(row[1], source_name, False) or ComicSeries( - id=row[1], + series = self.get_series_info(row["id"], source_name, False) or ComicSeries( + id=row["id"], name="", description="", genres=[], @@ -413,40 +422,40 @@ class ComicCacher: # now process the results credits = [] try: - for credit in json.loads(row[15]): + for credit in json.loads(row["credits"]): credits.append(Credit(**credit)) except Exception: logger.exception("credits failed") record = ComicIssue( - id=row[0], - name=row[2], - issue_number=row[3], - volume=row[25], - site_detail_url=row[7], - cover_date=row[6], - image_url=row[4], - description=row[8], series=series, - aliases=row[11].strip().splitlines(), - alt_image_urls=row[12].strip().splitlines(), - characters=row[13].strip().splitlines(), - locations=row[14].strip().splitlines(), credits=credits, - teams=row[16].strip().splitlines(), - story_arcs=row[17].strip().splitlines(), - genres=row[18].strip().splitlines(), - tags=row[19].strip().splitlines(), - critical_rating=row[20], - manga=row[21], - maturity_rating=row[22], - language=row[23], - country=row[24], - complete=bool(row[26]), + id=row["id"], + name=row["name"], + issue_number=row["issue_number"], + image_url=row["image_url"], + cover_date=row["cover_date"], + site_detail_url=row["site_detail_url"], + description=row["description"], + aliases=row["aliases"].strip().splitlines(), + alt_image_urls=row["alt_image_urls"].strip().splitlines(), + characters=row["characters"].strip().splitlines(), + locations=row["locations"].strip().splitlines(), + teams=row["teams"].strip().splitlines(), + story_arcs=row["story_arcs"].strip().splitlines(), + genres=row["genres"].strip().splitlines(), + tags=row["tags"].strip().splitlines(), + critical_rating=row["critical_rating"], + manga=row["manga"], + maturity_rating=row["maturity_rating"], + language=row["language"], + country=row["country"], + volume=row["volume"], + complete=bool(row["complete"]), ) return record - def upsert(self, cur: lite.Cursor, tablename: str, data: dict[str, Any]) -> None: + def upsert(self, cur: sqlite3.Cursor, tablename: str, data: dict[str, Any]) -> None: """This does an insert if the given PK doesn't exist, and an update it if does