123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217 |
- package worldmanager
- import (
- "context"
- "crypto/rand"
- "encoding/hex"
- "errors"
- "fmt"
- "log"
- "time"
- gamelangpb "git.alfi.li/gamelang/protobuf/gamelang"
- "github.com/golang/protobuf/proto"
- "github.com/segmentio/kafka-go"
- )
- type WorldManager struct {
- worlds map[string]*gamelangpb.World
- kafkaBroker []string
- kafkaR *kafka.Reader
- kafkaW *kafka.Writer
- }
- func NewWorldManager(kafkaBroker []string) *WorldManager {
- wm := WorldManager{kafkaBroker: kafkaBroker}
- wm.worlds = map[string]*gamelangpb.World{}
- if len(kafkaBroker) > 0 {
- wm.kafkaBroker = kafkaBroker
- wm.InitKafka()
- }
- return &wm
- }
- func (wm *WorldManager) CraftWorldManager(id uint32, name string) (gamelangpb.World, error) {
- newWorld := gamelangpb.World{Id: id, Name: name}
- return newWorld, nil
- }
- // Create adds an entry into the user database
- func (wm *WorldManager) Create(newWorld gamelangpb.World) (gamelangpb.World, error) {
- if _, exists := wm.worlds[newWorld.Name]; exists {
- log.Println("world already exists", newWorld.Name)
- return gamelangpb.World{}, errors.New("world already exists")
- }
- if wm.kafkaW != nil {
- newWorldBin, _ := proto.Marshal(&newWorld)
- if err := wm.kafkaW.WriteMessages(context.Background(), kafka.Message{Key: []byte("world:create"), Value: newWorldBin}); err != nil {
- log.Println("ERROR: cannot write to kafka\n", err.Error())
- return gamelangpb.World{}, err
- }
- return newWorld, nil
- } else {
- log.Println("world:create adding world\n", newWorld)
- wm.worlds[newWorld.Name] = &newWorld
- return newWorld, nil
- }
- }
- // Remove removes an entry from the user database
- func (wm *WorldManager) Remove(name string) error {
- if wm.kafkaW != nil {
- delWorld := wm.worlds[name]
- delWorldBin, _ := proto.Marshal(delWorld)
- if err := wm.kafkaW.WriteMessages(context.Background(), kafka.Message{Key: []byte("world:delete"), Value: delWorldBin}); err != nil {
- log.Fatal(err)
- } else {
- // print common key info
- log.Print("Delete is done")
- }
- } else {
- delete(wm.worlds, name)
- }
- return nil
- }
- // List returns a list of all Users
- func (wm *WorldManager) List() []gamelangpb.World {
- list := []gamelangpb.World{}
- for _, worldP := range wm.worlds {
- world := *worldP
- list = append(list, world)
- }
- return list
- }
- // Join adds a user to a world
- func (wm *WorldManager) Join(worldname string, user *gamelangpb.User) error {
- if _, ok := wm.worlds[worldname]; !ok {
- log.Println("world doesnt exists", worldname)
- return errors.New(fmt.Sprintln("world doesnt exists", worldname))
- }
- for _, worlduser := range wm.worlds[worldname].Users {
- if user.Name == worlduser.Name {
- log.Printf("user \"%s\" already joined \"%s\"", user.Name, worldname)
- return errors.New(fmt.Sprintf("user \"%s\" already joined \"%s\"", user.Name, worldname))
- }
- }
- if wm.kafkaW != nil {
- joinWorld := &gamelangpb.World{}
- joinWorld.Id = wm.worlds[worldname].Id
- joinWorld.Name = wm.worlds[worldname].Name
- joinWorld.Users = []*gamelangpb.User{user}
- joinWorldBin, _ := proto.Marshal(joinWorld)
- if err := wm.kafkaW.WriteMessages(context.Background(), kafka.Message{Key: []byte("world:join"), Value: joinWorldBin}); err != nil {
- log.Fatal(err)
- } else {
- log.Print("join is done")
- }
- return nil
- } else {
- world := wm.worlds[worldname]
- world.Users = append(world.Users, user)
- return nil
- }
- }
- // AddGame adds a game to a world
- func (wm *WorldManager) AddGame(worldname string, game string) error {
- if _, ok := wm.worlds[worldname]; !ok {
- log.Println("world doesnt exists", worldname)
- return errors.New(fmt.Sprintln("world doesnt exists", worldname))
- }
- if wm.kafkaW != nil {
- modWorld := wm.worlds[worldname]
- modWorld.Games = []string{game}
- modWorldBin, _ := proto.Marshal(modWorld)
- if err := wm.kafkaW.WriteMessages(context.Background(), kafka.Message{Key: []byte("world:addgame"), Value: modWorldBin}); err != nil {
- log.Fatal(err)
- } else {
- // print common key info
- log.Print("game add is done")
- }
- } else {
- wm.worlds[worldname].Games = append(wm.worlds[worldname].Games, game)
- }
- return nil
- }
- // ReceiveMessages handles kafka messages
- func (wm *WorldManager) ReceiveMessages(msg kafka.Message) {
- log.Println("wm receive: new msg", msg.Offset, string(msg.Key))
- switch string(msg.Key) {
- // uc - user create event
- case "world:create":
- newWorld := &gamelangpb.World{}
- proto.Unmarshal(msg.Value, newWorld)
- log.Println("world:create adding world\n", newWorld)
- wm.worlds[newWorld.Name] = newWorld
- log.Println("wm receive: world \"", newWorld.Name, "\" has been created")
- // ur - user remove event
- case "world:remove":
- delWorld := &gamelangpb.World{}
- proto.Unmarshal(msg.Value, delWorld)
- name := delWorld.Name
- delete(wm.worlds, name)
- log.Println("wm receive: world \"", delWorld.Name, "\" has been deleted")
- case "world:join":
- joinWorld := &gamelangpb.World{}
- proto.Unmarshal(msg.Value, joinWorld)
- world := wm.worlds[joinWorld.Name]
- for _, user := range joinWorld.Users {
- world.Users = append(world.Users, user)
- }
- log.Println("wm receive: world \"", joinWorld.Name, "\" was joined by", joinWorld.Users)
- case "world:addgame":
- modWorld := &gamelangpb.World{}
- proto.Unmarshal(msg.Value, modWorld)
- wm.worlds[modWorld.Name].Games = append(wm.worlds[modWorld.Name].Games, modWorld.Games[0])
- default:
- log.Println("unknown type ", string(msg.Key))
- }
- }
- // InitKafka initializes the kafka reader and writer and starts the watch thread
- func (wm *WorldManager) InitKafka() error {
- wm.kafkaR = kafka.NewReader(kafka.ReaderConfig{
- Brokers: wm.kafkaBroker,
- Topic: "worldmanager",
- Partition: 0,
- MinBytes: 10e3, // 10KB
- MaxBytes: 10e6, // 10MB
- })
- wm.kafkaW = kafka.NewWriter(kafka.WriterConfig{
- Brokers: wm.kafkaBroker,
- Topic: "worldmanager",
- Balancer: &kafka.LeastBytes{},
- })
- go wm.pollKafka()
- return nil
- }
- func (wm *WorldManager) pollKafka() error {
- for {
- m, err := wm.kafkaR.ReadMessage(context.Background())
- if err != nil {
- log.Print(err)
- time.Sleep(time.Second)
- continue
- }
- wm.ReceiveMessages(m)
- }
- }
- func randomHex(n int) (string, error) {
- bytes := make([]byte, n)
- if _, err := rand.Read(bytes); err != nil {
- return "", err
- }
- return hex.EncodeToString(bytes), nil
- }
|