refactor(manager): refactored structure names in manager

This commit is contained in:
bigeagle 2016-04-28 19:21:41 +08:00
parent ce3471e30d
commit 0dcd89da31
No known key found for this signature in database
GPG Key ID: 9171A4571C27920A
8 changed files with 119 additions and 158 deletions

View File

@ -15,9 +15,9 @@ type MirrorStatus struct {
ErrorMsg string `json:"error_msg"`
}
// A WorkerInfoMsg is the information struct that describe
// A WorkerStatus is the information struct that describe
// a worker, and sent from the manager to clients.
type WorkerInfoMsg struct {
type WorkerStatus struct {
ID string `json:"id"`
URL string `json:"url"` // worker url
Token string `json:"token"` // session token

View File

@ -6,17 +6,19 @@ import (
"strings"
"github.com/boltdb/bolt"
. "github.com/tuna/tunasync/internal"
)
type dbAdapter interface {
Init() error
ListWorkers() ([]workerStatus, error)
GetWorker(workerID string) (workerStatus, error)
CreateWorker(w workerStatus) (workerStatus, error)
UpdateMirrorStatus(workerID, mirrorID string, status mirrorStatus) (mirrorStatus, error)
GetMirrorStatus(workerID, mirrorID string) (mirrorStatus, error)
ListMirrorStatus(workerID string) ([]mirrorStatus, error)
ListAllMirrorStatus() ([]mirrorStatus, error)
ListWorkers() ([]WorkerStatus, error)
GetWorker(workerID string) (WorkerStatus, error)
CreateWorker(w WorkerStatus) (WorkerStatus, error)
UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error)
GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error)
ListMirrorStatus(workerID string) ([]MirrorStatus, error)
ListAllMirrorStatus() ([]MirrorStatus, error)
Close() error
}
@ -61,11 +63,11 @@ func (b *boltAdapter) Init() (err error) {
})
}
func (b *boltAdapter) ListWorkers() (ws []workerStatus, err error) {
func (b *boltAdapter) ListWorkers() (ws []WorkerStatus, err error) {
err = b.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(_workerBucketKey))
c := bucket.Cursor()
var w workerStatus
var w WorkerStatus
for k, v := c.First(); k != nil; k, v = c.Next() {
jsonErr := json.Unmarshal(v, &w)
if jsonErr != nil {
@ -79,7 +81,7 @@ func (b *boltAdapter) ListWorkers() (ws []workerStatus, err error) {
return
}
func (b *boltAdapter) GetWorker(workerID string) (w workerStatus, err error) {
func (b *boltAdapter) GetWorker(workerID string) (w WorkerStatus, err error) {
err = b.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(_workerBucketKey))
v := bucket.Get([]byte(workerID))
@ -92,7 +94,7 @@ func (b *boltAdapter) GetWorker(workerID string) (w workerStatus, err error) {
return
}
func (b *boltAdapter) CreateWorker(w workerStatus) (workerStatus, error) {
func (b *boltAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
err := b.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(_workerBucketKey))
v, err := json.Marshal(w)
@ -105,7 +107,7 @@ func (b *boltAdapter) CreateWorker(w workerStatus) (workerStatus, error) {
return w, err
}
func (b *boltAdapter) UpdateMirrorStatus(workerID, mirrorID string, status mirrorStatus) (mirrorStatus, error) {
func (b *boltAdapter) UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error) {
id := mirrorID + "/" + workerID
err := b.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(_statusBucketKey))
@ -116,7 +118,7 @@ func (b *boltAdapter) UpdateMirrorStatus(workerID, mirrorID string, status mirro
return status, err
}
func (b *boltAdapter) GetMirrorStatus(workerID, mirrorID string) (m mirrorStatus, err error) {
func (b *boltAdapter) GetMirrorStatus(workerID, mirrorID string) (m MirrorStatus, err error) {
id := mirrorID + "/" + workerID
err = b.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(_statusBucketKey))
@ -130,11 +132,11 @@ func (b *boltAdapter) GetMirrorStatus(workerID, mirrorID string) (m mirrorStatus
return
}
func (b *boltAdapter) ListMirrorStatus(workerID string) (ms []mirrorStatus, err error) {
func (b *boltAdapter) ListMirrorStatus(workerID string) (ms []MirrorStatus, err error) {
err = b.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(_statusBucketKey))
c := bucket.Cursor()
var m mirrorStatus
var m MirrorStatus
for k, v := c.First(); k != nil; k, v = c.Next() {
if wID := strings.Split(string(k), "/")[1]; wID == workerID {
jsonErr := json.Unmarshal(v, &m)
@ -150,11 +152,11 @@ func (b *boltAdapter) ListMirrorStatus(workerID string) (ms []mirrorStatus, err
return
}
func (b *boltAdapter) ListAllMirrorStatus() (ms []mirrorStatus, err error) {
func (b *boltAdapter) ListAllMirrorStatus() (ms []MirrorStatus, err error) {
err = b.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(_statusBucketKey))
c := bucket.Cursor()
var m mirrorStatus
var m MirrorStatus
for k, v := c.First(); k != nil; k, v = c.Next() {
jsonErr := json.Unmarshal(v, &m)
if jsonErr != nil {

View File

@ -31,7 +31,7 @@ func TestBoltAdapter(t *testing.T) {
testWorkerIDs := []string{"test_worker1", "test_worker2"}
Convey("create worker", func() {
for _, id := range testWorkerIDs {
w := workerStatus{
w := WorkerStatus{
ID: id,
Token: "token_" + id,
LastOnline: time.Now(),
@ -58,7 +58,7 @@ func TestBoltAdapter(t *testing.T) {
})
Convey("update mirror status", func() {
status1 := mirrorStatus{
status1 := MirrorStatus{
Name: "arch-sync1",
Worker: testWorkerIDs[0],
IsMaster: true,
@ -67,7 +67,7 @@ func TestBoltAdapter(t *testing.T) {
Upstream: "mirrors.tuna.tsinghua.edu.cn",
Size: "3GB",
}
status2 := mirrorStatus{
status2 := MirrorStatus{
Name: "arch-sync2",
Worker: testWorkerIDs[1],
IsMaster: true,
@ -94,7 +94,7 @@ func TestBoltAdapter(t *testing.T) {
Convey("list mirror status", func() {
ms, err := boltDB.ListMirrorStatus(testWorkerIDs[0])
So(err, ShouldBeNil)
expectedJSON, err := json.Marshal([]mirrorStatus{status1})
expectedJSON, err := json.Marshal([]MirrorStatus{status1})
So(err, ShouldBeNil)
actualJSON, err := json.Marshal(ms)
So(err, ShouldBeNil)
@ -104,7 +104,7 @@ func TestBoltAdapter(t *testing.T) {
Convey("list all mirror status", func() {
ms, err := boltDB.ListAllMirrorStatus()
So(err, ShouldBeNil)
expectedJSON, err := json.Marshal([]mirrorStatus{status1, status2})
expectedJSON, err := json.Marshal([]MirrorStatus{status1, status2})
So(err, ShouldBeNil)
actualJSON, err := json.Marshal(ms)
So(err, ShouldBeNil)

View File

@ -30,12 +30,19 @@ func (s *managerServer) listAllJobs(c *gin.Context) {
s.returnErrJSON(c, http.StatusInternalServerError, err)
return
}
c.JSON(http.StatusOK, mirrorStatusList)
webMirStatusList := []webMirrorStatus{}
for _, m := range mirrorStatusList {
webMirStatusList = append(
webMirStatusList,
convertMirrorStatus(m),
)
}
c.JSON(http.StatusOK, webMirStatusList)
}
// listWrokers respond with informations of all the workers
func (s *managerServer) listWorkers(c *gin.Context) {
var workerInfos []WorkerInfoMsg
var workerInfos []WorkerStatus
workers, err := s.adapter.ListWorkers()
if err != nil {
err := fmt.Errorf("failed to list workers: %s",
@ -47,7 +54,7 @@ func (s *managerServer) listWorkers(c *gin.Context) {
}
for _, w := range workers {
workerInfos = append(workerInfos,
WorkerInfoMsg{
WorkerStatus{
ID: w.ID,
LastOnline: w.LastOnline,
})
@ -57,7 +64,7 @@ func (s *managerServer) listWorkers(c *gin.Context) {
// registerWorker register an newly-online worker
func (s *managerServer) registerWorker(c *gin.Context) {
var _worker workerStatus
var _worker WorkerStatus
c.BindJSON(&_worker)
newWorker, err := s.adapter.CreateWorker(_worker)
if err != nil {
@ -95,7 +102,7 @@ func (s *managerServer) returnErrJSON(c *gin.Context, code int, err error) {
func (s *managerServer) updateJobOfWorker(c *gin.Context) {
workerID := c.Param("id")
var status mirrorStatus
var status MirrorStatus
c.BindJSON(&status)
mirrorName := status.Name
newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status)

View File

@ -26,11 +26,11 @@ func TestHTTPServer(t *testing.T) {
s := makeHTTPServer(false)
So(s, ShouldNotBeNil)
s.setDBAdapter(&mockDBAdapter{
workerStore: map[string]workerStatus{
_magicBadWorkerID: workerStatus{
workerStore: map[string]WorkerStatus{
_magicBadWorkerID: WorkerStatus{
ID: _magicBadWorkerID,
}},
statusStore: make(map[string]mirrorStatus),
statusStore: make(map[string]MirrorStatus),
})
port := rand.Intn(10000) + 20000
baseURL := fmt.Sprintf("http://127.0.0.1:%d", port)
@ -62,7 +62,7 @@ func TestHTTPServer(t *testing.T) {
})
Convey("when register a worker", func(ctx C) {
w := workerStatus{
w := WorkerStatus{
ID: "test_worker1",
}
resp, err := postJSON(baseURL+"/workers", w)
@ -74,14 +74,14 @@ func TestHTTPServer(t *testing.T) {
resp, err := http.Get(baseURL + "/workers")
So(err, ShouldBeNil)
defer resp.Body.Close()
var actualResponseObj []WorkerInfoMsg
var actualResponseObj []WorkerStatus
err = json.NewDecoder(resp.Body).Decode(&actualResponseObj)
So(err, ShouldBeNil)
So(len(actualResponseObj), ShouldEqual, 2)
})
Convey("update mirror status of a existed worker", func(ctx C) {
status := mirrorStatus{
status := MirrorStatus{
Name: "arch-sync1",
Worker: "test_worker1",
IsMaster: true,
@ -97,7 +97,7 @@ func TestHTTPServer(t *testing.T) {
Convey("list mirror status of an existed worker", func(ctx C) {
expectedResponse, err := json.Marshal([]mirrorStatus{status})
expectedResponse, err := json.Marshal([]MirrorStatus{status})
So(err, ShouldBeNil)
resp, err := http.Get(baseURL + "/workers/test_worker1/jobs")
So(err, ShouldBeNil)
@ -110,7 +110,9 @@ func TestHTTPServer(t *testing.T) {
})
Convey("list all job status of all workers", func(ctx C) {
expectedResponse, err := json.Marshal([]mirrorStatus{status})
expectedResponse, err := json.Marshal(
[]webMirrorStatus{convertMirrorStatus(status)},
)
So(err, ShouldBeNil)
resp, err := http.Get(baseURL + "/jobs")
So(err, ShouldBeNil)
@ -125,7 +127,7 @@ func TestHTTPServer(t *testing.T) {
Convey("update mirror status of an inexisted worker", func(ctx C) {
invalidWorker := "test_worker2"
status := mirrorStatus{
status := MirrorStatus{
Name: "arch-sync2",
Worker: invalidWorker,
IsMaster: true,
@ -150,7 +152,7 @@ func TestHTTPServer(t *testing.T) {
workerPort := rand.Intn(10000) + 30000
bindAddress := fmt.Sprintf("127.0.0.1:%d", workerPort)
workerBaseURL := fmt.Sprintf("http://%s", bindAddress)
w := workerStatus{
w := WorkerStatus{
ID: "test_worker_cmd",
URL: workerBaseURL + "/cmd",
}
@ -208,16 +210,16 @@ func TestHTTPServer(t *testing.T) {
}
type mockDBAdapter struct {
workerStore map[string]workerStatus
statusStore map[string]mirrorStatus
workerStore map[string]WorkerStatus
statusStore map[string]MirrorStatus
}
func (b *mockDBAdapter) Init() error {
return nil
}
func (b *mockDBAdapter) ListWorkers() ([]workerStatus, error) {
workers := make([]workerStatus, len(b.workerStore))
func (b *mockDBAdapter) ListWorkers() ([]WorkerStatus, error) {
workers := make([]WorkerStatus, len(b.workerStore))
idx := 0
for _, w := range b.workerStore {
workers[idx] = w
@ -226,15 +228,15 @@ func (b *mockDBAdapter) ListWorkers() ([]workerStatus, error) {
return workers, nil
}
func (b *mockDBAdapter) GetWorker(workerID string) (workerStatus, error) {
func (b *mockDBAdapter) GetWorker(workerID string) (WorkerStatus, error) {
w, ok := b.workerStore[workerID]
if !ok {
return workerStatus{}, fmt.Errorf("invalid workerId")
return WorkerStatus{}, fmt.Errorf("invalid workerId")
}
return w, nil
}
func (b *mockDBAdapter) CreateWorker(w workerStatus) (workerStatus, error) {
func (b *mockDBAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
// _, ok := b.workerStore[w.ID]
// if ok {
// return workerStatus{}, fmt.Errorf("duplicate worker name")
@ -243,19 +245,19 @@ func (b *mockDBAdapter) CreateWorker(w workerStatus) (workerStatus, error) {
return w, nil
}
func (b *mockDBAdapter) GetMirrorStatus(workerID, mirrorID string) (mirrorStatus, error) {
func (b *mockDBAdapter) GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error) {
id := mirrorID + "/" + workerID
status, ok := b.statusStore[id]
if !ok {
return mirrorStatus{}, fmt.Errorf("no mirror %s exists in worker %s", mirrorID, workerID)
return MirrorStatus{}, fmt.Errorf("no mirror %s exists in worker %s", mirrorID, workerID)
}
return status, nil
}
func (b *mockDBAdapter) UpdateMirrorStatus(workerID, mirrorID string, status mirrorStatus) (mirrorStatus, error) {
func (b *mockDBAdapter) UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error) {
// if _, ok := b.workerStore[workerID]; !ok {
// // unregistered worker
// return mirrorStatus{}, fmt.Errorf("invalid workerID %s", workerID)
// return MirrorStatus{}, fmt.Errorf("invalid workerID %s", workerID)
// }
id := mirrorID + "/" + workerID
@ -263,11 +265,11 @@ func (b *mockDBAdapter) UpdateMirrorStatus(workerID, mirrorID string, status mir
return status, nil
}
func (b *mockDBAdapter) ListMirrorStatus(workerID string) ([]mirrorStatus, error) {
var mirrorStatusList []mirrorStatus
func (b *mockDBAdapter) ListMirrorStatus(workerID string) ([]MirrorStatus, error) {
var mirrorStatusList []MirrorStatus
// simulating a database fail
if workerID == _magicBadWorkerID {
return []mirrorStatus{}, fmt.Errorf("database fail")
return []MirrorStatus{}, fmt.Errorf("database fail")
}
for k, v := range b.statusStore {
if wID := strings.Split(k, "/")[1]; wID == workerID {
@ -277,8 +279,8 @@ func (b *mockDBAdapter) ListMirrorStatus(workerID string) ([]mirrorStatus, error
return mirrorStatusList, nil
}
func (b *mockDBAdapter) ListAllMirrorStatus() ([]mirrorStatus, error) {
var mirrorStatusList []mirrorStatus
func (b *mockDBAdapter) ListAllMirrorStatus() ([]MirrorStatus, error) {
var mirrorStatusList []MirrorStatus
for _, v := range b.statusStore {
mirrorStatusList = append(mirrorStatusList, v)
}

View File

@ -2,114 +2,61 @@ package manager
import (
"encoding/json"
"errors"
"fmt"
"strconv"
"time"
. "github.com/tuna/tunasync/internal"
)
type mirrorStatus struct {
Name string
Worker string
IsMaster bool
Status SyncStatus
LastUpdate time.Time
Upstream string
Size string // approximate size
type textTime struct {
time.Time
}
func (s mirrorStatus) MarshalJSON() ([]byte, error) {
m := map[string]interface{}{
"name": s.Name,
"worker": s.Worker,
"is_master": s.IsMaster,
"status": s.Status,
"last_update": s.LastUpdate.Format("2006-01-02 15:04:05"),
"last_update_ts": fmt.Sprintf("%d", s.LastUpdate.Unix()),
"size": s.Size,
"upstream": s.Upstream,
}
return json.Marshal(m)
func (t textTime) MarshalJSON() ([]byte, error) {
return json.Marshal(t.Format("2006-01-02 15:04:05"))
}
func (t *textTime) UnmarshalJSON(b []byte) error {
s := string(b)
t2, err := time.ParseInLocation(`"2006-01-02 15:04:05"`, s, time.Local)
*t = textTime{t2}
return err
}
func (s *mirrorStatus) UnmarshalJSON(v []byte) error {
var m map[string]interface{}
type stampTime struct {
time.Time
}
err := json.Unmarshal(v, &m)
func (t stampTime) MarshalJSON() ([]byte, error) {
return json.Marshal(t.Unix())
}
func (t *stampTime) UnmarshalJSON(b []byte) error {
ts, err := strconv.Atoi(string(b))
if err != nil {
return err
}
if name, ok := m["name"]; ok {
if s.Name, ok = name.(string); !ok {
return errors.New("name should be a string")
}
} else {
return errors.New("key `name` does not exist in the json")
}
if isMaster, ok := m["is_master"]; ok {
if s.IsMaster, ok = isMaster.(bool); !ok {
return errors.New("is_master should be a string")
}
} else {
return errors.New("key `is_master` does not exist in the json")
}
if _worker, ok := m["worker"]; ok {
if s.Worker, ok = _worker.(string); !ok {
return errors.New("worker should be a string")
}
} else {
return errors.New("key `worker` does not exist in the json")
}
if upstream, ok := m["upstream"]; ok {
if s.Upstream, ok = upstream.(string); !ok {
return errors.New("upstream should be a string")
}
} else {
return errors.New("key `upstream` does not exist in the json")
}
if size, ok := m["size"]; ok {
if s.Size, ok = size.(string); !ok {
return errors.New("size should be a string")
}
} else {
return errors.New("key `size` does not exist in the json")
}
// tricky: status
if status, ok := m["status"]; ok {
if ss, ok := status.(string); ok {
err := json.Unmarshal([]byte(`"`+ss+`"`), &(s.Status))
if err != nil {
return err
}
} else {
return errors.New("status should be a string")
}
} else {
return errors.New("key `status` does not exist in the json")
}
// tricky: last update
if lastUpdate, ok := m["last_update_ts"]; ok {
if sts, ok := lastUpdate.(string); ok {
ts, err := strconv.Atoi(sts)
if err != nil {
return fmt.Errorf("last_update_ts should be a interger, got: %s", sts)
}
s.LastUpdate = time.Unix(int64(ts), 0)
} else {
return fmt.Errorf("last_update_ts should be a string of integer, got: %s", lastUpdate)
}
} else {
return errors.New("key `last_update_ts` does not exist in the json")
}
return nil
*t = stampTime{time.Unix(int64(ts), 0)}
return err
}
type workerStatus struct {
ID string `json:"id"` // worker name
Token string `json:"token"` // session token
URL string `json:"url"` // worker url
LastOnline time.Time `json:"last_online"` // last seen
// webMirrorStatus is the mirror status to be shown in the web page
type webMirrorStatus struct {
Name string `json:"name"`
IsMaster bool `json:"is_master"`
Status SyncStatus `json:"status"`
LastUpdate textTime `json:"last_update"`
LastUpdateTs stampTime `json:"last_update_ts"`
Upstream string `json:"upstream"`
Size string `json:"size"` // approximate size
}
func convertMirrorStatus(m MirrorStatus) webMirrorStatus {
return webMirrorStatus{
Name: m.Name,
IsMaster: m.IsMaster,
Status: m.Status,
LastUpdate: textTime{m.LastUpdate},
LastUpdateTs: stampTime{m.LastUpdate},
Upstream: m.Upstream,
Size: m.Size,
}
}

View File

@ -15,26 +15,29 @@ func TestStatus(t *testing.T) {
tz := "Asia/Shanghai"
loc, err := time.LoadLocation(tz)
So(err, ShouldBeNil)
m := mirrorStatus{
Name: "tunalinux",
Status: tunasync.Success,
LastUpdate: time.Date(2016, time.April, 16, 23, 8, 10, 0, loc),
Size: "5GB",
Upstream: "rsync://mirrors.tuna.tsinghua.edu.cn/tunalinux/",
t := time.Date(2016, time.April, 16, 23, 8, 10, 0, loc)
m := webMirrorStatus{
Name: "tunalinux",
Status: tunasync.Success,
LastUpdate: textTime{t},
LastUpdateTs: stampTime{t},
Size: "5GB",
Upstream: "rsync://mirrors.tuna.tsinghua.edu.cn/tunalinux/",
}
b, err := json.Marshal(m)
So(err, ShouldBeNil)
// fmt.Println(string(b))
var m2 mirrorStatus
var m2 webMirrorStatus
err = json.Unmarshal(b, &m2)
So(err, ShouldBeNil)
// fmt.Printf("%#v", m2)
So(m2.Name, ShouldEqual, m.Name)
So(m2.Status, ShouldEqual, m.Status)
So(m2.LastUpdate.Unix(), ShouldEqual, m.LastUpdate.Unix())
So(m2.LastUpdateTs.Unix(), ShouldEqual, m.LastUpdate.Unix())
So(m2.LastUpdate.UnixNano(), ShouldEqual, m.LastUpdate.UnixNano())
So(m2.LastUpdateTs.UnixNano(), ShouldEqual, m.LastUpdate.UnixNano())
So(m2.Size, ShouldEqual, m.Size)
So(m2.Upstream, ShouldEqual, m.Upstream)
})

View File

@ -291,7 +291,7 @@ func (w *Worker) registorWorker() {
w.cfg.Manager.APIBase,
)
msg := WorkerInfoMsg{
msg := WorkerStatus{
ID: w.Name(),
URL: w.URL(),
}