211 lines
8.3 KiB
Python
211 lines
8.3 KiB
Python
from flask import current_app, g
|
|
from flask_security import UserMixin, RoleMixin
|
|
import sqlalchemy
|
|
from typing import Union, Optional, Literal, Type, List, Tuple
|
|
import random
|
|
import os
|
|
from sqlalchemy import Column, JSON, String, Integer, create_engine, ForeignKey, func, ARRAY, Boolean, UnicodeText, DateTime
|
|
from sqlalchemy.ext.declarative import declarative_base
|
|
from sqlalchemy.ext.associationproxy import association_proxy
|
|
from sqlalchemy.orm import sessionmaker, relationship, scoped_session
|
|
from werkzeug.utils import import_string
|
|
|
|
|
|
# Dotted import path of the configuration object, read from the environment.
# A missing CONFIGURATION_SETUP variable raises KeyError at import time.
environment_configuration = os.environ["CONFIGURATION_SETUP"]
config = import_string(environment_configuration)

# Engine bound to the database URL exposed by the configuration object.
engine = create_engine(config.DB_URL)
session_factory = sessionmaker(bind=engine)
# Registry of sessions: calling Session() hands out the session of the
# current scope instead of building a new one on every call.
Session = scoped_session(session_factory)

Base = declarative_base()
# Gives every model a ``Model.query`` attribute backed by the scoped session.
Base.query = Session.query_property()
|
|
|
|
|
|
class User(Base, UserMixin):
    """Account row of the ``users`` table, combined with flask_security's ``UserMixin``."""

    __tablename__ = "users"

    # Identity and credentials.
    user_id = Column(Integer, primary_key=True)
    email = Column(String, unique=True, nullable=False)
    username = Column(String, unique=True)
    password = Column(String, nullable=False)
    active = Column(Boolean, nullable=False)

    # Login bookkeeping (presumably maintained by flask_security's login
    # tracking -- confirm against the application configuration).
    last_login_at = Column(DateTime())
    current_login_at = Column(DateTime())
    last_login_ip = Column(String(100))
    current_login_ip = Column(String(100))
    login_count = Column(Integer)

    # Required, unique per-user token column.
    fs_uniquifier = Column(String, unique=True, nullable=False)

    # Roles reached through the ``users_roles`` association table.
    roles = relationship("Role", secondary="users_roles")
|
|
|
|
|
|
class Role(Base, RoleMixin):
    """Role row of the ``roles`` table, combined with flask_security's ``RoleMixin``."""

    __tablename__ = "roles"

    role_id = Column(Integer, primary_key=True)
    # Role names are unique across the table.
    name = Column(String, unique=True)
    description = Column(String)
    # Stored as free text; the encoding of the permission list is not
    # visible here (NOTE(review): confirm the expected format).
    permissions = Column(UnicodeText)
|
|
|
|
|
|
class UsersRoles(Base):
    """Association table ``users_roles`` linking ``users`` rows to ``roles`` rows."""

    __tablename__ = "users_roles"

    user_role_id = Column(Integer, primary_key=True)
    # Each row references one user and one role.
    user_id = Column(Integer, ForeignKey("users.user_id"))
    role_id = Column(Integer, ForeignKey("roles.role_id"))
|
|
|
|
|
|
class AllQuestions(Base):
    """Base record of a question in ``all_questions``.

    Category-specific data lives in the optional one-to-one rows reachable
    through ``multiple_choice`` and ``hidden_answer``.
    """

    __tablename__ = "all_questions"

    question_id = Column(Integer, primary_key=True, nullable=False)
    question = Column(String)
    answer = Column(String)
    addresses = Column(String)

    # uselist=False: each relationship yields a single object (or None).
    multiple_choice = relationship(
        "MultipleChoice",
        uselist=False,
        back_populates="all_question_relationship",
    )
    hidden_answer = relationship(
        "HiddenAnswer",
        uselist=False,
        back_populates="all_question_relationship",
    )

    def __init__(self, question_id, question, answer, addresses):
        """Store the given column values on the new instance."""
        self.question_id = question_id
        self.question = question
        self.answer = answer
        self.addresses = addresses

    def __repr__(self):
        """Return a short debug label containing the question id."""
        return f"<Question: {self.question_id}>"
|
|
|
|
|
|
class HiddenAnswer(Base):
    """Hidden-answer category data for a question (``category_hidden_answer``).

    The primary key doubles as the foreign key to ``all_questions``; the
    question text, answer and addresses are proxied from that base row.
    """

    __tablename__ = "category_hidden_answer"

    question_id = Column(
        Integer,
        ForeignKey("all_questions.question_id"),
        primary_key=True,
    )
    difficulty = Column(Integer)
    hint = Column(JSON)

    # lazy="joined": the base question is loaded together with this row.
    all_question_relationship = relationship(
        "AllQuestions",
        lazy="joined",
        back_populates="hidden_answer",
    )

    # Read-through attributes forwarded to the base question.
    question = association_proxy("all_question_relationship", "question")
    answer = association_proxy("all_question_relationship", "answer")
    addresses = association_proxy("all_question_relationship", "addresses")

    def __init__(self, question_id, difficulty, hint, base_question):
        """Store the column values and attach the ``AllQuestions`` base row."""
        self.question_id = question_id
        self.difficulty = difficulty
        self.hint = hint
        self.all_question_relationship = base_question

    def __repr__(self):
        """Return a short debug label containing the question id."""
        return f"<Question Hidden Answer: {self.question_id}>"
|
|
|
|
|
|
class MultipleChoice(Base):
    """Multiple-choice category data for a question (``category_multiple_choice``).

    The primary key doubles as the foreign key to ``all_questions``; the
    question text, answer and addresses are proxied from that base row.
    """

    __tablename__ = "category_multiple_choice"

    question_id = Column(
        Integer,
        ForeignKey("all_questions.question_id"),
        primary_key=True,
    )
    difficulty = Column(Integer)
    hint = Column(JSON)
    wrong_answers = Column(ARRAY(String))

    # lazy="joined": the base question is loaded together with this row.
    all_question_relationship = relationship(
        "AllQuestions",
        lazy="joined",
        back_populates="multiple_choice",
    )

    # Read-through attributes forwarded to the base question.
    question = association_proxy("all_question_relationship", "question")
    answer = association_proxy("all_question_relationship", "answer")
    addresses = association_proxy("all_question_relationship", "addresses")

    # Class-level default for the non-persisted shuffled list. SQLAlchemy does
    # not call __init__ when it loads rows from the database, so without this
    # default, instances returned by a query would raise AttributeError on
    # ``answer_list`` until randomize_answer_list() had been called.
    answer_list = None

    def __init__(self, question_id, difficulty, hint, wrong_answers, base_question):
        """Store the column values and attach the ``AllQuestions`` base row."""
        self.question_id = question_id
        self.difficulty = difficulty
        self.hint = hint
        self.wrong_answers = wrong_answers
        self.all_question_relationship = base_question
        self.answer_list = None

    def randomize_answer_list(self):
        """Shuffle the wrong answers together with the correct one.

        The result is stored on ``self.answer_list``; nothing is returned and
        nothing is persisted. A NULL ``wrong_answers`` column is treated as an
        empty list instead of raising ``TypeError``.
        """
        answer_list: List[str] = [*(self.wrong_answers or []), self.answer]
        random.shuffle(answer_list)
        self.answer_list = answer_list

    def __repr__(self):
        """Return a short debug label containing the question id."""
        return f"<Question Multiple Choice: {self.question_id}>"
|
|
|
|
|
|
# Emit CREATE TABLE for every model declared above that does not exist yet.
Base.metadata.create_all(bind=engine)
|
|
|
|
|
|
def add_multiple_choice_question(question, answer, addresses, difficulty, hint, wrong_answers):
    """Insert a new question together with its multiple-choice data.

    Creates one ``AllQuestions`` row and the matching ``MultipleChoice`` row
    sharing the same ``question_id`` and commits them in one transaction.

    The new id is ``max(question_id) + 1`` (``0`` for an empty table). Using
    the row count, as before, collides with an existing primary key as soon
    as the ids are not the contiguous range ``0..n-1`` (for example after a
    delete). Concurrent writers can still pick the same id; in that case the
    commit fails and the error propagates.

    Raises:
        Exception: whatever the database layer raises on flush/commit; the
            session is rolled back before the error is re-raised so the
            shared scoped session stays usable.
    """
    session: sqlalchemy.orm.session.Session = Session()
    try:
        highest_id = session.query(func.max(AllQuestions.question_id)).scalar()
        question_id = 0 if highest_id is None else highest_id + 1

        base_question = AllQuestions(question_id, question, answer, addresses)
        multiple_choice_question = MultipleChoice(
            question_id, difficulty, hint, wrong_answers, base_question
        )

        session.add(base_question)
        session.add(multiple_choice_question)
        session.commit()
    except Exception:
        # A failed flush/commit leaves the session in a state that rejects
        # further work until it is rolled back.
        session.rollback()
        raise
|
|
|
|
|
|
def get_all_questions() -> List[AllQuestions]:
    """Return every ``AllQuestions`` row as a list."""
    every_question = Session().query(AllQuestions)
    return every_question.all()
|
|
|
|
|
|
def get_all_hidden_answer() -> List[HiddenAnswer]:
    """Return every ``HiddenAnswer`` row as a list."""
    every_hidden_answer = Session().query(HiddenAnswer)
    return every_hidden_answer.all()
|
|
|
|
|
|
def get_all_multiple_choice() -> List[MultipleChoice]:
    """Return every ``MultipleChoice`` row as a list."""
    every_multiple_choice = Session().query(MultipleChoice)
    return every_multiple_choice.all()
|
|
|
|
|
|
def get_category_count(
    category: Union[Type[MultipleChoice], Type[HiddenAnswer], Type[AllQuestions]],
) -> int:
    """Return the number of rows stored for the given model class."""
    rows_of_category = Session().query(category)
    return rows_of_category.count()
|
|
|
|
|
|
def get_question(
    category: Union[Type[MultipleChoice], Type[HiddenAnswer], Type[AllQuestions]],
    question_id: int,
) -> Optional[Union[MultipleChoice, HiddenAnswer, AllQuestions]]:
    """Look up one row of ``category`` by its ``question_id``.

    Returns the matching instance, or ``None`` when no row has that id.
    """
    db = Session()
    matches_id = category.question_id == question_id
    return db.query(category).filter(matches_id).one_or_none()
|
|
|
|
|
|
def get_random_question_of_difficulty(
    category: Union[Type[MultipleChoice], Type[HiddenAnswer]],
    difficulty: Literal[1, 2, 3],
):
    """Pick one random row of ``category`` with the given difficulty.

    The rows are shuffled by the database (``ORDER BY random()``) and the
    first one is returned; the result is ``None`` when nothing matches.
    """
    db = Session()
    candidates = db.query(category).filter(category.difficulty == difficulty)
    return candidates.order_by(func.random()).first()
|
|
|
|
|
|
def get_random_hidden_answer(difficulty: Optional[Literal[1, 2, 3]] = None) -> HiddenAnswer:
    """Pick one random ``HiddenAnswer`` row.

    When ``difficulty`` is given, only rows of that difficulty are
    considered. ``first()`` yields ``None`` when no row qualifies.
    """
    db = Session()
    candidates = db.query(HiddenAnswer)
    if difficulty is not None:
        candidates = candidates.filter(HiddenAnswer.difficulty == difficulty)
    return candidates.order_by(func.random()).first()
|
|
|
|
|
|
def get_random_multiple_choice(difficulty: Optional[Literal[1, 2, 3]] = None) -> MultipleChoice:
    """Pick one random ``MultipleChoice`` row.

    When ``difficulty`` is given, only rows of that difficulty are
    considered. ``first()`` yields ``None`` when no row qualifies.
    """
    db = Session()
    candidates = db.query(MultipleChoice)
    if difficulty is not None:
        candidates = candidates.filter(MultipleChoice.difficulty == difficulty)
    return candidates.order_by(func.random()).first()
|
|
|
|
|
|
def check_answer(question_id: int, guess: str) -> Optional[Tuple[bool, str]]:
    """Compare ``guess`` with the stored answer of a question.

    Args:
        question_id: primary key of the ``AllQuestions`` row to check.
        guess: the answer to compare; the comparison is an exact ``==``.

    Returns:
        ``(is_correct, stored_answer)`` when the question exists, otherwise
        ``None``. The ``None`` case was previously an implicit fall-through
        that contradicted the ``Tuple`` annotation; it is now explicit, so
        callers must check for it before unpacking.
    """
    session: sqlalchemy.orm.session.Session = Session()
    question: Optional[AllQuestions] = (
        session.query(AllQuestions)
        .filter(AllQuestions.question_id == question_id)
        .one_or_none()
    )
    if question is None:
        return None

    answer = question.answer
    return answer == guess, answer
|
|
|
|
|
|
def query_all_questions(offset, limit, query: Optional[dict] = None, sort=None, order=None) -> Tuple[List[AllQuestions], int]:
    """Return one page of filtered questions plus the total match count.

    Args:
        offset: number of matching rows to skip.
        limit: maximum number of rows to return.
        query: optional mapping of ``AllQuestions`` attribute name to filter
            value. For ``"multiple_choice"`` / ``"hidden_answer"`` a truthy
            value keeps questions that have the related row and a falsy value
            keeps those that do not. Every other key is matched as a
            case-insensitive substring (``ILIKE '%value%'``).
        sort: name of the ``AllQuestions`` attribute to order by.
        order: ``"asc"`` or ``"desc"``; only used together with ``sort``.

    Returns:
        ``(questions, count)`` where ``questions`` is the requested page and
        ``count`` is the number of rows matching the filters overall.

    Raises:
        ValueError: if ``sort`` is given with an ``order`` other than
            ``"asc"`` / ``"desc"``.
        AttributeError: if a filter key or ``sort`` is not an attribute of
            ``AllQuestions``.
    """
    session: sqlalchemy.orm.session.Session = Session()

    query_params = []
    filters = {} if query is None else query
    for key, value in filters.items():
        attribute = getattr(AllQuestions, key)
        if key in ("multiple_choice", "hidden_answer"):
            # SQLAlchemy overloads ==/!= to build SQL, so "is None" cannot
            # be used here.
            if value:
                query_params.append(attribute != None)  # noqa: E711
            else:
                query_params.append(attribute == None)  # noqa: E711
        else:
            query_params.append(attribute.ilike("%" + value + "%"))

    ordering = []
    if sort and order:
        # ``order`` used to be passed straight to getattr()(), which let the
        # caller invoke any method of the column; restrict it to directions.
        if order not in ("asc", "desc"):
            raise ValueError(f"order must be 'asc' or 'desc', got {order!r}")
        ordering.append(getattr(getattr(AllQuestions, sort), order)())
    if not ordering or sort != "question_id":
        # OFFSET/LIMIT without a total order may repeat or skip rows between
        # pages, so always finish with the primary key as a tie-breaker.
        ordering.append(AllQuestions.question_id)

    q = session.query(AllQuestions).filter(*query_params)
    questions = q.order_by(*ordering).offset(offset).limit(limit).all()
    count = q.count()
    return questions, count
|