mqtt.go 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. package main
  2. import (
  3. "fmt"
  4. "strings"
  5. mqtt "github.com/eclipse/paho.mqtt.golang"
  6. )
  7. type mqttControl struct {
  8. broker string
  9. port int
  10. changeIn, changeOut chan change
  11. client mqtt.Client
  12. messagePubHandler func(client mqtt.Client, msg mqtt.Message)
  13. connectHandler func(client mqtt.Client)
  14. connectLostHandler func(client mqtt.Client, err error)
  15. }
  16. func NewMqttControl(broker string, port int, changeIn, changeOut chan change) *mqttControl {
  17. mctrl := mqttControl{
  18. broker: broker,
  19. port: port,
  20. changeIn: changeIn,
  21. changeOut: changeOut,
  22. messagePubHandler: func(client mqtt.Client, msg mqtt.Message) {
  23. fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
  24. change := change{value: string(msg.Payload())}
  25. if strings.Contains(msg.Topic(), "monitorBrightness") {
  26. change.kind = "monitorBrightness"
  27. changeOut <- change
  28. } else if strings.Contains(msg.Topic(), "fanSpeed") {
  29. change.kind = "fanSpeed"
  30. changeOut <- change
  31. }
  32. },
  33. connectHandler: func(client mqtt.Client) {
  34. fmt.Println("Connected")
  35. },
  36. connectLostHandler: func(client mqtt.Client, err error) {
  37. fmt.Printf("Connect lost: %v", err)
  38. },
  39. }
  40. mctrl.Init()
  41. return &mctrl
  42. }
  43. func (mc *mqttControl) Init() {
  44. opts := mqtt.NewClientOptions()
  45. opts.AddBroker(fmt.Sprintf("tcp://%s:%d", mc.broker, mc.port))
  46. opts.SetClientID("go_mqtt_client")
  47. opts.SetUsername("emqx")
  48. opts.SetPassword("public")
  49. opts.SetDefaultPublishHandler(mc.messagePubHandler)
  50. opts.OnConnect = mc.connectHandler
  51. opts.OnConnectionLost = mc.connectLostHandler
  52. mc.client = mqtt.NewClient(opts)
  53. }
  54. func (mc *mqttControl) Run() {
  55. abort := make(chan struct{})
  56. go func() {
  57. select {
  58. case change := <-mc.changeIn:
  59. mc.client.Publish(fmt.Sprintf("/test/%s/status", change.kind), 0, false, change.value)
  60. case <-abort:
  61. fmt.Println("publisher received abort")
  62. break
  63. }
  64. }()
  65. if token := mc.client.Connect(); token.Wait() && token.Error() != nil {
  66. panic(token.Error())
  67. }
  68. }
  69. func StartMqtt(changeIn, changeOut chan change) {
  70. mc := NewMqttControl("broker.emqx.io", 1883, changeIn, changeOut)
  71. mc.Run()
  72. }