ajout des routes et modele SQL pour les messages

This commit is contained in:
2022-05-28 12:01:31 +02:00
parent 89eb63af86
commit 8fb82a30be
12 changed files with 460 additions and 246 deletions

View File

@@ -13,6 +13,8 @@ flask-jwt-extended = "*"
flask-cors = "*"
flask-restx = "*"
sqlalchemy = "*"
flask-sqlalchemy = "*"
pymysql = "*"
[dev-packages]

24
Pipfile.lock generated
View File

@@ -1,7 +1,7 @@
{
"_meta": {
"hash": {
"sha256": "d32cc66979cc6c5146b7607b4d44ecf5aca0c98e863c8f3c4ce33ef9e5246458"
"sha256": "ed24356a3728a2b668430a3c99cd8bac69b897562ea965babaae6af06d3e0e19"
},
"pipfile-spec": 6,
"requires": {
@@ -96,6 +96,14 @@
"index": "pypi",
"version": "==0.5.1"
},
"flask-sqlalchemy": {
"hashes": [
"sha256:2bda44b43e7cacb15d4e05ff3cc1f8bc97936cc464623424102bfc2c35e95912",
"sha256:f12c3d4cc5cc7fdcc148b9527ea05671718c3ea45d50c7e732cceb33f574b390"
],
"index": "pypi",
"version": "==2.5.1"
},
"greenlet": {
"hashes": [
"sha256:0051c6f1f27cb756ffc0ffbac7d2cd48cb0362ac1736871399a739b2885134d3",
@@ -167,11 +175,11 @@
},
"importlib-metadata": {
"hashes": [
"sha256:1208431ca90a8cca1a6b8af391bb53c1a2db74e5d1cef6ddced95d4b2062edc6",
"sha256:ea4c597ebf37142f827b8f39299579e31685c31d3a438b59f469406afd0f2539"
"sha256:5d26852efe48c0a32b0509ffbc583fda1a2266545a78d104a6f4aff3db17d700",
"sha256:c58c8eb8a762858f49e18436ff552e83914778e50e9d2f1660535ffb364552ec"
],
"markers": "python_version < '3.10'",
"version": "==4.11.3"
"version": "==4.11.4"
},
"itsdangerous": {
"hashes": [
@@ -266,6 +274,14 @@
"index": "pypi",
"version": "==2.4.0"
},
"pymysql": {
"hashes": [
"sha256:41fc3a0c5013d5f039639442321185532e3e2c8924687abe6537de157d403641",
"sha256:816927a350f38d56072aeca5dfb10221fe1dc653745853d30a216637f5d7ad36"
],
"index": "pypi",
"version": "==1.0.2"
},
"pyrsistent": {
"hashes": [
"sha256:0e3e1fcc45199df76053026a51cc59ab2ea3fc7c094c6627e93b7b44cdae2c8c",

View File

@@ -19,9 +19,10 @@ from flask_jwt_extended import JWTManager
import jwt
from src.config import DefaultConfig
from src.db import db
from src.db import db1
from src.users import users
from src.auth import auth
from src.messages import messages
#########################################################
# Corps principal du programme #
@@ -90,15 +91,13 @@ def configure_log(app):
def configure_database(app):
''' configure database parameters '''
# Set database parameters ...
db.host = app.config['SQL_HOST_URI']
db.port = app.config['SQL_PORT']
db.user = app.config['SQL_USERNAME']
db.password = app.config['SQL_PASSWORD']
db.database = app.config['SQL_DATABASE']
# SQLAlchemy
db1.init_app(app)
with app.app_context():
db1.create_all()
def configure_blueprints(app):
''' configure blueprints '''
for bp in [users, auth]:
for bp in [users, auth, messages]:
app.register_blueprint(bp)

View File

@@ -1,4 +1,3 @@
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
# @author : vincent.benoit@benserv.fr
@@ -9,6 +8,7 @@
import sys, re, os
import logging as log
from pprint import pprint
from flask import Flask, Blueprint, request, abort, jsonify, current_app
from flask_api import status
@@ -18,24 +18,19 @@ from flask_jwt_extended import set_access_cookies
from flask_jwt_extended import unset_jwt_cookies
from flask_jwt_extended import get_jwt_identity
from flask_jwt_extended import jwt_required
from flask_restx import Api, Resource, reqparse
import json
from werkzeug.security import check_password_hash
from werkzeug.exceptions import HTTPException
from src.db import db, dbmanage
from src.db import db1 as db
from src.users.models import User
#########################################################
# Class et Methods #
auth = Blueprint('auth', __name__, url_prefix='/api/utilisateurs')
api = Api(auth,
version="1.0",
title="U10Manager Flask RESTful API",
description="Welcome to the Swagger UI documentation site!",
doc="/ui")
@auth.errorhandler(HTTPException)
def handle_exception(e):
@@ -79,40 +74,29 @@ def login():
user = None
if not auth or not auth.username or not auth.password:
current_app.logger.error("Login and Password required !")
db.disconnect()
abort(401, description='Login and Password required')
# On vérifie que l'utilisateur existe en base de données
sql_statement = "SELECT * FROM utilisateur WHERE Identifiant = \"{}\"".format(auth.username)
# execution de la requete sql
etat, ret = db.execute(sql_statement, None, False)
if not etat:
db.disconnect()
try:
user = db.session.query(User).filter(User.Identifiant==auth.username)
if user.count():
#if user[0].Actif and check_password_hash(user[0].Password, auth.password):
if user[0].Actif and user[0].check_password(auth.password):
# create a new access token
token = create_access_token(identity={'userId': user[0].userId,
'Identifiant':user[0].Identifiant,
'Prenom':user[0].Prenom,
'Nom':user[0].Nom,
'Photo':user[0].Photo,
'Role':str(user[0].Role),
'Actif':user[0].Actif})
content = jsonify({'message': 'login successful !'})
# set token as cookie
set_access_cookies(content, token)
return content, 200
current_app.logger.error("authorization failed !")
except Exception as e:
current_app.logger.error("Erreur : {}".format(e))
abort(500)
else:
if not ret:
db.disconnect()
abort(404)
else:
user = ret[0]
### DEBUG ###
current_app.logger.debug("user bdd: {}".format(user))
### END DEBUG ###
if user and user['Actif'] and check_password_hash(user['Password'], auth.password):
try:
# create a new access token
token = create_access_token(identity=user)
content = jsonify({'message': 'login successful !'})
# set token as cookie
set_access_cookies(content, token)
except Exception as e:
current_app.logger.erreur('Erreur : {}'.format(e))
db.disconnect()
abort(500)
return content
current_app.logger.error("authorization failed !")
db.disconnect()
abort(401)
@auth.route('/logout', methods=['POST'])

View File

@@ -45,3 +45,10 @@ class DefaultConfig(BaseConfig):
SQL_USERNAME = 'vincent'
SQL_PASSWORD = 'malkavian'
SQL_DATABASE = 'test1'
# Log all the statements issued to stderr which can be useful for debugging
SQLALCHEMY_ECHO = False
SQLALCHEMY_TRACK_MODIFICATIONS = True
# Mariadb for production.
#SQLALCHEMY_DATABASE_URI = 'mariadb+mariadbconnector://' + SQL_USERNAME + ':' + SQL_PASSWORD + '@' + SQL_HOST_URI + ':' + str(SQL_PORT) + '/' + SQL_DATABASE + '?charset=utf8'
SQLALCHEMY_DATABASE_URI = 'mysql+pymysql://' + SQL_USERNAME + ':' + SQL_PASSWORD + '@' + SQL_HOST_URI + ':' + str(SQL_PORT) + '/' + SQL_DATABASE + '?charset=utf8'

View File

@@ -11,6 +11,7 @@ import sys, re, os
import logging as log
import mariadb
from flask_sqlalchemy import SQLAlchemy
from flask_api import status
from flask import current_app
@@ -147,22 +148,8 @@ class BDDsql:
#########################################################
# Decorators #
def dbmanage(func):
''' decorateur de la fonction db.connect & db.disconnect '''
@wraps(func)
def wrapper(*args, **kwargs):
# connexion à la base de données
state, ret = db.connect()
if not state:
content = {'Erreur' : ret['error']}
return content, status.HTTP_503_SERVICE_UNAVAILABLE
else:
# Appel de la fonction
ret = func(*args, **kwargs)
# deconnexion de la base de données
db.disconnect()
return ret
return wrapper
#########################################################
# Instantiation #
# Instantiate database
db = BDDsql()
# Instantiate database via SQLAlchemy
db1 = SQLAlchemy()

7
src/messages/__init__.py Normal file
View File

@@ -0,0 +1,7 @@
# -*- encoding: utf-8 -*-
# @author : vincent.benoit@benserv.fr
# @brief : Messages views and models
from .views import messages
from .models import Message

37
src/messages/models.py Normal file
View File

@@ -0,0 +1,37 @@
# -*- encoding: utf-8 -*-
# @author : vincent.benoit@benserv.fr
# @brief : Messages model
#########################################################
# Importation de modules externes #
import sys, re, os
import logging as log
import json
from src.db import db1 as db
#########################################################
# Class et Methods #
class Message(db.Model):
__tablename__ = 'message'
msgId = db.Column(db.Integer, primary_key=True)
Text = db.Column(db.Text, nullable=False, default="")
Date = db.Column(db.DateTime, nullable=False)
userId = db.Column(db.Integer, db.ForeignKey('utilisateur.userId'), nullable=False)
def __init__(self, text="", date=None, userId=0):
''' constructor '''
self.Text = text
self.Date = date
self.userId = userId
def as_dict(self):
''' Message as dictionnary '''
result = {}
for c in self.__table__.columns:
result[c.name] = getattr(self, c.name)
return result

150
src/messages/views.py Normal file
View File

@@ -0,0 +1,150 @@
# -*- encoding: utf-8 -*-
# @author : vincent.benoit@benserv.fr
# @brief : Messages routes
#########################################################
# Importation de modules externes #
import sys, re, os
import logging as log
import json
import datetime
from flask import Flask, Blueprint, request, abort, jsonify, send_file, current_app
from flask_api import status
from flask_jwt_extended import create_access_token
from flask_jwt_extended import get_jwt
from flask_jwt_extended import set_access_cookies
from flask_jwt_extended import unset_jwt_cookies
from flask_jwt_extended import get_jwt_identity
from flask_jwt_extended import jwt_required
from werkzeug.exceptions import HTTPException, RequestEntityTooLarge
from src.db import db1 as db
from .models import Message
from ..users.models import User
#########################################################
# Class et Methods #
messages = Blueprint('messages', __name__, url_prefix='/api/messages')
@messages.errorhandler(HTTPException)
def handle_exception(e):
''' return JSON instead of HTML for HTTP errors '''
response = e.get_response()
# replace the body with JSON
response.data = json.dumps({
'code': e.code,
'name': e.name,
'description': e.description,
})
response.content_type = "application/json"
return response
@messages.after_request
def refresh_expiring_tokens(response):
''' Using an 'after_request' callback, we refresh any token that is within
30 minutes of expiring.'''
try:
exp_timestamp = get_jwt()['exp']
now = datetime.datetime.now(datetime.timezone.utc)
target_timestamp = datetime.datetime.timestamp(now + datetime.timedelta(minutes=30))
### DEBUG ###
current_app.logger.debug("exp: {} - target: {}".format(exp_timestamp, target_timestamp))
### END DEBUG ###
if target_timestamp > exp_timestamp:
current_app.logger.warning("On doit recréer un token ....")
access_token = create_access_token(identity=get_jwt_identity())
set_access_cookies(response, access_token)
return response
except (RuntimeError, KeyError):
return response
@messages.route('', methods=['GET'])
@jwt_required()
def get_all_messages():
''' Recuperation de tous les utilisateurs inscrits '''
# Access the identity of the current user with get_jwt_identity
current_user = get_jwt_identity()
### DEBUG ###
current_app.logger.debug("Current User: {}".format(current_user))
### DEBUG END ###
# Test si l'utilisateur courant est actif ou pas
if not current_user["Actif"]:
db.disconnect()
abort(status.HTTP_403_FORBIDDEN)
content = []
msgs = db.session.query(Message).all()
if len(msgs):
### DEBUG ###
current_app.logger.debug("Count: {}".format(len(msgs)))
### END DEBUG ###
for msg in msgs:
user = db.session.query(User).filter(User.userId == msg.userId).first()
### DEBUG ###
current_app.logger.debug("Msg: {} - User: {}".format(msg.as_dict(), user.as_dict()))
### END DEBUG ###
res = msg.as_dict()
res['user'] = { 'userId': user.userId, 'Prenom': user.Prenom, 'Nom': user.Nom }
res.pop('userId')
#content.append(msg.as_dict())
content.append(res)
return jsonify(content), status.HTTP_200_OK
@messages.route('', methods=['POST'])
@jwt_required()
def save_message():
''' Recuperation de tous les utilisateurs inscrits '''
# Access the identity of the current user with get_jwt_identity
current_user = get_jwt_identity()
### DEBUG ###
current_app.logger.debug("Current User: {}".format(current_user))
### DEBUG END ###
# Test si l'utilisateur courant est actif ou pas
if not current_user["Actif"]:
db.disconnect()
abort(status.HTTP_403_FORBIDDEN)
# recuperation des attributs (JSON) de la requete
data_json = request.get_json()
# Create new user
newMsg = Message(text=data_json['Text'],
date=data_json['Date'],
userId=current_user['userId'])
# Send new user to database
db.session.add(newMsg)
db.session.commit()
return jsonify({'message': 'message posted successfuly!'}), status.HTTP_200_OK
@messages.route('/<int:msgId>', methods=['DELETE'])
@jwt_required()
def delete_message(msgId):
''' Recuperation de tous les utilisateurs inscrits '''
# Access the identity of the current user with get_jwt_identity
current_user = get_jwt_identity()
### DEBUG ###
current_app.logger.debug("Current User: {}".format(current_user))
### DEBUG END ###
# Test si l'utilisateur courant est actif ou pas
if not current_user["Actif"]:
db.disconnect()
abort(status.HTTP_403_FORBIDDEN)
# Recupération du message en fonction de l'id
msg = db.session.query(Message).filter(Message.msgId==msgId)
if not msg.count():
abort(status.HTTP_400_BAD_REQUEST, description='Message with this id not found')
msg = msg.first()
# On supprime l'utilisateur de la base de données
db.session.delete(msg)
db.session.commit()
return jsonify({'message': 'message deleted successfuly!'}), status.HTTP_200_OK

View File

@@ -4,3 +4,4 @@
# @brief : Users views and models
from .views import users
from .models import User

82
src/users/models.py Normal file
View File

@@ -0,0 +1,82 @@
# -*- encoding: utf-8 -*-
# @author : vincent.benoit@benserv.fr
# @brief : Users model
#########################################################
# Importation de modules externes #
import sys, re, os
import logging as log
import json
import enum
from werkzeug.security import generate_password_hash, check_password_hash
from werkzeug.exceptions import HTTPException, RequestEntityTooLarge
from werkzeug.utils import secure_filename
from sqlalchemy import Enum
from src.db import db1 as db
#########################################################
# Class et Methods #
class Role(enum.Enum):
Administrateur = "Administrateur"
Coach = "Coach"
class User(db.Model):
__tablename__ = 'utilisateur'
userId = db.Column(db.Integer, primary_key=True)
Prenom = db.Column(db.String(32), nullable=False, default="")
Nom = db.Column(db.String(32), nullable=False, default="")
Identifiant = db.Column(db.String(32), nullable=False, default="")
Role = db.Column(db.String(16), nullable=False, default=Role.Coach)
Photo = db.Column(db.String(128), default="")
Actif = db.Column(db.Boolean(), nullable=False)
__password = db.Column('Password', db.String(256), nullable=False)
def __init__(self, prenom='', nom='', identifiant='', password='', role='', photo='', actif=False):
''' constructor '''
self.Prenom = prenom
self.Nom = nom
self.Identifiant = identifiant
self.Password = password
self.Role = role
self.Photo = photo
self.Actif = actif
def __get_password(self):
''' getter password '''
return self.__password
def __set_password(self, password):
''' setter hash password '''
self.__password = generate_password_hash(password, method='sha256')
# Hide password encryption by exposing password field only.
Password = db.synonym('__password',
descriptor=property(__get_password,
__set_password))
# Return a new property that point to the Message class
Messages = db.relationship('Message',
backref="user",
lazy=True)
def check_password(self, password):
''' check hash password for user '''
if self.Password is None:
return False
return check_password_hash(self.Password, password)
def as_dict(self):
''' User as dictionnary '''
result = {}
for c in self.__table__.columns:
if c.name != 'Password':
result[c.name] = getattr(self, c.name)
return result

View File

@@ -24,7 +24,8 @@ from werkzeug.security import generate_password_hash
from werkzeug.exceptions import HTTPException, RequestEntityTooLarge
from werkzeug.utils import secure_filename
from src.db import db, dbmanage
from src.db import db1 as db
from .models import User
#########################################################
# Class et Methods #
@@ -63,65 +64,57 @@ def refresh_expiring_tokens(response):
except (RuntimeError, KeyError):
return response
@users.route('', methods=['GET'])
@jwt_required()
@dbmanage
def get_all_users():
''' Recuperation de tous les utilisateurs inscrits '''
# Access the identity of the current user with get_jwt_identity
current_user = get_jwt_identity()
### DEBUG ###
current_app.logger.debug("current_user: {}".format(current_user))
current_app.logger.debug("Current User: {}".format(current_user))
### DEBUG END ###
# Test si l'utilisateur courant est actif ou pas
if not current_user["Actif"]:
db.disconnect()
abort(403)
sql_statement = "SELECT * FROM utilisateur"
# Execution de la requete SQL
etat, ret = db.execute(sql_statement, None, False)
if not etat:
db.disconnect()
abort(500)
else:
if not ret:
db.disconnect()
abort(404)
content = ret
return jsonify(content)
abort(status.HTTP_403_FORBIDDEN)
content = []
users = db.session.query(User).all()
if len(users):
### DEBUG ###
current_app.logger.debug("Count: {}".format(len(users)))
### END DEBUG ###
for user in users:
### DEBUG ###
current_app.logger.debug("User: {}".format(user.as_dict()))
### END DEBUG ###
content.append(user.as_dict())
return jsonify(content), status.HTTP_200_OK
@users.route('/<int:userId>', methods=['GET'])
@jwt_required()
@dbmanage
def get_one_user(userId):
''' Recuperation d'un seul utilisateur '''
content = {}
# Access the identity of the current user with get_jwt_identity
current_user = get_jwt_identity()
### DEBUG ###
current_app.logger.debug("Actif ? {} - Role ? {} - userId : {}/{}".format(current_user["Actif"], current_user["Role"], current_user["userId"], userId))
current_app.logger.debug("Current User: {}".format(current_user))
### DEBUG END ###
# Test si l'utilisateur courant est actif ou pas
# Si l'utilisateur courant n'est pas administrateur, il ne peut voir que son profil
if not current_user["Actif"] or current_user["Role"] != "Administrateur" and current_user['userId'] != userId:
db.disconnect()
abort(403)
sql_statement = "SELECT * FROM utilisateur WHERE userid = {}".format(userId)
# execution de la requete sql
etat, ret = db.execute(sql_statement, None, False)
if not etat:
db.disconnect()
abort(500)
abort(status.HTTP_403_FORBIDDEN, description="Not authorized!")
user = db.session.query(User).filter(User.userId==userId)
if user.count():
content = user.first().as_dict()
else:
if not ret:
db.disconnect()
abort(404)
content = ret[0]
return jsonify(content)
abort(status.HTTP_404_NOT_FOUND, descrition="User not found")
return jsonify(content), status.HTTP_200_OK
@users.route('', methods=['POST'])
@jwt_required()
@dbmanage
def add_user():
''' Ajout d'un utilisateur '''
# Access the identity of the current user with get_jwt_identity
@@ -129,42 +122,33 @@ def add_user():
# Test si l'utilisateur courant est actif et Admin ou pas
if not current_user["Actif"] and current_user["Role"] != 'Administrateur':
db.disconnect()
abort(403, description='Utilisateur non autorisé')
abort(status.HTTP_403_FORBIDDEN, description='Utilisateur non autorisé')
# recuperation des attributs (JSON) de la requete
data_json = request.get_json()
# On vérifie si l'identifiant en base de données est déjà utilisé ou pas
sql_statement = "SELECT * FROM utilisateur WHERE Identifiant = \'{}\'".format(data_json['Identifiant'])
# execution de la requete sql
etat, ret = db.execute(sql_statement, None, False)
if not etat:
db.disconnect()
abort(500)
else:
if ret:
db.disconnect()
abort(401, description="Identifiant déjà utilisé!")
user = db.session.query(User).filter(User.Identifiant==data_json['Identifiant'])
if user.count():
abort(status.HTTP_401_UNAUTHORIZED, description="Identifiant déjà utilisé!")
### DEBUG ###
current_app.logger.debug("Datas: {}".format(data_json))
current_app.logger.debug("Request datas: {}".format(data_json))
### END DEBUG ###
# Hash du mot de passe
hashed_password = generate_password_hash(data_json['Password'], method='sha256')
# creation de la requete SQL
sql_statement="INSERT INTO utilisateur (Nom,Prenom,Role,Identifiant,Password,Actif,Photo) VALUES (?,?,?,?,?,?,?)"
data=(data_json['Nom'], data_json['Prenom'], data_json['Role'], data_json['Identifiant'], hashed_password, data_json['Actif'], data_json['Photo'])
# Execution de la requete SQL
etat, ret = db.execute(sql_statement, data, True)
if not etat:
content = {'Erreur': ret['error']}
### DEBUG ###
current_app.logger.debug("content: {}".format(content))
### END DEBUG ###
return content, status.HTTP_500_INTERNAL_SERVER_ERROR
return jsonify({'message' : 'Nouvel utilisateur créé!'})
# Create new user
newUser = User(prenom=data_json['Prenom'],
nom=data_json['Nom'],
identifiant=data_json['Identifiant'],
password=data_json['Password'],
role=data_json['Role'],
photo=data_json['Photo'],
actif=data_json['Actif'])
# Send new user to database
db.session.add(newUser)
db.session.commit()
return jsonify({'message' : 'Nouvel utilisateur créé!'}), status.HTTP_201_CREATED
@users.route('/<int:userId>', methods=['PUT'])
@jwt_required()
@dbmanage
def modif_user(userId):
''' modification d'un utilisateur '''
# Access the identity of the current user with get_jwt_identity
@@ -172,129 +156,111 @@ def modif_user(userId):
# Test si l'utilisateur courant est actif ou pas
# Si l'utilisateur courant n'est pas administrateur, il ne peut voir que son profil
if not current_user["Actif"] or current_user["Role"] != "Administrateur" and current_user['userId'] != userId:
db.disconnect()
abort(403, description='Utilisateur non autorisé')
abort(status.HTTP_403_FORBIDDEN, description='Utilisateur non autorisé')
# test des attributs (JSON) de la requete
if not request.data.decode("utf-8"):
current_app.logger.error("Data not found")
db.disconnect()
abort(400, description='Data not found')
abort(status.HTTP_400_BAD_REQUEST, description='Data not found')
# recuperation des attributs (JSON) de la requete
dataDict = request.get_json()
# Hash du mot de passe
dataDict['Password'] = generate_password_hash(dataDict['Password'], method='sha256')
sql_statement = "UPDATE utilisateur SET Nom=%s, Prenom=%s, Photo=%s, Identifiant=%s, Password=%s, Role=%s, Actif=%d WHERE userId = {}".format(userId)
data = (dataDict['Nom'], dataDict['Prenom'], dataDict['Photo'], dataDict['Identifiant'], dataDict['Password'], dataDict['Role'], dataDict['Actif'])
# Recupération de l'utilisateur en fonction de l'id
user = db.session.query(User).filter(User.userId==userId)
if not user.count():
abort(status.HTTP_400_BAD_REQUEST, description='User with this id not found')
# Execution de la requete SQL
etat, ret = db.execute(sql_statement, data, True)
if not etat:
db.disconnect()
abort(500)
return jsonify({'message' : 'utilisateur {} modifié!'.format(userId)})
user = user.first()
user.Prenom = dataDict['Prenom']
user.Nom = dataDict['Nom']
user.Identifiant = dataDict['Identifiant']
user.Password = dataDict['Password']
user.Photo = dataDict['Photo']
user.Role = dataDict['Role']
user.Actif = dataDict['Actif']
# Send modified user to database
db.session.commit()
return jsonify({'message' : 'utilisateur {} modifié!'.format(userId)}), status.HTTP_200_OK
@users.route('/<int:userId>', methods=['DELETE'])
@jwt_required()
@dbmanage
def del_user(userId):
''' Suppression d'un utilisateur '''
# Access the identity of the current user with get_jwt_identity
current_user = get_jwt_identity()
# Test si l'utilisateur courant est Admin ou pas
if not current_user["Actif"] or current_user["Role"] != 'Administrateur':
db.disconnect()
abort(403, description='Utilisateur non autorisé')
abort(status.HTTP_403_FORBIDDEN, description='Utilisateur non autorisé')
# test des attributs (JSON) de la requete
if request.data.decode("utf-8"):
current_app.logger.error("Data found : {}".format(request.data.decode("utf-8")))
db.disconnect()
abort(400)
# On vérifie que l'utilisateur existe en base de données
sql_statement = "SELECT * FROM utilisateur WHERE userid = {}".format(userId)
# execution de la requete sql
etat, ret = db.execute(sql_statement, None, False)
if not etat:
db.disconnect()
abort(500)
else:
if not ret:
db.disconnect()
abort(404)
abort(status.HTTP_400_BAD_REQUEST)
# On supprime l'utilisateur si celui-ci a été trouvé
sql_statement = "DELETE FROM utilisateur WHERE userId = {}".format(userId)
### DEBUG ###
current_app.logger.debug("statement: {}".format(sql_statement))
### END DEBUG ###
# Execution de la requete SQL
etat, ret = db.execute(sql_statement, None, True)
if not etat:
db.disconnect()
abort(500)
content = {'message' : 'utilisateur supprimé!'}
return jsonify(content)
# Recupération de l'utilisateur en fonction de l'id
user = db.session.query(User).filter(User.userId==userId)
if not user.count():
abort(status.HTTP_400_BAD_REQUEST, description='User with this id not found')
user = user.first()
# On supprime l'utilisateur de la base de données
db.session.delete(user)
db.session.commit()
return jsonify({'message' : 'utilisateur supprimé!'}), status.HTTP_200_OK
@users.route('/<int:userId>/reset_password', methods=['GET'])
@jwt_required()
@dbmanage
def reset_passwd_user(userId):
''' Reset du mot de passe à un utilisateur représenté par son Id '''
# Access the identity of the current user with get_jwt_identity
current_user = get_jwt_identity()
# Test si l'utilisateur courant est Admin ou pas
if not current_user["Actif"] or current_user["Role"] != 'Administrateur':
db.disconnect()
abort(403, description='Utilisateur non autorisé')
abort(status.HTTP_403_FORBIDDEN, description='Utilisateur non autorisé')
# Hash du mot de passe
passwd = generate_password_hash('provisoire', method='sha256')
sql_statement = "UPDATE utilisateur SET Password='{}' WHERE userId = {}".format(passwd, userId)
# Recupération de l'utilisateur en fonction de l'id
user = db.session.query(User).filter(User.userId==userId)
if not user.count():
abort(status.HTTP_400_BAD_REQUEST, description='User with this id not found')
# Execution de la requete SQL
etat, ret = db.execute(sql_statement, None, True)
if not etat:
db.disconnect()
abort(500)
content = {'message' : 'reset du mot de passe!'}
return jsonify(content)
user = user.first()
user.Password = 'provisoire'
# Send modified user to database
db.session.commit()
return jsonify({'message' : 'reset du mot de passe!'}), status.HTTP_200_OK
@users.route('/<int:userId>/activate', methods=['PUT'])
@jwt_required()
@dbmanage
def deactivate_user(userId):
''' Désactivation d'un utilisateur représenté par son Id '''
# Access the identity of the current user with get_jwt_identity
current_user = get_jwt_identity()
# Test si l'utilisateur courant est Admin ou pas
if not current_user["Actif"] or current_user["Role"] != 'Administrateur':
db.disconnect()
abort(403, description='Utilisateur non autorisé')
abort(status.HTTP_403_FORBIDDEN, description='Utilisateur non autorisé')
# test des attributs (JSON) de la requete
if not request.data.decode("utf-8"):
current_app.logger.error("Data not found")
db.disconnect()
abort(400, description='Data not found')
abort(status.HTTP_400_BAD_REQUEST, description='Data not found')
# recuperation des attributs (JSON) de la requete
dataDict = request.get_json()
# Création de la requete SQL
sql_statement = "UPDATE utilisateur SET Actif={} WHERE userId = {}".format(dataDict['Actif'], userId)
# Recupération de l'utilisateur en fonction de l'id
user = db.session.query(User).filter(User.userId==userId)
if not user.count():
abort(status.HTTP_400_BAD_REQUEST, description='User with this id not found')
# Execution de la requete SQL
etat, ret = db.execute(sql_statement, None, True)
if not etat:
db.disconnect()
abort(500)
content = {'message' : 'desactivation de l\'utilisateur réussi!'}
return jsonify(content)
user = user.first()
user.Actif = dataDict['Actif']
# Send modified user to database
db.session.commit()
return jsonify({'message' : 'desactivation de l\'utilisateur réussi!'}), status.HTTP_200_OK
@users.route('/<int:userId>/photo', methods=['GET'])
@jwt_required()
@dbmanage
def get_photo(userId):
''' Recupere la photo de l'utilisateur suivant son Id '''
# Access the identity of the current user with get_jwt_identity
@@ -302,26 +268,15 @@ def get_photo(userId):
# Test si l'utilisateur courant est actif ou pas
# Si l'utilisateur courant n'est pas administrateur, il ne peut voir que son profil
if not current_user["Actif"] or current_user["Role"] != "Administrateur" and current_user['userId'] != userId:
db.disconnect()
abort(403, description='Utilisateur non autorisé')
abort(status.HTTP_403_FORBIDDEN, description='Utilisateur non autorisé')
# Hash du mot de passe
passwd = generate_password_hash('provisoire', method='sha256')
sql_statement = "SELECT * FROM utilisateur WHERE userid = {}".format(userId)
# Execution de la requete SQL
etat, ret = db.execute(sql_statement, None, False)
if not etat:
db.disconnect()
abort(500)
else:
if not ret:
db.disconnect()
abort(404)
user = ret[0]
if user['Photo']:
return send_file(os.path.join(user['Photo']), mimetype='image/'+os.path.splitext(user['Photo'])[1].split('.')[1])
return abort(404, description='Picture not found!')
user = db.session.query(User).filter(User.userId==userId)
if not user.count():
abort(status.HTTP_404_NOT_FOUND, descrition="User not found")
user = user.first()
if user.Photo:
return send_file(os.path.join(user.Photo), mimetype='image/'+os.path.splitext(user.Photo)[1].split('.')[1])
abort(status.HTTP_404_NOT_FOUND, description='Picture not found!')
@users.route('/current', methods=['GET'])
@jwt_required()
@@ -332,33 +287,25 @@ def current_user():
return jsonify(current_user)
def allowed_file(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
return '.' in filename and filename.rsplit('.', 1)[1].lower() in current_app.config['ALLOWED_EXTENSIONS']
@users.route('/<int:userId>/uploadImage', methods=['POST'])
@jwt_required()
@dbmanage
def uploadImage(userId):
# Access the identity of the current user with get_jwt_identity
current_user = get_jwt_identity()
# Test si l'utilisateur courant est actif et Admin ou pas
# Si l'utilisateur courant n'est pas administrateur, il ne peut voir que son profil
if not current_user["Actif"] or current_user["Role"] != "Administrateur" and current_user['userId'] != userId:
db.disconnect()
current_app.logger.error("Utilisateur non autorisé")
abort(403, description='Utilisateur non autorisé')
abort(status.HTTP_403_FORBIDDEN, description='Utilisateur non autorisé')
# On vérifie que l'utilisateur existe en base de données
sql_statement = "SELECT * FROM utilisateur WHERE userid = {}".format(userId)
# execution de la requete sql
etat, ret = db.execute(sql_statement, None, False)
if not etat:
db.disconnect()
abort(500)
else:
if not ret:
db.disconnect()
abort(404)
user = ret[0]
user = db.session.query(User).filter(User.userId==userId)
if not user.count():
abort(status.HTTP_404_NOT_FOUND, descrition="User not found")
user = user.first()
current_app.logger.debug("Req Headers: {}".format(request.headers))
current_app.logger.debug("Req Files: {}".format(request.files))
@@ -367,30 +314,25 @@ def uploadImage(userId):
# check if the post request has the file part
if 'photo' not in request.files:
current_app.logger.error('No file part in the request')
abort(400)
abort(status.HTTP_400_BAD_REQUEST)
try:
photo = request.files['photo']
except RequestEntityTooLarge as e:
current_app.logger.error("Fichier trop gros ...")
abort(413)
abort(status.HTTP_413_REQUEST_ENTITY_TOO_LARGE)
# If the user does not select a file, the browser submits an empty file without a filename
if photo.filename == '':
current_app.logger.error('No selected file')
abort(401)
abort(status.HTTP_401_UNAUTHORIZED)
if photo and allowed_file(photo.filename):
filename = secure_filename(photo.filename)
### DEBUG ###
current_app.logger.debug("filename: {}".format(os.path.splitext(filename)))
### END DEBUG ###
filepath = os.path.join(app.config['UPLOAD_FOLDER'], user['Identifiant'] + os.path.splitext(filename)[1])
filepath = os.path.join(current_app.config['UPLOAD_FOLDER'], user.Identifiant + os.path.splitext(filename)[1])
photo.save(filepath)
sql_statement = "UPDATE utilisateur SET Nom=%s, Prenom=%s, Photo=%s, Identifiant=%s, Password=%s, Role=%s, Actif=%d WHERE userId = {}".format(userId)
data = (user['Nom'], user['Prenom'], filepath, user['Identifiant'], user['Password'], user['Role'], user['Actif'])
# Execution de la requete SQL
etat, ret = db.execute(sql_statement, data, True)
if not etat:
db.disconnect()
abort(500)
content = jsonify({'message': 'photo saved successfuly!'})
return content
user.Photo = filepath
# modify user to database
db.session.commit()
return jsonify({'message': 'photo saved successfuly!'}), status.HTTP_200_OK