ajout de la gestion du code pin

This commit is contained in:
Vincent BENOIT
2022-11-14 17:36:31 +01:00
parent e36055faf1
commit 5de4445de7

View File

@@ -120,7 +120,7 @@ def send_at_cmd(cmd='', timeout=0.0, serObj=None, logger=None):
return 2
try:
if timeout > 0:
if timeout > 0.0:
serObj.timeout = timeout
else:
serObj.timeout = None
@@ -129,26 +129,98 @@ def send_at_cmd(cmd='', timeout=0.0, serObj=None, logger=None):
out = ''
outlst = []
while serObj.in_waiting > 0:
c = serObj.read(1).decode('utf-8', 'replace')
if c == '\r' or c == '\n':
if out != '':
outlst.append(out)
out = ''
else:
out += c
#c = serObj.read(1).decode('utf-8', 'replace')
#if c == '\r' or c == '\n':
# if out != '':
# outlst.append(out)
# out = ''
#else:
# out += c
# remove \r and \n chars from out string
out += serObj.read_until().decode('utf-8', 'replace').replace('\r','').replace('\n','')
if out != '':
outlst.append(out)
out = ''
logger.debug("Reponse: {}".format(outlst))
if 'OK' in outlst:
return 0, outlst
elif 'ERROR' in outlst:
logger.error("Error with cmd : {}".format(cmd))
return 2, None
else:
for item in outlst:
if item.startswith('+CME ERROR:'):
logger.error("Erreur avec la cmd ({}) : {}".format(cmd, item.split('+CME ERROR:')[1]))
return 2, [item.split('+CME ERROR:')[1]]
elif item.startswith('ERROR'):
logger.error("Erreur avec la cmd : {}".format(cmd))
return 2, None
return 1, None
except Exception as e:
logger.error("Erreur: {}".format(e))
return 2, None
return 0, None
def set_sim_pin(serObj=None, pin_actif=False, code_pin="", logger=None):
''' Set SIM PIN if necessary
:param serObj:
serial object
:param config:
dictionary config object from JSON database
:param log:
logger object
:return bool:
'''
if not isinstance(logger, log.Logger):
return False
if not isinstance(serObj, serial.serialposix.Serial):
logger.error("error parameter, expecting serial.serialposix.Serial, get {}".format(type(serObj)))
return False
if not isinstance(pin_actif, bool):
logger.error("error parameter, expecting bool, get {}".format(type(pin_actif)))
return False
if not isinstance(code_pin, str):
logger.error("error parameter, expecting str, get {}".format(type(code_pin)))
return False
# SIM PIN mandatory or not
ret, rsp = send_at_cmd(cmd='AT+CPIN?',
timeout=0.0,
serObj=serObj,
logger=logger)
if ret == 2:
logger.error("Erreur avec la commande AT: {}".format('AT+CPIN?'))
return False
elif ret == 1:
logger.warning("Timeout avec la commande AT: {}".format('AT+CPIN?'))
else:
if rsp[1].split('+CPIN: ')[1] == 'SIM PIN':
logger.info('SIM verrouillée ...')
# Must enter SIM PIN
if not pin_actif:
logger.error("Configuration en conflit avec la réponse du module GSM")
return False
else:
# Enter the SIM PIN configured in database
ret, rsp = send_at_cmd(cmd='AT+CPIN='+code_pin,
timeout=0.0,
serObj=serObj,
logger=logger)
if ret == 2:
logger.error("Erreur avec la commande AT: {}".format('AT+CPIN=<CODE_PIN>'))
return False
elif ret == 1:
logger.warning("Timeout avec la commande AT: {}".format('AT+CPIN=<CODE_PIN>'))
else:
logger.info("code PIN saisi ...")
elif rsp[1].split('+CPIN: ')[1] == 'READY':
# SIM PIN already notified
logger.info('SIM déverrouillée ...')
return True
def init_gsm_com(serObj=None, config={}, logger=None):
''' Init GSM Communication
source : SIM800_Series_AT_command Manual_v1.09
@@ -185,25 +257,24 @@ def init_gsm_com(serObj=None, config={}, logger=None):
logger.error("error parameter, expecting dict, get {}".format(type(config)))
return False
cmd_lst = ['AT',
'ATE1',
'AT+CMEE=2',
'AT+CLTS=1',
'AT+CLIP=1',
'AT+CMGF=1',
'AT+VTD='+str(config['DTMF_DURATION'])]
if config['PIN_ACTIF']:
cmd_lst.append('AT+CPIN='+config['CODE_PIN'])
else:
cmd_lst.append('AT+CPIN?')
cmd_lst = [{'cmd':'ATE1', 'timeout':0.0, 'func':None},
{'cmd':'AT+CMEE=2', 'timeout':0.0, 'func':None}]
cmd_lst.append({'cmd':'AT+CLTS=1', 'timeout':0.0, 'func':None})
cmd_lst.append({'cmd':'AT+CLIP=1', 'timeout':0.0, 'func':None})
cmd_lst.append({'cmd':'AT+CMGF=1', 'timeout':0.0, 'func':None})
cmd_lst.append({'cmd':'AT+VTD='+str(config['DTMF_DURATION']), 'timeout':0.0, 'func':None})
if not set_sim_pin(serObj=serObj, pin_actif=config['PIN_ACTIF'], code_pin=config['CODE_PIN'], logger=logger):
return False
logger.info("Initialisation des commandes GSM ...")
for cmd in cmd_lst:
ret, _ = send_at_cmd(cmd=cmd, timeout=0.0, serObj=serObj, logger=logger)
for item in cmd_lst:
ret, rsp = send_at_cmd(cmd=item['cmd'], timeout=item['timeout'], serObj=serObj, logger=logger)
if ret == 2:
logger.error("Erreur avec la commande AT: {}".format(cmd))
logger.error("Erreur avec la commande AT: {}".format(item['cmd']))
return False
elif ret == 1:
logger.warning("Timeout avec la commande AT: {}".format(cmd))
logger.warning("Timeout avec la commande AT: {}".format(item['cmd']))
return True
def update_gsm_com(serObj=None, config={}, logger=None):
@@ -324,6 +395,7 @@ def info_gsm_com(serObj=None, config={}, logger=None):
config['INFOS'][cmd['obj']]['signal_qos'] = item['condition']
break
logger.info('mise à jour des infos dans la base de données')
with open(os.path.join("/etc/kineintercom", "db.json"), 'w') as f:
json.dump(config, f)
@@ -651,6 +723,7 @@ def main():
ret, _ = send_at_cmd(cmd='AT', timeout=0.5, serObj=ser, logger=logger)
if ret == 2:
logger.error("Erreur d'envoie de la commande AT")
sys.exit(1)
elif ret == 1:
logger.warning("Pas de réponse du module GSM")
GSM_MODULE_STATE = False
@@ -699,6 +772,10 @@ def main():
sys.exit(1)
else:
GSM_MODULE_INIT_STATE = True
sys.exit(0)
# Retreive GSM infos
logger.info('Récupération des infos du module ...')
ret = info_gsm_com(serObj=ser, config=config, logger=logger)