Compare commits

..

No commits in common. "d131482dd8ee6fc2067f10c9a98804615ca1a593" and "9da0104e73224c08bd25a624139219aade41ace2" have entirely different histories.

21 changed files with 163 additions and 386 deletions

View File

@ -1,2 +0,0 @@
[flake8]
max_line_length = 150

View File

@ -1,43 +0,0 @@
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.5.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-yaml
- id: name-tests-test
- id: requirements-txt-fixer
- repo: https://github.com/asottile/setup-cfg-fmt
rev: v2.5.0
hooks:
- id: setup-cfg-fmt
- repo: https://github.com/asottile/reorder_python_imports
rev: v3.12.0
hooks:
- id: reorder-python-imports
args: [--py37-plus, --add-import, 'from __future__ import annotations']
- repo: https://github.com/asottile/add-trailing-comma
rev: v3.1.0
hooks:
- id: add-trailing-comma
- repo: https://github.com/asottile/pyupgrade
rev: v3.15.0
hooks:
- id: pyupgrade
- repo: https://github.com/pre-commit/mirrors-autopep8
rev: v2.0.4
hooks:
- id: autopep8
- repo: https://github.com/PyCQA/flake8
rev: 6.1.0
hooks:
- id: flake8
additional_dependencies:
- flake8-use-pathlib
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.7.0
hooks:
- id: mypy
additional_dependencies:
- types-toml
- types-requests

View File

@ -1,15 +1,6 @@
from __future__ import annotations
import json
from flask import Blueprint
from flask import jsonify
from flask import redirect
from flask import render_template
from flask import request
from flask import url_for
from flask_security import permissions_accepted
from flask import render_template, Blueprint, request, jsonify, redirect, url_for
from flask_security import roles_required, permissions_accepted
from QuizTheWord import database
@ -125,12 +116,12 @@ def query_users():
def parse_question_query(query):
if query:
parsed_query: dict = json.loads(query)
if "multiple_choice" in parsed_query.keys():
parsed_query["multiple_choice"] = True if parsed_query["multiple_choice"] == "true" else False
if "hidden_answer" in parsed_query.keys():
parsed_query["hidden_answer"] = True if parsed_query["hidden_answer"] == "true" else False
return parsed_query
query: dict = json.loads(query)
if "multiple_choice" in query.keys():
query["multiple_choice"] = True if query["multiple_choice"] == "true" else False
if "hidden_answer" in query.keys():
query["hidden_answer"] = True if query["hidden_answer"] == "true" else False
return query
return None
@ -158,15 +149,15 @@ def parse_user_form_data(form):
data = {
"username": form.get("username"),
"email": form.get("email"),
"roles": form.getlist("roles"),
"roles": form.getlist("roles")
}
return data
def parse_user_query(query):
if query:
parsed_query: dict = json.loads(query)
query: dict = json.loads(query)
# if "admin" in query.keys():
# query["admin"] = True if query["admin"] == "true" else False
return parsed_query
return {}
return query
return None

View File

@ -23,12 +23,6 @@
<div id="hidden-answer-form" class="form-group"></div>
</div>
<button id="save" name="save" type="submit" class="btn btn-primary">Save</button>
{% if request.path != '/admin/questions/add' %}
<div class="mt-3">
<a class='btn btn-secondary' href='{{ url_for('admin.create_question') }}'>Add Another Question</a>
</div>
{% endif %}
</form>
{% endblock %}

View File

@ -1,25 +1,9 @@
from __future__ import annotations
import os
from authlib.integrations.flask_client import OAuth
from flask import flash
from flask import Flask
from flask import jsonify
from flask import redirect
from flask import render_template
from flask import request
from flask import session
from flask import url_for
from flask_security import current_user
from flask_security import login_user
from flask_security import logout_user
from flask_security import Security
from flask_security import SQLAlchemySessionUserDatastore
from flask import request, render_template, jsonify, Flask, url_for, redirect, flash, session
from flask_security import Security, SQLAlchemySessionUserDatastore, login_user, logout_user, current_user
from QuizTheWord import database
from QuizTheWord.admin import admin
# from QuizTheWord import config
app = Flask(__name__)
environment_configuration = os.environ.get('CONFIGURATION_SETUP', "QuizTheWord.config.Development")
@ -27,15 +11,6 @@ with app.app_context():
app.config.from_object(environment_configuration)
user_datastore = SQLAlchemySessionUserDatastore(database.get_session(), database.User, database.Role)
security = Security(app, user_datastore, False)
CONF_URL = "https://accounts.google.com/.well-known/openid-configuration"
oauth = OAuth(app)
oauth.register(
name="google",
server_metadata_url=CONF_URL,
client_kwargs={
"scope": "openid email profile",
},
)
app.register_blueprint(admin.Admin)
@ -101,7 +76,7 @@ def check_answer():
def next_hidden_answer():
if "hidden_answer_ids" not in session:
session["hidden_answer_ids"] = []
difficulty = request.args.get("difficulty", 1, int)
difficulty = request.args.get("difficulty", 1)
next_question = database.next_hidden_answer(session["hidden_answer_ids"], difficulty)
if next_question:
session["hidden_answer_ids"].append(next_question.question_id)
@ -116,7 +91,7 @@ def next_hidden_answer():
def next_multiple_choice():
if "multiple_choice_ids" not in session:
session["multiple_choice_ids"] = []
difficulty = request.args.get("difficulty", 1, int)
difficulty = request.args.get("difficulty", 1)
next_question = database.next_multiple_choice(session["multiple_choice_ids"], difficulty)
if next_question:
session["multiple_choice_ids"].append(next_question.question_id)
@ -127,22 +102,8 @@ def next_multiple_choice():
return jsonify(None), 204
def get_auth_url(redirect_uri, **kwargs):
rv = oauth.google.create_authorization_url(redirect_uri, **kwargs)
if oauth.google.request_token_url:
request_token = rv.pop('request_token', None)
oauth.google._save_request_token(request_token)
oauth.google.save_authorize_data(request, redirect_uri=redirect_uri, **rv)
return rv["url"]
@app.route("/login", methods=["GET", "POST"])
def login():
redirect_uri = url_for("auth_google", _external=True)
google_url = get_auth_url(redirect_uri)
next_page = request.args.get("next", default=url_for("index"))
if request.method == "POST":
email = request.form.get("email")
@ -154,27 +115,7 @@ def login():
return redirect(url_for("login"))
login_user(user, remember=remember)
return redirect(next_page)
return render_template("login.html", title="login", google_url=google_url)
@app.route("/login/auth/google")
def auth_google():
token = oauth.google.authorize_access_token()
user = oauth.google.parse_id_token(token)
unique_id = user["sub"]
email = user["email"]
username = user["given_name"]
user = database.get_user(google_id=unique_id)
if user is not None:
login_user(user)
else:
user = database.add_user(email, None, username, google_id=unique_id)
if user is not None:
login_user(user)
else:
flash("That email is already in use. Login with that email or choose a different account.")
return redirect(url_for("login"))
return redirect(url_for("index"))
return render_template("login.html", title="login")
@app.route("/register", methods=["GET", "POST"])
@ -204,7 +145,7 @@ def error_404(e):
@app.errorhandler(500)
def error_500(e):
def error_404(e):
print(e)
msg = "There was an error with the server."
if app.config["DEBUG"]:

View File

@ -1,7 +1,4 @@
from __future__ import annotations
from os import environ
from dotenv import load_dotenv
@ -17,8 +14,6 @@ class Config:
DB_PORT = environ.get("DB_PORT")
DB_NAME = environ.get("DB_NAME")
SECURITY_REGISTERABLE = True
GOOGLE_CLIENT_ID = environ.get("GOOGLE_CLIENT_ID")
GOOGLE_CLIENT_SECRET = environ.get("GOOGLE_CLIENT_SECRET")
class Production(Config):
@ -26,7 +21,7 @@ class Production(Config):
DEBUG = False
DB_SOCKET_DIR = environ.get("DB_SOCKET_DIR", "/cloudsql")
CLOUD_SQL_CONNECTION_NAME = environ.get("CLOUD_SQL_CONNECTION_NAME")
DB_URL = f"postgresql+psycopg://{Config.DB_USER}:{Config.DB_PASSWORD}@{Config.DB_HOST}:{Config.DB_PORT}/{Config.DB_NAME}?host={DB_SOCKET_DIR}/{CLOUD_SQL_CONNECTION_NAME}" # noqa: E501
DB_URL = f"postgresql+psycopg2://{Config.DB_USER}:{Config.DB_PASSWORD}@{Config.DB_HOST}:{Config.DB_PORT}/{Config.DB_NAME}?host={DB_SOCKET_DIR}/{CLOUD_SQL_CONNECTION_NAME}"
class Development(Config):
@ -37,8 +32,8 @@ class Development(Config):
DB_PASSWORD = "hello"
DB_HOST = "localhost"
DB_NAME = "postgres"
DB_PORT = "5432"
DB_URL = f"postgresql+psycopg://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
DB_PORT = 5432
DB_URL = f"postgresql+psycopg2://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
class Testing(Config):

View File

@ -1,57 +1,34 @@
from __future__ import annotations
import os
from flask import current_app, g
from flask_security import UserMixin, RoleMixin, SQLAlchemySessionUserDatastore, verify_password, hash_password
import sqlalchemy
from typing import Union, Optional, Literal, Type, List, Tuple
import random
from datetime import datetime
from decimal import Decimal
from typing import Any
from typing import Literal
from typing import Sequence
from flask import current_app
from flask import g
from flask_security import hash_password
from flask_security import RoleMixin
from flask_security import SQLAlchemySessionUserDatastore
from flask_security import UserMixin
from flask_security import verify_password
from sqlalchemy import and_
from sqlalchemy import CheckConstraint
from sqlalchemy import create_engine
from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy import JSON
from sqlalchemy import select
from sqlalchemy import String
from sqlalchemy import Column, JSON, String, Integer, create_engine, ForeignKey, func, Boolean, UnicodeText, DateTime
from sqlalchemy.sql.expression import and_
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import relationship
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import sessionmaker, relationship, scoped_session
from werkzeug.utils import import_string
config = import_string(os.environ.get("CONFIGURATION_SETUP"))
engine = create_engine(config.DB_URL, pool_size=20)
Base = declarative_base()
class Base(DeclarativeBase):
pass
def get_scoped_session() -> scoped_session:
def get_scoped_session():
session_factory = sessionmaker(bind=engine)
return scoped_session(session_factory)
def get_session() -> scoped_session:
def get_session() -> sqlalchemy.orm.session.Session:
if not hasattr(g, "session"):
session = get_scoped_session()
# This is for compatibility with Flask-Security-Too which assumes usage of Flask-Sqlalchemy
Base.query = session.query_property()
g.session = session()
Session = get_scoped_session()
Base.query = Session.query_property() # This is for compatibility with Flask-Security-Too which assumes usage of Flask-Sqlalchemy
# Session.query_property()
g.session = Session()
return g.session
@ -71,23 +48,17 @@ def get_user_datastore() -> SQLAlchemySessionUserDatastore:
class User(Base, UserMixin):
__tablename__ = "users"
user_id: Mapped[int] = mapped_column(primary_key=True)
email: Mapped[str] = mapped_column(unique=True)
username: Mapped[str] = mapped_column(unique=True)
password: Mapped[str | None]
active: Mapped[bool]
last_login_at: Mapped[datetime]
current_login_at: Mapped[datetime]
last_login_ip: Mapped[str] = mapped_column(String(100))
current_login_ip: Mapped[str] = mapped_column(String(100))
login_count: Mapped[int]
fs_uniquifier: Mapped[str] = mapped_column(unique=True)
google_id: Mapped[Decimal | None]
__table_args__ = (
CheckConstraint("(password IS NOT NULL) OR (google_id IS NOT NULL)", "password_google_id_null_check"),
)
user_id = Column(Integer, primary_key=True)
email = Column(String, unique=True, nullable=False)
username = Column(String, unique=True)
password = Column(String, nullable=False)
active = Column(Boolean, nullable=False)
last_login_at = Column(DateTime())
current_login_at = Column(DateTime())
last_login_ip = Column(String(100))
current_login_ip = Column(String(100))
login_count = Column(Integer)
fs_uniquifier = Column(String, unique=True, nullable=False)
roles = relationship("Role", secondary="users_roles")
def check_password(self, password):
@ -110,26 +81,26 @@ class User(Base, UserMixin):
class Role(Base, RoleMixin):
__tablename__ = "roles"
role_id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(unique=True)
description: Mapped[str]
permissions: Mapped[str]
role_id = Column(Integer, primary_key=True)
name = Column(String, unique=True)
description = Column(String)
permissions = Column(UnicodeText)
class UsersRoles(Base):
__tablename__ = "users_roles"
user_role_id: Mapped[int] = mapped_column(primary_key=True)
user_id: Mapped[int] = mapped_column(ForeignKey("users.user_id"))
role_id: Mapped[int] = mapped_column(ForeignKey("roles.role_id"))
user_role_id = Column(Integer, primary_key=True)
user_id = Column(Integer, ForeignKey("users.user_id"))
role_id = Column(Integer, ForeignKey("roles.role_id"))
class AllQuestions(Base):
__tablename__ = "all_questions"
question_id: Mapped[int] = mapped_column(primary_key=True)
question_text: Mapped[str]
answer: Mapped[str]
addresses: Mapped[str]
question_id = Column(Integer, primary_key=True, nullable=False)
question_text = Column(String)
answer = Column(String)
addresses = Column(String)
multiple_choice = relationship("MultipleChoice", uselist=False, back_populates="all_question_relationship")
hidden_answer = relationship("HiddenAnswer", uselist=False, back_populates="all_question_relationship")
@ -160,9 +131,9 @@ class AllQuestions(Base):
class HiddenAnswer(Base):
__tablename__ = "category_hidden_answer"
question_id: Mapped[int] = mapped_column(ForeignKey("all_questions.question_id"), primary_key=True)
hidden_answer_difficulty: Mapped[int]
hidden_answer_hint: Mapped[dict | list | None] = mapped_column(JSON)
question_id = Column(Integer, ForeignKey("all_questions.question_id"), primary_key=True)
hidden_answer_difficulty = Column(Integer)
hidden_answer_hint = Column(JSON)
all_question_relationship = relationship("AllQuestions", lazy="joined", back_populates="hidden_answer")
question_text = association_proxy("all_question_relationship", "question_text")
@ -192,10 +163,10 @@ class HiddenAnswer(Base):
class MultipleChoice(Base):
__tablename__ = "category_multiple_choice"
question_id: Mapped[int] = mapped_column(ForeignKey("all_questions.question_id"), primary_key=True)
multiple_choice_difficulty: Mapped[int]
multiple_choice_hint: Mapped[dict | list | None] = mapped_column(JSON)
wrong_answers: Mapped[list] = mapped_column(JSON)
question_id = Column(Integer, ForeignKey("all_questions.question_id"), primary_key=True)
multiple_choice_difficulty = Column(Integer)
multiple_choice_hint = Column(JSON)
wrong_answers = Column(JSON)
all_question_relationship = relationship("AllQuestions", lazy="joined", back_populates="multiple_choice")
question_text = association_proxy("all_question_relationship", "question_text")
@ -211,7 +182,7 @@ class MultipleChoice(Base):
self.answer_list = None
def randomize_answer_list(self):
answer_list: list[str] = [*self.wrong_answers, self.answer]
answer_list: List[str] = [*self.wrong_answers, self.answer]
random.shuffle(answer_list)
self.answer_list = answer_list
@ -243,8 +214,7 @@ class MultipleChoice(Base):
def add_multiple_choice_question(question, answer, addresses, difficulty, hint, wrong_answers):
session = get_session()
select_stmt = select(AllQuestions).order_by(AllQuestions.question_id.desc())
question_id = session.scalars(select_stmt).first().question_id + 1
question_id = session.query(AllQuestions).order_by(AllQuestions.question_id.desc()).first().question_id + 1
print(question_id)
base_question = AllQuestions(question_id, question, answer, addresses)
multiple_choice_question = MultipleChoice(question_id, difficulty, hint, wrong_answers, base_question)
@ -255,8 +225,7 @@ def add_multiple_choice_question(question, answer, addresses, difficulty, hint,
def add_question(data) -> AllQuestions:
session = get_session()
select_stmt = select(AllQuestions).order_by(AllQuestions.question_id.desc())
question_id = session.scalars(select_stmt).first().question_id + 1
question_id = session.query(AllQuestions).order_by(AllQuestions.question_id.desc()).first().question_id + 1
question = AllQuestions(question_id)
session.add(question)
if data["create_multiple_choice"]:
@ -278,104 +247,88 @@ def add_question(data) -> AllQuestions:
return question
def get_all_questions() -> Sequence[AllQuestions]:
def get_all_questions() -> List[AllQuestions]:
session = get_session()
return session.scalars(select(AllQuestions)).all()
return session.query(AllQuestions).all()
def get_all_hidden_answer() -> Sequence[HiddenAnswer]:
def get_all_hidden_answer() -> List[HiddenAnswer]:
session = get_session()
return session.scalars(select(HiddenAnswer)).all()
return session.query(HiddenAnswer).all()
def get_all_multiple_choice() -> Sequence[MultipleChoice]:
def get_all_multiple_choice() -> List[MultipleChoice]:
session = get_session()
return session.scalars(select(MultipleChoice)).all()
return session.query(MultipleChoice).all()
def get_category_count(category: type[MultipleChoice] | type[HiddenAnswer] | type[AllQuestions]) -> int:
def get_category_count(category: Union[Type[MultipleChoice], Type[HiddenAnswer], Type[AllQuestions]]) -> int:
session = get_session()
return session.scalar(select(func.count(category)))
return session.query(category).count()
def get_question[T: (MultipleChoice, HiddenAnswer, AllQuestions)](category: type[T], question_id: int) -> T | None:
def get_question(category: Union[Type[MultipleChoice], Type[HiddenAnswer], Type[AllQuestions]], question_id: int) -> \
Optional[Union[MultipleChoice, HiddenAnswer, AllQuestions]]:
session = get_session()
select_stmt = select(category).where(category.question_id == question_id)
return session.scalars(select_stmt).one_or_none()
return session.query(category).filter(category.question_id == question_id).one_or_none()
def get_random_question_of_difficulty(category: type[MultipleChoice] | type[HiddenAnswer], difficulty: Literal[1, 2, 3]):
def get_random_question_of_difficulty(category: Union[Type[MultipleChoice], Type[HiddenAnswer]],
difficulty: Literal[1, 2, 3]):
session = get_session()
select_stmt = select(category).where(category.hidden_answer_difficulty == difficulty).order_by(func.random())
return session.scalars(select_stmt).first()
return session.query(category).filter(category.hidden_answer_difficulty == difficulty).order_by(func.random()).first()
def get_random_hidden_answer(difficulty: Literal[1, 2, 3] | None = None) -> HiddenAnswer:
def get_random_hidden_answer(difficulty: Optional[Literal[1, 2, 3]] = None) -> HiddenAnswer:
session = get_session()
select_stmt = select(HiddenAnswer).order_by(func.random())
if difficulty is not None:
select_stmt = select_stmt.where(HiddenAnswer.hidden_answer_difficulty == difficulty)
return session.scalars(select_stmt).first()
return session.query(HiddenAnswer).filter(HiddenAnswer.hidden_answer_difficulty == difficulty).order_by(func.random()).first()
return session.query(HiddenAnswer).order_by(func.random()).first()
def get_random_multiple_choice(difficulty: Literal[1, 2, 3] | None = None) -> MultipleChoice:
def get_random_multiple_choice(difficulty: Optional[Literal[1, 2, 3]] = None) -> MultipleChoice:
session = get_session()
select_stmt = select(MultipleChoice).order_by(func.random())
if difficulty is not None:
select_stmt = select_stmt.where(MultipleChoice.multiple_choice_difficulty == difficulty)
return session.scalars(select_stmt).first()
return session.query(MultipleChoice).filter(MultipleChoice.multiple_choice_difficulty == difficulty).order_by(
func.random()).first()
return session.query(MultipleChoice).order_by(func.random()).first()
def check_answer(question_id: int, guess: str) -> tuple[bool, str] | None:
def check_answer(question_id: int, guess: str) -> Tuple[bool, str]:
session = get_session()
select_stmt = select(AllQuestions).where(AllQuestions.question_id == question_id)
question: AllQuestions = session.scalars(select_stmt).one_or_none()
question: AllQuestions = session.query(AllQuestions).filter(AllQuestions.question_id == question_id).one_or_none()
if question:
answer = question.answer
return answer == guess, answer
return None
def query_all_questions(offset, limit, query: dict[str, Any], sort=None, order=None) -> tuple[Sequence[AllQuestions], int]:
def query_all_questions(offset, limit, query: dict = None, sort=None, order=None) -> Tuple[List[AllQuestions], int]:
session = get_session()
select_stmt = select(AllQuestions)
query_params = []
if query is not None:
for key in query.keys():
if key == "multiple_choice" or key == "hidden_answer":
if query[key]:
select_stmt = select_stmt.where(getattr(AllQuestions, key).is_(None))
query_params.append(getattr(AllQuestions, key) != None)
else:
select_stmt = select_stmt.where(getattr(AllQuestions, key).is_(None))
query_params.append(getattr(AllQuestions, key) == None)
else:
select_stmt = select_stmt.where(getattr(AllQuestions, key).ilike("%" + query[key] + "%"))
query_params.append(getattr(AllQuestions, key).ilike("%" + query[key] + "%"))
order_by = None
if sort and order:
select_stmt = select_stmt.order_by(getattr(getattr(AllQuestions, sort), order)())
questions = session.scalars(select_stmt.offset(offset).limit(limit)).all()
count = session.scalar(func.count(select_stmt))
order_by = getattr(getattr(AllQuestions, sort), order)()
q = session.query(AllQuestions).filter(*query_params).order_by(order_by)
questions = q.offset(offset).limit(limit).all()
count = q.count()
return questions, count
def get_unhealthy_questions() -> list[AllQuestions]:
def get_unhealthy_questions() -> List[AllQuestions]:
session = get_session()
questions = []
missing_category_questions = (
session.scalars(
select(AllQuestions)
.where(and_(AllQuestions.hidden_answer.is_(None), AllQuestions.multiple_choice.is_(None))),
).all()
)
multiple_choice_difficulty = (
session.scalars(
select(AllQuestions)
.where(AllQuestions.multiple_choice.has(MultipleChoice.multiple_choice_difficulty.is_(None))),
).all()
)
hidden_answer_difficulty = (
session.scalars(
select(AllQuestions)
.where(AllQuestions.hidden_answer.has(HiddenAnswer.hidden_answer_difficulty.is_(None))),
).all()
)
missing_category_questions = session.query(AllQuestions).filter(and_(AllQuestions.hidden_answer == None, AllQuestions.multiple_choice == None)).all()
multiple_choice_difficulty = session.query(AllQuestions).filter(AllQuestions.multiple_choice.has(MultipleChoice.multiple_choice_difficulty == None)).all()
hidden_answer_difficulty = session.query(AllQuestions).filter(AllQuestions.hidden_answer.has(HiddenAnswer.hidden_answer_difficulty == None)).all()
questions.extend(missing_category_questions)
questions.extend(multiple_choice_difficulty)
questions.extend(hidden_answer_difficulty)
@ -384,8 +337,7 @@ def get_unhealthy_questions() -> list[AllQuestions]:
def update_question(question_id, data):
session = get_session()
session.get(AllQuestions, question_id)
question = session.get(AllQuestions, question_id)
question: AllQuestions = session.query(AllQuestions).get(question_id)
if data["create_multiple_choice"]:
multiple_choice = MultipleChoice(question_id)
session.add(multiple_choice)
@ -404,16 +356,13 @@ def update_question(question_id, data):
session.commit()
def add_user(email, password, username=None, **kwargs):
def add_user(email, password):
session = get_session()
user_datastore = get_user_datastore()
if user_datastore.find_user(email=email) is None:
password_hash = hash_password(password) if password is not None else None
if username is None:
username = email
user = user_datastore.create_user(email=email, password=password_hash, username=username, **kwargs)
user = user_datastore.create_user(email=email, password=hash_password(password), username=email)
session.add(user)
role = session.scalar(select(Role).where(Role.name == "basic"))
role = session.query(Role).filter(Role.name == "basic").one_or_none()
if role is not None:
user_datastore.add_role_to_user(user, role)
# user.add_role(role)
@ -426,33 +375,23 @@ def add_user(email, password, username=None, **kwargs):
return None
def get_user(user_id=None, google_id=None) -> User:
def get_user(user_id) -> User:
session = get_session()
if user_id is not None:
user = session.scalars(
select(User).where(User.user_id == user_id),
).one_or_none()
else:
user = session.scalars(
select(User).where(User.google_id == google_id),
).one_or_none()
return user
return session.query(User).filter(User.user_id == user_id).one_or_none()
def update_user(user_id, data):
session = get_session()
user_datastore = get_user_datastore()
select_stmt = select(User).where(User.user_id == user_id)
user = session.scalars(select_stmt).one_or_none()
user = session.query(User).filter(User.user_id == user_id).one_or_none()
for role in user.roles:
if role.name not in data["roles"]:
# user.remove_role(role)
user_datastore.remove_role_from_user(user, role)
for role_name in data["roles"]:
select_stmt = select(Role).where(Role.name == role_name)
role = session.scalars(select_stmt).one_or_none()
role = session.query(Role).filter(Role.name == role_name).one_or_none()
# if role is not None:
# user.add_role(role)
# user.add_role(role)
user_datastore.add_role_to_user(user, role)
for column in data.keys():
if column in User.__table__.columns:
@ -460,46 +399,38 @@ def update_user(user_id, data):
session.commit()
def query_users(offset, limit, query: dict[str, Any], sort=None, order=None) -> tuple[Sequence[User], int]:
def query_users(offset, limit, query: dict = None, sort=None, order=None) -> Tuple[List[User], int]:
session = get_session()
select_stmt = select(User)
query_params = []
if query is not None:
for key in query.keys():
if hasattr(User, key):
select_stmt = select_stmt.where(getattr(User, key).ilike("%" + query[key] + "%"))
query_params.append(getattr(User, key).ilike("%" + query[key] + "%"))
order_by = None
if sort and order:
select_stmt = select_stmt.order_by(getattr(getattr(User, sort), order)())
users = session.scalars(select_stmt.offset(offset).limit(limit)).all()
count = session.scalar(func.count(select_stmt))
order_by = getattr(getattr(User, sort), order)()
q = session.query(User).filter(*query_params).order_by(order_by)
users = q.offset(offset).limit(limit).all()
count = q.count()
return users, count
def get_all_roles() -> Sequence[Role]:
def get_all_roles() -> List[Role]:
session = get_session()
return session.scalars(select(Role)).all()
return session.query(Role).all()
def next_hidden_answer(previous_questions: list[int], difficulty) -> HiddenAnswer:
def next_hidden_answer(previous_questions: List[int], difficulty) -> HiddenAnswer:
session = get_session()
select_stmt = (
select(HiddenAnswer)
.where(
HiddenAnswer.hidden_answer_difficulty == difficulty,
HiddenAnswer.question_id.notin_(previous_questions),
)
.order_by(func.random())
)
return session.scalars(select_stmt).first()
return session.query(HiddenAnswer)\
.filter(HiddenAnswer.hidden_answer_difficulty == difficulty,
HiddenAnswer.question_id.notin_(previous_questions))\
.order_by(func.random()).first()
def next_multiple_choice(previous_questions: list[int], difficulty) -> MultipleChoice:
def next_multiple_choice(previous_questions: List[int], difficulty) -> MultipleChoice:
session = get_session()
select_stmt = (
select(MultipleChoice)
.where(
MultipleChoice.multiple_choice_difficulty == difficulty,
MultipleChoice.question_id.notin_(previous_questions),
)
.order_by(func.random())
)
return session.scalars(select_stmt).first()
return session.query(MultipleChoice)\
.filter(MultipleChoice.multiple_choice_difficulty == difficulty,
MultipleChoice.question_id.notin_(previous_questions))\
.order_by(func.random()).first()

Binary file not shown.

Before

Width:  |  Height:  |  Size: 3.9 KiB

View File

@ -29,11 +29,6 @@
</li>
</ul>
<ul class="navbar-nav ms-auto">
{% if has_admin_access %}
<li class="nav-item me-1">
<a class="nav-link" href="{{ url_for('admin.index') }}">Admin</a>
</li>
{% endif %}
{% if not user_authenticated %}
<li class="nav-item">
<a class="btn btn-primary" href="{{ url_for('login') }}">Login</a>

View File

@ -19,11 +19,6 @@
</div>
<button id="submit" name="submit" type="submit" class="btn btn-primary">Submit</button>
</form>
<div class="mt-3">
<a href="{{ google_url }}">
<img src="/static/images/sign_in_with_google.png" alt="Sign in with Google">
</a>
</div>
<div class="mt-3">
Don't have an account? <a href="{{ url_for('register') }}">Register</a>
</div>

View File

@ -1,5 +0,0 @@
[tool.autopep8]
max_line_length = 150
[tool.mypy]
check_untyped_defs = true

View File

@ -1,13 +1,16 @@
Authlib~=1.2.1
bcrypt~=4.0.1
Flask~=3.0.0
Flask-Security-Too~=5.3.2
gunicorn~=21.2.0
click~=7.1.2
Flask~=1.1.2
itsdangerous~=1.1.0
Jinja2~=2.11.3
MarkupSafe~=1.1.1
pg8000~=1.17.0
psycopg2~=2.8.6
python-dotenv~=0.15.0
scramp~=1.2.0
SQLAlchemy~=1.3.23
Werkzeug~=1.0.1
Flask-Security-Too~=4.0.0
pytest~=6.2.2
gunicorn~=20.1.0
html5validate~=0.0.2
psycopg[binary]~=3.1.12
pytest~=7.4.3
python-dotenv~=1.0.0
requests~=2.31.0
setuptools~=68.2.2
SQLAlchemy~=2.0.23
Werkzeug~=3.0.1
bcrypt

View File

@ -1,5 +1,3 @@
from __future__ import annotations
import pathlib
@ -12,8 +10,7 @@ def parse_multiple_choice_questions(question_file: pathlib.Path):
correct_answer_index = int(question_data[0])-1
question = question_data[1]
answer_list = question_data[2].split("//")
bible_verse = question_data[3] if question_data[3].lower(
) != "none" else None
bible_verse = question_data[3] if question_data[3].lower() != "none" else None
correct_answer = answer_list.pop(correct_answer_index)
questions.append({
"question": question,

View File

@ -1,19 +1,7 @@
from __future__ import annotations
import os
from flask_security import hash_password, SQLAlchemySessionUserDatastore
from QuizTheWord.database import User, Role, AllQuestions, MultipleChoice, HiddenAnswer, get_session, init_db, destroy_db
import pytest
from flask_security import hash_password
from flask_security import SQLAlchemySessionUserDatastore
from QuizTheWord.database import AllQuestions
from QuizTheWord.database import destroy_db
from QuizTheWord.database import get_session
from QuizTheWord.database import HiddenAnswer
from QuizTheWord.database import init_db
from QuizTheWord.database import MultipleChoice
from QuizTheWord.database import Role
from QuizTheWord.database import User
@pytest.fixture(scope="module")

View File

@ -1,9 +1,6 @@
from __future__ import annotations
import json
from flask.testing import FlaskClient
from html5validate import validate
from flask.testing import FlaskClient
def test_admin_home_with_basic_user(client: FlaskClient, login_basic_user):