Matthew Welch
c49eb99cd2
Added users so that the site can only be used if logged in. Thumbnails for each comic are also generated with ImageMagick.
239 lines
7.3 KiB
Python
239 lines
7.3 KiB
Python
from flask import Flask
|
|
from flask import g
|
|
from flask_login import UserMixin
|
|
from werkzeug.security import generate_password_hash, check_password_hash
|
|
import sqlite3
|
|
import os, time
|
|
|
|
from comicapi.issuestring import IssueString
|
|
|
|
# Primary SQLite database file; get_db() tries to open this path first.
DATABASE = "/var/lib/rpiWebApp/database.db"
|
|
# Fallback database file (a Windows path) that get_db() opens when
# connecting to DATABASE raises.
DATABASE2 = "C:\\Users\\Matthew\\Documents\\MyPrograms\\Websites\\rpi web interface\\database.db"
|
|
|
|
|
|
def get_db():
    """Return the SQLite connection stored on Flask's ``g`` object.

    If ``g`` has no ``_database`` attribute yet, a connection to ``DATABASE``
    is opened; when that raises, ``DATABASE2`` is opened instead. The new
    connection is saved as ``g._database``. The connection's ``row_factory``
    is set to ``sqlite3.Row`` so result rows can be indexed by column name.
    """
    connection = getattr(g, '_database', None)
    if connection is None:
        try:
            connection = sqlite3.connect(DATABASE)
        except Exception:
            connection = sqlite3.connect(DATABASE2)
        g._database = connection

    connection.row_factory = sqlite3.Row
    return connection
|
|
|
|
|
|
def initialize_db():
    """Create the application's tables and indexes if they are missing.

    Runs, in order, ``CREATE TABLE IF NOT EXISTS`` for ``comics``,
    ``credits``, ``comic_thumbnails`` and ``users``, then
    ``CREATE INDEX IF NOT EXISTS`` for ``comics(path)`` and
    ``comic_thumbnails(id)``, and finally commits.
    """
    schema_statements = (
        """CREATE TABLE IF NOT EXISTS "comics" (
            "path" TEXT UNIQUE,
            "tagOrigin" TEXT,
            "series" TEXT,
            "issue" REAL,
            "issueText" TEXT,
            "title" TEXT,
            "publisher" TEXT,
            "month" INTEGER,
            "year" INTEGER,
            "day" INTEGER,
            "seriesYear" INTEGER,
            "issueCount" INTEGER,
            "volume" TEXT,
            "genre" TEXT,
            "language" TEXT,
            "comments" TEXT,
            "volumeCount" INTEGER,
            "criticalRating" TEXT,
            "country" TEXT,
            "alternateSeries" TEXT,
            "alternateNumber" TEXT,
            "alternateCount" INTEGER,
            "imprint" TEXT,
            "notes" TEXT,
            "webLink" TEXT,
            "format" TEXT,
            "manga" TEXT,
            "blackAndWhite" TEXT,
            "pageCount" INTEGER,
            "maturityRating" TEXT,
            "storyArc" TEXT,
            "seriesGroup" TEXT,
            "scanInfo" TEXT,
            "characters" TEXT,
            "teams" TEXT,
            "locations" TEXT,
            "id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE
            )""",
        """CREATE TABLE IF NOT EXISTS "credits" (
            "id" INTEGER NOT NULL,
            "role" TEXT,
            "name" TEXT,
            "primary" INTEGER NOT NULL
            )""",
        """CREATE TABLE IF NOT EXISTS "comic_thumbnails" (
            "id" INTEGER,
            "pageNumber" INTEGER,
            "image" BLOB,
            "type" TEXT
            )""",
        """CREATE TABLE IF NOT EXISTS "users" (
            "username" TEXT UNIQUE PRIMARY KEY,
            "passwordHash" VARCHAR(128),
            "isAdmin" INTEGER NOT NULL DEFAULT 0
            )""",
        "CREATE INDEX IF NOT EXISTS path_index ON comics(path);",
        "CREATE INDEX IF NOT EXISTS id_index ON comic_thumbnails(id);",
    )

    connection = get_db()
    for statement in schema_statements:
        connection.execute(statement)
    connection.commit()
|
|
|
|
|
|
def table_exists(table_name):
    """Return True if a table called ``table_name`` exists in the database.

    Args:
        table_name: Name to look up in ``sqlite_master``.

    Returns:
        ``True`` when a matching table row is found, otherwise ``False``.
    """
    # The name is bound as a parameter rather than formatted into the SQL so
    # that quotes in ``table_name`` cannot alter the statement.
    match = get_db().execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name=?",
        [table_name],
    ).fetchone()
    # Cursor.rowcount is -1 for SELECT statements in sqlite3, so existence has
    # to be decided by whether a row actually comes back.
    return match is not None
|
|
|
|
|
|
def add_comics(meta, thumbnails):
    """Insert comics and their page thumbnails into the database.

    Args:
        meta: Iterable of items where ``item[0]`` is the comic's path and
            ``item[1]`` is a metadata object exposing the attributes read
            below (``tagOrigin``, ``series``, ``issue``, ...).
            NOTE(review): presumably a comicapi metadata object -- confirm
            against the caller.
        thumbnails: Iterable paired with ``meta`` by position; each item is
            the sequence of thumbnails for one comic, where ``thumb[0]`` is
            stored in the ``image`` column and ``thumb[1]`` in ``type``.

    Each comic row and its thumbnail rows are written in one transaction, so
    a failure while storing thumbnails does not leave a comic without them.
    """
    rows = []
    for info in meta:
        path, md = info[0], info[1]
        issue_number = IssueString(md.issue).asFloat()
        # Only a missing value falls back to -1; a real issue number of 0
        # must be kept (``issue or -1`` would have turned it into -1).
        if issue_number is None:
            issue_number = -1
        rows.append((
            path, md.tagOrigin, md.series, issue_number, md.issue, md.title, md.publisher,
            md.month, md.year, md.day, md.seriesYear, md.issueCount, md.volume, md.genre,
            md.language, md.comments, md.volumeCount, md.criticalRating, md.country,
            md.alternateSeries, md.alternateNumber, md.alternateCount, md.imprint,
            md.notes, md.webLink, md.format, md.manga, md.blackAndWhite,
            md.pageCount, md.maturityRating, md.storyArc, md.seriesGroup, md.scanInfo,
            md.characters, md.teams, md.locations,
        ))

    db = get_db()
    for comic, images in zip(rows, thumbnails):
        # ``with db`` commits on success and rolls back if anything raises.
        with db:
            cursor = db.execute("""INSERT INTO comics(path, tagOrigin, series, issue, issueText, title, publisher,
                month, year, day, seriesYear, issueCount, volume, genre, language, comments, volumeCount,
                criticalRating, country, alternateSeries, alternateNumber, alternateCount, imprint,
                notes, webLink, format, manga, blackAndWhite, pageCount, maturityRating, storyArc,
                seriesGroup, scanInfo, characters, teams, locations)
                VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""", comic)
            # lastrowid is the id of the row just inserted through this
            # cursor, so no extra "highest id" query is needed.
            comic_id = cursor.lastrowid
            db.executemany(
                "INSERT INTO comic_thumbnails(id, pageNumber, image, type) VALUES (?,?,?,?)",
                [(comic_id, page, thumb[0], thumb[1]) for page, thumb in enumerate(images)],
            )
|
|
|
|
|
|
def add_comic_thumbnails(thumbnails):
    """Store thumbnail images for a batch of comics.

    Ids are assigned sequentially, starting one above the highest ``id``
    currently in ``comic_thumbnails`` (or at 1 when the table is empty).
    Progress is printed for each comic and page, and everything is committed
    at the end.

    Args:
        thumbnails: Sequence with one entry per comic; each entry is the
            sequence of page images for that comic, in page order.
    """
    db = get_db()
    latest = db.execute("SELECT id from comic_thumbnails ORDER BY id DESC LIMIT 1").fetchone()
    first_id = latest[0] + 1 if latest else 1

    print("start comics added:", len(thumbnails))
    for comic_id, images in enumerate(thumbnails, start=first_id):
        print("pages in comic:", len(images))
        for page, image in enumerate(images):
            print("comic page added:", page)
            db.execute("INSERT INTO comic_thumbnails(id, pageNumber, image) VALUES (?,?,?)", (comic_id, page, image))
    db.commit()
|
|
|
|
|
|
def db_get_all_comics():
    """Return every row of ``comics``, ordered by series and then issue."""
    connection = get_db()
    all_comics = connection.execute("SELECT * FROM comics ORDER BY series, issue").fetchall()
    connection.commit()
    return all_comics
|
|
|
|
|
|
def db_get_series_by_publisher(publisher):
    """Return one row per (publisher, series, seriesYear) for a publisher.

    Each row carries ``publisher``, ``series``, ``seriesYear``, ``id`` and
    ``min(issue)``; rows are ordered by series and series year.

    Args:
        publisher: Value matched against the ``publisher`` column.
    """
    query = "SELECT publisher, series, seriesYear, id, min(issue) FROM comics WHERE publisher=? GROUP BY publisher, series, seriesYear ORDER BY series, seriesYear"
    cursor = get_db().execute(query, [publisher])
    return list(cursor.fetchall())
|
|
|
|
|
|
def db_get_comics_in_series(series, publisher, series_year):
    """Return the comics of one series as a list of rows ordered by issue.

    Args:
        series: Value matched against the ``series`` column.
        publisher: Value matched against the ``publisher`` column.
        series_year: Value matched against the ``seriesYear`` column.
    """
    connection = get_db()
    cursor = connection.execute(
        "SELECT * FROM comics WHERE series=? AND publisher=? AND seriesYear=? ORDER BY series, issue",
        [series, publisher, series_year],
    )
    matches = cursor.fetchall()
    connection.commit()
    return list(matches)
|
|
|
|
|
|
def get_publishers():
    """Return the distinct ``publisher`` values from ``comics``, sorted."""
    cursor = get_db().execute("SELECT DISTINCT publisher FROM comics ORDER BY publisher")
    return [record[0] for record in cursor.fetchall()]
|
|
|
|
|
|
def db_get_comic_by_id(comic_id):
    """Return the ``comics`` row whose ``id`` is ``comic_id``, or None."""
    cursor = get_db().execute("SELECT * FROM comics WHERE id=?", [comic_id])
    return cursor.fetchone()
|
|
|
|
|
|
def db_get_thumbnail_by_id_page(comic_id, pageNumber):
    """Return the thumbnail row for one page of a comic, or None.

    Args:
        comic_id: Value matched against ``comic_thumbnails.id``.
        pageNumber: Value matched against ``comic_thumbnails.pageNumber``.
    """
    cursor = get_db().execute(
        "SELECT * FROM comic_thumbnails WHERE id=? AND pageNumber=?",
        [comic_id, pageNumber],
    )
    return cursor.fetchone()
|
|
|
|
|
|
def db_get_comic(publisher, series, series_year, issue):
    """Return the first comic row matching all four values, or None.

    Args:
        publisher: Value matched against the ``publisher`` column.
        series: Value matched against the ``series`` column.
        series_year: Value matched against the ``seriesYear`` column.
        issue: Value matched against the ``issue`` column.
    """
    lookup = [publisher, series, series_year, issue]
    cursor = get_db().execute(
        "SELECT * FROM comics WHERE publisher=? AND series=? AND seriesYear=? AND issue=?",
        lookup,
    )
    return cursor.fetchone()
|
|
|
|
|
|
def comic_path_in_db(path):
    """Report whether ``path`` is already stored in the ``comics`` table.

    Any exception raised during the lookup is caught; the path and the
    exception are printed and the result is ``False``.

    Args:
        path: Value matched against the ``path`` column.

    Returns:
        ``True`` if a row with this path exists, otherwise ``False``.
    """
    try:
        connection = get_db()
        cursor = connection.execute("SELECT path FROM comics WHERE path=?", [path])
        connection.commit()
        found = cursor.fetchone()
    except Exception as e:
        print(path)
        print(e)
    else:
        if found:
            return True
    return False
|
|
|
|
|
|
def verify_paths():
    """Periodically remove comics whose files no longer exist on disk.

    Runs forever: each pass reads every ``path`` from ``comics``, deletes the
    rows whose path fails ``os.path.exists``, then sleeps for five days.
    This function never returns.
    """
    while True:
        db = get_db()
        stored = db.execute("SELECT path FROM comics").fetchall()
        for record in stored:
            missing_path = record[0]
            if not os.path.exists(missing_path):
                # Match the path exactly. LIKE would treat "_" and "%" in a
                # file name as wildcards (and ignore ASCII case), which could
                # delete rows for files that are still present.
                db.execute("DELETE FROM comics WHERE path=?", [missing_path])
                db.commit()
        time.sleep(60*60*24*5)
|
|
|
|
|
|
def get_user(username):
    """Load a user by name.

    Args:
        username: Value matched against ``users.username``.

    Returns:
        A ``User`` built from the matching row, or ``None`` if no row matches.
    """
    record = get_db().execute("SELECT * FROM users WHERE username=?", [username]).fetchone()
    return User(record) if record else None
|
|
|
|
|
|
def verify_user(username, password):
    """Check a username/password pair against the ``users`` table.

    Args:
        username: Value matched against ``users.username``.
        password: Plain-text password checked against the stored
            ``passwordHash`` with ``check_password_hash``.

    Returns:
        A ``User`` for the matching row when the password is valid;
        ``False`` when the user does not exist or the password is wrong.
    """
    # One query supplies both the hash and the data for the User object, so
    # the row cannot disappear between checking the password and building it.
    user_data = get_db().execute("SELECT * FROM users WHERE username=?", [username]).fetchone()
    if user_data and check_password_hash(user_data["passwordHash"], password):
        return User(user_data)
    # Both failure cases give the same falsy value rather than a mix of
    # None and False.
    return False
|
|
|
|
|
|
class User(UserMixin):
    """A user account built from a row of the ``users`` table."""

    def __init__(self, user):
        """Copy the account fields out of ``user``.

        Args:
            user: Mapping-like row providing the keys ``"username"``,
                ``"passwordHash"`` and ``"isAdmin"``.
        """
        self.username = user["username"]
        self.password_hash = user["passwordHash"]
        self.is_admin = user["isAdmin"]

    def get_id(self):
        """Return the username, which serves as this user's identifier."""
        return self.username

    def check_password(self, password):
        """Return the result of checking ``password`` against the stored hash."""
        return check_password_hash(self.password_hash, password)
|