From 5de4445de79e82ced6ac4985cb07ff7a39c5c31e Mon Sep 17 00:00:00 2001 From: Vincent BENOIT Date: Mon, 14 Nov 2022 17:36:31 +0100 Subject: [PATCH] ajout de la gestion du code pin --- src/intercom.py | 129 ++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 103 insertions(+), 26 deletions(-) diff --git a/src/intercom.py b/src/intercom.py index 3569ff2..459dba0 100644 --- a/src/intercom.py +++ b/src/intercom.py @@ -120,7 +120,7 @@ def send_at_cmd(cmd='', timeout=0.0, serObj=None, logger=None): return 2 try: - if timeout > 0: + if timeout > 0.0: serObj.timeout = timeout else: serObj.timeout = None @@ -129,26 +129,98 @@ def send_at_cmd(cmd='', timeout=0.0, serObj=None, logger=None): out = '' outlst = [] while serObj.in_waiting > 0: - c = serObj.read(1).decode('utf-8', 'replace') - if c == '\r' or c == '\n': - if out != '': - outlst.append(out) - out = '' - else: - out += c + #c = serObj.read(1).decode('utf-8', 'replace') + #if c == '\r' or c == '\n': + # if out != '': + # outlst.append(out) + # out = '' + #else: + # out += c + + # remove \r and \n chars from out string + out += serObj.read_until().decode('utf-8', 'replace').replace('\r','').replace('\n','') + if out != '': + outlst.append(out) + out = '' logger.debug("Reponse: {}".format(outlst)) if 'OK' in outlst: return 0, outlst - elif 'ERROR' in outlst: - logger.error("Error with cmd : {}".format(cmd)) - return 2, None else: + for item in outlst: + if item.startswith('+CME ERROR:'): + logger.error("Erreur avec la cmd ({}) : {}".format(cmd, item.split('+CME ERROR:')[1])) + return 2, [item.split('+CME ERROR:')[1]] + elif item.startswith('ERROR'): + logger.error("Erreur avec la cmd : {}".format(cmd)) + return 2, None return 1, None except Exception as e: logger.error("Erreur: {}".format(e)) return 2, None return 0, None + +def set_sim_pin(serObj=None, pin_actif=False, code_pin="", logger=None): + ''' Set SIM PIN if necessary + :param serObj: + serial object + + :param config: + dictionary config object from JSON database + + :param log: + logger object + + :return bool: + ''' + if not isinstance(logger, log.Logger): + return False + if not isinstance(serObj, serial.serialposix.Serial): + logger.error("error parameter, expecting serial.serialposix.Serial, get {}".format(type(serObj))) + return False + if not isinstance(pin_actif, bool): + logger.error("error parameter, expecting bool, get {}".format(type(pin_actif))) + return False + if not isinstance(code_pin, str): + logger.error("error parameter, expecting str, get {}".format(type(code_pin))) + return False + + # SIM PIN mandatory or not + ret, rsp = send_at_cmd(cmd='AT+CPIN?', + timeout=0.0, + serObj=serObj, + logger=logger) + if ret == 2: + logger.error("Erreur avec la commande AT: {}".format('AT+CPIN?')) + return False + elif ret == 1: + logger.warning("Timeout avec la commande AT: {}".format('AT+CPIN?')) + else: + if rsp[1].split('+CPIN: ')[1] == 'SIM PIN': + logger.info('SIM verrouillée ...') + # Must enter SIM PIN + if not pin_actif: + logger.error("Configuration en conflit avec la réponse du module GSM") + return False + else: + # Enter the SIM PIN configured in database + ret, rsp = send_at_cmd(cmd='AT+CPIN='+code_pin, + timeout=0.0, + serObj=serObj, + logger=logger) + if ret == 2: + logger.error("Erreur avec la commande AT: {}".format('AT+CPIN=')) + return False + elif ret == 1: + logger.warning("Timeout avec la commande AT: {}".format('AT+CPIN=')) + else: + logger.info("code PIN saisi ...") + elif rsp[1].split('+CPIN: ')[1] == 'READY': + # SIM PIN already notified + logger.info('SIM déverrouillée ...') + + return True + def init_gsm_com(serObj=None, config={}, logger=None): ''' Init GSM Communication source : SIM800_Series_AT_command Manual_v1.09 @@ -185,25 +257,24 @@ def init_gsm_com(serObj=None, config={}, logger=None): logger.error("error parameter, expecting dict, get {}".format(type(config))) return False - cmd_lst = ['AT', - 'ATE1', - 'AT+CMEE=2', - 'AT+CLTS=1', - 'AT+CLIP=1', - 'AT+CMGF=1', - 'AT+VTD='+str(config['DTMF_DURATION'])] - if config['PIN_ACTIF']: - cmd_lst.append('AT+CPIN='+config['CODE_PIN']) - else: - cmd_lst.append('AT+CPIN?') + cmd_lst = [{'cmd':'ATE1', 'timeout':0.0, 'func':None}, + {'cmd':'AT+CMEE=2', 'timeout':0.0, 'func':None}] + cmd_lst.append({'cmd':'AT+CLTS=1', 'timeout':0.0, 'func':None}) + cmd_lst.append({'cmd':'AT+CLIP=1', 'timeout':0.0, 'func':None}) + cmd_lst.append({'cmd':'AT+CMGF=1', 'timeout':0.0, 'func':None}) + cmd_lst.append({'cmd':'AT+VTD='+str(config['DTMF_DURATION']), 'timeout':0.0, 'func':None}) + + if not set_sim_pin(serObj=serObj, pin_actif=config['PIN_ACTIF'], code_pin=config['CODE_PIN'], logger=logger): + return False + logger.info("Initialisation des commandes GSM ...") - for cmd in cmd_lst: - ret, _ = send_at_cmd(cmd=cmd, timeout=0.0, serObj=serObj, logger=logger) + for item in cmd_lst: + ret, rsp = send_at_cmd(cmd=item['cmd'], timeout=item['timeout'], serObj=serObj, logger=logger) if ret == 2: - logger.error("Erreur avec la commande AT: {}".format(cmd)) + logger.error("Erreur avec la commande AT: {}".format(item['cmd'])) return False elif ret == 1: - logger.warning("Timeout avec la commande AT: {}".format(cmd)) + logger.warning("Timeout avec la commande AT: {}".format(item['cmd'])) return True def update_gsm_com(serObj=None, config={}, logger=None): @@ -324,6 +395,7 @@ def info_gsm_com(serObj=None, config={}, logger=None): config['INFOS'][cmd['obj']]['signal_qos'] = item['condition'] break + logger.info('mise à jour des infos dans la base de données') with open(os.path.join("/etc/kineintercom", "db.json"), 'w') as f: json.dump(config, f) @@ -651,6 +723,7 @@ def main(): ret, _ = send_at_cmd(cmd='AT', timeout=0.5, serObj=ser, logger=logger) if ret == 2: logger.error("Erreur d'envoie de la commande AT") + sys.exit(1) elif ret == 1: logger.warning("Pas de réponse du module GSM") GSM_MODULE_STATE = False @@ -699,6 +772,10 @@ def main(): sys.exit(1) else: GSM_MODULE_INIT_STATE = True + + sys.exit(0) + + # Retreive GSM infos logger.info('Récupération des infos du module ...') ret = info_gsm_com(serObj=ser, config=config, logger=logger)