rpiwebapp-public/scripts/database.py
2019-08-23 21:34:42 -07:00

733 lines
21 KiB
Python

from flask import g, current_app
from flask_login import UserMixin
from werkzeug.security import check_password_hash
from io import BytesIO
from wand.image import Image
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy import orm
from sqlalchemy import Column, Integer, String, BLOB, Boolean, Table, MetaData, DateTime
from sqlalchemy.orm import sessionmaker
import sqlite3
import os
import inspect
from comicapi.issuestring import IssueString
from scripts import tmdb
RPI_DATABASE = "/var/lib/rpiWebApp/database.db"
RPI_IMDB_DATABASE = "/var/lib/rpiWebApp/imdb.db"
MC_DATABASE = "***REMOVED***"
MC_IMDB_DATABASE = "C:\\Users\\Matthew\\Documents\\MyPrograms\\Websites\\rpi_web_interface\\imdb.db"
DATABASE = RPI_DATABASE if os.path.exists(RPI_DATABASE) else MC_DATABASE
IMDB_DATABASE = RPI_IMDB_DATABASE if os.path.exists(RPI_IMDB_DATABASE) else MC_IMDB_DATABASE
engine = create_engine("sqlite:///"+DATABASE+"?check_same_thread=False")
Session = sessionmaker(bind=engine)
session = Session()
Base = declarative_base()
class Comic(Base):
__tablename__ = "comics"
path = Column(String, unique=True)
tagOrigin = Column(String)
series = Column(String)
issue = Column(Integer)
issueText = Column(String)
title = Column(String)
publisher = Column(String)
month = Column(Integer)
year = Column(Integer)
day = Column(Integer)
seriesYear = Column(Integer)
issueCount = Column(Integer)
volume = Column(String)
genre = Column(String)
language = Column(String)
comments = Column(String)
volumeCount = Column(Integer)
criticalRating = Column(String)
country = Column(String)
alternateSeries = Column(String)
alternateNumber = Column(String)
alternateCount = Column(Integer)
imprint = Column(String)
notes = Column(String)
webLink = Column(String)
format = Column(String)
manga = Column(String)
blackAndWhite = Column(String)
pageCount = Column(Integer)
maturityRating = Column(String)
storyArc = Column(String)
seriesGroup = Column(String)
scanInfo = Column(String)
characters = Column(String)
teams = Column(String)
locations = Column(String)
id = Column(Integer, primary_key=True, autoincrement=True)
def __init__(self, data):
i = 0
try:
for column in self.__table__.columns:
if column.name == "id":
continue
setattr(self, column.name, data[i])
i += 1
except Exception as e:
current_app.logger.info(inspect.stack()[0][3] + " " + str(type(e)) +" "+ str(e))
#print(inspect.stack()[0][3], type(e), e)
def __repr__(self):
return "<Comic: {series} {issue}>".format(series=self.series, issue=self.issueText)
class ComicThumbnail(Base):
__tablename__ = "comic_thumbnails"
comic_id = Column(Integer)
pageNumber = Column(Integer)
image = Column(BLOB)
type = Column(String)
id = Column(Integer, primary_key=True, autoincrement=True)
def __init__(self, data):
i = 0
try:
for column in self.__table__.columns:
if column.name == "id":
continue
setattr(self, column.name, data[i])
i += 1
except Exception as e:
current_app.logger.info(inspect.stack()[0][3] + " " + str(type(e)) +" "+ str(e))
#print(inspect.stack()[0][3], type(e), e)
class Movie(Base):
__tablename__ = "movies"
path = Column(String)
imdb_id = Column(String, primary_key=True)
tmdb_id = Column(Integer)
title = Column(String)
year = Column(Integer)
length = Column(Integer)
description = Column(String)
extended = Column(Boolean)
directors_cut = Column(Boolean)
poster_path = Column(String)
backdrop_path = Column(String)
def __init__(self, data):
i = 0
try:
for column in self.__table__.columns:
setattr(self, column.name, data[i])
i += 1
except Exception as e:
current_app.logger.info(inspect.stack()[0][3] + " " + str(type(e)) +" "+ str(e))
#print(inspect.stack()[0][3], type(e), e)
class TvShow(Base):
__tablename__ = "tv_shows"
imdb_id = Column(String, primary_key=True)
tmdb_id = Column(Integer)
title = Column(String)
year = Column(Integer)
description = Column(String)
poster_path = Column(String)
path = Column(String)
def __init__(self, data):
i = 0
try:
for column in self.__table__.columns:
setattr(self, column.name, data[i])
i += 1
except Exception as e:
current_app.logger.info(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
# print(inspect.stack()[0][3], type(e), e)
class TvEpisode(Base):
__tablename__ = "tv_episodes"
imdb_id = Column(String, primary_key=True)
parent_imdb_id = Column(String)
tmdb_id = Column(Integer)
title = Column(String)
season = Column(Integer)
episode = Column(Integer)
description = Column(String)
still_path = Column(String)
path = Column(String)
def __init__(self, data):
i = 0
try:
for column in self.__table__.columns:
setattr(self, column.name, data[i])
i += 1
except Exception as e:
current_app.logger.info(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
# print(inspect.stack()[0][3], type(e), e)
class User(Base, UserMixin):
__tablename__ = "users"
username = Column(String, unique=True, primary_key=True)
passwordHash = Column(String(128))
isAdmin = Column(Boolean, default=False)
def __init__(self, data):
i = 0
for column in self.__table__.columns:
setattr(self, column.name, data[i])
i += 1
self.init_user_data()
@orm.reconstructor
def init_user_data(self):
meta = MetaData()
self.comics = Table(
"comics", meta,
Column("comic_id", Integer, primary_key=True),
Column("viewed", Boolean, default=False),
Column("date", DateTime)
)
self.movies = Table(
"movies", meta,
Column("imdb_id", Integer, primary_key=True),
Column("viewed", Boolean, default=False),
Column("date", DateTime)
)
self.tv_episodes = Table(
"tv_episodes", meta,
Column("imdb_id", Integer, primary_key=True),
Column("viewed", Boolean, default=False),
Column("date", DateTime)
)
self.tv_shows = Table(
"tv_shows", meta,
Column("imdb_id", Integer, primary_key=True),
Column("viewed", Boolean, default=False),
Column("date", DateTime)
)
user_engine = create_engine("sqlite:///" + "user_" + self.username + ".db" + "?check_same_thread=False")
self.conn = engine.connect()
meta.create_all(user_engine)
def set_comic_viewed(self, comic_id, val=True):
self.conn.execute(self.comics.insert().values(comic_id=comic_id, viewed=val))
def is_comic_viewed(self, comic_id):
q = self.comics.select().where(self.comics.c.comic_id == comic_id)
result = self.conn.execute(q).fetchone()
if result:
return result["viewed"]
def set_movie_viewed(self, imdb_id, val=True):
self.conn.execute(self.movies.insert().values(comic_id=imdb_id, viewed=val))
def is_movie_viewed(self, imdb_id):
q = self.movies.select().where(self.movies.c.imdb_id == imdb_id)
result = self.conn.execute(q).fetchone()
if result:
return result["viewed"]
def set_tv_episode_viewed(self, imdb_id, val=True):
self.conn.execute(self.tv_episodes.insert().values(comic_id=imdb_id, viewed=val))
def is_tv_episode_viewed(self, imdb_id):
q = self.tv_episodes.select().where(self.tv_episodes.c.imdb_id == imdb_id)
result = self.conn.execute(q).fetchone()
if result:
return result["viewed"]
def set_tv_show_viewed(self, imdb_id, val=True):
self.conn.execute(self.tv_shows.insert().values(comic_id=imdb_id, viewed=val))
def is_tv_show_viewed(self, imdb_id):
q = self.tv_shows.select().where(self.tv_shows.c.imdb_id == imdb_id)
result = self.conn.execute(q).fetchone()
if result:
return result["viewed"]
def get_id(self):
return self.username
def check_password(self, password):
result = check_password_hash(self.passwordHash, password)
return result
def get_db():
db = getattr(g, '_database', None)
if db is None:
db = g._database = sqlite3.connect(DATABASE)
db.row_factory = sqlite3.Row
return db
def get_imdb():
db = getattr(g, '_imdb_database', None)
if db is None:
db = g._imdb_database = sqlite3.connect(IMDB_DATABASE)
db.row_factory = sqlite3.Row
return db
def initialize_db():
get_db().execute("""CREATE TABLE IF NOT EXISTS "comics" (
"path" TEXT UNIQUE,
"tagOrigin" TEXT,
"series" TEXT,
"issue" REAL,
"issueText" TEXT,
"title" TEXT,
"publisher" TEXT,
"month" INTEGER,
"year" INTEGER,
"day" INTEGER,
"seriesYear" INTEGER,
"issueCount" INTEGER,
"volume" TEXT,
"genre" TEXT,
"language" TEXT,
"comments" TEXT,
"volumeCount" INTEGER,
"criticalRating" TEXT,
"country" TEXT,
"alternateSeries" TEXT,
"alternateNumber" TEXT,
"alternateCount" INTEGER,
"imprint" TEXT,
"notes" TEXT,
"webLink" TEXT,
"format" TEXT,
"manga" TEXT,
"blackAndWhite" TEXT,
"pageCount" INTEGER,
"maturityRating" TEXT,
"storyArc" TEXT,
"seriesGroup" TEXT,
"scanInfo" TEXT,
"characters" TEXT,
"teams" TEXT,
"locations" TEXT,
"id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE
)""")
get_db().execute("""CREATE TABLE IF NOT EXISTS "credits" (
"id" INTEGER NOT NULL,
"role" TEXT,
"name" TEXT,
"primary" INTEGER NOT NULL
)""")
get_db().execute("""CREATE TABLE IF NOT EXISTS "comic_thumbnails" (
"comic_id" INTEGER,
"pageNumber" INTEGER,
"image" BLOB,
"type" TEXT,
"id" INTEGER PRIMARY KEY AUTOINCREMENT
)""")
get_db().execute("""CREATE TABLE IF NOT EXISTS "users" (
"username" TEXT UNIQUE PRIMARY KEY,
"passwordHash" VARCHAR(128),
"isAdmin" INTEGER NOT NULL DEFAULT 0
)""")
get_db().execute("""CREATE TABLE IF NOT EXISTS "movies" (
"path" TEXT,
"imdb_id" INTEGER PRIMARY KEY,
"tmdb_id" INTEGER,
"title" TEXT,
"year" INTEGER,
"length" INTEGER,
"description" TEXT,
"extended" INTEGER,
"directors_cut" INTEGER,
"poster_path" TEXT,
"backdrop_path" TEXT
)""")
get_db().execute("""CREATE TABLE IF NOT EXISTS "tv_shows" (
"imdb_id" TEXT PRIMARY KEY ,
"tmdb_id" INTEGER UNIQUE,
"title" TEXT,
"year" INTEGER,
"description" TEXT,
"poster_path" TEXT,
"path" TEXT
)""")
get_db().execute("""CREATE TABLE IF NOT EXISTS "tv_episodes" (
"imdb_id" INTEGER PRIMARY KEY,
"parent_imdb_id" INTEGER,
"tmdb_id" INTEGER UNIQUE,
"title" TEXT,
"season" INTEGER,
"episode" INTEGER,
"description" TEXT,
"still_path" TEXT,
"path" TEXT
)""")
get_db().execute("CREATE INDEX IF NOT EXISTS path_index ON comics(path);")
get_db().execute("CREATE INDEX IF NOT EXISTS comic_id_index ON comic_thumbnails(comic_id);")
get_db().commit()
get_imdb().execute("CREATE INDEX IF NOT EXISTS original_title_index ON title_basics(originalTitle)")
get_imdb().execute("CREATE INDEX IF NOT EXISTS primary_title_index ON title_basics(primaryTitle)")
get_imdb().execute("CREATE INDEX IF NOT EXISTS parent_tconst_index ON title_episode(parentTconst)")
get_imdb().commit()
def add_movies(movies):
for movie_data in movies:
movie = Movie(movie_data)
session.add(movie)
session.commit()
def add_tv_shows(tv_show_data):
try:
tv_show = TvShow(tv_show_data)
session.add(tv_show)
session.commit()
except Exception as e:
current_app.logger.info(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
# print(inspect.stack()[0][3], type(e), e)
def add_tv_episodes(episodes):
for episode_data in episodes:
try:
episode = TvEpisode(episode_data)
if not session.query(TvEpisode).filter(TvEpisode.tmdb_id == episode.tmdb_id).one_or_none():
session.add(episode)
session.commit()
except Exception as e:
current_app.logger.info(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
# print(inspect.stack()[0][3], type(e), e)
def add_comics(meta, thumbnails):
data = []
for info in meta:
issue = IssueString(info[1].issue).asFloat()
data.append([info[0], info[1].tagOrigin, info[1].series, (issue or -1), info[1].issue, info[1].title, info[1].publisher,
info[1].month, info[1].year, info[1].day, info[1].seriesYear, info[1].issueCount, info[1].volume, info[1].genre,
info[1].language, info[1].comments, info[1].volumeCount, info[1].criticalRating, info[1].country,
info[1].alternateSeries, info[1].alternateNumber, info[1].alternateCount, info[1].imprint,
info[1].notes, info[1].webLink, info[1].format, info[1].manga, info[1].blackAndWhite,
info[1].pageCount, info[1].maturityRating, info[1].storyArc, info[1].seriesGroup, info[1].scanInfo,
info[1].characters, info[1].teams, info[1].locations])
for comic_data in data:
for i in range(len(comic_data)):
if comic_data[i] == "":
comic_data[i] = None
for comic_data, images in zip(data, thumbnails):
comic = Comic(comic_data)
session.add(comic)
session.commit()
comic_id = session.query(Comic.id).order_by(Comic.id.desc()).first()[0]
for index in range(len(images)):
thumbnail = ComicThumbnail([comic_id, index, images[index][0], images[index][1]])
session.add(thumbnail)
session.commit()
current_app.logger.info("{} comic{} added".format(len(meta), "s" if len(meta)>1 else ""))
print("#"*18)
print("# {} comic{} added #".format(len(meta), "s" if len(meta)>1 else ""))
print("#"*18)
def db_get_all_comics():
result = session.query(Comic).all()
return result
def db_get_series_by_publisher(publisher):
result = session.query(Comic).filter(Comic.publisher == publisher).group_by(Comic.publisher, Comic.series, Comic.seriesYear).order_by(Comic.series, Comic.seriesYear).all()
series = [comic.__dict__ for comic in result]
return series
def db_get_comics_in_series(series, publisher, series_year):
result = session.query(Comic).filter(Comic.publisher == publisher, Comic.series == series, Comic.seriesYear == series_year)\
.order_by(Comic.issue).all()
comics = [comic.__dict__ for comic in result]
return comics
def get_publishers():
result = session.query(Comic.publisher).distinct(Comic.publisher).order_by(Comic.publisher).all()
publishers = [r for (r,) in result]
return publishers
def db_get_comic_by_id(comic_id):
comic = session.query(Comic).filter(Comic.id == comic_id).one().__dict__
return comic
def db_get_thumbnail_by_id_page(comic_id, pageNumber):
thumbnail = session.query(ComicThumbnail).filter(ComicThumbnail.comic_id == comic_id, ComicThumbnail.pageNumber == pageNumber).one().__dict__
return thumbnail
def db_get_comic(publisher, series, series_year, issue):
comic = session.query(Comic).filter(Comic.publisher == publisher, Comic.series == series, Comic.seriesYear == series_year, Comic.issue == issue).one().__dict__
return comic
def comic_path_in_db(path):
try:
result = session.query(Comic).filter(Comic.path == path).one_or_none()
if result:
return True
except Exception as e:
# print(path)
current_app.logger.info(path)
current_app.logger.info(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
# print(inspect.stack()[0][3], type(e), e)
return False
def movie_path_in_db(path):
try:
result = session.query(Movie).filter(Movie.path == path).one_or_none()
if result:
return True
except Exception as e:
# print(path)
current_app.logger.info(path)
current_app.logger.info(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
# print(inspect.stack()[0][3], type(e), e)
return False
def tv_show_path_in_db(path):
try:
result = session.query(TvShow).filter(TvShow.path == path).one_or_none()
if result:
return True
except Exception as e:
# print(path)
current_app.logger.info(path)
current_app.logger.info(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
# print(inspect.stack()[0][3], type(e), e)
return False
def tv_episode_path_in_db(path):
try:
result = session.query(TvEpisode).filter(TvEpisode.path == path).one_or_none()
if result:
return True
except Exception as e:
# print(path)
current_app.logger.info(path)
current_app.logger.info(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
# print(inspect.stack()[0][3], type(e), e)
return False
def verify_paths():
rows = get_db().execute("SELECT path FROM comics").fetchall()
get_db().commit()
for row in rows:
if not os.path.exists(row["path"]):
get_db().execute("DELETE FROM comic_thumbnails WHERE id IN (SELECT id FROM comics WHERE path=?)", [row["path"]])
get_db().execute("DELETE FROM comics WHERE path=?", [row["path"]])
get_db().commit()
def verify_path(path):
if not os.path.exists(path):
row = get_db().execute("SELECT path FROM comics WHERE path=?", [path]).fetchone()
get_db().commit()
if row:
get_db().execute("DELETE FROM comic_thumbnails WHERE id IN (SELECT id FROM comics WHERE path=?)", [path])
get_db().execute("DELETE FROM comics WHERE path=?", [path])
get_db().commit()
def imdb_get_movie(title, year):
row = get_imdb().execute("SELECT tconst, runtimeMinutes FROM title_basics WHERE (originalTitle LIKE ? OR primaryTitle LIKE ?) AND (titleType LIKE '%movie' OR titleType='video') AND startYear=?", (title, title, year)).fetchone()
return row
def imdb_get_tv_show(title, year):
row = get_imdb().execute(
"SELECT tconst FROM title_basics WHERE (originalTitle LIKE ? OR primaryTitle LIKE ?) AND (titleType LIKE 'tvSeries' OR titleType LIKE 'tvMiniSeries') AND startYear=?",
(title, title, year)).fetchone()
return row
def imdb_get_tv_episode(imdb_id, season, episode):
row = get_imdb().execute(
"SELECT tconst FROM title_episode WHERE parentTconst=? AND seasonNumber=? AND episodeNumber=?",
[imdb_id, season, episode]).fetchone()
return row
def tmdb_get_movie_by_imdb_id(imdb_id):
data = tmdb.get_movie_data(imdb_id)
return data
def tmdb_get_tv_show_by_imdb_id(imdb_id):
data = tmdb.get_tv_show_data(imdb_id)
return data
def tmdb_get_tv_episode_by_imdb_id(imdb_id):
data = tmdb.get_tv_episode_data(imdb_id)
return data
def db_get_all_movies():
movies = session.query(Movie).order_by(Movie.title, Movie.year).all()
return movies
def db_get_movie_by_imdb_id(imdb_id, extended=False, directors_cut=False):
result = session.query(Movie).filter(Movie.imdb_id == imdb_id, Movie.extended == extended,
Movie.directors_cut == directors_cut).one()
return result
def get_all_tv_shows():
result = session.query(TvShow).order_by(TvShow.title, TvShow.year).all()
return result
def get_tv_show_episodes_by_imdb_id(imdb_id):
result = session.query(TvEpisode).filter(TvEpisode.parent_imdb_id == imdb_id).order_by(TvEpisode.season, TvEpisode.episode).all()
return result
def db_get_episode_by_imdb_id(imdb_id):
result = session.query(TvEpisode).filter(TvEpisode.imdb_id == imdb_id).one()
return result
def db_search_table_columns_by_query(query, table, columns, group=[], order=[]):
results = {}
final_query = "%" + query.replace(" ", "%") + "%"
for column in columns:
results[column.name] = session.query(table).filter(column.like(final_query)).group_by(*group).order_by(*order).all()
return results
def db_search_comics(query):
publishers = []
series = []
comics = []
results = db_search_table_columns_by_query(query, Comic, [Comic.publisher, Comic.title, Comic.series, Comic.year])
series_results = db_search_table_columns_by_query(query, Comic, [Comic.publisher, Comic.title, Comic.series, Comic.year],
group=[Comic.series, Comic.seriesYear], order=[Comic.issue])
for row in results["publisher"]:
if row["publisher"] not in publishers:
publishers.append(row.publisher)
for row in series_results["series"]:
dict = row.__dict__
if dict not in series:
series.append(dict)
for row in results["title"]:
dict = row.__dict__
if dict not in comics:
comics.append(dict)
for row in results["year"]:
dict = row.__dict__
if dict not in comics:
comics.append(dict)
return {"publisher": publishers, "series": series, "comics": comics}
def db_search_movies(query):
results = db_search_table_columns_by_query(query, Movie, [Movie.title, Movie.year, Movie.description], order=[Movie.title])
movies = []
for movie in results["title"]:
if movie.__dict__ not in movies:
movies.append(movie.__dict__)
for movie in results["description"]:
if movie.__dict__ not in movies:
movies.append(movie.__dict__)
for movie in results["year"]:
if movie.__dict__ not in movies:
movies.append(movie.__dict__)
return movies
def db_search_tv_shows(query):
results = db_search_table_columns_by_query(query, TvShow, [TvShow.title, TvShow.year, TvShow.description], order=[TvShow.title])
tv_shows = []
for show in results["title"]:
if show.__dict__ not in tv_shows:
tv_shows.append(show.__dict__)
for show in results["description"]:
if show.__dict__ not in tv_shows:
tv_shows.append(show.__dict__)
for show in results["year"]:
if show.__dict__ not in tv_shows:
tv_shows.append(show.__dict__)
return tv_shows
def resize_image(image, new_width=256, new_height=256):
new_image = image
orig_height = new_image.height
orig_width = new_image.width
if orig_height >= orig_width:
width = int((orig_width / orig_height) * new_height)
height = new_height
else:
height = int((orig_height / orig_width) * new_width)
width = new_width
new_image.thumbnail(width, height)
return new_image
def fix_thumbnails():
new_height = 256
new_width = 256
print("Start fix thumbnail size")
rows = get_db().execute("SELECT * FROM comic_thumbnails")
print("got list of all thumbnails\n")
for row in rows:
image = Image(file=BytesIO(row["image"]))
if image.width > new_width or image.height > new_height:
print("id:", row["id"], "pageNumber:", row["pageNumber"])
get_db().execute("UPDATE comic_thumbnails SET image=? WHERE comic_id=? AND pageNumber=?",
[resize_image(image, new_width, new_height).make_blob(), row["id"], row["pageNumber"]])
get_db().commit()
print("Finished fix thumbnail size")
def get_user(username):
result = session.query(User).filter(User.username == username).one()
if result:
return result
return None