book_webapp.py - A web application hosting the book website

Imports

These are listed in the order prescribed by PEP 8.

Standard library

import sys
import os
from pathlib import Path, PurePosixPath
from functools import wraps
import re
import json
from tempfile import TemporaryDirectory
import subprocess
import ast
from numbers import Number

Third-party imports

from sqlalchemy.orm.exc import NoResultFound
from flask import (request, redirect, abort, current_app,
    render_template, render_template_string, flash, send_from_directory,
    jsonify, safe_join, make_response)
from flask_mail import Mail
from flask_user import current_user
from flask_admin import Admin
from flask_admin.contrib import sqla
import stripe

Local imports

from app import app
from models import (
    db, Book, Page, Question, Answer, Feedback, Annotations, Annotation, Class_,
    Instructor, Author, User, Payment, create_db, UserType, sphinxImport,
    GraderType, csv_export, BOOK_BINDER_LIB_PATH
)
sys.path.insert(0, BOOK_BINDER_LIB_PATH)
from book_binder_lib import get_sim_str, STUDENT_SOURCE_PATH, code_here_comment

Setup

Initialize Flask-mail.

mail = Mail(app)

Decorators

Determine if the URL should be accessible by the current user. Redirect if the content isn’t accessible; otherwise, pass it on to the view it decorates.

def unauthorized_redirect(fn):
    """Decorate a view so it only runs when the current user may see ``url``.

    The wrapped view is called as ``fn(url)``. Access is granted when:

    - the URL is listed in ``app.config['NON_BOOK_FILES']`` (free content);
    - the user is an author or admin;
    - the URL has no book component (any authenticated user may see it);
    - the user is an instructor for a class of the book; or
    - the user has a payment for a class of the book.

    Otherwise the user is sent to the login handler (if not authenticated),
    receives a 404 (unknown book), or is redirected to
    ``app.config['SELECT_CLASS_URL']`` with a prompt to purchase the book.
    """
    @wraps(fn)
    def decorated_view(url=None):
        if url is None:
            # If the URL wasn't provided, determine it. When provided, the url
            # omits the leading slash; do the same here. (For example,
            # ``@app.route('/<path:url>')`` -- note the leading slash isn't
            # part of the URL.)
            assert request.path[0] == '/'
            url = request.path[1:]

        # See if this content is free.
        if Path(url) in app.config['NON_BOOK_FILES']:
            return fn(url)

        # Check that the current user is logged in.
        if not current_user.is_authenticated:
            return app.login_manager.unauthorized()

        # Grant authors or admins access to everything.
        if current_user.type_ in (UserType.author, UserType.admin):
            return fn(url)

        # Split out the book and path within the book.
        try:
            book_url, page_url = url.split('/', 1)

            # If these are static files, they aren't in the NON_BOOK_FILES
            # list. Therefore, treat them like book files from a permissions
            # perspective: permissions for ``book_one`` and
            # ``<STATIC_PATH>/book_one`` are the same.
            if book_url == str(app.config['STATIC_PATH']):
                book_url, page_url = page_url.split('/', 1)
        except ValueError:
            # If we can't, allow content since the user is authenticated.
            return fn(url)

        # Find the book this book_url refers to.
        book_query = db.session.query(Book).filter(Book.url == book_url)

        # See if this user is an instructor for this book: look in each class
        # of this book and see if any of the instructor IDs match this user's
        # id. ``first()`` is used because one match is enough; the user may
        # instruct several classes of the same book.
        is_instructor = (
            book_query.
            join(Class_).
            join(Instructor).filter(Instructor.user_id == current_user.id).first()
        )
        if is_instructor:
            return fn(url)

        # See if this user owns this book. If the book doesn't exist, return
        # a 404.
        book = book_query.one_or_none()
        if book is None:
            abort(404)

        # Look in the current user's Payment info, determine which classes
        # they paid for, and choose only classes which belong to the current
        # book. A user may hold payments for more than one class of the same
        # book, so ask for the first match instead of requiring exactly one
        # (``one()`` would raise MultipleResultsFound in that case).
        payment = (
            db.session.query(Payment).filter(Payment.user_id == current_user.id).
            join(Class_, Payment.class_id == Class_.id).
            filter(Class_.book_id == book.id).first()
        )
        if payment is None:
            # Ask the user to buy the book.
            flash('Please purchase this book in order to view its contents.')
            return redirect(app.config['SELECT_CLASS_URL'])
        return fn(url)

    return decorated_view
def page_exists(fn):
    """Decorate a view so it returns a 404 unless ``url`` names a file.

    The file is looked up relative to ``app.config['WEB_ROOT_PATH']``. When it
    exists, the wrapped view is called as ``fn(url)``.
    """
    @wraps(fn)
    def decorated_view(url):
        # See if the requested page exists. Invalid file names raise an
        # exception; catch this. For example, a request for
        # ``/build/Flask/account/index.htmlc//:booger`` produced, on Windows:
        #
        #   OSError: [WinError 123] The filename, directory name, or volume
        #   label syntax is incorrect: '...\\index.htmlc\\:booger'
        #
        # ValueError covers names the OS layer rejects outright (such as an
        # embedded NUL). An explicit test is used rather than ``assert`` so
        # the check still runs under ``python -O``.
        try:
            exists = (app.config['WEB_ROOT_PATH'] / url).is_file()
        except (OSError, ValueError):
            exists = False
        if not exists:
            abort(404)
        return fn(url)

    return decorated_view

Taken from http://flask.pocoo.org/snippets/93/.

def ssl_required(fn):
    """Decorate a view so it is only served over HTTPS when SSL is enabled.

    When ``current_app.config`` has a truthy ``"SSL"`` entry and the request
    is not secure, redirect to the same URL with an ``https://`` scheme.
    Otherwise call the wrapped view unchanged.
    """
    @wraps(fn)
    def decorated_view(*args, **kwargs):
        if current_app.config.get("SSL") and not request.is_secure:
            # Only rewrite the first occurrence (the scheme); a URL embedded
            # in the query string must be left untouched.
            return redirect(request.url.replace("http://", "https://", 1))

        return fn(*args, **kwargs)

    return decorated_view

Gradebook

See http://stackoverflow.com/a/22966127.

@app.context_processor
def grade_processor():
    """Make a ``grade(url)`` function available in templates.

    ``grade`` returns the string ``'<points earned>/<points possible>'`` for
    the current user on the page named by ``url`` (``book/page``), or an empty
    string when the URL has no book component or names no known page.
    """
    def grade(url):
        try:
            book_url, page_url = url.split('/', 1)

            # Find the page this URL refers to, beginning by locating the
            # book, then the page.
            pageId = (
                db.session.query(Page.id).
                select_from(Book).filter(Book.url == book_url).
                join(Page).filter(Page.url == page_url).scalar()
            )
            # ``scalar()`` returns None (it does not raise) when nothing
            # matches; there is no grade to report for an unknown page.
            if pageId is None:
                return ''

            # Sum all the pointsPossible fields in the questions which are on
            # the current page.
            num_possible = (
                db.session.query(db.func.sum(Question.pointsPossible)).
                filter(Question.page_id == pageId).scalar()
            )

            # Find the sum of the points, beginning by locating the questions
            # on this page, then all answers provided by this user.
            num_correct = (
                db.session.query(db.func.sum(Answer.points)).
                select_from(Question).filter(Question.page_id == pageId).
                join(Answer).filter(Answer.user_id == current_user.id).
                scalar()
            )

            # Replace None (the sum of no rows) with zero.
            if num_correct is None:
                num_correct = 0
            if num_possible is None:
                num_possible = 0

            return '{}/{}'.format(num_correct, num_possible)
        except (NoResultFound, ValueError):
            return ''

    return {'grade' : grade}

Export grades to CSV

@app.route('/csv')
@unauthorized_redirect
def csv_download(url):
    """Return the grades for a class as a CSV attachment.

    The class and book are read from the ``classId`` and ``book_url`` query
    parameters. Only admins and instructors of the given class in the given
    book may download; anyone else receives a 403 response. ``url`` is
    supplied by the ``unauthorized_redirect`` decorator and is not used here.
    """
    class_id = request.args.get('classId')
    book_url = request.args.get('book_url')

    # Only allow for instructors of this book or admins.
    instructor_query = (
        db.session.query(User.id).
        select_from(Book).filter(Book.url == book_url).
        join(Class_).filter(Class_.id == class_id).
        join(Instructor).filter(Instructor.user_id == current_user.id).
        join(User)
    )
    if current_user.type_ != UserType.admin and instructor_query.one_or_none() is None:
        # Report the refusal with a 403 status rather than a 200.
        return 'Must be an instructor or admin.', 403

    # Return the requested CSV file.
    response = make_response(csv_export(book_url, class_id))
    response.headers["Content-Disposition"] = "attachment; filename=grades.csv"
    return response

Misc views

Route based on a user’s permissions.

@app.route('/<path:url>')
@unauthorized_redirect
@page_exists
def show_book(url):
    """Serve ``url`` either as a static file or as a rendered template."""
    # URLs under the static or images directories are plain files.
    is_plain_file = any(
        url.startswith(str(app.config[config_key]) + '/')
        for config_key in ('STATIC_PATH', 'IMAGES_PATH')
    )
    if not is_plain_file:
        # Use templates for non-static content.
        return render_template(url)

    # For static files, use send_from_directory to correctly handle MIME
    # types; otherwise, CSS files are served incorrectly. This also provides
    # security by not serving files above web_root_path and returning a 404
    # for non-existent files.
    web_root = str(app.config['WEB_ROOT_PATH'])
    return send_from_directory(web_root, url)
 

Provide a default home page

@app.route('/')
def show_index():
    """Serve the site root by showing ``index.html`` through ``show_book``."""
    return show_book('index.html')
 

Provide a nice error page. See http://flask.pocoo.org/docs/0.11/quickstart/#redirects-and-errors.

@app.errorhandler(404)
def page_not_found(error):
    """Render the 404 page with its links rewritten for the failing URL.

    Sphinx generates pages with a fixed path. However, the 404 page's path
    will be different. So, do some (ugly) edits to make this one page's paths
    relative.
    """
    # First, find the relative path from where the 404 occurred to where
    # page_not_found.html lives.
    relative_root = os.path.relpath('/', start=request.path).replace('\\', '/')

    # Read in page_not_found.html.
    with app.config['PAGE_NOT_FOUND_PATH'].open(encoding='utf-8') as template_file:
        html = template_file.read()

    # Now, go and do some replacements to use this path instead. Some paths
    # are absolute (begin with http:// or https://), and some are relative.
    # We only want to replace relative paths, so look for ``../``, then omit
    # the first three characters of the relative path. What an ugly kludge.
    path_tail = relative_root[3:]
    rewrites = (
        ('href="../', 'href="../{}/'),
        ('src="../', 'src="../{}/'),
    )
    for prefix, replacement_format in rewrites:
        html = html.replace(prefix, replacement_format.format(path_tail))
    return render_template_string(html), 404

Questions

Given a question and an answer string, return a tuple of:

  1. The points corresponding to this answer.
  2. The total points possible for this question.
  3. The feedback string corresponding to this answer.
def find_feedback(question, answer_string, url):
    """Grade ``answer_string`` against ``question``.

    The grader is selected by ``question.grader``:

    - ``exact`` / ``ws_ignore``: case-insensitive comparison with each
      feedback's answer (``ws_ignore`` first collapses runs of whitespace).
    - ``regexp`` / ``regexp_nocase``: the first feedback whose answer, used as
      a regular expression, is found in ``answer_string``.
    - ``code``: ``answer_string`` is a JSON array of code snippets which are
      built and run by ``do_test`` for the page at ``url``.
    - ``number``: the first feedback whose answer (``n`` or ``min-max``)
      contains the submitted number.

    Any other grader aborts with a 404.

    Returns a tuple of:

    1. The points corresponding to this answer.
    2. The total points possible for this question.
    3. The feedback string corresponding to this answer.
    """
    # Look for the feedback to the current question.
    allFeedback = (
        db.session.query(Feedback).filter(Feedback.question_id == question.id)
    )

    # Look at the grader to be used for this question.
    if question.grader in (GraderType.exact, GraderType.ws_ignore):
        # Remove whitespace, then look for case-insensitive answer.
        if question.grader == GraderType.ws_ignore:
            answer_string = ' '.join(answer_string.split())
        # Starting with feedback to the current question, find feedback which
        # matches the given answer.
        feedback = (
            allFeedback.
            filter(db.func.lower(Feedback.answer) == db.func.lower(answer_string)).one_or_none()
        )
    elif question.grader in (GraderType.regexp, GraderType.regexp_nocase):
        flags = re.IGNORECASE if question.grader == GraderType.regexp_nocase else 0

        # Look through each given feedback regexp.
        for feedback in allFeedback:
            # An empty answer marks the feedback for unmatched answers (see
            # below). As a regexp it would match everything, so it must not
            # take part in the search.
            if feedback.answer == '':
                continue
            # If it matches, stop.
            if re.search(feedback.answer, answer_string, flags):
                break
        else:
            # No match found. Record this.
            feedback = None
    elif question.grader == GraderType.code:
        # The answer_string is a JSON-encoded array of code snippets. It comes
        # from the client, so reject malformed JSON instead of crashing.
        try:
            code_snippets = json.loads(answer_string)
        except ValueError:
            abort(400)

        # Perform the test.
        str_, is_correct = do_test(code_snippets, url)

        # Wrap this as feedback.
        feedback = Feedback(answer='', feedback=str_, points=question.pointsPossible if is_correct else 0)
    elif question.grader == GraderType.number:
        answer_num = to_num(answer_string)
        for feedback in allFeedback:
            # Look for a max/min or just a single number.
            # NOTE(review): splitting on '-' means a negative bound or an
            # exponent such as 1e-3 is not parsed as a single number --
            # confirm the intended range syntax.
            vals = [to_num(x) for x in feedback.answer.split('-')]

            # Duplicate a single number to make max == min.
            if len(vals) == 1:
                vals.append(vals[0])

            # Look for a match. Complex numbers (which to_num accepts) can't
            # be ordered; treat them as not matching rather than failing.
            try:
                min_val, max_val = sorted(vals[0:2])
                matched = answer_num >= min_val and answer_num <= max_val
            except TypeError:
                matched = False
            if matched:
                # Found it!
                break
        else:
            # No match found. Record this.
            feedback = None
    else:
        # Not implemented.
        abort(404)

    # If there is none, look for feedback for non-matching answers: starting
    # with feedback to the current question, look for feedback that applies
    # to wrong (unmatched) answers.
    if not feedback:
        feedback = (
            allFeedback.
            filter(Feedback.answer == '').one_or_none()
        )

    # If this isn't available, provide default feedback.
    if not feedback:
        feedback = Feedback(answer='', feedback='Incorrect.', points=0)

    return feedback.points, question.pointsPossible, feedback.feedback

to_num

Convert a string to a number. Accepts decimal, scientific notation, complex numbers, floating point special values (NaN, inf, etc.), binary (0b0100...), octal (0o7523...), and hex (0xAB12...). If no conversion is possible, return NaN. (None isn’t sortable, so don’t use that.)

def to_num(string):
    """Convert a string holding a Python numeric literal to a number.

    Returns the parsed number, or NaN when ``string`` can't be parsed or
    parses to something that isn't a number. (None isn't sortable, so NaN is
    used instead.)
    """
    # Parse the number. Note that int('0x34') produces ``ValueError: invalid
    # literal for int() with base 10: '0x34'``; we need another approach. Use
    # AST to avoid the danger of an eval. See
    # https://docs.python.org/3/library/ast.html#ast.literal_eval and
    # https://www.python.org/dev/peps/pep-3127/.
    try:
        num = ast.literal_eval(string)
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
        # Catch parse errors:
        #
        # - ValueError: converting nonsense ("foo") to a number.
        # - SyntaxError: converting an empty string to a number.
        # - TypeError: a container literal with an unhashable element, such
        #   as ``{[]}``.
        # - MemoryError / RecursionError: excessively large or deeply nested
        #   input.
        return float('NaN')

    # Reject valid literals which aren't numbers, such as sets, lists, etc.
    # This is an explicit test so it isn't removed by ``python -O``.
    if not isinstance(num, Number):
        return float('NaN')
    return num
 

Perform a unit test and return the results.

This function returns the tuple:

  1. A string giving output produced by the build and run processes.
  2. True if the test succeeded; False otherwise.
def do_test(code_snippets, url):
    """Perform a unit test and return the results.

    Parameters:

    - ``code_snippets``: a list of strings, giving code snippets to be
      substituted into the test code.
    - ``url``: the URL of the page that requested the test.

    Returns the tuple produced by ``platform_do_test``:

    1. A string giving output produced by the build and run processes.
    2. True if the test succeeded; False otherwise.

    Aborts with a 404 when the snippets are malformed, the source file can't
    be read, or the number of snippets doesn't match the source.
    """
    # The snippets come from the client; make sure they have the expected
    # shape before using them.
    if (not isinstance(code_snippets, list)
            or not all(isinstance(snippet, str) for snippet in code_snippets)):
        abort(404)

    # Find the relative path to the source file. Use with_suffix('') to
    # remove the .html suffix from the url.
    relative_source_path = PurePosixPath(url).with_suffix('')

    # First, determine the path to the requested file. Note that safe_join
    # requires the second parameter as a Posix-style path.
    source_path = safe_join(str(app.config['WEB_ROOT_PATH'] / STUDENT_SOURCE_PATH), str(relative_source_path))

    # Next, read the source in for the program the student is working on. A
    # missing/unreadable file, undecodable contents, or an unusable path all
    # mean there is nothing to test.
    try:
        with open(source_path, encoding='utf-8') as f:
            source_str = f.read()
    except (OSError, ValueError, TypeError):
        abort(404)

    # Create a snippet-replaced version of the source, by looking for "put
    # code here" comments and replacing them with the provided code. To do
    # so, first split out the "put code here" comments.
    split_source = source_str.split(code_here_comment(source_path))

    # Sanity check! Source with n "put code here" comments splits into n+1
    # items, into which the n student code snippets should be interleaved.
    if len(split_source) - 1 != len(code_snippets):
        abort(404)

    # Interleave these with the student snippets.
    interleaved_source = [None]*(2*len(split_source) - 1)
    interleaved_source[::2] = split_source
    interleaved_source[1::2] = platform_edit(code_snippets, source_path)

    # Join them into a single string. Make sure newlines separate everything.
    source_str = '\n'.join(interleaved_source)

    # Create a temporary directory, then write the source there.
    with TemporaryDirectory() as temp_path:
        # Debug: make this dir visible.
        #temp_source_path = Path('debug').resolve() / Path(source_path).name
        temp_source_path = Path(temp_path) / Path(source_path).name
        with temp_source_path.open('w', encoding='utf-8') as f:
            f.write(source_str)
        return platform_do_test(temp_source_path, relative_source_path)
 

This function should take a list of code snippets and modify them to prepare for the platform-specific compile. For example, add a line number directive to the beginning of each.

def platform_edit(code_snippets, source_path):
    """Prepare code snippets for the platform-specific compile.

    Parameters:

    - ``code_snippets``: a list of code snippets submitted by the user.
    - ``source_path``: the name of the source file into which these snippets
      will be inserted.

    Returns a new list with one entry per snippet. For ``.c`` sources each
    snippet is prefixed with a ``#line`` directive naming its (1-based) box;
    ``.s`` sources are returned unchanged. Any other extension raises
    AssertionError.
    """
    # Prepend a line number directive to each snippet. I can't get this to
    # work in the assembler. I tried:
    #
    # - From Section 4.11 (Misc directives):
    #
    #   - ``.appline 1``
    #   - ``.ln 1`` (produces the message ``Error: unknown pseudo-op:
    #     `.ln'``. But if I use the assembly option ``-a``, the listing file
    #     shows that this directive inserts line 1 of the source .s file into
    #     the listing file. ???
    #   - ``.loc 1 1`` (trying ``.loc 1, 1`` produces ``Error: rest of line
    #     ignored; first ignored character is `,'``)
    #
    # - From Section 4.12 (directives for debug information):
    #
    #   - ``.line 1``. I also tried this inside a ``.def/.endef`` pair, which
    #     just produced error messages.
    #
    # Perhaps saving each snippet to a file, then including them via
    # ``.include`` would help. Ugh.
    #
    # Select what to prepend based on the language.
    ext = Path(source_path).suffix
    if ext == '.c':
        fmt = '#line 1 "box {}"\n'
    elif ext == '.s':
        fmt = ''
    else:
        # Raise explicitly: a bare ``assert False`` is removed by
        # ``python -O``, which would leave ``fmt`` undefined below.
        raise AssertionError('Unsupported source file extension {!r}.'.format(ext))
    return [fmt.format(box) + snippet for box, snippet in enumerate(code_snippets, 1)]
 

Transform the arguments to subprocess.run into a string showing what command will be executed.

def subprocess_string(*args, **kwargs):
    """Describe a ``subprocess.run`` call as a shell-prompt-like line.

    Takes the same arguments as ``subprocess.run``: the first positional
    argument is the sequence of command words, and the optional ``cwd``
    keyword is shown before the ``%`` prompt.
    """
    working_dir = kwargs.get('cwd', '')
    command_line = ' '.join(args[0])
    return working_dir + '% ' + command_line + '\n'
 

This function should run the provided code and report the results. It will vary for a given compiler and language.

def platform_do_test(file_path, relative_source_path):
    """Build, link and simulate the provided code, then report the results.

    This will vary for a given compiler and language.

    Parameters:

    - ``file_path``: the Path of the file which contains code to test. The
      file resides in a temporary directory, which is also used to hold any
      additional files produced by the test.
    - ``relative_source_path``: the Path to the source file, relative to the
      root of the project.

    Returns a tuple of the accumulated command/tool/simulator output and a
    bool which is True only when the simulator exited successfully and the
    output ends with ``Correct.``.
    """
    # Assemble the source.
    waf_root = app.config['WEB_ROOT_PATH'].parent / 'waf'
    sp_args = dict(
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines=True,
        cwd=str(waf_root),
    )
    o_path = str(file_path) + '.o'
    if file_path.suffix == '.s':
        args = [str(app.config['XC16_PATH'] / 'xc16-as'), '-omf=elf', '-g', '--processor=33EP128GP502', str(file_path), '-o' + o_path]
    elif file_path.suffix == '.c':
        args = [str(app.config['XC16_PATH'] / 'xc16-gcc'), '-mcpu=33EP128GP502', '-omf=elf', '-g', '-O0', '-msmart-io=1', '-Wall', '-Wextra', '-Wdeclaration-after-statement', '-I' + str(app.config['SOURCE_ROOT_PATH'] / 'lib/include'), '-I' + str(app.config['SOURCE_ROOT_PATH'] / 'book'), '-I' + str(app.config['SOURCE_ROOT_PATH'] / relative_source_path.parent), '-I' + str(app.config['XC16_PATH'].parent / 'include'), str(file_path), '-c', '-o' + o_path]
    else:
        return 'Unknown file extension in {}.'.format(file_path), False
    out = subprocess_string(args, **sp_args)
    cp = subprocess.run(args, **sp_args)
    out += cp.stdout + cp.stderr

    # Stop if there are errors.
    if cp.returncode:
        return out, False

    # Link.
    elf_path = str(file_path) + '.elf'
    test_object_path = waf_root / relative_source_path.with_name(relative_source_path.stem + '-test.c.1.o')
    args = [str(app.config['XC16_PATH'] / 'xc16-gcc'), '-omf=elf', '-Wl,--heap=100,--stack=16,--check-sections,--data-init,--pack-data,--handles,--isr,--no-gc-sections,--fill-upper=0,--stackguard=16,--no-force-link,--smart-io,--no-cpp', '-Wl,--script=' + str(app.config['SOURCE_ROOT_PATH'] / 'lib/lkr/p33EP128GP502_bootldr.gld'), str(test_object_path), o_path, 'lib/src/pic24_clockfreq.c.1.o', 'lib/src/pic24_configbits.c.1.o', 'lib/src/pic24_serial.c.1.o', 'lib/src/pic24_timer.c.1.o', 'lib/src/pic24_uart.c.1.o', 'lib/src/pic24_util.c.1.o', 'book/test_utils.c.1.o', '-o' + elf_path, '-Wl,-Bstatic', '-Wl,-Bdynamic']
    out += '\n' + subprocess_string(args, **sp_args)
    cp = subprocess.run(args, **sp_args)
    out += cp.stdout + cp.stderr

    # Stop if there are errors.
    if cp.returncode:
        return out, False

    # Simulate. Create the simulation commands.
    simout_path = str(file_path) + '.simout'
    ss = get_sim_str('dspic33epsuper', elf_path, simout_path)

    # Run the simulation. This is a re-coded version of wscript.sim_run -- I
    # couldn't find a way to re-use that code.
    sim_ret = 0
    args = [str(app.config['XC16_PATH'] / 'sim30')]
    out += '\n' + subprocess_string(args, **sp_args)
    try:
        cp = subprocess.run(args, input=ss, timeout=1, **sp_args)
        sim_ret = cp.returncode
        timeout_str = ''
    except (subprocess.TimeoutExpired) as e:
        sim_ret = 1

        # Report the exception (such as a timeout) in addition to the
        # simulator output.
        timeout_str = '\n\n' + str(e) + '\n' + '*'*80 + '\n'

    # Check the output. The simulator may have been stopped before it wrote
    # anything; report that as a failed test rather than raising.
    try:
        with open(simout_path, encoding='utf-8') as f:
            out += f.read().rstrip()
    except OSError:
        sim_ret = 1
        out += '(No simulator output was produced.)'

    # Put the timeout string at the end of all the simulator output.
    out += timeout_str
    return out, not sim_ret and out.endswith('Correct.')
 
 

Given a URL, split it into a book and page, or abort if the URL doesn’t contain both.

def book_and_page(url):
    """Split ``url`` at its first ``/`` into ``(book_url, page_url)``.

    Aborts with a 404 when ``url`` contains no ``/`` and therefore doesn't
    name both a book and a page.
    """
    book_url, separator, page_url = url.partition('/')
    if not separator:
        abort(404)
    return book_url, page_url

@app.route('/<path:url>/questions')
@unauthorized_redirect
@page_exists
# Given a URL, this function either stores then grades the answer provided in
# the request, or simply retrieves the existing grades if there is no request
# data. The actual grading is performed by find_feedback.
def question_reply(url):
    """Save and grade submitted answers, or return the stored ones.

    With query arguments, each ``label=answer`` pair is graded, stored for the
    current user, and returned as ``{label: [points, pointsPossible,
    feedback]}``. Without query arguments, the current user's stored answers
    for the page are re-graded and returned as ``{label: [answer, points,
    pointsPossible, feedback]}``. Both responses are JSON. A 404 results when
    the URL has no book component or a submitted question label isn't found.
    """
    book_url, page_url = book_and_page(url)

    # Process the request.
    try:
        # See if we're saving and checking data or just reading back what data
        # is available.
        if request.args:
            result = {}

            # We have arguments -- save and check data.
            for question_label, new_answer_string in request.args.items():
                # Find this question.
                question = (
                    # Look for the question...
                    db.session.query(Question).
                    # in the current book...
                    select_from(Book).filter(Book.url == book_url).
                    # on the current page of this book...
                    join(Page).filter(Page.url == page_url).
                    # whose label matches current question.
                    join(Question).filter(Question.label == question_label).one()
                )

                # Get feedback for the answer provided.
                points, pointsPossible, feedbackString = find_feedback(question, new_answer_string, url)

                # Save the answer provided. First, see if an answer already
                # exists.
                answer = (
                    # Look for the answer...
                    db.session.query(Answer).
                    # to the current question...
                    select_from(Question).filter(Question.id == question.id).
                    # which came from the current user.
                    join(Answer).filter(Answer.user_id == current_user.id).scalar()
                )

                # If it doesn't, create one.
                if not answer:
                    answer = Answer(user_id=current_user.id, question_id=question.id)
                    db.session.add(answer)

                # Update the answer and save it.
                answer.string = new_answer_string
                answer.points = points
                db.session.commit()

                # Record grading info to return.
                result[question_label] = [points, pointsPossible, feedbackString]
            return jsonify(result)
        else:
            # No arguments -- load data. First, produce a dict of all answers
            # for this user.
            result = {}
            questions = (
                # Look for all the questions...
                db.session.query(Question).
                # in the current book...
                select_from(Book).filter(Book.url == book_url).
                # on the current page of the current book.
                join(Page).filter(Page.url == page_url).join(Question))

            # Preferred syntax: Book[book_url].page[page_url].question
            #
            # Walk through each question.
            for question in questions:
                # See if this user has an answer for the current question.
                answer = (
                    # Retrieve the Answer.string...
                    db.session.query(Answer.string).
                    # from the current question...
                    select_from(Question).filter(Question.id == question.id).
                    # from the current user's answer to the current question.
                    join(Answer).filter(Answer.user_id == current_user.id).scalar()
                    # Preferred syntax:
                    # Question[question.id].answer[current_user.id].string.
                    #
                    # To implement this: Dictionary collections sounds close,
                    # but foreign key values can't be used. It seems like the
                    # Question[question.id] syntax would be a class method
                    # using __getattr__, returning something like
                    # select_from(Question).filter(Question.id == given_id).
                    # Likewise, some sort of override on answer[blah] would
                    # return join(Answer).filter(Answer.user_id == blah).
                )

                # If so, provide it to the client.
                if answer:
                    # Look up feedback for this answer.
                    points, pointsPossible, feedbackString = find_feedback(question, answer, url)
                    result[question.label] = [answer, points, pointsPossible, feedbackString]
            return jsonify(result)
    except NoResultFound:
        abort(404)

Payment

Given a class ID, find the price for a book. Return value:

  1. The price, as an int, in cents.
  2. The Class_ object for the given classId.
  3. The Book associated with this Class_.
  4. A redirect if an error was encountered, or None.
def get_price(classId):
    """Find the price the current user would pay for the class ``classId``.

    Returns a 4-tuple:

    1. The price, as an int, in cents.
    2. The Class_ object for the given classId.
    3. The Book associated with this Class_.
    4. A redirect if an error was encountered, or None.

    When a redirect is returned, the first three items are None.
    """
    class_ = db.session.query(Class_).filter(Class_.id == classId).one_or_none()
    if class_ is None:
        # If the user provided an invalid ID, complain. Don't complain if no
        # ID was provided, just redirect.
        if classId is not None:
            flash('The class code {} you provided is invalid.'.format(classId))
        return None, None, None, redirect(app.config['SELECT_CLASS_URL'])

    # Make sure this user doesn't already own this book.
    existing_payment = (
        db.session.query(Payment)
        .filter(Payment.user_id == current_user.id)
        .filter(Payment.class_id == class_.id)
        .one_or_none()
    )

    # Instructors own the books in which their classes reside.
    instructor_owns = False
    if current_user.type_ == UserType.user:
        instructor_row = (
            db.session.query(Instructor.id)
            .filter(Instructor.class_id == class_.id)
            .filter(Instructor.user_id == current_user.id)
            .one_or_none()
        )
        instructor_owns = instructor_row is not None

    # Authors and admins can't purchase books; they automatically own them.
    already_owned = (
        existing_payment is not None
        or current_user.type_ in (UserType.author, UserType.admin)
        or instructor_owns
    )
    if already_owned:
        flash('You have already paid for this class.')
        return None, None, None, redirect('/')

    # Find the price -- from the class, or if not defined there, from the
    # book.
    book = db.session.query(Book).filter(Book.id == class_.book_id).one()
    price = class_.price
    if price is None:
        price = book.price
        assert price is not None

    return price, class_, book, None
 

Payment. See also Payment.

stripe.api_key = app.config['STRIPE_PRIVATE_KEY']
@app.route(app.config['PAYMENT_URL'], methods=['GET', 'POST'])
@ssl_required
@unauthorized_redirect
def payment_view(url):
    """Show the purchase form (GET) or process a purchase (POST).

    The class being purchased is named by the ``classId`` query parameter.
    When ``get_price`` reports a problem with it, its redirect is returned.
    A successful POST charges the card through Stripe, records the payment,
    e-mails a receipt and renders the receipt page; a Stripe failure flashes
    a message and redirects back to the payment form for the same class.
    """
    # Get the class from the provided URL query string. Redirect if the class
    # isn't valid.
    classId = request.args.get('classId')
    price, class_, book, redirect_ = get_price(classId)
    if price is None:
        return redirect_

    if request.method == 'POST':
        # Handle a purchase.
        purchaseEmail = request.form['stripeEmail']

        # On failure, return to the payment form for the same class. The
        # parameter name must match the ``classId`` read above; query
        # parameter names are case-sensitive.
        retry_url = '{}?classId={}'.format(app.config['PAYMENT_URL'], classId)

        try:
            # See https://stripe.com/docs/checkout/flask; for a more general
            # introduction, see docs on adding a Stripe button.
            customer = stripe.Customer.create(
                email=purchaseEmail,
                source=request.form['stripeToken'],
            )

            charge = stripe.Charge.create(
                customer=customer.id,
                amount=price,
                currency='usd',
                description='Purchase of {}'.format(book.title),
            )
        except stripe.error.CardError as e:
            body = e.json_body
            err = body['error']
            flash('Card declined: {} (code {}, type {}).'.format(err['message'], err['code'], err['type']))
            return redirect(retry_url)
        except stripe.error.StripeError:
            flash('Unable to process payment. Please try again.')
            return redirect(retry_url)

        # Save the charge token. Add a new Payment rather than assigning a
        # one-element list to ``current_user.payment``, which would replace
        # any payments this user already has for other classes.
        db.session.add(Payment(user_id=current_user.id, class_id=class_.id, charge_id=charge.id))
        db.session.commit()

        # Send an e-mail receipt
        priceDollars = '{:,.2f}'.format(price/100)
        mail.send_message(body="""Thank you for your purchase of "{0}" for the class {1}!

Price: ${2}

Paid:  ${2}""".format(book.title, class_.name, priceDollars), subject='Thank you for your purchase', recipients=[current_user.email])

        # Redirect to a web receipt.
        return render_template(app.config['RECEIPT_URL'], classId=class_.id, className=class_.name, priceDollars=priceDollars, bookTitle=book.title)

    else:
        # The user wants to purchase a book. Fill out a form to enable that:
        # render the class template with class-specific info.
        return render_template(app.config['PAYMENT_URL'], classId=class_.id, className=class_.name, priceDollars='{:,.2f}'.format(price/100), priceCents=price, bookTitle=book.title, appName=app.config['USER_APP_NAME'], stripePublicKey=app.config['STRIPE_PUBLIC_KEY'], email=current_user.email)

Flask-admin integration

Flask-user depends on flask-login. Instructions for integrating flask-login with flask-admin are at https://flask-admin.readthedocs.io/en/latest/introduction/#rolling-your-own. The following code follows those instructions.

class ModelView(sqla.ModelView):
    """Admin model view which only authenticated admin users may access."""
    # Show the primary key column in the admin list views.
    column_display_pk = True
    def is_accessible(self):
        """Return a truthy value only for an authenticated admin user."""
        return (
            current_user.is_authenticated
            and current_user.type_ == UserType.admin
        )

    def inaccessible_callback(self, name, **kwargs):
        """Handle a denied request via the login manager."""
        # redirect to login page if user doesn't have access
        return app.login_manager.unauthorized()
 

Create view of the database. The default URL is /admin.

# Create the admin site, then register one access-controlled view per model.
admin = Admin(app, name=app.config['USER_APP_NAME'], template_mode='bootstrap3')
for model in (Book, Page, Question, Answer, Feedback, Annotations, Annotation, Class_, Instructor, Author, User, Payment):
    admin.add_view(ModelView(model, db.session))

Main

Start development web server

if __name__ == '__main__':
    # Prepare the database and import the Sphinx output before serving.
    create_db()
    sphinxImport()

    # Make the server externally visible. Notes:
    #
    # - To enable debug, set ``FLASK_DEBUG=1``.
    # - Use port 80 for a public webserver.
    app.run(host='0.0.0.0', port=5000)