Mirror of https://github.com/tuna/tunasync.git
Synced 2025-04-21 04:42:46 +00:00

Commit 2d2df656af: Merge pull request #21 from tuna/dev
Bug fixes; implements hot reloading of the worker's config file (closes #18)
README.md

@@ -3,6 +3,7 @@ tunasync

 [](https://travis-ci.org/tuna/tunasync)
+[](https://coveralls.io/github/tuna/tunasync?branch=dev)

 ## Design
@@ -2,6 +2,9 @@ package main

 import (
 	"os"
+	"os/signal"
+	"syscall"
+	"time"

 	"github.com/codegangsta/cli"
 	"github.com/gin-gonic/gin"
@@ -12,7 +15,7 @@ import (
 	"github.com/tuna/tunasync/worker"
 )

-var logger = logging.MustGetLogger("tunasync-cmd")
+var logger = logging.MustGetLogger("tunasync")

 func startManager(c *cli.Context) {
 	tunasync.InitLogger(c.Bool("verbose"), c.Bool("debug"), c.Bool("with-systemd"))
@@ -54,6 +57,24 @@ func startWorker(c *cli.Context) {
 		os.Exit(1)
 	}

+	go func() {
+		time.Sleep(1 * time.Second)
+		sigChan := make(chan os.Signal, 1)
+		signal.Notify(sigChan, syscall.SIGHUP)
+		for {
+			s := <-sigChan
+			switch s {
+			case syscall.SIGHUP:
+				logger.Info("Received reload signal")
+				newCfg, err := worker.LoadConfig(c.String("config"))
+				if err != nil {
+					logger.Errorf("Error loading config: %s", err.Error())
+				}
+				w.ReloadMirrorConfig(newCfg.Mirrors)
+			}
+		}
+	}()
+
 	logger.Info("Run tunasync worker.")
 	w.Run()
 }
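Aside: in the reload goroutine above, a failed LoadConfig is logged but ReloadMirrorConfig is still called with the (likely nil) result. A hardened variant of the SIGHUP branch, as a sketch rather than the committed code, would skip the reload on error:

	case syscall.SIGHUP:
		logger.Info("Received reload signal")
		newCfg, err := worker.LoadConfig(c.String("config"))
		if err != nil {
			logger.Errorf("Error loading config: %s", err.Error())
			continue // keep running with the old config
		}
		w.ReloadMirrorConfig(newCfg.Mirrors)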
@@ -224,7 +224,7 @@ func (s *Manager) updateJobOfWorker(c *gin.Context) {
 	case Failed:
 		logger.Warningf("Job [%s] @<%s> failed", status.Name, status.Worker)
 	case Syncing:
-		logger.Infof("Job [%s] @<%s> starts syncing", status.Name, status.Worker)
+		logger.Noticef("Job [%s] @<%s> starts syncing", status.Name, status.Worker)
 	case Disabled:
 		logger.Noticef("Job [%s] @<%s> disabled", status.Name, status.Worker)
 	case Paused:
systemd/tunasync-manager.service (new file, 12 lines)

[Unit]
Description = TUNA mirrors sync manager
After=network.target
Requires=network.target

[Service]
Type=simple
User=tunasync
ExecStart = /home/bin/tunasync manager -c /etc/tunasync/manager.conf --with-systemd

[Install]
WantedBy=multi-user.target
systemd/tunasync-worker.service (new file, 14 lines)

[Unit]
Description = TUNA mirrors sync worker
After=network.target

[Service]
Type=simple
User=tunasync
PermissionsStartOnly=true
ExecStartPre=/usr/bin/cgcreate -t tunasync -a tunasync -g cpu:tunasync
ExecStart=/home/bin/tunasync worker -c /etc/tunasync/worker.conf --with-systemd
ExecStopPost=/usr/bin/cgdelete cpu:tunasync

[Install]
WantedBy=multi-user.target
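Aside: because the worker reacts to SIGHUP (see the cmd changes above), this unit could additionally declare ExecReload=/bin/kill -HUP $MAINPID so that `systemctl reload tunasync-worker` triggers a config reload; that directive is a suggestion here, not part of this commit.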
worker/base_provider.go (new file, 158 lines)

package worker

import (
	"os"
	"sync"
	"sync/atomic"
	"time"
)

// baseProvider is the base mixin of providers

type baseProvider struct {
	sync.Mutex

	ctx      *Context
	name     string
	interval time.Duration
	isMaster bool

	cmd       *cmdJob
	isRunning atomic.Value

	logFile *os.File

	cgroup *cgroupHook
	hooks  []jobHook
}

func (p *baseProvider) Name() string {
	return p.name
}

func (p *baseProvider) EnterContext() *Context {
	p.ctx = p.ctx.Enter()
	return p.ctx
}

func (p *baseProvider) ExitContext() *Context {
	p.ctx, _ = p.ctx.Exit()
	return p.ctx
}

func (p *baseProvider) Context() *Context {
	return p.ctx
}

func (p *baseProvider) Interval() time.Duration {
	// logger.Debug("interval for %s: %v", p.Name(), p.interval)
	return p.interval
}

func (p *baseProvider) IsMaster() bool {
	return p.isMaster
}

func (p *baseProvider) WorkingDir() string {
	if v, ok := p.ctx.Get(_WorkingDirKey); ok {
		if s, ok := v.(string); ok {
			return s
		}
	}
	panic("working dir not found in context")
}

func (p *baseProvider) LogDir() string {
	if v, ok := p.ctx.Get(_LogDirKey); ok {
		if s, ok := v.(string); ok {
			return s
		}
	}
	panic("log dir not found in context")
}

func (p *baseProvider) LogFile() string {
	if v, ok := p.ctx.Get(_LogFileKey); ok {
		if s, ok := v.(string); ok {
			return s
		}
	}
	panic("log file not found in context")
}

func (p *baseProvider) AddHook(hook jobHook) {
	if cg, ok := hook.(*cgroupHook); ok {
		p.cgroup = cg
	}
	p.hooks = append(p.hooks, hook)
}

func (p *baseProvider) Hooks() []jobHook {
	return p.hooks
}

func (p *baseProvider) Cgroup() *cgroupHook {
	return p.cgroup
}

func (p *baseProvider) prepareLogFile() error {
	if p.LogFile() == "/dev/null" {
		p.cmd.SetLogFile(nil)
		return nil
	}
	if p.logFile == nil {
		logFile, err := os.OpenFile(p.LogFile(), os.O_WRONLY|os.O_CREATE, 0644)
		if err != nil {
			logger.Errorf("Error opening logfile %s: %s", p.LogFile(), err.Error())
			return err
		}
		p.logFile = logFile
	}
	p.cmd.SetLogFile(p.logFile)
	return nil
}

func (p *baseProvider) Run() error {
	panic("Not Implemented")
}

func (p *baseProvider) Start() error {
	panic("Not Implemented")
}

func (p *baseProvider) IsRunning() bool {
	isRunning, _ := p.isRunning.Load().(bool)
	return isRunning
}

func (p *baseProvider) Wait() error {
	defer func() {
		p.Lock()
		p.isRunning.Store(false)
		if p.logFile != nil {
			p.logFile.Close()
			p.logFile = nil
		}
		p.Unlock()
	}()
	return p.cmd.Wait()
}

func (p *baseProvider) Terminate() error {
	logger.Debugf("terminating provider: %s", p.Name())
	if !p.IsRunning() {
		return nil
	}

	p.Lock()
	if p.logFile != nil {
		p.logFile.Close()
		p.logFile = nil
	}
	p.Unlock()

	err := p.cmd.Terminate()
	p.isRunning.Store(false)

	return err
}
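Aside: WorkingDir, LogDir and LogFile all read through the layered Context, which is what lets hooks such as logLimiter override the log file per run. A minimal sketch of that flow, assuming Context behaves as the stack-like key-value store used elsewhere in this package:

	p.EnterContext().Set(_LogFileKey, "/var/log/tunasync/foo/foo_2016-04-01_12_00.log")
	// p.LogFile() now returns the per-run path set in the inner layer
	p.ExitContext()
	// p.LogFile() falls back to the value stored in the outer layer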
@@ -7,30 +7,30 @@ import (
 	"github.com/BurntSushi/toml"
 )

-type ProviderEnum uint8
+type providerEnum uint8

 const (
-	ProvRsync ProviderEnum = iota
-	ProvTwoStageRsync
-	ProvCommand
+	provRsync providerEnum = iota
+	provTwoStageRsync
+	provCommand
 )

-func (p *ProviderEnum) UnmarshalText(text []byte) error {
+func (p *providerEnum) UnmarshalText(text []byte) error {
 	s := string(text)
 	switch s {
 	case `command`:
-		*p = ProvCommand
+		*p = provCommand
 	case `rsync`:
-		*p = ProvRsync
+		*p = provRsync
 	case `two-stage-rsync`:
-		*p = ProvTwoStageRsync
+		*p = provTwoStageRsync
 	default:
 		return errors.New("Invalid value to providerEnum")
 	}
 	return nil

 }

 // Worker config options
 type Config struct {
 	Global  globalConfig  `toml:"global"`
 	Manager managerConfig `toml:"manager"`
@@ -69,7 +69,7 @@ type cgroupConfig struct {

 type mirrorConfig struct {
 	Name     string       `toml:"name"`
-	Provider ProviderEnum `toml:"provider"`
+	Provider providerEnum `toml:"provider"`
 	Upstream string       `toml:"upstream"`
 	Interval int          `toml:"interval"`
 	MirrorDir string      `toml:"mirror_dir"`
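Aside: the BurntSushi/toml decoder calls UnmarshalText for any field whose type implements encoding.TextUnmarshaler, which is how the provider string in worker.conf becomes a providerEnum. A minimal decoding sketch (illustrative, not from the commit):

	var mc struct {
		Provider providerEnum `toml:"provider"`
	}
	if _, err := toml.Decode(`provider = "two-stage-rsync"`, &mc); err != nil {
		panic(err)
	}
	// mc.Provider == provTwoStageRsync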
worker/config_diff.go (new file, 88 lines)

package worker

import (
	"fmt"
	"reflect"
	"sort"
)

// Find the difference between mirror configs; this is important for hot reloading the config file
// NOTICE: only the [[mirrors]] section is supported

// make []mirrorConfig sortable
type sortableMirrorList []mirrorConfig

func (l sortableMirrorList) Len() int           { return len(l) }
func (l sortableMirrorList) Swap(i, j int)      { l[i], l[j] = l[j], l[i] }
func (l sortableMirrorList) Less(i, j int) bool { return l[i].Name < l[j].Name }

const (
	diffDelete uint8 = iota
	diffAdd
	diffModify
)

// a unit of mirror config difference
type mirrorCfgTrans struct {
	diffOp uint8
	mirCfg mirrorConfig
}

func (t mirrorCfgTrans) String() string {
	var op string
	if t.diffOp == diffDelete {
		op = "Del"
	} else {
		op = "Add"
	}
	return fmt.Sprintf("{%s, %s}", op, t.mirCfg.Name)
}

// diffMirrorConfig finds the difference between the oldList and the newList.
// It returns a series of operations that, if applied to oldList,
// yield a list equivalent to newList.
func diffMirrorConfig(oldList, newList []mirrorConfig) []mirrorCfgTrans {
	operations := []mirrorCfgTrans{}

	oList := make([]mirrorConfig, len(oldList))
	nList := make([]mirrorConfig, len(newList))
	copy(oList, oldList)
	copy(nList, newList)

	// first ensure oldList and newList are sorted
	sort.Sort(sortableMirrorList(oList))
	sort.Sort(sortableMirrorList(nList))

	// insert a tail node to both lists
	// as the maximum node
	lastOld, lastNew := oList[len(oList)-1], nList[len(nList)-1]
	maxName := lastOld.Name
	if lastNew.Name > lastOld.Name {
		maxName = lastNew.Name
	}
	Nil := mirrorConfig{Name: "~" + maxName}
	if Nil.Name <= maxName {
		panic("Nil.Name should be larger than maxName")
	}
	oList, nList = append(oList, Nil), append(nList, Nil)

	// iterate over both lists to find the difference
	for i, j := 0, 0; i < len(oList) && j < len(nList); {
		o, n := oList[i], nList[j]
		if n.Name < o.Name {
			operations = append(operations, mirrorCfgTrans{diffAdd, n})
			j++
		} else if o.Name < n.Name {
			operations = append(operations, mirrorCfgTrans{diffDelete, o})
			i++
		} else {
			if !reflect.DeepEqual(o, n) {
				operations = append(operations, mirrorCfgTrans{diffModify, n})
			}
			i++
			j++
		}
	}

	return operations
}
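Aside: after the sorts, diffMirrorConfig is a merge-style two-pointer walk, so the whole diff costs O(n log n). A small usage sketch (names and fields are made up):

	oldList := []mirrorConfig{{Name: "archlinux"}, {Name: "debian"}}
	newList := []mirrorConfig{{Name: "debian", Interval: 60}, {Name: "fedora"}}
	ops := diffMirrorConfig(oldList, newList)
	// ops holds {diffDelete, archlinux}, {diffModify, debian}, {diffAdd, fedora},
	// which ReloadMirrorConfig (below) applies in two passes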
worker/config_diff_test.go (new file, 73 lines)

package worker

import (
	"sort"
	"testing"

	. "github.com/smartystreets/goconvey/convey"
)

func TestConfigDiff(t *testing.T) {
	Convey("When old and new configs are equal", t, func() {
		oldList := []mirrorConfig{
			mirrorConfig{Name: "debian"},
			mirrorConfig{Name: "debian-security"},
			mirrorConfig{Name: "fedora"},
			mirrorConfig{Name: "archlinux"},
			mirrorConfig{Name: "AOSP"},
			mirrorConfig{Name: "ubuntu"},
		}
		newList := make([]mirrorConfig, len(oldList))
		copy(newList, oldList)

		difference := diffMirrorConfig(oldList, newList)
		So(len(difference), ShouldEqual, 0)
	})
	Convey("When giving two config lists with different names", t, func() {
		oldList := []mirrorConfig{
			mirrorConfig{Name: "debian"},
			mirrorConfig{Name: "debian-security"},
			mirrorConfig{Name: "fedora"},
			mirrorConfig{Name: "archlinux"},
			mirrorConfig{Name: "AOSP", Env: map[string]string{"REPO": "/usr/bin/repo"}},
			mirrorConfig{Name: "ubuntu"},
		}
		newList := []mirrorConfig{
			mirrorConfig{Name: "debian"},
			mirrorConfig{Name: "debian-cd"},
			mirrorConfig{Name: "archlinuxcn"},
			mirrorConfig{Name: "AOSP", Env: map[string]string{"REPO": "/usr/local/bin/aosp-repo"}},
			mirrorConfig{Name: "ubuntu-ports"},
		}

		difference := diffMirrorConfig(oldList, newList)

		sort.Sort(sortableMirrorList(oldList))
		emptyList := []mirrorConfig{}

		for _, o := range oldList {
			keep := true
			for _, op := range difference {
				if (op.diffOp == diffDelete || op.diffOp == diffModify) &&
					op.mirCfg.Name == o.Name {

					keep = false
					break
				}
			}
			if keep {
				emptyList = append(emptyList, o)
			}
		}

		for _, op := range difference {
			if op.diffOp == diffAdd || op.diffOp == diffModify {
				emptyList = append(emptyList, op.mirCfg)
			}
		}
		sort.Sort(sortableMirrorList(emptyList))
		sort.Sort(sortableMirrorList(newList))
		So(emptyList, ShouldResemble, newList)

	})
}
@@ -82,19 +82,19 @@ exec_on_failure = "bash -c 'echo ${TUNASYNC_JOB_EXIT_STATUS} > ${TUNASYNC_WORKIN
 		m := cfg.Mirrors[0]
 		So(m.Name, ShouldEqual, "AOSP")
 		So(m.MirrorDir, ShouldEqual, "/data/git/AOSP")
-		So(m.Provider, ShouldEqual, ProvCommand)
+		So(m.Provider, ShouldEqual, provCommand)
 		So(m.Interval, ShouldEqual, 720)
 		So(m.Env["REPO"], ShouldEqual, "/usr/local/bin/aosp-repo")

 		m = cfg.Mirrors[1]
 		So(m.Name, ShouldEqual, "debian")
 		So(m.MirrorDir, ShouldEqual, "")
-		So(m.Provider, ShouldEqual, ProvTwoStageRsync)
+		So(m.Provider, ShouldEqual, provTwoStageRsync)

 		m = cfg.Mirrors[2]
 		So(m.Name, ShouldEqual, "fedora")
 		So(m.MirrorDir, ShouldEqual, "")
-		So(m.Provider, ShouldEqual, ProvRsync)
+		So(m.Provider, ShouldEqual, provRsync)
 		So(m.ExcludeFile, ShouldEqual, "/etc/tunasync.d/fedora-exclude.txt")

 		So(len(cfg.Mirrors), ShouldEqual, 3)
@@ -112,14 +112,13 @@ exec_on_failure = "bash -c 'echo ${TUNASYNC_JOB_EXIT_STATUS} > ${TUNASYNC_WORKIN
 		cfg, err := LoadConfig(tmpfile.Name())
 		So(err, ShouldBeNil)

-		w := &Worker{
-			cfg:       cfg,
-			providers: make(map[string]mirrorProvider),
+		providers := map[string]mirrorProvider{}
+		for _, m := range cfg.Mirrors {
+			p := newMirrorProvider(m, cfg)
+			providers[p.Name()] = p
 		}

-		w.initProviders()
-
-		p := w.providers["AOSP"]
+		p := providers["AOSP"]
 		So(p.Name(), ShouldEqual, "AOSP")
 		So(p.LogDir(), ShouldEqual, "/var/log/tunasync/AOSP")
 		So(p.LogFile(), ShouldEqual, "/var/log/tunasync/AOSP/latest.log")
@@ -132,7 +131,7 @@ exec_on_failure = "bash -c 'echo ${TUNASYNC_JOB_EXIT_STATUS} > ${TUNASYNC_WORKIN
 			}
 		}

-		p = w.providers["debian"]
+		p = providers["debian"]
 		So(p.Name(), ShouldEqual, "debian")
 		So(p.LogDir(), ShouldEqual, "/var/log/tunasync/debian")
 		So(p.LogFile(), ShouldEqual, "/var/log/tunasync/debian/latest.log")
@@ -141,7 +140,7 @@ exec_on_failure = "bash -c 'echo ${TUNASYNC_JOB_EXIT_STATUS} > ${TUNASYNC_WORKIN
 		So(r2p.stage1Profile, ShouldEqual, "debian")
 		So(r2p.WorkingDir(), ShouldEqual, "/data/mirrors/debian")

-		p = w.providers["fedora"]
+		p = providers["fedora"]
 		So(p.Name(), ShouldEqual, "fedora")
 		So(p.LogDir(), ShouldEqual, "/var/log/tunasync/fedora")
 		So(p.LogFile(), ShouldEqual, "/var/log/tunasync/fedora/latest.log")
@@ -65,6 +65,15 @@ func (m *mirrorJob) SetState(state uint32) {
 	atomic.StoreUint32(&(m.state), state)
 }

+func (m *mirrorJob) SetProvider(provider mirrorProvider) error {
+	s := m.State()
+	if (s != stateNone) && (s != stateDisabled) {
+		return fmt.Errorf("Provider cannot be switched when job state is %d", s)
+	}
+	m.provider = provider
+	return nil
+}
+
 // runMirrorJob is the goroutine where syncing job runs in
 // arguments:
 //	provider: mirror provider object
@@ -165,7 +174,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
 	if syncErr == nil {
 		// syncing success
 		logger.Noticef("succeeded syncing %s", m.Name())
-		managerChan <- jobMessage{tunasync.Success, m.Name(), "", true}
+		managerChan <- jobMessage{tunasync.Success, m.Name(), "", (m.State() == stateReady)}
 		// post-success hooks
 		err := runHooks(rHooks, func(h jobHook) error { return h.postSuccess() }, "post-success")
 		if err != nil {
@@ -177,7 +186,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err

 	// syncing failed
 	logger.Warningf("failed syncing %s: %s", m.Name(), syncErr.Error())
-	managerChan <- jobMessage{tunasync.Failed, m.Name(), syncErr.Error(), retry == maxRetry-1}
+	managerChan <- jobMessage{tunasync.Failed, m.Name(), syncErr.Error(), (retry == maxRetry-1) && (m.State() == stateReady)}

 	// post-fail hooks
 	logger.Debug("post-fail hooks")
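Aside: the fourth field of jobMessage is the "schedule next run" flag consumed by runSchedule in worker/worker.go; gating it on m.State() == stateReady means a job that was disabled, paused, or swapped out mid-sync cannot re-enter the schedule through its own final status message. SetProvider enforces the matching contract on the reload side; a sketch of the sequence ReloadMirrorConfig (below) follows:

	w.disableJob(job) // stop the running goroutine first
	p := newMirrorProvider(op.mirCfg, w.cfg)
	if err := job.SetProvider(p); err != nil {
		// rejected: the job was neither stateNone nor stateDisabled
	}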
@@ -68,13 +68,13 @@ func (l *logLimiter) preExec() error {
 		}
 	}

-	logFile := filepath.Join(
-		logDir,
-		fmt.Sprintf(
-			"%s_%s.log",
-			p.Name(),
-			time.Now().Format("2006-01-02_15_04"),
-		),
+	logFileName := fmt.Sprintf(
+		"%s_%s.log",
+		p.Name(),
+		time.Now().Format("2006-01-02_15_04"),
+	)
+	logFilePath := filepath.Join(
+		logDir, logFileName,
 	)

 	logLink := filepath.Join(logDir, "latest")
@@ -82,10 +82,10 @@ func (l *logLimiter) preExec() error {
 	if _, err = os.Stat(logLink); err == nil {
 		os.Remove(logLink)
 	}
-	os.Symlink(logFile, logLink)
+	os.Symlink(logFileName, logLink)

 	ctx := p.EnterContext()
-	ctx.Set(_LogFileKey, logFile)
+	ctx.Set(_LogFileKey, logFilePath)
 	return nil
 }

@@ -101,7 +101,8 @@ func (l *logLimiter) postFail() error {
 	logLink := filepath.Join(logDir, "latest")
 	os.Rename(logFile, logFileFail)
 	os.Remove(logLink)
-	os.Symlink(logFileFail, logLink)
+	logFileName := filepath.Base(logFileFail)
+	os.Symlink(logFileName, logLink)

 	l.provider.ExitContext()
 	return nil
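Aside: os.Symlink stores the target string verbatim, so linking "latest" to the bare file name instead of the full path makes it a relative symlink that resolves next to the link itself, and it stays valid if the log tree is moved or mounted elsewhere. Sketch of the difference (paths illustrative):

	os.Symlink("/var/log/tunasync/foo/foo_2016-04-01_12_00.log", "latest") // absolute target
	os.Symlink("foo_2016-04-01_12_00.log", "latest")                       // relative, resolved inside logDir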
@@ -1,9 +1,10 @@
 package worker

 import (
-	"os"
-	"sync"
-	"sync/atomic"
+	"bytes"
+	"errors"
+	"html/template"
+	"path/filepath"
 	"time"
 )

@@ -54,150 +55,136 @@ type mirrorProvider interface {
 	Context() *Context
 }

-type baseProvider struct {
-	sync.Mutex
-	// ... (the baseProvider struct and all of its methods were deleted here;
-	// they moved, unchanged, into the new file worker/base_provider.go above)
-}
-
+// newMirrorProvider creates a mirrorProvider instance
+// using a mirrorCfg and the global cfg
+func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
+
+	formatLogDir := func(logDir string, m mirrorConfig) string {
+		tmpl, err := template.New("logDirTmpl-" + m.Name).Parse(logDir)
+		if err != nil {
+			panic(err)
+		}
+		var formatedLogDir bytes.Buffer
+		tmpl.Execute(&formatedLogDir, m)
+		return formatedLogDir.String()
+	}
+
+	logDir := mirror.LogDir
+	mirrorDir := mirror.MirrorDir
+	if logDir == "" {
+		logDir = cfg.Global.LogDir
+	}
+	if mirrorDir == "" {
+		mirrorDir = filepath.Join(
+			cfg.Global.MirrorDir, mirror.Name,
+		)
+	}
+	if mirror.Interval == 0 {
+		mirror.Interval = cfg.Global.Interval
+	}
+	logDir = formatLogDir(logDir, mirror)
+
+	// IsMaster
+	isMaster := true
+	if mirror.Role == "slave" {
+		isMaster = false
+	} else {
+		if mirror.Role != "" && mirror.Role != "master" {
+			logger.Warningf("Invalid role configuration for %s", mirror.Name)
+		}
+	}
+
+	var provider mirrorProvider
+
+	switch mirror.Provider {
+	case provCommand:
+		pc := cmdConfig{
+			name:        mirror.Name,
+			upstreamURL: mirror.Upstream,
+			command:     mirror.Command,
+			workingDir:  mirrorDir,
+			logDir:      logDir,
+			logFile:     filepath.Join(logDir, "latest.log"),
+			interval:    time.Duration(mirror.Interval) * time.Minute,
+			env:         mirror.Env,
+		}
+		p, err := newCmdProvider(pc)
+		p.isMaster = isMaster
+		if err != nil {
+			panic(err)
+		}
+		provider = p
+	case provRsync:
+		rc := rsyncConfig{
+			name:        mirror.Name,
+			upstreamURL: mirror.Upstream,
+			rsyncCmd:    mirror.Command,
+			password:    mirror.Password,
+			excludeFile: mirror.ExcludeFile,
+			workingDir:  mirrorDir,
+			logDir:      logDir,
+			logFile:     filepath.Join(logDir, "latest.log"),
+			useIPv6:     mirror.UseIPv6,
+			interval:    time.Duration(mirror.Interval) * time.Minute,
+		}
+		p, err := newRsyncProvider(rc)
+		p.isMaster = isMaster
+		if err != nil {
+			panic(err)
+		}
+		provider = p
+	case provTwoStageRsync:
+		rc := twoStageRsyncConfig{
+			name:          mirror.Name,
+			stage1Profile: mirror.Stage1Profile,
+			upstreamURL:   mirror.Upstream,
+			rsyncCmd:      mirror.Command,
+			password:      mirror.Password,
+			excludeFile:   mirror.ExcludeFile,
+			workingDir:    mirrorDir,
+			logDir:        logDir,
+			logFile:       filepath.Join(logDir, "latest.log"),
+			useIPv6:       mirror.UseIPv6,
+			interval:      time.Duration(mirror.Interval) * time.Minute,
+		}
+		p, err := newTwoStageRsyncProvider(rc)
+		p.isMaster = isMaster
+		if err != nil {
+			panic(err)
+		}
+		provider = p
+	default:
+		panic(errors.New("Invalid mirror provider"))
+	}
+
+	// Add Logging Hook
+	provider.AddHook(newLogLimiter(provider))
+
+	// Add Cgroup Hook
+	if cfg.Cgroup.Enable {
+		provider.AddHook(
+			newCgroupHook(provider, cfg.Cgroup.BasePath, cfg.Cgroup.Group),
+		)
+	}
+
+	// ExecOnSuccess hook
+	if mirror.ExecOnSuccess != "" {
+		h, err := newExecPostHook(provider, execOnSuccess, mirror.ExecOnSuccess)
+		if err != nil {
+			logger.Errorf("Error initializing mirror %s: %s", mirror.Name, err.Error())
+			panic(err)
+		}
+		provider.AddHook(h)
+	}
+	// ExecOnFailure hook
+	if mirror.ExecOnFailure != "" {
+		h, err := newExecPostHook(provider, execOnFailure, mirror.ExecOnFailure)
+		if err != nil {
+			logger.Errorf("Error initializing mirror %s: %s", mirror.Name, err.Error())
+			panic(err)
+		}
+		provider.AddHook(h)
+	}
+
+	return provider
+}
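Aside: formatLogDir parses the configured log_dir as a template with the mirrorConfig as its data, so one global setting can expand per mirror. A minimal sketch of the expansion, using the html/template import above:

	tmpl, err := template.New("logDirTmpl").Parse("/var/log/tunasync/{{.Name}}")
	if err != nil {
		panic(err)
	}
	var buf bytes.Buffer
	tmpl.Execute(&buf, mirrorConfig{Name: "debian"})
	// buf.String() == "/var/log/tunasync/debian"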
@@ -12,6 +12,7 @@ import (
 type scheduleQueue struct {
 	sync.Mutex
 	list *skiplist.SkipList
+	jobs map[string]bool
 }

 func timeLessThan(l, r interface{}) bool {
@@ -23,13 +24,20 @@ func timeLessThan(l, r interface{}) bool {
 func newScheduleQueue() *scheduleQueue {
 	queue := new(scheduleQueue)
 	queue.list = skiplist.NewCustomMap(timeLessThan)
+	queue.jobs = make(map[string]bool)
 	return queue
 }

 func (q *scheduleQueue) AddJob(schedTime time.Time, job *mirrorJob) {
+	q.Lock()
+	defer q.Unlock()
+	if _, ok := q.jobs[job.Name()]; ok {
+		logger.Warningf("Job %s already scheduled, removing the existing one", job.Name())
+		q.unsafeRemove(job.Name())
+	}
+	q.jobs[job.Name()] = true
 	q.list.Set(schedTime, job)
 	logger.Debugf("Added job %s @ %v", job.Name(), schedTime)
 }

 // pop out the first job if it's time to run it
@@ -44,10 +52,11 @@ func (q *scheduleQueue) Pop() *mirrorJob {
 	defer first.Close()

 	t := first.Key().(time.Time)
 	// logger.Debug("First job should run @%v", t)
 	if t.Before(time.Now()) {
 		job := first.Value().(*mirrorJob)
 		q.list.Delete(first.Key())
+		delete(q.jobs, job.Name())
 		logger.Debug("Popped out job %s @%v", job.Name(), t)
 		return job
 	}
 	return nil
@@ -57,7 +66,11 @@ func (q *scheduleQueue) Pop() *mirrorJob {
 func (q *scheduleQueue) Remove(name string) bool {
 	q.Lock()
 	defer q.Unlock()
+	return q.unsafeRemove(name)
+}
+
+// remove job
+func (q *scheduleQueue) unsafeRemove(name string) bool {
 	cur := q.list.Iterator()
 	defer cur.Close()

@@ -65,6 +78,7 @@ func (q *scheduleQueue) Remove(name string) bool {
 		cj := cur.Value().(*mirrorJob)
 		if cj.Name() == name {
 			q.list.Delete(cur.Key())
+			delete(q.jobs, name)
 			return true
 		}
 	}
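Aside: the skiplist is keyed by schedule time alone, so nothing used to stop the same job from sitting in the queue under two different times. The new jobs map plus unsafeRemove turns AddJob into an upsert on the job name:

	q := newScheduleQueue()
	q.AddJob(t0, job)
	q.AddJob(t0.Add(time.Minute), job) // warns, drops the t0 entry, keeps t0+1min

as the test below verifies.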
@@ -30,6 +30,24 @@ func TestSchedule(t *testing.T) {
 			time.Sleep(1200 * time.Millisecond)
 			So(schedule.Pop(), ShouldEqual, job)

 		})
+		Convey("When adding one job twice", func() {
+			c := cmdConfig{
+				name: "schedule_test",
+			}
+			provider, _ := newCmdProvider(c)
+			job := newMirrorJob(provider)
+			sched := time.Now().Add(1 * time.Second)
+
+			schedule.AddJob(sched, job)
+			schedule.AddJob(sched.Add(1*time.Second), job)
+
+			So(schedule.Pop(), ShouldBeNil)
+			time.Sleep(1200 * time.Millisecond)
+			So(schedule.Pop(), ShouldBeNil)
+			time.Sleep(1200 * time.Millisecond)
+			So(schedule.Pop(), ShouldEqual, job)
+
+		})
 		Convey("When removing jobs", func() {
 			c := cmdConfig{
worker/worker.go (274 lines changed)
@@ -1,12 +1,9 @@
 package worker

 import (
-	"bytes"
-	"errors"
 	"fmt"
-	"html/template"
 	"net/http"
-	"path/filepath"
 	"sync"
 	"time"

 	"github.com/gin-gonic/gin"
@@ -17,9 +14,9 @@ var tunasyncWorker *Worker

 // A Worker is a instance of tunasync worker
 type Worker struct {
-	cfg       *Config
-	providers map[string]mirrorProvider
-	jobs      map[string]*mirrorJob
+	L    sync.Mutex
+	cfg  *Config
+	jobs map[string]*mirrorJob

 	managerChan chan jobMessage
 	semaphore   chan empty
@@ -36,9 +33,8 @@ func GetTUNASyncWorker(cfg *Config) *Worker {
 	}

 	w := &Worker{
-		cfg:       cfg,
-		providers: make(map[string]mirrorProvider),
-		jobs:      make(map[string]*mirrorJob),
+		cfg:  cfg,
+		jobs: make(map[string]*mirrorJob),

 		managerChan: make(chan jobMessage, 32),
 		semaphore:   make(chan empty, cfg.Global.Concurrent),
|
||||
return w
|
||||
}
|
||||
|
||||
func (w *Worker) initProviders() {
|
||||
c := w.cfg
|
||||
|
||||
formatLogDir := func(logDir string, m mirrorConfig) string {
|
||||
tmpl, err := template.New("logDirTmpl-" + m.Name).Parse(logDir)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
var formatedLogDir bytes.Buffer
|
||||
tmpl.Execute(&formatedLogDir, m)
|
||||
return formatedLogDir.String()
|
||||
}
|
||||
|
||||
for _, mirror := range c.Mirrors {
|
||||
logDir := mirror.LogDir
|
||||
mirrorDir := mirror.MirrorDir
|
||||
if logDir == "" {
|
||||
logDir = c.Global.LogDir
|
||||
}
|
||||
if mirrorDir == "" {
|
||||
mirrorDir = filepath.Join(
|
||||
c.Global.MirrorDir, mirror.Name,
|
||||
)
|
||||
}
|
||||
if mirror.Interval == 0 {
|
||||
mirror.Interval = c.Global.Interval
|
||||
}
|
||||
logDir = formatLogDir(logDir, mirror)
|
||||
|
||||
// IsMaster
|
||||
isMaster := true
|
||||
if mirror.Role == "slave" {
|
||||
isMaster = false
|
||||
} else {
|
||||
if mirror.Role != "" && mirror.Role != "master" {
|
||||
logger.Warningf("Invalid role configuration for %s", mirror.Name)
|
||||
}
|
||||
}
|
||||
|
||||
var provider mirrorProvider
|
||||
|
||||
switch mirror.Provider {
|
||||
case ProvCommand:
|
||||
pc := cmdConfig{
|
||||
name: mirror.Name,
|
||||
upstreamURL: mirror.Upstream,
|
||||
command: mirror.Command,
|
||||
workingDir: mirrorDir,
|
||||
logDir: logDir,
|
||||
logFile: filepath.Join(logDir, "latest.log"),
|
||||
interval: time.Duration(mirror.Interval) * time.Minute,
|
||||
env: mirror.Env,
|
||||
}
|
||||
p, err := newCmdProvider(pc)
|
||||
p.isMaster = isMaster
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
provider = p
|
||||
case ProvRsync:
|
||||
rc := rsyncConfig{
|
||||
name: mirror.Name,
|
||||
upstreamURL: mirror.Upstream,
|
||||
rsyncCmd: mirror.Command,
|
||||
password: mirror.Password,
|
||||
excludeFile: mirror.ExcludeFile,
|
||||
workingDir: mirrorDir,
|
||||
logDir: logDir,
|
||||
logFile: filepath.Join(logDir, "latest.log"),
|
||||
useIPv6: mirror.UseIPv6,
|
||||
interval: time.Duration(mirror.Interval) * time.Minute,
|
||||
}
|
||||
p, err := newRsyncProvider(rc)
|
||||
p.isMaster = isMaster
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
provider = p
|
||||
case ProvTwoStageRsync:
|
||||
rc := twoStageRsyncConfig{
|
||||
name: mirror.Name,
|
||||
stage1Profile: mirror.Stage1Profile,
|
||||
upstreamURL: mirror.Upstream,
|
||||
rsyncCmd: mirror.Command,
|
||||
password: mirror.Password,
|
||||
excludeFile: mirror.ExcludeFile,
|
||||
workingDir: mirrorDir,
|
||||
logDir: logDir,
|
||||
logFile: filepath.Join(logDir, "latest.log"),
|
||||
useIPv6: mirror.UseIPv6,
|
||||
interval: time.Duration(mirror.Interval) * time.Minute,
|
||||
}
|
||||
p, err := newTwoStageRsyncProvider(rc)
|
||||
p.isMaster = isMaster
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
provider = p
|
||||
default:
|
||||
panic(errors.New("Invalid mirror provider"))
|
||||
|
||||
}
|
||||
|
||||
provider.AddHook(newLogLimiter(provider))
|
||||
|
||||
// Add Cgroup Hook
|
||||
if w.cfg.Cgroup.Enable {
|
||||
provider.AddHook(
|
||||
newCgroupHook(provider, w.cfg.Cgroup.BasePath, w.cfg.Cgroup.Group),
|
||||
)
|
||||
}
|
||||
|
||||
// ExecOnSuccess hook
|
||||
if mirror.ExecOnSuccess != "" {
|
||||
h, err := newExecPostHook(provider, execOnSuccess, mirror.ExecOnSuccess)
|
||||
if err != nil {
|
||||
logger.Errorf("Error initializing mirror %s: %s", mirror.Name, err.Error())
|
||||
panic(err)
|
||||
}
|
||||
provider.AddHook(h)
|
||||
}
|
||||
// ExecOnFailure hook
|
||||
if mirror.ExecOnFailure != "" {
|
||||
h, err := newExecPostHook(provider, execOnFailure, mirror.ExecOnFailure)
|
||||
if err != nil {
|
||||
logger.Errorf("Error initializing mirror %s: %s", mirror.Name, err.Error())
|
||||
panic(err)
|
||||
}
|
||||
provider.AddHook(h)
|
||||
}
|
||||
|
||||
w.providers[provider.Name()] = provider
|
||||
|
||||
func (w *Worker) initJobs() {
|
||||
for _, mirror := range w.cfg.Mirrors {
|
||||
// Create Provider
|
||||
provider := newMirrorProvider(mirror, w.cfg)
|
||||
w.jobs[provider.Name()] = newMirrorJob(provider)
|
||||
}
|
||||
}
|
||||
|
||||
func (w *Worker) initJobs() {
|
||||
w.initProviders()
|
||||
// ReloadMirrorConfig refresh the providers and jobs
|
||||
// from new mirror configs
|
||||
// TODO: deleted job should be removed from manager-side mirror list
|
||||
func (w *Worker) ReloadMirrorConfig(newMirrors []mirrorConfig) {
|
||||
w.L.Lock()
|
||||
defer w.L.Unlock()
|
||||
logger.Info("Reloading mirror configs")
|
||||
|
||||
for name, provider := range w.providers {
|
||||
w.jobs[name] = newMirrorJob(provider)
|
||||
oldMirrors := w.cfg.Mirrors
|
||||
difference := diffMirrorConfig(oldMirrors, newMirrors)
|
||||
|
||||
// first deal with deletion and modifications
|
||||
for _, op := range difference {
|
||||
if op.diffOp == diffAdd {
|
||||
continue
|
||||
}
|
||||
name := op.mirCfg.Name
|
||||
job, ok := w.jobs[name]
|
||||
if !ok {
|
||||
logger.Warningf("Job %s not found", name)
|
||||
continue
|
||||
}
|
||||
switch op.diffOp {
|
||||
case diffDelete:
|
||||
w.disableJob(job)
|
||||
delete(w.jobs, name)
|
||||
logger.Noticef("Deleted job %s", name)
|
||||
case diffModify:
|
||||
jobState := job.State()
|
||||
w.disableJob(job)
|
||||
// set new provider
|
||||
provider := newMirrorProvider(op.mirCfg, w.cfg)
|
||||
if err := job.SetProvider(provider); err != nil {
|
||||
logger.Errorf("Error setting job provider of %s: %s", name, err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
// re-schedule job according to its previous state
|
||||
if jobState == stateDisabled {
|
||||
job.SetState(stateDisabled)
|
||||
} else if jobState == statePaused {
|
||||
job.SetState(statePaused)
|
||||
go job.Run(w.managerChan, w.semaphore)
|
||||
} else {
|
||||
job.SetState(stateNone)
|
||||
go job.Run(w.managerChan, w.semaphore)
|
||||
w.schedule.AddJob(time.Now(), job)
|
||||
}
|
||||
logger.Noticef("Reloaded job %s", name)
|
||||
}
|
||||
}
|
||||
// for added new jobs, just start new jobs
|
||||
for _, op := range difference {
|
||||
if op.diffOp != diffAdd {
|
||||
continue
|
||||
}
|
||||
provider := newMirrorProvider(op.mirCfg, w.cfg)
|
||||
job := newMirrorJob(provider)
|
||||
w.jobs[provider.Name()] = job
|
||||
|
||||
job.SetState(stateNone)
|
||||
go job.Run(w.managerChan, w.semaphore)
|
||||
w.schedule.AddJob(time.Now(), job)
|
||||
logger.Noticef("New job %s", job.Name())
|
||||
}
|
||||
|
||||
w.cfg.Mirrors = newMirrors
|
||||
|
||||
}
|
||||
|
||||
func (w *Worker) disableJob(job *mirrorJob) {
|
||||
w.schedule.Remove(job.Name())
|
||||
if job.State() != stateDisabled {
|
||||
job.ctrlChan <- jobDisable
|
||||
<-job.disabled
|
||||
}
|
||||
}
|
||||
|
||||
@@ -211,18 +149,26 @@ func (w *Worker) makeHTTPServer() {
 	s.Use(gin.Recovery())

 	s.POST("/", func(c *gin.Context) {
+		w.L.Lock()
+		defer w.L.Unlock()
+
 		var cmd WorkerCmd

 		if err := c.BindJSON(&cmd); err != nil {
 			c.JSON(http.StatusBadRequest, gin.H{"msg": "Invalid request"})
 			return
 		}
+
 		job, ok := w.jobs[cmd.MirrorID]
 		if !ok {
 			c.JSON(http.StatusNotFound, gin.H{"msg": fmt.Sprintf("Mirror ``%s'' not found", cmd.MirrorID)})
 			return
 		}
+
 		logger.Noticef("Received command: %v", cmd)
+		// No matter what command, the existing job
+		// schedule should be flushed
+		w.schedule.Remove(job.Name())
 		// if job disabled, start them first
 		switch cmd.Cmd {
 		case CmdStart, CmdRestart:
@@ -238,18 +184,13 @@ func (w *Worker) makeHTTPServer() {
 		case CmdStop:
 			// if job is disabled, no goroutine would be there
 			// receiving this signal
-			w.schedule.Remove(job.Name())
 			if job.State() != stateDisabled {
 				job.ctrlChan <- jobStop
 			}
 		case CmdDisable:
-			w.schedule.Remove(job.Name())
-			if job.State() != stateDisabled {
-				job.ctrlChan <- jobDisable
-				<-job.disabled
-			}
+			w.disableJob(job)
 		case CmdPing:
-			job.ctrlChan <- jobStart
+			// empty
 		default:
 			c.JSON(http.StatusNotAcceptable, gin.H{"msg": "Invalid Command"})
 			return
@@ -289,6 +230,8 @@ func (w *Worker) Run() {
 }

 func (w *Worker) runSchedule() {
+	w.L.Lock()
+
 	mirrorList := w.fetchJobStatus()
 	unset := make(map[string]bool)
 	for name := range w.jobs {
@@ -309,7 +252,7 @@ func (w *Worker) runSchedule() {
 			go job.Run(w.managerChan, w.semaphore)
 			continue
 		default:
-			job.SetState(stateReady)
+			job.SetState(stateNone)
 			go job.Run(w.managerChan, w.semaphore)
 			stime := m.LastUpdate.Add(job.provider.Interval())
 			logger.Debugf("Scheduling job %s @%s", job.Name(), stime.Format("2006-01-02 15:04:05"))
@@ -322,16 +265,25 @@ func (w *Worker) runSchedule() {
 	// manager's mirror list
 	for name := range unset {
 		job := w.jobs[name]
-		job.SetState(stateReady)
+		job.SetState(stateNone)
 		go job.Run(w.managerChan, w.semaphore)
 		w.schedule.AddJob(time.Now(), job)
 	}

+	w.L.Unlock()
+
 	for {
 		select {
 		case jobMsg := <-w.managerChan:
 			// got status update from job
-			job := w.jobs[jobMsg.name]
+			w.L.Lock()
+			job, ok := w.jobs[jobMsg.name]
+			w.L.Unlock()
+			if !ok {
+				logger.Warningf("Job %s not found", jobMsg.name)
+				continue
+			}
+
 			if job.State() != stateReady {
 				logger.Infof("Job %s state is not ready, skip adding new schedule", jobMsg.name)
 				continue
@@ -341,7 +293,7 @@ func (w *Worker) runSchedule() {
 			// is running. If it's paused or disabled
 			// a sync failure signal would be emitted
 			// which needs to be ignored
-			w.updateStatus(jobMsg)
+			w.updateStatus(job, jobMsg)

 			// only successful or the final failure msg
 			// can trigger scheduling
@@ -361,9 +313,7 @@ func (w *Worker) runSchedule() {
 				job.ctrlChan <- jobStart
 			}
 		}
-
 	}
-
 }

 // Name returns worker name
@@ -397,14 +347,14 @@ func (w *Worker) registorWorker() {
 	}
 }

-func (w *Worker) updateStatus(jobMsg jobMessage) {
+func (w *Worker) updateStatus(job *mirrorJob, jobMsg jobMessage) {
 	url := fmt.Sprintf(
 		"%s/workers/%s/jobs/%s",
 		w.cfg.Manager.APIBase,
 		w.Name(),
 		jobMsg.name,
 	)
-	p := w.providers[jobMsg.name]
+	p := job.provider
 	smsg := MirrorStatus{
 		Name:   jobMsg.name,
 		Worker: w.cfg.Global.Name,
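Aside: putting the pieces together, a hot reload is: edit the [[mirrors]] section of worker.conf, then send the worker SIGHUP (for example, kill -HUP $(pidof tunasync)). The signal handler in cmd reloads the file, and ReloadMirrorConfig diffs the mirror lists, then disables, swaps, and reschedules the affected jobs under w.L without restarting the process.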