diff --git a/go.mod b/go.mod index 4a84d0c..51e9458 100644 --- a/go.mod +++ b/go.mod @@ -36,3 +36,5 @@ require ( gopkg.in/op/go-logging.v1 v1.0.0-20160211212156-b2cb9fa56473 gotest.tools/v3 v3.0.3 // indirect ) + +replace github.com/containerd/cgroups v1.0.1 => github.com/shankerwangmiao/cgroups v1.0.1-p6 diff --git a/go.sum b/go.sum index e02ed51..4309ce3 100644 --- a/go.sum +++ b/go.sum @@ -27,8 +27,6 @@ github.com/codegangsta/inject v0.0.0-20150114235600-33e0aa1cb7c0 h1:sDMmm+q/3+Bu github.com/codegangsta/inject v0.0.0-20150114235600-33e0aa1cb7c0/go.mod h1:4Zcjuz89kmFXt9morQgcfYZAYZ5n8WHjt81YYWIwtTM= github.com/codeskyblue/go-sh v0.0.0-20190412065543-76bd3d59ff27 h1:HHUr4P/aKh4quafGxDT9LDasjGdlGkzLbfmmrlng3kA= github.com/codeskyblue/go-sh v0.0.0-20190412065543-76bd3d59ff27/go.mod h1:VQx0hjo2oUeQkQUET7wRwradO6f+fN5jzXgB/zROxxE= -github.com/containerd/cgroups v1.0.1 h1:iJnMvco9XGvKUvNQkv88bE4uJXxRQH18efbKo9w5vHQ= -github.com/containerd/cgroups v1.0.1/go.mod h1:0SJrPIenamHDcZhEcJMNBB85rHcUsw4f25ZfBiPYRkU= github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= @@ -166,6 +164,8 @@ github.com/russross/blackfriday/v2 v2.0.1 h1:lPqVAte+HuHNfhJ/0LC98ESWRz8afy9tM/0 github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/ryszard/goskiplist v0.0.0-20150312221310-2dfbae5fcf46 h1:GHRpF1pTW19a8tTFrMLUcfWwyC0pnifVo2ClaLq+hP8= github.com/ryszard/goskiplist v0.0.0-20150312221310-2dfbae5fcf46/go.mod h1:uAQ5PCi+MFsC7HjREoAz1BU+Mq60+05gifQSsHSDG/8= +github.com/shankerwangmiao/cgroups v1.0.1-p6 h1:rXRdXSexliIldzv4M4pEGhG73IYUv1pUlrOZqUA5IUM= +github.com/shankerwangmiao/cgroups v1.0.1-p6/go.mod h1:0SJrPIenamHDcZhEcJMNBB85rHcUsw4f25ZfBiPYRkU= github.com/shurcooL/sanitized_anchor_name v1.0.0 h1:PdmoCO6wvbs+7yrJyMORt4/BmY5IYyJwS/kOiWx8mHo= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= diff --git a/worker/cgroup_test.go b/worker/cgroup_test.go index ab352f5..b3db44b 100644 --- a/worker/cgroup_test.go +++ b/worker/cgroup_test.go @@ -8,7 +8,10 @@ import ( "strings" "testing" "time" + "errors" + "syscall" cgv1 "github.com/containerd/cgroups" + cgv2 "github.com/containerd/cgroups/v2" units "github.com/docker/go-units" "github.com/moby/moby/pkg/reexec" @@ -20,7 +23,22 @@ func init() { } func TestCgroup(t *testing.T) { - Convey("Cgroup Should Work", t, func(ctx C) { + var cgcf *cgroupConfig + Convey("init cgroup", t, func(ctx C){ + _, useCurrentCgroup := os.LookupEnv("USECURCGROUP") + cgcf = &cgroupConfig{BasePath: "/sys/fs/cgroup", Group: "tunasync", Subsystem: "cpu"} + if useCurrentCgroup { + cgcf.Group = "" + } + err := initCgroup(cgcf) + So(err, ShouldBeNil) + if cgcf.isUnified { + So(cgcf.cgMgrV2, ShouldNotBeNil) + } else { + So(cgcf.cgMgrV1, ShouldNotBeNil) + } + + Convey("Cgroup Should Work", func(ctx C) { tmpDir, err := ioutil.TempDir("", "tunasync") defer os.RemoveAll(tmpDir) So(err, ShouldBeNil) @@ -79,15 +97,7 @@ sleep 30 provider, err := newCmdProvider(c) So(err, ShouldBeNil) - cgcf := cgroupConfig{BasePath: "/sys/fs/cgroup", Group: "tunasync", Subsystem: "cpu"} - err = initCgroup(&cgcf) - So(err, ShouldBeNil) - if cgcf.isUnified { - So(cgcf.cgMgrV2, ShouldNotBeNil) - } else { - So(cgcf.cgMgrV1, ShouldNotBeNil) - } - cg := newCgroupHook(provider, cgcf, 0) + cg := newCgroupHook(provider, *cgcf, 0) provider.AddHook(cg) err = cg.preExec() @@ -122,7 +132,7 @@ sleep 30 }) - Convey("Rsync Memory Should Be Limited", t, func() { + Convey("Rsync Memory Should Be Limited", func() { tmpDir, err := ioutil.TempDir("", "tunasync") defer os.RemoveAll(tmpDir) So(err, ShouldBeNil) @@ -143,27 +153,31 @@ sleep 30 provider, err := newRsyncProvider(c) So(err, ShouldBeNil) - cgcf := cgroupConfig{BasePath: "/sys/fs/cgroup", Group: "tunasync", Subsystem: "cpu"} - err = initCgroup(&cgcf) - So(err, ShouldBeNil) - if cgcf.isUnified { - So(cgcf.cgMgrV2, ShouldNotBeNil) - } else { - So(cgcf.cgMgrV1, ShouldNotBeNil) - } - cg := newCgroupHook(provider, cgcf, 512 * units.MiB) + cg := newCgroupHook(provider, *cgcf, 512 * units.MiB) provider.AddHook(cg) err = cg.preExec() So(err, ShouldBeNil) if cgcf.isUnified { - memoLimit, err := ioutil.ReadFile(filepath.Join(cgcf.BasePath, cgcf.Group, provider.Name(), "memory.max")) + cgpath := filepath.Join(cgcf.BasePath, cgcf.Group, provider.Name()) + if useCurrentCgroup { + group, err := cgv2.NestedGroupPath(filepath.Join("..", provider.Name())) + So(err, ShouldBeNil) + cgpath = filepath.Join(cgcf.BasePath, group) + } + memoLimit, err := ioutil.ReadFile(filepath.Join(cgpath, "memory.max")) So(err, ShouldBeNil) So(strings.Trim(string(memoLimit), "\n"), ShouldEqual, strconv.Itoa(512*1024*1024)) } else { for _, subsys := range(cg.cgMgrV1.Subsystems()) { if subsys.Name() == cgv1.Memory { - memoLimit, err := ioutil.ReadFile(filepath.Join(cgcf.BasePath, "memory", cgcf.Group, provider.Name(), "memory.limit_in_bytes")) + cgpath := filepath.Join(cgcf.Group, provider.Name()) + if useCurrentCgroup { + p, err := cgv1.NestedPath(filepath.Join("..", provider.Name()))(cgv1.Memory) + So(err, ShouldBeNil) + cgpath = p + } + memoLimit, err := ioutil.ReadFile(filepath.Join(cgcf.BasePath, "memory", cgpath, "memory.limit_in_bytes")) So(err, ShouldBeNil) So(strings.Trim(string(memoLimit), "\n"), ShouldEqual, strconv.Itoa(512*1024*1024)) } @@ -172,4 +186,76 @@ sleep 30 cg.postExec() So(cg.cgMgrV1, ShouldBeNil) }) + Reset(func() { + if cgcf.isUnified { + if cgcf.Group == "" { + wkrg, err := cgv2.NestedGroupPath(""); + So(err, ShouldBeNil) + wkrMgr, err := cgv2.LoadManager("/sys/fs/cgroup", wkrg); + allCtrls, err := wkrMgr.Controllers() + So(err, ShouldBeNil) + err = wkrMgr.ToggleControllers(allCtrls, cgv2.Disable) + So(err, ShouldBeNil) + origMgr := cgcf.cgMgrV2 + for { + logger.Debugf("Restoring pids") + procs, err := wkrMgr.Procs(false) + So(err, ShouldBeNil) + if len(procs) == 0 { + break + } + for _, p := range(procs) { + if err := origMgr.AddProc(p); err != nil{ + if errors.Is(err, syscall.ESRCH) { + logger.Debugf("Write pid %d to sub group failed: process vanished, ignoring") + } else { + So(err, ShouldBeNil) + } + } + } + } + err = wkrMgr.Delete() + So(err, ShouldBeNil) + } + } else { + if cgcf.Group == "" { + pather := (func(p cgv1.Path) (cgv1.Path){ + return func(subsys cgv1.Name) (string, error){ + path, err := p(subsys); + if err != nil { + return "", err + } + if path == "/" { + return "", cgv1.ErrControllerNotActive + } + return path, err + } + })(cgv1.NestedPath("")) + wkrMgr, err := cgv1.Load(cgv1.V1, pather) + So(err, ShouldBeNil) + origMgr := cgcf.cgMgrV1 + for _, subsys := range(wkrMgr.Subsystems()){ + for { + procs, err := wkrMgr.Processes(subsys.Name(), false) + So(err, ShouldBeNil) + if len(procs) == 0 { + break + } + for _, proc := range(procs) { + if err := origMgr.Add(proc); err != nil { + if errors.Is(err, syscall.ESRCH) { + logger.Debugf("Write pid %d to sub group failed: process vanished, ignoring") + } else { + So(err, ShouldBeNil) + } + } + } + } + } + err = wkrMgr.Delete() + So(err, ShouldBeNil) + } + } + }) + }) }