worldmanager.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. package worldmanager
  2. import (
  3. "context"
  4. "crypto/rand"
  5. "encoding/hex"
  6. "errors"
  7. "fmt"
  8. "log"
  9. "time"
  10. gamelangpb "git.alfi.li/gamelang/protobuf/gamelang"
  11. "github.com/golang/protobuf/proto"
  12. "github.com/segmentio/kafka-go"
  13. )
  14. type WorldManager struct {
  15. worlds map[string]*gamelangpb.World
  16. kafkaBroker []string
  17. kafkaR *kafka.Reader
  18. kafkaW *kafka.Writer
  19. }
  20. func NewWorldManager(kafkaBroker []string) *WorldManager {
  21. wm := WorldManager{kafkaBroker: kafkaBroker}
  22. wm.worlds = map[string]*gamelangpb.World{}
  23. if len(kafkaBroker) > 0 {
  24. wm.kafkaBroker = kafkaBroker
  25. wm.InitKafka()
  26. }
  27. return &wm
  28. }
  29. func (wm *WorldManager) CraftWorldManager(id uint32, name string) (gamelangpb.World, error) {
  30. newWorld := gamelangpb.World{Id: id, Name: name}
  31. return newWorld, nil
  32. }
  33. // Create adds an entry into the user database
  34. func (wm *WorldManager) Create(newWorld gamelangpb.World) (gamelangpb.World, error) {
  35. if _, exists := wm.worlds[newWorld.Name]; exists {
  36. log.Println("world already exists", newWorld.Name)
  37. return gamelangpb.World{}, errors.New("world already exists")
  38. }
  39. if wm.kafkaW != nil {
  40. newWorldBin, _ := proto.Marshal(&newWorld)
  41. if err := wm.kafkaW.WriteMessages(context.Background(), kafka.Message{Key: []byte("world:create"), Value: newWorldBin}); err != nil {
  42. log.Println("ERROR: cannot write to kafka\n", err.Error())
  43. return gamelangpb.World{}, err
  44. }
  45. return newWorld, nil
  46. } else {
  47. log.Println("world:create adding world\n", newWorld)
  48. wm.worlds[newWorld.Name] = &newWorld
  49. return newWorld, nil
  50. }
  51. }
  52. // Remove removes an entry from the user database
  53. func (wm *WorldManager) Remove(name string) error {
  54. if wm.kafkaW != nil {
  55. delWorld := wm.worlds[name]
  56. delWorldBin, _ := proto.Marshal(delWorld)
  57. if err := wm.kafkaW.WriteMessages(context.Background(), kafka.Message{Key: []byte("world:delete"), Value: delWorldBin}); err != nil {
  58. log.Fatal(err)
  59. } else {
  60. // print common key info
  61. log.Print("Delete is done")
  62. }
  63. } else {
  64. delete(wm.worlds, name)
  65. }
  66. return nil
  67. }
  68. // List returns a list of all Users
  69. func (wm *WorldManager) List() []gamelangpb.World {
  70. list := []gamelangpb.World{}
  71. for _, worldP := range wm.worlds {
  72. world := *worldP
  73. list = append(list, world)
  74. }
  75. return list
  76. }
  77. // Join adds a user to a world
  78. func (wm *WorldManager) Join(worldname string, user *gamelangpb.User) error {
  79. if _, ok := wm.worlds[worldname]; !ok {
  80. log.Println("world doesnt exists", worldname)
  81. return errors.New(fmt.Sprintln("world doesnt exists", worldname))
  82. }
  83. for _, worlduser := range wm.worlds[worldname].Users {
  84. if user.Name == worlduser.Name {
  85. log.Printf("user \"%s\" already joined \"%s\"", user.Name, worldname)
  86. return errors.New(fmt.Sprintf("user \"%s\" already joined \"%s\"", user.Name, worldname))
  87. }
  88. }
  89. if wm.kafkaW != nil {
  90. joinWorld := &gamelangpb.World{}
  91. joinWorld.Id = wm.worlds[worldname].Id
  92. joinWorld.Name = wm.worlds[worldname].Name
  93. joinWorld.Users = []*gamelangpb.User{user}
  94. joinWorldBin, _ := proto.Marshal(joinWorld)
  95. if err := wm.kafkaW.WriteMessages(context.Background(), kafka.Message{Key: []byte("world:join"), Value: joinWorldBin}); err != nil {
  96. log.Fatal(err)
  97. } else {
  98. log.Print("join is done")
  99. }
  100. return nil
  101. } else {
  102. world := wm.worlds[worldname]
  103. world.Users = append(world.Users, user)
  104. return nil
  105. }
  106. }
  107. // AddGame adds a game to a world
  108. func (wm *WorldManager) AddGame(worldname string, game string) error {
  109. if _, ok := wm.worlds[worldname]; !ok {
  110. log.Println("world doesnt exists", worldname)
  111. return errors.New(fmt.Sprintln("world doesnt exists", worldname))
  112. }
  113. if wm.kafkaW != nil {
  114. modWorld := wm.worlds[worldname]
  115. modWorld.Games = []string{game}
  116. modWorldBin, _ := proto.Marshal(modWorld)
  117. if err := wm.kafkaW.WriteMessages(context.Background(), kafka.Message{Key: []byte("world:addgame"), Value: modWorldBin}); err != nil {
  118. log.Fatal(err)
  119. } else {
  120. // print common key info
  121. log.Print("game add is done")
  122. }
  123. } else {
  124. wm.worlds[worldname].Games = append(wm.worlds[worldname].Games, game)
  125. }
  126. return nil
  127. }
  128. // ReceiveMessages handles kafka messages
  129. func (wm *WorldManager) ReceiveMessages(msg kafka.Message) {
  130. log.Println("wm receive: new msg", msg.Offset, string(msg.Key))
  131. switch string(msg.Key) {
  132. // uc - user create event
  133. case "world:create":
  134. newWorld := &gamelangpb.World{}
  135. proto.Unmarshal(msg.Value, newWorld)
  136. log.Println("world:create adding world\n", newWorld)
  137. wm.worlds[newWorld.Name] = newWorld
  138. log.Println("wm receive: world \"", newWorld.Name, "\" has been created")
  139. // ur - user remove event
  140. case "world:remove":
  141. delWorld := &gamelangpb.World{}
  142. proto.Unmarshal(msg.Value, delWorld)
  143. name := delWorld.Name
  144. delete(wm.worlds, name)
  145. log.Println("wm receive: world \"", delWorld.Name, "\" has been deleted")
  146. case "world:join":
  147. joinWorld := &gamelangpb.World{}
  148. proto.Unmarshal(msg.Value, joinWorld)
  149. world := wm.worlds[joinWorld.Name]
  150. for _, user := range joinWorld.Users {
  151. world.Users = append(world.Users, user)
  152. }
  153. log.Println("wm receive: world \"", joinWorld.Name, "\" was joined by", joinWorld.Users)
  154. case "world:addgame":
  155. modWorld := &gamelangpb.World{}
  156. proto.Unmarshal(msg.Value, modWorld)
  157. wm.worlds[modWorld.Name].Games = append(wm.worlds[modWorld.Name].Games, modWorld.Games[0])
  158. default:
  159. log.Println("unknown type ", string(msg.Key))
  160. }
  161. }
  162. // InitKafka initializes the kafka reader and writer and starts the watch thread
  163. func (wm *WorldManager) InitKafka() error {
  164. wm.kafkaR = kafka.NewReader(kafka.ReaderConfig{
  165. Brokers: wm.kafkaBroker,
  166. Topic: "worldmanager",
  167. Partition: 0,
  168. MinBytes: 10e3, // 10KB
  169. MaxBytes: 10e6, // 10MB
  170. })
  171. wm.kafkaW = kafka.NewWriter(kafka.WriterConfig{
  172. Brokers: wm.kafkaBroker,
  173. Topic: "worldmanager",
  174. Balancer: &kafka.LeastBytes{},
  175. })
  176. go wm.pollKafka()
  177. return nil
  178. }
  179. func (wm *WorldManager) pollKafka() error {
  180. for {
  181. m, err := wm.kafkaR.ReadMessage(context.Background())
  182. if err != nil {
  183. log.Print(err)
  184. time.Sleep(time.Second)
  185. continue
  186. }
  187. wm.ReceiveMessages(m)
  188. }
  189. }
  // randomHex reads n bytes from crypto/rand and returns them hex-encoded, so
  // the resulting string is 2*n characters long. A failure to read random bytes
  // is returned together with an empty string.
  func randomHex(n int) (string, error) {
  	buf := make([]byte, n)
  	_, err := rand.Read(buf)
  	if err != nil {
  		return "", err
  	}
  	return hex.EncodeToString(buf), nil
  }
  196. }