diff --git a/manager/db.go b/manager/db.go index 0520570..afbb894 100644 --- a/manager/db.go +++ b/manager/db.go @@ -182,7 +182,7 @@ func (b *boltAdapter) FlushDisabledJobs() (err error) { err = fmt.Errorf("%s; %s", err.Error(), jsonErr) continue } - if m.Status == Disabled { + if m.Status == Disabled || len(m.Name) == 0 { err = c.Delete() } } diff --git a/worker/base_provider.go b/worker/base_provider.go index ace76d3..82ef419 100644 --- a/worker/base_provider.go +++ b/worker/base_provider.go @@ -23,6 +23,7 @@ type baseProvider struct { logFile *os.File cgroup *cgroupHook + zfs *zfsHook hooks []jobHook } @@ -77,12 +78,15 @@ func (p *baseProvider) LogFile() string { return s } } - panic("log dir is impossible to be unavailable") + panic("log file is impossible to be unavailable") } func (p *baseProvider) AddHook(hook jobHook) { - if cg, ok := hook.(*cgroupHook); ok { - p.cgroup = cg + switch v := hook.(type) { + case *cgroupHook: + p.cgroup = v + case *zfsHook: + p.zfs = v } p.hooks = append(p.hooks, hook) } @@ -95,6 +99,10 @@ func (p *baseProvider) Cgroup() *cgroupHook { return p.cgroup } +func (p *baseProvider) ZFS() *zfsHook { + return p.zfs +} + func (p *baseProvider) prepareLogFile() error { if p.LogFile() == "/dev/null" { p.cmd.SetLogFile(nil) diff --git a/worker/config.go b/worker/config.go index 6f3a636..51a69c1 100644 --- a/worker/config.go +++ b/worker/config.go @@ -37,6 +37,7 @@ type Config struct { Manager managerConfig `toml:"manager"` Server serverConfig `toml:"server"` Cgroup cgroupConfig `toml:"cgroup"` + ZFS zfsConfig `toml:"zfs"` Include includeConfig `toml:"include"` Mirrors []mirrorConfig `toml:"mirrors"` } @@ -53,9 +54,10 @@ type globalConfig struct { } type managerConfig struct { - APIBase string `toml:"api_base"` - CACert string `toml:"ca_cert"` - Token string `toml:"token"` + APIBase string `toml:"api_base"` + CACert string `toml:"ca_cert"` + ExtraStatusAPIs []string `toml:"extra_status_managers"` + // Token string `toml:"token"` } type serverConfig struct { @@ -72,6 +74,11 @@ type cgroupConfig struct { Group string `toml:"group"` } +type zfsConfig struct { + Enable bool `toml:"enable"` + Zpool string `toml:"zpool"` +} + type includeConfig struct { IncludeMirrors string `toml:"include_mirrors"` } diff --git a/worker/provider.go b/worker/provider.go index 8358eeb..cdc9a18 100644 --- a/worker/provider.go +++ b/worker/provider.go @@ -36,6 +36,8 @@ type mirrorProvider interface { IsRunning() bool // Cgroup Cgroup() *cgroupHook + // ZFS + ZFS() *zfsHook AddHook(hook jobHook) Hooks() []jobHook @@ -162,6 +164,11 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider { // Add Logging Hook provider.AddHook(newLogLimiter(provider)) + // Add ZFS Hook + if cfg.ZFS.Enable { + provider.AddHook(newZfsHook(provider, cfg.ZFS.Zpool)) + } + // Add Cgroup Hook if cfg.Cgroup.Enable { provider.AddHook( diff --git a/worker/worker.go b/worker/worker.go index 4b6741b..7c3db9f 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -405,12 +405,6 @@ func (w *Worker) registorWorker() { } func (w *Worker) updateStatus(job *mirrorJob, jobMsg jobMessage) { - url := fmt.Sprintf( - "%s/workers/%s/jobs/%s", - w.cfg.Manager.APIBase, - w.Name(), - jobMsg.name, - ) p := job.provider smsg := MirrorStatus{ Name: jobMsg.name, @@ -422,8 +416,15 @@ func (w *Worker) updateStatus(job *mirrorJob, jobMsg jobMessage) { ErrorMsg: jobMsg.msg, } - if _, err := PostJSON(url, smsg, w.httpClient); err != nil { - logger.Errorf("Failed to update mirror(%s) status: %s", jobMsg.name, err.Error()) + apiBases := []string{w.cfg.Manager.APIBase} + apiBases = append(apiBases, w.cfg.Manager.ExtraStatusAPIs...) + for _, root := range apiBases { + url := fmt.Sprintf( + "%s/workers/%s/jobs/%s", root, w.Name(), jobMsg.name, + ) + if _, err := PostJSON(url, smsg, w.httpClient); err != nil { + logger.Errorf("Failed to update mirror(%s) status: %s", jobMsg.name, err.Error()) + } } } diff --git a/worker/zfs_hook.go b/worker/zfs_hook.go new file mode 100644 index 0000000..7e8f25e --- /dev/null +++ b/worker/zfs_hook.go @@ -0,0 +1,45 @@ +package worker + +import ( + "fmt" + "os" + "strings" + + "github.com/codeskyblue/go-sh" +) + +type zfsHook struct { + emptyHook + provider mirrorProvider + zpool string +} + +func newZfsHook(provider mirrorProvider, zpool string) *zfsHook { + return &zfsHook{ + provider: provider, + zpool: zpool, + } +} + +// create zfs dataset for a new mirror +func (z *zfsHook) preJob() error { + workingDir := z.provider.WorkingDir() + if _, err := os.Stat(workingDir); os.IsNotExist(err) { + // sudo zfs create $zfsDataset + // sudo zfs set mountpoint=${absPath} ${zfsDataset} + + zfsDataset := fmt.Sprintf("%s/%s", z.zpool, z.provider.Name()) + // Unknown issue of ZFS: + // dataset name should not contain upper case letters + zfsDataset = strings.ToLower(zfsDataset) + logger.Infof("Creating ZFS dataset %s", zfsDataset) + if err := sh.Command("sudo", "zfs", "create", zfsDataset).Run(); err != nil { + return err + } + logger.Infof("Mount ZFS dataset %s to %s", zfsDataset, workingDir) + if err := sh.Command("sudo", "zfs", "set", "mountpoint="+workingDir, zfsDataset).Run(); err != nil { + return err + } + } + return nil +}