From 79e6167028d85ee462a9953c4b26a0a712fa0549 Mon Sep 17 00:00:00 2001 From: Yuxiang Zhang Date: Wed, 30 May 2018 01:46:16 +0800 Subject: [PATCH 1/6] fix race condition on logFile of baseProvider --- worker/base_provider.go | 28 +++++----------------------- worker/runner.go | 3 +++ 2 files changed, 8 insertions(+), 23 deletions(-) diff --git a/worker/base_provider.go b/worker/base_provider.go index 0befa61..0505239 100644 --- a/worker/base_provider.go +++ b/worker/base_provider.go @@ -20,8 +20,6 @@ type baseProvider struct { cmd *cmdJob isRunning atomic.Value - logFile *os.File - cgroup *cgroupHook zfs *zfsHook docker *dockerHook @@ -116,15 +114,12 @@ func (p *baseProvider) prepareLogFile() error { p.cmd.SetLogFile(nil) return nil } - if p.logFile == nil { - logFile, err := os.OpenFile(p.LogFile(), os.O_WRONLY|os.O_CREATE, 0644) - if err != nil { - logger.Errorf("Error opening logfile %s: %s", p.LogFile(), err.Error()) - return err - } - p.logFile = logFile + logFile, err := os.OpenFile(p.LogFile(), os.O_WRONLY|os.O_CREATE, 0644) + if err != nil { + logger.Errorf("Error opening logfile %s: %s", p.LogFile(), err.Error()) + return err } - p.cmd.SetLogFile(p.logFile) + p.cmd.SetLogFile(logFile) return nil } @@ -143,13 +138,7 @@ func (p *baseProvider) IsRunning() bool { func (p *baseProvider) Wait() error { defer func() { - p.Lock() p.isRunning.Store(false) - if p.logFile != nil { - p.logFile.Close() - p.logFile = nil - } - p.Unlock() }() return p.cmd.Wait() } @@ -160,13 +149,6 @@ func (p *baseProvider) Terminate() error { return nil } - p.Lock() - if p.logFile != nil { - p.logFile.Close() - p.logFile = nil - } - p.Unlock() - err := p.cmd.Terminate() p.isRunning.Store(false) diff --git a/worker/runner.go b/worker/runner.go index 05a6f6b..e47417e 100644 --- a/worker/runner.go +++ b/worker/runner.go @@ -118,6 +118,9 @@ func (c *cmdJob) Wait() error { return c.retErr default: err := c.cmd.Wait() + if c.cmd.Stdout != nil { + c.cmd.Stdout.(*os.File).Close() + } c.retErr = err close(c.finished) return err From c5bb172f993150b4162e438a875d12f2ec604552 Mon Sep 17 00:00:00 2001 From: Yuxiang Zhang Date: Wed, 30 May 2018 11:45:05 +0800 Subject: [PATCH 2/6] increase test coverage rate of job.go --- worker/job_test.go | 93 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 93 insertions(+) diff --git a/worker/job_test.go b/worker/job_test.go index 7ffed72..b63a646 100644 --- a/worker/job_test.go +++ b/worker/job_test.go @@ -135,6 +135,8 @@ echo $TUNASYNC_WORKING_DIR msg = <-managerChan So(msg.status, ShouldEqual, Syncing) + job.ctrlChan <- jobStart // should be ignored + job.ctrlChan <- jobStop msg = <-managerChan @@ -170,6 +172,97 @@ echo $TUNASYNC_WORKING_DIR job.ctrlChan <- jobDisable <-job.disabled }) + + Convey("If we restart it", func(ctx C) { + go job.Run(managerChan, semaphore) + job.ctrlChan <- jobStart + + msg := <-managerChan + So(msg.status, ShouldEqual, PreSyncing) + msg = <-managerChan + So(msg.status, ShouldEqual, Syncing) + + job.ctrlChan <- jobRestart + + msg = <-managerChan + So(msg.status, ShouldEqual, Failed) + So(msg.msg, ShouldEqual, "killed by manager") + + msg = <-managerChan + So(msg.status, ShouldEqual, PreSyncing) + msg = <-managerChan + So(msg.status, ShouldEqual, Syncing) + msg = <-managerChan + So(msg.status, ShouldEqual, Success) + + expectedOutput := fmt.Sprintf( + "%s\n%s\n", + provider.WorkingDir(), provider.WorkingDir(), + ) + + loggedContent, err := ioutil.ReadFile(provider.LogFile()) + So(err, ShouldBeNil) + So(string(loggedContent), ShouldEqual, 
expectedOutput) + job.ctrlChan <- jobDisable + <-job.disabled + }) + + Convey("If we disable it", func(ctx C) { + go job.Run(managerChan, semaphore) + job.ctrlChan <- jobStart + + msg := <-managerChan + So(msg.status, ShouldEqual, PreSyncing) + msg = <-managerChan + So(msg.status, ShouldEqual, Syncing) + + job.ctrlChan <- jobDisable + + msg = <-managerChan + So(msg.status, ShouldEqual, Failed) + So(msg.msg, ShouldEqual, "killed by manager") + + <-job.disabled + }) + + Convey("If we stop it twice, than start it", func(ctx C) { + go job.Run(managerChan, semaphore) + job.ctrlChan <- jobStart + + msg := <-managerChan + So(msg.status, ShouldEqual, PreSyncing) + msg = <-managerChan + So(msg.status, ShouldEqual, Syncing) + + job.ctrlChan <- jobStop + + msg = <-managerChan + So(msg.status, ShouldEqual, Failed) + So(msg.msg, ShouldEqual, "killed by manager") + + job.ctrlChan <- jobStop // should be ignored + + job.ctrlChan <- jobStart + + msg = <-managerChan + So(msg.status, ShouldEqual, PreSyncing) + msg = <-managerChan + So(msg.status, ShouldEqual, Syncing) + msg = <-managerChan + So(msg.status, ShouldEqual, Success) + + expectedOutput := fmt.Sprintf( + "%s\n%s\n", + provider.WorkingDir(), provider.WorkingDir(), + ) + + loggedContent, err := ioutil.ReadFile(provider.LogFile()) + So(err, ShouldBeNil) + So(string(loggedContent), ShouldEqual, expectedOutput) + + job.ctrlChan <- jobDisable + <-job.disabled + }) }) }) From 0fdb07d06199f53b4438b6e0a041569519c2fa37 Mon Sep 17 00:00:00 2001 From: Yuxiang Zhang Date: Wed, 30 May 2018 12:28:09 +0800 Subject: [PATCH 3/6] bug fix: log over-written in twoStageRsyncProvider solve more DATA RACE problem --- worker/base_provider.go | 13 ++++++++++--- worker/cmd_provider.go | 10 +++++++++- worker/job.go | 6 +++--- worker/provider_test.go | 11 +++++++---- worker/rsync_provider.go | 8 +++++++- worker/two_stage_rsync_provider.go | 15 +++++++++++---- 6 files changed, 47 insertions(+), 16 deletions(-) diff --git a/worker/base_provider.go b/worker/base_provider.go index 0505239..0f8ec80 100644 --- a/worker/base_provider.go +++ b/worker/base_provider.go @@ -109,12 +109,16 @@ func (p *baseProvider) Docker() *dockerHook { return p.docker } -func (p *baseProvider) prepareLogFile() error { +func (p *baseProvider) prepareLogFile(append bool) error { if p.LogFile() == "/dev/null" { p.cmd.SetLogFile(nil) return nil } - logFile, err := os.OpenFile(p.LogFile(), os.O_WRONLY|os.O_CREATE, 0644) + appendMode := 0 + if append { + appendMode = os.O_APPEND + } + logFile, err := os.OpenFile(p.LogFile(), os.O_WRONLY|os.O_CREATE|appendMode, 0644) if err != nil { logger.Errorf("Error opening logfile %s: %s", p.LogFile(), err.Error()) return err @@ -138,19 +142,22 @@ func (p *baseProvider) IsRunning() bool { func (p *baseProvider) Wait() error { defer func() { + logger.Debugf("set isRunning to false: %s", p.Name()) p.isRunning.Store(false) }() + logger.Debugf("calling Wait: %s", p.Name()) return p.cmd.Wait() } func (p *baseProvider) Terminate() error { + p.Lock() + defer p.Unlock() logger.Debugf("terminating provider: %s", p.Name()) if !p.IsRunning() { return nil } err := p.cmd.Terminate() - p.isRunning.Store(false) return err } diff --git a/worker/cmd_provider.go b/worker/cmd_provider.go index 2274cec..e1cac2e 100644 --- a/worker/cmd_provider.go +++ b/worker/cmd_provider.go @@ -1,6 +1,7 @@ package worker import ( + "errors" "time" "github.com/anmitsu/go-shlex" @@ -60,6 +61,13 @@ func (p *cmdProvider) Run() error { } func (p *cmdProvider) Start() error { + p.Lock() + defer p.Unlock() + + if 
p.IsRunning() { + return errors.New("provider is currently running") + } + env := map[string]string{ "TUNASYNC_MIRROR_NAME": p.Name(), "TUNASYNC_WORKING_DIR": p.WorkingDir(), @@ -71,7 +79,7 @@ func (p *cmdProvider) Start() error { env[k] = v } p.cmd = newCmdJob(p, p.command, p.WorkingDir(), env) - if err := p.prepareLogFile(); err != nil { + if err := p.prepareLogFile(false); err != nil { return err } diff --git a/worker/job.go b/worker/job.go index a5feca1..e07af45 100644 --- a/worker/job.go +++ b/worker/job.go @@ -5,6 +5,7 @@ import ( "fmt" "sync" "sync/atomic" + "time" tunasync "github.com/tuna/tunasync/internal" ) @@ -154,9 +155,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err syncDone := make(chan error, 1) go func() { err := provider.Run() - if !stopASAP { - syncDone <- err - } + syncDone <- err }() select { @@ -248,6 +247,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err m.SetState(stateReady) close(kill) <-jobDone + time.Sleep(time.Second) // Restart may fail if the process was not exited yet continue case jobStart: m.SetState(stateReady) diff --git a/worker/provider_test.go b/worker/provider_test.go index ab29621..3098676 100644 --- a/worker/provider_test.go +++ b/worker/provider_test.go @@ -79,11 +79,12 @@ exit 0 err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755) So(err, ShouldBeNil) + targetDir, _ := filepath.EvalSymlinks(provider.WorkingDir()) expectedOutput := fmt.Sprintf( "syncing to %s\n"+ "%s\n"+ "Done\n", - provider.WorkingDir(), + targetDir, fmt.Sprintf( "-aHvh --no-o --no-g --stats --exclude .~tmp~/ "+ "--delete --delete-after --delay-updates --safe-links "+ @@ -144,11 +145,12 @@ exit 0 err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755) So(err, ShouldBeNil) + targetDir, _ := filepath.EvalSymlinks(provider.WorkingDir()) expectedOutput := fmt.Sprintf( "syncing to %s\n"+ "%s\n"+ "Done\n", - provider.WorkingDir(), + targetDir, fmt.Sprintf( "%s %s -aHvh --no-o --no-g --stats --exclude .~tmp~/ "+ "--delete --delete-after --delay-updates --safe-links "+ @@ -306,6 +308,7 @@ exit 0 err = provider.Run() So(err, ShouldBeNil) + targetDir, _ := filepath.EvalSymlinks(provider.WorkingDir()) expectedOutput := fmt.Sprintf( "syncing to %s\n"+ "%s\n"+ @@ -313,14 +316,14 @@ exit 0 "syncing to %s\n"+ "%s\n"+ "Done\n", - provider.WorkingDir(), + targetDir, fmt.Sprintf( "-aHvh --no-o --no-g --stats --exclude .~tmp~/ --safe-links "+ "--timeout=120 --contimeout=120 --exclude dists/ -6 "+ "--exclude-from %s %s %s", provider.excludeFile, provider.upstreamURL, provider.WorkingDir(), ), - provider.WorkingDir(), + targetDir, fmt.Sprintf( "-aHvh --no-o --no-g --stats --exclude .~tmp~/ "+ "--delete --delete-after --delay-updates --safe-links "+ diff --git a/worker/rsync_provider.go b/worker/rsync_provider.go index d34a396..b9d8fd5 100644 --- a/worker/rsync_provider.go +++ b/worker/rsync_provider.go @@ -81,6 +81,12 @@ func (p *rsyncProvider) Run() error { } func (p *rsyncProvider) Start() error { + p.Lock() + defer p.Unlock() + + if p.IsRunning() { + return errors.New("provider is currently running") + } env := map[string]string{} if p.username != "" { @@ -94,7 +100,7 @@ func (p *rsyncProvider) Start() error { command = append(command, p.upstreamURL, p.WorkingDir()) p.cmd = newCmdJob(p, command, p.WorkingDir(), env) - if err := p.prepareLogFile(); err != nil { + if err := p.prepareLogFile(false); err != nil { return err } diff --git a/worker/two_stage_rsync_provider.go b/worker/two_stage_rsync_provider.go 
index 48e5125..25bb17f 100644 --- a/worker/two_stage_rsync_provider.go +++ b/worker/two_stage_rsync_provider.go @@ -108,7 +108,12 @@ func (p *twoStageRsyncProvider) Options(stage int) ([]string, error) { } func (p *twoStageRsyncProvider) Run() error { - defer p.Wait() + p.Lock() + defer p.Unlock() + + if p.IsRunning() { + return errors.New("provider is currently running") + } env := map[string]string{} if p.username != "" { @@ -129,7 +134,7 @@ func (p *twoStageRsyncProvider) Run() error { command = append(command, p.upstreamURL, p.WorkingDir()) p.cmd = newCmdJob(p, command, p.WorkingDir(), env) - if err := p.prepareLogFile(); err != nil { + if err := p.prepareLogFile(stage > 1); err != nil { return err } @@ -137,9 +142,11 @@ func (p *twoStageRsyncProvider) Run() error { return err } p.isRunning.Store(true) + logger.Debugf("set isRunning to true: %s", p.Name()) - err = p.cmd.Wait() - p.isRunning.Store(false) + p.Unlock() + err = p.Wait() + p.Lock() if err != nil { return err } From 89a792986d09f2450150d2373d025d5e97ca77c0 Mon Sep 17 00:00:00 2001 From: Yuxiang Zhang Date: Wed, 30 May 2018 14:00:10 +0800 Subject: [PATCH 4/6] increase test coverage rate of job & provider --- worker/job_test.go | 120 ++++++++++++++++++++++++++++++++++++++++ worker/provider_test.go | 36 ++++++++++++ 2 files changed, 156 insertions(+) diff --git a/worker/job_test.go b/worker/job_test.go index b63a646..92476d5 100644 --- a/worker/job_test.go +++ b/worker/job_test.go @@ -268,3 +268,123 @@ echo $TUNASYNC_WORKING_DIR }) } + +func TestConcurrentMirrorJobs(t *testing.T) { + + InitLogger(true, true, false) + + Convey("Concurrent MirrorJobs should work", t, func(ctx C) { + tmpDir, err := ioutil.TempDir("", "tunasync") + defer os.RemoveAll(tmpDir) + So(err, ShouldBeNil) + + const CONCURRENT = 5 + + var providers [CONCURRENT]*cmdProvider + var jobs [CONCURRENT]*mirrorJob + for i := 0; i < CONCURRENT; i++ { + c := cmdConfig{ + name: fmt.Sprintf("job-%d", i), + upstreamURL: "http://mirrors.tuna.moe/", + command: "sleep 3", + workingDir: tmpDir, + logDir: tmpDir, + logFile: "/dev/null", + interval: 10 * time.Second, + } + + var err error + providers[i], err = newCmdProvider(c) + So(err, ShouldBeNil) + jobs[i] = newMirrorJob(providers[i]) + } + + managerChan := make(chan jobMessage, 10) + semaphore := make(chan empty, CONCURRENT-2) + + Convey("When we run them all", func(ctx C) { + for _, job := range jobs { + go job.Run(managerChan, semaphore) + job.ctrlChan <- jobStart + } + + counterEnded := 0 + counterRunning := 0 + maxRunning := 0 + counterFailed := 0 + for counterEnded < CONCURRENT { + msg := <-managerChan + switch msg.status { + case PreSyncing: + counterRunning++ + case Syncing: + case Failed: + counterFailed++ + fallthrough + case Success: + counterEnded++ + counterRunning-- + default: + So(0, ShouldEqual, 1) + } + // Test if semaphore works + So(counterRunning, ShouldBeLessThanOrEqualTo, CONCURRENT-2) + if counterRunning > maxRunning { + maxRunning = counterRunning + } + } + + So(maxRunning, ShouldEqual, CONCURRENT-2) + So(counterFailed, ShouldEqual, 0) + + for _, job := range jobs { + job.ctrlChan <- jobDisable + <-job.disabled + } + }) + Convey("If we cancel one job", func(ctx C) { + for _, job := range jobs { + go job.Run(managerChan, semaphore) + job.ctrlChan <- jobRestart + time.Sleep(200 * time.Millisecond) + } + + // Cancel the one waiting for semaphore + jobs[len(jobs)-1].ctrlChan <- jobStop + + counterEnded := 0 + counterRunning := 0 + maxRunning := 0 + counterFailed := 0 + for counterEnded < CONCURRENT-1 { 
+ msg := <-managerChan + switch msg.status { + case PreSyncing: + counterRunning++ + case Syncing: + case Failed: + counterFailed++ + fallthrough + case Success: + counterEnded++ + counterRunning-- + default: + So(0, ShouldEqual, 1) + } + // Test if semaphore works + So(counterRunning, ShouldBeLessThanOrEqualTo, CONCURRENT-2) + if counterRunning > maxRunning { + maxRunning = counterRunning + } + } + + So(maxRunning, ShouldEqual, CONCURRENT-2) + So(counterFailed, ShouldEqual, 0) + + for _, job := range jobs { + job.ctrlChan <- jobDisable + <-job.disabled + } + }) + }) +} diff --git a/worker/provider_test.go b/worker/provider_test.go index 3098676..9705cbb 100644 --- a/worker/provider_test.go +++ b/worker/provider_test.go @@ -262,6 +262,40 @@ sleep 5 }) }) + Convey("Command Provider without log file should work", t, func(ctx C) { + tmpDir, err := ioutil.TempDir("", "tunasync") + defer os.RemoveAll(tmpDir) + So(err, ShouldBeNil) + + c := cmdConfig{ + name: "run-pwd", + upstreamURL: "http://mirrors.tuna.moe/", + command: "pwd", + workingDir: tmpDir, + logDir: tmpDir, + logFile: "/dev/null", + interval: 600 * time.Second, + } + + provider, err := newCmdProvider(c) + So(err, ShouldBeNil) + + So(provider.IsMaster(), ShouldEqual, false) + So(provider.ZFS(), ShouldBeNil) + So(provider.Type(), ShouldEqual, provCommand) + So(provider.Name(), ShouldEqual, c.name) + So(provider.WorkingDir(), ShouldEqual, c.workingDir) + So(provider.LogDir(), ShouldEqual, c.logDir) + So(provider.LogFile(), ShouldEqual, c.logFile) + So(provider.Interval(), ShouldEqual, c.interval) + + Convey("Run the command", func() { + + err = provider.Run() + So(err, ShouldBeNil) + + }) + }) } func TestTwoStageRsyncProvider(t *testing.T) { @@ -282,6 +316,8 @@ func TestTwoStageRsyncProvider(t *testing.T) { logFile: tmpFile, useIPv6: true, excludeFile: tmpFile, + username: "hello", + password: "world", } provider, err := newTwoStageRsyncProvider(c) From 6cbe91b4f1c29611ed2a257e6f7ef72fa8c1cfa7 Mon Sep 17 00:00:00 2001 From: Yuxiang Zhang Date: Wed, 30 May 2018 16:07:07 +0800 Subject: [PATCH 5/6] new command: jobForceStart --- worker/job.go | 34 +++++++++++++++++++++++++--------- 1 file changed, 25 insertions(+), 9 deletions(-) diff --git a/worker/job.go b/worker/job.go index e07af45..a4c23dc 100644 --- a/worker/job.go +++ b/worker/job.go @@ -15,12 +15,13 @@ import ( type ctrlAction uint8 const ( - jobStart ctrlAction = iota - jobStop // stop syncing keep the job - jobDisable // disable the job (stops goroutine) - jobRestart // restart syncing - jobPing // ensure the goroutine is alive - jobHalt // worker halts + jobStart ctrlAction = iota + jobStop // stop syncing keep the job + jobDisable // disable the job (stops goroutine) + jobRestart // restart syncing + jobPing // ensure the goroutine is alive + jobHalt // worker halts + jobForceStart // ignore concurrent limit ) type jobMessage struct { @@ -211,22 +212,25 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err return nil } - runJob := func(kill <-chan empty, jobDone chan<- empty) { + runJob := func(kill <-chan empty, jobDone chan<- empty, bypassSemaphore <-chan empty) { select { case semaphore <- empty{}: defer func() { <-semaphore }() runJobWrapper(kill, jobDone) + case <-bypassSemaphore: + runJobWrapper(kill, jobDone) case <-kill: jobDone <- empty{} return } } + bypassSemaphore := make(chan empty, 1) for { if m.State() == stateReady { kill := make(chan empty) jobDone := make(chan empty) - go runJob(kill, jobDone) + go runJob(kill, jobDone, 
bypassSemaphore) _wait_for_job: select { @@ -249,6 +253,12 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err <-jobDone time.Sleep(time.Second) // Restart may fail if the process was not exited yet continue + case jobForceStart: + select { //non-blocking + default: + case bypassSemaphore <- empty{}: + } + fallthrough case jobStart: m.SetState(stateReady) goto _wait_for_job @@ -272,8 +282,14 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err case jobDisable: m.SetState(stateDisabled) return nil + case jobForceStart: + select { //non-blocking + default: + case bypassSemaphore <- empty{}: + } + fallthrough case jobRestart: - m.SetState(stateReady) + fallthrough case jobStart: m.SetState(stateReady) default: From c750aa1871a1b36ad8f4e6da183c4bbbd3d1ddb5 Mon Sep 17 00:00:00 2001 From: z4yx Date: Wed, 30 May 2018 18:55:06 +0800 Subject: [PATCH 6/6] new feature: run "tunasynctl start" with "-f" to override concurrent job limit --- cmd/tunasynctl/tunasynctl.go | 12 ++++- internal/msg.go | 16 +++--- manager/server.go | 1 + worker/job.go | 1 + worker/job_test.go | 98 ++++++++++++++++++++++-------------- worker/provider_test.go | 4 +- worker/worker.go | 6 ++- 7 files changed, 88 insertions(+), 50 deletions(-) diff --git a/cmd/tunasynctl/tunasynctl.go b/cmd/tunasynctl/tunasynctl.go index f335971..3e2d6fe 100644 --- a/cmd/tunasynctl/tunasynctl.go +++ b/cmd/tunasynctl/tunasynctl.go @@ -285,11 +285,16 @@ func cmdJob(cmd tunasync.CmdVerb) cli.ActionFunc { "argument WORKER", 1) } + options := map[string]bool{} + if c.Bool("force") { + options["force"] = true + } cmd := tunasync.ClientCmd{ Cmd: cmd, MirrorID: mirrorID, WorkerID: c.String("worker"), Args: argsList, + Options: options, } resp, err := tunasync.PostJSON(baseURL+cmdPath, cmd, client) if err != nil { @@ -410,6 +415,11 @@ func main() { }, } + forceStartFlag := cli.BoolFlag{ + Name: "force, f", + Usage: "Override the concurrent limit", + } + app.Commands = []cli.Command{ { Name: "list", @@ -450,7 +460,7 @@ func main() { { Name: "start", Usage: "Start a job", - Flags: append(commonFlags, cmdFlags...), + Flags: append(append(commonFlags, cmdFlags...), forceStartFlag), Action: initializeWrapper(cmdJob(tunasync.CmdStart)), }, { diff --git a/internal/msg.go b/internal/msg.go index c0efb7a..14a9102 100644 --- a/internal/msg.go +++ b/internal/msg.go @@ -68,9 +68,10 @@ func (c CmdVerb) String() string { // A WorkerCmd is the command message send from the // manager to a worker type WorkerCmd struct { - Cmd CmdVerb `json:"cmd"` - MirrorID string `json:"mirror_id"` - Args []string `json:"args"` + Cmd CmdVerb `json:"cmd"` + MirrorID string `json:"mirror_id"` + Args []string `json:"args"` + Options map[string]bool `json:"options"` } func (c WorkerCmd) String() string { @@ -83,8 +84,9 @@ func (c WorkerCmd) String() string { // A ClientCmd is the command message send from client // to the manager type ClientCmd struct { - Cmd CmdVerb `json:"cmd"` - MirrorID string `json:"mirror_id"` - WorkerID string `json:"worker_id"` - Args []string `json:"args"` + Cmd CmdVerb `json:"cmd"` + MirrorID string `json:"mirror_id"` + WorkerID string `json:"worker_id"` + Args []string `json:"args"` + Options map[string]bool `json:"options"` } diff --git a/manager/server.go b/manager/server.go index 2563582..2605446 100644 --- a/manager/server.go +++ b/manager/server.go @@ -337,6 +337,7 @@ func (s *Manager) handleClientCmd(c *gin.Context) { Cmd: clientCmd.Cmd, MirrorID: clientCmd.MirrorID, Args: clientCmd.Args, + 
Options: clientCmd.Options, } // update job status, even if the job did not disable successfully, diff --git a/worker/job.go b/worker/job.go index a4c23dc..7ba0df4 100644 --- a/worker/job.go +++ b/worker/job.go @@ -218,6 +218,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err defer func() { <-semaphore }() runJobWrapper(kill, jobDone) case <-bypassSemaphore: + logger.Noticef("Concurrent limit ignored by %s", m.Name()) runJobWrapper(kill, jobDone) case <-kill: jobDone <- empty{} diff --git a/worker/job_test.go b/worker/job_test.go index 92476d5..370c9be 100644 --- a/worker/job_test.go +++ b/worker/job_test.go @@ -286,7 +286,7 @@ func TestConcurrentMirrorJobs(t *testing.T) { c := cmdConfig{ name: fmt.Sprintf("job-%d", i), upstreamURL: "http://mirrors.tuna.moe/", - command: "sleep 3", + command: "sleep 2", workingDir: tmpDir, logDir: tmpDir, logFile: "/dev/null", @@ -302,17 +302,12 @@ func TestConcurrentMirrorJobs(t *testing.T) { managerChan := make(chan jobMessage, 10) semaphore := make(chan empty, CONCURRENT-2) - Convey("When we run them all", func(ctx C) { - for _, job := range jobs { - go job.Run(managerChan, semaphore) - job.ctrlChan <- jobStart - } - + countingJobs := func(managerChan chan jobMessage, totalJobs, concurrentCheck int) (peakConcurrent, counterFailed int) { counterEnded := 0 counterRunning := 0 - maxRunning := 0 - counterFailed := 0 - for counterEnded < CONCURRENT { + peakConcurrent = 0 + counterFailed = 0 + for counterEnded < totalJobs { msg := <-managerChan switch msg.status { case PreSyncing: @@ -328,13 +323,29 @@ func TestConcurrentMirrorJobs(t *testing.T) { So(0, ShouldEqual, 1) } // Test if semaphore works - So(counterRunning, ShouldBeLessThanOrEqualTo, CONCURRENT-2) - if counterRunning > maxRunning { - maxRunning = counterRunning + So(counterRunning, ShouldBeLessThanOrEqualTo, concurrentCheck) + if counterRunning > peakConcurrent { + peakConcurrent = counterRunning } } + // select { + // case msg := <-managerChan: + // logger.Errorf("extra message received: %v", msg) + // So(0, ShouldEqual, 1) + // case <-time.After(2 * time.Second): + // } + return + } - So(maxRunning, ShouldEqual, CONCURRENT-2) + Convey("When we run them all", func(ctx C) { + for _, job := range jobs { + go job.Run(managerChan, semaphore) + job.ctrlChan <- jobStart + } + + peakConcurrent, counterFailed := countingJobs(managerChan, CONCURRENT, CONCURRENT-2) + + So(peakConcurrent, ShouldEqual, CONCURRENT-2) So(counterFailed, ShouldEqual, 0) for _, job := range jobs { @@ -352,33 +363,42 @@ func TestConcurrentMirrorJobs(t *testing.T) { // Cancel the one waiting for semaphore jobs[len(jobs)-1].ctrlChan <- jobStop - counterEnded := 0 - counterRunning := 0 - maxRunning := 0 - counterFailed := 0 - for counterEnded < CONCURRENT-1 { - msg := <-managerChan - switch msg.status { - case PreSyncing: - counterRunning++ - case Syncing: - case Failed: - counterFailed++ - fallthrough - case Success: - counterEnded++ - counterRunning-- - default: - So(0, ShouldEqual, 1) - } - // Test if semaphore works - So(counterRunning, ShouldBeLessThanOrEqualTo, CONCURRENT-2) - if counterRunning > maxRunning { - maxRunning = counterRunning - } + peakConcurrent, counterFailed := countingJobs(managerChan, CONCURRENT-1, CONCURRENT-2) + + So(peakConcurrent, ShouldEqual, CONCURRENT-2) + So(counterFailed, ShouldEqual, 0) + + for _, job := range jobs { + job.ctrlChan <- jobDisable + <-job.disabled + } + }) + Convey("If we override the concurrent limit", func(ctx C) { + for _, job := range jobs { + go 
job.Run(managerChan, semaphore) + job.ctrlChan <- jobStart + time.Sleep(200 * time.Millisecond) } - So(maxRunning, ShouldEqual, CONCURRENT-2) + jobs[len(jobs)-1].ctrlChan <- jobForceStart + jobs[len(jobs)-2].ctrlChan <- jobForceStart + + peakConcurrent, counterFailed := countingJobs(managerChan, CONCURRENT, CONCURRENT) + + So(peakConcurrent, ShouldEqual, CONCURRENT) + So(counterFailed, ShouldEqual, 0) + + time.Sleep(1 * time.Second) + + // fmt.Println("Restart them") + + for _, job := range jobs { + job.ctrlChan <- jobStart + } + + peakConcurrent, counterFailed = countingJobs(managerChan, CONCURRENT, CONCURRENT-2) + + So(peakConcurrent, ShouldEqual, CONCURRENT-2) So(counterFailed, ShouldEqual, 0) for _, job := range jobs { diff --git a/worker/provider_test.go b/worker/provider_test.go index 9705cbb..56ddc8d 100644 --- a/worker/provider_test.go +++ b/worker/provider_test.go @@ -268,9 +268,9 @@ sleep 5 So(err, ShouldBeNil) c := cmdConfig{ - name: "run-pwd", + name: "run-ls", upstreamURL: "http://mirrors.tuna.moe/", - command: "pwd", + command: "ls", workingDir: tmpDir, logDir: tmpDir, logFile: "/dev/null", diff --git a/worker/worker.go b/worker/worker.go index 877b526..99f8c2d 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -219,7 +219,11 @@ func (w *Worker) makeHTTPServer() { } switch cmd.Cmd { case CmdStart: - job.ctrlChan <- jobStart + if cmd.Options["force"] { + job.ctrlChan <- jobForceStart + } else { + job.ctrlChan <- jobStart + } case CmdRestart: job.ctrlChan <- jobRestart case CmdStop:
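
Note on patch 1: it removes the logFile field from baseProvider and makes cmdJob.Wait the single owner of the log file handle — whichever goroutine reaps the process also closes the file, so Terminate and Wait no longer race over closing the same *os.File. Below is a minimal standalone sketch of that ownership pattern, assuming a throwaway path /tmp/demo.log; it is an illustration of the idea, not tunasync code.

    package main

    import (
        "fmt"
        "os"
        "os/exec"
    )

    func main() {
        logFile, err := os.OpenFile("/tmp/demo.log", os.O_WRONLY|os.O_CREATE, 0644)
        if err != nil {
            panic(err)
        }

        cmd := exec.Command("echo", "hello")
        cmd.Stdout = logFile
        cmd.Stderr = logFile

        if err := cmd.Start(); err != nil {
            panic(err)
        }

        // Mirroring the patched cmdJob.Wait: the goroutine that reaps the
        // process is now the only place the log file is closed, so a
        // concurrent Terminate no longer races over a shared *os.File.
        waitErr := cmd.Wait()
        if f, ok := cmd.Stdout.(*os.File); ok && f != nil {
            f.Close()
        }
        fmt.Println("exit:", waitErr)
    }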
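
Note on patch 3: it threads an append flag through prepareLogFile because the two-stage rsync provider runs two commands against one log file — without O_APPEND, the second open writes from offset zero and clobbers the first stage's output. A minimal sketch of the flag handling follows, with a hypothetical path /tmp/twostage.log; only the flag logic matches the patch, the rest is illustrative.

    package main

    import (
        "fmt"
        "os"
    )

    // openLogFile follows the patched prepareLogFile: the caller chooses
    // append mode, so a second sync stage adds to the log instead of
    // overwriting the first stage's output from offset zero.
    func openLogFile(path string, appendMode bool) (*os.File, error) {
        flags := os.O_WRONLY | os.O_CREATE
        if appendMode {
            flags |= os.O_APPEND
        }
        return os.OpenFile(path, flags, 0644)
    }

    func main() {
        stage1, err := openLogFile("/tmp/twostage.log", false) // stage 1: fresh write
        if err != nil {
            panic(err)
        }
        fmt.Fprintln(stage1, "stage 1 output")
        stage1.Close()

        stage2, err := openLogFile("/tmp/twostage.log", true) // stage 2: append
        if err != nil {
            panic(err)
        }
        fmt.Fprintln(stage2, "stage 2 output")
        stage2.Close()
    }

Running the sketch leaves both "stage 1 output" and "stage 2 output" in the file; passing false for the second open would reproduce the over-write the patch fixes, since the second write starts again at offset zero.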
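
Note on patches 5 and 6: the force-start path rests on two channel idioms — a per-job buffered channel holding at most one pending force-start token, a non-blocking send so repeated jobForceStart commands cannot pile up, and a select in runJob that either acquires a semaphore slot or consumes the token and bypasses the limit. The sketch below reproduces just that pattern under stated assumptions; the task bodies, counts, and timings are illustrative and not part of tunasync.

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    type empty struct{}

    // run either takes a slot in the semaphore or consumes a pending
    // force-start token, the same select shape used by runJob in patch 5.
    func run(id int, semaphore chan empty, bypass <-chan empty, wg *sync.WaitGroup) {
        defer wg.Done()
        select {
        case semaphore <- empty{}: // acquire a concurrency slot
            defer func() { <-semaphore }() // release it on return
            fmt.Printf("task %d: running within the limit\n", id)
        case <-bypass: // a force-start token skips the limit
            fmt.Printf("task %d: concurrent limit ignored\n", id)
        }
        time.Sleep(100 * time.Millisecond) // simulated sync work
    }

    func main() {
        semaphore := make(chan empty, 2) // at most two concurrent tasks
        bypass := make(chan empty, 1)    // holds at most one force-start token

        // Non-blocking send, as in the jobForceStart handler: if a token
        // is already pending, a second force-start does not queue another.
        select {
        case bypass <- empty{}:
        default:
        }

        var wg sync.WaitGroup
        for i := 0; i < 4; i++ {
            wg.Add(1)
            go run(i, semaphore, bypass, &wg)
        }
        wg.Wait()
    }

With a semaphore of capacity two and one pending token, three of the four tasks may run at once — the same over-limit peak that the "If we override the concurrent limit" test observes through peakConcurrent. Buffering the bypass channel also means a force-start issued before the job reaches the select is remembered rather than lost.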