quiz-the-word/QuizTheWord/database.py

421 lines
16 KiB
Python

import os
from flask import current_app, g
from flask_security import UserMixin, RoleMixin, SQLAlchemySessionUserDatastore, verify_password, hash_password
import sqlalchemy
from typing import Union, Optional, Literal, Type, List, Tuple
import random
from sqlalchemy import Column, JSON, String, Integer, create_engine, ForeignKey, func, Boolean, UnicodeText, DateTime
from sqlalchemy.sql.expression import or_, and_
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.orm import sessionmaker, relationship, scoped_session
from werkzeug.utils import import_string
config = import_string(os.environ.get("CONFIGURATION_SETUP"))
engine = create_engine(config.DB_URL, pool_size=20)
Base = declarative_base()
def get_scoped_session():
session_factory = sessionmaker(bind=engine)
return scoped_session(session_factory)
def get_session() -> sqlalchemy.orm.session.Session:
if not hasattr(g, "session"):
Session = get_scoped_session()
Base.query = Session.query_property() # This is for compatibility with Flask-Security-Too which assumes usage of Flask-Sqlalchemy
# Session.query_property()
g.session = Session()
return g.session
def init_db():
engine = create_engine(current_app.config["DB_URL"])
Base.metadata.create_all(engine)
def destroy_db():
engine = create_engine(current_app.config["DB_URL"])
Base.metadata.drop_all(engine)
def get_user_datastore() -> SQLAlchemySessionUserDatastore:
return SQLAlchemySessionUserDatastore(get_session(), User, Role)
class User(Base, UserMixin):
__tablename__ = "users"
user_id = Column(Integer, primary_key=True)
email = Column(String, unique=True, nullable=False)
username = Column(String, unique=True)
password = Column(String, nullable=False)
active = Column(Boolean, nullable=False)
last_login_at = Column(DateTime())
current_login_at = Column(DateTime())
last_login_ip = Column(String(100))
current_login_ip = Column(String(100))
login_count = Column(Integer)
fs_uniquifier = Column(String, unique=True, nullable=False)
roles = relationship("Role", secondary="users_roles")
def check_password(self, password):
return verify_password(password, self.password)
def add_role(self, role):
self.roles.append(role)
def remove_role(self, role):
self.roles.remove(role)
def get_dict(self):
result = {
"user_id": self.user_id,
"email": self.email,
"username": self.username,
}
return result
class Role(Base, RoleMixin):
__tablename__ = "roles"
role_id = Column(Integer, primary_key=True)
name = Column(String, unique=True)
description = Column(String)
permissions = Column(UnicodeText)
class UsersRoles(Base):
__tablename__ = "users_roles"
user_role_id = Column(Integer, primary_key=True)
user_id = Column(Integer, ForeignKey("users.user_id"))
role_id = Column(Integer, ForeignKey("roles.role_id"))
class AllQuestions(Base):
__tablename__ = "all_questions"
question_id = Column(Integer, primary_key=True, nullable=False)
question_text = Column(String)
answer = Column(String)
addresses = Column(String)
multiple_choice = relationship("MultipleChoice", uselist=False, back_populates="all_question_relationship")
hidden_answer = relationship("HiddenAnswer", uselist=False, back_populates="all_question_relationship")
def __init__(self, question_id=None, question=None, answer=None, addresses=None):
self.question_id = question_id
self.question_text = question
self.answer = answer
self.addresses = addresses
def __repr__(self):
return f"<Question: {self.question_id}>"
def get_dict(self):
result = {
"question_id": self.question_id,
"question": self.question_text,
"answer": self.answer,
"addresses": self.addresses,
}
if self.hidden_answer:
result["hidden_answer"] = self.hidden_answer.get_dict()
if self.multiple_choice:
result["multiple_choice"] = self.multiple_choice.get_dict()
return result
class HiddenAnswer(Base):
__tablename__ = "category_hidden_answer"
question_id = Column(Integer, ForeignKey("all_questions.question_id"), primary_key=True)
hidden_answer_difficulty = Column(Integer)
hidden_answer_hint = Column(JSON)
all_question_relationship = relationship("AllQuestions", lazy="joined", back_populates="hidden_answer")
question_text = association_proxy("all_question_relationship", "question_text")
answer = association_proxy("all_question_relationship", "answer")
addresses = association_proxy("all_question_relationship", "addresses")
def __init__(self, question_id, difficulty=None, hint=None, base_question=None):
self.question_id = question_id
self.hidden_answer_difficulty = difficulty
self.hidden_answer_hint = hint
self.all_question_relationship = base_question
def __repr__(self):
return f"<Question Hidden Answer: {self.question_id}>"
def get_dict(self):
return {
"question_id": self.question_id,
"difficulty": self.hidden_answer_difficulty,
"hint": self.hidden_answer_hint,
"question_text": self.question_text,
"answer": self.answer,
"addresses": self.addresses,
}
class MultipleChoice(Base):
__tablename__ = "category_multiple_choice"
question_id = Column(Integer, ForeignKey("all_questions.question_id"), primary_key=True)
multiple_choice_difficulty = Column(Integer)
multiple_choice_hint = Column(JSON)
wrong_answers = Column(JSON)
all_question_relationship = relationship("AllQuestions", lazy="joined", back_populates="multiple_choice")
question_text = association_proxy("all_question_relationship", "question_text")
answer = association_proxy("all_question_relationship", "answer")
addresses = association_proxy("all_question_relationship", "addresses")
def __init__(self, question_id, difficulty=None, hint=None, wrong_answers=None, base_question=None):
self.question_id = question_id
self.multiple_choice_difficulty = difficulty
self.multiple_choice_hint = hint
self.wrong_answers = wrong_answers
self.all_question_relationship = base_question
self.answer_list = None
def randomize_answer_list(self):
answer_list: List[str] = [*self.wrong_answers, self.answer]
random.shuffle(answer_list)
self.answer_list = answer_list
def __repr__(self):
return f"<Question Multiple Choice: {self.question_id}>"
def get_dict(self):
return {
"question_id": self.question_id,
"difficulty": self.multiple_choice_difficulty,
"hint": self.multiple_choice_hint,
"wrong_answers": self.wrong_answers,
"question_text": self.question_text,
"answer": self.answer,
"addresses": self.addresses,
}
def get_dict_shuffled_choices(self):
self.randomize_answer_list()
return {
"question_id": self.question_id,
"difficulty": self.multiple_choice_difficulty,
"hint": self.multiple_choice_hint,
"question_text": self.question_text,
"addresses": self.addresses,
"answer_list": self.answer_list,
}
def add_multiple_choice_question(question, answer, addresses, difficulty, hint, wrong_answers):
session = get_session()
question_id = session.query(AllQuestions).order_by(AllQuestions.question_id.desc()).first().question_id + 1
print(question_id)
base_question = AllQuestions(question_id, question, answer, addresses)
multiple_choice_question = MultipleChoice(question_id, difficulty, hint, wrong_answers, base_question)
session.add(base_question)
session.add(multiple_choice_question)
session.commit()
def add_question(data) -> AllQuestions:
session = get_session()
question_id = session.query(AllQuestions).order_by(AllQuestions.question_id.desc()).first().question_id + 1
question = AllQuestions(question_id)
session.add(question)
if data["create_multiple_choice"]:
multiple_choice = MultipleChoice(question.question_id)
session.add(multiple_choice)
question.multiple_choice = multiple_choice
if data["create_hidden_answer"]:
hidden_answer = HiddenAnswer(question.question_id)
session.add(hidden_answer)
question.hidden_answer = hidden_answer
for column in data.keys():
if column in AllQuestions.__table__.columns:
setattr(question, column, data[column])
if column in MultipleChoice.__table__.columns:
setattr(question.multiple_choice, column, data[column])
if column in HiddenAnswer.__table__.columns:
setattr(question.hidden_answer, column, data[column])
session.commit()
return question
def get_all_questions() -> List[AllQuestions]:
session = get_session()
return session.query(AllQuestions).all()
def get_all_hidden_answer() -> List[HiddenAnswer]:
session = get_session()
return session.query(HiddenAnswer).all()
def get_all_multiple_choice() -> List[MultipleChoice]:
session = get_session()
return session.query(MultipleChoice).all()
def get_category_count(category: Union[Type[MultipleChoice], Type[HiddenAnswer], Type[AllQuestions]]) -> int:
session = get_session()
return session.query(category).count()
def get_question(category: Union[Type[MultipleChoice], Type[HiddenAnswer], Type[AllQuestions]], question_id: int) -> \
Optional[Union[MultipleChoice, HiddenAnswer, AllQuestions]]:
session = get_session()
return session.query(category).filter(category.question_id == question_id).one_or_none()
def get_random_question_of_difficulty(category: Union[Type[MultipleChoice], Type[HiddenAnswer]],
difficulty: Literal[1, 2, 3]):
session = get_session()
return session.query(category).filter(category.hidden_answer_difficulty == difficulty).order_by(func.random()).first()
def get_random_hidden_answer(difficulty: Optional[Literal[1, 2, 3]] = None) -> HiddenAnswer:
session = get_session()
if difficulty is not None:
return session.query(HiddenAnswer).filter(HiddenAnswer.hidden_answer_difficulty == difficulty).order_by(func.random()).first()
return session.query(HiddenAnswer).order_by(func.random()).first()
def get_random_multiple_choice(difficulty: Optional[Literal[1, 2, 3]] = None) -> MultipleChoice:
session = get_session()
if difficulty is not None:
return session.query(MultipleChoice).filter(MultipleChoice.multiple_choice_difficulty == difficulty).order_by(
func.random()).first()
return session.query(MultipleChoice).order_by(func.random()).first()
def check_answer(question_id: int, guess: str) -> Tuple[bool, str]:
session = get_session()
question: AllQuestions = session.query(AllQuestions).filter(AllQuestions.question_id == question_id).one_or_none()
if question:
answer = question.answer
return answer == guess, answer
def query_all_questions(offset, limit, query: dict = None, sort=None, order=None) -> Tuple[List[AllQuestions], int]:
session = get_session()
query_params = []
if query is not None:
for key in query.keys():
if key == "multiple_choice" or key == "hidden_answer":
if query[key]:
query_params.append(getattr(AllQuestions, key) != None)
else:
query_params.append(getattr(AllQuestions, key) == None)
else:
query_params.append(getattr(AllQuestions, key).ilike("%" + query[key] + "%"))
order_by = None
if sort and order:
order_by = getattr(getattr(AllQuestions, sort), order)()
q = session.query(AllQuestions).filter(*query_params).order_by(order_by)
questions = q.offset(offset).limit(limit).all()
count = q.count()
return questions, count
def get_unhealthy_questions() -> List[AllQuestions]:
session = get_session()
questions = []
missing_category_questions = session.query(AllQuestions).filter(and_(AllQuestions.hidden_answer == None, AllQuestions.multiple_choice == None)).all()
multiple_choice_difficulty = session.query(AllQuestions).filter(AllQuestions.multiple_choice.has(MultipleChoice.multiple_choice_difficulty == None)).all()
hidden_answer_difficulty = session.query(AllQuestions).filter(AllQuestions.hidden_answer.has(HiddenAnswer.hidden_answer_difficulty == None)).all()
questions.extend(missing_category_questions)
questions.extend(multiple_choice_difficulty)
questions.extend(hidden_answer_difficulty)
return questions
def update_question(question_id, data):
session = get_session()
question: AllQuestions = session.query(AllQuestions).get(question_id)
if data["create_multiple_choice"]:
multiple_choice = MultipleChoice(question_id)
session.add(multiple_choice)
question.multiple_choice = multiple_choice
if data["create_hidden_answer"]:
hidden_answer = HiddenAnswer(question_id)
session.add(hidden_answer)
question.hidden_answer = hidden_answer
for column in data.keys():
if column in AllQuestions.__table__.columns:
setattr(question, column, data[column])
if column in MultipleChoice.__table__.columns:
setattr(question.multiple_choice, column, data[column])
if column in HiddenAnswer.__table__.columns:
setattr(question.hidden_answer, column, data[column])
session.commit()
def add_user(email, password):
session = get_session()
user_datastore = get_user_datastore()
if user_datastore.find_user(email=email) is None:
user = user_datastore.create_user(email=email, password=hash_password(password), username=email)
session.add(user)
role = session.query(Role).filter(Role.name == "basic").one_or_none()
if role is not None:
user_datastore.add_role_to_user(user, role)
# user.add_role(role)
# user_datastore.commit()
# update_user(user.user_id, {"roles": ["basic"]})
# session.add(user)
# user_datastore.add_role_to_user(user, "basic")
session.commit()
return user
return None
def get_user(user_id) -> User:
session = get_session()
return session.query(User).filter(User.user_id == user_id).one_or_none()
def update_user(user_id, data):
session = get_session()
user_datastore = get_user_datastore()
user = session.query(User).filter(User.user_id == user_id).one_or_none()
for role in user.roles:
if role.name not in data["roles"]:
# user.remove_role(role)
user_datastore.remove_role_from_user(user, role)
for role_name in data["roles"]:
role = session.query(Role).filter(Role.name == role_name).one_or_none()
# if role is not None:
# user.add_role(role)
user_datastore.add_role_to_user(user, role)
for column in data.keys():
if column in User.__table__.columns:
setattr(user, column, data[column])
session.commit()
def query_users(offset, limit, query: dict = None, sort=None, order=None) -> Tuple[List[User], int]:
session = get_session()
query_params = []
if query is not None:
for key in query.keys():
if hasattr(User, key):
query_params.append(getattr(User, key).ilike("%" + query[key] + "%"))
order_by = None
if sort and order:
order_by = getattr(getattr(User, sort), order)()
q = session.query(User).filter(*query_params).order_by(order_by)
questions = q.offset(offset).limit(limit).all()
count = q.count()
return questions, count
def get_all_roles() -> List[Role]:
session = get_session()
return session.query(Role).all()