Merge pull request #38 from tuna/dev

Dev
This commit is contained in:
bigeagle 2016-08-02 23:41:25 +08:00 committed by GitHub
commit 1bf0bcb12a
8 changed files with 193 additions and 38 deletions

View File

@ -0,0 +1,14 @@
#! /bin/bash
: ${PROG:=$(basename ${BASH_SOURCE})}
_cli_bash_autocomplete() {
local cur opts base
COMPREPLY=()
cur="${COMP_WORDS[COMP_CWORD]}"
opts=$( ${COMP_WORDS[@]:0:$COMP_CWORD} --generate-bash-completion )
COMPREPLY=( $(compgen -W "${opts}" -- ${cur}) )
return 0
}
complete -F _cli_bash_autocomplete $PROG

View File

@ -0,0 +1,15 @@
autoload -U compinit && compinit
autoload -U bashcompinit && bashcompinit
: ${PROG:=$(basename ${BASH_SOURCE})}
_cli_bash_autocomplete() {
local cur opts base
COMPREPLY=()
cur="${COMP_WORDS[COMP_CWORD]}"
opts=$( ${COMP_WORDS[@]:0:$COMP_CWORD} --generate-bash-completion )
COMPREPLY=( $(compgen -W "${opts}" -- ${cur}) )
return 0
}
complete -F _cli_bash_autocomplete $PROG

View File

@ -23,9 +23,10 @@ var (
)
const (
listJobsPath = "/jobs"
listWorkersPath = "/workers"
cmdPath = "/cmd"
listJobsPath = "/jobs"
listWorkersPath = "/workers"
flushDisabledPath = "/jobs/disabled"
cmdPath = "/cmd"
systemCfgFile = "/etc/tunasync/ctl.conf" // system-wide conf
userCfgFile = "$HOME/.config/tunasync/ctl.conf" // user-specific conf
@ -182,6 +183,38 @@ func listJobs(c *cli.Context) error {
return nil
}
func flushDisabledJobs(c *cli.Context) error {
req, err := http.NewRequest("DELETE", baseURL+flushDisabledPath, nil)
if err != nil {
logger.Panicf("Invalid HTTP Request: %s", err.Error())
}
resp, err := client.Do(req)
if err != nil {
return cli.NewExitError(
fmt.Sprintf("Failed to send request to manager: %s",
err.Error()),
1)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return cli.NewExitError(
fmt.Sprintf("Failed to parse response: %s", err.Error()),
1)
}
return cli.NewExitError(fmt.Sprintf("Failed to correctly send"+
" command: HTTP status code is not 200: %s", body),
1)
}
logger.Info("Successfully flushed disabled jobs")
return nil
}
func cmdJob(cmd tunasync.CmdVerb) cli.ActionFunc {
return func(c *cli.Context) error {
var mirrorID string
@ -335,6 +368,12 @@ func main() {
}...),
Action: initializeWrapper(listJobs),
},
{
Name: "flush",
Usage: "Flush disabled jobs",
Flags: commonFlags,
Action: initializeWrapper(flushDisabledJobs),
},
{
Name: "workers",
Usage: "List workers",

View File

@ -27,15 +27,23 @@ type WorkerStatus struct {
LastOnline time.Time `json:"last_online"` // last seen
}
// A CmdVerb is an action to a job or worker
type CmdVerb uint8
const (
CmdStart CmdVerb = iota
CmdStop // stop syncing keep the job
CmdDisable // disable the job (stops goroutine)
CmdRestart // restart syncing
CmdPing // ensure the goroutine is alive
CmdReload // reload mirror config
// CmdStart start a job
CmdStart CmdVerb = iota
// CmdStop stop syncing, but keep the job
CmdStop
// CmdDisable disable the job (stops goroutine)
CmdDisable
// CmdRestart restart a syncing job
CmdRestart
// CmdPing ensures the goroutine is alive
CmdPing
// CmdReload tells a worker to reload mirror config
CmdReload
)
func (c CmdVerb) String() string {

View File

@ -19,6 +19,7 @@ type dbAdapter interface {
GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error)
ListMirrorStatus(workerID string) ([]MirrorStatus, error)
ListAllMirrorStatus() ([]MirrorStatus, error)
FlushDisabledJobs() error
Close() error
}
@ -170,6 +171,26 @@ func (b *boltAdapter) ListAllMirrorStatus() (ms []MirrorStatus, err error) {
return
}
func (b *boltAdapter) FlushDisabledJobs() (err error) {
err = b.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(_statusBucketKey))
c := bucket.Cursor()
var m MirrorStatus
for k, v := c.First(); k != nil; k, v = c.Next() {
jsonErr := json.Unmarshal(v, &m)
if jsonErr != nil {
err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
continue
}
if m.Status == Disabled {
err = c.Delete()
}
}
return err
})
return
}
func (b *boltAdapter) Close() error {
if b.db != nil {
return b.db.Close()

View File

@ -58,33 +58,46 @@ func TestBoltAdapter(t *testing.T) {
})
Convey("update mirror status", func() {
status1 := MirrorStatus{
Name: "arch-sync1",
Worker: testWorkerIDs[0],
IsMaster: true,
Status: Success,
LastUpdate: time.Now(),
Upstream: "mirrors.tuna.tsinghua.edu.cn",
Size: "3GB",
}
status2 := MirrorStatus{
Name: "arch-sync2",
Worker: testWorkerIDs[1],
IsMaster: true,
Status: Success,
LastUpdate: time.Now(),
Upstream: "mirrors.tuna.tsinghua.edu.cn",
Size: "4GB",
status := []MirrorStatus{
MirrorStatus{
Name: "arch-sync1",
Worker: testWorkerIDs[0],
IsMaster: true,
Status: Success,
LastUpdate: time.Now(),
Upstream: "mirrors.tuna.tsinghua.edu.cn",
Size: "3GB",
},
MirrorStatus{
Name: "arch-sync2",
Worker: testWorkerIDs[1],
IsMaster: true,
Status: Disabled,
LastUpdate: time.Now(),
Upstream: "mirrors.tuna.tsinghua.edu.cn",
Size: "4GB",
},
MirrorStatus{
Name: "arch-sync3",
Worker: testWorkerIDs[1],
IsMaster: true,
Status: Success,
LastUpdate: time.Now(),
Upstream: "mirrors.tuna.tsinghua.edu.cn",
Size: "4GB",
},
}
_, err := boltDB.UpdateMirrorStatus(status1.Worker, status1.Name, status1)
_, err = boltDB.UpdateMirrorStatus(status2.Worker, status2.Name, status2)
So(err, ShouldBeNil)
for _, s := range status {
_, err := boltDB.UpdateMirrorStatus(s.Worker, s.Name, s)
So(err, ShouldBeNil)
}
Convey("get mirror status", func() {
m, err := boltDB.GetMirrorStatus(testWorkerIDs[0], status1.Name)
m, err := boltDB.GetMirrorStatus(testWorkerIDs[0], status[0].Name)
So(err, ShouldBeNil)
expectedJSON, err := json.Marshal(status1)
expectedJSON, err := json.Marshal(status[0])
So(err, ShouldBeNil)
actualJSON, err := json.Marshal(m)
So(err, ShouldBeNil)
@ -94,7 +107,7 @@ func TestBoltAdapter(t *testing.T) {
Convey("list mirror status", func() {
ms, err := boltDB.ListMirrorStatus(testWorkerIDs[0])
So(err, ShouldBeNil)
expectedJSON, err := json.Marshal([]MirrorStatus{status1})
expectedJSON, err := json.Marshal([]MirrorStatus{status[0]})
So(err, ShouldBeNil)
actualJSON, err := json.Marshal(ms)
So(err, ShouldBeNil)
@ -104,13 +117,24 @@ func TestBoltAdapter(t *testing.T) {
Convey("list all mirror status", func() {
ms, err := boltDB.ListAllMirrorStatus()
So(err, ShouldBeNil)
expectedJSON, err := json.Marshal([]MirrorStatus{status1, status2})
expectedJSON, err := json.Marshal(status)
So(err, ShouldBeNil)
actualJSON, err := json.Marshal(ms)
So(err, ShouldBeNil)
So(string(actualJSON), ShouldEqual, string(expectedJSON))
})
Convey("flush disabled jobs", func() {
ms, err := boltDB.ListAllMirrorStatus()
So(err, ShouldBeNil)
So(len(ms), ShouldEqual, 3)
err = boltDB.FlushDisabledJobs()
So(err, ShouldBeNil)
ms, err = boltDB.ListAllMirrorStatus()
So(err, ShouldBeNil)
So(len(ms), ShouldEqual, 2)
})
})
})

View File

@ -72,6 +72,8 @@ func GetTUNASyncManager(cfg *Config) *Manager {
})
// list jobs, status page
s.engine.GET("/jobs", s.listAllJobs)
// flush disabled jobs
s.engine.DELETE("/jobs/disabled", s.flushDisabledJobs)
// list workers
s.engine.GET("/workers", s.listWorkers)
@ -80,10 +82,12 @@ func GetTUNASyncManager(cfg *Config) *Manager {
// workerID should be valid in this route group
workerValidateGroup := s.engine.Group("/workers", s.workerIDValidator)
// get job list
workerValidateGroup.GET(":id/jobs", s.listJobsOfWorker)
// post job status
workerValidateGroup.POST(":id/jobs/:job", s.updateJobOfWorker)
{
// get job list
workerValidateGroup.GET(":id/jobs", s.listJobsOfWorker)
// post job status
workerValidateGroup.POST(":id/jobs/:job", s.updateJobOfWorker)
}
// for tunasynctl to post commands
s.engine.POST("/cmd", s.handleClientCmd)
@ -139,6 +143,20 @@ func (s *Manager) listAllJobs(c *gin.Context) {
c.JSON(http.StatusOK, webMirStatusList)
}
// flushDisabledJobs deletes all jobs that marks as deleted
func (s *Manager) flushDisabledJobs(c *gin.Context) {
err := s.adapter.FlushDisabledJobs()
if err != nil {
err := fmt.Errorf("failed to flush disabled jobs: %s",
err.Error(),
)
c.Error(err)
s.returnErrJSON(c, http.StatusInternalServerError, err)
return
}
c.JSON(http.StatusOK, gin.H{_infoKey: "flushed"})
}
// listWrokers respond with informations of all the workers
func (s *Manager) listWorkers(c *gin.Context) {
var workerInfos []WorkerStatus

View File

@ -70,7 +70,6 @@ func TestHTTPServer(t *testing.T) {
So(resp.StatusCode, ShouldEqual, http.StatusOK)
Convey("list all workers", func(ctx C) {
So(err, ShouldBeNil)
resp, err := http.Get(baseURL + "/workers")
So(err, ShouldBeNil)
defer resp.Body.Close()
@ -80,6 +79,19 @@ func TestHTTPServer(t *testing.T) {
So(len(actualResponseObj), ShouldEqual, 2)
})
Convey("flush disabled jobs", func(ctx C) {
req, err := http.NewRequest("DELETE", baseURL+"/jobs/disabled", nil)
So(err, ShouldBeNil)
clt := &http.Client{}
resp, err := clt.Do(req)
So(err, ShouldBeNil)
defer resp.Body.Close()
res := map[string]string{}
err = json.NewDecoder(resp.Body).Decode(&res)
So(err, ShouldBeNil)
So(res[_infoKey], ShouldEqual, "flushed")
})
Convey("update mirror status of a existed worker", func(ctx C) {
status := MirrorStatus{
Name: "arch-sync1",
@ -295,6 +307,10 @@ func (b *mockDBAdapter) Close() error {
return nil
}
func (b *mockDBAdapter) FlushDisabledJobs() error {
return nil
}
func makeMockWorkerServer(cmdChan chan WorkerCmd) *gin.Engine {
r := gin.Default()
r.GET("/ping", func(c *gin.Context) {