Matthew Welch
c4e6824a8a
Change error logging to log messages as error rather than info Add 404 error when video or comic not found Change paths to use pathlib
982 lines
31 KiB
Python
982 lines
31 KiB
Python
import datetime
|
|
import inspect
|
|
import logging
|
|
import os
|
|
|
|
import sqlalchemy
|
|
from comicapi.issuestring import IssueString
|
|
from flask import current_app
|
|
from flask_login import UserMixin, current_user
|
|
from sqlalchemy import ARRAY, TIMESTAMP, Boolean, Column, DateTime, Integer, Numeric, String, create_engine, \
|
|
func, over, REAL
|
|
from sqlalchemy.dialects.postgresql import BYTEA
|
|
from sqlalchemy.exc import IntegrityError
|
|
from sqlalchemy.ext.declarative import declarative_base
|
|
from sqlalchemy.orm import scoped_session, sessionmaker
|
|
from sqlalchemy.pool import NullPool
|
|
from sqlalchemy.sql.expression import cast
|
|
from werkzeug.security import check_password_hash
|
|
|
|
# Set the SQLAlchemy engine logger to INFO level.
logging.getLogger("sqlalchemy.engine").setLevel(logging.INFO)

# Engine created with NullPool as its pool class.
engine = create_engine("***REMOVED***", poolclass=NullPool)

# Declarative base shared by every model below; its metadata uses the
# "rpiwebapp" schema.
Base = declarative_base()
Base.metadata.schema = "rpiwebapp"

# ``Session`` is the scoped-session registry the helper functions call to
# obtain a session bound to ``engine``.
session_factory = sessionmaker(bind=engine)
Session = scoped_session(session_factory)
|
|
|
|
|
|
class Comic(Base):
    """ORM model for the ``comics`` table.

    ``__init__`` fills the columns positionally, in the order
    ``self.__table__.columns`` yields them, so the declarations below must
    stay in step with the sequences callers pass in.
    """

    __tablename__ = "comics"

    path = Column(String, unique=True)
    tagorigin = Column(String)
    series = Column(String)
    issue = Column(REAL)
    issuetext = Column(String)
    title = Column(String)
    publisher = Column(String)
    month = Column(Integer)
    year = Column(Integer)
    day = Column(Integer)
    seriesyear = Column(Integer)
    issuecount = Column(Integer)
    volume = Column(String)
    genre = Column(String)
    language = Column(String)
    comments = Column(String)
    volumecount = Column(Integer)
    criticalrating = Column(String)
    country = Column(String)
    alternateseries = Column(String)
    alternatenumber = Column(String)
    alternatecount = Column(Integer)
    imprint = Column(String)
    notes = Column(String)
    weblink = Column(String)
    format = Column(String)
    manga = Column(String)
    blackandwhite = Column(String)
    pagecount = Column(Integer)
    maturityrating = Column(String)
    storyarc = Column(String)
    seriesgroup = Column(String)
    scaninfo = Column(String)
    characters = Column(String)
    teams = Column(String)
    locations = Column(String)
    id = Column(Integer, primary_key=True, autoincrement=True)

    def __init__(self, data):
        """Copy ``data[0]``, ``data[1]``, ... into every column except ``id``.

        Any exception raised while copying is logged through
        ``current_app.logger`` and not re-raised, so the instance may be left
        partially populated.
        """
        try:
            fillable = [col for col in self.__table__.columns if col.name != "id"]
            for position, col in enumerate(fillable):
                setattr(self, col.name, data[position])
        except Exception as e:
            current_app.logger.error(inspect.stack()[0][3] + "Comic" + " " + str(type(e)) + " " + str(e))

    def __repr__(self):
        """Return a short label built from the series and the issue text."""
        label = "<Comic: {series} {issue}>".format(series=self.series, issue=self.issuetext)
        return label
|
|
|
|
|
|
class ComicThumbnail(Base):
    """ORM model for the ``comic_thumbnails`` table (one row per page image)."""

    __tablename__ = "comic_thumbnails"

    comic_id = Column(Integer)
    pagenumber = Column(Integer)
    image = Column(BYTEA)
    type = Column(String)
    id = Column(Integer, primary_key=True, autoincrement=True)

    def __init__(self, data):
        """Copy ``data`` positionally into every column except ``id``.

        Exceptions raised while copying are logged via ``current_app.logger``
        and not re-raised.
        """
        try:
            fillable = [col for col in self.__table__.columns if col.name != "id"]
            for position, col in enumerate(fillable):
                setattr(self, col.name, data[position])
        except Exception as e:
            current_app.logger.error(inspect.stack()[0][3] + "ComicThumbnail" + " " + str(type(e)) + " " + str(e))
|
|
|
|
|
|
class Movie(Base):
    """ORM model for the ``movies`` table; the file path is the primary key."""

    __tablename__ = "movies"

    path = Column(String, primary_key=True)
    tmdb_id = Column(Integer)
    title = Column(String)
    year = Column(Integer)
    description = Column(String)
    extended = Column(Boolean)
    directors_cut = Column(Boolean)
    poster_path = Column(String)
    backdrop_path = Column(String)
    res_4k = Column(Boolean)

    def __init__(self, data):
        """Copy ``data`` positionally into every column, in table order.

        Exceptions raised while copying are logged via ``current_app.logger``
        and not re-raised.
        """
        try:
            for position, col in enumerate(self.__table__.columns):
                setattr(self, col.name, data[position])
        except Exception as e:
            current_app.logger.error(inspect.stack()[0][3] + "Movie" + " " + str(type(e)) + " " + str(e))
|
|
|
|
|
|
class TvShow(Base):
    """ORM model for the ``tv_shows`` table, keyed by ``tmdb_id``."""

    __tablename__ = "tv_shows"

    tmdb_id = Column(Integer, primary_key=True)
    title = Column(String)
    year = Column(Integer)
    description = Column(String)
    poster_path = Column(String)
    path = Column(String)

    def __init__(self, data):
        """Copy ``data`` positionally into every column, in table order.

        Exceptions raised while copying are logged via ``current_app.logger``
        and not re-raised.
        """
        try:
            for position, col in enumerate(self.__table__.columns):
                setattr(self, col.name, data[position])
        except Exception as e:
            current_app.logger.error(inspect.stack()[0][3] + "TvShow" + " " + str(type(e)) + " " + str(e))
|
|
|
|
|
|
class TvEpisode(Base):
    """ORM model for the ``tv_episodes`` table, keyed by ``tmdb_id``."""

    __tablename__ = "tv_episodes"

    tmdb_id = Column(Integer, primary_key=True)
    parent_tmdb_id = Column(String)
    title = Column(String)
    season = Column(Integer)
    episode = Column(Integer)
    description = Column(String)
    still_path = Column(String)
    path = Column(String)

    def __init__(self, data):
        """Copy ``data`` positionally into every column, in table order.

        Exceptions raised while copying are logged via ``current_app.logger``
        and not re-raised.
        """
        try:
            for position, col in enumerate(self.__table__.columns):
                setattr(self, col.name, data[position])
        except Exception as e:
            current_app.logger.error(inspect.stack()[0][3] + "TvEpisode" + " " + str(type(e)) + " " + str(e))
|
|
|
|
|
|
class DupComic(Base):
    """ORM model for the ``duplicate_comics`` table.

    Declares the same column names, in the same order, as ``Comic``.
    """

    __tablename__ = "duplicate_comics"

    path = Column(String, unique=True)
    tagorigin = Column(String)
    series = Column(String)
    # NOTE(review): Integer here, while Comic.issue is REAL -- confirm whether
    # fractional issue numbers are meant to be kept for duplicates.
    issue = Column(Integer)
    issuetext = Column(String)
    title = Column(String)
    publisher = Column(String)
    month = Column(Integer)
    year = Column(Integer)
    day = Column(Integer)
    seriesyear = Column(Integer)
    issuecount = Column(Integer)
    volume = Column(String)
    genre = Column(String)
    language = Column(String)
    comments = Column(String)
    volumecount = Column(Integer)
    criticalrating = Column(String)
    country = Column(String)
    alternateseries = Column(String)
    alternatenumber = Column(String)
    alternatecount = Column(Integer)
    imprint = Column(String)
    notes = Column(String)
    weblink = Column(String)
    format = Column(String)
    manga = Column(String)
    blackandwhite = Column(String)
    pagecount = Column(Integer)
    maturityrating = Column(String)
    storyarc = Column(String)
    seriesgroup = Column(String)
    scaninfo = Column(String)
    characters = Column(String)
    teams = Column(String)
    locations = Column(String)
    id = Column(Integer, primary_key=True, autoincrement=True)

    def __init__(self, data):
        """Copy ``data`` positionally into every column except ``id``.

        Exceptions raised while copying are logged via ``current_app.logger``
        and not re-raised.
        """
        try:
            fillable = [col for col in self.__table__.columns if col.name != "id"]
            for position, col in enumerate(fillable):
                setattr(self, col.name, data[position])
        except Exception as e:
            current_app.logger.error(inspect.stack()[0][3] + "DupComic" + " " + str(type(e)) + " " + str(e))
|
|
|
|
|
|
class Game(Base):
    """ORM model for the ``games`` table, keyed by ``game_id``."""

    __tablename__ = "games"

    title = Column(String)
    game_id = Column(Integer, primary_key=True)
    description = Column(String)
    poster_path = Column(String)
    path = Column(String)
    windows = Column(Boolean)
    mac = Column(Boolean)
    linux = Column(Boolean)
    title_sanitized = Column(String)

    def __init__(self, data):
        """Copy ``data`` positionally into every column, in table order.

        Exceptions raised while copying are logged via ``current_app.logger``
        and not re-raised.
        """
        try:
            for position, col in enumerate(self.__table__.columns):
                setattr(self, col.name, data[position])
        except Exception as e:
            current_app.logger.error(inspect.stack()[0][3] + "Game" + " " + str(type(e)) + " " + str(e))
|
|
|
|
|
|
class User(Base, UserMixin):
    """ORM model for the ``users`` table; also a Flask-Login user via ``UserMixin``."""

    __tablename__ = "users"

    id = Column(Numeric, primary_key=True)
    username = Column(String)
    email = Column(String, unique=True)
    passwordHash = Column(String(128))
    isAdmin = Column(Boolean, default=False)

    def __init__(self, data):
        """Copy ``data`` positionally into every column, in table order.

        Exceptions raised while copying are logged via ``current_app.logger``
        and not re-raised.
        """
        try:
            for position, col in enumerate(self.__table__.columns):
                setattr(self, col.name, data[position])
        except Exception as e:
            current_app.logger.error(inspect.stack()[0][3] + "User" + " " + str(type(e)) + " " + str(e))

    def get_id(self):
        """Return the e-mail address, which this model uses as the user id."""
        return self.email

    def check_password(self, password):
        """Check ``password`` against the stored hash.

        Returns ``None`` when no hash is stored, otherwise the result of
        ``check_password_hash``.
        """
        if self.passwordHash:
            return check_password_hash(self.passwordHash, password)
        return None
|
|
|
|
|
|
class UserTvMovieData(Base):
    """ORM model for the ``user_tv_movie_data`` table (per-user playback state)."""

    __tablename__ = "user_tv_movie_data"

    id = Column(Integer, primary_key=True, autoincrement=True)
    user = Column(String)
    tmdb_id = Column(String)
    parent_tmdb = Column(String)
    time = Column(Integer)
    length = Column(Integer)
    finished = Column(Boolean, default=False)
    time_stamp = Column(DateTime)
    extended = Column(Boolean, default=False)
    directors_cut = Column(Boolean, default=False)

    def __init__(self, data):
        """Copy ``data`` positionally into every column except ``id``.

        Exceptions raised while copying are logged via ``current_app.logger``
        and not re-raised.
        """
        try:
            fillable = [col for col in self.__table__.columns if col.name != "id"]
            for position, col in enumerate(fillable):
                setattr(self, col.name, data[position])
        except Exception as e:
            current_app.logger.error(inspect.stack()[0][3] + "UserTvMovieData" + " " + str(type(e)) + " " + str(e))
|
|
|
|
|
|
class TvMovieKeywords(Base):
    """ORM model for the ``tv_movie_keywords`` table."""

    __tablename__ = "tv_movie_keywords"

    tmdb_id = Column(String)
    extended = Column(Boolean, default=False)
    directors_cut = Column(Boolean, default=False)
    key_words = Column(ARRAY(String))
    id = Column(Integer, primary_key=True)
    date = Column(TIMESTAMP)

    def __init__(self, data):
        """Copy ``data`` positionally into every column except ``id`` and ``date``.

        Exceptions raised while copying are logged via ``current_app.logger``
        and not re-raised.
        """
        try:
            fillable = [col for col in self.__table__.columns if col.name not in ("id", "date")]
            for position, col in enumerate(fillable):
                setattr(self, col.name, data[position])
        except Exception as e:
            current_app.logger.error(inspect.stack()[0][3] + "TvMovieKeywords" + " " + str(type(e)) + " " + str(e))
|
|
|
|
|
|
# Runs at import time: asks SQLAlchemy to create the tables of the models
# registered on ``Base`` so far, using ``engine``.
Base.metadata.create_all(engine)
|
|
|
|
|
|
"""class UserComicData(Base):
|
|
__tablename__ = "user_comic_data"
|
|
|
|
id = Column(Integer, primary_key=True, autoincrement=True)
|
|
user = Column(String)
|
|
comic_id = Column(Integer)
|
|
viewed = Column(Boolean, default=False)
|
|
|
|
def __init__(self, data):
|
|
i = 0
|
|
try:
|
|
for column in self.__table__.columns:
|
|
setattr(self, column.name, data[i])
|
|
i += 1
|
|
except Exception as e:
|
|
current_app.logger.error(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))"""
|
|
|
|
|
|
# def get_db():
|
|
# db = getattr(g, '_database', None)
|
|
# if db is None:
|
|
# db = g._database = psycopg2.connect(host="bombur", database="rpiwebapp", user="rpiwebapp", password="hello").cursor()
|
|
#
|
|
# #db.row_factory = sqlite3.Row
|
|
# return db
|
|
|
|
|
|
# def get_imdb():
|
|
# db = getattr(g, '_imdb_database', None)
|
|
# if db is None:
|
|
# db = g._imdb_database = sqlite3.connect(IMDB_DATABASE)
|
|
#
|
|
# db.row_factory = sqlite3.Row
|
|
# return db
|
|
|
|
|
|
def update_user_tv_movie_data(tmdb_id, parent_id, time, length, finished=False, extended=False, directors_cut=False):
    """Create or update the current user's playback record for one title.

    The record is looked up by ``tmdb_id``, the current user's e-mail and the
    ``extended`` / ``directors_cut`` flags.

    Args:
        tmdb_id: id of the movie or episode being watched.
        parent_id: stored as ``parent_tmdb`` when a new record is created.
        time: playback position to store.
        length: total length; only written to an existing record when that
            record has no positive length yet.
        finished: whether the title has been watched to the end.
        extended: selects the extended cut.
        directors_cut: selects the director's cut.

    Returns:
        The updated or newly created ``UserTvMovieData`` instance.
    """
    session = Session()
    email = current_user.email
    user_data = (
        session.query(UserTvMovieData)
        .filter(
            UserTvMovieData.tmdb_id == tmdb_id,
            UserTvMovieData.user == email,
            UserTvMovieData.extended == extended,
            UserTvMovieData.directors_cut == directors_cut,
        )
        .one_or_none()
    )
    now = datetime.datetime.now()

    if user_data is None:
        user_data = UserTvMovieData(
            (email, tmdb_id, parent_id, time, length, finished, now, extended, directors_cut)
        )
        session.add(user_data)
    else:
        user_data.time = time
        user_data.finished = finished
        user_data.time_stamp = now
        # Either length may be None (the column is nullable); treat that as 0
        # rather than letting ``None > 0`` raise TypeError.
        if not (user_data.length or 0) > 0 and (length or 0) > 0:
            user_data.length = length

    session.commit()
    return user_data
|
|
|
|
|
|
def add_user(data):
    """Build a ``User`` from ``data``, persist it and return the instance."""
    db = Session()
    new_user = User(data)
    db.add(new_user)
    db.commit()
    return new_user
|
|
|
|
|
|
def add_movies(movies):
    """Add one ``Movie`` row per entry of ``movies`` and commit."""
    db = Session()
    for row in movies:
        db.add(Movie(row))
    db.commit()
|
|
|
|
|
|
def add_tv_shows(tv_show_data):
    """Insert a ``TvShow`` built from ``tv_show_data`` unless its TMDB id exists.

    Any exception is logged via ``current_app.logger`` and not re-raised.
    """
    try:
        db = Session()
        show = TvShow(tv_show_data)
        existing = db.query(TvShow).filter(TvShow.tmdb_id == show.tmdb_id).one_or_none()
        if existing:
            return
        db.add(show)
        db.commit()
    except Exception as e:
        current_app.logger.error(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
|
|
|
|
|
|
def add_tv_episodes(episodes):
    """Insert each episode whose TMDB id is not stored yet.

    Episodes already present are reported at info level. An exception on one
    entry is logged and the loop moves on to the next entry.
    """
    session = Session()
    for episode_data in episodes:
        try:
            episode = TvEpisode(episode_data)
            already_stored = session.query(TvEpisode).filter(TvEpisode.tmdb_id == episode.tmdb_id).one_or_none()
            if already_stored:
                current_app.logger.info(f"TV episode: The TMDB id {episode.tmdb_id} for {episode.path} is already in the database.")
            else:
                session.add(episode)
                session.commit()
        except Exception as e:
            current_app.logger.error(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
|
|
|
|
|
|
def add_comics(meta, thumbnails):
    """Store scanned comics and their page thumbnails.

    Args:
        meta: sequence of entries where ``entry[0]`` is the file path and
            ``entry[1]`` is a metadata object exposing the attributes read
            below (``tagOrigin``, ``series``, ``issue``, ...).
        thumbnails: one sequence per comic, paired with ``meta`` by position;
            each item is indexed as ``item[0]`` (image) and ``item[1]`` (type).

    Comics without a publisher, and comics whose insert raises
    ``IntegrityError``, are written to the duplicate table instead.
    """
    session = Session()

    # Metadata attributes that follow path/tagorigin/series/issue/issuetext,
    # in the positional order the Comic and DupComic constructors consume.
    trailing_fields = (
        "title", "publisher", "month", "year", "day", "seriesYear",
        "issueCount", "volume", "genre", "language", "comments",
        "volumeCount", "criticalRating", "country", "alternateSeries",
        "alternateNumber", "alternateCount", "imprint", "notes", "webLink",
        "format", "manga", "blackAndWhite", "pageCount", "maturityRating",
        "storyArc", "seriesGroup", "scanInfo", "characters", "teams",
        "locations",
    )

    data = []
    for info in meta:
        path, md = info[0], info[1]
        issue = IssueString(md.issue).asFloat()
        # Only a missing number falls back to -1; ``issue or -1`` would also
        # have rewritten a genuine issue #0 to -1.
        row = [path, md.tagOrigin, md.series, -1 if issue is None else issue, md.issue]
        row.extend(getattr(md, field) for field in trailing_fields)
        # Empty strings are stored as NULL.
        data.append([None if value == "" else value for value in row])

    dup_comics = []
    for comic_data, images in zip(data, thumbnails):
        try:
            comic = Comic(comic_data)
            if comic.publisher is None:
                dup_comics.append(comic_data)
                continue
            session.add(comic)
            session.commit()
            # Read the key from the instance just inserted instead of querying
            # for the highest id, which could belong to a row added elsewhere.
            comic_id = comic.id
            for index, image in enumerate(images):
                session.add(ComicThumbnail([comic_id, index, image[0], image[1]]))
            session.commit()
        except IntegrityError:
            session.rollback()
            dup_comics.append(comic_data)

    for dup in dup_comics:
        try:
            session.add(DupComic(dup))
            session.commit()
        except IntegrityError as e:
            current_app.logger.info(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
            session.rollback()

    current_app.logger.info("{} comic{} added".format(len(meta), "s" if len(meta) != 1 else ""))
|
|
|
|
|
|
def add_games(games):
    """Add one ``Game`` row per entry of ``games`` and commit."""
    db = Session()
    for row in games:
        db.add(Game(row))
    db.commit()
|
|
|
|
|
|
def update_game(game_data):
    """Update the platform flags of an existing game.

    Args:
        game_data: sequence of ``(game_id, windows, mac, linux)``.

    If no game with that id exists, the problem is logged and nothing is
    changed; previously this raised ``AttributeError`` on ``None``.
    """
    session = Session()
    game = session.query(Game).filter(Game.game_id == game_data[0]).one_or_none()
    if game is None:
        current_app.logger.error("update_game: no game with id {} in the database".format(game_data[0]))
        return
    game.windows = game_data[1]
    game.mac = game_data[2]
    game.linux = game_data[3]
    session.commit()
|
|
|
|
|
|
def db_get_all_comics():
    """Return every ``Comic`` row as a list."""
    return Session().query(Comic).all()
|
|
|
|
|
|
def db_get_series_by_publisher(publisher):
    """Return one ``Comic`` per distinct series name for ``publisher``.

    Rows are ordered by series, series year and issue before ``DISTINCT`` on
    the series name is applied.
    """
    db = Session()
    by_publisher = db.query(Comic).filter(Comic.publisher == publisher)
    ordered = by_publisher.order_by(Comic.series, Comic.seriesyear, Comic.issue)
    return ordered.distinct(Comic.series).all()
|
|
|
|
|
|
def db_get_comics_in_series(series, publisher, series_year):
    """Return the comics of one series (publisher + name + year), ordered by issue."""
    db = Session()
    in_series = db.query(Comic).filter(
        Comic.publisher == publisher,
        Comic.series == series,
        Comic.seriesyear == series_year,
    )
    return in_series.order_by(Comic.issue).all()
|
|
|
|
|
|
def get_publishers():
    """Return the distinct publisher values of all comics, sorted by publisher."""
    db = Session()
    rows = db.query(Comic.publisher).distinct(Comic.publisher).order_by(Comic.publisher).all()
    # Each row holds a single value; unwrap it.
    names = []
    for (name,) in rows:
        names.append(name)
    return names
|
|
|
|
|
|
def db_get_comic_by_id(comic_id):
    """Return the ``Comic`` whose id is ``comic_id``, or ``None``."""
    return Session().query(Comic).filter(Comic.id == comic_id).one_or_none()
|
|
|
|
|
|
def db_get_thumbnail_by_id_page(comic_id, pageNumber):
    """Return the thumbnail of page ``pageNumber`` of comic ``comic_id``, or ``None``."""
    db = Session()
    page_filter = (
        ComicThumbnail.comic_id == comic_id,
        ComicThumbnail.pagenumber == pageNumber,
    )
    return db.query(ComicThumbnail).filter(*page_filter).one_or_none()
|
|
|
|
|
|
def db_get_comic(comic_id):
    """Look up a single ``Comic`` by primary key; ``None`` when it does not exist."""
    db = Session()
    matching = db.query(Comic).filter(Comic.id == comic_id)
    return matching.one_or_none()
|
|
|
|
|
|
def comic_path_in_db(path):
    """Return True when ``path`` is stored as a comic or as a duplicate comic.

    A failed lookup is logged (the path first, then the error) and reported
    as False.
    """
    try:
        db = Session()
        in_comics = db.query(Comic).filter(Comic.path == path).one_or_none()
        in_duplicates = db.query(DupComic).filter(DupComic.path == path).one_or_none()
        if in_comics or in_duplicates:
            return True
    except Exception as e:
        current_app.logger.error(path)
        current_app.logger.error(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
    return False
|
|
|
|
|
|
def movie_path_in_db(path):
    """Return True when a ``Movie`` with this ``path`` exists.

    A failed lookup is logged (the path first, then the error) and reported
    as False.
    """
    try:
        db = Session()
        found = db.query(Movie).filter(Movie.path == path).one_or_none()
        if found:
            return True
    except Exception as e:
        current_app.logger.error(path)
        current_app.logger.error(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
    return False
|
|
|
|
|
|
def tv_show_path_in_db(path):
    """Return True when a ``TvShow`` with this ``path`` exists.

    A failed lookup is logged (the path first, then the error) and reported
    as False.
    """
    try:
        db = Session()
        found = db.query(TvShow).filter(TvShow.path == path).one_or_none()
        if found:
            return True
    except Exception as e:
        current_app.logger.error(path)
        current_app.logger.error(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
    return False
|
|
|
|
|
|
def tv_episode_path_in_db(path):
    """Return True when a ``TvEpisode`` with this ``path`` exists.

    A failed lookup is logged (the path first, then the error) and reported
    as False.
    """
    try:
        db = Session()
        found = db.query(TvEpisode).filter(TvEpisode.path == path).one_or_none()
        if found:
            return True
    except Exception as e:
        current_app.logger.error(path)
        current_app.logger.error(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
    return False
|
|
|
|
|
|
def game_in_db(game_id):
    """Return True when a ``Game`` with this ``game_id`` exists.

    A failed lookup is logged and reported as False.
    """
    try:
        db = Session()
        found = db.query(Game).filter(Game.game_id == game_id).one_or_none()
        if found:
            return True
    except Exception as e:
        current_app.logger.error(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
    return False
|
|
|
|
|
|
# def imdb_get_movie(title, year):
|
|
# row = get_imdb().execute("SELECT tconst, runtimeMinutes FROM title_basics WHERE (originalTitle LIKE ? OR primaryTitle LIKE ?) AND (titleType LIKE '%movie' OR titleType='video') AND startYear=?", (title, title, year)).fetchone()
|
|
# return row
|
|
#
|
|
#
|
|
# def imdb_get_tv_show(title, year, info={}):
|
|
# if not info:
|
|
# row = get_imdb().execute(
|
|
# "SELECT tconst FROM title_basics WHERE (originalTitle LIKE ? OR primaryTitle LIKE ?) AND (titleType LIKE 'tvSeries' OR titleType LIKE 'tvMiniSeries') AND startYear=?",
|
|
# (title, title, year)).fetchone()
|
|
# else:
|
|
# row = get_imdb().execute(
|
|
# "SELECT tconst FROM title_basics WHERE originalTitle=? AND primaryTitle=? AND titleType=? AND startYear=? AND endYear=? AND runtimeMinutes=? AND genres=?",
|
|
# (info["originalTitle"], info["primaryTitle"], info["titleType"], info["startYear"], info["endYear"], info["runtimeMinutes"], info["genres"])).fetchone()
|
|
# return row
|
|
#
|
|
#
|
|
# def imdb_get_tv_episode(imdb_id, season, episode):
|
|
# row = get_imdb().execute(
|
|
# "SELECT tconst FROM title_episode WHERE parentTconst=? AND seasonNumber=? AND episodeNumber=?",
|
|
# [imdb_id, season, episode]).fetchone()
|
|
# return row
|
|
|
|
|
|
# def tmdb_get_movie_by_imdb_id(imdb_id):
|
|
# data = tmdb.get_movie_data(imdb_id)
|
|
# return data
|
|
#
|
|
#
|
|
# def tmdb_get_tv_show_by_imdb_id(imdb_id):
|
|
# data = tmdb.get_tv_show_data(imdb_id)
|
|
# return data
|
|
#
|
|
#
|
|
# def tmdb_get_tv_episode_by_imdb_id(imdb_id):
|
|
# data = tmdb.get_tv_episode_data(imdb_id)
|
|
# return data
|
|
|
|
|
|
def db_get_all_movies():
    """Return all movies ordered by title then year.

    Returns:
        For a logged-in user, a list of ``(Movie, finished)`` tuples where
        ``finished`` is the result of ``rpiwebapp.is_movie_finished`` for that
        user; otherwise a plain list of ``Movie`` objects.
    """
    session = Session()
    movies = session.query(Movie).order_by(Movie.title, Movie.year).all()
    # ``current_user`` is truthy for anonymous users too, and they have no
    # e-mail, so check ``is_authenticated`` explicitly.
    if current_user and current_user.is_authenticated:
        email = current_user.email
        # Bound parameters keep the e-mail out of the SQL text.
        finished_sql = sqlalchemy.text("SELECT rpiwebapp.is_movie_finished(:tmdb_id, :email)")
        movies = [
            (movie, session.execute(finished_sql, {"tmdb_id": movie.tmdb_id, "email": email}).first()[0])
            for movie in movies
        ]
    return movies
|
|
|
|
|
|
def db_get_movie_by_tmdb_id(tmdb_id, extended=False, directors_cut=False):
    """Return the ``Movie`` matching the TMDB id and cut flags, or ``None``."""
    db = Session()
    wanted = (
        Movie.tmdb_id == tmdb_id,
        Movie.extended == extended,
        Movie.directors_cut == directors_cut,
    )
    return db.query(Movie).filter(*wanted).one_or_none()
|
|
|
|
|
|
def tv_movie_sort(a):
    """Sort key: the ``title`` of ``a``, or of ``a[0]`` when ``a`` is a plain tuple.

    Only exact ``tuple`` instances are unwrapped; tuple subclasses are
    treated as the item itself.
    """
    item = a[0] if type(a) is tuple else a
    return item.title
|
|
|
|
|
|
def get_all_tv_movies():
    """Return all movies and TV shows merged into one list sorted by title.

    Returns:
        For a logged-in user, ``(item, finished)`` tuples where ``finished``
        comes from ``rpiwebapp.is_movie_finished`` or
        ``rpiwebapp.is_tv_show_finished``; otherwise the bare ``Movie`` and
        ``TvShow`` objects.
    """
    session = Session()
    movies = session.query(Movie).order_by(Movie.title, Movie.year).all()
    shows = session.query(TvShow).order_by(TvShow.title, TvShow.year).all()
    # ``current_user`` is truthy for anonymous users too, and they have no
    # e-mail, so check ``is_authenticated`` explicitly.
    if current_user and current_user.is_authenticated:
        email = current_user.email
        # Bound parameters keep the e-mail out of the SQL text.
        show_sql = sqlalchemy.text("SELECT rpiwebapp.is_tv_show_finished(:tmdb_id, :email)")
        movie_sql = sqlalchemy.text("SELECT rpiwebapp.is_movie_finished(:tmdb_id, :email)")
        shows = [
            (show, session.execute(show_sql, {"tmdb_id": show.tmdb_id, "email": email}).first()[0])
            for show in shows
        ]
        movies = [
            (movie, session.execute(movie_sql, {"tmdb_id": movie.tmdb_id, "email": email}).first()[0])
            for movie in movies
        ]
    return sorted(movies + shows, key=tv_movie_sort)
|
|
|
|
|
|
def get_tv_movie_by_tmdb_id(tmdb_id, extended=False, directors_cut=False):
    """Return the matching ``Movie``, falling back to a ``TvShow`` with that id.

    The cut flags only apply to the movie lookup. Returns ``None`` when
    neither table has a match.
    """
    db = Session()
    movie = db.query(Movie).filter(
        Movie.tmdb_id == tmdb_id,
        Movie.extended == extended,
        Movie.directors_cut == directors_cut,
    ).one_or_none()
    if movie:
        return movie
    return db.query(TvShow).filter(TvShow.tmdb_id == tmdb_id).one_or_none()
|
|
|
|
|
|
def get_currently_watching():
    """Placeholder with no implementation yet; always returns ``None``."""
    return None
|
|
|
|
|
|
def get_all_tv_shows():
    """Return all TV shows ordered by title then year.

    Returns:
        For a logged-in user, a list of ``(TvShow, finished)`` tuples where
        ``finished`` is the result of ``rpiwebapp.is_tv_show_finished`` for
        that user; otherwise a plain list of ``TvShow`` objects.
    """
    session = Session()
    result = session.query(TvShow).order_by(TvShow.title, TvShow.year).all()
    # ``current_user`` is truthy for anonymous users too, and they have no
    # e-mail, so check ``is_authenticated`` explicitly.
    if current_user and current_user.is_authenticated:
        email = current_user.email
        # The e-mail used to be formatted into the statement without quotes,
        # which produced invalid SQL; pass both values as bound parameters.
        finished_sql = sqlalchemy.text("SELECT rpiwebapp.is_tv_show_finished(:tmdb_id, :email)")
        shows = [
            (show, session.execute(finished_sql, {"tmdb_id": show.tmdb_id, "email": email}).first()[0])
            for show in result
        ]
    else:
        shows = result
    return shows
|
|
|
|
|
|
def get_tv_show(tmdb_id):
    """Return the ``TvShow`` with this TMDB id, or ``None``."""
    return Session().query(TvShow).filter(TvShow.tmdb_id == tmdb_id).one_or_none()
|
|
|
|
|
|
def get_tv_show_episodes_by_tmdb_id(tmdb_id):
    """Return the episodes whose parent show is ``tmdb_id``, by season then episode."""
    db = Session()
    of_show = db.query(TvEpisode).filter(TvEpisode.parent_tmdb_id == tmdb_id)
    return of_show.order_by(TvEpisode.season, TvEpisode.episode).all()
|
|
|
|
|
|
def db_get_episode_by_tmdb_id(tmdb_id):
    """Return the ``TvEpisode`` with this TMDB id, or ``None``."""
    db = Session()
    matching = db.query(TvEpisode).filter(TvEpisode.tmdb_id == tmdb_id)
    return matching.one_or_none()
|
|
|
|
|
|
def db_get_user_tv_movie_data(tmdb_id, extended=False, directors_cut=False):
    """Return the current user's playback record for one title, or ``None``.

    The record is matched on the user's e-mail, ``tmdb_id`` and both cut flags.
    """
    db = Session()
    email = current_user.email
    wanted = (
        UserTvMovieData.user == email,
        UserTvMovieData.tmdb_id == tmdb_id,
        UserTvMovieData.extended == extended,
        UserTvMovieData.directors_cut == directors_cut,
    )
    return db.query(UserTvMovieData).filter(*wanted).one_or_none()
|
|
|
|
|
|
def db_get_user_tv_show_episodes_data(parent_tmdb_id) -> list:
    """Return the current user's playback records whose parent is ``parent_tmdb_id``."""
    db = Session()
    email = current_user.email
    for_show = db.query(UserTvMovieData).filter(
        UserTvMovieData.user == email,
        UserTvMovieData.parent_tmdb == parent_tmdb_id,
    )
    return for_show.all()
|
|
|
|
|
|
def db_get_current_tv_show_episode_and_data(parent_tmdb_id, episodes):
    """Pick the episode the current user should watch next for a show.

    Args:
        parent_tmdb_id: TMDB id of the show.
        episodes: list of the show's episodes in playback order.

    Returns:
        ``(episode, data)`` where ``data`` is the user's playback record for
        that episode, or ``None`` when there is none to resume from.
    """
    session = Session()
    email = current_user.email
    watch_rows = session.query(UserTvMovieData).filter(
        UserTvMovieData.user == email,
        UserTvMovieData.parent_tmdb == parent_tmdb_id,
    ).all()

    # Nothing watched yet: start from the first episode.
    if not watch_rows:
        return episodes[0], None

    latest = watch_rows[0]
    first_episode = episodes[0]

    # Find the record with the newest timestamp; rows are only compared when
    # both timestamps are set.
    for row in watch_rows:
        if row.time_stamp and latest.time_stamp and row.time_stamp > latest.time_stamp:
            latest = row

    for episode in episodes:
        if not (episode.tmdb_id == latest.tmdb_id):
            continue
        # Unfinished: resume this episode.
        if not latest.finished:
            return episode, latest
        # Finished the last episode: wrap around to the first with no data.
        if episode == episodes[-1]:
            return first_episode, None
        # Finished: move to the following episode and reuse its record if any.
        upcoming = episodes[episodes.index(episode) + 1]
        for row in watch_rows:
            if upcoming.tmdb_id == row.tmdb_id:
                return upcoming, row
        return upcoming, None

    # The newest record matches none of the given episodes.
    return first_episode, latest
|
|
|
|
|
|
def get_all_games():
    """Return every ``Game`` ordered by title."""
    return Session().query(Game).order_by(Game.title).all()
|
|
|
|
|
|
def get_windows_games():
    """Return the games flagged for Windows, ordered by title."""
    db = Session()
    # ``== True`` builds a SQL comparison; it is not a Python truth test.
    on_windows = db.query(Game).filter(Game.windows == True)
    return on_windows.order_by(Game.title).all()
|
|
|
|
|
|
def get_mac_games():
    """Return the games flagged for Mac, ordered by title."""
    db = Session()
    # ``== True`` builds a SQL comparison; it is not a Python truth test.
    on_mac = db.query(Game).filter(Game.mac == True)
    return on_mac.order_by(Game.title).all()
|
|
|
|
|
|
def get_linux_games():
    """Return the games flagged for Linux, ordered by title."""
    db = Session()
    # ``== True`` builds a SQL comparison; it is not a Python truth test.
    on_linux = db.query(Game).filter(Game.linux == True)
    return on_linux.order_by(Game.title).all()
|
|
|
|
|
|
def get_game(game_id):
    """Return the ``Game`` with this ``game_id``, or ``None``."""
    return Session().query(Game).filter(Game.game_id == game_id).one_or_none()
|
|
|
|
|
|
def db_search_table_columns_by_query(query, table, columns, group=None, order=None):
    """Run a case-insensitive substring search of ``query`` against each column.

    Spaces in ``query`` become ``%`` wildcards and the whole pattern is
    wrapped in ``%``. Each column is cast to a string before matching.

    Args:
        query: the user's search text.
        table: mapped class to select.
        columns: columns to search; each is queried separately.
        group: expressions for the ``PARTITION BY`` of the ``rank()`` window
            selected next to ``table``. Defaults to none.
        order: expressions for the ``ORDER BY`` of that window. Defaults to
            none.

    Returns:
        Dict mapping each column's ``name`` to the list of matching ``table``
        instances.
    """
    # ``None`` defaults replace the mutable ``[]`` defaults of the signature.
    group = [] if group is None else group
    order = [] if order is None else order

    session = Session()
    pattern = "%" + query.replace(" ", "%") + "%"
    results = {}
    for column in columns:
        matches = (
            session.query(table, over(func.rank(), partition_by=group, order_by=order))
            .filter(cast(column, sqlalchemy.String).ilike(pattern))
            .all()
        )
        # Each result row is (entity, rank); keep only the entity.
        results[column.name] = [row[0] for row in matches]
    return results
|
|
|
|
|
|
def db_search_comics(query):
    """Search comics by publisher, title, series and year.

    Returns:
        Dict with three de-duplicated lists: ``"publisher"`` (publisher
        names), ``"series"`` (``Comic`` rows from the series search) and
        ``"comics"`` (``Comic`` rows matched on title or year).
    """
    searched = [Comic.publisher, Comic.title, Comic.series, Comic.year]
    results = db_search_table_columns_by_query(query, Comic, searched)
    series_results = db_search_table_columns_by_query(
        query, Comic, searched, group=[Comic.series, Comic.seriesyear], order=[Comic.issue]
    )

    publishers = []
    for row in results["publisher"]:
        # Rows are Comic instances: read the attribute. Subscripting the row
        # (``row["publisher"]``) raised TypeError.
        if row.publisher not in publishers:
            publishers.append(row.publisher)

    series = []
    for row in series_results["series"]:
        if row not in series:
            series.append(row)

    comics = []
    for row in results["title"] + results["year"]:
        if row not in comics:
            comics.append(row)

    return {"publisher": publishers, "series": series, "comics": comics}
|
|
|
|
|
|
def db_search_movies(query):
    """Search movies by title, description and year for the current user.

    Returns:
        De-duplicated list of ``(Movie, finished)`` tuples, title matches
        first, then description, then year. ``finished`` is the result of
        ``rpiwebapp.is_movie_finished`` for the current user.
    """
    results = db_search_table_columns_by_query(
        query, Movie, [Movie.title, Movie.year, Movie.description], order=[Movie.title]
    )
    movies = []
    for key in ("title", "description", "year"):
        for movie in results[key]:
            if movie not in movies:
                movies.append(movie)

    session = Session()
    email = current_user.email
    # Bound parameters keep the e-mail out of the SQL text.
    finished_sql = sqlalchemy.text("SELECT rpiwebapp.is_movie_finished(:tmdb_id, :email)")
    return [
        (movie, session.execute(finished_sql, {"tmdb_id": movie.tmdb_id, "email": email}).first()[0])
        for movie in movies
    ]
|
|
|
|
|
|
def db_search_tv_shows(query):
    """Search TV shows by title, description and year for the current user.

    Returns:
        De-duplicated list of ``(TvShow, finished)`` tuples, title matches
        first, then description, then year. ``finished`` is the result of
        ``rpiwebapp.is_tv_show_finished`` for the current user.
    """
    results = db_search_table_columns_by_query(
        query, TvShow, [TvShow.title, TvShow.year, TvShow.description], order=[TvShow.title]
    )
    tv_shows = []
    for key in ("title", "description", "year"):
        for show in results[key]:
            if show not in tv_shows:
                tv_shows.append(show)

    session = Session()
    email = current_user.email
    # Bound parameters keep the e-mail out of the SQL text.
    finished_sql = sqlalchemy.text("SELECT rpiwebapp.is_tv_show_finished(:tmdb_id, :email)")
    return [
        (show, session.execute(finished_sql, {"tmdb_id": show.tmdb_id, "email": email}).first()[0])
        for show in tv_shows
    ]
|
|
|
|
|
|
def db_search_tv_movie(query):
    """Search movies and TV shows together and return the hits sorted by title."""
    combined = db_search_movies(query) + db_search_tv_shows(query)
    combined.sort(key=tv_movie_sort)
    return combined
|
|
|
|
|
|
def resize_image(image, new_width=256, new_height=256):
    """Shrink ``image`` in place to fit the target box, keeping its aspect ratio.

    The longer side (height on ties) is set to the matching target size and
    the other side is scaled by the original ratio, truncated to an int.
    The result is passed to ``image.thumbnail(width, height)``.

    Returns:
        The same ``image`` object that was passed in.
    """
    src_w = image.width
    src_h = image.height
    if src_h < src_w:
        # Landscape: width is the limiting side.
        target_w = new_width
        target_h = int((src_h / src_w) * new_width)
    else:
        # Portrait or square: height is the limiting side.
        target_w = int((src_w / src_h) * new_height)
        target_h = new_height
    image.thumbnail(target_w, target_h)
    return image
|
|
|
|
|
|
# def fix_thumbnails():
|
|
# new_height = 256
|
|
# new_width = 256
|
|
# print("Start fix thumbnail size")
|
|
# rows = get_db().execute("SELECT * FROM rpiwebapp.comic_thumbnails")
|
|
# print("got list of all thumbnails\n")
|
|
#
|
|
# for row in rows:
|
|
# image = Image(file=BytesIO(row["image"]))
|
|
# if image.width > new_width or image.height > new_height:
|
|
# print("id:", row["id"], "pageNumber:", row["pageNumber"])
|
|
# get_db().execute("UPDATE rpiwebapp.comic_thumbnails SET image=? WHERE comic_id=? AND pageNumber=?",
|
|
# [resize_image(image, new_width, new_height).make_blob(), row["id"], row["pageNumber"]])
|
|
# get_db().commit()
|
|
# print("Finished fix thumbnail size")
|
|
|
|
|
|
def get_user(email):
    """Return the ``User`` with this e-mail address, or ``None``.

    A failed lookup is logged via ``current_app.logger`` and also yields
    ``None``.
    """
    try:
        db = Session()
        user = db.query(User).filter(User.email == email).one_or_none()
        return user if user else None
    except Exception as e:
        current_app.logger.error(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
|