commit 4662308fd0a7aa8fb5aca7c58da71b7e0bfc1c69 Author: Javier Peletier Date: Fri Dec 11 15:39:11 2020 +0100 initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..af357d2 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +koolnova2mqtt +.vscode/launch.json diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..2156a18 --- /dev/null +++ b/go.mod @@ -0,0 +1,11 @@ +module koolnova2mqtt + +go 1.14 + +require ( + github.com/eclipse/paho.mqtt.golang v1.3.0 + github.com/epiclabs-io/diff3 v0.0.0-20181217103619-05282cece609 // indirect + github.com/epiclabs-io/ut v0.0.0-20190416122157-8da7fe4b4947 + github.com/goburrow/modbus v0.1.0 + github.com/goburrow/serial v0.1.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..c4bb734 --- /dev/null +++ b/go.sum @@ -0,0 +1,18 @@ +github.com/eclipse/paho.mqtt.golang v1.3.0 h1:MU79lqr3FKNKbSrGN7d7bNYqh8MwWW7Zcx0iG+VIw9I= +github.com/eclipse/paho.mqtt.golang v1.3.0/go.mod h1:eTzb4gxwwyWpqBUHGQZ4ABAV7+Jgm1PklsYT/eo8Hcc= +github.com/epiclabs-io/diff3 v0.0.0-20181217103619-05282cece609 h1:KHcpmcC/8cnCDXDm6SaCTajWF/vyUbBE1ovA27xYYEY= +github.com/epiclabs-io/diff3 v0.0.0-20181217103619-05282cece609/go.mod h1:tM499ZoH5jQRF3wlMnl59SJQwVYXIBdJRZa/K71p0IM= +github.com/epiclabs-io/ut v0.0.0-20190416122157-8da7fe4b4947 h1:5jyZq+mwwE90FnIyzAorlWF0Nrg8AB48KsDxofSAyBw= +github.com/epiclabs-io/ut v0.0.0-20190416122157-8da7fe4b4947/go.mod h1:Sm6PW7b/nLOHEn3XxuUOXFYA4xFkLUnyAWUOcTGcRZ4= +github.com/goburrow/modbus v0.1.0 h1:DejRZY73nEM6+bt5JSP6IsFolJ9dVcqxsYbpLbeW/ro= +github.com/goburrow/modbus v0.1.0/go.mod h1:Kx552D5rLIS8E7TyUwQ/UdHEqvX5T8tyiGBTlzMcZBg= +github.com/goburrow/serial v0.1.0 h1:v2T1SQa/dlUqQiYIT8+Cu7YolfqAi3K96UmhwYyuSrA= +github.com/goburrow/serial v0.1.0/go.mod h1:sAiqG0nRVswsm1C97xsttiYCzSLBmUZ/VSlVLZJ8haA= +github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= +github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0 h1:Jcxah/M+oLZ/R4/z5RzfPzGbPXnVDPkEDtf2JnuxN+U= +golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/main.go b/main.go new file mode 100644 index 0000000..fa96421 --- /dev/null +++ b/main.go @@ -0,0 +1,154 @@ +package main + +import ( + "crypto/tls" + "flag" + "fmt" + "koolnova2mqtt/watcher" + "koolnova2mqtt/zonewatcher" + "log" + "os" + "os/signal" + "strconv" + "syscall" + "time" + + MQTT "github.com/eclipse/paho.mqtt.golang" + "github.com/goburrow/modbus" +) + +const NUM_ZONES = 16 + +func buildReader(handler *modbus.RTUClientHandler, client modbus.Client) watcher.ReadRegister { + return func(slaveID byte, address uint16, quantity uint16) (results []byte, err error) { + handler.SlaveId = slaveID + results, err = client.ReadHoldingRegisters(address-1, quantity) + return results, err + } +} + +func getActiveZones(w zonewatcher.Watcher) ([]*zonewatcher.ZoneWatcher, error) { + var zones []*zonewatcher.ZoneWatcher + + for n := 0; n < NUM_ZONES; n++ { + zone := zonewatcher.New(&zonewatcher.Config{ + Zone: n, + Watcher: w, + }) + isPresent, err := zone.IsPresent() + if err != nil { + return nil, err + } + if isPresent { + zones = append(zones, zone) + temp, _ := zone.GetCurrentTemperature() + fmt.Printf("Zone %d is present. Temperature %g ÂșC\n", zone.Zone, temp) + } + } + return zones, nil +} + +func main() { + + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + + hostname, _ := os.Hostname() + + server := flag.String("server", "tcp://127.0.0.1:1883", "The full url of the MQTT server to connect to ex: tcp://127.0.0.1:1883") + //topic := flag.String("topic", "#", "Topic to subscribe to") + //qos := flag.Int("qos", 0, "The QoS to subscribe to messages at") + clientid := flag.String("clientid", hostname+strconv.Itoa(time.Now().Second()), "A clientid for the connection") + username := flag.String("username", "", "A username to authenticate to the MQTT server") + password := flag.String("password", "", "Password to match username") + prefix := flag.String("prefix", "koolnova2mqtt", "MQTT topic root where to publish/read topics") + flag.Parse() + + // Modbus RTU/ASCII + handler := modbus.NewRTUClientHandler("/dev/remserial1") + handler.BaudRate = 9600 + handler.DataBits = 8 + handler.Parity = "E" + handler.StopBits = 1 + handler.SlaveId = 49 + handler.Timeout = 5 * time.Second + + err := handler.Connect() + if err != nil { + fmt.Println(err) + } + defer handler.Close() + + client := modbus.NewClient(handler) + + nodeName := "topFloors" + + getTopic := func(zoneNum int, subtopic string) string { + return fmt.Sprintf("%s/%s/zone%d/%s", *prefix, nodeName, zoneNum, subtopic) + } + + w := watcher.New(&watcher.Config{ + Address: 1, + Quantity: 64, + RegisterSize: 2, + SlaveID: 49, + Read: buildReader(handler, client), + }) + + err = w.Poll() + if err != nil { + fmt.Println(err) + return + } + + zones, err := getActiveZones(w) + + connOpts := MQTT.NewClientOptions().AddBroker(*server).SetClientID(*clientid).SetCleanSession(true) + if *username != "" { + connOpts.SetUsername(*username) + if *password != "" { + connOpts.SetPassword(*password) + } + } + tlsConfig := &tls.Config{InsecureSkipVerify: true, ClientAuth: tls.NoClientCert} + connOpts.SetTLSConfig(tlsConfig) + + connOpts.OnConnect = func(c MQTT.Client) { + /* if token := c.Subscribe(*topic, byte(*qos), onMessageReceived); token.Wait() && token.Error() != nil { + panic(token.Error()) + } */ + w.TriggerCallbacks() + } + + mqttClient := MQTT.NewClient(connOpts) + + for _, zone := range zones { + zone := zone + zone.OnCurrentTempChange = func(currentTemp float32) { + mqttClient.Publish(getTopic(zone.Zone, "currentTemperature"), 0, false, fmt.Sprintf("%g", currentTemp)) + } + zone.OnTargetTempChange = func(targetTemp float32) { + mqttClient.Publish(getTopic(zone.Zone, "targetTemperature"), 0, false, fmt.Sprintf("%g", targetTemp)) + } + } + + if token := mqttClient.Connect(); token.Wait() && token.Error() != nil { + panic(token.Error()) + } else { + fmt.Printf("Connected to %s\n", *server) + } + + ticker := time.NewTicker(time.Second) + + go func() { + for range ticker.C { + err := w.Poll() + if err != nil { + log.Printf("Error polling modbus: %s\n", err) + } + } + }() + + <-c + +} diff --git a/watcher/watcher.go b/watcher/watcher.go new file mode 100644 index 0000000..c9506c7 --- /dev/null +++ b/watcher/watcher.go @@ -0,0 +1,91 @@ +package watcher + +import ( + "bytes" + "errors" +) + +type ReadRegister func(slaveID byte, address uint16, quantity uint16) (results []byte, err error) + +type Config struct { + Address uint16 + Quantity uint16 + SlaveID byte + Read ReadRegister + RegisterSize int +} + +type watcher struct { + Config + state []byte + callbacks map[uint16]func(address uint16) +} + +var ErrIncorrectRegisterSize = errors.New("Incorrect register size") +var ErrAddressOutOfRange = errors.New("Register address out of range") +var ErrUninitialized = errors.New("State uninitialized. Call Poll() first.") + +func New(config *Config) *watcher { + return &watcher{ + Config: *config, + callbacks: make(map[uint16]func(address uint16)), + } +} + +func (w *watcher) RegisterCallback(address uint16, callback func(address uint16)) { + w.callbacks[address] = callback +} + +func (w *watcher) Poll() error { + newState, err := w.Read(w.SlaveID, w.Address, w.Quantity) + if err != nil { + return err + } + + if len(newState) != int(w.Quantity)*w.RegisterSize { + return ErrIncorrectRegisterSize + } + + oldState := w.state + w.state = newState + + first := len(oldState) != len(newState) + address := w.Address + for n := 0; n < len(newState); n += w.RegisterSize { + callback := w.callbacks[address] + if callback == nil { + address++ + continue + } + var oldValue []byte + newValue := newState[n : n+w.RegisterSize] + if first { + oldValue = nil + } else { + oldValue = oldState[n : n+w.RegisterSize] + } + if bytes.Compare(oldValue, newValue) != 0 { + callback(address) + } + address++ + } + return nil +} + +func (w *watcher) ReadRegister(address uint16) (value []byte, err error) { + if address < w.Address || address > w.Address+uint16(w.Quantity) { + return nil, ErrAddressOutOfRange + } + if w.state == nil { + return nil, ErrUninitialized + } + registerOffset := int(address-w.Address) * w.RegisterSize + return w.state[registerOffset : registerOffset+w.RegisterSize], nil + +} + +func (w *watcher) TriggerCallbacks() { + for address, callback := range w.callbacks { + callback(address) + } +} diff --git a/watcher/watcher_test.go b/watcher/watcher_test.go new file mode 100644 index 0000000..a2ab4ee --- /dev/null +++ b/watcher/watcher_test.go @@ -0,0 +1,87 @@ +package watcher_test + +import ( + "errors" + "koolnova2mqtt/watcher" + "testing" + + "github.com/epiclabs-io/ut" +) + +func TestWatcher(tx *testing.T) { + t := ut.BeginTest(tx, false) + defer t.FinishTest() + + var r []byte + var readRegisterError error = nil + readRegister := func(slaveID byte, address uint16, quantity uint16) (results []byte, err error) { + return r, readRegisterError + } + + w := watcher.New(&watcher.Config{ + Address: 1000, + Quantity: 5, + RegisterSize: 2, + SlaveID: 1, + Read: readRegister, + }) + + var cbAddress uint16 + var callbackCount int + w.RegisterCallback(1000, func(address uint16) { + cbAddress = address + callbackCount++ + }) + + w.RegisterCallback(1004, func(address uint16) { + callbackCount++ + }) + + r = []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} + + value, err := w.ReadRegister(1001) + t.MustFailWith(err, watcher.ErrUninitialized) + t.Equals([]byte(nil), value) + + err = w.Poll() + t.Ok(err) + t.Equals(2, callbackCount) + + value, err = w.ReadRegister(1001) + t.Ok(err) + t.Equals([]byte{3, 4}, value) + + _, err = w.ReadRegister(200) + t.MustFailWith(err, watcher.ErrAddressOutOfRange) + + _, err = w.ReadRegister(5000) + t.MustFailWith(err, watcher.ErrAddressOutOfRange) + + callbackCount = 0 + err = w.Poll() + t.Ok(err) + + t.Equals(callbackCount, 0) + + r = []byte{79, 82, 3, 4, 5, 6, 7, 8, 9, 10} + callbackCount = 0 + err = w.Poll() + t.Ok(err) + + t.Equals(1, callbackCount) + t.Equals(uint16(1000), cbAddress) + + cbNewValue, err := w.ReadRegister(cbAddress) + t.Ok(err) + t.Equals([]byte{79, 82}, cbNewValue) + + r = []byte{1, 2} + err = w.Poll() + t.MustFailWith(err, watcher.ErrIncorrectRegisterSize) + + readRegisterError = errors.New("error") + + err = w.Poll() + t.MustFail(err, "expected Poll() to fail if readRegister returns error") + +} diff --git a/zonewatcher/zonewatcher.go b/zonewatcher/zonewatcher.go new file mode 100644 index 0000000..552ce67 --- /dev/null +++ b/zonewatcher/zonewatcher.go @@ -0,0 +1,109 @@ +package zonewatcher + +import ( + "encoding/binary" + "log" +) + +const REG_PER_ZONE = 4 +const REG_ENABLED = 1 +const REG_MODE = 2 +const REG_TARGET_TEMP = 3 +const REG_CURRENT_TEMP = 4 + +type Watcher interface { + ReadRegister(address uint16) (value []byte, err error) + RegisterCallback(address uint16, callback func(address uint16)) +} + +type Config struct { + Zone int + Watcher Watcher +} + +type ZoneWatcher struct { + Config + OnCurrentTempChange func(newTemp float32) + OnTargetTempChange func(newTemp float32) +} + +func New(config *Config) *ZoneWatcher { + zw := &ZoneWatcher{ + Config: *config, + } + zw.RegisterCallback(REG_CURRENT_TEMP, func() { + if zw.OnCurrentTempChange == nil { + return + } + + temp, err := zw.GetCurrentTemperature() + if err != nil { + log.Printf("Cannot read current temperature: %s\n", err) + return + } + zw.OnCurrentTempChange(temp) + }) + zw.RegisterCallback(REG_TARGET_TEMP, func() { + if zw.OnTargetTempChange == nil { + return + } + + temp, err := zw.GetTargetTemperature() + if err != nil { + log.Printf("Cannot read target temperature: %s\n", err) + return + } + zw.OnTargetTempChange(temp) + }) + return zw +} + +func (zw *ZoneWatcher) RegisterCallback(num int, f func()) { + zw.Watcher.RegisterCallback(uint16(zw.Zone*REG_PER_ZONE+num), func(address uint16) { + f() + }) +} + +func (zw *ZoneWatcher) ReadRegister(num int) (uint16, error) { + + b, err := zw.Watcher.ReadRegister(uint16(zw.Zone*REG_PER_ZONE + num)) + if err != nil { + return 0, err + } + return binary.BigEndian.Uint16(b), nil +} + +func (zw *ZoneWatcher) IsOn() (bool, error) { + r1, err := zw.ReadRegister(REG_ENABLED) + if err != nil { + return false, err + } + + return r1&uint16(0x1) != 0, nil +} + +func (zw *ZoneWatcher) IsPresent() (bool, error) { + r1, err := zw.ReadRegister(REG_ENABLED) + if err != nil { + return false, err + } + + return r1&uint16(0x2) != 0, nil +} + +func (zw *ZoneWatcher) GetCurrentTemperature() (float32, error) { + r4, err := zw.ReadRegister(REG_CURRENT_TEMP) + if err != nil { + return 0.0, err + } + + return float32(r4) / 2.0, nil +} + +func (zw *ZoneWatcher) GetTargetTemperature() (float32, error) { + r3, err := zw.ReadRegister(REG_TARGET_TEMP) + if err != nil { + return 0.0, err + } + return float32(r3) / 2.0, nil +}