diff --git a/.gitignore b/.gitignore index e96a241..528694a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,59 +1,4 @@ -# Byte-compiled / optimized / DLL files -__pycache__/ -*.py[cod] - -# C extensions -*.so - -# Distribution / packaging -.Python -env/ -build/ -develop-eggs/ -dist/ -downloads/ -eggs/ -lib/ -lib64/ -parts/ -sdist/ -var/ -*.egg-info/ -.installed.cfg -*.egg - -# PyInstaller -# Usually these files are written by a python script from a template -# before PyInstaller builds the exe, so as to inject date/other infos into it. -*.manifest -*.spec - -# Installer logs -pip-log.txt -pip-delete-this-directory.txt - -# Unit test / coverage reports -htmlcov/ -.tox/ -.coverage -.cache -nosetests.xml -coverage.xml - -# Translations -*.mo -*.pot - -# Django stuff: -*.log - -# Sphinx documentation -docs/_build/ - -# PyBuilder -target/ - *.swp *~ -/examples/tunasync.json /*.cov +node_modules diff --git a/.travis.yml b/.travis.yml index bcba452..c943565 100644 --- a/.travis.yml +++ b/.travis.yml @@ -11,7 +11,7 @@ os: - linux before_script: - - sudo cgcreate -t travis -a travis -g cpu:tunasync + - sudo cgcreate -t travis -a travis -g memory:tunasync script: - ./.testandcover.bash diff --git a/README.md b/README.md index 2e42881..4baf573 100644 --- a/README.md +++ b/README.md @@ -3,6 +3,7 @@ tunasync [![Build Status](https://travis-ci.org/tuna/tunasync.svg?branch=dev)](https://travis-ci.org/tuna/tunasync) [![Coverage Status](https://coveralls.io/repos/github/tuna/tunasync/badge.svg?branch=dev)](https://coveralls.io/github/tuna/tunasync?branch=dev) +[![Commitizen friendly](https://img.shields.io/badge/commitizen-friendly-brightgreen.svg)](http://commitizen.github.io/cz-cli/) ![GPLv3](https://img.shields.io/badge/license-GPLv3-blue.svg) ## Design @@ -41,20 +42,6 @@ PreSyncing Syncing Success +-----------------+ ``` -## TODO - -- [x] split to `tunasync-manager` and `tunasync-worker` instances - - [x] use HTTP as communication protocol - - [x] implement manager as status server first, and use python worker - - [x] implement go worker -- Web frontend for `tunasync-manager` - - [ ] start/stop/restart job - - [ ] enable/disable mirror - - [ ] view log -- [ ] config file structure - - [ ] support multi-file configuration (`/etc/tunasync.d/mirror-enabled/*.conf`) - - ## Generate Self-Signed Certificate Fisrt, create root CA diff --git a/cmd/tunasync/tunasync.go b/cmd/tunasync/tunasync.go index afcc5c3..3cd0c20 100644 --- a/cmd/tunasync/tunasync.go +++ b/cmd/tunasync/tunasync.go @@ -8,6 +8,7 @@ import ( "github.com/codegangsta/cli" "github.com/gin-gonic/gin" + "github.com/pkg/profile" "gopkg.in/op/go-logging.v1" tunasync "github.com/tuna/tunasync/internal" @@ -57,6 +58,20 @@ func startWorker(c *cli.Context) { os.Exit(1) } + if profPath := c.String("prof-path"); profPath != "" { + valid := false + if fi, err := os.Stat(profPath); err == nil { + if fi.IsDir() { + valid = true + defer profile.Start(profile.ProfilePath(profPath)).Stop() + } + } + if !valid { + logger.Errorf("Invalid profiling path: %s", profPath) + os.Exit(1) + } + } + go func() { time.Sleep(1 * time.Second) sigChan := make(chan os.Signal, 1) @@ -70,8 +85,9 @@ func startWorker(c *cli.Context) { newCfg, err := worker.LoadConfig(c.String("config")) if err != nil { logger.Errorf("Error loading config: %s", err.Error()) + } else { + w.ReloadMirrorConfig(newCfg.Mirrors) } - w.ReloadMirrorConfig(newCfg.Mirrors) case syscall.SIGINT, syscall.SIGTERM: w.Halt() } @@ -97,7 +113,6 @@ func main() { Name: "config, c", Usage: "Load manager configurations from `FILE`", }, - cli.StringFlag{ Name: "addr", Usage: "The manager will listen on `ADDR`", @@ -126,7 +141,6 @@ func main() { Name: "db-type", Usage: "Use database type `TYPE`", }, - cli.BoolFlag{ Name: "verbose, v", Usage: "Enable verbose logging", @@ -139,7 +153,6 @@ func main() { Name: "with-systemd", Usage: "Enable systemd-compatible logging", }, - cli.StringFlag{ Name: "pidfile", Value: "/run/tunasync/tunasync.manager.pid", @@ -157,7 +170,6 @@ func main() { Name: "config, c", Usage: "Load worker configurations from `FILE`", }, - cli.BoolFlag{ Name: "verbose, v", Usage: "Enable verbose logging", @@ -170,12 +182,16 @@ func main() { Name: "with-systemd", Usage: "Enable systemd-compatible logging", }, - cli.StringFlag{ Name: "pidfile", Value: "/run/tunasync/tunasync.worker.pid", Usage: "The pid file of the worker process", }, + cli.StringFlag{ + Name: "prof-path", + Value: "", + Usage: "Go profiling file path", + }, }, }, } diff --git a/cmd/tunasynctl/tunasynctl.go b/cmd/tunasynctl/tunasynctl.go index 863a30d..98e51e2 100644 --- a/cmd/tunasynctl/tunasynctl.go +++ b/cmd/tunasynctl/tunasynctl.go @@ -20,8 +20,8 @@ const ( listWorkersPath = "/workers" cmdPath = "/cmd" - systemCfgFile = "/etc/tunasync/ctl.conf" - userCfgFile = "$HOME/.config/tunasync/ctl.conf" + systemCfgFile = "/etc/tunasync/ctl.conf" // system-wide conf + userCfgFile = "$HOME/.config/tunasync/ctl.conf" // user-specific conf ) var logger = logging.MustGetLogger("tunasynctl-cmd") @@ -29,13 +29,13 @@ var logger = logging.MustGetLogger("tunasynctl-cmd") var baseURL string var client *http.Client -func initializeWrapper(handler func(*cli.Context)) func(*cli.Context) { - return func(c *cli.Context) { +func initializeWrapper(handler cli.ActionFunc) cli.ActionFunc { + return func(c *cli.Context) error { err := initialize(c) if err != nil { - os.Exit(1) + return cli.NewExitError("", 1) } - handler(c) + return handler(c) } } @@ -45,18 +45,40 @@ type config struct { CACert string `toml:"ca_cert"` } -func loadConfig(cfgFile string, c *cli.Context) (*config, error) { - cfg := new(config) - cfg.ManagerAddr = "localhost" - cfg.ManagerPort = 14242 - +func loadConfig(cfgFile string, cfg *config) error { if cfgFile != "" { if _, err := toml.DecodeFile(cfgFile, cfg); err != nil { logger.Errorf(err.Error()) - return nil, err + return err } } + return nil +} + +func initialize(c *cli.Context) error { + // init logger + tunasync.InitLogger(c.Bool("verbose"), c.Bool("verbose"), false) + + cfg := new(config) + + // default configs + cfg.ManagerAddr = "localhost" + cfg.ManagerPort = 14242 + + // find config file and load config + if _, err := os.Stat(systemCfgFile); err == nil { + loadConfig(systemCfgFile, cfg) + } + fmt.Println(os.ExpandEnv(userCfgFile)) + if _, err := os.Stat(os.ExpandEnv(userCfgFile)); err == nil { + loadConfig(os.ExpandEnv(userCfgFile), cfg) + } + if c.String("config") != "" { + loadConfig(c.String("config"), cfg) + } + + // override config using the command-line arguments if c.String("manager") != "" { cfg.ManagerAddr = c.String("manager") } @@ -67,28 +89,6 @@ func loadConfig(cfgFile string, c *cli.Context) (*config, error) { if c.String("ca-cert") != "" { cfg.CACert = c.String("ca-cert") } - return cfg, nil -} - -func initialize(c *cli.Context) error { - // init logger - tunasync.InitLogger(c.Bool("verbose"), c.Bool("verbose"), false) - var cfgFile string - - // choose config file and load config - if c.String("config") != "" { - cfgFile = c.String("config") - } else if _, err := os.Stat(os.ExpandEnv(userCfgFile)); err == nil { - cfgFile = os.ExpandEnv(userCfgFile) - } else if _, err := os.Stat(systemCfgFile); err == nil { - cfgFile = systemCfgFile - } - cfg, err := loadConfig(cfgFile, c) - - if err != nil { - logger.Errorf("Load configuration for tunasynctl error: %s", err.Error()) - return err - } // parse base url of the manager server baseURL = fmt.Sprintf("https://%s:%d", @@ -97,6 +97,7 @@ func initialize(c *cli.Context) error { logger.Infof("Use manager address: %s", baseURL) // create HTTP client + var err error client, err = tunasync.CreateHTTPClient(cfg.CACert) if err != nil { err = fmt.Errorf("Error initializing HTTP client: %s", err.Error()) @@ -107,44 +108,54 @@ func initialize(c *cli.Context) error { return nil } -func listWorkers(c *cli.Context) { +func listWorkers(c *cli.Context) error { var workers []tunasync.WorkerStatus _, err := tunasync.GetJSON(baseURL+listWorkersPath, &workers, client) if err != nil { - logger.Errorf("Filed to correctly get informations from manager server: %s", err.Error()) - os.Exit(1) + return cli.NewExitError( + fmt.Sprintf("Filed to correctly get informations from"+ + "manager server: %s", err.Error()), 1) } b, err := json.MarshalIndent(workers, "", " ") if err != nil { - logger.Errorf("Error printing out informations: %s", err.Error()) + return cli.NewExitError( + fmt.Sprintf("Error printing out informations: %s", + err.Error()), + 1) } fmt.Print(string(b)) + return nil } -func listJobs(c *cli.Context) { +func listJobs(c *cli.Context) error { // FIXME: there should be an API on manager server side that return MirrorStatus list to tunasynctl var jobs []tunasync.MirrorStatus if c.Bool("all") { _, err := tunasync.GetJSON(baseURL+listJobsPath, &jobs, client) if err != nil { - logger.Errorf("Filed to correctly get information of all jobs from manager server: %s", err.Error()) - os.Exit(1) + return cli.NewExitError( + fmt.Sprintf("Failed to correctly get information "+ + "of all jobs from manager server: %s", err.Error()), + 1) } } else { args := c.Args() if len(args) == 0 { - logger.Error("Usage Error: jobs command need at least one arguments or \"--all\" flag.") - os.Exit(1) + return cli.NewExitError( + fmt.Sprintf("Usage Error: jobs command need at"+ + " least one arguments or \"--all\" flag."), 1) } ans := make(chan []tunasync.MirrorStatus, len(args)) for _, workerID := range args { go func(workerID string) { var workerJobs []tunasync.MirrorStatus - _, err := tunasync.GetJSON(fmt.Sprintf("%s/workers/%s/jobs", baseURL, workerID), &workerJobs, client) + _, err := tunasync.GetJSON(fmt.Sprintf("%s/workers/%s/jobs", + baseURL, workerID), &workerJobs, client) if err != nil { - logger.Errorf("Filed to correctly get jobs for worker %s: %s", workerID, err.Error()) + logger.Errorf("Filed to correctly get jobs"+ + " for worker %s: %s", workerID, err.Error()) } ans <- workerJobs }(workerID) @@ -156,13 +167,16 @@ func listJobs(c *cli.Context) { b, err := json.MarshalIndent(jobs, "", " ") if err != nil { - logger.Errorf("Error printing out informations: %s", err.Error()) + return cli.NewExitError( + fmt.Sprintf("Error printing out informations: %s", err.Error()), + 1) } fmt.Printf(string(b)) + return nil } -func cmdJob(cmd tunasync.CmdVerb) func(*cli.Context) { - return func(c *cli.Context) { +func cmdJob(cmd tunasync.CmdVerb) cli.ActionFunc { + return func(c *cli.Context) error { var mirrorID string var argsList []string if len(c.Args()) == 1 { @@ -173,8 +187,9 @@ func cmdJob(cmd tunasync.CmdVerb) func(*cli.Context) { argsList = append(argsList, strings.TrimSpace(arg)) } } else { - logger.Error("Usage Error: cmd command receive just 1 required positional argument MIRROR and 1 optional ") - os.Exit(1) + return cli.NewExitError("Usage Error: cmd command receive just "+ + "1 required positional argument MIRROR and 1 optional "+ + "argument WORKER", 1) } cmd := tunasync.ClientCmd{ @@ -185,21 +200,28 @@ func cmdJob(cmd tunasync.CmdVerb) func(*cli.Context) { } resp, err := tunasync.PostJSON(baseURL+cmdPath, cmd, client) if err != nil { - logger.Errorf("Failed to correctly send command: %s", err.Error()) - os.Exit(1) + return cli.NewExitError( + fmt.Sprintf("Failed to correctly send command: %s", + err.Error()), + 1) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { body, err := ioutil.ReadAll(resp.Body) if err != nil { - logger.Errorf("Failed to parse response: %s", err.Error()) + return cli.NewExitError( + fmt.Sprintf("Failed to parse response: %s", err.Error()), + 1) } - logger.Errorf("Failed to correctly send command: HTTP status code is not 200: %s", body) - } else { - logger.Info("Succesfully send command") + return cli.NewExitError(fmt.Sprintf("Failed to correctly send"+ + " command: HTTP status code is not 200: %s", body), + 1) } + logger.Info("Succesfully send command") + + return nil } } diff --git a/package.json b/package.json new file mode 100644 index 0000000..68e7469 --- /dev/null +++ b/package.json @@ -0,0 +1,13 @@ +{ + "name": "tunasync", + "version": "1.0.0b1", + "description": "This is not a node project!", + "devDependencies": { + "cz-conventional-changelog": "^1.1.6" + }, + "config": { + "commitizen": { + "path": "cz-conventional-changelog" + } + } +} diff --git a/systemd/tunasync-worker.service b/systemd/tunasync-worker.service index 1ee11dc..51c2de4 100644 --- a/systemd/tunasync-worker.service +++ b/systemd/tunasync-worker.service @@ -6,10 +6,10 @@ After=network.target Type=simple User=tunasync PermissionsStartOnly=true -ExecStartPre=/usr/bin/cgcreate -t tunasync -a tunasync -g cpu:tunasync +ExecStartPre=/usr/bin/cgcreate -t tunasync -a tunasync -g memory:tunasync ExecStart=/home/bin/tunasync worker -c /etc/tunasync/worker.conf --with-systemd ExecReload=/bin/kill -SIGHUP $MAINPID -ExecStopPost=/usr/bin/cgdelete cpu:tunasync +ExecStopPost=/usr/bin/cgdelete memory:tunasync [Install] WantedBy=multi-user.target diff --git a/worker/cgroup.go b/worker/cgroup.go index f38fc4a..097aaa7 100644 --- a/worker/cgroup.go +++ b/worker/cgroup.go @@ -13,6 +13,8 @@ import ( "github.com/codeskyblue/go-sh" ) +var cgSubsystem string = "cpu" + type cgroupHook struct { emptyHook provider mirrorProvider @@ -21,6 +23,14 @@ type cgroupHook struct { created bool } +func initCgroup(basePath string) { + if _, err := os.Stat(filepath.Join(basePath, "memory")); err == nil { + cgSubsystem = "memory" + return + } + logger.Warning("Memory subsystem of cgroup not enabled, fallback to cpu") +} + func newCgroupHook(p mirrorProvider, basePath, baseGroup string) *cgroupHook { if basePath == "" { basePath = "/sys/fs/cgroup" @@ -37,7 +47,19 @@ func newCgroupHook(p mirrorProvider, basePath, baseGroup string) *cgroupHook { func (c *cgroupHook) preExec() error { c.created = true - return sh.Command("cgcreate", "-g", c.Cgroup()).Run() + if err := sh.Command("cgcreate", "-g", c.Cgroup()).Run(); err != nil { + return err + } + if cgSubsystem != "memory" { + return nil + } + if c.provider.Type() == provRsync || c.provider.Type() == provTwoStageRsync { + gname := fmt.Sprintf("%s/%s", c.baseGroup, c.provider.Name()) + return sh.Command( + "cgset", "-r", "memory.limit_in_bytes=128M", gname, + ).Run() + } + return nil } func (c *cgroupHook) postExec() error { @@ -52,7 +74,7 @@ func (c *cgroupHook) postExec() error { func (c *cgroupHook) Cgroup() string { name := c.provider.Name() - return fmt.Sprintf("cpu:%s/%s", c.baseGroup, name) + return fmt.Sprintf("%s:%s/%s", cgSubsystem, c.baseGroup, name) } func (c *cgroupHook) killAll() error { @@ -60,7 +82,7 @@ func (c *cgroupHook) killAll() error { return nil } name := c.provider.Name() - taskFile, err := os.Open(filepath.Join(c.basePath, "cpu", c.baseGroup, name, "tasks")) + taskFile, err := os.Open(filepath.Join(c.basePath, cgSubsystem, c.baseGroup, name, "tasks")) if err != nil { return err } diff --git a/worker/cgroup_test.go b/worker/cgroup_test.go index ba46db1..8706fac 100644 --- a/worker/cgroup_test.go +++ b/worker/cgroup_test.go @@ -4,6 +4,7 @@ import ( "io/ioutil" "os" "path/filepath" + "strconv" "strings" "testing" "time" @@ -71,6 +72,7 @@ sleep 30 provider, err := newCmdProvider(c) So(err, ShouldBeNil) + initCgroup("/sys/fs/cgroup") cg := newCgroupHook(provider, "/sys/fs/cgroup", "tunasync") provider.AddHook(cg) @@ -105,4 +107,38 @@ sleep 30 So(os.IsNotExist(err), ShouldBeTrue) }) + + Convey("Rsync Memory Should Be Limited", t, func() { + tmpDir, err := ioutil.TempDir("", "tunasync") + defer os.RemoveAll(tmpDir) + So(err, ShouldBeNil) + scriptFile := filepath.Join(tmpDir, "myrsync") + tmpFile := filepath.Join(tmpDir, "log_file") + + c := rsyncConfig{ + name: "tuna-cgroup", + upstreamURL: "rsync://rsync.tuna.moe/tuna/", + rsyncCmd: scriptFile, + workingDir: tmpDir, + logDir: tmpDir, + logFile: tmpFile, + useIPv6: true, + interval: 600 * time.Second, + } + + provider, err := newRsyncProvider(c) + So(err, ShouldBeNil) + + initCgroup("/sys/fs/cgroup") + cg := newCgroupHook(provider, "/sys/fs/cgroup", "tunasync") + provider.AddHook(cg) + + cg.preExec() + if cgSubsystem == "memory" { + memoLimit, err := ioutil.ReadFile(filepath.Join(cg.basePath, "memory", cg.baseGroup, provider.Name(), "memory.limit_in_bytes")) + So(err, ShouldBeNil) + So(strings.Trim(string(memoLimit), "\n"), ShouldEqual, strconv.Itoa(128*1024*1024)) + } + cg.postExec() + }) } diff --git a/worker/cmd_provider.go b/worker/cmd_provider.go index 7a1d413..8ce3d70 100644 --- a/worker/cmd_provider.go +++ b/worker/cmd_provider.go @@ -44,6 +44,10 @@ func newCmdProvider(c cmdConfig) (*cmdProvider, error) { return provider, nil } +func (p *cmdProvider) Type() providerEnum { + return provCommand +} + func (p *cmdProvider) Upstream() string { return p.upstreamURL } diff --git a/worker/provider.go b/worker/provider.go index 2791b56..33100fb 100644 --- a/worker/provider.go +++ b/worker/provider.go @@ -10,8 +10,6 @@ import ( // mirror provider is the wrapper of mirror jobs -type providerType uint8 - const ( _WorkingDirKey = "working_dir" _LogDirKey = "log_dir" @@ -24,6 +22,8 @@ type mirrorProvider interface { Name() string Upstream() string + Type() providerEnum + // run mirror job in background Run() error // run mirror job in background diff --git a/worker/provider_test.go b/worker/provider_test.go index 10fbceb..c157de6 100644 --- a/worker/provider_test.go +++ b/worker/provider_test.go @@ -33,6 +33,7 @@ func TestRsyncProvider(t *testing.T) { provider, err := newRsyncProvider(c) So(err, ShouldBeNil) + So(provider.Type(), ShouldEqual, provRsync) So(provider.Name(), ShouldEqual, c.name) So(provider.WorkingDir(), ShouldEqual, c.workingDir) So(provider.LogDir(), ShouldEqual, c.logDir) @@ -126,6 +127,7 @@ func TestCmdProvider(t *testing.T) { provider, err := newCmdProvider(c) So(err, ShouldBeNil) + So(provider.Type(), ShouldEqual, provCommand) So(provider.Name(), ShouldEqual, c.name) So(provider.WorkingDir(), ShouldEqual, c.workingDir) So(provider.LogDir(), ShouldEqual, c.logDir) @@ -218,6 +220,7 @@ func TestTwoStageRsyncProvider(t *testing.T) { provider, err := newTwoStageRsyncProvider(c) So(err, ShouldBeNil) + So(provider.Type(), ShouldEqual, provTwoStageRsync) So(provider.Name(), ShouldEqual, c.name) So(provider.WorkingDir(), ShouldEqual, c.workingDir) So(provider.LogDir(), ShouldEqual, c.logDir) diff --git a/worker/rsync_provider.go b/worker/rsync_provider.go index c3cdefc..2d70174 100644 --- a/worker/rsync_provider.go +++ b/worker/rsync_provider.go @@ -63,6 +63,10 @@ func newRsyncProvider(c rsyncConfig) (*rsyncProvider, error) { return provider, nil } +func (p *rsyncProvider) Type() providerEnum { + return provRsync +} + func (p *rsyncProvider) Upstream() string { return p.upstreamURL } diff --git a/worker/two_stage_rsync_provider.go b/worker/two_stage_rsync_provider.go index b27cea5..37799b3 100644 --- a/worker/two_stage_rsync_provider.go +++ b/worker/two_stage_rsync_provider.go @@ -70,6 +70,10 @@ func newTwoStageRsyncProvider(c twoStageRsyncConfig) (*twoStageRsyncProvider, er return provider, nil } +func (p *twoStageRsyncProvider) Type() providerEnum { + return provTwoStageRsync +} + func (p *twoStageRsyncProvider) Upstream() string { return p.upstreamURL } diff --git a/worker/worker.go b/worker/worker.go index 61daf21..bd0f1a9 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -53,6 +53,9 @@ func GetTUNASyncWorker(cfg *Config) *Worker { w.httpClient = httpClient } + if cfg.Cgroup.Enable { + initCgroup(cfg.Cgroup.BasePath) + } w.initJobs() w.makeHTTPServer() tunasyncWorker = w