From 28c8145137cb1ef5f3d559c85feb21962497bb89 Mon Sep 17 00:00:00 2001 From: bigeagle Date: Sun, 8 May 2016 17:24:41 +0800 Subject: [PATCH] feature(worker): limit rsync memory using cgroup --- .travis.yml | 2 +- systemd/tunasync-worker.service | 4 ++-- worker/cgroup.go | 28 ++++++++++++++++++++++--- worker/cgroup_test.go | 36 +++++++++++++++++++++++++++++++++ worker/worker.go | 1 + 5 files changed, 65 insertions(+), 6 deletions(-) diff --git a/.travis.yml b/.travis.yml index bcba452..c943565 100644 --- a/.travis.yml +++ b/.travis.yml @@ -11,7 +11,7 @@ os: - linux before_script: - - sudo cgcreate -t travis -a travis -g cpu:tunasync + - sudo cgcreate -t travis -a travis -g memory:tunasync script: - ./.testandcover.bash diff --git a/systemd/tunasync-worker.service b/systemd/tunasync-worker.service index 1ee11dc..51c2de4 100644 --- a/systemd/tunasync-worker.service +++ b/systemd/tunasync-worker.service @@ -6,10 +6,10 @@ After=network.target Type=simple User=tunasync PermissionsStartOnly=true -ExecStartPre=/usr/bin/cgcreate -t tunasync -a tunasync -g cpu:tunasync +ExecStartPre=/usr/bin/cgcreate -t tunasync -a tunasync -g memory:tunasync ExecStart=/home/bin/tunasync worker -c /etc/tunasync/worker.conf --with-systemd ExecReload=/bin/kill -SIGHUP $MAINPID -ExecStopPost=/usr/bin/cgdelete cpu:tunasync +ExecStopPost=/usr/bin/cgdelete memory:tunasync [Install] WantedBy=multi-user.target diff --git a/worker/cgroup.go b/worker/cgroup.go index f38fc4a..097aaa7 100644 --- a/worker/cgroup.go +++ b/worker/cgroup.go @@ -13,6 +13,8 @@ import ( "github.com/codeskyblue/go-sh" ) +var cgSubsystem string = "cpu" + type cgroupHook struct { emptyHook provider mirrorProvider @@ -21,6 +23,14 @@ type cgroupHook struct { created bool } +func initCgroup(basePath string) { + if _, err := os.Stat(filepath.Join(basePath, "memory")); err == nil { + cgSubsystem = "memory" + return + } + logger.Warning("Memory subsystem of cgroup not enabled, fallback to cpu") +} + func newCgroupHook(p mirrorProvider, basePath, baseGroup string) *cgroupHook { if basePath == "" { basePath = "/sys/fs/cgroup" @@ -37,7 +47,19 @@ func newCgroupHook(p mirrorProvider, basePath, baseGroup string) *cgroupHook { func (c *cgroupHook) preExec() error { c.created = true - return sh.Command("cgcreate", "-g", c.Cgroup()).Run() + if err := sh.Command("cgcreate", "-g", c.Cgroup()).Run(); err != nil { + return err + } + if cgSubsystem != "memory" { + return nil + } + if c.provider.Type() == provRsync || c.provider.Type() == provTwoStageRsync { + gname := fmt.Sprintf("%s/%s", c.baseGroup, c.provider.Name()) + return sh.Command( + "cgset", "-r", "memory.limit_in_bytes=128M", gname, + ).Run() + } + return nil } func (c *cgroupHook) postExec() error { @@ -52,7 +74,7 @@ func (c *cgroupHook) postExec() error { func (c *cgroupHook) Cgroup() string { name := c.provider.Name() - return fmt.Sprintf("cpu:%s/%s", c.baseGroup, name) + return fmt.Sprintf("%s:%s/%s", cgSubsystem, c.baseGroup, name) } func (c *cgroupHook) killAll() error { @@ -60,7 +82,7 @@ func (c *cgroupHook) killAll() error { return nil } name := c.provider.Name() - taskFile, err := os.Open(filepath.Join(c.basePath, "cpu", c.baseGroup, name, "tasks")) + taskFile, err := os.Open(filepath.Join(c.basePath, cgSubsystem, c.baseGroup, name, "tasks")) if err != nil { return err } diff --git a/worker/cgroup_test.go b/worker/cgroup_test.go index ba46db1..8706fac 100644 --- a/worker/cgroup_test.go +++ b/worker/cgroup_test.go @@ -4,6 +4,7 @@ import ( "io/ioutil" "os" "path/filepath" + "strconv" "strings" "testing" "time" @@ -71,6 +72,7 @@ sleep 30 provider, err := newCmdProvider(c) So(err, ShouldBeNil) + initCgroup("/sys/fs/cgroup") cg := newCgroupHook(provider, "/sys/fs/cgroup", "tunasync") provider.AddHook(cg) @@ -105,4 +107,38 @@ sleep 30 So(os.IsNotExist(err), ShouldBeTrue) }) + + Convey("Rsync Memory Should Be Limited", t, func() { + tmpDir, err := ioutil.TempDir("", "tunasync") + defer os.RemoveAll(tmpDir) + So(err, ShouldBeNil) + scriptFile := filepath.Join(tmpDir, "myrsync") + tmpFile := filepath.Join(tmpDir, "log_file") + + c := rsyncConfig{ + name: "tuna-cgroup", + upstreamURL: "rsync://rsync.tuna.moe/tuna/", + rsyncCmd: scriptFile, + workingDir: tmpDir, + logDir: tmpDir, + logFile: tmpFile, + useIPv6: true, + interval: 600 * time.Second, + } + + provider, err := newRsyncProvider(c) + So(err, ShouldBeNil) + + initCgroup("/sys/fs/cgroup") + cg := newCgroupHook(provider, "/sys/fs/cgroup", "tunasync") + provider.AddHook(cg) + + cg.preExec() + if cgSubsystem == "memory" { + memoLimit, err := ioutil.ReadFile(filepath.Join(cg.basePath, "memory", cg.baseGroup, provider.Name(), "memory.limit_in_bytes")) + So(err, ShouldBeNil) + So(strings.Trim(string(memoLimit), "\n"), ShouldEqual, strconv.Itoa(128*1024*1024)) + } + cg.postExec() + }) } diff --git a/worker/worker.go b/worker/worker.go index 61daf21..cca1069 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -53,6 +53,7 @@ func GetTUNASyncWorker(cfg *Config) *Worker { w.httpClient = httpClient } + initCgroup(cfg.Cgroup.BasePath) w.initJobs() w.makeHTTPServer() tunasyncWorker = w