ajout d'un validateur json pour le chargement de la conf

This commit is contained in:
Vincent BENOIT
2022-11-08 12:08:49 +01:00
parent c37ce9f6f3
commit 1398385b14

View File

@@ -16,6 +16,7 @@ import RPi.GPIO as GPIO
from datetime import datetime, date
import calendar
import json
from jsonschema import validate
import socket
from signal import signal, SIGINT
from threading import Thread
@@ -24,6 +25,44 @@ from apscheduler.schedulers.background import BackgroundScheduler
###################################################################
# Class et Methods #
def validate_json(json_data={}, logger=None):
''' Validate json by schema
:param json_data:
json data loaded
:param logger:
werkzeug context
:return int:
0 => If validates with 2.0
1 => otherwise
'''
if not isinstance(json_data, dict):
logger.error("error parameter, expecting dict, get {}".format(type(json_data)))
return False
if not isinstance(logger, log.Logger):
logger.error("error parameter, expecting logging.Logger, get {}".format(type(logger)))
return False
ret = True
try:
with open(os.path.join('/home/pi', 'db.json.schema'), 'r') as f:
schema=json.load(f)
try:
validate(instance=json_data, schema=schema)
except jsonschema.exceptions.ValidationError as e:
logger.error("Validator error")
ret = False
except jsonschema.exceptions.SchemaError as e:
logger.error("Schema error")
ret = False
except FileNotFoundError as e:
logger.error("Impossible d'ouvrir le fichier de validation ({})".format(e))
ret = False
return ret
def get_conf(logger=None):
''' Get configuration
@@ -38,7 +77,10 @@ def get_conf(logger=None):
try:
with open(os.path.join("/etc/kineintercom", "db.json"), 'r') as f:
try:
config = json.load(f)
config = json.load(f)
if not validate_json(config, logger):
logger.error("Erreur de validation de la configuation")
config = None
except json.decoder.JSONDecodeError as e:
logger.error("Impossible de charger les données de configuration ({})".format(e))
except FileNotFoundError as e:
@@ -159,30 +201,78 @@ def init_gsm_com(serObj=None, config=None, logger=None):
return True
def info_gsm_com(serObj=None, config=None, logger=None):
def info_gsm_com(serObj=None, config={}, logger=None):
''' Retreive GSM Module info
:param serObj:
serial object
:param config:
config object
:param log:
logger object
:return bool:
True if ok, otherwise False
'''
if not isinstance(logger, log.Logger):
return False
if not isinstance(serObj, serial.serialposix.Serial):
logger.error("error parameter, expecting serial.serialposix.Serial, get {}".format(type(serObj)))
return False
if not isinstance(logger, log.Logger):
logger.error("error parameter, expecting logging.Logger, get {}".format(type(logger)))
if not isinstance(config, dict):
logger.error("error parameter, expecting dict, get {}".format(type(config)))
return False
cmd_lst = ['AT+CGMI',
'AT+CGMM',
'AT+CGMR',
'AT+COPS?',
'AT+CSMINS?',
'AT+CSPN?',
'AT+CGSN',
'AT+CCALR?']
for cmd in cmd_lst:
ret, rsp = send_at_cmd(cmd=cmd, timeout=0.0, serObj=serObj, logger=logger)
cmd_lst = [{'cmd':'AT+CGMI', 'obj':'manufacturer', 'item':'identification'},
{'cmd':'AT+CGMM', 'obj':'manufacturer', 'item':'model'},
{'cmd':'AT+CGMR', 'obj':'manufacturer', 'item':'hardware_rev'},
{'cmd':'AT+COPS?', 'obj':'control', 'item':'operator'},
{'cmd':'AT+CSMINS?', 'obj':'control', 'item':'sim_inserted'},
{'cmd':'AT+CSPN?', 'obj':'control', 'item':'service_provider'},
{'cmd':'AT+CGSN', 'obj':'manufacturer', 'item':'serial_number'},
{'cmd':'AT+CCALR?', 'obj':'control', 'item':'call_ready'}]
for idx, cmd in enumerate(cmd_lst):
ret, rsp = send_at_cmd(cmd=cmd['cmd'], timeout=0.0, serObj=serObj, logger=logger)
if ret == 2:
logger.error("Erreur avec la commande AT: {}".format(cmd))
return False
elif ret == 1:
lgger.warning("Timeout avec la commande AT: {}".format(cmd))
else:
logger.debug('cmd: {} - rsp: {}'.format(cmd, rsp))
if cmd['cmd'] == 'AT+CGMI':
#cmd: AT+CGMI - rsp: ['AT+CGMI', 'SIMCOM_Ltd', 'OK']
config['INFOS'][cmd['obj']][cmd['item']] = rsp[1]
elif cmd['cmd'] == 'AT+CGMM':
#cmd: AT+CGMM - rsp: ['AT+CGMM', 'SIMCOM_SIM868', 'OK']
config['INFOS'][cmd['obj']][cmd['item']] = rsp[1]
elif cmd['cmd'] == 'AT+CGMR':
#cmd: AT+CGMR - rsp: ['AT+CGMR', 'Revision:1418B02SIM868M32_BT_EAT', 'OK']
config['INFOS'][cmd['obj']][cmd['item']] = rsp[1].split(':')[1]
elif cmd['cmd'] == 'AT+COPS?':
#cmd: AT+COPS? - rsp: ['AT+COPS?', '+COPS: 0,0,"SFR"', 'OK']
config['INFOS'][cmd['obj']][cmd['item']] = rsp[1].split(',')[2][1:-1]
elif cmd['cmd'] == 'AT+CSMINS?':
#cmd: AT+CSMINS? - rsp: ['AT+CSMINS?', '+CSMINS: 0,1', 'OK']
if rsp[1].split(',')[1] == '1':
config['INFOS'][cmd['obj']][cmd['item']] = True
elif rsp[1].split(',')[1] == '0':
config['INFOS'][cmd['obj']][cmd['item']] = False
elif cmd['cmd'] == 'AT+CSPN?':
#cmd: AT+CSPN? - rsp: ['AT+CSPN?', '+CSPN: "La Poste Mobile",0', 'OK']
config['INFOS'][cmd['obj']][cmd['item']] = rsp[1].split(': ')[1].split(',')[0][1:-1]
elif cmd['cmd'] == 'AT+CGSN':
#cmd: AT+CGSN - rsp: ['AT+CGSN', '864866057705260', 'OK']
config['INFOS'][cmd['obj']][cmd['item']] = rsp[1]
elif cmd['cmd'] == 'AT+CCALR?':
#cmd: AT+CCALR? - rsp: ['AT+CCALR?', '+CCALR: 1', 'OK']
if rsp[1].split(': ')[1] == '1':
config['INFOS'][cmd['obj']][cmd['item']] = True
elif rsp[1].split(': ')[1] == '0':
config['INFOS'][cmd['obj']][cmd['item']] = False
return True
def verify_caller(buf="", num="", logger=None):
@@ -226,11 +316,13 @@ def listener(sock, logger):
global FLAG_CONF_UPDATE
global FLAG_HORAIRE_UPDATE
global FLAG_TH_CLOSE
flag = True
logger.debug("Démarrage du serveur de communication avec le backend du configurateur")
while True:
while not FLAG_TH_CLOSE:
# Wait for a connection
try:
logger.debug("Attente de connexion")
clientsocket, address = sock.accept()
flag = True
logger.debug("Client connecté {}".format(clientsocket))
@@ -257,6 +349,9 @@ def listener(sock, logger):
continue
if not data:
break
logger.debug("Fin du serveur de communication")
return
def verify_open_hours(conf=None, logger=None):
''' Verify if GSM HAT must be opened with conf hours
@@ -395,6 +490,7 @@ GSM_MODULE_STATE = False
GSM_MODULE_INIT_STATE = False
FLAG_CONF_UPDATE = False
FLAG_HORAIRE_UPDATE = False
FLAG_TH_CLOSE = False
def main():
''' main function
@@ -412,10 +508,11 @@ def main():
global GSM_MODULE_INIT_STATE
global FLAG_CONF_UPDATE
global FLAG_HORAIRE_UPDATE
global FLAG_TH_CLOSE
# Configuration loader
config = get_conf(logger)
if not config:
if config is None:
logger.error("Impossible de charger la configuration")
sys.exit(1)
@@ -522,6 +619,7 @@ def main():
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
# Bind the socket to the port
sock.bind(server_addr)
sock.settimeout(1) # Timeout for accept
# Listen for incoming connections
sock.listen(1)
thread = Thread(target=listener, args=(sock, logger,))
@@ -569,6 +667,8 @@ def main():
break
else:
GSM_MODULE_INIT_STATE = True
logger.info('Récupération des infos du module ...')
ret = info_gsm_com(serObj=ser, config=config, logger=logger)
# While the number of bytes in the input buffer > 0
if ser.in_waiting > 0:
@@ -587,6 +687,7 @@ def main():
time.sleep(1)
except KeyboardInterrupt:
logger.error("Keyboard Interrupt !!")
FLAG_TH_CLOSE = True
logger.info("fermeture du port de communication avec le GNSS_HAT")
ser.close()