Files
Kine-backend/ConfBack/auth/views.py

256 lines
9.5 KiB
Python

# -*- encoding: utf-8 -*-
# @author : vincent.benoit@benserv.fr
# @brief : Auth routes
#########################################################
# Importation de modules externes #
import sys, re, os
import logging as log
from datetime import datetime, timezone
import time
from flask import Flask, Blueprint, request, abort, jsonify, current_app
from flask_api import status
from flask_jwt_extended import create_access_token
from flask_jwt_extended import get_jwt
from flask_jwt_extended import set_access_cookies
from flask_jwt_extended import unset_jwt_cookies
from flask_jwt_extended import get_jwt_identity
from flask_jwt_extended import jwt_required, decode_token
import json
import shutil
import hashlib
import subprocess
from werkzeug.exceptions import HTTPException
from ConfBack.manager import Sock
#########################################################
# Class et Methods #
auth = Blueprint('auth', __name__, url_prefix='/api/configurateur')
@auth.errorhandler(HTTPException)
def handle_exception(e):
''' return JSON instead of HTML for HTTP errors '''
response = e.get_response()
# replace the body with JSON
response.data = json.dumps({
'code': e.code,
'name': e.name,
'description': e.description,
})
response.content_type = "application/json"
return response
@auth.after_request
def refresh_expiring_tokens(response):
''' Using an 'after_request' callback, we refresh any token that is within
30 minutes of expiring.'''
try:
exp_timestamp = get_jwt()['exp']
now = datetime.now(timezone.utc)
target_timestamp = datetime.timestamp(now + current_app.config['DELTA'])
if target_timestamp > exp_timestamp:
current_app.logger.warning("On doit recréer un token JWT ....")
access_token = create_access_token(identity=get_jwt_identity())
# refresh token in storage place
if os.path.exists(os.path.join("/tmp", current_app.config['PROJECT'])):
with open(os.path.join("/tmp", current_app.config['PROJECT'], get_jwt_identity()['id']), 'w') as f:
f.write(access_token)
# Modifiy a Flask Response to set a cookie containing the access JWT.
set_access_cookies(response, access_token)
return response
except (RuntimeError, KeyError):
return response
def execute_cmd(args=""):
''' Execute system command
'''
out = None
err = None
if len(args) == 0:
current_app.logger.error("Paramètre d'entrée invalide")
return False
current_app.logger.debug("La commande système executée est : {}".format(args))
try:
cmd = subprocess.Popen(args,
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
(out, err) = cmd.communicate(timeout=10)
except subprocess.CalledProcessError as e:
current_app.logger.error("Error executing system command : {} - {}".format(e.output, e.returncode))
return False
except subprocess.TimeoutExpired:
cmd.kill()
current_app.logger.error("Timeout when executing system command")
return False
except FileNotFoundError:
current_app.logger.error("System command not found")
return False
if cmd.returncode != 0:
current_app.logger.error('Error executing system command ({})'.format(cmd.returncode))
current_app.logger.error('{}'.format(err))
return False
return True
@auth.route('/login', methods=['POST'])
def login():
''' Authenticate User
'''
auth = request.authorization
current_app.logger.info("Connexion de l'utilisateur : {}".format(auth.username))
if not auth or not auth.username or not auth.password:
current_app.logger.error("Login and Password required !")
abort(status.HTTP_401_UNAUTHORIZED, description='Login and Password required')
try:
hashstr = hashlib.sha256(auth.password.encode('utf-8')).hexdigest()
with open(current_app.config['DB_PATH'], 'r') as f:
data = json.load(f)
# authenticate user with his username and password hash
if hashstr == data['utilisateur']['password']:
# create a new access token
current_app.logger.debug("create JWT Token with id: {}".format(auth.username))
token = create_access_token(identity={'id': auth.username})
if not os.path.exists(os.path.join("/tmp", current_app.config['PROJECT'])):
os.mkdir(os.path.join("/tmp", current_app.config['PROJECT']))
with open(os.path.join("/tmp", current_app.config['PROJECT'], auth.username), 'w') as f:
f.write(token)
content = jsonify({'message': 'login successful !'})
# set token as cookie
set_access_cookies(content, token)
return content, status.HTTP_200_OK
current_app.logger.error("Authorization failed !")
except Exception as e:
current_app.logger.error("Erreur : {}".format(e))
abort(status.HTTP_500_INTERNAL_SERVER_ERROR, description='Authentification impossible')
abort(status.HTTP_401_UNAUTHORIZED, description="Authorization failed !")
@auth.route('/logout', methods=['POST'])
@jwt_required()
def logout():
''' Handles HTTP requests to URL: /api/configurateur/logout
'''
current_app.logger.info("Deconnexion de l'utilisateur")
current_user = get_jwt_identity()
content = jsonify({'message': 'logout successful !'})
unset_jwt_cookies(content)
userpath = os.path.join("/tmp", current_app.config['PROJECT'])
if os.path.exists(userpath):
shutil.rmtree(userpath)
return content, status.HTTP_200_OK
@auth.route('/current', methods=['GET'])
@jwt_required()
def current_user():
''' retourne l'utilisateur courant connecté
'''
# Access the identity of the current user with get_jwt_identity
current_user = get_jwt_identity()
return jsonify(current_user)
@auth.route('/reboot', methods=['POST'])
@jwt_required()
def reboot():
''' Handles HTTP requests to URL: /api/configurateur/reboot
'''
current_app.logger.info("Reboot du système ...")
current_user = get_jwt_identity()
userpath = os.path.join("/tmp", current_app.config['PROJECT'])
if os.path.exists(userpath):
shutil.rmtree(userpath)
conn = Sock(addr=current_app.config['UNIX_ADDR'], logger=current_app.logger)
if not conn.connect():
current_app.logger.error("impossible de se connecter au serveur")
else:
# send order to KineIntercom process
if not conn.send(b"SHUTDOWN\n"):
conn.disconnect()
current_app.logger.error("impossible de communiquer avec le serveur")
conn.disconnect()
time.sleep(3)
reboot_cmd = ['sudo', '/bin/systemctl', 'reboot']
if not execute_cmd(reboot_cmd):
abort(status.HTTP_500_INTERNAL_SERVER_ERROR,
description='Redémarrage du système impossible')
content = jsonify({'message': 'reboot successful !'})
unset_jwt_cookies(content)
return content, status.HTTP_200_OK
@auth.route('/shutdown', methods=['POST'])
@jwt_required()
def shutdown():
''' Handles HTTP requests to URL: /api/configurateur/shutdown
'''
current_app.logger.info("Arrêt du système ...")
current_user = get_jwt_identity()
userpath = os.path.join("/tmp", current_app.config['PROJECT'])
if os.path.exists(userpath):
shutil.rmtree(userpath)
conn = Sock(addr=current_app.config['UNIX_ADDR'], logger=current_app.logger)
if not conn.connect():
current_app.logger.error("impossible de se connecter au serveur")
else:
# send order to KineIntercom process
if not conn.send(b"SHUTDOWN\n"):
conn.disconnect()
current_app.logger.error("impossible de communiquer avec le serveur")
conn.disconnect()
time.sleep(3)
reboot_cmd = ['sudo', '/bin/systemctl', 'poweroff']
if not execute_cmd(reboot_cmd):
abort(status.HTTP_500_INTERNAL_SERVER_ERROR,
description='Arrêt du système impossible')
content = jsonify({'message': 'shutdown successful !'})
unset_jwt_cookies(content)
return content, status.HTTP_200_OK
@auth.route('/userConnected', methods=['GET'])
def user_connected():
''' retourne "oui" si un utilisateur est déjà connecté sinon "non"
'''
flag = False
resp = 'non'
uid = ''
path = os.path.join("/tmp", current_app.config['PROJECT'])
if os.path.exists(path):
# search unique file in path
for root, dirs, files in os.walk(path):
for f in files:
uid = str(f)
# retreive token in file
with open(os.path.join(path, f), 'r') as fp:
token = fp.read()
exp = decode_token(token, allow_expired = True)['exp']
now = datetime.now(timezone.utc)
target = datetime.timestamp(now)
if target > exp:
current_app.logger.warning("Le token de l'ancien utilisateur a expiré")
# remove tmp dir
shutil.rmtree(os.path.join("/tmp", current_app.config['PROJECT']))
flag = True
break
if not flag:
current_app.logger.warning("Un autre utilisateur est déjà connecté")
resp = 'oui'
content = ({'message':resp, 'uid':uid})
return jsonify(content), status.HTTP_200_OK