From c8600d094e71332f88f7427c7c9ac61e2ed0587f Mon Sep 17 00:00:00 2001 From: Jiajie Chen Date: Thu, 10 Sep 2020 21:31:31 +0800 Subject: [PATCH 1/4] Add LastRegister to WorkerStatus --- internal/msg.go | 9 +++++---- manager/db_test.go | 1 + manager/server.go | 6 ++++-- worker/worker_test.go | 1 + 4 files changed, 11 insertions(+), 6 deletions(-) diff --git a/internal/msg.go b/internal/msg.go index ac992a5..2840261 100644 --- a/internal/msg.go +++ b/internal/msg.go @@ -24,10 +24,11 @@ type MirrorStatus struct { // A WorkerStatus is the information struct that describe // a worker, and sent from the manager to clients. type WorkerStatus struct { - ID string `json:"id"` - URL string `json:"url"` // worker url - Token string `json:"token"` // session token - LastOnline time.Time `json:"last_online"` // last seen + ID string `json:"id"` + URL string `json:"url"` // worker url + Token string `json:"token"` // session token + LastOnline time.Time `json:"last_online"` // last seen + LastRegister time.Time `json:"last_register"` // last register time } type MirrorSchedules struct { diff --git a/manager/db_test.go b/manager/db_test.go index d4b16ed..f65a356 100644 --- a/manager/db_test.go +++ b/manager/db_test.go @@ -35,6 +35,7 @@ func TestBoltAdapter(t *testing.T) { ID: id, Token: "token_" + id, LastOnline: time.Now(), + LastRegister: time.Now(), } w, err = boltDB.CreateWorker(w) So(err, ShouldBeNil) diff --git a/manager/server.go b/manager/server.go index 77ed92e..04ce207 100644 --- a/manager/server.go +++ b/manager/server.go @@ -203,8 +203,9 @@ func (s *Manager) listWorkers(c *gin.Context) { for _, w := range workers { workerInfos = append(workerInfos, WorkerStatus{ - ID: w.ID, - LastOnline: w.LastOnline, + ID: w.ID, + LastOnline: w.LastOnline, + LastRegister: w.LastRegister, }) } c.JSON(http.StatusOK, workerInfos) @@ -215,6 +216,7 @@ func (s *Manager) registerWorker(c *gin.Context) { var _worker WorkerStatus c.BindJSON(&_worker) _worker.LastOnline = time.Now() + _worker.LastRegister = time.Now() newWorker, err := s.adapter.CreateWorker(_worker) if err != nil { err := fmt.Errorf("failed to register worker: %s", diff --git a/worker/worker_test.go b/worker/worker_test.go index 719b651..e00f59a 100644 --- a/worker/worker_test.go +++ b/worker/worker_test.go @@ -25,6 +25,7 @@ func makeMockManagerServer(recvData chan interface{}) *gin.Engine { var _worker WorkerStatus c.BindJSON(&_worker) _worker.LastOnline = time.Now() + _worker.LastRegister = time.Now() recvData <- _worker c.JSON(http.StatusOK, _worker) }) From b4b81ef7e9b372a36960af8ee4a981a71e19f994 Mon Sep 17 00:00:00 2001 From: Jiajie Chen Date: Thu, 10 Sep 2020 21:32:22 +0800 Subject: [PATCH 2/4] Fix typo: registor -> register --- worker/worker.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/worker/worker.go b/worker/worker.go index fac4baa..56e5f68 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -61,7 +61,7 @@ func NewTUNASyncWorker(cfg *Config) *Worker { // Run runs worker forever func (w *Worker) Run() { - w.registorWorker() + w.registerWorker() go w.runHTTPServer() w.runSchedule() } @@ -393,7 +393,7 @@ func (w *Worker) URL() string { return fmt.Sprintf("%s://%s:%d/", proto, w.cfg.Server.Hostname, w.cfg.Server.Port) } -func (w *Worker) registorWorker() { +func (w *Worker) registerWorker() { msg := WorkerStatus{ ID: w.Name(), URL: w.URL(), From fd274cc976ca23827c473ce578c1b3afb4b43f84 Mon Sep 17 00:00:00 2001 From: Jiajie Chen Date: Thu, 10 Sep 2020 21:51:33 +0800 Subject: [PATCH 3/4] Refresh worker LastOnline when worker updates --- manager/db.go | 10 ++++++++++ manager/server.go | 3 +++ 2 files changed, 13 insertions(+) diff --git a/manager/db.go b/manager/db.go index 68caae7..7c02180 100644 --- a/manager/db.go +++ b/manager/db.go @@ -17,6 +17,7 @@ type dbAdapter interface { GetWorker(workerID string) (WorkerStatus, error) DeleteWorker(workerID string) error CreateWorker(w WorkerStatus) (WorkerStatus, error) + RefreshWorker(workerID string) (WorkerStatus, error) UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error) GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error) ListMirrorStatus(workerID string) ([]MirrorStatus, error) @@ -125,6 +126,15 @@ func (b *boltAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) { return w, err } +func (b *boltAdapter) RefreshWorker(workerID string) (w WorkerStatus, err error) { + w, err = b.GetWorker(workerID) + if err == nil { + w.LastOnline = time.Now() + w, err = b.CreateWorker(w) + } + return w, err +} + func (b *boltAdapter) UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error) { id := mirrorID + "/" + workerID err := b.db.Update(func(tx *bolt.Tx) error { diff --git a/manager/server.go b/manager/server.go index 04ce207..18a94da 100644 --- a/manager/server.go +++ b/manager/server.go @@ -270,6 +270,7 @@ func (s *Manager) updateSchedulesOfWorker(c *gin.Context) { } s.rwmu.RLock() + s.adapter.RefreshWorker(workerID) curStatus, err := s.adapter.GetMirrorStatus(workerID, mirrorName) s.rwmu.RUnlock() if err != nil { @@ -314,6 +315,7 @@ func (s *Manager) updateJobOfWorker(c *gin.Context) { } s.rwmu.RLock() + s.adapter.RefreshWorker(workerID) curStatus, _ := s.adapter.GetMirrorStatus(workerID, mirrorName) s.rwmu.RUnlock() @@ -376,6 +378,7 @@ func (s *Manager) updateMirrorSize(c *gin.Context) { mirrorName := msg.Name s.rwmu.RLock() + s.adapter.RefreshWorker(workerID) status, err := s.adapter.GetMirrorStatus(workerID, mirrorName) s.rwmu.RUnlock() if err != nil { From 9f7f18c2c471ff00ad14205ea42d1cca9b1dffb3 Mon Sep 17 00:00:00 2001 From: Jiajie Chen Date: Thu, 10 Sep 2020 21:58:31 +0800 Subject: [PATCH 4/4] Fix missing method in mock test --- manager/server_test.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/manager/server_test.go b/manager/server_test.go index 1db70de..7fc1432 100644 --- a/manager/server_test.go +++ b/manager/server_test.go @@ -462,6 +462,15 @@ func (b *mockDBAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) { return w, nil } +func (b *mockDBAdapter) RefreshWorker(workerID string) (w WorkerStatus, err error) { + w, err = b.GetWorker(workerID) + if err == nil { + w.LastOnline = time.Now() + w, err = b.CreateWorker(w) + } + return w, err +} + func (b *mockDBAdapter) GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error) { id := mirrorID + "/" + workerID status, ok := b.statusStore[id]