diff --git a/internal/msg.go b/internal/msg.go index a433949..85337d6 100644 --- a/internal/msg.go +++ b/internal/msg.go @@ -1,6 +1,9 @@ package internal -import "time" +import ( + "fmt" + "time" +) // A StatusUpdateMsg represents a msg when // a worker has done syncing @@ -34,6 +37,22 @@ const ( CmdPing // ensure the goroutine is alive ) +func (c CmdVerb) String() string { + switch c { + case CmdStart: + return "start" + case CmdStop: + return "stop" + case CmdDisable: + return "disable" + case CmdRestart: + return "restart" + case CmdPing: + return "ping" + } + return "unknown" +} + // A WorkerCmd is the command message send from the // manager to a worker type WorkerCmd struct { @@ -42,6 +61,13 @@ type WorkerCmd struct { Args []string `json:"args"` } +func (c WorkerCmd) String() string { + if len(c.Args) > 0 { + return fmt.Sprintf("%v (%s, %v)", c.Cmd, c.MirrorID, c.Args) + } + return fmt.Sprintf("%v (%s)", c.Cmd, c.MirrorID) +} + // A ClientCmd is the command message send from client // to the manager type ClientCmd struct { diff --git a/manager/server.go b/manager/server.go index 94f40ad..147cecb 100644 --- a/manager/server.go +++ b/manager/server.go @@ -194,6 +194,24 @@ func (s *Manager) updateJobOfWorker(c *gin.Context) { var status MirrorStatus c.BindJSON(&status) mirrorName := status.Name + + curStatus, err := s.adapter.GetMirrorStatus(workerID, mirrorName) + if err != nil { + err := fmt.Errorf("failed to get job %s of worker %s: %s", + mirrorName, workerID, err.Error(), + ) + c.Error(err) + s.returnErrJSON(c, http.StatusInternalServerError, err) + return + } + + // Only successful syncing needs last_update + if status.Status == Success { + status.LastUpdate = time.Now() + } else { + status.LastUpdate = curStatus.LastUpdate + } + newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status) if err != nil { err := fmt.Errorf("failed to update job %s of worker %s: %s", @@ -231,6 +249,22 @@ func (s *Manager) handleClientCmd(c *gin.Context) { Args: clientCmd.Args, } + // update job status, even if the job did not disable successfully, + // this status should be set as disabled + curStat, _ := s.adapter.GetMirrorStatus(clientCmd.WorkerID, clientCmd.MirrorID) + changed := false + switch clientCmd.Cmd { + case CmdDisable: + curStat.Status = Disabled + changed = true + case CmdStop: + curStat.Status = Paused + changed = true + } + if changed { + s.adapter.UpdateMirrorStatus(clientCmd.WorkerID, clientCmd.MirrorID, curStat) + } + // post command to worker _, err = PostJSON(workerURL, workerCmd, s.tlsConfig) if err != nil { diff --git a/worker/job.go b/worker/job.go index f71136a..de5f79a 100644 --- a/worker/job.go +++ b/worker/job.go @@ -20,23 +20,28 @@ const ( ) type jobMessage struct { - status tunasync.SyncStatus - name string - msg string + status tunasync.SyncStatus + name string + msg string + schedule bool } type mirrorJob struct { - provider mirrorProvider - ctrlChan chan ctrlAction - disabled chan empty - enabled bool + provider mirrorProvider + ctrlChan chan ctrlAction + disabled chan empty + started bool + schedule bool + isDisabled bool } func newMirrorJob(provider mirrorProvider) *mirrorJob { return &mirrorJob{ - provider: provider, - ctrlChan: make(chan ctrlAction, 1), - enabled: false, + provider: provider, + ctrlChan: make(chan ctrlAction, 1), + started: false, + schedule: false, + isDisabled: false, } } @@ -44,18 +49,6 @@ func (m *mirrorJob) Name() string { return m.provider.Name() } -func (m *mirrorJob) Disabled() bool { - if !m.enabled { - return true - } - select { - case <-m.disabled: - return true - default: - return false - } -} - // runMirrorJob is the goroutine where syncing job runs in // arguments: // provider: mirror provider object @@ -66,7 +59,11 @@ func (m *mirrorJob) Disabled() bool { func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) error { m.disabled = make(chan empty) - defer close(m.disabled) + defer func() { + close(m.disabled) + m.schedule = false + m.isDisabled = true + }() provider := m.provider @@ -81,6 +78,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err managerChan <- jobMessage{ tunasync.Failed, m.Name(), fmt.Sprintf("error exec hook %s: %s", hookname, err.Error()), + false, } return err } @@ -91,7 +89,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err runJobWrapper := func(kill <-chan empty, jobDone chan<- empty) error { defer close(jobDone) - managerChan <- jobMessage{tunasync.PreSyncing, m.Name(), ""} + managerChan <- jobMessage{tunasync.PreSyncing, m.Name(), "", false} logger.Info("start syncing: %s", m.Name()) Hooks := provider.Hooks() @@ -118,7 +116,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err } // start syncing - managerChan <- jobMessage{tunasync.Syncing, m.Name(), ""} + managerChan <- jobMessage{tunasync.Syncing, m.Name(), "", false} var syncErr error syncDone := make(chan error, 1) @@ -152,7 +150,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err if syncErr == nil { // syncing success logger.Info("succeeded syncing %s", m.Name()) - managerChan <- jobMessage{tunasync.Success, m.Name(), ""} + managerChan <- jobMessage{tunasync.Success, m.Name(), "", true} // post-success hooks err := runHooks(rHooks, func(h jobHook) error { return h.postSuccess() }, "post-success") if err != nil { @@ -164,7 +162,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err // syncing failed logger.Warning("failed syncing %s: %s", m.Name(), syncErr.Error()) - managerChan <- jobMessage{tunasync.Failed, m.Name(), syncErr.Error()} + managerChan <- jobMessage{tunasync.Failed, m.Name(), syncErr.Error(), retry == maxRetry-1} // post-fail hooks logger.Debug("post-fail hooks") @@ -194,7 +192,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err } for { - if m.enabled { + if m.started { kill := make(chan empty) jobDone := make(chan empty) go runJob(kill, jobDone) @@ -206,21 +204,24 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err case ctrl := <-m.ctrlChan: switch ctrl { case jobStop: - m.enabled = false + m.schedule = false + m.started = false close(kill) <-jobDone case jobDisable: - m.enabled = false + m.schedule = false + m.isDisabled = true + m.started = false close(kill) <-jobDone return nil case jobRestart: - m.enabled = true + m.started = true close(kill) <-jobDone continue case jobStart: - m.enabled = true + m.started = true goto _wait_for_job default: // TODO: implement this @@ -233,14 +234,21 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err ctrl := <-m.ctrlChan switch ctrl { case jobStop: - m.enabled = false + m.schedule = false + m.started = false case jobDisable: - m.enabled = false + m.schedule = false + m.isDisabled = true + m.started = false return nil case jobRestart: - m.enabled = true + m.schedule = true + m.isDisabled = false + m.started = true case jobStart: - m.enabled = true + m.schedule = true + m.isDisabled = false + m.started = true default: // TODO return nil diff --git a/worker/provider.go b/worker/provider.go index 498a3e7..cf757f0 100644 --- a/worker/provider.go +++ b/worker/provider.go @@ -85,6 +85,7 @@ func (p *baseProvider) Context() *Context { } func (p *baseProvider) Interval() time.Duration { + // logger.Debug("interval for %s: %v", p.Name(), p.interval) return p.interval } diff --git a/worker/schedule.go b/worker/schedule.go index cb95a5d..0d3f8f0 100644 --- a/worker/schedule.go +++ b/worker/schedule.go @@ -44,6 +44,7 @@ func (q *scheduleQueue) Pop() *mirrorJob { defer first.Close() t := first.Key().(time.Time) + // logger.Debug("First job should run @%v", t) if t.Before(time.Now()) { job := first.Value().(*mirrorJob) q.list.Delete(first.Key()) diff --git a/worker/worker.go b/worker/worker.go index 56bffa6..40eeaa6 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -28,8 +28,6 @@ type Worker struct { schedule *scheduleQueue httpServer *gin.Engine tlsConfig *tls.Config - - mirrorStatus map[string]SyncStatus } // GetTUNASyncWorker returns a singalton worker @@ -46,8 +44,7 @@ func GetTUNASyncWorker(cfg *Config) *Worker { managerChan: make(chan jobMessage, 32), semaphore: make(chan empty, cfg.Global.Concurrent), - schedule: newScheduleQueue(), - mirrorStatus: make(map[string]SyncStatus), + schedule: newScheduleQueue(), } if cfg.Manager.CACert != "" { @@ -89,6 +86,9 @@ func (w *Worker) initProviders() { c.Global.MirrorDir, mirror.Name, ) } + if mirror.Interval == 0 { + mirror.Interval = c.Global.Interval + } logDir = formatLogDir(logDir, mirror) var provider mirrorProvider @@ -163,8 +163,6 @@ func (w *Worker) initJobs() { for name, provider := range w.providers { w.jobs[name] = newMirrorJob(provider) - go w.jobs[name].Run(w.managerChan, w.semaphore) - w.mirrorStatus[name] = Paused } } @@ -185,24 +183,40 @@ func (w *Worker) makeHTTPServer() { c.JSON(http.StatusNotFound, gin.H{"msg": fmt.Sprintf("Mirror ``%s'' not found", cmd.MirrorID)}) return } + logger.Info("Received command: %v", cmd) // if job disabled, start them first switch cmd.Cmd { case CmdStart, CmdRestart: - if job.Disabled() { + if job.isDisabled { go job.Run(w.managerChan, w.semaphore) } } switch cmd.Cmd { case CmdStart: + job.schedule = true + job.isDisabled = false job.ctrlChan <- jobStart - case CmdStop: - job.ctrlChan <- jobStop case CmdRestart: + job.schedule = true + job.isDisabled = false job.ctrlChan <- jobRestart + case CmdStop: + // if job is disabled, no goroutine would be there + // receiving this signal + if !job.isDisabled { + job.schedule = false + job.isDisabled = false + w.schedule.Remove(job.Name()) + job.ctrlChan <- jobStop + } case CmdDisable: - w.schedule.Remove(job.Name()) - job.ctrlChan <- jobDisable - <-job.disabled + if !job.isDisabled { + job.schedule = false + job.isDisabled = true + w.schedule.Remove(job.Name()) + job.ctrlChan <- jobDisable + <-job.disabled + } case CmdPing: job.ctrlChan <- jobStart default: @@ -243,15 +257,32 @@ func (w *Worker) runSchedule() { for name := range w.jobs { unset[name] = true } + // Fetch mirror list stored in the manager + // put it on the scheduled time + // if it's disabled, ignore it for _, m := range mirrorList { if job, ok := w.jobs[m.Name]; ok { - stime := m.LastUpdate.Add(job.provider.Interval()) - w.schedule.AddJob(stime, job) delete(unset, m.Name) + switch m.Status { + case Paused: + go job.Run(w.managerChan, w.semaphore) + job.schedule = false + continue + case Disabled: + job.schedule = false + job.isDisabled = true + continue + default: + go job.Run(w.managerChan, w.semaphore) + stime := m.LastUpdate.Add(job.provider.Interval()) + logger.Debug("Scheduling job %s @%s", job.Name(), stime.Format("2006-01-02 15:04:05")) + w.schedule.AddJob(stime, job) + } } } for name := range unset { job := w.jobs[name] + go job.Run(w.managerChan, w.semaphore) w.schedule.AddJob(time.Now(), job) } @@ -259,22 +290,26 @@ func (w *Worker) runSchedule() { select { case jobMsg := <-w.managerChan: // got status update from job - w.updateStatus(jobMsg) - status := w.mirrorStatus[jobMsg.name] - if status == Disabled || status == Paused { + job := w.jobs[jobMsg.name] + if !job.schedule { + logger.Info("Job %s disabled/paused, skip adding new schedule", jobMsg.name) continue } - w.mirrorStatus[jobMsg.name] = jobMsg.status - switch jobMsg.status { - case Success, Failed: - job := w.jobs[jobMsg.name] - w.schedule.AddJob( - time.Now().Add(job.provider.Interval()), - job, + + w.updateStatus(jobMsg) + + if jobMsg.schedule { + schedTime := time.Now().Add(job.provider.Interval()) + logger.Info( + "Next scheduled time for %s: %s", + job.Name(), + schedTime.Format("2006-01-02 15:04:05"), ) + w.schedule.AddJob(schedTime, job) } - case <-time.Tick(10 * time.Second): + case <-time.Tick(5 * time.Second): + // check schedule every 5 seconds if job := w.schedule.Pop(); job != nil { job.ctrlChan <- jobStart } @@ -324,14 +359,13 @@ func (w *Worker) updateStatus(jobMsg jobMessage) { ) p := w.providers[jobMsg.name] smsg := MirrorStatus{ - Name: jobMsg.name, - Worker: w.cfg.Global.Name, - IsMaster: true, - Status: jobMsg.status, - LastUpdate: time.Now(), - Upstream: p.Upstream(), - Size: "unknown", - ErrorMsg: jobMsg.msg, + Name: jobMsg.name, + Worker: w.cfg.Global.Name, + IsMaster: true, + Status: jobMsg.status, + Upstream: p.Upstream(), + Size: "unknown", + ErrorMsg: jobMsg.msg, } if _, err := PostJSON(url, smsg, w.tlsConfig); err != nil {