rpiwebapp-public/scripts/database.py

161 lines
4.6 KiB
Python
Raw Normal View History

2019-07-06 23:00:00 -07:00
from flask import Flask
from flask import g
import sqlite3
import os, time
from comicapi.issuestring import IssueString
DATABASE = "/var/lib/rpiWebApp/database.db"
def get_db():
db = getattr(g, '_database', None)
if db is None:
db = g._database = sqlite3.connect(DATABASE)
return db
def initialize_db():
get_db().execute("""CREATE TABLE IF NOT EXISTS "comics" (
"path" TEXT UNIQUE,
"tagOrigin" TEXT,
"series" TEXT,
"issue" REAL,
"issueText" TEXT,
"title" TEXT,
"publisher" TEXT,
"month" INTEGER,
"year" INTEGER,
"day" INTEGER,
"seriesYear" INTEGER,
"issueCount" INTEGER,
"volume" TEXT,
"genre" TEXT,
"language" TEXT,
"comments" TEXT,
"volumeCount" INTEGER,
"criticalRating" TEXT,
"country" TEXT,
"alternateSeries" TEXT,
"alternateNumber" TEXT,
"alternateCount" INTEGER,
"imprint" TEXT,
"notes" TEXT,
"webLink" TEXT,
"format" TEXT,
"manga" TEXT,
"blackAndWhite" TEXT,
"pageCount" TEXT,
"maturityRating" TEXT,
"storyArc" TEXT,
"seriesGroup" TEXT,
"scanInfo" TEXT,
"characters" TEXT,
"teams" TEXT,
"locations" TEXT,
"id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE
)""")
get_db().execute("""CREATE TABLE IF NOT EXISTS "credits" (
"id" INTEGER NOT NULL,
"role" TEXT,
"name" TEXT,
"primary" INTEGER NOT NULL
)""")
get_db().execute("CREATE INDEX IF NOT EXISTS path_index ON comics(path);")
get_db().commit()
def table_exists(table_name):
cursor = get_db().execute("SELECT name FROM sqlite_master WHERE type='table' AND name='{}'".format(table_name))
get_db().commit()
if cursor.rowcount == 0:
return False
return True
def add_comics(meta):
data = []
for info in meta:
issue = IssueString(info[1].issue).asFloat()
data.append((info[0], info[1].tagOrigin, info[1].series, (issue or -1), info[1].issue, info[1].title, info[1].publisher,
info[1].month, info[1].year, info[1].day, info[1].seriesYear, info[1].issueCount, info[1].volume, info[1].genre,
info[1].language, info[1].comments, info[1].volumeCount, info[1].criticalRating, info[1].country,
info[1].alternateSeries, info[1].alternateNumber, info[1].alternateCount, info[1].imprint,
info[1].notes, info[1].webLink, info[1].format, info[1].manga, info[1].blackAndWhite,
info[1].pageCount, info[1].maturityRating, info[1].storyArc, info[1].seriesGroup, info[1].scanInfo,
info[1].characters, info[1].teams, info[1].locations))
get_db().executemany(
"""INSERT into comics(path, tagOrigin, series, issue, issueText, title, publisher,
month, year, day, seriesYear, issueCount, volume, genre, language, comments, volumeCount,
criticalRating, country, alternateSeries, alternateNumber, alternateCount, imprint,
notes, webLink, format, manga, blackAndWhite, pageCount, maturityRating, storyArc,
seriesGroup, scanInfo, characters, teams, locations)
VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""",
data)
get_db().commit()
def db_get_all_comics():
original_factory = get_db().row_factory
def dict_factory(cursor, row):
d = {}
for idx, col in enumerate(cursor.description):
d[col[0]] = row[idx]
return d
get_db().row_factory = dict_factory
result = get_db().execute("SELECT series, issue, title FROM comics ORDER BY series, issue").fetchall()
get_db().commit()
get_db().row_factory = original_factory
return result
def db_get_series_by_publisher(publisher):
series = []
rows = get_db().execute("SELECT DISTINCT series, seriesYear, publisher FROM comics WHERE publisher LIKE ? ORDER BY series, seriesYear", [publisher]).fetchall()
for row in rows:
series.append(row)
return series
def db_get_comics_in_series(series, publisher, series_year):
comics = []
rows = get_db().execute("SELECT series, issue, title FROM comics WHERE series LIKE ? AND publisher LIKE ? AND seriesYear LIKE ? ORDER BY series, issue", [series, publisher, series_year]).fetchall()
get_db().commit()
for row in rows:
comics.append(row)
return comics
def get_publishers():
publishers = []
rows = get_db().execute("SELECT DISTINCT publisher FROM comics ORDER BY publisher").fetchall()
for row in rows:
publishers.append(row[0])
return publishers
def comic_path_in_db(path):
try:
result = get_db().execute("SELECT path FROM comics WHERE path LIKE ?", [path])
get_db().commit()
if result.fetchone():
return True
except Exception as e:
print(path)
print(e)
return False
def verify_paths():
while True:
paths = get_db().execute("SELECT path FROM comics").fetchall()
get_db().commit()
for path in paths:
if not os.path.exists(path[0]):
get_db().execute("DELETE FROM comics WHERE path LIKE ?", [path[0]])
get_db().commit()
time.sleep(60*60*24*5)