From fecfc8f3b133dbd0bfb4b274a6cd14ec140f2255 Mon Sep 17 00:00:00 2001 From: bigeagle Date: Mon, 2 May 2016 13:38:25 +0800 Subject: [PATCH 1/5] fix(worker): log file link should use relative path --- worker/loglimit_hook.go | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/worker/loglimit_hook.go b/worker/loglimit_hook.go index 69367b3..ad89a84 100644 --- a/worker/loglimit_hook.go +++ b/worker/loglimit_hook.go @@ -68,13 +68,13 @@ func (l *logLimiter) preExec() error { } } - logFile := filepath.Join( - logDir, - fmt.Sprintf( - "%s_%s.log", - p.Name(), - time.Now().Format("2006-01-02_15_04"), - ), + logFileName := fmt.Sprintf( + "%s_%s.log", + p.Name(), + time.Now().Format("2006-01-02_15_04"), + ) + logFilePath := filepath.Join( + logDir, logFileName, ) logLink := filepath.Join(logDir, "latest") @@ -82,10 +82,10 @@ func (l *logLimiter) preExec() error { if _, err = os.Stat(logLink); err == nil { os.Remove(logLink) } - os.Symlink(logFile, logLink) + os.Symlink(logFileName, logLink) ctx := p.EnterContext() - ctx.Set(_LogFileKey, logFile) + ctx.Set(_LogFileKey, logFilePath) return nil } @@ -101,7 +101,8 @@ func (l *logLimiter) postFail() error { logLink := filepath.Join(logDir, "latest") os.Rename(logFile, logFileFail) os.Remove(logLink) - os.Symlink(logFileFail, logLink) + logFileName := filepath.Base(logFileFail) + os.Symlink(logFileName, logLink) l.provider.ExitContext() return nil From 51fa12900d101608b4f9b13980f6f9247ab98b3b Mon Sep 17 00:00:00 2001 From: bigeagle Date: Mon, 2 May 2016 17:45:21 +0800 Subject: [PATCH 2/5] feature(worker): ability to hot reload mirror job configrations, close #18 --- cmd/tunasync/tunasync.go | 23 +++- worker/base_provider.go | 158 ++++++++++++++++++++++ worker/config.go | 20 +-- worker/config_diff.go | 88 +++++++++++++ worker/config_diff_test.go | 73 ++++++++++ worker/config_test.go | 21 ++- worker/job.go | 13 +- worker/provider.go | 261 +++++++++++++++++------------------- worker/schedule.go | 1 + worker/worker.go | 264 +++++++++++++++---------------------- 10 files changed, 603 insertions(+), 319 deletions(-) create mode 100644 worker/base_provider.go create mode 100644 worker/config_diff.go create mode 100644 worker/config_diff_test.go diff --git a/cmd/tunasync/tunasync.go b/cmd/tunasync/tunasync.go index 7000126..aadf868 100644 --- a/cmd/tunasync/tunasync.go +++ b/cmd/tunasync/tunasync.go @@ -2,6 +2,9 @@ package main import ( "os" + "os/signal" + "syscall" + "time" "github.com/codegangsta/cli" "github.com/gin-gonic/gin" @@ -12,7 +15,7 @@ import ( "github.com/tuna/tunasync/worker" ) -var logger = logging.MustGetLogger("tunasync-cmd") +var logger = logging.MustGetLogger("tunasync") func startManager(c *cli.Context) { tunasync.InitLogger(c.Bool("verbose"), c.Bool("debug"), c.Bool("with-systemd")) @@ -54,6 +57,24 @@ func startWorker(c *cli.Context) { os.Exit(1) } + go func() { + time.Sleep(1 * time.Second) + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGHUP) + for { + s := <-sigChan + switch s { + case syscall.SIGHUP: + logger.Info("Received reload signal") + newCfg, err := worker.LoadConfig(c.String("config")) + if err != nil { + logger.Errorf("Error loading config: %s", err.Error()) + } + w.ReloadMirrorConfig(newCfg.Mirrors) + } + } + }() + logger.Info("Run tunasync worker.") w.Run() } diff --git a/worker/base_provider.go b/worker/base_provider.go new file mode 100644 index 0000000..ace76d3 --- /dev/null +++ b/worker/base_provider.go @@ -0,0 +1,158 @@ +package worker + +import ( + "os" + "sync" + "sync/atomic" + "time" +) + +// baseProvider is the base mixin of providers + +type baseProvider struct { + sync.Mutex + + ctx *Context + name string + interval time.Duration + isMaster bool + + cmd *cmdJob + isRunning atomic.Value + + logFile *os.File + + cgroup *cgroupHook + hooks []jobHook +} + +func (p *baseProvider) Name() string { + return p.name +} + +func (p *baseProvider) EnterContext() *Context { + p.ctx = p.ctx.Enter() + return p.ctx +} + +func (p *baseProvider) ExitContext() *Context { + p.ctx, _ = p.ctx.Exit() + return p.ctx +} + +func (p *baseProvider) Context() *Context { + return p.ctx +} + +func (p *baseProvider) Interval() time.Duration { + // logger.Debug("interval for %s: %v", p.Name(), p.interval) + return p.interval +} + +func (p *baseProvider) IsMaster() bool { + return p.isMaster +} + +func (p *baseProvider) WorkingDir() string { + if v, ok := p.ctx.Get(_WorkingDirKey); ok { + if s, ok := v.(string); ok { + return s + } + } + panic("working dir is impossible to be non-exist") +} + +func (p *baseProvider) LogDir() string { + if v, ok := p.ctx.Get(_LogDirKey); ok { + if s, ok := v.(string); ok { + return s + } + } + panic("log dir is impossible to be unavailable") +} + +func (p *baseProvider) LogFile() string { + if v, ok := p.ctx.Get(_LogFileKey); ok { + if s, ok := v.(string); ok { + return s + } + } + panic("log dir is impossible to be unavailable") +} + +func (p *baseProvider) AddHook(hook jobHook) { + if cg, ok := hook.(*cgroupHook); ok { + p.cgroup = cg + } + p.hooks = append(p.hooks, hook) +} + +func (p *baseProvider) Hooks() []jobHook { + return p.hooks +} + +func (p *baseProvider) Cgroup() *cgroupHook { + return p.cgroup +} + +func (p *baseProvider) prepareLogFile() error { + if p.LogFile() == "/dev/null" { + p.cmd.SetLogFile(nil) + return nil + } + if p.logFile == nil { + logFile, err := os.OpenFile(p.LogFile(), os.O_WRONLY|os.O_CREATE, 0644) + if err != nil { + logger.Errorf("Error opening logfile %s: %s", p.LogFile(), err.Error()) + return err + } + p.logFile = logFile + } + p.cmd.SetLogFile(p.logFile) + return nil +} + +func (p *baseProvider) Run() error { + panic("Not Implemented") +} + +func (p *baseProvider) Start() error { + panic("Not Implemented") +} + +func (p *baseProvider) IsRunning() bool { + isRunning, _ := p.isRunning.Load().(bool) + return isRunning +} + +func (p *baseProvider) Wait() error { + defer func() { + p.Lock() + p.isRunning.Store(false) + if p.logFile != nil { + p.logFile.Close() + p.logFile = nil + } + p.Unlock() + }() + return p.cmd.Wait() +} + +func (p *baseProvider) Terminate() error { + logger.Debugf("terminating provider: %s", p.Name()) + if !p.IsRunning() { + return nil + } + + p.Lock() + if p.logFile != nil { + p.logFile.Close() + p.logFile = nil + } + p.Unlock() + + err := p.cmd.Terminate() + p.isRunning.Store(false) + + return err +} diff --git a/worker/config.go b/worker/config.go index 0a37210..8515a83 100644 --- a/worker/config.go +++ b/worker/config.go @@ -7,30 +7,30 @@ import ( "github.com/BurntSushi/toml" ) -type ProviderEnum uint8 +type providerEnum uint8 const ( - ProvRsync ProviderEnum = iota - ProvTwoStageRsync - ProvCommand + provRsync providerEnum = iota + provTwoStageRsync + provCommand ) -func (p *ProviderEnum) UnmarshalText(text []byte) error { +func (p *providerEnum) UnmarshalText(text []byte) error { s := string(text) switch s { case `command`: - *p = ProvCommand + *p = provCommand case `rsync`: - *p = ProvRsync + *p = provRsync case `two-stage-rsync`: - *p = ProvTwoStageRsync + *p = provTwoStageRsync default: return errors.New("Invalid value to provierEnum") } return nil - } +// Worker config options type Config struct { Global globalConfig `toml:"global"` Manager managerConfig `toml:"manager"` @@ -69,7 +69,7 @@ type cgroupConfig struct { type mirrorConfig struct { Name string `toml:"name"` - Provider ProviderEnum `toml:"provider"` + Provider providerEnum `toml:"provider"` Upstream string `toml:"upstream"` Interval int `toml:"interval"` MirrorDir string `toml:"mirror_dir"` diff --git a/worker/config_diff.go b/worker/config_diff.go new file mode 100644 index 0000000..347eba1 --- /dev/null +++ b/worker/config_diff.go @@ -0,0 +1,88 @@ +package worker + +import ( + "fmt" + "reflect" + "sort" +) + +// Find difference of mirror config, this is important for hot reloading config file +// NOTICE: only the [[mirrors]] section is supported + +// make []mirrorConfig sortable +type sortableMirrorList []mirrorConfig + +func (l sortableMirrorList) Len() int { return len(l) } +func (l sortableMirrorList) Swap(i, j int) { l[i], l[j] = l[j], l[i] } +func (l sortableMirrorList) Less(i, j int) bool { return l[i].Name < l[j].Name } + +const ( + diffDelete uint8 = iota + diffAdd + diffModify +) + +// a unit of mirror config difference +type mirrorCfgTrans struct { + diffOp uint8 + mirCfg mirrorConfig +} + +func (t mirrorCfgTrans) String() string { + var op string + if t.diffOp == diffDelete { + op = "Del" + } else { + op = "Add" + } + return fmt.Sprintf("{%s, %s}", op, t.mirCfg.Name) +} + +// diffMirrorConfig finds the difference between the oldList and the newList +// it returns a series of operations that if these operations are applied to +// oldList, a newList equavuilance can be obtained. +func diffMirrorConfig(oldList, newList []mirrorConfig) []mirrorCfgTrans { + operations := []mirrorCfgTrans{} + + oList := make([]mirrorConfig, len(oldList)) + nList := make([]mirrorConfig, len(newList)) + copy(oList, oldList) + copy(nList, newList) + + // first ensure oldList and newList are sorted + sort.Sort(sortableMirrorList(oList)) + sort.Sort(sortableMirrorList(nList)) + + // insert a tail node to both lists + // as the maximum node + lastOld, lastNew := oList[len(oList)-1], nList[len(nList)-1] + maxName := lastOld.Name + if lastNew.Name > lastOld.Name { + maxName = lastNew.Name + } + Nil := mirrorConfig{Name: "~" + maxName} + if Nil.Name <= maxName { + panic("Nil.Name should be larger than maxName") + } + oList, nList = append(oList, Nil), append(nList, Nil) + + // iterate over both lists to find the difference + for i, j := 0, 0; i < len(oList) && j < len(nList); { + o, n := oList[i], nList[j] + if n.Name < o.Name { + operations = append(operations, mirrorCfgTrans{diffAdd, n}) + j++ + } else if o.Name < n.Name { + operations = append(operations, mirrorCfgTrans{diffDelete, o}) + i++ + } else { + if !reflect.DeepEqual(o, n) { + operations = append(operations, mirrorCfgTrans{diffModify, n}) + } + i++ + j++ + } + } + + return operations +} diff --git a/worker/config_diff_test.go b/worker/config_diff_test.go new file mode 100644 index 0000000..55c80cc --- /dev/null +++ b/worker/config_diff_test.go @@ -0,0 +1,73 @@ +package worker + +import ( + "sort" + "testing" + + . "github.com/smartystreets/goconvey/convey" +) + +func TestConfigDiff(t *testing.T) { + Convey("When old and new configs are equal", t, func() { + oldList := []mirrorConfig{ + mirrorConfig{Name: "debian"}, + mirrorConfig{Name: "debian-security"}, + mirrorConfig{Name: "fedora"}, + mirrorConfig{Name: "archlinux"}, + mirrorConfig{Name: "AOSP"}, + mirrorConfig{Name: "ubuntu"}, + } + newList := make([]mirrorConfig, len(oldList)) + copy(newList, oldList) + + difference := diffMirrorConfig(oldList, newList) + So(len(difference), ShouldEqual, 0) + }) + Convey("When giving two config lists with different names", t, func() { + oldList := []mirrorConfig{ + mirrorConfig{Name: "debian"}, + mirrorConfig{Name: "debian-security"}, + mirrorConfig{Name: "fedora"}, + mirrorConfig{Name: "archlinux"}, + mirrorConfig{Name: "AOSP", Env: map[string]string{"REPO": "/usr/bin/repo"}}, + mirrorConfig{Name: "ubuntu"}, + } + newList := []mirrorConfig{ + mirrorConfig{Name: "debian"}, + mirrorConfig{Name: "debian-cd"}, + mirrorConfig{Name: "archlinuxcn"}, + mirrorConfig{Name: "AOSP", Env: map[string]string{"REPO": "/usr/local/bin/aosp-repo"}}, + mirrorConfig{Name: "ubuntu-ports"}, + } + + difference := diffMirrorConfig(oldList, newList) + + sort.Sort(sortableMirrorList(oldList)) + emptyList := []mirrorConfig{} + + for _, o := range oldList { + keep := true + for _, op := range difference { + if (op.diffOp == diffDelete || op.diffOp == diffModify) && + op.mirCfg.Name == o.Name { + + keep = false + break + } + } + if keep { + emptyList = append(emptyList, o) + } + } + + for _, op := range difference { + if op.diffOp == diffAdd || op.diffOp == diffModify { + emptyList = append(emptyList, op.mirCfg) + } + } + sort.Sort(sortableMirrorList(emptyList)) + sort.Sort(sortableMirrorList(newList)) + So(emptyList, ShouldResemble, newList) + + }) +} diff --git a/worker/config_test.go b/worker/config_test.go index 9c5b6d1..45762d9 100644 --- a/worker/config_test.go +++ b/worker/config_test.go @@ -82,19 +82,19 @@ exec_on_failure = "bash -c 'echo ${TUNASYNC_JOB_EXIT_STATUS} > ${TUNASYNC_WORKIN m := cfg.Mirrors[0] So(m.Name, ShouldEqual, "AOSP") So(m.MirrorDir, ShouldEqual, "/data/git/AOSP") - So(m.Provider, ShouldEqual, ProvCommand) + So(m.Provider, ShouldEqual, provCommand) So(m.Interval, ShouldEqual, 720) So(m.Env["REPO"], ShouldEqual, "/usr/local/bin/aosp-repo") m = cfg.Mirrors[1] So(m.Name, ShouldEqual, "debian") So(m.MirrorDir, ShouldEqual, "") - So(m.Provider, ShouldEqual, ProvTwoStageRsync) + So(m.Provider, ShouldEqual, provTwoStageRsync) m = cfg.Mirrors[2] So(m.Name, ShouldEqual, "fedora") So(m.MirrorDir, ShouldEqual, "") - So(m.Provider, ShouldEqual, ProvRsync) + So(m.Provider, ShouldEqual, provRsync) So(m.ExcludeFile, ShouldEqual, "/etc/tunasync.d/fedora-exclude.txt") So(len(cfg.Mirrors), ShouldEqual, 3) @@ -112,14 +112,13 @@ exec_on_failure = "bash -c 'echo ${TUNASYNC_JOB_EXIT_STATUS} > ${TUNASYNC_WORKIN cfg, err := LoadConfig(tmpfile.Name()) So(err, ShouldBeNil) - w := &Worker{ - cfg: cfg, - providers: make(map[string]mirrorProvider), + providers := map[string]mirrorProvider{} + for _, m := range cfg.Mirrors { + p := newMirrorProvider(m, cfg) + providers[p.Name()] = p } - w.initProviders() - - p := w.providers["AOSP"] + p := providers["AOSP"] So(p.Name(), ShouldEqual, "AOSP") So(p.LogDir(), ShouldEqual, "/var/log/tunasync/AOSP") So(p.LogFile(), ShouldEqual, "/var/log/tunasync/AOSP/latest.log") @@ -132,7 +131,7 @@ exec_on_failure = "bash -c 'echo ${TUNASYNC_JOB_EXIT_STATUS} > ${TUNASYNC_WORKIN } } - p = w.providers["debian"] + p = providers["debian"] So(p.Name(), ShouldEqual, "debian") So(p.LogDir(), ShouldEqual, "/var/log/tunasync/debian") So(p.LogFile(), ShouldEqual, "/var/log/tunasync/debian/latest.log") @@ -141,7 +140,7 @@ exec_on_failure = "bash -c 'echo ${TUNASYNC_JOB_EXIT_STATUS} > ${TUNASYNC_WORKIN So(r2p.stage1Profile, ShouldEqual, "debian") So(r2p.WorkingDir(), ShouldEqual, "/data/mirrors/debian") - p = w.providers["fedora"] + p = providers["fedora"] So(p.Name(), ShouldEqual, "fedora") So(p.LogDir(), ShouldEqual, "/var/log/tunasync/fedora") So(p.LogFile(), ShouldEqual, "/var/log/tunasync/fedora/latest.log") diff --git a/worker/job.go b/worker/job.go index 9e2afb6..d8462fd 100644 --- a/worker/job.go +++ b/worker/job.go @@ -65,6 +65,15 @@ func (m *mirrorJob) SetState(state uint32) { atomic.StoreUint32(&(m.state), state) } +func (m *mirrorJob) SetProvider(provider mirrorProvider) error { + s := m.State() + if (s != stateNone) && (s != stateDisabled) { + return fmt.Errorf("Provider cannot be switched when job state is %d", s) + } + m.provider = provider + return nil +} + // runMirrorJob is the goroutine where syncing job runs in // arguments: // provider: mirror provider object @@ -165,7 +174,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err if syncErr == nil { // syncing success logger.Noticef("succeeded syncing %s", m.Name()) - managerChan <- jobMessage{tunasync.Success, m.Name(), "", true} + managerChan <- jobMessage{tunasync.Success, m.Name(), "", (m.State() == stateReady)} // post-success hooks err := runHooks(rHooks, func(h jobHook) error { return h.postSuccess() }, "post-success") if err != nil { @@ -177,7 +186,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err // syncing failed logger.Warningf("failed syncing %s: %s", m.Name(), syncErr.Error()) - managerChan <- jobMessage{tunasync.Failed, m.Name(), syncErr.Error(), retry == maxRetry-1} + managerChan <- jobMessage{tunasync.Failed, m.Name(), syncErr.Error(), (retry == maxRetry-1) && (m.State() == stateReady)} // post-fail hooks logger.Debug("post-fail hooks") diff --git a/worker/provider.go b/worker/provider.go index 3a44b04..2791b56 100644 --- a/worker/provider.go +++ b/worker/provider.go @@ -1,9 +1,10 @@ package worker import ( - "os" - "sync" - "sync/atomic" + "bytes" + "errors" + "html/template" + "path/filepath" "time" ) @@ -54,150 +55,136 @@ type mirrorProvider interface { Context() *Context } -type baseProvider struct { - sync.Mutex +// newProvider creates a mirrorProvider instance +// using a mirrorCfg and the global cfg +func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider { - ctx *Context - name string - interval time.Duration - isMaster bool - - cmd *cmdJob - isRunning atomic.Value - - logFile *os.File - - cgroup *cgroupHook - hooks []jobHook -} - -func (p *baseProvider) Name() string { - return p.name -} - -func (p *baseProvider) EnterContext() *Context { - p.ctx = p.ctx.Enter() - return p.ctx -} - -func (p *baseProvider) ExitContext() *Context { - p.ctx, _ = p.ctx.Exit() - return p.ctx -} - -func (p *baseProvider) Context() *Context { - return p.ctx -} - -func (p *baseProvider) Interval() time.Duration { - // logger.Debug("interval for %s: %v", p.Name(), p.interval) - return p.interval -} - -func (p *baseProvider) IsMaster() bool { - return p.isMaster -} - -func (p *baseProvider) WorkingDir() string { - if v, ok := p.ctx.Get(_WorkingDirKey); ok { - if s, ok := v.(string); ok { - return s - } - } - panic("working dir is impossible to be non-exist") -} - -func (p *baseProvider) LogDir() string { - if v, ok := p.ctx.Get(_LogDirKey); ok { - if s, ok := v.(string); ok { - return s - } - } - panic("log dir is impossible to be unavailable") -} - -func (p *baseProvider) LogFile() string { - if v, ok := p.ctx.Get(_LogFileKey); ok { - if s, ok := v.(string); ok { - return s - } - } - panic("log dir is impossible to be unavailable") -} - -func (p *baseProvider) AddHook(hook jobHook) { - if cg, ok := hook.(*cgroupHook); ok { - p.cgroup = cg - } - p.hooks = append(p.hooks, hook) -} - -func (p *baseProvider) Hooks() []jobHook { - return p.hooks -} - -func (p *baseProvider) Cgroup() *cgroupHook { - return p.cgroup -} - -func (p *baseProvider) prepareLogFile() error { - if p.LogFile() == "/dev/null" { - p.cmd.SetLogFile(nil) - return nil - } - if p.logFile == nil { - logFile, err := os.OpenFile(p.LogFile(), os.O_WRONLY|os.O_CREATE, 0644) + formatLogDir := func(logDir string, m mirrorConfig) string { + tmpl, err := template.New("logDirTmpl-" + m.Name).Parse(logDir) if err != nil { - logger.Errorf("Error opening logfile %s: %s", p.LogFile(), err.Error()) - return err + panic(err) } - p.logFile = logFile + var formatedLogDir bytes.Buffer + tmpl.Execute(&formatedLogDir, m) + return formatedLogDir.String() } - p.cmd.SetLogFile(p.logFile) - return nil -} -func (p *baseProvider) Run() error { - panic("Not Implemented") -} + logDir := mirror.LogDir + mirrorDir := mirror.MirrorDir + if logDir == "" { + logDir = cfg.Global.LogDir + } + if mirrorDir == "" { + mirrorDir = filepath.Join( + cfg.Global.MirrorDir, mirror.Name, + ) + } + if mirror.Interval == 0 { + mirror.Interval = cfg.Global.Interval + } + logDir = formatLogDir(logDir, mirror) -func (p *baseProvider) Start() error { - panic("Not Implemented") -} - -func (p *baseProvider) IsRunning() bool { - isRunning, _ := p.isRunning.Load().(bool) - return isRunning -} - -func (p *baseProvider) Wait() error { - defer func() { - p.Lock() - p.isRunning.Store(false) - if p.logFile != nil { - p.logFile.Close() - p.logFile = nil + // IsMaster + isMaster := true + if mirror.Role == "slave" { + isMaster = false + } else { + if mirror.Role != "" && mirror.Role != "master" { + logger.Warningf("Invalid role configuration for %s", mirror.Name) } - p.Unlock() - }() - return p.cmd.Wait() -} - -func (p *baseProvider) Terminate() error { - logger.Debugf("terminating provider: %s", p.Name()) - if !p.IsRunning() { - return nil } - p.Lock() - if p.logFile != nil { - p.logFile.Close() - p.logFile = nil + var provider mirrorProvider + + switch mirror.Provider { + case provCommand: + pc := cmdConfig{ + name: mirror.Name, + upstreamURL: mirror.Upstream, + command: mirror.Command, + workingDir: mirrorDir, + logDir: logDir, + logFile: filepath.Join(logDir, "latest.log"), + interval: time.Duration(mirror.Interval) * time.Minute, + env: mirror.Env, + } + p, err := newCmdProvider(pc) + p.isMaster = isMaster + if err != nil { + panic(err) + } + provider = p + case provRsync: + rc := rsyncConfig{ + name: mirror.Name, + upstreamURL: mirror.Upstream, + rsyncCmd: mirror.Command, + password: mirror.Password, + excludeFile: mirror.ExcludeFile, + workingDir: mirrorDir, + logDir: logDir, + logFile: filepath.Join(logDir, "latest.log"), + useIPv6: mirror.UseIPv6, + interval: time.Duration(mirror.Interval) * time.Minute, + } + p, err := newRsyncProvider(rc) + p.isMaster = isMaster + if err != nil { + panic(err) + } + provider = p + case provTwoStageRsync: + rc := twoStageRsyncConfig{ + name: mirror.Name, + stage1Profile: mirror.Stage1Profile, + upstreamURL: mirror.Upstream, + rsyncCmd: mirror.Command, + password: mirror.Password, + excludeFile: mirror.ExcludeFile, + workingDir: mirrorDir, + logDir: logDir, + logFile: filepath.Join(logDir, "latest.log"), + useIPv6: mirror.UseIPv6, + interval: time.Duration(mirror.Interval) * time.Minute, + } + p, err := newTwoStageRsyncProvider(rc) + p.isMaster = isMaster + if err != nil { + panic(err) + } + provider = p + default: + panic(errors.New("Invalid mirror provider")) } - p.Unlock() - err := p.cmd.Terminate() - p.isRunning.Store(false) + // Add Logging Hook + provider.AddHook(newLogLimiter(provider)) - return err + // Add Cgroup Hook + if cfg.Cgroup.Enable { + provider.AddHook( + newCgroupHook(provider, cfg.Cgroup.BasePath, cfg.Cgroup.Group), + ) + } + + // ExecOnSuccess hook + if mirror.ExecOnSuccess != "" { + h, err := newExecPostHook(provider, execOnSuccess, mirror.ExecOnSuccess) + if err != nil { + logger.Errorf("Error initializing mirror %s: %s", mirror.Name, err.Error()) + panic(err) + } + provider.AddHook(h) + } + // ExecOnFailure hook + if mirror.ExecOnFailure != "" { + h, err := newExecPostHook(provider, execOnFailure, mirror.ExecOnFailure) + if err != nil { + logger.Errorf("Error initializing mirror %s: %s", mirror.Name, err.Error()) + panic(err) + } + provider.AddHook(h) + } + + return provider } diff --git a/worker/schedule.go b/worker/schedule.go index 0d3f8f0..33d0d4f 100644 --- a/worker/schedule.go +++ b/worker/schedule.go @@ -30,6 +30,7 @@ func (q *scheduleQueue) AddJob(schedTime time.Time, job *mirrorJob) { q.Lock() defer q.Unlock() q.list.Set(schedTime, job) + logger.Debugf("Added job %s @ %v", job.Name(), schedTime) } // pop out the first job if it's time to run it diff --git a/worker/worker.go b/worker/worker.go index 19bc067..c6a8d31 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -1,12 +1,9 @@ package worker import ( - "bytes" - "errors" "fmt" - "html/template" "net/http" - "path/filepath" + "sync" "time" "github.com/gin-gonic/gin" @@ -17,9 +14,9 @@ var tunasyncWorker *Worker // A Worker is a instance of tunasync worker type Worker struct { - cfg *Config - providers map[string]mirrorProvider - jobs map[string]*mirrorJob + L sync.Mutex + cfg *Config + jobs map[string]*mirrorJob managerChan chan jobMessage semaphore chan empty @@ -36,9 +33,8 @@ func GetTUNASyncWorker(cfg *Config) *Worker { } w := &Worker{ - cfg: cfg, - providers: make(map[string]mirrorProvider), - jobs: make(map[string]*mirrorJob), + cfg: cfg, + jobs: make(map[string]*mirrorJob), managerChan: make(chan jobMessage, 32), semaphore: make(chan empty, cfg.Global.Concurrent), @@ -61,147 +57,89 @@ func GetTUNASyncWorker(cfg *Config) *Worker { return w } -func (w *Worker) initProviders() { - c := w.cfg - - formatLogDir := func(logDir string, m mirrorConfig) string { - tmpl, err := template.New("logDirTmpl-" + m.Name).Parse(logDir) - if err != nil { - panic(err) - } - var formatedLogDir bytes.Buffer - tmpl.Execute(&formatedLogDir, m) - return formatedLogDir.String() - } - - for _, mirror := range c.Mirrors { - logDir := mirror.LogDir - mirrorDir := mirror.MirrorDir - if logDir == "" { - logDir = c.Global.LogDir - } - if mirrorDir == "" { - mirrorDir = filepath.Join( - c.Global.MirrorDir, mirror.Name, - ) - } - if mirror.Interval == 0 { - mirror.Interval = c.Global.Interval - } - logDir = formatLogDir(logDir, mirror) - - // IsMaster - isMaster := true - if mirror.Role == "slave" { - isMaster = false - } else { - if mirror.Role != "" && mirror.Role != "master" { - logger.Warningf("Invalid role configuration for %s", mirror.Name) - } - } - - var provider mirrorProvider - - switch mirror.Provider { - case ProvCommand: - pc := cmdConfig{ - name: mirror.Name, - upstreamURL: mirror.Upstream, - command: mirror.Command, - workingDir: mirrorDir, - logDir: logDir, - logFile: filepath.Join(logDir, "latest.log"), - interval: time.Duration(mirror.Interval) * time.Minute, - env: mirror.Env, - } - p, err := newCmdProvider(pc) - p.isMaster = isMaster - if err != nil { - panic(err) - } - provider = p - case ProvRsync: - rc := rsyncConfig{ - name: mirror.Name, - upstreamURL: mirror.Upstream, - rsyncCmd: mirror.Command, - password: mirror.Password, - excludeFile: mirror.ExcludeFile, - workingDir: mirrorDir, - logDir: logDir, - logFile: filepath.Join(logDir, "latest.log"), - useIPv6: mirror.UseIPv6, - interval: time.Duration(mirror.Interval) * time.Minute, - } - p, err := newRsyncProvider(rc) - p.isMaster = isMaster - if err != nil { - panic(err) - } - provider = p - case ProvTwoStageRsync: - rc := twoStageRsyncConfig{ - name: mirror.Name, - stage1Profile: mirror.Stage1Profile, - upstreamURL: mirror.Upstream, - rsyncCmd: mirror.Command, - password: mirror.Password, - excludeFile: mirror.ExcludeFile, - workingDir: mirrorDir, - logDir: logDir, - logFile: filepath.Join(logDir, "latest.log"), - useIPv6: mirror.UseIPv6, - interval: time.Duration(mirror.Interval) * time.Minute, - } - p, err := newTwoStageRsyncProvider(rc) - p.isMaster = isMaster - if err != nil { - panic(err) - } - provider = p - default: - panic(errors.New("Invalid mirror provider")) - - } - - provider.AddHook(newLogLimiter(provider)) - - // Add Cgroup Hook - if w.cfg.Cgroup.Enable { - provider.AddHook( - newCgroupHook(provider, w.cfg.Cgroup.BasePath, w.cfg.Cgroup.Group), - ) - } - - // ExecOnSuccess hook - if mirror.ExecOnSuccess != "" { - h, err := newExecPostHook(provider, execOnSuccess, mirror.ExecOnSuccess) - if err != nil { - logger.Errorf("Error initializing mirror %s: %s", mirror.Name, err.Error()) - panic(err) - } - provider.AddHook(h) - } - // ExecOnFailure hook - if mirror.ExecOnFailure != "" { - h, err := newExecPostHook(provider, execOnFailure, mirror.ExecOnFailure) - if err != nil { - logger.Errorf("Error initializing mirror %s: %s", mirror.Name, err.Error()) - panic(err) - } - provider.AddHook(h) - } - - w.providers[provider.Name()] = provider - +func (w *Worker) initJobs() { + for _, mirror := range w.cfg.Mirrors { + // Create Provider + provider := newMirrorProvider(mirror, w.cfg) + w.jobs[provider.Name()] = newMirrorJob(provider) } } -func (w *Worker) initJobs() { - w.initProviders() +// ReloadMirrorConfig refresh the providers and jobs +// from new mirror configs +// TODO: deleted job should be removed from manager-side mirror list +func (w *Worker) ReloadMirrorConfig(newMirrors []mirrorConfig) { + w.L.Lock() + defer w.L.Unlock() + logger.Info("Reloading mirror configs") - for name, provider := range w.providers { - w.jobs[name] = newMirrorJob(provider) + oldMirrors := w.cfg.Mirrors + difference := diffMirrorConfig(oldMirrors, newMirrors) + + // first deal with deletion and modifications + for _, op := range difference { + if op.diffOp == diffAdd { + continue + } + name := op.mirCfg.Name + job, ok := w.jobs[name] + if !ok { + logger.Warningf("Job %s not found", name) + continue + } + switch op.diffOp { + case diffDelete: + w.disableJob(job) + delete(w.jobs, name) + logger.Noticef("Deleted job %s", name) + case diffModify: + jobState := job.State() + w.disableJob(job) + // set new provider + provider := newMirrorProvider(op.mirCfg, w.cfg) + if err := job.SetProvider(provider); err != nil { + logger.Errorf("Error setting job provider of %s: %s", name, err.Error()) + continue + } + + // re-schedule job according to its previous state + if jobState == stateDisabled { + job.SetState(stateDisabled) + } else if jobState == statePaused { + job.SetState(statePaused) + go job.Run(w.managerChan, w.semaphore) + } else { + job.SetState(stateReady) + go job.Run(w.managerChan, w.semaphore) + w.schedule.AddJob(time.Now(), job) + } + logger.Noticef("Reloaded job %s", name) + } + } + // for added new jobs, just start new jobs + for _, op := range difference { + if op.diffOp != diffAdd { + continue + } + provider := newMirrorProvider(op.mirCfg, w.cfg) + job := newMirrorJob(provider) + w.jobs[provider.Name()] = job + + job.SetState(stateReady) + go job.Run(w.managerChan, w.semaphore) + w.schedule.AddJob(time.Now(), job) + logger.Noticef("New job %s", job.Name()) + } + + w.cfg.Mirrors = newMirrors + +} + +func (w *Worker) disableJob(job *mirrorJob) { + w.schedule.Remove(job.Name()) + if job.State() != stateDisabled { + job.ctrlChan <- jobDisable + <-job.disabled } } @@ -211,17 +149,22 @@ func (w *Worker) makeHTTPServer() { s.Use(gin.Recovery()) s.POST("/", func(c *gin.Context) { + w.L.Lock() + defer w.L.Unlock() + var cmd WorkerCmd if err := c.BindJSON(&cmd); err != nil { c.JSON(http.StatusBadRequest, gin.H{"msg": "Invalid request"}) return } + job, ok := w.jobs[cmd.MirrorID] if !ok { c.JSON(http.StatusNotFound, gin.H{"msg": fmt.Sprintf("Mirror ``%s'' not found", cmd.MirrorID)}) return } + logger.Noticef("Received command: %v", cmd) // if job disabled, start them first switch cmd.Cmd { @@ -243,11 +186,7 @@ func (w *Worker) makeHTTPServer() { job.ctrlChan <- jobStop } case CmdDisable: - w.schedule.Remove(job.Name()) - if job.State() != stateDisabled { - job.ctrlChan <- jobDisable - <-job.disabled - } + w.disableJob(job) case CmdPing: job.ctrlChan <- jobStart default: @@ -289,6 +228,8 @@ func (w *Worker) Run() { } func (w *Worker) runSchedule() { + w.L.Lock() + mirrorList := w.fetchJobStatus() unset := make(map[string]bool) for name := range w.jobs { @@ -327,11 +268,20 @@ func (w *Worker) runSchedule() { w.schedule.AddJob(time.Now(), job) } + w.L.Unlock() + for { select { case jobMsg := <-w.managerChan: // got status update from job - job := w.jobs[jobMsg.name] + w.L.Lock() + job, ok := w.jobs[jobMsg.name] + w.L.Unlock() + if !ok { + logger.Warningf("Job %s not found", jobMsg.name) + continue + } + if job.State() != stateReady { logger.Infof("Job %s state is not ready, skip adding new schedule", jobMsg.name) continue @@ -341,7 +291,7 @@ func (w *Worker) runSchedule() { // is running. If it's paused or disabled // a sync failure signal would be emitted // which needs to be ignored - w.updateStatus(jobMsg) + w.updateStatus(job, jobMsg) // only successful or the final failure msg // can trigger scheduling @@ -361,9 +311,7 @@ func (w *Worker) runSchedule() { job.ctrlChan <- jobStart } } - } - } // Name returns worker name @@ -397,14 +345,14 @@ func (w *Worker) registorWorker() { } } -func (w *Worker) updateStatus(jobMsg jobMessage) { +func (w *Worker) updateStatus(job *mirrorJob, jobMsg jobMessage) { url := fmt.Sprintf( "%s/workers/%s/jobs/%s", w.cfg.Manager.APIBase, w.Name(), jobMsg.name, ) - p := w.providers[jobMsg.name] + p := job.provider smsg := MirrorStatus{ Name: jobMsg.name, Worker: w.cfg.Global.Name, From 65984053eb56b66cd38c9c9a02c617e4e73a5439 Mon Sep 17 00:00:00 2001 From: bigeagle Date: Mon, 2 May 2016 18:22:23 +0800 Subject: [PATCH 3/5] fix(worker): fixed scheduling bugs --- worker/schedule.go | 15 ++++++++++++++- worker/schedule_test.go | 18 ++++++++++++++++++ worker/worker.go | 14 ++++++++------ 3 files changed, 40 insertions(+), 7 deletions(-) diff --git a/worker/schedule.go b/worker/schedule.go index 33d0d4f..c16ab9d 100644 --- a/worker/schedule.go +++ b/worker/schedule.go @@ -12,6 +12,7 @@ import ( type scheduleQueue struct { sync.Mutex list *skiplist.SkipList + jobs map[string]bool } func timeLessThan(l, r interface{}) bool { @@ -23,12 +24,18 @@ func timeLessThan(l, r interface{}) bool { func newScheduleQueue() *scheduleQueue { queue := new(scheduleQueue) queue.list = skiplist.NewCustomMap(timeLessThan) + queue.jobs = make(map[string]bool) return queue } func (q *scheduleQueue) AddJob(schedTime time.Time, job *mirrorJob) { q.Lock() defer q.Unlock() + if _, ok := q.jobs[job.Name()]; ok { + logger.Warningf("Job %s already scheduled, removing the existing one", job.Name()) + q.unsafeRemove(job.Name()) + } + q.jobs[job.Name()] = true q.list.Set(schedTime, job) logger.Debugf("Added job %s @ %v", job.Name(), schedTime) } @@ -45,10 +52,11 @@ func (q *scheduleQueue) Pop() *mirrorJob { defer first.Close() t := first.Key().(time.Time) - // logger.Debug("First job should run @%v", t) if t.Before(time.Now()) { job := first.Value().(*mirrorJob) q.list.Delete(first.Key()) + delete(q.jobs, job.Name()) + logger.Debug("Popped out job %s @%v", job.Name(), t) return job } return nil @@ -58,7 +66,11 @@ func (q *scheduleQueue) Pop() *mirrorJob { func (q *scheduleQueue) Remove(name string) bool { q.Lock() defer q.Unlock() + return q.unsafeRemove(name) +} +// remove job +func (q *scheduleQueue) unsafeRemove(name string) bool { cur := q.list.Iterator() defer cur.Close() @@ -66,6 +78,7 @@ func (q *scheduleQueue) Remove(name string) bool { cj := cur.Value().(*mirrorJob) if cj.Name() == name { q.list.Delete(cur.Key()) + delete(q.jobs, name) return true } } diff --git a/worker/schedule_test.go b/worker/schedule_test.go index 8bf3bc5..80475fe 100644 --- a/worker/schedule_test.go +++ b/worker/schedule_test.go @@ -30,6 +30,24 @@ func TestSchedule(t *testing.T) { time.Sleep(1200 * time.Millisecond) So(schedule.Pop(), ShouldEqual, job) + }) + Convey("When adding one job twice", func() { + c := cmdConfig{ + name: "schedule_test", + } + provider, _ := newCmdProvider(c) + job := newMirrorJob(provider) + sched := time.Now().Add(1 * time.Second) + + schedule.AddJob(sched, job) + schedule.AddJob(sched.Add(1*time.Second), job) + + So(schedule.Pop(), ShouldBeNil) + time.Sleep(1200 * time.Millisecond) + So(schedule.Pop(), ShouldBeNil) + time.Sleep(1200 * time.Millisecond) + So(schedule.Pop(), ShouldEqual, job) + }) Convey("When removing jobs", func() { c := cmdConfig{ diff --git a/worker/worker.go b/worker/worker.go index c6a8d31..c68c6d1 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -109,7 +109,7 @@ func (w *Worker) ReloadMirrorConfig(newMirrors []mirrorConfig) { job.SetState(statePaused) go job.Run(w.managerChan, w.semaphore) } else { - job.SetState(stateReady) + job.SetState(stateNone) go job.Run(w.managerChan, w.semaphore) w.schedule.AddJob(time.Now(), job) } @@ -125,7 +125,7 @@ func (w *Worker) ReloadMirrorConfig(newMirrors []mirrorConfig) { job := newMirrorJob(provider) w.jobs[provider.Name()] = job - job.SetState(stateReady) + job.SetState(stateNone) go job.Run(w.managerChan, w.semaphore) w.schedule.AddJob(time.Now(), job) logger.Noticef("New job %s", job.Name()) @@ -166,6 +166,9 @@ func (w *Worker) makeHTTPServer() { } logger.Noticef("Received command: %v", cmd) + // No matter what command, the existing job + // schedule should be flushed + w.schedule.Remove(job.Name()) // if job disabled, start them first switch cmd.Cmd { case CmdStart, CmdRestart: @@ -181,14 +184,13 @@ func (w *Worker) makeHTTPServer() { case CmdStop: // if job is disabled, no goroutine would be there // receiving this signal - w.schedule.Remove(job.Name()) if job.State() != stateDisabled { job.ctrlChan <- jobStop } case CmdDisable: w.disableJob(job) case CmdPing: - job.ctrlChan <- jobStart + // empty default: c.JSON(http.StatusNotAcceptable, gin.H{"msg": "Invalid Command"}) return @@ -250,7 +252,7 @@ func (w *Worker) runSchedule() { go job.Run(w.managerChan, w.semaphore) continue default: - job.SetState(stateReady) + job.SetState(stateNone) go job.Run(w.managerChan, w.semaphore) stime := m.LastUpdate.Add(job.provider.Interval()) logger.Debugf("Scheduling job %s @%s", job.Name(), stime.Format("2006-01-02 15:04:05")) @@ -263,7 +265,7 @@ func (w *Worker) runSchedule() { // manager's mirror list for name := range unset { job := w.jobs[name] - job.SetState(stateReady) + job.SetState(stateNone) go job.Run(w.managerChan, w.semaphore) w.schedule.AddJob(time.Now(), job) } From f768b48d721a340ed5062b58a100e5ba98650e95 Mon Sep 17 00:00:00 2001 From: bigeagle Date: Mon, 2 May 2016 18:58:22 +0800 Subject: [PATCH 4/5] feature(manager): let manager log job syncing event --- manager/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/manager/server.go b/manager/server.go index 30b4a3b..1787aee 100644 --- a/manager/server.go +++ b/manager/server.go @@ -224,7 +224,7 @@ func (s *Manager) updateJobOfWorker(c *gin.Context) { case Failed: logger.Warningf("Job [%s] @<%s> failed", status.Name, status.Worker) case Syncing: - logger.Infof("Job [%s] @<%s> starts syncing", status.Name, status.Worker) + logger.Noticef("Job [%s] @<%s> starts syncing", status.Name, status.Worker) case Disabled: logger.Noticef("Job [%s] @<%s> disabled", status.Name, status.Worker) case Paused: From dbc96b475ac93a490a3f5a58b91bc94f2642e4e3 Mon Sep 17 00:00:00 2001 From: bigeagle Date: Mon, 2 May 2016 19:04:21 +0800 Subject: [PATCH 5/5] added systemd service files --- README.md | 1 + systemd/tunasync-manager.service | 12 ++++++++++++ systemd/tunasync-worker.service | 14 ++++++++++++++ 3 files changed, 27 insertions(+) create mode 100644 systemd/tunasync-manager.service create mode 100644 systemd/tunasync-worker.service diff --git a/README.md b/README.md index 8fde1d2..2e42881 100644 --- a/README.md +++ b/README.md @@ -3,6 +3,7 @@ tunasync [![Build Status](https://travis-ci.org/tuna/tunasync.svg?branch=dev)](https://travis-ci.org/tuna/tunasync) [![Coverage Status](https://coveralls.io/repos/github/tuna/tunasync/badge.svg?branch=dev)](https://coveralls.io/github/tuna/tunasync?branch=dev) +![GPLv3](https://img.shields.io/badge/license-GPLv3-blue.svg) ## Design diff --git a/systemd/tunasync-manager.service b/systemd/tunasync-manager.service new file mode 100644 index 0000000..600e424 --- /dev/null +++ b/systemd/tunasync-manager.service @@ -0,0 +1,12 @@ +[Unit] +Description = TUNA mirrors sync manager +After=network.target +Requires=network.target + +[Service] +Type=simple +User=tunasync +ExecStart = /home/bin/tunasync manager -c /etc/tunasync/manager.conf --with-systemd + +[Install] +WantedBy=multi-user.target diff --git a/systemd/tunasync-worker.service b/systemd/tunasync-worker.service new file mode 100644 index 0000000..95b9f47 --- /dev/null +++ b/systemd/tunasync-worker.service @@ -0,0 +1,14 @@ +[Unit] +Description = TUNA mirrors sync worker +After=network.target + +[Service] +Type=simple +User=tunasync +PermissionsStartOnly=true +ExecStartPre=/usr/bin/cgcreate -t tunasync -a tunasync -g cpu:tunasync +ExecStart=/home/bin/tunasync worker -c /etc/tunasync/worker.conf --with-systemd +ExecStopPost=/usr/bin/cgdelete cpu:tunasync + +[Install] +WantedBy=multi-user.target