diff --git a/worker/cgroup.go b/worker/cgroup.go
index 77c5439..e5c2c7f 100644
--- a/worker/cgroup.go
+++ b/worker/cgroup.go
@@ -3,6 +3,7 @@ package worker
 import (
 	"errors"
 	"fmt"
+	"io/ioutil"
 	"os"
 	"os/exec"
 	"path/filepath"
@@ -25,38 +26,47 @@ type cgroupHook struct {
 	cgMgrV2 *cgv2.Manager
 }
 
+type execCmd string
+
+const (
+	cmdCont execCmd = "cont"
+	cmdAbrt execCmd = "abrt"
+)
+
 func init() {
 	reexec.Register("tunasync-exec", waitExec)
 }
 
 func waitExec() {
-	binary, lookErr := exec.LookPath(os.Args[1])
-	if lookErr != nil {
-		panic(lookErr)
+	binary, err := exec.LookPath(os.Args[1])
+	if err != nil {
+		panic(err)
 	}
 
 	pipe := os.NewFile(3, "pipe")
 	if pipe != nil {
-		for {
-			tmpBytes := make([]byte, 1)
-			nRead, err := pipe.Read(tmpBytes)
+		if _, err := pipe.Stat(); err == nil {
+			cmdBytes, err := ioutil.ReadAll(pipe)
 			if err != nil {
-				break
+				panic(err)
 			}
-			if nRead == 0 {
-				break
+			if err := pipe.Close(); err != nil {
+			}
+			cmd := execCmd(string(cmdBytes))
+			switch cmd {
+			case cmdAbrt:
+				fallthrough
+			default:
+				panic("Exited on request")
+			case cmdCont:
 			}
-		}
-		err := pipe.Close()
-		if err != nil {
 		}
 	}
 
 	args := os.Args[1:]
 	env := os.Environ()
-	execErr := syscall.Exec(binary, args, env)
-	if execErr != nil {
-		panic(execErr)
+	if err := syscall.Exec(binary, args, env); err != nil {
+		panic(err)
 	}
 	panic("Exec failed.")
 }
@@ -241,6 +251,7 @@ func newCgroupHook(p mirrorProvider, cfg cgroupConfig, memLimit MemBytes) *cgroupHook {
 			provider: p,
 		},
 		cgCfg: cfg,
+		memLimit: memLimit,
 	}
 }
@@ -255,7 +266,7 @@ func (c *cgroupHook) preExec() error {
 			},
 		}
 	}
-	subMgr, err := c.cgMgrV2.NewChild(c.provider.Name(), resSet)
+	subMgr, err := c.cgCfg.cgMgrV2.NewChild(c.provider.Name(), resSet)
 	if err != nil {
 		logger.Errorf("Failed to create cgroup for task %s: %s", c.provider.Name(), err.Error())
 		return err
@@ -263,15 +274,15 @@ func (c *cgroupHook) preExec() error {
 		c.cgMgrV2 = subMgr
 	} else {
 		logger.Debugf("Creating v1 cgroup for task %s", c.provider.Name())
-		var resSet *contspecs.LinuxResources
+		var resSet contspecs.LinuxResources
 		if c.memLimit != 0 {
-			resSet = &contspecs.LinuxResources {
+			resSet = contspecs.LinuxResources {
 				Memory: &contspecs.LinuxMemory{
 					Limit: func(i int64) *int64 { return &i }(c.memLimit.Value()),
 				},
 			}
 		}
-		subMgr, err := c.cgMgrV1.New(c.provider.Name(), resSet)
+		subMgr, err := c.cgCfg.cgMgrV1.New(c.provider.Name(), &resSet)
 		if err != nil {
 			logger.Errorf("Failed to create cgroup for task %s: %s", c.provider.Name(), err.Error())
 			return err
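The waitExec hook above relies on the re-exec'd child inheriting the read end of a pipe as file descriptor 3 and blocking on it until the parent delivers a verdict. A minimal standalone sketch of that handshake pattern, separate from the tunasync code (the parent/child split and the "child" argv check are illustrative assumptions, not part of this patch):

package main

import (
	"io/ioutil"
	"os"
	"os/exec"
)

// parent spawns a copy of itself, parks the child on the pipe, and
// releases it once setup (e.g. cgroup placement) is done.
func parent() error {
	r, w, err := os.Pipe()
	if err != nil {
		return err
	}
	cmd := exec.Command("/proc/self/exe", "child")
	cmd.ExtraFiles = []*os.File{r} // ExtraFiles[0] becomes fd 3 in the child
	if err := cmd.Start(); err != nil {
		return err
	}
	r.Close() // parent keeps only the write end
	// ... move cmd.Process.Pid into a cgroup here ...
	if _, err := w.WriteString("cont"); err != nil {
		return err
	}
	return w.Close() // EOF lets the child's ReadAll return
}

// child blocks until the parent writes its verdict and closes the pipe.
func child() {
	pipe := os.NewFile(3, "pipe")
	verdict, err := ioutil.ReadAll(pipe) // blocks until EOF on the pipe
	if err != nil || string(verdict) != "cont" {
		panic("Exited on request")
	}
	// ... syscall.Exec the real payload here ...
}

func main() {
	if len(os.Args) > 1 && os.Args[1] == "child" {
		child()
		return
	}
	if err := parent(); err != nil {
		panic(err)
	}
}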
"github.com/smartystreets/goconvey/convey" ) +func init() { + reexec.Init() +} + func TestCgroup(t *testing.T) { Convey("Cgroup Should Work", t, func(ctx C) { tmpDir, err := ioutil.TempDir("", "tunasync") @@ -77,14 +82,15 @@ sleep 30 cgcf := cgroupConfig{BasePath: "/sys/fs/cgroup", Group: "tunasync", Subsystem: "cpu"} err = initCgroup(&cgcf) So(err, ShouldBeNil) + if cgcf.isUnified { + So(cgcf.cgMgrV2, ShouldNotBeNil) + } else { + So(cgcf.cgMgrV1, ShouldNotBeNil) + } cg := newCgroupHook(provider, cgcf, 0) provider.AddHook(cg) err = cg.preExec() - if err != nil { - logger.Errorf("Failed to create cgroup") - return - } So(err, ShouldBeNil) go func() { @@ -140,20 +146,27 @@ sleep 30 cgcf := cgroupConfig{BasePath: "/sys/fs/cgroup", Group: "tunasync", Subsystem: "cpu"} err = initCgroup(&cgcf) So(err, ShouldBeNil) + if cgcf.isUnified { + So(cgcf.cgMgrV2, ShouldNotBeNil) + } else { + So(cgcf.cgMgrV1, ShouldNotBeNil) + } cg := newCgroupHook(provider, cgcf, 512 * units.MiB) provider.AddHook(cg) err = cg.preExec() - if err != nil { - logger.Errorf("Failed to create cgroup") - return - } - So(cg.cgMgrV1, ShouldNotBeNil) - for _, subsys := range(cg.cgMgrV1.Subsystems()) { - if subsys.Name() == cgv1.Memory { - memoLimit, err := ioutil.ReadFile(filepath.Join(cgcf.BasePath, "memory", cgcf.Group, provider.Name(), "memory.limit_in_bytes")) - So(err, ShouldBeNil) - So(strings.Trim(string(memoLimit), "\n"), ShouldEqual, strconv.Itoa(512*1024*1024)) + So(err, ShouldBeNil) + if cgcf.isUnified { + memoLimit, err := ioutil.ReadFile(filepath.Join(cgcf.BasePath, cgcf.Group, provider.Name(), "memory.max")) + So(err, ShouldBeNil) + So(strings.Trim(string(memoLimit), "\n"), ShouldEqual, strconv.Itoa(512*1024*1024)) + } else { + for _, subsys := range(cg.cgMgrV1.Subsystems()) { + if subsys.Name() == cgv1.Memory { + memoLimit, err := ioutil.ReadFile(filepath.Join(cgcf.BasePath, "memory", cgcf.Group, provider.Name(), "memory.limit_in_bytes")) + So(err, ShouldBeNil) + So(strings.Trim(string(memoLimit), "\n"), ShouldEqual, strconv.Itoa(512*1024*1024)) + } } } cg.postExec() diff --git a/worker/runner.go b/worker/runner.go index a2d6dfd..5f25a84 100644 --- a/worker/runner.go +++ b/worker/runner.go @@ -12,6 +12,8 @@ import ( "github.com/codeskyblue/go-sh" "golang.org/x/sys/unix" + "github.com/moby/moby/pkg/reexec" + cgv1 "github.com/containerd/cgroups" ) // runner is to run os commands giving command line, env and log file @@ -70,11 +72,7 @@ func newCmdJob(provider mirrorProvider, cmdAndArgs []string, workingDir string, cmd = exec.Command(c, args...) } else if provider.Cgroup() != nil { - //c := "cgexec" - //args := []string{"-g", provider.Cgroup().Cgroup()} - //args = append(args, cmdAndArgs...) - //cmd = exec.Command(c, args...) - cmd = exec.Command(cmdAndArgs[0], cmdAndArgs[1:]...) + cmd = reexec.Command(append([]string{"tunasync-exec"}, cmdAndArgs...)...) 
diff --git a/worker/runner.go b/worker/runner.go
index a2d6dfd..5f25a84 100644
--- a/worker/runner.go
+++ b/worker/runner.go
@@ -12,6 +12,8 @@ import (
 
 	"github.com/codeskyblue/go-sh"
 	"golang.org/x/sys/unix"
+	"github.com/moby/moby/pkg/reexec"
+	cgv1 "github.com/containerd/cgroups"
 )
 
 // runner is to run os commands giving command line, env and log file
@@ -70,11 +72,7 @@ func newCmdJob(provider mirrorProvider, cmdAndArgs []string, workingDir string,
 		cmd = exec.Command(c, args...)
 	} else if provider.Cgroup() != nil {
-		//c := "cgexec"
-		//args := []string{"-g", provider.Cgroup().Cgroup()}
-		//args = append(args, cmdAndArgs...)
-		//cmd = exec.Command(c, args...)
-		cmd = exec.Command(cmdAndArgs[0], cmdAndArgs[1:]...)
+		cmd = reexec.Command(append([]string{"tunasync-exec"}, cmdAndArgs...)...)
 	} else {
 		if len(cmdAndArgs) == 1 {
@@ -109,9 +107,59 @@ func newCmdJob(provider mirrorProvider, cmdAndArgs []string, workingDir string,
 }
 
 func (c *cmdJob) Start() error {
+	cg := c.provider.Cgroup()
+	var (
+		pipeR *os.File
+		pipeW *os.File
+	)
+	if cg != nil {
+		logger.Debugf("Preparing cgroup sync pipes for job %s", c.provider.Name())
+		var err error
+		pipeR, pipeW, err = os.Pipe()
+		if err != nil {
+			return err
+		}
+		c.cmd.ExtraFiles = []*os.File{pipeR}
+		defer pipeR.Close()
+		defer pipeW.Close()
+	}
+
 	logger.Debugf("Command start: %v", c.cmd.Args)
 	c.finished = make(chan empty, 1)
-	return c.cmd.Start()
+
+	if err := c.cmd.Start(); err != nil {
+		return err
+	}
+	if cg != nil {
+		if err := pipeR.Close(); err != nil {
+			return err
+		}
+		if c.cmd == nil || c.cmd.Process == nil {
+			return errProcessNotStarted
+		}
+		pid := c.cmd.Process.Pid
+		if cg.cgCfg.isUnified {
+			if err := cg.cgMgrV2.AddProc(uint64(pid)); err != nil {
+				if errors.Is(err, syscall.ESRCH) {
+					logger.Infof("Write pid %d to cgroup failed: process vanished, ignoring", pid)
+				} else {
+					return err
+				}
+			}
+		} else {
+			if err := cg.cgMgrV1.Add(cgv1.Process{Pid: pid}); err != nil {
+				if errors.Is(err, syscall.ESRCH) {
+					logger.Infof("Write pid %d to cgroup failed: process vanished, ignoring", pid)
+				} else {
+					return err
+				}
+			}
+		}
+		if _, err := pipeW.WriteString(string(cmdCont)); err != nil {
+			return err
+		}
+	}
+	return nil
 }
 
 func (c *cmdJob) Wait() error {
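One subtlety in Start(): only cmdCont is ever written. If cgroup placement fails and Start() returns early, the deferred pipeW.Close() delivers EOF, the child's ReadAll yields an empty command, and the default branch in waitExec panics, so the abort path works implicitly. A caller that wants to signal the abort explicitly could use a helper along these lines (hypothetical, not part of this patch):

package worker

import "os"

// abortChild explicitly tells the re-exec'd child to give up instead
// of relying on pipe EOF. Hypothetical helper; the patch's Start()
// reaches the same outcome through its deferred pipeW.Close().
func abortChild(pipeW *os.File) error {
	if _, err := pipeW.WriteString(string(cmdAbrt)); err != nil {
		return err
	}
	return pipeW.Close()
}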