From 563860d4242340d0efb5e0352a5e602579e791b0 Mon Sep 17 00:00:00 2001 From: 7IN0SAN9 Date: Mon, 27 Mar 2017 13:09:56 +0800 Subject: [PATCH] fix #63 --- worker/base_provider.go | 5 +++++ worker/cmd_provider.go | 2 ++ worker/common.go | 2 +- worker/config.go | 2 ++ worker/config_test.go | 4 ++++ worker/exec_post_test.go | 2 +- worker/job.go | 4 ++-- worker/provider.go | 7 +++++++ worker/rsync_provider.go | 2 ++ worker/two_stage_rsync_provider.go | 2 ++ worker/worker.go | 4 ++++ 11 files changed, 32 insertions(+), 4 deletions(-) diff --git a/worker/base_provider.go b/worker/base_provider.go index 0befa61..78cdfdd 100644 --- a/worker/base_provider.go +++ b/worker/base_provider.go @@ -15,6 +15,7 @@ type baseProvider struct { ctx *Context name string interval time.Duration + retry int isMaster bool cmd *cmdJob @@ -52,6 +53,10 @@ func (p *baseProvider) Interval() time.Duration { return p.interval } +func (p *baseProvider) Retry() int { + return p.retry +} + func (p *baseProvider) IsMaster() bool { return p.isMaster } diff --git a/worker/cmd_provider.go b/worker/cmd_provider.go index 2274cec..d897bab 100644 --- a/worker/cmd_provider.go +++ b/worker/cmd_provider.go @@ -11,6 +11,7 @@ type cmdConfig struct { upstreamURL, command string workingDir, logDir, logFile string interval time.Duration + retry int env map[string]string } @@ -27,6 +28,7 @@ func newCmdProvider(c cmdConfig) (*cmdProvider, error) { name: c.name, ctx: NewContext(), interval: c.interval, + retry: c.retry, }, cmdConfig: c, } diff --git a/worker/common.go b/worker/common.go index 65bdb47..51dfd11 100644 --- a/worker/common.go +++ b/worker/common.go @@ -8,6 +8,6 @@ import ( type empty struct{} -const maxRetry = 2 +const defaultMaxRetry = 2 var logger = logging.MustGetLogger("tunasync") diff --git a/worker/config.go b/worker/config.go index 97da0ff..b3d79b9 100644 --- a/worker/config.go +++ b/worker/config.go @@ -49,6 +49,7 @@ type globalConfig struct { MirrorDir string `toml:"mirror_dir"` Concurrent int `toml:"concurrent"` Interval int `toml:"interval"` + Retry int `toml:"retry"` ExecOnSuccess []string `toml:"exec_on_success"` ExecOnFailure []string `toml:"exec_on_failure"` @@ -108,6 +109,7 @@ type mirrorConfig struct { Provider providerEnum `toml:"provider"` Upstream string `toml:"upstream"` Interval int `toml:"interval"` + Retry int `toml:"retry"` MirrorDir string `toml:"mirror_dir"` LogDir string `toml:"log_dir"` Env map[string]string `toml:"env"` diff --git a/worker/config_test.go b/worker/config_test.go index abed134..b35c99d 100644 --- a/worker/config_test.go +++ b/worker/config_test.go @@ -18,6 +18,7 @@ log_dir = "/var/log/tunasync/{{.Name}}" mirror_dir = "/data/mirrors" concurrent = 10 interval = 240 +retry = 3 [manager] api_base = "https://127.0.0.1:5000" @@ -35,6 +36,7 @@ name = "AOSP" provider = "command" upstream = "https://aosp.google.com/" interval = 720 +retry = 2 mirror_dir = "/data/git/AOSP" exec_on_success = [ "bash -c 'echo ${TUNASYNC_JOB_EXIT_STATUS} > ${TUNASYNC_WORKING_DIR}/exit_status'" @@ -116,6 +118,7 @@ use_ipv6 = true So(err, ShouldBeNil) So(cfg.Global.Name, ShouldEqual, "test_worker") So(cfg.Global.Interval, ShouldEqual, 240) + So(cfg.Global.Retry, ShouldEqual, 3) So(cfg.Global.MirrorDir, ShouldEqual, "/data/mirrors") So(cfg.Manager.APIBase, ShouldEqual, "https://127.0.0.1:5000") @@ -126,6 +129,7 @@ use_ipv6 = true So(m.MirrorDir, ShouldEqual, "/data/git/AOSP") So(m.Provider, ShouldEqual, provCommand) So(m.Interval, ShouldEqual, 720) + So(m.Retry, ShouldEqual, 2) So(m.Env["REPO"], ShouldEqual, "/usr/local/bin/aosp-repo") m = cfg.Mirrors[1] diff --git a/worker/exec_post_test.go b/worker/exec_post_test.go index 203c607..ee956a7 100644 --- a/worker/exec_post_test.go +++ b/worker/exec_post_test.go @@ -92,7 +92,7 @@ exit 1 job.ctrlChan <- jobStart msg := <-managerChan So(msg.status, ShouldEqual, PreSyncing) - for i := 0; i < maxRetry; i++ { + for i := 0; i < defaultMaxRetry; i++ { msg = <-managerChan So(msg.status, ShouldEqual, Syncing) msg = <-managerChan diff --git a/worker/job.go b/worker/job.go index a5feca1..a634d56 100644 --- a/worker/job.go +++ b/worker/job.go @@ -136,7 +136,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err return err } - for retry := 0; retry < maxRetry; retry++ { + for retry := 0; retry < provider.Retry(); retry++ { stopASAP := false // stop job as soon as possible if retry > 0 { @@ -194,7 +194,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err // syncing failed logger.Warningf("failed syncing %s: %s", m.Name(), syncErr.Error()) - managerChan <- jobMessage{tunasync.Failed, m.Name(), syncErr.Error(), (retry == maxRetry-1) && (m.State() == stateReady)} + managerChan <- jobMessage{tunasync.Failed, m.Name(), syncErr.Error(), (retry == provider.Retry()-1) && (m.State() == stateReady)} // post-fail hooks logger.Debug("post-fail hooks") diff --git a/worker/provider.go b/worker/provider.go index 9662dcd..547ec37 100644 --- a/worker/provider.go +++ b/worker/provider.go @@ -45,6 +45,7 @@ type mirrorProvider interface { Hooks() []jobHook Interval() time.Duration + Retry() int WorkingDir() string LogDir() string @@ -86,6 +87,9 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider { if mirror.Interval == 0 { mirror.Interval = cfg.Global.Interval } + if mirror.Retry == 0 { + mirror.Retry = cfg.Global.Retry + } logDir = formatLogDir(logDir, mirror) // IsMaster @@ -110,6 +114,7 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider { logDir: logDir, logFile: filepath.Join(logDir, "latest.log"), interval: time.Duration(mirror.Interval) * time.Minute, + retry: mirror.Retry, env: mirror.Env, } p, err := newCmdProvider(pc) @@ -131,6 +136,7 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider { logFile: filepath.Join(logDir, "latest.log"), useIPv6: mirror.UseIPv6, interval: time.Duration(mirror.Interval) * time.Minute, + retry: mirror.Retry, } p, err := newRsyncProvider(rc) p.isMaster = isMaster @@ -152,6 +158,7 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider { logFile: filepath.Join(logDir, "latest.log"), useIPv6: mirror.UseIPv6, interval: time.Duration(mirror.Interval) * time.Minute, + retry: mirror.Retry, } p, err := newTwoStageRsyncProvider(rc) p.isMaster = isMaster diff --git a/worker/rsync_provider.go b/worker/rsync_provider.go index aafd761..bba0a51 100644 --- a/worker/rsync_provider.go +++ b/worker/rsync_provider.go @@ -13,6 +13,7 @@ type rsyncConfig struct { workingDir, logDir, logFile string useIPv6 bool interval time.Duration + retry int } // An RsyncProvider provides the implementation to rsync-based syncing jobs @@ -32,6 +33,7 @@ func newRsyncProvider(c rsyncConfig) (*rsyncProvider, error) { name: c.name, ctx: NewContext(), interval: c.interval, + retry: c.retry, }, rsyncConfig: c, } diff --git a/worker/two_stage_rsync_provider.go b/worker/two_stage_rsync_provider.go index 48e5125..6363e93 100644 --- a/worker/two_stage_rsync_provider.go +++ b/worker/two_stage_rsync_provider.go @@ -15,6 +15,7 @@ type twoStageRsyncConfig struct { workingDir, logDir, logFile string useIPv6 bool interval time.Duration + retry int } // An RsyncProvider provides the implementation to rsync-based syncing jobs @@ -44,6 +45,7 @@ func newTwoStageRsyncProvider(c twoStageRsyncConfig) (*twoStageRsyncProvider, er name: c.name, ctx: NewContext(), interval: c.interval, + retry: c.retry, }, twoStageRsyncConfig: c, stage1Options: []string{ diff --git a/worker/worker.go b/worker/worker.go index 877b526..cc7f69c 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -35,6 +35,10 @@ func GetTUNASyncWorker(cfg *Config) *Worker { return tunasyncWorker } + if cfg.Global.Retry == 0 { + cfg.Global.Retry = defaultMaxRetry + } + w := &Worker{ cfg: cfg, jobs: make(map[string]*mirrorJob),