diff --git a/worker/cmd_provider.go b/worker/cmd_provider.go index b97933f..9bdaa09 100644 --- a/worker/cmd_provider.go +++ b/worker/cmd_provider.go @@ -3,6 +3,7 @@ package worker import ( "errors" "os" + "time" "github.com/anmitsu/go-shlex" ) @@ -11,7 +12,7 @@ type cmdConfig struct { name string upstreamURL, command string workingDir, logDir, logFile string - interval int + interval time.Duration env map[string]string } @@ -77,17 +78,13 @@ func (p *cmdProvider) Wait() error { } func (p *cmdProvider) Terminate() error { + logger.Debug("terminating provider: %s", p.Name()) if p.cmd == nil { return errors.New("provider command job not initialized") } if p.logFile != nil { - defer p.logFile.Close() + p.logFile.Close() } err := p.cmd.Terminate() return err } - -// TODO: implement this -func (p *cmdProvider) Hooks() { - -} diff --git a/worker/common.go b/worker/common.go new file mode 100644 index 0000000..65bdb47 --- /dev/null +++ b/worker/common.go @@ -0,0 +1,13 @@ +package worker + +// put global viables and types here + +import ( + "gopkg.in/op/go-logging.v1" +) + +type empty struct{} + +const maxRetry = 2 + +var logger = logging.MustGetLogger("tunasync") diff --git a/worker/hooks.go b/worker/hooks.go new file mode 100644 index 0000000..b5d318b --- /dev/null +++ b/worker/hooks.go @@ -0,0 +1,42 @@ +package worker + +/* +hooks to exec before/after syncing + failed + +------------------ post-fail hooks -------------------+ + | | + job start -> pre-job hooks --v-> pre-exec hooks --> (syncing) --> post-exec hooks --+---------> post-success --> end + success +*/ + +type jobHook interface { + preJob() error + preExec() error + postExec() error + postSuccess() error + postFail() error +} + +type emptyHook struct { + provider mirrorProvider +} + +func (h *emptyHook) preJob() error { + return nil +} + +func (h *emptyHook) preExec() error { + return nil +} + +func (h *emptyHook) postExec() error { + return nil +} + +func (h *emptyHook) postSuccess() error { + return nil +} + +func (h *emptyHook) postFail() error { + return nil +} diff --git a/worker/job.go b/worker/job.go new file mode 100644 index 0000000..5246f6b --- /dev/null +++ b/worker/job.go @@ -0,0 +1,202 @@ +package worker + +import ( + "errors" + "time" +) + +// this file contains the workflow of a mirror jb + +type ctrlAction uint8 + +const ( + jobStart ctrlAction = iota + jobStop // stop syncing keep the job + jobDisable // disable the job (stops goroutine) + jobRestart // restart syncing + jobPing // ensure the goroutine is alive +) + +// runMirrorJob is the goroutine where syncing job runs in +// arguments: +// provider: mirror provider object +// ctrlChan: receives messages from the manager +// managerChan: push messages to the manager +// sempaphore: make sure the concurrent running syncing job won't explode +// TODO: message struct for managerChan +func runMirrorJob(provider mirrorProvider, ctrlChan <-chan ctrlAction, managerChan chan<- struct{}, semaphore chan empty) error { + + // to make code shorter + runHooks := func(Hooks []jobHook, action func(h jobHook) error, hookname string) error { + for _, hook := range Hooks { + if err := action(hook); err != nil { + logger.Error( + "failed at %s hooks for %s: %s", + hookname, provider.Name(), err.Error(), + ) + return err + } + } + return nil + } + + runJobWrapper := func(kill <-chan empty, jobDone chan<- empty) error { + defer func() { jobDone <- empty{} }() + logger.Info("start syncing: %s", provider.Name()) + + Hooks := provider.Hooks() + rHooks := []jobHook{} + for i := len(Hooks); i > 0; i-- { + rHooks = append(rHooks, Hooks[i-1]) + } + + logger.Debug("hooks: pre-job") + err := runHooks(Hooks, func(h jobHook) error { return h.preJob() }, "pre-job") + if err != nil { + return err + } + + for retry := 0; retry < maxRetry; retry++ { + stopASAP := false // stop job as soon as possible + + if retry > 0 { + logger.Info("retry syncing: %s, retry: %d", provider.Name(), retry) + } + err := runHooks(Hooks, func(h jobHook) error { return h.preExec() }, "pre-exec") + if err != nil { + return err + } + + // start syncing + err = provider.Start() + if err != nil { + logger.Error( + "failed to start syncing job for %s: %s", + provider.Name(), err.Error(), + ) + return err + } + var syncErr error + syncDone := make(chan error, 1) + go func() { + err := provider.Wait() + if !stopASAP { + syncDone <- err + } + }() + + select { + case syncErr = <-syncDone: + logger.Debug("syncing done") + case <-kill: + stopASAP = true + err := provider.Terminate() + if err != nil { + logger.Error("failed to terminate provider %s: %s", provider.Name(), err.Error()) + return err + } + syncErr = errors.New("killed by manager") + } + + // post-exec hooks + herr := runHooks(rHooks, func(h jobHook) error { return h.postExec() }, "post-exec") + if herr != nil { + return herr + } + + if syncErr == nil { + // syncing success + logger.Info("succeeded syncing %s", provider.Name()) + managerChan <- struct{}{} + // post-success hooks + err := runHooks(rHooks, func(h jobHook) error { return h.postSuccess() }, "post-success") + if err != nil { + return err + } + return nil + + } + + // syncing failed + logger.Info("failed syncing %s: %s", provider.Name(), err.Error()) + managerChan <- struct{}{} + // post-fail hooks + err = runHooks(rHooks, func(h jobHook) error { return h.postFail() }, "post-fail") + if err != nil { + return err + } + // gracefully exit + if stopASAP { + return nil + } + // continue to next retry + } // for retry + return nil + } + + runJob := func(kill <-chan empty, jobDone chan<- empty) { + select { + case <-semaphore: + defer func() { semaphore <- empty{} }() + runJobWrapper(kill, jobDone) + case <-kill: + return + } + } + + enabled := true // whether this job is stopped by the manager + for { + if enabled { + kill := make(chan empty) + jobDone := make(chan empty) + go runJob(kill, jobDone) + + _wait_for_job: + select { + case <-jobDone: + logger.Debug("job done") + case ctrl := <-ctrlChan: + switch ctrl { + case jobStop: + enabled = false + close(kill) + case jobDisable: + close(kill) + return nil + case jobRestart: + enabled = true + close(kill) + continue + case jobStart: + enabled = true + goto _wait_for_job + default: + // TODO: implement this + close(kill) + return nil + } + } + } + + select { + case <-time.After(provider.Interval()): + continue + case ctrl := <-ctrlChan: + switch ctrl { + case jobStop: + enabled = false + case jobDisable: + return nil + case jobRestart: + enabled = true + case jobStart: + enabled = true + default: + // TODO + return nil + } + } + } + + return nil +} diff --git a/worker/job_test.go b/worker/job_test.go new file mode 100644 index 0000000..2388af1 --- /dev/null +++ b/worker/job_test.go @@ -0,0 +1,135 @@ +package worker + +import ( + "fmt" + "io/ioutil" + "os" + "path/filepath" + "testing" + "time" + + . "github.com/smartystreets/goconvey/convey" +) + +func TestMirrorJob(t *testing.T) { + + Convey("MirrorJob should work", t, func(ctx C) { + tmpDir, err := ioutil.TempDir("", "tunasync") + defer os.RemoveAll(tmpDir) + So(err, ShouldBeNil) + scriptFile := filepath.Join(tmpDir, "cmd.sh") + tmpFile := filepath.Join(tmpDir, "log_file") + + c := cmdConfig{ + name: "tuna-cmd-jobtest", + upstreamURL: "http://mirrors.tuna.moe/", + command: "bash " + scriptFile, + workingDir: tmpDir, + logDir: tmpDir, + logFile: tmpFile, + interval: 1 * time.Second, + } + + provider, err := newCmdProvider(c) + So(err, ShouldBeNil) + + So(provider.Name(), ShouldEqual, c.name) + So(provider.WorkingDir(), ShouldEqual, c.workingDir) + So(provider.LogDir(), ShouldEqual, c.logDir) + So(provider.LogFile(), ShouldEqual, c.logFile) + So(provider.Interval(), ShouldEqual, c.interval) + + Convey("For a normal mirror job", func(ctx C) { + scriptContent := `#!/bin/bash + echo $TUNASYNC_WORKING_DIR + echo $TUNASYNC_MIRROR_NAME + echo $TUNASYNC_UPSTREAM_URL + echo $TUNASYNC_LOG_FILE + ` + exceptedOutput := fmt.Sprintf( + "%s\n%s\n%s\n%s\n", + provider.WorkingDir(), + provider.Name(), + provider.upstreamURL, + provider.LogFile(), + ) + err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755) + So(err, ShouldBeNil) + readedScriptContent, err := ioutil.ReadFile(scriptFile) + So(err, ShouldBeNil) + So(readedScriptContent, ShouldResemble, []byte(scriptContent)) + + Convey("If we let it run several times", func(ctx C) { + ctrlChan := make(chan ctrlAction) + managerChan := make(chan struct{}) + semaphore := make(chan empty, 1) + semaphore <- empty{} + + go runMirrorJob(provider, ctrlChan, managerChan, semaphore) + for i := 0; i < 2; i++ { + <-managerChan + loggedContent, err := ioutil.ReadFile(provider.LogFile()) + So(err, ShouldBeNil) + So(string(loggedContent), ShouldEqual, exceptedOutput) + } + select { + case <-managerChan: + So(0, ShouldEqual, 0) // made this fail + case <-time.After(2 * time.Second): + So(0, ShouldEqual, 1) + } + ctrlChan <- jobDisable + select { + case <-managerChan: + So(0, ShouldEqual, 1) // made this fail + case <-time.After(2 * time.Second): + So(0, ShouldEqual, 0) + } + }) + + }) + + Convey("When running long jobs", func(ctx C) { + scriptContent := `#!/bin/bash +echo $TUNASYNC_WORKING_DIR +sleep 5 +echo $TUNASYNC_WORKING_DIR + ` + err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755) + So(err, ShouldBeNil) + + ctrlChan := make(chan ctrlAction) + managerChan := make(chan struct{}) + semaphore := make(chan empty, 1) + semaphore <- empty{} + + Convey("If we kill it", func(ctx C) { + go runMirrorJob(provider, ctrlChan, managerChan, semaphore) + time.Sleep(1 * time.Second) + ctrlChan <- jobStop + time.Sleep(1 * time.Second) + exceptedOutput := fmt.Sprintf("%s\n", provider.WorkingDir()) + loggedContent, err := ioutil.ReadFile(provider.LogFile()) + So(err, ShouldBeNil) + So(string(loggedContent), ShouldEqual, exceptedOutput) + ctrlChan <- jobDisable + }) + Convey("If we don't kill it", func(ctx C) { + go runMirrorJob(provider, ctrlChan, managerChan, semaphore) + <-managerChan + + exceptedOutput := fmt.Sprintf( + "%s\n%s\n", + provider.WorkingDir(), provider.WorkingDir(), + ) + + loggedContent, err := ioutil.ReadFile(provider.LogFile()) + So(err, ShouldBeNil) + So(string(loggedContent), ShouldEqual, exceptedOutput) + ctrlChan <- jobDisable + }) + }) + + }) + +} diff --git a/worker/provider.go b/worker/provider.go index 806c073..7aca550 100644 --- a/worker/provider.go +++ b/worker/provider.go @@ -1,5 +1,7 @@ package worker +import "time" + // mirror provider is the wrapper of mirror jobs type providerType uint8 @@ -17,15 +19,15 @@ type mirrorProvider interface { // TODO: implement Run, Terminate and Hooks // run mirror job in background - Start() + Start() error // Wait job to finish - Wait() + Wait() error // terminate mirror job - Terminate() + Terminate() error // job hooks - Hooks() + Hooks() []jobHook - Interval() int + Interval() time.Duration WorkingDir() string LogDir() string @@ -42,7 +44,8 @@ type mirrorProvider interface { type baseProvider struct { ctx *Context name string - interval int + interval time.Duration + hooks []jobHook } func (p *baseProvider) Name() string { @@ -63,7 +66,7 @@ func (p *baseProvider) Context() *Context { return p.ctx } -func (p *baseProvider) Interval() int { +func (p *baseProvider) Interval() time.Duration { return p.interval } @@ -93,3 +96,11 @@ func (p *baseProvider) LogFile() string { } panic("log dir is impossible to be unavailable") } + +func (p *baseProvider) AddHook(hook jobHook) { + p.hooks = append(p.hooks, hook) +} + +func (p *baseProvider) Hooks() []jobHook { + return p.hooks +} diff --git a/worker/rsync_provider.go b/worker/rsync_provider.go index 1687109..ba8dcf1 100644 --- a/worker/rsync_provider.go +++ b/worker/rsync_provider.go @@ -1,11 +1,13 @@ package worker +import "time" + type rsyncConfig struct { name string upstreamURL, password, excludeFile string workingDir, logDir, logFile string useIPv6 bool - interval int + interval time.Duration } // An RsyncProvider provides the implementation to rsync-based syncing jobs @@ -41,8 +43,3 @@ func (p *rsyncProvider) Start() { func (p *rsyncProvider) Terminate() { } - -// TODO: implement this -func (p *rsyncProvider) Hooks() { - -} diff --git a/worker/runner.go b/worker/runner.go index 92edbba..ed6bc65 100644 --- a/worker/runner.go +++ b/worker/runner.go @@ -20,7 +20,7 @@ type cmdJob struct { workingDir string env map[string]string logFile *os.File - finished chan struct{} + finished chan empty } func newCmdJob(cmdAndArgs []string, workingDir string, env map[string]string) *cmdJob { @@ -46,13 +46,13 @@ func newCmdJob(cmdAndArgs []string, workingDir string, env map[string]string) *c } func (c *cmdJob) Start() error { - c.finished = make(chan struct{}, 1) + c.finished = make(chan empty, 1) return c.cmd.Start() } func (c *cmdJob) Wait() error { err := c.cmd.Wait() - c.finished <- struct{}{} + c.finished <- empty{} return err } @@ -63,10 +63,10 @@ func (c *cmdJob) SetLogFile(logFile *os.File) { func (c *cmdJob) Terminate() error { if c.cmd == nil { - return errors.New("Command not initialized") + return nil } if c.cmd.Process == nil { - return errors.New("No Process Running") + return nil } err := unix.Kill(c.cmd.Process.Pid, syscall.SIGTERM) if err != nil {