package usermanager import ( "context" "crypto/rand" "encoding/hex" "log" "time" "github.com/golang/protobuf/proto" "github.com/segmentio/kafka-go" "golang.org/x/crypto/bcrypt" gamelangpb "git.alfi.li/gamelang/protobuf/gamelang" ) // UserManager manages holds type UserManager struct { Users map[string]*gamelangpb.User kafkaBroker []string kafkaR *kafka.Reader kafkaW *kafka.Writer } // NewUserManager returns an initialized UserManager func NewUserManager(kafkaBroker []string) *UserManager { // default broker: []string{"localhost:9092"} um := UserManager{} um.Users = map[string]*gamelangpb.User{} if len(kafkaBroker) > 0 { um.kafkaBroker = kafkaBroker um.InitKafka() } return &um } // Create adds an entry into the user database func (um *UserManager) Create(id uint32, name, password string) (gamelangpb.User, error) { if um.kafkaW != nil { pass, err := bcrypt.GenerateFromPassword([]byte(password), bcrypt.DefaultCost) if err != nil { return gamelangpb.User{}, err } newUser := gamelangpb.User{Id: id, Name: name, Password: pass} newUserBin, _ := proto.Marshal(&newUser) if err := um.kafkaW.WriteMessages(context.Background(), kafka.Message{Key: []byte("user:create"), Value: newUserBin}); err != nil { log.Println("ERROR: cannot write to kafka\n", err.Error()) return gamelangpb.User{}, err } return newUser, nil } else { pass, _ := bcrypt.GenerateFromPassword([]byte(password), bcrypt.DefaultCost) user := gamelangpb.User{Id: id, Name: name, Password: pass} um.Users[name] = &user return user, nil } } // Remove removes an entry from the user database func (um *UserManager) Remove(name string) error { if um.kafkaW != nil { delUser := um.Users[name] delUserBin, _ := proto.Marshal(delUser) if err := um.kafkaW.WriteMessages(context.Background(), kafka.Message{Key: []byte("user:delete"), Value: delUserBin}); err != nil { log.Fatal(err) } else { // print common key info log.Print("Delete is done") } } else { delete(um.Users, name) } return nil } // List returns a list of all Users func (um *UserManager) List() []gamelangpb.User { list := []gamelangpb.User{} for _, userP := range um.Users { user := gamelangpb.User{} user.Id = userP.Id user.Name = userP.Name list = append(list, user) } return list } // CheckUser checks a loginagains the user database func (um *UserManager) CheckUser(name, password string) (gamelangpb.User, bool) { user, ok := um.Users[name] if !ok { log.Printf("UserManager: user \"%v\" not found", name) return gamelangpb.User{}, false } err := bcrypt.CompareHashAndPassword(user.Password, []byte(password)) if err != nil { log.Print(err.Error()) return gamelangpb.User{}, false } return *user, true } // ReceiveMessages handles kafka messages func (um *UserManager) ReceiveMessages(msg kafka.Message) { log.Println("um receive: new msg", msg.Offset, string(msg.Key)) switch string(msg.Key) { // uc - user create event case "user:create": newUser := &gamelangpb.User{} proto.Unmarshal(msg.Value, newUser) um.Users[newUser.Name] = newUser log.Println("um receive: user \"", newUser.Name, "\" has been created") // ur - user remove event case "user:remove": delUser := &gamelangpb.User{} proto.Unmarshal(msg.Value, delUser) name := delUser.Name delete(um.Users, name) log.Println("um receive: user \"", delUser.Name, "\" has been deleted") default: log.Println("unknown type ", string(msg.Key)) } } // InitKafka initializes the kafka reader and writer and starts the watch thread func (um *UserManager) InitKafka() error { um.kafkaR = kafka.NewReader(kafka.ReaderConfig{ Brokers: um.kafkaBroker, Topic: "usermanager", Partition: 0, MinBytes: 10e3, // 10KB MaxBytes: 10e6, // 10MB }) um.kafkaW = kafka.NewWriter(kafka.WriterConfig{ Brokers: um.kafkaBroker, Topic: "usermanager", Balancer: &kafka.LeastBytes{}, }) go um.pollKafka() return nil } func (um *UserManager) pollKafka() error { for { m, err := um.kafkaR.ReadMessage(context.Background()) if err != nil { log.Print(err) time.Sleep(time.Second) continue } um.ReceiveMessages(m) } } func randomHex(n int) (string, error) { bytes := make([]byte, n) if _, err := rand.Read(bytes); err != nil { return "", err } return hex.EncodeToString(bytes), nil }