123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168 |
- package usermanager
- import (
- "context"
- "crypto/rand"
- "encoding/hex"
- "log"
- "time"
- "github.com/golang/protobuf/proto"
- "github.com/segmentio/kafka-go"
- "golang.org/x/crypto/bcrypt"
- gamelangpb "git.alfi.li/gamelang/protobuf/gamelang"
- )
- // UserManager manages holds
- type UserManager struct {
- Users map[string]*gamelangpb.User
- kafkaBroker []string
- kafkaR *kafka.Reader
- kafkaW *kafka.Writer
- }
- // NewUserManager returns an initialized UserManager
- func NewUserManager(kafkaBroker []string) *UserManager {
- // default broker: []string{"localhost:9092"}
- um := UserManager{}
- um.Users = map[string]*gamelangpb.User{}
- if len(kafkaBroker) > 0 {
- um.kafkaBroker = kafkaBroker
- um.InitKafka()
- }
- return &um
- }
- // Create adds an entry into the user database
- func (um *UserManager) Create(id uint32, name, password string) (gamelangpb.User, error) {
- if um.kafkaW != nil {
- pass, err := bcrypt.GenerateFromPassword([]byte(password), bcrypt.DefaultCost)
- if err != nil {
- return gamelangpb.User{}, err
- }
- newUser := gamelangpb.User{Id: id, Name: name, Password: pass}
- newUserBin, _ := proto.Marshal(&newUser)
- if err := um.kafkaW.WriteMessages(context.Background(), kafka.Message{Key: []byte("user:create"), Value: newUserBin}); err != nil {
- log.Println("ERROR: cannot write to kafka\n", err.Error())
- return gamelangpb.User{}, err
- }
- return newUser, nil
- } else {
- pass, _ := bcrypt.GenerateFromPassword([]byte(password), bcrypt.DefaultCost)
- user := gamelangpb.User{Id: id, Name: name, Password: pass}
- um.Users[name] = &user
- return user, nil
- }
- }
- // Remove removes an entry from the user database
- func (um *UserManager) Remove(name string) error {
- if um.kafkaW != nil {
- delUser := um.Users[name]
- delUserBin, _ := proto.Marshal(delUser)
- if err := um.kafkaW.WriteMessages(context.Background(), kafka.Message{Key: []byte("user:delete"), Value: delUserBin}); err != nil {
- log.Fatal(err)
- } else {
- // print common key info
- log.Print("Delete is done")
- }
- } else {
- delete(um.Users, name)
- }
- return nil
- }
- // List returns a list of all Users
- func (um *UserManager) List() []gamelangpb.User {
- list := []gamelangpb.User{}
- for _, userP := range um.Users {
- user := gamelangpb.User{}
- user.Id = userP.Id
- user.Name = userP.Name
- list = append(list, user)
- }
- return list
- }
- // CheckUser checks a loginagains the user database
- func (um *UserManager) CheckUser(name, password string) (gamelangpb.User, bool) {
- user, ok := um.Users[name]
- if !ok {
- log.Printf("UserManager: user \"%v\" not found", name)
- return gamelangpb.User{}, false
- }
- err := bcrypt.CompareHashAndPassword(user.Password, []byte(password))
- if err != nil {
- log.Print(err.Error())
- return gamelangpb.User{}, false
- }
- return *user, true
- }
- // ReceiveMessages handles kafka messages
- func (um *UserManager) ReceiveMessages(msg kafka.Message) {
- log.Println("um receive: new msg", msg.Offset, string(msg.Key))
- switch string(msg.Key) {
- // uc - user create event
- case "user:create":
- newUser := &gamelangpb.User{}
- proto.Unmarshal(msg.Value, newUser)
- um.Users[newUser.Name] = newUser
- log.Println("um receive: user \"", newUser.Name, "\" has been created")
- // ur - user remove event
- case "user:remove":
- delUser := &gamelangpb.User{}
- proto.Unmarshal(msg.Value, delUser)
- name := delUser.Name
- delete(um.Users, name)
- log.Println("um receive: user \"", delUser.Name, "\" has been deleted")
- default:
- log.Println("unknown type ", string(msg.Key))
- }
- }
- // InitKafka initializes the kafka reader and writer and starts the watch thread
- func (um *UserManager) InitKafka() error {
- um.kafkaR = kafka.NewReader(kafka.ReaderConfig{
- Brokers: um.kafkaBroker,
- Topic: "usermanager",
- Partition: 0,
- MinBytes: 10e3, // 10KB
- MaxBytes: 10e6, // 10MB
- })
- um.kafkaW = kafka.NewWriter(kafka.WriterConfig{
- Brokers: um.kafkaBroker,
- Topic: "usermanager",
- Balancer: &kafka.LeastBytes{},
- })
- go um.pollKafka()
- return nil
- }
- func (um *UserManager) pollKafka() error {
- for {
- m, err := um.kafkaR.ReadMessage(context.Background())
- if err != nil {
- log.Print(err)
- time.Sleep(time.Second)
- continue
- }
- um.ReceiveMessages(m)
- }
- }
// randomHex returns n bytes read from crypto/rand, encoded as a lowercase
// hexadecimal string of length 2*n. If the random source fails, it returns an
// empty string and that error.
func randomHex(n int) (string, error) {
	raw := make([]byte, n)
	_, err := rand.Read(raw)
	if err != nil {
		return "", err
	}

	encoded := make([]byte, hex.EncodedLen(len(raw)))
	hex.Encode(encoded, raw)
	return string(encoded), nil
}
|