Files
KineIntercom/kineintercom/intercom.py
2022-11-15 15:39:38 +01:00

922 lines
34 KiB
Python

#!/usr/bin/python3
# -*- encoding: utf-8 -*-
# @author: benoit.vince84@free.fr
# @date: Septembre 2022
# @brief: Programme Intercom à partir du module GSM
###################################################################
# Importation de modules externes #
import sys, os, re
import serial
import logging as log
import time
import RPi.GPIO as GPIO
from datetime import datetime, date
import calendar
import json
import jsonschema
import socket
import shutil
from threading import Thread
from apscheduler.schedulers.background import BackgroundScheduler
###################################################################
# Class et Methods #
def validate_json(json_data=None, logger=None,
                  schema_path=os.path.join('/home/pi', 'db.json.schema')):
    ''' Validate the JSON database against its JSON schema
    :param json_data:
        json data loaded (dict); None is treated as an empty dict
    :param logger:
        logger object
    :param schema_path:
        path of the JSON schema file used for the validation
    :return bool:
        True if OK, otherwise False
    '''
    if not isinstance(logger, log.Logger):
        return False
    if json_data is None:
        json_data = {}
    if not isinstance(json_data, dict):
        logger.error("error parameter, expecting dict, get {}".format(type(json_data)))
        return False
    try:
        with open(schema_path, 'r') as f:
            schema = json.load(f)
    except (OSError, json.JSONDecodeError) as e:
        # Without a usable schema nothing can be validated: stop here
        # instead of going on with an unbound 'schema' name.
        logger.error("Impossible d'ouvrir le fichier de validation ({})".format(e))
        return False
    try:
        jsonschema.validate(instance=json_data, schema=schema)
    except jsonschema.exceptions.ValidationError as e:
        logger.error("Erreur de validation de la base de données : {}".format(e))
        return False
    except jsonschema.exceptions.SchemaError as e:
        logger.error("Erreur de schéma : {}".format(e))
        return False
    return True
def get_conf(logger=None):
    ''' Load and validate the configuration database
    The database is created from the template shipped in
    /usr/share/kineintercom when it does not exist yet.
    :param logger:
        logger object
    :return dict:
        Configuration dictionary, None on any error
    '''
    if not isinstance(logger, log.Logger):
        return None
    db_path = os.path.join("/etc/kineintercom", "db.json")
    # copy database if not exists
    if not os.path.exists(db_path):
        logger.warning("Le fichier de la base de données n'existe pas ...")
        try:
            shutil.copyfile(os.path.join("/usr/share/kineintercom", "database_origin.json"),
                            db_path)
        except PermissionError as e:
            logger.error("Erreur de permission: {}".format(e))
            return None
        except OSError as e:
            # e.g. template or destination directory missing
            logger.error("Impossible de copier la base de données d'origine ({})".format(e))
            return None
    try:
        with open(db_path, 'r') as f:
            config = json.load(f)
    except json.decoder.JSONDecodeError as e:
        logger.error("Impossible de charger les données de configuration ({})".format(e))
        return None
    except OSError as e:
        logger.error("Impossible d'ouvrir le fichier de configuation ({})".format(e))
        return None
    if not validate_json(config, logger):
        logger.error("Erreur de validation de la configuation")
        return None
    return config
def send_at_cmd(cmd='', timeout=0.0, serObj=None, logger=None):
    ''' Send an AT command to the GNSS_HAT and read the answer
    :param cmd:
        string AT command (without the trailing carriage return)
    :param timeout:
        timeout (seconds) to read response; 0 means blocking reads
    :param serObj:
        serial object
    :param logger:
        logger object
    :return tuple:
        (status, payload) where status is 0 if 'OK' was received,
        2 on error and 1 otherwise. payload is the list of response
        lines when status is 0, a one-item list holding the +CME ERROR
        text when the module reported one, otherwise None.
    '''
    # The logger is checked first: every other check needs it to report.
    if not isinstance(logger, log.Logger):
        return 2, None
    if not isinstance(cmd, str):
        logger.error("error parameter, expecting str, get {}".format(type(cmd)))
        return 2, None
    if not isinstance(timeout, (int, float)):
        logger.error("error parameter, expecting float, get {}".format(type(timeout)))
        return 2, None
    if not isinstance(serObj, serial.serialposix.Serial):
        logger.error("error parameter, expecting serial.serialposix.Serial, get {}".format(type(serObj)))
        return 2, None
    try:
        serObj.timeout = timeout + 0.1 if timeout > 0.0 else None
        serObj.write(bytes(cmd + '\r', 'utf-8'))
        time.sleep(.5)
        outlst = []
        while serObj.in_waiting > 0:
            # remove \r and \n chars from the received line
            line = serObj.read_until().decode('utf-8', 'replace').replace('\r', '').replace('\n', '')
            if line != '':
                outlst.append(line)
            time.sleep(timeout)
    except Exception as e:
        logger.error("Erreur: {}".format(e))
        return 2, None
    logger.debug("Reponse: {}".format(outlst))
    if 'OK' in outlst:
        return 0, outlst
    for item in outlst:
        if item.startswith('+CME ERROR:'):
            detail = item.split('+CME ERROR:')[1]
            logger.error("Erreur avec la cmd ({}) : {}".format(cmd, detail))
            return 2, [detail]
        if item.startswith('ERROR'):
            logger.error("Erreur avec la cmd : {}".format(cmd))
            return 2, None
    # Neither OK nor an error line: the caller treats this as a timeout
    return 1, None
def set_sim_pin(serObj=None, pin_actif=False, code_pin="", logger=None):
    ''' Set SIM PIN if necessary
    AT+CPIN? : query whether the SIM waits for a password
    AT+CPIN=<CODE PIN> : Enter PIN (response READY: MT is not pending for any password)
    :param serObj:
        serial object
    :param pin_actif:
        attribute from dictionary config object
    :param code_pin:
        attribute from dictionary config object
    :param logger:
        logger object
    :return bool:
        True if OK, otherwise False
    '''
    if not isinstance(logger, log.Logger):
        return False
    if not isinstance(serObj, serial.serialposix.Serial):
        logger.error("error parameter, expecting serial.serialposix.Serial, get {}".format(type(serObj)))
        return False
    if not isinstance(pin_actif, bool):
        logger.error("error parameter, expecting bool, get {}".format(type(pin_actif)))
        return False
    if not isinstance(code_pin, str):
        logger.error("error parameter, expecting str, get {}".format(type(code_pin)))
        return False
    # SIM PIN mandatory or not
    ret, rsp = send_at_cmd(cmd='AT+CPIN?', timeout=0.0, serObj=serObj, logger=logger)
    if ret == 2:
        logger.error("Erreur avec la commande AT: {}".format('AT+CPIN?'))
        return False
    if ret == 1:
        # No answer is only reported, it is not treated as a failure
        logger.warning("Timeout avec la commande AT: {}".format('AT+CPIN?'))
        return True
    prefix = '+CPIN:'
    for item in rsp:
        if not item.startswith(prefix):
            continue
        # Tolerate a missing space after the colon
        status = item[len(prefix):].strip()
        if status == 'SIM PIN':
            logger.info('SIM verrouillée ...')
            # Must enter SIM PIN
            if not pin_actif:
                logger.error("Configuration en conflit avec la réponse du module GSM")
                return False
            # Enter the SIM PIN configured in database (never logged)
            ret, _ = send_at_cmd(cmd='AT+CPIN=' + code_pin, timeout=2.0,
                                 serObj=serObj, logger=logger)
            if ret == 2:
                logger.error("Erreur avec la commande AT: {}".format('AT+CPIN=<CODE_PIN>'))
                return False
            if ret == 1:
                logger.warning("Timeout avec la commande AT: {}".format('AT+CPIN=<CODE_PIN>'))
            else:
                logger.info("code PIN validé ...")
        elif status == 'READY':
            # SIM PIN already notified
            logger.info('SIM déverrouillée ...')
        else:
            logger.warning("Etat de la SIM non géré: {}".format(status))
    return True
def init_gsm_com(serObj=None, config=None, logger=None):
    ''' Init GSM Communication
    source : SIM800_Series_AT_command Manual_v1.09
    ATE1 : Enable command echo
    AT+CMEE=2 : Enable verbose error reports (+CME ERROR: <text>)
    AT+CLTS=1 : Enable the local timestamp
    AT+CLIP=1 : The calling line identity (CLI) of calling party when
    receiving a mobile terminated call
    AT+CMGF=1 : Set the format of messages to Text mode
    AT+VTD=<n> : Tone Duration (in 1/10 seconds)
    The SIM is unlocked (see set_sim_pin) before the commands are sent.
    :param serObj:
        serial object
    :param config:
        dictionary config object from JSON database
    :param logger:
        logger object
    :return bool:
        True if OK, otherwise False
    '''
    if not isinstance(logger, log.Logger):
        return False
    if not isinstance(serObj, serial.serialposix.Serial):
        logger.error("error parameter, expecting serial.serialposix.Serial, get {}".format(type(serObj)))
        return False
    if not isinstance(config, dict):
        logger.error("error parameter, expecting dict, get {}".format(type(config)))
        return False
    try:
        dtmf_duration = config['DTMF_DURATION']
        pin_actif = config['PIN_ACTIF']
        code_pin = config['CODE_PIN']
    except KeyError as e:
        logger.error("Paramètre manquant dans la configuration: {}".format(e))
        return False
    cmd_lst = ['ATE1',
               'AT+CMEE=2',
               'AT+CLTS=1',
               'AT+CLIP=1',
               'AT+CMGF=1',
               'AT+VTD=' + str(dtmf_duration)]
    if not set_sim_pin(serObj=serObj, pin_actif=pin_actif, code_pin=code_pin, logger=logger):
        return False
    logger.info("Initialisation des commandes GSM ...")
    for cmd in cmd_lst:
        ret, _ = send_at_cmd(cmd=cmd, timeout=0.0, serObj=serObj, logger=logger)
        if ret == 2:
            logger.error("Erreur avec la commande AT: {}".format(cmd))
            return False
        if ret == 1:
            # A missing answer is reported but does not abort the init
            logger.warning("Timeout avec la commande AT: {}".format(cmd))
    return True
def update_gsm_com(serObj=None, config=None, logger=None):
    ''' Update the GSM parameters that depend on the configuration
    source : SIM800_Series_AT_command Manual_v1.09
    AT+VTD=<PARAM DB> : Tone Duration (in 1/10 seconds)
    :param serObj:
        serial object
    :param config:
        dictionary config object from JSON database
    :param logger:
        logger object
    :return bool:
        True if OK, otherwise False
    '''
    if not isinstance(logger, log.Logger):
        return False
    if not isinstance(serObj, serial.serialposix.Serial):
        logger.error("error parameter, expecting serial.serialposix.Serial, get {}".format(type(serObj)))
        return False
    if not isinstance(config, dict):
        logger.error("error parameter, expecting dict, get {}".format(type(config)))
        return False
    try:
        cmd_lst = ['AT+VTD=' + str(config['DTMF_DURATION'])]
    except KeyError as e:
        logger.error("Paramètre manquant dans la configuration: {}".format(e))
        return False
    for cmd in cmd_lst:
        ret, _ = send_at_cmd(cmd=cmd, timeout=0.0, serObj=serObj, logger=logger)
        if ret == 2:
            logger.error("Erreur avec la commande AT: {}".format(cmd))
            return False
        if ret == 1:
            logger.warning("Timeout avec la commande AT: {}".format(cmd))
    return True
def _parse_gsm_info(cmd, line):
    ''' Convert the response line of one information command
    :param cmd:
        command descriptor ('cmd' and 'item' keys) from info_gsm_com
    :param line:
        response line following the echoed command
    :return dict:
        values to store, keyed by item name (empty if nothing to store)
    '''
    name, item = cmd['cmd'], cmd['item']
    if name in ('AT+CGMI', 'AT+CGMM', 'AT+CGSN'):
        # Manufacturer, model and serial number are stored verbatim
        return {item: line}
    if name == 'AT+CGMR':
        # Request TA Revision Identification of Software Release
        return {item: line.split(':')[1]}
    if name == 'AT+COPS?':
        # Operator Selection: third field, without its quotes
        return {item: line.split(',')[2][1:-1]}
    if name == 'AT+CSMINS?':
        # SIM Inserted Status Reporting
        state = line.split(',')[1]
        return {item: state == '1'} if state in ('0', '1') else {}
    if name == 'AT+CSPN?':
        # Get Service Provider Name from SIM, without its quotes
        return {item: line.split(': ')[1].split(',')[0][1:-1]}
    if name == 'AT+CCALR?':
        # Call Ready Query
        state = line.split(': ')[1]
        return {item: state == '1'} if state in ('0', '1') else {}
    if name == 'AT+CSQ':
        # Signal Quality Report, translated through the SIGNAL table
        val = int(line.split('+CSQ: ')[1].split(',')[0])
        for entry in SIGNAL:
            if entry['val'] == val:
                return {item: entry['dbm'], 'signal_qos': entry['condition']}
    return {}


def info_gsm_com(serObj=None, config=None, logger=None):
    ''' Retrieve GSM Module info and store it in the database
    The values are written into config['INFOS'] and the whole
    configuration is saved to /etc/kineintercom/db.json.
    :param serObj:
        serial object
    :param config:
        dictionary config object from JSON database (updated in place)
    :param logger:
        logger object
    :return bool:
        True if ok, otherwise False
    '''
    if not isinstance(logger, log.Logger):
        return False
    if not isinstance(serObj, serial.serialposix.Serial):
        logger.error("error parameter, expecting serial.serialposix.Serial, get {}".format(type(serObj)))
        return False
    if not isinstance(config, dict):
        logger.error("error parameter, expecting dict, get {}".format(type(config)))
        return False
    cmd_lst = [{'cmd': 'AT+CGMI', 'obj': 'manufacturer', 'item': 'identification'},
               {'cmd': 'AT+CGMM', 'obj': 'manufacturer', 'item': 'model'},
               {'cmd': 'AT+CGMR', 'obj': 'manufacturer', 'item': 'hardware_rev'},
               {'cmd': 'AT+COPS?', 'obj': 'control', 'item': 'operator'},
               {'cmd': 'AT+CSMINS?', 'obj': 'control', 'item': 'sim_inserted'},
               {'cmd': 'AT+CSPN?', 'obj': 'control', 'item': 'service_provider'},
               {'cmd': 'AT+CGSN', 'obj': 'manufacturer', 'item': 'serial_number'},
               {'cmd': 'AT+CCALR?', 'obj': 'control', 'item': 'call_ready'},
               {'cmd': 'AT+CREG?', 'obj': 'control', 'item': ''},
               {'cmd': 'AT+CSQ', 'obj': 'control', 'item': 'signal_dbm'}]
    for cmd in cmd_lst:
        ret, rsp = send_at_cmd(cmd=cmd['cmd'], timeout=0.0, serObj=serObj, logger=logger)
        if ret == 2:
            logger.error("Erreur avec la commande AT: {}".format(cmd['cmd']))
            return False
        if ret == 1:
            logger.warning("Timeout avec la commande AT: {}".format(cmd['cmd']))
            continue
        logger.debug('cmd: {} - rsp: {}'.format(cmd['cmd'], rsp))
        if not cmd['item']:
            # Command sent for the log only (AT+CREG?), nothing is stored
            continue
        try:
            # rsp[0] is the echoed command, rsp[1] the answer itself
            values = _parse_gsm_info(cmd, rsp[1])
        except (IndexError, ValueError) as e:
            # An unexpected answer must not stop the daemon
            logger.warning("Réponse inattendue pour la commande AT {}: {} ({})".format(cmd['cmd'], rsp, e))
            continue
        for key, value in values.items():
            config['INFOS'][cmd['obj']][key] = value
    logger.info('mise à jour des infos dans la base de données')
    try:
        # Serialise first so that a failure cannot leave a truncated file
        payload = json.dumps(config)
        with open(os.path.join("/etc/kineintercom", "db.json"), 'w') as f:
            f.write(payload)
    except (OSError, TypeError, ValueError) as e:
        logger.error("Impossible d'écrire la base de données ({})".format(e))
        return False
    return True
# Lookup table used for the value read from the AT+CSQ answer:
# "val" is the raw value (2..30), "dbm" the matching level as a string
# (2 dBm per step, starting at -109 for val 2) and 'condition' the
# qualitative label of the band the value falls in.
SIGNAL = [
    {"val": val, "dbm": str(2 * val - 113), 'condition': label}
    for label, band in (('Marginal', range(2, 10)),
                        ('OK', range(10, 15)),
                        ('Good', range(15, 20)),
                        ('Excellent', range(20, 31)))
    for val in band
]
def verify_caller(buf="", num="", logger=None):
    ''' Verify phone number of caller
    example : +CLIP: "0607297154",129,"",0,"",0
    The caller is accepted when its number ends with the authorized
    number, so that national and international notations both match.
    :param buf:
        Serial input buffer (the +CLIP line)
    :param num:
        Authorized phone number
    :param logger:
        logger object
    :return tuple:
        (authorized, phone_number): authorized is True if the caller is
        accepted, False otherwise; phone_number is the caller number
        read from buf ("" on parameter error)
    '''
    if not isinstance(logger, log.Logger):
        return False, ""
    if not isinstance(buf, str):
        logger.error("error parameter, expecting str, get {}".format(type(buf)))
        return False, ""
    if not isinstance(num, str):
        logger.error("error parameter, expecting str, get {}".format(type(num)))
        return False, ""
    # Skip the 7 characters of the '+CLIP: ' prefix
    outlst = buf[7:].split(',')
    logger.debug("=> {}".format(outlst))
    phone_number = outlst[0].replace("\"", "")
    if not num:
        # str.endswith("") is always True: an empty authorized number
        # would otherwise let every caller in.
        logger.warning("no authorized phone number configured ! {} refused".format(phone_number))
        return False, phone_number
    if not phone_number.endswith(num):
        logger.warning("phone number not match ! {}/{}".format(phone_number, num))
        return False, phone_number
    return True, phone_number
def listener(sock, logger):
    ''' Thread socket listener
    Accepts one client at a time, reads a single newline-terminated
    message and raises the matching reload flags:
    RELOAD_DB -> FLAG_CONF_UPDATE
    RELOAD_HOURS -> FLAG_HORAIRE_UPDATE and FLAG_CONF_UPDATE
    The loop ends when FLAG_TH_CLOSE is set.
    :param sock:
        listening socket object (expected to have an accept timeout so
        that the stop flag is checked regularly)
    :param logger:
        logger object
    '''
    global FLAG_CONF_UPDATE
    global FLAG_HORAIRE_UPDATE
    global FLAG_TH_CLOSE
    logger.debug("Démarrage du serveur de communication avec le backend du configurateur")
    while not FLAG_TH_CLOSE:
        # Wait for a connection
        try:
            clientsocket, _ = sock.accept()
        except socket.timeout:
            continue
        except OSError as e:
            # The listening socket is no longer usable: stop the thread
            logger.error("Erreur du serveur de communication ({})".format(e))
            break
        logger.debug("Client connecté {}".format(clientsocket))
        data = b''
        # The client socket is always closed, whatever happens below
        with clientsocket:
            # Bounded wait so that FLAG_TH_CLOSE is still honoured
            clientsocket.settimeout(1)
            while not FLAG_TH_CLOSE and b'\n' not in data:
                try:
                    chunk = clientsocket.recv(1024)
                except socket.timeout:
                    continue
                except OSError as e:
                    logger.warning("Erreur de réception ({})".format(e))
                    break
                if not chunk:
                    # Peer closed the connection before sending a newline
                    break
                data += chunk
        if b'\n' not in data:
            continue
        if data.startswith(b'RELOAD_DB'):
            logger.debug("==> RELOAD DB")
            FLAG_CONF_UPDATE = True
        elif data.startswith(b'RELOAD_HOURS'):
            # Raised before FLAG_CONF_UPDATE, which is the flag the main
            # loop tests first.
            FLAG_HORAIRE_UPDATE = True
            FLAG_CONF_UPDATE = True
    logger.debug("Fin du serveur de communication")
    return
def verify_open_hours(conf=None, logger=None, now=None):
    ''' Verify if GSM HAT must be opened with conf hours
    conf maps an English day name (calendar.day_name) to a list of
    slots {'name': '<HH>h<MM>', 'state': 0|1}. The state of the latest
    slot already started today decides; the module is closed before the
    first slot of the day.
    :param conf:
        configuration object (opening hours)
    :param logger:
        logger object
    :param now:
        datetime to evaluate, defaults to the current local time
    :return bool:
        True if authorized, False otherwise
    '''
    if not isinstance(logger, log.Logger):
        return False
    if conf is None:
        conf = {}
    if not isinstance(conf, dict):
        logger.error("error parameter, expecting dict, get {}".format(type(conf)))
        return False
    if now is None:
        # A single clock read: day and time can not disagree at midnight
        now = datetime.now()
    day = calendar.day_name[now.weekday()]
    current_time = now.hour * 60 + now.minute

    def slot_minutes(slot):
        hours, minutes = slot['name'].split('h')[:2]
        return int(hours) * 60 + int(minutes)

    flag = False
    try:
        # Chronological order, whatever the order stored in the database
        for item in sorted(conf[day], key=slot_minutes):
            if current_time >= slot_minutes(item):
                if item['state'] == 1:
                    flag = True
                elif item['state'] == 0:
                    flag = False
    except (KeyError, ValueError, TypeError) as e:
        # Unusable schedule: stay closed rather than crash the caller
        logger.error("Horaires invalides pour {} ({})".format(day, e))
        return False
    logger.debug('Jour: {} - Temps courant: {} - Ouverture: {}'.format(day, now.strftime('%Hh%M'), flag))
    return flag
def init_module():
    ''' initialisation of GNSS/GPS/GSM HAT Module
    Configures pin 7 (BOARD numbering) as an output.
    :return bool:
        True if OK, otherwise False
    '''
    try:
        GPIO.setmode(GPIO.BOARD)
        GPIO.setwarnings(False)
        GPIO.setup(7, GPIO.OUT)
    except (ValueError, RuntimeError):
        # RuntimeError is also reported as a failure so that the caller
        # can log it and exit cleanly.
        return False
    return True
def setup_module():
    ''' Setup module (Set/Reset)
    Drives pin 7 low for 2 seconds, then high again. The callers use
    this same pulse both to switch the module on and to switch it off.
    :return bool:
        True if OK, otherwise False
    '''
    try:
        GPIO.output(7, GPIO.LOW)
        time.sleep(2)
        GPIO.output(7, GPIO.HIGH)
    except (ValueError, RuntimeError):
        return False
    return True
def cron_verify_hours(op='', config=None, logger=None):
    ''' cron job to check opened or closed hours
    Switches the GSM module on or off (setup_module) when its current
    state differs from the wanted one, and clears GSM_MODULE_INIT_STATE
    so that the main loop initialises the module again.
    :param op:
        operation mode: 'Horaires', 'Manuel ON' or 'Manuel OFF'
    :param config:
        opening hours dictionary, used in 'Horaires' mode
    :param logger:
        logger object
    '''
    global GSM_MODULE_STATE
    global GSM_MODULE_INIT_STATE
    if not isinstance(logger, log.Logger):
        return
    if not isinstance(config, dict):
        logger.error("error parameter, expecting dict, get {}".format(type(config)))
        return
    if not isinstance(op, str):
        logger.error("error parameter, expecting str, get {}".format(type(op)))
        return
    if op == 'Horaires':
        # Verify hours with conf file
        opened = verify_open_hours(conf=config, logger=logger)
    elif op == 'Manuel ON':
        opened = True
    elif op == 'Manuel OFF':
        opened = False
    else:
        # Unknown mode: leave the module as it is
        logger.error("Wrong 'OPERATION' parameter in database")
        return
    if opened:
        # The module must be on: switch it on if it is currently off
        if not GSM_MODULE_STATE:
            logger.info("Allumage du module GSM HAT ...")
            if not setup_module():
                logger.error('Erreur d\'allumage du module GSM')
            else:
                GSM_MODULE_STATE = True
                GSM_MODULE_INIT_STATE = False
    else:
        # The module must be off: switch it off if it is currently on
        if GSM_MODULE_STATE:
            logger.info("Fermeture du module GSM HAT ...")
            if not setup_module():
                logger.error('Erreur de fermeture du module GSM')
            else:
                GSM_MODULE_STATE = False
                GSM_MODULE_INIT_STATE = False
def _handle_incoming_call(buf, config, ser, logger):
    ''' Answer an authorized caller, send the DTMF code, then hang up '''
    # Verify caller phone number
    ret, phone_number = verify_caller(buf=buf, num=config['NUM_AUTORISE'], logger=logger)
    if not ret:
        # Hang up
        send_at_cmd(cmd="ATH", timeout=0.0, serObj=ser, logger=logger)
        logger.warning("Phone number not authorized ({}) - Hang up ...".format(phone_number))
        return
    time.sleep(0.3)
    # Pick up
    ret, _ = send_at_cmd(cmd='ATA', timeout=0.0, serObj=ser, logger=logger)
    if ret != 0:
        logger.error("Erreur de connexion avec le correspondant")
        return
    time.sleep(0.2)
    logger.info('Pick up the call ...')
    # Configure DTMF duration
    ret, _ = send_at_cmd(cmd='AT+VTD=' + str(config['DTMF_DURATION']), timeout=0.0, serObj=ser, logger=logger)
    if ret:
        logger.error("Erreur de configuration de la durée ({}) du code DTMF".format(config['DTMF_DURATION']))
    time.sleep(0.2)
    # send DTMF tone
    send_at_cmd(cmd='AT+VTS=' + str(config['DTMF_CODE']), timeout=0.0, serObj=ser, logger=logger)
    # Waiting DTMF tone duration (1/10 sec) + 1 second
    time.sleep(float(config['DTMF_DURATION']) / 10 + 1)
    # Hang up
    logger.info('Hang up ...')
    ret, _ = send_at_cmd(cmd='ATH', timeout=0.0, serObj=ser, logger=logger)
    if ret > 0:
        logger.error('Cannot hang up ...')


def _handle_incoming_sms(buf, ser, logger):
    ''' Read the notified SMS, log it and delete it from the storage '''
    try:
        msg_num = buf.split('+CMTI: ')[1].split(',')[1]
    except IndexError:
        logger.error("Notification SMS invalide: {}".format(buf))
        return
    send_at_cmd(cmd='AT+CMGF=1', timeout=0.0, serObj=ser, logger=logger)
    ret, rsp = send_at_cmd(cmd='AT+CMGR=' + msg_num, timeout=0.0, serObj=ser, logger=logger)
    if ret == 0:
        try:
            # rsp[1] is the +CMGR header, rsp[2] the first line of text
            phone_num = rsp[1].split('+CMGR: ')[1].split(',')[1][1:-1]
            msg = rsp[2]
        except IndexError:
            logger.error("Impossible de lire le SMS")
        else:
            logger.info('[{}] {}'.format(phone_num, msg))
    else:
        logger.error("Impossible de lire le SMS")
    send_at_cmd(cmd='AT+CMGD=' + str(msg_num), timeout=0.0, serObj=ser, logger=logger)


def process(buf="", config=None, ser=None, logger=None):
    ''' Process one unsolicited line received from the GSM module
    '+CLIP: ' lines (incoming call) and '+CMTI: ' lines (incoming SMS)
    are handled, every other line is ignored.
    :param buf:
        line received from the module, without \\r and \\n
    :param config:
        dictionary config object from JSON database
    :param ser:
        serial object
    :param logger:
        logger object
    :return bool:
        False on parameter error, otherwise True
    '''
    if not isinstance(logger, log.Logger):
        return False
    if not isinstance(buf, str):
        logger.error("error parameter, expecting str, get {}".format(type(buf)))
        return False
    if config is None:
        config = {}
    if not isinstance(config, dict):
        logger.error("error parameter, expecting dict, get {}".format(type(config)))
        return False
    if not isinstance(ser, serial.serialposix.Serial):
        logger.error("error parameter, expecting serial.serialposix.Serial, get {}".format(type(ser)))
        return False
    if buf.startswith('+CLIP: '):
        # receive phone call
        _handle_incoming_call(buf, config, ser, logger)
    elif buf.startswith('+CMTI: '):
        # receive msg
        _handle_incoming_sms(buf, ser, logger)
    return True
###################################################################
# Main program body #
# Module-level state shared by main(), cron_verify_hours() and listener().
# True while the GSM HAT module is considered switched on
GSM_MODULE_STATE = False
# True once init_gsm_com() succeeded since the module was last switched on
GSM_MODULE_INIT_STATE = False
# Set by listener() when the configuration must be reloaded, cleared by main()
FLAG_CONF_UPDATE = False
# Set by listener() on RELOAD_HOURS: main() then re-creates the scheduler job
FLAG_HORAIRE_UPDATE = False
# Set by main() to make the listener() thread leave its loop
FLAG_TH_CLOSE = False
def _build_logger():
    ''' Create the "Intercom" logger (console: DEBUG, log file: INFO) '''
    logger = log.getLogger("Intercom")
    logger.setLevel(log.DEBUG)
    formatter = log.Formatter('%(asctime)s - %(funcName)s - %(levelname)s - %(message)s')
    console = log.StreamHandler()
    console.setLevel(log.DEBUG)
    logfile = log.FileHandler(os.path.join('/var/log/kineintercom', 'Intercom.log'))
    logfile.setLevel(log.INFO)
    for handler in (console, logfile):
        handler.setFormatter(formatter)
        logger.addHandler(handler)
    return logger


def _add_hours_job(sched, config, logger):
    ''' Register the job running cron_verify_hours every minute '''
    sched.add_job(func=cron_verify_hours,
                  args=(config['OPERATION'], config['HORAIRES'], logger),
                  trigger='cron',
                  minute='*',
                  id="job_id")


def main():
    ''' main function
    Loads the configuration, opens the serial port of the GSM module,
    switches the module on or off according to the operation mode,
    starts the configurator listener thread and the opening-hours
    scheduler, then processes the lines sent by the module until an
    error or a keyboard interrupt occurs.
    '''
    global GSM_MODULE_STATE
    global GSM_MODULE_INIT_STATE
    global FLAG_CONF_UPDATE
    global FLAG_HORAIRE_UPDATE
    global FLAG_TH_CLOSE
    # Logger configuration
    logger = _build_logger()
    # Configuration loader
    config = get_conf(logger)
    if config is None:
        logger.error("Impossible de charger la configuration")
        sys.exit(1)
    # init GSM HAT module
    logger.info("Initialisation du module GSM HAT ...")
    if not init_module():
        logger.error("Impossible d'initialiser les GPIO de la board")
        sys.exit(1)
    # Serial configuration
    ser = serial.Serial('/dev/ttyAMA0',
                        baudrate=115200,
                        parity=serial.PARITY_NONE,
                        stopbits=serial.STOPBITS_ONE,
                        bytesize=serial.EIGHTBITS,
                        timeout=None, write_timeout=2,
                        xonxoff=False,
                        rtscts=False,
                        dsrdtr=False)
    # Check that the serial port is open
    if not ser.is_open:
        logger.error("Impossible d'ouvrir le port série")
        sys.exit(1)
    logger.info("Le port série avec le module GSM est ouvert")
    try:
        ser.reset_input_buffer()   # discard everything already received
        ser.reset_output_buffer()  # abort any pending output
        # send AT command to test the connection
        ret, _ = send_at_cmd(cmd='AT', timeout=0.5, serObj=ser, logger=logger)
        if ret == 2:
            logger.error("Erreur d'envoie de la commande AT")
            sys.exit(1)
        elif ret == 1:
            # No answer: the module is considered switched off
            logger.warning("Pas de réponse du module GSM")
            GSM_MODULE_STATE = False
        else:
            logger.info("Module GSM HAT allumé ...")
            GSM_MODULE_STATE = True
    except Exception as e:
        logger.error("Erreur de com série: {}".format(e))
    # Verify if the GSM module must be opened or not
    if config['OPERATION'] == 'Horaires':
        logger.info("Mode \"Horaires\" activé ...")
        opened = verify_open_hours(conf=config['HORAIRES'], logger=logger)
    elif config['OPERATION'] == 'Manuel ON':
        logger.info("Mode \"Manuel ON\" activé ...")
        opened = True
    elif config['OPERATION'] == 'Manuel OFF':
        logger.info("Mode \"Manuel OFF\" activé ...")
        opened = False
    else:
        logger.error("Wrong 'OPERATION' parameter in database")
        ser.close()
        sys.exit(1)
    if opened:
        # The module must be on: switch it on if it is currently off
        if not GSM_MODULE_STATE:
            logger.info("Allumage du module GSM HAT ...")
            if not setup_module():
                logger.error("Erreur de configuration des GPIOs de la board")
                ser.close()
                sys.exit(1)
            GSM_MODULE_STATE = True
            # Wait 10 seconds before starting the GSM initialisation
            time.sleep(10)
        # Initialize GSM communication
        logger.info("Initialisation des commandes AT nécessaires ...")
        if not init_gsm_com(serObj=ser, config=config, logger=logger):
            logger.error("Erreur d'initialisation de la com GSM avec le module")
            ser.close()
            sys.exit(1)
        GSM_MODULE_INIT_STATE = True
        # Retrieve GSM infos
        logger.info('Récupération des infos du module ...')
        if not info_gsm_com(serObj=ser, config=config, logger=logger):
            logger.error("Erreur d'initialisation de la com GSM avec le module")
            ser.close()
            sys.exit(1)
    else:
        # The module must be off: switch it off if it is currently on
        if GSM_MODULE_STATE:
            logger.debug("Fermeture du module GSM HAT ...")
            if not setup_module():
                logger.error("Erreur de configuration des GPIOs de la board")
                ser.close()
                sys.exit(1)
            GSM_MODULE_STATE = False
            GSM_MODULE_INIT_STATE = False
    server_addr = "/tmp/uds_socket"
    # Make sure the socket does not already exist
    try:
        os.unlink(server_addr)
    except OSError:
        if os.path.exists(server_addr):
            raise
    # Create UDS socket
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    # Bind the socket to the address
    sock.bind(server_addr)
    sock.settimeout(1)  # Timeout for accept
    # Listen for incoming connections
    sock.listen(1)
    thread = Thread(target=listener, args=(sock, logger,))
    thread.start()
    time.sleep(1)
    # Scheduler opened hours
    sched = BackgroundScheduler(daemon=True)
    _add_hours_job(sched, config, logger)
    sched.start()
    try:
        while True:
            # Configuration update requested by the configurator through
            # the listener thread (socket)
            if FLAG_CONF_UPDATE:
                # Flags are cleared before the reload so that a request
                # arriving meanwhile is not lost.
                FLAG_CONF_UPDATE = False
                hours_changed = FLAG_HORAIRE_UPDATE
                FLAG_HORAIRE_UPDATE = False
                logger.info("Configuration doit être rechargée ...")
                # reload conf in memory
                new_config = get_conf(logger)
                if new_config is None:
                    # Keep running with the previous configuration
                    logger.error("Impossible de recharger la configuration, l'ancienne est conservée")
                else:
                    config = new_config
                    logger.info("Mode \"{}\" activé ...".format(config['OPERATION']))
                    if hours_changed:
                        logger.info("Restart scheduler ...")
                        # Replace the job so that it uses the new values
                        sched.remove_job('job_id')
                        _add_hours_job(sched, config, logger)
            # The module is switched off: nothing to read
            if not GSM_MODULE_STATE:
                time.sleep(1)
                continue
            # The module has not been initialised since it was switched on
            if not GSM_MODULE_INIT_STATE:
                # Wait 10s so that the module registers on the network
                time.sleep(10)
                # Initialize GSM serial communication
                if not init_gsm_com(serObj=ser, config=config, logger=logger):
                    logger.error("Erreur d'initialisation de la com GSM avec le module")
                    break
                GSM_MODULE_INIT_STATE = True
                logger.info('Récupération des infos du module ...')
                info_gsm_com(serObj=ser, config=config, logger=logger)
            if ser.in_waiting > 0:
                # remove \r and \n chars from the line; undecodable
                # bytes are replaced instead of raising
                out = ser.read_until().decode('utf-8', 'replace').replace('\r', '').replace('\n', '')
                if len(out) > 0:
                    logger.debug("out: {}".format(out))
                    time.sleep(.1)
                    if not process(buf=out, config=config, ser=ser, logger=logger):
                        break
            else:
                time.sleep(1)
    except KeyboardInterrupt:
        logger.error("Keyboard Interrupt !!")
    finally:
        # Always executed, so that the listener thread is stopped and the
        # process can exit whatever ended the loop.
        FLAG_TH_CLOSE = True
        logger.info("fermeture du port de communication avec le GNSS_HAT")
        ser.close()
        sched.shutdown(wait=False)
        thread.join()
        sock.close()


if __name__ == '__main__':
    main()