// Package JeopardyManager keeps the in-memory state of Jeopardy games and,
// when Kafka brokers are configured, routes every change through a Kafka
// topic before applying it.
package JeopardyManager

import (
	"context"
	"crypto/rand"
	"encoding/hex"
	"errors"
	"log"
	mathrand "math/rand"
	"strings"
	"sync"
	"time"

	"github.com/golang/protobuf/proto"
	"github.com/segmentio/kafka-go"

	gljeopardypb "git.alfi.li/gamelang/protobuf/gamelang-jeopardy"
)

// JeopardyManager holds all known games together with the optional Kafka
// reader/writer used to distribute changes to them.
type JeopardyManager struct {
	// Games maps a game name to its current state. Access must be guarded
	// by GamesLock.
	Games     map[string]*gljeopardypb.Jeopardy
	GamesLock sync.Mutex

	kafkaBroker []string
	kafkaR      *kafka.Reader
	kafkaW      *kafka.Writer

	// notichan receives the events emitted by notify.
	notichan chan gljeopardypb.Event
}

// NewJeopardyManager returns an initialized JeopardyManager.
//
// When kafkaBroker is non-empty the Kafka reader and writer are set up and
// the polling goroutine is started; otherwise all operations are applied
// directly to the local game map.
func NewJeopardyManager(kafkaBroker []string, notichan chan gljeopardypb.Event) *JeopardyManager {
	// default broker: []string{"localhost:9092"}
	bs := &JeopardyManager{
		Games:    map[string]*gljeopardypb.Jeopardy{},
		notichan: notichan,
	}
	if len(kafkaBroker) > 0 {
		bs.kafkaBroker = kafkaBroker
		if err := bs.InitKafka(); err != nil {
			log.Println("ERROR: cannot initialize kafka\n", err.Error())
		}
	}
	//go bs.CheckWin()
	return bs
}

// CheckWin scans all games once per second and marks a game as finished as
// soon as one player owns a complete row or column. It never returns and is
// meant to run in its own goroutine.
func (bs *JeopardyManager) CheckWin() {
	for {
		var wins []gljeopardypb.Event

		bs.GamesLock.Lock()
		for _, game := range bs.Games {
			// A finished game has already been announced; announcing it
			// again on every tick would flood the notification channel.
			if game == nil || game.Finished {
				continue
			}
			// Names without the separator have no second part; indexing
			// it unconditionally would panic.
			if parts := strings.Split(game.Name, "\uffff"); len(parts) > 1 && parts[1] == "lobby" {
				continue
			}
			win, winner := bs.CheckHorizontal(game)
			if !win {
				win, winner = bs.CheckVertical(game)
			}
			if win {
				game.Winner = winner
				game.Finished = true
				wins = append(wins, gljeopardypb.Event{Type: "Win", Gamename: game.Name, Username: winner})
			}
		}
		bs.GamesLock.Unlock()

		// notify blocks on a channel send, so it must run after the lock
		// has been released; otherwise a slow consumer stalls every other
		// operation on the manager.
		for i := range wins {
			bs.notify(wins[i])
		}
		time.Sleep(time.Second)
	}
}

// CheckHorizontal reports whether any row of game is completely checked with
// the same first checker on every field, and returns that checker.
func (bs *JeopardyManager) CheckHorizontal(game *gljeopardypb.Jeopardy) (bool, string) {
	numcols := game.GetNumcols()
	if numcols == 0 {
		// A board without columns cannot be won; without this guard an
		// empty row would match hits == 0.
		return false, ""
	}
	for _, row := range game.GetFields().GetRows() {
		hits := uint32(0)
		winner := ""
		for _, field := range row.GetFields() {
			if !field.Checked {
				continue
			}
			if len(field.Checker) == 0 {
				// Checked but not attributable to anyone: the row cannot
				// belong to a single player.
				break
			}
			if winner == "" {
				winner = field.Checker[0]
				hits++
				continue
			}
			if winner == field.Checker[0] {
				hits++
				continue
			}
			// Another player owns this field.
			break
		}
		if hits == numcols {
			return true, winner
		}
	}
	return false, ""
}

// CheckVertical reports whether any column of game is completely checked
// with the same first checker on every field, and returns that checker.
func (bs *JeopardyManager) CheckVertical(game *gljeopardypb.Jeopardy) (bool, string) {
	log.Println("CheckVertical: checking game", game.Name)
	numrows := game.Numrows
	if numrows == 0 {
		return false, ""
	}
	rows := game.GetFields().GetRows()
	for col := uint32(0); col < game.GetNumcols(); col++ {
		// The tally is per column; carrying it across columns would count
		// scattered fields as a win and miss real column wins.
		hits := uint32(0)
		winner := ""
		for _, row := range rows {
			cells := row.GetFields()
			if int(col) >= len(cells) {
				// Row is shorter than the declared width, so the column
				// is incomplete.
				break
			}
			field := cells[col]
			if !field.Checked {
				continue
			}
			if len(field.Checker) == 0 {
				break
			}
			if winner == "" {
				winner = field.Checker[0]
				hits++
				continue
			}
			if winner == field.Checker[0] {
				hits++
				continue
			}
			// Another player owns this field.
			break
		}
		if hits == numrows {
			return true, winner
		}
	}
	return false, ""
}

// CraftGame builds a new game value with the given id, name and options. The
// game is only returned; it is not stored in the manager.
func (bs *JeopardyManager) CraftGame(id uint32, name string, options ...*gljeopardypb.Option) *gljeopardypb.Jeopardy {
	log.Println("crafting game", name)
	// The remaining fields keep their zero values. Unexported fields of the
	// generated type cannot be named from this package.
	newGame := gljeopardypb.Jeopardy{Id: id, Name: name, Options: options}
	log.Println("new game:\n", newGame)
	return &newGame
}

// getOption returns the first option whose key equals search, or an empty
// Option when there is none.
func getOption(options []*gljeopardypb.Option, search string) *gljeopardypb.Option {
	for _, option := range options {
		if option != nil && option.Key == search {
			return option
		}
	}
	return &gljeopardypb.Option{}
}

// pickRandomEntry returns a random element of list and its index. For an
// empty list it returns "" and -1.
func pickRandomEntry(list []string) (string, int) {
	if len(list) == 0 {
		return "", -1
	}
	index := mathrand.Intn(len(list))
	return list[index], index
}

// pickRandomEntryAndDelete removes a random element from list and returns it
// together with the shortened list. The removal swaps the last element into
// the freed slot, so the caller's backing array is modified and the order is
// not preserved. For an empty list it returns "" and list unchanged.
func pickRandomEntryAndDelete(list []string) (string, []string) {
	entry, index := pickRandomEntry(list)
	if index < 0 {
		return "", list
	}
	list[index] = list[len(list)-1]
	return entry, list[:len(list)-1]
}

// Create adds an entry into the Game database. With Kafka configured the
// game is only published; it is stored once the message is received again
// via ReceiveMessages.
func (bs *JeopardyManager) Create(newGame gljeopardypb.Jeopardy) error {
	log.Println("JeopardyManager: create game", newGame.Name)
	if bs.kafkaW == nil {
		log.Println("creating Game:\n", newGame)
		bs._create(newGame)
		return nil
	}

	newGameBin, err := proto.Marshal(&newGame)
	if err != nil {
		return err
	}
	if err := bs.kafkaW.WriteMessages(context.Background(), kafka.Message{Key: []byte("Game:create"), Value: newGameBin}); err != nil {
		log.Println("ERROR: cannot write to kafka\n", err.Error())
		return err
	}
	return nil
}

// _create stores newGame in the local game map under its name, replacing any
// existing entry.
func (bs *JeopardyManager) _create(newGame gljeopardypb.Jeopardy) {
	bs.GamesLock.Lock()
	bs.Games[newGame.Name] = &newGame
	bs.GamesLock.Unlock()
	log.Println("JeopardyManager: Game has been created", newGame)
}

// Remove removes an entry from the Game database. With Kafka configured the
// removal is only published; the entry is dropped when the message is
// received via ReceiveMessages.
func (bs *JeopardyManager) Remove(name string) error {
	if bs.kafkaW == nil {
		bs.GamesLock.Lock()
		delete(bs.Games, name)
		bs.GamesLock.Unlock()
		return nil
	}

	// Marshal under the lock so that concurrent field updates cannot race
	// with the serialization.
	bs.GamesLock.Lock()
	delGame, ok := bs.Games[name]
	if !ok || delGame == nil {
		// The receiver only needs the name; send a minimal message rather
		// than marshalling a nil game.
		delGame = &gljeopardypb.Jeopardy{Name: name}
	}
	delGameBin, err := proto.Marshal(delGame)
	bs.GamesLock.Unlock()
	if err != nil {
		return err
	}

	if err := bs.kafkaW.WriteMessages(context.Background(), kafka.Message{Key: []byte("Game:delete"), Value: delGameBin}); err != nil {
		// Report the failure to the caller instead of terminating the
		// whole process.
		log.Println("ERROR: cannot write to kafka\n", err.Error())
		return err
	}
	// print common key info
	log.Print("Delete is done")
	return nil
}

// Get returns a copy of the game whose name matches searchGame.Name, or an
// empty game when it is unknown.
func (bs *JeopardyManager) Get(searchGame gljeopardypb.Jeopardy) gljeopardypb.Jeopardy {
	bs.GamesLock.Lock()
	game, ok := bs.Games[searchGame.Name]
	bs.GamesLock.Unlock()
	if !ok || game == nil {
		log.Println("JeopardyManager: cannot find Game", searchGame.Name)
		return gljeopardypb.Jeopardy{}
	}
	return *game
}

// List returns copies of all games. The result is never nil.
func (bs *JeopardyManager) List() []gljeopardypb.Jeopardy {
	bs.GamesLock.Lock()
	defer bs.GamesLock.Unlock()

	list := make([]gljeopardypb.Jeopardy, 0, len(bs.Games))
	for _, game := range bs.Games {
		if game == nil {
			continue
		}
		list = append(list, *game)
	}
	return list
}

// Modify replaces the stored game named modGame.Name with modGame. With
// Kafka configured the change is only published and applied on receipt.
func (bs *JeopardyManager) Modify(modGame gljeopardypb.Jeopardy) error {
	if bs.kafkaW == nil {
		return bs._modifyGame(modGame)
	}

	modGameBin, err := proto.Marshal(&modGame)
	if err != nil {
		return err
	}
	if err := bs.kafkaW.WriteMessages(context.Background(), kafka.Message{Key: []byte("Game:modify"), Value: modGameBin}); err != nil {
		log.Println("ERROR: cannot write to kafka\n", err.Error())
		return err
	}
	log.Print("modify sent")
	return nil
}

// _modifyGame _actually_ replaces the live game with modGame. It fails when
// the game is unknown or already finished.
func (bs *JeopardyManager) _modifyGame(modGame gljeopardypb.Jeopardy) error {
	// Hold the lock across check and store so the game cannot be finished
	// or removed in between.
	bs.GamesLock.Lock()
	defer bs.GamesLock.Unlock()

	game, ok := bs.Games[modGame.Name]
	if !ok || game == nil {
		log.Printf("JeopardyManager: Game \"%v\" not found", modGame.Name)
		return errors.New("Game not found")
	}
	// do nothing if game is won by someone
	if game.Finished {
		return errors.New("game is already finished")
	}
	bs.Games[modGame.Name] = &modGame
	return nil
}

// HandleEvent handles incoming game events. It rejects events for unknown or
// finished games; otherwise the event is published to Kafka when configured,
// or applied locally.
func (bs *JeopardyManager) HandleEvent(event gljeopardypb.Event) error {
	bs.GamesLock.Lock()
	game, ok := bs.Games[event.Gamename]
	known := ok && game != nil
	// Only dereference the game once it is known to exist.
	finished := known && game.Finished
	bs.GamesLock.Unlock()

	if !known {
		return errors.New("game not found")
	}
	if finished {
		return errors.New("game finished")
	}

	if bs.kafkaW == nil {
		return bs._handleEvent(event)
	}

	eventBin, err := proto.Marshal(&event)
	if err != nil {
		return err
	}
	if err := bs.kafkaW.WriteMessages(context.Background(), kafka.Message{Key: []byte("Game:event"), Value: eventBin}); err != nil {
		log.Println("ERROR: cannot write to kafka\n", err.Error())
		return err
	}
	log.Print("event sent")
	return nil
}

// _handleEvent _actually_ handles incoming game events. The event is applied
// to the game named in event.Gamename and, in separate goroutines, to every
// game listed in its neighbor games.
func (bs *JeopardyManager) _handleEvent(event gljeopardypb.Event) error {
	log.Println("JeopardyManager: handling event", event)

	// if this is not a common game, all other games from the world should also receive the event
	for _, neighbor := range event.GetNeighborgames() {
		if neighbor == event.Gamename {
			// The addressed game is handled inline below.
			continue
		}
		// Work on a copy: rewriting event itself would redirect the inline
		// handling below to the last neighbor.
		forwarded := event
		forwarded.Gamename = neighbor
		forwarded.Neighborgames = []string{}
		log.Println("JeopardyManager: trigger event for", neighbor, "with", forwarded)
		go bs._handleEvent(forwarded)
	}

	if event.Type != "checkField" {
		return errors.New("unknown event type")
	}

	// checkField mutates fields shared with the stored game, so the whole
	// update runs under the lock.
	bs.GamesLock.Lock()
	game, ok := bs.Games[event.Gamename]
	if !ok || game == nil {
		bs.GamesLock.Unlock()
		log.Println("JeopardyManager: cannot find Game to check", event.Gamename)
		return errors.New("cannot find Game")
	}
	checkedgame := bs.checkField(event.Value, *game, event.Username)
	bs.Games[event.Gamename] = &checkedgame
	bs.GamesLock.Unlock()

	// send the event out to wss clients
	go bs.notify(event)
	return nil
}

// checkField marks every field of game whose text equals text as checked and
// appends checker to that field's checker list. The fields are reached
// through pointers, so the caller's game is modified as well.
func (bs *JeopardyManager) checkField(text string, game gljeopardypb.Jeopardy, checker string) gljeopardypb.Jeopardy {
	for _, row := range game.GetFields().GetRows() {
		for _, field := range row.Fields {
			if field.Text == text {
				field.Checked = true
				field.Checker = append(field.Checker, checker)
			}
		}
	}
	return game
}

// notify sends event on the notification channel and blocks until the send
// completes.
func (bs *JeopardyManager) notify(event gljeopardypb.Event) {
	log.Println("JeopardyManager: notify", event)
	bs.notichan <- event
	log.Print("JeopardyManager: notify sent")
}

// ReceiveMessages handles kafka messages by applying the change named in the
// message key to the local state.
func (bs *JeopardyManager) ReceiveMessages(msg kafka.Message) {
	log.Println("bs receive: new msg", msg.Offset, string(msg.Key))
	switch string(msg.Key) {
	// uc - Game create event
	case "Game:create":
		newGame := &gljeopardypb.Jeopardy{}
		if err := proto.Unmarshal(msg.Value, newGame); err != nil {
			log.Println("bs receive: cannot decode message", string(msg.Key), err)
			return
		}
		bs._create(*newGame)
		log.Println("bs receive: Game \"", newGame.Name, "\" has been created")
	// ur - Game remove event
	// Remove publishes "Game:delete", so both keys are accepted here.
	case "Game:remove", "Game:delete":
		delGame := &gljeopardypb.Jeopardy{}
		if err := proto.Unmarshal(msg.Value, delGame); err != nil {
			log.Println("bs receive: cannot decode message", string(msg.Key), err)
			return
		}
		bs.GamesLock.Lock()
		delete(bs.Games, delGame.Name)
		bs.GamesLock.Unlock()
		log.Println("bs receive: Game \"", delGame.Name, "\" has been deleted")
	case "Game:modify":
		modGame := &gljeopardypb.Jeopardy{}
		if err := proto.Unmarshal(msg.Value, modGame); err != nil {
			log.Println("bs receive: cannot decode message", string(msg.Key), err)
			return
		}
		if err := bs._modifyGame(*modGame); err != nil {
			log.Println("bs receive: cannot modify Game", modGame.Name, err)
		}
	case "Game:event":
		event := &gljeopardypb.Event{}
		if err := proto.Unmarshal(msg.Value, event); err != nil {
			log.Println("bs receive: cannot decode message", string(msg.Key), err)
			return
		}
		if err := bs._handleEvent(*event); err != nil {
			log.Println("bs receive: cannot handle event for", event.Gamename, err)
		}
	default:
		log.Println("unknown type ", string(msg.Key))
	}
}

// InitKafka initializes the kafka reader and writer and starts the watch
// thread.
func (bs *JeopardyManager) InitKafka() error {
	bs.kafkaR = kafka.NewReader(kafka.ReaderConfig{
		Brokers:   bs.kafkaBroker,
		Topic:     "JeopardyManager",
		Partition: 0,
		MinBytes:  10e3, // 10KB
		MaxBytes:  10e6, // 10MB
	})
	bs.kafkaW = kafka.NewWriter(kafka.WriterConfig{
		Brokers:  bs.kafkaBroker,
		Topic:    "JeopardyManager",
		Balancer: &kafka.LeastBytes{},
	})
	go bs.pollKafka()
	return nil
}

// pollKafka reads messages from the kafka reader in an endless loop and
// passes each one to ReceiveMessages. Read errors are logged and retried
// after one second.
func (bs *JeopardyManager) pollKafka() error {
	for {
		m, err := bs.kafkaR.ReadMessage(context.Background())
		if err != nil {
			log.Print(err)
			time.Sleep(time.Second)
			continue
		}
		bs.ReceiveMessages(m)
	}
}

// randomHex returns n random bytes from crypto/rand, hex encoded (2*n
// characters).
func randomHex(n int) (string, error) {
	bytes := make([]byte, n)
	if _, err := rand.Read(bytes); err != nil {
		return "", err
	}
	return hex.EncodeToString(bytes), nil
}