Ajout des sources du projet
This commit is contained in:
119
src/intercom.py
Normal file
119
src/intercom.py
Normal file
@@ -0,0 +1,119 @@
|
||||
#!/usr/bin/python3
|
||||
# -*- encoding: utf-8 -*-
|
||||
|
||||
# @author: benoit.vince84@free.fr
|
||||
# @date: Septembre 2022
|
||||
# @brief: Programme Intercom à partir du module GSM du GNSS_HAT
|
||||
|
||||
###################################################################
|
||||
# Importation de modules externes #
|
||||
|
||||
import sys, os, re
|
||||
import serial
|
||||
import logging as log
|
||||
import time
|
||||
|
||||
###################################################################
|
||||
# Class et Methods #
|
||||
|
||||
def send_at_cmd(cmd='', serObj=None, log=None):
|
||||
''' send AT to command to GNSS_HAT
|
||||
|
||||
:param cmd:
|
||||
string AT command
|
||||
|
||||
:param serObj:
|
||||
serial object
|
||||
|
||||
:param log:
|
||||
logger object
|
||||
|
||||
:return bool:
|
||||
'''
|
||||
if not isinstance(cmd, str):
|
||||
log.error("error parameter, expecting str, get {}".format(type(cmd)))
|
||||
return False
|
||||
try:
|
||||
serObj.write(bytes(cmd+'\r', 'utf-8'))
|
||||
time.sleep(.5)
|
||||
out = ''
|
||||
outlst = []
|
||||
while serObj.inWaiting() > 0:
|
||||
c = serObj.read(1).decode('utf-8')
|
||||
if c == '\r' or c == '\n':
|
||||
if out != '':
|
||||
outlst.append(out)
|
||||
out = ''
|
||||
else:
|
||||
out += c
|
||||
log.debug("Reponse: {}".format(outlst))
|
||||
if 'OK' in outlst:
|
||||
return True
|
||||
elif 'ERROR' in outlst:
|
||||
logger.error("Error with cmd : {}".format(cmd))
|
||||
return False
|
||||
except Exception as e:
|
||||
log.error("Error: {}".format(e))
|
||||
return False
|
||||
return True
|
||||
|
||||
def init_com(serObj=None, log=None):
|
||||
''' Init GSM Communication
|
||||
|
||||
:param serObj:
|
||||
serial object
|
||||
|
||||
:param log:
|
||||
logger object
|
||||
|
||||
:return bool:
|
||||
'''
|
||||
cmd_lst = ['AT', 'AT+CPIN?', 'AT+CREG?', 'AT+CLIP=1', 'AT+VTD=1']
|
||||
for cmd in cmd_lst:
|
||||
ret = send_at_cmd(cmd=cmd, serObj=serObj, log=log)
|
||||
if not ret:
|
||||
logger.warning("fermeture du port de communication avec le GNSS_HAT")
|
||||
serObj.close()
|
||||
return False
|
||||
return True
|
||||
|
||||
###################################################################
|
||||
# Corps principal du programme #
|
||||
|
||||
def main():
|
||||
logger = log.getLogger("Intercom")
|
||||
logger.setLevel(log.DEBUG)
|
||||
fl = log.StreamHandler()
|
||||
fl.setLevel(log.DEBUG)
|
||||
formatter = log.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
||||
fl.setFormatter(formatter)
|
||||
logger.addHandler(fl)
|
||||
|
||||
ser = serial.Serial('/dev/ttyAMA0',
|
||||
baudrate=115200,
|
||||
parity=serial.PARITY_NONE,
|
||||
stopbits=serial.STOPBITS_ONE,
|
||||
bytesize=serial.EIGHTBITS,
|
||||
timeout=1, write_timeout=2,
|
||||
xonxoff=False,
|
||||
rtscts=False,
|
||||
dsrdtr=False)
|
||||
|
||||
if ser.isOpen():
|
||||
logger.info("Le port de communication avec le GNSS_HAT est ouvert")
|
||||
try:
|
||||
ser.flushInput() #flush input buffer, discarding all its contents
|
||||
ser.flushOutput() #flush output buffer, aborting current output
|
||||
ret = init_com(serObj=ser, log=logger)
|
||||
if not ret:
|
||||
return
|
||||
except Exception as e:
|
||||
logger.error("error communicating: {}".format(e))
|
||||
else:
|
||||
logger.error("cannot open serial port")
|
||||
|
||||
logger.info("fermeture du port de communication avec le GNSS_HAT")
|
||||
ser.close()
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
Reference in New Issue
Block a user