From 38b0156faec7b161c55dbe237f0d545b0e76077b Mon Sep 17 00:00:00 2001 From: z4yx Date: Sat, 9 May 2020 18:42:54 +0800 Subject: [PATCH] [bug fix] provider is not terminated if premature stop command received --- worker/base_provider.go | 3 ++- worker/cgroup_test.go | 2 +- worker/cmd_provider.go | 4 ++- worker/docker_test.go | 2 +- worker/job.go | 12 ++++++++- worker/provider.go | 6 ++--- worker/provider_test.go | 40 +++++++++++++++++------------- worker/rsync_provider.go | 4 ++- worker/two_stage_rsync_provider.go | 3 ++- 9 files changed, 49 insertions(+), 27 deletions(-) diff --git a/worker/base_provider.go b/worker/base_provider.go index 8fdb934..1656029 100644 --- a/worker/base_provider.go +++ b/worker/base_provider.go @@ -142,7 +142,7 @@ func (p *baseProvider) closeLogFile() (err error) { return } -func (p *baseProvider) Run() error { +func (p *baseProvider) Run(started chan empty) error { panic("Not Implemented") } @@ -169,6 +169,7 @@ func (p *baseProvider) Terminate() error { defer p.Unlock() logger.Debugf("terminating provider: %s", p.Name()) if !p.IsRunning() { + logger.Warningf("Terminate() called while IsRunning is false: %s", p.Name()) return nil } diff --git a/worker/cgroup_test.go b/worker/cgroup_test.go index 4b0366a..835ddd6 100644 --- a/worker/cgroup_test.go +++ b/worker/cgroup_test.go @@ -83,7 +83,7 @@ sleep 30 So(err, ShouldBeNil) go func() { - err = provider.Run() + err := provider.Run(make(chan empty, 1)) ctx.So(err, ShouldNotBeNil) }() diff --git a/worker/cmd_provider.go b/worker/cmd_provider.go index da02334..6fe4aa4 100644 --- a/worker/cmd_provider.go +++ b/worker/cmd_provider.go @@ -86,12 +86,13 @@ func (p *cmdProvider) DataSize() string { return p.dataSize } -func (p *cmdProvider) Run() error { +func (p *cmdProvider) Run(started chan empty) error { p.dataSize = "" defer p.closeLogFile() if err := p.Start(); err != nil { return err } + started <- empty{} if err := p.Wait(); err != nil { return err } @@ -139,5 +140,6 @@ func (p *cmdProvider) Start() error { return err } p.isRunning.Store(true) + logger.Debugf("set isRunning to true: %s", p.Name()) return nil } diff --git a/worker/docker_test.go b/worker/docker_test.go index e556967..01d6a45 100644 --- a/worker/docker_test.go +++ b/worker/docker_test.go @@ -87,7 +87,7 @@ sleep 20 cmdRun("docker", []string{"images"}) exitedErr := make(chan error, 1) go func() { - err = provider.Run() + err = provider.Run(make(chan empty, 1)) logger.Debugf("provider.Run() exited") if err != nil { logger.Errorf("provider.Run() failed: %v", err) diff --git a/worker/job.go b/worker/job.go index 248f39c..c77528c 100644 --- a/worker/job.go +++ b/worker/job.go @@ -155,11 +155,21 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err var syncErr error syncDone := make(chan error, 1) + started := make(chan empty, 10) // we may receive "started" more than one time (e.g. two_stage_rsync) go func() { - err := provider.Run() + err := provider.Run(started) syncDone <- err }() + select { // Wait until provider started or error happened + case err := <-syncDone: + logger.Errorf("failed to start provider %s: %s", m.Name(), err.Error()) + syncDone <- err // it will be read again later + case <-started: + logger.Debug("provider started") + } + // Now terminating the provider is feasible + select { case syncErr = <-syncDone: logger.Debug("syncing done") diff --git a/worker/provider.go b/worker/provider.go index af05e02..33cb9c5 100644 --- a/worker/provider.go +++ b/worker/provider.go @@ -24,9 +24,9 @@ type mirrorProvider interface { Type() providerEnum - // run mirror job in background - Run() error - // run mirror job in background + // Start then Wait + Run(started chan empty) error + // Start the job Start() error // Wait job to finish Wait() error diff --git a/worker/provider_test.go b/worker/provider_test.go index 49d2904..aec506f 100644 --- a/worker/provider_test.go +++ b/worker/provider_test.go @@ -96,7 +96,7 @@ exit 0 ), ) - err = provider.Run() + err = provider.Run(make(chan empty, 1)) So(err, ShouldBeNil) loggedContent, err := ioutil.ReadFile(provider.LogFile()) So(err, ShouldBeNil) @@ -127,7 +127,7 @@ exit 0 provider, err := newRsyncProvider(c) So(err, ShouldBeNil) - err = provider.Run() + err = provider.Run(make(chan empty, 1)) So(err, ShouldNotBeNil) loggedContent, err := ioutil.ReadFile(provider.LogFile()) So(err, ShouldBeNil) @@ -195,7 +195,7 @@ exit 0 ), ) - err = provider.Run() + err = provider.Run(make(chan empty, 1)) So(err, ShouldBeNil) loggedContent, err := ioutil.ReadFile(provider.LogFile()) So(err, ShouldBeNil) @@ -257,7 +257,7 @@ exit 0 provider.WorkingDir(), ) - err = provider.Run() + err = provider.Run(make(chan empty, 1)) So(err, ShouldBeNil) loggedContent, err := ioutil.ReadFile(provider.LogFile()) So(err, ShouldBeNil) @@ -321,7 +321,7 @@ echo $AOSP_REPO_BIN So(err, ShouldBeNil) So(readedScriptContent, ShouldResemble, []byte(scriptContent)) - err = provider.Run() + err = provider.Run(make(chan empty, 1)) So(err, ShouldBeNil) loggedContent, err := ioutil.ReadFile(provider.LogFile()) @@ -337,7 +337,7 @@ echo $AOSP_REPO_BIN So(err, ShouldBeNil) So(readedScriptContent, ShouldResemble, []byte(scriptContent)) - err = provider.Run() + err = provider.Run(make(chan empty, 1)) So(err, ShouldNotBeNil) }) @@ -349,11 +349,14 @@ sleep 10 err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755) So(err, ShouldBeNil) + started := make(chan empty, 1) go func() { - err = provider.Run() + err := provider.Run(started) ctx.So(err, ShouldNotBeNil) }() + <-started + So(provider.IsRunning(), ShouldBeTrue) time.Sleep(1 * time.Second) err = provider.Terminate() So(err, ShouldBeNil) @@ -389,7 +392,7 @@ sleep 10 Convey("Run the command", func() { - err = provider.Run() + err = provider.Run(make(chan empty, 1)) So(err, ShouldBeNil) }) @@ -417,7 +420,7 @@ sleep 10 provider, err := newCmdProvider(c) So(err, ShouldBeNil) - err = provider.Run() + err = provider.Run(make(chan empty, 1)) So(err, ShouldNotBeNil) So(provider.DataSize(), ShouldBeEmpty) }) @@ -427,7 +430,7 @@ sleep 10 provider, err := newCmdProvider(c) So(err, ShouldBeNil) - err = provider.Run() + err = provider.Run(make(chan empty, 1)) So(err, ShouldBeNil) }) @@ -437,7 +440,7 @@ sleep 10 provider, err := newCmdProvider(c) So(err, ShouldBeNil) - err = provider.Run() + err = provider.Run(make(chan empty, 1)) So(err, ShouldNotBeNil) }) @@ -446,7 +449,7 @@ sleep 10 provider, err := newCmdProvider(c) So(err, ShouldBeNil) - err = provider.Run() + err = provider.Run(make(chan empty, 1)) So(err, ShouldBeNil) So(provider.DataSize(), ShouldNotBeEmpty) _, err = strconv.ParseFloat(provider.DataSize(), 32) @@ -458,7 +461,7 @@ sleep 10 provider, err := newCmdProvider(c) So(err, ShouldBeNil) - err = provider.Run() + err = provider.Run(make(chan empty, 1)) So(err, ShouldBeNil) So(provider.DataSize(), ShouldBeEmpty) }) @@ -469,7 +472,7 @@ sleep 10 provider, err := newCmdProvider(c) So(err, ShouldBeNil) - err = provider.Run() + err = provider.Run(make(chan empty, 1)) So(err, ShouldNotBeNil) So(provider.DataSize(), ShouldBeEmpty) }) @@ -520,7 +523,7 @@ exit 0 err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755) So(err, ShouldBeNil) - err = provider.Run() + err = provider.Run(make(chan empty, 2)) So(err, ShouldBeNil) targetDir, _ := filepath.EvalSymlinks(provider.WorkingDir()) @@ -562,11 +565,14 @@ exit 0 err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755) So(err, ShouldBeNil) + started := make(chan empty, 2) go func() { - err = provider.Run() + err := provider.Run(started) ctx.So(err, ShouldNotBeNil) }() + <-started + So(provider.IsRunning(), ShouldBeTrue) time.Sleep(1 * time.Second) err = provider.Terminate() So(err, ShouldBeNil) @@ -606,7 +612,7 @@ exit 0 provider, err := newTwoStageRsyncProvider(c) So(err, ShouldBeNil) - err = provider.Run() + err = provider.Run(make(chan empty, 2)) So(err, ShouldNotBeNil) loggedContent, err := ioutil.ReadFile(provider.LogFile()) So(err, ShouldBeNil) diff --git a/worker/rsync_provider.go b/worker/rsync_provider.go index 5ce1abb..6c37170 100644 --- a/worker/rsync_provider.go +++ b/worker/rsync_provider.go @@ -103,12 +103,13 @@ func (p *rsyncProvider) DataSize() string { return p.dataSize } -func (p *rsyncProvider) Run() error { +func (p *rsyncProvider) Run(started chan empty) error { p.dataSize = "" defer p.closeLogFile() if err := p.Start(); err != nil { return err } + started <- empty{} if err := p.Wait(); err != nil { code, msg := internal.TranslateRsyncErrorCode(err) if code != 0 { @@ -144,5 +145,6 @@ func (p *rsyncProvider) Start() error { return err } p.isRunning.Store(true) + logger.Debugf("set isRunning to true: %s", p.Name()) return nil } diff --git a/worker/two_stage_rsync_provider.go b/worker/two_stage_rsync_provider.go index 572d275..0e2a799 100644 --- a/worker/two_stage_rsync_provider.go +++ b/worker/two_stage_rsync_provider.go @@ -133,7 +133,7 @@ func (p *twoStageRsyncProvider) Options(stage int) ([]string, error) { return options, nil } -func (p *twoStageRsyncProvider) Run() error { +func (p *twoStageRsyncProvider) Run(started chan empty) error { p.Lock() defer p.Unlock() @@ -163,6 +163,7 @@ func (p *twoStageRsyncProvider) Run() error { } p.isRunning.Store(true) logger.Debugf("set isRunning to true: %s", p.Name()) + started <- empty{} p.Unlock() err = p.Wait()