Matthew Welch
c4e6824a8a
Change error logging to log messages as error rather than info. Add 404 error when video or comic not found. Change paths to use pathlib.
401 lines
13 KiB
Python
401 lines
13 KiB
Python
import base64
|
|
import datetime
|
|
import inspect
|
|
import json
|
|
import logging
|
|
import os
|
|
import pathlib
|
|
import threading
|
|
from urllib.parse import urljoin, urlsplit, urlunsplit
|
|
|
|
import inotify.adapters
|
|
import inotify.constants
|
|
import requests
|
|
from flask import Flask, Response, current_app, flash, g, redirect, render_template, request, url_for, make_response
|
|
from flask_login import LoginManager, current_user, login_required, login_user, logout_user
|
|
from oauthlib.oauth2 import WebApplicationClient
|
|
|
|
import func
|
|
from admin import admin
|
|
from comics import comics
|
|
from games import games
|
|
import database
|
|
from tv_movies import tv_movies
|
|
|
|
|
|
class NullHandler(logging.Handler):
    """Logging sink that discards everything sent to it.

    Besides being a ``logging.Handler`` whose ``emit`` does nothing, the class
    exposes logger-style methods so an instance can stand in for a
    ``logging.Logger`` object. Previously only ``debug`` existed, so any other
    logger call (``info``, ``warning``, ...) made on the stand-in raised
    ``AttributeError``; the usual level methods are now all no-ops.
    """

    def emit(self, record=None):
        """Drop *record* without writing it anywhere."""
        pass

    def debug(self, *arg, **kwargs):
        """Accept and ignore a logger-style call."""
        pass

    # The remaining logger-style entry points share the same no-op body.
    info = warning = error = exception = critical = log = debug
|
|
|
|
|
|
# Replace the inotify adapter module's logger with a do-nothing NullHandler
# instance so the library's log calls are discarded.
nullLog = NullHandler()
inotify.adapters._LOGGER = nullLog

# Google OAuth 2.0 / OpenID Connect settings (the credential values here are
# placeholders).
GOOGLE_CLIENT_ID = "***REMOVED***"
GOOGLE_CLIENT_SECRET = "***REMOVED***"
GOOGLE_DISCOVERY_URL = "https://accounts.google.com/.well-known/openid-configuration"

# OAuth client used by the login routes to build authorization and token requests.
client = WebApplicationClient(GOOGLE_CLIENT_ID)
|
|
|
|
|
|
def get_google_provider_cfg(timeout=10):
    """Fetch Google's OpenID Connect discovery document.

    Args:
        timeout: Seconds to wait for the HTTP request before giving up.
            Without a timeout a stalled connection would block the calling
            request handler indefinitely.

    Returns:
        The decoded JSON body served at ``GOOGLE_DISCOVERY_URL``.

    Raises:
        requests.RequestException: On network failure, timeout, or a non-2xx
            response status.
    """
    response = requests.get(GOOGLE_DISCOVERY_URL, timeout=timeout)
    # Fail loudly on an HTTP error instead of parsing an error page as config.
    response.raise_for_status()
    return response.json()
|
|
|
|
|
|
# Flask application object with the feature blueprints attached.
app = Flask(__name__)
for _feature_blueprint in (comics.Comics, admin.Admin, tv_movies.TV_Movies, games.Games):
    app.register_blueprint(_feature_blueprint)
app.config["SECRET_KEY"] = "***REMOVED***"
app.logger.setLevel("DEBUG")
# app.use_x_sendfile = True

# Flask-Login integration.
login_manager = LoginManager(app)
# login_manager.login_view = "login"
app.config["REMEMBER_COOKIE_DOMAIN"] = "narnian.us"

# Lower-case device keywords (not referenced anywhere else in the visible code).
MOBILE_DEVICES = ["android", "blackberry", "ipad", "iphone"]
|
|
|
|
|
|
def get_comics():
    """Scan the comic library once, then follow filesystem changes forever.

    Runs inside an application context. After the initial
    ``func.get_comics()`` call it watches ``func.COMICS_DIRECTORY`` with
    inotify: written or moved-in paths are handed to ``func.get_comic``, and
    newly created directories are remembered so their contents can be handed
    over once the event generator returns.
    """
    with app.app_context():
        watcher = inotify.adapters.InotifyTree(func.COMICS_DIRECTORY)
        pending_dirs = []
        func.get_comics()
        while True:
            # The generator is given timeout_s=5*60 (presumably seconds), so
            # this inner loop ends periodically and the outer loop restarts it.
            for _header, type_names, path, filename in watcher.event_gen(timeout_s=5*60, yield_nones=False):
                file_path = pathlib.Path(path, filename)
                if "IN_CLOSE_WRITE" in type_names or "IN_MOVED_TO" in type_names:
                    func.get_comic(file_path)
                    continue
                if "IN_CREATE" in type_names and file_path.is_dir():
                    pending_dirs.append(file_path)
            # NOTE(review): source indentation was lost; this sweep is assumed
            # to run after each pass of the event generator - confirm.
            for new_dir in pending_dirs:
                for entry in new_dir.glob("*"):
                    func.get_comic(entry)
            pending_dirs.clear()
|
|
|
|
|
|
|
|
def get_movies():
    """Scan the movie library once, then follow filesystem changes forever.

    Runs inside an application context. After the initial
    ``func.get_movies()`` call, every path under ``func.MOVIES_DIRECTORY``
    that is written or moved in is handed to ``func.get_movie``.
    """
    with app.app_context():
        watcher = inotify.adapters.InotifyTree(func.MOVIES_DIRECTORY)

        func.get_movies()

        for _header, type_names, path, filename in watcher.event_gen(yield_nones=False):
            # Only completed writes and moved-in files are of interest.
            if "IN_CLOSE_WRITE" not in type_names and "IN_MOVED_TO" not in type_names:
                continue
            func.get_movie(pathlib.Path(path, filename))
|
|
|
|
|
|
def get_tv_shows():
    """Scan the TV library once, then follow filesystem changes forever.

    Runs inside an application context. After the initial
    ``func.get_tv_shows()`` / ``func.get_tv_episodes()`` calls, a written or
    moved-in directory triggers ``func.get_tv_shows()`` again, while any other
    written or moved-in path is handed to ``func.get_tv_episode``.
    """
    with app.app_context():
        watcher = inotify.adapters.InotifyTree(func.TV_SHOWS_DIRECTORY)
        func.get_tv_shows()
        func.get_tv_episodes()
        for _header, type_names, path, filename in watcher.event_gen(yield_nones=False):
            if not ("IN_CLOSE_WRITE" in type_names or "IN_MOVED_TO" in type_names):
                continue
            file_path = pathlib.Path(path, filename)
            if file_path.is_dir():
                func.get_tv_shows()
            else:
                func.get_tv_episode(file_path)
|
|
|
|
|
|
def get_games():
    """Scan the games library once, then follow filesystem changes forever.

    Runs inside an application context. Watches ``func.GAMES_DIRECTORY`` and
    each of its immediate sub-directories (non-recursively), performs the
    initial ``func.get_games()`` / ``func.update_games()`` calls, and then
    reacts to inotify events:

    * written or moved-in paths are handed to ``func.get_game``;
    * newly created directories whose name is longer than two characters get
      their own watch;
    * when a watched directory itself is deleted, its watch bookkeeping is
      dropped.
    """
    with app.app_context():
        watcher = inotify.adapters.Inotify()
        # Watches are registered with str paths throughout so that the keys
        # used for adding and removing a watch always agree.
        watcher.add_watch(str(func.GAMES_DIRECTORY))
        for directory in os.listdir(func.GAMES_DIRECTORY):
            sub_path = pathlib.Path(func.GAMES_DIRECTORY, directory)
            if sub_path.is_dir():
                watcher.add_watch(str(sub_path))

        func.get_games()
        func.update_games()

        for event in watcher.event_gen(yield_nones=False):
            (header, type_names, path, filename) = event
            file_path = pathlib.Path(path, filename)
            if "IN_CLOSE_WRITE" in type_names or "IN_MOVED_TO" in type_names:
                func.get_game(file_path)
            elif "IN_CREATE" in type_names:
                if file_path.is_dir() and len(file_path.name) > 2:
                    watcher.add_watch(str(file_path))
            elif "IN_DELETE_SELF" in type_names:
                # The directory is already gone, so an is_dir() test can never
                # succeed here (the old check made this branch dead code).
                # `path` is the value reported for the watch that fired, so it
                # is used directly. superficial=True is assumed to clear only
                # the library's bookkeeping, since the kernel has presumably
                # dropped the watch itself - confirm against the inotify docs.
                watcher.remove_watch(path, superficial=True)
|
|
|
|
|
|
# Log start-up and launch one daemon thread per media watcher. Daemon threads
# do not keep the process alive on shutdown.
with app.app_context():
    current_app.logger.info("server start")
    thread = threading.Thread(target=get_comics, args=(), daemon=True)
    thread.start()
    thread2 = threading.Thread(target=get_movies, args=(), daemon=True)
    thread2.start()
    thread3 = threading.Thread(target=get_tv_shows, args=(), daemon=True)
    thread3.start()
    thread4 = threading.Thread(target=get_games, args=(), daemon=True)
    thread4.start()
|
|
|
|
|
|
@app.teardown_appcontext
def close_connection(exception):
    """Close the object stored as ``g._database``, if there is one.

    Registered as an app-context teardown callback; ``exception`` is accepted
    but not used.
    """
    connection = getattr(g, "_database", None)
    if connection is None:
        return
    connection.close()
|
|
|
|
|
|
def update_comic_db(sender, **kw):
    """Signal receiver: pass loaded comic metadata and thumbnails to the database.

    Reads ``kw["meta"]`` and ``kw["thumbnails"]``. Any exception is logged at
    error level and otherwise suppressed.
    """
    try:
        database.add_comics(kw["meta"], kw["thumbnails"])
    except Exception as err:
        # Message layout: "<function name> <exception type> <exception text>".
        current_app.logger.error(" ".join((inspect.stack()[0][3], str(type(err)), str(err))))
|
|
|
|
|
|
def update_movie_db(sender, **kw):
    """Signal receiver: pass loaded movies to the database.

    Reads ``kw["movies"]``. Any exception is logged at error level and
    otherwise suppressed.
    """
    try:
        database.add_movies(kw["movies"])
    except Exception as err:
        # Message layout: "<function name> <exception type> <exception text>".
        current_app.logger.error(" ".join((inspect.stack()[0][3], str(type(err)), str(err))))
|
|
|
|
|
|
def update_tv_show_db(sender, **kw):
    """Signal receiver: pass a loaded TV show to the database.

    Reads ``kw["tv_show"]``. Any exception is logged at error level and
    otherwise suppressed.
    """
    try:
        database.add_tv_shows(kw["tv_show"])
    except Exception as err:
        # Message layout: "<function name> <exception type> <exception text>".
        current_app.logger.error(" ".join((inspect.stack()[0][3], str(type(err)), str(err))))
|
|
|
|
|
|
def update_tv_episodes_db(sender, **kw):
    """Signal receiver: pass loaded TV episodes to the database.

    Reads ``kw["tv_episodes"]``. Any exception is logged at error level and
    otherwise suppressed.
    """
    try:
        database.add_tv_episodes(kw["tv_episodes"])
    except Exception as err:
        # Message layout: "<function name> <exception type> <exception text>".
        current_app.logger.error(" ".join((inspect.stack()[0][3], str(type(err)), str(err))))
|
|
|
|
|
|
def update_games_db(sender, **kw):
    """Signal receiver: pass loaded games to the database.

    Reads ``kw["games"]``. Any exception is logged at error level and
    otherwise suppressed.
    """
    try:
        database.add_games(kw["games"])
    except Exception as err:
        # Message layout: "<function name> <exception type> <exception text>".
        current_app.logger.error(" ".join((inspect.stack()[0][3], str(type(err)), str(err))))
|
|
|
|
|
|
# Wire each "loaded" signal exposed by func to the receiver that stores its
# payload in the database.
for _loaded_signal, _db_receiver in (
    (func.comic_loaded, update_comic_db),
    (func.movie_loaded, update_movie_db),
    (func.tv_show_loaded, update_tv_show_db),
    (func.tv_episodes_loaded, update_tv_episodes_db),
    (func.games_loaded, update_games_db),
):
    _loaded_signal.connect(_db_receiver)
|
|
|
|
|
|
@login_manager.user_loader
def load_user(email):
    """Look up the user for a session, keyed by e-mail address.

    Args:
        email: Identifier passed in by Flask-Login, forwarded to
            ``database.get_user``.

    Returns:
        Whatever ``database.get_user`` returns, or ``None`` when the lookup
        raises. The failure path used to return the error text, which
        Flask-Login would then treat as a user object.
    """
    try:
        return database.get_user(email)
    except Exception as e:
        current_app.logger.error(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
        # A user loader signals "no such user" with None, never with a message.
        return None
|
|
|
|
|
|
@login_manager.request_loader
def load_user_from_request(request):
    """Authenticate a request from its credentials instead of a session.

    Credentials come from the ``Authorization`` header or, when that is
    absent, the ``api_key`` query argument. The value may be
    ``email:password`` either base64-encoded (optionally prefixed with
    ``"Basic "``) or in plain text.

    Args:
        request: The incoming request; ``headers`` and ``args`` are read.

    Returns:
        The user whose password matches, otherwise ``None``. ``None`` is also
        returned (after logging) if anything raises, so that an error message
        is never mistaken for a user object.
    """
    try:
        basic_auth = request.headers.get("Authorization")
        api_key = request.args.get("api_key")
        auth_key = basic_auth if basic_auth else api_key
        if not auth_key:
            return None

        auth_key = auth_key.replace("Basic ", "", 1)
        try:
            # validate=True makes a plain "email:password" value fail decoding
            # reliably, so the plain-text fallback below is actually reached.
            auth_key = base64.b64decode(auth_key, validate=True).decode("utf-8")
        except (TypeError, ValueError):
            # Python 3 reports bad base64 (binascii.Error) and bad UTF-8
            # (UnicodeDecodeError) as ValueError subclasses; the value is then
            # taken as plain text.
            pass

        # Split on the first colon only: the password itself may contain ":".
        email, separator, password = auth_key.partition(":")
        if not separator:
            return None

        user = load_user(email)
        if user and user.check_password(password):
            return user
        return None
    except Exception as e:
        current_app.logger.error(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
        return None
|
|
|
|
|
|
@app.route("/login", methods=["GET", "POST"])
def login():
    """Render the login page (GET) or check an e-mail/password form (POST).

    The page receives a Google authorization URL whose ``state`` carries the
    page to return to after login. On a successful POST the user is logged in
    with a seven-day remember cookie and redirected to that page; on a failed
    POST a message is flashed and the browser is sent back to the login page.
    Any exception is logged and its text is returned as the response body.
    """
    try:
        parts = urlsplit(request.url)
        # Default target: path and query of the current request, without
        # scheme or host.
        next_page = urlunsplit(("", "", parts.path, parts.query, ""))
        if "login" in parts.path:
            next_page = url_for("home")
        # NOTE(review): source indentation was lost; this override is assumed
        # to apply regardless of the path check above - confirm.
        # NOTE(review): the "url" argument is used as a redirect target
        # without validation.
        requested_target = request.args.get("url", default=None)
        if requested_target:
            next_page = requested_target

        provider_cfg = get_google_provider_cfg()
        request_uri = client.prepare_request_uri(
            provider_cfg["authorization_endpoint"],
            redirect_uri=urljoin(request.host_url, url_for("callback")),
            scope=["openid", "email", "profile"],
            state=next_page,
            hd="narnian.us",
        )

        if request.method != "POST":
            return render_template("login.html", title="login", auth_url=request_uri)

        email = request.form.get("email")
        password = request.form.get("password")
        user = database.get_user(email)
        if user is None or not user.check_password(password):
            flash("invalid email or password")
            return redirect(url_for("login"))
        login_user(user, remember=True, duration=datetime.timedelta(days=7))
        return redirect(next_page)
    except Exception as err:
        current_app.logger.error(" ".join((inspect.stack()[0][3], str(type(err)), str(err))))
        return str(err)
|
|
|
|
|
|
@app.route("/login/callback")
def callback():
    """Finish the Google sign-in flow and log the user in.

    Exchanges the ``code`` query argument for a token, fetches the user's
    profile from Google, creates a local user when ``database.get_user`` finds
    none for the e-mail address, logs the user in with a seven-day remember
    cookie, and redirects to the page carried in the ``state`` argument (or
    to the home page when ``state`` is missing).

    Returns:
        A redirect on success; a 400 response when Google does not report the
        e-mail as verified; the exception text if anything raises.
    """
    # Seconds to wait on each outbound call to Google, so a stalled connection
    # cannot block this handler indefinitely.
    request_timeout = 10
    try:
        # Get authorization code Google sent back to you
        code = request.args.get("code")

        google_provider_cfg = get_google_provider_cfg()
        token_endpoint = google_provider_cfg["token_endpoint"]

        token_url, headers, body = client.prepare_token_request(
            token_endpoint, authorization_response=request.url, redirect_url=request.base_url, code=code
        )
        token_response = requests.post(
            token_url,
            headers=headers,
            data=body,
            auth=(GOOGLE_CLIENT_ID, GOOGLE_CLIENT_SECRET),
            timeout=request_timeout,
        )

        client.parse_request_body_response(json.dumps(token_response.json()))

        userinfo_endpoint = google_provider_cfg["userinfo_endpoint"]
        uri, headers, body = client.add_token(userinfo_endpoint)
        userinfo_response = requests.get(uri, headers=headers, data=body, timeout=request_timeout)
        # Decode the body once instead of re-parsing it for every field.
        userinfo = userinfo_response.json()

        if not userinfo.get("email_verified"):
            return "User email not available or not verified by Google.", 400
        unique_id = userinfo["sub"]
        users_email = userinfo["email"]
        users_name = userinfo["given_name"]

        data = (unique_id, users_name, users_email, None, False)

        current_app.logger.info("user data from google: " + str(data))
        user = database.get_user(users_email)

        if not user:
            user = database.add_user(data)
            current_app.logger.info("new user: {} created".format(users_email))

        current_app.logger.info("email: " + str(user.email))
        current_app.logger.info("username: " + str(user.username))
        current_app.logger.info("authenticated: " + str(user.is_authenticated))
        current_app.logger.info("active: " + str(user.is_active))
        current_app.logger.info("id: " + str(user.get_id()))
        login_user(user, remember=True, duration=datetime.timedelta(days=7), force=True)

        # Fall back to the home page when no return target was supplied.
        return redirect(request.args.get("state") or url_for("home"))
    except Exception as e:
        current_app.logger.error(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
        return str(e)
|
|
|
|
|
|
@app.route("/logout")
def logout():
    """Log the current user out and redirect to the login page.

    Any exception is logged and its text is returned as the response body.
    """
    try:
        logout_user()
        login_url = url_for("login")
        return redirect(login_url)
    except Exception as err:
        current_app.logger.error(" ".join((inspect.stack()[0][3], str(type(err)), str(err))))
        return str(err)
|
|
|
|
|
|
@app.route("/")
def root():
    """Redirect the site root to the home page."""
    home_url = url_for("home")
    return redirect(home_url)
|
|
|
|
|
|
@app.route("/home")
@login_required
def home():
    """Render the home page for a logged-in user.

    Any exception is logged and its text is returned as the response body.
    """
    try:
        page = render_template("home.html", title="Home", current_user=current_user)
        return page
    except Exception as err:
        current_app.logger.error(" ".join((inspect.stack()[0][3], str(type(err)), str(err))))
        return str(err)
|
|
|
|
|
|
@app.after_request
def apply_headers(response: Response):
    """Tag every response with the current user's e-mail address.

    Sets the ``rpi_name`` cookie and the ``email`` header to the user's
    ``email`` attribute, or to ``"anonymous"`` when there is no such attribute.

    Args:
        response: The outgoing response.

    Returns:
        The same response object. If tagging fails the error is logged and
        the response is returned as it stands. The failure path used to
        return the exception text, but an after-request hook has to hand
        back a response object.
    """
    try:
        user_name = "anonymous"
        if current_user:
            user_name = getattr(current_user, "email", "anonymous")
        response.set_cookie("rpi_name", user_name)
        response.headers.set("email", user_name)
    except Exception as e:
        current_app.logger.error(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
    return response
|
|
|
|
|
|
@app.route("/music")
@login_required
def music():
    """Placeholder music page: always answers with a fixed message."""
    message = "No music"
    return message
|
|
|
|
|
|
@app.errorhandler(404)
def resource_not_found(e):
    """Render the 404 page with a 404 status.

    ``e`` (the triggering error) is not used. If rendering fails, the failure
    is logged and its text is returned as the response body.
    """
    try:
        page = render_template("404.html")
        return page, 404
    except Exception as err:
        current_app.logger.error(" ".join((inspect.stack()[0][3], str(type(err)), str(err))))
        return str(err)
|
|
|
|
|
|
@login_manager.unauthorized_handler
def handle_unauthorized():
    """Answer an unauthorized request with the login view's output and a 401.

    The response also carries a ``WWW-Authenticate`` header that asks for
    HTTP Basic credentials.
    """
    response = make_response(login())
    response.headers['WWW-Authenticate'] = 'Basic realm="Access to the staging site"'
    return response, 401
|
|
|
|
|
|
@app.errorhandler(Exception)
def handle_exception(e):
    """Catch-all error handler: log the exception and render the error page.

    Args:
        e: The exception that reached the handler.

    Returns:
        The rendered ``error.html`` page with a 500 status.
    """
    # Log in the same format as the other handlers in this file, plus the
    # traceback; previously the exception was rendered but never logged here.
    current_app.logger.error(
        inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e), exc_info=e
    )
    return render_template("error.html", e=e), 500
|
|
|
|
|
|
# Give every blueprint the same 404 and catch-all handlers as the application.
# NOTE(review): these blueprints were already attached to `app` earlier in this
# file; confirm that Flask still applies handlers added after registration.
for _handled_blueprint in (games.Games, tv_movies.TV_Movies, comics.Comics, admin.Admin):
    _handled_blueprint.register_error_handler(404, resource_not_found)
    _handled_blueprint.register_error_handler(Exception, handle_exception)


if __name__ == "__main__":
    app.run()
|