quiz-the-word/QuizTheWord/database.py
2023-11-27 19:25:33 -08:00

506 lines
18 KiB
Python

from __future__ import annotations
import os
import random
from datetime import datetime
from decimal import Decimal
from typing import Any
from typing import Literal
from typing import Sequence
from flask import current_app
from flask import g
from flask_security import hash_password
from flask_security import RoleMixin
from flask_security import SQLAlchemySessionUserDatastore
from flask_security import UserMixin
from flask_security import verify_password
from sqlalchemy import and_
from sqlalchemy import CheckConstraint
from sqlalchemy import create_engine
from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy import JSON
from sqlalchemy import select
from sqlalchemy import String
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import relationship
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker
from werkzeug.utils import import_string
config = import_string(os.environ.get("CONFIGURATION_SETUP"))
engine = create_engine(config.DB_URL, pool_size=20)
class Base(DeclarativeBase):
pass
def get_scoped_session() -> scoped_session:
session_factory = sessionmaker(bind=engine)
return scoped_session(session_factory)
def get_session() -> scoped_session:
if not hasattr(g, "session"):
session = get_scoped_session()
# This is for compatibility with Flask-Security-Too which assumes usage of Flask-Sqlalchemy
Base.query = session.query_property()
g.session = session()
return g.session
def init_db():
engine = create_engine(current_app.config["DB_URL"])
Base.metadata.create_all(engine)
def destroy_db():
engine = create_engine(current_app.config["DB_URL"])
Base.metadata.drop_all(engine)
def get_user_datastore() -> SQLAlchemySessionUserDatastore:
return SQLAlchemySessionUserDatastore(get_session(), User, Role)
class User(Base, UserMixin):
__tablename__ = "users"
user_id: Mapped[int] = mapped_column(primary_key=True)
email: Mapped[str] = mapped_column(unique=True)
username: Mapped[str] = mapped_column(unique=True)
password: Mapped[str | None]
active: Mapped[bool]
last_login_at: Mapped[datetime]
current_login_at: Mapped[datetime]
last_login_ip: Mapped[str] = mapped_column(String(100))
current_login_ip: Mapped[str] = mapped_column(String(100))
login_count: Mapped[int]
fs_uniquifier: Mapped[str] = mapped_column(unique=True)
google_id: Mapped[Decimal | None]
__table_args__ = (
CheckConstraint("(password IS NOT NULL) OR (google_id IS NOT NULL)", "password_google_id_null_check"),
)
roles = relationship("Role", secondary="users_roles")
def check_password(self, password):
return verify_password(password, self.password)
def add_role(self, role):
self.roles.append(role)
def remove_role(self, role):
self.roles.remove(role)
def get_dict(self):
result = {
"user_id": self.user_id,
"email": self.email,
"username": self.username,
}
return result
class Role(Base, RoleMixin):
__tablename__ = "roles"
role_id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(unique=True)
description: Mapped[str]
permissions: Mapped[str]
class UsersRoles(Base):
__tablename__ = "users_roles"
user_role_id: Mapped[int] = mapped_column(primary_key=True)
user_id: Mapped[int] = mapped_column(ForeignKey("users.user_id"))
role_id: Mapped[int] = mapped_column(ForeignKey("roles.role_id"))
class AllQuestions(Base):
__tablename__ = "all_questions"
question_id: Mapped[int] = mapped_column(primary_key=True)
question_text: Mapped[str]
answer: Mapped[str]
addresses: Mapped[str]
multiple_choice = relationship("MultipleChoice", uselist=False, back_populates="all_question_relationship")
hidden_answer = relationship("HiddenAnswer", uselist=False, back_populates="all_question_relationship")
def __init__(self, question_id=None, question=None, answer=None, addresses=None):
self.question_id = question_id
self.question_text = question
self.answer = answer
self.addresses = addresses
def __repr__(self):
return f"<Question: {self.question_id}>"
def get_dict(self):
result = {
"question_id": self.question_id,
"question": self.question_text,
"answer": self.answer,
"addresses": self.addresses,
}
if self.hidden_answer:
result["hidden_answer"] = self.hidden_answer.get_dict()
if self.multiple_choice:
result["multiple_choice"] = self.multiple_choice.get_dict()
return result
class HiddenAnswer(Base):
__tablename__ = "category_hidden_answer"
question_id: Mapped[int] = mapped_column(ForeignKey("all_questions.question_id"), primary_key=True)
hidden_answer_difficulty: Mapped[int]
hidden_answer_hint: Mapped[dict | list | None] = mapped_column(JSON)
all_question_relationship = relationship("AllQuestions", lazy="joined", back_populates="hidden_answer")
question_text = association_proxy("all_question_relationship", "question_text")
answer = association_proxy("all_question_relationship", "answer")
addresses = association_proxy("all_question_relationship", "addresses")
def __init__(self, question_id, difficulty=None, hint=None, base_question=None):
self.question_id = question_id
self.hidden_answer_difficulty = difficulty
self.hidden_answer_hint = hint
self.all_question_relationship = base_question
def __repr__(self):
return f"<Question Hidden Answer: {self.question_id}>"
def get_dict(self):
return {
"question_id": self.question_id,
"difficulty": self.hidden_answer_difficulty,
"hint": self.hidden_answer_hint,
"question_text": self.question_text,
"answer": self.answer,
"addresses": self.addresses,
}
class MultipleChoice(Base):
__tablename__ = "category_multiple_choice"
question_id: Mapped[int] = mapped_column(ForeignKey("all_questions.question_id"), primary_key=True)
multiple_choice_difficulty: Mapped[int]
multiple_choice_hint: Mapped[dict | list | None] = mapped_column(JSON)
wrong_answers: Mapped[list] = mapped_column(JSON)
all_question_relationship = relationship("AllQuestions", lazy="joined", back_populates="multiple_choice")
question_text = association_proxy("all_question_relationship", "question_text")
answer = association_proxy("all_question_relationship", "answer")
addresses = association_proxy("all_question_relationship", "addresses")
def __init__(self, question_id, difficulty=None, hint=None, wrong_answers=None, base_question=None):
self.question_id = question_id
self.multiple_choice_difficulty = difficulty
self.multiple_choice_hint = hint
self.wrong_answers = wrong_answers
self.all_question_relationship = base_question
self.answer_list = None
def randomize_answer_list(self):
answer_list: list[str] = [*self.wrong_answers, self.answer]
random.shuffle(answer_list)
self.answer_list = answer_list
def __repr__(self):
return f"<Question Multiple Choice: {self.question_id}>"
def get_dict(self):
return {
"question_id": self.question_id,
"difficulty": self.multiple_choice_difficulty,
"hint": self.multiple_choice_hint,
"wrong_answers": self.wrong_answers,
"question_text": self.question_text,
"answer": self.answer,
"addresses": self.addresses,
}
def get_dict_shuffled_choices(self):
self.randomize_answer_list()
return {
"question_id": self.question_id,
"difficulty": self.multiple_choice_difficulty,
"hint": self.multiple_choice_hint,
"question_text": self.question_text,
"addresses": self.addresses,
"answer_list": self.answer_list,
}
def add_multiple_choice_question(question, answer, addresses, difficulty, hint, wrong_answers):
session = get_session()
select_stmt = select(AllQuestions).order_by(AllQuestions.question_id.desc())
question_id = session.scalars(select_stmt).first().question_id + 1
print(question_id)
base_question = AllQuestions(question_id, question, answer, addresses)
multiple_choice_question = MultipleChoice(question_id, difficulty, hint, wrong_answers, base_question)
session.add(base_question)
session.add(multiple_choice_question)
session.commit()
def add_question(data) -> AllQuestions:
session = get_session()
select_stmt = select(AllQuestions).order_by(AllQuestions.question_id.desc())
question_id = session.scalars(select_stmt).first().question_id + 1
question = AllQuestions(question_id)
session.add(question)
if data["create_multiple_choice"]:
multiple_choice = MultipleChoice(question.question_id)
session.add(multiple_choice)
question.multiple_choice = multiple_choice
if data["create_hidden_answer"]:
hidden_answer = HiddenAnswer(question.question_id)
session.add(hidden_answer)
question.hidden_answer = hidden_answer
for column in data.keys():
if column in AllQuestions.__table__.columns:
setattr(question, column, data[column])
if column in MultipleChoice.__table__.columns:
setattr(question.multiple_choice, column, data[column])
if column in HiddenAnswer.__table__.columns:
setattr(question.hidden_answer, column, data[column])
session.commit()
return question
def get_all_questions() -> Sequence[AllQuestions]:
session = get_session()
return session.scalars(select(AllQuestions)).all()
def get_all_hidden_answer() -> Sequence[HiddenAnswer]:
session = get_session()
return session.scalars(select(HiddenAnswer)).all()
def get_all_multiple_choice() -> Sequence[MultipleChoice]:
session = get_session()
return session.scalars(select(MultipleChoice)).all()
def get_category_count(category: type[MultipleChoice] | type[HiddenAnswer] | type[AllQuestions]) -> int:
session = get_session()
return session.scalar(select(func.count(category)))
def get_question[T: (MultipleChoice, HiddenAnswer, AllQuestions)](category: type[T], question_id: int) -> T | None:
session = get_session()
select_stmt = select(category).where(category.question_id == question_id)
return session.scalars(select_stmt).one_or_none()
def get_random_question_of_difficulty(category: type[MultipleChoice] | type[HiddenAnswer], difficulty: Literal[1, 2, 3]):
session = get_session()
select_stmt = select(category).where(category.hidden_answer_difficulty == difficulty).order_by(func.random())
return session.scalars(select_stmt).first()
def get_random_hidden_answer(difficulty: Literal[1, 2, 3] | None = None) -> HiddenAnswer:
session = get_session()
select_stmt = select(HiddenAnswer).order_by(func.random())
if difficulty is not None:
select_stmt = select_stmt.where(HiddenAnswer.hidden_answer_difficulty == difficulty)
return session.scalars(select_stmt).first()
def get_random_multiple_choice(difficulty: Literal[1, 2, 3] | None = None) -> MultipleChoice:
session = get_session()
select_stmt = select(MultipleChoice).order_by(func.random())
if difficulty is not None:
select_stmt = select_stmt.where(MultipleChoice.multiple_choice_difficulty == difficulty)
return session.scalars(select_stmt).first()
def check_answer(question_id: int, guess: str) -> tuple[bool, str] | None:
session = get_session()
select_stmt = select(AllQuestions).where(AllQuestions.question_id == question_id)
question: AllQuestions = session.scalars(select_stmt).one_or_none()
if question:
answer = question.answer
return answer == guess, answer
return None
def query_all_questions(offset, limit, query: dict[str, Any], sort=None, order=None) -> tuple[Sequence[AllQuestions], int]:
session = get_session()
select_stmt = select(AllQuestions)
if query is not None:
for key in query.keys():
if key == "multiple_choice" or key == "hidden_answer":
if query[key]:
select_stmt = select_stmt.where(getattr(AllQuestions, key).is_(None))
else:
select_stmt = select_stmt.where(getattr(AllQuestions, key).is_(None))
else:
select_stmt = select_stmt.where(getattr(AllQuestions, key).ilike("%" + query[key] + "%"))
if sort and order:
select_stmt = select_stmt.order_by(getattr(getattr(AllQuestions, sort), order)())
questions = session.scalars(select_stmt.offset(offset).limit(limit)).all()
count = session.scalar(func.count(select_stmt))
return questions, count
def get_unhealthy_questions() -> list[AllQuestions]:
session = get_session()
questions = []
missing_category_questions = (
session.scalars(
select(AllQuestions)
.where(and_(AllQuestions.hidden_answer.is_(None), AllQuestions.multiple_choice.is_(None))),
).all()
)
multiple_choice_difficulty = (
session.scalars(
select(AllQuestions)
.where(AllQuestions.multiple_choice.has(MultipleChoice.multiple_choice_difficulty.is_(None))),
).all()
)
hidden_answer_difficulty = (
session.scalars(
select(AllQuestions)
.where(AllQuestions.hidden_answer.has(HiddenAnswer.hidden_answer_difficulty.is_(None))),
).all()
)
questions.extend(missing_category_questions)
questions.extend(multiple_choice_difficulty)
questions.extend(hidden_answer_difficulty)
return questions
def update_question(question_id, data):
session = get_session()
session.get(AllQuestions, question_id)
question = session.get(AllQuestions, question_id)
if data["create_multiple_choice"]:
multiple_choice = MultipleChoice(question_id)
session.add(multiple_choice)
question.multiple_choice = multiple_choice
if data["create_hidden_answer"]:
hidden_answer = HiddenAnswer(question_id)
session.add(hidden_answer)
question.hidden_answer = hidden_answer
for column in data.keys():
if column in AllQuestions.__table__.columns:
setattr(question, column, data[column])
if column in MultipleChoice.__table__.columns:
setattr(question.multiple_choice, column, data[column])
if column in HiddenAnswer.__table__.columns:
setattr(question.hidden_answer, column, data[column])
session.commit()
def add_user(email, password, username=None, **kwargs):
session = get_session()
user_datastore = get_user_datastore()
if user_datastore.find_user(email=email) is None:
password_hash = hash_password(password) if password is not None else None
if username is None:
username = email
user = user_datastore.create_user(email=email, password=password_hash, username=username, **kwargs)
session.add(user)
role = session.scalar(select(Role).where(Role.name == "basic"))
if role is not None:
user_datastore.add_role_to_user(user, role)
# user.add_role(role)
# user_datastore.commit()
# update_user(user.user_id, {"roles": ["basic"]})
# session.add(user)
# user_datastore.add_role_to_user(user, "basic")
session.commit()
return user
return None
def get_user(user_id=None, google_id=None) -> User:
session = get_session()
if user_id is not None:
user = session.scalars(
select(User).where(User.user_id == user_id),
).one_or_none()
else:
user = session.scalars(
select(User).where(User.google_id == google_id),
).one_or_none()
return user
def update_user(user_id, data):
session = get_session()
user_datastore = get_user_datastore()
select_stmt = select(User).where(User.user_id == user_id)
user = session.scalars(select_stmt).one_or_none()
for role in user.roles:
if role.name not in data["roles"]:
# user.remove_role(role)
user_datastore.remove_role_from_user(user, role)
for role_name in data["roles"]:
select_stmt = select(Role).where(Role.name == role_name)
role = session.scalars(select_stmt).one_or_none()
# if role is not None:
# user.add_role(role)
user_datastore.add_role_to_user(user, role)
for column in data.keys():
if column in User.__table__.columns:
setattr(user, column, data[column])
session.commit()
def query_users(offset, limit, query: dict[str, Any], sort=None, order=None) -> tuple[Sequence[User], int]:
session = get_session()
select_stmt = select(User)
if query is not None:
for key in query.keys():
if hasattr(User, key):
select_stmt = select_stmt.where(getattr(User, key).ilike("%" + query[key] + "%"))
if sort and order:
select_stmt = select_stmt.order_by(getattr(getattr(User, sort), order)())
users = session.scalars(select_stmt.offset(offset).limit(limit)).all()
count = session.scalar(func.count(select_stmt))
return users, count
def get_all_roles() -> Sequence[Role]:
session = get_session()
return session.scalars(select(Role)).all()
def next_hidden_answer(previous_questions: list[int], difficulty) -> HiddenAnswer:
session = get_session()
select_stmt = (
select(HiddenAnswer)
.where(
HiddenAnswer.hidden_answer_difficulty == difficulty,
HiddenAnswer.question_id.notin_(previous_questions),
)
.order_by(func.random())
)
return session.scalars(select_stmt).first()
def next_multiple_choice(previous_questions: list[int], difficulty) -> MultipleChoice:
session = get_session()
select_stmt = (
select(MultipleChoice)
.where(
MultipleChoice.multiple_choice_difficulty == difficulty,
MultipleChoice.question_id.notin_(previous_questions),
)
.order_by(func.random())
)
return session.scalars(select_stmt).first()