From ddc9efd1556b7d3a96a371317d1521eb25a9f718 Mon Sep 17 00:00:00 2001 From: zyx Date: Thu, 11 Apr 2019 12:36:18 +0800 Subject: [PATCH] report next scheduled sync time --- internal/msg.go | 10 ++++++++++ internal/status_web.go | 4 ++++ manager/server.go | 43 ++++++++++++++++++++++++++++++++++++++++++ worker/schedule.go | 19 +++++++++++++++++++ worker/worker.go | 27 ++++++++++++++++++++++++++ 5 files changed, 103 insertions(+) diff --git a/internal/msg.go b/internal/msg.go index 14a9102..496907b 100644 --- a/internal/msg.go +++ b/internal/msg.go @@ -14,6 +14,7 @@ type MirrorStatus struct { Status SyncStatus `json:"status"` LastUpdate time.Time `json:"last_update"` LastEnded time.Time `json:"last_ended"` + Scheduled time.Time `json:"next_schedule"` Upstream string `json:"upstream"` Size string `json:"size"` ErrorMsg string `json:"error_msg"` @@ -28,6 +29,15 @@ type WorkerStatus struct { LastOnline time.Time `json:"last_online"` // last seen } +type MirrorSchedules struct { + Schedules []MirrorSchedule `json:"schedules"` +} + +type MirrorSchedule struct { + MirrorName string `json:"name"` + NextSchedule time.Time `json:"next_schedule"` +} + // A CmdVerb is an action to a job or worker type CmdVerb uint8 diff --git a/internal/status_web.go b/internal/status_web.go index 9329c96..5fa6114 100644 --- a/internal/status_web.go +++ b/internal/status_web.go @@ -45,6 +45,8 @@ type WebMirrorStatus struct { LastUpdateTs stampTime `json:"last_update_ts"` LastEnded textTime `json:"last_ended"` LastEndedTs stampTime `json:"last_ended_ts"` + Scheduled textTime `json:"next_schedule"` + ScheduledTs stampTime `json:"next_schedule_ts"` Upstream string `json:"upstream"` Size string `json:"size"` // approximate size } @@ -58,6 +60,8 @@ func BuildWebMirrorStatus(m MirrorStatus) WebMirrorStatus { LastUpdateTs: stampTime{m.LastUpdate}, LastEnded: textTime{m.LastEnded}, LastEndedTs: stampTime{m.LastEnded}, + Scheduled: textTime{m.Scheduled}, + ScheduledTs: stampTime{m.Scheduled}, Upstream: m.Upstream, Size: m.Size, } diff --git a/manager/server.go b/manager/server.go index 56ac986..5e6b185 100644 --- a/manager/server.go +++ b/manager/server.go @@ -91,6 +91,7 @@ func GetTUNASyncManager(cfg *Config) *Manager { // post job status workerValidateGroup.POST(":id/jobs/:job", s.updateJobOfWorker) workerValidateGroup.POST(":id/jobs/:job/size", s.updateMirrorSize) + workerValidateGroup.POST(":id/schedules", s.updateSchedulesOfWorker) } // for tunasynctl to post commands @@ -240,6 +241,48 @@ func (s *Manager) returnErrJSON(c *gin.Context, code int, err error) { }) } +func (s *Manager) updateSchedulesOfWorker(c *gin.Context) { + workerID := c.Param("id") + var schedules MirrorSchedules + c.BindJSON(&schedules) + + for _, schedule := range schedules.Schedules { + mirrorName := schedule.MirrorName + if len(mirrorName) == 0 { + s.returnErrJSON( + c, http.StatusBadRequest, + errors.New("Mirror Name should not be empty"), + ) + } + + curStatus, err := s.adapter.GetMirrorStatus(workerID, mirrorName) + if err != nil { + fmt.Errorf("failed to get job %s of worker %s: %s", + mirrorName, workerID, err.Error(), + ) + continue + } + + if curStatus.Scheduled == schedule.NextSchedule { + // no changes, skip update + continue + } + + curStatus.Scheduled = schedule.NextSchedule + _, err = s.adapter.UpdateMirrorStatus(workerID, mirrorName, curStatus) + if err != nil { + err := fmt.Errorf("failed to update job %s of worker %s: %s", + mirrorName, workerID, err.Error(), + ) + c.Error(err) + s.returnErrJSON(c, http.StatusInternalServerError, err) + return + } + } + type empty struct{} + c.JSON(http.StatusOK, empty{}) +} + func (s *Manager) updateJobOfWorker(c *gin.Context) { workerID := c.Param("id") var status MirrorStatus diff --git a/worker/schedule.go b/worker/schedule.go index c16ab9d..2a94654 100644 --- a/worker/schedule.go +++ b/worker/schedule.go @@ -15,6 +15,11 @@ type scheduleQueue struct { jobs map[string]bool } +type jobScheduleInfo struct { + jobName string + nextScheduled time.Time +} + func timeLessThan(l, r interface{}) bool { tl := l.(time.Time) tr := r.(time.Time) @@ -28,6 +33,20 @@ func newScheduleQueue() *scheduleQueue { return queue } +func (q *scheduleQueue) GetJobs() (jobs []jobScheduleInfo) { + cur := q.list.Iterator() + defer cur.Close() + + for cur.Next() { + cj := cur.Value().(*mirrorJob) + jobs = append(jobs, jobScheduleInfo{ + cj.Name(), + cur.Key().(time.Time), + }) + } + return +} + func (q *scheduleQueue) AddJob(schedTime time.Time, job *mirrorJob) { q.Lock() defer q.Unlock() diff --git a/worker/worker.go b/worker/worker.go index 321bbc5..0209962 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -304,6 +304,9 @@ func (w *Worker) runSchedule() { w.L.Unlock() + schedInfo := w.schedule.GetJobs() + w.updateSchedInfo(schedInfo) + tick := time.Tick(5 * time.Second) for { select { @@ -340,6 +343,9 @@ func (w *Worker) runSchedule() { w.schedule.AddJob(schedTime, job) } + schedInfo = w.schedule.GetJobs() + w.updateSchedInfo(schedInfo) + case <-tick: // check schedule every 5 seconds if job := w.schedule.Pop(); job != nil { @@ -421,6 +427,27 @@ func (w *Worker) updateStatus(job *mirrorJob, jobMsg jobMessage) { } } +func (w *Worker) updateSchedInfo(schedInfo []jobScheduleInfo) { + var s []MirrorSchedule + for _, sched := range schedInfo { + s = append(s, MirrorSchedule{ + MirrorName: sched.jobName, + NextSchedule: sched.nextScheduled, + }) + } + msg := MirrorSchedules{Schedules: s} + + for _, root := range w.cfg.Manager.APIBaseList() { + url := fmt.Sprintf( + "%s/workers/%s/schedules", root, w.Name(), + ) + logger.Debugf("reporting on manager url: %s", url) + if _, err := PostJSON(url, msg, w.httpClient); err != nil { + logger.Errorf("Failed to upload schedules: %s", err.Error()) + } + } +} + func (w *Worker) fetchJobStatus() []MirrorStatus { var mirrorList []MirrorStatus apiBase := w.cfg.Manager.APIBaseList()[0]