rpiwebapp-public/rpiWebApp.py

284 lines
7.8 KiB
Python

from flask import Flask
from flask import render_template, request, g, redirect, url_for, flash, current_app
from flask_login import LoginManager, current_user, login_user, logout_user, login_required
from oauthlib.oauth2 import WebApplicationClient
import threading
import logging
import inotify.adapters, inotify.constants
import inspect
import datetime
import requests
import json
import os
import scripts.func as func
from scripts import database
from admin import admin
from comics import comics
from tv_movies import tv_movies
class NullHandler(logging.Handler):
    """Logging sink that silently discards everything it is given.

    An instance of this class is assigned to ``inotify.adapters._LOGGER``
    further down in this module, i.e. it stands in for a ``logging.Logger``
    rather than being attached to one.  It therefore has to answer the
    logger-style calls (``debug``, ``warning``, ...) in addition to the
    regular ``Handler.emit`` hook.
    """

    def emit(self, record=None):
        """Drop *record* without formatting or writing it."""

    def debug(self, *arg, **kwargs):
        """Accept a logger-style call and ignore it."""

    # ``logging.Handler`` has none of the Logger convenience methods, so a
    # call at any level other than debug would raise AttributeError.  Alias
    # the remaining levels to the same no-op.
    info = warning = error = exception = critical = debug
# Replace the inotify library's module-level logger with a do-nothing object
# so that its log calls are discarded.
nullLog = NullHandler()
inotify.adapters._LOGGER = nullLog
# Google OAuth client credentials (the values are redacted in this copy).
# NOTE(review): these appear to be hard-coded in the module; consider reading
# them from the environment or a config file instead - confirm.
GOOGLE_CLIENT_ID = "***REMOVED***"
GOOGLE_CLIENT_SECRET = "***REMOVED***"
# URL requested by get_google_provider_cfg() to obtain Google's endpoint list.
GOOGLE_DISCOVERY_URL = (
"https://accounts.google.com/.well-known/openid-configuration"
)
# oauthlib client shared by the login() and callback() views.
client = WebApplicationClient(GOOGLE_CLIENT_ID)
def get_google_provider_cfg():
    """Fetch and decode the document published at ``GOOGLE_DISCOVERY_URL``.

    Callers in this module read the ``authorization_endpoint``,
    ``token_endpoint`` and ``userinfo_endpoint`` keys from the result.

    Returns:
        The decoded JSON body of the discovery response.

    Raises:
        requests.RequestException: On timeout, connection failure or a
            non-success HTTP status.
        ValueError: If the response body is not valid JSON.
    """
    # Without an explicit timeout ``requests`` waits indefinitely, which
    # would block the request handler that triggered this call.
    response = requests.get(GOOGLE_DISCOVERY_URL, timeout=10)
    # Surface HTTP errors directly instead of as a confusing JSON/key error.
    response.raise_for_status()
    return response.json()
# The Flask application object; the comics, admin and TV/movie areas are
# provided by blueprints imported at the top of this module.
app = Flask(__name__)
app.register_blueprint(comics.Comics)
app.register_blueprint(admin.Admin)
app.register_blueprint(tv_movies.TV_Movies)
# Flask secret key (the value is redacted in this copy).
# NOTE(review): appears to be hard-coded; consider loading it from the
# environment - confirm.
app.config["SECRET_KEY"] = "***REMOVED***"
app.logger.setLevel("DEBUG")
# Flask-Login integration; "login" is the endpoint name of the login() view
# defined below.
login_manager = LoginManager(app)
login_manager.login_view = "login"
# NOTE(review): not referenced anywhere in the visible part of this file.
MOBILE_DEVICES = ["android", "blackberry", "ipad", "iphone"]
def get_comics():
    """Load the comic library, then keep it current by watching the disk.

    Runs inside an application context.  After the initial
    ``func.get_comics()`` scan it blocks on inotify events for
    ``func.COMICS_DIRECTORY`` and hands every path reported with
    ``IN_CLOSE_WRITE`` or ``IN_MOVED_TO`` to ``func.get_comic``.  The event
    loop has no exit condition, so this is meant to be a thread target.
    """
    wanted = ("IN_CLOSE_WRITE", "IN_MOVED_TO")
    with app.app_context():
        # The watch is set up before the initial scan, as in the original.
        watcher = inotify.adapters.InotifyTree(func.COMICS_DIRECTORY)
        func.get_comics()
        for _header, flags, folder, name in watcher.event_gen(yield_nones=False):
            changed = os.path.join(folder, name)
            if not any(flag in flags for flag in wanted):
                continue
            func.get_comic(changed)
def get_movies():
    """Load the movie library, then keep it current by watching the disk.

    Runs inside an application context.  After the initial
    ``func.get_movies()`` scan it blocks on inotify events for
    ``func.MOVIES_DIRECTORY`` and hands every path reported with
    ``IN_CLOSE_WRITE`` or ``IN_MOVED_TO`` to ``func.get_movie``.  The event
    loop has no exit condition, so this is meant to be a thread target.
    """
    wanted = ("IN_CLOSE_WRITE", "IN_MOVED_TO")
    with app.app_context():
        # The watch is set up before the initial scan, as in the original.
        watcher = inotify.adapters.InotifyTree(func.MOVIES_DIRECTORY)
        func.get_movies()
        for _header, flags, folder, name in watcher.event_gen(yield_nones=False):
            changed = os.path.join(folder, name)
            if not any(flag in flags for flag in wanted):
                continue
            func.get_movie(changed)
def get_tv_shows():
    """Load TV shows and episodes, then keep them current by watching the disk.

    Runs inside an application context.  After the initial
    ``func.get_tv_shows()`` / ``func.get_tv_episodes()`` scan it blocks on
    inotify events for ``func.TV_SHOWS_DIRECTORY``.  For every path reported
    with ``IN_CLOSE_WRITE`` or ``IN_MOVED_TO`` the show list is refreshed and
    the path is handed to ``func.get_tv_episode``.  The event loop has no
    exit condition, so this is meant to be a thread target.
    """
    wanted = ("IN_CLOSE_WRITE", "IN_MOVED_TO")
    with app.app_context():
        # The watch is set up before the initial scan, as in the original.
        watcher = inotify.adapters.InotifyTree(func.TV_SHOWS_DIRECTORY)
        func.get_tv_shows()
        func.get_tv_episodes()
        for _header, flags, folder, name in watcher.event_gen(yield_nones=False):
            changed = os.path.join(folder, name)
            if not any(flag in flags for flag in wanted):
                continue
            # Shows are refreshed before the single episode is processed.
            func.get_tv_shows()
            func.get_tv_episode(changed)
# Import-time startup: log the start and launch one background watcher thread
# per media type.  The threads are flagged as daemon threads so they do not
# keep the process alive when the main thread exits.
with app.app_context():
    current_app.logger.info("server start")
    #database.initialize_db()
    # Comic library watcher.
    thread = threading.Thread(target=get_comics, args=())
    thread.daemon = True
    thread.start()
    # Movie library watcher.
    thread2 = threading.Thread(target=get_movies, args=())
    thread2.daemon = True
    thread2.start()
    # TV show / episode watcher.
    thread3 = threading.Thread(target=get_tv_shows, args=())
    thread3.daemon = True
    thread3.start()
@app.teardown_appcontext
def close_connection(exception):
    """Close the connection stored on ``g._database``, if there is one.

    Registered as an app-context teardown callback; ``exception`` is passed
    in by Flask and is not used here.
    """
    connection = getattr(g, "_database", None)
    if connection is None:
        return
    connection.close()
def update_comic_db(sender, **kw):
    """Signal receiver that stores loaded comic metadata and thumbnails.

    Expects ``meta`` and ``thumbnails`` keyword arguments and forwards them
    to ``database.add_comics``.  Any exception is logged and swallowed.
    """
    try:
        database.add_comics(kw["meta"], kw["thumbnails"])
    except Exception as err:
        # Frame 0 of the stack is this function, so this yields its name.
        here = inspect.stack()[0].function
        current_app.logger.info("{} {} {}".format(here, type(err), err))
def update_movie_db(sender, **kw):
    """Signal receiver that stores loaded movies.

    Expects a ``movies`` keyword argument and forwards it to
    ``database.add_movies``.  Any exception is logged and swallowed.
    """
    try:
        database.add_movies(kw["movies"])
    except Exception as err:
        # Frame 0 of the stack is this function, so this yields its name.
        here = inspect.stack()[0].function
        current_app.logger.info("{} {} {}".format(here, type(err), err))
def update_tv_show_db(sender, **kw):
    """Signal receiver that stores a loaded TV show.

    Expects a ``tv_show`` keyword argument and forwards it to
    ``database.add_tv_shows``.  Any exception is logged and swallowed.
    """
    try:
        database.add_tv_shows(kw["tv_show"])
    except Exception as err:
        # Frame 0 of the stack is this function, so this yields its name.
        here = inspect.stack()[0].function
        current_app.logger.info("{} {} {}".format(here, type(err), err))
def update_tv_episodes_db(sender, **kw):
    """Signal receiver that stores loaded TV episodes.

    Expects a ``tv_episodes`` keyword argument and forwards it to
    ``database.add_tv_episodes``.  Any exception is logged and swallowed.
    """
    try:
        database.add_tv_episodes(kw["tv_episodes"])
    except Exception as err:
        # Frame 0 of the stack is this function, so this yields its name.
        here = inspect.stack()[0].function
        current_app.logger.info("{} {} {}".format(here, type(err), err))
# Subscribe the database writers above to the "loaded" signals exposed by
# scripts.func, so data announced through those signals is persisted.
func.comic_loaded.connect(update_comic_db)
func.movie_loaded.connect(update_movie_db)
func.tv_show_loaded.connect(update_tv_show_db)
func.tv_episodes_loaded.connect(update_tv_episodes_db)
@login_manager.user_loader
def load_user(email):
    """Flask-Login user loader: resolve a stored user id to a user object.

    Args:
        email: The identifier Flask-Login stored for the user; it is passed
            straight to ``database.get_user``.

    Returns:
        Whatever ``database.get_user`` returns, or ``None`` when the lookup
        raises.
    """
    try:
        return database.get_user(email)
    except Exception as e:
        current_app.logger.info(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
        # Flask-Login treats any non-None return value as the user object.
        # Returning the error text (as this function used to) would make
        # ``current_user`` a plain string and break every protected view,
        # so report "no such user" instead.
        return None
def _is_local_redirect_target(target):
    """Return True if *target* is a same-site, path-only URL."""
    # Function-scope import: the module header does not import urllib.
    from urllib.parse import urlparse

    if not target or not target.startswith("/"):
        return False
    # Browsers may treat "\" like "/" and drop tabs/newlines, which can turn
    # an innocent-looking path into a "//host" redirect.
    if target.startswith("//") or any(ch in target for ch in "\\\r\n\t"):
        return False
    try:
        parts = urlparse(target)
    except ValueError:
        return False
    return not parts.scheme and not parts.netloc


@app.route("/login", methods=["GET", "POST"])
def login():
    """Render the login page (GET) or perform a password login (POST).

    POST: reads ``email`` and ``password`` from the form.  On bad
    credentials a message is flashed and the client is redirected back to
    the login page.  On success the user is logged in for seven days and
    redirected to the ``next`` query parameter when that is a local path,
    otherwise to the home page.

    GET: renders ``login.html`` with the Google authorization URL.

    Returns:
        A Flask response; on an unexpected error, the exception text.
    """
    try:
        if request.method == "POST":
            email = request.form.get("email")
            password = request.form.get("password")
            user = database.get_user(email)
            if user is None or not user.check_password(password):
                flash("invalid email or password")
                return redirect(url_for("login"))
            login_user(user, remember=True, duration=datetime.timedelta(days=7))
            next_page = request.args.get("next")
            # ``next`` is attacker-controllable; only follow it when it
            # stays on this site, otherwise fall back to the home page.
            if not _is_local_redirect_target(next_page):
                next_page = url_for("home")
            return redirect(next_page)

        # Only the GET page needs the Google authorization URL, so the
        # discovery request is no longer made for password logins.
        google_provider_cfg = get_google_provider_cfg()
        authorization_endpoint = google_provider_cfg["authorization_endpoint"]
        request_uri = client.prepare_request_uri(
            authorization_endpoint,
            redirect_uri=request.base_url + "/callback",
            scope=["openid", "email", "profile"],
        )
        return render_template("login.html", title="login", auth_url=request_uri)
    except Exception as e:
        current_app.logger.info(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
        # NOTE(review): this sends the raw exception text to the client.
        return str(e)
@app.route("/login/callback")
def callback():
    """Finish the Google sign-in started from the login page.

    Exchanges the ``code`` query parameter for tokens, fetches the user's
    profile from the userinfo endpoint, creates a local user when none
    exists for that email, logs the user in for seven days and redirects to
    the home page.

    Returns:
        A redirect to the home page on success; a 400 response when Google
        does not report the email as verified; on an unexpected error, the
        exception text.
    """
    try:
        # Get authorization code Google sent back to you
        code = request.args.get("code")
        google_provider_cfg = get_google_provider_cfg()
        token_endpoint = google_provider_cfg["token_endpoint"]
        token_url, headers, body = client.prepare_token_request(
            token_endpoint,
            authorization_response=request.url,
            redirect_url=request.base_url,
            code=code
        )
        # Explicit timeouts: without them ``requests`` can block this
        # handler indefinitely if the remote end stops responding.
        token_response = requests.post(
            token_url,
            headers=headers,
            data=body,
            auth=(GOOGLE_CLIENT_ID, GOOGLE_CLIENT_SECRET),
            timeout=10,
        )
        client.parse_request_body_response(json.dumps(token_response.json()))
        userinfo_endpoint = google_provider_cfg["userinfo_endpoint"]
        uri, headers, body = client.add_token(userinfo_endpoint)
        userinfo_response = requests.get(uri, headers=headers, data=body, timeout=10)
        # Decode the body once instead of re-parsing it for every field.
        userinfo = userinfo_response.json()
        if not userinfo.get("email_verified"):
            return "User email not available or not verified by Google.", 400
        unique_id = userinfo["sub"]
        users_email = userinfo["email"]
        users_name = userinfo["given_name"]
        # Tuple layout expected by database.add_user.
        # NOTE(review): the meaning of the trailing None/False is not visible
        # from this file - confirm against database.add_user.
        data = (unique_id, users_name, users_email, None, False)
        current_app.logger.info("user data from google: " + str(data))
        user = database.get_user(users_email)
        if not user:
            user = database.add_user(data)
            current_app.logger.info("new user: {} created".format(users_email))
        current_app.logger.info("email: "+str(user.email))
        current_app.logger.info("username: "+str(user.username))
        current_app.logger.info("authenticated: "+str(user.is_authenticated))
        current_app.logger.info("active: "+str(user.is_active))
        current_app.logger.info("id: "+str(user.get_id()))
        login_user(user, remember=True, duration=datetime.timedelta(days=7), force=True)
        return redirect(url_for("home"))
    except Exception as e:
        current_app.logger.info(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
        # NOTE(review): this sends the raw exception text to the client.
        return str(e)
@app.route("/logout")
def logout():
    """Log the current user out and redirect to the login page.

    On an unexpected error the exception is logged and its text is returned
    as the response body.
    """
    try:
        logout_user()
        login_url = url_for("login")
        return redirect(login_url)
    except Exception as err:
        # Frame 0 of the stack is this function, so this yields its name.
        here = inspect.stack()[0].function
        current_app.logger.info("{} {} {}".format(here, type(err), err))
        return str(err)
@app.route("/")
def root():
    """Redirect the site root to the home page."""
    home_url = url_for("home")
    return redirect(home_url)
@app.route("/home")
@login_required
def home():
    """Render the home page for a logged-in user.

    On an unexpected error the exception is logged and its text is returned
    as the response body.
    """
    try:
        page = render_template(
            "home.html",
            title="Hello World",
            current_user=current_user,
        )
        return page
    except Exception as err:
        # Frame 0 of the stack is this function, so this yields its name.
        here = inspect.stack()[0].function
        current_app.logger.info("{} {} {}".format(here, type(err), err))
        return str(err)
@app.route("/music")
@login_required
def music():
    """Placeholder view for the music section; login required."""
    placeholder = "No music"
    return placeholder
@app.route("/games")
@login_required
def games():
    """Placeholder view for the games section; login required."""
    placeholder = "No Games"
    return placeholder
# Start Flask's built-in server only when this file is executed directly,
# not when it is imported.
if __name__ == '__main__':
    app.run()