From d8b2675fda7d7014594155b0ee418bc75dae49fd Mon Sep 17 00:00:00 2001
From: kebyn
Date: Sat, 12 Mar 2022 13:16:45 +0000
Subject: [PATCH] Fix code formatting according to go-staticcheck

---
 cmd/tunasync/tunasync.go           |   6 +-
 worker/cgroup.go                   |  79 +++---
 worker/cgroup_test.go              | 435 +++++++++++++++--------------
 worker/cmd_provider.go             |   2 +-
 worker/config.go                   |   6 +-
 worker/config_diff_test.go         |  34 +--
 worker/context.go                  |   2 +-
 worker/docker.go                   |  14 +-
 worker/exec_post_hook.go           |   4 +-
 worker/job.go                      |   2 +-
 worker/provider.go                 |   2 +-
 worker/runner.go                   |  15 +-
 worker/two_stage_rsync_provider.go |   4 +-
 worker/worker_test.go              |   8 +-
 worker/zfs_hook_test.go            |   4 +-
 15 files changed, 309 insertions(+), 308 deletions(-)

diff --git a/cmd/tunasync/tunasync.go b/cmd/tunasync/tunasync.go
index a762e34..4ee3766 100644
--- a/cmd/tunasync/tunasync.go
+++ b/cmd/tunasync/tunasync.go
@@ -9,10 +9,10 @@ import (
 	"time"
 
 	"github.com/gin-gonic/gin"
-	"github.com/pkg/profile"
-	"gopkg.in/op/go-logging.v1"
-	"github.com/urfave/cli"
 	"github.com/moby/moby/pkg/reexec"
+	"github.com/pkg/profile"
+	"github.com/urfave/cli"
+	"gopkg.in/op/go-logging.v1"
 
 	tunasync "github.com/tuna/tunasync/internal"
 	"github.com/tuna/tunasync/manager"
diff --git a/worker/cgroup.go b/worker/cgroup.go
index 2f87bd5..761d77f 100644
--- a/worker/cgroup.go
+++ b/worker/cgroup.go
@@ -12,32 +12,32 @@ import (
 
 	"golang.org/x/sys/unix"
 
-	"github.com/moby/moby/pkg/reexec"
 	cgv1 "github.com/containerd/cgroups"
 	cgv2 "github.com/containerd/cgroups/v2"
+	"github.com/moby/moby/pkg/reexec"
 	contspecs "github.com/opencontainers/runtime-spec/specs-go"
 )
 
 type cgroupHook struct {
 	emptyHook
-	cgCfg cgroupConfig
-	memLimit MemBytes
-	cgMgrV1 cgv1.Cgroup
-	cgMgrV2 *cgv2.Manager
+	cgCfg    cgroupConfig
+	memLimit MemBytes
+	cgMgrV1  cgv1.Cgroup
+	cgMgrV2  *cgv2.Manager
 }
 
 type execCmd string
 
 const (
-    cmdCont execCmd = "cont"
-    cmdAbrt execCmd = "abrt"
+	cmdCont execCmd = "cont"
+	cmdAbrt execCmd = "abrt"
 )
 
-func init () {
+func init() {
 	reexec.Register("tunasync-exec", waitExec)
 }
 
-func waitExec () {
+func waitExec() {
 	binary, err := exec.LookPath(os.Args[1])
 	if err != nil {
 		panic(err)
@@ -51,14 +51,15 @@ func waitExec () {
 				panic(err)
 			}
 			if err := pipe.Close(); err != nil {
+				panic(err)
 			}
 			cmd := execCmd(string(cmdBytes))
 			switch cmd {
-				case cmdAbrt:
-					fallthrough
-				default:
-					panic("Exited on request")
-				case cmdCont:
+			case cmdAbrt:
+				fallthrough
+			default:
+				panic("Exited on request")
+			case cmdCont:
 			}
 		}
 	}
@@ -71,7 +72,7 @@ func waitExec () {
 	panic("Exec failed.")
 }
 
-func initCgroup(cfg *cgroupConfig) (error) {
+func initCgroup(cfg *cgroupConfig) error {
 	logger.Debugf("Initializing cgroup")
 	baseGroup := cfg.Group
 
@@ -103,7 +104,7 @@ func initCgroup(cfg *cgroupConfig) (error) {
 		}
 		if baseGroup == "" {
 			logger.Debugf("Creating a sub group and move all processes into it")
-			wkrMgr, err := cfg.cgMgrV2.NewChild("__worker", nil);
+			wkrMgr, err := cfg.cgMgrV2.NewChild("__worker", nil)
 			if err != nil {
 				return err
 			}
@@ -117,8 +118,8 @@ func initCgroup(cfg *cgroupConfig) (error) {
 				if len(procs) == 0 {
 					break
 				}
-				for _, p := range(procs) {
-					if err := wkrMgr.AddProc(p); err != nil{
+				for _, p := range procs {
+					if err := wkrMgr.AddProc(p); err != nil {
 						if errors.Is(err, syscall.ESRCH) {
 							logger.Debugf("Write pid %d to sub group failed: process vanished, ignoring")
 						} else {
@@ -129,7 +130,7 @@ func initCgroup(cfg *cgroupConfig) (error) {
 			}
 		} else {
 			logger.Debugf("Trying to create a sub group in that group")
-			testMgr, err := cfg.cgMgrV2.NewChild("__test", nil);
+			testMgr, err := cfg.cgMgrV2.NewChild("__test", nil)
cfg.cgMgrV2.NewChild("__test", nil) if err != nil { logger.Errorf("Cannot create a sub group in the cgroup") return err @@ -143,7 +144,7 @@ func initCgroup(cfg *cgroupConfig) (error) { return err } if len(procs) != 0 { - return fmt.Errorf("There are remaining processes in cgroup %s", baseGroup) + return fmt.Errorf("there are remaining processes in cgroup %s", baseGroup) } } } else { @@ -152,9 +153,9 @@ func initCgroup(cfg *cgroupConfig) (error) { if baseGroup != "" { pather = cgv1.StaticPath(baseGroup) } else { - pather = (func(p cgv1.Path) (cgv1.Path){ - return func(subsys cgv1.Name) (string, error){ - path, err := p(subsys); + pather = (func(p cgv1.Path) cgv1.Path { + return func(subsys cgv1.Name) (string, error) { + path, err := p(subsys) if err != nil { return "", err } @@ -167,14 +168,14 @@ func initCgroup(cfg *cgroupConfig) (error) { } logger.Infof("Loading cgroup") var err error - if cfg.cgMgrV1, err = cgv1.Load(cgv1.V1, pather, func(cfg *cgv1.InitConfig) error{ + if cfg.cgMgrV1, err = cgv1.Load(cgv1.V1, pather, func(cfg *cgv1.InitConfig) error { cfg.InitCheck = cgv1.AllowAny return nil }); err != nil { return err } logger.Debugf("Available subsystems:") - for _, subsys := range(cfg.cgMgrV1.Subsystems()) { + for _, subsys := range cfg.cgMgrV1.Subsystems() { p, err := pather(subsys.Name()) if err != nil { return err @@ -183,11 +184,11 @@ func initCgroup(cfg *cgroupConfig) (error) { } if baseGroup == "" { logger.Debugf("Creating a sub group and move all processes into it") - wkrMgr, err := cfg.cgMgrV1.New("__worker", &contspecs.LinuxResources{}); + wkrMgr, err := cfg.cgMgrV1.New("__worker", &contspecs.LinuxResources{}) if err != nil { return err } - for _, subsys := range(cfg.cgMgrV1.Subsystems()) { + for _, subsys := range cfg.cgMgrV1.Subsystems() { logger.Debugf("Reading pids for subsystem %s", subsys.Name()) for { procs, err := cfg.cgMgrV1.Processes(subsys.Name(), false) @@ -202,7 +203,7 @@ func initCgroup(cfg *cgroupConfig) (error) { if len(procs) == 0 { break } - for _, proc := range(procs) { + for _, proc := range procs { if err := wkrMgr.Add(proc); err != nil { if errors.Is(err, syscall.ESRCH) { logger.Debugf("Write pid %d to sub group failed: process vanished, ignoring") @@ -215,7 +216,7 @@ func initCgroup(cfg *cgroupConfig) (error) { } } else { logger.Debugf("Trying to create a sub group in that group") - testMgr, err := cfg.cgMgrV1.New("__test", &contspecs.LinuxResources{}); + testMgr, err := cfg.cgMgrV1.New("__test", &contspecs.LinuxResources{}) if err != nil { logger.Errorf("Cannot create a sub group in the cgroup") return err @@ -223,7 +224,7 @@ func initCgroup(cfg *cgroupConfig) (error) { if err := testMgr.Delete(); err != nil { return err } - for _, subsys := range(cfg.cgMgrV1.Subsystems()) { + for _, subsys := range cfg.cgMgrV1.Subsystems() { logger.Debugf("Reading pids for subsystem %s", subsys.Name()) procs, err := cfg.cgMgrV1.Processes(subsys.Name(), false) if err != nil { @@ -239,7 +240,7 @@ func initCgroup(cfg *cgroupConfig) (error) { if err != nil { return err } - return fmt.Errorf("There are remaining processes in cgroup %s of subsystem %s", p, subsys.Name()) + return fmt.Errorf("there are remaining processes in cgroup %s of subsystem %s", p, subsys.Name()) } } } @@ -253,7 +254,7 @@ func newCgroupHook(p mirrorProvider, cfg cgroupConfig, memLimit MemBytes) *cgrou emptyHook: emptyHook{ provider: p, }, - cgCfg: cfg, + cgCfg: cfg, memLimit: memLimit, } } @@ -263,7 +264,7 @@ func (c *cgroupHook) preExec() error { logger.Debugf("Creating v2 cgroup for task %s", 
 		var resSet *cgv2.Resources
 		if c.memLimit != 0 {
-			resSet = &cgv2.Resources {
+			resSet = &cgv2.Resources{
 				Memory: &cgv2.Memory{
 					Max: func(i int64) *int64 { return &i }(c.memLimit.Value()),
 				},
@@ -279,7 +280,7 @@ func (c *cgroupHook) preExec() error {
 		logger.Debugf("Creating v1 cgroup for task %s", c.provider.Name())
 		var resSet contspecs.LinuxResources
 		if c.memLimit != 0 {
-			resSet = contspecs.LinuxResources {
+			resSet = contspecs.LinuxResources{
 				Memory: &contspecs.LinuxMemory{
 					Limit: func(i int64) *int64 { return &i }(c.memLimit.Value()),
 				},
@@ -334,7 +335,7 @@ func (c *cgroupHook) killAll() error {
 		taskList := []int{}
 		if c.cgCfg.isUnified {
 			procs, err := c.cgMgrV2.Procs(false)
-			if (err != nil) {
+			if err != nil {
 				return []int{}, err
 			}
 			for _, proc := range procs {
@@ -342,16 +343,16 @@ func (c *cgroupHook) killAll() error {
 			}
 		} else {
 			taskSet := make(map[int]struct{})
-			for _, subsys := range(c.cgMgrV1.Subsystems()) {
+			for _, subsys := range c.cgMgrV1.Subsystems() {
 				procs, err := c.cgMgrV1.Processes(subsys.Name(), false)
 				if err != nil {
 					return []int{}, err
 				}
-				for _, proc := range(procs) {
+				for _, proc := range procs {
 					taskSet[proc.Pid] = struct{}{}
 				}
 			}
-			for proc := range(taskSet) {
+			for proc := range taskSet {
 				taskList = append(taskList, proc)
 			}
 		}
@@ -360,7 +361,7 @@ func (c *cgroupHook) killAll() error {
 
 	for i := 0; i < 4; i++ {
 		if i == 3 {
-			return errors.New("Unable to kill all child tasks")
+			return errors.New("unable to kill all child tasks")
 		}
 		taskList, err := readTaskList()
 		if err != nil {
diff --git a/worker/cgroup_test.go b/worker/cgroup_test.go
index 8adca9a..a617cf9 100644
--- a/worker/cgroup_test.go
+++ b/worker/cgroup_test.go
@@ -1,16 +1,17 @@
 package worker
 
 import (
+	"errors"
 	"io/ioutil"
 	"os"
 	"os/exec"
 	"path/filepath"
 	"strconv"
 	"strings"
+	"syscall"
 	"testing"
 	"time"
-	"errors"
-	"syscall"
+
 	cgv1 "github.com/containerd/cgroups"
 	cgv2 "github.com/containerd/cgroups/v2"
 	units "github.com/docker/go-units"
@@ -21,14 +22,14 @@ import (
 
 func init() {
 	_, testReexec := os.LookupEnv("TESTREEXEC")
-	if ! testReexec {
+	if !testReexec {
 		reexec.Init()
 	}
 }
 
 func TestReexec(t *testing.T) {
 	testCase, testReexec := os.LookupEnv("TESTREEXEC")
-	if ! testReexec {
+	if !testReexec {
 		return
 	}
 	for len(os.Args) > 1 {
@@ -39,51 +40,51 @@ func TestReexec(t *testing.T) {
 		}
 	}
 	switch testCase {
-		case "1":
-			Convey("Reexec should panic when command not found", t, func(ctx C){
-				So(func(){
-					reexec.Init()
-				}, ShouldPanicWith, exec.ErrNotFound)
-			})
-		case "2":
-			Convey("Reexec should run when fd 3 is not open", t, func(ctx C){
-				So((func() error{
-					pipe := os.NewFile(3, "pipe")
-					if pipe == nil {
-						return errors.New("pipe is nil")
-					} else {
-						_, err := pipe.Stat()
-						return err
-					}
-				})(), ShouldNotBeNil)
-				So(func(){
-					reexec.Init()
-				}, ShouldPanicWith, syscall.ENOEXEC)
-			})
-		case "3":
-			Convey("Reexec should fail when fd 3 is sent with abrt cmd", t, func(ctx C){
-				So(func(){
-					reexec.Init()
-				}, ShouldPanicWith, "Exited on request")
-			})
-		case "4":
-			Convey("Reexec should run when fd 3 is sent with cont cmd", t, func(ctx C){
-				So(func(){
-					reexec.Init()
-				}, ShouldPanicWith, syscall.ENOEXEC)
-			})
-		case "5":
-			Convey("Reexec should not be triggered when argv[0] is not reexec", t, func(ctx C){
-				So(func(){
-					reexec.Init()
-				}, ShouldNotPanic)
-			})
+	case "1":
+		Convey("Reexec should panic when command not found", t, func(ctx C) {
+			So(func() {
+				reexec.Init()
+			}, ShouldPanicWith, exec.ErrNotFound)
+		})
+	case "2":
+		Convey("Reexec should run when fd 3 is not open", t, func(ctx C) {
+			So((func() error {
+				pipe := os.NewFile(3, "pipe")
+				if pipe == nil {
+					return errors.New("pipe is nil")
+				} else {
+					_, err := pipe.Stat()
+					return err
+				}
+			})(), ShouldNotBeNil)
+			So(func() {
+				reexec.Init()
+			}, ShouldPanicWith, syscall.ENOEXEC)
+		})
+	case "3":
+		Convey("Reexec should fail when fd 3 is sent with abrt cmd", t, func(ctx C) {
+			So(func() {
+				reexec.Init()
+			}, ShouldPanicWith, "Exited on request")
+		})
+	case "4":
+		Convey("Reexec should run when fd 3 is sent with cont cmd", t, func(ctx C) {
+			So(func() {
+				reexec.Init()
+			}, ShouldPanicWith, syscall.ENOEXEC)
+		})
+	case "5":
+		Convey("Reexec should not be triggered when argv[0] is not reexec", t, func(ctx C) {
+			So(func() {
+				reexec.Init()
+			}, ShouldNotPanic)
+		})
 	}
 }
 
 func TestCgroup(t *testing.T) {
 	var cgcf *cgroupConfig
-	Convey("init cgroup", t, func(ctx C){
+	Convey("init cgroup", t, func(ctx C) {
 		_, useCurrentCgroup := os.LookupEnv("USECURCGROUP")
 		cgcf = &cgroupConfig{BasePath: "/sys/fs/cgroup", Group: "tunasync", Subsystem: "cpu"}
 		if useCurrentCgroup {
@@ -97,28 +98,28 @@ func TestCgroup(t *testing.T) {
 			So(cgcf.cgMgrV1, ShouldNotBeNil)
 		}
 
-	Convey("Cgroup Should Work", func(ctx C) {
-		tmpDir, err := ioutil.TempDir("", "tunasync")
-		defer os.RemoveAll(tmpDir)
-		So(err, ShouldBeNil)
-		cmdScript := filepath.Join(tmpDir, "cmd.sh")
-		daemonScript := filepath.Join(tmpDir, "daemon.sh")
-		tmpFile := filepath.Join(tmpDir, "log_file")
-		bgPidfile := filepath.Join(tmpDir, "bg.pid")
+		Convey("Cgroup Should Work", func(ctx C) {
+			tmpDir, err := ioutil.TempDir("", "tunasync")
+			defer os.RemoveAll(tmpDir)
+			So(err, ShouldBeNil)
+			cmdScript := filepath.Join(tmpDir, "cmd.sh")
+			daemonScript := filepath.Join(tmpDir, "daemon.sh")
+			tmpFile := filepath.Join(tmpDir, "log_file")
+			bgPidfile := filepath.Join(tmpDir, "bg.pid")
 
-		c := cmdConfig{
-			name:        "tuna-cgroup",
-			upstreamURL: "http://mirrors.tuna.moe/",
-			command:     cmdScript + " " + daemonScript,
-			workingDir:  tmpDir,
-			logDir:      tmpDir,
-			logFile:     tmpFile,
-			interval:    600 * time.Second,
-			env: map[string]string{
-				"BG_PIDFILE": bgPidfile,
-			},
-		}
-		cmdScriptContent := `#!/bin/bash
+			c := cmdConfig{
+				name:        "tuna-cgroup",
+				upstreamURL: "http://mirrors.tuna.moe/",
"http://mirrors.tuna.moe/", + command: cmdScript + " " + daemonScript, + workingDir: tmpDir, + logDir: tmpDir, + logFile: tmpFile, + interval: 600 * time.Second, + env: map[string]string{ + "BG_PIDFILE": bgPidfile, + }, + } + cmdScriptContent := `#!/bin/bash redirect-std() { [[ -t 0 ]] && exec /dev/null @@ -144,167 +145,127 @@ echo $$ daemonize $@ sleep 5 ` - daemonScriptContent := `#!/bin/bash + daemonScriptContent := `#!/bin/bash echo $$ > $BG_PIDFILE sleep 30 ` - err = ioutil.WriteFile(cmdScript, []byte(cmdScriptContent), 0755) - So(err, ShouldBeNil) - err = ioutil.WriteFile(daemonScript, []byte(daemonScriptContent), 0755) - So(err, ShouldBeNil) - - provider, err := newCmdProvider(c) - So(err, ShouldBeNil) - - cg := newCgroupHook(provider, *cgcf, 0) - provider.AddHook(cg) - - err = cg.preExec() - So(err, ShouldBeNil) - - go func() { - err := provider.Run(make(chan empty, 1)) - ctx.So(err, ShouldNotBeNil) - }() - - time.Sleep(1 * time.Second) - // Deamon should be started - daemonPidBytes, err := ioutil.ReadFile(bgPidfile) - So(err, ShouldBeNil) - daemonPid := strings.Trim(string(daemonPidBytes), " \n") - logger.Debug("daemon pid: %s", daemonPid) - procDir := filepath.Join("/proc", daemonPid) - _, err = os.Stat(procDir) - So(err, ShouldBeNil) - - err = provider.Terminate() - So(err, ShouldBeNil) - - // Deamon won't be killed - _, err = os.Stat(procDir) - So(err, ShouldBeNil) - - // Deamon can be killed by cgroup killer - cg.postExec() - _, err = os.Stat(procDir) - So(os.IsNotExist(err), ShouldBeTrue) - - }) - - Convey("Rsync Memory Should Be Limited", func() { - tmpDir, err := ioutil.TempDir("", "tunasync") - defer os.RemoveAll(tmpDir) - So(err, ShouldBeNil) - scriptFile := filepath.Join(tmpDir, "myrsync") - tmpFile := filepath.Join(tmpDir, "log_file") - - c := rsyncConfig{ - name: "tuna-cgroup", - upstreamURL: "rsync://rsync.tuna.moe/tuna/", - rsyncCmd: scriptFile, - workingDir: tmpDir, - logDir: tmpDir, - logFile: tmpFile, - useIPv6: true, - interval: 600 * time.Second, - } - - provider, err := newRsyncProvider(c) - So(err, ShouldBeNil) - - cg := newCgroupHook(provider, *cgcf, 512 * units.MiB) - provider.AddHook(cg) - - err = cg.preExec() - So(err, ShouldBeNil) - if cgcf.isUnified { - cgpath := filepath.Join(cgcf.BasePath, cgcf.Group, provider.Name()) - if useCurrentCgroup { - group, err := cgv2.NestedGroupPath(filepath.Join("..", provider.Name())) - So(err, ShouldBeNil) - cgpath = filepath.Join(cgcf.BasePath, group) - } - memoLimit, err := ioutil.ReadFile(filepath.Join(cgpath, "memory.max")) + err = ioutil.WriteFile(cmdScript, []byte(cmdScriptContent), 0755) So(err, ShouldBeNil) - So(strings.Trim(string(memoLimit), "\n"), ShouldEqual, strconv.Itoa(512*1024*1024)) - } else { - for _, subsys := range(cg.cgMgrV1.Subsystems()) { - if subsys.Name() == cgv1.Memory { - cgpath := filepath.Join(cgcf.Group, provider.Name()) - if useCurrentCgroup { - p, err := cgv1.NestedPath(filepath.Join("..", provider.Name()))(cgv1.Memory) + err = ioutil.WriteFile(daemonScript, []byte(daemonScriptContent), 0755) + So(err, ShouldBeNil) + + provider, err := newCmdProvider(c) + So(err, ShouldBeNil) + + cg := newCgroupHook(provider, *cgcf, 0) + provider.AddHook(cg) + + err = cg.preExec() + So(err, ShouldBeNil) + + go func() { + err := provider.Run(make(chan empty, 1)) + ctx.So(err, ShouldNotBeNil) + }() + + time.Sleep(1 * time.Second) + // Deamon should be started + daemonPidBytes, err := ioutil.ReadFile(bgPidfile) + So(err, ShouldBeNil) + daemonPid := strings.Trim(string(daemonPidBytes), " \n") + 
logger.Debug("daemon pid: %s", daemonPid) + procDir := filepath.Join("/proc", daemonPid) + _, err = os.Stat(procDir) + So(err, ShouldBeNil) + + err = provider.Terminate() + So(err, ShouldBeNil) + + // Deamon won't be killed + _, err = os.Stat(procDir) + So(err, ShouldBeNil) + + // Deamon can be killed by cgroup killer + cg.postExec() + _, err = os.Stat(procDir) + So(os.IsNotExist(err), ShouldBeTrue) + + }) + + Convey("Rsync Memory Should Be Limited", func() { + tmpDir, err := ioutil.TempDir("", "tunasync") + defer os.RemoveAll(tmpDir) + So(err, ShouldBeNil) + scriptFile := filepath.Join(tmpDir, "myrsync") + tmpFile := filepath.Join(tmpDir, "log_file") + + c := rsyncConfig{ + name: "tuna-cgroup", + upstreamURL: "rsync://rsync.tuna.moe/tuna/", + rsyncCmd: scriptFile, + workingDir: tmpDir, + logDir: tmpDir, + logFile: tmpFile, + useIPv6: true, + interval: 600 * time.Second, + } + + provider, err := newRsyncProvider(c) + So(err, ShouldBeNil) + + cg := newCgroupHook(provider, *cgcf, 512*units.MiB) + provider.AddHook(cg) + + err = cg.preExec() + So(err, ShouldBeNil) + if cgcf.isUnified { + cgpath := filepath.Join(cgcf.BasePath, cgcf.Group, provider.Name()) + if useCurrentCgroup { + group, err := cgv2.NestedGroupPath(filepath.Join("..", provider.Name())) + So(err, ShouldBeNil) + cgpath = filepath.Join(cgcf.BasePath, group) + } + memoLimit, err := ioutil.ReadFile(filepath.Join(cgpath, "memory.max")) + So(err, ShouldBeNil) + So(strings.Trim(string(memoLimit), "\n"), ShouldEqual, strconv.Itoa(512*1024*1024)) + } else { + for _, subsys := range cg.cgMgrV1.Subsystems() { + if subsys.Name() == cgv1.Memory { + cgpath := filepath.Join(cgcf.Group, provider.Name()) + if useCurrentCgroup { + p, err := cgv1.NestedPath(filepath.Join("..", provider.Name()))(cgv1.Memory) + So(err, ShouldBeNil) + cgpath = p + } + memoLimit, err := ioutil.ReadFile(filepath.Join(cgcf.BasePath, "memory", cgpath, "memory.limit_in_bytes")) So(err, ShouldBeNil) - cgpath = p - } - memoLimit, err := ioutil.ReadFile(filepath.Join(cgcf.BasePath, "memory", cgpath, "memory.limit_in_bytes")) - So(err, ShouldBeNil) - So(strings.Trim(string(memoLimit), "\n"), ShouldEqual, strconv.Itoa(512*1024*1024)) - } - } - } - cg.postExec() - So(cg.cgMgrV1, ShouldBeNil) - }) - Reset(func() { - if cgcf.isUnified { - if cgcf.Group == "" { - wkrg, err := cgv2.NestedGroupPath(""); - So(err, ShouldBeNil) - wkrMgr, err := cgv2.LoadManager("/sys/fs/cgroup", wkrg); - allCtrls, err := wkrMgr.Controllers() - So(err, ShouldBeNil) - err = wkrMgr.ToggleControllers(allCtrls, cgv2.Disable) - So(err, ShouldBeNil) - origMgr := cgcf.cgMgrV2 - for { - logger.Debugf("Restoring pids") - procs, err := wkrMgr.Procs(false) - So(err, ShouldBeNil) - if len(procs) == 0 { - break - } - for _, p := range(procs) { - if err := origMgr.AddProc(p); err != nil{ - if errors.Is(err, syscall.ESRCH) { - logger.Debugf("Write pid %d to sub group failed: process vanished, ignoring") - } else { - So(err, ShouldBeNil) - } - } + So(strings.Trim(string(memoLimit), "\n"), ShouldEqual, strconv.Itoa(512*1024*1024)) } } - err = wkrMgr.Delete() - So(err, ShouldBeNil) } - } else { - if cgcf.Group == "" { - pather := (func(p cgv1.Path) (cgv1.Path){ - return func(subsys cgv1.Name) (string, error){ - path, err := p(subsys); - if err != nil { - return "", err - } - if path == "/" { - return "", cgv1.ErrControllerNotActive - } - return path, err - } - })(cgv1.NestedPath("")) - wkrMgr, err := cgv1.Load(cgv1.V1, pather, func(cfg *cgv1.InitConfig) error{ - cfg.InitCheck = cgv1.AllowAny - return nil - }) - So(err, 
-				So(err, ShouldBeNil)
-				origMgr := cgcf.cgMgrV1
-				for _, subsys := range(wkrMgr.Subsystems()){
-					for {
-						procs, err := wkrMgr.Processes(subsys.Name(), false)
-						So(err, ShouldBeNil)
-						if len(procs) == 0 {
-							break
-						}
-						for _, proc := range(procs) {
-							if err := origMgr.Add(proc); err != nil {
-								if errors.Is(err, syscall.ESRCH) {
-									logger.Debugf("Write pid %d to sub group failed: process vanished, ignoring")
-								} else {
-									So(err, ShouldBeNil)
-								}
-							}
-						}
-					}
-				}
+			err = ioutil.WriteFile(cmdScript, []byte(cmdScriptContent), 0755)
+			So(err, ShouldBeNil)
+			err = ioutil.WriteFile(daemonScript, []byte(daemonScriptContent), 0755)
+			So(err, ShouldBeNil)
+
+			provider, err := newCmdProvider(c)
+			So(err, ShouldBeNil)
+
+			cg := newCgroupHook(provider, *cgcf, 0)
+			provider.AddHook(cg)
+
+			err = cg.preExec()
+			So(err, ShouldBeNil)
+
+			go func() {
+				err := provider.Run(make(chan empty, 1))
+				ctx.So(err, ShouldNotBeNil)
+			}()
+
+			time.Sleep(1 * time.Second)
+			// Deamon should be started
+			daemonPidBytes, err := ioutil.ReadFile(bgPidfile)
+			So(err, ShouldBeNil)
+			daemonPid := strings.Trim(string(daemonPidBytes), " \n")
+			logger.Debug("daemon pid: %s", daemonPid)
+			procDir := filepath.Join("/proc", daemonPid)
+			_, err = os.Stat(procDir)
+			So(err, ShouldBeNil)
+
+			err = provider.Terminate()
+			So(err, ShouldBeNil)
+
+			// Deamon won't be killed
+			_, err = os.Stat(procDir)
+			So(err, ShouldBeNil)
+
+			// Deamon can be killed by cgroup killer
+			cg.postExec()
+			_, err = os.Stat(procDir)
+			So(os.IsNotExist(err), ShouldBeTrue)
+
+		})
+
+		Convey("Rsync Memory Should Be Limited", func() {
+			tmpDir, err := ioutil.TempDir("", "tunasync")
+			defer os.RemoveAll(tmpDir)
+			So(err, ShouldBeNil)
+			scriptFile := filepath.Join(tmpDir, "myrsync")
+			tmpFile := filepath.Join(tmpDir, "log_file")
+
+			c := rsyncConfig{
+				name:        "tuna-cgroup",
+				upstreamURL: "rsync://rsync.tuna.moe/tuna/",
+				rsyncCmd:    scriptFile,
+				workingDir:  tmpDir,
+				logDir:      tmpDir,
+				logFile:     tmpFile,
+				useIPv6:     true,
+				interval:    600 * time.Second,
+			}
+
+			provider, err := newRsyncProvider(c)
+			So(err, ShouldBeNil)
+
+			cg := newCgroupHook(provider, *cgcf, 512*units.MiB)
+			provider.AddHook(cg)
+
+			err = cg.preExec()
+			So(err, ShouldBeNil)
+			if cgcf.isUnified {
+				cgpath := filepath.Join(cgcf.BasePath, cgcf.Group, provider.Name())
+				if useCurrentCgroup {
+					group, err := cgv2.NestedGroupPath(filepath.Join("..", provider.Name()))
+					So(err, ShouldBeNil)
+					cgpath = filepath.Join(cgcf.BasePath, group)
+				}
+				memoLimit, err := ioutil.ReadFile(filepath.Join(cgpath, "memory.max"))
+				So(err, ShouldBeNil)
+				So(strings.Trim(string(memoLimit), "\n"), ShouldEqual, strconv.Itoa(512*1024*1024))
+			} else {
+				for _, subsys := range cg.cgMgrV1.Subsystems() {
+					if subsys.Name() == cgv1.Memory {
+						cgpath := filepath.Join(cgcf.Group, provider.Name())
+						if useCurrentCgroup {
+							p, err := cgv1.NestedPath(filepath.Join("..", provider.Name()))(cgv1.Memory)
+							So(err, ShouldBeNil)
+							cgpath = p
+						}
+						memoLimit, err := ioutil.ReadFile(filepath.Join(cgcf.BasePath, "memory", cgpath, "memory.limit_in_bytes"))
+						So(err, ShouldBeNil)
+						So(strings.Trim(string(memoLimit), "\n"), ShouldEqual, strconv.Itoa(512*1024*1024))
+					}
+				}
+			}
+			cg.postExec()
+			So(cg.cgMgrV1, ShouldBeNil)
+		})
+		Reset(func() {
+			if cgcf.isUnified {
+				if cgcf.Group == "" {
+					wkrg, err := cgv2.NestedGroupPath("")
+					So(err, ShouldBeNil)
+					wkrMgr, _ := cgv2.LoadManager("/sys/fs/cgroup", wkrg)
+					allCtrls, err := wkrMgr.Controllers()
+					So(err, ShouldBeNil)
+					err = wkrMgr.ToggleControllers(allCtrls, cgv2.Disable)
+					So(err, ShouldBeNil)
+					origMgr := cgcf.cgMgrV2
+					for {
+						logger.Debugf("Restoring pids")
+						procs, err := wkrMgr.Procs(false)
+						So(err, ShouldBeNil)
+						if len(procs) == 0 {
+							break
+						}
+						for _, p := range procs {
+							if err := origMgr.AddProc(p); err != nil {
+								if errors.Is(err, syscall.ESRCH) {
+									logger.Debugf("Write pid %d to sub group failed: process vanished, ignoring")
+								} else {
+									So(err, ShouldBeNil)
+								}
+							}
+						}
+					}
@@ -313,11 +274,51 @@ func TestCgroup(t *testing.T) {
 					}
 				}
 			}
-			err = wkrMgr.Delete()
-			So(err, ShouldBeNil)
-		}
-	}
+					err = wkrMgr.Delete()
+					So(err, ShouldBeNil)
+				}
+			} else {
+				if cgcf.Group == "" {
+					pather := (func(p cgv1.Path) cgv1.Path {
+						return func(subsys cgv1.Name) (string, error) {
+							path, err := p(subsys)
+							if err != nil {
+								return "", err
+							}
+							if path == "/" {
+								return "", cgv1.ErrControllerNotActive
+							}
+							return path, err
+						}
+					})(cgv1.NestedPath(""))
+					wkrMgr, err := cgv1.Load(cgv1.V1, pather, func(cfg *cgv1.InitConfig) error {
+						cfg.InitCheck = cgv1.AllowAny
+						return nil
+					})
+					So(err, ShouldBeNil)
+					origMgr := cgcf.cgMgrV1
+					for _, subsys := range wkrMgr.Subsystems() {
+						for {
+							procs, err := wkrMgr.Processes(subsys.Name(), false)
+							So(err, ShouldBeNil)
+							if len(procs) == 0 {
+								break
+							}
+							for _, proc := range procs {
+								if err := origMgr.Add(proc); err != nil {
+									if errors.Is(err, syscall.ESRCH) {
+										logger.Debugf("Write pid %d to sub group failed: process vanished, ignoring")
+									} else {
+										So(err, ShouldBeNil)
+									}
+								}
+							}
+						}
+					}
+					err = wkrMgr.Delete()
+					So(err, ShouldBeNil)
+				}
+			}
 		})
 	})
 }
diff --git a/worker/cmd_provider.go b/worker/cmd_provider.go
index b877853..2e7ffbe 100644
--- a/worker/cmd_provider.go
+++ b/worker/cmd_provider.go
@@ -106,7 +106,7 @@ func (p *cmdProvider) Run(started chan empty) error {
 			}
 			if len(matches) != 0 {
 				logger.Debug("Fail-on-match: %r", matches)
-				return fmt.Errorf("Fail-on-match regexp found %d matches", len(matches))
+				return fmt.Errorf("fail-on-match regexp found %d matches", len(matches))
 			}
 		}
 		if p.sizePattern != nil {
diff --git a/worker/config.go b/worker/config.go
index c91cbc2..740743b 100644
--- a/worker/config.go
+++ b/worker/config.go
@@ -6,10 +6,10 @@ import (
 	"path/filepath"
 
 	"github.com/BurntSushi/toml"
-	"github.com/imdario/mergo"
-	units "github.com/docker/go-units"
 	cgv1 "github.com/containerd/cgroups"
 	cgv2 "github.com/containerd/cgroups/v2"
+	units "github.com/docker/go-units"
+	"github.com/imdario/mergo"
 )
 
 type providerEnum uint8
@@ -30,7 +30,7 @@ func (p *providerEnum) UnmarshalText(text []byte) error {
 	case `two-stage-rsync`:
 		*p = provTwoStageRsync
 	default:
-		return errors.New("Invalid value to provierEnum")
+		return errors.New("invalid value to provierEnum")
 	}
 	return nil
 }
diff --git a/worker/config_diff_test.go b/worker/config_diff_test.go
index 55c80cc..9f165fb 100644
--- a/worker/config_diff_test.go
+++ b/worker/config_diff_test.go
@@ -10,12 +10,12 @@ import (
 func TestConfigDiff(t *testing.T) {
 	Convey("When old and new configs are equal", t, func() {
 		oldList := []mirrorConfig{
-			mirrorConfig{Name: "debian"},
mirrorConfig{Name: "debian-security"}, - mirrorConfig{Name: "fedora"}, - mirrorConfig{Name: "archlinux"}, - mirrorConfig{Name: "AOSP"}, - mirrorConfig{Name: "ubuntu"}, + {Name: "debian"}, + {Name: "debian-security"}, + {Name: "fedora"}, + {Name: "archlinux"}, + {Name: "AOSP"}, + {Name: "ubuntu"}, } newList := make([]mirrorConfig, len(oldList)) copy(newList, oldList) @@ -25,19 +25,19 @@ func TestConfigDiff(t *testing.T) { }) Convey("When giving two config lists with different names", t, func() { oldList := []mirrorConfig{ - mirrorConfig{Name: "debian"}, - mirrorConfig{Name: "debian-security"}, - mirrorConfig{Name: "fedora"}, - mirrorConfig{Name: "archlinux"}, - mirrorConfig{Name: "AOSP", Env: map[string]string{"REPO": "/usr/bin/repo"}}, - mirrorConfig{Name: "ubuntu"}, + {Name: "debian"}, + {Name: "debian-security"}, + {Name: "fedora"}, + {Name: "archlinux"}, + {Name: "AOSP", Env: map[string]string{"REPO": "/usr/bin/repo"}}, + {Name: "ubuntu"}, } newList := []mirrorConfig{ - mirrorConfig{Name: "debian"}, - mirrorConfig{Name: "debian-cd"}, - mirrorConfig{Name: "archlinuxcn"}, - mirrorConfig{Name: "AOSP", Env: map[string]string{"REPO": "/usr/local/bin/aosp-repo"}}, - mirrorConfig{Name: "ubuntu-ports"}, + {Name: "debian"}, + {Name: "debian-cd"}, + {Name: "archlinuxcn"}, + {Name: "AOSP", Env: map[string]string{"REPO": "/usr/local/bin/aosp-repo"}}, + {Name: "ubuntu-ports"}, } difference := diffMirrorConfig(oldList, newList) diff --git a/worker/context.go b/worker/context.go index 7a240a7..a4828cd 100644 --- a/worker/context.go +++ b/worker/context.go @@ -34,7 +34,7 @@ func (ctx *Context) Enter() *Context { // Exit return the upper layer of context func (ctx *Context) Exit() (*Context, error) { if ctx.parent == nil { - return nil, errors.New("Cannot exit the bottom layer context") + return nil, errors.New("cannot exit the bottom layer context") } return ctx.parent, nil } diff --git a/worker/docker.go b/worker/docker.go index ae8cf77..f6b1cde 100644 --- a/worker/docker.go +++ b/worker/docker.go @@ -10,9 +10,9 @@ import ( type dockerHook struct { emptyHook - image string - volumes []string - options []string + image string + volumes []string + options []string memoryLimit MemBytes } @@ -33,9 +33,9 @@ func newDockerHook(p mirrorProvider, gCfg dockerConfig, mCfg mirrorConfig) *dock emptyHook: emptyHook{ provider: p, }, - image: mCfg.DockerImage, - volumes: volumes, - options: options, + image: mCfg.DockerImage, + volumes: volumes, + options: options, memoryLimit: mCfg.MemoryLimit, } } @@ -49,7 +49,7 @@ func (d *dockerHook) preExec() error { if _, err := os.Stat(workingDir); os.IsNotExist(err) { logger.Debugf("Making dir %s", workingDir) if err = os.MkdirAll(workingDir, 0755); err != nil { - return fmt.Errorf("Error making dir %s: %s", workingDir, err.Error()) + return fmt.Errorf("error making dir %s: %s", workingDir, err.Error()) } } diff --git a/worker/exec_post_hook.go b/worker/exec_post_hook.go index a2c564c..db5e423 100644 --- a/worker/exec_post_hook.go +++ b/worker/exec_post_hook.go @@ -32,7 +32,7 @@ func newExecPostHook(provider mirrorProvider, execOn uint8, command string) (*ex return nil, err } if execOn != execOnSuccess && execOn != execOnFailure { - return nil, fmt.Errorf("Invalid option for exec-on: %d", execOn) + return nil, fmt.Errorf("invalid option for exec-on: %d", execOn) } return &execPostHook{ @@ -92,7 +92,7 @@ func (h *execPostHook) Do() error { args = append(args, arg) } } else { - return errors.New("Invalid Command") + return errors.New("invalid command") } return 
 	return session.Command(cmd, args...).Run()
 }
diff --git a/worker/job.go b/worker/job.go
index 39af616..0193afc 100644
--- a/worker/job.go
+++ b/worker/job.go
@@ -79,7 +79,7 @@ func (m *mirrorJob) SetState(state uint32) {
 func (m *mirrorJob) SetProvider(provider mirrorProvider) error {
 	s := m.State()
 	if (s != stateNone) && (s != stateDisabled) {
-		return fmt.Errorf("Provider cannot be switched when job state is %d", s)
+		return fmt.Errorf("provider cannot be switched when job state is %d", s)
 	}
 	m.provider = provider
 	return nil
diff --git a/worker/provider.go b/worker/provider.go
index 443c8f5..8a19554 100644
--- a/worker/provider.go
+++ b/worker/provider.go
@@ -188,7 +188,7 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
 		p.isMaster = isMaster
 		provider = p
 	default:
-		panic(errors.New("Invalid mirror provider"))
+		panic(errors.New("invalid mirror provider"))
 	}
 
 	// Add Logging Hook
diff --git a/worker/runner.go b/worker/runner.go
index 5f25a84..4ef6183 100644
--- a/worker/runner.go
+++ b/worker/runner.go
@@ -11,22 +11,21 @@ import (
 	"time"
 
 	"github.com/codeskyblue/go-sh"
-	"golang.org/x/sys/unix"
-	"github.com/moby/moby/pkg/reexec"
 	cgv1 "github.com/containerd/cgroups"
+	"github.com/moby/moby/pkg/reexec"
+	"golang.org/x/sys/unix"
 )
 
 // runner is to run os commands giving command line, env and log file
 // it's an alternative to python-sh or go-sh
-var errProcessNotStarted = errors.New("Process Not Started")
+var errProcessNotStarted = errors.New("process not started")
 
 type cmdJob struct {
 	sync.Mutex
 	cmd        *exec.Cmd
 	workingDir string
 	env        map[string]string
-	logFile    *os.File
 	finished   chan empty
 	provider   mirrorProvider
 	retErr     error
}
@@ -60,7 +59,7 @@ func newCmdJob(provider mirrorProvider, cmdAndArgs []string, workingDir string,
 		}
 		// set memlimit
 		if d.memoryLimit != 0 {
-		    args = append(args, "-m", fmt.Sprint(d.memoryLimit.Value()))
+			args = append(args, "-m", fmt.Sprint(d.memoryLimit.Value()))
 		}
 		// apply options
 		args = append(args, d.options...)
@@ -115,7 +114,7 @@ func (c *cmdJob) Start() error {
 	if cg != nil {
 		logger.Debugf("Preparing cgroup sync pipes for job %s", c.provider.Name())
 		var err error
-		pipeR, pipeW, err = os.Pipe();
+		pipeR, pipeW, err = os.Pipe()
 		if err != nil {
 			return err
 		}
@@ -139,7 +138,7 @@ func (c *cmdJob) Start() error {
 	}
 	pid := c.cmd.Process.Pid
 	if cg.cgCfg.isUnified {
-		if err := cg.cgMgrV2.AddProc(uint64(pid)); err != nil{
+		if err := cg.cgMgrV2.AddProc(uint64(pid)); err != nil {
 			if errors.Is(err, syscall.ESRCH) {
 				logger.Infof("Write pid %d to cgroup failed: process vanished, ignoring")
 			} else {
@@ -147,7 +146,7 @@ func (c *cmdJob) Start() error {
 		}
 	} else {
-		if err := cg.cgMgrV1.Add(cgv1.Process{Pid: pid}); err != nil{
+		if err := cg.cgMgrV1.Add(cgv1.Process{Pid: pid}); err != nil {
 			if errors.Is(err, syscall.ESRCH) {
 				logger.Infof("Write pid %d to cgroup failed: process vanished, ignoring")
 			} else {
diff --git a/worker/two_stage_rsync_provider.go b/worker/two_stage_rsync_provider.go
index 7063991..18965c9 100644
--- a/worker/two_stage_rsync_provider.go
+++ b/worker/two_stage_rsync_provider.go
@@ -112,7 +112,7 @@ func (p *twoStageRsyncProvider) Options(stage int) ([]string, error) {
 		options = append(options, p.stage1Options...)
 		stage1Profile, ok := rsyncStage1Profiles[p.stage1Profile]
 		if !ok {
-			return nil, errors.New("Invalid Stage 1 Profile")
+			return nil, errors.New("invalid stage 1 profile")
 		}
 		for _, exc := range stage1Profile {
 			options = append(options, exc)
@@ -124,7 +124,7 @@ func (p *twoStageRsyncProvider) Options(stage int) ([]string, error) {
 			options = append(options, p.extraOptions...)
 		}
 	} else {
-		return []string{}, fmt.Errorf("Invalid stage: %d", stage)
+		return []string{}, fmt.Errorf("invalid stage: %d", stage)
 	}
 
 	if !p.rsyncNeverTimeout {
diff --git a/worker/worker_test.go b/worker/worker_test.go
index e00f59a..9550181 100644
--- a/worker/worker_test.go
+++ b/worker/worker_test.go
@@ -147,7 +147,7 @@ func TestWorker(t *testing.T) {
 		})
 		Convey("with one job", func(ctx C) {
 			workerCfg.Mirrors = []mirrorConfig{
-				mirrorConfig{
+				{
 					Name:     "job-ls",
 					Provider: provCommand,
 					Command:  "ls",
@@ -194,17 +194,17 @@ func TestWorker(t *testing.T) {
 		})
 		Convey("with several jobs", func(ctx C) {
 			workerCfg.Mirrors = []mirrorConfig{
-				mirrorConfig{
+				{
 					Name:     "job-ls-1",
 					Provider: provCommand,
 					Command:  "ls",
 				},
-				mirrorConfig{
+				{
 					Name:     "job-fail",
 					Provider: provCommand,
 					Command:  "non-existent-command-xxxx",
 				},
-				mirrorConfig{
+				{
 					Name:     "job-ls-2",
 					Provider: provCommand,
 					Command:  "ls",
diff --git a/worker/zfs_hook_test.go b/worker/zfs_hook_test.go
index cf24a59..e5b8a7a 100644
--- a/worker/zfs_hook_test.go
+++ b/worker/zfs_hook_test.go
@@ -13,7 +13,7 @@ import (
 
 func TestZFSHook(t *testing.T) {
 	Convey("ZFS Hook should work", t, func(ctx C) {
-		tmpDir, err := ioutil.TempDir("", "tunasync")
+		tmpDir, _ := ioutil.TempDir("", "tunasync")
 		tmpFile := filepath.Join(tmpDir, "log_file")
 
 		c := cmdConfig{
@@ -45,4 +45,4 @@ func TestZFSHook(t *testing.T) {
 			So(err, ShouldNotBeNil)
 		})
 	})
-}
\ No newline at end of file
+}