mirror of
https://github.com/tuna/tunasync.git
synced 2025-04-21 04:42:46 +00:00
commit
ba46e3a63a
@ -61,8 +61,9 @@ func startWorker(c *cli.Context) {
|
||||
time.Sleep(1 * time.Second)
|
||||
sigChan := make(chan os.Signal, 1)
|
||||
signal.Notify(sigChan, syscall.SIGHUP)
|
||||
for {
|
||||
s := <-sigChan
|
||||
signal.Notify(sigChan, syscall.SIGINT)
|
||||
signal.Notify(sigChan, syscall.SIGTERM)
|
||||
for s := range sigChan {
|
||||
switch s {
|
||||
case syscall.SIGHUP:
|
||||
logger.Info("Received reload signal")
|
||||
@ -71,6 +72,8 @@ func startWorker(c *cli.Context) {
|
||||
logger.Errorf("Error loading config: %s", err.Error())
|
||||
}
|
||||
w.ReloadMirrorConfig(newCfg.Mirrors)
|
||||
case syscall.SIGINT, syscall.SIGTERM:
|
||||
w.Halt()
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
@ -3,6 +3,7 @@ package worker
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
tunasync "github.com/tuna/tunasync/internal"
|
||||
@ -18,6 +19,7 @@ const (
|
||||
jobDisable // disable the job (stops goroutine)
|
||||
jobRestart // restart syncing
|
||||
jobPing // ensure the goroutine is alive
|
||||
jobHalt // worker halts
|
||||
)
|
||||
|
||||
type jobMessage struct {
|
||||
@ -36,8 +38,14 @@ const (
|
||||
statePaused
|
||||
// disabled by jobDisable
|
||||
stateDisabled
|
||||
// worker is halting
|
||||
stateHalting
|
||||
)
|
||||
|
||||
// use to ensure all jobs are finished before
|
||||
// worker exit
|
||||
var jobsDone sync.WaitGroup
|
||||
|
||||
type mirrorJob struct {
|
||||
provider mirrorProvider
|
||||
ctrlChan chan ctrlAction
|
||||
@ -82,11 +90,11 @@ func (m *mirrorJob) SetProvider(provider mirrorProvider) error {
|
||||
// sempaphore: make sure the concurrent running syncing job won't explode
|
||||
// TODO: message struct for managerChan
|
||||
func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) error {
|
||||
|
||||
jobsDone.Add(1)
|
||||
m.disabled = make(chan empty)
|
||||
defer func() {
|
||||
close(m.disabled)
|
||||
m.SetState(stateDisabled)
|
||||
jobsDone.Done()
|
||||
}()
|
||||
|
||||
provider := m.provider
|
||||
@ -244,6 +252,11 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
|
||||
case jobStart:
|
||||
m.SetState(stateReady)
|
||||
goto _wait_for_job
|
||||
case jobHalt:
|
||||
m.SetState(stateHalting)
|
||||
close(kill)
|
||||
<-jobDone
|
||||
return nil
|
||||
default:
|
||||
// TODO: implement this
|
||||
close(kill)
|
||||
|
@ -20,6 +20,7 @@ type Worker struct {
|
||||
|
||||
managerChan chan jobMessage
|
||||
semaphore chan empty
|
||||
exit chan empty
|
||||
|
||||
schedule *scheduleQueue
|
||||
httpEngine *gin.Engine
|
||||
@ -38,6 +39,7 @@ func GetTUNASyncWorker(cfg *Config) *Worker {
|
||||
|
||||
managerChan: make(chan jobMessage, 32),
|
||||
semaphore: make(chan empty, cfg.Global.Concurrent),
|
||||
exit: make(chan empty),
|
||||
|
||||
schedule: newScheduleQueue(),
|
||||
}
|
||||
@ -57,12 +59,26 @@ func GetTUNASyncWorker(cfg *Config) *Worker {
|
||||
return w
|
||||
}
|
||||
|
||||
func (w *Worker) initJobs() {
|
||||
for _, mirror := range w.cfg.Mirrors {
|
||||
// Create Provider
|
||||
provider := newMirrorProvider(mirror, w.cfg)
|
||||
w.jobs[provider.Name()] = newMirrorJob(provider)
|
||||
// Run runs worker forever
|
||||
func (w *Worker) Run() {
|
||||
w.registorWorker()
|
||||
go w.runHTTPServer()
|
||||
w.runSchedule()
|
||||
}
|
||||
|
||||
// Halt stops all jobs
|
||||
func (w *Worker) Halt() {
|
||||
w.L.Lock()
|
||||
logger.Notice("Stopping all the jobs")
|
||||
for _, job := range w.jobs {
|
||||
if job.State() != stateDisabled {
|
||||
job.ctrlChan <- jobHalt
|
||||
}
|
||||
}
|
||||
jobsDone.Wait()
|
||||
logger.Notice("All the jobs are stopped")
|
||||
w.L.Unlock()
|
||||
close(w.exit)
|
||||
}
|
||||
|
||||
// ReloadMirrorConfig refresh the providers and jobs
|
||||
@ -132,7 +148,14 @@ func (w *Worker) ReloadMirrorConfig(newMirrors []mirrorConfig) {
|
||||
}
|
||||
|
||||
w.cfg.Mirrors = newMirrors
|
||||
}
|
||||
|
||||
func (w *Worker) initJobs() {
|
||||
for _, mirror := range w.cfg.Mirrors {
|
||||
// Create Provider
|
||||
provider := newMirrorProvider(mirror, w.cfg)
|
||||
w.jobs[provider.Name()] = newMirrorJob(provider)
|
||||
}
|
||||
}
|
||||
|
||||
func (w *Worker) disableJob(job *mirrorJob) {
|
||||
@ -222,13 +245,6 @@ func (w *Worker) runHTTPServer() {
|
||||
}
|
||||
}
|
||||
|
||||
// Run runs worker forever
|
||||
func (w *Worker) Run() {
|
||||
w.registorWorker()
|
||||
go w.runHTTPServer()
|
||||
w.runSchedule()
|
||||
}
|
||||
|
||||
func (w *Worker) runSchedule() {
|
||||
w.L.Lock()
|
||||
|
||||
@ -284,7 +300,7 @@ func (w *Worker) runSchedule() {
|
||||
continue
|
||||
}
|
||||
|
||||
if job.State() != stateReady {
|
||||
if (job.State() != stateReady) && (job.State() != stateHalting) {
|
||||
logger.Infof("Job %s state is not ready, skip adding new schedule", jobMsg.name)
|
||||
continue
|
||||
}
|
||||
@ -312,6 +328,25 @@ func (w *Worker) runSchedule() {
|
||||
if job := w.schedule.Pop(); job != nil {
|
||||
job.ctrlChan <- jobStart
|
||||
}
|
||||
case <-w.exit:
|
||||
// flush status update messages
|
||||
w.L.Lock()
|
||||
defer w.L.Unlock()
|
||||
for {
|
||||
select {
|
||||
case jobMsg := <-w.managerChan:
|
||||
logger.Debugf("status update from %s", jobMsg.name)
|
||||
job, ok := w.jobs[jobMsg.name]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
if jobMsg.status == Failed || jobMsg.status == Success {
|
||||
w.updateStatus(job, jobMsg)
|
||||
}
|
||||
default:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user