// tunasync/worker/worker.go

package worker
import (
"fmt"
"net/http"
"os"
"sync"
"syscall"
"time"
"github.com/gin-gonic/gin"
. "github.com/tuna/tunasync/internal"
)
// A Worker is an instance of a tunasync worker
type Worker struct {
L sync.Mutex // protects the jobs map
cfg *Config
jobs map[string]*mirrorJob // mirror name -> job
managerChan chan jobMessage // status updates from job goroutines
semaphore chan empty // limits the number of concurrently syncing jobs
exit chan empty // closed when the worker is halted
schedule *scheduleQueue
httpEngine *gin.Engine
httpClient *http.Client
}
// NewTUNASyncWorker creates a worker
func NewTUNASyncWorker(cfg *Config) *Worker {
w := &Worker{
cfg: cfg,
jobs: make(map[string]*mirrorJob),
managerChan: make(chan jobMessage, 32),
semaphore: make(chan empty, cfg.Global.Concurrent),
exit: make(chan empty),
schedule: newScheduleQueue(),
}
if cfg.Manager.CACert != "" {
httpClient, err := CreateHTTPClient(cfg.Manager.CACert)
if err != nil {
logger.Errorf("Error initializing HTTP client: %s", err.Error())
return nil
}
w.httpClient = httpClient
}
w.initJobs()
w.makeHTTPServer()
return w
}
// Run runs the worker until it is halted
func (w *Worker) Run() {
w.registerWorker()
go w.runHTTPServer()
w.runSchedule()
}
// Halt stops all jobs
func (w *Worker) Halt() {
w.L.Lock()
logger.Notice("Stopping all the jobs")
for _, job := range w.jobs {
if job.State() != stateDisabled {
job.ctrlChan <- jobHalt
}
}
jobsDone.Wait()
logger.Notice("All the jobs are stopped")
w.L.Unlock()
close(w.exit)
}
// ReloadMirrorConfig refreshes the providers and jobs
// from the new mirror configs
// TODO: deleted job should be removed from manager-side mirror list
func (w *Worker) ReloadMirrorConfig(newMirrors []mirrorConfig) {
w.L.Lock()
defer w.L.Unlock()
logger.Info("Reloading mirror configs")
oldMirrors := w.cfg.Mirrors
difference := diffMirrorConfig(oldMirrors, newMirrors)
// first deal with deletions and modifications
for _, op := range difference {
if op.diffOp == diffAdd {
continue
}
name := op.mirCfg.Name
job, ok := w.jobs[name]
if !ok {
logger.Warningf("Job %s not found", name)
continue
}
switch op.diffOp {
case diffDelete:
w.disableJob(job)
delete(w.jobs, name)
logger.Noticef("Deleted job %s", name)
case diffModify:
jobState := job.State()
w.disableJob(job)
// set new provider
provider := newMirrorProvider(op.mirCfg, w.cfg)
if err := job.SetProvider(provider); err != nil {
logger.Errorf("Error setting job provider of %s: %s", name, err.Error())
continue
}
// re-schedule job according to its previous state
if jobState == stateDisabled {
job.SetState(stateDisabled)
} else if jobState == statePaused {
job.SetState(statePaused)
go job.Run(w.managerChan, w.semaphore)
} else {
job.SetState(stateNone)
go job.Run(w.managerChan, w.semaphore)
w.schedule.AddJob(time.Now(), job)
}
logger.Noticef("Reloaded job %s", name)
}
}
// for newly added jobs, just start them
for _, op := range difference {
if op.diffOp != diffAdd {
continue
}
provider := newMirrorProvider(op.mirCfg, w.cfg)
job := newMirrorJob(provider)
w.jobs[provider.Name()] = job
job.SetState(stateNone)
go job.Run(w.managerChan, w.semaphore)
w.schedule.AddJob(time.Now(), job)
logger.Noticef("New job %s", job.Name())
}
w.cfg.Mirrors = newMirrors
}
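// initJobs creates a mirror provider and a job for every mirror in the configuration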
func (w *Worker) initJobs() {
for _, mirror := range w.cfg.Mirrors {
// Create Provider
provider := newMirrorProvider(mirror, w.cfg)
w.jobs[provider.Name()] = newMirrorJob(provider)
}
}
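// disableJob removes the job from the schedule queue and, if it is not
// already disabled, tells the job goroutine to disable itself and waits
// for confirmation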
func (w *Worker) disableJob(job *mirrorJob) {
w.schedule.Remove(job.Name())
if job.State() != stateDisabled {
job.ctrlChan <- jobDisable
<-job.disabled
}
}
// makeHTTPServer sets up the control server that receives commands from the manager
func (w *Worker) makeHTTPServer() {
s := gin.New()
s.Use(gin.Recovery())
s.POST("/", func(c *gin.Context) {
w.L.Lock()
defer w.L.Unlock()
var cmd WorkerCmd
if err := c.BindJSON(&cmd); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"msg": "Invalid request"})
return
}
logger.Noticef("Received command: %v", cmd)
if cmd.MirrorID == "" {
// worker-level commands
switch cmd.Cmd {
case CmdReload:
// send myself a SIGHUP
pid := os.Getpid()
syscall.Kill(pid, syscall.SIGHUP)
default:
c.JSON(http.StatusNotAcceptable, gin.H{"msg": "Invalid Command"})
return
}
// the worker-level command has been handled; do not fall
// through to the job lookup with an empty MirrorID
c.JSON(http.StatusOK, gin.H{"msg": "OK"})
return
}
// job-level commands
job, ok := w.jobs[cmd.MirrorID]
if !ok {
c.JSON(http.StatusNotFound, gin.H{"msg": fmt.Sprintf("Mirror ``%s'' not found", cmd.MirrorID)})
return
}
// No matter what the command is, the existing
// job schedule should be flushed
w.schedule.Remove(job.Name())
// if the job is disabled, start it first
switch cmd.Cmd {
case CmdStart, CmdRestart:
if job.State() == stateDisabled {
go job.Run(w.managerChan, w.semaphore)
}
}
switch cmd.Cmd {
case CmdStart:
if cmd.Options["force"] {
job.ctrlChan <- jobForceStart
} else {
job.ctrlChan <- jobStart
}
case CmdRestart:
job.ctrlChan <- jobRestart
case CmdStop:
// if the job is disabled, there is no goroutine
// to receive this signal
if job.State() != stateDisabled {
job.ctrlChan <- jobStop
}
case CmdDisable:
w.disableJob(job)
case CmdPing:
// empty
default:
c.JSON(http.StatusNotAcceptable, gin.H{"msg": "Invalid Command"})
return
}
c.JSON(http.StatusOK, gin.H{"msg": "OK"})
})
w.httpEngine = s
}
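// runHTTPServer starts the control HTTP server, serving over TLS
// when an SSL certificate and key are configured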
func (w *Worker) runHTTPServer() {
addr := fmt.Sprintf("%s:%d", w.cfg.Server.Addr, w.cfg.Server.Port)
httpServer := &http.Server{
Addr: addr,
Handler: w.httpEngine,
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
}
if w.cfg.Server.SSLCert == "" && w.cfg.Server.SSLKey == "" {
if err := httpServer.ListenAndServe(); err != nil {
panic(err)
}
} else {
if err := httpServer.ListenAndServeTLS(w.cfg.Server.SSLCert, w.cfg.Server.SSLKey); err != nil {
panic(err)
}
}
}
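// runSchedule restores job states from the manager and schedules the initial
// runs, then loops forever: handling status updates from jobs, firing due jobs
// on a 5-second tick, and flushing pending updates when the worker exits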
func (w *Worker) runSchedule() {
w.L.Lock()
mirrorList := w.fetchJobStatus()
unset := make(map[string]bool)
for name := range w.jobs {
unset[name] = true
}
// Fetch the mirror list stored in the manager
// and schedule each job at its expected time;
// if a job is disabled, do not schedule it
for _, m := range mirrorList {
if job, ok := w.jobs[m.Name]; ok {
delete(unset, m.Name)
switch m.Status {
case Disabled:
job.SetState(stateDisabled)
continue
case Paused:
job.SetState(statePaused)
go job.Run(w.managerChan, w.semaphore)
continue
default:
job.SetState(stateNone)
go job.Run(w.managerChan, w.semaphore)
stime := m.LastUpdate.Add(job.provider.Interval())
logger.Debugf("Scheduling job %s @%s", job.Name(), stime.Format("2006-01-02 15:04:05"))
w.schedule.AddJob(stime, job)
}
}
}
// some new jobs may have been added
// that do not yet exist in the
// manager's mirror list
for name := range unset {
job := w.jobs[name]
job.SetState(stateNone)
go job.Run(w.managerChan, w.semaphore)
w.schedule.AddJob(time.Now(), job)
}
w.L.Unlock()
schedInfo := w.schedule.GetJobs()
w.updateSchedInfo(schedInfo)
tick := time.Tick(5 * time.Second)
for {
select {
case jobMsg := <-w.managerChan:
// got status update from job
w.L.Lock()
job, ok := w.jobs[jobMsg.name]
w.L.Unlock()
if !ok {
logger.Warningf("Job %s not found", jobMsg.name)
continue
}
if (job.State() != stateReady) && (job.State() != stateHalting) {
logger.Infof("Job %s state is not ready, skip adding new schedule", jobMsg.name)
continue
}
// syncing status is only meaningful when the job
// is running; if it is paused or disabled,
// a sync failure signal may still be emitted
// and needs to be ignored
w.updateStatus(job, jobMsg)
// only successful or the final failure msg
// can trigger scheduling
if jobMsg.schedule {
schedTime := time.Now().Add(job.provider.Interval())
logger.Noticef(
"Next scheduled time for %s: %s",
job.Name(),
schedTime.Format("2006-01-02 15:04:05"),
)
w.schedule.AddJob(schedTime, job)
}
schedInfo = w.schedule.GetJobs()
w.updateSchedInfo(schedInfo)
case <-tick:
// check schedule every 5 seconds
if job := w.schedule.Pop(); job != nil {
job.ctrlChan <- jobStart
}
case <-w.exit:
// flush status update messages
w.L.Lock()
defer w.L.Unlock()
for {
select {
case jobMsg := <-w.managerChan:
logger.Debugf("status update from %s", jobMsg.name)
job, ok := w.jobs[jobMsg.name]
if !ok {
continue
}
if jobMsg.status == Failed || jobMsg.status == Success {
w.updateStatus(job, jobMsg)
}
default:
return
}
}
}
}
}
// Name returns the worker name
func (w *Worker) Name() string {
return w.cfg.Global.Name
}
// URL returns the URL of the worker's HTTP control server
func (w *Worker) URL() string {
proto := "https"
if w.cfg.Server.SSLCert == "" && w.cfg.Server.SSLKey == "" {
proto = "http"
}
return fmt.Sprintf("%s://%s:%d/", proto, w.cfg.Server.Hostname, w.cfg.Server.Port)
}
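// registerWorker registers this worker on every configured manager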
func (w *Worker) registerWorker() {
msg := WorkerStatus{
ID: w.Name(),
URL: w.URL(),
}
for _, root := range w.cfg.Manager.APIBaseList() {
url := fmt.Sprintf("%s/workers", root)
logger.Debugf("register on manager url: %s", url)
if _, err := PostJSON(url, msg, w.httpClient); err != nil {
logger.Errorf("Failed to register worker")
}
}
}
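// updateStatus reports a job's latest status to every configured manager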
func (w *Worker) updateStatus(job *mirrorJob, jobMsg jobMessage) {
p := job.provider
smsg := MirrorStatus{
Name: jobMsg.name,
Worker: w.cfg.Global.Name,
IsMaster: p.IsMaster(),
Status: jobMsg.status,
Upstream: p.Upstream(),
Size: "unknown",
ErrorMsg: jobMsg.msg,
}
// Certain providers (rsync, for example) may know the size of the mirror,
// so we report it to the manager here
if len(job.size) != 0 {
smsg.Size = job.size
}
for _, root := range w.cfg.Manager.APIBaseList() {
url := fmt.Sprintf(
"%s/workers/%s/jobs/%s", root, w.Name(), jobMsg.name,
)
logger.Debugf("reporting on manager url: %s", url)
if _, err := PostJSON(url, smsg, w.httpClient); err != nil {
logger.Errorf("Failed to update mirror(%s) status: %s", jobMsg.name, err.Error())
}
}
}
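// updateSchedInfo uploads the current schedule (the next scheduled time
// of each job) to every configured manager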
func (w *Worker) updateSchedInfo(schedInfo []jobScheduleInfo) {
var s []MirrorSchedule
for _, sched := range schedInfo {
s = append(s, MirrorSchedule{
MirrorName: sched.jobName,
NextSchedule: sched.nextScheduled,
})
}
msg := MirrorSchedules{Schedules: s}
for _, root := range w.cfg.Manager.APIBaseList() {
url := fmt.Sprintf(
"%s/workers/%s/schedules", root, w.Name(),
)
logger.Debugf("reporting on manager url: %s", url)
if _, err := PostJSON(url, msg, w.httpClient); err != nil {
logger.Errorf("Failed to upload schedules: %s", err.Error())
}
}
}
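// fetchJobStatus fetches the last known status of this worker's jobs
// from the first configured manager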
func (w *Worker) fetchJobStatus() []MirrorStatus {
var mirrorList []MirrorStatus
apiBase := w.cfg.Manager.APIBaseList()[0]
url := fmt.Sprintf("%s/workers/%s/jobs", apiBase, w.Name())
if _, err := GetJSON(url, &mirrorList, w.httpClient); err != nil {
logger.Errorf("Failed to fetch job status: %s", err.Error())
}
return mirrorList
}