package worldmanager import ( "context" "crypto/rand" "encoding/hex" "errors" "fmt" "log" "time" gamelangpb "git.alfi.li/gamelang/protobuf/gamelang" "github.com/golang/protobuf/proto" "github.com/segmentio/kafka-go" ) type WorldManager struct { worlds map[string]*gamelangpb.World kafkaBroker []string kafkaR *kafka.Reader kafkaW *kafka.Writer } func NewWorldManager(kafkaBroker []string) *WorldManager { wm := WorldManager{kafkaBroker: kafkaBroker} wm.worlds = map[string]*gamelangpb.World{} if len(kafkaBroker) > 0 { wm.kafkaBroker = kafkaBroker wm.InitKafka() } return &wm } func (wm *WorldManager) CraftWorldManager(id uint32, name string) (gamelangpb.World, error) { newWorld := gamelangpb.World{Id: id, Name: name} return newWorld, nil } // Create adds an entry into the user database func (wm *WorldManager) Create(newWorld gamelangpb.World) (gamelangpb.World, error) { if _, exists := wm.worlds[newWorld.Name]; exists { log.Println("world already exists", newWorld.Name) return gamelangpb.World{}, errors.New("world already exists") } if wm.kafkaW != nil { newWorldBin, _ := proto.Marshal(&newWorld) if err := wm.kafkaW.WriteMessages(context.Background(), kafka.Message{Key: []byte("world:create"), Value: newWorldBin}); err != nil { log.Println("ERROR: cannot write to kafka\n", err.Error()) return gamelangpb.World{}, err } return newWorld, nil } else { log.Println("world:create adding world\n", newWorld) wm.worlds[newWorld.Name] = &newWorld return newWorld, nil } } // Remove removes an entry from the user database func (wm *WorldManager) Remove(name string) error { if wm.kafkaW != nil { delWorld := wm.worlds[name] delWorldBin, _ := proto.Marshal(delWorld) if err := wm.kafkaW.WriteMessages(context.Background(), kafka.Message{Key: []byte("world:delete"), Value: delWorldBin}); err != nil { log.Fatal(err) } else { // print common key info log.Print("Delete is done") } } else { delete(wm.worlds, name) } return nil } // List returns a list of all Users func (wm *WorldManager) List() []gamelangpb.World { list := []gamelangpb.World{} for _, worldP := range wm.worlds { world := *worldP list = append(list, world) } return list } // Join adds a user to a world func (wm *WorldManager) Join(worldname string, user *gamelangpb.User) error { if _, ok := wm.worlds[worldname]; !ok { log.Println("world doesnt exists", worldname) return errors.New(fmt.Sprintln("world doesnt exists", worldname)) } for _, worlduser := range wm.worlds[worldname].Users { if user.Name == worlduser.Name { log.Printf("user \"%s\" already joined \"%s\"", user.Name, worldname) return errors.New(fmt.Sprintf("user \"%s\" already joined \"%s\"", user.Name, worldname)) } } if wm.kafkaW != nil { joinWorld := &gamelangpb.World{} joinWorld.Id = wm.worlds[worldname].Id joinWorld.Name = wm.worlds[worldname].Name joinWorld.Users = []*gamelangpb.User{user} joinWorldBin, _ := proto.Marshal(joinWorld) if err := wm.kafkaW.WriteMessages(context.Background(), kafka.Message{Key: []byte("world:join"), Value: joinWorldBin}); err != nil { log.Fatal(err) } else { log.Print("join is done") } return nil } else { world := wm.worlds[worldname] world.Users = append(world.Users, user) return nil } } // AddGame adds a game to a world func (wm *WorldManager) AddGame(worldname string, game string) error { if _, ok := wm.worlds[worldname]; !ok { log.Println("world doesnt exists", worldname) return errors.New(fmt.Sprintln("world doesnt exists", worldname)) } if wm.kafkaW != nil { modWorld := wm.worlds[worldname] modWorld.Games = []string{game} modWorldBin, _ := proto.Marshal(modWorld) if err := wm.kafkaW.WriteMessages(context.Background(), kafka.Message{Key: []byte("world:addgame"), Value: modWorldBin}); err != nil { log.Fatal(err) } else { // print common key info log.Print("game add is done") } } else { wm.worlds[worldname].Games = append(wm.worlds[worldname].Games, game) } return nil } // ReceiveMessages handles kafka messages func (wm *WorldManager) ReceiveMessages(msg kafka.Message) { log.Println("wm receive: new msg", msg.Offset, string(msg.Key)) switch string(msg.Key) { // uc - user create event case "world:create": newWorld := &gamelangpb.World{} proto.Unmarshal(msg.Value, newWorld) log.Println("world:create adding world\n", newWorld) wm.worlds[newWorld.Name] = newWorld log.Println("wm receive: world \"", newWorld.Name, "\" has been created") // ur - user remove event case "world:remove": delWorld := &gamelangpb.World{} proto.Unmarshal(msg.Value, delWorld) name := delWorld.Name delete(wm.worlds, name) log.Println("wm receive: world \"", delWorld.Name, "\" has been deleted") case "world:join": joinWorld := &gamelangpb.World{} proto.Unmarshal(msg.Value, joinWorld) world := wm.worlds[joinWorld.Name] for _, user := range joinWorld.Users { world.Users = append(world.Users, user) } log.Println("wm receive: world \"", joinWorld.Name, "\" was joined by", joinWorld.Users) case "world:addgame": modWorld := &gamelangpb.World{} proto.Unmarshal(msg.Value, modWorld) wm.worlds[modWorld.Name].Games = append(wm.worlds[modWorld.Name].Games, modWorld.Games[0]) default: log.Println("unknown type ", string(msg.Key)) } } // InitKafka initializes the kafka reader and writer and starts the watch thread func (wm *WorldManager) InitKafka() error { wm.kafkaR = kafka.NewReader(kafka.ReaderConfig{ Brokers: wm.kafkaBroker, Topic: "worldmanager", Partition: 0, MinBytes: 10e3, // 10KB MaxBytes: 10e6, // 10MB }) wm.kafkaW = kafka.NewWriter(kafka.WriterConfig{ Brokers: wm.kafkaBroker, Topic: "worldmanager", Balancer: &kafka.LeastBytes{}, }) go wm.pollKafka() return nil } func (wm *WorldManager) pollKafka() error { for { m, err := wm.kafkaR.ReadMessage(context.Background()) if err != nil { log.Print(err) time.Sleep(time.Second) continue } wm.ReceiveMessages(m) } } func randomHex(n int) (string, error) { bytes := make([]byte, n) if _, err := rand.Read(bytes); err != nil { return "", err } return hex.EncodeToString(bytes), nil }