mise à jour du code

This commit is contained in:
2022-10-01 20:49:59 +02:00
parent 2844b19271
commit 978122cc4b

View File

@@ -12,32 +12,72 @@ import sys, os, re
import serial
import logging as log
import time
import RPi.GPIO as GPIO
from datetime import datetime, date
import calendar
import json
import socket
from signal import signal, SIGINT
from threading import Thread
from apscheduler.schedulers.background import BackgroundScheduler
###################################################################
# Class et Methods #
def send_at_cmd(cmd='', serObj=None, log=None):
def get_conf(logger=None):
''' Get configuration
:return dict:
Configuration dictionnary
'''
config = None
try:
with open(os.path.join("/home/pi/KineIntercom/src", "KineIntercom.json"), 'r') as f:
try:
config = json.load(f)
except json.decoder.JSONDecodeError as e:
logger.error("Impossible de charger les données de configuration ({})".format(e))
except FileNotFoundError as e:
logger.error("Impossible d'ouvrir le fichier de configuation ({})".format(e))
return config
def send_at_cmd(cmd='', timeout=0.0, serObj=None, logger=None):
''' send AT to command to GNSS_HAT
:param cmd:
string AT command
:param timeout:
float timeout to read response
:param serObj:
serial object
:param log:
logger object
:return bool:
True if ok, False otherwise
:return int:
0 if ok, 2 if error, otherwise 1
'''
if not isinstance(cmd, str):
log.error("error parameter, expecting str, get {}".format(type(cmd)))
return False
logger.error("error parameter, expecting str, get {}".format(type(cmd)))
return 2
if not isinstance(timeout, float):
logger.error("error parameter, expecting float, get {}".format(type(timeout)))
return 2
if not isinstance(serObj, serial.serialposix.Serial):
logger.error("error parameter, expecting serial.serialposix.Serial, get {}".format(type(serObj)))
return 2
if not isinstance(logger, log.Logger):
logger.error("error parameter, expecting logging.Logger, get {}".format(type(log)))
return 2
try:
if timeout > 0:
serObj.timeout = timeout
else:
serObj.timeout = None
serObj.write(bytes(cmd+'\r', 'utf-8'))
time.sleep(.5)
out = ''
@@ -50,18 +90,20 @@ def send_at_cmd(cmd='', serObj=None, log=None):
out = ''
else:
out += c
log.debug("Reponse: {}".format(outlst))
logger.debug("Reponse: {}".format(outlst))
if 'OK' in outlst:
return True
return 0
elif 'ERROR' in outlst:
log.error("Error with cmd : {}".format(cmd))
return False
logger.error("Error with cmd : {}".format(cmd))
return 2
else:
return 1
except Exception as e:
log.error("Error: {}".format(e))
return False
return True
logger.error("Error: {}".format(e))
return 2
return 0
def init_com(serObj=None, config=None, log=None):
def init_gsm_com(serObj=None, config=None, logger=None):
''' Init GSM Communication
source : SIM800_Series_AT_command Manual_v1.09
AT : test command
@@ -87,6 +129,13 @@ def init_com(serObj=None, config=None, log=None):
:return bool:
'''
if not isinstance(serObj, serial.serialposix.Serial):
logger.error("error parameter, expecting serial.serialposix.Serial, get {}".format(type(serObj)))
return False
if not isinstance(logger, log.Logger):
logger.error("error parameter, expecting logging.Logger, get {}".format(type(log)))
return False
cmd_lst = ['AT',
'ATE1',
'AT+CMEE=2',
@@ -97,11 +146,12 @@ def init_com(serObj=None, config=None, log=None):
'AT+CLIP=1',
'AT+VTD='+str(config['DTMF_DURATION'])]
for cmd in cmd_lst:
ret = send_at_cmd(cmd=cmd, serObj=serObj, log=log)
if not ret:
logger.warning("fermeture du port de communication avec le GNSS_HAT")
serObj.close()
ret = send_at_cmd(cmd=cmd, timeout=0.0, serObj=serObj, logger=logger)
if ret == 2:
logger.error("Erreur avec la commande AT: {}".format(cmd))
return False
elif ret == 1:
lgger.warning("Timeout avec la commande AT: {}".format(cmd))
return True
def verify_caller(buf="", num="", log=None):
@@ -129,8 +179,31 @@ def verify_caller(buf="", num="", log=None):
return False, phone_number
return True, phone_number
def verify_open_hours(conf=None, log=None):
def listener(sock, logger):
''' Thread socket listener
'''
logger.debug("Démarrage du serveur de communication avec le service horaire")
while True:
# Wait for a connection
try:
clientsocket, address = sock.accept()
except socket.timeout:
continue
# socket recv() will block for a maximum of 1 sec.
#clientsocket.settimeout(1)
while clientsocket:
while True:
try:
data = clientsocket.recv(1)
except Error as e:
continue
if not data:
break
def verify_open_hours(conf=None, log=None):
''' Verify if GSM HAT must be opened with conf hours
:param conf:
configuration object
@@ -140,19 +213,72 @@ def verify_open_hours(conf=None, log=None):
:return bool:
True if authorized, False otherwise
'''
flag = False
my_date = date.today()
day = calendar.day_name[my_date.weekday()]
log.debug("Current day is {}".format(day))
log.debug("CONF {}".format(conf[day]))
now = datetime.now()
current_time = now.strftime("%Hh%M")
log.debug("current time : {}".format(current_time))
return True
for k,v in conf[day].items():
time_conf = int(k.split('h')[0])*60 + int(k.split('h')[1])
current_time = now.hour*60 + now.minute
if current_time >= time_conf:
if v == 1:
flag = True
elif v == 0:
flag = False
log.debug('Jour: {} - Temps courant: {} - Ouverture: {}'.format(day, now.strftime('%Hh%M'), flag))
return flag
def init_module():
''' initialisation of GNSS/GPS/GSM HAT Module
'''
GPIO.setwarnings(False)
GPIO.setmode(GPIO.BOARD)
GPIO.setup(7, GPIO.OUT)
return
def setup_module():
''' Setup module (Set/Reset)
'''
while True:
GPIO.output(7, GPIO.LOW)
time.sleep(2)
GPIO.output(7, GPIO.HIGH)
break
GPIO.cleanup()
def cron_verify_hours(config=None, logger=None):
''' cron task
'''
global GSM_MODULE_STATE
# Verify hours with conf file
opened = verify_open_hours(conf=config, log=logger)
if opened:
# Si le module GSM doit être allumé et qu'il est éteint, on l'allume
if not GSM_MODULE_STATE:
logger.info("Allumage du module GSM HAT ...")
setup_module()
GSM_MODULE_STATE = True
else:
# Si le module GSM doit être éteint et qu'il est allumé, on l'eteint
if GSM_MODULE_STATE:
logger.debug("Fermeture du module GSM HAT ...")
setup_module()
GSM_MODULE_STATE = False
def sigint_handler(signal, frame):
''' SIGINT handler function
'''
sys.exit(0)
###################################################################
# Corps principal du programme #
GSM_MODULE_STATE = False
def main():
''' main function
'''
# Logger configuration
logger = log.getLogger("Intercom")
logger.setLevel(log.DEBUG)
fl = log.StreamHandler()
@@ -161,18 +287,22 @@ def main():
fl.setFormatter(formatter)
logger.addHandler(fl)
config = None
try:
with open(os.path.join("/home/pi/KineIntercom/src", "KineIntercom.json"), 'r') as f:
try:
config = json.load(f)
except json.decoder.JSONDecodeError as e:
logger.error("Impossible de charger les données de configuration ({})".format(e))
exit(1)
except FileNotFoundError as e:
logger.error("Impossible d'ouvrir le fichier de configuation ({})".format(e))
global GSM_MODULE_STATE
# Caught Keyboard interrupt
signal(SIGINT, sigint_handler)
# Configuration loader
config = get_conf(logger)
if not config:
logger.error("Impossible de charger la configuration")
exit(1)
# init GSM HAT module
logger.info("Initialisation du module GSM HAT ...")
init_module()
# Serial configuration
ser = serial.Serial('/dev/ttyAMA0',
baudrate=115200,
parity=serial.PARITY_NONE,
@@ -182,60 +312,130 @@ def main():
xonxoff=False,
rtscts=False,
dsrdtr=False)
# Verify date and time
ret = verify_open_hours(conf=config['HORAIRES'], log=logger)
exit(0)
# Test si le port série est ouvert
if ser.isOpen():
logger.info("Le port de communication avec le GNSS_HAT est ouvert")
logger.info("Le port série avec le module GSM est ouvert")
try:
ser.flushInput() #flush input buffer, discarding all its contents
ser.flushOutput() #flush output buffer, aborting current output
# Initialize GSM communication
ret = init_com(serObj=ser, config=config, log=logger)
if not ret:
logger.error("Erreur d'initialisation de la communication avec le module GSM")
return
out = ''
while True:
# While the number of bytes in the input buffer > 0
while ser.in_waiting > 0:
# remove \r and \n chars from out string
out += ser.read_until().decode('utf-8').replace('\r','').replace('\n','')
if len(out) > 0 :
logger.debug("out: {}".format(out))
time.sleep(.1)
if out.startswith('+CLIP: '):
# Verify date and time
ret = verify_open_hours(conf=config['HORAIRES'], log=logger)
# Verify caller phone number
ret, phone_number = verify_caller(buf=out, num=config['NUM_AUTORISE'], log=logger)
if not ret:
# Disconnect connexion
send_at_cmd(cmd="ATH", serObj=ser, log=logger)
logger.warning("Phone number not authorized : {}".format(phone_number))
else:
time.sleep(0.3)
# connect
ret = send_at_cmd(cmd='ATA', serObj=ser, log=logger)
if ret:
time.sleep(0.2)
# send DMTF tone
send_at_cmd(cmd='AT+VTS='+config['DTMF_CODE'], serObj=ser, log=logger)
time.sleep(0.5)
# Disconnect
ret = send_at_cmd(cmd='ATH', serObj=ser, log=logger)
if not ret:
logger.error('Cannot disconnect ...')
out = ''
ret = send_at_cmd(cmd='AT', timeout=0.5, serObj=ser, logger=logger)
if ret == 2:
logger.error("Erreur d'envoie de la commande AT")
elif ret == 1:
logger.warning("Pas de réponse du module GSM")
GSM_MODULE_STATE = False
else:
logger.info("Module GSM HAT allumé ...")
GSM_MODULE_STATE = True
except Exception as e:
logger.error("error communicating: {}".format(e))
logger.error("Erreur de com série: {}".format(e))
else:
logger.error("cannot open serial port")
logger.error("Impossible d'ouvrir le port série")
sys.exit(1)
# Verify if open or not
opened = verify_open_hours(conf=config['HORAIRES'], log=logger)
if opened:
# Si le module GSM doit être allumé et qu'il est éteint, on l'allume
if not GSM_MODULE_STATE:
logger.info("Allumage du module GSM HAT ...")
setup_module()
GSM_MODULE_STATE = True
# Attente de 5 secondes avant d'initier l'init GSM
time.sleep(5)
# Initialize GSM communication
ret = init_gsm_com(serObj=ser, config=config, logger=logger)
if not ret:
logger.error("Erreur d'initialisation de la com GSM avec le module")
ser.close()
sys.exit(1)
else:
# Si le module GSM doit être éteint et qu'il est allumé, on l'eteint
if GSM_MODULE_STATE:
logger.debug("Fermeture du module GSM HAT ...")
setup_module()
GSM_MODULE_STATE = False
# server_addr = "/tmp/uds_socket"
# # Make sure the socket does not already exist
# try:
# os.unlink(server_addr)
# except OSError:
# if os.path.exists(server_addr):
# raise
#
# # Create UDS socket
# sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
# # Bind the socket to the port
# sock.bind(server_addr)
# # Listen for incoming connections
# sock.listen(1)
# thread = Thread(target=listener, args=(sock, logger,))
# thread.start()
# time.sleep(1)
# Scheduler opened hours
sched = BackgroundScheduler(daemon=True)
sched.add_job(func=cron_verify_hours,
args=(config['HORAIRES'], logger),
trigger='cron',
minute='*',
id="task_verify_hours")
sched.start()
# if ser.isOpen():
# logger.info("Le port de communication avec le module GSM est ouvert")
# try:
# ser.flushInput() #flush input buffer, discarding all its contents
# ser.flushOutput() #flush output buffer, aborting current output
# if GSM_MODULE_STATE:
# # Initialize GSM communication
# ret = init_gsm_com(serObj=ser, config=config, logger=logger)
# if not ret:
# logger.error("Erreur d'initialisation de la communication avec le module GSM")
# return
# out = ''
# while True:
# # While the number of bytes in the input buffer > 0
# while ser.in_waiting > 0:
# # remove \r and \n chars from out string
# out += ser.read_until().decode('utf-8').replace('\r','').replace('\n','')
# if len(out) > 0 :
# logger.debug("out: {}".format(out))
# time.sleep(.1)
# if out.startswith('+CLIP: '):
# # Verify date and time
# ret = verify_open_hours(conf=config['HORAIRES'], log=logger)
# # Verify caller phone number
# ret, phone_number = verify_caller(buf=out, num=config['NUM_AUTORISE'], log=logger)
# if not ret:
# # Disconnect connexion
# send_at_cmd(cmd="ATH", serObj=ser, logger=logger)
# logger.warning("Phone number not authorized : {}".format(phone_number))
# else:
# logger.debug("Phone number authorized ...")
# time.sleep(0.3)
# # connect
# ret = send_at_cmd(cmd='ATA', serObj=ser, logger=logger)
# if ret:
# time.sleep(0.2)
# # send DMTF tone
# send_at_cmd(cmd='AT+VTS='+config['DTMF_CODE'], serObj=ser, logger=logger)
# time.sleep(0.5)
# # Disconnect
# ret = send_at_cmd(cmd='ATH', serObj=ser, logger=logger)
# if not ret:
# logger.error('Cannot disconnect ...')
# out = ''
# except Exception as e:
# logger.error("error communicating: {}".format(e))
# else:
# logger.error("cannot open serial port")
logger.info("fermeture du port de communication avec le GNSS_HAT")
ser.close()
sched.shutdown(wait=False)
if __name__ == '__main__':
main()