Cleanup sqlite

Remove the import rename
use sqlite3.Row allows retrieving value by name
This commit is contained in:
Timmy Welch 2023-07-28 23:22:35 -07:00
parent f43f51aa2f
commit 4151c0e113

View File

@ -21,7 +21,7 @@ import json
import logging
import os
import pathlib
import sqlite3 as lite
import sqlite3
from typing import Any
from comictalker.resulttypes import ComicIssue, ComicSeries, Credit
@ -68,7 +68,8 @@ class ComicCacher:
# this will wipe out any existing version
open(self.db_file, "wb").close()
con = lite.connect(self.db_file)
con = sqlite3.connect(self.db_file)
con.row_factory = sqlite3.Row
# create tables
with con:
@ -76,15 +77,18 @@ class ComicCacher:
# source_name,name,id,start_year,publisher,image,description,count_of_issues
cur.execute(
"CREATE TABLE SeriesSearchCache("
+ "search_term TEXT,"
+ "id TEXT NOT NULL,"
+ "timestamp DATE DEFAULT (datetime('now','localtime')),"
+ "source_name TEXT NOT NULL)"
+ "id TEXT NOT NULL,"
+ "source_name TEXT NOT NULL,"
+ "search_term TEXT,"
+ "PRIMARY KEY (id, source_name, search_term))"
)
cur.execute(
"CREATE TABLE Series("
+ "timestamp DATE DEFAULT (datetime('now','localtime')), "
+ "id TEXT NOT NULL,"
+ "source_name TEXT NOT NULL,"
+ "name TEXT,"
+ "publisher TEXT,"
+ "count_of_issues INT,"
@ -95,14 +99,14 @@ class ComicCacher:
+ "description TEXT,"
+ "genres TEXT," # Newline separated. For filtering etc.
+ "format TEXT,"
+ "timestamp DATE DEFAULT (datetime('now','localtime')), "
+ "source_name TEXT NOT NULL,"
+ "PRIMARY KEY (id, source_name))"
)
cur.execute(
"CREATE TABLE Issues("
+ "timestamp DATE DEFAULT (datetime('now','localtime')), "
+ "id TEXT NOT NULL,"
+ "source_name TEXT NOT NULL,"
+ "series_id TEXT,"
+ "name TEXT,"
+ "issue_number TEXT,"
@ -111,8 +115,6 @@ class ComicCacher:
+ "cover_date TEXT,"
+ "site_detail_url TEXT,"
+ "description TEXT,"
+ "timestamp DATE DEFAULT (datetime('now','localtime')), "
+ "source_name TEXT NOT NULL,"
+ "aliases TEXT," # Newline separated
+ "alt_image_urls TEXT," # Newline separated URLs
+ "characters TEXT," # Newline separated
@ -133,7 +135,8 @@ class ComicCacher:
)
def add_search_results(self, source_name: str, search_term: str, ct_search_results: list[ComicSeries]) -> None:
con = lite.connect(self.db_file)
con = sqlite3.connect(self.db_file)
con.row_factory = sqlite3.Row
with con:
con.text_factory = str
@ -148,7 +151,7 @@ class ComicCacher:
# now add in new results
for record in ct_search_results:
cur.execute(
"INSERT INTO SeriesSearchCache " + "(source_name, search_term, id) " + "VALUES(?, ?, ?)",
"INSERT INTO SeriesSearchCache (source_name, search_term, id) VALUES(?, ?, ?)",
(source_name, search_term.casefold(), record.id),
)
@ -171,15 +174,16 @@ class ComicCacher:
def get_search_results(self, source_name: str, search_term: str) -> list[ComicSeries]:
results = []
con = lite.connect(self.db_file)
con = sqlite3.connect(self.db_file)
con.row_factory = sqlite3.Row
with con:
con.text_factory = str
cur = con.cursor()
cur.execute(
"SELECT * FROM SeriesSearchCache INNER JOIN Series on"
" SeriesSearchCache.id=Series.id AND SeriesSearchCache.source_name=Series.source_name"
" WHERE search_term=? AND SeriesSearchCache.source_name=?",
+ " SeriesSearchCache.id=Series.id AND SeriesSearchCache.source_name=Series.source_name"
+ " WHERE search_term=? AND SeriesSearchCache.source_name=?",
[search_term.casefold(), source_name],
)
@ -187,17 +191,17 @@ class ComicCacher:
# now process the results
for record in rows:
result = ComicSeries(
id=record[4],
name=record[5],
publisher=record[6],
count_of_issues=record[7],
count_of_volumes=record[8],
start_year=record[9],
image_url=record[10],
aliases=record[11].strip().splitlines(),
description=record[12],
genres=record[13].strip().splitlines(),
format=record[14],
id=record["id"],
name=record["name"],
publisher=record["publisher"],
count_of_issues=record["count_of_issues"],
count_of_volumes=record["count_of_volumes"],
start_year=record["start_year"],
image_url=record["image_url"],
aliases=record["aliases"].strip().splitlines(),
description=record["description"],
genres=record["genres"].strip().splitlines(),
format=record["format"],
)
results.append(result)
@ -205,7 +209,8 @@ class ComicCacher:
return results
def add_series_info(self, source_name: str, series_record: ComicSeries) -> None:
con = lite.connect(self.db_file)
con = sqlite3.connect(self.db_file)
con.row_factory = sqlite3.Row
with con:
cur = con.cursor()
@ -230,7 +235,8 @@ class ComicCacher:
self.upsert(cur, "series", data)
def add_series_issues_info(self, source_name: str, series_issues: list[ComicIssue]) -> None:
con = lite.connect(self.db_file)
con = sqlite3.connect(self.db_file)
con.row_factory = sqlite3.Row
with con:
cur = con.cursor()
@ -273,7 +279,8 @@ class ComicCacher:
def get_series_info(self, series_id: str, source_name: str, purge: bool = True) -> ComicSeries | None:
result: ComicSeries | None = None
con = lite.connect(self.db_file)
con = sqlite3.connect(self.db_file)
con.row_factory = sqlite3.Row
with con:
cur = con.cursor()
con.text_factory = str
@ -284,7 +291,7 @@ class ComicCacher:
cur.execute("DELETE FROM Series WHERE timestamp < ?", [str(a_week_ago)])
# fetch
cur.execute("SELECT * FROM Series" " WHERE id=? AND source_name=?", [series_id, source_name])
cur.execute("SELECT * FROM Series WHERE id=? AND source_name=?", [series_id, source_name])
row = cur.fetchone()
@ -293,17 +300,17 @@ class ComicCacher:
# since ID is primary key, there is only one row
result = ComicSeries(
id=row[0],
name=row[1],
publisher=row[2],
count_of_issues=row[3],
count_of_volumes=row[4],
start_year=row[5],
image_url=row[6],
aliases=row[7].strip().splitlines(),
description=row[8],
genres=row[9].strip().splitlines(),
format=row[10],
id=row["id"],
name=row["name"],
publisher=row["publisher"],
count_of_issues=row["count_of_issues"],
count_of_volumes=row["count_of_volumes"],
start_year=row["start_year"],
image_url=row["image_url"],
aliases=row["aliases"].strip().splitlines(),
description=row["description"],
genres=row["genres"].strip().splitlines(),
format=row["format"],
)
return result
@ -323,7 +330,8 @@ class ComicCacher:
count_of_volumes=None,
format=None,
)
con = lite.connect(self.db_file)
con = sqlite3.connect(self.db_file)
con.row_factory = sqlite3.Row
with con:
cur = con.cursor()
con.text_factory = str
@ -343,35 +351,35 @@ class ComicCacher:
for row in rows:
credits = []
try:
for credit in json.loads(row[15]):
for credit in json.loads(row["credits"]):
credits.append(Credit(**credit))
except Exception:
logger.exception("credits failed")
record = ComicIssue(
id=row[0],
name=row[2],
issue_number=row[3],
volume=row[25],
site_detail_url=row[7],
cover_date=row[6],
image_url=row[4],
description=row[8],
series=series,
aliases=row[11].strip().splitlines(),
alt_image_urls=row[12].strip().splitlines(),
characters=row[13].strip().splitlines(),
locations=row[14].strip().splitlines(),
credits=credits,
teams=row[16].strip().splitlines(),
story_arcs=row[17].strip().splitlines(),
genres=row[18].strip().splitlines(),
tags=row[19].strip().splitlines(),
critical_rating=row[20],
manga=row[21],
maturity_rating=row[22],
language=row[23],
country=row[24],
complete=bool(row[26]),
id=row["id"],
name=row["name"],
issue_number=row["issue_number"],
image_url=row["image_url"],
cover_date=row["cover_date"],
site_detail_url=row["site_detail_url"],
description=row["description"],
aliases=row["aliases"].strip().splitlines(),
alt_image_urls=row["alt_image_urls"].strip().splitlines(),
characters=row["characters"].strip().splitlines(),
locations=row["locations"].strip().splitlines(),
teams=row["teams"].strip().splitlines(),
story_arcs=row["story_arcs"].strip().splitlines(),
genres=row["genres"].strip().splitlines(),
tags=row["tags"].strip().splitlines(),
critical_rating=row["critical_rating"],
manga=row["manga"],
maturity_rating=row["maturity_rating"],
language=row["language"],
country=row["country"],
volume=row["volume"],
complete=bool(["complete"]),
)
results.append(record)
@ -379,7 +387,8 @@ class ComicCacher:
return results
def get_issue_info(self, issue_id: int, source_name: str) -> ComicIssue | None:
con = lite.connect(self.db_file)
con = sqlite3.connect(self.db_file)
con.row_factory = sqlite3.Row
with con:
cur = con.cursor()
con.text_factory = str
@ -396,8 +405,8 @@ class ComicCacher:
if row:
# get_series_info should only fail if someone is doing something weird
series = self.get_series_info(row[1], source_name, False) or ComicSeries(
id=row[1],
series = self.get_series_info(row["id"], source_name, False) or ComicSeries(
id=row["id"],
name="",
description="",
genres=[],
@ -413,40 +422,40 @@ class ComicCacher:
# now process the results
credits = []
try:
for credit in json.loads(row[15]):
for credit in json.loads(row["credits"]):
credits.append(Credit(**credit))
except Exception:
logger.exception("credits failed")
record = ComicIssue(
id=row[0],
name=row[2],
issue_number=row[3],
volume=row[25],
site_detail_url=row[7],
cover_date=row[6],
image_url=row[4],
description=row[8],
series=series,
aliases=row[11].strip().splitlines(),
alt_image_urls=row[12].strip().splitlines(),
characters=row[13].strip().splitlines(),
locations=row[14].strip().splitlines(),
credits=credits,
teams=row[16].strip().splitlines(),
story_arcs=row[17].strip().splitlines(),
genres=row[18].strip().splitlines(),
tags=row[19].strip().splitlines(),
critical_rating=row[20],
manga=row[21],
maturity_rating=row[22],
language=row[23],
country=row[24],
complete=bool(row[26]),
id=row["id"],
name=row["name"],
issue_number=row["issue_number"],
image_url=row["image_url"],
cover_date=row["cover_date"],
site_detail_url=row["site_detail_url"],
description=row["description"],
aliases=row["aliases"].strip().splitlines(),
alt_image_urls=row["alt_image_urls"].strip().splitlines(),
characters=row["characters"].strip().splitlines(),
locations=row["locations"].strip().splitlines(),
teams=row["teams"].strip().splitlines(),
story_arcs=row["story_arcs"].strip().splitlines(),
genres=row["genres"].strip().splitlines(),
tags=row["tags"].strip().splitlines(),
critical_rating=row["critical_rating"],
manga=row["manga"],
maturity_rating=row["maturity_rating"],
language=row["language"],
country=row["country"],
volume=row["volume"],
complete=bool(row["complete"]),
)
return record
def upsert(self, cur: lite.Cursor, tablename: str, data: dict[str, Any]) -> None:
def upsert(self, cur: sqlite3.Cursor, tablename: str, data: dict[str, Any]) -> None:
"""This does an insert if the given PK doesn't exist, and an
update it if does