From 6b05a5894e7b5aa4810cbc57b4edc1be2d9557a3 Mon Sep 17 00:00:00 2001 From: bigeagle Date: Sun, 24 Apr 2016 16:48:47 +0800 Subject: [PATCH] feature(worker): runMirrorJob no longer controls the interval --- internal/logger.go | 4 ++-- worker/job.go | 50 +++++++++++++++++++++++----------------------- worker/job_test.go | 6 +++++- 3 files changed, 32 insertions(+), 28 deletions(-) diff --git a/internal/logger.go b/internal/logger.go index bf1a978..9ac0632 100644 --- a/internal/logger.go +++ b/internal/logger.go @@ -10,9 +10,9 @@ import ( func InitLogger(verbose, debug, withSystemd bool) { var fmtString string if withSystemd { - fmtString = "\r[%{level:.6s}] %{message}" + fmtString = "[%{level:.6s}] %{message}" } else { - fmtString = "\r%{color}[%{time:06-01-02 15:04:05}][%{level:.6s}]%{color:reset} %{message}" + fmtString = "%{color}[%{time:06-01-02 15:04:05}][%{level:.6s}][%{shortfile}]%{color:reset} %{message}" } format := logging.MustStringFormatter(fmtString) logging.SetFormatter(format) diff --git a/worker/job.go b/worker/job.go index 5246f6b..fe459f3 100644 --- a/worker/job.go +++ b/worker/job.go @@ -1,9 +1,6 @@ package worker -import ( - "errors" - "time" -) +import "errors" // this file contains the workflow of a mirror jb @@ -41,7 +38,8 @@ func runMirrorJob(provider mirrorProvider, ctrlChan <-chan ctrlAction, managerCh } runJobWrapper := func(kill <-chan empty, jobDone chan<- empty) error { - defer func() { jobDone <- empty{} }() + defer close(jobDone) + logger.Info("start syncing: %s", provider.Name()) Hooks := provider.Hooks() @@ -89,6 +87,7 @@ func runMirrorJob(provider mirrorProvider, ctrlChan <-chan ctrlAction, managerCh case syncErr = <-syncDone: logger.Debug("syncing done") case <-kill: + logger.Debug("received kill") stopASAP = true err := provider.Terminate() if err != nil { @@ -118,15 +117,18 @@ func runMirrorJob(provider mirrorProvider, ctrlChan <-chan ctrlAction, managerCh } // syncing failed - logger.Info("failed syncing %s: %s", provider.Name(), err.Error()) + logger.Warning("failed syncing %s: %s", provider.Name(), syncErr.Error()) managerChan <- struct{}{} + // post-fail hooks + logger.Debug("post-fail hooks") err = runHooks(rHooks, func(h jobHook) error { return h.postFail() }, "post-fail") if err != nil { return err } // gracefully exit if stopASAP { + logger.Debug("No retry, exit directly") return nil } // continue to next retry @@ -140,6 +142,7 @@ func runMirrorJob(provider mirrorProvider, ctrlChan <-chan ctrlAction, managerCh defer func() { semaphore <- empty{} }() runJobWrapper(kill, jobDone) case <-kill: + jobDone <- empty{} return } } @@ -160,12 +163,15 @@ func runMirrorJob(provider mirrorProvider, ctrlChan <-chan ctrlAction, managerCh case jobStop: enabled = false close(kill) + <-jobDone case jobDisable: close(kill) + <-jobDone return nil case jobRestart: enabled = true close(kill) + <-jobDone continue case jobStart: enabled = true @@ -178,25 +184,19 @@ func runMirrorJob(provider mirrorProvider, ctrlChan <-chan ctrlAction, managerCh } } - select { - case <-time.After(provider.Interval()): - continue - case ctrl := <-ctrlChan: - switch ctrl { - case jobStop: - enabled = false - case jobDisable: - return nil - case jobRestart: - enabled = true - case jobStart: - enabled = true - default: - // TODO - return nil - } + ctrl := <-ctrlChan + switch ctrl { + case jobStop: + enabled = false + case jobDisable: + return nil + case jobRestart: + enabled = true + case jobStart: + enabled = true + default: + // TODO + return nil } } - - return nil } diff --git a/worker/job_test.go b/worker/job_test.go index 2388af1..0fa90e4 100644 --- a/worker/job_test.go +++ b/worker/job_test.go @@ -9,10 +9,13 @@ import ( "time" . "github.com/smartystreets/goconvey/convey" + . "github.com/tuna/tunasync/internal" ) func TestMirrorJob(t *testing.T) { + InitLogger(true, true, false) + Convey("MirrorJob should work", t, func(ctx C) { tmpDir, err := ioutil.TempDir("", "tunasync") defer os.RemoveAll(tmpDir) @@ -71,6 +74,7 @@ func TestMirrorJob(t *testing.T) { loggedContent, err := ioutil.ReadFile(provider.LogFile()) So(err, ShouldBeNil) So(string(loggedContent), ShouldEqual, exceptedOutput) + ctrlChan <- jobStart } select { case <-managerChan: @@ -107,7 +111,7 @@ echo $TUNASYNC_WORKING_DIR go runMirrorJob(provider, ctrlChan, managerChan, semaphore) time.Sleep(1 * time.Second) ctrlChan <- jobStop - time.Sleep(1 * time.Second) + <-managerChan exceptedOutput := fmt.Sprintf("%s\n", provider.WorkingDir()) loggedContent, err := ioutil.ReadFile(provider.LogFile()) So(err, ShouldBeNil)