initial commit

This commit is contained in:
Javier Peletier
2020-12-11 15:39:11 +01:00
commit 4662308fd0
7 changed files with 472 additions and 0 deletions

2
.gitignore vendored Normal file
View File

@@ -0,0 +1,2 @@
koolnova2mqtt
.vscode/launch.json

11
go.mod Normal file
View File

@@ -0,0 +1,11 @@
module koolnova2mqtt
go 1.14
require (
github.com/eclipse/paho.mqtt.golang v1.3.0
github.com/epiclabs-io/diff3 v0.0.0-20181217103619-05282cece609 // indirect
github.com/epiclabs-io/ut v0.0.0-20190416122157-8da7fe4b4947
github.com/goburrow/modbus v0.1.0
github.com/goburrow/serial v0.1.0 // indirect
)

18
go.sum Normal file
View File

@@ -0,0 +1,18 @@
github.com/eclipse/paho.mqtt.golang v1.3.0 h1:MU79lqr3FKNKbSrGN7d7bNYqh8MwWW7Zcx0iG+VIw9I=
github.com/eclipse/paho.mqtt.golang v1.3.0/go.mod h1:eTzb4gxwwyWpqBUHGQZ4ABAV7+Jgm1PklsYT/eo8Hcc=
github.com/epiclabs-io/diff3 v0.0.0-20181217103619-05282cece609 h1:KHcpmcC/8cnCDXDm6SaCTajWF/vyUbBE1ovA27xYYEY=
github.com/epiclabs-io/diff3 v0.0.0-20181217103619-05282cece609/go.mod h1:tM499ZoH5jQRF3wlMnl59SJQwVYXIBdJRZa/K71p0IM=
github.com/epiclabs-io/ut v0.0.0-20190416122157-8da7fe4b4947 h1:5jyZq+mwwE90FnIyzAorlWF0Nrg8AB48KsDxofSAyBw=
github.com/epiclabs-io/ut v0.0.0-20190416122157-8da7fe4b4947/go.mod h1:Sm6PW7b/nLOHEn3XxuUOXFYA4xFkLUnyAWUOcTGcRZ4=
github.com/goburrow/modbus v0.1.0 h1:DejRZY73nEM6+bt5JSP6IsFolJ9dVcqxsYbpLbeW/ro=
github.com/goburrow/modbus v0.1.0/go.mod h1:Kx552D5rLIS8E7TyUwQ/UdHEqvX5T8tyiGBTlzMcZBg=
github.com/goburrow/serial v0.1.0 h1:v2T1SQa/dlUqQiYIT8+Cu7YolfqAi3K96UmhwYyuSrA=
github.com/goburrow/serial v0.1.0/go.mod h1:sAiqG0nRVswsm1C97xsttiYCzSLBmUZ/VSlVLZJ8haA=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0 h1:Jcxah/M+oLZ/R4/z5RzfPzGbPXnVDPkEDtf2JnuxN+U=
golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=

154
main.go Normal file
View File

@@ -0,0 +1,154 @@
package main
import (
"crypto/tls"
"flag"
"fmt"
"koolnova2mqtt/watcher"
"koolnova2mqtt/zonewatcher"
"log"
"os"
"os/signal"
"strconv"
"syscall"
"time"
MQTT "github.com/eclipse/paho.mqtt.golang"
"github.com/goburrow/modbus"
)
const NUM_ZONES = 16
func buildReader(handler *modbus.RTUClientHandler, client modbus.Client) watcher.ReadRegister {
return func(slaveID byte, address uint16, quantity uint16) (results []byte, err error) {
handler.SlaveId = slaveID
results, err = client.ReadHoldingRegisters(address-1, quantity)
return results, err
}
}
func getActiveZones(w zonewatcher.Watcher) ([]*zonewatcher.ZoneWatcher, error) {
var zones []*zonewatcher.ZoneWatcher
for n := 0; n < NUM_ZONES; n++ {
zone := zonewatcher.New(&zonewatcher.Config{
Zone: n,
Watcher: w,
})
isPresent, err := zone.IsPresent()
if err != nil {
return nil, err
}
if isPresent {
zones = append(zones, zone)
temp, _ := zone.GetCurrentTemperature()
fmt.Printf("Zone %d is present. Temperature %g ºC\n", zone.Zone, temp)
}
}
return zones, nil
}
func main() {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
hostname, _ := os.Hostname()
server := flag.String("server", "tcp://127.0.0.1:1883", "The full url of the MQTT server to connect to ex: tcp://127.0.0.1:1883")
//topic := flag.String("topic", "#", "Topic to subscribe to")
//qos := flag.Int("qos", 0, "The QoS to subscribe to messages at")
clientid := flag.String("clientid", hostname+strconv.Itoa(time.Now().Second()), "A clientid for the connection")
username := flag.String("username", "", "A username to authenticate to the MQTT server")
password := flag.String("password", "", "Password to match username")
prefix := flag.String("prefix", "koolnova2mqtt", "MQTT topic root where to publish/read topics")
flag.Parse()
// Modbus RTU/ASCII
handler := modbus.NewRTUClientHandler("/dev/remserial1")
handler.BaudRate = 9600
handler.DataBits = 8
handler.Parity = "E"
handler.StopBits = 1
handler.SlaveId = 49
handler.Timeout = 5 * time.Second
err := handler.Connect()
if err != nil {
fmt.Println(err)
}
defer handler.Close()
client := modbus.NewClient(handler)
nodeName := "topFloors"
getTopic := func(zoneNum int, subtopic string) string {
return fmt.Sprintf("%s/%s/zone%d/%s", *prefix, nodeName, zoneNum, subtopic)
}
w := watcher.New(&watcher.Config{
Address: 1,
Quantity: 64,
RegisterSize: 2,
SlaveID: 49,
Read: buildReader(handler, client),
})
err = w.Poll()
if err != nil {
fmt.Println(err)
return
}
zones, err := getActiveZones(w)
connOpts := MQTT.NewClientOptions().AddBroker(*server).SetClientID(*clientid).SetCleanSession(true)
if *username != "" {
connOpts.SetUsername(*username)
if *password != "" {
connOpts.SetPassword(*password)
}
}
tlsConfig := &tls.Config{InsecureSkipVerify: true, ClientAuth: tls.NoClientCert}
connOpts.SetTLSConfig(tlsConfig)
connOpts.OnConnect = func(c MQTT.Client) {
/* if token := c.Subscribe(*topic, byte(*qos), onMessageReceived); token.Wait() && token.Error() != nil {
panic(token.Error())
} */
w.TriggerCallbacks()
}
mqttClient := MQTT.NewClient(connOpts)
for _, zone := range zones {
zone := zone
zone.OnCurrentTempChange = func(currentTemp float32) {
mqttClient.Publish(getTopic(zone.Zone, "currentTemperature"), 0, false, fmt.Sprintf("%g", currentTemp))
}
zone.OnTargetTempChange = func(targetTemp float32) {
mqttClient.Publish(getTopic(zone.Zone, "targetTemperature"), 0, false, fmt.Sprintf("%g", targetTemp))
}
}
if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
} else {
fmt.Printf("Connected to %s\n", *server)
}
ticker := time.NewTicker(time.Second)
go func() {
for range ticker.C {
err := w.Poll()
if err != nil {
log.Printf("Error polling modbus: %s\n", err)
}
}
}()
<-c
}

91
watcher/watcher.go Normal file
View File

@@ -0,0 +1,91 @@
package watcher
import (
"bytes"
"errors"
)
type ReadRegister func(slaveID byte, address uint16, quantity uint16) (results []byte, err error)
type Config struct {
Address uint16
Quantity uint16
SlaveID byte
Read ReadRegister
RegisterSize int
}
type watcher struct {
Config
state []byte
callbacks map[uint16]func(address uint16)
}
var ErrIncorrectRegisterSize = errors.New("Incorrect register size")
var ErrAddressOutOfRange = errors.New("Register address out of range")
var ErrUninitialized = errors.New("State uninitialized. Call Poll() first.")
func New(config *Config) *watcher {
return &watcher{
Config: *config,
callbacks: make(map[uint16]func(address uint16)),
}
}
func (w *watcher) RegisterCallback(address uint16, callback func(address uint16)) {
w.callbacks[address] = callback
}
func (w *watcher) Poll() error {
newState, err := w.Read(w.SlaveID, w.Address, w.Quantity)
if err != nil {
return err
}
if len(newState) != int(w.Quantity)*w.RegisterSize {
return ErrIncorrectRegisterSize
}
oldState := w.state
w.state = newState
first := len(oldState) != len(newState)
address := w.Address
for n := 0; n < len(newState); n += w.RegisterSize {
callback := w.callbacks[address]
if callback == nil {
address++
continue
}
var oldValue []byte
newValue := newState[n : n+w.RegisterSize]
if first {
oldValue = nil
} else {
oldValue = oldState[n : n+w.RegisterSize]
}
if bytes.Compare(oldValue, newValue) != 0 {
callback(address)
}
address++
}
return nil
}
func (w *watcher) ReadRegister(address uint16) (value []byte, err error) {
if address < w.Address || address > w.Address+uint16(w.Quantity) {
return nil, ErrAddressOutOfRange
}
if w.state == nil {
return nil, ErrUninitialized
}
registerOffset := int(address-w.Address) * w.RegisterSize
return w.state[registerOffset : registerOffset+w.RegisterSize], nil
}
func (w *watcher) TriggerCallbacks() {
for address, callback := range w.callbacks {
callback(address)
}
}

87
watcher/watcher_test.go Normal file
View File

@@ -0,0 +1,87 @@
package watcher_test
import (
"errors"
"koolnova2mqtt/watcher"
"testing"
"github.com/epiclabs-io/ut"
)
func TestWatcher(tx *testing.T) {
t := ut.BeginTest(tx, false)
defer t.FinishTest()
var r []byte
var readRegisterError error = nil
readRegister := func(slaveID byte, address uint16, quantity uint16) (results []byte, err error) {
return r, readRegisterError
}
w := watcher.New(&watcher.Config{
Address: 1000,
Quantity: 5,
RegisterSize: 2,
SlaveID: 1,
Read: readRegister,
})
var cbAddress uint16
var callbackCount int
w.RegisterCallback(1000, func(address uint16) {
cbAddress = address
callbackCount++
})
w.RegisterCallback(1004, func(address uint16) {
callbackCount++
})
r = []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
value, err := w.ReadRegister(1001)
t.MustFailWith(err, watcher.ErrUninitialized)
t.Equals([]byte(nil), value)
err = w.Poll()
t.Ok(err)
t.Equals(2, callbackCount)
value, err = w.ReadRegister(1001)
t.Ok(err)
t.Equals([]byte{3, 4}, value)
_, err = w.ReadRegister(200)
t.MustFailWith(err, watcher.ErrAddressOutOfRange)
_, err = w.ReadRegister(5000)
t.MustFailWith(err, watcher.ErrAddressOutOfRange)
callbackCount = 0
err = w.Poll()
t.Ok(err)
t.Equals(callbackCount, 0)
r = []byte{79, 82, 3, 4, 5, 6, 7, 8, 9, 10}
callbackCount = 0
err = w.Poll()
t.Ok(err)
t.Equals(1, callbackCount)
t.Equals(uint16(1000), cbAddress)
cbNewValue, err := w.ReadRegister(cbAddress)
t.Ok(err)
t.Equals([]byte{79, 82}, cbNewValue)
r = []byte{1, 2}
err = w.Poll()
t.MustFailWith(err, watcher.ErrIncorrectRegisterSize)
readRegisterError = errors.New("error")
err = w.Poll()
t.MustFail(err, "expected Poll() to fail if readRegister returns error")
}

109
zonewatcher/zonewatcher.go Normal file
View File

@@ -0,0 +1,109 @@
package zonewatcher
import (
"encoding/binary"
"log"
)
const REG_PER_ZONE = 4
const REG_ENABLED = 1
const REG_MODE = 2
const REG_TARGET_TEMP = 3
const REG_CURRENT_TEMP = 4
type Watcher interface {
ReadRegister(address uint16) (value []byte, err error)
RegisterCallback(address uint16, callback func(address uint16))
}
type Config struct {
Zone int
Watcher Watcher
}
type ZoneWatcher struct {
Config
OnCurrentTempChange func(newTemp float32)
OnTargetTempChange func(newTemp float32)
}
func New(config *Config) *ZoneWatcher {
zw := &ZoneWatcher{
Config: *config,
}
zw.RegisterCallback(REG_CURRENT_TEMP, func() {
if zw.OnCurrentTempChange == nil {
return
}
temp, err := zw.GetCurrentTemperature()
if err != nil {
log.Printf("Cannot read current temperature: %s\n", err)
return
}
zw.OnCurrentTempChange(temp)
})
zw.RegisterCallback(REG_TARGET_TEMP, func() {
if zw.OnTargetTempChange == nil {
return
}
temp, err := zw.GetTargetTemperature()
if err != nil {
log.Printf("Cannot read target temperature: %s\n", err)
return
}
zw.OnTargetTempChange(temp)
})
return zw
}
func (zw *ZoneWatcher) RegisterCallback(num int, f func()) {
zw.Watcher.RegisterCallback(uint16(zw.Zone*REG_PER_ZONE+num), func(address uint16) {
f()
})
}
func (zw *ZoneWatcher) ReadRegister(num int) (uint16, error) {
b, err := zw.Watcher.ReadRegister(uint16(zw.Zone*REG_PER_ZONE + num))
if err != nil {
return 0, err
}
return binary.BigEndian.Uint16(b), nil
}
func (zw *ZoneWatcher) IsOn() (bool, error) {
r1, err := zw.ReadRegister(REG_ENABLED)
if err != nil {
return false, err
}
return r1&uint16(0x1) != 0, nil
}
func (zw *ZoneWatcher) IsPresent() (bool, error) {
r1, err := zw.ReadRegister(REG_ENABLED)
if err != nil {
return false, err
}
return r1&uint16(0x2) != 0, nil
}
func (zw *ZoneWatcher) GetCurrentTemperature() (float32, error) {
r4, err := zw.ReadRegister(REG_CURRENT_TEMP)
if err != nil {
return 0.0, err
}
return float32(r4) / 2.0, nil
}
func (zw *ZoneWatcher) GetTargetTemperature() (float32, error) {
r3, err := zw.ReadRegister(REG_TARGET_TEMP)
if err != nil {
return 0.0, err
}
return float32(r3) / 2.0, nil
}