feature(worker): toplevel mirror job logic

This commit is contained in:
bigeagle 2016-04-23 17:52:30 +08:00
parent 0e808a449a
commit 681388ffdd
No known key found for this signature in database
GPG Key ID: 9171A4571C27920A
8 changed files with 422 additions and 25 deletions

View File

@ -3,6 +3,7 @@ package worker
import (
"errors"
"os"
"time"
"github.com/anmitsu/go-shlex"
)
@ -11,7 +12,7 @@ type cmdConfig struct {
name string
upstreamURL, command string
workingDir, logDir, logFile string
interval int
interval time.Duration
env map[string]string
}
@ -77,17 +78,13 @@ func (p *cmdProvider) Wait() error {
}
func (p *cmdProvider) Terminate() error {
logger.Debug("terminating provider: %s", p.Name())
if p.cmd == nil {
return errors.New("provider command job not initialized")
}
if p.logFile != nil {
defer p.logFile.Close()
p.logFile.Close()
}
err := p.cmd.Terminate()
return err
}
// TODO: implement this
func (p *cmdProvider) Hooks() {
}

13
worker/common.go Normal file
View File

@ -0,0 +1,13 @@
package worker
// put global viables and types here
import (
"gopkg.in/op/go-logging.v1"
)
type empty struct{}
const maxRetry = 2
var logger = logging.MustGetLogger("tunasync")

42
worker/hooks.go Normal file
View File

@ -0,0 +1,42 @@
package worker
/*
hooks to exec before/after syncing
failed
+------------------ post-fail hooks -------------------+
| |
job start -> pre-job hooks --v-> pre-exec hooks --> (syncing) --> post-exec hooks --+---------> post-success --> end
success
*/
type jobHook interface {
preJob() error
preExec() error
postExec() error
postSuccess() error
postFail() error
}
type emptyHook struct {
provider mirrorProvider
}
func (h *emptyHook) preJob() error {
return nil
}
func (h *emptyHook) preExec() error {
return nil
}
func (h *emptyHook) postExec() error {
return nil
}
func (h *emptyHook) postSuccess() error {
return nil
}
func (h *emptyHook) postFail() error {
return nil
}

202
worker/job.go Normal file
View File

@ -0,0 +1,202 @@
package worker
import (
"errors"
"time"
)
// this file contains the workflow of a mirror jb
type ctrlAction uint8
const (
jobStart ctrlAction = iota
jobStop // stop syncing keep the job
jobDisable // disable the job (stops goroutine)
jobRestart // restart syncing
jobPing // ensure the goroutine is alive
)
// runMirrorJob is the goroutine where syncing job runs in
// arguments:
// provider: mirror provider object
// ctrlChan: receives messages from the manager
// managerChan: push messages to the manager
// sempaphore: make sure the concurrent running syncing job won't explode
// TODO: message struct for managerChan
func runMirrorJob(provider mirrorProvider, ctrlChan <-chan ctrlAction, managerChan chan<- struct{}, semaphore chan empty) error {
// to make code shorter
runHooks := func(Hooks []jobHook, action func(h jobHook) error, hookname string) error {
for _, hook := range Hooks {
if err := action(hook); err != nil {
logger.Error(
"failed at %s hooks for %s: %s",
hookname, provider.Name(), err.Error(),
)
return err
}
}
return nil
}
runJobWrapper := func(kill <-chan empty, jobDone chan<- empty) error {
defer func() { jobDone <- empty{} }()
logger.Info("start syncing: %s", provider.Name())
Hooks := provider.Hooks()
rHooks := []jobHook{}
for i := len(Hooks); i > 0; i-- {
rHooks = append(rHooks, Hooks[i-1])
}
logger.Debug("hooks: pre-job")
err := runHooks(Hooks, func(h jobHook) error { return h.preJob() }, "pre-job")
if err != nil {
return err
}
for retry := 0; retry < maxRetry; retry++ {
stopASAP := false // stop job as soon as possible
if retry > 0 {
logger.Info("retry syncing: %s, retry: %d", provider.Name(), retry)
}
err := runHooks(Hooks, func(h jobHook) error { return h.preExec() }, "pre-exec")
if err != nil {
return err
}
// start syncing
err = provider.Start()
if err != nil {
logger.Error(
"failed to start syncing job for %s: %s",
provider.Name(), err.Error(),
)
return err
}
var syncErr error
syncDone := make(chan error, 1)
go func() {
err := provider.Wait()
if !stopASAP {
syncDone <- err
}
}()
select {
case syncErr = <-syncDone:
logger.Debug("syncing done")
case <-kill:
stopASAP = true
err := provider.Terminate()
if err != nil {
logger.Error("failed to terminate provider %s: %s", provider.Name(), err.Error())
return err
}
syncErr = errors.New("killed by manager")
}
// post-exec hooks
herr := runHooks(rHooks, func(h jobHook) error { return h.postExec() }, "post-exec")
if herr != nil {
return herr
}
if syncErr == nil {
// syncing success
logger.Info("succeeded syncing %s", provider.Name())
managerChan <- struct{}{}
// post-success hooks
err := runHooks(rHooks, func(h jobHook) error { return h.postSuccess() }, "post-success")
if err != nil {
return err
}
return nil
}
// syncing failed
logger.Info("failed syncing %s: %s", provider.Name(), err.Error())
managerChan <- struct{}{}
// post-fail hooks
err = runHooks(rHooks, func(h jobHook) error { return h.postFail() }, "post-fail")
if err != nil {
return err
}
// gracefully exit
if stopASAP {
return nil
}
// continue to next retry
} // for retry
return nil
}
runJob := func(kill <-chan empty, jobDone chan<- empty) {
select {
case <-semaphore:
defer func() { semaphore <- empty{} }()
runJobWrapper(kill, jobDone)
case <-kill:
return
}
}
enabled := true // whether this job is stopped by the manager
for {
if enabled {
kill := make(chan empty)
jobDone := make(chan empty)
go runJob(kill, jobDone)
_wait_for_job:
select {
case <-jobDone:
logger.Debug("job done")
case ctrl := <-ctrlChan:
switch ctrl {
case jobStop:
enabled = false
close(kill)
case jobDisable:
close(kill)
return nil
case jobRestart:
enabled = true
close(kill)
continue
case jobStart:
enabled = true
goto _wait_for_job
default:
// TODO: implement this
close(kill)
return nil
}
}
}
select {
case <-time.After(provider.Interval()):
continue
case ctrl := <-ctrlChan:
switch ctrl {
case jobStop:
enabled = false
case jobDisable:
return nil
case jobRestart:
enabled = true
case jobStart:
enabled = true
default:
// TODO
return nil
}
}
}
return nil
}

135
worker/job_test.go Normal file
View File

@ -0,0 +1,135 @@
package worker
import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"testing"
"time"
. "github.com/smartystreets/goconvey/convey"
)
func TestMirrorJob(t *testing.T) {
Convey("MirrorJob should work", t, func(ctx C) {
tmpDir, err := ioutil.TempDir("", "tunasync")
defer os.RemoveAll(tmpDir)
So(err, ShouldBeNil)
scriptFile := filepath.Join(tmpDir, "cmd.sh")
tmpFile := filepath.Join(tmpDir, "log_file")
c := cmdConfig{
name: "tuna-cmd-jobtest",
upstreamURL: "http://mirrors.tuna.moe/",
command: "bash " + scriptFile,
workingDir: tmpDir,
logDir: tmpDir,
logFile: tmpFile,
interval: 1 * time.Second,
}
provider, err := newCmdProvider(c)
So(err, ShouldBeNil)
So(provider.Name(), ShouldEqual, c.name)
So(provider.WorkingDir(), ShouldEqual, c.workingDir)
So(provider.LogDir(), ShouldEqual, c.logDir)
So(provider.LogFile(), ShouldEqual, c.logFile)
So(provider.Interval(), ShouldEqual, c.interval)
Convey("For a normal mirror job", func(ctx C) {
scriptContent := `#!/bin/bash
echo $TUNASYNC_WORKING_DIR
echo $TUNASYNC_MIRROR_NAME
echo $TUNASYNC_UPSTREAM_URL
echo $TUNASYNC_LOG_FILE
`
exceptedOutput := fmt.Sprintf(
"%s\n%s\n%s\n%s\n",
provider.WorkingDir(),
provider.Name(),
provider.upstreamURL,
provider.LogFile(),
)
err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755)
So(err, ShouldBeNil)
readedScriptContent, err := ioutil.ReadFile(scriptFile)
So(err, ShouldBeNil)
So(readedScriptContent, ShouldResemble, []byte(scriptContent))
Convey("If we let it run several times", func(ctx C) {
ctrlChan := make(chan ctrlAction)
managerChan := make(chan struct{})
semaphore := make(chan empty, 1)
semaphore <- empty{}
go runMirrorJob(provider, ctrlChan, managerChan, semaphore)
for i := 0; i < 2; i++ {
<-managerChan
loggedContent, err := ioutil.ReadFile(provider.LogFile())
So(err, ShouldBeNil)
So(string(loggedContent), ShouldEqual, exceptedOutput)
}
select {
case <-managerChan:
So(0, ShouldEqual, 0) // made this fail
case <-time.After(2 * time.Second):
So(0, ShouldEqual, 1)
}
ctrlChan <- jobDisable
select {
case <-managerChan:
So(0, ShouldEqual, 1) // made this fail
case <-time.After(2 * time.Second):
So(0, ShouldEqual, 0)
}
})
})
Convey("When running long jobs", func(ctx C) {
scriptContent := `#!/bin/bash
echo $TUNASYNC_WORKING_DIR
sleep 5
echo $TUNASYNC_WORKING_DIR
`
err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755)
So(err, ShouldBeNil)
ctrlChan := make(chan ctrlAction)
managerChan := make(chan struct{})
semaphore := make(chan empty, 1)
semaphore <- empty{}
Convey("If we kill it", func(ctx C) {
go runMirrorJob(provider, ctrlChan, managerChan, semaphore)
time.Sleep(1 * time.Second)
ctrlChan <- jobStop
time.Sleep(1 * time.Second)
exceptedOutput := fmt.Sprintf("%s\n", provider.WorkingDir())
loggedContent, err := ioutil.ReadFile(provider.LogFile())
So(err, ShouldBeNil)
So(string(loggedContent), ShouldEqual, exceptedOutput)
ctrlChan <- jobDisable
})
Convey("If we don't kill it", func(ctx C) {
go runMirrorJob(provider, ctrlChan, managerChan, semaphore)
<-managerChan
exceptedOutput := fmt.Sprintf(
"%s\n%s\n",
provider.WorkingDir(), provider.WorkingDir(),
)
loggedContent, err := ioutil.ReadFile(provider.LogFile())
So(err, ShouldBeNil)
So(string(loggedContent), ShouldEqual, exceptedOutput)
ctrlChan <- jobDisable
})
})
})
}

View File

@ -1,5 +1,7 @@
package worker
import "time"
// mirror provider is the wrapper of mirror jobs
type providerType uint8
@ -17,15 +19,15 @@ type mirrorProvider interface {
// TODO: implement Run, Terminate and Hooks
// run mirror job in background
Start()
Start() error
// Wait job to finish
Wait()
Wait() error
// terminate mirror job
Terminate()
Terminate() error
// job hooks
Hooks()
Hooks() []jobHook
Interval() int
Interval() time.Duration
WorkingDir() string
LogDir() string
@ -42,7 +44,8 @@ type mirrorProvider interface {
type baseProvider struct {
ctx *Context
name string
interval int
interval time.Duration
hooks []jobHook
}
func (p *baseProvider) Name() string {
@ -63,7 +66,7 @@ func (p *baseProvider) Context() *Context {
return p.ctx
}
func (p *baseProvider) Interval() int {
func (p *baseProvider) Interval() time.Duration {
return p.interval
}
@ -93,3 +96,11 @@ func (p *baseProvider) LogFile() string {
}
panic("log dir is impossible to be unavailable")
}
func (p *baseProvider) AddHook(hook jobHook) {
p.hooks = append(p.hooks, hook)
}
func (p *baseProvider) Hooks() []jobHook {
return p.hooks
}

View File

@ -1,11 +1,13 @@
package worker
import "time"
type rsyncConfig struct {
name string
upstreamURL, password, excludeFile string
workingDir, logDir, logFile string
useIPv6 bool
interval int
interval time.Duration
}
// An RsyncProvider provides the implementation to rsync-based syncing jobs
@ -41,8 +43,3 @@ func (p *rsyncProvider) Start() {
func (p *rsyncProvider) Terminate() {
}
// TODO: implement this
func (p *rsyncProvider) Hooks() {
}

View File

@ -20,7 +20,7 @@ type cmdJob struct {
workingDir string
env map[string]string
logFile *os.File
finished chan struct{}
finished chan empty
}
func newCmdJob(cmdAndArgs []string, workingDir string, env map[string]string) *cmdJob {
@ -46,13 +46,13 @@ func newCmdJob(cmdAndArgs []string, workingDir string, env map[string]string) *c
}
func (c *cmdJob) Start() error {
c.finished = make(chan struct{}, 1)
c.finished = make(chan empty, 1)
return c.cmd.Start()
}
func (c *cmdJob) Wait() error {
err := c.cmd.Wait()
c.finished <- struct{}{}
c.finished <- empty{}
return err
}
@ -63,10 +63,10 @@ func (c *cmdJob) SetLogFile(logFile *os.File) {
func (c *cmdJob) Terminate() error {
if c.cmd == nil {
return errors.New("Command not initialized")
return nil
}
if c.cmd.Process == nil {
return errors.New("No Process Running")
return nil
}
err := unix.Kill(c.cmd.Process.Pid, syscall.SIGTERM)
if err != nil {