rpiwebapp-public/rpiWebApp.py

356 lines
10 KiB
Python

from flask import Flask
from flask import render_template, request, g, redirect, url_for, flash, current_app, Response
from flask_login import LoginManager, current_user, login_user, logout_user, login_required
from oauthlib.oauth2 import WebApplicationClient
import threading
import logging
import inotify.adapters, inotify.constants
import inspect
import datetime
import requests
import json
import os
import base64
import scripts.func as func
from scripts import database
from admin import admin
from comics import comics
from tv_movies import tv_movies
from games import games
class NullHandler(logging.Handler):
def emit(self, record=None):
pass
def debug(self, *arg):
pass
nullLog = NullHandler()
inotify.adapters._LOGGER = nullLog
GOOGLE_CLIENT_ID = "***REMOVED***"
GOOGLE_CLIENT_SECRET = "***REMOVED***"
GOOGLE_DISCOVERY_URL = (
"https://accounts.google.com/.well-known/openid-configuration"
)
client = WebApplicationClient(GOOGLE_CLIENT_ID)
def get_google_provider_cfg():
return requests.get(GOOGLE_DISCOVERY_URL).json()
app = Flask(__name__)
app.register_blueprint(comics.Comics)
app.register_blueprint(admin.Admin)
app.register_blueprint(tv_movies.TV_Movies)
app.register_blueprint(games.Games)
app.config["SECRET_KEY"] = "***REMOVED***"
app.logger.setLevel("DEBUG")
# app.use_x_sendfile = True
login_manager = LoginManager(app)
login_manager.login_view = "login"
app.config["REMEMBER_COOKIE_DOMAIN"] = "narnian.us"
MOBILE_DEVICES = ["android", "blackberry", "ipad", "iphone"]
def get_comics():
with app.app_context():
i = inotify.adapters.InotifyTree(func.COMICS_DIRECTORY)
func.get_comics()
for event in i.event_gen(yield_nones=False):
(header, type_names, path, filename) = event
file_path = os.path.join(path, filename)
if "IN_CLOSE_WRITE" in type_names or "IN_MOVED_TO" in type_names:
func.get_comic(file_path)
def get_movies():
with app.app_context():
i = inotify.adapters.InotifyTree(func.MOVIES_DIRECTORY)
func.get_movies()
for event in i.event_gen(yield_nones=False):
(header, type_names, path, filename) = event
file_path = os.path.join(path, filename)
if "IN_CLOSE_WRITE" in type_names or "IN_MOVED_TO" in type_names:
func.get_movie(file_path)
def get_tv_shows():
with app.app_context():
i = inotify.adapters.InotifyTree(func.TV_SHOWS_DIRECTORY)
func.get_tv_shows()
func.get_tv_episodes()
for event in i.event_gen(yield_nones=False):
(header, type_names, path, filename) = event
file_path = os.path.join(path, filename)
if "IN_CLOSE_WRITE" in type_names or "IN_MOVED_TO" in type_names:
func.get_tv_shows()
func.get_tv_episode(file_path)
def get_games():
with app.app_context():
func.get_games()
with app.app_context():
current_app.logger.info("server start")
thread = threading.Thread(target=get_comics, args=())
thread.daemon = True
thread.start()
thread2 = threading.Thread(target=get_movies, args=())
thread2.daemon = True
thread2.start()
thread3 = threading.Thread(target=get_tv_shows, args=())
thread3.daemon = True
thread3.start()
thread4 = threading.Thread(target=get_games, args=())
thread4.daemon = True
thread4.start()
@app.teardown_appcontext
def close_connection(exception):
db = getattr(g, '_database', None)
if db is not None:
db.close()
def update_comic_db(sender, **kw):
try:
database.add_comics(kw["meta"], kw["thumbnails"])
except Exception as e:
current_app.logger.info(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
def update_movie_db(sender, **kw):
try:
database.add_movies(kw["movies"])
except Exception as e:
current_app.logger.info(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
def update_tv_show_db(sender, **kw):
try:
database.add_tv_shows(kw["tv_show"])
except Exception as e:
current_app.logger.info(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
def update_tv_episodes_db(sender, **kw):
try:
database.add_tv_episodes(kw["tv_episodes"])
except Exception as e:
current_app.logger.info(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
def update_games_db(sender, **kw):
try:
database.add_games(kw["games"])
except Exception as e:
current_app.logger.info(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
func.comic_loaded.connect(update_comic_db)
func.movie_loaded.connect(update_movie_db)
func.tv_show_loaded.connect(update_tv_show_db)
func.tv_episodes_loaded.connect(update_tv_episodes_db)
func.games_loaded.connect(update_games_db)
@login_manager.user_loader
def load_user(email):
try:
return database.get_user(email)
except Exception as e:
current_app.logger.info(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
return str(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
@login_manager.request_loader
def load_user_from_request(request):
try:
api_key = request.headers.get('Authorization')
if api_key:
api_key = api_key.replace('Basic ', '', 1)
try:
api_key = base64.b64decode(api_key).decode("utf-8")
except TypeError:
pass
email = api_key.split(":")[0]
password = api_key.split(":")[1]
user = load_user(email)
if user and user.check_password(password):
return user
return None
except Exception as e:
current_app.logger.info(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
return str(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
@app.route("/login", methods=["GET", "POST"])
def login():
try:
google_provider_cfg = get_google_provider_cfg()
authorization_endpoint = google_provider_cfg["authorization_endpoint"]
request_uri = client.prepare_request_uri(
authorization_endpoint,
redirect_uri=request.base_url + "/callback",
scope=["openid", "email", "profile"],
)
if request.method == "POST":
email = request.form.get("email")
password = request.form.get("password")
user = database.get_user(email)
if user is None or not user.check_password(password):
flash("invalid email or password")
return redirect(url_for("login"))
login_user(user, remember=True, duration=datetime.timedelta(days=7))
next_page = request.args.get("next")
if not next_page:
next_page = url_for("home")
return redirect(next_page)
return render_template("login.html", title="login", auth_url=request_uri)
except Exception as e:
current_app.logger.info(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
return str(e)
@app.route("/login/callback")
def callback():
try:
# Get authorization code Google sent back to you
code = request.args.get("code")
google_provider_cfg = get_google_provider_cfg()
token_endpoint = google_provider_cfg["token_endpoint"]
token_url, headers, body = client.prepare_token_request(
token_endpoint,
authorization_response=request.url,
redirect_url=request.base_url,
code=code
)
token_response = requests.post(
token_url,
headers=headers,
data=body,
auth=(GOOGLE_CLIENT_ID, GOOGLE_CLIENT_SECRET)
)
client.parse_request_body_response(json.dumps(token_response.json()))
userinfo_endpoint = google_provider_cfg["userinfo_endpoint"]
uri, headers, body = client.add_token(userinfo_endpoint)
userinfo_response = requests.get(uri, headers=headers, data=body)
if userinfo_response.json().get("email_verified"):
unique_id = userinfo_response.json()["sub"]
users_email = userinfo_response.json()["email"]
users_name = userinfo_response.json()["given_name"]
else:
return "User email not available or not verified by Google.", 400
data = (unique_id, users_name, users_email, None, False)
current_app.logger.info("user data from google: " + str(data))
user = database.get_user(users_email)
if not user:
user = database.add_user(data)
current_app.logger.info("new user: {} created".format(users_email))
current_app.logger.info("email: "+str(user.email))
current_app.logger.info("username: "+str(user.username))
current_app.logger.info("authenticated: "+str(user.is_authenticated))
current_app.logger.info("active: "+str(user.is_active))
current_app.logger.info("id: "+str(user.get_id()))
login_user(user, remember=True, duration=datetime.timedelta(days=7), force=True)
return redirect(url_for("home"))
except Exception as e:
current_app.logger.info(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
return str(e)
@app.route("/logout")
def logout():
try:
logout_user()
return redirect(url_for("login"))
except Exception as e:
current_app.logger.info(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
return str(e)
@app.route("/")
def root():
return redirect(url_for("home"))
@app.route("/home")
@login_required
def home():
try:
return render_template("home.html", title="Home", current_user=current_user)
except Exception as e:
current_app.logger.info(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
return str(e)
@app.after_request
def apply_headers(response: Response):
try:
user_name = "anonymous"
if current_user:
user_name = getattr(current_user, "email", "anonymous")
response.set_cookie("rpi_name", user_name)
return response
except Exception as e:
current_app.logger.info(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
return str(e)
@app.route("/music")
@login_required
def music():
return "No music"
@app.errorhandler(404)
def resource_not_found(e):
try:
return render_template("404.html"), 404
except Exception as e:
current_app.logger.info(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
return str(e)
@app.errorhandler(Exception)
def handle_exception(e):
return render_template("error.html", e=e), 500
games.Games.register_error_handler(404, resource_not_found)
tv_movies.TV_Movies.register_error_handler(404, resource_not_found)
comics.Comics.register_error_handler(404, resource_not_found)
admin.Admin.register_error_handler(404, resource_not_found)
games.Games.register_error_handler(Exception, handle_exception)
tv_movies.TV_Movies.register_error_handler(Exception, handle_exception)
comics.Comics.register_error_handler(Exception, handle_exception)
admin.Admin.register_error_handler(Exception, handle_exception)
if __name__ == '__main__':
app.run()