package main import ( "fmt" "strings" mqtt "github.com/eclipse/paho.mqtt.golang" ) type mqttControl struct { broker string port int changeIn, changeOut chan change client mqtt.Client messagePubHandler func(client mqtt.Client, msg mqtt.Message) connectHandler func(client mqtt.Client) connectLostHandler func(client mqtt.Client, err error) } func NewMqttControl(broker string, port int, changeIn, changeOut chan change) *mqttControl { mctrl := mqttControl{ broker: broker, port: port, changeIn: changeIn, changeOut: changeOut, messagePubHandler: func(client mqtt.Client, msg mqtt.Message) { fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic()) change := change{value: string(msg.Payload())} if strings.Contains(msg.Topic(), "monitorBrightness") { change.kind = "monitorBrightness" changeOut <- change } else if strings.Contains(msg.Topic(), "fanSpeed") { change.kind = "fanSpeed" changeOut <- change } }, connectHandler: func(client mqtt.Client) { fmt.Println("Connected") }, connectLostHandler: func(client mqtt.Client, err error) { fmt.Printf("Connect lost: %v", err) }, } mctrl.Init() return &mctrl } func (mc *mqttControl) Init() { opts := mqtt.NewClientOptions() opts.AddBroker(fmt.Sprintf("tcp://%s:%d", mc.broker, mc.port)) opts.SetClientID("go_mqtt_client") opts.SetUsername("emqx") opts.SetPassword("public") opts.SetDefaultPublishHandler(mc.messagePubHandler) opts.OnConnect = mc.connectHandler opts.OnConnectionLost = mc.connectLostHandler mc.client = mqtt.NewClient(opts) } func (mc *mqttControl) Run() { abort := make(chan struct{}) go func() { select { case change := <-mc.changeIn: mc.client.Publish(fmt.Sprintf("/test/%s/status", change.kind), 0, false, change.value) case <-abort: fmt.Println("publisher received abort") break } }() if token := mc.client.Connect(); token.Wait() && token.Error() != nil { panic(token.Error()) } } func StartMqtt(changeIn, changeOut chan change) { mc := NewMqttControl("broker.emqx.io", 1883, changeIn, changeOut) mc.Run() }