tunasync/worker/job.go

package worker

import (
	"errors"
	"fmt"
	"sync/atomic"

	tunasync "github.com/tuna/tunasync/internal"
)

// This file contains the workflow of a mirror job.

type ctrlAction uint8

const (
	jobStart   ctrlAction = iota
	jobStop    // stop syncing, but keep the job
	jobDisable // disable the job (stops the goroutine)
	jobRestart // restart syncing
	jobPing    // ensure the goroutine is alive
)
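
// jobMessage is the status report a job sends to the manager: the new sync
// status, the job name, an optional message (e.g. an error string), and
// whether the manager should schedule the job's next run.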
type jobMessage struct {
	status   tunasync.SyncStatus
	name     string
	msg      string
	schedule bool
}

const (
	// empty state
	stateNone uint32 = iota
	// ready to run, able to schedule
	stateReady
	// paused by jobStop
	statePaused
	// disabled by jobDisable
	stateDisabled
)
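
// mirrorJob couples a mirrorProvider with the control channel and the
// atomically accessed state of its worker goroutine. The disabled channel
// is closed when the goroutine exits, so callers can wait on it.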
type mirrorJob struct {
	provider mirrorProvider
	ctrlChan chan ctrlAction
	disabled chan empty
	state    uint32
}
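
// newMirrorJob wraps a provider in a fresh job: state starts at stateNone,
// and the control channel is buffered (capacity 1) so one action can be
// queued ahead of the job loop.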
func newMirrorJob(provider mirrorProvider) *mirrorJob {
	return &mirrorJob{
		provider: provider,
		ctrlChan: make(chan ctrlAction, 1),
		state:    stateNone,
	}
}

func (m *mirrorJob) Name() string {
	return m.provider.Name()
}

func (m *mirrorJob) State() uint32 {
	return atomic.LoadUint32(&(m.state))
}

func (m *mirrorJob) SetState(state uint32) {
	atomic.StoreUint32(&(m.state), state)
}

// Run is the goroutine in which the syncing job runs.
// Arguments:
//	managerChan: pushes messages to the manager; it should have a large buffer
//	semaphore: bounds the number of concurrently running syncing jobs
// Control actions arrive on m.ctrlChan, and m.provider performs the actual sync.
// TODO: message struct for managerChan
func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) error {
	m.disabled = make(chan empty)
	defer func() {
		close(m.disabled)
		m.SetState(stateDisabled)
	}()

	provider := m.provider

	// to make code shorter
	runHooks := func(Hooks []jobHook, action func(h jobHook) error, hookname string) error {
		for _, hook := range Hooks {
			if err := action(hook); err != nil {
				logger.Error(
					"failed at %s hooks for %s: %s",
					hookname, m.Name(), err.Error(),
				)
				managerChan <- jobMessage{
					tunasync.Failed, m.Name(),
					fmt.Sprintf("error exec hook %s: %s", hookname, err.Error()),
					false,
				}
				return err
			}
		}
		return nil
	}
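
	// runJobWrapper performs one sync cycle: pre-job hooks, then up to
	// maxRetry attempts of pre-exec hooks / provider.Run() / post-exec hooks,
	// finishing with post-success or post-fail hooks. Closing the kill
	// channel aborts the running provider.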
	runJobWrapper := func(kill <-chan empty, jobDone chan<- empty) error {
		defer close(jobDone)

		managerChan <- jobMessage{tunasync.PreSyncing, m.Name(), "", false}
		logger.Info("start syncing: %s", m.Name())

		Hooks := provider.Hooks()
		rHooks := []jobHook{} // hooks in reverse order, for the post-* phases
		for i := len(Hooks); i > 0; i-- {
			rHooks = append(rHooks, Hooks[i-1])
		}

		logger.Debug("hooks: pre-job")
		err := runHooks(Hooks, func(h jobHook) error { return h.preJob() }, "pre-job")
		if err != nil {
			return err
		}

		for retry := 0; retry < maxRetry; retry++ {
			stopASAP := false // stop job as soon as possible
			if retry > 0 {
				logger.Info("retry syncing: %s, retry: %d", m.Name(), retry)
			}
			err := runHooks(Hooks, func(h jobHook) error { return h.preExec() }, "pre-exec")
			if err != nil {
				return err
			}

			// start syncing
			managerChan <- jobMessage{tunasync.Syncing, m.Name(), "", false}
			var syncErr error
			syncDone := make(chan error, 1)
			go func() {
				err := provider.Run()
				if !stopASAP {
					syncDone <- err
				}
			}()

			select {
			case syncErr = <-syncDone:
				logger.Debug("syncing done")
			case <-kill:
				logger.Debug("received kill")
				stopASAP = true
				err := provider.Terminate()
				if err != nil {
					logger.Error("failed to terminate provider %s: %s", m.Name(), err.Error())
					return err
				}
				syncErr = errors.New("killed by manager")
			}

			// post-exec hooks
			herr := runHooks(rHooks, func(h jobHook) error { return h.postExec() }, "post-exec")
			if herr != nil {
				return herr
			}

			if syncErr == nil {
				// syncing succeeded
				logger.Info("succeeded syncing %s", m.Name())
				managerChan <- jobMessage{tunasync.Success, m.Name(), "", true}
				// post-success hooks
				err := runHooks(rHooks, func(h jobHook) error { return h.postSuccess() }, "post-success")
				if err != nil {
					return err
				}
				return nil
			}

			// syncing failed
			logger.Warning("failed syncing %s: %s", m.Name(), syncErr.Error())
			managerChan <- jobMessage{tunasync.Failed, m.Name(), syncErr.Error(), retry == maxRetry-1}

			// post-fail hooks
			logger.Debug("post-fail hooks")
			err = runHooks(rHooks, func(h jobHook) error { return h.postFail() }, "post-fail")
			if err != nil {
				return err
			}
			// if the job was killed, exit without retrying
			if stopASAP {
				logger.Debug("No retry, exit directly")
				return nil
			}
			// otherwise continue to the next retry
		} // for retry
		return nil
	}
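
	// runJob gates runJobWrapper on the semaphore: it waits for a free slot
	// before starting the sync, and gives up (signalling jobDone) if killed
	// while still waiting.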
	runJob := func(kill <-chan empty, jobDone chan<- empty) {
		select {
		case semaphore <- empty{}:
			defer func() { <-semaphore }()
			runJobWrapper(kill, jobDone)
		case <-kill:
			jobDone <- empty{}
			return
		}
	}
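
	// The control loop: while the job is ready, launch a sync and wait for it
	// to finish or for a control message; otherwise block for the next control
	// message. jobDisable (or an unknown action) returns, which triggers the
	// deferred cleanup above.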
	for {
		if m.State() == stateReady {
			kill := make(chan empty)
			jobDone := make(chan empty)
			go runJob(kill, jobDone)

		_wait_for_job:
			select {
			case <-jobDone:
				logger.Debug("job done")
			case ctrl := <-m.ctrlChan:
				switch ctrl {
				case jobStop:
					m.SetState(statePaused)
					close(kill)
					<-jobDone
				case jobDisable:
					m.SetState(stateDisabled)
					close(kill)
					<-jobDone
					return nil
				case jobRestart:
					m.SetState(stateReady)
					close(kill)
					<-jobDone
					continue
				case jobStart:
					m.SetState(stateReady)
					goto _wait_for_job
				default:
					// TODO: implement this
					close(kill)
					return nil
				}
			}
		}

		ctrl := <-m.ctrlChan
		switch ctrl {
		case jobStop:
			m.SetState(statePaused)
		case jobDisable:
			m.SetState(stateDisabled)
			return nil
		case jobRestart:
			m.SetState(stateReady)
		case jobStart:
			m.SetState(stateReady)
		default:
			// TODO
			return nil
		}
	}
}
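
// A minimal usage sketch of the control protocol above (hypothetical wiring;
// the real manager lives elsewhere in the worker package, and the channel
// sizes here are illustrative). Assuming provider is a mirrorProvider:
//
//	managerChan := make(chan jobMessage, 32) // manager side wants a large buffer
//	semaphore := make(chan empty, 4)         // at most 4 syncs run concurrently
//	job := newMirrorJob(provider)
//	go job.Run(managerChan, semaphore)
//	job.ctrlChan <- jobStart   // stateNone -> stateReady, begins syncing
//	job.ctrlChan <- jobStop    // kill the running sync, keep the goroutine
//	job.ctrlChan <- jobDisable // shut down: Run returns and closes job.disabled
//	<-job.disabled             // wait for the goroutine to exit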