diff --git a/internal/msg.go b/internal/msg.go index d34116f..01647c1 100644 --- a/internal/msg.go +++ b/internal/msg.go @@ -15,9 +15,10 @@ type StatusUpdateMsg struct { ErrorMsg string `json:"error_msg"` } -// A WorkerInfoMsg is +// A WorkerInfoMsg is the information struct that describe +// a worker, and sent from the manager to clients. type WorkerInfoMsg struct { - Name string `json:"name"` + ID string `json:"id"` } type CmdVerb uint8 @@ -30,11 +31,16 @@ const ( CmdPing // ensure the goroutine is alive ) +// A WorkerCmd is the command message send from the +// manager to a worker type WorkerCmd struct { - Cmd CmdVerb `json:"cmd"` - Args []string `json:"args"` + Cmd CmdVerb `json:"cmd"` + MirrorID string `json:"mirror_id"` + Args []string `json:"args"` } +// A ClientCmd is the command message send from client +// to the manager type ClientCmd struct { Cmd CmdVerb `json:"cmd"` MirrorID string `json:"mirror_id"` diff --git a/manager/config.go b/manager/config.go index 9a422f7..155ee95 100644 --- a/manager/config.go +++ b/manager/config.go @@ -24,6 +24,7 @@ type ServerConfig struct { type FileConfig struct { StatusFile string `toml:"status_file"` DBFile string `toml:"db_file"` + DBType string `toml:"db_type"` // used to connect to worker CACert string `toml:"ca_cert"` } @@ -36,6 +37,7 @@ func loadConfig(cfgFile string, c *cli.Context) (*Config, error) { cfg.Debug = false cfg.Files.StatusFile = "/var/lib/tunasync/tunasync.json" cfg.Files.DBFile = "/var/lib/tunasync/tunasync.db" + cfg.Files.DBType = "bolt" if cfgFile != "" { if _, err := toml.DecodeFile(cfgFile, cfg); err != nil { @@ -60,6 +62,9 @@ func loadConfig(cfgFile string, c *cli.Context) (*Config, error) { if c.String("db-file") != "" { cfg.Files.DBFile = c.String("db-file") } + if c.String("db-type") != "" { + cfg.Files.DBFile = c.String("db-type") + } return cfg, nil } diff --git a/manager/db.go b/manager/db.go index 4cf3edd..1a1d3ed 100644 --- a/manager/db.go +++ b/manager/db.go @@ -1,13 +1,35 @@ package manager -import "github.com/boltdb/bolt" +import ( + "fmt" + "github.com/boltdb/bolt" +) type dbAdapter interface { - GetWorker(workerID string) - UpdateMirrorStatus(workerID, mirrorID string, status mirrorStatus) - GetMirrorStatus(workerID, mirrorID string) - GetMirrorStatusList(workerID string) - Close() + ListWorkers() ([]worker, error) + GetWorker(workerID string) (worker, error) + CreateWorker(w worker) (worker, error) + UpdateMirrorStatus(workerID, mirrorID string, status mirrorStatus) (mirrorStatus, error) + GetMirrorStatus(workerID, mirrorID string) (mirrorStatus, error) + ListMirrorStatus(workerID string) ([]mirrorStatus, error) + ListAllMirrorStatus() ([]mirrorStatus, error) + Close() error +} + +func makeDBAdapter(dbType string, dbFile string) (dbAdapter, error) { + if dbType == "bolt" { + innerDB, err := bolt.Open(dbFile, 0600, nil) + if err != nil { + return nil, err + } + db := boltAdapter{ + db: innerDB, + dbFile: dbFile, + } + return &db, nil + } + // unsupported db-type + return nil, fmt.Errorf("unsupported db-type: %s", dbType) } type boltAdapter struct { @@ -15,6 +37,34 @@ type boltAdapter struct { dbFile string } +func (b *boltAdapter) ListWorkers() ([]worker, error) { + return []worker{}, nil +} + +func (b *boltAdapter) GetWorker(workerID string) (worker, error) { + return worker{}, nil +} + +func (b *boltAdapter) CreateWorker(w worker) (worker, error) { + return worker{}, nil +} + +func (b *boltAdapter) UpdateMirrorStatus(workerID, mirrorID string, status mirrorStatus) (mirrorStatus, error) { + return mirrorStatus{}, nil +} + +func (b *boltAdapter) GetMirrorStatus(workerID, mirrorID string) (mirrorStatus, error) { + return mirrorStatus{}, nil +} + +func (b *boltAdapter) ListMirrorStatus(workerID string) ([]mirrorStatus, error) { + return []mirrorStatus{}, nil +} + +func (b *boltAdapter) ListAllMirrorStatus() ([]mirrorStatus, error) { + return []mirrorStatus{}, nil +} + func (b *boltAdapter) Close() error { if b.db != nil { return b.db.Close() diff --git a/manager/server.go b/manager/server.go index 4d674bd..828e975 100644 --- a/manager/server.go +++ b/manager/server.go @@ -1,42 +1,226 @@ package manager import ( - "net/http" - + "fmt" "github.com/gin-gonic/gin" + . "github.com/tuna/tunasync/internal" + "net/http" + "sync" + "time" +) + +const ( + maxQueuedCmdNum = 3 + cmdPollTime = 10 * time.Second +) + +const ( + _errorKey = "error" + _infoKey = "message" ) type worker struct { // worker name - name string - // url to connect to worker - url string + id string // session token token string } -func makeHTTPServer(debug bool) *gin.Engine { +var ( + workerChannelMu sync.RWMutex + workerChannels = make(map[string]chan WorkerCmd) +) + +type managerServer struct { + *gin.Engine + adapter dbAdapter +} + +// listAllJobs repond with all jobs of specified workers +func (s *managerServer) listAllJobs(c *gin.Context) { + mirrorStatusList, err := s.adapter.ListAllMirrorStatus() + if err != nil { + err := fmt.Errorf("failed to list all mirror status: %s", + err.Error(), + ) + c.Error(err) + s.returnErrJSON(c, http.StatusInternalServerError, err) + return + } + c.JSON(http.StatusOK, mirrorStatusList) +} + +// listWrokers respond with informations of all the workers +func (s *managerServer) listWorkers(c *gin.Context) { + var workerInfos []WorkerInfoMsg + workers, err := s.adapter.ListWorkers() + if err != nil { + err := fmt.Errorf("failed to list workers: %s", + err.Error(), + ) + c.Error(err) + s.returnErrJSON(c, http.StatusInternalServerError, err) + return + } + for _, w := range workers { + workerInfos = append(workerInfos, + WorkerInfoMsg{w.id}) + } + c.JSON(http.StatusOK, workerInfos) +} + +// registerWorker register an newly-online worker +func (s *managerServer) registerWorker(c *gin.Context) { + var _worker worker + c.BindJSON(&_worker) + newWorker, err := s.adapter.CreateWorker(_worker) + if err != nil { + err := fmt.Errorf("failed to register worker: %s", + err.Error(), + ) + c.Error(err) + s.returnErrJSON(c, http.StatusInternalServerError, err) + return + } + // create workerCmd channel for this worker + workerChannelMu.Lock() + defer workerChannelMu.Unlock() + workerChannels[_worker.id] = make(chan WorkerCmd, maxQueuedCmdNum) + c.JSON(http.StatusOK, newWorker) +} + +// listJobsOfWorker respond with all the jobs of the specified worker +func (s *managerServer) listJobsOfWorker(c *gin.Context) { + workerID := c.Param("id") + mirrorStatusList, err := s.adapter.ListMirrorStatus(workerID) + if err != nil { + err := fmt.Errorf("failed to list jobs of worker %s: %s", + workerID, err.Error(), + ) + c.Error(err) + s.returnErrJSON(c, http.StatusInternalServerError, err) + return + } + c.JSON(http.StatusOK, mirrorStatusList) +} + +func (s *managerServer) returnErrJSON(c *gin.Context, code int, err error) { + c.JSON(code, gin.H{ + _errorKey: err.Error(), + }) +} + +func (s *managerServer) updateJobOfWorker(c *gin.Context) { + workerID := c.Param("id") + var status mirrorStatus + c.BindJSON(&status) + mirrorName := status.Name + newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status) + if err != nil { + err := fmt.Errorf("failed to update job %s of worker %s: %s", + mirrorName, workerID, err.Error(), + ) + c.Error(err) + s.returnErrJSON(c, http.StatusInternalServerError, err) + return + } + c.JSON(http.StatusOK, newStatus) +} + +func (s *managerServer) handleClientCmd(c *gin.Context) { + workerChannelMu.RLock() + defer workerChannelMu.RUnlock() + var clientCmd ClientCmd + c.BindJSON(&clientCmd) + // TODO: decide which worker should do this mirror when WorkerID is null string + workerID := clientCmd.WorkerID + if workerID == "" { + // TODO: decide which worker should do this mirror when WorkerID is null string + logger.Error("handleClientCmd case workerID == \" \" not implemented yet") + c.AbortWithStatus(http.StatusInternalServerError) + return + } + + workerChannel, ok := workerChannels[workerID] + if !ok { + err := fmt.Errorf("worker %s is not registered yet", workerID) + s.returnErrJSON(c, http.StatusBadRequest, err) + return + } + // parse client cmd into worker cmd + workerCmd := WorkerCmd{ + Cmd: clientCmd.Cmd, + MirrorID: clientCmd.MirrorID, + Args: clientCmd.Args, + } + select { + case workerChannel <- workerCmd: + // successfully insert command to channel + c.JSON(http.StatusOK, struct{}{}) + default: + // pending commands for that worker exceed + // the maxQueuedCmdNum threshold + err := fmt.Errorf("pending commands for worker %s exceed"+ + "the %d threshold, the command is dropped", + workerID, maxQueuedCmdNum) + c.Error(err) + s.returnErrJSON(c, http.StatusServiceUnavailable, err) + return + } +} + +func (s *managerServer) getCmdOfWorker(c *gin.Context) { + workerID := c.Param("id") + workerChannelMu.RLock() + defer workerChannelMu.RUnlock() + + workerChannel := workerChannels[workerID] + for { + select { + case _ = <-workerChannel: + // TODO: push new command to worker client + continue + case <-time.After(cmdPollTime): + // time limit exceeded, close the connection + break + } + } +} + +func (s *managerServer) setDBAdapter(adapter dbAdapter) { + s.adapter = adapter +} + +func makeHTTPServer(debug bool) *managerServer { + // create gin engine if !debug { gin.SetMode(gin.ReleaseMode) } - r := gin.Default() - r.GET("/ping", func(c *gin.Context) { + s := &managerServer{ + gin.Default(), + nil, + } + s.GET("/ping", func(c *gin.Context) { c.JSON(http.StatusOK, gin.H{"msg": "pong"}) }) // list jobs, status page - r.GET("/jobs", func(c *gin.Context) {}) + s.GET("/jobs", s.listAllJobs) + + // list workers + s.GET("/workers", s.listWorkers) // worker online - r.POST("/workers/:name", func(c *gin.Context) {}) + s.POST("/workers/:id", s.registerWorker) + // get job list - r.GET("/workers/:name/jobs", func(c *gin.Context) {}) + s.GET("/workers/:id/jobs", s.listJobsOfWorker) // post job status - r.POST("/workers/:name/jobs/:job", func(c *gin.Context) {}) + s.POST("/workers/:id/jobs/:job", s.updateJobOfWorker) // worker command polling - r.GET("/workers/:name/cmd_stream", func(c *gin.Context) {}) + s.GET("/workers/:id/cmd_stream", s.getCmdOfWorker) // for tunasynctl to post commands - r.POST("/cmd/", func(c *gin.Context) {}) + s.POST("/cmd/", s.handleClientCmd) - return r + return s } diff --git a/manager/server_test.go b/manager/server_test.go index c6cb0e7..09ca72c 100644 --- a/manager/server_test.go +++ b/manager/server_test.go @@ -6,12 +6,79 @@ import ( "io/ioutil" "math/rand" "net/http" + "strings" "testing" "time" . "github.com/smartystreets/goconvey/convey" ) +type mockDBAdapter struct { + workerStore map[string]worker + statusStore map[string]mirrorStatus +} + +func (b *mockDBAdapter) ListWorkers() ([]worker, error) { + workers := make([]worker, len(b.workerStore)) + idx := 0 + for _, w := range b.workerStore { + workers[idx] = w + idx++ + } + return workers, nil +} + +func (b *mockDBAdapter) GetWorker(workerID string) (worker, error) { + w, ok := b.workerStore[workerID] + if !ok { + return worker{}, fmt.Errorf("inexist workerId") + } + return w, nil +} + +func (b *mockDBAdapter) CreateWorker(w worker) (worker, error) { + _, ok := b.workerStore[w.id] + if ok { + return worker{}, fmt.Errorf("duplicate worker name") + } + b.workerStore[w.id] = w + return w, nil +} + +func (b *mockDBAdapter) GetMirrorStatus(workerID, mirrorID string) (mirrorStatus, error) { + // TODO: need to check worker exist first + id := workerID + "/" + mirrorID + status, ok := b.statusStore[id] + if !ok { + return mirrorStatus{}, fmt.Errorf("no mirror %s exists in worker %s", mirrorID, workerID) + } + return status, nil +} + +func (b *mockDBAdapter) UpdateMirrorStatus(workerID, mirrorID string, status mirrorStatus) (mirrorStatus, error) { + id := workerID + "/" + mirrorID + b.statusStore[id] = status + return status, nil +} + +func (b *mockDBAdapter) ListMirrorStatus(workerID string) ([]mirrorStatus, error) { + var mirrorStatusList []mirrorStatus + for k, v := range b.statusStore { + if wID := strings.Split(k, "/")[1]; wID == workerID { + mirrorStatusList = append(mirrorStatusList, v) + } + } + return mirrorStatusList, nil +} + +func (b *mockDBAdapter) ListAllMirrorStatus() ([]mirrorStatus, error) { + var mirrorStatusList []mirrorStatus + for _, v := range b.statusStore { + mirrorStatusList = append(mirrorStatusList, v) + } + return mirrorStatusList, nil +} + func TestHTTPServer(t *testing.T) { Convey("HTTP server should work", t, func() { s := makeHTTPServer(false) diff --git a/manager/status.go b/manager/status.go index c9e2c90..b360063 100644 --- a/manager/status.go +++ b/manager/status.go @@ -12,6 +12,8 @@ import ( type mirrorStatus struct { Name string + Worker string + IsMaster bool Status SyncStatus LastUpdate time.Time Upstream string @@ -21,6 +23,8 @@ type mirrorStatus struct { func (s mirrorStatus) MarshalJSON() ([]byte, error) { m := map[string]interface{}{ "name": s.Name, + "worker": s.Worker, + "is_master": s.IsMaster, "status": s.Status, "last_update": s.LastUpdate.Format("2006-01-02 15:04:05"), "last_update_ts": fmt.Sprintf("%d", s.LastUpdate.Unix()),