From c750aa1871a1b36ad8f4e6da183c4bbbd3d1ddb5 Mon Sep 17 00:00:00 2001 From: z4yx Date: Wed, 30 May 2018 18:55:06 +0800 Subject: [PATCH] new feature: run "tunasynctl start" with "-f" to override concurrent job limit --- cmd/tunasynctl/tunasynctl.go | 12 ++++- internal/msg.go | 16 +++--- manager/server.go | 1 + worker/job.go | 1 + worker/job_test.go | 98 ++++++++++++++++++++++-------------- worker/provider_test.go | 4 +- worker/worker.go | 6 ++- 7 files changed, 88 insertions(+), 50 deletions(-) diff --git a/cmd/tunasynctl/tunasynctl.go b/cmd/tunasynctl/tunasynctl.go index f335971..3e2d6fe 100644 --- a/cmd/tunasynctl/tunasynctl.go +++ b/cmd/tunasynctl/tunasynctl.go @@ -285,11 +285,16 @@ func cmdJob(cmd tunasync.CmdVerb) cli.ActionFunc { "argument WORKER", 1) } + options := map[string]bool{} + if c.Bool("force") { + options["force"] = true + } cmd := tunasync.ClientCmd{ Cmd: cmd, MirrorID: mirrorID, WorkerID: c.String("worker"), Args: argsList, + Options: options, } resp, err := tunasync.PostJSON(baseURL+cmdPath, cmd, client) if err != nil { @@ -410,6 +415,11 @@ func main() { }, } + forceStartFlag := cli.BoolFlag{ + Name: "force, f", + Usage: "Override the concurrent limit", + } + app.Commands = []cli.Command{ { Name: "list", @@ -450,7 +460,7 @@ func main() { { Name: "start", Usage: "Start a job", - Flags: append(commonFlags, cmdFlags...), + Flags: append(append(commonFlags, cmdFlags...), forceStartFlag), Action: initializeWrapper(cmdJob(tunasync.CmdStart)), }, { diff --git a/internal/msg.go b/internal/msg.go index c0efb7a..14a9102 100644 --- a/internal/msg.go +++ b/internal/msg.go @@ -68,9 +68,10 @@ func (c CmdVerb) String() string { // A WorkerCmd is the command message send from the // manager to a worker type WorkerCmd struct { - Cmd CmdVerb `json:"cmd"` - MirrorID string `json:"mirror_id"` - Args []string `json:"args"` + Cmd CmdVerb `json:"cmd"` + MirrorID string `json:"mirror_id"` + Args []string `json:"args"` + Options map[string]bool `json:"options"` } func (c WorkerCmd) String() string { @@ -83,8 +84,9 @@ func (c WorkerCmd) String() string { // A ClientCmd is the command message send from client // to the manager type ClientCmd struct { - Cmd CmdVerb `json:"cmd"` - MirrorID string `json:"mirror_id"` - WorkerID string `json:"worker_id"` - Args []string `json:"args"` + Cmd CmdVerb `json:"cmd"` + MirrorID string `json:"mirror_id"` + WorkerID string `json:"worker_id"` + Args []string `json:"args"` + Options map[string]bool `json:"options"` } diff --git a/manager/server.go b/manager/server.go index 2563582..2605446 100644 --- a/manager/server.go +++ b/manager/server.go @@ -337,6 +337,7 @@ func (s *Manager) handleClientCmd(c *gin.Context) { Cmd: clientCmd.Cmd, MirrorID: clientCmd.MirrorID, Args: clientCmd.Args, + Options: clientCmd.Options, } // update job status, even if the job did not disable successfully, diff --git a/worker/job.go b/worker/job.go index a4c23dc..7ba0df4 100644 --- a/worker/job.go +++ b/worker/job.go @@ -218,6 +218,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err defer func() { <-semaphore }() runJobWrapper(kill, jobDone) case <-bypassSemaphore: + logger.Noticef("Concurrent limit ignored by %s", m.Name()) runJobWrapper(kill, jobDone) case <-kill: jobDone <- empty{} diff --git a/worker/job_test.go b/worker/job_test.go index 92476d5..370c9be 100644 --- a/worker/job_test.go +++ b/worker/job_test.go @@ -286,7 +286,7 @@ func TestConcurrentMirrorJobs(t *testing.T) { c := cmdConfig{ name: fmt.Sprintf("job-%d", i), upstreamURL: "http://mirrors.tuna.moe/", - command: "sleep 3", + command: "sleep 2", workingDir: tmpDir, logDir: tmpDir, logFile: "/dev/null", @@ -302,17 +302,12 @@ func TestConcurrentMirrorJobs(t *testing.T) { managerChan := make(chan jobMessage, 10) semaphore := make(chan empty, CONCURRENT-2) - Convey("When we run them all", func(ctx C) { - for _, job := range jobs { - go job.Run(managerChan, semaphore) - job.ctrlChan <- jobStart - } - + countingJobs := func(managerChan chan jobMessage, totalJobs, concurrentCheck int) (peakConcurrent, counterFailed int) { counterEnded := 0 counterRunning := 0 - maxRunning := 0 - counterFailed := 0 - for counterEnded < CONCURRENT { + peakConcurrent = 0 + counterFailed = 0 + for counterEnded < totalJobs { msg := <-managerChan switch msg.status { case PreSyncing: @@ -328,13 +323,29 @@ func TestConcurrentMirrorJobs(t *testing.T) { So(0, ShouldEqual, 1) } // Test if semaphore works - So(counterRunning, ShouldBeLessThanOrEqualTo, CONCURRENT-2) - if counterRunning > maxRunning { - maxRunning = counterRunning + So(counterRunning, ShouldBeLessThanOrEqualTo, concurrentCheck) + if counterRunning > peakConcurrent { + peakConcurrent = counterRunning } } + // select { + // case msg := <-managerChan: + // logger.Errorf("extra message received: %v", msg) + // So(0, ShouldEqual, 1) + // case <-time.After(2 * time.Second): + // } + return + } - So(maxRunning, ShouldEqual, CONCURRENT-2) + Convey("When we run them all", func(ctx C) { + for _, job := range jobs { + go job.Run(managerChan, semaphore) + job.ctrlChan <- jobStart + } + + peakConcurrent, counterFailed := countingJobs(managerChan, CONCURRENT, CONCURRENT-2) + + So(peakConcurrent, ShouldEqual, CONCURRENT-2) So(counterFailed, ShouldEqual, 0) for _, job := range jobs { @@ -352,33 +363,42 @@ func TestConcurrentMirrorJobs(t *testing.T) { // Cancel the one waiting for semaphore jobs[len(jobs)-1].ctrlChan <- jobStop - counterEnded := 0 - counterRunning := 0 - maxRunning := 0 - counterFailed := 0 - for counterEnded < CONCURRENT-1 { - msg := <-managerChan - switch msg.status { - case PreSyncing: - counterRunning++ - case Syncing: - case Failed: - counterFailed++ - fallthrough - case Success: - counterEnded++ - counterRunning-- - default: - So(0, ShouldEqual, 1) - } - // Test if semaphore works - So(counterRunning, ShouldBeLessThanOrEqualTo, CONCURRENT-2) - if counterRunning > maxRunning { - maxRunning = counterRunning - } + peakConcurrent, counterFailed := countingJobs(managerChan, CONCURRENT-1, CONCURRENT-2) + + So(peakConcurrent, ShouldEqual, CONCURRENT-2) + So(counterFailed, ShouldEqual, 0) + + for _, job := range jobs { + job.ctrlChan <- jobDisable + <-job.disabled + } + }) + Convey("If we override the concurrent limit", func(ctx C) { + for _, job := range jobs { + go job.Run(managerChan, semaphore) + job.ctrlChan <- jobStart + time.Sleep(200 * time.Millisecond) } - So(maxRunning, ShouldEqual, CONCURRENT-2) + jobs[len(jobs)-1].ctrlChan <- jobForceStart + jobs[len(jobs)-2].ctrlChan <- jobForceStart + + peakConcurrent, counterFailed := countingJobs(managerChan, CONCURRENT, CONCURRENT) + + So(peakConcurrent, ShouldEqual, CONCURRENT) + So(counterFailed, ShouldEqual, 0) + + time.Sleep(1 * time.Second) + + // fmt.Println("Restart them") + + for _, job := range jobs { + job.ctrlChan <- jobStart + } + + peakConcurrent, counterFailed = countingJobs(managerChan, CONCURRENT, CONCURRENT-2) + + So(peakConcurrent, ShouldEqual, CONCURRENT-2) So(counterFailed, ShouldEqual, 0) for _, job := range jobs { diff --git a/worker/provider_test.go b/worker/provider_test.go index 9705cbb..56ddc8d 100644 --- a/worker/provider_test.go +++ b/worker/provider_test.go @@ -268,9 +268,9 @@ sleep 5 So(err, ShouldBeNil) c := cmdConfig{ - name: "run-pwd", + name: "run-ls", upstreamURL: "http://mirrors.tuna.moe/", - command: "pwd", + command: "ls", workingDir: tmpDir, logDir: tmpDir, logFile: "/dev/null", diff --git a/worker/worker.go b/worker/worker.go index 877b526..99f8c2d 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -219,7 +219,11 @@ func (w *Worker) makeHTTPServer() { } switch cmd.Cmd { case CmdStart: - job.ctrlChan <- jobStart + if cmd.Options["force"] { + job.ctrlChan <- jobForceStart + } else { + job.ctrlChan <- jobStart + } case CmdRestart: job.ctrlChan <- jobRestart case CmdStop: