From ce3471e30d1739dbb131ba58aed07d05091afd7b Mon Sep 17 00:00:00 2001 From: bigeagle Date: Thu, 28 Apr 2016 18:34:22 +0800 Subject: [PATCH] feature(worker): implemented Worker object, worker side code is almost done --- internal/msg.go | 6 +- internal/util.go | 79 +++++++ manager/server.go | 5 +- worker/cmd_provider.go | 4 + worker/config_test.go | 13 +- worker/job.go | 12 +- worker/job_test.go | 6 +- worker/loglimit_test.go | 2 +- worker/main.go | 108 --------- worker/provider.go | 1 + worker/rsync_provider.go | 4 + worker/two_stage_rsync_provider.go | 4 + worker/worker.go | 342 +++++++++++++++++++++++++++++ 13 files changed, 462 insertions(+), 124 deletions(-) create mode 100644 internal/util.go delete mode 100644 worker/main.go create mode 100644 worker/worker.go diff --git a/internal/msg.go b/internal/msg.go index b1478f2..a4e2838 100644 --- a/internal/msg.go +++ b/internal/msg.go @@ -4,7 +4,7 @@ import "time" // A StatusUpdateMsg represents a msg when // a worker has done syncing -type StatusUpdateMsg struct { +type MirrorStatus struct { Name string `json:"name"` Worker string `json:"worker"` IsMaster bool `json:"is_master"` @@ -19,7 +19,9 @@ type StatusUpdateMsg struct { // a worker, and sent from the manager to clients. type WorkerInfoMsg struct { ID string `json:"id"` - LastOnline time.Time `json:"last_online"` + URL string `json:"url"` // worker url + Token string `json:"token"` // session token + LastOnline time.Time `json:"last_online"` // last seen } type CmdVerb uint8 diff --git a/internal/util.go b/internal/util.go new file mode 100644 index 0000000..9773914 --- /dev/null +++ b/internal/util.go @@ -0,0 +1,79 @@ +package internal + +import ( + "bytes" + "crypto/tls" + "crypto/x509" + "encoding/json" + "errors" + "io/ioutil" + "net/http" +) + +// GetTLSConfig generate tls.Config from CAFile +func GetTLSConfig(CAFile string) (*tls.Config, error) { + caCert, err := ioutil.ReadFile(CAFile) + if err != nil { + return nil, err + } + caCertPool := x509.NewCertPool() + if ok := caCertPool.AppendCertsFromPEM(caCert); !ok { + return nil, errors.New("Failed to add CA to pool") + } + + tlsConfig := &tls.Config{ + RootCAs: caCertPool, + } + tlsConfig.BuildNameToCertificate() + return tlsConfig, nil +} + +// PostJSON posts json object to url +func PostJSON(url string, obj interface{}, tlsConfig *tls.Config) (*http.Response, error) { + var client *http.Client + if tlsConfig == nil { + client = &http.Client{} + } else { + tr := &http.Transport{ + TLSClientConfig: tlsConfig, + } + client = &http.Client{ + Transport: tr, + } + } + + b := new(bytes.Buffer) + if err := json.NewEncoder(b).Encode(obj); err != nil { + return nil, err + } + return client.Post(url, "application/json; charset=utf-8", b) +} + +// GetJSON gets a json response from url +func GetJSON(url string, obj interface{}, tlsConfig *tls.Config) (*http.Response, error) { + var client *http.Client + if tlsConfig == nil { + client = &http.Client{} + } else { + tr := &http.Transport{ + TLSClientConfig: tlsConfig, + } + client = &http.Client{ + Transport: tr, + } + } + + resp, err := client.Get(url) + if err != nil { + return resp, err + } + if resp.StatusCode != http.StatusOK { + return resp, errors.New("HTTP status code is not 200") + } + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return resp, err + } + return resp, json.Unmarshal(body, obj) +} diff --git a/manager/server.go b/manager/server.go index b8fe5a8..5102aae 100644 --- a/manager/server.go +++ b/manager/server.go @@ -47,7 +47,10 @@ func (s *managerServer) listWorkers(c *gin.Context) { } for _, w := range workers { workerInfos = append(workerInfos, - WorkerInfoMsg{w.ID, w.LastOnline}) + WorkerInfoMsg{ + ID: w.ID, + LastOnline: w.LastOnline, + }) } c.JSON(http.StatusOK, workerInfos) } diff --git a/worker/cmd_provider.go b/worker/cmd_provider.go index 38cdcac..8ed8f9e 100644 --- a/worker/cmd_provider.go +++ b/worker/cmd_provider.go @@ -44,6 +44,10 @@ func newCmdProvider(c cmdConfig) (*cmdProvider, error) { return provider, nil } +func (p *cmdProvider) Upstream() string { + return p.upstreamURL +} + func (p *cmdProvider) Run() error { if err := p.Start(); err != nil { return err diff --git a/worker/config_test.go b/worker/config_test.go index ea461a2..94d8bae 100644 --- a/worker/config_test.go +++ b/worker/config_test.go @@ -110,16 +110,21 @@ exclude_file = "/etc/tunasync.d/fedora-exclude.txt" cfg, err := loadConfig(tmpfile.Name()) So(err, ShouldBeNil) - providers := initProviders(cfg) + w := &Worker{ + cfg: cfg, + providers: make(map[string]mirrorProvider), + } - p := providers[0] + w.initProviders() + + p := w.providers["AOSP"] So(p.Name(), ShouldEqual, "AOSP") So(p.LogDir(), ShouldEqual, "/var/log/tunasync/AOSP") So(p.LogFile(), ShouldEqual, "/var/log/tunasync/AOSP/latest.log") _, ok := p.(*cmdProvider) So(ok, ShouldBeTrue) - p = providers[1] + p = w.providers["debian"] So(p.Name(), ShouldEqual, "debian") So(p.LogDir(), ShouldEqual, "/var/log/tunasync/debian") So(p.LogFile(), ShouldEqual, "/var/log/tunasync/debian/latest.log") @@ -128,7 +133,7 @@ exclude_file = "/etc/tunasync.d/fedora-exclude.txt" So(r2p.stage1Profile, ShouldEqual, "debian") So(r2p.WorkingDir(), ShouldEqual, "/data/mirrors/debian") - p = providers[2] + p = w.providers["fedora"] So(p.Name(), ShouldEqual, "fedora") So(p.LogDir(), ShouldEqual, "/var/log/tunasync/fedora") So(p.LogFile(), ShouldEqual, "/var/log/tunasync/fedora/latest.log") diff --git a/worker/job.go b/worker/job.go index d94cc3c..f71136a 100644 --- a/worker/job.go +++ b/worker/job.go @@ -28,7 +28,7 @@ type jobMessage struct { type mirrorJob struct { provider mirrorProvider ctrlChan chan ctrlAction - stopped chan empty + disabled chan empty enabled bool } @@ -44,12 +44,12 @@ func (m *mirrorJob) Name() string { return m.provider.Name() } -func (m *mirrorJob) Stopped() bool { +func (m *mirrorJob) Disabled() bool { if !m.enabled { return true } select { - case <-m.stopped: + case <-m.disabled: return true default: return false @@ -65,8 +65,8 @@ func (m *mirrorJob) Stopped() bool { // TODO: message struct for managerChan func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) error { - m.stopped = make(chan empty) - defer close(m.stopped) + m.disabled = make(chan empty) + defer close(m.disabled) provider := m.provider @@ -210,6 +210,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err close(kill) <-jobDone case jobDisable: + m.enabled = false close(kill) <-jobDone return nil @@ -234,6 +235,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err case jobStop: m.enabled = false case jobDisable: + m.enabled = false return nil case jobRestart: m.enabled = true diff --git a/worker/job_test.go b/worker/job_test.go index 15bc716..1edc14b 100644 --- a/worker/job_test.go +++ b/worker/job_test.go @@ -105,7 +105,7 @@ func TestMirrorJob(t *testing.T) { select { case <-managerChan: So(0, ShouldEqual, 1) // made this fail - case <-job.stopped: + case <-job.disabled: So(0, ShouldEqual, 0) } }) @@ -145,7 +145,7 @@ echo $TUNASYNC_WORKING_DIR So(err, ShouldBeNil) So(string(loggedContent), ShouldEqual, exceptedOutput) job.ctrlChan <- jobDisable - <-job.stopped + <-job.disabled }) Convey("If we don't kill it", func(ctx C) { @@ -168,7 +168,7 @@ echo $TUNASYNC_WORKING_DIR So(err, ShouldBeNil) So(string(loggedContent), ShouldEqual, exceptedOutput) job.ctrlChan <- jobDisable - <-job.stopped + <-job.disabled }) }) diff --git a/worker/loglimit_test.go b/worker/loglimit_test.go index 74d87b6..e42a78a 100644 --- a/worker/loglimit_test.go +++ b/worker/loglimit_test.go @@ -122,7 +122,7 @@ sleep 5 So(msg.status, ShouldEqual, Failed) job.ctrlChan <- jobDisable - <-job.stopped + <-job.disabled So(logFile, ShouldNotEqual, provider.LogFile()) diff --git a/worker/main.go b/worker/main.go deleted file mode 100644 index a51f579..0000000 --- a/worker/main.go +++ /dev/null @@ -1,108 +0,0 @@ -package worker - -import ( - "bytes" - "errors" - "html/template" - "path/filepath" - "time" -) - -// toplevel module for workers - -func initProviders(c *Config) []mirrorProvider { - - formatLogDir := func(logDir string, m mirrorConfig) string { - tmpl, err := template.New("logDirTmpl-" + m.Name).Parse(logDir) - if err != nil { - panic(err) - } - var formatedLogDir bytes.Buffer - tmpl.Execute(&formatedLogDir, m) - return formatedLogDir.String() - } - - providers := []mirrorProvider{} - - for _, mirror := range c.Mirrors { - logDir := mirror.LogDir - mirrorDir := mirror.MirrorDir - if logDir == "" { - logDir = c.Global.LogDir - } - if mirrorDir == "" { - mirrorDir = c.Global.MirrorDir - } - logDir = formatLogDir(logDir, mirror) - switch mirror.Provider { - case ProvCommand: - pc := cmdConfig{ - name: mirror.Name, - upstreamURL: mirror.Upstream, - command: mirror.Command, - workingDir: filepath.Join(mirrorDir, mirror.Name), - logDir: logDir, - logFile: filepath.Join(logDir, "latest.log"), - interval: time.Duration(mirror.Interval) * time.Minute, - env: mirror.Env, - } - p, err := newCmdProvider(pc) - if err != nil { - panic(err) - } - providers = append(providers, p) - case ProvRsync: - rc := rsyncConfig{ - name: mirror.Name, - upstreamURL: mirror.Upstream, - password: mirror.Password, - excludeFile: mirror.ExcludeFile, - workingDir: filepath.Join(mirrorDir, mirror.Name), - logDir: logDir, - logFile: filepath.Join(logDir, "latest.log"), - useIPv6: mirror.UseIPv6, - interval: time.Duration(mirror.Interval) * time.Minute, - } - p, err := newRsyncProvider(rc) - if err != nil { - panic(err) - } - providers = append(providers, p) - case ProvTwoStageRsync: - rc := twoStageRsyncConfig{ - name: mirror.Name, - stage1Profile: mirror.Stage1Profile, - upstreamURL: mirror.Upstream, - password: mirror.Password, - excludeFile: mirror.ExcludeFile, - workingDir: filepath.Join(mirrorDir, mirror.Name), - logDir: logDir, - logFile: filepath.Join(logDir, "latest.log"), - useIPv6: mirror.UseIPv6, - interval: time.Duration(mirror.Interval) * time.Minute, - } - p, err := newTwoStageRsyncProvider(rc) - if err != nil { - panic(err) - } - providers = append(providers, p) - default: - panic(errors.New("Invalid mirror provider")) - - } - - } - return providers -} - -func main() { - - for { - // if time.Now().After() { - // - // } - - time.Sleep(1 * time.Second) - } - -} diff --git a/worker/provider.go b/worker/provider.go index 6aba4dc..498a3e7 100644 --- a/worker/provider.go +++ b/worker/provider.go @@ -21,6 +21,7 @@ const ( type mirrorProvider interface { // name Name() string + Upstream() string // run mirror job in background Run() error diff --git a/worker/rsync_provider.go b/worker/rsync_provider.go index 96e88c7..49153c9 100644 --- a/worker/rsync_provider.go +++ b/worker/rsync_provider.go @@ -63,6 +63,10 @@ func newRsyncProvider(c rsyncConfig) (*rsyncProvider, error) { return provider, nil } +func (p *rsyncProvider) Upstream() string { + return p.upstreamURL +} + func (p *rsyncProvider) Run() error { if err := p.Start(); err != nil { return err diff --git a/worker/two_stage_rsync_provider.go b/worker/two_stage_rsync_provider.go index 247aa0e..5a53716 100644 --- a/worker/two_stage_rsync_provider.go +++ b/worker/two_stage_rsync_provider.go @@ -70,6 +70,10 @@ func newTwoStageRsyncProvider(c twoStageRsyncConfig) (*twoStageRsyncProvider, er return provider, nil } +func (p *twoStageRsyncProvider) Upstream() string { + return p.upstreamURL +} + func (p *twoStageRsyncProvider) Options(stage int) ([]string, error) { var options []string if stage == 1 { diff --git a/worker/worker.go b/worker/worker.go new file mode 100644 index 0000000..063f932 --- /dev/null +++ b/worker/worker.go @@ -0,0 +1,342 @@ +package worker + +import ( + "bytes" + "crypto/tls" + "errors" + "fmt" + "html/template" + "net/http" + "path/filepath" + "time" + + "github.com/gin-gonic/gin" + . "github.com/tuna/tunasync/internal" +) + +var tunasyncWorker *Worker + +// A Worker is a instance of tunasync worker +type Worker struct { + cfg *Config + providers map[string]mirrorProvider + jobs map[string]*mirrorJob + + managerChan chan jobMessage + semaphore chan empty + + schedule *scheduleQueue + httpServer *gin.Engine + tlsConfig *tls.Config + + mirrorStatus map[string]SyncStatus +} + +// GetTUNASyncWorker returns a singalton worker +func GetTUNASyncWorker(cfg *Config) *Worker { + if tunasyncWorker != nil { + return tunasyncWorker + } + + w := &Worker{ + cfg: cfg, + providers: make(map[string]mirrorProvider), + jobs: make(map[string]*mirrorJob), + + managerChan: make(chan jobMessage, 32), + semaphore: make(chan empty, cfg.Global.Concurrent), + + schedule: newScheduleQueue(), + mirrorStatus: make(map[string]SyncStatus), + } + w.initJobs() + w.makeHTTPServer() + tunasyncWorker = w + return w +} + +func (w *Worker) initProviders() { + c := w.cfg + + formatLogDir := func(logDir string, m mirrorConfig) string { + tmpl, err := template.New("logDirTmpl-" + m.Name).Parse(logDir) + if err != nil { + panic(err) + } + var formatedLogDir bytes.Buffer + tmpl.Execute(&formatedLogDir, m) + return formatedLogDir.String() + } + + for _, mirror := range c.Mirrors { + logDir := mirror.LogDir + mirrorDir := mirror.MirrorDir + if logDir == "" { + logDir = c.Global.LogDir + } + if mirrorDir == "" { + mirrorDir = c.Global.MirrorDir + } + logDir = formatLogDir(logDir, mirror) + + var provider mirrorProvider + + switch mirror.Provider { + case ProvCommand: + pc := cmdConfig{ + name: mirror.Name, + upstreamURL: mirror.Upstream, + command: mirror.Command, + workingDir: filepath.Join(mirrorDir, mirror.Name), + logDir: logDir, + logFile: filepath.Join(logDir, "latest.log"), + interval: time.Duration(mirror.Interval) * time.Minute, + env: mirror.Env, + } + p, err := newCmdProvider(pc) + if err != nil { + panic(err) + } + provider = p + case ProvRsync: + rc := rsyncConfig{ + name: mirror.Name, + upstreamURL: mirror.Upstream, + password: mirror.Password, + excludeFile: mirror.ExcludeFile, + workingDir: filepath.Join(mirrorDir, mirror.Name), + logDir: logDir, + logFile: filepath.Join(logDir, "latest.log"), + useIPv6: mirror.UseIPv6, + interval: time.Duration(mirror.Interval) * time.Minute, + } + p, err := newRsyncProvider(rc) + if err != nil { + panic(err) + } + provider = p + case ProvTwoStageRsync: + rc := twoStageRsyncConfig{ + name: mirror.Name, + stage1Profile: mirror.Stage1Profile, + upstreamURL: mirror.Upstream, + password: mirror.Password, + excludeFile: mirror.ExcludeFile, + workingDir: filepath.Join(mirrorDir, mirror.Name), + logDir: logDir, + logFile: filepath.Join(logDir, "latest.log"), + useIPv6: mirror.UseIPv6, + interval: time.Duration(mirror.Interval) * time.Minute, + } + p, err := newTwoStageRsyncProvider(rc) + if err != nil { + panic(err) + } + provider = p + default: + panic(errors.New("Invalid mirror provider")) + + } + + provider.AddHook(newLogLimiter(provider)) + w.providers[provider.Name()] = provider + + } +} + +func (w *Worker) initJobs() { + w.initProviders() + + for name, provider := range w.providers { + w.jobs[name] = newMirrorJob(provider) + go w.jobs[name].Run(w.managerChan, w.semaphore) + w.mirrorStatus[name] = Paused + } +} + +// Ctrl server receives commands from the manager +func (w *Worker) makeHTTPServer() { + s := gin.New() + s.Use(gin.Recovery()) + + s.POST("/", func(c *gin.Context) { + var cmd WorkerCmd + + if err := c.BindJSON(&cmd); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"msg": "Invalid request"}) + return + } + job, ok := w.jobs[cmd.MirrorID] + if !ok { + c.JSON(http.StatusNotFound, gin.H{"msg": fmt.Sprintf("Mirror ``%s'' not found", cmd.MirrorID)}) + return + } + // if job disabled, start them first + switch cmd.Cmd { + case CmdStart, CmdRestart: + if job.Disabled() { + go job.Run(w.managerChan, w.semaphore) + } + } + switch cmd.Cmd { + case CmdStart: + job.ctrlChan <- jobStart + case CmdStop: + job.ctrlChan <- jobStop + case CmdRestart: + job.ctrlChan <- jobRestart + case CmdDisable: + w.schedule.Remove(job.Name()) + job.ctrlChan <- jobDisable + <-job.disabled + case CmdPing: + job.ctrlChan <- jobStart + default: + c.JSON(http.StatusNotAcceptable, gin.H{"msg": "Invalid Command"}) + return + } + + c.JSON(http.StatusOK, gin.H{"msg": "OK"}) + }) + + w.httpServer = s +} + +func (w *Worker) runHTTPServer() { + addr := fmt.Sprintf("%s:%d", w.cfg.Server.Addr, w.cfg.Server.Port) + + if w.cfg.Server.SSLCert == "" && w.cfg.Server.SSLKey == "" { + if err := w.httpServer.Run(addr); err != nil { + panic(err) + } + } else { + if err := w.httpServer.RunTLS(addr, w.cfg.Server.SSLCert, w.cfg.Server.SSLKey); err != nil { + panic(err) + } + } +} + +// Run runs worker forever +func (w *Worker) Run() { + w.registorWorker() + go w.runHTTPServer() + w.runSchedule() +} + +func (w *Worker) runSchedule() { + mirrorList := w.fetchJobStatus() + unset := make(map[string]bool) + for name := range w.jobs { + unset[name] = true + } + for _, m := range mirrorList { + if job, ok := w.jobs[m.Name]; ok { + stime := m.LastUpdate.Add(job.provider.Interval()) + w.schedule.AddJob(stime, job) + delete(unset, m.Name) + } + } + for name := range unset { + job := w.jobs[name] + w.schedule.AddJob(time.Now(), job) + } + + for { + select { + case jobMsg := <-w.managerChan: + // got status update from job + w.updateStatus(jobMsg) + status := w.mirrorStatus[jobMsg.name] + if status == Disabled || status == Paused { + continue + } + w.mirrorStatus[jobMsg.name] = jobMsg.status + switch jobMsg.status { + case Success, Failed: + job := w.jobs[jobMsg.name] + w.schedule.AddJob( + time.Now().Add(job.provider.Interval()), + job, + ) + } + + case <-time.Tick(10 * time.Second): + if job := w.schedule.Pop(); job != nil { + job.ctrlChan <- jobStart + } + } + + } + +} + +// Name returns worker name +func (w *Worker) Name() string { + return w.cfg.Global.Name +} + +// URL returns the url to http server of the worker +func (w *Worker) URL() string { + proto := "https" + if w.cfg.Server.SSLCert == "" && w.cfg.Server.SSLKey == "" { + proto = "http" + } + + return fmt.Sprintf("%s://%s:%d/", proto, w.cfg.Server.Hostname, w.cfg.Server.Port) +} + +func (w *Worker) registorWorker() { + url := fmt.Sprintf( + "%s/workers", + w.cfg.Manager.APIBase, + ) + + msg := WorkerInfoMsg{ + ID: w.Name(), + URL: w.URL(), + } + + if _, err := PostJSON(url, msg, w.tlsConfig); err != nil { + logger.Error("Failed to register worker") + } +} + +func (w *Worker) updateStatus(jobMsg jobMessage) { + url := fmt.Sprintf( + "%s/%s/jobs/%s", + w.cfg.Manager.APIBase, + w.Name(), + jobMsg.name, + ) + p := w.providers[jobMsg.name] + smsg := MirrorStatus{ + Name: jobMsg.name, + Worker: w.cfg.Global.Name, + IsMaster: true, + Status: jobMsg.status, + LastUpdate: time.Now(), + Upstream: p.Upstream(), + Size: "unknown", + ErrorMsg: jobMsg.msg, + } + + if _, err := PostJSON(url, smsg, w.tlsConfig); err != nil { + logger.Error("Failed to update mirror(%s) status: %s", jobMsg.name, err.Error()) + } +} + +func (w *Worker) fetchJobStatus() []MirrorStatus { + var mirrorList []MirrorStatus + + url := fmt.Sprintf( + "%s/%s/jobs", + w.cfg.Manager.APIBase, + w.Name(), + ) + + if _, err := GetJSON(url, &mirrorList, w.tlsConfig); err != nil { + logger.Error("Failed to fetch job status: %s", err.Error()) + } + + return mirrorList +}