123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432 |
- package JeopardyManager
- import (
- "context"
- "crypto/rand"
- "encoding/hex"
- "errors"
- "log"
- mathrand "math/rand"
- "strings"
- "sync"
- "time"
- "github.com/golang/protobuf/proto"
- "github.com/segmentio/kafka-go"
- gljeopardypb "git.alfi.li/gamelang/protobuf/gamelang-jeopardy"
- )
- // JeopardyManager manages holds
- type JeopardyManager struct {
- Games map[string]*gljeopardypb.Jeopardy
- GamesLock sync.Mutex
- kafkaBroker []string
- kafkaR *kafka.Reader
- kafkaW *kafka.Writer
- notichan chan gljeopardypb.Event
- }
- // NewJeopardyManager returns an initialized JeopardyManager
- func NewJeopardyManager(kafkaBroker []string, notichan chan gljeopardypb.Event) *JeopardyManager {
- // default broker: []string{"localhost:9092"}
- bs := JeopardyManager{notichan: notichan}
- bs.Games = map[string]*gljeopardypb.Jeopardy{}
- if len(kafkaBroker) > 0 {
- bs.kafkaBroker = kafkaBroker
- bs.InitKafka()
- }
- //go bs.CheckWin()
- return &bs
- }
- func (bs *JeopardyManager) CheckWin() {
- for {
- bs.GamesLock.Lock()
- games := bs.Games
- for _, game := range games {
- //log.Println("CheckWin: checking game", game)
- if strings.Split(game.Name, "\uffff")[1] == "lobby" {
- continue
- }
- win, winner := bs.CheckHorizontal(game)
- if !win {
- win, winner = bs.CheckVertical(game)
- }
- if win {
- game.Winner = winner
- game.Finished = true
- bs.notify(gljeopardypb.Event{Type: "Win", Gamename: game.Name, Username: winner})
- }
- }
- bs.GamesLock.Unlock()
- time.Sleep(time.Second)
- }
- }
- func (bs *JeopardyManager) CheckHorizontal(game *gljeopardypb.Jeopardy) (bool, string) {
- //log.Println("CheckHorizontal: checking game", game.Name)
- fields := game.GetFields()
- for _, row := range fields.GetRows() {
- //log.Println("CheckHorizontal: checking row", i)
- hits := uint32(0)
- winner := ""
- for _, field := range row.GetFields() {
- //log.Println("CheckHorizontal: checking field", y)
- if field.Checked {
- if winner == "" {
- winner = field.Checker[0]
- //log.Println("CheckHorizontal: new winner", winner)
- hits++
- continue
- }
- if winner == field.Checker[0] {
- //log.Println("CheckHorizontal: same winner", winner)
- hits++
- continue
- }
- //log.Println("CheckHorizontal: other winner", winner)
- break
- } //else {
- //log.Println("CheckHorizontal: not checked")
- //}
- }
- if hits == game.Numcols {
- //log.Println("got a winner", winner)
- return true, winner
- }
- }
- return false, ""
- }
- func (bs *JeopardyManager) CheckVertical(game *gljeopardypb.Jeopardy) (bool, string) {
- log.Println("CheckHorizontal: checking game", game.Name)
- fields := game.GetFields()
- hits := uint32(0)
- winner := ""
- for col := uint32(0); col < game.GetNumcols(); col++ {
- for _, row := range fields.GetRows() {
- field := row.Fields[col]
- if field.Checked {
- //log.Println("CheckHorizontal: checking field", y)
- if field.Checked {
- if winner == "" {
- winner = field.Checker[0]
- //log.Println("CheckHorizontal: new winner", winner)
- hits++
- continue
- }
- if winner == field.Checker[0] {
- //log.Println("CheckHorizontal: same winner", winner)
- hits++
- continue
- }
- //log.Println("CheckHorizontal: other winner", winner)
- break
- }
- }
- }
- }
- if hits == game.Numrows {
- //log.Println("got a winner", winner)
- return true, winner
- }
- return false, ""
- }
- // CraftGame crafts a new Game
- func (bs *JeopardyManager) CraftGame(id uint32, name string, options ...*gljeopardypb.Option) *gljeopardypb.Jeopardy {
- log.Println("crafting game", name)
- newGame := gljeopardypb.Jeopardy{Id: id, Name: name, Options: options, categories: []gljeopardypb.Category{}, players: []gljeopardypb.Player{}}
- log.Println("new game:\n", newGame)
- return &newGame
- }
- func getOption(options []*gljeopardypb.Option, search string) *gljeopardypb.Option {
- for _, option := range options {
- if option.Key == search {
- return option
- }
- }
- return &gljeopardypb.Option{}
- }
// pickRandomEntry returns a pseudo-randomly chosen element of list together
// with its index.
//
// For an empty (or nil) list it returns ("", -1) instead of panicking with a
// division by zero. The index is drawn with math/rand's Intn, which yields a
// value in [0, len(list)).
func pickRandomEntry(list []string) (string, int) {
	if len(list) == 0 {
		return "", -1
	}
	index := mathrand.Intn(len(list))
	return list[index], index
}
- func pickRandomEntryAndDelete(list []string) (string, []string) {
- entry, index := pickRandomEntry(list)
- list[index] = list[len(list)-1]
- return entry, list[:len(list)-1]
- }
- // Create adds an entry into the Game database
- func (bs *JeopardyManager) Create(newGame gljeopardypb.Jeopardy) error {
- log.Println("JeopardyManager: create game", newGame.Name)
- if bs.kafkaW != nil {
- newGameBin, _ := proto.Marshal(&newGame)
- if err := bs.kafkaW.WriteMessages(context.Background(), kafka.Message{Key: []byte("Game:create"), Value: newGameBin}); err != nil {
- log.Println("ERROR: cannot write to kafka\n", err.Error())
- return err
- }
- return nil
- } else {
- log.Println("creating Game:\n", newGame)
- bs._create(newGame)
- return nil
- }
- }
- func (bs *JeopardyManager) _create(newGame gljeopardypb.Jeopardy) {
- bs.GamesLock.Lock()
- bs.Games[newGame.Name] = &newGame
- bs.GamesLock.Unlock()
- log.Println("JeopardyManager: Game has been created", newGame)
- }
- // Remove removes an entry from the Game database
- func (bs *JeopardyManager) Remove(name string) error {
- if bs.kafkaW != nil {
- bs.GamesLock.Lock()
- delGame := bs.Games[name]
- bs.GamesLock.Unlock()
- delGameBin, _ := proto.Marshal(delGame)
- if err := bs.kafkaW.WriteMessages(context.Background(), kafka.Message{Key: []byte("Game:delete"), Value: delGameBin}); err != nil {
- log.Fatal(err)
- } else {
- // print common key info
- log.Print("Delete is done")
- }
- } else {
- bs.GamesLock.Lock()
- delete(bs.Games, name)
- bs.GamesLock.Unlock()
- }
- return nil
- }
- // Get returns an Instance of a Game
- func (bs *JeopardyManager) Get(searchGame gljeopardypb.Jeopardy) gljeopardypb.Jeopardy {
- bs.GamesLock.Lock()
- Game, ok := bs.Games[searchGame.Name]
- bs.GamesLock.Unlock()
- if !ok {
- log.Println("JeopardyManager: cannot find Game", searchGame.Name)
- return gljeopardypb.Jeopardy{}
- }
- return *Game
- }
- // List returns a list of all Games
- func (bs *JeopardyManager) List() []gljeopardypb.Jeopardy {
- list := []gljeopardypb.Jeopardy{}
- bs.GamesLock.Lock()
- games := bs.Games
- for _, Game := range games {
- list = append(list, *Game)
- }
- bs.GamesLock.Unlock()
- return list
- }
- // Modify adapts the changeset in modifyGame
- func (bs *JeopardyManager) Modify(modGame gljeopardypb.Jeopardy) error {
- if bs.kafkaW != nil {
- modGameBin, _ := proto.Marshal(&modGame)
- if err := bs.kafkaW.WriteMessages(context.Background(), kafka.Message{Key: []byte("Game:modify"), Value: modGameBin}); err != nil {
- log.Fatal(err)
- return err
- } else {
- log.Print("modify sent")
- return nil
- }
- } else {
- return bs._modifyGame(modGame)
- }
- }
- // modifyGame _actually_ applies the diff to the live game
- func (bs *JeopardyManager) _modifyGame(modGame gljeopardypb.Jeopardy) error {
- bs.GamesLock.Lock()
- Game, ok := bs.Games[modGame.Name]
- bs.GamesLock.Unlock()
- if !ok {
- log.Printf("JeopardyManager: Game \"%v\" not found", modGame.Name)
- return errors.New("Game not found")
- }
- // do nothing if game is won by someone
- if Game.Finished {
- return errors.New("game is already finished")
- }
- bs.GamesLock.Lock()
- bs.Games[modGame.Name] = &modGame
- bs.GamesLock.Unlock()
- return nil
- }
- // HandleEvent handles incoming game events
- func (bs *JeopardyManager) HandleEvent(event gljeopardypb.Event) error {
- bs.GamesLock.Lock()
- game, ok := bs.Games[event.Gamename]
- if game.Finished {
- return errors.New("game finished")
- }
- if !ok {
- return errors.New("game not found")
- }
- bs.GamesLock.Unlock()
- if bs.kafkaW != nil {
- eventBin, _ := proto.Marshal(&event)
- if err := bs.kafkaW.WriteMessages(context.Background(), kafka.Message{Key: []byte("Game:event"), Value: eventBin}); err != nil {
- log.Fatal(err)
- return err
- } else {
- log.Print("event sent")
- return nil
- }
- } else {
- return bs._handleEvent(event)
- }
- }
- // HandleEvent _actually_ handles incoming game events
- func (bs *JeopardyManager) _handleEvent(event gljeopardypb.Event) error {
- log.Println("JeopardyManager: handling event", event)
- // if this is not a common game, all other games from the world should also receive the event
- neighbors := event.GetNeighborgames()
- if len(neighbors) > 0 {
- for _, game := range neighbors {
- event.Gamename = game
- event.Neighborgames = []string{}
- log.Println("JeopardyManager: trigger event for", game, "with", event)
- go bs._handleEvent(event)
- }
- }
- if event.Type == "checkField" {
- bs.GamesLock.Lock()
- game, ok := bs.Games[event.Gamename]
- bs.GamesLock.Unlock()
- if !ok {
- log.Println("JeopardyManager: cannot find Game to check", event.Gamename)
- return errors.New("cannot find Game")
- }
- checkedgame := bs.checkField(event.Value, *game, event.Username)
- bs.GamesLock.Lock()
- bs.Games[event.Gamename] = &checkedgame
- bs.GamesLock.Unlock()
- // send the event out to wss clients
- go bs.notify(event)
- return nil
- }
- return errors.New("unknown event type")
- }
- func (bs *JeopardyManager) checkField(text string, game gljeopardypb.Jeopardy, checker string) gljeopardypb.Jeopardy {
- matrix := game.GetFields()
- for _, row := range matrix.GetRows() {
- for _, field := range row.Fields {
- if field.Text == text {
- field.Checked = true
- field.Checker = append(field.Checker, checker)
- }
- }
- }
- //game.Fields = matrix
- return game
- }
- func (bs *JeopardyManager) notify(event gljeopardypb.Event) {
- log.Println("JeopardyManager: notify", event)
- bs.notichan <- event
- log.Print("JeopardyManager: notify sent")
- }
- // ReceiveMessages handles kafka messages
- func (bs *JeopardyManager) ReceiveMessages(msg kafka.Message) {
- log.Println("bs receive: new msg", msg.Offset, string(msg.Key))
- switch string(msg.Key) {
- // uc - Game create event
- case "Game:create":
- newGame := &gljeopardypb.Jeopardy{}
- proto.Unmarshal(msg.Value, newGame)
- bs._create(*newGame)
- log.Println("bs receive: Game \"", newGame.Name, "\" has been created")
- // ur - Game remove event
- case "Game:remove":
- delGame := &gljeopardypb.Jeopardy{}
- proto.Unmarshal(msg.Value, delGame)
- name := delGame.Name
- bs.GamesLock.Lock()
- delete(bs.Games, name)
- bs.GamesLock.Unlock()
- log.Println("bs receive: Game \"", delGame.Name, "\" has been deleted")
- case "Game:modify":
- modGame := &gljeopardypb.Jeopardy{}
- proto.Unmarshal(msg.Value, modGame)
- bs._modifyGame(*modGame)
- case "Game:event":
- event := &gljeopardypb.Event{}
- proto.Unmarshal(msg.Value, event)
- bs._handleEvent(*event)
- default:
- log.Println("unknown type ", string(msg.Key))
- }
- }
- // InitKafka initializes the kafka reader and writer and starts the watch thread
- func (bs *JeopardyManager) InitKafka() error {
- bs.kafkaR = kafka.NewReader(kafka.ReaderConfig{
- Brokers: bs.kafkaBroker,
- Topic: "JeopardyManager",
- Partition: 0,
- MinBytes: 10e3, // 10KB
- MaxBytes: 10e6, // 10MB
- })
- bs.kafkaW = kafka.NewWriter(kafka.WriterConfig{
- Brokers: bs.kafkaBroker,
- Topic: "JeopardyManager",
- Balancer: &kafka.LeastBytes{},
- })
- go bs.pollKafka()
- return nil
- }
- func (bs *JeopardyManager) pollKafka() error {
- for {
- m, err := bs.kafkaR.ReadMessage(context.Background())
- if err != nil {
- log.Print(err)
- time.Sleep(time.Second)
- continue
- }
- bs.ReceiveMessages(m)
- }
- }
// randomHex returns n bytes read from crypto/rand, encoded as a lowercase
// hexadecimal string of length 2*n. If the random source fails, the empty
// string and the error are returned.
func randomHex(n int) (string, error) {
	buf := make([]byte, n)
	_, err := rand.Read(buf)
	if err != nil {
		return "", err
	}
	return hex.EncodeToString(buf), nil
}
|