diff --git a/worker/base_provider.go b/worker/base_provider.go index 0505239..0f8ec80 100644 --- a/worker/base_provider.go +++ b/worker/base_provider.go @@ -109,12 +109,16 @@ func (p *baseProvider) Docker() *dockerHook { return p.docker } -func (p *baseProvider) prepareLogFile() error { +func (p *baseProvider) prepareLogFile(append bool) error { if p.LogFile() == "/dev/null" { p.cmd.SetLogFile(nil) return nil } - logFile, err := os.OpenFile(p.LogFile(), os.O_WRONLY|os.O_CREATE, 0644) + appendMode := 0 + if append { + appendMode = os.O_APPEND + } + logFile, err := os.OpenFile(p.LogFile(), os.O_WRONLY|os.O_CREATE|appendMode, 0644) if err != nil { logger.Errorf("Error opening logfile %s: %s", p.LogFile(), err.Error()) return err @@ -138,19 +142,22 @@ func (p *baseProvider) IsRunning() bool { func (p *baseProvider) Wait() error { defer func() { + logger.Debugf("set isRunning to false: %s", p.Name()) p.isRunning.Store(false) }() + logger.Debugf("calling Wait: %s", p.Name()) return p.cmd.Wait() } func (p *baseProvider) Terminate() error { + p.Lock() + defer p.Unlock() logger.Debugf("terminating provider: %s", p.Name()) if !p.IsRunning() { return nil } err := p.cmd.Terminate() - p.isRunning.Store(false) return err } diff --git a/worker/cmd_provider.go b/worker/cmd_provider.go index 2274cec..e1cac2e 100644 --- a/worker/cmd_provider.go +++ b/worker/cmd_provider.go @@ -1,6 +1,7 @@ package worker import ( + "errors" "time" "github.com/anmitsu/go-shlex" @@ -60,6 +61,13 @@ func (p *cmdProvider) Run() error { } func (p *cmdProvider) Start() error { + p.Lock() + defer p.Unlock() + + if p.IsRunning() { + return errors.New("provider is currently running") + } + env := map[string]string{ "TUNASYNC_MIRROR_NAME": p.Name(), "TUNASYNC_WORKING_DIR": p.WorkingDir(), @@ -71,7 +79,7 @@ func (p *cmdProvider) Start() error { env[k] = v } p.cmd = newCmdJob(p, p.command, p.WorkingDir(), env) - if err := p.prepareLogFile(); err != nil { + if err := p.prepareLogFile(false); err != nil { return err } diff --git a/worker/job.go b/worker/job.go index a5feca1..e07af45 100644 --- a/worker/job.go +++ b/worker/job.go @@ -5,6 +5,7 @@ import ( "fmt" "sync" "sync/atomic" + "time" tunasync "github.com/tuna/tunasync/internal" ) @@ -154,9 +155,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err syncDone := make(chan error, 1) go func() { err := provider.Run() - if !stopASAP { - syncDone <- err - } + syncDone <- err }() select { @@ -248,6 +247,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err m.SetState(stateReady) close(kill) <-jobDone + time.Sleep(time.Second) // Restart may fail if the process was not exited yet continue case jobStart: m.SetState(stateReady) diff --git a/worker/provider_test.go b/worker/provider_test.go index ab29621..3098676 100644 --- a/worker/provider_test.go +++ b/worker/provider_test.go @@ -79,11 +79,12 @@ exit 0 err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755) So(err, ShouldBeNil) + targetDir, _ := filepath.EvalSymlinks(provider.WorkingDir()) expectedOutput := fmt.Sprintf( "syncing to %s\n"+ "%s\n"+ "Done\n", - provider.WorkingDir(), + targetDir, fmt.Sprintf( "-aHvh --no-o --no-g --stats --exclude .~tmp~/ "+ "--delete --delete-after --delay-updates --safe-links "+ @@ -144,11 +145,12 @@ exit 0 err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755) So(err, ShouldBeNil) + targetDir, _ := filepath.EvalSymlinks(provider.WorkingDir()) expectedOutput := fmt.Sprintf( "syncing to %s\n"+ "%s\n"+ "Done\n", - provider.WorkingDir(), + targetDir, fmt.Sprintf( "%s %s -aHvh --no-o --no-g --stats --exclude .~tmp~/ "+ "--delete --delete-after --delay-updates --safe-links "+ @@ -306,6 +308,7 @@ exit 0 err = provider.Run() So(err, ShouldBeNil) + targetDir, _ := filepath.EvalSymlinks(provider.WorkingDir()) expectedOutput := fmt.Sprintf( "syncing to %s\n"+ "%s\n"+ @@ -313,14 +316,14 @@ exit 0 "syncing to %s\n"+ "%s\n"+ "Done\n", - provider.WorkingDir(), + targetDir, fmt.Sprintf( "-aHvh --no-o --no-g --stats --exclude .~tmp~/ --safe-links "+ "--timeout=120 --contimeout=120 --exclude dists/ -6 "+ "--exclude-from %s %s %s", provider.excludeFile, provider.upstreamURL, provider.WorkingDir(), ), - provider.WorkingDir(), + targetDir, fmt.Sprintf( "-aHvh --no-o --no-g --stats --exclude .~tmp~/ "+ "--delete --delete-after --delay-updates --safe-links "+ diff --git a/worker/rsync_provider.go b/worker/rsync_provider.go index d34a396..b9d8fd5 100644 --- a/worker/rsync_provider.go +++ b/worker/rsync_provider.go @@ -81,6 +81,12 @@ func (p *rsyncProvider) Run() error { } func (p *rsyncProvider) Start() error { + p.Lock() + defer p.Unlock() + + if p.IsRunning() { + return errors.New("provider is currently running") + } env := map[string]string{} if p.username != "" { @@ -94,7 +100,7 @@ func (p *rsyncProvider) Start() error { command = append(command, p.upstreamURL, p.WorkingDir()) p.cmd = newCmdJob(p, command, p.WorkingDir(), env) - if err := p.prepareLogFile(); err != nil { + if err := p.prepareLogFile(false); err != nil { return err } diff --git a/worker/two_stage_rsync_provider.go b/worker/two_stage_rsync_provider.go index 48e5125..25bb17f 100644 --- a/worker/two_stage_rsync_provider.go +++ b/worker/two_stage_rsync_provider.go @@ -108,7 +108,12 @@ func (p *twoStageRsyncProvider) Options(stage int) ([]string, error) { } func (p *twoStageRsyncProvider) Run() error { - defer p.Wait() + p.Lock() + defer p.Unlock() + + if p.IsRunning() { + return errors.New("provider is currently running") + } env := map[string]string{} if p.username != "" { @@ -129,7 +134,7 @@ func (p *twoStageRsyncProvider) Run() error { command = append(command, p.upstreamURL, p.WorkingDir()) p.cmd = newCmdJob(p, command, p.WorkingDir(), env) - if err := p.prepareLogFile(); err != nil { + if err := p.prepareLogFile(stage > 1); err != nil { return err } @@ -137,9 +142,11 @@ func (p *twoStageRsyncProvider) Run() error { return err } p.isRunning.Store(true) + logger.Debugf("set isRunning to true: %s", p.Name()) - err = p.cmd.Wait() - p.isRunning.Store(false) + p.Unlock() + err = p.Wait() + p.Lock() if err != nil { return err }