models.py - Define the database models used by the webapp

Imports

These are listed in the order prescribed by PEP 8.

Standard library

from datetime import datetime
import pickle
from pathlib import Path
import sys
import re
from enum import Enum
from collections import defaultdict
from pprint import pprint
import csv
from io import StringIO

Third-party imports

from flask_sqlalchemy import SQLAlchemy
from flask_user import UserManager, UserMixin, SQLAlchemyAdapter
from wtforms.validators import DataRequired
from wtforms.fields import StringField, BooleanField
from flask_user.forms import RegisterForm

Local imports

from app import app
BOOK_BINDER_LIB_PATH = str(Path(__file__).resolve().parent.parent)
sys.path.insert(0, BOOK_BINDER_LIB_PATH)
from book_binder_lib import GraderType, commentForExt

Setup

Initialize Flask-SQLAlchemy

# The Flask-SQLAlchemy handle bound to the imported ``app``; every model, query and session use in this file goes through it.
db = SQLAlchemy(app)

Naming convention

Per http://alembic.zzzcomputing.com/en/latest/naming.html#integration-of-naming-conventions-into-operations-autogenerate, use a naming convention. See also http://docs.sqlalchemy.org/en/rel_1_1/core/constraints.html#constraint-naming-conventions.

# Naming templates keyed by constraint kind: ``ix`` index, ``uq`` unique, ``ck`` check, ``fk`` foreign key, ``pk`` primary key.
# NOTE(review): nothing visible in this file passes ``meta`` to ``db`` or to a model, so this convention may not actually be applied -- confirm.
meta = db.MetaData(naming_convention={
        "ix": 'ix_%(column_0_label)s',
        "uq": "uq_%(table_name)s_%(column_0_name)s",
        "ck": "ck_%(table_name)s_%(constraint_name)s",
        "fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
        "pk": "pk_%(table_name)s"
      })

Database

The overall data structure is given below. All tables have an id (not shown). Any nested items use their parent’s id as a foreign key.

  • Book =
    • url = str
    • title = str
    • price = int (in pennies)
    • Page =
      • url = str
      • index = int (the ordering of this page in the gradebook)
      • Question =
        • label = str
        • index = int
        • pointsPossible = int
        • grader = GraderType
        • Interactive (possible addition):
          • Class
          • enabled = boolean
          • provideFeedback = boolean
        • Answer =
          • User
          • string = str
          • points = int
        • Feedback =
          • answer = str
          • feedback = str
          • points = int
      • Annotations =
    • Class
      • name = str
      • price = int (optional)
      • Instructor =
        • User
    • Author
      • User
  • User
    • username = str
    • password = str
    • reset_password_token = str
    • email = str
    • confirmed_at = DateTime
    • active = bool
    • first_name = str
    • last_name = str
    • type_ = UserType
    • Payment =

Copied from https://bitbucket.org/zzzeek/pycon2014_atmcraft, atmcraft.model.meta.schema.

class SurrogatePK():
    """A mixin that adds a surrogate integer 'primary key' column named ``id`` to any declarative-mapped class."""

    # Integer primary key shared by every model in this file.
    id = db.Column(db.Integer, primary_key=True, index=True)

# Cascade option passed to the parent -> child relationships declared below.
cascade='all, delete-orphan'
 

Annotation target types.

class TargetType(Enum):
    """Annotation target types; stored in the ``Annotation.target`` column."""
    user = 0
    instructor_or_student = 1
    author = 2
 

User account types.

class UserType(Enum):
    """User account types; stored in the ``User.type_`` column."""

    # Either an instructor or a student.
    user = 0

    # Authors have access to the entire site and receive "report a problem" messages.
    author = 1

    # Admins have access to the entire site.
    admin = 2

class Book(db.Model, SurrogatePK):
    """A book: the parent of its pages, classes and authors."""
    # Unique URL of the book; ``sphinxImport`` and ``csv_export`` look books up by this value.
    url = db.Column(db.Unicode(255), nullable=False, unique=True, index=True)
    title = db.Column(db.Unicode(255), default='', nullable=False)
    # Price in pennies.
    price = db.Column(db.Integer(), default=0, nullable=False)
    # Child collections; each is a dynamic (query-returning) relationship that adds a ``book`` backref to the child.
    page = db.relationship('Page', lazy='dynamic', cascade=cascade, backref='book')
    class_ = db.relationship('Class_', lazy='dynamic', cascade=cascade, backref='book')
    author = db.relationship('Author', lazy='dynamic', cascade=cascade, backref='book')

    def __str__(self):
        """Return the book's title."""
        return self.title

class Page(db.Model, SurrogatePK):
    """One page of a book, holding its questions and per-user annotations."""
    book_id = db.Column(db.Integer(), db.ForeignKey('book.id'), nullable=False, index=True)
    # URL of the page relative to its book.
    url = db.Column(db.Unicode(1024), nullable=False)
    # The ordering of this page in the gradebook; nullable because not all pages have an index.
    index = db.Column(db.Integer())
    question = db.relationship('Question', lazy='dynamic', cascade=cascade, backref='page')
    annotations = db.relationship('Annotations', lazy='dynamic', cascade=cascade, backref='page')

    def __str__(self):
        """Return the page's URL."""
        return self.url

# Composite index over the (book, URL) pair that ``sphinxImport`` filters pages by.
db.Index('ix_page_book_id_url', Page.book_id, Page.url)

class Question(db.Model, SurrogatePK):
    """A gradable question on a page, with its answers and canned feedback."""
    page_id = db.Column(db.Integer(), db.ForeignKey('page.id'), nullable=False, index=True)
    # The question's HTML ID on the page; ``sphinxImport`` matches questions by this value.
    label = db.Column(db.Unicode(255), nullable=False)
    # Position of the question within its page, assigned by ``sphinxImport``.
    index = db.Column(db.Integer(), nullable=False)
    pointsPossible = db.Column(db.Integer(), nullable=False)
    grader = db.Column(db.Enum(GraderType))
    answer = db.relationship('Answer', lazy='dynamic', cascade=cascade, backref='question')
    feedback = db.relationship('Feedback',  lazy='dynamic', cascade=cascade, backref='question')

    def __str__(self):
        """Return the question's label."""
        return self.label

# Composite index over the (page, label) pair that ``sphinxImport`` filters questions by.
db.Index('ix_question_page_id_label', Question.page_id, Question.label)

class Answer(db.Model, SurrogatePK):
    """A user's answer to a question, with the points awarded for it."""
    question_id = db.Column(db.Integer(), db.ForeignKey('question.id'), nullable=False, index=True)
    user_id = db.Column(db.Integer(), db.ForeignKey('user.id'), nullable=False)
    # Plain reference to the answering user (no cascade); adds an ``answer`` backref on ``User``.
    user = db.relationship('User', backref='answer')
    # The text of the answer.
    string = db.Column(db.UnicodeText(), nullable=False)
    points = db.Column(db.Integer(), nullable=False)

    def __str__(self):
        """Return the answer text."""
        return self.string

    def __repr__(self):
        """Return a debug representation listing every column."""
        return '<Answer id={}, question_id={}, user_id={}, string={}, points={}>'.format(self.id, self.question_id, self.user_id, self.string, self.points)

# Composite index over (question, user).
db.Index('ix_answer_question_id_user_id', Answer.question_id, Answer.user_id)

class Feedback(db.Model, SurrogatePK):
    """Feedback text and points attached to one particular answer of a question."""
    question_id = db.Column(db.Integer(), db.ForeignKey('question.id'), nullable=False, index=True)
    # The answer string this feedback applies to; ``sphinxImport`` matches feedback by this value.
    answer = db.Column(db.Unicode(255), nullable=False)
    feedback = db.Column(db.UnicodeText(), nullable=False)
    points = db.Column(db.Integer(), nullable=False)

    def __str__(self):
        """Return the feedback text."""
        return self.feedback

class Annotations(db.Model, SurrogatePK):
    """Container linking a page and a user to a set of ``Annotation`` rows."""
    page_id = db.Column(db.Integer(), db.ForeignKey('page.id'), nullable=False, index=True)
    user_id = db.Column(db.Integer(), db.ForeignKey('user.id'), nullable=False)
    # Plain reference to the owning user (no cascade); adds an ``annotations`` backref on ``User``.
    user = db.relationship('User', backref='annotations')
    annotation = db.relationship('Annotation', lazy='dynamic', cascade=cascade, backref='annotations')

    def __repr__(self):
        """Return a debug representation listing every column."""
        return '<Annotations id={}, page_id={}, user_id={}>'.format(self.id, self.page_id, self.user_id)

# Composite index over (page, user).
db.Index('ix_Annotations_page_id_user_id', Annotations.page_id, Annotations.user_id)

class Annotation(db.Model, SurrogatePK):
    """A single annotation belonging to an ``Annotations`` container."""
    annotations_id = db.Column(db.Integer(), db.ForeignKey('annotations.id'), nullable=False, index=True)
    # The annotation payload, stored as text.
    data = db.Column(db.UnicodeText(), nullable=False)
    # One of the ``TargetType`` values.
    target = db.Column(db.Enum(TargetType), nullable=False)

class Class_(db.Model, SurrogatePK):
    """A class (course section) that uses a book, with its instructors."""
    book_id = db.Column(db.Integer(), db.ForeignKey('book.id'), nullable=False, index=True)
    # Optional display name; the column is nullable.
    name = db.Column(db.Unicode(255))
    # Optional price for this class; nullable.
    price = db.Column(db.Integer())
    instructor = db.relationship('Instructor', lazy='dynamic', cascade=cascade, backref='class_')

    def __str__(self):
        """Return the class name, or an empty string when no name is set."""
        # ``name`` is nullable and ``__str__`` must return a str, so never hand back ``None``.
        return self.name or ''

class Instructor(db.Model, SurrogatePK):
    """Associates a user with a class as one of its instructors."""
    class_id = db.Column(db.Integer(), db.ForeignKey('class_.id'), nullable=False, index=True)
    user_id = db.Column(db.Integer(), db.ForeignKey('user.id'), nullable=False)

    # This relationship is special -- not a parent/child, so leave off typical attributes (which makes SQLAlchemy complain).
    user = db.relationship('User', backref='instructor')

    def __str__(self):
        """Return the username of the referenced user, fetched with a scalar query on ``user_id``."""
        return db.session.query(User.username).filter(User.id==self.user_id).scalar()

# Composite index over (class, user).
db.Index('ix_Instructor_class_id_user_id', Instructor.class_id, Instructor.user_id)

class Author(db.Model, SurrogatePK):
    """Associates a user with a book as one of its authors."""
    book_id = db.Column(db.Integer(), db.ForeignKey('book.id'), nullable=False, index=True)
    user_id = db.Column(db.Integer(), db.ForeignKey('user.id'), nullable=False)
    # Plain reference to the user (no cascade); adds an ``author`` backref on ``User``.
    user = db.relationship('User', backref='author')

    def __str__(self):
        """Return the username of the referenced user, fetched with a scalar query on ``user_id``."""
        return db.session.query(User.username).filter(User.id==self.user_id).scalar()

# Composite index over (book, user).
db.Index('ix_Author_book_id_user_id', Author.book_id, Author.user_id)
 

Define the User data model. Make sure to add flask.ext.user UserMixin !!!

class User(db.Model, UserMixin, SurrogatePK):
    """A site account, combining Flask-User's ``UserMixin`` with profile and account-type data."""

    # User authentication information
    username = db.Column(db.Unicode(50), nullable=False, unique=True)
    password = db.Column(db.Unicode(255), nullable=False, default='')
    # NOTE(review): a default of 'user' for a token looks unintended (possibly meant for ``type_``) -- confirm before changing.
    reset_password_token = db.Column(db.Unicode(100), nullable=False, default='user')

    # User email information
    email = db.Column(db.Unicode(255), nullable=False, unique=True)
    confirmed_at = db.Column(db.DateTime())

    # User information
    # Stored in the ``is_active`` column. The default is a real boolean: the string '0' is not a valid value for a Boolean column.
    active = db.Column('is_active', db.Boolean(), nullable=False, default=False)
    first_name = db.Column(db.Unicode(100), nullable=False, default='')
    last_name = db.Column(db.Unicode(100), nullable=False, default='')

    # Type of account
    type_ = db.Column(db.Enum(UserType), default='user', nullable=False)

    # Ownership
    payment = db.relationship('Payment', lazy='dynamic', cascade=cascade, backref='user')

    def __str__(self):
        """Return the username."""
        return self.username

    def __repr__(self):
        """Return a debug representation listing every column.

        NOTE(review): this includes the stored password value and the reset token; consider omitting them if reprs can reach logs.
        """
        return '<User id={}, username={}, password={}, reset_password_token={}, email={}, confirmed_at={}, active={}, first_name={}, last_name={}, type_={}>'.format(self.id, self.username, self.password, self.reset_password_token, self.email, self.confirmed_at, self.active, self.first_name, self.last_name, self.type_)

class Payment(db.Model, SurrogatePK):
    """Records that a user paid for a class; ``csv_export`` uses these rows to find the users in a class."""
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False, index=True)
    class_id = db.Column(db.Integer, db.ForeignKey('class_.id'), nullable=False)
    # NOTE(review): the backref name ``class_`` puts the list of payments on ``Class_.class_``, which looks like a
    # misnomer (``payment`` would match the other models). Left unchanged because callers may rely on it.
    class_ = db.relationship('Class_', backref='class_')
    # Identifier of the charge for this payment; nullable.
    charge_id = db.Column(db.Unicode(255))

    def __repr__(self):
        """Return a debug representation listing every column."""
        return '<Payment id={}, user_id={}, class_id={}, charge_id={}>'.format(self.id, self.user_id, self.class_id, self.charge_id)

# Composite index over (user, class).
db.Index('ix_Payment_user_id_class_id', Payment.user_id, Payment.class_id)

Setup with models

Define a custom registration form with extra fields. See https://pythonhosted.org/Flask-User/customization.html#registration-form.

class MyRegisterForm(RegisterForm):
    """Flask-User registration form extended with name fields and a mandatory policy-agreement checkbox."""
    first_name = StringField('First name', validators=[DataRequired('First name is required.')])
    last_name = StringField('Last name',  validators=[DataRequired('Last name is required.')])
    # ``DataRequired`` on a checkbox means registration fails unless it is ticked.
    agree = BooleanField('agree', validators=[DataRequired('You must agree to the site policy to register.')])
 

Setup Flask-User.

Register the User model.

# Adapter that tells Flask-User which database handle and user model to use.
db_adapter = SQLAlchemyAdapter(db, User)

# Initialize Flask-User, substituting the custom registration form defined in this file.
user_manager = UserManager(db_adapter, app, register_form=MyRegisterForm)

Utilities

def make_user(username, type_):
    """Return the user called ``username``, creating and committing it first when absent.

    A newly created user gets the hashed password ``'default'``, the e-mail
    ``<username>@foo.com``, is confirmed as of now (UTC), is active, and has the
    account type ``type_``. An existing user is returned untouched.
    """
    existing = db.session.query(User).filter(User.username == username).one_or_none()
    if existing is not None:
        return existing

    created = User(
        username=username,
        password=app.user_manager.hash_password('default'),
        email=username + '@foo.com',
        confirmed_at=datetime.utcnow(),
        active=True,
        type_=type_)
    db.session.add(created)
    db.session.commit()
    return created

def create_db():
    """Create all database tables."""
    db.create_all()
 

Given a data structure from Sphinx and an existing database, the purpose of this function is to change the database to mirror the Sphinx structure. The basic approach is to walk the Sphinx data, mirroring it in the ORM and deleting anything in the database we didn’t see.

def sphinxImport(commitDbChanges=False, performBackAnnotation=False):
    """Make the database mirror the book structure pickled by Sphinx.

    The Sphinx data is read from ``interactive_questions.pickle`` in the current
    directory. Every book, page, question and feedback found there is created or
    updated in the database; any such row that was not seen is deleted.

    :param commitDbChanges: True to update the database unconditionally; when
        False, asks the user before committing the changes.
    :param performBackAnnotation: True to back-annotate changes to Sphinx source
        files unconditionally; when False, asks the user before making the
        changes. Currently unused, because back-annotation is skipped.
    """
    iq, books = _load_sphinx_books()

    mw = ModificationWatcher()
    try:
        # Walk the Sphinx tree, collecting the IDs of everything we've seen.
        foundBookIds, foundPageIds, foundQuestionIds, foundFeedbackIds = _mirror_sphinx_books(books)

        if mw.is_modified:
            print('Items (books/pages/questions/feedback) will be added to the database.')

        # Delete any items we didn't see.
        query_delete(db.session.query(Feedback).filter(~Feedback.id.in_(foundFeedbackIds)))
        query_delete(db.session.query(Question).filter(~Question.id.in_(foundQuestionIds)))
        query_delete(db.session.query(Page).filter(~Page.id.in_(foundPageIds)))
        query_delete(db.session.query(Book).filter(~Book.id.in_(foundBookIds)))

        # Look for any user data that will be deleted.
        # NOTE(review): ``db.session.deleted`` only lists deletions that have not been flushed yet; the queries
        # above may already have flushed earlier ones, so this report may be incomplete -- confirm.
        userDataDeleted = 0
        for x in db.session.deleted:
            if isinstance(x, (Answer, Annotations)):
                if userDataDeleted == 0:
                    print('User data will be deleted! Some of the deletions:')
                userDataDeleted += 1
                if userDataDeleted < 20:
                    print(x)
        if userDataDeleted > 0:
            print('A total of {} records of user data will be deleted.'.format(userDataDeleted))

        # Do the work (if changes have been made).
        if mw.is_modified:
            if not commitDbChanges:
                choice = input('Make these changes to the database? ')
                commitDbChanges = choice.lower() in ('y', 'yes')
            if commitDbChanges:
                db.session.commit()
            else:
                db.session.rollback()
                return
    finally:
        # Always unregister the flush listener, including on rollback or error, so it does not leak.
        mw.remove()

    # Back-annotating the page IDs to the Sphinx source is skipped for now. The
    # implementation is kept in ``_back_annotate_page_ids`` and is deliberately not called.
    return


def _load_sphinx_books():
    """Load the pickled Sphinx data and split its pages into books.

    Returns ``(iq, books)``: the unpickled object, and a dict mapping each book
    URL to a dict of ``{page URL: Sphinx page}``.
    """
    # NOTE(review): unpickling can execute arbitrary code; this file must only come from a trusted local build.
    with open('interactive_questions.pickle', 'rb') as f:
        iq = pickle.load(f)

    # Split the pages from Sphinx into books and pages in that book.
    skipped_prefixes = ('2016-spring/', '2015-fall/', '2016-summer/')
    books = defaultdict(dict)
    for docname in iq.pages:
        # Check for some hand eliminations.
        if docname.startswith(skipped_prefixes):
            continue

        # Check for non-book pages first; they are filed under the empty book URL, keyed by the full docname.
        if Path(docname) in app.config['NON_BOOK_FILES']:
            books[''][docname] = iq.pages[docname]
            continue

        # Split into book and page then add them.
        try:
            book_url, page_url = docname.split('/', 1)
        except ValueError as e:
            print('Cannot split {}: {}.'.format(docname, e))
        else:
            books[book_url][page_url] = iq.pages[docname]

    # Optionally display the data loaded for debug purposes
    #pprint(iq)
    #pprint(books)
    return iq, books


def _mirror_sphinx_books(books):
    """Create or update a database row for every book, page, question and feedback in ``books``.

    Returns four lists, ``(book ids, page ids, question ids, feedback ids)``,
    holding the database IDs of every row that was visited.
    """
    foundBookIds = []
    foundPageIds = []
    foundQuestionIds = []
    foundFeedbackIds = []

    # For every book:
    for book_url, pages in books.items():
        # Look for the database book with the given name. If not found, create a new book.
        def db_book_query():
            return db.session.query(Book).filter(Book.url == book_url).one_or_none()
        db_book = db_book_query()
        if db_book is None:
            print('Warning: creating a new book with URL = "{}". Make sure this is correct!'.format(book_url))
            db_book = Book()
            # A new book belongs to no session yet; add it so the query below can flush and then find it.
            db.session.add(db_book)

        # Update the book's data.
        db_book.url = book_url

        # Mark this book as visited. See foundQuestionIds.
        foundBookIds.append(db_book_query().id)

        # Walk every page of this book.
        for page_url, page in pages.items():
            # Look for the database page with the given ID, if present; if not, search by the URL. If not found, create a new page.
            db_page = None
            if page.id_ is not None:
                db_page = db.session.query(Page).filter(Page.id == page.id_).one_or_none()

                # Look for renames and note them: the stored URL is about to be replaced by the Sphinx one.
                if db_page and (page_url != db_page.url):
                    print('Renaming {} to {} based on page id.'.format(db_page.url, page_url))
            def db_page_query():
                return db.session.query(Page).filter(Page.book_id == db_book.id).filter(Page.url == page_url).one_or_none()
            if db_page is None:
                db_page = db_page_query()
            if db_page is None:
                db_page = Page()
                db_book.page.append(db_page)

            # Update the page's data.
            db_page.url = page_url

            # Not all pages have an index.
            db_page.index = getattr(page, 'index', None)

            # Mark this page as visited. See foundQuestionIds.
            foundPageIds.append(db_page_query().id)

            # Walk every question on this page.
            for question_index, (questionId, question) in enumerate(page.questions.items()):
                # Look for the question with the given HTML ID (in database terms, its label).
                def db_question_query():
                    return db.session.query(Question).filter(Question.page_id == db_page.id).filter(Question.label == questionId).one_or_none()
                db_question = db_question_query()
                if db_question is None:
                    db_question = Question()
                    db_page.question.append(db_question)

                # Update this question's data.
                db_question.label = questionId
                db_question.index = question_index
                db_question.pointsPossible = question.pointsPossible
                db_question.grader = question.grader

                # foundQuestionIds: Mark this question as visited. We can't rely on db_question.id, since a newly
                # created question's id will be None. Instead, query for it to get its id.
                foundQuestionIds.append(db_question_query().id)

                # Walk every feedback of this question.
                for answer, feedback in question.feedback.items():
                    # Look for the feedback with the given answer. If not found, create new feedback.
                    def db_feedback_query():
                        return db.session.query(Feedback).filter(Feedback.question_id == db_question.id).filter(Feedback.answer == answer).one_or_none()
                    db_feedback = db_feedback_query()
                    if db_feedback is None:
                        db_feedback = Feedback()
                        db_question.feedback.append(db_feedback)

                    # Update the feedback's data.
                    db_feedback.answer = answer
                    db_feedback.feedback = feedback.string
                    db_feedback.points = feedback.points

                    # Mark this feedback as visited. See foundQuestionIds.
                    foundFeedbackIds.append(db_feedback_query().id)

        # Merge all changes made to this book and all its relationships.
        db.session.merge(db_book)

    return foundBookIds, foundPageIds, foundQuestionIds, foundFeedbackIds


def _back_annotate_page_ids(iq, performBackAnnotation=False):
    """Write each page's database ID into its Sphinx source file as a ``page-id`` directive.

    Not called at present: back-annotation is skipped for now.
    """
    for page in db.session.query(Page).all():
        # Determine the path to the HTML file for this page.
        bookUrl = db.session.query(Book.url).filter(Book.id == page.book_id).scalar()

        # All paths are stored Unix-style by Sphinx; join them with a / unless this is the empty book.
        pagePath = bookUrl
        if pagePath:
            pagePath += '/'
        pagePath += page.url

        # See if the Sphinx ID is different or missing.
        sphinxPageId = iq.pages[pagePath].id_
        if sphinxPageId == page.id:
            continue

        # Find the path to the source file from which the HTML was generated.
        sourcePath = app.config['SOURCE_ROOT_PATH'] / iq.source_paths[pagePath]

        # Prepare a string to search for or write. The comment marker is literal text (it is also written to the
        # file below), so escape it before embedding it in the pattern.
        inlineComment = commentForExt(sourcePath)
        pageIdRegex = (
            # The beginning of a line followed by zero or more whitespaces, which also begins group 1 of the match,
            r'^(\s*'
            # then the inline comment marker,
            + re.escape(inlineComment)
            # then zero or more spaces, then the ``.. page-id::`` directive, then one or more spaces, ending group 1,
            + r'\s*\.\.\s+page-id:: +)'
            # then the id, which consists of one or more digits and defines group 2,
            r'(\d+)'
            # then zero or more spaces, defining group 3, which ends the line.
            r'( *)$'
        )
        # MULTILINE lets ^ and $ anchor on any line of the file, not only the first and last.
        regexFlags = re.ASCII | re.MULTILINE

        # Update the ID or append it to the end of this file.
        with sourcePath.open('r+', encoding='utf-8') as f:
            txt = f.read()
            m = re.search(pageIdRegex, txt, regexFlags)
            if m is None:
                # If we didn't find the page ID, append it to the end of the file.
                newTxt = txt + '\n{}.. page-id:: {}'.format(inlineComment, page.id)
            else:
                # Replace the old page ID. ``\g<1>`` is required: a plain ``\1`` followed by the digits of the ID
                # would be read as a different group number or an octal escape.
                newTxt = re.sub(pageIdRegex, r'\g<1>{}\g<3>'.format(page.id), txt, flags=regexFlags)
                assert newTxt != txt

            # Check for permission to update the file.
            if not performBackAnnotation:
                choice = input('Back-annotate page IDs from the database to the Sphinx source? ')
                if choice.lower() not in ('y', 'yes'):
                    return
                performBackAnnotation = True

            # Update the file.
            f.seek(0)
            f.write(newTxt)
            f.truncate()
 

Given a database query, delete all items which match this query.

def query_delete(query):
    """Mark every item yielded by ``query`` for deletion in ``db.session``.

    Nothing is committed here; that is left to the caller.
    """
    for doomed in query:
        db.session.delete(doomed)

class ModificationWatcher:
    """Record whether any session flush has carried pending changes.

    ``is_modified`` starts False and becomes (and stays) True once an
    ``after_flush`` event reports new, deleted or modified objects. Call
    ``remove()`` to stop listening.
    """

    def __init__(self):
        self.is_modified = False
        # Keep track of any flushes, which indicates a modified session.
        # See http://docs.sqlalchemy.org/en/latest/core/event.html#sqlalchemy.event.listen.
        db.event.listen(db.Session, 'after_flush', self.on_after_flush)

    def on_after_flush(self, session, flush_context):
        """Event handler: set ``is_modified`` if ``session`` holds new, deleted or modified objects."""
        if self.is_modified:
            # Already known to be modified; the flag is sticky.
            return
        # Based on https://github.com/giomasce/liturgy/blob/master/database.py#L19 and the comments at
        # http://stackoverflow.com/a/16286946. The collections are tested for non-emptiness directly, so an
        # object that happens to evaluate as false still counts as a change.
        self.is_modified = bool(
            session.new
            or session.deleted
            or any(session.is_modified(obj) for obj in session.dirty)
        )

    def remove(self):
        """Stop tracking modifications."""
        db.event.remove(db.Session, 'after_flush', self.on_after_flush)

def csv_export(book_url, class_id):
    """Return the gradebook of one class for one book as CSV text.

    The output starts with three heading rows -- page URLs (on the first question
    of each page), question labels, and points possible -- followed by one row per
    user who has a ``Payment`` for ``class_id``. Only pages that have an index
    are included.

    :param book_url: ``Book.url`` of the book to export.
    :param class_id: ``Class_.id`` whose users are exported.
    :return: The CSV data as a single string.
    """
    # First, build a list of field names and headings for the export.
    field_names_query = (
        # Create a table with the following fields...
        db.session.query(Page.id, Page.url, Question.id, Question.label, Question.index, Question.pointsPossible).
        # by finding this book...
        select_from(Book).filter(Book.url == book_url).
        # then all pages with an index in the book, sorting by index...
        join(Page).filter(Page.index != None).order_by(Page.index).  # noqa: E711
        # then all questions on each page, sorted by their index...
        join(Question).order_by(Question.index)
    )
    field_names_list = ['email', 'last_name', 'first_name']
    page_titles = dict(first_name='Page URL')
    headings = dict(email='e-mail', last_name='Last name', first_name='First name')
    pointsPossible = dict(first_name='Points possible')
    for page_id, page_url, question_id, question_label, question_index, question_pointsPossible in field_names_query:
        # Dict keys are page_id.question_id
        key = '{}.{}'.format(page_id, question_id)
        field_names_list.append(key)

        # One heading row has page URLs on the first question of each page.
        if question_index == 0:
            page_titles[key] = page_url

        # The next row has a question label for each entry.
        headings[key] = question_label

        # The next row gives the points possible for each entry.
        pointsPossible[key] = question_pointsPossible

    # Write this to a CSV.
    csvfile = StringIO()
    writer = csv.DictWriter(csvfile, field_names_list)
    writer.writerow(page_titles)
    writer.writerow(headings)
    writer.writerow(pointsPossible)

    # Now, get a table of users in this class.
    users_in_class = (
        # Gather information about a user...
        db.session.query(User.id, User.email, User.last_name, User.first_name).
        # for all users in the current class.
        join(Payment).filter(Payment.class_id == class_id)
    )

    # For each of these users, obtain their grades.
    for user_id, user_email, user_last_name, user_first_name in users_in_class:
        cols = (
            # To do an export, build a table with the following data:
            db.session.query(Page.id, Question.id, Answer.points).
            # by finding this book...
            select_from(Book).filter(Book.url == book_url).
            # then all pages with an index in the book, sorting by index. This must be the SQL comparison
            # ``!= None``: a Python ``is not None`` on a column is always True and filters nothing, which would
            # let un-indexed pages through with keys the header does not contain.
            join(Page).filter(Page.index != None).order_by(Page.index).  # noqa: E711
            # then all questions on each page, sorted by their index...
            join(Question).order_by(Question.index).
            # then this user's answers to those questions.
            join(Answer).join(User).filter(User.id == user_id)
        )
        row_data = dict(email=user_email, last_name=user_last_name, first_name=user_first_name)
        for page_id, question_id, answer_points in cols:
            row_data['{}.{}'.format(page_id, question_id)] = answer_points
        writer.writerow(row_data)

    return csvfile.getvalue()
 

Uncomment to create an admin.

#make_user('admin', UserType.admin)