diff --git a/cmd/tunasynctl/tunasynctl.go b/cmd/tunasynctl/tunasynctl.go index 71f4751..78789f8 100644 --- a/cmd/tunasynctl/tunasynctl.go +++ b/cmd/tunasynctl/tunasynctl.go @@ -23,9 +23,10 @@ var ( ) const ( - listJobsPath = "/jobs" - listWorkersPath = "/workers" - cmdPath = "/cmd" + listJobsPath = "/jobs" + listWorkersPath = "/workers" + flushDisabledPath = "/jobs/disabled" + cmdPath = "/cmd" systemCfgFile = "/etc/tunasync/ctl.conf" // system-wide conf userCfgFile = "$HOME/.config/tunasync/ctl.conf" // user-specific conf @@ -182,6 +183,38 @@ func listJobs(c *cli.Context) error { return nil } +func flushDisabledJobs(c *cli.Context) error { + req, err := http.NewRequest("DELETE", baseURL+flushDisabledPath, nil) + if err != nil { + logger.Panicf("Invalid HTTP Request: %s", err.Error()) + } + resp, err := client.Do(req) + + if err != nil { + return cli.NewExitError( + fmt.Sprintf("Failed to send request to manager: %s", + err.Error()), + 1) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return cli.NewExitError( + fmt.Sprintf("Failed to parse response: %s", err.Error()), + 1) + } + + return cli.NewExitError(fmt.Sprintf("Failed to correctly send"+ + " command: HTTP status code is not 200: %s", body), + 1) + } + + logger.Info("Successfully flushed disabled jobs") + return nil +} + func cmdJob(cmd tunasync.CmdVerb) cli.ActionFunc { return func(c *cli.Context) error { var mirrorID string @@ -335,6 +368,12 @@ func main() { }...), Action: initializeWrapper(listJobs), }, + { + Name: "flush", + Usage: "Flush disabled jobs", + Flags: commonFlags, + Action: initializeWrapper(flushDisabledJobs), + }, { Name: "workers", Usage: "List workers", diff --git a/internal/msg.go b/internal/msg.go index b1164ea..7f9db65 100644 --- a/internal/msg.go +++ b/internal/msg.go @@ -27,15 +27,23 @@ type WorkerStatus struct { LastOnline time.Time `json:"last_online"` // last seen } +// A CmdVerb is an action to a job or worker type CmdVerb uint8 const ( - CmdStart CmdVerb = iota - CmdStop // stop syncing keep the job - CmdDisable // disable the job (stops goroutine) - CmdRestart // restart syncing - CmdPing // ensure the goroutine is alive - CmdReload // reload mirror config + // CmdStart start a job + CmdStart CmdVerb = iota + // CmdStop stop syncing, but keep the job + CmdStop + // CmdDisable disable the job (stops goroutine) + CmdDisable + // CmdRestart restart a syncing job + CmdRestart + // CmdPing ensures the goroutine is alive + CmdPing + + // CmdReload tells a worker to reload mirror config + CmdReload ) func (c CmdVerb) String() string { diff --git a/manager/db.go b/manager/db.go index 42623a0..0520570 100644 --- a/manager/db.go +++ b/manager/db.go @@ -19,6 +19,7 @@ type dbAdapter interface { GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error) ListMirrorStatus(workerID string) ([]MirrorStatus, error) ListAllMirrorStatus() ([]MirrorStatus, error) + FlushDisabledJobs() error Close() error } @@ -170,6 +171,26 @@ func (b *boltAdapter) ListAllMirrorStatus() (ms []MirrorStatus, err error) { return } +func (b *boltAdapter) FlushDisabledJobs() (err error) { + err = b.db.Update(func(tx *bolt.Tx) error { + bucket := tx.Bucket([]byte(_statusBucketKey)) + c := bucket.Cursor() + var m MirrorStatus + for k, v := c.First(); k != nil; k, v = c.Next() { + jsonErr := json.Unmarshal(v, &m) + if jsonErr != nil { + err = fmt.Errorf("%s; %s", err.Error(), jsonErr) + continue + } + if m.Status == Disabled { + err = c.Delete() + } + } + return err + }) + return +} + func (b *boltAdapter) Close() error { if b.db != nil { return b.db.Close() diff --git a/manager/db_test.go b/manager/db_test.go index 5cd7456..1d13eec 100644 --- a/manager/db_test.go +++ b/manager/db_test.go @@ -58,33 +58,46 @@ func TestBoltAdapter(t *testing.T) { }) Convey("update mirror status", func() { - status1 := MirrorStatus{ - Name: "arch-sync1", - Worker: testWorkerIDs[0], - IsMaster: true, - Status: Success, - LastUpdate: time.Now(), - Upstream: "mirrors.tuna.tsinghua.edu.cn", - Size: "3GB", - } - status2 := MirrorStatus{ - Name: "arch-sync2", - Worker: testWorkerIDs[1], - IsMaster: true, - Status: Success, - LastUpdate: time.Now(), - Upstream: "mirrors.tuna.tsinghua.edu.cn", - Size: "4GB", + status := []MirrorStatus{ + MirrorStatus{ + Name: "arch-sync1", + Worker: testWorkerIDs[0], + IsMaster: true, + Status: Success, + LastUpdate: time.Now(), + Upstream: "mirrors.tuna.tsinghua.edu.cn", + Size: "3GB", + }, + MirrorStatus{ + Name: "arch-sync2", + Worker: testWorkerIDs[1], + IsMaster: true, + Status: Disabled, + LastUpdate: time.Now(), + Upstream: "mirrors.tuna.tsinghua.edu.cn", + Size: "4GB", + }, + MirrorStatus{ + Name: "arch-sync3", + Worker: testWorkerIDs[1], + IsMaster: true, + Status: Success, + LastUpdate: time.Now(), + Upstream: "mirrors.tuna.tsinghua.edu.cn", + Size: "4GB", + }, } - _, err := boltDB.UpdateMirrorStatus(status1.Worker, status1.Name, status1) - _, err = boltDB.UpdateMirrorStatus(status2.Worker, status2.Name, status2) - So(err, ShouldBeNil) + for _, s := range status { + _, err := boltDB.UpdateMirrorStatus(s.Worker, s.Name, s) + So(err, ShouldBeNil) + + } Convey("get mirror status", func() { - m, err := boltDB.GetMirrorStatus(testWorkerIDs[0], status1.Name) + m, err := boltDB.GetMirrorStatus(testWorkerIDs[0], status[0].Name) So(err, ShouldBeNil) - expectedJSON, err := json.Marshal(status1) + expectedJSON, err := json.Marshal(status[0]) So(err, ShouldBeNil) actualJSON, err := json.Marshal(m) So(err, ShouldBeNil) @@ -94,7 +107,7 @@ func TestBoltAdapter(t *testing.T) { Convey("list mirror status", func() { ms, err := boltDB.ListMirrorStatus(testWorkerIDs[0]) So(err, ShouldBeNil) - expectedJSON, err := json.Marshal([]MirrorStatus{status1}) + expectedJSON, err := json.Marshal([]MirrorStatus{status[0]}) So(err, ShouldBeNil) actualJSON, err := json.Marshal(ms) So(err, ShouldBeNil) @@ -104,13 +117,24 @@ func TestBoltAdapter(t *testing.T) { Convey("list all mirror status", func() { ms, err := boltDB.ListAllMirrorStatus() So(err, ShouldBeNil) - expectedJSON, err := json.Marshal([]MirrorStatus{status1, status2}) + expectedJSON, err := json.Marshal(status) So(err, ShouldBeNil) actualJSON, err := json.Marshal(ms) So(err, ShouldBeNil) So(string(actualJSON), ShouldEqual, string(expectedJSON)) }) + Convey("flush disabled jobs", func() { + ms, err := boltDB.ListAllMirrorStatus() + So(err, ShouldBeNil) + So(len(ms), ShouldEqual, 3) + err = boltDB.FlushDisabledJobs() + So(err, ShouldBeNil) + ms, err = boltDB.ListAllMirrorStatus() + So(err, ShouldBeNil) + So(len(ms), ShouldEqual, 2) + }) + }) }) diff --git a/manager/server.go b/manager/server.go index 1787aee..917c58b 100644 --- a/manager/server.go +++ b/manager/server.go @@ -72,6 +72,8 @@ func GetTUNASyncManager(cfg *Config) *Manager { }) // list jobs, status page s.engine.GET("/jobs", s.listAllJobs) + // flush disabled jobs + s.engine.DELETE("/jobs/disabled", s.flushDisabledJobs) // list workers s.engine.GET("/workers", s.listWorkers) @@ -80,10 +82,12 @@ func GetTUNASyncManager(cfg *Config) *Manager { // workerID should be valid in this route group workerValidateGroup := s.engine.Group("/workers", s.workerIDValidator) - // get job list - workerValidateGroup.GET(":id/jobs", s.listJobsOfWorker) - // post job status - workerValidateGroup.POST(":id/jobs/:job", s.updateJobOfWorker) + { + // get job list + workerValidateGroup.GET(":id/jobs", s.listJobsOfWorker) + // post job status + workerValidateGroup.POST(":id/jobs/:job", s.updateJobOfWorker) + } // for tunasynctl to post commands s.engine.POST("/cmd", s.handleClientCmd) @@ -139,6 +143,20 @@ func (s *Manager) listAllJobs(c *gin.Context) { c.JSON(http.StatusOK, webMirStatusList) } +// flushDisabledJobs deletes all jobs that marks as deleted +func (s *Manager) flushDisabledJobs(c *gin.Context) { + err := s.adapter.FlushDisabledJobs() + if err != nil { + err := fmt.Errorf("failed to flush disabled jobs: %s", + err.Error(), + ) + c.Error(err) + s.returnErrJSON(c, http.StatusInternalServerError, err) + return + } + c.JSON(http.StatusOK, gin.H{_infoKey: "flushed"}) +} + // listWrokers respond with informations of all the workers func (s *Manager) listWorkers(c *gin.Context) { var workerInfos []WorkerStatus diff --git a/manager/server_test.go b/manager/server_test.go index 61e67a2..748a323 100644 --- a/manager/server_test.go +++ b/manager/server_test.go @@ -70,7 +70,6 @@ func TestHTTPServer(t *testing.T) { So(resp.StatusCode, ShouldEqual, http.StatusOK) Convey("list all workers", func(ctx C) { - So(err, ShouldBeNil) resp, err := http.Get(baseURL + "/workers") So(err, ShouldBeNil) defer resp.Body.Close() @@ -80,6 +79,19 @@ func TestHTTPServer(t *testing.T) { So(len(actualResponseObj), ShouldEqual, 2) }) + Convey("flush disabled jobs", func(ctx C) { + req, err := http.NewRequest("DELETE", baseURL+"/jobs/disabled", nil) + So(err, ShouldBeNil) + clt := &http.Client{} + resp, err := clt.Do(req) + So(err, ShouldBeNil) + defer resp.Body.Close() + res := map[string]string{} + err = json.NewDecoder(resp.Body).Decode(&res) + So(err, ShouldBeNil) + So(res[_infoKey], ShouldEqual, "flushed") + }) + Convey("update mirror status of a existed worker", func(ctx C) { status := MirrorStatus{ Name: "arch-sync1", @@ -295,6 +307,10 @@ func (b *mockDBAdapter) Close() error { return nil } +func (b *mockDBAdapter) FlushDisabledJobs() error { + return nil +} + func makeMockWorkerServer(cmdChan chan WorkerCmd) *gin.Engine { r := gin.Default() r.GET("/ping", func(c *gin.Context) {