# Virtual host for the HogwartsHat authentication service (Flask application served through mod_wsgi).
<VirtualHost *:80>
    ServerName hh.gsk.loc

    # WSGI process settings
    WSGIDaemonProcess hogwartshat user=hogwartshat group=hogwartshat threads=10
    WSGIScriptAlias / /var/www/flask/hogwartshat/hogwartshat.py
    WSGIScriptReloading On

    # Authentication parameters
    <Location />
        AuthType Kerberos
        AuthName "HogwartsHat"
        # Allow falling back to Basic Auth
        KrbDelegateBasic On
        KrbServiceName HTTP/garage.gsk.loc@GSK.LOC
        KrbMethodNegotiate On
        # Disabling the following directive breaks authentication; the reason is not understood
        KrbMethodK5Passwd On
        KrbAuthRealms GSK.LOC
        Krb5KeyTab /etc/httpd/conf/keytab
        AuthBasicProvider PAM
        # Name of the PAM configuration file in /etc/pam.d
        AuthPAMService garage
        Require valid-user

        # The following directives export user information obtained from sssd
        # through DBus into environment variables
        LookupUserGECOS REMOTE_USER_FULLNAME
        LookupUserAttr uid REMOTE_USER_ID
        LookupUserAttr krbLastSuccessfulAuth REMOTE_USER_LASTGOODAUTH
        LookupUserAttr krbLastFailedAuth REMOTE_USER_LASTBADAUTH
        LookupUserGroups REMOTE_USER_GROUPS ":"
        # A timeout below 1 s (1000 ms) makes no sense: DBus and LDAP do not
        # finish in time in 20-30% of cases
        LookupDbusTimeout 2000
    </Location>

    <Directory /var/www/flask/hogwartshat>
        WSGIProcessGroup hogwartshat
        WSGIApplicationGroup %{GLOBAL}
    </Directory>

    LogLevel warn
    ErrorLog logs/hogwartshat_error.log
    CustomLog logs/hogwartshat_access.log combined
</VirtualHost>
#!/usr/bin/env python
# -*- coding: utf8 -*-
"""WSGI entry point: activates the virtualenv and exposes the Flask app as ``application``."""
import os
import sys

PROJECT_DIR = '/var/www/flask/hogwartshat'

# Activate the virtualenv (in effect this puts the virtualenv directory at the
# beginning of the PATH).
# NOTE(review): execfile exists only in Python 2 - confirm the target interpreter.
activate_this = os.path.join(PROJECT_DIR, 'bin', 'activate_this.py')
execfile(activate_this, dict(__file__=activate_this))
sys.path.append(PROJECT_DIR)

# Must be imported only after the virtualenv is active and PROJECT_DIR is on sys.path.
from app import app as application

# instance.py holds the encryption keys
application.config.from_object('app.config')
application.config.from_pyfile('../instance.py')
@ login_manager.request_loader def load_user_from_request (req): logging.debug ('req_loader env vars:% s'% str (req.environ)) uid = req.environ.get ('REMOTE_USER') if uid is None: login_manager.lo API = 'User is not authenticated by HTTPD' return None try: return HTTPDPoweredUser (req.environ.get (app.config.get ('HTTPD_NAME_ATTR')), req.environ.get (app.config.get ('HTTPD_FULLNAME_ATTR') ), req.environ.get (app.config.get ('HTTPD_UID_ATTR')), req.environ.get (app.config.get ('HTTPD_LAST_GOOD_AUTH_ATTR')), req.environ.get (app.config.get ( 'HTTPD_LAST_FAILED_AUTH_ATTR')), req.environ.get (app.config.get ('HTTPD_GROUPS_ATTR'))) except AttributeError: login_manager.login_message = 'This is not found
@app.route('/', methods=['GET'])
@login_required
def index():
    """Render the landing page and hand the auth token to the browser as a cookie.

    The token is obtained from ``current_user.get_auth_token()``. It is passed
    to the ``index.html`` template and set as a cookie whose name, domain,
    path and ``secure`` flag come from the application config. The cookie
    expires ``JWT_EXPIRE_TIME_HOURS`` hours from now.

    Returns:
        The response object with the cookie attached.

    Aborts with 403 when there is no current user.
    """
    if current_user is None:
        abort(403)

    cookie = current_user.get_auth_token()
    expire_date = datetime.utcnow() + timedelta(
        hours=app.config.get('JWT_EXPIRE_TIME_HOURS'))

    response = make_response(
        render_template('index.html', user=current_user, cookie=cookie))
    response.set_cookie(
        app.config.get('JWT_COOKIE_NAME'),
        value=cookie,
        expires=expire_date,
        domain=app.config.get('JWT_COOKIE_DOMAIN'),
        path=app.config.get('JWT_COOKIE_PATH'),
        secure=app.config.get('SESSION_COOKIE_SECURE')
    )
    logging.debug('jwt response: %s', response)
    return response
def get_auth_token(self):
    """Build and sign the JWT describing this user.

    The claims carry the user's ``uid``, ``fullname`` and ``groups`` together
    with the issuer (``JWT_ISSUER_NAME``), the audience (``JWT_URN`` + 'all'),
    a not-before time of now and an expiry ``JWT_EXPIRE_TIME_HOURS`` hours
    later. The token is signed with ``JWT_PRIVATE_KEY`` using ``JWT_ALG``.

    Returns:
        The encoded token as produced by ``jwt.encode``.
    """
    # Take the timestamp once so that 'nbf' and 'exp' are derived from the same instant.
    issued_at = datetime.utcnow()
    tokens = {
        'exp': issued_at + timedelta(hours=app.config.get('JWT_EXPIRE_TIME_HOURS')),
        'nbf': issued_at,
        'iss': app.config.get('JWT_ISSUER_NAME'),
        'aud': app.config.get('JWT_URN') + 'all',
        'uid': self.uid,
        'fullname': self.fullname,
        'groups': self.groups
    }
    logging.debug('jwt tokens: %s', tokens)

    cookie = jwt.encode(
        tokens,
        app.config.get('JWT_PRIVATE_KEY'),
        algorithm=app.config.get('JWT_ALG')
    )
    # NOTE(review): this writes a valid signed credential to the debug log -
    # confirm debug logging is never enabled in production.
    logging.debug('jwt cookie: %s', cookie)
    return cookie
@app.route('/status', methods=['GET'])
@login_required
def status():
    """Show the contents and validity of the JWT cookie sent by the browser.

    Reads the cookie named by ``JWT_COOKIE_NAME``, verifies it against
    ``JWT_PUBLIC_KEY`` with the expected audience and issuer, and renders
    ``status.html`` with the decoded claims or an error message. The 'nbf'
    and 'exp' claims are rewritten as '(<UTC datetime>)<raw timestamp>' for
    display.

    Returns:
        The rendered ``status.html`` page.
    """
    auth_cookie = request.cookies.get(app.config.get('JWT_COOKIE_NAME'))
    logging.debug('cookie: %s', auth_cookie)

    tokens = {}
    error_message = ''

    if auth_cookie is None:
        error_message = 'No JWT cookie received'
    else:
        try:
            tokens = jwt.decode(
                auth_cookie,
                app.config.get('JWT_PUBLIC_KEY'),
                # Accept only the algorithm this service signs with.
                algorithms=[app.config.get('JWT_ALG')],
                audience=app.config.get('JWT_URN') + 'all',
                issuer=app.config.get('JWT_ISSUER_NAME')
            )
            nbf = datetime.utcfromtimestamp(tokens.get('nbf'))
            tokens['nbf'] = '(' + str(nbf) + ')' + str(tokens.get('nbf'))
            exp = datetime.utcfromtimestamp(tokens.get('exp'))
            tokens['exp'] = '(' + str(exp) + ')' + str(tokens.get('exp'))
            logging.debug('cookie decoded successfully')
        except jwt.DecodeError:
            logging.debug('status: jwt.DecodeError')
            error_message = 'Failed to decode provided JWT'
        except jwt.ExpiredSignatureError:
            logging.debug('status: jwt.ExpiredSignatureError')
            error_message = 'JWT is expired'
        except jwt.InvalidIssuerError:
            logging.debug('status: jwt.InvalidIssuerError')
            error_message = 'JWT is issued by a wrong issuer'
        except jwt.InvalidAudienceError:
            logging.debug('status: jwt.InvalidAudienceError')
            error_message = 'JWT is issued for another audience'
        except jwt.InvalidTokenError:
            # Any other validation failure (e.g. 'nbf' in the future, wrong
            # algorithm) is reported on the page instead of a server error.
            logging.debug('status: jwt.InvalidTokenError')
            error_message = 'JWT is invalid'

    logging.debug('tokens: %s', tokens)

    return render_template(
        'status.html',
        error=error_message != '',
        error_message=error_message,
        tokens=tokens,
        attr_error=current_user is None,
        user=current_user
    )
# Generate the EC private key on curve secp521r1 (P-521; "521" is not a typo).
openssl ecparam -genkey -name secp521r1 -noout -out hogwartshat_key.pem
# Derive the matching public key from the private key.
openssl ec -in hogwartshat_key.pem -pubout -out hogwartshat_pub.pem
@app.route('/return_to', methods=['GET'])
@login_required
def return_to():
    """Issue the auth cookie and send the user back to the requesting application.

    Query parameters:
        appid: identifier of the calling application; must be a key of the
            ``APPS_PUBLIC_KEYS`` config mapping.
        data: a JWT signed by that application. It is verified with the
            application's public key, audience ``JWT_ISSUER_NAME`` and issuer
            ``JWT_URN`` + appid, and must carry a 'return_url' claim.

    Returns:
        A 301 redirect to 'return_url' with the auth cookie set, or an error
        page with status 400 (missing parameter), 403 (unknown application)
        or 412 (the JWT failed validation).
    """
    app_id = request.args.get('appid')
    data = request.args.get('data')

    if app_id is None:
        return make_error_page('No application ID provided', str(request.url)), 400
    if app_id not in app.config.get('APPS_PUBLIC_KEYS'):
        return make_error_page('Unknown application ID provided', str(request.url)), 403
    if data is None:
        return make_error_page('Application provided empty request', str(request.url)), 400

    # NOTE(review): the accepted signature algorithms are not pinned here
    # because the applications' algorithm is not visible in this code -
    # confirm and pass algorithms=[...].
    try:
        tokens = jwt.decode(
            data,
            app.config.get('APPS_PUBLIC_KEYS')[app_id],
            audience=app.config.get('JWT_ISSUER_NAME'),
            issuer=app.config.get('JWT_URN') + app_id
        )
    except jwt.DecodeError:
        return make_error_page('Failed to decode provided JWT', str(request.url)), 412
    except jwt.ExpiredSignatureError:
        return make_error_page('JWT is expired', str(request.url)), 412
    except jwt.InvalidIssuerError:
        return make_error_page('JWT is issued by a wrong issuer', str(request.url)), 412
    except jwt.InvalidAudienceError:
        return make_error_page('JWT is issued for another audience', str(request.url)), 412
    except jwt.InvalidTokenError:
        # Any other validation failure (e.g. 'nbf' still in the future because
        # of clock skew) gets an error page instead of a server error.
        return make_error_page('JWT is invalid', str(request.url)), 412

    if current_user is None:
        return str(request.args)

    return_url = tokens.get('return_url')
    if return_url is None:
        # Without this guard the user would be redirected to the literal "None".
        return make_error_page('JWT does not contain a return URL', str(request.url)), 412

    cookie = current_user.get_auth_token()
    expire_date = datetime.utcnow() + timedelta(
        hours=app.config.get('JWT_EXPIRE_TIME_HOURS'))

    response = make_response(redirect(str(return_url), code=301))
    response.set_cookie(
        app.config.get('JWT_COOKIE_NAME'),
        value=cookie,
        expires=expire_date,
        domain=app.config.get('JWT_COOKIE_DOMAIN'),
        path=app.config.get('JWT_COOKIE_PATH'),
        secure=app.config.get('SESSION_COOKIE_SECURE')
    )
    logging.debug('jwt response: %s', response)
    return response
"""Test client application that authenticates users from the HogwartsHat JWT cookie."""
import jwt
import logging.config
from datetime import datetime, timedelta

from flask import Flask, redirect, render_template, get_flashed_messages
from flask_login import LoginManager, UserMixin, login_required, current_user

app = Flask(__name__)
# Placeholder value - replace with a real secret before deploying.
app.config['SECRET_KEY'] = 'it was the secret session.'

login_manager = LoginManager()
login_manager.init_app(app)

# Private key of this application, used to sign requests to HogwartsHat.
# The key body is omitted here; paste the PEM contents between the markers.
key = '''-----BEGIN EC PRIVATE KEY-----
-----END EC PRIVATE KEY-----'''

# Public key used to verify the cookie issued by HogwartsHat.
# The key body is omitted here; paste the PEM contents between the markers.
hh_pubkey = '''-----BEGIN PUBLIC KEY-----
-----END PUBLIC KEY-----'''

logging.config.fileConfig('logging.conf')


class JWTPoweredUser(UserMixin):
    """User whose attributes are taken from the claims of a verified JWT."""

    def __init__(self, fullname, uid, groups):
        """Store the user attributes.

        Raises:
            AttributeError: if any of the attributes is None.
        """
        # Pair each value with its name: the value itself is None in the
        # failing case and cannot supply a name for the message.
        for name, value in (('fullname', fullname), ('uid', uid), ('groups', groups)):
            if value is None:
                raise AttributeError('%s cannot be None' % name)
        self.fullname = fullname
        self.uid = uid
        self.groups = groups

    # NOTE(review): recent Flask-Login releases expose is_anonymous, is_active
    # and is_authenticated as properties - confirm against the installed version.
    def is_anonymous(self):
        return False

    def is_active(self):
        return True

    def is_authenticated(self):
        return True

    def get_id(self):
        # NOTE(review): unicode exists only in Python 2.
        return unicode(self.uid)


@login_manager.request_loader
def load_user_from_request(req):
    """Build the current user from the 'gsk_auth' cookie.

    Returns:
        A JWTPoweredUser when the cookie holds a valid token carrying the
        'fullname', 'uid' and 'groups' claims; otherwise None, with the
        reason stored in ``login_manager.login_message``.
    """
    cookie = req.cookies.get('gsk_auth')
    if cookie is None:
        login_manager.login_message = 'no cookie'
        return None

    # NOTE(review): the accepted signature algorithms are not pinned here
    # because the issuer's algorithm is not visible in this file - confirm
    # and pass algorithms=[...].
    try:
        tokens = jwt.decode(cookie, hh_pubkey,
                            issuer='gsk:hogwartshat', audience='gsk:all')
    except jwt.ExpiredSignatureError:
        login_manager.login_message = 'expired'
        return None
    except jwt.DecodeError:
        login_manager.login_message = 'decode error'
        return None
    except jwt.InvalidIssuerError:
        login_manager.login_message = 'invalid issuer'
        return None
    except jwt.InvalidAudienceError:
        login_manager.login_message = 'invalid audience'
        return None
    except jwt.InvalidTokenError:
        # Any other validation failure must not surface as a server error.
        login_manager.login_message = 'invalid token'
        return None

    try:
        return JWTPoweredUser(tokens.get('fullname'),
                              tokens.get('uid'),
                              tokens.get('groups'))
    except AttributeError:
        # A required claim is missing from the token.
        login_manager.login_message = 'incomplete token'
        return None


@login_manager.unauthorized_handler
def unauthorized():
    """Render the error page with a link that sends the user to HogwartsHat.

    The link carries a JWT signed with this application's key, valid for one
    minute, whose 'return_url' claim names where the user should be sent back.

    Returns:
        A (page, 403) tuple.
    """
    now = datetime.utcnow()
    data = jwt.encode({
        'iss': 'gsk:test',
        'aud': 'gsk:hogwartshat',
        'nbf': now,
        'exp': now + timedelta(minutes=1),
        'return_url': 'http://jwttest.gsk.loc'
    }, key, algorithm='ES512')
    logging.debug('jwt request: %s', data)

    url = 'http://hh.gsk.loc/return_to?appid=test&data=%s' % data
    logging.debug('jwt return_to: %s', url)

    page = render_template(
        'error.html',
        error=login_manager.login_message,
        url=url
    )
    logging.debug('jwt page: %s', page)
    return page, 403


@app.route('/', methods=['GET'])
@login_required
def index():
    """Render the index page for the authenticated user."""
    return render_template('index.html', user=current_user)
Source: https://habr.com/ru/post/255569/
All Articles