From 9339fba074b4a8966bc49de082d80082bb402b11 Mon Sep 17 00:00:00 2001 From: bigeagle Date: Sun, 24 Apr 2016 23:21:03 +0800 Subject: [PATCH] refactor(worker): use Run instead of Start and Wait --- worker/cmd_provider.go | 7 +++++++ worker/job.go | 11 ++-------- worker/provider.go | 45 ++++++++++++++++++++++++++++++---------- worker/provider_test.go | 15 ++++---------- worker/rsync_provider.go | 7 +++++++ worker/runner.go | 9 ++++---- 6 files changed, 58 insertions(+), 36 deletions(-) diff --git a/worker/cmd_provider.go b/worker/cmd_provider.go index 61280c6..1e5a399 100644 --- a/worker/cmd_provider.go +++ b/worker/cmd_provider.go @@ -44,6 +44,13 @@ func newCmdProvider(c cmdConfig) (*cmdProvider, error) { return provider, nil } +func (p *cmdProvider) Run() error { + if err := p.Start(); err != nil { + return err + } + return p.Wait() +} + func (p *cmdProvider) Start() error { env := map[string]string{ "TUNASYNC_MIRROR_NAME": p.Name(), diff --git a/worker/job.go b/worker/job.go index 50d4c3d..186764b 100644 --- a/worker/job.go +++ b/worker/job.go @@ -103,18 +103,11 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err // start syncing managerChan <- jobMessage{tunasync.Syncing, m.Name(), ""} - err = provider.Start() - if err != nil { - logger.Error( - "failed to start syncing job for %s: %s", - m.Name(), err.Error(), - ) - return err - } + var syncErr error syncDone := make(chan error, 1) go func() { - err := provider.Wait() + err := provider.Run() if !stopASAP { syncDone <- err } diff --git a/worker/provider.go b/worker/provider.go index 700df5d..a27dbfe 100644 --- a/worker/provider.go +++ b/worker/provider.go @@ -3,6 +3,7 @@ package worker import ( "errors" "os" + "sync" "time" ) @@ -21,7 +22,8 @@ type mirrorProvider interface { // name Name() string - // TODO: implement Run, Terminate and Hooks + // run mirror job in background + Run() error // run mirror job in background Start() error // Wait job to finish @@ -46,6 +48,8 @@ type mirrorProvider interface { } type baseProvider struct { + sync.Mutex + ctx *Context name string interval time.Duration @@ -118,21 +122,35 @@ func (p *baseProvider) setLogFile() error { p.cmd.SetLogFile(nil) return nil } - - logFile, err := os.OpenFile(p.LogFile(), os.O_WRONLY|os.O_CREATE, 0644) - if err != nil { - logger.Error("Error opening logfile %s: %s", p.LogFile(), err.Error()) - return err + if p.logFile == nil { + logFile, err := os.OpenFile(p.LogFile(), os.O_WRONLY|os.O_CREATE, 0644) + if err != nil { + logger.Error("Error opening logfile %s: %s", p.LogFile(), err.Error()) + return err + } + p.logFile = logFile } - p.logFile = logFile - p.cmd.SetLogFile(logFile) + p.cmd.SetLogFile(p.logFile) return nil } +func (p *baseProvider) Run() error { + panic("Not Implemented") +} + +func (p *baseProvider) Start() error { + panic("Not Implemented") +} + func (p *baseProvider) Wait() error { - if p.logFile != nil { - defer p.logFile.Close() - } + defer func() { + p.Lock() + if p.logFile != nil { + p.logFile.Close() + p.logFile = nil + } + p.Unlock() + }() return p.cmd.Wait() } @@ -141,9 +159,14 @@ func (p *baseProvider) Terminate() error { if p.cmd == nil { return errors.New("provider command job not initialized") } + + p.Lock() if p.logFile != nil { p.logFile.Close() + p.logFile = nil } + p.Unlock() + err := p.cmd.Terminate() return err } diff --git a/worker/provider_test.go b/worker/provider_test.go index 79b5b03..13f550e 100644 --- a/worker/provider_test.go +++ b/worker/provider_test.go @@ -116,9 +116,7 @@ echo $AOSP_REPO_BIN So(err, ShouldBeNil) So(readedScriptContent, ShouldResemble, []byte(scriptContent)) - err = provider.Start() - So(err, ShouldBeNil) - err = provider.Wait() + err = provider.Run() So(err, ShouldBeNil) loggedContent, err := ioutil.ReadFile(provider.LogFile()) @@ -134,9 +132,7 @@ echo $AOSP_REPO_BIN So(err, ShouldBeNil) So(readedScriptContent, ShouldResemble, []byte(scriptContent)) - err = provider.Start() - So(err, ShouldBeNil) - err = provider.Wait() + err = provider.Run() So(err, ShouldNotBeNil) }) @@ -148,15 +144,12 @@ sleep 5 err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755) So(err, ShouldBeNil) - err = provider.Start() - So(err, ShouldBeNil) - go func() { - err = provider.Wait() + err = provider.Run() ctx.So(err, ShouldNotBeNil) }() - time.Sleep(2) + time.Sleep(1 * time.Second) err = provider.Terminate() So(err, ShouldBeNil) diff --git a/worker/rsync_provider.go b/worker/rsync_provider.go index 5d25971..83da9ec 100644 --- a/worker/rsync_provider.go +++ b/worker/rsync_provider.go @@ -55,6 +55,13 @@ func newRsyncProvider(c rsyncConfig) (*rsyncProvider, error) { return provider, nil } +func (p *rsyncProvider) Run() error { + if err := p.Start(); err != nil { + return err + } + return p.Wait() +} + func (p *rsyncProvider) Start() error { env := map[string]string{} if p.password != "" { diff --git a/worker/runner.go b/worker/runner.go index ed6bc65..1e196a5 100644 --- a/worker/runner.go +++ b/worker/runner.go @@ -15,6 +15,8 @@ import ( // it's an alternative to python-sh or go-sh // TODO: cgroup excution +var errProcessNotStarted = errors.New("Process Not Started") + type cmdJob struct { cmd *exec.Cmd workingDir string @@ -62,11 +64,8 @@ func (c *cmdJob) SetLogFile(logFile *os.File) { } func (c *cmdJob) Terminate() error { - if c.cmd == nil { - return nil - } - if c.cmd.Process == nil { - return nil + if c.cmd == nil || c.cmd.Process == nil { + return errProcessNotStarted } err := unix.Kill(c.cmd.Process.Pid, syscall.SIGTERM) if err != nil {