Files
KineIntercom/kineintercom/intercom.py

1061 lines
40 KiB
Python

#!/usr/bin/python3
# -*- encoding: utf-8 -*-
# @author: benoit.vince84@free.fr
# @date: Septembre 2022
# @brief: Programme Intercom à partir du module GSM
###################################################################
# Importation de modules externes #
import sys, os, re
import serial
import logging as log
import time
import RPi.GPIO as GPIO
from datetime import datetime, date
import calendar
import json
import jsonschema
import socket
import shutil
from threading import Thread
from apscheduler.schedulers.background import BackgroundScheduler
import logging.handlers as handlers
###################################################################
# Class et Methods #
class SizedTimedRotatingFileHandler(handlers.TimedRotatingFileHandler):
"""
Handler for logging to a set of files, which switches from one file
to the next when the current file reaches a certain size, or at certain
timed intervals
"""
def __init__(self, filename, maxBytes=0, mode='a', backupCount=0, encoding=None,
delay=0, when='h', interval=1, utc=False):
handlers.TimedRotatingFileHandler.__init__(self,
filename=filename,
when=when,
interval=interval,
backupCount=backupCount,
encoding=encoding,
delay=delay,
utc=utc)
handlers.RotatingFileHandler.__init__(self,
filename=filename,
mode=mode,
maxBytes=maxBytes,
backupCount=backupCount,
encoding=encoding,
delay=delay)
def computeRollover(self, current_time):
'''
'''
return handlers.TimedRotatingFileHandler.computeRollover(self, current_time)
def shouldRollover(self, record):
'''
'''
return handlers.TimedRotatingFileHandler.shouldRollover(self, record) or handlers.RotatingFileHandler.shouldRollover(self, record)
def doRollover(self):
'''
'''
current_time = int(time.time())
dst_now = time.localtime(current_time)[-1]
new_rollover_at = self.computeRollover(current_time)
while new_rollover_at <= current_time:
new_rollover_at = new_rollover_at + self.interval
# If DST changes and midnight or weekly rollover, adjust for this.
if (self.when == 'MIDNIGHT' or self.when.startswith('W')) and not self.utc:
dst_at_rollover = time.localtime(new_rollover_at)[-1]
if dst_now != dst_at_rollover:
if not dst_now: # DST kicks in before next rollover, so we need to deduct an hour
addend = -3600
else: # DST bows out before next rollover, so we need to add an hour
addend = 3600
new_rollover_at += addend
self.rolloverAt = new_rollover_at
return handlers.RotatingFileHandler.doRollover(self)
def validate_json(json_data={}, logger=None):
''' Validate json database by schema
:param json_data:
json data loaded
:param logger:
logger object
:return bool:
True if OK, otherwise False
'''
if not isinstance(logger, log.Logger):
return False
if not isinstance(json_data, dict):
logger.error("error parameter, expecting dict, get {}".format(type(json_data)))
return False
ret = True
try:
with open(os.path.join('/usr/share/kineintercom', 'db.json.schema'), 'r') as f:
schema=json.load(f)
except FileNotFoundError as e:
logger.error("Impossible d'ouvrir le fichier de validation ({})".format(e))
ret = False
try:
jsonschema.validate(instance=json_data, schema=schema)
except jsonschema.exceptions.ValidationError as e:
logger.error("Erreur de validation de la base de données : {}".format(e))
ret = False
except jsonschema.exceptions.SchemaError as e:
logger.error("Erreur de schéma : {}".format(e))
ret = False
return ret
def get_conf(logger=None):
''' Get configuration
:param logger:
logger object
:return dict:
Configuration dictionnary
'''
if not isinstance(logger, log.Logger):
return None
config = None
# copy database if not exists
if not os.path.exists(os.path.join("/etc/kineintercom", "db.json")):
logger.warning("Le fichier de la base de données n'existe pas ...")
try:
shutil.copyfile(os.path.join("/usr/share/kineintercom", "database_origin.json"),
os.path.join("/etc/kineintercom", "db.json"))
except PermissionError as e:
logger.error("Erreur de permission: {}".format(e))
return None
try:
with open(os.path.join("/etc/kineintercom", "db.json"), 'r') as f:
try:
config = json.load(f)
if not validate_json(config, logger):
logger.error("Erreur de validation de la configuation")
config = None
except json.decoder.JSONDecodeError as e:
logger.error("Impossible de charger les données de configuration ({})".format(e))
except FileNotFoundError as e:
logger.error("Impossible d'ouvrir le fichier de configuation ({})".format(e))
return config
def send_at_cmd(cmd='', timeout=0.0, serObj=None, logger=None):
''' send AT to command to GNSS_HAT
:param cmd:
string AT command
:param timeout:
float timeout to read response
:param serObj:
serial object
:param logger:
logger object
:return int:
0 if ok, 2 if error, otherwise 1
'''
if not isinstance(cmd, str):
logger.error("error parameter, expecting str, get {}".format(type(cmd)))
return 2
if not isinstance(timeout, float):
logger.error("error parameter, expecting float, get {}".format(type(timeout)))
return 2
if not isinstance(serObj, serial.serialposix.Serial):
logger.error("error parameter, expecting serial.serialposix.Serial, get {}".format(type(serObj)))
return 2
if not isinstance(logger, log.Logger):
logger.error("error parameter, expecting logging.Logger, get {}".format(type(logger)))
return 2
try:
if timeout > 0.0:
serObj.timeout = timeout + 0.1
else:
serObj.timeout = None
serObj.write(bytes(cmd+'\r', 'utf-8'))
time.sleep(.5)
out = ''
outlst = []
while serObj.in_waiting > 0:
# remove \r and \n chars from out string
out += serObj.read_until().decode('utf-8', 'replace').replace('\r','').replace('\n','')
if out != '' and not out.startswith('NORMAL POWER DOWN'):
outlst.append(out)
out = ''
time.sleep(timeout)
logger.debug("Reponse: {}".format(outlst))
if 'OK' in outlst:
return 0, outlst
else:
for item in outlst:
if item.startswith('+CME ERROR:'):
logger.error("Erreur avec la cmd ({}) : {}".format(cmd, item.split('+CME ERROR:')[1]))
return 2, [item.split('+CME ERROR:')[1]]
elif item.startswith('ERROR'):
logger.error("Erreur avec la cmd : {}".format(cmd))
return 2, None
return 1, None
except Exception as e:
logger.error("Erreur: {}".format(e))
return 2, None
return 0, None
def set_sim_pin(serObj=None, pin_actif=False, code_pin="", logger=None):
''' Set SIM PIN if necessary
AT+CPIN=<CODE PIN> : Enter PIN (response READY: MT is not pending for any password)
:param serObj:
serial object
:param pin_actif:
attribute from dictionary config object
:param code_pin:
attribute from dictionary config object
:param log:
logger object
:return int:
0 if OK
1 if sim code error
2 if error
'''
if not isinstance(logger, log.Logger):
return 2
if not isinstance(serObj, serial.serialposix.Serial):
logger.error("error parameter, expecting serial.serialposix.Serial, get {}".format(type(serObj)))
return 2
if not isinstance(pin_actif, bool):
logger.error("error parameter, expecting bool, get {}".format(type(pin_actif)))
return 2
if not isinstance(code_pin, str):
logger.error("error parameter, expecting str, get {}".format(type(code_pin)))
return 2
# SIM PIN mandatory or not
ret, rsp = send_at_cmd(cmd='AT+CPIN?',
timeout=0.0,
serObj=serObj,
logger=logger)
if ret == 2:
logger.error("Erreur avec la commande AT: {}".format('AT+CPIN?'))
return 2
elif ret == 1:
logger.warning("Timeout avec la commande AT: {}".format('AT+CPIN?'))
return 2
else:
for item in rsp:
if item.startswith('+CPIN:'):
if item.split('+CPIN: ')[1] == 'SIM PIN':
logger.info('SIM verrouillée ...')
# Must enter SIM PIN
if not pin_actif:
logger.error("Configuration en conflit avec la réponse du module GSM")
return 1
else:
# Enter the SIM PIN configured in database
ret, _ = send_at_cmd(cmd='AT+CPIN='+code_pin,
timeout=2.0,
serObj=serObj,
logger=logger)
if ret == 2:
logger.error("Erreur avec la commande AT: {}".format('AT+CPIN=<CODE_PIN>'))
return 1
elif ret == 1:
logger.warning("Timeout avec la commande AT: {}".format('AT+CPIN=<CODE_PIN>'))
return 2
else:
logger.info("code PIN validé ...")
elif item.split('+CPIN: ')[1] == 'READY':
# SIM PIN already notified
logger.info('SIM déverrouillée ...')
return 0
def init_gsm_com(serObj=None, config={}, logger=None):
''' Init GSM Communication
source : SIM800_Series_AT_command Manual_v1.09
AT : test command
ATE1 :
AT+CMEE=2 :
AT+CLTS=1 :
AT+CMGF=1 : Set the format of messages to Text mode
AT+CLIP=1 : The calling line identifty (CLI) of calling party when
receiving a mobile terminated call
AT+VTD=1 : Tone Duration (in 1/10 seconds)
:param serObj:
serial object
:param config:
dictionary config object from JSON database
:param log:
logger object
:return int:
0 if OK
1 if sim code error
2 if error
'''
if not isinstance(logger, log.Logger):
return 2
if not isinstance(serObj, serial.serialposix.Serial):
logger.error("error parameter, expecting serial.serialposix.Serial, get {}".format(type(serObj)))
return 2
if not isinstance(config, dict):
logger.error("error parameter, expecting dict, get {}".format(type(config)))
return 2
cmd_lst = [{'cmd':'ATE1', 'timeout':0.0, 'func':None},
{'cmd':'AT+CMEE=2', 'timeout':0.0, 'func':None}]
cmd_lst.append({'cmd':'AT+CLTS=1', 'timeout':0.0, 'func':None})
cmd_lst.append({'cmd':'AT+CLIP=1', 'timeout':0.0, 'func':None})
cmd_lst.append({'cmd':'AT+CMGF=1', 'timeout':0.0, 'func':None})
cmd_lst.append({'cmd':'AT+VTD='+str(config['DTMF_DURATION']), 'timeout':0.0, 'func':None})
ret = set_sim_pin(serObj=serObj,
pin_actif=config['PIN_ACTIF'],
code_pin=config['CODE_PIN'],
logger=logger)
if ret > 0:
return ret
logger.info("Initialisation des commandes GSM ...")
for item in cmd_lst:
ret, rsp = send_at_cmd(cmd=item['cmd'],
timeout=item['timeout'],
serObj=serObj,
logger=logger)
if ret == 2:
logger.error("Erreur avec la commande AT: {}".format(item['cmd']))
return False
elif ret == 1:
logger.warning("Timeout avec la commande AT: {}".format(item['cmd']))
ret = 2
return ret
def update_gsm_com(serObj=None, config={}, logger=None):
''' Init GSM Communication
source : SIM800_Series_AT_command Manual_v1.09
AT+VTD=<PARAM DB> : Tone Duration (in 1/10 seconds)
:param serObj:
serial object
:param config:
dictionary config object from JSON database
:param logger:
logger object
:return bool:
True if OK, otherwise False
'''
if not isinstance(logger, log.Logger):
return False
if not isinstance(serObj, serial.serialposix.Serial):
logger.error("error parameter, expecting serial.serialposix.Serial, get {}".format(type(serObj)))
return False
if not isinstance(config, dict):
logger.error("error parameter, expecting dict, get {}".format(type(config)))
return False
cmd_lst = ['AT+VTD='+str(config['DTMF_DURATION'])]
for cmd in cmd_lst:
ret, _ = send_at_cmd(cmd=cmd, timeout=0.0, serObj=serObj, logger=logger)
if ret == 2:
logger.error("Erreur avec la commande AT: {}".format(cmd))
return False
elif ret == 1:
logger.warning("Timeout avec la commande AT: {}".format(cmd))
return True
def info_gsm_com(serObj=None, config={}, logger=None):
''' Retreive GSM Module info
:param serObj:
serial object
:param config:
dictionary config object from JSON database
:param logger:
logger object
:return bool:
True if ok, otherwise False
'''
if not isinstance(logger, log.Logger):
return False
if not isinstance(serObj, serial.serialposix.Serial):
logger.error("error parameter, expecting serial.serialposix.Serial, get {}".format(type(serObj)))
return False
if not isinstance(config, dict):
logger.error("error parameter, expecting dict, get {}".format(type(config)))
return False
cmd_lst = [{'cmd':'AT+CGMI', 'obj':'manufacturer', 'item':'identification'},
{'cmd':'AT+CGMM', 'obj':'manufacturer', 'item':'model'},
{'cmd':'AT+CGMR', 'obj':'manufacturer', 'item':'hardware_rev'},
{'cmd':'AT+COPS?', 'obj':'control', 'item':'operator'},
{'cmd':'AT+CSMINS?', 'obj':'control', 'item':'sim_inserted'},
{'cmd':'AT+CSPN?', 'obj':'control', 'item':'service_provider'},
{'cmd':'AT+CGSN', 'obj':'manufacturer', 'item':'serial_number'},
{'cmd':'AT+CCALR?', 'obj':'control', 'item':'call_ready'},
{'cmd':'AT+CREG?', 'obj':'control', 'item':''},
{'cmd':'AT+CSQ', 'obj':'control', 'item':'signal_dbm'}]
for idx, cmd in enumerate(cmd_lst):
ret, rsp = send_at_cmd(cmd=cmd['cmd'], timeout=0.0, serObj=serObj, logger=logger)
if ret == 2:
logger.error("Erreur avec la commande AT: {}".format(cmd))
return False
elif ret == 1:
logger.warning("Timeout avec la commande AT: {}".format(cmd))
else:
logger.debug('cmd: {} - rsp: {}'.format(cmd, rsp))
if cmd['cmd'] == 'AT+CGMI':
# Request Manufacturer Identification
config['INFOS'][cmd['obj']][cmd['item']] = rsp[1]
elif cmd['cmd'] == 'AT+CGMM':
# Request Model Identification
config['INFOS'][cmd['obj']][cmd['item']] = rsp[1]
elif cmd['cmd'] == 'AT+CGMR':
# Request TA Revision Identification of Software Release
config['INFOS'][cmd['obj']][cmd['item']] = rsp[1].split(':')[1]
elif cmd['cmd'] == 'AT+COPS?':
# Operator Selection
search = re.match("^(?:\+COPS: )([0-3]{1})[,]?(?:[0-3]?)[,]?\"?([A-Za-z ]+)?\"?$", rsp[1])
if search and search.groups()[1]:
config['INFOS'][cmd['obj']][cmd['item']] = search.groups()[1]
else:
logger.warning("Aucun operateur trouvé")
config['INFOS'][cmd['obj']][cmd['item']] = ""
elif cmd['cmd'] == 'AT+CSMINS?':
# SIM Inserted Status Reporting
if rsp[1].split(',')[1] == '1':
config['INFOS'][cmd['obj']][cmd['item']] = True
elif rsp[1].split(',')[1] == '0':
config['INFOS'][cmd['obj']][cmd['item']] = False
elif cmd['cmd'] == 'AT+CSPN?':
# Get Service Provider Name from SIM
config['INFOS'][cmd['obj']][cmd['item']] = rsp[1].split(': ')[1].split(',')[0][1:-1]
elif cmd['cmd'] == 'AT+CGSN':
# Request Product Serial Number Identification
config['INFOS'][cmd['obj']][cmd['item']] = rsp[1]
elif cmd['cmd'] == 'AT+CCALR?':
# Call Ready Query
if rsp[1].split(': ')[1] == '1':
config['INFOS'][cmd['obj']][cmd['item']] = True
elif rsp[1].split(': ')[1] == '0':
config['INFOS'][cmd['obj']][cmd['item']] = False
elif cmd['cmd'] == 'AT+CSQ':
# Signal Quality Report
val = rsp[1].split('+CSQ: ')[1].split(',')[0]
for item in SIGNAL:
if item['val'] == int(val):
config['INFOS'][cmd['obj']][cmd['item']] = item['dbm']
config['INFOS'][cmd['obj']]['signal_qos'] = item['condition']
break
logger.info('mise à jour des infos dans la base de données')
with open(os.path.join("/etc/kineintercom", "db.json"), 'w') as f:
json.dump(config, f)
return True
SIGNAL = [{"val":2, "dbm":"-109", 'condition':'Marginal'},
{"val":3, "dbm":"-107", 'condition':'Marginal'},
{"val":4, "dbm":"-105", 'condition':'Marginal'},
{"val":5, "dbm":"-103", 'condition':'Marginal'},
{"val":6, "dbm":"-101", 'condition':'Marginal'},
{"val":7, "dbm":"-99", 'condition':'Marginal'},
{"val":8, "dbm":"-97", 'condition':'Marginal'},
{"val":9, "dbm":"-95", 'condition':'Marginal'},
{"val":10, "dbm":"-93", 'condition':'OK'},
{"val":11, "dbm":"-91", 'condition':'OK'},
{"val":12, "dbm":"-89", 'condition':'OK'},
{"val":13, "dbm":"-87", 'condition':'OK'},
{"val":14, "dbm":"-85", 'condition':'OK'},
{"val":15, "dbm":"-83", 'condition':'Good'},
{"val":16, "dbm":"-81", 'condition':'Good'},
{"val":17, "dbm":"-79", 'condition':'Good'},
{"val":18, "dbm":"-77", 'condition':'Good'},
{"val":19, "dbm":"-75", 'condition':'Good'},
{"val":20, "dbm":"-73", 'condition':'Excellent'},
{"val":21, "dbm":"-71", 'condition':'Excellent'},
{"val":22, "dbm":"-69", 'condition':'Excellent'},
{"val":23, "dbm":"-67", 'condition':'Excellent'},
{"val":24, "dbm":"-65", 'condition':'Excellent'},
{"val":25, "dbm":"-63", 'condition':'Excellent'},
{"val":26, "dbm":"-61", 'condition':'Excellent'},
{"val":27, "dbm":"-59", 'condition':'Excellent'},
{"val":28, "dbm":"-57", 'condition':'Excellent'},
{"val":29, "dbm":"-55", 'condition':'Excellent'},
{"val":30, "dbm":"-53", 'condition':'Excellent'}]
def verify_caller(buf="", num="", logger=None):
''' Verify phone number of caller
example : +CLIP: "0607297154",129,"",0,"",0
:param buf:
Serial input buffer
:param num:
Caller phone number
:param logger:
logger object
:return bool:
True if authorized, False otherwise
'''
if not isinstance(logger, log.Logger):
return False, ""
if not isinstance(buf, str):
logger.error("error parameter, expecting str, get {}".format(type(buf)))
return False, ""
if not isinstance(num, str):
logger.error("error parameter, expecting str, get {}".format(type(num)))
return False, ""
outlst = buf[7:].split(',')
logger.debug("=> {}".format(outlst))
phone_number = outlst[0].replace("\"","")
if not phone_number.endswith(num):
logger.warning("phone number not match ! {}/{}".format(phone_number, num))
return False, phone_number
return True, phone_number
def listener(sock, logger):
''' Thread socket listener
:param sock:
socket object
:param logger:
logger object
'''
global FLAG_CONF_UPDATE
global FLAG_HORAIRE_UPDATE
global FLAG_TH_CLOSE
flag = True
logger.debug("Démarrage du serveur de communication avec le backend du configurateur")
while not FLAG_TH_CLOSE:
# Wait for a connection
try:
clientsocket, address = sock.accept()
flag = True
logger.debug("Client connecté {}".format(clientsocket))
except socket.timeout:
continue
# socket recv() will block for a maximum of 1 sec.
#clientsocket.settimeout(1)
data = b''
while clientsocket:
while flag:
try:
data += clientsocket.recv(1)
if data.decode('utf-8').endswith('\n'):
logger.debug("datas recv: {}".format(data.decode('utf-8')))
if data.decode('utf-8').startswith('RELOAD_DB'):
logger.info("re-chargement de la base de données ...")
FLAG_CONF_UPDATE = True
data = b''
elif data.decode('utf-8').startswith('RELOAD_HOURS'):
logger.info("mise à jour du mode de fonctionnement ...")
FLAG_CONF_UPDATE = True
FLAG_HORAIRE_UPDATE = True
data = b''
elif data.decode('utf-8').startswith('ALIVE?'):
clientsocket.sendall(b"ALIVE\n")
data = b''
elif len(data) == 0:
# deconnexion du client
logger.debug("deconnexion du client ...")
clientsocket = None
flag = False
except:
continue
#if not data:
# break
logger.debug("Fin du serveur de communication")
return
def verify_open_hours(conf={}, logger=None):
''' Verify if GSM HAT must be opened with conf hours
:param conf:
configuration object
:param logger:
logger object
:return bool:
True if authorized, False otherwise
'''
if not isinstance(logger, log.Logger):
return False
if not isinstance(conf, dict):
logger.error("error parameter, expecting dict, get {}".format(type(conf)))
return False
flag = False
my_date = date.today()
day = calendar.day_name[my_date.weekday()]
now = datetime.now()
for item in conf[day]:
time_conf = int(item['name'].split('h')[0])*60 + int(item['name'].split('h')[1])
current_time = now.hour*60 + now.minute
if current_time >= time_conf:
if item['state'] == 1:
flag = True
elif item['state'] == 0:
flag = False
#logger.debug('Jour: {} - Temps courant: {} - Ouverture: {}'.format(day, now.strftime('%Hh%M'), flag))
return flag
def init_module():
''' initialisation of GNSS/GPS/GSM HAT Module
:return bool:
True if OK, otherwise False
'''
try:
GPIO.setmode(GPIO.BOARD)
GPIO.setwarnings(False)
GPIO.setup(7, GPIO.OUT)
except ValueError as e:
return False
return True
def setup_module():
''' Setup module (Set/Reset)
:return bool:
True if OK, otherwise False
'''
while True:
try:
GPIO.output(7, GPIO.LOW)
time.sleep(2)
GPIO.output(7, GPIO.HIGH)
except ValueError as e:
return False
break
#GPIO.cleanup()
return True
def cron_verify_hours(op='', config=None, logger=None):
''' cron job to check opened or closed hours
'''
if not isinstance(logger, log.Logger):
return
if not isinstance(config, dict):
logger.error("error parameter, expecting dict, get {}".format(type(config)))
return
if not isinstance(op, str):
logger.error("error parameter, expecting str, get {}".format(type(op)))
return
global GSM_MODULE_STATE
global GSM_MODULE_INIT_STATE
if op == 'Horaires':
# Verify hours with conf file
opened = verify_open_hours(conf=config, logger=logger)
elif op == 'Manuel ON':
opened = True
elif op == 'Manuel OFF':
opened = False
else:
logger.error("Wrong 'OPERATION' parameter in database")
if opened:
# Si le module GSM doit être allumé et qu'il est éteint, on l'allume
if not GSM_MODULE_STATE:
logger.info("Allumage du module GSM HAT ...")
init_module()
if not setup_module():
logger.error('Erreur d\'allumage du module GSM')
else:
logger.info("module GSM HAT allumé ...")
GSM_MODULE_STATE = True
GSM_MODULE_INIT_STATE = False
else:
# Si le module GSM doit être éteint et qu'il est allumé, on l'eteint
if GSM_MODULE_STATE:
logger.info("Fermeture du module GSM HAT ...")
init_module()
if not setup_module():
logger.error('Erreur de fermeture du module GSM')
else:
logger.info("module GSM HAT éteint ...")
GSM_MODULE_STATE = False
GSM_MODULE_INIT_STATE = False
def process(buf="", config={}, ser=None, logger=None):
''' run process
'''
if not isinstance(logger, log.Logger):
return False
if not isinstance(buf, str):
logger.error("error parameter, expecting str, get {}".format(type(buf)))
return False
if not isinstance(config, dict):
logger.error("error parameter, expecting dict, get {}".format(type(config)))
return False
if not isinstance(ser, serial.serialposix.Serial):
logger.error("error parameter, expecting serial.serialposix.Serial, get {}".format(type(ser)))
return False
# receive phone call
if buf.startswith('+CLIP: '):
# Verify caller phone number
ret, phone_number = verify_caller(buf=buf, num=config['NUM_AUTORISE'], logger=logger)
if not ret:
# Hang up
ret, _ = send_at_cmd(cmd="ATH", timeout=0.0, serObj=ser, logger=logger)
logger.warning("Phone number not authorized ({}) - Hang up ...".format(phone_number))
else:
time.sleep(0.3)
# Pick up
ret, _ = send_at_cmd(cmd='ATA', timeout=0.0, serObj=ser, logger=logger)
if ret == 0:
time.sleep(0.2)
logger.info('Pick up the call ...')
# Configure DTMF duration
ret, _ = send_at_cmd(cmd='AT+VTD='+str(config['DTMF_DURATION']), timeout=0.0, serObj=ser, logger=logger)
if ret:
logger.error("Erreur de configuration de la durée ({}) du code DTMF".format(config['DTMF_DURATION']))
time.sleep(0.2)
# send DMTF tone
send_at_cmd(cmd='AT+VTS='+config['DTMF_CODE'], timeout=0.0, serObj=ser, logger=logger)
# Waiting DTMF tone duration (1/10 sec) + 2 seconds
time.sleep(float(config['DTMF_DURATION']/10) + 1)
# Hang up
logger.info('Hang up ...')
ret, _ = send_at_cmd(cmd='ATH', timeout=0.0, serObj=ser, logger=logger)
if ret > 0:
logger.error('Cannot hang up ...')
else:
logger.error("Erreur de connexion avec le correspondant")
elif buf.startswith('+CMTI: '): # receive msg
search = re.match('^(?:\+CMTI: \"SM\",)([0-9]+)$', buf)
send_at_cmd(cmd='AT+CMGF=1', timeout=0.0, serObj=ser, logger=logger)
if search:
msg_num = search.groups()[0]
ret, rsp = send_at_cmd(cmd='AT+CMGR='+msg_num, timeout=0.0, serObj=ser, logger=logger)
if ret == 0:
phone_num = rsp[1].split('+CMGR: ')[1].split(',')[1][1:-1]
msg = rsp[2]
logger.info('[{}] {}'.format(phone_num, msg))
else:
logger.error("Impossible de lire le SMS")
ret, rsp = send_at_cmd(cmd='AT+CMGD='+str(msg_num), timeout=0.0, serObj=ser, logger=logger)
else:
logger.error('aucune correspondance')
return True
###################################################################
# Corps principal du programme #
GSM_MODULE_STATE = False
GSM_MODULE_SIM_STATE = False
GSM_MODULE_INIT_STATE = False
FLAG_CONF_UPDATE = False
FLAG_HORAIRE_UPDATE = False
FLAG_TH_CLOSE = False
def main():
''' main function
'''
# Logger configuration
logger = log.getLogger("Intercom")
logger.setLevel(log.DEBUG)
fl = log.StreamHandler()
fl.setLevel(log.DEBUG)
formatter = log.Formatter('%(asctime)s - %(funcName)s - %(levelname)s - %(message)s')
fl.setFormatter(formatter)
logger.addHandler(fl)
handler = SizedTimedRotatingFileHandler(os.path.join('/var/log/kineintercom','Intercom.log'),
maxBytes=1048576, backupCount=30, when='D', interval=1)
handler.setLevel(log.INFO)
handler.setFormatter(formatter)
logger.addHandler(handler)
global GSM_MODULE_STATE
global GSM_MODULE_SIM_STATE
global GSM_MODULE_INIT_STATE
global FLAG_CONF_UPDATE
global FLAG_HORAIRE_UPDATE
global FLAG_TH_CLOSE
# Configuration loader
config = get_conf(logger)
if config is None:
logger.error("Impossible de charger la configuration")
sys.exit(1)
# init GSM HAT module
logger.info("Initialisation du module GSM HAT ...")
if not init_module():
logger.error("Impossible d'initialiser les GPIO de la board")
sys.exit(1)
# Serial configuration
ser = serial.Serial('/dev/ttyAMA0',
baudrate=115200,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS,
timeout=None, write_timeout=2,
xonxoff=False,
rtscts=False,
dsrdtr=False)
# Test si le port série est ouvert
if ser.isOpen():
logger.info("Le port série avec le module GSM est ouvert")
try:
ser.flushInput() #flush input buffer, discarding all its contents
ser.flushOutput() #flush output buffer, aborting current output
idx = 0
while idx < 5:
# send AT command to test the connection
ret, _ = send_at_cmd(cmd='AT', timeout=0.5, serObj=ser, logger=logger)
if ret == 2:
logger.error("Erreur d'envoie de la commande AT")
ser.close()
sys.exit(1)
elif ret == 1:
logger.warning("Pas de réponse du module GSM HAT ({}/5)".format(idx + 1))
GSM_MODULE_STATE = False
else:
logger.info("Module GSM HAT allumé ...")
GSM_MODULE_STATE = True
break
idx += 1
time.sleep(1)
except Exception as e:
logger.error("Erreur de com série: {}".format(e))
ser.close()
sys.exit(1)
else:
logger.error("Impossible d'ouvrir le port série")
sys.exit(1)
# Verify if the GSM module must be opened or not
if config['OPERATION'] == 'Horaires':
logger.info("Mode \"Horaires\" activé ...")
opened = verify_open_hours(conf=config['HORAIRES'], logger=logger)
elif config['OPERATION'] == 'Manuel ON':
logger.info("Mode \"Manuel ON\" activé ...")
opened = True
elif config['OPERATION'] == 'Manuel OFF':
logger.info("Mode \"Manuel OFF\" activé ...")
opened = False
else:
logger.error("Wrong 'OPERATION' parameter in database")
ser.close()
sys.exit(1)
if opened:
# Si le module GSM doit être allumé et qu'il est éteint, on l'allume
if not GSM_MODULE_STATE:
logger.info("Allumage du module GSM HAT ...")
init_module()
if not setup_module():
logger.error("Erreur de configuration des GPIOs de la board")
ser.close()
sys.exit(1)
else:
GSM_MODULE_STATE = True
# Attente de 10 secondes avant d'initier l'init GSM
time.sleep(10)
# Initialize GSM communication
logger.info("Initialisation des commandes AT nécessaires ...")
ret = init_gsm_com(serObj=ser, config=config, logger=logger)
if ret == 2:
logger.error("Erreur d'initialisation de la com GSM avec le module")
ser.close()
sys.exit(1)
elif ret == 1:
logger.error("Erreur de code PIN")
GSM_MODULE_INIT_STATE = False
GSM_MODULE_STATE = False
# On force en Manuel OFF
config['OPERATION'] = 'Manuel OFF'
# mise à jour de la base de données
with open(os.path.join("/etc/kineintercom", "db.json"), 'w') as f:
json.dump(config, f)
# On eteint le module GSM en cas de mauvais code PIN
if not setup_module():
logger.error("Erreur de configuration des GPIOs de la board")
ser.close()
sys.exit(1)
else:
GSM_MODULE_INIT_STATE = True
# Retreive GSM infos
logger.info('Récupération des infos du module ...')
ret = info_gsm_com(serObj=ser, config=config, logger=logger)
if not ret:
logger.error("Erreur d'initialisation de la com GSM avec le module")
ser.close()
sys.exit(1)
else:
# Si le module GSM doit être éteint et qu'il est allumé, on l'eteint
if GSM_MODULE_STATE:
logger.debug("Fermeture du module GSM HAT ...")
init_module()
if not setup_module():
logger.error("Erreur de configuration des GPIOs de la board")
ser.close()
sys.exit(1)
else:
GSM_MODULE_STATE = False
GSM_MODULE_INIT_STATE = False
GSM_MODULE_SIM_STATE = False
server_addr = "/tmp/uds_socket"
# Make sure the socket does not already exist
try:
os.unlink(server_addr)
except OSError:
if os.path.exists(server_addr):
raise
# Create UDS socket
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
# Bind the socket to the port
sock.bind(server_addr)
sock.settimeout(1) # Timeout for accept
# Listen for incoming connections
sock.listen(1)
thread = Thread(target=listener, args=(sock, logger,))
thread.start()
time.sleep(1)
# Scheduler opened hours
sched = BackgroundScheduler(daemon=True, timezone="Europe/Paris")
sched.add_job(func=cron_verify_hours,
args=(config['OPERATION'], config['HORAIRES'], logger),
trigger='interval',
seconds=15,
id="job_id")
sched.start()
out = ''
try:
while True:
# Drapeau de mise à jour de la configuration, par le configurateur via le serveur (socket)
if FLAG_CONF_UPDATE:
logger.info("Configuration doit être rechargée ...")
# reload conf in memory
config = get_conf(logger)
if config is None:
logger.error("Impossible de charger la configuration")
break
logger.info("Mode \"{}\" activé ...".format(config['OPERATION']))
if FLAG_HORAIRE_UPDATE:
logger.info("Restart scheduler ...")
# Stop job scheduler to restart it
sched.remove_job('job_id')
sched.add_job(func=cron_verify_hours,
args=(config['OPERATION'], config['HORAIRES'], logger),
trigger='interval',
seconds=15,
id="job_id")
FLAG_HORAIRE_UPDATE = False
FLAG_CONF_UPDATE = False
# Si le module GSM est ouvert
if GSM_MODULE_STATE:
# Si le module GSM n'a pas été initialisé
if not GSM_MODULE_INIT_STATE:
# Attente de 10sec pour que le module accroche le signal après allumage
time.sleep(10)
# Initialize GSM serial communication
ret = init_gsm_com(serObj=ser, config=config, logger=logger)
if ret == 2:
logger.error("Erreur d'initialisation de la com GSM avec le module")
break
elif ret == 1:
logger.error("Erreur de code PIN")
GSM_MODULE_INIT_STATE = False
GSM_MODULE_STATE = False
# On force en Manuel OFF
config['OPERATION'] = 'Manuel OFF'
# mise à jour de la base de données
with open(os.path.join("/etc/kineintercom", "db.json"), 'w') as f:
json.dump(config, f)
# On eteint le module GSM en cas de mauvais code PIN
if not setup_module():
logger.error("Erreur de configuration des GPIOs de la board")
ser.close()
sys.exit(1)
else:
GSM_MODULE_INIT_STATE = True
logger.info('Récupération des infos du module ...')
ret = info_gsm_com(serObj=ser, config=config, logger=logger)
# While the number of bytes in the input buffer > 0
if ser.in_waiting > 0:
# remove \r and \n chars from out string
out += ser.read_until().decode('utf-8').replace('\r','').replace('\n','')
if len(out) > 0 :
logger.debug("out: {}".format(out))
time.sleep(.1)
ret = process(buf=out, config=config, ser=ser, logger=logger)
if not ret:
break
out = ''
else:
time.sleep(1)
else:
time.sleep(1)
except KeyboardInterrupt:
logger.error("Keyboard Interrupt !!")
FLAG_TH_CLOSE = True
logger.info("fermeture du port de communication avec le GNSS_HAT")
ser.close()
sched.shutdown(wait=False)
thread.join()
if __name__ == '__main__':
main()