Source code for app

"""Main app for the Data Tracker."""

import json
import logging

import flask
from authlib.integrations.flask_client import OAuth

import collection
import config
import dataset
import db_management
import developer
import order
import schema
import user
import utils

# avoid pylint warnings for using slots on g and session
# pylint: disable=assigning-non-slot

app = flask.Flask(__name__)  # pylint: disable=invalid-name
appconf = config.init()
db_management.check_db(appconf)
app.config.update(appconf)

if app.config["dev_mode"]["api"]:
    app.register_blueprint(developer.blueprint, url_prefix="/api/v1/developer")

app.register_blueprint(dataset.blueprint, url_prefix="/api/v1/dataset")
app.register_blueprint(order.blueprint, url_prefix="/api/v1/order")
app.register_blueprint(collection.blueprint, url_prefix="/api/v1/collection")
app.register_blueprint(user.blueprint, url_prefix="/api/v1/user")
app.register_blueprint(schema.blueprint, url_prefix="/api/v1/schema")


oauth = OAuth(app)
for oidc_name in app.config.get("oidc_names"):
    oauth.register(oidc_name, client_kwargs={"scope": "openid profile email"})


[docs]@app.before_request def prepare(): """Open the database connection and get the current user.""" flask.g.dbclient = utils.get_dbclient(flask.current_app.config) flask.g.db = utils.get_db(flask.g.dbclient, flask.current_app.config) if apikey := flask.request.headers.get("X-API-Key"): if not ( apiuser := flask.request.headers.get("X-API-User") ): # pylint: disable=superfluous-parens flask.abort(status=400) utils.verify_api_key(apiuser, apikey) flask.g.current_user = flask.g.db["users"].find_one({"auth_ids": apiuser}) flask.g.permissions = flask.g.current_user["permissions"] else: if flask.request.method != "GET": utils.verify_csrf_token() flask.g.current_user = user.get_current_user() flask.g.permissions = flask.g.current_user["permissions"] if flask.g.current_user else None
[docs]@app.after_request def finalize(response): """Finalize the response and clean up.""" # close db connection if hasattr(flask.g, "dbserver"): flask.g.dbserver.close() # set csrf cookie if not set if not flask.request.cookies.get("_csrf_token"): response.set_cookie("_csrf_token", utils.gen_csrf_token(), samesite="Lax") # add some headers for protection response.headers["X-Frame-Options"] = "SAMEORIGIN" response.headers["X-XSS-Protection"] = "1; mode=block" return response
[docs]@app.route("/api/v1") def api_base(): """List entities.""" return flask.jsonify({"entities": ["dataset", "order", "collection", "user", "login"]})
[docs]@app.route("/api/heartbeat") def heartbeat(): """Return 200 to show that the api is active.""" return flask.Response(status=200)
[docs]@app.route("/api/v1/login") def login_types(): """List login types.""" return flask.jsonify({"types": ["apikey", "oidc"]})
[docs]@app.route("/api/v1/login/oidc") def oidc_types(): """List OpenID Connect types.""" auth_types = {} for auth_name in app.config.get("oidc_names"): auth_types[auth_name] = flask.url_for("oidc_login", auth_name=auth_name) return flask.jsonify(auth_types)
[docs]@app.route("/api/v1/login/oidc/<auth_name>") def oidc_login(auth_name): """Perform a login using OpenID Connect (e.g. Elixir AAI).""" client = oauth.create_client(auth_name) redirect_uri = flask.url_for("oidc_authorize", auth_name=auth_name, _external=True) flask.session["incoming_url"] = flask.request.args.get("origin") or "/" return client.authorize_redirect(redirect_uri)
[docs]@app.route("/api/v1/login/oidc/<auth_name>/authorize") def oidc_authorize(auth_name): """Authorize a login using OpenID Connect (e.g. Elixir AAI).""" if auth_name not in app.config.get("oidc_names"): flask.abort(status=404) client = oauth.create_client(auth_name) token = client.authorize_access_token() if "id_token" in token: user_info = client.parse_id_token(token) else: user_info = client.userinfo() if auth_name != "elixir": user_info["auth_id"] = f'{user_info["email"]}::{auth_name}' else: user_info["auth_id"] = token["sub"] if not user.do_login(user_info["auth_id"]): user.add_new_user(user_info) user.do_login(user_info["auth_id"]) response = flask.redirect(flask.session["incoming_url"]) del flask.session["incoming_url"] return response
# requests
[docs]@app.route("/api/v1/login/apikey", methods=["POST"]) def key_login(): """Log in using an apikey.""" try: indata = flask.json.loads(flask.request.data) except json.decoder.JSONDecodeError: flask.abort(status=400) if "api-user" not in indata or "api-key" not in indata: app.logger.debug("API key login - bad keys: %s", indata) return flask.Response(status=400) utils.verify_api_key(indata["api-user"], indata["api-key"]) user.do_login(auth_id=indata["api-user"]) response = flask.Response(status=200) return response
[docs]@app.route("/api/v1/logout") def logout(): """Log out the current user.""" flask.session.clear() response = flask.Response(status=200) response.delete_cookie("_csrf_token") return response
[docs]@app.errorhandler(400) def error_bad_request(_): """Make sure a simple 400 is returned instead of an html page.""" return flask.Response(status=400)
[docs]@app.errorhandler(401) def error_unauthorized(_): """Make sure a simple 401 is returned instead of an html page.""" return flask.Response(status=401)
[docs]@app.errorhandler(403) def error_forbidden(_): """Make sure a simple 403 is returned instead of an html page.""" return flask.Response(status=403)
[docs]@app.errorhandler(404) def error_not_found(_): """Make sure a simple 404 is returned instead of an html page.""" return flask.Response(status=404)
# to allow coverage check for testing if __name__ == "__main__": app.run(host="0.0.0.0", port=5000) else: gunicorn_logger = logging.getLogger("gunicorn.error") if gunicorn_logger: app.logger.handlers = gunicorn_logger.handlers app.logger.setLevel(gunicorn_logger.level)