123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210 |
- package bingomanager
- import (
- "context"
- "encoding/hex"
- "errors"
- "log"
- "math/rand"
- "time"
- gamelangBingopb "git.alfi.li/gamelang/protobuf/gamelang-bingo"
- "github.com/golang/protobuf/proto"
- "github.com/segmentio/kafka-go"
- )
- type BingoManager struct {
- bingos map[string]*gamelangBingopb.Bingo
- kafkaBroker []string
- kafkaR *kafka.Reader
- kafkaW *kafka.Writer
- }
- func NewBingoManager(kafkaBroker []string) *BingoManager {
- bm := BingoManager{kafkaBroker: kafkaBroker}
- bm.bingos = map[string]*gamelangBingopb.Bingo{}
- if len(kafkaBroker) > 0 {
- bm.kafkaBroker = kafkaBroker
- bm.InitKafka()
- }
- return &bm
- }
- // Create adds an entry into the user database
- func (bm *BingoManager) Create(inputBingo gamelangBingopb.Bingo) (gamelangBingopb.Bingo, error) {
- if _, exists := bm.bingos[inputBingo.GetName()]; exists {
- log.Println("bingo already exists", inputBingo.GetName())
- return gamelangBingopb.Bingo{}, errors.New("bingo already exists")
- }
- if bm.kafkaW != nil {
- newBingo := gamelangBingopb.Bingo{Id: inputBingo.GetId(), Name: inputBingo.GetName(), Textlist: inputBingo.GetTextlist(), Options: inputBingo.GetOptions()}
- newBingoBin, _ := proto.Marshal(&newBingo)
- if err := bm.kafkaW.WriteMessages(context.Background(), kafka.Message{Key: []byte("bingo:create"), Value: newBingoBin}); err != nil {
- log.Println("ERROR: cannot write to kafka\n", err.Error())
- return gamelangBingopb.Bingo{}, err
- }
- return newBingo, nil
- } else {
- bingo := gamelangBingopb.Bingo{Id: inputBingo.GetId(), Name: inputBingo.GetName(), Textlist: inputBingo.GetTextlist(), Options: inputBingo.GetOptions()}
- bm.bingos[inputBingo.Name] = &bingo
- return bingo, nil
- }
- }
- // Remove removes an entry from the user database
- func (bm *BingoManager) Remove(name string) error {
- if bm.kafkaW != nil {
- delBingo := bm.bingos[name]
- delBingoBin, _ := proto.Marshal(delBingo)
- if err := bm.kafkaW.WriteMessages(context.Background(), kafka.Message{Key: []byte("bingo:delete"), Value: delBingoBin}); err != nil {
- log.Fatal(err)
- } else {
- // print common key info
- log.Print("Delete is done")
- }
- } else {
- delete(bm.bingos, name)
- }
- return nil
- }
- func (bm *BingoManager) Get(name string) (gamelangBingopb.Bingo, error) {
- bingo, ok := bm.bingos[name]
- if !ok {
- return gamelangBingopb.Bingo{}, errors.New("Bingo not found")
- }
- return *bingo, nil
- }
- // List returns a list of all Users
- func (bm *BingoManager) List() []gamelangBingopb.Bingo {
- list := []gamelangBingopb.Bingo{}
- for _, bingoP := range bm.bingos {
- bingo := *bingoP
- list = append(list, bingo)
- }
- return list
- }
- // CheckField marks a field a marked
- // gets the name and the first field [0][0] from inputBingo
- func (bm *BingoManager) CheckField(inputBingo *gamelangBingopb.Bingo) error {
- bingo, ok := bm.bingos[inputBingo.GetName()]
- if !ok {
- return errors.New("cannot find bingo")
- }
- if bm.kafkaW != nil {
- inputBingoBin, _ := proto.Marshal(inputBingo)
- if err := bm.kafkaW.WriteMessages(context.Background(), kafka.Message{Key: []byte("bingo:check"), Value: inputBingoBin}); err != nil {
- log.Fatal(err)
- return err
- } else {
- // print common key info
- log.Print("check is done")
- return nil
- }
- } else {
- matrix := inputBingo.GetFields()
- for x, row := range matrix.GetRows() {
- for y, field := range row.Fields {
- if field != nil {
- bingo.Fields.Rows[x].Fields[y].Checked = true
- return nil
- }
- }
- }
- return errors.New("no field found")
- }
- }
- // ReceiveMessages handles kafka messages
- func (bm *BingoManager) ReceiveMessages(msg kafka.Message) {
- log.Println("bm receive: new msg", msg.Offset, string(msg.Key))
- switch string(msg.Key) {
- // uc - user create event
- case "bingo:create":
- newBingo := &gamelangBingopb.Bingo{}
- proto.Unmarshal(msg.Value, newBingo)
- bm.bingos[newBingo.Name] = newBingo
- log.Println("bm receive: bingo \"", newBingo.Name, "\" has been created")
- // ur - user remove event
- case "bingo:remove":
- delBingo := &gamelangBingopb.Bingo{}
- proto.Unmarshal(msg.Value, delBingo)
- name := delBingo.Name
- delete(bm.bingos, name)
- log.Println("bm receive: bingo \"", delBingo.Name, "\" has been deleted")
- case "bingo:check":
- checkBingo := &gamelangBingopb.Bingo{}
- err := proto.Unmarshal(msg.Value, checkBingo)
- bingo, ok := bm.bingos[checkBingo.GetName()]
- if !ok {
- log.Println("cannot find bingo")
- return
- }
- if err != nil {
- log.Println("cannot unmarshall bingo")
- return
- }
- matrix := checkBingo.GetFields()
- for x, row := range matrix.GetRows() {
- for y, field := range row.Fields {
- if field != nil {
- bingo.Fields.Rows[x].Fields[y].Checked = true
- return
- }
- }
- }
- log.Println("bm receive: field checked on \"", checkBingo.Name)
- default:
- log.Println("unknown type ", string(msg.Key))
- }
- }
- // InitKafka initializes the kafka reader and writer and starts the watch thread
- func (bm *BingoManager) InitKafka() error {
- bm.kafkaR = kafka.NewReader(kafka.ReaderConfig{
- Brokers: bm.kafkaBroker,
- Topic: "bingomanager",
- Partition: 0,
- MinBytes: 10e3, // 10KB
- MaxBytes: 10e6, // 10MB
- })
- bm.kafkaW = kafka.NewWriter(kafka.WriterConfig{
- Brokers: bm.kafkaBroker,
- Topic: "bingomanager",
- Balancer: &kafka.LeastBytes{},
- })
- go bm.pollKafka()
- return nil
- }
- func (bm *BingoManager) pollKafka() error {
- for {
- m, err := bm.kafkaR.ReadMessage(context.Background())
- if err != nil {
- log.Print(err)
- time.Sleep(time.Second)
- continue
- }
- bm.ReceiveMessages(m)
- }
- }
// randomHex returns n pseudo-random bytes encoded as a lowercase hexadecimal
// string of length 2*n. The bytes come from math/rand, so the result must not
// be used where unpredictability matters (tokens, secrets).
func randomHex(n int) (string, error) {
	buf := make([]byte, n)
	_, err := rand.Read(buf)
	if err != nil {
		return "", err
	}
	encoded := hex.EncodeToString(buf)
	return encoded, nil
}
|