diff --git a/worker/job.go b/worker/job.go index 8a75efc..d94cc3c 100644 --- a/worker/job.go +++ b/worker/job.go @@ -44,6 +44,18 @@ func (m *mirrorJob) Name() string { return m.provider.Name() } +func (m *mirrorJob) Stopped() bool { + if !m.enabled { + return true + } + select { + case <-m.stopped: + return true + default: + return false + } +} + // runMirrorJob is the goroutine where syncing job runs in // arguments: // provider: mirror provider object diff --git a/worker/loglimit_hook.go b/worker/loglimit_hook.go new file mode 100644 index 0000000..fdf55e3 --- /dev/null +++ b/worker/loglimit_hook.go @@ -0,0 +1,108 @@ +package worker + +import ( + "fmt" + "io/ioutil" + "os" + "path/filepath" + "sort" + "strings" + "time" +) + +// limit + +type logLimiter struct { + emptyHook + provider mirrorProvider +} + +func newLogLimiter(provider mirrorProvider) *logLimiter { + return &logLimiter{ + provider: provider, + } +} + +type fileSlice []os.FileInfo + +func (f fileSlice) Len() int { return len(f) } +func (f fileSlice) Swap(i, j int) { f[i], f[j] = f[j], f[i] } +func (f fileSlice) Less(i, j int) bool { return f[i].ModTime().Before(f[j].ModTime()) } + +func (l *logLimiter) preExec() error { + logger.Debug("executing log limitter for %s", l.provider.Name()) + + p := l.provider + if p.LogFile() == "/dev/null" { + return nil + } + + logDir := p.LogDir() + files, err := ioutil.ReadDir(logDir) + if err != nil { + if os.IsNotExist(err) { + os.MkdirAll(logDir, 0755) + } else { + return err + } + } + matchedFiles := []os.FileInfo{} + for _, f := range files { + if strings.HasPrefix(f.Name(), p.Name()) { + matchedFiles = append(matchedFiles, f) + } + } + + // sort the filelist in time order + // earlier modified files are sorted as larger + sort.Sort( + sort.Reverse( + fileSlice(matchedFiles), + ), + ) + // remove old files + if len(matchedFiles) > 9 { + for _, f := range matchedFiles[9:] { + // logger.Debug(f.Name()) + os.Remove(filepath.Join(logDir, f.Name())) + } + } + + logFile := filepath.Join( + logDir, + fmt.Sprintf( + "%s_%s.log", + p.Name(), + time.Now().Format("2006-01-02_15_04"), + ), + ) + + logLink := filepath.Join(logDir, "latest") + + if _, err = os.Stat(logLink); err == nil { + os.Remove(logLink) + } + os.Symlink(logFile, logLink) + + ctx := p.EnterContext() + ctx.Set(_LogFileKey, logFile) + return nil +} + +func (l *logLimiter) postSuccess() error { + l.provider.ExitContext() + return nil +} + +func (l *logLimiter) postFail() error { + logFile := l.provider.LogFile() + logFileFail := logFile + ".fail" + logDir := l.provider.LogDir() + logLink := filepath.Join(logDir, "latest") + os.Rename(logFile, logFileFail) + os.Remove(logLink) + os.Symlink(logFileFail, logLink) + + l.provider.ExitContext() + return nil +} diff --git a/worker/loglimit_test.go b/worker/loglimit_test.go new file mode 100644 index 0000000..74d87b6 --- /dev/null +++ b/worker/loglimit_test.go @@ -0,0 +1,146 @@ +package worker + +import ( + "fmt" + "io/ioutil" + "os" + "path/filepath" + "testing" + "time" + + . "github.com/smartystreets/goconvey/convey" + . "github.com/tuna/tunasync/internal" +) + +func TestLogLimiter(t *testing.T) { + Convey("LogLimiter should work", t, func(ctx C) { + tmpDir, err := ioutil.TempDir("", "tunasync") + tmpLogDir, err := ioutil.TempDir("", "tunasync-log") + defer os.RemoveAll(tmpDir) + defer os.RemoveAll(tmpLogDir) + So(err, ShouldBeNil) + scriptFile := filepath.Join(tmpDir, "cmd.sh") + + c := cmdConfig{ + name: "tuna-loglimit", + upstreamURL: "http://mirrors.tuna.moe/", + command: scriptFile, + workingDir: tmpDir, + logDir: tmpLogDir, + logFile: filepath.Join(tmpLogDir, "latest.log"), + interval: 600 * time.Second, + } + + provider, err := newCmdProvider(c) + So(err, ShouldBeNil) + limiter := newLogLimiter(provider) + provider.AddHook(limiter) + + Convey("If logs are created simply", func() { + for i := 0; i < 15; i++ { + fn := filepath.Join(tmpLogDir, fmt.Sprintf("%s-%d.log", provider.Name(), i)) + f, _ := os.Create(fn) + // time.Sleep(1 * time.Second) + f.Close() + } + + matches, _ := filepath.Glob(filepath.Join(tmpLogDir, "*.log")) + So(len(matches), ShouldEqual, 15) + + managerChan := make(chan jobMessage) + semaphore := make(chan empty, 1) + job := newMirrorJob(provider) + + scriptContent := `#!/bin/bash +echo $TUNASYNC_WORKING_DIR +echo $TUNASYNC_MIRROR_NAME +echo $TUNASYNC_UPSTREAM_URL +echo $TUNASYNC_LOG_FILE + ` + + err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755) + So(err, ShouldBeNil) + + go job.Run(managerChan, semaphore) + job.ctrlChan <- jobStart + msg := <-managerChan + So(msg.status, ShouldEqual, PreSyncing) + msg = <-managerChan + So(msg.status, ShouldEqual, Syncing) + logFile := provider.LogFile() + msg = <-managerChan + So(msg.status, ShouldEqual, Success) + + job.ctrlChan <- jobDisable + + So(logFile, ShouldNotEqual, provider.LogFile()) + + matches, _ = filepath.Glob(filepath.Join(tmpLogDir, "*.log")) + So(len(matches), ShouldEqual, 10) + + expectedOutput := fmt.Sprintf( + "%s\n%s\n%s\n%s\n", + provider.WorkingDir(), + provider.Name(), + provider.upstreamURL, + logFile, + ) + + loggedContent, err := ioutil.ReadFile(filepath.Join(provider.LogDir(), "latest")) + So(err, ShouldBeNil) + So(string(loggedContent), ShouldEqual, expectedOutput) + }) + + Convey("If job failed simply", func() { + managerChan := make(chan jobMessage) + semaphore := make(chan empty, 1) + job := newMirrorJob(provider) + + scriptContent := `#!/bin/bash +echo $TUNASYNC_WORKING_DIR +echo $TUNASYNC_MIRROR_NAME +echo $TUNASYNC_UPSTREAM_URL +echo $TUNASYNC_LOG_FILE +sleep 5 + ` + + err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755) + So(err, ShouldBeNil) + + go job.Run(managerChan, semaphore) + job.ctrlChan <- jobStart + msg := <-managerChan + So(msg.status, ShouldEqual, PreSyncing) + msg = <-managerChan + So(msg.status, ShouldEqual, Syncing) + logFile := provider.LogFile() + + time.Sleep(1 * time.Second) + job.ctrlChan <- jobStop + + msg = <-managerChan + So(msg.status, ShouldEqual, Failed) + + job.ctrlChan <- jobDisable + <-job.stopped + + So(logFile, ShouldNotEqual, provider.LogFile()) + + expectedOutput := fmt.Sprintf( + "%s\n%s\n%s\n%s\n", + provider.WorkingDir(), + provider.Name(), + provider.upstreamURL, + logFile, + ) + + loggedContent, err := ioutil.ReadFile(filepath.Join(provider.LogDir(), "latest")) + So(err, ShouldBeNil) + So(string(loggedContent), ShouldEqual, expectedOutput) + loggedContent, err = ioutil.ReadFile(logFile + ".fail") + So(err, ShouldBeNil) + So(string(loggedContent), ShouldEqual, expectedOutput) + }) + + }) +} diff --git a/worker/provider.go b/worker/provider.go index 143ccd8..7cd8b85 100644 --- a/worker/provider.go +++ b/worker/provider.go @@ -33,6 +33,7 @@ type mirrorProvider interface { // job hooks IsRunning() bool + AddHook(hook jobHook) Hooks() []jobHook Interval() time.Duration