From 924fda6dd89fd0b1b9e723e50ed962f603d4b930 Mon Sep 17 00:00:00 2001 From: bigeagle Date: Fri, 29 Apr 2016 16:05:15 +0800 Subject: [PATCH] feature(worker): use cgroup track job process, so that they can be all-killed --- .travis.yml | 8 ++- tests/worker.conf | 5 ++ worker/cgroup.go | 83 ++++++++++++++++++++++ worker/cgroup_test.go | 108 +++++++++++++++++++++++++++++ worker/cmd_provider.go | 2 +- worker/config.go | 7 ++ worker/provider.go | 12 +++- worker/rsync_provider.go | 2 +- worker/runner.go | 26 ++++--- worker/two_stage_rsync_provider.go | 2 +- worker/worker.go | 12 +++- 11 files changed, 250 insertions(+), 17 deletions(-) create mode 100644 worker/cgroup.go create mode 100644 worker/cgroup_test.go diff --git a/.travis.yml b/.travis.yml index d6a4831..bcba452 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,12 +3,16 @@ go: - 1.6 before_install: - - go get golang.org/x/tools/cmd/cover - - go get -v github.com/mattn/goveralls + - sudo apt-get install cgroup-bin + - go get golang.org/x/tools/cmd/cover + - go get -v github.com/mattn/goveralls os: - linux +before_script: + - sudo cgcreate -t travis -a travis -g cpu:tunasync + script: - ./.testandcover.bash diff --git a/tests/worker.conf b/tests/worker.conf index 0f8f42a..2a27567 100644 --- a/tests/worker.conf +++ b/tests/worker.conf @@ -10,6 +10,11 @@ api_base = "https://localhost:12345" token = "some_token" ca_cert = "rootCA.crt" +[cgroup] +enable = true +base_path = "/sys/fs/cgroup" +group = "tunasync" + [server] hostname = "localhost" listen_addr = "127.0.0.1" diff --git a/worker/cgroup.go b/worker/cgroup.go new file mode 100644 index 0000000..b31eb3f --- /dev/null +++ b/worker/cgroup.go @@ -0,0 +1,83 @@ +package worker + +import ( + "bufio" + "fmt" + "os" + "path/filepath" + "strconv" + "syscall" + + "golang.org/x/sys/unix" + + "github.com/codeskyblue/go-sh" +) + +type cgroupHook struct { + emptyHook + provider mirrorProvider + basePath string + baseGroup string + created bool +} + +func newCgroupHook(p mirrorProvider, basePath, baseGroup string) *cgroupHook { + if basePath == "" { + basePath = "/sys/fs/cgroup" + } + if baseGroup == "" { + baseGroup = "tunasync" + } + return &cgroupHook{ + provider: p, + basePath: basePath, + baseGroup: baseGroup, + } +} + +func (c *cgroupHook) preExec() error { + c.created = true + return sh.Command("cgcreate", "-g", c.Cgroup()).Run() +} + +func (c *cgroupHook) postExec() error { + err := c.killAll() + if err != nil { + logger.Error("Error killing tasks: %s", err.Error()) + } + + c.created = false + return sh.Command("cgdelete", c.Cgroup()).Run() +} + +func (c *cgroupHook) Cgroup() string { + name := c.provider.Name() + return fmt.Sprintf("cpu:%s/%s", c.baseGroup, name) +} + +func (c *cgroupHook) killAll() error { + if !c.created { + return nil + } + name := c.provider.Name() + taskFile, err := os.Open(filepath.Join(c.basePath, "cpu", c.baseGroup, name, "tasks")) + if err != nil { + return err + } + defer taskFile.Close() + taskList := []int{} + scanner := bufio.NewScanner(taskFile) + for scanner.Scan() { + pid, err := strconv.Atoi(scanner.Text()) + if err != nil { + return err + } + taskList = append(taskList, pid) + } + for _, pid := range taskList { + logger.Debug("Killing process: %d", pid) + unix.Kill(pid, syscall.SIGKILL) + } + + return nil +} diff --git a/worker/cgroup_test.go b/worker/cgroup_test.go new file mode 100644 index 0000000..ba46db1 --- /dev/null +++ b/worker/cgroup_test.go @@ -0,0 +1,108 @@ +package worker + +import ( + "io/ioutil" + "os" + "path/filepath" + "strings" + "testing" + "time" + + . "github.com/smartystreets/goconvey/convey" +) + +func TestCgroup(t *testing.T) { + Convey("Cgroup Should Work", t, func(ctx C) { + tmpDir, err := ioutil.TempDir("", "tunasync") + defer os.RemoveAll(tmpDir) + So(err, ShouldBeNil) + cmdScript := filepath.Join(tmpDir, "cmd.sh") + daemonScript := filepath.Join(tmpDir, "daemon.sh") + tmpFile := filepath.Join(tmpDir, "log_file") + bgPidfile := filepath.Join(tmpDir, "bg.pid") + + c := cmdConfig{ + name: "tuna-cgroup", + upstreamURL: "http://mirrors.tuna.moe/", + command: cmdScript + " " + daemonScript, + workingDir: tmpDir, + logDir: tmpDir, + logFile: tmpFile, + interval: 600 * time.Second, + env: map[string]string{ + "BG_PIDFILE": bgPidfile, + }, + } + cmdScriptContent := `#!/bin/bash +redirect-std() { + [[ -t 0 ]] && exec /dev/null + [[ -t 2 ]] && exec 2>/dev/null +} + +# close all non-std* fds +close-fds() { + eval exec {3..255}\>\&- +} + +# full daemonization of external command with setsid +daemonize() { + ( + redirect-std + cd / + close-fds + exec setsid "$@" + ) & +} + +echo $$ +daemonize $@ +sleep 5 +` + daemonScriptContent := `#!/bin/bash +echo $$ > $BG_PIDFILE +sleep 30 +` + err = ioutil.WriteFile(cmdScript, []byte(cmdScriptContent), 0755) + So(err, ShouldBeNil) + err = ioutil.WriteFile(daemonScript, []byte(daemonScriptContent), 0755) + So(err, ShouldBeNil) + + provider, err := newCmdProvider(c) + So(err, ShouldBeNil) + + cg := newCgroupHook(provider, "/sys/fs/cgroup", "tunasync") + provider.AddHook(cg) + + err = cg.preExec() + So(err, ShouldBeNil) + + go func() { + err = provider.Run() + ctx.So(err, ShouldNotBeNil) + }() + + time.Sleep(1 * time.Second) + // Deamon should be started + daemonPidBytes, err := ioutil.ReadFile(bgPidfile) + So(err, ShouldBeNil) + daemonPid := strings.Trim(string(daemonPidBytes), " \n") + logger.Debug("daemon pid: %s", daemonPid) + procDir := filepath.Join("/proc", daemonPid) + _, err = os.Stat(procDir) + So(err, ShouldBeNil) + + err = provider.Terminate() + So(err, ShouldBeNil) + + // Deamon won't be killed + _, err = os.Stat(procDir) + So(err, ShouldBeNil) + + // Deamon can be killed by cgroup killer + cg.postExec() + _, err = os.Stat(procDir) + So(os.IsNotExist(err), ShouldBeTrue) + + }) +} diff --git a/worker/cmd_provider.go b/worker/cmd_provider.go index 8ed8f9e..7a1d413 100644 --- a/worker/cmd_provider.go +++ b/worker/cmd_provider.go @@ -65,7 +65,7 @@ func (p *cmdProvider) Start() error { for k, v := range p.env { env[k] = v } - p.cmd = newCmdJob(p.command, p.WorkingDir(), env) + p.cmd = newCmdJob(p, p.command, p.WorkingDir(), env) if err := p.prepareLogFile(); err != nil { return err } diff --git a/worker/config.go b/worker/config.go index bb3bd12..9b6c493 100644 --- a/worker/config.go +++ b/worker/config.go @@ -35,6 +35,7 @@ type Config struct { Global globalConfig `toml:"global"` Manager managerConfig `toml:"manager"` Server serverConfig `toml:"server"` + Cgroup cgroupConfig `toml:"cgroup"` Mirrors []mirrorConfig `toml:"mirrors"` } @@ -60,6 +61,12 @@ type serverConfig struct { SSLKey string `toml:"ssl_key"` } +type cgroupConfig struct { + Enable bool `toml:"enable"` + BasePath string `toml:"base_path"` + Group string `toml:"group"` +} + type mirrorConfig struct { Name string `toml:"name"` Provider ProviderEnum `toml:"provider"` diff --git a/worker/provider.go b/worker/provider.go index cf757f0..d83038e 100644 --- a/worker/provider.go +++ b/worker/provider.go @@ -33,6 +33,8 @@ type mirrorProvider interface { Terminate() error // job hooks IsRunning() bool + // Cgroup + Cgroup() *cgroupHook AddHook(hook jobHook) Hooks() []jobHook @@ -63,7 +65,8 @@ type baseProvider struct { logFile *os.File - hooks []jobHook + cgroup *cgroupHook + hooks []jobHook } func (p *baseProvider) Name() string { @@ -117,6 +120,9 @@ func (p *baseProvider) LogFile() string { } func (p *baseProvider) AddHook(hook jobHook) { + if cg, ok := hook.(*cgroupHook); ok { + p.cgroup = cg + } p.hooks = append(p.hooks, hook) } @@ -124,6 +130,10 @@ func (p *baseProvider) Hooks() []jobHook { return p.hooks } +func (p *baseProvider) Cgroup() *cgroupHook { + return p.cgroup +} + func (p *baseProvider) prepareLogFile() error { if p.LogFile() == "/dev/null" { p.cmd.SetLogFile(nil) diff --git a/worker/rsync_provider.go b/worker/rsync_provider.go index 49153c9..c3cdefc 100644 --- a/worker/rsync_provider.go +++ b/worker/rsync_provider.go @@ -84,7 +84,7 @@ func (p *rsyncProvider) Start() error { command = append(command, p.options...) command = append(command, p.upstreamURL, p.WorkingDir()) - p.cmd = newCmdJob(command, p.WorkingDir(), env) + p.cmd = newCmdJob(p, command, p.WorkingDir(), env) if err := p.prepareLogFile(); err != nil { return err } diff --git a/worker/runner.go b/worker/runner.go index 49e27bd..8410354 100644 --- a/worker/runner.go +++ b/worker/runner.go @@ -13,7 +13,6 @@ import ( // runner is to run os commands giving command line, env and log file // it's an alternative to python-sh or go-sh -// TODO: cgroup excution var errProcessNotStarted = errors.New("Process Not Started") @@ -23,18 +22,27 @@ type cmdJob struct { env map[string]string logFile *os.File finished chan empty + provider mirrorProvider } -func newCmdJob(cmdAndArgs []string, workingDir string, env map[string]string) *cmdJob { +func newCmdJob(provider mirrorProvider, cmdAndArgs []string, workingDir string, env map[string]string) *cmdJob { var cmd *exec.Cmd - if len(cmdAndArgs) == 1 { - cmd = exec.Command(cmdAndArgs[0]) - } else if len(cmdAndArgs) > 1 { - c := cmdAndArgs[0] - args := cmdAndArgs[1:] + + if provider.Cgroup() != nil { + c := "cgexec" + args := []string{"-g", provider.Cgroup().Cgroup()} + args = append(args, cmdAndArgs...) cmd = exec.Command(c, args...) - } else if len(cmdAndArgs) == 0 { - panic("Command length should be at least 1!") + } else { + if len(cmdAndArgs) == 1 { + cmd = exec.Command(cmdAndArgs[0]) + } else if len(cmdAndArgs) > 1 { + c := cmdAndArgs[0] + args := cmdAndArgs[1:] + cmd = exec.Command(c, args...) + } else if len(cmdAndArgs) == 0 { + panic("Command length should be at least 1!") + } } logger.Debug("Executing command %s at %s", cmdAndArgs[0], workingDir) diff --git a/worker/two_stage_rsync_provider.go b/worker/two_stage_rsync_provider.go index 5a53716..b27cea5 100644 --- a/worker/two_stage_rsync_provider.go +++ b/worker/two_stage_rsync_provider.go @@ -120,7 +120,7 @@ func (p *twoStageRsyncProvider) Run() error { command = append(command, options...) command = append(command, p.upstreamURL, p.WorkingDir()) - p.cmd = newCmdJob(command, p.WorkingDir(), env) + p.cmd = newCmdJob(p, command, p.WorkingDir(), env) if err := p.prepareLogFile(); err != nil { return err } diff --git a/worker/worker.go b/worker/worker.go index 7f31bf8..c780b94 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -152,6 +152,14 @@ func (w *Worker) initProviders() { } provider.AddHook(newLogLimiter(provider)) + + // Add Cgroup Hook + if w.cfg.Cgroup.Enable { + provider.AddHook( + newCgroupHook(provider, w.cfg.Cgroup.BasePath, w.cfg.Cgroup.Group), + ) + } + w.providers[provider.Name()] = provider } @@ -198,13 +206,13 @@ func (w *Worker) makeHTTPServer() { case CmdStop: // if job is disabled, no goroutine would be there // receiving this signal + w.schedule.Remove(job.Name()) if job.State() != stateDisabled { - w.schedule.Remove(job.Name()) job.ctrlChan <- jobStop } case CmdDisable: + w.schedule.Remove(job.Name()) if job.State() != stateDisabled { - w.schedule.Remove(job.Name()) job.ctrlChan <- jobDisable <-job.disabled }