feature(worker): implemented rsync provider

This commit is contained in:
bigeagle 2016-04-24 22:40:44 +08:00
parent f336fda736
commit a6e8e9e2d9
No known key found for this signature in database
GPG Key ID: 9171A4571C27920A
3 changed files with 79 additions and 40 deletions

View File

@ -1,8 +1,6 @@
package worker
import (
"errors"
"os"
"time"
"github.com/anmitsu/go-shlex"
@ -20,8 +18,6 @@ type cmdProvider struct {
baseProvider
cmdConfig
command []string
cmd *cmdJob
logFile *os.File
}
func newCmdProvider(c cmdConfig) (*cmdProvider, error) {
@ -59,32 +55,9 @@ func (p *cmdProvider) Start() error {
env[k] = v
}
p.cmd = newCmdJob(p.command, p.WorkingDir(), env)
logFile, err := os.OpenFile(p.LogFile(), os.O_WRONLY|os.O_CREATE, 0644)
if err != nil {
if err := p.setLogFile(); err != nil {
return err
}
p.logFile = logFile
p.cmd.SetLogFile(logFile)
return p.cmd.Start()
}
func (p *cmdProvider) Wait() error {
if p.logFile != nil {
defer p.logFile.Close()
}
return p.cmd.Wait()
}
func (p *cmdProvider) Terminate() error {
logger.Debug("terminating provider: %s", p.Name())
if p.cmd == nil {
return errors.New("provider command job not initialized")
}
if p.logFile != nil {
p.logFile.Close()
}
err := p.cmd.Terminate()
return err
}

View File

@ -1,6 +1,10 @@
package worker
import "time"
import (
"errors"
"os"
"time"
)
// mirror provider is the wrapper of mirror jobs
@ -45,6 +49,10 @@ type baseProvider struct {
ctx *Context
name string
interval time.Duration
cmd *cmdJob
logFile *os.File
hooks []jobHook
}
@ -104,3 +112,38 @@ func (p *baseProvider) AddHook(hook jobHook) {
func (p *baseProvider) Hooks() []jobHook {
return p.hooks
}
func (p *baseProvider) setLogFile() error {
if p.LogFile() == "/dev/null" {
p.cmd.SetLogFile(nil)
return nil
}
logFile, err := os.OpenFile(p.LogFile(), os.O_WRONLY|os.O_CREATE, 0644)
if err != nil {
logger.Error("Error opening logfile %s: %s", p.LogFile(), err.Error())
return err
}
p.logFile = logFile
p.cmd.SetLogFile(logFile)
return nil
}
func (p *baseProvider) Wait() error {
if p.logFile != nil {
defer p.logFile.Close()
}
return p.cmd.Wait()
}
func (p *baseProvider) Terminate() error {
logger.Debug("terminating provider: %s", p.Name())
if p.cmd == nil {
return errors.New("provider command job not initialized")
}
if p.logFile != nil {
p.logFile.Close()
}
err := p.cmd.Terminate()
return err
}

View File

@ -4,6 +4,7 @@ import "time"
type rsyncConfig struct {
name string
rsyncCmd string
upstreamURL, password, excludeFile string
workingDir, logDir, logFile string
useIPv6 bool
@ -14,6 +15,7 @@ type rsyncConfig struct {
type rsyncProvider struct {
baseProvider
rsyncConfig
options []string
}
func newRsyncProvider(c rsyncConfig) (*rsyncProvider, error) {
@ -27,6 +29,25 @@ func newRsyncProvider(c rsyncConfig) (*rsyncProvider, error) {
rsyncConfig: c,
}
if c.rsyncCmd == "" {
provider.rsyncCmd = "rsync"
}
options := []string{
"-aHvh", "--no-o", "--no-g", "--stats",
"--exclude", ".~tmp~/",
"--delete", "--delete-after", "--delay-updates",
"--safe-links", "--timeout=120", "--contimeout=120",
}
if c.useIPv6 {
options = append(options, "-6")
}
if c.excludeFile != "" {
options = append(options, "--exclude-from", c.excludeFile)
}
provider.ctx.Set(_WorkingDirKey, c.workingDir)
provider.ctx.Set(_LogDirKey, c.logDir)
provider.ctx.Set(_LogFileKey, c.logFile)
@ -34,17 +55,19 @@ func newRsyncProvider(c rsyncConfig) (*rsyncProvider, error) {
return provider, nil
}
// TODO: implement this
func (p *rsyncProvider) Start() error {
return nil
env := map[string]string{}
if p.password != "" {
env["RSYNC_PASSWORD"] = p.password
}
command := []string{p.rsyncCmd}
command = append(command, p.options...)
command = append(command, p.upstreamURL, p.WorkingDir())
p.cmd = newCmdJob(command, p.WorkingDir(), env)
if err := p.setLogFile(); err != nil {
return err
}
// TODO: implement this
func (p *rsyncProvider) Terminate() error {
return nil
}
// TODO: implement this
func (p *rsyncProvider) Wait() error {
return nil
return p.cmd.Start()
}