From f31bcfbcc365dde5b3b537d7940b4f8ec073759d Mon Sep 17 00:00:00 2001 From: bigeagle Date: Sun, 24 Apr 2016 17:20:47 +0800 Subject: [PATCH] feature(API): error message in manager channel --- internal/msg.go | 1 + worker/job.go | 27 ++++++++++++++++++++++----- worker/job_test.go | 40 +++++++++++++++++++++++++++++++++------- 3 files changed, 56 insertions(+), 12 deletions(-) diff --git a/internal/msg.go b/internal/msg.go index 5ff8ce9..d34116f 100644 --- a/internal/msg.go +++ b/internal/msg.go @@ -12,6 +12,7 @@ type StatusUpdateMsg struct { LastUpdate time.Time `json:"last_update"` Upstream string `json:"upstream"` Size string `json:"size"` + ErrorMsg string `json:"error_msg"` } // A WorkerInfoMsg is diff --git a/worker/job.go b/worker/job.go index b93844f..cae589a 100644 --- a/worker/job.go +++ b/worker/job.go @@ -1,6 +1,11 @@ package worker -import "errors" +import ( + "errors" + "fmt" + + tunasync "github.com/tuna/tunasync/internal" +) // this file contains the workflow of a mirror jb @@ -14,14 +19,20 @@ const ( jobPing // ensure the goroutine is alive ) +type jobMessage struct { + status tunasync.SyncStatus + name string + msg string +} + // runMirrorJob is the goroutine where syncing job runs in // arguments: // provider: mirror provider object // ctrlChan: receives messages from the manager -// managerChan: push messages to the manager +// managerChan: push messages to the manager, this channel should have a larger buffer // sempaphore: make sure the concurrent running syncing job won't explode // TODO: message struct for managerChan -func runMirrorJob(provider mirrorProvider, ctrlChan <-chan ctrlAction, managerChan chan<- struct{}, semaphore chan empty) error { +func runMirrorJob(provider mirrorProvider, ctrlChan <-chan ctrlAction, managerChan chan<- jobMessage, semaphore chan empty) error { // to make code shorter runHooks := func(Hooks []jobHook, action func(h jobHook) error, hookname string) error { @@ -31,6 +42,10 @@ func runMirrorJob(provider mirrorProvider, ctrlChan <-chan ctrlAction, managerCh "failed at %s hooks for %s: %s", hookname, provider.Name(), err.Error(), ) + managerChan <- jobMessage{ + tunasync.Failed, provider.Name(), + fmt.Sprintf("error exec hook %s: %s", hookname, err.Error()), + } return err } } @@ -40,6 +55,7 @@ func runMirrorJob(provider mirrorProvider, ctrlChan <-chan ctrlAction, managerCh runJobWrapper := func(kill <-chan empty, jobDone chan<- empty) error { defer close(jobDone) + managerChan <- jobMessage{tunasync.PreSyncing, provider.Name(), ""} logger.Info("start syncing: %s", provider.Name()) Hooks := provider.Hooks() @@ -66,6 +82,7 @@ func runMirrorJob(provider mirrorProvider, ctrlChan <-chan ctrlAction, managerCh } // start syncing + managerChan <- jobMessage{tunasync.Syncing, provider.Name(), ""} err = provider.Start() if err != nil { logger.Error( @@ -106,7 +123,7 @@ func runMirrorJob(provider mirrorProvider, ctrlChan <-chan ctrlAction, managerCh if syncErr == nil { // syncing success logger.Info("succeeded syncing %s", provider.Name()) - managerChan <- struct{}{} + managerChan <- jobMessage{tunasync.Success, provider.Name(), ""} // post-success hooks err := runHooks(rHooks, func(h jobHook) error { return h.postSuccess() }, "post-success") if err != nil { @@ -118,7 +135,7 @@ func runMirrorJob(provider mirrorProvider, ctrlChan <-chan ctrlAction, managerCh // syncing failed logger.Warning("failed syncing %s: %s", provider.Name(), syncErr.Error()) - managerChan <- struct{}{} + managerChan <- jobMessage{tunasync.Failed, provider.Name(), syncErr.Error()} // post-fail hooks logger.Debug("post-fail hooks") diff --git a/worker/job_test.go b/worker/job_test.go index f9ab7ee..f5e9382 100644 --- a/worker/job_test.go +++ b/worker/job_test.go @@ -64,23 +64,34 @@ func TestMirrorJob(t *testing.T) { Convey("If we let it run several times", func(ctx C) { ctrlChan := make(chan ctrlAction) - managerChan := make(chan struct{}) + managerChan := make(chan jobMessage, 10) semaphore := make(chan empty, 1) go runMirrorJob(provider, ctrlChan, managerChan, semaphore) for i := 0; i < 2; i++ { - <-managerChan + msg := <-managerChan + So(msg.status, ShouldEqual, PreSyncing) + msg = <-managerChan + So(msg.status, ShouldEqual, Syncing) + msg = <-managerChan + So(msg.status, ShouldEqual, Success) loggedContent, err := ioutil.ReadFile(provider.LogFile()) So(err, ShouldBeNil) So(string(loggedContent), ShouldEqual, exceptedOutput) ctrlChan <- jobStart } select { - case <-managerChan: - So(0, ShouldEqual, 0) // made this fail + case msg := <-managerChan: + So(msg.status, ShouldEqual, PreSyncing) + msg = <-managerChan + So(msg.status, ShouldEqual, Syncing) + msg = <-managerChan + So(msg.status, ShouldEqual, Success) + case <-time.After(2 * time.Second): So(0, ShouldEqual, 1) } + ctrlChan <- jobDisable select { case <-managerChan: @@ -102,23 +113,38 @@ echo $TUNASYNC_WORKING_DIR So(err, ShouldBeNil) ctrlChan := make(chan ctrlAction) - managerChan := make(chan struct{}) + managerChan := make(chan jobMessage, 10) semaphore := make(chan empty, 1) Convey("If we kill it", func(ctx C) { go runMirrorJob(provider, ctrlChan, managerChan, semaphore) + time.Sleep(1 * time.Second) + msg := <-managerChan + So(msg.status, ShouldEqual, PreSyncing) + msg = <-managerChan + So(msg.status, ShouldEqual, Syncing) + ctrlChan <- jobStop - <-managerChan + + msg = <-managerChan + So(msg.status, ShouldEqual, Failed) + exceptedOutput := fmt.Sprintf("%s\n", provider.WorkingDir()) loggedContent, err := ioutil.ReadFile(provider.LogFile()) So(err, ShouldBeNil) So(string(loggedContent), ShouldEqual, exceptedOutput) ctrlChan <- jobDisable }) + Convey("If we don't kill it", func(ctx C) { go runMirrorJob(provider, ctrlChan, managerChan, semaphore) - <-managerChan + msg := <-managerChan + So(msg.status, ShouldEqual, PreSyncing) + msg = <-managerChan + So(msg.status, ShouldEqual, Syncing) + msg = <-managerChan + So(msg.status, ShouldEqual, Success) exceptedOutput := fmt.Sprintf( "%s\n%s\n",