diff --git a/worker/schedule.go b/worker/schedule.go index 33d0d4f..c16ab9d 100644 --- a/worker/schedule.go +++ b/worker/schedule.go @@ -12,6 +12,7 @@ import ( type scheduleQueue struct { sync.Mutex list *skiplist.SkipList + jobs map[string]bool } func timeLessThan(l, r interface{}) bool { @@ -23,12 +24,18 @@ func timeLessThan(l, r interface{}) bool { func newScheduleQueue() *scheduleQueue { queue := new(scheduleQueue) queue.list = skiplist.NewCustomMap(timeLessThan) + queue.jobs = make(map[string]bool) return queue } func (q *scheduleQueue) AddJob(schedTime time.Time, job *mirrorJob) { q.Lock() defer q.Unlock() + if _, ok := q.jobs[job.Name()]; ok { + logger.Warningf("Job %s already scheduled, removing the existing one", job.Name()) + q.unsafeRemove(job.Name()) + } + q.jobs[job.Name()] = true q.list.Set(schedTime, job) logger.Debugf("Added job %s @ %v", job.Name(), schedTime) } @@ -45,10 +52,11 @@ func (q *scheduleQueue) Pop() *mirrorJob { defer first.Close() t := first.Key().(time.Time) - // logger.Debug("First job should run @%v", t) if t.Before(time.Now()) { job := first.Value().(*mirrorJob) q.list.Delete(first.Key()) + delete(q.jobs, job.Name()) + logger.Debug("Popped out job %s @%v", job.Name(), t) return job } return nil @@ -58,7 +66,11 @@ func (q *scheduleQueue) Pop() *mirrorJob { func (q *scheduleQueue) Remove(name string) bool { q.Lock() defer q.Unlock() + return q.unsafeRemove(name) +} +// remove job +func (q *scheduleQueue) unsafeRemove(name string) bool { cur := q.list.Iterator() defer cur.Close() @@ -66,6 +78,7 @@ func (q *scheduleQueue) Remove(name string) bool { cj := cur.Value().(*mirrorJob) if cj.Name() == name { q.list.Delete(cur.Key()) + delete(q.jobs, name) return true } } diff --git a/worker/schedule_test.go b/worker/schedule_test.go index 8bf3bc5..80475fe 100644 --- a/worker/schedule_test.go +++ b/worker/schedule_test.go @@ -30,6 +30,24 @@ func TestSchedule(t *testing.T) { time.Sleep(1200 * time.Millisecond) So(schedule.Pop(), ShouldEqual, job) + }) + Convey("When adding one job twice", func() { + c := cmdConfig{ + name: "schedule_test", + } + provider, _ := newCmdProvider(c) + job := newMirrorJob(provider) + sched := time.Now().Add(1 * time.Second) + + schedule.AddJob(sched, job) + schedule.AddJob(sched.Add(1*time.Second), job) + + So(schedule.Pop(), ShouldBeNil) + time.Sleep(1200 * time.Millisecond) + So(schedule.Pop(), ShouldBeNil) + time.Sleep(1200 * time.Millisecond) + So(schedule.Pop(), ShouldEqual, job) + }) Convey("When removing jobs", func() { c := cmdConfig{ diff --git a/worker/worker.go b/worker/worker.go index c6a8d31..c68c6d1 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -109,7 +109,7 @@ func (w *Worker) ReloadMirrorConfig(newMirrors []mirrorConfig) { job.SetState(statePaused) go job.Run(w.managerChan, w.semaphore) } else { - job.SetState(stateReady) + job.SetState(stateNone) go job.Run(w.managerChan, w.semaphore) w.schedule.AddJob(time.Now(), job) } @@ -125,7 +125,7 @@ func (w *Worker) ReloadMirrorConfig(newMirrors []mirrorConfig) { job := newMirrorJob(provider) w.jobs[provider.Name()] = job - job.SetState(stateReady) + job.SetState(stateNone) go job.Run(w.managerChan, w.semaphore) w.schedule.AddJob(time.Now(), job) logger.Noticef("New job %s", job.Name()) @@ -166,6 +166,9 @@ func (w *Worker) makeHTTPServer() { } logger.Noticef("Received command: %v", cmd) + // No matter what command, the existing job + // schedule should be flushed + w.schedule.Remove(job.Name()) // if job disabled, start them first switch cmd.Cmd { case CmdStart, CmdRestart: @@ -181,14 +184,13 @@ func (w *Worker) makeHTTPServer() { case CmdStop: // if job is disabled, no goroutine would be there // receiving this signal - w.schedule.Remove(job.Name()) if job.State() != stateDisabled { job.ctrlChan <- jobStop } case CmdDisable: w.disableJob(job) case CmdPing: - job.ctrlChan <- jobStart + // empty default: c.JSON(http.StatusNotAcceptable, gin.H{"msg": "Invalid Command"}) return @@ -250,7 +252,7 @@ func (w *Worker) runSchedule() { go job.Run(w.managerChan, w.semaphore) continue default: - job.SetState(stateReady) + job.SetState(stateNone) go job.Run(w.managerChan, w.semaphore) stime := m.LastUpdate.Add(job.provider.Interval()) logger.Debugf("Scheduling job %s @%s", job.Name(), stime.Format("2006-01-02 15:04:05")) @@ -263,7 +265,7 @@ func (w *Worker) runSchedule() { // manager's mirror list for name := range unset { job := w.jobs[name] - job.SetState(stateReady) + job.SetState(stateNone) go job.Run(w.managerChan, w.semaphore) w.schedule.AddJob(time.Now(), job) }