12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182 |
- package main
- import (
- "fmt"
- "strings"
- mqtt "github.com/eclipse/paho.mqtt.golang"
- )
- type mqttControl struct {
- broker string
- port int
- changeIn, changeOut chan change
- client mqtt.Client
- messagePubHandler func(client mqtt.Client, msg mqtt.Message)
- connectHandler func(client mqtt.Client)
- connectLostHandler func(client mqtt.Client, err error)
- }
- func NewMqttControl(broker string, port int, changeIn, changeOut chan change) *mqttControl {
- mctrl := mqttControl{
- broker: broker,
- port: port,
- changeIn: changeIn,
- changeOut: changeOut,
- messagePubHandler: func(client mqtt.Client, msg mqtt.Message) {
- fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
- change := change{value: string(msg.Payload())}
- if strings.Contains(msg.Topic(), "monitorBrightness") {
- change.kind = "monitorBrightness"
- changeOut <- change
- } else if strings.Contains(msg.Topic(), "fanSpeed") {
- change.kind = "fanSpeed"
- changeOut <- change
- }
- },
- connectHandler: func(client mqtt.Client) {
- fmt.Println("Connected")
- },
- connectLostHandler: func(client mqtt.Client, err error) {
- fmt.Printf("Connect lost: %v", err)
- },
- }
- mctrl.Init()
- return &mctrl
- }
- func (mc *mqttControl) Init() {
- opts := mqtt.NewClientOptions()
- opts.AddBroker(fmt.Sprintf("tcp://%s:%d", mc.broker, mc.port))
- opts.SetClientID("go_mqtt_client")
- opts.SetUsername("emqx")
- opts.SetPassword("public")
- opts.SetDefaultPublishHandler(mc.messagePubHandler)
- opts.OnConnect = mc.connectHandler
- opts.OnConnectionLost = mc.connectLostHandler
- mc.client = mqtt.NewClient(opts)
- }
- func (mc *mqttControl) Run() {
- abort := make(chan struct{})
- go func() {
- select {
- case change := <-mc.changeIn:
- mc.client.Publish(fmt.Sprintf("/test/%s/status", change.kind), 0, false, change.value)
- case <-abort:
- fmt.Println("publisher received abort")
- break
- }
- }()
- if token := mc.client.Connect(); token.Wait() && token.Error() != nil {
- panic(token.Error())
- }
- }
- func StartMqtt(changeIn, changeOut chan change) {
- mc := NewMqttControl("broker.emqx.io", 1883, changeIn, changeOut)
- mc.Run()
- }
|