jeopardymanager.go 11 KB


  1. package JeopardyManager
  2. import (
  3. "context"
  4. "crypto/rand"
  5. "encoding/hex"
  6. "errors"
  7. "log"
  8. mathrand "math/rand"
  9. "strings"
  10. "sync"
  11. "time"
  12. "github.com/golang/protobuf/proto"
  13. "github.com/segmentio/kafka-go"
  14. gljeopardypb "git.alfi.li/gamelang/protobuf/gamelang-jeopardy"
  15. )
  16. // JeopardyManager manages holds
  17. type JeopardyManager struct {
  18. Games map[string]*gljeopardypb.Jeopardy
  19. GamesLock sync.Mutex
  20. kafkaBroker []string
  21. kafkaR *kafka.Reader
  22. kafkaW *kafka.Writer
  23. notichan chan gljeopardypb.Event
  24. }
  25. // NewJeopardyManager returns an initialized JeopardyManager
  26. func NewJeopardyManager(kafkaBroker []string, notichan chan gljeopardypb.Event) *JeopardyManager {
  27. // default broker: []string{"localhost:9092"}
  28. bs := JeopardyManager{notichan: notichan}
  29. bs.Games = map[string]*gljeopardypb.Jeopardy{}
  30. if len(kafkaBroker) > 0 {
  31. bs.kafkaBroker = kafkaBroker
  32. bs.InitKafka()
  33. }
  34. //go bs.CheckWin()
  35. return &bs
  36. }
  37. func (bs *JeopardyManager) CheckWin() {
  38. for {
  39. bs.GamesLock.Lock()
  40. games := bs.Games
  41. for _, game := range games {
  42. //log.Println("CheckWin: checking game", game)
  43. if strings.Split(game.Name, "\uffff")[1] == "lobby" {
  44. continue
  45. }
  46. win, winner := bs.CheckHorizontal(game)
  47. if !win {
  48. win, winner = bs.CheckVertical(game)
  49. }
  50. if win {
  51. game.Winner = winner
  52. game.Finished = true
  53. bs.notify(gljeopardypb.Event{Type: "Win", Gamename: game.Name, Username: winner})
  54. }
  55. }
  56. bs.GamesLock.Unlock()
  57. time.Sleep(time.Second)
  58. }
  59. }
  60. func (bs *JeopardyManager) CheckHorizontal(game *gljeopardypb.Jeopardy) (bool, string) {
  61. //log.Println("CheckHorizontal: checking game", game.Name)
  62. fields := game.GetFields()
  63. for _, row := range fields.GetRows() {
  64. //log.Println("CheckHorizontal: checking row", i)
  65. hits := uint32(0)
  66. winner := ""
  67. for _, field := range row.GetFields() {
  68. //log.Println("CheckHorizontal: checking field", y)
  69. if field.Checked {
  70. if winner == "" {
  71. winner = field.Checker[0]
  72. //log.Println("CheckHorizontal: new winner", winner)
  73. hits++
  74. continue
  75. }
  76. if winner == field.Checker[0] {
  77. //log.Println("CheckHorizontal: same winner", winner)
  78. hits++
  79. continue
  80. }
  81. //log.Println("CheckHorizontal: other winner", winner)
  82. break
  83. } //else {
  84. //log.Println("CheckHorizontal: not checked")
  85. //}
  86. }
  87. if hits == game.Numcols {
  88. //log.Println("got a winner", winner)
  89. return true, winner
  90. }
  91. }
  92. return false, ""
  93. }
  94. func (bs *JeopardyManager) CheckVertical(game *gljeopardypb.Jeopardy) (bool, string) {
  95. log.Println("CheckHorizontal: checking game", game.Name)
  96. fields := game.GetFields()
  97. hits := uint32(0)
  98. winner := ""
  99. for col := uint32(0); col < game.GetNumcols(); col++ {
  100. for _, row := range fields.GetRows() {
  101. field := row.Fields[col]
  102. if field.Checked {
  103. //log.Println("CheckHorizontal: checking field", y)
  104. if field.Checked {
  105. if winner == "" {
  106. winner = field.Checker[0]
  107. //log.Println("CheckHorizontal: new winner", winner)
  108. hits++
  109. continue
  110. }
  111. if winner == field.Checker[0] {
  112. //log.Println("CheckHorizontal: same winner", winner)
  113. hits++
  114. continue
  115. }
  116. //log.Println("CheckHorizontal: other winner", winner)
  117. break
  118. }
  119. }
  120. }
  121. }
  122. if hits == game.Numrows {
  123. //log.Println("got a winner", winner)
  124. return true, winner
  125. }
  126. return false, ""
  127. }
  128. // CraftGame crafts a new Game
  129. func (bs *JeopardyManager) CraftGame(id uint32, name string, options ...*gljeopardypb.Option) *gljeopardypb.Jeopardy {
  130. log.Println("crafting game", name)
  131. newGame := gljeopardypb.Jeopardy{Id: id, Name: name, Options: options, categories: []gljeopardypb.Category{}, players: []gljeopardypb.Player{}}
  132. log.Println("new game:\n", newGame)
  133. return &newGame
  134. }
  135. func getOption(options []*gljeopardypb.Option, search string) *gljeopardypb.Option {
  136. for _, option := range options {
  137. if option.Key == search {
  138. return option
  139. }
  140. }
  141. return &gljeopardypb.Option{}
  142. }
  // pickRandomEntry returns a pseudo-randomly chosen element of list together
  // with its index. For an empty list it returns "" and -1 instead of
  // panicking on a division by zero.
  func pickRandomEntry(list []string) (string, int) {
  	if len(list) == 0 {
  		return "", -1
  	}
  	index := mathrand.Intn(len(list))
  	return list[index], index
  }
  147. func pickRandomEntryAndDelete(list []string) (string, []string) {
  148. entry, index := pickRandomEntry(list)
  149. list[index] = list[len(list)-1]
  150. return entry, list[:len(list)-1]
  151. }
  152. // Create adds an entry into the Game database
  153. func (bs *JeopardyManager) Create(newGame gljeopardypb.Jeopardy) error {
  154. log.Println("JeopardyManager: create game", newGame.Name)
  155. if bs.kafkaW != nil {
  156. newGameBin, _ := proto.Marshal(&newGame)
  157. if err := bs.kafkaW.WriteMessages(context.Background(), kafka.Message{Key: []byte("Game:create"), Value: newGameBin}); err != nil {
  158. log.Println("ERROR: cannot write to kafka\n", err.Error())
  159. return err
  160. }
  161. return nil
  162. } else {
  163. log.Println("creating Game:\n", newGame)
  164. bs._create(newGame)
  165. return nil
  166. }
  167. }
  168. func (bs *JeopardyManager) _create(newGame gljeopardypb.Jeopardy) {
  169. bs.GamesLock.Lock()
  170. bs.Games[newGame.Name] = &newGame
  171. bs.GamesLock.Unlock()
  172. log.Println("JeopardyManager: Game has been created", newGame)
  173. }
  174. // Remove removes an entry from the Game database
  175. func (bs *JeopardyManager) Remove(name string) error {
  176. if bs.kafkaW != nil {
  177. bs.GamesLock.Lock()
  178. delGame := bs.Games[name]
  179. bs.GamesLock.Unlock()
  180. delGameBin, _ := proto.Marshal(delGame)
  181. if err := bs.kafkaW.WriteMessages(context.Background(), kafka.Message{Key: []byte("Game:delete"), Value: delGameBin}); err != nil {
  182. log.Fatal(err)
  183. } else {
  184. // print common key info
  185. log.Print("Delete is done")
  186. }
  187. } else {
  188. bs.GamesLock.Lock()
  189. delete(bs.Games, name)
  190. bs.GamesLock.Unlock()
  191. }
  192. return nil
  193. }
  194. // Get returns an Instance of a Game
  195. func (bs *JeopardyManager) Get(searchGame gljeopardypb.Jeopardy) gljeopardypb.Jeopardy {
  196. bs.GamesLock.Lock()
  197. Game, ok := bs.Games[searchGame.Name]
  198. bs.GamesLock.Unlock()
  199. if !ok {
  200. log.Println("JeopardyManager: cannot find Game", searchGame.Name)
  201. return gljeopardypb.Jeopardy{}
  202. }
  203. return *Game
  204. }
  205. // List returns a list of all Games
  206. func (bs *JeopardyManager) List() []gljeopardypb.Jeopardy {
  207. list := []gljeopardypb.Jeopardy{}
  208. bs.GamesLock.Lock()
  209. games := bs.Games
  210. for _, Game := range games {
  211. list = append(list, *Game)
  212. }
  213. bs.GamesLock.Unlock()
  214. return list
  215. }
  216. // Modify adapts the changeset in modifyGame
  217. func (bs *JeopardyManager) Modify(modGame gljeopardypb.Jeopardy) error {
  218. if bs.kafkaW != nil {
  219. modGameBin, _ := proto.Marshal(&modGame)
  220. if err := bs.kafkaW.WriteMessages(context.Background(), kafka.Message{Key: []byte("Game:modify"), Value: modGameBin}); err != nil {
  221. log.Fatal(err)
  222. return err
  223. } else {
  224. log.Print("modify sent")
  225. return nil
  226. }
  227. } else {
  228. return bs._modifyGame(modGame)
  229. }
  230. }
  231. // modifyGame _actually_ applies the diff to the live game
  232. func (bs *JeopardyManager) _modifyGame(modGame gljeopardypb.Jeopardy) error {
  233. bs.GamesLock.Lock()
  234. Game, ok := bs.Games[modGame.Name]
  235. bs.GamesLock.Unlock()
  236. if !ok {
  237. log.Printf("JeopardyManager: Game \"%v\" not found", modGame.Name)
  238. return errors.New("Game not found")
  239. }
  240. // do nothing if game is won by someone
  241. if Game.Finished {
  242. return errors.New("game is already finished")
  243. }
  244. bs.GamesLock.Lock()
  245. bs.Games[modGame.Name] = &modGame
  246. bs.GamesLock.Unlock()
  247. return nil
  248. }
  249. // HandleEvent handles incoming game events
  250. func (bs *JeopardyManager) HandleEvent(event gljeopardypb.Event) error {
  251. bs.GamesLock.Lock()
  252. game, ok := bs.Games[event.Gamename]
  253. if game.Finished {
  254. return errors.New("game finished")
  255. }
  256. if !ok {
  257. return errors.New("game not found")
  258. }
  259. bs.GamesLock.Unlock()
  260. if bs.kafkaW != nil {
  261. eventBin, _ := proto.Marshal(&event)
  262. if err := bs.kafkaW.WriteMessages(context.Background(), kafka.Message{Key: []byte("Game:event"), Value: eventBin}); err != nil {
  263. log.Fatal(err)
  264. return err
  265. } else {
  266. log.Print("event sent")
  267. return nil
  268. }
  269. } else {
  270. return bs._handleEvent(event)
  271. }
  272. }
  273. // HandleEvent _actually_ handles incoming game events
  274. func (bs *JeopardyManager) _handleEvent(event gljeopardypb.Event) error {
  275. log.Println("JeopardyManager: handling event", event)
  276. // if this is not a common game, all other games from the world should also receive the event
  277. neighbors := event.GetNeighborgames()
  278. if len(neighbors) > 0 {
  279. for _, game := range neighbors {
  280. event.Gamename = game
  281. event.Neighborgames = []string{}
  282. log.Println("JeopardyManager: trigger event for", game, "with", event)
  283. go bs._handleEvent(event)
  284. }
  285. }
  286. if event.Type == "checkField" {
  287. bs.GamesLock.Lock()
  288. game, ok := bs.Games[event.Gamename]
  289. bs.GamesLock.Unlock()
  290. if !ok {
  291. log.Println("JeopardyManager: cannot find Game to check", event.Gamename)
  292. return errors.New("cannot find Game")
  293. }
  294. checkedgame := bs.checkField(event.Value, *game, event.Username)
  295. bs.GamesLock.Lock()
  296. bs.Games[event.Gamename] = &checkedgame
  297. bs.GamesLock.Unlock()
  298. // send the event out to wss clients
  299. go bs.notify(event)
  300. return nil
  301. }
  302. return errors.New("unknown event type")
  303. }
  304. func (bs *JeopardyManager) checkField(text string, game gljeopardypb.Jeopardy, checker string) gljeopardypb.Jeopardy {
  305. matrix := game.GetFields()
  306. for _, row := range matrix.GetRows() {
  307. for _, field := range row.Fields {
  308. if field.Text == text {
  309. field.Checked = true
  310. field.Checker = append(field.Checker, checker)
  311. }
  312. }
  313. }
  314. //game.Fields = matrix
  315. return game
  316. }
  317. func (bs *JeopardyManager) notify(event gljeopardypb.Event) {
  318. log.Println("JeopardyManager: notify", event)
  319. bs.notichan <- event
  320. log.Print("JeopardyManager: notify sent")
  321. }
  322. // ReceiveMessages handles kafka messages
  323. func (bs *JeopardyManager) ReceiveMessages(msg kafka.Message) {
  324. log.Println("bs receive: new msg", msg.Offset, string(msg.Key))
  325. switch string(msg.Key) {
  326. // uc - Game create event
  327. case "Game:create":
  328. newGame := &gljeopardypb.Jeopardy{}
  329. proto.Unmarshal(msg.Value, newGame)
  330. bs._create(*newGame)
  331. log.Println("bs receive: Game \"", newGame.Name, "\" has been created")
  332. // ur - Game remove event
  333. case "Game:remove":
  334. delGame := &gljeopardypb.Jeopardy{}
  335. proto.Unmarshal(msg.Value, delGame)
  336. name := delGame.Name
  337. bs.GamesLock.Lock()
  338. delete(bs.Games, name)
  339. bs.GamesLock.Unlock()
  340. log.Println("bs receive: Game \"", delGame.Name, "\" has been deleted")
  341. case "Game:modify":
  342. modGame := &gljeopardypb.Jeopardy{}
  343. proto.Unmarshal(msg.Value, modGame)
  344. bs._modifyGame(*modGame)
  345. case "Game:event":
  346. event := &gljeopardypb.Event{}
  347. proto.Unmarshal(msg.Value, event)
  348. bs._handleEvent(*event)
  349. default:
  350. log.Println("unknown type ", string(msg.Key))
  351. }
  352. }
  353. // InitKafka initializes the kafka reader and writer and starts the watch thread
  354. func (bs *JeopardyManager) InitKafka() error {
  355. bs.kafkaR = kafka.NewReader(kafka.ReaderConfig{
  356. Brokers: bs.kafkaBroker,
  357. Topic: "JeopardyManager",
  358. Partition: 0,
  359. MinBytes: 10e3, // 10KB
  360. MaxBytes: 10e6, // 10MB
  361. })
  362. bs.kafkaW = kafka.NewWriter(kafka.WriterConfig{
  363. Brokers: bs.kafkaBroker,
  364. Topic: "JeopardyManager",
  365. Balancer: &kafka.LeastBytes{},
  366. })
  367. go bs.pollKafka()
  368. return nil
  369. }
  370. func (bs *JeopardyManager) pollKafka() error {
  371. for {
  372. m, err := bs.kafkaR.ReadMessage(context.Background())
  373. if err != nil {
  374. log.Print(err)
  375. time.Sleep(time.Second)
  376. continue
  377. }
  378. bs.ReceiveMessages(m)
  379. }
  380. }
  // randomHex returns n bytes from crypto/rand encoded as a lowercase
  // hexadecimal string of length 2*n. It returns "" and the error when the
  // random source cannot be read.
  func randomHex(n int) (string, error) {
  	raw := make([]byte, n)
  	_, err := rand.Read(raw)
  	if err != nil {
  		return "", err
  	}
  	encoded := hex.EncodeToString(raw)
  	return encoded, nil
  }