506 lines
18 KiB
Python
506 lines
18 KiB
Python
from __future__ import annotations
|
|
|
|
import os
|
|
import random
|
|
from datetime import datetime
|
|
from decimal import Decimal
|
|
from typing import Any
|
|
from typing import Literal
|
|
from typing import Sequence
|
|
|
|
from flask import current_app
|
|
from flask import g
|
|
from flask_security import hash_password
|
|
from flask_security import RoleMixin
|
|
from flask_security import SQLAlchemySessionUserDatastore
|
|
from flask_security import UserMixin
|
|
from flask_security import verify_password
|
|
from sqlalchemy import and_
|
|
from sqlalchemy import CheckConstraint
|
|
from sqlalchemy import create_engine
|
|
from sqlalchemy import ForeignKey
|
|
from sqlalchemy import func
|
|
from sqlalchemy import JSON
|
|
from sqlalchemy import select
|
|
from sqlalchemy import String
|
|
from sqlalchemy.ext.associationproxy import association_proxy
|
|
from sqlalchemy.orm import DeclarativeBase
|
|
from sqlalchemy.orm import Mapped
|
|
from sqlalchemy.orm import mapped_column
|
|
from sqlalchemy.orm import relationship
|
|
from sqlalchemy.orm import scoped_session
|
|
from sqlalchemy.orm import sessionmaker
|
|
from werkzeug.utils import import_string
|
|
|
|
|
|
config = import_string(os.environ.get("CONFIGURATION_SETUP"))
|
|
engine = create_engine(config.DB_URL, pool_size=20)
|
|
|
|
|
|
class Base(DeclarativeBase):
|
|
pass
|
|
|
|
|
|
def get_scoped_session() -> scoped_session:
|
|
session_factory = sessionmaker(bind=engine)
|
|
return scoped_session(session_factory)
|
|
|
|
|
|
def get_session() -> scoped_session:
|
|
if not hasattr(g, "session"):
|
|
session = get_scoped_session()
|
|
# This is for compatibility with Flask-Security-Too which assumes usage of Flask-Sqlalchemy
|
|
Base.query = session.query_property()
|
|
g.session = session()
|
|
return g.session
|
|
|
|
|
|
def init_db():
|
|
engine = create_engine(current_app.config["DB_URL"])
|
|
Base.metadata.create_all(engine)
|
|
|
|
|
|
def destroy_db():
|
|
engine = create_engine(current_app.config["DB_URL"])
|
|
Base.metadata.drop_all(engine)
|
|
|
|
|
|
def get_user_datastore() -> SQLAlchemySessionUserDatastore:
|
|
return SQLAlchemySessionUserDatastore(get_session(), User, Role)
|
|
|
|
|
|
class User(Base, UserMixin):
|
|
__tablename__ = "users"
|
|
user_id: Mapped[int] = mapped_column(primary_key=True)
|
|
email: Mapped[str] = mapped_column(unique=True)
|
|
username: Mapped[str] = mapped_column(unique=True)
|
|
password: Mapped[str | None]
|
|
active: Mapped[bool]
|
|
last_login_at: Mapped[datetime]
|
|
current_login_at: Mapped[datetime]
|
|
last_login_ip: Mapped[str] = mapped_column(String(100))
|
|
current_login_ip: Mapped[str] = mapped_column(String(100))
|
|
login_count: Mapped[int]
|
|
fs_uniquifier: Mapped[str] = mapped_column(unique=True)
|
|
google_id: Mapped[Decimal | None]
|
|
|
|
__table_args__ = (
|
|
CheckConstraint("(password IS NOT NULL) OR (google_id IS NOT NULL)", "password_google_id_null_check"),
|
|
)
|
|
|
|
roles = relationship("Role", secondary="users_roles")
|
|
|
|
def check_password(self, password):
|
|
return verify_password(password, self.password)
|
|
|
|
def add_role(self, role):
|
|
self.roles.append(role)
|
|
|
|
def remove_role(self, role):
|
|
self.roles.remove(role)
|
|
|
|
def get_dict(self):
|
|
result = {
|
|
"user_id": self.user_id,
|
|
"email": self.email,
|
|
"username": self.username,
|
|
}
|
|
return result
|
|
|
|
|
|
class Role(Base, RoleMixin):
|
|
__tablename__ = "roles"
|
|
role_id: Mapped[int] = mapped_column(primary_key=True)
|
|
name: Mapped[str] = mapped_column(unique=True)
|
|
description: Mapped[str]
|
|
permissions: Mapped[str]
|
|
|
|
|
|
class UsersRoles(Base):
|
|
__tablename__ = "users_roles"
|
|
user_role_id: Mapped[int] = mapped_column(primary_key=True)
|
|
user_id: Mapped[int] = mapped_column(ForeignKey("users.user_id"))
|
|
role_id: Mapped[int] = mapped_column(ForeignKey("roles.role_id"))
|
|
|
|
|
|
class AllQuestions(Base):
|
|
__tablename__ = "all_questions"
|
|
|
|
question_id: Mapped[int] = mapped_column(primary_key=True)
|
|
question_text: Mapped[str]
|
|
answer: Mapped[str]
|
|
addresses: Mapped[str]
|
|
|
|
multiple_choice = relationship("MultipleChoice", uselist=False, back_populates="all_question_relationship")
|
|
hidden_answer = relationship("HiddenAnswer", uselist=False, back_populates="all_question_relationship")
|
|
|
|
def __init__(self, question_id=None, question=None, answer=None, addresses=None):
|
|
self.question_id = question_id
|
|
self.question_text = question
|
|
self.answer = answer
|
|
self.addresses = addresses
|
|
|
|
def __repr__(self):
|
|
return f"<Question: {self.question_id}>"
|
|
|
|
def get_dict(self):
|
|
result = {
|
|
"question_id": self.question_id,
|
|
"question": self.question_text,
|
|
"answer": self.answer,
|
|
"addresses": self.addresses,
|
|
}
|
|
if self.hidden_answer:
|
|
result["hidden_answer"] = self.hidden_answer.get_dict()
|
|
if self.multiple_choice:
|
|
result["multiple_choice"] = self.multiple_choice.get_dict()
|
|
return result
|
|
|
|
|
|
class HiddenAnswer(Base):
|
|
__tablename__ = "category_hidden_answer"
|
|
|
|
question_id: Mapped[int] = mapped_column(ForeignKey("all_questions.question_id"), primary_key=True)
|
|
hidden_answer_difficulty: Mapped[int]
|
|
hidden_answer_hint: Mapped[dict | list | None] = mapped_column(JSON)
|
|
|
|
all_question_relationship = relationship("AllQuestions", lazy="joined", back_populates="hidden_answer")
|
|
question_text = association_proxy("all_question_relationship", "question_text")
|
|
answer = association_proxy("all_question_relationship", "answer")
|
|
addresses = association_proxy("all_question_relationship", "addresses")
|
|
|
|
def __init__(self, question_id, difficulty=None, hint=None, base_question=None):
|
|
self.question_id = question_id
|
|
self.hidden_answer_difficulty = difficulty
|
|
self.hidden_answer_hint = hint
|
|
self.all_question_relationship = base_question
|
|
|
|
def __repr__(self):
|
|
return f"<Question Hidden Answer: {self.question_id}>"
|
|
|
|
def get_dict(self):
|
|
return {
|
|
"question_id": self.question_id,
|
|
"difficulty": self.hidden_answer_difficulty,
|
|
"hint": self.hidden_answer_hint,
|
|
"question_text": self.question_text,
|
|
"answer": self.answer,
|
|
"addresses": self.addresses,
|
|
}
|
|
|
|
|
|
class MultipleChoice(Base):
|
|
__tablename__ = "category_multiple_choice"
|
|
|
|
question_id: Mapped[int] = mapped_column(ForeignKey("all_questions.question_id"), primary_key=True)
|
|
multiple_choice_difficulty: Mapped[int]
|
|
multiple_choice_hint: Mapped[dict | list | None] = mapped_column(JSON)
|
|
wrong_answers: Mapped[list] = mapped_column(JSON)
|
|
|
|
all_question_relationship = relationship("AllQuestions", lazy="joined", back_populates="multiple_choice")
|
|
question_text = association_proxy("all_question_relationship", "question_text")
|
|
answer = association_proxy("all_question_relationship", "answer")
|
|
addresses = association_proxy("all_question_relationship", "addresses")
|
|
|
|
def __init__(self, question_id, difficulty=None, hint=None, wrong_answers=None, base_question=None):
|
|
self.question_id = question_id
|
|
self.multiple_choice_difficulty = difficulty
|
|
self.multiple_choice_hint = hint
|
|
self.wrong_answers = wrong_answers
|
|
self.all_question_relationship = base_question
|
|
self.answer_list = None
|
|
|
|
def randomize_answer_list(self):
|
|
answer_list: list[str] = [*self.wrong_answers, self.answer]
|
|
random.shuffle(answer_list)
|
|
self.answer_list = answer_list
|
|
|
|
def __repr__(self):
|
|
return f"<Question Multiple Choice: {self.question_id}>"
|
|
|
|
def get_dict(self):
|
|
return {
|
|
"question_id": self.question_id,
|
|
"difficulty": self.multiple_choice_difficulty,
|
|
"hint": self.multiple_choice_hint,
|
|
"wrong_answers": self.wrong_answers,
|
|
"question_text": self.question_text,
|
|
"answer": self.answer,
|
|
"addresses": self.addresses,
|
|
}
|
|
|
|
def get_dict_shuffled_choices(self):
|
|
self.randomize_answer_list()
|
|
return {
|
|
"question_id": self.question_id,
|
|
"difficulty": self.multiple_choice_difficulty,
|
|
"hint": self.multiple_choice_hint,
|
|
"question_text": self.question_text,
|
|
"addresses": self.addresses,
|
|
"answer_list": self.answer_list,
|
|
}
|
|
|
|
|
|
def add_multiple_choice_question(question, answer, addresses, difficulty, hint, wrong_answers):
|
|
session = get_session()
|
|
select_stmt = select(AllQuestions).order_by(AllQuestions.question_id.desc())
|
|
question_id = session.scalars(select_stmt).first().question_id + 1
|
|
print(question_id)
|
|
base_question = AllQuestions(question_id, question, answer, addresses)
|
|
multiple_choice_question = MultipleChoice(question_id, difficulty, hint, wrong_answers, base_question)
|
|
session.add(base_question)
|
|
session.add(multiple_choice_question)
|
|
session.commit()
|
|
|
|
|
|
def add_question(data) -> AllQuestions:
|
|
session = get_session()
|
|
select_stmt = select(AllQuestions).order_by(AllQuestions.question_id.desc())
|
|
question_id = session.scalars(select_stmt).first().question_id + 1
|
|
question = AllQuestions(question_id)
|
|
session.add(question)
|
|
if data["create_multiple_choice"]:
|
|
multiple_choice = MultipleChoice(question.question_id)
|
|
session.add(multiple_choice)
|
|
question.multiple_choice = multiple_choice
|
|
if data["create_hidden_answer"]:
|
|
hidden_answer = HiddenAnswer(question.question_id)
|
|
session.add(hidden_answer)
|
|
question.hidden_answer = hidden_answer
|
|
for column in data.keys():
|
|
if column in AllQuestions.__table__.columns:
|
|
setattr(question, column, data[column])
|
|
if column in MultipleChoice.__table__.columns:
|
|
setattr(question.multiple_choice, column, data[column])
|
|
if column in HiddenAnswer.__table__.columns:
|
|
setattr(question.hidden_answer, column, data[column])
|
|
session.commit()
|
|
return question
|
|
|
|
|
|
def get_all_questions() -> Sequence[AllQuestions]:
|
|
session = get_session()
|
|
return session.scalars(select(AllQuestions)).all()
|
|
|
|
|
|
def get_all_hidden_answer() -> Sequence[HiddenAnswer]:
|
|
session = get_session()
|
|
return session.scalars(select(HiddenAnswer)).all()
|
|
|
|
|
|
def get_all_multiple_choice() -> Sequence[MultipleChoice]:
|
|
session = get_session()
|
|
return session.scalars(select(MultipleChoice)).all()
|
|
|
|
|
|
def get_category_count(category: type[MultipleChoice] | type[HiddenAnswer] | type[AllQuestions]) -> int:
|
|
session = get_session()
|
|
return session.scalar(select(func.count(category)))
|
|
|
|
|
|
def get_question[T: (MultipleChoice, HiddenAnswer, AllQuestions)](category: type[T], question_id: int) -> T | None:
|
|
session = get_session()
|
|
select_stmt = select(category).where(category.question_id == question_id)
|
|
return session.scalars(select_stmt).one_or_none()
|
|
|
|
|
|
def get_random_question_of_difficulty(category: type[MultipleChoice] | type[HiddenAnswer], difficulty: Literal[1, 2, 3]):
|
|
session = get_session()
|
|
select_stmt = select(category).where(category.hidden_answer_difficulty == difficulty).order_by(func.random())
|
|
return session.scalars(select_stmt).first()
|
|
|
|
|
|
def get_random_hidden_answer(difficulty: Literal[1, 2, 3] | None = None) -> HiddenAnswer:
|
|
session = get_session()
|
|
select_stmt = select(HiddenAnswer).order_by(func.random())
|
|
if difficulty is not None:
|
|
select_stmt = select_stmt.where(HiddenAnswer.hidden_answer_difficulty == difficulty)
|
|
return session.scalars(select_stmt).first()
|
|
|
|
|
|
def get_random_multiple_choice(difficulty: Literal[1, 2, 3] | None = None) -> MultipleChoice:
|
|
session = get_session()
|
|
select_stmt = select(MultipleChoice).order_by(func.random())
|
|
if difficulty is not None:
|
|
select_stmt = select_stmt.where(MultipleChoice.multiple_choice_difficulty == difficulty)
|
|
return session.scalars(select_stmt).first()
|
|
|
|
|
|
def check_answer(question_id: int, guess: str) -> tuple[bool, str] | None:
|
|
session = get_session()
|
|
select_stmt = select(AllQuestions).where(AllQuestions.question_id == question_id)
|
|
question: AllQuestions = session.scalars(select_stmt).one_or_none()
|
|
if question:
|
|
answer = question.answer
|
|
return answer == guess, answer
|
|
return None
|
|
|
|
|
|
def query_all_questions(offset, limit, query: dict[str, Any], sort=None, order=None) -> tuple[Sequence[AllQuestions], int]:
|
|
session = get_session()
|
|
select_stmt = select(AllQuestions)
|
|
if query is not None:
|
|
for key in query.keys():
|
|
if key == "multiple_choice" or key == "hidden_answer":
|
|
if query[key]:
|
|
select_stmt = select_stmt.where(getattr(AllQuestions, key).is_(None))
|
|
else:
|
|
select_stmt = select_stmt.where(getattr(AllQuestions, key).is_(None))
|
|
else:
|
|
select_stmt = select_stmt.where(getattr(AllQuestions, key).ilike("%" + query[key] + "%"))
|
|
if sort and order:
|
|
select_stmt = select_stmt.order_by(getattr(getattr(AllQuestions, sort), order)())
|
|
questions = session.scalars(select_stmt.offset(offset).limit(limit)).all()
|
|
count = session.scalar(func.count(select_stmt))
|
|
return questions, count
|
|
|
|
|
|
def get_unhealthy_questions() -> list[AllQuestions]:
|
|
session = get_session()
|
|
questions = []
|
|
missing_category_questions = (
|
|
session.scalars(
|
|
select(AllQuestions)
|
|
.where(and_(AllQuestions.hidden_answer.is_(None), AllQuestions.multiple_choice.is_(None))),
|
|
).all()
|
|
)
|
|
multiple_choice_difficulty = (
|
|
session.scalars(
|
|
select(AllQuestions)
|
|
.where(AllQuestions.multiple_choice.has(MultipleChoice.multiple_choice_difficulty.is_(None))),
|
|
).all()
|
|
)
|
|
hidden_answer_difficulty = (
|
|
session.scalars(
|
|
select(AllQuestions)
|
|
.where(AllQuestions.hidden_answer.has(HiddenAnswer.hidden_answer_difficulty.is_(None))),
|
|
).all()
|
|
)
|
|
questions.extend(missing_category_questions)
|
|
questions.extend(multiple_choice_difficulty)
|
|
questions.extend(hidden_answer_difficulty)
|
|
return questions
|
|
|
|
|
|
def update_question(question_id, data):
|
|
session = get_session()
|
|
session.get(AllQuestions, question_id)
|
|
question = session.get(AllQuestions, question_id)
|
|
if data["create_multiple_choice"]:
|
|
multiple_choice = MultipleChoice(question_id)
|
|
session.add(multiple_choice)
|
|
question.multiple_choice = multiple_choice
|
|
if data["create_hidden_answer"]:
|
|
hidden_answer = HiddenAnswer(question_id)
|
|
session.add(hidden_answer)
|
|
question.hidden_answer = hidden_answer
|
|
for column in data.keys():
|
|
if column in AllQuestions.__table__.columns:
|
|
setattr(question, column, data[column])
|
|
if column in MultipleChoice.__table__.columns:
|
|
setattr(question.multiple_choice, column, data[column])
|
|
if column in HiddenAnswer.__table__.columns:
|
|
setattr(question.hidden_answer, column, data[column])
|
|
session.commit()
|
|
|
|
|
|
def add_user(email, password, username=None, **kwargs):
|
|
session = get_session()
|
|
user_datastore = get_user_datastore()
|
|
if user_datastore.find_user(email=email) is None:
|
|
password_hash = hash_password(password) if password is not None else None
|
|
if username is None:
|
|
username = email
|
|
user = user_datastore.create_user(email=email, password=password_hash, username=username, **kwargs)
|
|
session.add(user)
|
|
role = session.scalar(select(Role).where(Role.name == "basic"))
|
|
if role is not None:
|
|
user_datastore.add_role_to_user(user, role)
|
|
# user.add_role(role)
|
|
# user_datastore.commit()
|
|
# update_user(user.user_id, {"roles": ["basic"]})
|
|
# session.add(user)
|
|
# user_datastore.add_role_to_user(user, "basic")
|
|
session.commit()
|
|
return user
|
|
return None
|
|
|
|
|
|
def get_user(user_id=None, google_id=None) -> User:
|
|
session = get_session()
|
|
if user_id is not None:
|
|
user = session.scalars(
|
|
select(User).where(User.user_id == user_id),
|
|
).one_or_none()
|
|
else:
|
|
user = session.scalars(
|
|
select(User).where(User.google_id == google_id),
|
|
).one_or_none()
|
|
return user
|
|
|
|
|
|
def update_user(user_id, data):
|
|
session = get_session()
|
|
user_datastore = get_user_datastore()
|
|
select_stmt = select(User).where(User.user_id == user_id)
|
|
user = session.scalars(select_stmt).one_or_none()
|
|
for role in user.roles:
|
|
if role.name not in data["roles"]:
|
|
# user.remove_role(role)
|
|
user_datastore.remove_role_from_user(user, role)
|
|
for role_name in data["roles"]:
|
|
select_stmt = select(Role).where(Role.name == role_name)
|
|
role = session.scalars(select_stmt).one_or_none()
|
|
# if role is not None:
|
|
# user.add_role(role)
|
|
user_datastore.add_role_to_user(user, role)
|
|
for column in data.keys():
|
|
if column in User.__table__.columns:
|
|
setattr(user, column, data[column])
|
|
session.commit()
|
|
|
|
|
|
def query_users(offset, limit, query: dict[str, Any], sort=None, order=None) -> tuple[Sequence[User], int]:
|
|
session = get_session()
|
|
select_stmt = select(User)
|
|
if query is not None:
|
|
for key in query.keys():
|
|
if hasattr(User, key):
|
|
select_stmt = select_stmt.where(getattr(User, key).ilike("%" + query[key] + "%"))
|
|
if sort and order:
|
|
select_stmt = select_stmt.order_by(getattr(getattr(User, sort), order)())
|
|
users = session.scalars(select_stmt.offset(offset).limit(limit)).all()
|
|
count = session.scalar(func.count(select_stmt))
|
|
return users, count
|
|
|
|
|
|
def get_all_roles() -> Sequence[Role]:
|
|
session = get_session()
|
|
return session.scalars(select(Role)).all()
|
|
|
|
|
|
def next_hidden_answer(previous_questions: list[int], difficulty) -> HiddenAnswer:
|
|
session = get_session()
|
|
select_stmt = (
|
|
select(HiddenAnswer)
|
|
.where(
|
|
HiddenAnswer.hidden_answer_difficulty == difficulty,
|
|
HiddenAnswer.question_id.notin_(previous_questions),
|
|
)
|
|
.order_by(func.random())
|
|
)
|
|
return session.scalars(select_stmt).first()
|
|
|
|
|
|
def next_multiple_choice(previous_questions: list[int], difficulty) -> MultipleChoice:
|
|
session = get_session()
|
|
select_stmt = (
|
|
select(MultipleChoice)
|
|
.where(
|
|
MultipleChoice.multiple_choice_difficulty == difficulty,
|
|
MultipleChoice.question_id.notin_(previous_questions),
|
|
)
|
|
.order_by(func.random())
|
|
)
|
|
return session.scalars(select_stmt).first()
|