bug fix: log over-written in twoStageRsyncProvider

solve more DATA RACE problem
This commit is contained in:
Yuxiang Zhang 2018-05-30 12:28:09 +08:00
parent c5bb172f99
commit 0fdb07d061
6 changed files with 47 additions and 16 deletions

View File

@ -109,12 +109,16 @@ func (p *baseProvider) Docker() *dockerHook {
return p.docker return p.docker
} }
func (p *baseProvider) prepareLogFile() error { func (p *baseProvider) prepareLogFile(append bool) error {
if p.LogFile() == "/dev/null" { if p.LogFile() == "/dev/null" {
p.cmd.SetLogFile(nil) p.cmd.SetLogFile(nil)
return nil return nil
} }
logFile, err := os.OpenFile(p.LogFile(), os.O_WRONLY|os.O_CREATE, 0644) appendMode := 0
if append {
appendMode = os.O_APPEND
}
logFile, err := os.OpenFile(p.LogFile(), os.O_WRONLY|os.O_CREATE|appendMode, 0644)
if err != nil { if err != nil {
logger.Errorf("Error opening logfile %s: %s", p.LogFile(), err.Error()) logger.Errorf("Error opening logfile %s: %s", p.LogFile(), err.Error())
return err return err
@ -138,19 +142,22 @@ func (p *baseProvider) IsRunning() bool {
func (p *baseProvider) Wait() error { func (p *baseProvider) Wait() error {
defer func() { defer func() {
logger.Debugf("set isRunning to false: %s", p.Name())
p.isRunning.Store(false) p.isRunning.Store(false)
}() }()
logger.Debugf("calling Wait: %s", p.Name())
return p.cmd.Wait() return p.cmd.Wait()
} }
func (p *baseProvider) Terminate() error { func (p *baseProvider) Terminate() error {
p.Lock()
defer p.Unlock()
logger.Debugf("terminating provider: %s", p.Name()) logger.Debugf("terminating provider: %s", p.Name())
if !p.IsRunning() { if !p.IsRunning() {
return nil return nil
} }
err := p.cmd.Terminate() err := p.cmd.Terminate()
p.isRunning.Store(false)
return err return err
} }

View File

@ -1,6 +1,7 @@
package worker package worker
import ( import (
"errors"
"time" "time"
"github.com/anmitsu/go-shlex" "github.com/anmitsu/go-shlex"
@ -60,6 +61,13 @@ func (p *cmdProvider) Run() error {
} }
func (p *cmdProvider) Start() error { func (p *cmdProvider) Start() error {
p.Lock()
defer p.Unlock()
if p.IsRunning() {
return errors.New("provider is currently running")
}
env := map[string]string{ env := map[string]string{
"TUNASYNC_MIRROR_NAME": p.Name(), "TUNASYNC_MIRROR_NAME": p.Name(),
"TUNASYNC_WORKING_DIR": p.WorkingDir(), "TUNASYNC_WORKING_DIR": p.WorkingDir(),
@ -71,7 +79,7 @@ func (p *cmdProvider) Start() error {
env[k] = v env[k] = v
} }
p.cmd = newCmdJob(p, p.command, p.WorkingDir(), env) p.cmd = newCmdJob(p, p.command, p.WorkingDir(), env)
if err := p.prepareLogFile(); err != nil { if err := p.prepareLogFile(false); err != nil {
return err return err
} }

View File

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time"
tunasync "github.com/tuna/tunasync/internal" tunasync "github.com/tuna/tunasync/internal"
) )
@ -154,9 +155,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
syncDone := make(chan error, 1) syncDone := make(chan error, 1)
go func() { go func() {
err := provider.Run() err := provider.Run()
if !stopASAP { syncDone <- err
syncDone <- err
}
}() }()
select { select {
@ -248,6 +247,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
m.SetState(stateReady) m.SetState(stateReady)
close(kill) close(kill)
<-jobDone <-jobDone
time.Sleep(time.Second) // Restart may fail if the process was not exited yet
continue continue
case jobStart: case jobStart:
m.SetState(stateReady) m.SetState(stateReady)

View File

@ -79,11 +79,12 @@ exit 0
err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755) err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755)
So(err, ShouldBeNil) So(err, ShouldBeNil)
targetDir, _ := filepath.EvalSymlinks(provider.WorkingDir())
expectedOutput := fmt.Sprintf( expectedOutput := fmt.Sprintf(
"syncing to %s\n"+ "syncing to %s\n"+
"%s\n"+ "%s\n"+
"Done\n", "Done\n",
provider.WorkingDir(), targetDir,
fmt.Sprintf( fmt.Sprintf(
"-aHvh --no-o --no-g --stats --exclude .~tmp~/ "+ "-aHvh --no-o --no-g --stats --exclude .~tmp~/ "+
"--delete --delete-after --delay-updates --safe-links "+ "--delete --delete-after --delay-updates --safe-links "+
@ -144,11 +145,12 @@ exit 0
err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755) err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755)
So(err, ShouldBeNil) So(err, ShouldBeNil)
targetDir, _ := filepath.EvalSymlinks(provider.WorkingDir())
expectedOutput := fmt.Sprintf( expectedOutput := fmt.Sprintf(
"syncing to %s\n"+ "syncing to %s\n"+
"%s\n"+ "%s\n"+
"Done\n", "Done\n",
provider.WorkingDir(), targetDir,
fmt.Sprintf( fmt.Sprintf(
"%s %s -aHvh --no-o --no-g --stats --exclude .~tmp~/ "+ "%s %s -aHvh --no-o --no-g --stats --exclude .~tmp~/ "+
"--delete --delete-after --delay-updates --safe-links "+ "--delete --delete-after --delay-updates --safe-links "+
@ -306,6 +308,7 @@ exit 0
err = provider.Run() err = provider.Run()
So(err, ShouldBeNil) So(err, ShouldBeNil)
targetDir, _ := filepath.EvalSymlinks(provider.WorkingDir())
expectedOutput := fmt.Sprintf( expectedOutput := fmt.Sprintf(
"syncing to %s\n"+ "syncing to %s\n"+
"%s\n"+ "%s\n"+
@ -313,14 +316,14 @@ exit 0
"syncing to %s\n"+ "syncing to %s\n"+
"%s\n"+ "%s\n"+
"Done\n", "Done\n",
provider.WorkingDir(), targetDir,
fmt.Sprintf( fmt.Sprintf(
"-aHvh --no-o --no-g --stats --exclude .~tmp~/ --safe-links "+ "-aHvh --no-o --no-g --stats --exclude .~tmp~/ --safe-links "+
"--timeout=120 --contimeout=120 --exclude dists/ -6 "+ "--timeout=120 --contimeout=120 --exclude dists/ -6 "+
"--exclude-from %s %s %s", "--exclude-from %s %s %s",
provider.excludeFile, provider.upstreamURL, provider.WorkingDir(), provider.excludeFile, provider.upstreamURL, provider.WorkingDir(),
), ),
provider.WorkingDir(), targetDir,
fmt.Sprintf( fmt.Sprintf(
"-aHvh --no-o --no-g --stats --exclude .~tmp~/ "+ "-aHvh --no-o --no-g --stats --exclude .~tmp~/ "+
"--delete --delete-after --delay-updates --safe-links "+ "--delete --delete-after --delay-updates --safe-links "+

View File

@ -81,6 +81,12 @@ func (p *rsyncProvider) Run() error {
} }
func (p *rsyncProvider) Start() error { func (p *rsyncProvider) Start() error {
p.Lock()
defer p.Unlock()
if p.IsRunning() {
return errors.New("provider is currently running")
}
env := map[string]string{} env := map[string]string{}
if p.username != "" { if p.username != "" {
@ -94,7 +100,7 @@ func (p *rsyncProvider) Start() error {
command = append(command, p.upstreamURL, p.WorkingDir()) command = append(command, p.upstreamURL, p.WorkingDir())
p.cmd = newCmdJob(p, command, p.WorkingDir(), env) p.cmd = newCmdJob(p, command, p.WorkingDir(), env)
if err := p.prepareLogFile(); err != nil { if err := p.prepareLogFile(false); err != nil {
return err return err
} }

View File

@ -108,7 +108,12 @@ func (p *twoStageRsyncProvider) Options(stage int) ([]string, error) {
} }
func (p *twoStageRsyncProvider) Run() error { func (p *twoStageRsyncProvider) Run() error {
defer p.Wait() p.Lock()
defer p.Unlock()
if p.IsRunning() {
return errors.New("provider is currently running")
}
env := map[string]string{} env := map[string]string{}
if p.username != "" { if p.username != "" {
@ -129,7 +134,7 @@ func (p *twoStageRsyncProvider) Run() error {
command = append(command, p.upstreamURL, p.WorkingDir()) command = append(command, p.upstreamURL, p.WorkingDir())
p.cmd = newCmdJob(p, command, p.WorkingDir(), env) p.cmd = newCmdJob(p, command, p.WorkingDir(), env)
if err := p.prepareLogFile(); err != nil { if err := p.prepareLogFile(stage > 1); err != nil {
return err return err
} }
@ -137,9 +142,11 @@ func (p *twoStageRsyncProvider) Run() error {
return err return err
} }
p.isRunning.Store(true) p.isRunning.Store(true)
logger.Debugf("set isRunning to true: %s", p.Name())
err = p.cmd.Wait() p.Unlock()
p.isRunning.Store(false) err = p.Wait()
p.Lock()
if err != nil { if err != nil {
return err return err
} }