Utilisation de la lib flask-jwt-extended pour la gestion du token JWT et passe du token en cookie avec refresh automatique

This commit is contained in:
2022-04-29 11:55:26 +02:00
parent ff0afc7d02
commit 9d56b1efdf
3 changed files with 101 additions and 63 deletions

View File

@@ -9,6 +9,7 @@ flask = "*"
mariadb = "*"
flask-api = "*"
pyjwt = "*"
flask-jwt-extended = "*"
[dev-packages]

10
Pipfile.lock generated
View File

@@ -1,7 +1,7 @@
{
"_meta": {
"hash": {
"sha256": "81a17ff766677f5c63687245776c9a27e97c295a76bb25e51c704703794cbaa7"
"sha256": "654686a639ae618ee1116e4e49f842aa2fb0b7cb782ed4fa962d1d014db4c27f"
},
"pipfile-spec": 6,
"requires": {
@@ -55,6 +55,14 @@
"index": "pypi",
"version": "==3.0.post1"
},
"flask-jwt-extended": {
"hashes": [
"sha256:ad6977b07c54e51c13b5981afc246868b9901a46715d9b9827898bfd916aae88",
"sha256:c82c9e505bc96f4a5186de31c05262dbcde6fa10581e9aa46df8f99ca04be2c3"
],
"index": "pypi",
"version": "==4.3.1"
},
"idna": {
"hashes": [
"sha256:84d9dd047ffa80596e0f246e2eab0b391788b0503584e8945f2368256d2735ff",

View File

@@ -15,6 +15,13 @@ from pprint import pprint
from flask import Flask, request, abort, jsonify, render_template, make_response
from flask.logging import default_handler
from flask_api import status
from flask_jwt_extended import create_access_token
from flask_jwt_extended import get_jwt
from flask_jwt_extended import set_access_cookies
from flask_jwt_extended import unset_jwt_cookies
from flask_jwt_extended import get_jwt_identity
from flask_jwt_extended import jwt_required
from flask_jwt_extended import JWTManager
import json
import mariadb
@@ -111,6 +118,16 @@ app = Flask(__name__)
db = BDD(host='127.0.0.1', port=3306, user='vincent', password='malkavian', database='test1')
app.config["SECRET_KEY"] = "thisissecret"
# Setup the Flask-JWT-Extended extension
app.config["JWT_SECRET_KEY"] = "cdscjdsklcfqezffhrevneqggfuhmnvqnmh"
app.config["JWT_COOKIE_SECURE"] = False
app.config["JWT_TOKEN_LOCATION"] = ["cookies"]
app.config["JWT_ACCESS_TOKEN_EXPIRES"] = datetime.timedelta(hours=1)
# Controls if Cross Site Request Forgery (CSRF) protection is enabled when using cookies
# This should always be True in production
app.config["JWT_COOKIE_CSRF_PROTECT"] = False
jwt = JWTManager(app)
def dbmanage(func):
''' decorateur de la fonction db.connect & db.disconnect '''
@@ -129,45 +146,6 @@ def dbmanage(func):
return ret
return wrapper
def token_required(func):
@wraps(func)
@dbmanage
def decorated(*args, **kwargs):
token = None
if 'x-access-token' in request.headers:
token = request.headers['x-access-token']
### DEBUG ###
logger.debug("token : {}".format(token))
### END DEBUG ###
if not token:
return jsonify({'message' : 'Token is missing!'}), 401
try:
data = jwt.decode(token, app.config['SECRET_KEY'], algorithms=["HS256"])
### DEBUG ###
logger.debug("data : {}".format(data))
### END DEBUG ###
# On recupere l'utilisateur en base de données
sql_statement = "SELECT * FROM utilisateur WHERE userid = {}".format(data['userId'])
# execution de la requete sql
etat, ret = db.execute(sql_statement, None, False)
if not etat:
db.disconnect()
abort(500)
else:
if not ret:
db.disconnect()
abort(404)
else:
current_user = ret[0]
except Exception as e:
logger.error("Token invalid : {}".format(e))
return jsonify({'message': 'Token is invalid!'}), 401
return func(current_user, *args, **kwargs)
return decorated
@app.errorhandler(HTTPException)
def handle_exception(e):
''' return JSON instead of HTML for HTTP errors '''
@@ -181,10 +159,35 @@ def handle_exception(e):
response.content_type = "application/json"
return response
@app.after_request
def refresh_expiring_tokens(response):
''' Using an 'after_request' callback, we refresh any token that is within
30 minutes of expiring.'''
try:
exp_timestamp = get_jwt()['exp']
now = datetime.datetime.now(datetime.timezone.utc)
target_timestamp = datetime.datetime.timestamp(now + datetime.timedelta(minutes=30))
### DEBUG ###
logger.debug("exp: {} - target: {}".format(exp_timestamp, target_timestamp))
### END DEBUG ###
if target_timestamp > exp_timestamp:
logger.warning("On doit recréer un token ....")
access_token = create_access_token(identity=get_jwt_identity())
set_access_cookies(response, access_token)
return response
except (RuntimeError, KeyError):
return response
@app.route('/api/utilisateurs', methods=['GET'])
@token_required
def get_all_users(current_user):
@jwt_required()
@dbmanage
def get_all_users():
''' Recuperation de tous les utilisateurs inscrits '''
# Access the identity of the current user with get_jwt_identity
current_user = get_jwt_identity()
### DEBUG ###
logger.debug("current_user: {}".format(current_user))
### DEBUG END ###
# Test si l'utilisateur courant est actif ou pas
if not current_user["Actif"]:
db.disconnect()
@@ -203,9 +206,12 @@ def get_all_users(current_user):
return jsonify(content)
@app.route('/api/utilisateurs/<int:userId>', methods=['GET'])
@token_required
def get_one_user(current_user, userId):
@jwt_required()
@dbmanage
def get_one_user(userId):
''' Recuperation d'un seul utilisateur '''
# Access the identity of the current user with get_jwt_identity
current_user = get_jwt_identity()
### DEBUG ###
logger.debug("Actif ? {} - Role ? {} - userId : {}/{}".format(current_user["Actif"], current_user["Role"], current_user["userId"], userId))
### DEBUG END ###
@@ -228,9 +234,12 @@ def get_one_user(current_user, userId):
return jsonify(content)
@app.route('/api/utilisateurs', methods=['POST'])
@token_required
def add_user(current_user):
@jwt_required()
@dbmanage
def add_user():
''' Ajout d'un utilisateur '''
# Access the identity of the current user with get_jwt_identity
current_user = get_jwt_identity()
# Test si l'utilisateur courant est actif et Admin ou pas
if not current_user["Actif"] and current_user["Role"] != 'Administrateur':
db.disconnect()
@@ -238,6 +247,17 @@ def add_user(current_user):
# recuperation des attributs (JSON) de la requete
data_json = request.get_json()
# On vérifie si l'identifiant en base de données est déjà utilisé ou pas
sql_statement = "SELECT * FROM utilisateur WHERE Identifiant = \'{}\'".format(data_json['Identifiant'])
# execution de la requete sql
etat, ret = db.execute(sql_statement, None, False)
if not etat:
db.disconnect()
abort(500)
else:
if ret:
db.disconnect()
abort(401, description="Identifiant déjà utilisé!")
### DEBUG ###
logger.debug("Datas: {}".format(data_json))
### END DEBUG ###
@@ -257,9 +277,12 @@ def add_user(current_user):
return jsonify({'message' : 'Nouvel utilisateur créé!'})
@app.route('/api/utilisateurs/<int:userId>', methods=['PUT'])
@token_required
def modif_user(current_user, userId):
@jwt_required()
@dbmanage
def modif_user(userId):
''' modification d'un utilisateur '''
# Access the identity of the current user with get_jwt_identity
current_user = get_jwt_identity()
# Test si l'utilisateur courant est actif ou pas
# Si l'utilisateur courant n'est pas administrateur, il ne peut voir que son profil
if not current_user["Actif"] or current_user["Role"] != "Administrateur" and current_user['userId'] != userId:
@@ -290,9 +313,12 @@ def modif_user(current_user, userId):
return jsonify({'message' : 'utilisateur {} modifié!'.format(userId)})
@app.route('/api/utilisateurs/<int:userId>', methods=['DELETE'])
@token_required
def del_user(current_user, userId):
@jwt_required()
@dbmanage
def del_user(userId):
''' Suppression d'un utilisateur '''
# Access the identity of the current user with get_jwt_identity
current_user = get_jwt_identity()
# Test si l'utilisateur courant est Admin ou pas
if not current_user["Actif"] or current_user["Role"] != 'Administrateur':
db.disconnect()
@@ -329,9 +355,12 @@ def del_user(current_user, userId):
return jsonify(content)
@app.route('/api/utilisateurs/<int:userId>/reset_password', methods=['GET'])
@token_required
def reset_passwd_user(current_user, userId):
@jwt_required()
@dbmanage
def reset_passwd_user(userId):
''' Reset du mot de passe à un utilisateur représenté par son Id '''
# Access the identity of the current user with get_jwt_identity
current_user = get_jwt_identity()
# Test si l'utilisateur courant est Admin ou pas
if not current_user["Actif"] or current_user["Role"] != 'Administrateur':
db.disconnect()
@@ -377,25 +406,25 @@ def login():
### END DEBUG ###
if user and check_password_hash(user['Password'], auth.password):
try:
token = jwt.encode({'userId': user['userId'],
'exp': datetime.datetime.utcnow() + datetime.timedelta(minutes=30),
'iat': datetime.datetime.utcnow()},
app.config['SECRET_KEY'],
algorithm='HS256')
# create a new access token
token = create_access_token(identity=user)
content = jsonify({'message': 'login successful !'})
# set token as cookie
set_access_cookies(content, token)
except Exception as e:
logger.exception('Erreur : {}'.format(e))
logger.erreur('Erreur : {}'.format(e))
db.disconnect()
return abort(500)
return jsonify({'token': token})
return content
logger.error("authorization failed !")
db.disconnect()
return abort(401)
@app.route('/api/utilisateurs/logout')
@dbmanage
@app.route('/api/utilisateurs/logout', methods=['POST'])
def logout():
return ''
content = jsonify({'message': 'logout successful !'})
unset_jwt_cookies(content)
return content
def main():
logger.setLevel(log.DEBUG)