From daa0b3c204da0b3ef46df87db578726e14c5b022 Mon Sep 17 00:00:00 2001 From: walkerning Date: Tue, 26 Apr 2016 12:01:34 +0800 Subject: [PATCH] refactor(manager): command pulling to command pushing and tests --- manager/middleware.go | 4 +- manager/server.go | 71 +++++------------------------- manager/server_test.go | 99 +++++++++++++++++++++++++++++++++++------- manager/status.go | 7 +++ manager/util.go | 13 ++++++ 5 files changed, 119 insertions(+), 75 deletions(-) create mode 100644 manager/util.go diff --git a/manager/middleware.go b/manager/middleware.go index f620261..3c2d1ea 100644 --- a/manager/middleware.go +++ b/manager/middleware.go @@ -11,7 +11,9 @@ func contextErrorLogger(c *gin.Context) { errs := c.Errors.ByType(gin.ErrorTypeAny) if len(errs) > 0 { for _, err := range errs { - logger.Error(`"in request "%s %s: %s"`, c.Request.Method, c.Request.URL.Path, err.Error()) + logger.Error(`"in request "%s %s: %s"`, + c.Request.Method, c.Request.URL.Path, + err.Error()) } } // pass on to the next middleware in chain diff --git a/manager/server.go b/manager/server.go index 122d5c4..b8fe5a8 100644 --- a/manager/server.go +++ b/manager/server.go @@ -3,35 +3,17 @@ package manager import ( "fmt" "net/http" - "sync" - "time" "github.com/gin-gonic/gin" . "github.com/tuna/tunasync/internal" ) -const ( - maxQueuedCmdNum = 3 - cmdPollTime = 10 * time.Second -) - const ( _errorKey = "error" _infoKey = "message" ) -type workerStatus struct { - ID string `json:"id"` // worker name - Token string `json:"token"` // session token - LastOnline time.Time `json:"last_online"` // last seen -} - -var ( - workerChannelMu sync.RWMutex - workerChannels = make(map[string]chan WorkerCmd) -) - type managerServer struct { *gin.Engine adapter dbAdapter @@ -84,9 +66,6 @@ func (s *managerServer) registerWorker(c *gin.Context) { return } // create workerCmd channel for this worker - workerChannelMu.Lock() - defer workerChannelMu.Unlock() - workerChannels[_worker.ID] = make(chan WorkerCmd, maxQueuedCmdNum) c.JSON(http.StatusOK, newWorker) } @@ -129,11 +108,8 @@ func (s *managerServer) updateJobOfWorker(c *gin.Context) { } func (s *managerServer) handleClientCmd(c *gin.Context) { - workerChannelMu.RLock() - defer workerChannelMu.RUnlock() var clientCmd ClientCmd c.BindJSON(&clientCmd) - // TODO: decide which worker should do this mirror when WorkerID is null string workerID := clientCmd.WorkerID if workerID == "" { // TODO: decide which worker should do this mirror when WorkerID is null string @@ -142,50 +118,30 @@ func (s *managerServer) handleClientCmd(c *gin.Context) { return } - workerChannel, ok := workerChannels[workerID] - if !ok { + w, err := s.adapter.GetWorker(workerID) + if err != nil { err := fmt.Errorf("worker %s is not registered yet", workerID) s.returnErrJSON(c, http.StatusBadRequest, err) return } + workerURL := w.URL // parse client cmd into worker cmd workerCmd := WorkerCmd{ Cmd: clientCmd.Cmd, MirrorID: clientCmd.MirrorID, Args: clientCmd.Args, } - select { - case workerChannel <- workerCmd: - // successfully insert command to channel - c.JSON(http.StatusOK, struct{}{}) - default: - // pending commands for that worker exceed - // the maxQueuedCmdNum threshold - err := fmt.Errorf("pending commands for worker %s exceed"+ - "the %d threshold, the command is dropped", - workerID, maxQueuedCmdNum) + + // post command to worker + _, err = postJSON(workerURL, workerCmd) + if err != nil { + err := fmt.Errorf("post command to worker %s(%s) fail: %s", workerID, workerURL, err.Error()) c.Error(err) - s.returnErrJSON(c, http.StatusServiceUnavailable, err) + s.returnErrJSON(c, http.StatusInternalServerError, err) return } -} - -func (s *managerServer) getCmdOfWorker(c *gin.Context) { - workerID := c.Param("id") - workerChannelMu.RLock() - defer workerChannelMu.RUnlock() - - workerChannel := workerChannels[workerID] - for { - select { - case _ = <-workerChannel: - // TODO: push new command to worker client - continue - case <-time.After(cmdPollTime): - // time limit exceeded, close the connection - break - } - } + // TODO: check response for success + c.JSON(http.StatusOK, gin.H{_infoKey: "successfully send command to worker " + workerID}) } func (s *managerServer) setDBAdapter(adapter dbAdapter) { @@ -223,11 +179,8 @@ func makeHTTPServer(debug bool) *managerServer { // post job status workerValidateGroup.POST(":id/jobs/:job", s.updateJobOfWorker) - // worker command polling - workerValidateGroup.GET(":id/cmd_stream", s.getCmdOfWorker) - // for tunasynctl to post commands - s.POST("/cmd/", s.handleClientCmd) + s.POST("/cmd", s.handleClientCmd) return s } diff --git a/manager/server_test.go b/manager/server_test.go index 97b7e89..6a14d9b 100644 --- a/manager/server_test.go +++ b/manager/server_test.go @@ -1,7 +1,6 @@ package manager import ( - "bytes" "encoding/json" "fmt" "io/ioutil" @@ -11,6 +10,8 @@ import ( "testing" "time" + "github.com/gin-gonic/gin" + . "github.com/smartystreets/goconvey/convey" . "github.com/tuna/tunasync/internal" ) @@ -19,14 +20,8 @@ const ( _magicBadWorkerID = "magic_bad_worker_id" ) -func postJSON(url string, obj interface{}) (*http.Response, error) { - b := new(bytes.Buffer) - json.NewEncoder(b).Encode(obj) - return http.Post(url, "application/json; charset=utf-8", b) -} - func TestHTTPServer(t *testing.T) { - Convey("HTTP server should work", t, func() { + Convey("HTTP server should work", t, func(ctx C) { InitLogger(true, true, false) s := makeHTTPServer(false) So(s, ShouldNotBeNil) @@ -55,7 +50,7 @@ func TestHTTPServer(t *testing.T) { So(err, ShouldBeNil) So(p[_infoKey], ShouldEqual, "pong") - Convey("when database fail", func() { + Convey("when database fail", func(ctx C) { resp, err := http.Get(fmt.Sprintf("%s/workers/%s/jobs", baseURL, _magicBadWorkerID)) So(err, ShouldBeNil) So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError) @@ -66,7 +61,7 @@ func TestHTTPServer(t *testing.T) { So(msg[_errorKey], ShouldEqual, fmt.Sprintf("failed to list jobs of worker %s: %s", _magicBadWorkerID, "database fail")) }) - Convey("when register a worker", func() { + Convey("when register a worker", func(ctx C) { w := workerStatus{ ID: "test_worker1", } @@ -74,7 +69,7 @@ func TestHTTPServer(t *testing.T) { So(err, ShouldBeNil) So(resp.StatusCode, ShouldEqual, http.StatusOK) - Convey("list all workers", func() { + Convey("list all workers", func(ctx C) { So(err, ShouldBeNil) resp, err := http.Get(baseURL + "/workers") So(err, ShouldBeNil) @@ -85,7 +80,7 @@ func TestHTTPServer(t *testing.T) { So(len(actualResponseObj), ShouldEqual, 2) }) - Convey("update mirror status of a existed worker", func() { + Convey("update mirror status of a existed worker", func(ctx C) { status := mirrorStatus{ Name: "arch-sync1", Worker: "test_worker1", @@ -96,10 +91,11 @@ func TestHTTPServer(t *testing.T) { Size: "3GB", } resp, err := postJSON(fmt.Sprintf("%s/workers/%s/jobs/%s", baseURL, status.Worker, status.Name), status) + defer resp.Body.Close() So(err, ShouldBeNil) So(resp.StatusCode, ShouldEqual, http.StatusOK) - Convey("list mirror status of an existed worker", func() { + Convey("list mirror status of an existed worker", func(ctx C) { expectedResponse, err := json.Marshal([]mirrorStatus{status}) So(err, ShouldBeNil) @@ -113,7 +109,7 @@ func TestHTTPServer(t *testing.T) { So(strings.TrimSpace(string(body)), ShouldEqual, string(expectedResponse)) }) - Convey("list all job status of all workers", func() { + Convey("list all job status of all workers", func(ctx C) { expectedResponse, err := json.Marshal([]mirrorStatus{status}) So(err, ShouldBeNil) resp, err := http.Get(baseURL + "/jobs") @@ -127,7 +123,7 @@ func TestHTTPServer(t *testing.T) { }) }) - Convey("update mirror status of an inexisted worker", func() { + Convey("update mirror status of an inexisted worker", func(ctx C) { invalidWorker := "test_worker2" status := mirrorStatus{ Name: "arch-sync2", @@ -148,6 +144,65 @@ func TestHTTPServer(t *testing.T) { So(err, ShouldBeNil) So(msg[_errorKey], ShouldEqual, "invalid workerID "+invalidWorker) }) + Convey("handle client command", func(ctx C) { + cmdChan := make(chan WorkerCmd, 1) + workerServer := makeMockWorkerServer(cmdChan) + workerPort := rand.Intn(10000) + 30000 + bindAddress := fmt.Sprintf("127.0.0.1:%d", workerPort) + workerBaseURL := fmt.Sprintf("http://%s", bindAddress) + w := workerStatus{ + ID: "test_worker_cmd", + URL: workerBaseURL + "/cmd", + } + resp, err := postJSON(baseURL+"/workers", w) + So(err, ShouldBeNil) + So(resp.StatusCode, ShouldEqual, http.StatusOK) + + go func() { + // run the mock worker server + workerServer.Run(bindAddress) + }() + time.Sleep(50 * time.Microsecond) + // verify the worker mock server is running + workerResp, err := http.Get(workerBaseURL + "/ping") + defer workerResp.Body.Close() + So(err, ShouldBeNil) + So(workerResp.StatusCode, ShouldEqual, http.StatusOK) + + Convey("when client send wrong cmd", func(ctx C) { + clientCmd := ClientCmd{ + Cmd: CmdStart, + MirrorID: "ubuntu-sync", + WorkerID: "not_exist_worker", + } + resp, err := postJSON(baseURL+"/cmd", clientCmd) + defer resp.Body.Close() + So(err, ShouldBeNil) + So(resp.StatusCode, ShouldEqual, http.StatusBadRequest) + }) + + Convey("when client send correct cmd", func(ctx C) { + clientCmd := ClientCmd{ + Cmd: CmdStart, + MirrorID: "ubuntu-sync", + WorkerID: w.ID, + } + + resp, err := postJSON(baseURL+"/cmd", clientCmd) + defer resp.Body.Close() + + So(err, ShouldBeNil) + So(resp.StatusCode, ShouldEqual, http.StatusOK) + time.Sleep(50 * time.Microsecond) + select { + case cmd := <-cmdChan: + ctx.So(cmd.Cmd, ShouldEqual, clientCmd.Cmd) + ctx.So(cmd.MirrorID, ShouldEqual, clientCmd.MirrorID) + default: + ctx.So(0, ShouldEqual, 1) + } + }) + }) }) }) } @@ -233,3 +288,17 @@ func (b *mockDBAdapter) ListAllMirrorStatus() ([]mirrorStatus, error) { func (b *mockDBAdapter) Close() error { return nil } + +func makeMockWorkerServer(cmdChan chan WorkerCmd) *gin.Engine { + r := gin.Default() + r.GET("/ping", func(c *gin.Context) { + c.JSON(http.StatusOK, gin.H{_infoKey: "pong"}) + }) + r.POST("/cmd", func(c *gin.Context) { + var cmd WorkerCmd + c.BindJSON(&cmd) + cmdChan <- cmd + }) + + return r +} diff --git a/manager/status.go b/manager/status.go index 4708163..8c6150c 100644 --- a/manager/status.go +++ b/manager/status.go @@ -106,3 +106,10 @@ func (s *mirrorStatus) UnmarshalJSON(v []byte) error { } return nil } + +type workerStatus struct { + ID string `json:"id"` // worker name + Token string `json:"token"` // session token + URL string `json:"url"` // worker url + LastOnline time.Time `json:"last_online"` // last seen +} diff --git a/manager/util.go b/manager/util.go new file mode 100644 index 0000000..4174eea --- /dev/null +++ b/manager/util.go @@ -0,0 +1,13 @@ +package manager + +import ( + "bytes" + "encoding/json" + "net/http" +) + +func postJSON(url string, obj interface{}) (*http.Response, error) { + b := new(bytes.Buffer) + json.NewEncoder(b).Encode(obj) + return http.Post(url, "application/json; charset=utf-8", b) +}