usermanager.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. package usermanager
  2. import (
  3. "context"
  4. "crypto/rand"
  5. "encoding/hex"
  6. "log"
  7. "time"
  8. "github.com/golang/protobuf/proto"
  9. "github.com/segmentio/kafka-go"
  10. "golang.org/x/crypto/bcrypt"
  11. gamelangpb "git.alfi.li/gamelang/protobuf/gamelang"
  12. )
  13. // UserManager manages holds
  14. type UserManager struct {
  15. Users map[string]*gamelangpb.User
  16. kafkaBroker []string
  17. kafkaR *kafka.Reader
  18. kafkaW *kafka.Writer
  19. }
  20. // NewUserManager returns an initialized UserManager
  21. func NewUserManager(kafkaBroker []string) *UserManager {
  22. // default broker: []string{"localhost:9092"}
  23. um := UserManager{}
  24. um.Users = map[string]*gamelangpb.User{}
  25. if len(kafkaBroker) > 0 {
  26. um.kafkaBroker = kafkaBroker
  27. um.InitKafka()
  28. }
  29. return &um
  30. }
  31. // Create adds an entry into the user database
  32. func (um *UserManager) Create(id uint32, name, password string) (gamelangpb.User, error) {
  33. if um.kafkaW != nil {
  34. pass, err := bcrypt.GenerateFromPassword([]byte(password), bcrypt.DefaultCost)
  35. if err != nil {
  36. return gamelangpb.User{}, err
  37. }
  38. newUser := gamelangpb.User{Id: id, Name: name, Password: pass}
  39. newUserBin, _ := proto.Marshal(&newUser)
  40. if err := um.kafkaW.WriteMessages(context.Background(), kafka.Message{Key: []byte("user:create"), Value: newUserBin}); err != nil {
  41. log.Println("ERROR: cannot write to kafka\n", err.Error())
  42. return gamelangpb.User{}, err
  43. }
  44. return newUser, nil
  45. } else {
  46. pass, _ := bcrypt.GenerateFromPassword([]byte(password), bcrypt.DefaultCost)
  47. user := gamelangpb.User{Id: id, Name: name, Password: pass}
  48. um.Users[name] = &user
  49. return user, nil
  50. }
  51. }
  52. // Remove removes an entry from the user database
  53. func (um *UserManager) Remove(name string) error {
  54. if um.kafkaW != nil {
  55. delUser := um.Users[name]
  56. delUserBin, _ := proto.Marshal(delUser)
  57. if err := um.kafkaW.WriteMessages(context.Background(), kafka.Message{Key: []byte("user:delete"), Value: delUserBin}); err != nil {
  58. log.Fatal(err)
  59. } else {
  60. // print common key info
  61. log.Print("Delete is done")
  62. }
  63. } else {
  64. delete(um.Users, name)
  65. }
  66. return nil
  67. }
  68. // List returns a list of all Users
  69. func (um *UserManager) List() []gamelangpb.User {
  70. list := []gamelangpb.User{}
  71. for _, userP := range um.Users {
  72. user := gamelangpb.User{}
  73. user.Id = userP.Id
  74. user.Name = userP.Name
  75. list = append(list, user)
  76. }
  77. return list
  78. }
  79. // CheckUser checks a loginagains the user database
  80. func (um *UserManager) CheckUser(name, password string) (gamelangpb.User, bool) {
  81. user, ok := um.Users[name]
  82. if !ok {
  83. log.Printf("UserManager: user \"%v\" not found", name)
  84. return gamelangpb.User{}, false
  85. }
  86. err := bcrypt.CompareHashAndPassword(user.Password, []byte(password))
  87. if err != nil {
  88. log.Print(err.Error())
  89. return gamelangpb.User{}, false
  90. }
  91. return *user, true
  92. }
  93. // ReceiveMessages handles kafka messages
  94. func (um *UserManager) ReceiveMessages(msg kafka.Message) {
  95. log.Println("um receive: new msg", msg.Offset, string(msg.Key))
  96. switch string(msg.Key) {
  97. // uc - user create event
  98. case "user:create":
  99. newUser := &gamelangpb.User{}
  100. proto.Unmarshal(msg.Value, newUser)
  101. um.Users[newUser.Name] = newUser
  102. log.Println("um receive: user \"", newUser.Name, "\" has been created")
  103. // ur - user remove event
  104. case "user:remove":
  105. delUser := &gamelangpb.User{}
  106. proto.Unmarshal(msg.Value, delUser)
  107. name := delUser.Name
  108. delete(um.Users, name)
  109. log.Println("um receive: user \"", delUser.Name, "\" has been deleted")
  110. default:
  111. log.Println("unknown type ", string(msg.Key))
  112. }
  113. }
  114. // InitKafka initializes the kafka reader and writer and starts the watch thread
  115. func (um *UserManager) InitKafka() error {
  116. um.kafkaR = kafka.NewReader(kafka.ReaderConfig{
  117. Brokers: um.kafkaBroker,
  118. Topic: "usermanager",
  119. Partition: 0,
  120. MinBytes: 10e3, // 10KB
  121. MaxBytes: 10e6, // 10MB
  122. })
  123. um.kafkaW = kafka.NewWriter(kafka.WriterConfig{
  124. Brokers: um.kafkaBroker,
  125. Topic: "usermanager",
  126. Balancer: &kafka.LeastBytes{},
  127. })
  128. go um.pollKafka()
  129. return nil
  130. }
  131. func (um *UserManager) pollKafka() error {
  132. for {
  133. m, err := um.kafkaR.ReadMessage(context.Background())
  134. if err != nil {
  135. log.Print(err)
  136. time.Sleep(time.Second)
  137. continue
  138. }
  139. um.ReceiveMessages(m)
  140. }
  141. }
  // randomHex reads n bytes from crypto/rand and returns them as a lowercase
  // hexadecimal string of length 2*n. It returns an empty string and the
  // underlying error when the random source cannot be read.
  func randomHex(n int) (string, error) {
  	raw := make([]byte, n)
  	_, err := rand.Read(raw)
  	if err != nil {
  		return "", err
  	}

  	encoded := make([]byte, hex.EncodedLen(len(raw)))
  	hex.Encode(encoded, raw)
  	return string(encoded), nil
  }