package BingoManager import ( "context" "crypto/rand" "encoding/hex" "errors" "log" mathrand "math/rand" "strings" "sync" "time" "github.com/golang/protobuf/proto" "github.com/segmentio/kafka-go" glbingopb "git.alfi.li/gamelang/protobuf/golang-bingo" ) // BingoManager manages holds type BingoManager struct { Games map[string]*glbingopb.Bingo GamesLock sync.Mutex kafkaBroker []string kafkaR *kafka.Reader kafkaW *kafka.Writer notichan chan glbingopb.Event } // NewBingoManager returns an initialized BingoManager func NewBingoManager(kafkaBroker []string, notichan chan glbingopb.Event) *BingoManager { // default broker: []string{"localhost:9092"} bs := BingoManager{notichan: notichan} bs.Games = map[string]*glbingopb.Bingo{} if len(kafkaBroker) > 0 { bs.kafkaBroker = kafkaBroker bs.InitKafka() } go bs.CheckWin() return &bs } func (bs *BingoManager) CheckWin() { for { bs.GamesLock.Lock() games := bs.Games for _, game := range games { //log.Println("CheckWin: checking game", game) if strings.Split(game.Name, "\uffff")[1] == "lobby" { continue } win, winner := bs.CheckHorizontal(game) if !win { win, winner = bs.CheckVertical(game) } if win { game.Winner = winner game.Finished = true bs.notify(glbingopb.Event{Type: "Win", Gamename: game.Name, Username: winner}) } } bs.GamesLock.Unlock() time.Sleep(time.Second) } } func (bs *BingoManager) CheckHorizontal(game *glbingopb.Bingo) (bool, string) { log.Println("CheckHorizontal: checking game", game.Name) fields := game.GetFields() for _, row := range fields.GetRows() { //log.Println("CheckHorizontal: checking row", i) hits := uint32(0) winner := "" for _, field := range row.GetFields() { //log.Println("CheckHorizontal: checking field", y) if field.Checked { if winner == "" { winner = field.Checker[0] //log.Println("CheckHorizontal: new winner", winner) hits++ continue } if winner == field.Checker[0] { //log.Println("CheckHorizontal: same winner", winner) hits++ continue } //log.Println("CheckHorizontal: other winner", winner) break } //else { //log.Println("CheckHorizontal: not checked") //} } if hits == game.Numcols { //log.Println("got a winner", winner) return true, winner } } return false, "" } func (bs *BingoManager) CheckVertical(game *glbingopb.Bingo) (bool, string) { log.Println("CheckHorizontal: checking game", game.Name) fields := game.GetFields() hits := uint32(0) winner := "" for col := uint32(0); col < game.GetNumcols(); col++ { for _, row := range fields.GetRows() { field := row.Fields[col] if field.Checked { //log.Println("CheckHorizontal: checking field", y) if field.Checked { if winner == "" { winner = field.Checker[0] //log.Println("CheckHorizontal: new winner", winner) hits++ continue } if winner == field.Checker[0] { //log.Println("CheckHorizontal: same winner", winner) hits++ continue } //log.Println("CheckHorizontal: other winner", winner) break } } } } if hits == game.Numrows { //log.Println("got a winner", winner) return true, winner } return false, "" } // CraftGame crafts a new Game func (bs *BingoManager) CraftGame(id uint32, name string, wordlist []string, numrows, numcols uint32, options ...*glbingopb.Option) glbingopb.Bingo { log.Println("crafting game", name) newGame := glbingopb.Bingo{Id: id, Name: name, Textlist: wordlist, Options: options, Numrows: numrows, Numcols: numcols} log.Println("new game:\n", newGame) matrix := glbingopb.FieldMatrix{Rows: []*glbingopb.FieldMatrix_MatrixRow{}} for row := uint32(0); row < numrows; row += 1 { matrix.Rows = append(matrix.Rows, &glbingopb.FieldMatrix_MatrixRow{}) matrix.Rows[row].Fields = []*glbingopb.Field{} for col := uint32(0); col < numrows; col += 1 { if getOption(options, "unique fields").Value == "true" { var word string word, wordlist = pickRandomEntryAndDelete(wordlist) matrix.Rows[row].Fields = append(matrix.Rows[row].Fields, &glbingopb.Field{Text: word}) } else { word, _ := pickRandomEntry(wordlist) matrix.Rows[row].Fields = append(matrix.Rows[row].Fields, &glbingopb.Field{Text: word}) } } } newGame.Fields = &matrix return newGame } func getOption(options []*glbingopb.Option, search string) *glbingopb.Option { for _, option := range options { if option.Key == search { return option } } return &glbingopb.Option{} } func pickRandomEntry(list []string) (string, int) { index := mathrand.Int() % len(list) return list[index], index } func pickRandomEntryAndDelete(list []string) (string, []string) { entry, index := pickRandomEntry(list) list[index] = list[len(list)-1] return entry, list[:len(list)-1] } // Create adds an entry into the Game database func (bs *BingoManager) Create(newGame glbingopb.Bingo) error { log.Println("BingoManager: create game", newGame.Name) if bs.kafkaW != nil { newGameBin, _ := proto.Marshal(&newGame) if err := bs.kafkaW.WriteMessages(context.Background(), kafka.Message{Key: []byte("Game:create"), Value: newGameBin}); err != nil { log.Println("ERROR: cannot write to kafka\n", err.Error()) return err } return nil } else { log.Println("creating Game:\n", newGame) bs._create(newGame) return nil } } func (bs *BingoManager) _create(newGame glbingopb.Bingo) { bs.GamesLock.Lock() bs.Games[newGame.Name] = &newGame bs.GamesLock.Unlock() log.Println("BingoManager: Game has been created", newGame) } // Remove removes an entry from the Game database func (bs *BingoManager) Remove(name string) error { if bs.kafkaW != nil { bs.GamesLock.Lock() delGame := bs.Games[name] bs.GamesLock.Unlock() delGameBin, _ := proto.Marshal(delGame) if err := bs.kafkaW.WriteMessages(context.Background(), kafka.Message{Key: []byte("Game:delete"), Value: delGameBin}); err != nil { log.Fatal(err) } else { // print common key info log.Print("Delete is done") } } else { bs.GamesLock.Lock() delete(bs.Games, name) bs.GamesLock.Unlock() } return nil } // Get returns an Instance of a Game func (bs *BingoManager) Get(searchGame glbingopb.Bingo) glbingopb.Bingo { bs.GamesLock.Lock() Game, ok := bs.Games[searchGame.Name] bs.GamesLock.Unlock() if !ok { log.Println("BingoManager: cannot find Game", searchGame.Name) return glbingopb.Bingo{} } return *Game } // List returns a list of all Games func (bs *BingoManager) List() []glbingopb.Bingo { list := []glbingopb.Bingo{} bs.GamesLock.Lock() games := bs.Games for _, Game := range games { list = append(list, *Game) } bs.GamesLock.Unlock() return list } // Modify adapts the changeset in modifyGame func (bs *BingoManager) Modify(modGame glbingopb.Bingo) error { if bs.kafkaW != nil { modGameBin, _ := proto.Marshal(&modGame) if err := bs.kafkaW.WriteMessages(context.Background(), kafka.Message{Key: []byte("Game:modify"), Value: modGameBin}); err != nil { log.Fatal(err) return err } else { log.Print("modify sent") return nil } } else { return bs._modifyGame(modGame) } } // modifyGame _actually_ applies the diff to the live game func (bs *BingoManager) _modifyGame(modGame glbingopb.Bingo) error { bs.GamesLock.Lock() Game, ok := bs.Games[modGame.Name] bs.GamesLock.Unlock() if !ok { log.Printf("BingoManager: Game \"%v\" not found", modGame.Name) return errors.New("Game not found") } // do nothing if game is won by someone if Game.Finished { return errors.New("game is already finished") } bs.GamesLock.Lock() bs.Games[modGame.Name] = &modGame bs.GamesLock.Unlock() return nil } // HandleEvent handles incoming game events func (bs *BingoManager) HandleEvent(event glbingopb.Event) error { bs.GamesLock.Lock() game, ok := bs.Games[event.Gamename] if game.Finished { return errors.New("game finished") } if !ok { return errors.New("game not found") } bs.GamesLock.Unlock() if bs.kafkaW != nil { eventBin, _ := proto.Marshal(&event) if err := bs.kafkaW.WriteMessages(context.Background(), kafka.Message{Key: []byte("Game:event"), Value: eventBin}); err != nil { log.Fatal(err) return err } else { log.Print("event sent") return nil } } else { return bs._handleEvent(event) } } // HandleEvent _actually_ handles incoming game events func (bs *BingoManager) _handleEvent(event glbingopb.Event) error { log.Println("BingoManager: handling event", event) // if this is not a common game, all other games from the world should also receive the event neighbors := event.GetNeighborgames() if len(neighbors) > 0 { for _, game := range neighbors { event.Gamename = game event.Neighborgames = []string{} log.Println("BingoManager: trigger event for", game, "with", event) go bs._handleEvent(event) } } if event.Type == "checkField" { bs.GamesLock.Lock() game, ok := bs.Games[event.Gamename] bs.GamesLock.Unlock() if !ok { log.Println("BingoManager: cannot find Game to check", event.Gamename) return errors.New("cannot find Game") } checkedgame := bs.checkField(event.Value, *game, event.Username) bs.GamesLock.Lock() bs.Games[event.Gamename] = &checkedgame bs.GamesLock.Unlock() // send the event out to wss clients go bs.notify(event) return nil } return errors.New("unknown event type") } func (bs *BingoManager) checkField(text string, game glbingopb.Bingo, checker string) glbingopb.Bingo { matrix := game.GetFields() for _, row := range matrix.GetRows() { for _, field := range row.Fields { if field.Text == text { field.Checked = true field.Checker = append(field.Checker, checker) } } } //game.Fields = matrix return game } func (bs *BingoManager) notify(event glbingopb.Event) { log.Println("BingoManager: notify", event) bs.notichan <- event log.Print("BingoManager: notify sent") } // ReceiveMessages handles kafka messages func (bs *BingoManager) ReceiveMessages(msg kafka.Message) { log.Println("bs receive: new msg", msg.Offset, string(msg.Key)) switch string(msg.Key) { // uc - Game create event case "Game:create": newGame := &glbingopb.Bingo{} proto.Unmarshal(msg.Value, newGame) bs._create(*newGame) log.Println("bs receive: Game \"", newGame.Name, "\" has been created") // ur - Game remove event case "Game:remove": delGame := &glbingopb.Bingo{} proto.Unmarshal(msg.Value, delGame) name := delGame.Name bs.GamesLock.Lock() delete(bs.Games, name) bs.GamesLock.Unlock() log.Println("bs receive: Game \"", delGame.Name, "\" has been deleted") case "Game:modify": modGame := &glbingopb.Bingo{} proto.Unmarshal(msg.Value, modGame) bs._modifyGame(*modGame) case "Game:event": event := &glbingopb.Event{} proto.Unmarshal(msg.Value, event) bs._handleEvent(*event) default: log.Println("unknown type ", string(msg.Key)) } } // InitKafka initializes the kafka reader and writer and starts the watch thread func (bs *BingoManager) InitKafka() error { bs.kafkaR = kafka.NewReader(kafka.ReaderConfig{ Brokers: bs.kafkaBroker, Topic: "BingoManager", Partition: 0, MinBytes: 10e3, // 10KB MaxBytes: 10e6, // 10MB }) bs.kafkaW = kafka.NewWriter(kafka.WriterConfig{ Brokers: bs.kafkaBroker, Topic: "BingoManager", Balancer: &kafka.LeastBytes{}, }) go bs.pollKafka() return nil } func (bs *BingoManager) pollKafka() error { for { m, err := bs.kafkaR.ReadMessage(context.Background()) if err != nil { log.Print(err) time.Sleep(time.Second) continue } bs.ReceiveMessages(m) } } func randomHex(n int) (string, error) { bytes := make([]byte, n) if _, err := rand.Read(bytes); err != nil { return "", err } return hex.EncodeToString(bytes), nil }