Matthew Welch
1fb4a869b0
Added initial implementation of user data for tv shows and movies as well as OAuth for Google sign in.
280 lines
7.4 KiB
Python
280 lines
7.4 KiB
Python
from flask import Flask
|
|
from flask import render_template, request, g, redirect, url_for, flash, current_app
|
|
from flask_login import LoginManager, current_user, login_user, logout_user, login_required
|
|
|
|
from oauthlib.oauth2 import WebApplicationClient
|
|
|
|
import threading
|
|
import logging
|
|
import inotify.adapters, inotify.constants
|
|
import inspect
|
|
import datetime
|
|
import requests
|
|
import json
|
|
|
|
import scripts.func as func
|
|
from scripts import database
|
|
from admin import admin
|
|
from movies import movies
|
|
from comics import comics
|
|
from tv import tv
|
|
|
|
|
|
class NullHandler(logging.Handler):
    """Do-nothing stand-in used to silence a library's logger.

    An instance of this class is assigned over a module-level logger object,
    so it has to answer the logger-style calls that module makes.
    ``logging.Handler`` itself defines none of ``debug``/``info``/``warning``/
    ``error``/``exception``/``critical``/``log``; without the no-op methods
    below any such call would raise ``AttributeError``.
    """

    def emit(self, record=None):
        """Discard *record*; nothing is ever written."""
        pass

    def debug(self, *arg, **kwargs):
        """Accept and ignore a logger-style call (message, args, keywords)."""
        pass

    # Every other logging level is swallowed exactly like ``debug``.
    info = warning = error = exception = critical = log = debug
|
|
# Swap the inotify adapter module's logger for the do-nothing handler so the
# library's log calls are discarded; the same object stays reachable as
# ``nullLog``.
nullLog = inotify.adapters._LOGGER = NullHandler()
|
|
|
|
|
|
# Google OAuth 2.0 settings used by the /login and /login/callback routes.
# NOTE(review): the client id and secret are literals in the source file;
# confirm whether they should come from configuration instead.
GOOGLE_CLIENT_ID = "***REMOVED***"
GOOGLE_CLIENT_SECRET = "***REMOVED***"
GOOGLE_DISCOVERY_URL = "https://accounts.google.com/.well-known/openid-configuration"

# OAuth client helper that builds the authorization and token requests.
client = WebApplicationClient(client_id=GOOGLE_CLIENT_ID)
|
|
|
|
|
|
def get_google_provider_cfg():
    """Fetch the document at ``GOOGLE_DISCOVERY_URL`` and return it as parsed JSON.

    Returns:
        The decoded JSON body; callers read the ``authorization_endpoint``,
        ``token_endpoint`` and ``userinfo_endpoint`` keys from it.

    Raises:
        requests.RequestException: on connection failure, timeout, or a
            non-2xx HTTP status.
        ValueError: if the response body is not valid JSON.
    """
    # Without a timeout ``requests`` waits indefinitely, which would hang the
    # request thread if the endpoint stops answering.
    response = requests.get(GOOGLE_DISCOVERY_URL, timeout=10)
    # Fail with a clear HTTP error instead of handing callers an error payload
    # that lacks the endpoint keys they index into.
    response.raise_for_status()
    return response.json()
|
|
|
|
|
|
# Build the Flask application and attach one blueprint per media section.
app = Flask(__name__)
app.register_blueprint(comics.Comics)
app.register_blueprint(admin.Admin)
app.register_blueprint(movies.Movies)
app.register_blueprint(tv.TV)
app.config.update(SECRET_KEY="***REMOVED***")
app.logger.setLevel("DEBUG")

# Flask-Login: unauthenticated visitors are sent to the "login" endpoint.
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = "login"

# Lower-case device names.
MOBILE_DEVICES = [
    "android",
    "blackberry",
    "ipad",
    "iphone",
]
|
|
|
|
|
|
def get_comics():
    """Index the comic library, then follow file-system changes forever.

    Intended as a thread target: it pushes its own application context, runs
    ``func.get_comics()`` and ``database.verify_paths()`` once, and then
    blocks on inotify events under ``func.COMICS_DIRECTORY``.
    """
    with app.app_context():
        # Initial pass before any change events are processed.
        func.get_comics()
        database.verify_paths()

        watcher = inotify.adapters.InotifyTree(func.COMICS_DIRECTORY)
        for _header, event_types, directory, name in watcher.event_gen(yield_nones=False):
            changed_path = "/".join((directory, name))
            # A file finished being written: load that single comic.
            if "IN_CLOSE_WRITE" in event_types:
                func.get_comic(changed_path)
            # A file was removed: let the database re-check that path.
            if "IN_DELETE" in event_types:
                database.verify_path(changed_path)
|
|
|
|
|
|
def get_movies():
    """Scan the movie library, then rescan whenever a file is written.

    Intended as a thread target: it pushes its own application context, calls
    ``func.get_movies()`` once, and then blocks on inotify events under
    ``func.MOVIES_DIRECTORY``.
    """
    with app.app_context():
        func.get_movies()

        watcher = inotify.adapters.InotifyTree(func.MOVIES_DIRECTORY)
        for event in watcher.event_gen(yield_nones=False):
            _header, event_types, _directory, _name = event
            if "IN_CLOSE_WRITE" not in event_types:
                continue
            # Any completed write triggers a full rescan of the library.
            func.get_movies()
|
|
|
|
|
|
def get_tv_shows():
    """Load TV shows and then TV episodes inside an application context.

    Unlike the comic and movie workers this runs once and returns; it does
    not watch the file system.
    """
    context = app.app_context()
    with context:
        func.get_tv_shows()
        func.get_tv_episodes()
|
|
|
|
|
|
# Import-time start-up: log, prepare the database, then launch one daemon
# thread per media scanner so they never block interpreter shutdown.
with app.app_context():
    current_app.logger.info("server start")
    database.initialize_db()

    thread = threading.Thread(target=get_comics, daemon=True)
    thread.start()

    thread2 = threading.Thread(target=get_movies, daemon=True)
    thread2.start()

    thread3 = threading.Thread(target=get_tv_shows, daemon=True)
    thread3.start()
|
|
|
|
|
|
@app.teardown_appcontext
def close_connection(exception):
    """Close the object stored on ``g._database`` when the app context ends.

    Args:
        exception: value passed by Flask's teardown hook; not used here.
    """
    connection = getattr(g, "_database", None)
    if connection is None:
        # Nothing was stored on ``g`` during this context.
        return
    connection.close()
|
|
|
|
|
|
def update_comic_db(sender, **kw):
    """Signal receiver: hand loaded comic data to the database layer.

    Args:
        sender: the signal's sender; not used.
        **kw: must contain ``meta`` and ``thumbnails``, which are passed to
            ``database.add_comics``.

    Any exception is logged at INFO level and suppressed.
    """
    try:
        database.add_comics(kw["meta"], kw["thumbnails"])
    except Exception as err:
        # "<function name> <exception type> <message>"
        details = (inspect.stack()[0][3], str(type(err)), str(err))
        current_app.logger.info(" ".join(details))
|
|
|
|
|
|
def update_movie_db(sender, **kw):
    """Signal receiver: hand loaded movie data to the database layer.

    Args:
        sender: the signal's sender; not used.
        **kw: must contain ``movies``, which is passed to
            ``database.add_movies``.

    Any exception is logged at INFO level and suppressed.
    """
    try:
        database.add_movies(kw["movies"])
    except Exception as err:
        # "<function name> <exception type> <message>"
        details = (inspect.stack()[0][3], str(type(err)), str(err))
        current_app.logger.info(" ".join(details))
|
|
|
|
|
|
def update_tv_show_db(sender, **kw):
    """Signal receiver: hand loaded TV show data to the database layer.

    Args:
        sender: the signal's sender; not used.
        **kw: must contain ``tv_show``, which is passed to
            ``database.add_tv_shows``.

    Any exception is logged at INFO level and suppressed.
    """
    try:
        database.add_tv_shows(kw["tv_show"])
    except Exception as err:
        # "<function name> <exception type> <message>"
        details = (inspect.stack()[0][3], str(type(err)), str(err))
        current_app.logger.info(" ".join(details))
|
|
|
|
|
|
def update_tv_episodes_db(sender, **kw):
    """Signal receiver: hand loaded TV episode data to the database layer.

    Args:
        sender: the signal's sender; not used.
        **kw: must contain ``tv_episodes``, which is passed to
            ``database.add_tv_episodes``.

    Any exception is logged at INFO level and suppressed.
    """
    try:
        database.add_tv_episodes(kw["tv_episodes"])
    except Exception as err:
        # "<function name> <exception type> <message>"
        details = (inspect.stack()[0][3], str(type(err)), str(err))
        current_app.logger.info(" ".join(details))
|
|
|
|
|
|
# Subscribe each database updater to the loader signal that feeds it.
for _signal, _receiver in (
    (func.comic_loaded, update_comic_db),
    (func.movie_loaded, update_movie_db),
    (func.tv_show_loaded, update_tv_show_db),
    (func.tv_episodes_loaded, update_tv_episodes_db),
):
    _signal.connect(_receiver)
|
|
|
|
|
|
@login_manager.user_loader
def load_user(email):
    """Flask-Login user loader: resolve a stored identifier to a user.

    Args:
        email: the identifier Flask-Login hands back; it is passed straight
            to ``database.get_user``.

    Returns:
        Whatever ``database.get_user(email)`` returns, or ``None`` when the
        lookup raises.
    """
    try:
        return database.get_user(email)
    except Exception as e:
        current_app.logger.info(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
        # Flask-Login uses any non-None return value as the current user.
        # Returning the error text (a str) would make ``current_user`` a
        # string and break every ``login_required`` check, so report
        # "no such user" instead.
        return None
|
|
|
|
|
|
def _is_safe_redirect_target(target):
    """Return True only for a same-site, path-only redirect target."""
    from urllib.parse import urlparse

    if not target:
        return False
    # Browsers drop control characters and treat "\" like "/", so
    # "/\evil.example" or "/<TAB>/evil.example" can end up off-site.
    if any(ord(ch) < 0x20 for ch in target):
        return False
    normalised = target.replace("\\", "/")
    if not normalised.startswith("/") or normalised.startswith("//"):
        return False
    parts = urlparse(normalised)
    return not parts.scheme and not parts.netloc


@app.route("/login", methods=["GET", "POST"])
def login():
    """Render the login page (GET) or check email/password credentials (POST).

    POST: looks the user up with ``database.get_user`` and verifies the
    password with ``user.check_password``. On failure a message is flashed
    and the client is redirected back to this page. On success the user is
    logged in with a 7-day remember cookie and redirected to the ``next``
    query parameter when it is a local path, otherwise to ``home``.

    GET: renders ``login.html`` with ``auth_url`` set to the Google
    authorization URL.

    Returns:
        A redirect, the rendered template, or the exception text if anything
        raises (the exception is also logged at INFO level).
    """
    try:
        if request.method == "POST":
            email = request.form.get("email")
            password = request.form.get("password")
            user = database.get_user(email)
            if user is None or not user.check_password(password):
                flash("invalid email or password")
                return redirect(url_for("login"))
            login_user(user, remember=True, duration=datetime.timedelta(days=7))
            next_page = request.args.get("next")
            # ``next`` is attacker-controllable; never redirect off-site.
            if not _is_safe_redirect_target(next_page):
                next_page = url_for("home")
            return redirect(next_page)

        # Only the rendered form needs the Google URL, so password logins no
        # longer depend on the discovery endpoint being reachable.
        google_provider_cfg = get_google_provider_cfg()
        authorization_endpoint = google_provider_cfg["authorization_endpoint"]

        request_uri = client.prepare_request_uri(
            authorization_endpoint,
            redirect_uri=request.base_url + "/callback",
            scope=["openid", "email", "profile"],
        )
        return render_template("login.html", title="login", auth_url=request_uri)
    except Exception as e:
        current_app.logger.info(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
        return str(e)
|
|
|
|
|
|
@app.route("/login/callback")
def callback():
    """Finish the Google sign-in flow and log the user in.

    Steps: exchange the ``code`` query parameter for a token at Google's
    token endpoint, fetch the user's profile from the userinfo endpoint,
    require ``email_verified``, create the user with ``database.add_user``
    when ``database.get_user`` finds none, then log them in with a 7-day
    remember cookie and redirect to ``home``.

    Returns:
        A redirect to ``home`` on success; a ``400`` response when the code
        is missing or the e-mail is not verified; otherwise the exception
        text (the exception is also logged at INFO level).
    """
    try:
        # Get authorization code Google sent back to you
        code = request.args.get("code")
        if not code:
            # Nothing to exchange; skip the network round-trips.
            return "Missing authorization code.", 400

        # The token is stored on the client object, so use one client per
        # request: sharing the module-level instance would let concurrent
        # callbacks read each other's access token.
        oauth_client = WebApplicationClient(GOOGLE_CLIENT_ID)

        google_provider_cfg = get_google_provider_cfg()
        token_endpoint = google_provider_cfg["token_endpoint"]

        token_url, headers, body = oauth_client.prepare_token_request(
            token_endpoint,
            authorization_response=request.url,
            redirect_url=request.base_url,
            code=code,
        )
        token_response = requests.post(
            token_url,
            headers=headers,
            data=body,
            auth=(GOOGLE_CLIENT_ID, GOOGLE_CLIENT_SECRET),
            timeout=10,  # requests has no default timeout
        )

        oauth_client.parse_request_body_response(json.dumps(token_response.json()))

        userinfo_endpoint = google_provider_cfg["userinfo_endpoint"]
        uri, headers, body = oauth_client.add_token(userinfo_endpoint)
        userinfo_response = requests.get(uri, headers=headers, data=body, timeout=10)

        # Decode the body once instead of re-parsing it for every field.
        userinfo = userinfo_response.json()

        if not userinfo.get("email_verified"):
            return "User email not available or not verified by Google.", 400

        unique_id = userinfo["sub"]
        users_email = userinfo["email"]
        users_name = userinfo["given_name"]

        data = (unique_id, users_name, users_email, None, False)

        current_app.logger.info("user data from google: " + str(data))
        user = database.get_user(users_email)

        if not user:
            user = database.add_user(data)
            current_app.logger.info("new user: {} created".format(users_email))

        current_app.logger.info("email: "+str(user.email))
        current_app.logger.info("username: "+str(user.username))
        current_app.logger.info("authenticated: "+str(user.is_authenticated))
        current_app.logger.info("active: "+str(user.is_active))
        current_app.logger.info("id: "+str(user.get_id()))
        login_user(user, remember=True, duration=datetime.timedelta(days=7), force=True)

        return redirect(url_for("home"))
    except Exception as e:
        current_app.logger.info(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
        return str(e)
|
|
|
|
|
|
@app.route("/logout")
def logout():
    """Log the current user out and redirect to the login page.

    Returns:
        A redirect to ``login``, or the exception text if anything raises
        (the exception is also logged at INFO level).
    """
    try:
        logout_user()
        login_url = url_for("login")
        return redirect(login_url)
    except Exception as err:
        # "<function name> <exception type> <message>"
        details = (inspect.stack()[0][3], str(type(err)), str(err))
        current_app.logger.info(" ".join(details))
        return str(err)
|
|
|
|
|
|
@app.route("/")
def root():
    """Redirect requests for the site root to the ``home`` endpoint."""
    home_url = url_for("home")
    return redirect(home_url)
|
|
|
|
|
|
@app.route("/home")
@login_required
def home():
    """Render ``home.html`` for a logged-in user.

    Returns:
        The rendered page, or the exception text if rendering raises (the
        exception is also logged at INFO level).
    """
    try:
        context = {"title": "Hello World", "current_user": current_user}
        return render_template("home.html", **context)
    except Exception as err:
        # "<function name> <exception type> <message>"
        details = (inspect.stack()[0][3], str(type(err)), str(err))
        current_app.logger.info(" ".join(details))
        return str(err)
|
|
|
|
|
|
@app.route("/music")
@login_required
def music():
    """Placeholder page for the music section; login required."""
    placeholder = "No music"
    return placeholder
|
|
|
|
|
|
@app.route("/games")
@login_required
def games():
    """Placeholder page for the games section; login required."""
    placeholder = "No Games"
    return placeholder
|
|
|
|
|
|
# Start Flask's built-in server only when this file is executed directly,
# not when it is imported.
if __name__ == "__main__":
    app.run()
|