ajout d'un backend python RESTful avec Flask

This commit is contained in:
2024-10-25 16:11:37 +02:00
parent 4652ee86d3
commit f08c6fcaa8
11 changed files with 687 additions and 0 deletions

5
.gitignore vendored
View File

@@ -40,3 +40,8 @@ testem.log
# System files
.DS_Store
Thumbs.db
# Python
/backend/.venv
/backend/log
*.pyc

38
backend/requirements.txt Normal file
View File

@@ -0,0 +1,38 @@
aniso8601==9.0.1
asn1crypto==1.5.1
attrs==21.4.0
certifi==2022.5.18.1
cffi==1.15.0
charset-normalizer==2.0.12
click==8.1.3
cryptography==37.0.3
Flask==2.1.2
Flask-API==3.0.post1
Flask-Cors==3.0.10
Flask-JWT-Extended==4.4.1
Flask-Login==0.6.1
Flask-WTF==1.0.1
greenlet==1.1.2
idna==3.3
importlib-metadata==4.11.4
importlib-resources==5.7.1
itsdangerous==2.1.2
Jinja2==3.1.2
jsonschema==4.6.0
MarkupSafe==2.1.1
pyasn1==0.4.8
pycparser==2.21
PyJWT==2.4.0
pyrsistent==0.18.1
pytz==2022.1
requests==2.28.0
six==1.16.0
typing_extensions==4.2.0
urllib3==1.26.9
waitress==2.1.2
Werkzeug==2.1.2
WTForms==3.0.1
zipp==3.8.0
sqlalchemy==1.4.41
flask-sqlalchemy==2.5.1
pymysql==1.0.2

10
backend/run.py Normal file
View File

@@ -0,0 +1,10 @@
# -*- encoding: utf-8 -*-
# @author : vincent.benoit@benserv.fr
# @brief : Backend Configurateur football-drawing
from src import app
print("Launch Flask Backend ...")
ret, application = app.create_app()
if application and ret:
application.run(host="0.0.0.0", port=6000, use_reloader=False)

23
backend/setup.py Normal file
View File

@@ -0,0 +1,23 @@
import io, os
from setuptools import find_packages, setup
def read_requirements(path):
ret = []
with open(path, "r") as f:
ret = f.read().splitlines()
return ret
setup(
name='ConfigurateurBack',
version="1.0.0",
author='Vincent BENOIT',
author_email='vincent.benoit@benserv.fr',
url='https://git.nas.benserv.fr/vincent/football-drawing.git',
description='Backend RESTful API pour l\'outil football-drawing',
long_description=read("README.md"),
long_description_content_type="text/markdown",
packages=["src"],
include_package_data=True,
zip_safe=False,
install_requires=read_requirements("requirements.txt")
)

0
backend/src/__init__.py Normal file
View File

100
backend/src/app.py Normal file
View File

@@ -0,0 +1,100 @@
# -*- encoding: utf-8 -*-
# @author : vincent.benoit@benserv.fr
# @brief : football-drawing Flask RESTful API
#########################################################
# Importation de modules externes #
import sys, re, os
from pprint import pprint
import logging as log
from logging.config import dictConfig
from flask import Flask
from flask.logging import default_handler
from flask_cors import CORS, cross_origin
from flask_jwt_extended import JWTManager
import jwt
from src.config import DefaultConfig
from src.db import mydb
#from src.drawing import drawing
#########################################################
# Corps principal du programme #
def create_app(config=None, app_name=None):
''' create and configure the app '''
print("Create and configure the Flask app ...")
if app_name is None:
app_name = DefaultConfig.PROJECT
# create the app
# tells the app that configuration files are relative to the instance folder.
app = Flask(app_name, instance_path=os.getcwd(), instance_relative_config=True)
configure_app(app, config)
configure_log(app)
configure_database(app)
#configure_blueprints(app)
return True, app
def configure_app(app, config=None):
''' configure the app with conf file and/or object '''
# load default config
app.config.from_object(DefaultConfig)
if config:
# load specific config
app.config.from_object(config)
# setup the Flask-JWT-Extended extension
jwt = JWTManager(app)
# setup the Flask-CORS extension for handling Cross Origin Resource Sharing
CORS(app, resources={r"/api/*": {
"origins": ["http://localhost:4200","http://localhost:5000"],
"supports_credentials": True
}})
def configure_log(app):
''' configure log handler '''
if not os.path.exists(app.config['LOG_FOLDER']):
try:
os.makedirs(app.config['LOG_FOLDER'])
except OSError:
pass
# On vire tous les handlers
for h in app.logger.handlers:
app.logger.removeHandler(h)
# Set info level on logger, which might be overwritten by handers.
# Suppress DEBUG messages.
app.logger.setLevel(log.DEBUG)
formatter = log.Formatter('%(asctime)s - %(name)s [%(module)s.%(funcName)s:%(lineno)d] - %(levelname)s - %(message)s')
info_log = os.path.join(app.config['LOG_FOLDER'], DefaultConfig.PROJECT + '.log')
info_file_handler = log.handlers.RotatingFileHandler(info_log, maxBytes=100000, backupCount=10)
info_file_handler.setLevel(log.INFO)
info_file_handler.setFormatter(formatter)
app.logger.addHandler(info_file_handler)
fl = log.StreamHandler()
fl.setLevel(log.DEBUG)
fl.setFormatter(formatter)
app.logger.addHandler(fl)
def configure_database(app):
''' configure database parameters '''
# SQLAlchemy
mydb.init_app(app)
with app.app_context():
mydb.create_all()
#def configure_blueprints(app):
# ''' configure blueprints '''
# for bp in [users, auth, messages]:
# app.register_blueprint(bp)

53
backend/src/config.py Normal file
View File

@@ -0,0 +1,53 @@
# -*- coding: utf-8 -*-
#
# @author: vincent.benoit@benserv.fr
# @brief: Flask Config classes
import os
import datetime
class BaseConfig(object):
PROJECT = "football-drawing"
# Get app root path, also can use flask.root_path.
PROJECT_ROOT = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
DEBUG = False
TESTING = False
ADMINS = ['vincent.benoit@benserv.fr']
LOG_FOLDER = os.path.join(os.getcwd(), 'log')
class DefaultConfig(BaseConfig):
DEBUG = True
TESTING = True
FLASK_ENV = 'development'
SECRET_KEY = "thisissecret"
# Setup the Flask-JWT-Extended extension
JWT_SECRET_KEY = "cdscjdsklcfqezffhrevneqggfuhmnvqnmh"
JWT_COOKIE_SECURE = False
JWT_TOKEN_LOCATION = ["cookies"]
JWT_ACCESS_TOKEN_EXPIRES = datetime.timedelta(hours=1)
# Controls if Cross Site Request Forgery (CSRF) protection is enabled when using cookies
# This should always be True in production
JWT_COOKIE_CSRF_PROTECT = True
JWT_CSRF_IN_COOKIES = True
UPLOAD_FOLDER = os.path.join(os.getcwd(),'static/img')
ALLOWED_EXTENSIONS = {'png', 'jpg', 'gif', 'jpeg'}
MAX_CONTENT_LENGTH = 1 * 1024 * 1024 # 1 megabytes
SQL_HOST_URI = '127.0.0.1'
SQL_PORT = 3306
SQL_USERNAME = 'root'
SQL_PASSWORD = 'root'
SQL_DATABASE = 'football_drawing'
# Log all the statements issued to stderr which can be useful for debugging
SQLALCHEMY_ECHO = False
SQLALCHEMY_TRACK_MODIFICATIONS = True
# Mariadb for production.
SQLALCHEMY_DATABASE_URI = 'mysql+pymysql://' + SQL_USERNAME + ':' + SQL_PASSWORD + '@' + SQL_HOST_URI + ':' + str(SQL_PORT) + '/' + SQL_DATABASE + '?charset=utf8'

31
backend/src/db.py Normal file
View File

@@ -0,0 +1,31 @@
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
# @author : vincent.benoit@benserv.fr
# @brief : wrapper de la base de données mariadb
#########################################################
# Importation de modules externes #
import sys, re, os
import logging as log
from flask_sqlalchemy import SQLAlchemy
from flask_api import status
from flask import current_app
from functools import wraps
from src.config import DefaultConfig
#########################################################
# Class et Methods #
#########################################################
# Decorators #
#########################################################
# Instantiation #
# Instantiate database via SQLAlchemy
mydb = SQLAlchemy()

View File

@@ -0,0 +1,7 @@
# -*- encoding: utf-8 -*-
# @author : vincent.benoit@benserv.fr
# @brief : Users views and models
from .views import users
from .models import User

View File

@@ -0,0 +1,82 @@
# -*- encoding: utf-8 -*-
# @author : vincent.benoit@benserv.fr
# @brief : Users model
#########################################################
# Importation de modules externes #
import sys, re, os
import logging as log
import json
import enum
from werkzeug.security import generate_password_hash, check_password_hash
from werkzeug.exceptions import HTTPException, RequestEntityTooLarge
from werkzeug.utils import secure_filename
from sqlalchemy import Enum
from src.db import mydb as db
#########################################################
# Class et Methods #
class Role(enum.Enum):
Administrateur = "Administrateur"
Coach = "Coach"
class User(db.Model):
__tablename__ = 'utilisateur'
userId = db.Column(db.Integer, primary_key=True)
Prenom = db.Column(db.String(32), nullable=False, default="")
Nom = db.Column(db.String(32), nullable=False, default="")
Identifiant = db.Column(db.String(32), nullable=False, default="")
Role = db.Column(db.String(16), nullable=False, default=Role.Coach)
Photo = db.Column(db.String(128), default="")
Actif = db.Column(db.Boolean(), nullable=False)
__password = db.Column('Password', db.String(256), nullable=False)
def __init__(self, prenom='', nom='', identifiant='', password='', role='', photo='', actif=False):
''' constructor '''
self.Prenom = prenom
self.Nom = nom
self.Identifiant = identifiant
self.Password = password
self.Role = role
self.Photo = photo
self.Actif = actif
def __get_password(self):
''' getter password '''
return self.__password
def __set_password(self, password):
''' setter hash password '''
self.__password = generate_password_hash(password, method='sha256')
# Hide password encryption by exposing password field only.
Password = db.synonym('__password',
descriptor=property(__get_password,
__set_password))
# Return a new property that point to the Message class
Messages = db.relationship('Message',
backref="user",
cascade="all, delete-orphan",
lazy=True)
def check_password(self, password):
''' check hash password for user '''
if self.Password is None:
return False
return check_password_hash(self.Password, password)
def as_dict(self):
''' User as dictionnary '''
result = {}
for c in self.__table__.columns:
if c.name != 'Password':
result[c.name] = getattr(self, c.name)
return result

View File

@@ -0,0 +1,338 @@
# -*- encoding: utf-8 -*-
# @author : vincent.benoit@benserv.fr
# @brief : Users routes
#########################################################
# Importation de modules externes #
import sys, re, os
import logging as log
import json
import datetime
from flask import Flask, Blueprint, request, abort, jsonify, send_file, current_app
from flask_api import status
from flask_jwt_extended import create_access_token
from flask_jwt_extended import get_jwt
from flask_jwt_extended import set_access_cookies
from flask_jwt_extended import unset_jwt_cookies
from flask_jwt_extended import get_jwt_identity
from flask_jwt_extended import jwt_required
from werkzeug.security import generate_password_hash
from werkzeug.exceptions import HTTPException, RequestEntityTooLarge
from werkzeug.utils import secure_filename
from src.db import mydb as db
from .models import User
#########################################################
# Class et Methods #
users = Blueprint('users', __name__, url_prefix='/api/utilisateurs')
@users.errorhandler(HTTPException)
def handle_exception(e):
''' return JSON instead of HTML for HTTP errors '''
response = e.get_response()
# replace the body with JSON
response.data = json.dumps({
'code': e.code,
'name': e.name,
'description': e.description,
})
response.content_type = "application/json"
return response
@users.after_request
def refresh_expiring_tokens(response):
''' Using an 'after_request' callback, we refresh any token that is within
30 minutes of expiring.'''
try:
exp_timestamp = get_jwt()['exp']
now = datetime.datetime.now(datetime.timezone.utc)
target_timestamp = datetime.datetime.timestamp(now + datetime.timedelta(minutes=30))
### DEBUG ###
current_app.logger.debug("exp: {} - target: {}".format(exp_timestamp, target_timestamp))
### END DEBUG ###
if target_timestamp > exp_timestamp:
current_app.logger.warning("On doit recréer un token ....")
access_token = create_access_token(identity=get_jwt_identity())
set_access_cookies(response, access_token)
return response
except (RuntimeError, KeyError):
return response
@users.route('', methods=['GET'])
@jwt_required()
def get_all_users():
''' Recuperation de tous les utilisateurs inscrits '''
# Access the identity of the current user with get_jwt_identity
current_user = get_jwt_identity()
### DEBUG ###
current_app.logger.debug("Current User: {}".format(current_user))
### DEBUG END ###
# Test si l'utilisateur courant est actif ou pas
if not current_user["Actif"]:
db.disconnect()
abort(status.HTTP_403_FORBIDDEN)
content = []
users = db.session.query(User).all()
if len(users):
### DEBUG ###
current_app.logger.debug("Count: {}".format(len(users)))
### END DEBUG ###
for user in users:
### DEBUG ###
current_app.logger.debug("User: {}".format(user.as_dict()))
### END DEBUG ###
content.append(user.as_dict())
return jsonify(content), status.HTTP_200_OK
@users.route('/<int:userId>', methods=['GET'])
@jwt_required()
def get_one_user(userId):
''' Recuperation d'un seul utilisateur '''
content = {}
# Access the identity of the current user with get_jwt_identity
current_user = get_jwt_identity()
### DEBUG ###
current_app.logger.debug("Current User: {}".format(current_user))
### DEBUG END ###
# Test si l'utilisateur courant est actif ou pas
# Si l'utilisateur courant n'est pas administrateur, il ne peut voir que son profil
if not current_user["Actif"] or current_user["Role"] != "Administrateur" and current_user['userId'] != userId:
abort(status.HTTP_403_FORBIDDEN, description="Not authorized!")
user = db.session.query(User).filter(User.userId==userId)
if user.count():
content = user.first().as_dict()
else:
abort(status.HTTP_404_NOT_FOUND, descrition="User not found")
return jsonify(content), status.HTTP_200_OK
@users.route('', methods=['POST'])
@jwt_required()
def add_user():
''' Ajout d'un utilisateur '''
# Access the identity of the current user with get_jwt_identity
current_user = get_jwt_identity()
# Test si l'utilisateur courant est actif et Admin ou pas
if not current_user["Actif"] and current_user["Role"] != 'Administrateur':
db.disconnect()
abort(status.HTTP_403_FORBIDDEN, description='Utilisateur non autorisé')
# recuperation des attributs (JSON) de la requete
data_json = request.get_json()
# On vérifie si l'identifiant en base de données est déjà utilisé ou pas
user = db.session.query(User).filter(User.Identifiant==data_json['Identifiant'])
if user.count():
abort(status.HTTP_401_UNAUTHORIZED, description="Identifiant déjà utilisé!")
### DEBUG ###
current_app.logger.debug("Request datas: {}".format(data_json))
### END DEBUG ###
# Create new user
newUser = User(prenom=data_json['Prenom'],
nom=data_json['Nom'],
identifiant=data_json['Identifiant'],
password=data_json['Password'],
role=data_json['Role'],
photo=data_json['Photo'],
actif=data_json['Actif'])
# Send new user to database
db.session.add(newUser)
db.session.commit()
return jsonify({'message' : 'Nouvel utilisateur créé!'}), status.HTTP_201_CREATED
@users.route('/<int:userId>', methods=['PUT'])
@jwt_required()
def modif_user(userId):
''' modification d'un utilisateur '''
# Access the identity of the current user with get_jwt_identity
current_user = get_jwt_identity()
# Test si l'utilisateur courant est actif ou pas
# Si l'utilisateur courant n'est pas administrateur, il ne peut voir que son profil
if not current_user["Actif"] or current_user["Role"] != "Administrateur" and current_user['userId'] != userId:
abort(status.HTTP_403_FORBIDDEN, description='Utilisateur non autorisé')
# test des attributs (JSON) de la requete
if not request.data.decode("utf-8"):
current_app.logger.error("Data not found")
abort(status.HTTP_400_BAD_REQUEST, description='Data not found')
# recuperation des attributs (JSON) de la requete
dataDict = request.get_json()
# Recupération de l'utilisateur en fonction de l'id
user = db.session.query(User).filter(User.userId==userId)
if not user.count():
abort(status.HTTP_400_BAD_REQUEST, description='User with this id not found')
user = user.first()
user.Prenom = dataDict['Prenom']
user.Nom = dataDict['Nom']
user.Identifiant = dataDict['Identifiant']
user.Password = dataDict['Password']
user.Photo = dataDict['Photo']
user.Role = dataDict['Role']
user.Actif = dataDict['Actif']
# Send modified user to database
db.session.commit()
return jsonify({'message' : 'utilisateur {} modifié!'.format(userId)}), status.HTTP_200_OK
@users.route('/<int:userId>', methods=['DELETE'])
@jwt_required()
def del_user(userId):
''' Suppression d'un utilisateur '''
# Access the identity of the current user with get_jwt_identity
current_user = get_jwt_identity()
# Test si l'utilisateur courant est Admin ou pas
if not current_user["Actif"] or current_user["Role"] != 'Administrateur':
abort(status.HTTP_403_FORBIDDEN, description='Utilisateur non autorisé')
# test des attributs (JSON) de la requete
if request.data.decode("utf-8"):
current_app.logger.error("Data found : {}".format(request.data.decode("utf-8")))
abort(status.HTTP_400_BAD_REQUEST)
# Recupération de l'utilisateur en fonction de l'id
user = db.session.query(User).filter(User.userId==userId)
if not user.count():
abort(status.HTTP_400_BAD_REQUEST, description='User with this id not found')
delUser = user.first()
# On supprime l'utilisateur de la base de données
db.session.delete(delUser)
db.session.commit()
return jsonify({'message' : 'utilisateur supprimé!'}), status.HTTP_200_OK
@users.route('/<int:userId>/reset_password', methods=['GET'])
@jwt_required()
def reset_passwd_user(userId):
''' Reset du mot de passe à un utilisateur représenté par son Id '''
# Access the identity of the current user with get_jwt_identity
current_user = get_jwt_identity()
# Test si l'utilisateur courant est Admin ou pas
if not current_user["Actif"] or current_user["Role"] != 'Administrateur':
abort(status.HTTP_403_FORBIDDEN, description='Utilisateur non autorisé')
# Recupération de l'utilisateur en fonction de l'id
user = db.session.query(User).filter(User.userId==userId)
if not user.count():
abort(status.HTTP_400_BAD_REQUEST, description='User with this id not found')
user = user.first()
user.Password = 'provisoire'
# Send modified user to database
db.session.commit()
return jsonify({'message' : 'reset du mot de passe!'}), status.HTTP_200_OK
@users.route('/<int:userId>/activate', methods=['PUT'])
@jwt_required()
def deactivate_user(userId):
''' Désactivation d'un utilisateur représenté par son Id '''
# Access the identity of the current user with get_jwt_identity
current_user = get_jwt_identity()
# Test si l'utilisateur courant est Admin ou pas
if not current_user["Actif"] or current_user["Role"] != 'Administrateur':
abort(status.HTTP_403_FORBIDDEN, description='Utilisateur non autorisé')
# test des attributs (JSON) de la requete
if not request.data.decode("utf-8"):
current_app.logger.error("Data not found")
abort(status.HTTP_400_BAD_REQUEST, description='Data not found')
# recuperation des attributs (JSON) de la requete
dataDict = request.get_json()
# Recupération de l'utilisateur en fonction de l'id
user = db.session.query(User).filter(User.userId==userId)
if not user.count():
abort(status.HTTP_400_BAD_REQUEST, description='User with this id not found')
user = user.first()
user.Actif = dataDict['Actif']
# Send modified user to database
db.session.commit()
return jsonify({'message' : 'desactivation de l\'utilisateur réussi!'}), status.HTTP_200_OK
@users.route('/<int:userId>/photo', methods=['GET'])
@jwt_required()
def get_photo(userId):
''' Recupere la photo de l'utilisateur suivant son Id '''
# Access the identity of the current user with get_jwt_identity
current_user = get_jwt_identity()
# Test si l'utilisateur courant est actif ou pas
# Si l'utilisateur courant n'est pas administrateur, il ne peut voir que son profil
if not current_user["Actif"] or current_user["Role"] != "Administrateur" and current_user['userId'] != userId:
abort(status.HTTP_403_FORBIDDEN, description='Utilisateur non autorisé')
user = db.session.query(User).filter(User.userId==userId)
if not user.count():
abort(status.HTTP_404_NOT_FOUND, descrition="User not found")
user = user.first()
if user.Photo:
return send_file(os.path.join(user.Photo), mimetype='image/'+os.path.splitext(user.Photo)[1].split('.')[1])
abort(status.HTTP_404_NOT_FOUND, description='Picture not found!')
@users.route('/current', methods=['GET'])
@jwt_required()
def current_user():
''' retourne l'utilisateur courant connecté '''
# Access the identity of the current user with get_jwt_identity
current_user = get_jwt_identity()
return jsonify(current_user)
def allowed_file(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in current_app.config['ALLOWED_EXTENSIONS']
@users.route('/<int:userId>/uploadImage', methods=['POST'])
@jwt_required()
def uploadImage(userId):
# Access the identity of the current user with get_jwt_identity
current_user = get_jwt_identity()
# Test si l'utilisateur courant est actif et Admin ou pas
# Si l'utilisateur courant n'est pas administrateur, il ne peut voir que son profil
if not current_user["Actif"] or current_user["Role"] != "Administrateur" and current_user['userId'] != userId:
current_app.logger.error("Utilisateur non autorisé")
abort(status.HTTP_403_FORBIDDEN, description='Utilisateur non autorisé')
# On vérifie que l'utilisateur existe en base de données
user = db.session.query(User).filter(User.userId==userId)
if not user.count():
abort(status.HTTP_404_NOT_FOUND, descrition="User not found")
user = user.first()
current_app.logger.debug("Req Headers: {}".format(request.headers))
current_app.logger.debug("Req Files: {}".format(request.files))
current_app.logger.debug("Req data: {}".format(request.data))
# check if the post request has the file part
if 'photo' not in request.files:
current_app.logger.error('No file part in the request')
abort(status.HTTP_400_BAD_REQUEST)
try:
photo = request.files['photo']
except RequestEntityTooLarge as e:
current_app.logger.error("Fichier trop gros ...")
abort(status.HTTP_413_REQUEST_ENTITY_TOO_LARGE)
# If the user does not select a file, the browser submits an empty file without a filename
if photo.filename == '':
current_app.logger.error('No selected file')
abort(status.HTTP_401_UNAUTHORIZED)
if photo and allowed_file(photo.filename):
filename = secure_filename(photo.filename)
### DEBUG ###
current_app.logger.debug("filename: {}".format(os.path.splitext(filename)))
### END DEBUG ###
filepath = os.path.join(current_app.config['UPLOAD_FOLDER'], user.Identifiant + os.path.splitext(filename)[1])
photo.save(filepath)
user.Photo = filepath
# modify user to database
db.session.commit()
return jsonify({'message': 'photo saved successfuly!'}), status.HTTP_200_OK