diff --git a/worker/base_provider.go b/worker/base_provider.go index 4e4c099..9795fe5 100644 --- a/worker/base_provider.go +++ b/worker/base_provider.go @@ -15,6 +15,7 @@ type baseProvider struct { ctx *Context name string interval time.Duration + retry int isMaster bool cmd *cmdJob @@ -50,6 +51,10 @@ func (p *baseProvider) Interval() time.Duration { return p.interval } +func (p *baseProvider) Retry() int { + return p.retry +} + func (p *baseProvider) IsMaster() bool { return p.isMaster } diff --git a/worker/cmd_provider.go b/worker/cmd_provider.go index e1cac2e..292ed0e 100644 --- a/worker/cmd_provider.go +++ b/worker/cmd_provider.go @@ -12,6 +12,7 @@ type cmdConfig struct { upstreamURL, command string workingDir, logDir, logFile string interval time.Duration + retry int env map[string]string } @@ -28,6 +29,7 @@ func newCmdProvider(c cmdConfig) (*cmdProvider, error) { name: c.name, ctx: NewContext(), interval: c.interval, + retry: c.retry, }, cmdConfig: c, } diff --git a/worker/common.go b/worker/common.go index 65bdb47..51dfd11 100644 --- a/worker/common.go +++ b/worker/common.go @@ -8,6 +8,6 @@ import ( type empty struct{} -const maxRetry = 2 +const defaultMaxRetry = 2 var logger = logging.MustGetLogger("tunasync") diff --git a/worker/config.go b/worker/config.go index f178ec5..c4346cd 100644 --- a/worker/config.go +++ b/worker/config.go @@ -49,6 +49,7 @@ type globalConfig struct { MirrorDir string `toml:"mirror_dir"` Concurrent int `toml:"concurrent"` Interval int `toml:"interval"` + Retry int `toml:"retry"` ExecOnSuccess []string `toml:"exec_on_success"` ExecOnFailure []string `toml:"exec_on_failure"` @@ -108,6 +109,7 @@ type mirrorConfig struct { Provider providerEnum `toml:"provider"` Upstream string `toml:"upstream"` Interval int `toml:"interval"` + Retry int `toml:"retry"` MirrorDir string `toml:"mirror_dir"` LogDir string `toml:"log_dir"` Env map[string]string `toml:"env"` diff --git a/worker/config_test.go b/worker/config_test.go index abed134..b35c99d 100644 --- a/worker/config_test.go +++ b/worker/config_test.go @@ -18,6 +18,7 @@ log_dir = "/var/log/tunasync/{{.Name}}" mirror_dir = "/data/mirrors" concurrent = 10 interval = 240 +retry = 3 [manager] api_base = "https://127.0.0.1:5000" @@ -35,6 +36,7 @@ name = "AOSP" provider = "command" upstream = "https://aosp.google.com/" interval = 720 +retry = 2 mirror_dir = "/data/git/AOSP" exec_on_success = [ "bash -c 'echo ${TUNASYNC_JOB_EXIT_STATUS} > ${TUNASYNC_WORKING_DIR}/exit_status'" @@ -116,6 +118,7 @@ use_ipv6 = true So(err, ShouldBeNil) So(cfg.Global.Name, ShouldEqual, "test_worker") So(cfg.Global.Interval, ShouldEqual, 240) + So(cfg.Global.Retry, ShouldEqual, 3) So(cfg.Global.MirrorDir, ShouldEqual, "/data/mirrors") So(cfg.Manager.APIBase, ShouldEqual, "https://127.0.0.1:5000") @@ -126,6 +129,7 @@ use_ipv6 = true So(m.MirrorDir, ShouldEqual, "/data/git/AOSP") So(m.Provider, ShouldEqual, provCommand) So(m.Interval, ShouldEqual, 720) + So(m.Retry, ShouldEqual, 2) So(m.Env["REPO"], ShouldEqual, "/usr/local/bin/aosp-repo") m = cfg.Mirrors[1] diff --git a/worker/exec_post_test.go b/worker/exec_post_test.go index 203c607..ee956a7 100644 --- a/worker/exec_post_test.go +++ b/worker/exec_post_test.go @@ -92,7 +92,7 @@ exit 1 job.ctrlChan <- jobStart msg := <-managerChan So(msg.status, ShouldEqual, PreSyncing) - for i := 0; i < maxRetry; i++ { + for i := 0; i < defaultMaxRetry; i++ { msg = <-managerChan So(msg.status, ShouldEqual, Syncing) msg = <-managerChan diff --git a/worker/job.go b/worker/job.go index 84cf111..2df1764 100644 --- a/worker/job.go +++ b/worker/job.go @@ -139,7 +139,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err return err } - for retry := 0; retry < maxRetry; retry++ { + for retry := 0; retry < provider.Retry(); retry++ { stopASAP := false // stop job as soon as possible if retry > 0 { @@ -196,7 +196,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err // syncing failed logger.Warningf("failed syncing %s: %s", m.Name(), syncErr.Error()) - managerChan <- jobMessage{tunasync.Failed, m.Name(), syncErr.Error(), (retry == maxRetry-1) && (m.State() == stateReady)} + managerChan <- jobMessage{tunasync.Failed, m.Name(), syncErr.Error(), (retry == provider.Retry()-1) && (m.State() == stateReady)} // post-fail hooks logger.Debug("post-fail hooks") diff --git a/worker/provider.go b/worker/provider.go index 2c44820..90c9c0e 100644 --- a/worker/provider.go +++ b/worker/provider.go @@ -45,6 +45,7 @@ type mirrorProvider interface { Hooks() []jobHook Interval() time.Duration + Retry() int WorkingDir() string LogDir() string @@ -87,6 +88,9 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider { if mirror.Interval == 0 { mirror.Interval = cfg.Global.Interval } + if mirror.Retry == 0 { + mirror.Retry = cfg.Global.Retry + } logDir = formatLogDir(logDir, mirror) // IsMaster @@ -111,6 +115,7 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider { logDir: logDir, logFile: filepath.Join(logDir, "latest.log"), interval: time.Duration(mirror.Interval) * time.Minute, + retry: mirror.Retry, env: mirror.Env, } p, err := newCmdProvider(pc) @@ -133,6 +138,7 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider { useIPv6: mirror.UseIPv6, useIPv4: mirror.UseIPv4, interval: time.Duration(mirror.Interval) * time.Minute, + retry: mirror.Retry, } p, err := newRsyncProvider(rc) p.isMaster = isMaster @@ -154,6 +160,7 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider { logFile: filepath.Join(logDir, "latest.log"), useIPv6: mirror.UseIPv6, interval: time.Duration(mirror.Interval) * time.Minute, + retry: mirror.Retry, } p, err := newTwoStageRsyncProvider(rc) p.isMaster = isMaster diff --git a/worker/rsync_provider.go b/worker/rsync_provider.go index c54219e..1ee0709 100644 --- a/worker/rsync_provider.go +++ b/worker/rsync_provider.go @@ -16,6 +16,7 @@ type rsyncConfig struct { workingDir, logDir, logFile string useIPv6, useIPv4 bool interval time.Duration + retry int } // An RsyncProvider provides the implementation to rsync-based syncing jobs @@ -36,6 +37,7 @@ func newRsyncProvider(c rsyncConfig) (*rsyncProvider, error) { name: c.name, ctx: NewContext(), interval: c.interval, + retry: c.retry, }, rsyncConfig: c, } diff --git a/worker/two_stage_rsync_provider.go b/worker/two_stage_rsync_provider.go index 5b091fd..1346444 100644 --- a/worker/two_stage_rsync_provider.go +++ b/worker/two_stage_rsync_provider.go @@ -18,6 +18,7 @@ type twoStageRsyncConfig struct { workingDir, logDir, logFile string useIPv6 bool interval time.Duration + retry int } // An RsyncProvider provides the implementation to rsync-based syncing jobs @@ -48,6 +49,7 @@ func newTwoStageRsyncProvider(c twoStageRsyncConfig) (*twoStageRsyncProvider, er name: c.name, ctx: NewContext(), interval: c.interval, + retry: c.retry, }, twoStageRsyncConfig: c, stage1Options: []string{ diff --git a/worker/worker.go b/worker/worker.go index 2d330a0..0ed211f 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -30,6 +30,10 @@ type Worker struct { // NewTUNASyncWorker creates a worker func NewTUNASyncWorker(cfg *Config) *Worker { + if cfg.Global.Retry == 0 { + cfg.Global.Retry = defaultMaxRetry + } + w := &Worker{ cfg: cfg, jobs: make(map[string]*mirrorJob),