diff --git a/worker/base_provider.go b/worker/base_provider.go index 82ef419..0befa61 100644 --- a/worker/base_provider.go +++ b/worker/base_provider.go @@ -24,7 +24,9 @@ type baseProvider struct { cgroup *cgroupHook zfs *zfsHook - hooks []jobHook + docker *dockerHook + + hooks []jobHook } func (p *baseProvider) Name() string { @@ -87,6 +89,8 @@ func (p *baseProvider) AddHook(hook jobHook) { p.cgroup = v case *zfsHook: p.zfs = v + case *dockerHook: + p.docker = v } p.hooks = append(p.hooks, hook) } @@ -103,6 +107,10 @@ func (p *baseProvider) ZFS() *zfsHook { return p.zfs } +func (p *baseProvider) Docker() *dockerHook { + return p.docker +} + func (p *baseProvider) prepareLogFile() error { if p.LogFile() == "/dev/null" { p.cmd.SetLogFile(nil) diff --git a/worker/config.go b/worker/config.go index 51a69c1..93ee27c 100644 --- a/worker/config.go +++ b/worker/config.go @@ -38,6 +38,7 @@ type Config struct { Server serverConfig `toml:"server"` Cgroup cgroupConfig `toml:"cgroup"` ZFS zfsConfig `toml:"zfs"` + Docker dockerConfig `toml:"docker"` Include includeConfig `toml:"include"` Mirrors []mirrorConfig `toml:"mirrors"` } @@ -74,6 +75,12 @@ type cgroupConfig struct { Group string `toml:"group"` } +type dockerConfig struct { + Enable bool `toml:"enable"` + Volumes []string `toml:"volumes"` + Options []string `toml:"options"` +} + type zfsConfig struct { Enable bool `toml:"enable"` Zpool string `toml:"zpool"` @@ -111,6 +118,10 @@ type mirrorConfig struct { Username string `toml:"username"` Password string `toml:"password"` Stage1Profile string `toml:"stage1_profile"` + + DockerImage string `toml:"docker_image"` + DockerVolumes []string `toml:"docker_volumes"` + DockerOptions []string `toml:"docker_options"` } // LoadConfig loads configuration diff --git a/worker/docker.go b/worker/docker.go new file mode 100644 index 0000000..a94cbe3 --- /dev/null +++ b/worker/docker.go @@ -0,0 +1,98 @@ +package worker + +import ( + "fmt" + "os" +) + +type dockerHook struct { + emptyHook + provider mirrorProvider + image string + volumes []string + options []string +} + +func newDockerHook(p mirrorProvider, gCfg dockerConfig, mCfg mirrorConfig) *dockerHook { + volumes := []string{} + volumes = append(volumes, gCfg.Volumes...) + volumes = append(volumes, mCfg.DockerVolumes...) + + options := []string{} + options = append(options, gCfg.Options...) + options = append(options, mCfg.DockerOptions...) + + return &dockerHook{ + provider: p, + image: mCfg.DockerImage, + volumes: volumes, + options: options, + } +} + +func (d *dockerHook) preExec() error { + p := d.provider + logFile := p.LogFile() + workingDir := p.WorkingDir() + + if _, err := os.Stat(workingDir); os.IsNotExist(err) { + logger.Debugf("Making dir %s", workingDir) + if err = os.MkdirAll(workingDir, 0755); err != nil { + return fmt.Errorf("Error making dir %s: %s", workingDir, err.Error()) + } + } + + logFileNew := "/log_latest" + workingDirNew := "/data" + + // Override workingDir + ctx := p.EnterContext() + ctx.Set(_WorkingDirKey, workingDirNew) + ctx.Set(_LogFileKey+":docker", logFileNew) + ctx.Set( + "volumes", []string{ + fmt.Sprintf("%s:%s", logFile, logFileNew), + fmt.Sprintf("%s:%s", workingDir, workingDirNew), + }, + ) + return nil +} + +func (d *dockerHook) postExec() error { + // sh.Command( + // "docker", "rm", "-f", d.Name(), + // ).Run() + d.provider.ExitContext() + return nil +} + +// Volumes returns the configured volumes and +// runtime-needed volumes, including mirror dirs +// and log files +func (d *dockerHook) Volumes() []string { + vols := make([]string, len(d.volumes)) + copy(vols, d.volumes) + + p := d.provider + ctx := p.Context() + if ivs, ok := ctx.Get("volumes"); ok { + vs := ivs.([]string) + vols = append(vols, vs...) + } + return vols +} + +func (d *dockerHook) LogFile() string { + p := d.provider + ctx := p.Context() + if iv, ok := ctx.Get(_LogFileKey + ":docker"); ok { + v := iv.(string) + return v + } + return p.LogFile() +} + +func (d *dockerHook) Name() string { + p := d.provider + return "tunasync-job-" + p.Name() +} diff --git a/worker/docker_test.go b/worker/docker_test.go new file mode 100644 index 0000000..29526c2 --- /dev/null +++ b/worker/docker_test.go @@ -0,0 +1,97 @@ +package worker + +import ( + "fmt" + "io/ioutil" + "os" + "path/filepath" + "testing" + "time" + + "github.com/codeskyblue/go-sh" + . "github.com/smartystreets/goconvey/convey" +) + +func getDockerByName(name string) (string, error) { + // docker ps -f 'name=$name' --format '{{.Names}}' + out, err := sh.Command( + "docker", "ps", + "--filter", "name="+name, + "--format", "{{.Names}}", + ).Output() + return string(out), err +} + +func TestDocker(t *testing.T) { + Convey("Docker Should Work", t, func(ctx C) { + tmpDir, err := ioutil.TempDir("", "tunasync") + defer os.RemoveAll(tmpDir) + So(err, ShouldBeNil) + cmdScript := filepath.Join(tmpDir, "cmd.sh") + tmpFile := filepath.Join(tmpDir, "log_file") + expectedOutput := "HELLO_WORLD" + + c := cmdConfig{ + name: "tuna-docker", + upstreamURL: "http://mirrors.tuna.moe/", + command: "/bin/cmd.sh", + workingDir: tmpDir, + logDir: tmpDir, + logFile: tmpFile, + interval: 600 * time.Second, + env: map[string]string{ + "TEST_CONTENT": expectedOutput, + }, + } + + cmdScriptContent := `#!/bin/sh +echo ${TEST_CONTENT} +sleep 10 +` + err = ioutil.WriteFile(cmdScript, []byte(cmdScriptContent), 0755) + So(err, ShouldBeNil) + + provider, err := newCmdProvider(c) + So(err, ShouldBeNil) + + d := &dockerHook{ + provider: provider, + image: "alpine", + volumes: []string{ + fmt.Sprintf("%s:%s", cmdScript, "/bin/cmd.sh"), + }, + } + provider.AddHook(d) + So(provider.Docker(), ShouldNotBeNil) + + err = d.preExec() + So(err, ShouldBeNil) + + go func() { + err = provider.Run() + ctx.So(err, ShouldNotBeNil) + }() + + time.Sleep(1 * time.Second) + + // assert container running + names, err := getDockerByName(d.Name()) + So(err, ShouldBeNil) + So(names, ShouldEqual, d.Name()+"\n") + + err = provider.Terminate() + So(err, ShouldBeNil) + + // container should be terminated and removed + names, err = getDockerByName(d.Name()) + So(err, ShouldBeNil) + So(names, ShouldEqual, "") + + // check log content + loggedContent, err := ioutil.ReadFile(provider.LogFile()) + So(err, ShouldBeNil) + So(string(loggedContent), ShouldEqual, expectedOutput+"\n") + + d.postExec() + }) +} diff --git a/worker/provider.go b/worker/provider.go index cdc9a18..94d4e20 100644 --- a/worker/provider.go +++ b/worker/provider.go @@ -38,6 +38,8 @@ type mirrorProvider interface { Cgroup() *cgroupHook // ZFS ZFS() *zfsHook + // Docker + Docker() *dockerHook AddHook(hook jobHook) Hooks() []jobHook @@ -169,8 +171,12 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider { provider.AddHook(newZfsHook(provider, cfg.ZFS.Zpool)) } - // Add Cgroup Hook - if cfg.Cgroup.Enable { + // Add Docker Hook + if cfg.Docker.Enable && len(mirror.DockerImage) > 0 { + provider.AddHook(newDockerHook(provider, cfg.Docker, mirror)) + + } else if cfg.Cgroup.Enable { + // Add Cgroup Hook provider.AddHook( newCgroupHook(provider, cfg.Cgroup.BasePath, cfg.Cgroup.Group), ) diff --git a/worker/runner.go b/worker/runner.go index ff0399a..8d45f99 100644 --- a/worker/runner.go +++ b/worker/runner.go @@ -2,6 +2,7 @@ package worker import ( "errors" + "fmt" "os" "os/exec" "strings" @@ -9,6 +10,7 @@ import ( "syscall" "time" + "github.com/codeskyblue/go-sh" "golang.org/x/sys/unix" ) @@ -31,11 +33,40 @@ type cmdJob struct { func newCmdJob(provider mirrorProvider, cmdAndArgs []string, workingDir string, env map[string]string) *cmdJob { var cmd *exec.Cmd - if provider.Cgroup() != nil { + if d := provider.Docker(); d != nil { + c := "docker" + args := []string{ + "run", "--rm", + "-a", "STDOUT", "-a", "STDERR", + "--name", d.Name(), + "-w", workingDir, + } + // add volumes + for _, vol := range d.Volumes() { + logger.Debugf("volume: %s", vol) + args = append(args, "-v", vol) + } + // set env + env["TUNASYNC_LOG_FILE"] = d.LogFile() + for k, v := range env { + kv := fmt.Sprintf("%s=%s", k, v) + args = append(args, "-e", kv) + } + // apply options + args = append(args, d.options...) + // apply image and command + args = append(args, d.image) + // apply command + args = append(args, cmdAndArgs...) + + cmd = exec.Command(c, args...) + + } else if provider.Cgroup() != nil { c := "cgexec" args := []string{"-g", provider.Cgroup().Cgroup()} args = append(args, cmdAndArgs...) cmd = exec.Command(c, args...) + } else { if len(cmdAndArgs) == 1 { cmd = exec.Command(cmdAndArgs[0]) @@ -48,25 +79,28 @@ func newCmdJob(provider mirrorProvider, cmdAndArgs []string, workingDir string, } } - logger.Debugf("Executing command %s at %s", cmdAndArgs[0], workingDir) - if _, err := os.Stat(workingDir); os.IsNotExist(err) { - logger.Debugf("Making dir %s", workingDir) - if err = os.MkdirAll(workingDir, 0755); err != nil { - logger.Errorf("Error making dir %s", workingDir) + if provider.Docker() == nil { + logger.Debugf("Executing command %s at %s", cmdAndArgs[0], workingDir) + if _, err := os.Stat(workingDir); os.IsNotExist(err) { + logger.Debugf("Making dir %s", workingDir) + if err = os.MkdirAll(workingDir, 0755); err != nil { + logger.Errorf("Error making dir %s: %s", workingDir, err.Error()) + } } + cmd.Dir = workingDir + cmd.Env = newEnviron(env, true) } - cmd.Dir = workingDir - cmd.Env = newEnviron(env, true) - return &cmdJob{ cmd: cmd, workingDir: workingDir, env: env, + provider: provider, } } func (c *cmdJob) Start() error { + // logger.Debugf("Command start: %v", c.cmd.Args) c.finished = make(chan empty, 1) return c.cmd.Start() } @@ -95,6 +129,14 @@ func (c *cmdJob) Terminate() error { if c.cmd == nil || c.cmd.Process == nil { return errProcessNotStarted } + + if d := c.provider.Docker(); d != nil { + sh.Command( + "docker", "stop", "-t", "2", d.Name(), + ).Run() + return nil + } + err := unix.Kill(c.cmd.Process.Pid, syscall.SIGTERM) if err != nil { return err