Merge remote-tracking branch 'kinosang/master' into wip-test-pr

This commit is contained in:
zyx 2019-04-13 02:07:41 +08:00
commit 1aa4ae9cc1
11 changed files with 32 additions and 4 deletions

View File

@ -15,6 +15,7 @@ type baseProvider struct {
ctx *Context ctx *Context
name string name string
interval time.Duration interval time.Duration
retry int
isMaster bool isMaster bool
cmd *cmdJob cmd *cmdJob
@ -50,6 +51,10 @@ func (p *baseProvider) Interval() time.Duration {
return p.interval return p.interval
} }
func (p *baseProvider) Retry() int {
return p.retry
}
func (p *baseProvider) IsMaster() bool { func (p *baseProvider) IsMaster() bool {
return p.isMaster return p.isMaster
} }

View File

@ -12,6 +12,7 @@ type cmdConfig struct {
upstreamURL, command string upstreamURL, command string
workingDir, logDir, logFile string workingDir, logDir, logFile string
interval time.Duration interval time.Duration
retry int
env map[string]string env map[string]string
} }
@ -28,6 +29,7 @@ func newCmdProvider(c cmdConfig) (*cmdProvider, error) {
name: c.name, name: c.name,
ctx: NewContext(), ctx: NewContext(),
interval: c.interval, interval: c.interval,
retry: c.retry,
}, },
cmdConfig: c, cmdConfig: c,
} }

View File

@ -8,6 +8,6 @@ import (
type empty struct{} type empty struct{}
const maxRetry = 2 const defaultMaxRetry = 2
var logger = logging.MustGetLogger("tunasync") var logger = logging.MustGetLogger("tunasync")

View File

@ -49,6 +49,7 @@ type globalConfig struct {
MirrorDir string `toml:"mirror_dir"` MirrorDir string `toml:"mirror_dir"`
Concurrent int `toml:"concurrent"` Concurrent int `toml:"concurrent"`
Interval int `toml:"interval"` Interval int `toml:"interval"`
Retry int `toml:"retry"`
ExecOnSuccess []string `toml:"exec_on_success"` ExecOnSuccess []string `toml:"exec_on_success"`
ExecOnFailure []string `toml:"exec_on_failure"` ExecOnFailure []string `toml:"exec_on_failure"`
@ -108,6 +109,7 @@ type mirrorConfig struct {
Provider providerEnum `toml:"provider"` Provider providerEnum `toml:"provider"`
Upstream string `toml:"upstream"` Upstream string `toml:"upstream"`
Interval int `toml:"interval"` Interval int `toml:"interval"`
Retry int `toml:"retry"`
MirrorDir string `toml:"mirror_dir"` MirrorDir string `toml:"mirror_dir"`
LogDir string `toml:"log_dir"` LogDir string `toml:"log_dir"`
Env map[string]string `toml:"env"` Env map[string]string `toml:"env"`

View File

@ -18,6 +18,7 @@ log_dir = "/var/log/tunasync/{{.Name}}"
mirror_dir = "/data/mirrors" mirror_dir = "/data/mirrors"
concurrent = 10 concurrent = 10
interval = 240 interval = 240
retry = 3
[manager] [manager]
api_base = "https://127.0.0.1:5000" api_base = "https://127.0.0.1:5000"
@ -35,6 +36,7 @@ name = "AOSP"
provider = "command" provider = "command"
upstream = "https://aosp.google.com/" upstream = "https://aosp.google.com/"
interval = 720 interval = 720
retry = 2
mirror_dir = "/data/git/AOSP" mirror_dir = "/data/git/AOSP"
exec_on_success = [ exec_on_success = [
"bash -c 'echo ${TUNASYNC_JOB_EXIT_STATUS} > ${TUNASYNC_WORKING_DIR}/exit_status'" "bash -c 'echo ${TUNASYNC_JOB_EXIT_STATUS} > ${TUNASYNC_WORKING_DIR}/exit_status'"
@ -116,6 +118,7 @@ use_ipv6 = true
So(err, ShouldBeNil) So(err, ShouldBeNil)
So(cfg.Global.Name, ShouldEqual, "test_worker") So(cfg.Global.Name, ShouldEqual, "test_worker")
So(cfg.Global.Interval, ShouldEqual, 240) So(cfg.Global.Interval, ShouldEqual, 240)
So(cfg.Global.Retry, ShouldEqual, 3)
So(cfg.Global.MirrorDir, ShouldEqual, "/data/mirrors") So(cfg.Global.MirrorDir, ShouldEqual, "/data/mirrors")
So(cfg.Manager.APIBase, ShouldEqual, "https://127.0.0.1:5000") So(cfg.Manager.APIBase, ShouldEqual, "https://127.0.0.1:5000")
@ -126,6 +129,7 @@ use_ipv6 = true
So(m.MirrorDir, ShouldEqual, "/data/git/AOSP") So(m.MirrorDir, ShouldEqual, "/data/git/AOSP")
So(m.Provider, ShouldEqual, provCommand) So(m.Provider, ShouldEqual, provCommand)
So(m.Interval, ShouldEqual, 720) So(m.Interval, ShouldEqual, 720)
So(m.Retry, ShouldEqual, 2)
So(m.Env["REPO"], ShouldEqual, "/usr/local/bin/aosp-repo") So(m.Env["REPO"], ShouldEqual, "/usr/local/bin/aosp-repo")
m = cfg.Mirrors[1] m = cfg.Mirrors[1]

View File

@ -92,7 +92,7 @@ exit 1
job.ctrlChan <- jobStart job.ctrlChan <- jobStart
msg := <-managerChan msg := <-managerChan
So(msg.status, ShouldEqual, PreSyncing) So(msg.status, ShouldEqual, PreSyncing)
for i := 0; i < maxRetry; i++ { for i := 0; i < defaultMaxRetry; i++ {
msg = <-managerChan msg = <-managerChan
So(msg.status, ShouldEqual, Syncing) So(msg.status, ShouldEqual, Syncing)
msg = <-managerChan msg = <-managerChan

View File

@ -139,7 +139,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
return err return err
} }
for retry := 0; retry < maxRetry; retry++ { for retry := 0; retry < provider.Retry(); retry++ {
stopASAP := false // stop job as soon as possible stopASAP := false // stop job as soon as possible
if retry > 0 { if retry > 0 {
@ -196,7 +196,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
// syncing failed // syncing failed
logger.Warningf("failed syncing %s: %s", m.Name(), syncErr.Error()) logger.Warningf("failed syncing %s: %s", m.Name(), syncErr.Error())
managerChan <- jobMessage{tunasync.Failed, m.Name(), syncErr.Error(), (retry == maxRetry-1) && (m.State() == stateReady)} managerChan <- jobMessage{tunasync.Failed, m.Name(), syncErr.Error(), (retry == provider.Retry()-1) && (m.State() == stateReady)}
// post-fail hooks // post-fail hooks
logger.Debug("post-fail hooks") logger.Debug("post-fail hooks")

View File

@ -45,6 +45,7 @@ type mirrorProvider interface {
Hooks() []jobHook Hooks() []jobHook
Interval() time.Duration Interval() time.Duration
Retry() int
WorkingDir() string WorkingDir() string
LogDir() string LogDir() string
@ -87,6 +88,9 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
if mirror.Interval == 0 { if mirror.Interval == 0 {
mirror.Interval = cfg.Global.Interval mirror.Interval = cfg.Global.Interval
} }
if mirror.Retry == 0 {
mirror.Retry = cfg.Global.Retry
}
logDir = formatLogDir(logDir, mirror) logDir = formatLogDir(logDir, mirror)
// IsMaster // IsMaster
@ -111,6 +115,7 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
logDir: logDir, logDir: logDir,
logFile: filepath.Join(logDir, "latest.log"), logFile: filepath.Join(logDir, "latest.log"),
interval: time.Duration(mirror.Interval) * time.Minute, interval: time.Duration(mirror.Interval) * time.Minute,
retry: mirror.Retry,
env: mirror.Env, env: mirror.Env,
} }
p, err := newCmdProvider(pc) p, err := newCmdProvider(pc)
@ -133,6 +138,7 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
useIPv6: mirror.UseIPv6, useIPv6: mirror.UseIPv6,
useIPv4: mirror.UseIPv4, useIPv4: mirror.UseIPv4,
interval: time.Duration(mirror.Interval) * time.Minute, interval: time.Duration(mirror.Interval) * time.Minute,
retry: mirror.Retry,
} }
p, err := newRsyncProvider(rc) p, err := newRsyncProvider(rc)
p.isMaster = isMaster p.isMaster = isMaster
@ -154,6 +160,7 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
logFile: filepath.Join(logDir, "latest.log"), logFile: filepath.Join(logDir, "latest.log"),
useIPv6: mirror.UseIPv6, useIPv6: mirror.UseIPv6,
interval: time.Duration(mirror.Interval) * time.Minute, interval: time.Duration(mirror.Interval) * time.Minute,
retry: mirror.Retry,
} }
p, err := newTwoStageRsyncProvider(rc) p, err := newTwoStageRsyncProvider(rc)
p.isMaster = isMaster p.isMaster = isMaster

View File

@ -16,6 +16,7 @@ type rsyncConfig struct {
workingDir, logDir, logFile string workingDir, logDir, logFile string
useIPv6, useIPv4 bool useIPv6, useIPv4 bool
interval time.Duration interval time.Duration
retry int
} }
// An RsyncProvider provides the implementation to rsync-based syncing jobs // An RsyncProvider provides the implementation to rsync-based syncing jobs
@ -36,6 +37,7 @@ func newRsyncProvider(c rsyncConfig) (*rsyncProvider, error) {
name: c.name, name: c.name,
ctx: NewContext(), ctx: NewContext(),
interval: c.interval, interval: c.interval,
retry: c.retry,
}, },
rsyncConfig: c, rsyncConfig: c,
} }

View File

@ -18,6 +18,7 @@ type twoStageRsyncConfig struct {
workingDir, logDir, logFile string workingDir, logDir, logFile string
useIPv6 bool useIPv6 bool
interval time.Duration interval time.Duration
retry int
} }
// An RsyncProvider provides the implementation to rsync-based syncing jobs // An RsyncProvider provides the implementation to rsync-based syncing jobs
@ -48,6 +49,7 @@ func newTwoStageRsyncProvider(c twoStageRsyncConfig) (*twoStageRsyncProvider, er
name: c.name, name: c.name,
ctx: NewContext(), ctx: NewContext(),
interval: c.interval, interval: c.interval,
retry: c.retry,
}, },
twoStageRsyncConfig: c, twoStageRsyncConfig: c,
stage1Options: []string{ stage1Options: []string{

View File

@ -30,6 +30,10 @@ type Worker struct {
// NewTUNASyncWorker creates a worker // NewTUNASyncWorker creates a worker
func NewTUNASyncWorker(cfg *Config) *Worker { func NewTUNASyncWorker(cfg *Config) *Worker {
if cfg.Global.Retry == 0 {
cfg.Global.Retry = defaultMaxRetry
}
w := &Worker{ w := &Worker{
cfg: cfg, cfg: cfg,
jobs: make(map[string]*mirrorJob), jobs: make(map[string]*mirrorJob),