From 76ad3d40d6ef37491472e373b88c8ed71a969074 Mon Sep 17 00:00:00 2001 From: bigeagle Date: Mon, 2 May 2016 22:16:49 +0800 Subject: [PATCH] feature(worker): when SIGINT/SIGTERM received, stop all the jobs and update their status before quit. Close #19. --- cmd/tunasync/tunasync.go | 7 +++++-- worker/job.go | 17 +++++++++++++++-- worker/worker.go | 38 +++++++++++++++++++++++++++++++++++++- 3 files changed, 57 insertions(+), 5 deletions(-) diff --git a/cmd/tunasync/tunasync.go b/cmd/tunasync/tunasync.go index aadf868..afcc5c3 100644 --- a/cmd/tunasync/tunasync.go +++ b/cmd/tunasync/tunasync.go @@ -61,8 +61,9 @@ func startWorker(c *cli.Context) { time.Sleep(1 * time.Second) sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGHUP) - for { - s := <-sigChan + signal.Notify(sigChan, syscall.SIGINT) + signal.Notify(sigChan, syscall.SIGTERM) + for s := range sigChan { switch s { case syscall.SIGHUP: logger.Info("Received reload signal") @@ -71,6 +72,8 @@ func startWorker(c *cli.Context) { logger.Errorf("Error loading config: %s", err.Error()) } w.ReloadMirrorConfig(newCfg.Mirrors) + case syscall.SIGINT, syscall.SIGTERM: + w.Halt() } } }() diff --git a/worker/job.go b/worker/job.go index d8462fd..a5feca1 100644 --- a/worker/job.go +++ b/worker/job.go @@ -3,6 +3,7 @@ package worker import ( "errors" "fmt" + "sync" "sync/atomic" tunasync "github.com/tuna/tunasync/internal" @@ -18,6 +19,7 @@ const ( jobDisable // disable the job (stops goroutine) jobRestart // restart syncing jobPing // ensure the goroutine is alive + jobHalt // worker halts ) type jobMessage struct { @@ -36,8 +38,14 @@ const ( statePaused // disabled by jobDisable stateDisabled + // worker is halting + stateHalting ) +// use to ensure all jobs are finished before +// worker exit +var jobsDone sync.WaitGroup + type mirrorJob struct { provider mirrorProvider ctrlChan chan ctrlAction @@ -82,11 +90,11 @@ func (m *mirrorJob) SetProvider(provider mirrorProvider) error { // sempaphore: make sure the concurrent running syncing job won't explode // TODO: message struct for managerChan func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) error { - + jobsDone.Add(1) m.disabled = make(chan empty) defer func() { close(m.disabled) - m.SetState(stateDisabled) + jobsDone.Done() }() provider := m.provider @@ -244,6 +252,11 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err case jobStart: m.SetState(stateReady) goto _wait_for_job + case jobHalt: + m.SetState(stateHalting) + close(kill) + <-jobDone + return nil default: // TODO: implement this close(kill) diff --git a/worker/worker.go b/worker/worker.go index c68c6d1..4857a86 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -20,6 +20,7 @@ type Worker struct { managerChan chan jobMessage semaphore chan empty + exit chan empty schedule *scheduleQueue httpEngine *gin.Engine @@ -38,6 +39,7 @@ func GetTUNASyncWorker(cfg *Config) *Worker { managerChan: make(chan jobMessage, 32), semaphore: make(chan empty, cfg.Global.Concurrent), + exit: make(chan empty), schedule: newScheduleQueue(), } @@ -222,6 +224,21 @@ func (w *Worker) runHTTPServer() { } } +// Halt stops all jobs +func (w *Worker) Halt() { + w.L.Lock() + logger.Notice("Stopping all the jobs") + for _, job := range w.jobs { + if job.State() != stateDisabled { + job.ctrlChan <- jobHalt + } + } + jobsDone.Wait() + logger.Notice("All the jobs are stopped") + w.L.Unlock() + close(w.exit) +} + // Run runs worker forever func (w *Worker) Run() { w.registorWorker() @@ -284,7 +301,7 @@ func (w *Worker) runSchedule() { continue } - if job.State() != stateReady { + if (job.State() != stateReady) && (job.State() != stateHalting) { logger.Infof("Job %s state is not ready, skip adding new schedule", jobMsg.name) continue } @@ -312,6 +329,25 @@ func (w *Worker) runSchedule() { if job := w.schedule.Pop(); job != nil { job.ctrlChan <- jobStart } + case <-w.exit: + // flush status update messages + w.L.Lock() + defer w.L.Unlock() + for { + select { + case jobMsg := <-w.managerChan: + logger.Debugf("status update from %s", jobMsg.name) + job, ok := w.jobs[jobMsg.name] + if !ok { + continue + } + if jobMsg.status == Failed || jobMsg.status == Success { + w.updateStatus(job, jobMsg) + } + default: + return + } + } } } }