diff --git a/internal/status_web_test.go b/internal/status_web_test.go index 97453ed..1043d61 100644 --- a/internal/status_web_test.go +++ b/internal/status_web_test.go @@ -21,6 +21,8 @@ func TestStatus(t *testing.T) { LastUpdateTs: stampTime{t}, LastEnded: textTime{t}, LastEndedTs: stampTime{t}, + Scheduled: textTime{t}, + ScheduledTs: stampTime{t}, Size: "5GB", Upstream: "rsync://mirrors.tuna.tsinghua.edu.cn/tunalinux/", } @@ -42,6 +44,10 @@ func TestStatus(t *testing.T) { So(m2.LastEndedTs.Unix(), ShouldEqual, m.LastEnded.Unix()) So(m2.LastEnded.UnixNano(), ShouldEqual, m.LastEnded.UnixNano()) So(m2.LastEndedTs.UnixNano(), ShouldEqual, m.LastEnded.UnixNano()) + So(m2.Scheduled.Unix(), ShouldEqual, m.Scheduled.Unix()) + So(m2.ScheduledTs.Unix(), ShouldEqual, m.Scheduled.Unix()) + So(m2.Scheduled.UnixNano(), ShouldEqual, m.Scheduled.UnixNano()) + So(m2.ScheduledTs.UnixNano(), ShouldEqual, m.Scheduled.UnixNano()) So(m2.Size, ShouldEqual, m.Size) So(m2.Upstream, ShouldEqual, m.Upstream) }) @@ -53,6 +59,7 @@ func TestStatus(t *testing.T) { Status: Failed, LastUpdate: time.Now().Add(-time.Minute * 30), LastEnded: time.Now(), + Scheduled: time.Now().Add(time.Minute * 5), Upstream: "mirrors.tuna.tsinghua.edu.cn", Size: "4GB", } @@ -70,6 +77,10 @@ func TestStatus(t *testing.T) { So(m2.LastEndedTs.Unix(), ShouldEqual, m.LastEnded.Unix()) So(m2.LastEnded.UnixNano(), ShouldEqual, m.LastEnded.UnixNano()) So(m2.LastEndedTs.UnixNano(), ShouldEqual, m.LastEnded.UnixNano()) + So(m2.Scheduled.Unix(), ShouldEqual, m.Scheduled.Unix()) + So(m2.ScheduledTs.Unix(), ShouldEqual, m.Scheduled.Unix()) + So(m2.Scheduled.UnixNano(), ShouldEqual, m.Scheduled.UnixNano()) + So(m2.ScheduledTs.UnixNano(), ShouldEqual, m.Scheduled.UnixNano()) So(m2.Size, ShouldEqual, m.Size) So(m2.Upstream, ShouldEqual, m.Upstream) }) diff --git a/manager/server_test.go b/manager/server_test.go index 29486bb..39b48be 100644 --- a/manager/server_test.go +++ b/manager/server_test.go @@ -202,6 +202,20 @@ func TestHTTPServer(t *testing.T) { }) }) + Convey("Update schedule of valid mirrors", func(ctx C) { + msg := MirrorSchedules{ + []MirrorSchedule{ + MirrorSchedule{"arch-sync1", time.Now().Add(time.Minute * 10)}, + MirrorSchedule{"arch-sync2", time.Now().Add(time.Minute * 7)}, + }, + } + + url := fmt.Sprintf("%s/workers/%s/schedules", baseURL, status.Worker) + resp, err := PostJSON(url, msg, nil) + So(err, ShouldBeNil) + So(resp.StatusCode, ShouldEqual, http.StatusOK) + }) + Convey("Update size of an invalid mirror", func(ctx C) { msg := struct { Name string `json:"name"` @@ -263,6 +277,24 @@ func TestHTTPServer(t *testing.T) { So(err, ShouldBeNil) So(msg[_errorKey], ShouldEqual, "invalid workerID "+invalidWorker) }) + Convey("update schedule of an non-existent worker", func(ctx C) { + invalidWorker := "test_worker2" + sch := MirrorSchedules{ + []MirrorSchedule{ + MirrorSchedule{"arch-sync1", time.Now().Add(time.Minute * 10)}, + MirrorSchedule{"arch-sync2", time.Now().Add(time.Minute * 7)}, + }, + } + resp, err := PostJSON(fmt.Sprintf("%s/workers/%s/schedules", + baseURL, invalidWorker), sch, nil) + So(err, ShouldBeNil) + So(resp.StatusCode, ShouldEqual, http.StatusBadRequest) + defer resp.Body.Close() + var msg map[string]string + err = json.NewDecoder(resp.Body).Decode(&msg) + So(err, ShouldBeNil) + So(msg[_errorKey], ShouldEqual, "invalid workerID "+invalidWorker) + }) Convey("handle client command", func(ctx C) { cmdChan := make(chan WorkerCmd, 1) workerServer := makeMockWorkerServer(cmdChan) diff --git a/worker/worker_test.go b/worker/worker_test.go new file mode 100644 index 0000000..e71fb0a --- /dev/null +++ b/worker/worker_test.go @@ -0,0 +1,180 @@ +package worker + +import ( + "net/http" + "strconv" + "testing" + "time" + + "github.com/gin-gonic/gin" + . "github.com/smartystreets/goconvey/convey" + . "github.com/tuna/tunasync/internal" +) + +type workTestFunc func(*Worker) + +func makeMockManagerServer(recvData chan interface{}) *gin.Engine { + r := gin.Default() + r.GET("/ping", func(c *gin.Context) { + c.JSON(http.StatusOK, gin.H{"_infoKey": "pong"}) + }) + r.POST("/workers", func(c *gin.Context) { + var _worker WorkerStatus + c.BindJSON(&_worker) + _worker.LastOnline = time.Now() + recvData <- _worker + c.JSON(http.StatusOK, _worker) + }) + r.POST("/workers/dut/schedules", func(c *gin.Context) { + var _sch MirrorSchedules + c.BindJSON(&_sch) + recvData <- _sch + c.JSON(http.StatusOK, empty{}) + }) + r.GET("/workers/dut/jobs", func(c *gin.Context) { + mirrorStatusList := []MirrorStatus{} + c.JSON(http.StatusOK, mirrorStatusList) + }) + + return r +} + +func startWorkerThenStop(cfg *Config, tester workTestFunc) { + exitedChan := make(chan int) + w := NewTUNASyncWorker(cfg) + So(w, ShouldNotBeNil) + go func() { + w.Run() + exitedChan <- 1 + }() + + tester(w) + + w.Halt() + select { + case exited := <-exitedChan: + So(exited, ShouldEqual, 1) + case <-time.After(2 * time.Second): + So(0, ShouldEqual, 1) + } + +} + +func TestWorker(t *testing.T) { + managerPort := 5001 + InitLogger(false, true, false) + + recvDataChan := make(chan interface{}) + _s := makeMockManagerServer(recvDataChan) + httpServer := &http.Server{ + Addr: "localhost:" + strconv.Itoa(managerPort), + Handler: _s, + ReadTimeout: 2 * time.Second, + WriteTimeout: 2 * time.Second, + } + go func() { + err := httpServer.ListenAndServe() + So(err, ShouldBeNil) + }() + + Convey("Worker should work", t, func(ctx C) { + + workerCfg := Config{ + Global: globalConfig{ + Name: "dut", + LogDir: "/tmp", + MirrorDir: "/tmp", + Concurrent: 2, + Interval: 1, + }, + Manager: managerConfig{ + APIBase: "http://localhost:" + strconv.Itoa(managerPort), + }, + } + Convey("with no job", func(ctx C) { + dummyTester := func(*Worker) { + for { + select { + case data := <-recvDataChan: + if reg, ok := data.(WorkerStatus); ok { + So(reg.ID, ShouldEqual, "dut") + } else if sch, ok := data.(MirrorSchedules); ok { + So(len(sch.Schedules), ShouldEqual, 0) + } + case <-time.After(2 * time.Second): + return + } + } + } + + startWorkerThenStop(&workerCfg, dummyTester) + }) + Convey("with one job", func(ctx C) { + workerCfg.Mirrors = []mirrorConfig{ + mirrorConfig{ + Name: "job-ls", + Provider: provCommand, + Command: "ls", + }, + } + + dummyTester := func(*Worker) { + for { + select { + case data := <-recvDataChan: + if reg, ok := data.(WorkerStatus); ok { + So(reg.ID, ShouldEqual, "dut") + } else if sch, ok := data.(MirrorSchedules); ok { + So(len(sch.Schedules), ShouldEqual, 1) + So(sch.Schedules[0].MirrorName, ShouldEqual, "job-ls") + So(sch.Schedules[0].NextSchedule, + ShouldHappenBetween, + time.Now().Add(-2*time.Second), + time.Now().Add(1*time.Minute)) + } + case <-time.After(2 * time.Second): + return + } + } + } + + startWorkerThenStop(&workerCfg, dummyTester) + }) + Convey("with several jobs", func(ctx C) { + workerCfg.Mirrors = []mirrorConfig{ + mirrorConfig{ + Name: "job-ls-1", + Provider: provCommand, + Command: "ls", + }, + mirrorConfig{ + Name: "job-fail", + Provider: provCommand, + Command: "non-existent-command-xxxx", + }, + mirrorConfig{ + Name: "job-ls-2", + Provider: provCommand, + Command: "ls", + }, + } + + dummyTester := func(*Worker) { + for { + select { + case data := <-recvDataChan: + if reg, ok := data.(WorkerStatus); ok { + So(reg.ID, ShouldEqual, "dut") + } else if sch, ok := data.(MirrorSchedules); ok { + So(len(sch.Schedules), ShouldEqual, 3) + } + case <-time.After(2 * time.Second): + return + } + } + } + + startWorkerThenStop(&workerCfg, dummyTester) + }) + }) +}