gorilla-wsserver.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. package wsserver
  2. import (
  3. "context"
  4. "encoding/json"
  5. "flag"
  6. "fmt"
  7. "log"
  8. "net/http"
  9. "strings"
  10. gamelangpb "git.alfi.li/gamelang/protobuf/gamelang"
  11. um "git.alfi.li/gamelang/systems/usermanager"
  12. "github.com/gorilla/websocket"
  13. )
  14. var wsaddr = flag.String("wsaddr", "localhost:12345", "websocket service address")
  15. type WSMsg struct {
  16. Mtype string `json:"mtype"`
  17. Payload string `json:"payload"`
  18. }
  19. type Handler struct {
  20. MType string
  21. Callback func(WSMsg, gamelangpb.User) (WSMsg, error)
  22. }
  23. type WSServer struct {
  24. usermanager *um.UserManager
  25. userclient gamelangpb.UserServiceClient
  26. upgrader *websocket.Upgrader
  27. handler map[string]func(WSMsg, gamelangpb.User) (WSMsg, error)
  28. writeChans map[string]chan WSMsg
  29. }
  30. func NewWSServer(umBroker []string, userclient gamelangpb.UserServiceClient, usermanager *um.UserManager) *WSServer {
  31. upgrader := websocket.Upgrader{} // use default options
  32. wss := WSServer{upgrader: &upgrader, writeChans: map[string]chan WSMsg{}}
  33. if userclient != nil {
  34. log.Print("init userclient")
  35. wss.userclient = userclient
  36. } else if usermanager != nil {
  37. log.Print("using usermanager")
  38. wss.usermanager = usermanager
  39. } else {
  40. log.Print("init usermanager")
  41. usermanager := um.NewUserManager(umBroker)
  42. wss.usermanager = usermanager
  43. }
  44. return &wss
  45. }
  46. func (wss *WSServer) Write(name string, sendMsg WSMsg) {
  47. chans := wss.GetWriteChans(name)
  48. if len(chans) == 0 {
  49. log.Println("WSServer:Write - no sendChans for", name)
  50. return
  51. }
  52. for _, con := range chans {
  53. con <- sendMsg
  54. }
  55. }
  56. func (wss *WSServer) GetWriteChans(searchname string) []chan WSMsg {
  57. cons := []chan WSMsg{}
  58. for key, con := range wss.writeChans {
  59. username := strings.Split(key, "\uffff")[0]
  60. if username == searchname {
  61. cons = append(cons, con)
  62. }
  63. }
  64. return cons
  65. }
  66. func (wss *WSServer) CleanWriteChan(name string) {
  67. delete(wss.writeChans, name)
  68. }
  69. func (wss *WSServer) RegisterHandler(handlers []Handler) {
  70. if wss.handler == nil {
  71. wss.handler = map[string]func(WSMsg, gamelangpb.User) (WSMsg, error){}
  72. }
  73. for _, handler := range handlers {
  74. log.Println("Registrering handler for MType: ", handler.MType)
  75. wss.handler[handler.MType] = handler.Callback
  76. }
  77. }
  78. type rawMsg struct {
  79. mtype int
  80. p []byte
  81. }
  82. func (wss WSServer) WebsocketHandler(w http.ResponseWriter, r *http.Request) {
  83. log.Println("request on /ws from", r.RemoteAddr)
  84. c, err := wss.upgrader.Upgrade(w, r, nil)
  85. if err != nil {
  86. log.Print("upgrade:", err)
  87. return
  88. }
  89. user := "not logged in"
  90. defer wss.CleanWriteChan(fmt.Sprintf("%s\uffff%s", user, r.RemoteAddr))
  91. defer c.Close()
  92. var userObj *gamelangpb.User
  93. var writeChan chan WSMsg
  94. for {
  95. mt, message, err := c.ReadMessage()
  96. if err != nil {
  97. log.Println("read:", err)
  98. break
  99. }
  100. msg := WSMsg{}
  101. err = json.Unmarshal(message, &msg)
  102. if err != nil {
  103. log.Println("error unmarshalling message:", err)
  104. log.Printf("recv: %s", message)
  105. return
  106. }
  107. if user == "not logged in" && msg.Mtype != "login" {
  108. msg := WSMsg{Mtype: "login-needed", Payload: ""}
  109. jsonMsg, _ := json.Marshal(msg)
  110. c.WriteMessage(mt, jsonMsg)
  111. continue
  112. }
  113. if msg.Mtype == "login" {
  114. log.Println("login request:", msg.Payload)
  115. if user == "not logged in" {
  116. creds := strings.Split(msg.Payload, "\uffff")
  117. if len(creds) != 2 {
  118. log.Printf("login failed for \"%s\"", msg.Payload)
  119. msg := WSMsg{Mtype: "login-failed", Payload: ""}
  120. wss.answer(mt, c, msg)
  121. return
  122. }
  123. username := creds[0]
  124. password := creds[1]
  125. var ok bool
  126. if wss.userclient != nil {
  127. log.Print("checking user via userclient")
  128. checkuser := gamelangpb.User{Name: username, Password: []byte(password)}
  129. var err error
  130. var tmpUserObj *gamelangpb.User
  131. tmpUserObj, err = wss.userclient.CheckUser(context.Background(), &checkuser)
  132. if err != nil {
  133. ok = false
  134. log.Print(err.Error())
  135. } else {
  136. userObj = tmpUserObj
  137. ok = true
  138. }
  139. } else {
  140. log.Print("checking user via usermanager")
  141. var tmpUserObj gamelangpb.User
  142. tmpUserObj, ok = wss.usermanager.CheckUser(username, password)
  143. if ok {
  144. userObj = &tmpUserObj
  145. }
  146. }
  147. if !ok {
  148. log.Printf("login failed for \"%s\" with \"%s\"", username, msg.Payload)
  149. msg := WSMsg{Mtype: "login-failed", Payload: ""}
  150. wss.answer(mt, c, msg)
  151. return
  152. }
  153. user = username
  154. writeChan = make(chan WSMsg, 20)
  155. wss.writeChans[fmt.Sprintf("%s\uffff%s", username, r.RemoteAddr)] = writeChan
  156. go func() {
  157. for {
  158. newMsg := <-writeChan
  159. err := wss.answer(websocket.BinaryMessage, c, newMsg)
  160. if err != nil {
  161. if strings.Contains(err.Error(), "use of closed network connection") || strings.Contains(err.Error(), "connection reset by peer") {
  162. delete(wss.writeChans, fmt.Sprintf("%s\uffff%s", username, r.RemoteAddr))
  163. }
  164. log.Println("websocket writer error:", err.Error())
  165. }
  166. }
  167. }()
  168. log.Printf("login succeeded for \"%s\"", username)
  169. msg := WSMsg{Mtype: "login-succeeded", Payload: username}
  170. writeChan <- msg
  171. }
  172. } else {
  173. handler, ok := wss.handler[msg.Mtype]
  174. if !ok {
  175. log.Println("unknown MType", msg.Mtype)
  176. keys := ""
  177. for k := range wss.handler {
  178. keys = fmt.Sprintf("%s\n%s", keys, k)
  179. }
  180. log.Println("available MTypes:\n", keys, "----")
  181. remsg := WSMsg{Mtype: "UNKNOWN", Payload: ""}
  182. writeChan <- remsg
  183. return
  184. }
  185. remsg, err := handler(msg, *userObj)
  186. if err != nil {
  187. log.Println("handler failed", err.Error())
  188. writeChan <- msg
  189. }
  190. writeChan <- remsg
  191. }
  192. }
  193. }
  194. func (wss WSServer) answer(mt int, c *websocket.Conn, msg WSMsg) error {
  195. jsonMsg, err := json.Marshal(msg)
  196. if err != nil {
  197. log.Printf("write: err marshalling \"%s\"\n%s", msg, err.Error())
  198. return err
  199. }
  200. err = c.WriteMessage(mt, jsonMsg)
  201. if err != nil {
  202. log.Println("write:", err)
  203. return err
  204. }
  205. return nil
  206. }