From 033aa60540de29d1275eaad5b21e9b3353670ea9 Mon Sep 17 00:00:00 2001 From: Shengqi Chen Date: Fri, 28 Feb 2025 14:43:51 +0800 Subject: [PATCH 1/2] Implement mirror.success_exit_codes and global.dangerous_global_success_exit_codes Signed-off-by: Harry Chen --- worker/base_provider.go | 22 +++++++++++++++++++--- worker/config.go | 6 ++++++ worker/provider.go | 16 ++++++++++++++++ worker/runner.go | 14 ++++++++++++-- 4 files changed, 53 insertions(+), 5 deletions(-) diff --git a/worker/base_provider.go b/worker/base_provider.go index 5c690cf..cb18eb7 100644 --- a/worker/base_provider.go +++ b/worker/base_provider.go @@ -19,9 +19,10 @@ type baseProvider struct { timeout time.Duration isMaster bool - cmd *cmdJob - logFileFd *os.File - isRunning atomic.Value + cmd *cmdJob + logFileFd *os.File + isRunning atomic.Value + successExitCodes []int cgroup *cgroupHook zfs *zfsHook @@ -186,3 +187,18 @@ func (p *baseProvider) Terminate() error { func (p *baseProvider) DataSize() string { return "" } + +func (p *baseProvider) SetSuccessExitCodes(codes []int) { + if codes == nil { + p.successExitCodes = []int{} + } else { + p.successExitCodes = codes + } +} + +func (p *baseProvider) GetSuccessExitCodes() []int { + if p.successExitCodes == nil { + return []int{} + } + return p.successExitCodes +} diff --git a/worker/config.go b/worker/config.go index 152f056..dc91c79 100644 --- a/worker/config.go +++ b/worker/config.go @@ -63,6 +63,9 @@ type globalConfig struct { ExecOnSuccess []string `toml:"exec_on_success"` ExecOnFailure []string `toml:"exec_on_failure"` + + // merged with mirror-specific options. make sure you know what you are doing! + SuccessExitCodes []int `toml:"dangerous_global_success_exit_codes"` } type managerConfig struct { @@ -169,6 +172,9 @@ type mirrorConfig struct { ExecOnSuccessExtra []string `toml:"exec_on_success_extra"` ExecOnFailureExtra []string `toml:"exec_on_failure_extra"` + // will be merged with global option + SuccessExitCodes []int `toml:"success_exit_codes"` + Command string `toml:"command"` FailOnMatch string `toml:"fail_on_match"` SizePattern string `toml:"size_pattern"` diff --git a/worker/provider.go b/worker/provider.go index d45b0ed..dada7e3 100644 --- a/worker/provider.go +++ b/worker/provider.go @@ -60,6 +60,10 @@ type mirrorProvider interface { ExitContext() *Context // return context Context() *Context + + // set in newMirrorProvider, used by cmdJob.Wait + SetSuccessExitCodes(codes []int) + GetSuccessExitCodes() []int } // newProvider creates a mirrorProvider instance @@ -249,5 +253,17 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider { } addHookFromCmdList(mirror.ExecOnFailureExtra, execOnFailure) + successExitCodes := []int{} + if cfg.Global.SuccessExitCodes != nil { + successExitCodes = append(successExitCodes, cfg.Global.SuccessExitCodes...) + } + if mirror.SuccessExitCodes != nil { + successExitCodes = append(successExitCodes, mirror.SuccessExitCodes...) + } + if len(successExitCodes) > 0 { + logger.Infof("Non-zero success exit codes set for mirror %s: %v", mirror.Name, successExitCodes) + provider.SetSuccessExitCodes(successExitCodes) + } + return provider } diff --git a/worker/runner.go b/worker/runner.go index 50f0378..ebf2d74 100644 --- a/worker/runner.go +++ b/worker/runner.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "os/exec" + "slices" "strings" "sync" "syscall" @@ -171,9 +172,18 @@ func (c *cmdJob) Wait() error { return c.retErr default: err := c.cmd.Wait() - c.retErr = err close(c.finished) - return err + if err != nil { + code := err.(*exec.ExitError).ExitCode() + allowedCodes := c.provider.GetSuccessExitCodes() + if slices.Contains(allowedCodes, code) { + // process exited with non-success status + logger.Infof("Command %s exited with code %d: treated as success (allowed: %v)", c.cmd.Args, code, allowedCodes) + } else { + c.retErr = err + } + } + return c.retErr } } From a5b72b8c5539cdeb78321c12419b5edbe0d5d813 Mon Sep 17 00:00:00 2001 From: Shengqi Chen Date: Fri, 28 Feb 2025 14:44:20 +0800 Subject: [PATCH 2/2] Add tests for success_exit_codes in config and provider Signed-off-by: Shengqi Chen --- worker/config_test.go | 56 +++++++++++++++++++++++++++++++++++++++++ worker/provider_test.go | 53 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 109 insertions(+) diff --git a/worker/config_test.go b/worker/config_test.go index 19d85c9..5b564ee 100644 --- a/worker/config_test.go +++ b/worker/config_test.go @@ -521,4 +521,60 @@ rsync_options = ["--local"] "--local", // from mirror.rsync_options }) }) + + Convey("success_exit_codes should work globally and per mirror", t, func() { + tmpfile, err := os.CreateTemp("", "tunasync") + So(err, ShouldEqual, nil) + defer os.Remove(tmpfile.Name()) + + cfgBlob1 := ` +[global] +name = "test_worker" +log_dir = "/var/log/tunasync/{{.Name}}" +mirror_dir = "/data/mirrors" +concurrent = 10 +interval = 240 +retry = 3 +timeout = 86400 +dangerous_global_success_exit_codes = [10, 20] + +[manager] +api_base = "https://127.0.0.1:5000" +token = "some_token" + +[server] +hostname = "worker1.example.com" +listen_addr = "127.0.0.1" +listen_port = 6000 +ssl_cert = "/etc/tunasync.d/worker1.cert" +ssl_key = "/etc/tunasync.d/worker1.key" + +[[mirrors]] +name = "foo" +provider = "rsync" +upstream = "rsync://foo.bar/" +interval = 720 +retry = 2 +timeout = 3600 +mirror_dir = "/data/foo" +success_exit_codes = [30, 40] +` + + err = os.WriteFile(tmpfile.Name(), []byte(cfgBlob1), 0644) + So(err, ShouldEqual, nil) + defer tmpfile.Close() + + cfg, err := LoadConfig(tmpfile.Name()) + So(err, ShouldBeNil) + + providers := map[string]mirrorProvider{} + for _, m := range cfg.Mirrors { + p := newMirrorProvider(m, cfg) + providers[p.Name()] = p + } + + p, ok := providers["foo"].(*rsyncProvider) + So(ok, ShouldBeTrue) + So(p.successExitCodes, ShouldResemble, []int{10, 20, 30, 40}) + }) } diff --git a/worker/provider_test.go b/worker/provider_test.go index d87bc82..1d6bbfe 100644 --- a/worker/provider_test.go +++ b/worker/provider_test.go @@ -552,6 +552,59 @@ sleep 10 So(provider.DataSize(), ShouldBeEmpty) }) }) + Convey("Command Provider with successExitCodes should work", t, func(ctx C) { + tmpDir, err := os.MkdirTemp("", "tunasync") + defer os.RemoveAll(tmpDir) + So(err, ShouldBeNil) + scriptFile := filepath.Join(tmpDir, "cmd.sh") + tmpFile := filepath.Join(tmpDir, "log_file") + + c := cmdConfig{ + name: "tuna-cmd", + upstreamURL: "http://mirrors.tuna.moe/", + command: "bash " + scriptFile, + workingDir: tmpDir, + logDir: tmpDir, + logFile: tmpFile, + interval: 600 * time.Second, + } + + provider, err := newCmdProvider(c) + provider.SetSuccessExitCodes([]int{199, 200}) + So(err, ShouldBeNil) + + So(provider.Type(), ShouldEqual, provCommand) + So(provider.Name(), ShouldEqual, c.name) + So(provider.WorkingDir(), ShouldEqual, c.workingDir) + So(provider.LogDir(), ShouldEqual, c.logDir) + So(provider.LogFile(), ShouldEqual, c.logFile) + So(provider.Interval(), ShouldEqual, c.interval) + So(provider.GetSuccessExitCodes(), ShouldResemble, []int{199, 200}) + + Convey("Command exits with configured successExitCodes", func() { + scriptContent := `exit 199` + err = os.WriteFile(scriptFile, []byte(scriptContent), 0755) + So(err, ShouldBeNil) + readedScriptContent, err := os.ReadFile(scriptFile) + So(err, ShouldBeNil) + So(readedScriptContent, ShouldResemble, []byte(scriptContent)) + + err = provider.Run(make(chan empty, 1)) + So(err, ShouldBeNil) + }) + + Convey("Command exits with unknown exit code", func() { + scriptContent := `exit 201` + err = os.WriteFile(scriptFile, []byte(scriptContent), 0755) + So(err, ShouldBeNil) + readedScriptContent, err := os.ReadFile(scriptFile) + So(err, ShouldBeNil) + So(readedScriptContent, ShouldResemble, []byte(scriptContent)) + + err = provider.Run(make(chan empty, 1)) + So(err, ShouldNotBeNil) + }) + }) } func TestTwoStageRsyncProvider(t *testing.T) {