From 41e1f263a5394a0fa7e9fdee3b2d41c50f23ddbf Mon Sep 17 00:00:00 2001 From: bigeagle Date: Fri, 29 Apr 2016 13:52:58 +0800 Subject: [PATCH] refactor(worker): use atomic state to simplify job control --- worker/job.go | 69 ++++++++++++++++++++++++++---------------------- worker/worker.go | 40 +++++++++++++++------------- 2 files changed, 58 insertions(+), 51 deletions(-) diff --git a/worker/job.go b/worker/job.go index de5f79a..924aaf0 100644 --- a/worker/job.go +++ b/worker/job.go @@ -3,6 +3,7 @@ package worker import ( "errors" "fmt" + "sync/atomic" tunasync "github.com/tuna/tunasync/internal" ) @@ -26,22 +27,29 @@ type jobMessage struct { schedule bool } +const ( + // empty state + stateNone uint32 = iota + // ready to run, able to schedule + stateReady + // paused by jobStop + statePaused + // disabled by jobDisable + stateDisabled +) + type mirrorJob struct { - provider mirrorProvider - ctrlChan chan ctrlAction - disabled chan empty - started bool - schedule bool - isDisabled bool + provider mirrorProvider + ctrlChan chan ctrlAction + disabled chan empty + state uint32 } func newMirrorJob(provider mirrorProvider) *mirrorJob { return &mirrorJob{ - provider: provider, - ctrlChan: make(chan ctrlAction, 1), - started: false, - schedule: false, - isDisabled: false, + provider: provider, + ctrlChan: make(chan ctrlAction, 1), + state: stateNone, } } @@ -49,6 +57,14 @@ func (m *mirrorJob) Name() string { return m.provider.Name() } +func (m *mirrorJob) State() uint32 { + return atomic.LoadUint32(&(m.state)) +} + +func (m *mirrorJob) SetState(state uint32) { + atomic.StoreUint32(&(m.state), state) +} + // runMirrorJob is the goroutine where syncing job runs in // arguments: // provider: mirror provider object @@ -61,8 +77,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err m.disabled = make(chan empty) defer func() { close(m.disabled) - m.schedule = false - m.isDisabled = true + m.SetState(stateDisabled) }() provider := m.provider @@ -192,7 +207,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err } for { - if m.started { + if m.State() == stateReady { kill := make(chan empty) jobDone := make(chan empty) go runJob(kill, jobDone) @@ -204,24 +219,21 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err case ctrl := <-m.ctrlChan: switch ctrl { case jobStop: - m.schedule = false - m.started = false + m.SetState(statePaused) close(kill) <-jobDone case jobDisable: - m.schedule = false - m.isDisabled = true - m.started = false + m.SetState(stateDisabled) close(kill) <-jobDone return nil case jobRestart: - m.started = true + m.SetState(stateReady) close(kill) <-jobDone continue case jobStart: - m.started = true + m.SetState(stateReady) goto _wait_for_job default: // TODO: implement this @@ -234,21 +246,14 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err ctrl := <-m.ctrlChan switch ctrl { case jobStop: - m.schedule = false - m.started = false + m.SetState(statePaused) case jobDisable: - m.schedule = false - m.isDisabled = true - m.started = false + m.SetState(stateDisabled) return nil case jobRestart: - m.schedule = true - m.isDisabled = false - m.started = true + m.SetState(stateReady) case jobStart: - m.schedule = true - m.isDisabled = false - m.started = true + m.SetState(stateReady) default: // TODO return nil diff --git a/worker/worker.go b/worker/worker.go index 9cd3f8a..7f31bf8 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -186,32 +186,24 @@ func (w *Worker) makeHTTPServer() { // if job disabled, start them first switch cmd.Cmd { case CmdStart, CmdRestart: - if job.isDisabled { + if job.State() == stateDisabled { go job.Run(w.managerChan, w.semaphore) } } switch cmd.Cmd { case CmdStart: - job.schedule = true - job.isDisabled = false job.ctrlChan <- jobStart case CmdRestart: - job.schedule = true - job.isDisabled = false job.ctrlChan <- jobRestart case CmdStop: // if job is disabled, no goroutine would be there // receiving this signal - if !job.isDisabled { - job.schedule = false - job.isDisabled = false + if job.State() != stateDisabled { w.schedule.Remove(job.Name()) job.ctrlChan <- jobStop } case CmdDisable: - if !job.isDisabled { - job.schedule = false - job.isDisabled = true + if job.State() != stateDisabled { w.schedule.Remove(job.Name()) job.ctrlChan <- jobDisable <-job.disabled @@ -270,15 +262,15 @@ func (w *Worker) runSchedule() { if job, ok := w.jobs[m.Name]; ok { delete(unset, m.Name) switch m.Status { - case Paused: - go job.Run(w.managerChan, w.semaphore) - job.schedule = false - continue case Disabled: - job.schedule = false - job.isDisabled = true + job.SetState(stateDisabled) + continue + case Paused: + job.SetState(statePaused) + go job.Run(w.managerChan, w.semaphore) continue default: + job.SetState(stateReady) go job.Run(w.managerChan, w.semaphore) stime := m.LastUpdate.Add(job.provider.Interval()) logger.Debug("Scheduling job %s @%s", job.Name(), stime.Format("2006-01-02 15:04:05")) @@ -286,8 +278,12 @@ func (w *Worker) runSchedule() { } } } + // some new jobs may be added + // which does not exist in the + // manager's mirror list for name := range unset { job := w.jobs[name] + job.SetState(stateReady) go job.Run(w.managerChan, w.semaphore) w.schedule.AddJob(time.Now(), job) } @@ -297,13 +293,19 @@ func (w *Worker) runSchedule() { case jobMsg := <-w.managerChan: // got status update from job job := w.jobs[jobMsg.name] - if !job.schedule { - logger.Info("Job %s disabled/paused, skip adding new schedule", jobMsg.name) + if job.State() != stateReady { + logger.Info("Job %s state is not ready, skip adding new schedule", jobMsg.name) continue } + // syncing status is only meaningful when job + // is running. If it's paused or disabled + // a sync failure signal would be emitted + // which needs to be ignored w.updateStatus(jobMsg) + // only successful or the final failure msg + // can trigger scheduling if jobMsg.schedule { schedTime := time.Now().Add(job.provider.Interval()) logger.Info(