mirror of
https://github.com/sinseman44/PyCNC.git
synced 2026-01-12 02:40:04 +00:00
95 lines
3.3 KiB
Python
95 lines
3.3 KiB
Python
import threading
|
|
import time
|
|
import logging
|
|
|
|
from cnc.pid import Pid
|
|
|
|
|
|
class Heater(threading.Thread):
    """ Asynchronous heater controller.

    A daemon thread which periodically measures temperature, feeds the
    value to a PID regulator and applies the resulting power via the control
    callback. The thread is started from the constructor.
    """
    # Pause between two iterations of the control loop, in seconds.
    LOOP_INTERVAL_S = 0.5
    # Heating is aborted if the sensor keeps failing longer than this time,
    # in seconds.
    SENSOR_TIMEOUT_S = 1

    def __init__(self, target_temp, pid_coefficients, measure_method,
                 control_method):
        """ Initialize and run asynchronous heating.
        :param target_temp: temperature which should be reached in Celsius.
        :param pid_coefficients: dict with PID coefficients.
        :param measure_method: Method which should be called to measure
                               temperature, it should return temperature in
                               Celsius.
        :param control_method: Method which should be called to control heater
                               power, it should receive one argument with
                               heater power in percent(0..100).
        """
        self._current_power = 0
        threading.Thread.__init__(self)
        self._pid = Pid(target_temp, pid_coefficients)
        self._measure = measure_method
        self._control = control_method
        self._is_run = True
        self._mutex = threading.Lock()
        # 'daemon' attribute replaces the deprecated setDaemon() call.
        self.daemon = True
        # Read the sensor for the log message BEFORE starting the worker: if
        # this read raises, the exception leaves the constructor without
        # a running heating thread that nobody holds a reference to.
        initial_temperature = self._measure()
        self.start()
        logging.info("Heating thread start, temperature {}/{} C"
                     .format(initial_temperature, self.target_temperature()))

    def target_temperature(self):
        """ Return target temperature which should be reached.
        :return: target value as reported by the PID regulator.
        """
        return self._pid.target_value()

    def is_fixed(self):
        """ Check if target value is reached and PID maintains this value.
        :return: boolean value
        """
        return self._pid.is_fixed()

    def run(self):
        """ Thread worker implementation. There is a loop for PID control.

        Every iteration is performed with the mutex held, so stop() can
        guarantee that the control method is not called by the worker after
        it returns. The mutex is always released when an iteration ends,
        whatever way it ends (normal pass, sensor error, loop exit).
        """
        last_error = None  # time of the first failure in a row of failures
        while True:
            with self._mutex:
                if not self._is_run:
                    break
                try:
                    current_temperature = self._measure()
                except (IOError, OSError):
                    # No temperature data: switch the heater off until the
                    # sensor answers again.
                    self._current_power = 0
                    self._control(0)
                    now = time.time()
                    if last_error is None:
                        last_error = now
                    elif now - last_error > self.SENSOR_TIMEOUT_S:
                        logging.critical("No data from temperature sensor."
                                         " Stop heating.")
                        break
                else:
                    last_error = None
                    # power is passed to control method in percent;
                    # presumably Pid.update() returns a 0..1 fraction.
                    self._current_power = \
                        self._pid.update(current_temperature) * 100
                    self._control(self._current_power)
            # Sleep with the mutex released, also after a sensor error, to
            # avoid a busy loop and to let stop() acquire the mutex.
            time.sleep(self.LOOP_INTERVAL_S)

    def stop(self):
        """ Stop heating and free this instance.
        """
        # make sure that control will not be called in worker anymore.
        with self._mutex:
            self._is_run = False
        self._control(0)
        logging.info("Heating thread stop")

    def wait(self):
        """ Block until target temperature is reached.

        Current temperature and power are logged on the first pass and then
        on every 8th pass of the 0.25 s polling loop. The only exit condition
        is the PID reporting a fixed state.
        """
        i = 0
        while not self._pid.is_fixed():
            if i % 8 == 0:
                logging.info("Heating... current temperature {} C, power {}%".
                             format(self._measure(), int(self._current_power)))
            i += 1
            time.sleep(0.25)
|