refactor(manager): command pulling to command pushing and tests

This commit is contained in:
walkerning 2016-04-26 12:01:34 +08:00 committed by bigeagle
parent 734826fa67
commit daa0b3c204
No known key found for this signature in database
GPG Key ID: 9171A4571C27920A
5 changed files with 119 additions and 75 deletions

View File

@ -11,7 +11,9 @@ func contextErrorLogger(c *gin.Context) {
errs := c.Errors.ByType(gin.ErrorTypeAny) errs := c.Errors.ByType(gin.ErrorTypeAny)
if len(errs) > 0 { if len(errs) > 0 {
for _, err := range errs { for _, err := range errs {
logger.Error(`"in request "%s %s: %s"`, c.Request.Method, c.Request.URL.Path, err.Error()) logger.Error(`"in request "%s %s: %s"`,
c.Request.Method, c.Request.URL.Path,
err.Error())
} }
} }
// pass on to the next middleware in chain // pass on to the next middleware in chain

View File

@ -3,35 +3,17 @@ package manager
import ( import (
"fmt" "fmt"
"net/http" "net/http"
"sync"
"time"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
. "github.com/tuna/tunasync/internal" . "github.com/tuna/tunasync/internal"
) )
const (
maxQueuedCmdNum = 3
cmdPollTime = 10 * time.Second
)
const ( const (
_errorKey = "error" _errorKey = "error"
_infoKey = "message" _infoKey = "message"
) )
type workerStatus struct {
ID string `json:"id"` // worker name
Token string `json:"token"` // session token
LastOnline time.Time `json:"last_online"` // last seen
}
var (
workerChannelMu sync.RWMutex
workerChannels = make(map[string]chan WorkerCmd)
)
type managerServer struct { type managerServer struct {
*gin.Engine *gin.Engine
adapter dbAdapter adapter dbAdapter
@ -84,9 +66,6 @@ func (s *managerServer) registerWorker(c *gin.Context) {
return return
} }
// create workerCmd channel for this worker // create workerCmd channel for this worker
workerChannelMu.Lock()
defer workerChannelMu.Unlock()
workerChannels[_worker.ID] = make(chan WorkerCmd, maxQueuedCmdNum)
c.JSON(http.StatusOK, newWorker) c.JSON(http.StatusOK, newWorker)
} }
@ -129,11 +108,8 @@ func (s *managerServer) updateJobOfWorker(c *gin.Context) {
} }
func (s *managerServer) handleClientCmd(c *gin.Context) { func (s *managerServer) handleClientCmd(c *gin.Context) {
workerChannelMu.RLock()
defer workerChannelMu.RUnlock()
var clientCmd ClientCmd var clientCmd ClientCmd
c.BindJSON(&clientCmd) c.BindJSON(&clientCmd)
// TODO: decide which worker should do this mirror when WorkerID is null string
workerID := clientCmd.WorkerID workerID := clientCmd.WorkerID
if workerID == "" { if workerID == "" {
// TODO: decide which worker should do this mirror when WorkerID is null string // TODO: decide which worker should do this mirror when WorkerID is null string
@ -142,50 +118,30 @@ func (s *managerServer) handleClientCmd(c *gin.Context) {
return return
} }
workerChannel, ok := workerChannels[workerID] w, err := s.adapter.GetWorker(workerID)
if !ok { if err != nil {
err := fmt.Errorf("worker %s is not registered yet", workerID) err := fmt.Errorf("worker %s is not registered yet", workerID)
s.returnErrJSON(c, http.StatusBadRequest, err) s.returnErrJSON(c, http.StatusBadRequest, err)
return return
} }
workerURL := w.URL
// parse client cmd into worker cmd // parse client cmd into worker cmd
workerCmd := WorkerCmd{ workerCmd := WorkerCmd{
Cmd: clientCmd.Cmd, Cmd: clientCmd.Cmd,
MirrorID: clientCmd.MirrorID, MirrorID: clientCmd.MirrorID,
Args: clientCmd.Args, Args: clientCmd.Args,
} }
select {
case workerChannel <- workerCmd: // post command to worker
// successfully insert command to channel _, err = postJSON(workerURL, workerCmd)
c.JSON(http.StatusOK, struct{}{}) if err != nil {
default: err := fmt.Errorf("post command to worker %s(%s) fail: %s", workerID, workerURL, err.Error())
// pending commands for that worker exceed
// the maxQueuedCmdNum threshold
err := fmt.Errorf("pending commands for worker %s exceed"+
"the %d threshold, the command is dropped",
workerID, maxQueuedCmdNum)
c.Error(err) c.Error(err)
s.returnErrJSON(c, http.StatusServiceUnavailable, err) s.returnErrJSON(c, http.StatusInternalServerError, err)
return return
} }
} // TODO: check response for success
c.JSON(http.StatusOK, gin.H{_infoKey: "successfully send command to worker " + workerID})
func (s *managerServer) getCmdOfWorker(c *gin.Context) {
workerID := c.Param("id")
workerChannelMu.RLock()
defer workerChannelMu.RUnlock()
workerChannel := workerChannels[workerID]
for {
select {
case _ = <-workerChannel:
// TODO: push new command to worker client
continue
case <-time.After(cmdPollTime):
// time limit exceeded, close the connection
break
}
}
} }
func (s *managerServer) setDBAdapter(adapter dbAdapter) { func (s *managerServer) setDBAdapter(adapter dbAdapter) {
@ -223,11 +179,8 @@ func makeHTTPServer(debug bool) *managerServer {
// post job status // post job status
workerValidateGroup.POST(":id/jobs/:job", s.updateJobOfWorker) workerValidateGroup.POST(":id/jobs/:job", s.updateJobOfWorker)
// worker command polling
workerValidateGroup.GET(":id/cmd_stream", s.getCmdOfWorker)
// for tunasynctl to post commands // for tunasynctl to post commands
s.POST("/cmd/", s.handleClientCmd) s.POST("/cmd", s.handleClientCmd)
return s return s
} }

View File

@ -1,7 +1,6 @@
package manager package manager
import ( import (
"bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
@ -11,6 +10,8 @@ import (
"testing" "testing"
"time" "time"
"github.com/gin-gonic/gin"
. "github.com/smartystreets/goconvey/convey" . "github.com/smartystreets/goconvey/convey"
. "github.com/tuna/tunasync/internal" . "github.com/tuna/tunasync/internal"
) )
@ -19,14 +20,8 @@ const (
_magicBadWorkerID = "magic_bad_worker_id" _magicBadWorkerID = "magic_bad_worker_id"
) )
func postJSON(url string, obj interface{}) (*http.Response, error) {
b := new(bytes.Buffer)
json.NewEncoder(b).Encode(obj)
return http.Post(url, "application/json; charset=utf-8", b)
}
func TestHTTPServer(t *testing.T) { func TestHTTPServer(t *testing.T) {
Convey("HTTP server should work", t, func() { Convey("HTTP server should work", t, func(ctx C) {
InitLogger(true, true, false) InitLogger(true, true, false)
s := makeHTTPServer(false) s := makeHTTPServer(false)
So(s, ShouldNotBeNil) So(s, ShouldNotBeNil)
@ -55,7 +50,7 @@ func TestHTTPServer(t *testing.T) {
So(err, ShouldBeNil) So(err, ShouldBeNil)
So(p[_infoKey], ShouldEqual, "pong") So(p[_infoKey], ShouldEqual, "pong")
Convey("when database fail", func() { Convey("when database fail", func(ctx C) {
resp, err := http.Get(fmt.Sprintf("%s/workers/%s/jobs", baseURL, _magicBadWorkerID)) resp, err := http.Get(fmt.Sprintf("%s/workers/%s/jobs", baseURL, _magicBadWorkerID))
So(err, ShouldBeNil) So(err, ShouldBeNil)
So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError) So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
@ -66,7 +61,7 @@ func TestHTTPServer(t *testing.T) {
So(msg[_errorKey], ShouldEqual, fmt.Sprintf("failed to list jobs of worker %s: %s", _magicBadWorkerID, "database fail")) So(msg[_errorKey], ShouldEqual, fmt.Sprintf("failed to list jobs of worker %s: %s", _magicBadWorkerID, "database fail"))
}) })
Convey("when register a worker", func() { Convey("when register a worker", func(ctx C) {
w := workerStatus{ w := workerStatus{
ID: "test_worker1", ID: "test_worker1",
} }
@ -74,7 +69,7 @@ func TestHTTPServer(t *testing.T) {
So(err, ShouldBeNil) So(err, ShouldBeNil)
So(resp.StatusCode, ShouldEqual, http.StatusOK) So(resp.StatusCode, ShouldEqual, http.StatusOK)
Convey("list all workers", func() { Convey("list all workers", func(ctx C) {
So(err, ShouldBeNil) So(err, ShouldBeNil)
resp, err := http.Get(baseURL + "/workers") resp, err := http.Get(baseURL + "/workers")
So(err, ShouldBeNil) So(err, ShouldBeNil)
@ -85,7 +80,7 @@ func TestHTTPServer(t *testing.T) {
So(len(actualResponseObj), ShouldEqual, 2) So(len(actualResponseObj), ShouldEqual, 2)
}) })
Convey("update mirror status of a existed worker", func() { Convey("update mirror status of a existed worker", func(ctx C) {
status := mirrorStatus{ status := mirrorStatus{
Name: "arch-sync1", Name: "arch-sync1",
Worker: "test_worker1", Worker: "test_worker1",
@ -96,10 +91,11 @@ func TestHTTPServer(t *testing.T) {
Size: "3GB", Size: "3GB",
} }
resp, err := postJSON(fmt.Sprintf("%s/workers/%s/jobs/%s", baseURL, status.Worker, status.Name), status) resp, err := postJSON(fmt.Sprintf("%s/workers/%s/jobs/%s", baseURL, status.Worker, status.Name), status)
defer resp.Body.Close()
So(err, ShouldBeNil) So(err, ShouldBeNil)
So(resp.StatusCode, ShouldEqual, http.StatusOK) So(resp.StatusCode, ShouldEqual, http.StatusOK)
Convey("list mirror status of an existed worker", func() { Convey("list mirror status of an existed worker", func(ctx C) {
expectedResponse, err := json.Marshal([]mirrorStatus{status}) expectedResponse, err := json.Marshal([]mirrorStatus{status})
So(err, ShouldBeNil) So(err, ShouldBeNil)
@ -113,7 +109,7 @@ func TestHTTPServer(t *testing.T) {
So(strings.TrimSpace(string(body)), ShouldEqual, string(expectedResponse)) So(strings.TrimSpace(string(body)), ShouldEqual, string(expectedResponse))
}) })
Convey("list all job status of all workers", func() { Convey("list all job status of all workers", func(ctx C) {
expectedResponse, err := json.Marshal([]mirrorStatus{status}) expectedResponse, err := json.Marshal([]mirrorStatus{status})
So(err, ShouldBeNil) So(err, ShouldBeNil)
resp, err := http.Get(baseURL + "/jobs") resp, err := http.Get(baseURL + "/jobs")
@ -127,7 +123,7 @@ func TestHTTPServer(t *testing.T) {
}) })
}) })
Convey("update mirror status of an inexisted worker", func() { Convey("update mirror status of an inexisted worker", func(ctx C) {
invalidWorker := "test_worker2" invalidWorker := "test_worker2"
status := mirrorStatus{ status := mirrorStatus{
Name: "arch-sync2", Name: "arch-sync2",
@ -148,6 +144,65 @@ func TestHTTPServer(t *testing.T) {
So(err, ShouldBeNil) So(err, ShouldBeNil)
So(msg[_errorKey], ShouldEqual, "invalid workerID "+invalidWorker) So(msg[_errorKey], ShouldEqual, "invalid workerID "+invalidWorker)
}) })
Convey("handle client command", func(ctx C) {
cmdChan := make(chan WorkerCmd, 1)
workerServer := makeMockWorkerServer(cmdChan)
workerPort := rand.Intn(10000) + 30000
bindAddress := fmt.Sprintf("127.0.0.1:%d", workerPort)
workerBaseURL := fmt.Sprintf("http://%s", bindAddress)
w := workerStatus{
ID: "test_worker_cmd",
URL: workerBaseURL + "/cmd",
}
resp, err := postJSON(baseURL+"/workers", w)
So(err, ShouldBeNil)
So(resp.StatusCode, ShouldEqual, http.StatusOK)
go func() {
// run the mock worker server
workerServer.Run(bindAddress)
}()
time.Sleep(50 * time.Microsecond)
// verify the worker mock server is running
workerResp, err := http.Get(workerBaseURL + "/ping")
defer workerResp.Body.Close()
So(err, ShouldBeNil)
So(workerResp.StatusCode, ShouldEqual, http.StatusOK)
Convey("when client send wrong cmd", func(ctx C) {
clientCmd := ClientCmd{
Cmd: CmdStart,
MirrorID: "ubuntu-sync",
WorkerID: "not_exist_worker",
}
resp, err := postJSON(baseURL+"/cmd", clientCmd)
defer resp.Body.Close()
So(err, ShouldBeNil)
So(resp.StatusCode, ShouldEqual, http.StatusBadRequest)
})
Convey("when client send correct cmd", func(ctx C) {
clientCmd := ClientCmd{
Cmd: CmdStart,
MirrorID: "ubuntu-sync",
WorkerID: w.ID,
}
resp, err := postJSON(baseURL+"/cmd", clientCmd)
defer resp.Body.Close()
So(err, ShouldBeNil)
So(resp.StatusCode, ShouldEqual, http.StatusOK)
time.Sleep(50 * time.Microsecond)
select {
case cmd := <-cmdChan:
ctx.So(cmd.Cmd, ShouldEqual, clientCmd.Cmd)
ctx.So(cmd.MirrorID, ShouldEqual, clientCmd.MirrorID)
default:
ctx.So(0, ShouldEqual, 1)
}
})
})
}) })
}) })
} }
@ -233,3 +288,17 @@ func (b *mockDBAdapter) ListAllMirrorStatus() ([]mirrorStatus, error) {
func (b *mockDBAdapter) Close() error { func (b *mockDBAdapter) Close() error {
return nil return nil
} }
func makeMockWorkerServer(cmdChan chan WorkerCmd) *gin.Engine {
r := gin.Default()
r.GET("/ping", func(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{_infoKey: "pong"})
})
r.POST("/cmd", func(c *gin.Context) {
var cmd WorkerCmd
c.BindJSON(&cmd)
cmdChan <- cmd
})
return r
}

View File

@ -106,3 +106,10 @@ func (s *mirrorStatus) UnmarshalJSON(v []byte) error {
} }
return nil return nil
} }
type workerStatus struct {
ID string `json:"id"` // worker name
Token string `json:"token"` // session token
URL string `json:"url"` // worker url
LastOnline time.Time `json:"last_online"` // last seen
}

13
manager/util.go Normal file
View File

@ -0,0 +1,13 @@
package manager
import (
"bytes"
"encoding/json"
"net/http"
)
func postJSON(url string, obj interface{}) (*http.Response, error) {
b := new(bytes.Buffer)
json.NewEncoder(b).Encode(obj)
return http.Post(url, "application/json; charset=utf-8", b)
}