From 2895819b1e26b6b525e94f0ce1ebcae4e4d3a6b2 Mon Sep 17 00:00:00 2001 From: Vincent BENOIT Date: Sun, 2 Oct 2022 11:59:11 +0200 Subject: [PATCH] =?UTF-8?q?mise=20=C3=A0=20jour=20du=20code?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/intercom.py | 114 +++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 103 insertions(+), 11 deletions(-) diff --git a/src/intercom.py b/src/intercom.py index 8852da6..44c82be 100644 --- a/src/intercom.py +++ b/src/intercom.py @@ -30,6 +30,10 @@ def get_conf(logger=None): :return dict: Configuration dictionnary ''' + if not isinstance(logger, log.Logger): + logger.error("error parameter, expecting logging.Logger, get {}".format(type(logger))) + return None + config = None try: with open(os.path.join("/home/pi/KineIntercom/src", "KineIntercom.json"), 'r') as f: @@ -70,7 +74,7 @@ def send_at_cmd(cmd='', timeout=0.0, serObj=None, logger=None): logger.error("error parameter, expecting serial.serialposix.Serial, get {}".format(type(serObj))) return 2 if not isinstance(logger, log.Logger): - logger.error("error parameter, expecting logging.Logger, get {}".format(type(log))) + logger.error("error parameter, expecting logging.Logger, get {}".format(type(logger))) return 2 try: @@ -133,7 +137,7 @@ def init_gsm_com(serObj=None, config=None, logger=None): logger.error("error parameter, expecting serial.serialposix.Serial, get {}".format(type(serObj))) return False if not isinstance(logger, log.Logger): - logger.error("error parameter, expecting logging.Logger, get {}".format(type(log))) + logger.error("error parameter, expecting logging.Logger, get {}".format(type(logger))) return False cmd_lst = ['AT', @@ -154,7 +158,7 @@ def init_gsm_com(serObj=None, config=None, logger=None): lgger.warning("Timeout avec la commande AT: {}".format(cmd)) return True -def verify_caller(buf="", num="", log=None): +def verify_caller(buf="", num="", logger=None): ''' Verify phone number of caller example : +CLIP: "0607297154",129,"",0,"",0 @@ -171,11 +175,21 @@ def verify_caller(buf="", num="", log=None): True if authorized, False otherwise ''' + if not isinstance(buf, str): + logger.error("error parameter, expecting str, get {}".format(type(buf))) + return False, "" + if not isinstance(num, str): + logger.error("error parameter, expecting str, get {}".format(type(num))) + return False, "" + if not isinstance(logger, log.Logger): + logger.error("error parameter, expecting logging.Logger, get {}".format(type(logger))) + return False, "" + outlst = buf[7:].split(',') - log.debug("=> {}".format(outlst)) + logger.debug("=> {}".format(outlst)) phone_number = outlst[0].replace("\"","") if phone_number != num: - log.warning("phone number not match ! {}/{}".format(phone_number, num)) + logger.warning("phone number not match ! {}/{}".format(phone_number, num)) return False, phone_number return True, phone_number @@ -201,7 +215,7 @@ def listener(sock, logger): if not data: break -def verify_open_hours(conf=None, log=None): +def verify_open_hours(conf=None, logger=None): ''' Verify if GSM HAT must be opened with conf hours :param conf: @@ -213,6 +227,9 @@ def verify_open_hours(conf=None, log=None): :return bool: True if authorized, False otherwise ''' + if not isinstance(logger, log.Logger): + return False + flag = False my_date = date.today() day = calendar.day_name[my_date.weekday()] @@ -225,7 +242,7 @@ def verify_open_hours(conf=None, log=None): flag = True elif v == 0: flag = False - log.debug('Jour: {} - Temps courant: {} - Ouverture: {}'.format(day, now.strftime('%Hh%M'), flag)) + logger.debug('Jour: {} - Temps courant: {} - Ouverture: {}'.format(day, now.strftime('%Hh%M'), flag)) return flag def init_module(): @@ -250,30 +267,76 @@ def cron_verify_hours(config=None, logger=None): ''' cron task ''' global GSM_MODULE_STATE + global GSM_MODULE_INIT_STATE # Verify hours with conf file - opened = verify_open_hours(conf=config, log=logger) + opened = verify_open_hours(conf=config, logger=logger) if opened: # Si le module GSM doit être allumé et qu'il est éteint, on l'allume if not GSM_MODULE_STATE: logger.info("Allumage du module GSM HAT ...") + init_module() setup_module() GSM_MODULE_STATE = True + GSM_MODULE_INIT_STATE = False else: # Si le module GSM doit être éteint et qu'il est allumé, on l'eteint if GSM_MODULE_STATE: logger.debug("Fermeture du module GSM HAT ...") + init_module() setup_module() GSM_MODULE_STATE = False + GSM_MODULE_INIT_STATE = False def sigint_handler(signal, frame): ''' SIGINT handler function ''' sys.exit(0) +def process(buf="", config=None, ser=None, logger=None): + ''' + ''' + if not isinstance(logger, log.Logger): + return False + if not isinstance(buf, str): + logger.error("error parameter, expecting str, get {}".format(type(buf))) + return False + if not isinstance(config, dict): + logger.error("error parameter, expecting dict, get {}".format(type(config))) + return False + if not isinstance(ser, serial.serialposix.Serial): + logger.error("error parameter, expecting serial.serialposix.Serial, get {}".format(type(ser))) + return False + + if buf.startswith('+CLIP: '): + # Verify caller phone number + ret, phone_number = verify_caller(buf=buf, num=config['NUM_AUTORISE'], logger=logger) + if not ret: + # Disconnect connexion + ret = send_at_cmd(cmd="ATH", timeout=0.0, serObj=ser, logger=logger) + logger.warning("Phone number not authorized : {}".format(phone_number)) + else: + logger.debug("Phone number authorized ...") + time.sleep(0.3) + # connect + ret = send_at_cmd(cmd='ATA', timeout=0.0, serObj=ser, logger=logger) + if ret == 0: + time.sleep(0.2) + # send DMTF tone + send_at_cmd(cmd='AT+VTS='+config['DTMF_CODE'], timeout=0.0, serObj=ser, logger=logger) + time.sleep(0.5) + # Disconnect + ret = send_at_cmd(cmd='ATH', timeout=0.0, serObj=ser, logger=logger) + if ret > 0: + logger.error('Cannot disconnect ...') + else: + logger.error("Erreur de connexion avec le correspondant") + return True + ################################################################### # Corps principal du programme # GSM_MODULE_STATE = False +GSM_MODULE_INIT_STATE = False def main(): ''' main function @@ -288,6 +351,7 @@ def main(): logger.addHandler(fl) global GSM_MODULE_STATE + global GSM_MODULE_INIT_STATE # Caught Keyboard interrupt signal(SIGINT, sigint_handler) @@ -335,27 +399,30 @@ def main(): sys.exit(1) # Verify if open or not - opened = verify_open_hours(conf=config['HORAIRES'], log=logger) + opened = verify_open_hours(conf=config['HORAIRES'], logger=logger) if opened: # Si le module GSM doit être allumé et qu'il est éteint, on l'allume if not GSM_MODULE_STATE: logger.info("Allumage du module GSM HAT ...") setup_module() GSM_MODULE_STATE = True - # Attente de 5 secondes avant d'initier l'init GSM - time.sleep(5) + # Attente de 10 secondes avant d'initier l'init GSM + time.sleep(10) # Initialize GSM communication ret = init_gsm_com(serObj=ser, config=config, logger=logger) if not ret: logger.error("Erreur d'initialisation de la com GSM avec le module") ser.close() sys.exit(1) + else: + GSM_MODULE_INIT_STATE = True else: # Si le module GSM doit être éteint et qu'il est allumé, on l'eteint if GSM_MODULE_STATE: logger.debug("Fermeture du module GSM HAT ...") setup_module() GSM_MODULE_STATE = False + GSM_MODULE_INIT_STATE = False # server_addr = "/tmp/uds_socket" # # Make sure the socket does not already exist @@ -384,6 +451,31 @@ def main(): id="task_verify_hours") sched.start() + out = '' + while True: + # Si le module GSM est ouvert + if GSM_MODULE_STATE: + if not GSM_MODULE_INIT_STATE: + time.sleep(5) + # Initialize GSM serial communication + ret = init_gsm_com(serObj=ser, config=config, logger=logger) + if not ret: + logger.error("Erreur d'initialisation de la com GSM avec le module") + else: + GSM_MODULE_INIT_STATE = True + + # While the number of bytes in the input buffer > 0 + while ser.in_waiting > 0: + # remove \r and \n chars from out string + out += ser.read_until().decode('utf-8').replace('\r','').replace('\n','') + if len(out) > 0 : + logger.debug("out: {}".format(out)) + time.sleep(.1) + ret = process(buf=out, config=config, ser=ser, logger=logger) + out = '' + else: + time.sleep(10) + # if ser.isOpen(): # logger.info("Le port de communication avec le module GSM est ouvert") # try: