bingomanager.go 12 KB


  1. package BingoManager
  2. import (
  3. "context"
  4. "crypto/rand"
  5. "encoding/hex"
  6. "errors"
  7. "log"
  8. mathrand "math/rand"
  9. "strings"
  10. "sync"
  11. "time"
  12. "github.com/golang/protobuf/proto"
  13. "github.com/segmentio/kafka-go"
  14. glbingopb "git.alfi.li/gamelang/protobuf/gamelang-bingo"
  15. )
  16. // BingoManager manages holds
  17. type BingoManager struct {
  18. Games map[string]*glbingopb.Bingo
  19. GamesLock sync.Mutex
  20. kafkaBroker []string
  21. kafkaR *kafka.Reader
  22. kafkaW *kafka.Writer
  23. notichan chan glbingopb.Event
  24. }
  25. // NewBingoManager returns an initialized BingoManager
  26. func NewBingoManager(kafkaBroker []string, notichan chan glbingopb.Event) *BingoManager {
  27. // default broker: []string{"localhost:9092"}
  28. bs := BingoManager{notichan: notichan}
  29. bs.Games = map[string]*glbingopb.Bingo{}
  30. if len(kafkaBroker) > 0 {
  31. bs.kafkaBroker = kafkaBroker
  32. bs.InitKafka()
  33. }
  34. go bs.CheckWin()
  35. return &bs
  36. }
  37. func (bs *BingoManager) CheckWin() {
  38. for {
  39. bs.GamesLock.Lock()
  40. games := bs.Games
  41. for _, game := range games {
  42. //log.Println("CheckWin: checking game", game)
  43. if strings.Split(game.Name, "\uffff")[1] == "lobby" {
  44. continue
  45. }
  46. win, winner := bs.CheckHorizontal(game)
  47. if !win {
  48. win, winner = bs.CheckVertical(game)
  49. }
  50. if win {
  51. game.Winner = winner
  52. game.Finished = true
  53. bs.notify(glbingopb.Event{Type: "Win", Gamename: game.Name, Username: winner})
  54. }
  55. }
  56. bs.GamesLock.Unlock()
  57. time.Sleep(time.Second)
  58. }
  59. }
  60. func (bs *BingoManager) CheckHorizontal(game *glbingopb.Bingo) (bool, string) {
  61. log.Println("CheckHorizontal: checking game", game.Name)
  62. fields := game.GetFields()
  63. for _, row := range fields.GetRows() {
  64. //log.Println("CheckHorizontal: checking row", i)
  65. hits := uint32(0)
  66. winner := ""
  67. for _, field := range row.GetFields() {
  68. //log.Println("CheckHorizontal: checking field", y)
  69. if field.Checked {
  70. if winner == "" {
  71. winner = field.Checker[0]
  72. //log.Println("CheckHorizontal: new winner", winner)
  73. hits++
  74. continue
  75. }
  76. if winner == field.Checker[0] {
  77. //log.Println("CheckHorizontal: same winner", winner)
  78. hits++
  79. continue
  80. }
  81. //log.Println("CheckHorizontal: other winner", winner)
  82. break
  83. } //else {
  84. //log.Println("CheckHorizontal: not checked")
  85. //}
  86. }
  87. if hits == game.Numcols {
  88. //log.Println("got a winner", winner)
  89. return true, winner
  90. }
  91. }
  92. return false, ""
  93. }
  94. func (bs *BingoManager) CheckVertical(game *glbingopb.Bingo) (bool, string) {
  95. log.Println("CheckHorizontal: checking game", game.Name)
  96. fields := game.GetFields()
  97. hits := uint32(0)
  98. winner := ""
  99. for col := uint32(0); col < game.GetNumcols(); col++ {
  100. for _, row := range fields.GetRows() {
  101. field := row.Fields[col]
  102. if field.Checked {
  103. //log.Println("CheckHorizontal: checking field", y)
  104. if field.Checked {
  105. if winner == "" {
  106. winner = field.Checker[0]
  107. //log.Println("CheckHorizontal: new winner", winner)
  108. hits++
  109. continue
  110. }
  111. if winner == field.Checker[0] {
  112. //log.Println("CheckHorizontal: same winner", winner)
  113. hits++
  114. continue
  115. }
  116. //log.Println("CheckHorizontal: other winner", winner)
  117. break
  118. }
  119. }
  120. }
  121. }
  122. if hits == game.Numrows {
  123. //log.Println("got a winner", winner)
  124. return true, winner
  125. }
  126. return false, ""
  127. }
  128. // CraftGame crafts a new Game
  129. func (bs *BingoManager) CraftGame(id uint32, name string, wordlist []string, numrows, numcols uint32, options ...*glbingopb.Option) glbingopb.Bingo {
  130. log.Println("crafting game", name)
  131. newGame := glbingopb.Bingo{Id: id, Name: name, Textlist: wordlist, Options: options, Numrows: numrows, Numcols: numcols}
  132. log.Println("new game:\n", newGame)
  133. matrix := glbingopb.FieldMatrix{Rows: []*glbingopb.FieldMatrix_MatrixRow{}}
  134. for row := uint32(0); row < numrows; row += 1 {
  135. matrix.Rows = append(matrix.Rows, &glbingopb.FieldMatrix_MatrixRow{})
  136. matrix.Rows[row].Fields = []*glbingopb.Field{}
  137. for col := uint32(0); col < numrows; col += 1 {
  138. if getOption(options, "unique fields").Value == "true" {
  139. var word string
  140. word, wordlist = pickRandomEntryAndDelete(wordlist)
  141. matrix.Rows[row].Fields = append(matrix.Rows[row].Fields, &glbingopb.Field{Text: word})
  142. } else {
  143. word, _ := pickRandomEntry(wordlist)
  144. matrix.Rows[row].Fields = append(matrix.Rows[row].Fields, &glbingopb.Field{Text: word})
  145. }
  146. }
  147. }
  148. newGame.Fields = &matrix
  149. return newGame
  150. }
  151. func getOption(options []*glbingopb.Option, search string) *glbingopb.Option {
  152. for _, option := range options {
  153. if option.Key == search {
  154. return option
  155. }
  156. }
  157. return &glbingopb.Option{}
  158. }
  // pickRandomEntry returns a randomly chosen element of list together with its
  // index. For an empty list there is nothing to choose from; it returns "" and
  // -1 instead of panicking on a modulo by zero.
  func pickRandomEntry(list []string) (string, int) {
  	if len(list) == 0 {
  		return "", -1
  	}
  	index := mathrand.Intn(len(list))
  	return list[index], index
  }
  // pickRandomEntryAndDelete removes a randomly chosen element from list and
  // returns it together with the shortened list.
  //
  // The removal works in place: the last element is moved into the freed slot
  // and the slice is cut by one, so the order of list is not preserved and the
  // caller's backing array is modified. Pass a copy if the original order is
  // still needed. For an empty list it returns "" and the list unchanged
  // instead of panicking.
  func pickRandomEntryAndDelete(list []string) (string, []string) {
  	if len(list) == 0 {
  		return "", list
  	}
  	index := mathrand.Intn(len(list))
  	entry := list[index]
  	last := len(list) - 1
  	list[index] = list[last]
  	return entry, list[:last]
  }
  168. // Create adds an entry into the Game database
  169. func (bs *BingoManager) Create(newGame glbingopb.Bingo) error {
  170. log.Println("BingoManager: create game", newGame.Name)
  171. if bs.kafkaW != nil {
  172. newGameBin, _ := proto.Marshal(&newGame)
  173. if err := bs.kafkaW.WriteMessages(context.Background(), kafka.Message{Key: []byte("Game:create"), Value: newGameBin}); err != nil {
  174. log.Println("ERROR: cannot write to kafka\n", err.Error())
  175. return err
  176. }
  177. return nil
  178. } else {
  179. log.Println("creating Game:\n", newGame)
  180. bs._create(newGame)
  181. return nil
  182. }
  183. }
  184. func (bs *BingoManager) _create(newGame glbingopb.Bingo) {
  185. bs.GamesLock.Lock()
  186. bs.Games[newGame.Name] = &newGame
  187. bs.GamesLock.Unlock()
  188. log.Println("BingoManager: Game has been created", newGame)
  189. }
  190. // Remove removes an entry from the Game database
  191. func (bs *BingoManager) Remove(name string) error {
  192. if bs.kafkaW != nil {
  193. bs.GamesLock.Lock()
  194. delGame := bs.Games[name]
  195. bs.GamesLock.Unlock()
  196. delGameBin, _ := proto.Marshal(delGame)
  197. if err := bs.kafkaW.WriteMessages(context.Background(), kafka.Message{Key: []byte("Game:delete"), Value: delGameBin}); err != nil {
  198. log.Fatal(err)
  199. } else {
  200. // print common key info
  201. log.Print("Delete is done")
  202. }
  203. } else {
  204. bs.GamesLock.Lock()
  205. delete(bs.Games, name)
  206. bs.GamesLock.Unlock()
  207. }
  208. return nil
  209. }
  210. // Get returns an Instance of a Game
  211. func (bs *BingoManager) Get(searchGame glbingopb.Bingo) glbingopb.Bingo {
  212. bs.GamesLock.Lock()
  213. Game, ok := bs.Games[searchGame.Name]
  214. bs.GamesLock.Unlock()
  215. if !ok {
  216. log.Println("BingoManager: cannot find Game", searchGame.Name)
  217. return glbingopb.Bingo{}
  218. }
  219. return *Game
  220. }
  221. // List returns a list of all Games
  222. func (bs *BingoManager) List() []glbingopb.Bingo {
  223. list := []glbingopb.Bingo{}
  224. bs.GamesLock.Lock()
  225. games := bs.Games
  226. for _, Game := range games {
  227. list = append(list, *Game)
  228. }
  229. bs.GamesLock.Unlock()
  230. return list
  231. }
  232. // Modify adapts the changeset in modifyGame
  233. func (bs *BingoManager) Modify(modGame glbingopb.Bingo) error {
  234. if bs.kafkaW != nil {
  235. modGameBin, _ := proto.Marshal(&modGame)
  236. if err := bs.kafkaW.WriteMessages(context.Background(), kafka.Message{Key: []byte("Game:modify"), Value: modGameBin}); err != nil {
  237. log.Fatal(err)
  238. return err
  239. } else {
  240. log.Print("modify sent")
  241. return nil
  242. }
  243. } else {
  244. return bs._modifyGame(modGame)
  245. }
  246. }
  247. // modifyGame _actually_ applies the diff to the live game
  248. func (bs *BingoManager) _modifyGame(modGame glbingopb.Bingo) error {
  249. bs.GamesLock.Lock()
  250. Game, ok := bs.Games[modGame.Name]
  251. bs.GamesLock.Unlock()
  252. if !ok {
  253. log.Printf("BingoManager: Game \"%v\" not found", modGame.Name)
  254. return errors.New("Game not found")
  255. }
  256. // do nothing if game is won by someone
  257. if Game.Finished {
  258. return errors.New("game is already finished")
  259. }
  260. bs.GamesLock.Lock()
  261. bs.Games[modGame.Name] = &modGame
  262. bs.GamesLock.Unlock()
  263. return nil
  264. }
  265. // HandleEvent handles incoming game events
  266. func (bs *BingoManager) HandleEvent(event glbingopb.Event) error {
  267. bs.GamesLock.Lock()
  268. game, ok := bs.Games[event.Gamename]
  269. if game.Finished {
  270. return errors.New("game finished")
  271. }
  272. if !ok {
  273. return errors.New("game not found")
  274. }
  275. bs.GamesLock.Unlock()
  276. if bs.kafkaW != nil {
  277. eventBin, _ := proto.Marshal(&event)
  278. if err := bs.kafkaW.WriteMessages(context.Background(), kafka.Message{Key: []byte("Game:event"), Value: eventBin}); err != nil {
  279. log.Fatal(err)
  280. return err
  281. } else {
  282. log.Print("event sent")
  283. return nil
  284. }
  285. } else {
  286. return bs._handleEvent(event)
  287. }
  288. }
  289. // HandleEvent _actually_ handles incoming game events
  290. func (bs *BingoManager) _handleEvent(event glbingopb.Event) error {
  291. log.Println("BingoManager: handling event", event)
  292. // if this is not a common game, all other games from the world should also receive the event
  293. neighbors := event.GetNeighborgames()
  294. if len(neighbors) > 0 {
  295. for _, game := range neighbors {
  296. event.Gamename = game
  297. event.Neighborgames = []string{}
  298. log.Println("BingoManager: trigger event for", game, "with", event)
  299. go bs._handleEvent(event)
  300. }
  301. }
  302. if event.Type == "checkField" {
  303. bs.GamesLock.Lock()
  304. game, ok := bs.Games[event.Gamename]
  305. bs.GamesLock.Unlock()
  306. if !ok {
  307. log.Println("BingoManager: cannot find Game to check", event.Gamename)
  308. return errors.New("cannot find Game")
  309. }
  310. checkedgame := bs.checkField(event.Value, *game, event.Username)
  311. bs.GamesLock.Lock()
  312. bs.Games[event.Gamename] = &checkedgame
  313. bs.GamesLock.Unlock()
  314. // send the event out to wss clients
  315. go bs.notify(event)
  316. return nil
  317. }
  318. return errors.New("unknown event type")
  319. }
  320. func (bs *BingoManager) checkField(text string, game glbingopb.Bingo, checker string) glbingopb.Bingo {
  321. matrix := game.GetFields()
  322. for _, row := range matrix.GetRows() {
  323. for _, field := range row.Fields {
  324. if field.Text == text {
  325. field.Checked = true
  326. field.Checker = append(field.Checker, checker)
  327. }
  328. }
  329. }
  330. //game.Fields = matrix
  331. return game
  332. }
  333. func (bs *BingoManager) notify(event glbingopb.Event) {
  334. log.Println("BingoManager: notify", event)
  335. bs.notichan <- event
  336. log.Print("BingoManager: notify sent")
  337. }
  338. // ReceiveMessages handles kafka messages
  339. func (bs *BingoManager) ReceiveMessages(msg kafka.Message) {
  340. log.Println("bs receive: new msg", msg.Offset, string(msg.Key))
  341. switch string(msg.Key) {
  342. // uc - Game create event
  343. case "Game:create":
  344. newGame := &glbingopb.Bingo{}
  345. proto.Unmarshal(msg.Value, newGame)
  346. bs._create(*newGame)
  347. log.Println("bs receive: Game \"", newGame.Name, "\" has been created")
  348. // ur - Game remove event
  349. case "Game:remove":
  350. delGame := &glbingopb.Bingo{}
  351. proto.Unmarshal(msg.Value, delGame)
  352. name := delGame.Name
  353. bs.GamesLock.Lock()
  354. delete(bs.Games, name)
  355. bs.GamesLock.Unlock()
  356. log.Println("bs receive: Game \"", delGame.Name, "\" has been deleted")
  357. case "Game:modify":
  358. modGame := &glbingopb.Bingo{}
  359. proto.Unmarshal(msg.Value, modGame)
  360. bs._modifyGame(*modGame)
  361. case "Game:event":
  362. event := &glbingopb.Event{}
  363. proto.Unmarshal(msg.Value, event)
  364. bs._handleEvent(*event)
  365. default:
  366. log.Println("unknown type ", string(msg.Key))
  367. }
  368. }
  369. // InitKafka initializes the kafka reader and writer and starts the watch thread
  370. func (bs *BingoManager) InitKafka() error {
  371. bs.kafkaR = kafka.NewReader(kafka.ReaderConfig{
  372. Brokers: bs.kafkaBroker,
  373. Topic: "BingoManager",
  374. Partition: 0,
  375. MinBytes: 10e3, // 10KB
  376. MaxBytes: 10e6, // 10MB
  377. })
  378. bs.kafkaW = kafka.NewWriter(kafka.WriterConfig{
  379. Brokers: bs.kafkaBroker,
  380. Topic: "BingoManager",
  381. Balancer: &kafka.LeastBytes{},
  382. })
  383. go bs.pollKafka()
  384. return nil
  385. }
  386. func (bs *BingoManager) pollKafka() error {
  387. for {
  388. m, err := bs.kafkaR.ReadMessage(context.Background())
  389. if err != nil {
  390. log.Print(err)
  391. time.Sleep(time.Second)
  392. continue
  393. }
  394. bs.ReceiveMessages(m)
  395. }
  396. }
  // randomHex reads n bytes from crypto/rand and returns them hex-encoded, which
  // gives a string of 2*n characters. If the random source fails, the error is
  // returned together with an empty string.
  func randomHex(n int) (string, error) {
  	buf := make([]byte, n)
  	_, err := rand.Read(buf)
  	if err != nil {
  		return "", err
  	}
  	encoded := hex.EncodeToString(buf)
  	return encoded, nil
  }