diff --git a/worker/cmd_provider.go b/worker/cmd_provider.go index 1e5a399..d6df6e7 100644 --- a/worker/cmd_provider.go +++ b/worker/cmd_provider.go @@ -66,5 +66,9 @@ func (p *cmdProvider) Start() error { return err } - return p.cmd.Start() + if err := p.cmd.Start(); err != nil { + return err + } + p.isRunning.Store(true) + return nil } diff --git a/worker/provider.go b/worker/provider.go index a27dbfe..143ccd8 100644 --- a/worker/provider.go +++ b/worker/provider.go @@ -1,9 +1,9 @@ package worker import ( - "errors" "os" "sync" + "sync/atomic" "time" ) @@ -31,6 +31,8 @@ type mirrorProvider interface { // terminate mirror job Terminate() error // job hooks + IsRunning() bool + Hooks() []jobHook Interval() time.Duration @@ -54,7 +56,9 @@ type baseProvider struct { name string interval time.Duration - cmd *cmdJob + cmd *cmdJob + isRunning atomic.Value + logFile *os.File hooks []jobHook @@ -142,9 +146,15 @@ func (p *baseProvider) Start() error { panic("Not Implemented") } +func (p *baseProvider) IsRunning() bool { + isRunning, _ := p.isRunning.Load().(bool) + return isRunning +} + func (p *baseProvider) Wait() error { defer func() { p.Lock() + p.isRunning.Store(false) if p.logFile != nil { p.logFile.Close() p.logFile = nil @@ -156,8 +166,8 @@ func (p *baseProvider) Wait() error { func (p *baseProvider) Terminate() error { logger.Debug("terminating provider: %s", p.Name()) - if p.cmd == nil { - return errors.New("provider command job not initialized") + if !p.IsRunning() { + return nil } p.Lock() @@ -168,5 +178,7 @@ func (p *baseProvider) Terminate() error { p.Unlock() err := p.cmd.Terminate() + p.isRunning.Store(false) + return err } diff --git a/worker/provider_test.go b/worker/provider_test.go index 13f550e..10fbceb 100644 --- a/worker/provider_test.go +++ b/worker/provider_test.go @@ -13,15 +13,21 @@ import ( func TestRsyncProvider(t *testing.T) { Convey("Rsync Provider should work", t, func() { + tmpDir, err := ioutil.TempDir("", "tunasync") + defer os.RemoveAll(tmpDir) + So(err, ShouldBeNil) + scriptFile := filepath.Join(tmpDir, "myrsync") + tmpFile := filepath.Join(tmpDir, "log_file") c := rsyncConfig{ name: "tuna", upstreamURL: "rsync://rsync.tuna.moe/tuna/", - workingDir: "/srv/mirror/production/tuna", - logDir: "/var/log/tunasync", - logFile: "tuna.log", + rsyncCmd: scriptFile, + workingDir: tmpDir, + logDir: tmpDir, + logFile: tmpFile, useIPv6: true, - interval: 600, + interval: 600 * time.Second, } provider, err := newRsyncProvider(c) @@ -61,6 +67,38 @@ func TestRsyncProvider(t *testing.T) { }) }) + Convey("Let's try a run", func() { + scriptContent := `#!/bin/bash +echo "syncing to $(pwd)" +echo $@ +sleep 1 +echo "Done" +exit 0 + ` + err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755) + So(err, ShouldBeNil) + + expectedOutput := fmt.Sprintf( + "syncing to %s\n"+ + "%s\n"+ + "Done\n", + provider.WorkingDir(), + fmt.Sprintf( + "-aHvh --no-o --no-g --stats --exclude .~tmp~/ "+ + "--delete --delete-after --delay-updates --safe-links "+ + "--timeout=120 --contimeout=120 -6 %s %s", + provider.upstreamURL, provider.WorkingDir(), + ), + ) + + err = provider.Run() + So(err, ShouldBeNil) + loggedContent, err := ioutil.ReadFile(provider.LogFile()) + So(err, ShouldBeNil) + So(string(loggedContent), ShouldEqual, expectedOutput) + // fmt.Println(string(loggedContent)) + }) + }) } @@ -79,7 +117,7 @@ func TestCmdProvider(t *testing.T) { workingDir: tmpDir, logDir: tmpDir, logFile: tmpFile, - interval: 600, + interval: 600 * time.Second, env: map[string]string{ "AOSP_REPO_BIN": "/usr/local/bin/repo", }, @@ -102,7 +140,7 @@ echo $TUNASYNC_UPSTREAM_URL echo $TUNASYNC_LOG_FILE echo $AOSP_REPO_BIN ` - exceptedOutput := fmt.Sprintf( + expectedOutput := fmt.Sprintf( "%s\n%s\n%s\n%s\n%s\n", provider.WorkingDir(), provider.Name(), @@ -121,7 +159,7 @@ echo $AOSP_REPO_BIN loggedContent, err := ioutil.ReadFile(provider.LogFile()) So(err, ShouldBeNil) - So(string(loggedContent), ShouldEqual, exceptedOutput) + So(string(loggedContent), ShouldEqual, expectedOutput) }) Convey("If a command fails", func() { @@ -156,3 +194,108 @@ sleep 5 }) }) } + +func TestTwoStageRsyncProvider(t *testing.T) { + Convey("TwoStageRsync Provider should work", t, func(ctx C) { + tmpDir, err := ioutil.TempDir("", "tunasync") + defer os.RemoveAll(tmpDir) + So(err, ShouldBeNil) + scriptFile := filepath.Join(tmpDir, "myrsync") + tmpFile := filepath.Join(tmpDir, "log_file") + + c := twoStageRsyncConfig{ + name: "tuna-two-stage-rsync", + upstreamURL: "rsync://mirrors.tuna.moe/", + stage1Profile: "debian", + rsyncCmd: scriptFile, + workingDir: tmpDir, + logDir: tmpDir, + logFile: tmpFile, + useIPv6: true, + excludeFile: tmpFile, + } + + provider, err := newTwoStageRsyncProvider(c) + So(err, ShouldBeNil) + + So(provider.Name(), ShouldEqual, c.name) + So(provider.WorkingDir(), ShouldEqual, c.workingDir) + So(provider.LogDir(), ShouldEqual, c.logDir) + So(provider.LogFile(), ShouldEqual, c.logFile) + So(provider.Interval(), ShouldEqual, c.interval) + + Convey("Try a command", func(ctx C) { + scriptContent := `#!/bin/bash +echo "syncing to $(pwd)" +echo $@ +sleep 1 +echo "Done" +exit 0 + ` + err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755) + So(err, ShouldBeNil) + + err = provider.Run() + So(err, ShouldBeNil) + + expectedOutput := fmt.Sprintf( + "syncing to %s\n"+ + "%s\n"+ + "Done\n"+ + "syncing to %s\n"+ + "%s\n"+ + "Done\n", + provider.WorkingDir(), + fmt.Sprintf( + "-aHvh --no-o --no-g --stats --exclude .~tmp~/ --safe-links "+ + "--timeout=120 --contimeout=120 --exclude dists/ -6 "+ + "--exclude-from %s %s %s", + provider.excludeFile, provider.upstreamURL, provider.WorkingDir(), + ), + provider.WorkingDir(), + fmt.Sprintf( + "-aHvh --no-o --no-g --stats --exclude .~tmp~/ "+ + "--delete --delete-after --delay-updates --safe-links "+ + "--timeout=120 --contimeout=120 -6 --exclude-from %s %s %s", + provider.excludeFile, provider.upstreamURL, provider.WorkingDir(), + ), + ) + + loggedContent, err := ioutil.ReadFile(provider.LogFile()) + So(err, ShouldBeNil) + So(string(loggedContent), ShouldEqual, expectedOutput) + // fmt.Println(string(loggedContent)) + + }) + Convey("Try terminating", func(ctx C) { + scriptContent := `#!/bin/bash +echo $@ +sleep 4 +exit 0 + ` + err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755) + So(err, ShouldBeNil) + + go func() { + err = provider.Run() + ctx.So(err, ShouldNotBeNil) + }() + + time.Sleep(1 * time.Second) + err = provider.Terminate() + So(err, ShouldBeNil) + + expectedOutput := fmt.Sprintf( + "-aHvh --no-o --no-g --stats --exclude .~tmp~/ --safe-links "+ + "--timeout=120 --contimeout=120 --exclude dists/ -6 "+ + "--exclude-from %s %s %s\n", + provider.excludeFile, provider.upstreamURL, provider.WorkingDir(), + ) + + loggedContent, err := ioutil.ReadFile(provider.LogFile()) + So(err, ShouldBeNil) + So(string(loggedContent), ShouldEqual, expectedOutput) + // fmt.Println(string(loggedContent)) + }) + }) +} diff --git a/worker/rsync_provider.go b/worker/rsync_provider.go index 83da9ec..34d8ddb 100644 --- a/worker/rsync_provider.go +++ b/worker/rsync_provider.go @@ -1,6 +1,10 @@ package worker -import "time" +import ( + "errors" + "strings" + "time" +) type rsyncConfig struct { name string @@ -20,6 +24,9 @@ type rsyncProvider struct { func newRsyncProvider(c rsyncConfig) (*rsyncProvider, error) { // TODO: check config options + if !strings.HasSuffix(c.upstreamURL, "/") { + return nil, errors.New("rsync upstream URL should ends with /") + } provider := &rsyncProvider{ baseProvider: baseProvider{ name: c.name, @@ -47,6 +54,7 @@ func newRsyncProvider(c rsyncConfig) (*rsyncProvider, error) { if c.excludeFile != "" { options = append(options, "--exclude-from", c.excludeFile) } + provider.options = options provider.ctx.Set(_WorkingDirKey, c.workingDir) provider.ctx.Set(_LogDirKey, c.logDir) @@ -63,6 +71,7 @@ func (p *rsyncProvider) Run() error { } func (p *rsyncProvider) Start() error { + env := map[string]string{} if p.password != "" { env["RSYNC_PASSWORD"] = p.password @@ -76,5 +85,9 @@ func (p *rsyncProvider) Start() error { return err } - return p.cmd.Start() + if err := p.cmd.Start(); err != nil { + return err + } + p.isRunning.Store(true) + return nil } diff --git a/worker/runner.go b/worker/runner.go index 1e196a5..aa8519f 100644 --- a/worker/runner.go +++ b/worker/runner.go @@ -54,7 +54,7 @@ func (c *cmdJob) Start() error { func (c *cmdJob) Wait() error { err := c.cmd.Wait() - c.finished <- empty{} + close(c.finished) return err } diff --git a/worker/two_stage_rsync_provider.go b/worker/two_stage_rsync_provider.go new file mode 100644 index 0000000..d5d7380 --- /dev/null +++ b/worker/two_stage_rsync_provider.go @@ -0,0 +1,136 @@ +package worker + +import ( + "errors" + "fmt" + "strings" + "time" +) + +type twoStageRsyncConfig struct { + name string + rsyncCmd string + stage1Profile string + upstreamURL, password, excludeFile string + workingDir, logDir, logFile string + useIPv6 bool + interval time.Duration +} + +// An RsyncProvider provides the implementation to rsync-based syncing jobs +type twoStageRsyncProvider struct { + baseProvider + twoStageRsyncConfig + stage1Options []string + stage2Options []string +} + +var rsyncStage1Profiles = map[string]([]string){ + "debian": []string{"dists/"}, + "debian-oldstyle": []string{ + "Packages*", "Sources*", "Release*", + "InRelease", "i18n/*", "ls-lR*", "dep11/*", + }, +} + +func newTwoStageRsyncProvider(c twoStageRsyncConfig) (*twoStageRsyncProvider, error) { + // TODO: check config options + if !strings.HasSuffix(c.upstreamURL, "/") { + return nil, errors.New("rsync upstream URL should ends with /") + } + + provider := &twoStageRsyncProvider{ + baseProvider: baseProvider{ + name: c.name, + ctx: NewContext(), + interval: c.interval, + }, + twoStageRsyncConfig: c, + stage1Options: []string{ + "-aHvh", "--no-o", "--no-g", "--stats", + "--exclude", ".~tmp~/", + "--safe-links", "--timeout=120", "--contimeout=120", + }, + stage2Options: []string{ + "-aHvh", "--no-o", "--no-g", "--stats", + "--exclude", ".~tmp~/", + "--delete", "--delete-after", "--delay-updates", + "--safe-links", "--timeout=120", "--contimeout=120", + }, + } + + if c.rsyncCmd == "" { + provider.rsyncCmd = "rsync" + } + + provider.ctx.Set(_WorkingDirKey, c.workingDir) + provider.ctx.Set(_LogDirKey, c.logDir) + provider.ctx.Set(_LogFileKey, c.logFile) + + return provider, nil +} + +func (p *twoStageRsyncProvider) Options(stage int) ([]string, error) { + var options []string + if stage == 1 { + options = append(options, p.stage1Options...) + stage1Excludes, ok := rsyncStage1Profiles[p.stage1Profile] + if !ok { + return nil, errors.New("Invalid Stage 1 Profile") + } + for _, exc := range stage1Excludes { + options = append(options, "--exclude", exc) + } + + } else if stage == 2 { + options = append(options, p.stage2Options...) + } else { + return []string{}, fmt.Errorf("Invalid stage: %d", stage) + } + + if p.useIPv6 { + options = append(options, "-6") + } + + if p.excludeFile != "" { + options = append(options, "--exclude-from", p.excludeFile) + } + + return options, nil +} + +func (p *twoStageRsyncProvider) Run() error { + + env := map[string]string{} + if p.password != "" { + env["RSYNC_PASSWORD"] = p.password + } + + stages := []int{1, 2} + for _, stage := range stages { + command := []string{p.rsyncCmd} + options, err := p.Options(stage) + if err != nil { + return err + } + command = append(command, options...) + command = append(command, p.upstreamURL, p.WorkingDir()) + + p.cmd = newCmdJob(command, p.WorkingDir(), env) + if err := p.setLogFile(); err != nil { + return err + } + + if err = p.cmd.Start(); err != nil { + return err + } + p.isRunning.Store(true) + + err = p.cmd.Wait() + p.isRunning.Store(false) + if err != nil { + return err + } + } + return nil +}