Fix code formatting according to go-staticcheck

kebyn 2022-03-12 13:16:45 +00:00
parent c07aaffe65
commit d8b2675fda
15 changed files with 309 additions and 308 deletions
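
Most of the 15 files below change in the same few mechanical ways, matching what staticcheck and gofmt report: error strings are lower-cased, redundant parentheses and semicolons are dropped, imports are re-sorted, and unused code is removed. As a hedged illustration (the commit message does not cite rule IDs, so ST1005 is an assumption), the error-string fix looks like this:

    package main

    import (
        "errors"
        "fmt"
    )

    func main() {
        // Before: errors.New("Invalid mirror provider"). staticcheck's style
        // check (ST1005, assumed ID) flags the capitalized message, because
        // error strings are usually embedded mid-sentence by callers.
        err := errors.New("invalid mirror provider")
        // Wrapped, the lower-case message reads naturally:
        fmt.Println(fmt.Errorf("newMirrorProvider failed: %w", err))
    }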

View File

@@ -9,10 +9,10 @@ import (
     "time"

     "github.com/gin-gonic/gin"
-    "github.com/pkg/profile"
-    "gopkg.in/op/go-logging.v1"
-    "github.com/urfave/cli"
     "github.com/moby/moby/pkg/reexec"
+    "github.com/pkg/profile"
+    "github.com/urfave/cli"
+    "gopkg.in/op/go-logging.v1"

     tunasync "github.com/tuna/tunasync/internal"
     "github.com/tuna/tunasync/manager"

View File

@@ -12,32 +12,32 @@ import (
     "golang.org/x/sys/unix"

-    "github.com/moby/moby/pkg/reexec"
     cgv1 "github.com/containerd/cgroups"
     cgv2 "github.com/containerd/cgroups/v2"
+    "github.com/moby/moby/pkg/reexec"
     contspecs "github.com/opencontainers/runtime-spec/specs-go"
 )

 type cgroupHook struct {
     emptyHook
     cgCfg    cgroupConfig
     memLimit MemBytes
     cgMgrV1  cgv1.Cgroup
     cgMgrV2  *cgv2.Manager
 }

 type execCmd string

 const (
     cmdCont execCmd = "cont"
     cmdAbrt execCmd = "abrt"
 )

-func init () {
+func init() {
     reexec.Register("tunasync-exec", waitExec)
 }

-func waitExec () {
+func waitExec() {
     binary, err := exec.LookPath(os.Args[1])
     if err != nil {
         panic(err)
@@ -51,14 +51,15 @@ func waitExec () {
             panic(err)
         }
         if err := pipe.Close(); err != nil {
+            panic(err)
         }

         cmd := execCmd(string(cmdBytes))
         switch cmd {
         case cmdAbrt:
             fallthrough
-        default:
-            panic("Exited on request")
-        case cmdCont:
+        case cmdCont:
+        default:
+            panic("Exited on request")
         }
     }
 }
@@ -71,7 +72,7 @@ func waitExec () {
     panic("Exec failed.")
 }

-func initCgroup(cfg *cgroupConfig) (error) {
+func initCgroup(cfg *cgroupConfig) error {
     logger.Debugf("Initializing cgroup")
     baseGroup := cfg.Group
@@ -103,7 +104,7 @@ func initCgroup(cfg *cgroupConfig) (error) {
     }
     if baseGroup == "" {
         logger.Debugf("Creating a sub group and move all processes into it")
-        wkrMgr, err := cfg.cgMgrV2.NewChild("__worker", nil);
+        wkrMgr, err := cfg.cgMgrV2.NewChild("__worker", nil)
         if err != nil {
             return err
         }
@@ -117,8 +118,8 @@ func initCgroup(cfg *cgroupConfig) (error) {
             if len(procs) == 0 {
                 break
             }
-            for _, p := range(procs) {
-                if err := wkrMgr.AddProc(p); err != nil{
+            for _, p := range procs {
+                if err := wkrMgr.AddProc(p); err != nil {
                     if errors.Is(err, syscall.ESRCH) {
                         logger.Debugf("Write pid %d to sub group failed: process vanished, ignoring")
                     } else {
@@ -129,7 +130,7 @@ func initCgroup(cfg *cgroupConfig) (error) {
         }
     } else {
         logger.Debugf("Trying to create a sub group in that group")
-        testMgr, err := cfg.cgMgrV2.NewChild("__test", nil);
+        testMgr, err := cfg.cgMgrV2.NewChild("__test", nil)
         if err != nil {
             logger.Errorf("Cannot create a sub group in the cgroup")
             return err
@@ -143,7 +144,7 @@ func initCgroup(cfg *cgroupConfig) (error) {
             return err
         }
         if len(procs) != 0 {
-            return fmt.Errorf("There are remaining processes in cgroup %s", baseGroup)
+            return fmt.Errorf("there are remaining processes in cgroup %s", baseGroup)
         }
     }
 } else {
@@ -152,9 +153,9 @@ func initCgroup(cfg *cgroupConfig) (error) {
     if baseGroup != "" {
         pather = cgv1.StaticPath(baseGroup)
     } else {
-        pather = (func(p cgv1.Path) (cgv1.Path){
-            return func(subsys cgv1.Name) (string, error){
-                path, err := p(subsys);
+        pather = (func(p cgv1.Path) cgv1.Path {
+            return func(subsys cgv1.Name) (string, error) {
+                path, err := p(subsys)
                 if err != nil {
                     return "", err
                 }
@@ -167,14 +168,14 @@ func initCgroup(cfg *cgroupConfig) (error) {
     }
     logger.Infof("Loading cgroup")
     var err error
-    if cfg.cgMgrV1, err = cgv1.Load(cgv1.V1, pather, func(cfg *cgv1.InitConfig) error{
+    if cfg.cgMgrV1, err = cgv1.Load(cgv1.V1, pather, func(cfg *cgv1.InitConfig) error {
         cfg.InitCheck = cgv1.AllowAny
         return nil
     }); err != nil {
         return err
     }
     logger.Debugf("Available subsystems:")
-    for _, subsys := range(cfg.cgMgrV1.Subsystems()) {
+    for _, subsys := range cfg.cgMgrV1.Subsystems() {
         p, err := pather(subsys.Name())
         if err != nil {
             return err
@@ -183,11 +184,11 @@ func initCgroup(cfg *cgroupConfig) (error) {
     }
     if baseGroup == "" {
         logger.Debugf("Creating a sub group and move all processes into it")
-        wkrMgr, err := cfg.cgMgrV1.New("__worker", &contspecs.LinuxResources{});
+        wkrMgr, err := cfg.cgMgrV1.New("__worker", &contspecs.LinuxResources{})
         if err != nil {
             return err
         }
-        for _, subsys := range(cfg.cgMgrV1.Subsystems()) {
+        for _, subsys := range cfg.cgMgrV1.Subsystems() {
             logger.Debugf("Reading pids for subsystem %s", subsys.Name())
             for {
                 procs, err := cfg.cgMgrV1.Processes(subsys.Name(), false)
@@ -202,7 +203,7 @@ func initCgroup(cfg *cgroupConfig) (error) {
                 if len(procs) == 0 {
                     break
                 }
-                for _, proc := range(procs) {
+                for _, proc := range procs {
                     if err := wkrMgr.Add(proc); err != nil {
                         if errors.Is(err, syscall.ESRCH) {
                             logger.Debugf("Write pid %d to sub group failed: process vanished, ignoring")
@@ -215,7 +216,7 @@ func initCgroup(cfg *cgroupConfig) (error) {
         }
     } else {
         logger.Debugf("Trying to create a sub group in that group")
-        testMgr, err := cfg.cgMgrV1.New("__test", &contspecs.LinuxResources{});
+        testMgr, err := cfg.cgMgrV1.New("__test", &contspecs.LinuxResources{})
         if err != nil {
             logger.Errorf("Cannot create a sub group in the cgroup")
             return err
@@ -223,7 +224,7 @@ func initCgroup(cfg *cgroupConfig) (error) {
         if err := testMgr.Delete(); err != nil {
             return err
         }
-        for _, subsys := range(cfg.cgMgrV1.Subsystems()) {
+        for _, subsys := range cfg.cgMgrV1.Subsystems() {
             logger.Debugf("Reading pids for subsystem %s", subsys.Name())
             procs, err := cfg.cgMgrV1.Processes(subsys.Name(), false)
             if err != nil {
@@ -239,7 +240,7 @@ func initCgroup(cfg *cgroupConfig) (error) {
             if err != nil {
                 return err
             }
-            return fmt.Errorf("There are remaining processes in cgroup %s of subsystem %s", p, subsys.Name())
+            return fmt.Errorf("there are remaining processes in cgroup %s of subsystem %s", p, subsys.Name())
         }
     }
 }
@@ -253,7 +254,7 @@ func newCgroupHook(p mirrorProvider, cfg cgroupConfig, memLimit MemBytes) *cgrou
     emptyHook: emptyHook{
         provider: p,
     },
     cgCfg:    cfg,
     memLimit: memLimit,
 }
 }
@@ -263,7 +264,7 @@ func (c *cgroupHook) preExec() error {
     logger.Debugf("Creating v2 cgroup for task %s", c.provider.Name())
     var resSet *cgv2.Resources
     if c.memLimit != 0 {
-        resSet = &cgv2.Resources {
+        resSet = &cgv2.Resources{
             Memory: &cgv2.Memory{
                 Max: func(i int64) *int64 { return &i }(c.memLimit.Value()),
             },
@@ -279,7 +280,7 @@ func (c *cgroupHook) preExec() error {
     logger.Debugf("Creating v1 cgroup for task %s", c.provider.Name())
     var resSet contspecs.LinuxResources
     if c.memLimit != 0 {
-        resSet = contspecs.LinuxResources {
+        resSet = contspecs.LinuxResources{
             Memory: &contspecs.LinuxMemory{
                 Limit: func(i int64) *int64 { return &i }(c.memLimit.Value()),
             },
@@ -334,7 +335,7 @@ func (c *cgroupHook) killAll() error {
     taskList := []int{}
     if c.cgCfg.isUnified {
         procs, err := c.cgMgrV2.Procs(false)
-        if (err != nil) {
+        if err != nil {
             return []int{}, err
         }
         for _, proc := range procs {
@@ -342,16 +343,16 @@ func (c *cgroupHook) killAll() error {
         }
     } else {
         taskSet := make(map[int]struct{})
-        for _, subsys := range(c.cgMgrV1.Subsystems()) {
+        for _, subsys := range c.cgMgrV1.Subsystems() {
             procs, err := c.cgMgrV1.Processes(subsys.Name(), false)
             if err != nil {
                 return []int{}, err
             }
-            for _, proc := range(procs) {
+            for _, proc := range procs {
                 taskSet[proc.Pid] = struct{}{}
             }
         }
-        for proc := range(taskSet) {
+        for proc := range taskSet {
             taskList = append(taskList, proc)
         }
     }
@@ -360,7 +361,7 @@ func (c *cgroupHook) killAll() error {
     for i := 0; i < 4; i++ {
         if i == 3 {
-            return errors.New("Unable to kill all child tasks")
+            return errors.New("unable to kill all child tasks")
         }
         taskList, err := readTaskList()
         if err != nil {
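
One change in this file is more than formatting: the switch in waitExec is reordered so that the default case comes last, which staticcheck's style checks prefer (ST1015 is an assumed rule ID; the commit does not name one). A minimal runnable sketch of the new shape; note that Go's fallthrough transfers control to the next case in source order, so cmdAbrt now ends in the empty cmdCont body:

    package main

    import "fmt"

    type execCmd string

    const (
        cmdCont execCmd = "cont"
        cmdAbrt execCmd = "abrt"
    )

    func handle(cmd execCmd) {
        switch cmd {
        case cmdAbrt:
            fallthrough
        case cmdCont:
            // nothing to do; fallthrough from cmdAbrt lands here
        default:
            panic("Exited on request")
        }
    }

    func main() {
        handle(cmdCont)
        fmt.Println("cont accepted")
    }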

View File

@@ -1,16 +1,17 @@
 package worker

 import (
+    "errors"
     "io/ioutil"
     "os"
     "os/exec"
     "path/filepath"
     "strconv"
     "strings"
+    "syscall"
     "testing"
     "time"
-    "errors"
-    "syscall"

     cgv1 "github.com/containerd/cgroups"
     cgv2 "github.com/containerd/cgroups/v2"
     units "github.com/docker/go-units"
@@ -21,14 +22,14 @@ import (
 func init() {
     _, testReexec := os.LookupEnv("TESTREEXEC")
-    if ! testReexec {
+    if !testReexec {
         reexec.Init()
     }
 }

 func TestReexec(t *testing.T) {
     testCase, testReexec := os.LookupEnv("TESTREEXEC")
-    if ! testReexec {
+    if !testReexec {
         return
     }
     for len(os.Args) > 1 {
@@ -39,51 +40,51 @@ func TestReexec(t *testing.T) {
         }
     }
     switch testCase {
     case "1":
-        Convey("Reexec should panic when command not found", t, func(ctx C){
-            So(func(){
+        Convey("Reexec should panic when command not found", t, func(ctx C) {
+            So(func() {
                 reexec.Init()
             }, ShouldPanicWith, exec.ErrNotFound)
         })
     case "2":
-        Convey("Reexec should run when fd 3 is not open", t, func(ctx C){
-            So((func() error{
+        Convey("Reexec should run when fd 3 is not open", t, func(ctx C) {
+            So((func() error {
                 pipe := os.NewFile(3, "pipe")
                 if pipe == nil {
                     return errors.New("pipe is nil")
                 } else {
                     _, err := pipe.Stat()
                     return err
                 }
             })(), ShouldNotBeNil)
-            So(func(){
+            So(func() {
                 reexec.Init()
             }, ShouldPanicWith, syscall.ENOEXEC)
         })
     case "3":
-        Convey("Reexec should fail when fd 3 is sent with abrt cmd", t, func(ctx C){
-            So(func(){
+        Convey("Reexec should fail when fd 3 is sent with abrt cmd", t, func(ctx C) {
+            So(func() {
                 reexec.Init()
             }, ShouldPanicWith, "Exited on request")
         })
     case "4":
-        Convey("Reexec should run when fd 3 is sent with cont cmd", t, func(ctx C){
-            So(func(){
+        Convey("Reexec should run when fd 3 is sent with cont cmd", t, func(ctx C) {
+            So(func() {
                 reexec.Init()
             }, ShouldPanicWith, syscall.ENOEXEC)
         })
     case "5":
-        Convey("Reexec should not be triggered when argv[0] is not reexec", t, func(ctx C){
-            So(func(){
+        Convey("Reexec should not be triggered when argv[0] is not reexec", t, func(ctx C) {
+            So(func() {
                 reexec.Init()
             }, ShouldNotPanic)
         })
     }
 }

 func TestCgroup(t *testing.T) {
     var cgcf *cgroupConfig
-    Convey("init cgroup", t, func(ctx C){
+    Convey("init cgroup", t, func(ctx C) {
         _, useCurrentCgroup := os.LookupEnv("USECURCGROUP")
         cgcf = &cgroupConfig{BasePath: "/sys/fs/cgroup", Group: "tunasync", Subsystem: "cpu"}
         if useCurrentCgroup {
@@ -97,28 +98,28 @@ func TestCgroup(t *testing.T) {
         So(cgcf.cgMgrV1, ShouldNotBeNil)
     }
     Convey("Cgroup Should Work", func(ctx C) {
         tmpDir, err := ioutil.TempDir("", "tunasync")
         defer os.RemoveAll(tmpDir)
         So(err, ShouldBeNil)
         cmdScript := filepath.Join(tmpDir, "cmd.sh")
         daemonScript := filepath.Join(tmpDir, "daemon.sh")
         tmpFile := filepath.Join(tmpDir, "log_file")
         bgPidfile := filepath.Join(tmpDir, "bg.pid")
         c := cmdConfig{
             name:        "tuna-cgroup",
             upstreamURL: "http://mirrors.tuna.moe/",
             command:     cmdScript + " " + daemonScript,
             workingDir:  tmpDir,
             logDir:      tmpDir,
             logFile:     tmpFile,
             interval:    600 * time.Second,
             env: map[string]string{
                 "BG_PIDFILE": bgPidfile,
             },
         }
         cmdScriptContent := `#!/bin/bash
redirect-std() {
    [[ -t 0 ]] && exec </dev/null
    [[ -t 1 ]] && exec >/dev/null
@@ -144,167 +145,127 @@ echo $$
 daemonize $@
 sleep 5
 `
         daemonScriptContent := `#!/bin/bash
echo $$ > $BG_PIDFILE
sleep 30
`
         err = ioutil.WriteFile(cmdScript, []byte(cmdScriptContent), 0755)
         So(err, ShouldBeNil)
         err = ioutil.WriteFile(daemonScript, []byte(daemonScriptContent), 0755)
         So(err, ShouldBeNil)

         provider, err := newCmdProvider(c)
         So(err, ShouldBeNil)

         cg := newCgroupHook(provider, *cgcf, 0)
         provider.AddHook(cg)

         err = cg.preExec()
         So(err, ShouldBeNil)

         go func() {
             err := provider.Run(make(chan empty, 1))
             ctx.So(err, ShouldNotBeNil)
         }()

         time.Sleep(1 * time.Second)
         // Deamon should be started
         daemonPidBytes, err := ioutil.ReadFile(bgPidfile)
         So(err, ShouldBeNil)
         daemonPid := strings.Trim(string(daemonPidBytes), " \n")
         logger.Debug("daemon pid: %s", daemonPid)
         procDir := filepath.Join("/proc", daemonPid)
         _, err = os.Stat(procDir)
         So(err, ShouldBeNil)

         err = provider.Terminate()
         So(err, ShouldBeNil)

         // Deamon won't be killed
         _, err = os.Stat(procDir)
         So(err, ShouldBeNil)

         // Deamon can be killed by cgroup killer
         cg.postExec()
         _, err = os.Stat(procDir)
         So(os.IsNotExist(err), ShouldBeTrue)
     })

     Convey("Rsync Memory Should Be Limited", func() {
         tmpDir, err := ioutil.TempDir("", "tunasync")
         defer os.RemoveAll(tmpDir)
         So(err, ShouldBeNil)
         scriptFile := filepath.Join(tmpDir, "myrsync")
         tmpFile := filepath.Join(tmpDir, "log_file")
         c := rsyncConfig{
             name:        "tuna-cgroup",
             upstreamURL: "rsync://rsync.tuna.moe/tuna/",
             rsyncCmd:    scriptFile,
             workingDir:  tmpDir,
             logDir:      tmpDir,
             logFile:     tmpFile,
             useIPv6:     true,
             interval:    600 * time.Second,
         }

         provider, err := newRsyncProvider(c)
         So(err, ShouldBeNil)

-        cg := newCgroupHook(provider, *cgcf, 512 * units.MiB)
+        cg := newCgroupHook(provider, *cgcf, 512*units.MiB)
         provider.AddHook(cg)

         err = cg.preExec()
         So(err, ShouldBeNil)
         if cgcf.isUnified {
             cgpath := filepath.Join(cgcf.BasePath, cgcf.Group, provider.Name())
             if useCurrentCgroup {
                 group, err := cgv2.NestedGroupPath(filepath.Join("..", provider.Name()))
                 So(err, ShouldBeNil)
                 cgpath = filepath.Join(cgcf.BasePath, group)
             }
             memoLimit, err := ioutil.ReadFile(filepath.Join(cgpath, "memory.max"))
             So(err, ShouldBeNil)
             So(strings.Trim(string(memoLimit), "\n"), ShouldEqual, strconv.Itoa(512*1024*1024))
         } else {
-            for _, subsys := range(cg.cgMgrV1.Subsystems()) {
+            for _, subsys := range cg.cgMgrV1.Subsystems() {
                 if subsys.Name() == cgv1.Memory {
                     cgpath := filepath.Join(cgcf.Group, provider.Name())
                     if useCurrentCgroup {
                         p, err := cgv1.NestedPath(filepath.Join("..", provider.Name()))(cgv1.Memory)
                         So(err, ShouldBeNil)
                         cgpath = p
                     }
                     memoLimit, err := ioutil.ReadFile(filepath.Join(cgcf.BasePath, "memory", cgpath, "memory.limit_in_bytes"))
                     So(err, ShouldBeNil)
                     So(strings.Trim(string(memoLimit), "\n"), ShouldEqual, strconv.Itoa(512*1024*1024))
                 }
             }
         }
         cg.postExec()
         So(cg.cgMgrV1, ShouldBeNil)
     })
     Reset(func() {
         if cgcf.isUnified {
             if cgcf.Group == "" {
-                wkrg, err := cgv2.NestedGroupPath("");
+                wkrg, err := cgv2.NestedGroupPath("")
                 So(err, ShouldBeNil)
-                wkrMgr, err := cgv2.LoadManager("/sys/fs/cgroup", wkrg);
+                wkrMgr, _ := cgv2.LoadManager("/sys/fs/cgroup", wkrg)
                 allCtrls, err := wkrMgr.Controllers()
                 So(err, ShouldBeNil)
                 err = wkrMgr.ToggleControllers(allCtrls, cgv2.Disable)
                 So(err, ShouldBeNil)
                 origMgr := cgcf.cgMgrV2
                 for {
                     logger.Debugf("Restoring pids")
                     procs, err := wkrMgr.Procs(false)
                     So(err, ShouldBeNil)
                     if len(procs) == 0 {
                         break
                     }
-                    for _, p := range(procs) {
-                        if err := origMgr.AddProc(p); err != nil{
+                    for _, p := range procs {
+                        if err := origMgr.AddProc(p); err != nil {
                             if errors.Is(err, syscall.ESRCH) {
                                 logger.Debugf("Write pid %d to sub group failed: process vanished, ignoring")
                             } else {
                                 So(err, ShouldBeNil)
                             }
                         }
                     }
                 }
                 err = wkrMgr.Delete()
                 So(err, ShouldBeNil)
             }
         } else {
             if cgcf.Group == "" {
-                pather := (func(p cgv1.Path) (cgv1.Path){
-                    return func(subsys cgv1.Name) (string, error){
-                        path, err := p(subsys);
+                pather := (func(p cgv1.Path) cgv1.Path {
+                    return func(subsys cgv1.Name) (string, error) {
+                        path, err := p(subsys)
                         if err != nil {
                             return "", err
                         }
                         if path == "/" {
                             return "", cgv1.ErrControllerNotActive
                         }
                         return path, err
                     }
                 })(cgv1.NestedPath(""))
-                wkrMgr, err := cgv1.Load(cgv1.V1, pather, func(cfg *cgv1.InitConfig) error{
+                wkrMgr, err := cgv1.Load(cgv1.V1, pather, func(cfg *cgv1.InitConfig) error {
                     cfg.InitCheck = cgv1.AllowAny
                     return nil
                 })
                 So(err, ShouldBeNil)
                 origMgr := cgcf.cgMgrV1
-                for _, subsys := range(wkrMgr.Subsystems()){
+                for _, subsys := range wkrMgr.Subsystems() {
                     for {
                         procs, err := wkrMgr.Processes(subsys.Name(), false)
                         So(err, ShouldBeNil)
                         if len(procs) == 0 {
                             break
                         }
-                        for _, proc := range(procs) {
+                        for _, proc := range procs {
                             if err := origMgr.Add(proc); err != nil {
                                 if errors.Is(err, syscall.ESRCH) {
                                     logger.Debugf("Write pid %d to sub group failed: process vanished, ignoring")
                                 } else {
                                     So(err, ShouldBeNil)
                                 }
@@ -313,11 +274,51 @@ sleep 30
                             }
                         }
                     }
                 }
                 err = wkrMgr.Delete()
                 So(err, ShouldBeNil)
             }
         }
     })
 })
 }

View File

@@ -106,7 +106,7 @@ func (p *cmdProvider) Run(started chan empty) error {
         }
         if len(matches) != 0 {
             logger.Debug("Fail-on-match: %r", matches)
-            return fmt.Errorf("Fail-on-match regexp found %d matches", len(matches))
+            return fmt.Errorf("fail-on-match regexp found %d matches", len(matches))
         }
     }
     if p.sizePattern != nil {

View File

@@ -6,10 +6,10 @@ import (
     "path/filepath"

     "github.com/BurntSushi/toml"
-    "github.com/imdario/mergo"
-    units "github.com/docker/go-units"
     cgv1 "github.com/containerd/cgroups"
     cgv2 "github.com/containerd/cgroups/v2"
+    units "github.com/docker/go-units"
+    "github.com/imdario/mergo"
 )

 type providerEnum uint8
@@ -30,7 +30,7 @@ func (p *providerEnum) UnmarshalText(text []byte) error {
     case `two-stage-rsync`:
         *p = provTwoStageRsync
     default:
-        return errors.New("Invalid value to provierEnum")
+        return errors.New("invalid value to provierEnum")
     }
     return nil
 }

View File

@@ -10,12 +10,12 @@
 func TestConfigDiff(t *testing.T) {
     Convey("When old and new configs are equal", t, func() {
         oldList := []mirrorConfig{
-            mirrorConfig{Name: "debian"},
-            mirrorConfig{Name: "debian-security"},
-            mirrorConfig{Name: "fedora"},
-            mirrorConfig{Name: "archlinux"},
-            mirrorConfig{Name: "AOSP"},
-            mirrorConfig{Name: "ubuntu"},
+            {Name: "debian"},
+            {Name: "debian-security"},
+            {Name: "fedora"},
+            {Name: "archlinux"},
+            {Name: "AOSP"},
+            {Name: "ubuntu"},
         }
         newList := make([]mirrorConfig, len(oldList))
         copy(newList, oldList)
@@ -25,19 +25,19 @@ func TestConfigDiff(t *testing.T) {
     })
     Convey("When giving two config lists with different names", t, func() {
         oldList := []mirrorConfig{
-            mirrorConfig{Name: "debian"},
-            mirrorConfig{Name: "debian-security"},
-            mirrorConfig{Name: "fedora"},
-            mirrorConfig{Name: "archlinux"},
-            mirrorConfig{Name: "AOSP", Env: map[string]string{"REPO": "/usr/bin/repo"}},
-            mirrorConfig{Name: "ubuntu"},
+            {Name: "debian"},
+            {Name: "debian-security"},
+            {Name: "fedora"},
+            {Name: "archlinux"},
+            {Name: "AOSP", Env: map[string]string{"REPO": "/usr/bin/repo"}},
+            {Name: "ubuntu"},
         }
         newList := []mirrorConfig{
-            mirrorConfig{Name: "debian"},
-            mirrorConfig{Name: "debian-cd"},
-            mirrorConfig{Name: "archlinuxcn"},
-            mirrorConfig{Name: "AOSP", Env: map[string]string{"REPO": "/usr/local/bin/aosp-repo"}},
-            mirrorConfig{Name: "ubuntu-ports"},
+            {Name: "debian"},
+            {Name: "debian-cd"},
+            {Name: "archlinuxcn"},
+            {Name: "AOSP", Env: map[string]string{"REPO": "/usr/local/bin/aosp-repo"}},
+            {Name: "ubuntu-ports"},
         }

         difference := diffMirrorConfig(oldList, newList)
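
Every edit in this test is the same simplification: inside a []mirrorConfig literal the element type is implied, so the repeated mirrorConfig prefix can be dropped. This is the rewrite gofmt -s performs (attributing it to a specific staticcheck rule would be an assumption). A standalone sketch with a trimmed-down struct:

    package main

    import "fmt"

    // mirrorConfig is reduced here to the fields the test above uses.
    type mirrorConfig struct {
        Name string
        Env  map[string]string
    }

    func main() {
        oldList := []mirrorConfig{
            {Name: "debian"}, // same as mirrorConfig{Name: "debian"}
            {Name: "AOSP", Env: map[string]string{"REPO": "/usr/bin/repo"}},
        }
        fmt.Println(len(oldList))
    }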

View File

@@ -34,7 +34,7 @@ func (ctx *Context) Enter() *Context {
 // Exit return the upper layer of context
 func (ctx *Context) Exit() (*Context, error) {
     if ctx.parent == nil {
-        return nil, errors.New("Cannot exit the bottom layer context")
+        return nil, errors.New("cannot exit the bottom layer context")
     }
     return ctx.parent, nil
 }

View File

@@ -10,9 +10,9 @@ import (
 type dockerHook struct {
     emptyHook
     image       string
     volumes     []string
     options     []string
     memoryLimit MemBytes
 }
@@ -33,9 +33,9 @@ func newDockerHook(p mirrorProvider, gCfg dockerConfig, mCfg mirrorConfig) *dock
     emptyHook: emptyHook{
         provider: p,
     },
     image:       mCfg.DockerImage,
     volumes:     volumes,
     options:     options,
     memoryLimit: mCfg.MemoryLimit,
 }
 }
@@ -49,7 +49,7 @@ func (d *dockerHook) preExec() error {
     if _, err := os.Stat(workingDir); os.IsNotExist(err) {
         logger.Debugf("Making dir %s", workingDir)
         if err = os.MkdirAll(workingDir, 0755); err != nil {
-            return fmt.Errorf("Error making dir %s: %s", workingDir, err.Error())
+            return fmt.Errorf("error making dir %s: %s", workingDir, err.Error())
         }
     }

View File

@@ -32,7 +32,7 @@ func newExecPostHook(provider mirrorProvider, execOn uint8, command string) (*ex
         return nil, err
     }
     if execOn != execOnSuccess && execOn != execOnFailure {
-        return nil, fmt.Errorf("Invalid option for exec-on: %d", execOn)
+        return nil, fmt.Errorf("invalid option for exec-on: %d", execOn)
     }

     return &execPostHook{
@@ -92,7 +92,7 @@ func (h *execPostHook) Do() error {
             args = append(args, arg)
         }
     } else {
-        return errors.New("Invalid Command")
+        return errors.New("invalid command")
     }
     return session.Command(cmd, args...).Run()
 }

View File

@@ -79,7 +79,7 @@ func (m *mirrorJob) SetState(state uint32) {
 func (m *mirrorJob) SetProvider(provider mirrorProvider) error {
     s := m.State()
     if (s != stateNone) && (s != stateDisabled) {
-        return fmt.Errorf("Provider cannot be switched when job state is %d", s)
+        return fmt.Errorf("provider cannot be switched when job state is %d", s)
     }
     m.provider = provider
     return nil

View File

@@ -188,7 +188,7 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
         p.isMaster = isMaster
         provider = p
     default:
-        panic(errors.New("Invalid mirror provider"))
+        panic(errors.New("invalid mirror provider"))
     }

     // Add Logging Hook

View File

@@ -11,22 +11,21 @@ import (
     "time"

     "github.com/codeskyblue/go-sh"
-    "golang.org/x/sys/unix"
-    "github.com/moby/moby/pkg/reexec"
     cgv1 "github.com/containerd/cgroups"
+    "github.com/moby/moby/pkg/reexec"
+    "golang.org/x/sys/unix"
 )

 // runner is to run os commands giving command line, env and log file
 // it's an alternative to python-sh or go-sh

-var errProcessNotStarted = errors.New("Process Not Started")
+var errProcessNotStarted = errors.New("process not started")

 type cmdJob struct {
     sync.Mutex
     cmd        *exec.Cmd
     workingDir string
     env        map[string]string
-    logFile    *os.File
     finished   chan empty
     provider   mirrorProvider
     retErr     error
@@ -60,7 +59,7 @@ func newCmdJob(provider mirrorProvider, cmdAndArgs []string, workingDir string,
     }
     // set memlimit
     if d.memoryLimit != 0 {
         args = append(args, "-m", fmt.Sprint(d.memoryLimit.Value()))
     }
     // apply options
     args = append(args, d.options...)
@@ -115,7 +114,7 @@ func (c *cmdJob) Start() error {
     if cg != nil {
         logger.Debugf("Preparing cgroup sync pipes for job %s", c.provider.Name())
         var err error
-        pipeR, pipeW, err = os.Pipe();
+        pipeR, pipeW, err = os.Pipe()
         if err != nil {
             return err
         }
@@ -139,7 +138,7 @@ func (c *cmdJob) Start() error {
     }
     pid := c.cmd.Process.Pid
     if cg.cgCfg.isUnified {
-        if err := cg.cgMgrV2.AddProc(uint64(pid)); err != nil{
+        if err := cg.cgMgrV2.AddProc(uint64(pid)); err != nil {
             if errors.Is(err, syscall.ESRCH) {
                 logger.Infof("Write pid %d to cgroup failed: process vanished, ignoring")
             } else {
@@ -147,7 +146,7 @@ func (c *cmdJob) Start() error {
         }
     }
     } else {
-        if err := cg.cgMgrV1.Add(cgv1.Process{Pid: pid}); err != nil{
+        if err := cg.cgMgrV1.Add(cgv1.Process{Pid: pid}); err != nil {
             if errors.Is(err, syscall.ESRCH) {
                 logger.Infof("Write pid %d to cgroup failed: process vanished, ignoring")
             } else {
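
Besides the import reshuffle and brace spacing, this file drops cmdJob's logFile field outright; staticcheck's unused check reports struct fields that nothing in the package reads or writes (U1000 is an assumed rule ID). A minimal sketch of the pattern:

    package main

    import "fmt"

    type cmdJob struct {
        workingDir string
        // logFile *os.File removed: declared but never used anywhere in the
        // package, so the unused check flags it and deleting it is the fix.
    }

    func main() {
        j := cmdJob{workingDir: "/tmp"}
        fmt.Println(j.workingDir)
    }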

View File

@@ -112,7 +112,7 @@ func (p *twoStageRsyncProvider) Options(stage int) ([]string, error) {
         options = append(options, p.stage1Options...)
         stage1Profile, ok := rsyncStage1Profiles[p.stage1Profile]
         if !ok {
-            return nil, errors.New("Invalid Stage 1 Profile")
+            return nil, errors.New("invalid stage 1 profile")
         }
         for _, exc := range stage1Profile {
             options = append(options, exc)
@@ -124,7 +124,7 @@ func (p *twoStageRsyncProvider) Options(stage int) ([]string, error) {
             options = append(options, p.extraOptions...)
         }
     } else {
-        return []string{}, fmt.Errorf("Invalid stage: %d", stage)
+        return []string{}, fmt.Errorf("invalid stage: %d", stage)
     }

     if !p.rsyncNeverTimeout {

View File

@@ -147,7 +147,7 @@ func TestWorker(t *testing.T) {
         })
         Convey("with one job", func(ctx C) {
             workerCfg.Mirrors = []mirrorConfig{
-                mirrorConfig{
+                {
                     Name:     "job-ls",
                     Provider: provCommand,
                     Command:  "ls",
@@ -194,17 +194,17 @@ func TestWorker(t *testing.T) {
         })
         Convey("with several jobs", func(ctx C) {
             workerCfg.Mirrors = []mirrorConfig{
-                mirrorConfig{
+                {
                     Name:     "job-ls-1",
                     Provider: provCommand,
                     Command:  "ls",
                 },
-                mirrorConfig{
+                {
                     Name:     "job-fail",
                     Provider: provCommand,
                     Command:  "non-existent-command-xxxx",
                 },
-                mirrorConfig{
+                {
                     Name:     "job-ls-2",
                     Provider: provCommand,
                     Command:  "ls",

View File

@@ -13,7 +13,7 @@ import (
 func TestZFSHook(t *testing.T) {
     Convey("ZFS Hook should work", t, func(ctx C) {
-        tmpDir, err := ioutil.TempDir("", "tunasync")
+        tmpDir, _ := ioutil.TempDir("", "tunasync")
         tmpFile := filepath.Join(tmpDir, "log_file")

         c := cmdConfig{
@@ -45,4 +45,4 @@ func TestZFSHook(t *testing.T) {
         So(err, ShouldNotBeNil)
     })
 })
 }
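
The TempDir change above covers the remaining fix category: the first err was assigned but never read before being overwritten later in the test, which staticcheck reports (SA4006 is an assumed rule ID), so the result is bound to the blank identifier instead. A minimal sketch:

    package main

    import (
        "fmt"
        "io/ioutil"
    )

    func main() {
        // The error is deliberately discarded, as in the test above; binding
        // it to _ states that intent instead of leaving a dead assignment.
        tmpDir, _ := ioutil.TempDir("", "tunasync")
        fmt.Println(tmpDir)
    }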