
<VirtualHost *: 80>
ServerName hh.gsk.loc
# WSGI process settings
WSGIDaemonProcess hogwartshat user = hogwartshat group = hogwartshat threads = 10
WSGIScriptAlias ​​/ /var/www/flask/hogwartshat/hogwartshat.py
WSGIScriptReloading On
# authentication parameters
<Location />
AuthType Kerberos
AuthName "HogwartsHat"
# allow rollback on Basic Auth
KrbDelegateBasic On
KrbServiceName HTTP/garage.gsk.loc@GSK.LOC
KrbMethodNegotiate On
# if you disable the following directive - it stops working, why - I do not understand
KrbMethodK5Passwd On
KrbAuthRealms GSK.LOC
Krb5KeyTab / etc / httpd / conf / keytab
AuthBasicProvider PAM
# pointing to the PAM configuration file from /etc/pam.d
AuthPAMService garage
Require valid-user
# The following directives write user information from sssd through DBus to environment variables
LookupUserGECOS REMOTE_USER_FULLNAME
LookupUserAttr uid REMOTE_USER_ID
LookupUserAttr krbLastSuccessfulAuth REMOTE_USER_LASTGOODAUTH
LookupUserAttr krbLastFailedAuth REMOTE_USER_LASTBADAUTH
LookupUserGroups REMOTE_USER_GROUPS ":"
# Timeout is less than 1 s (1000 ms) does not make sense - DBus and LDAP just do not have time to work out in 20-30% of cases
LookupDbusTimeout 2000
</ Location>
<Directory / var / www / flask / hogwartshat>
WSGIProcessGroup hogwartshat
WSGIApplicationGroup% {GLOBAL}
</ Directory>
LogLevel warn
ErrorLog logs / hogwartshat_error.log
CustomLog logs / hogwartshat_access.log combined
</ Virtualhost>
#! / usr / bin / env python
# - * - coding: utf8 - * -
import os
import sys
PROJECT_DIR = '/ var / www / flask / hogwartshat'
# activate virtualenv (in fact, appending to the beginning of the PATH directory with virtualenv)
activate_this = os.path.join (PROJECT_DIR, 'bin', 'activate_this.py')
execfile (activate_this, dict (__ file __ = activate_this))
sys.path.append (PROJECT_DIR)
from app import app as application
# in instance.py - encryption keys
application.config.from_object ('app.config')
application.config.from_pyfile ('../ instance.py')
@ login_manager.request_loader def load_user_from_request (req): logging.debug ('req_loader env vars:% s'% str (req.environ)) uid = req.environ.get ('REMOTE_USER') if uid is None: login_manager.lo API = 'User is not authenticated by HTTPD' return None try: return HTTPDPoweredUser (req.environ.get (app.config.get ('HTTPD_NAME_ATTR')), req.environ.get (app.config.get ('HTTPD_FULLNAME_ATTR') ), req.environ.get (app.config.get ('HTTPD_UID_ATTR')), req.environ.get (app.config.get ('HTTPD_LAST_GOOD_AUTH_ATTR')), req.environ.get (app.config.get ( 'HTTPD_LAST_FAILED_AUTH_ATTR')), req.environ.get (app.config.get ('HTTPD_GROUPS_ATTR'))) except AttributeError: login_manager.login_message = 'This is not found @ app.route ('/', methods = ['GET'])
@login_required
def index ():
if current_user is not None:
cookie = current_user.get_auth_token ()
expire_date = datetime.utcnow () + timedelta (hours = app.config.get ('JWT_EXPIRE_TIME_HOURS'))
response = make_response (render_template ('index.html', user = current_user, cookie = cookie))
response.set_cookie (
app.config.get ('JWT_COOKIE_NAME'),
value = cookie
expires = expire_date,
domain = app.config.get ('JWT_COOKIE_DOMAIN'),
path = app.config.get ('JWT_COOKIE_PATH'),
secure = app.config.get ('SESSION_COOKIE_SECURE')
)
logging.debug ('jwt response:% s'% str (response))
return response
else:
abort (403)
def get_auth_token (self):
tokens = {
'exp': datetime.utcnow () + timedelta (hours = app.config.get ('JWT_EXPIRE_TIME_HOURS')),
'nbf': datetime.utcnow (),
'iss': app.config.get ('JWT_ISSUER_NAME'),
'aud': app.config.get ('JWT_URN') + 'all',
'uid': self.uid,
'fullname': self.fullname,
'groups': self.groups
}
logging.debug ('jwt tokens:% s'% str (tokens))
cookie = jwt.encode (tokens, app.config.get ('JWT_PRIVATE_KEY'), algorithm = app.config.get ('JWT_ALG'))
logging.debug ('jwt cookie:% s'% str (cookie))
return cookie
@ app.route ('/ status', methods = ['GET'])
@login_required
def status ():
auth_cookie = request.cookies.get (app.config.get ('JWT_COOKIE_NAME'))
logging.debug ('cookie:% s'% str (auth_cookie))
tokens = {}
error_message = ''
if auth_cookie is not None:
try:
tokens = jwt.decode (
auth_cookie
app.config.get ('JWT_PUBLIC_KEY'),
audience = app.config.get ('JWT_URN') + 'all',
issuer = app.config.get ('JWT_ISSUER_NAME')
)
nbf = datetime.utcfromtimestamp (tokens.get ('nbf'))
tokens ['nbf'] = '(' + str (nbf) + ')' + str (tokens.get ('nbf'))
exp = datetime.utcfromtimestamp (tokens.get ('exp'))
tokens ['exp'] = '(' + str (exp) + ')' + str (tokens.get ('exp'))
logging.debug ('cookie decoded successfully')
except jwt.DecodeError:
logging.debug ('status: jwt.DecodeError')
error_message = 'Failed to decode provided JWT'
except jwt.ExpiredSignatureError:
logging.debug ('status: jwt.ExpiredSignatureError')
error_message = 'JWT is expired'
except jwt.InvalidIssuerError:
logging.debug ('status: jwt.InvalidIssuerError')
error_message = 'JWT is issued by a wrong issuer'
except jwt.InvalidAudienceError:
logging.debug ('status: jwt.InvalidAudienceError')
error_message = 'JWT is issued for another audience'
else:
error_message = 'No JWT cookie received'
logging.debug ('tokens:% s'% str (tokens))
attr_error = False if current_user is not None else True
return render_template (
'status.html',
error = False if error_message == '' else True,
error_message = error_message,
tokens = tokens,
attr_error = attr_error,
user = current_user
)
openssl ecparam -genkey -name secp521r1 -noout -out hogwartshat_key.pem # p521 - not a typo openssl ec -in hogwartshat_key.pem -pubout -out hogwartshat_pub.pem
@ app.route ('/ return_to', methods = ['GET'])
@login_required
def return_to ():
app_id = request.args.get ('appid')
data = request.args.get ('data')
if app_id is None:
return make_error_page ('No application ID provided', str (request.url)), 400
elif app_id not in app.config.get ('APPS_PUBLIC_KEYS'). keys ():
return make_error_page ('Unknown application ID provided', str (request.url)), 403
if data is None:
return make_error_page ('Application provided empty request', str (request.url)), 400
else:
try:
tokens = jwt.decode (
data,
app.config.get ('APPS_PUBLIC_KEYS') [app_id],
audience = app.config.get ('JWT_ISSUER_NAME'),
issuer = app.config.get ('JWT_URN') + app_id
)
return_url = tokens.get ('return_url')
if current_user is not None:
cookie = current_user.get_auth_token ()
expire_date = datetime.utcnow () + timedelta (hours = app.config.get ('JWT_EXPIRE_TIME_HOURS'))
response = make_response (redirect (str (return_url), code = 301))
response.set_cookie (
app.config.get ('JWT_COOKIE_NAME'),
value = cookie
expires = expire_date,
domain = app.config.get ('JWT_COOKIE_DOMAIN'),
path = app.config.get ('JWT_COOKIE_PATH'),
secure = app.config.get ('SESSION_COOKIE_SECURE')
)
logging.debug ('jwt response:% s'% str (response))
return response
except jwt.DecodeError:
return make_error_page ('Failed to decode provided JWT', str (request.url)), 412
except jwt.ExpiredSignatureError:
return make_error_page ('JWT is expired', str (request.url)), 412
except jwt.InvalidIssuerError:
return make_error_page ('JWT is issued by a wrong issuer', str (request.url)), 412
except jwt.InvalidAudienceError:
return make_error_page ('JWT is issued for another audience', str (request.url)), 412
return str (request.args)



import jwt
import logging.config
from datetime import datetime, timedelta
from flask import Flask, redirect, render_template, get_flashed_messages
from flask_login import LoginManager, UserMixin, login_required, current_user
app = Flask (__ name__)
app.config ['SECRET_KEY'] = 'it was the secret session.
login_manager = LoginManager ()
login_manager.init_app (app)
key = '' '----- BEGIN EC PRIVATE KEY -----
----- END EC PRIVATE KEY ----- '' '
hh_pubkey = '' '----- BEGIN PUBLIC KEY -----
----- END PUBLIC KEY ----- '' '
logging.config.fileConfig ('logging.conf')
class JWTPoweredUser (UserMixin):
def __init __ (self, fullname, uid, groups):
for attr in [fullname, uid, groups]:
if attr is None:
raise AttributeError ('% s cannot be None'% attr .__ name__)
self.fullname = fullname
self.uid = uid
self.groups = groups
def is_anonymous (self):
return false
def is_active (self):
return true
def is_authenticated (self):
return true
def get_id (self):
return unicode (self.uid)
@ login_manager.request_loader
def load_user_from_request (req):
cookie = req.cookies.get ('gsk_auth')
if cookie is None:
login_manager.login_message = 'no cookie'
return none
try:
tokens = jwt.decode (cookie, hh_pubkey, issuer = 'gsk: hogwartshat', audience = 'gsk: all')
except jwt.ExpiredSignatureError:
login_manager.login_message = 'expired'
return none
except jwt.DecodeError:
login_manager.login_message = 'decode error'
return none
except jwt.InvalidIssuerError:
login_manager.login_message = 'invalid issuer'
return none
except jwt.InvalidAudienceError:
login_manager.login_message = 'invalid audience'
return none
return JWTPoweredUser (tokens.get ('fullname'), tokens.get ('uid'), tokens.get ('groups'))
@ login_manager.unauthorized_handler
def unauthorized ():
data = jwt.encode ({
'iss': 'gsk: test',
'aud': 'gsk: hogwartshat',
'nbf': datetime.utcnow (),
'exp': datetime.utcnow () + timedelta (minutes = 1),
'return_url': 'http: //jwttest.gsk.loc'
}, key, algorithm = 'ES512')
logging.debug ('jwt request:% s'% data)
url = 'http: //hh.gsk.loc/return_to? appid = test & data =% s'% data
logging.debug ('jwt return_to:% s'% url)
page = render_template (
'error.html',
error = login_manager.login_message,
url = url
)
logging.debug ('jwt page:% s'% page)
return page, 403
@ app.route ('/', methods = ['GET'])
@login_required
def index ():
return render_template ('index.html', user = current_user)


Source: https://habr.com/ru/post/255569/
All Articles