diff --git a/cmd/tunasynctl/tunasynctl.go b/cmd/tunasynctl/tunasynctl.go index f335971..3e2d6fe 100644 --- a/cmd/tunasynctl/tunasynctl.go +++ b/cmd/tunasynctl/tunasynctl.go @@ -285,11 +285,16 @@ func cmdJob(cmd tunasync.CmdVerb) cli.ActionFunc { "argument WORKER", 1) } + options := map[string]bool{} + if c.Bool("force") { + options["force"] = true + } cmd := tunasync.ClientCmd{ Cmd: cmd, MirrorID: mirrorID, WorkerID: c.String("worker"), Args: argsList, + Options: options, } resp, err := tunasync.PostJSON(baseURL+cmdPath, cmd, client) if err != nil { @@ -410,6 +415,11 @@ func main() { }, } + forceStartFlag := cli.BoolFlag{ + Name: "force, f", + Usage: "Override the concurrent limit", + } + app.Commands = []cli.Command{ { Name: "list", @@ -450,7 +460,7 @@ func main() { { Name: "start", Usage: "Start a job", - Flags: append(commonFlags, cmdFlags...), + Flags: append(append(commonFlags, cmdFlags...), forceStartFlag), Action: initializeWrapper(cmdJob(tunasync.CmdStart)), }, { diff --git a/internal/msg.go b/internal/msg.go index c0efb7a..14a9102 100644 --- a/internal/msg.go +++ b/internal/msg.go @@ -68,9 +68,10 @@ func (c CmdVerb) String() string { // A WorkerCmd is the command message send from the // manager to a worker type WorkerCmd struct { - Cmd CmdVerb `json:"cmd"` - MirrorID string `json:"mirror_id"` - Args []string `json:"args"` + Cmd CmdVerb `json:"cmd"` + MirrorID string `json:"mirror_id"` + Args []string `json:"args"` + Options map[string]bool `json:"options"` } func (c WorkerCmd) String() string { @@ -83,8 +84,9 @@ func (c WorkerCmd) String() string { // A ClientCmd is the command message send from client // to the manager type ClientCmd struct { - Cmd CmdVerb `json:"cmd"` - MirrorID string `json:"mirror_id"` - WorkerID string `json:"worker_id"` - Args []string `json:"args"` + Cmd CmdVerb `json:"cmd"` + MirrorID string `json:"mirror_id"` + WorkerID string `json:"worker_id"` + Args []string `json:"args"` + Options map[string]bool `json:"options"` } diff --git a/manager/server.go b/manager/server.go index 2563582..2605446 100644 --- a/manager/server.go +++ b/manager/server.go @@ -337,6 +337,7 @@ func (s *Manager) handleClientCmd(c *gin.Context) { Cmd: clientCmd.Cmd, MirrorID: clientCmd.MirrorID, Args: clientCmd.Args, + Options: clientCmd.Options, } // update job status, even if the job did not disable successfully, diff --git a/worker/base_provider.go b/worker/base_provider.go index 0befa61..0f8ec80 100644 --- a/worker/base_provider.go +++ b/worker/base_provider.go @@ -20,8 +20,6 @@ type baseProvider struct { cmd *cmdJob isRunning atomic.Value - logFile *os.File - cgroup *cgroupHook zfs *zfsHook docker *dockerHook @@ -111,20 +109,21 @@ func (p *baseProvider) Docker() *dockerHook { return p.docker } -func (p *baseProvider) prepareLogFile() error { +func (p *baseProvider) prepareLogFile(append bool) error { if p.LogFile() == "/dev/null" { p.cmd.SetLogFile(nil) return nil } - if p.logFile == nil { - logFile, err := os.OpenFile(p.LogFile(), os.O_WRONLY|os.O_CREATE, 0644) - if err != nil { - logger.Errorf("Error opening logfile %s: %s", p.LogFile(), err.Error()) - return err - } - p.logFile = logFile + appendMode := 0 + if append { + appendMode = os.O_APPEND } - p.cmd.SetLogFile(p.logFile) + logFile, err := os.OpenFile(p.LogFile(), os.O_WRONLY|os.O_CREATE|appendMode, 0644) + if err != nil { + logger.Errorf("Error opening logfile %s: %s", p.LogFile(), err.Error()) + return err + } + p.cmd.SetLogFile(logFile) return nil } @@ -143,32 +142,22 @@ func (p *baseProvider) IsRunning() bool { func (p *baseProvider) Wait() error { defer func() { - p.Lock() + logger.Debugf("set isRunning to false: %s", p.Name()) p.isRunning.Store(false) - if p.logFile != nil { - p.logFile.Close() - p.logFile = nil - } - p.Unlock() }() + logger.Debugf("calling Wait: %s", p.Name()) return p.cmd.Wait() } func (p *baseProvider) Terminate() error { + p.Lock() + defer p.Unlock() logger.Debugf("terminating provider: %s", p.Name()) if !p.IsRunning() { return nil } - p.Lock() - if p.logFile != nil { - p.logFile.Close() - p.logFile = nil - } - p.Unlock() - err := p.cmd.Terminate() - p.isRunning.Store(false) return err } diff --git a/worker/cmd_provider.go b/worker/cmd_provider.go index 2274cec..e1cac2e 100644 --- a/worker/cmd_provider.go +++ b/worker/cmd_provider.go @@ -1,6 +1,7 @@ package worker import ( + "errors" "time" "github.com/anmitsu/go-shlex" @@ -60,6 +61,13 @@ func (p *cmdProvider) Run() error { } func (p *cmdProvider) Start() error { + p.Lock() + defer p.Unlock() + + if p.IsRunning() { + return errors.New("provider is currently running") + } + env := map[string]string{ "TUNASYNC_MIRROR_NAME": p.Name(), "TUNASYNC_WORKING_DIR": p.WorkingDir(), @@ -71,7 +79,7 @@ func (p *cmdProvider) Start() error { env[k] = v } p.cmd = newCmdJob(p, p.command, p.WorkingDir(), env) - if err := p.prepareLogFile(); err != nil { + if err := p.prepareLogFile(false); err != nil { return err } diff --git a/worker/job.go b/worker/job.go index a5feca1..7ba0df4 100644 --- a/worker/job.go +++ b/worker/job.go @@ -5,6 +5,7 @@ import ( "fmt" "sync" "sync/atomic" + "time" tunasync "github.com/tuna/tunasync/internal" ) @@ -14,12 +15,13 @@ import ( type ctrlAction uint8 const ( - jobStart ctrlAction = iota - jobStop // stop syncing keep the job - jobDisable // disable the job (stops goroutine) - jobRestart // restart syncing - jobPing // ensure the goroutine is alive - jobHalt // worker halts + jobStart ctrlAction = iota + jobStop // stop syncing keep the job + jobDisable // disable the job (stops goroutine) + jobRestart // restart syncing + jobPing // ensure the goroutine is alive + jobHalt // worker halts + jobForceStart // ignore concurrent limit ) type jobMessage struct { @@ -154,9 +156,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err syncDone := make(chan error, 1) go func() { err := provider.Run() - if !stopASAP { - syncDone <- err - } + syncDone <- err }() select { @@ -212,22 +212,26 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err return nil } - runJob := func(kill <-chan empty, jobDone chan<- empty) { + runJob := func(kill <-chan empty, jobDone chan<- empty, bypassSemaphore <-chan empty) { select { case semaphore <- empty{}: defer func() { <-semaphore }() runJobWrapper(kill, jobDone) + case <-bypassSemaphore: + logger.Noticef("Concurrent limit ignored by %s", m.Name()) + runJobWrapper(kill, jobDone) case <-kill: jobDone <- empty{} return } } + bypassSemaphore := make(chan empty, 1) for { if m.State() == stateReady { kill := make(chan empty) jobDone := make(chan empty) - go runJob(kill, jobDone) + go runJob(kill, jobDone, bypassSemaphore) _wait_for_job: select { @@ -248,7 +252,14 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err m.SetState(stateReady) close(kill) <-jobDone + time.Sleep(time.Second) // Restart may fail if the process was not exited yet continue + case jobForceStart: + select { //non-blocking + default: + case bypassSemaphore <- empty{}: + } + fallthrough case jobStart: m.SetState(stateReady) goto _wait_for_job @@ -272,8 +283,14 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err case jobDisable: m.SetState(stateDisabled) return nil + case jobForceStart: + select { //non-blocking + default: + case bypassSemaphore <- empty{}: + } + fallthrough case jobRestart: - m.SetState(stateReady) + fallthrough case jobStart: m.SetState(stateReady) default: diff --git a/worker/job_test.go b/worker/job_test.go index 7ffed72..370c9be 100644 --- a/worker/job_test.go +++ b/worker/job_test.go @@ -135,6 +135,8 @@ echo $TUNASYNC_WORKING_DIR msg = <-managerChan So(msg.status, ShouldEqual, Syncing) + job.ctrlChan <- jobStart // should be ignored + job.ctrlChan <- jobStop msg = <-managerChan @@ -170,8 +172,239 @@ echo $TUNASYNC_WORKING_DIR job.ctrlChan <- jobDisable <-job.disabled }) + + Convey("If we restart it", func(ctx C) { + go job.Run(managerChan, semaphore) + job.ctrlChan <- jobStart + + msg := <-managerChan + So(msg.status, ShouldEqual, PreSyncing) + msg = <-managerChan + So(msg.status, ShouldEqual, Syncing) + + job.ctrlChan <- jobRestart + + msg = <-managerChan + So(msg.status, ShouldEqual, Failed) + So(msg.msg, ShouldEqual, "killed by manager") + + msg = <-managerChan + So(msg.status, ShouldEqual, PreSyncing) + msg = <-managerChan + So(msg.status, ShouldEqual, Syncing) + msg = <-managerChan + So(msg.status, ShouldEqual, Success) + + expectedOutput := fmt.Sprintf( + "%s\n%s\n", + provider.WorkingDir(), provider.WorkingDir(), + ) + + loggedContent, err := ioutil.ReadFile(provider.LogFile()) + So(err, ShouldBeNil) + So(string(loggedContent), ShouldEqual, expectedOutput) + job.ctrlChan <- jobDisable + <-job.disabled + }) + + Convey("If we disable it", func(ctx C) { + go job.Run(managerChan, semaphore) + job.ctrlChan <- jobStart + + msg := <-managerChan + So(msg.status, ShouldEqual, PreSyncing) + msg = <-managerChan + So(msg.status, ShouldEqual, Syncing) + + job.ctrlChan <- jobDisable + + msg = <-managerChan + So(msg.status, ShouldEqual, Failed) + So(msg.msg, ShouldEqual, "killed by manager") + + <-job.disabled + }) + + Convey("If we stop it twice, than start it", func(ctx C) { + go job.Run(managerChan, semaphore) + job.ctrlChan <- jobStart + + msg := <-managerChan + So(msg.status, ShouldEqual, PreSyncing) + msg = <-managerChan + So(msg.status, ShouldEqual, Syncing) + + job.ctrlChan <- jobStop + + msg = <-managerChan + So(msg.status, ShouldEqual, Failed) + So(msg.msg, ShouldEqual, "killed by manager") + + job.ctrlChan <- jobStop // should be ignored + + job.ctrlChan <- jobStart + + msg = <-managerChan + So(msg.status, ShouldEqual, PreSyncing) + msg = <-managerChan + So(msg.status, ShouldEqual, Syncing) + msg = <-managerChan + So(msg.status, ShouldEqual, Success) + + expectedOutput := fmt.Sprintf( + "%s\n%s\n", + provider.WorkingDir(), provider.WorkingDir(), + ) + + loggedContent, err := ioutil.ReadFile(provider.LogFile()) + So(err, ShouldBeNil) + So(string(loggedContent), ShouldEqual, expectedOutput) + + job.ctrlChan <- jobDisable + <-job.disabled + }) }) }) } + +func TestConcurrentMirrorJobs(t *testing.T) { + + InitLogger(true, true, false) + + Convey("Concurrent MirrorJobs should work", t, func(ctx C) { + tmpDir, err := ioutil.TempDir("", "tunasync") + defer os.RemoveAll(tmpDir) + So(err, ShouldBeNil) + + const CONCURRENT = 5 + + var providers [CONCURRENT]*cmdProvider + var jobs [CONCURRENT]*mirrorJob + for i := 0; i < CONCURRENT; i++ { + c := cmdConfig{ + name: fmt.Sprintf("job-%d", i), + upstreamURL: "http://mirrors.tuna.moe/", + command: "sleep 2", + workingDir: tmpDir, + logDir: tmpDir, + logFile: "/dev/null", + interval: 10 * time.Second, + } + + var err error + providers[i], err = newCmdProvider(c) + So(err, ShouldBeNil) + jobs[i] = newMirrorJob(providers[i]) + } + + managerChan := make(chan jobMessage, 10) + semaphore := make(chan empty, CONCURRENT-2) + + countingJobs := func(managerChan chan jobMessage, totalJobs, concurrentCheck int) (peakConcurrent, counterFailed int) { + counterEnded := 0 + counterRunning := 0 + peakConcurrent = 0 + counterFailed = 0 + for counterEnded < totalJobs { + msg := <-managerChan + switch msg.status { + case PreSyncing: + counterRunning++ + case Syncing: + case Failed: + counterFailed++ + fallthrough + case Success: + counterEnded++ + counterRunning-- + default: + So(0, ShouldEqual, 1) + } + // Test if semaphore works + So(counterRunning, ShouldBeLessThanOrEqualTo, concurrentCheck) + if counterRunning > peakConcurrent { + peakConcurrent = counterRunning + } + } + // select { + // case msg := <-managerChan: + // logger.Errorf("extra message received: %v", msg) + // So(0, ShouldEqual, 1) + // case <-time.After(2 * time.Second): + // } + return + } + + Convey("When we run them all", func(ctx C) { + for _, job := range jobs { + go job.Run(managerChan, semaphore) + job.ctrlChan <- jobStart + } + + peakConcurrent, counterFailed := countingJobs(managerChan, CONCURRENT, CONCURRENT-2) + + So(peakConcurrent, ShouldEqual, CONCURRENT-2) + So(counterFailed, ShouldEqual, 0) + + for _, job := range jobs { + job.ctrlChan <- jobDisable + <-job.disabled + } + }) + Convey("If we cancel one job", func(ctx C) { + for _, job := range jobs { + go job.Run(managerChan, semaphore) + job.ctrlChan <- jobRestart + time.Sleep(200 * time.Millisecond) + } + + // Cancel the one waiting for semaphore + jobs[len(jobs)-1].ctrlChan <- jobStop + + peakConcurrent, counterFailed := countingJobs(managerChan, CONCURRENT-1, CONCURRENT-2) + + So(peakConcurrent, ShouldEqual, CONCURRENT-2) + So(counterFailed, ShouldEqual, 0) + + for _, job := range jobs { + job.ctrlChan <- jobDisable + <-job.disabled + } + }) + Convey("If we override the concurrent limit", func(ctx C) { + for _, job := range jobs { + go job.Run(managerChan, semaphore) + job.ctrlChan <- jobStart + time.Sleep(200 * time.Millisecond) + } + + jobs[len(jobs)-1].ctrlChan <- jobForceStart + jobs[len(jobs)-2].ctrlChan <- jobForceStart + + peakConcurrent, counterFailed := countingJobs(managerChan, CONCURRENT, CONCURRENT) + + So(peakConcurrent, ShouldEqual, CONCURRENT) + So(counterFailed, ShouldEqual, 0) + + time.Sleep(1 * time.Second) + + // fmt.Println("Restart them") + + for _, job := range jobs { + job.ctrlChan <- jobStart + } + + peakConcurrent, counterFailed = countingJobs(managerChan, CONCURRENT, CONCURRENT-2) + + So(peakConcurrent, ShouldEqual, CONCURRENT-2) + So(counterFailed, ShouldEqual, 0) + + for _, job := range jobs { + job.ctrlChan <- jobDisable + <-job.disabled + } + }) + }) +} diff --git a/worker/provider_test.go b/worker/provider_test.go index ab29621..56ddc8d 100644 --- a/worker/provider_test.go +++ b/worker/provider_test.go @@ -79,11 +79,12 @@ exit 0 err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755) So(err, ShouldBeNil) + targetDir, _ := filepath.EvalSymlinks(provider.WorkingDir()) expectedOutput := fmt.Sprintf( "syncing to %s\n"+ "%s\n"+ "Done\n", - provider.WorkingDir(), + targetDir, fmt.Sprintf( "-aHvh --no-o --no-g --stats --exclude .~tmp~/ "+ "--delete --delete-after --delay-updates --safe-links "+ @@ -144,11 +145,12 @@ exit 0 err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755) So(err, ShouldBeNil) + targetDir, _ := filepath.EvalSymlinks(provider.WorkingDir()) expectedOutput := fmt.Sprintf( "syncing to %s\n"+ "%s\n"+ "Done\n", - provider.WorkingDir(), + targetDir, fmt.Sprintf( "%s %s -aHvh --no-o --no-g --stats --exclude .~tmp~/ "+ "--delete --delete-after --delay-updates --safe-links "+ @@ -260,6 +262,40 @@ sleep 5 }) }) + Convey("Command Provider without log file should work", t, func(ctx C) { + tmpDir, err := ioutil.TempDir("", "tunasync") + defer os.RemoveAll(tmpDir) + So(err, ShouldBeNil) + + c := cmdConfig{ + name: "run-ls", + upstreamURL: "http://mirrors.tuna.moe/", + command: "ls", + workingDir: tmpDir, + logDir: tmpDir, + logFile: "/dev/null", + interval: 600 * time.Second, + } + + provider, err := newCmdProvider(c) + So(err, ShouldBeNil) + + So(provider.IsMaster(), ShouldEqual, false) + So(provider.ZFS(), ShouldBeNil) + So(provider.Type(), ShouldEqual, provCommand) + So(provider.Name(), ShouldEqual, c.name) + So(provider.WorkingDir(), ShouldEqual, c.workingDir) + So(provider.LogDir(), ShouldEqual, c.logDir) + So(provider.LogFile(), ShouldEqual, c.logFile) + So(provider.Interval(), ShouldEqual, c.interval) + + Convey("Run the command", func() { + + err = provider.Run() + So(err, ShouldBeNil) + + }) + }) } func TestTwoStageRsyncProvider(t *testing.T) { @@ -280,6 +316,8 @@ func TestTwoStageRsyncProvider(t *testing.T) { logFile: tmpFile, useIPv6: true, excludeFile: tmpFile, + username: "hello", + password: "world", } provider, err := newTwoStageRsyncProvider(c) @@ -306,6 +344,7 @@ exit 0 err = provider.Run() So(err, ShouldBeNil) + targetDir, _ := filepath.EvalSymlinks(provider.WorkingDir()) expectedOutput := fmt.Sprintf( "syncing to %s\n"+ "%s\n"+ @@ -313,14 +352,14 @@ exit 0 "syncing to %s\n"+ "%s\n"+ "Done\n", - provider.WorkingDir(), + targetDir, fmt.Sprintf( "-aHvh --no-o --no-g --stats --exclude .~tmp~/ --safe-links "+ "--timeout=120 --contimeout=120 --exclude dists/ -6 "+ "--exclude-from %s %s %s", provider.excludeFile, provider.upstreamURL, provider.WorkingDir(), ), - provider.WorkingDir(), + targetDir, fmt.Sprintf( "-aHvh --no-o --no-g --stats --exclude .~tmp~/ "+ "--delete --delete-after --delay-updates --safe-links "+ diff --git a/worker/rsync_provider.go b/worker/rsync_provider.go index d34a396..b9d8fd5 100644 --- a/worker/rsync_provider.go +++ b/worker/rsync_provider.go @@ -81,6 +81,12 @@ func (p *rsyncProvider) Run() error { } func (p *rsyncProvider) Start() error { + p.Lock() + defer p.Unlock() + + if p.IsRunning() { + return errors.New("provider is currently running") + } env := map[string]string{} if p.username != "" { @@ -94,7 +100,7 @@ func (p *rsyncProvider) Start() error { command = append(command, p.upstreamURL, p.WorkingDir()) p.cmd = newCmdJob(p, command, p.WorkingDir(), env) - if err := p.prepareLogFile(); err != nil { + if err := p.prepareLogFile(false); err != nil { return err } diff --git a/worker/runner.go b/worker/runner.go index 05a6f6b..e47417e 100644 --- a/worker/runner.go +++ b/worker/runner.go @@ -118,6 +118,9 @@ func (c *cmdJob) Wait() error { return c.retErr default: err := c.cmd.Wait() + if c.cmd.Stdout != nil { + c.cmd.Stdout.(*os.File).Close() + } c.retErr = err close(c.finished) return err diff --git a/worker/two_stage_rsync_provider.go b/worker/two_stage_rsync_provider.go index 48e5125..25bb17f 100644 --- a/worker/two_stage_rsync_provider.go +++ b/worker/two_stage_rsync_provider.go @@ -108,7 +108,12 @@ func (p *twoStageRsyncProvider) Options(stage int) ([]string, error) { } func (p *twoStageRsyncProvider) Run() error { - defer p.Wait() + p.Lock() + defer p.Unlock() + + if p.IsRunning() { + return errors.New("provider is currently running") + } env := map[string]string{} if p.username != "" { @@ -129,7 +134,7 @@ func (p *twoStageRsyncProvider) Run() error { command = append(command, p.upstreamURL, p.WorkingDir()) p.cmd = newCmdJob(p, command, p.WorkingDir(), env) - if err := p.prepareLogFile(); err != nil { + if err := p.prepareLogFile(stage > 1); err != nil { return err } @@ -137,9 +142,11 @@ func (p *twoStageRsyncProvider) Run() error { return err } p.isRunning.Store(true) + logger.Debugf("set isRunning to true: %s", p.Name()) - err = p.cmd.Wait() - p.isRunning.Store(false) + p.Unlock() + err = p.Wait() + p.Lock() if err != nil { return err } diff --git a/worker/worker.go b/worker/worker.go index 877b526..99f8c2d 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -219,7 +219,11 @@ func (w *Worker) makeHTTPServer() { } switch cmd.Cmd { case CmdStart: - job.ctrlChan <- jobStart + if cmd.Options["force"] { + job.ctrlChan <- jobForceStart + } else { + job.ctrlChan <- jobStart + } case CmdRestart: job.ctrlChan <- jobRestart case CmdStop: