Files
KineIntercom/src/pwrkey.py

122 lines
3.2 KiB
Python

#!/usr/bin/python3
# -*- encoding: utf-8 -*-
# @author: benoit.vince84@free.fr
# @date: Septembre 2022
# @brief: Programme Pwrkey pour le GNSS HAT
###################################################################
# Importation de modules externes #
import sys, os, re
import serial
import RPi.GPIO as GPIO
import time
import logging as log
import time
from datetime import datetime, date
import calendar
import json
import socket
###################################################################
# Class et Methods #
def get_conf(logger=None):
''' Get configuration
:return dict:
Configuration dictionnary
'''
config = None
try:
with open(os.path.join("/home/pi/KineIntercom/src", "KineIntercom.json"), 'r') as f:
try:
config = json.load(f)
except json.decoder.JSONDecodeError as e:
logger.error("Impossible de charger les données de configuration ({})".format(e))
except FileNotFoundError as e:
logger.error("Impossible d'ouvrir le fichier de configuation ({})".format(e))
return config
def verify_open_hours(conf=None, log=None):
''' Verify if GSM HAT must be opened with conf hours
:param conf:
configuration object
:param log:
logger object
:return bool:
True if authorized, False otherwise
'''
flag = False
my_date = date.today()
day = calendar.day_name[my_date.weekday()]
now = datetime.now()
for k,v in conf[day].items():
time_conf = int(k.split('h')[0])*60 + int(k.split('h')[1])
current_time = now.hour*60 + now.minute
if current_time >= time_conf:
if v == 1:
flag = True
elif v == 0:
flag = False
log.debug('Jour: {} - Temps courant: {} - Ouverture: {}'.format(day, now.strftime('%Hh%M'), flag))
return flag
def init_module():
''' initialisation of GNSS/GPS/GSM HAT Module
'''
GPIO.setmode(GPIO.BOARD)
GPIO.setup(7, GPIO.OUT)
return
def setup_module():
''' Setup module (Set/Reset)
'''
while True:
GPIO.output(7, GPIO.LOW)
time.sleep(2)
GPIO.output(7, GPIO.HIGH)
break
GPIO.cleanup()
def main():
''' main function
'''
logger = log.getLogger("pwrkey")
logger.setLevel(log.DEBUG)
fl = log.StreamHandler()
fl.setLevel(log.DEBUG)
formatter = log.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fl.setFormatter(formatter)
logger.addHandler(fl)
# get config dictionnary
config = get_conf(logger)
if not config:
exit(1)
# create a UDS socket
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
# Connect the socket to the port where the server is listening
server_addr = '/tmp/uds_socket'
try:
sock.connect(server_addr)
except socket.error as e:
logger.error("Erreur de connexion avec le serveur: {}".format(e))
sys.exit(1)
opened = verify_open_hours(conf=config['HORAIRES'], log=logger)
sock.sendall(str(opened))
sock.close()
#init_module()
#setup_module()
if __name__ == '__main__':
main()