From 02a144744f6d5ae386b4ab49cdf9d4dd9c199563 Mon Sep 17 00:00:00 2001 From: Miao Wang Date: Sun, 25 Jul 2021 01:01:53 +0800 Subject: [PATCH] [WIP] cgroupv2: add init cgroup --- worker/cgroup.go | 143 ++++++++++++++++++++++++++++++++++++++++-- worker/cgroup_test.go | 4 ++ worker/worker.go | 6 ++ 3 files changed, 149 insertions(+), 4 deletions(-) diff --git a/worker/cgroup.go b/worker/cgroup.go index ebe36b5..2d95a56 100644 --- a/worker/cgroup.go +++ b/worker/cgroup.go @@ -17,6 +17,7 @@ import ( "github.com/moby/moby/pkg/reexec" cgv1 "github.com/containerd/cgroups" cgv2 "github.com/containerd/cgroups/v2" + contspecs "github.com/opencontainers/runtime-spec/specs-go" ) type cgroupHook struct { @@ -66,6 +67,7 @@ func waitExec () { func initCgroup(cfg *cgroupConfig) (error) { + logger.Debugf("Initializing cgroup") baseGroup := cfg.Group //subsystem := cfg.Subsystem @@ -78,27 +80,160 @@ func initCgroup(cfg *cgroupConfig) (error) { cfg.isUnified = cgv1.Mode() == cgv1.Unified if cfg.isUnified { - var err error + logger.Debugf("Cgroup V2 detected") g := baseGroup if g == "" { + logger.Debugf("Detecting my cgroup path") + var err error if g, err = cgv2.NestedGroupPath(""); err != nil { return err } } + logger.Infof("Using cgroup path: %s", g) + + var err error if cfg.cgMgrV2, err = cgv2.LoadManager("/sys/fs/cgroup", g); err != nil { return err } + if baseGroup == "" { + logger.Debugf("Creating a sub group and move all processes into it") + wkrMgr, err := cfg.cgMgrV2.NewChild("__worker", nil); + if err != nil { + return err + } + for { + logger.Debugf("Reading pids") + procs, err := cfg.cgMgrV2.Procs(false) + if err != nil { + logger.Errorf("Cannot read pids in that group") + return err + } + if len(procs) == 0 { + break + } + for _, p := range(procs) { + if err := wkrMgr.AddProc(p); err != nil{ + if errors.Is(err, syscall.ESRCH) { + logger.Debugf("Write pid %d to sub group failed: process vanished, ignoring") + } else { + return err + } + } + } + } + } else { + logger.Debugf("Trying to create a sub group in that group") + testMgr, err := cfg.cgMgrV2.NewChild("__test", nil); + if err != nil { + logger.Errorf("Cannot create a sub group in the cgroup") + return err + } + if err := testMgr.Delete(); err != nil { + return err + } + procs, err := cfg.cgMgrV2.Procs(false) + if err != nil { + logger.Errorf("Cannot read pids in that group") + return err + } + if len(procs) != 0 { + return fmt.Errorf("There are remaining processes in cgroup %s", baseGroup) + } + } } else { - var err error + logger.Debugf("Cgroup V1 detected") var pather cgv1.Path if baseGroup != "" { pather = cgv1.StaticPath(baseGroup) } else { - pather = cgv1.NestedPath("") + pather = (func(p cgv1.Path) (cgv1.Path){ + return func(subsys cgv1.Name) (string, error){ + path, err := p(subsys); + if err != nil { + return "", err + } + if path == "/" { + return "", cgv1.ErrControllerNotActive + } + return path, err + } + })(cgv1.NestedPath("")) } + logger.Infof("Loading cgroup") + var err error if cfg.cgMgrV1, err = cgv1.Load(cgv1.V1, pather); err != nil { return err } + logger.Debugf("Available subsystems:") + for _, subsys := range(cfg.cgMgrV1.Subsystems()) { + p, err := pather(subsys.Name()) + if err != nil { + return err + } + logger.Debugf("%s: %s", subsys.Name(), p) + } + if baseGroup == "" { + logger.Debugf("Creating a sub group and move all processes into it") + wkrMgr, err := cfg.cgMgrV1.New("__worker", &contspecs.LinuxResources{}); + if err != nil { + return err + } + for _, subsys := range(cfg.cgMgrV1.Subsystems()) { + logger.Debugf("Reading pids for subsystem %s", subsys.Name()) + for { + procs, err := cfg.cgMgrV1.Processes(subsys.Name(), false) + if err != nil { + p, err := pather(subsys.Name()) + if err != nil { + return err + } + logger.Errorf("Cannot read pids in group %s of subsystem %s", p, subsys.Name()) + return err + } + if len(procs) == 0 { + break + } + for _, proc := range(procs) { + if err := wkrMgr.Add(proc); err != nil { + if errors.Is(err, syscall.ESRCH) { + logger.Debugf("Write pid %d to sub group failed: process vanished, ignoring") + } else { + return err + } + } + } + } + } + } else { + logger.Debugf("Trying to create a sub group in that group") + testMgr, err := cfg.cgMgrV1.New("__test", &contspecs.LinuxResources{}); + if err != nil { + logger.Errorf("Cannot create a sub group in the cgroup") + return err + } + if err := testMgr.Delete(); err != nil { + return err + } + for _, subsys := range(cfg.cgMgrV1.Subsystems()) { + logger.Debugf("Reading pids for subsystem %s", subsys.Name()) + procs, err := cfg.cgMgrV1.Processes(subsys.Name(), false) + if err != nil { + p, err := pather(subsys.Name()) + if err != nil { + return err + } + logger.Errorf("Cannot read pids in group %s of subsystem %s", p, subsys.Name()) + return err + } + if len(procs) != 0 { + p, err := pather(subsys.Name()) + if err != nil { + return err + } + return fmt.Errorf("There are remaining processes in cgroup %s of subsystem %s", p, subsys.Name()) + } + } + } } return nil @@ -123,7 +258,7 @@ func newCgroupHook(p mirrorProvider, cfg cgroupConfig, memLimit MemBytes) *cgrou emptyHook: emptyHook{ provider: p, }, - basePath: basePath, + basePath: basePath, baseGroup: baseGroup, subsystem: subsystem, } diff --git a/worker/cgroup_test.go b/worker/cgroup_test.go index 5ab91fe..afd4d8f 100644 --- a/worker/cgroup_test.go +++ b/worker/cgroup_test.go @@ -74,6 +74,8 @@ sleep 30 So(err, ShouldBeNil) cgcf := cgroupConfig{BasePath: "/sys/fs/cgroup", Group: "tunasync", Subsystem: "cpu"} + err = initCgroup(&cgcf) + So(err, ShouldBeNil) cg := newCgroupHook(provider, cgcf, 0) provider.AddHook(cg) @@ -135,6 +137,8 @@ sleep 30 So(err, ShouldBeNil) cgcf := cgroupConfig{BasePath: "/sys/fs/cgroup", Group: "tunasync", Subsystem: "cpu"} + err = initCgroup(&cgcf) + So(err, ShouldBeNil) cg := newCgroupHook(provider, cgcf, 512 * units.MiB) provider.AddHook(cg) diff --git a/worker/worker.go b/worker/worker.go index 56e5f68..c288b23 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -54,6 +54,12 @@ func NewTUNASyncWorker(cfg *Config) *Worker { w.httpClient = httpClient } + if cfg.Cgroup.Enable { + if err := initCgroup(&cfg.Cgroup); err != nil { + logger.Errorf("Error initializing Cgroup: %s", err.Error()) + return nil + } + } w.initJobs() w.makeHTTPServer() return w