From 6352f7e96595424e7f772989e5e8b2a717bdb959 Mon Sep 17 00:00:00 2001 From: Vincent BENOIT Date: Sat, 24 Sep 2022 11:14:35 +0100 Subject: [PATCH] decrochage automatique et envoie du signal DTMF et racrochage --- src/intercom.py | 51 ++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 44 insertions(+), 7 deletions(-) diff --git a/src/intercom.py b/src/intercom.py index b1f269c..8287f81 100644 --- a/src/intercom.py +++ b/src/intercom.py @@ -13,6 +13,7 @@ import serial import logging as log import time from datetime import datetime +import json ################################################################### # Class et Methods # @@ -30,6 +31,7 @@ def send_at_cmd(cmd='', serObj=None, log=None): logger object :return bool: + True if ok, False otherwise ''' if not isinstance(cmd, str): log.error("error parameter, expecting str, get {}".format(type(cmd))) @@ -51,19 +53,21 @@ def send_at_cmd(cmd='', serObj=None, log=None): if 'OK' in outlst: return True elif 'ERROR' in outlst: - logger.error("Error with cmd : {}".format(cmd)) + log.error("Error with cmd : {}".format(cmd)) return False except Exception as e: log.error("Error: {}".format(e)) return False return True -def init_com(serObj=None, log=None): +def init_com(serObj=None, pin='', log=None): ''' Init GSM Communication source : SIM800_Series_AT_command Manual_v1.09 AT : test command AT+CPIN? : Enter PIN (response READY: MT is not pending for any password AT+CREG? : Network registration - return the status of result code + AT+CSQ : + AT+CLTS=1 : AT+CLIP=1 : The calling line identifty (CLI) of calling party when receiving a mobile terminated call AT+VTD=1 : Tone Duration (in 1/10 seconds) @@ -72,12 +76,15 @@ def init_com(serObj=None, log=None): :param serObj: serial object + :param pin: + GSM Pin code + :param log: logger object :return bool: ''' - cmd_lst = ['AT', 'AT+CPIN?', 'AT+CREG?', 'AT+CLIP=1', 'AT+VTD=1', 'ATS0=2'] + cmd_lst = ['AT', 'ATE1', 'AT+CMEE=2', 'AT+CPIN?', 'AT+CREG?', 'AT+CSQ', 'AT+CLTS=1', 'AT+CLIP=1', 'AT+VTD=1'] for cmd in cmd_lst: ret = send_at_cmd(cmd=cmd, serObj=serObj, log=log) if not ret: @@ -86,13 +93,16 @@ def init_com(serObj=None, log=None): return False return True -def verify_caller(buf="", log=None): +def verify_caller(buf="", num="", log=None): ''' Verify phone number of caller example : +CLIP: "0607297154",129,"",0,"",0 :param buf: Serial input buffer + :param num: + Caller phone number + :param log: logger object @@ -103,6 +113,10 @@ def verify_caller(buf="", log=None): outlst = buf[7:].split(',') log.debug("=> {}".format(outlst)) phone_number = outlst[0].replace("\"","") + if phone_number != num: + log.debug("phone number not match ! {}/{}".format(phone_number, num)) + return False, phone_number + log.debug("phone number match ! {}/{}".format(phone_number, num)) return True, phone_number ################################################################### @@ -117,6 +131,18 @@ def main(): fl.setFormatter(formatter) logger.addHandler(fl) + config = None + try: + with open(os.path.join("/home/pi/KineIntercom/src", "KineIntercom.json"), 'r') as f: + try: + config = json.load(f) + except json.decoder.JSONDecodeError as e: + logger.error("Impossible de charger les données de configuration ({})".format(e)) + exit(1) + except FileNotFoundError as e: + logger.error("Impossible d'ouvrir le fichier de configuation ({})".format(e)) + exit(1) + ser = serial.Serial('/dev/ttyAMA0', baudrate=115200, parity=serial.PARITY_NONE, @@ -133,7 +159,7 @@ def main(): ser.flushInput() #flush input buffer, discarding all its contents ser.flushOutput() #flush output buffer, aborting current output # Initialize GSM communication - ret = init_com(serObj=ser, log=logger) + ret = init_com(serObj=ser, pin=config['CODE_PIN'], log=logger) if not ret: logger.error("Erreur d'initialisation de la communication avec le module GSM") return @@ -148,11 +174,22 @@ def main(): time.sleep(.1) if out.startswith('+CLIP: '): # Verify caller phone number - ret, phone_number = verify_caller(buf=out, log=logger) + ret, phone_number = verify_caller(buf=out, num=config['NUM_AUTORISE'], log=logger) if not ret: # Disconnect connexion - send_at_cmd(cmd="ATH", serObj=ser, log=log) + send_at_cmd(cmd="ATH", serObj=ser, log=logger) logger.warning("Phone number not authorized : {}".format(phone_number)) + else: + time.sleep(0.3) + # connect + ret = send_at_cmd(cmd='ATA', serObj=ser, log=logger) + if ret: + time.sleep(0.2) + # send DMTF tone + send_at_cmd(cmd='AT+VTS=*', serObj=ser, log=logger) + time.sleep(0.5) + # Disconnect + send_at_cmd(cmd='ATH', serObj=ser, log=logger) out = '' except Exception as e: logger.error("error communicating: {}".format(e))