diff --git a/manager/db.go b/manager/db.go index 1a1d3ed..36d9ff6 100644 --- a/manager/db.go +++ b/manager/db.go @@ -1,11 +1,15 @@ package manager import ( + "encoding/json" "fmt" + "strings" + "github.com/boltdb/bolt" ) type dbAdapter interface { + Init() error ListWorkers() ([]worker, error) GetWorker(workerID string) (worker, error) CreateWorker(w worker) (worker, error) @@ -26,43 +30,142 @@ func makeDBAdapter(dbType string, dbFile string) (dbAdapter, error) { db: innerDB, dbFile: dbFile, } - return &db, nil + err = db.Init() + return &db, err } // unsupported db-type return nil, fmt.Errorf("unsupported db-type: %s", dbType) } +const ( + _workerBucketKey = "workers" + _statusBucketKey = "mirror_status" +) + type boltAdapter struct { db *bolt.DB dbFile string } -func (b *boltAdapter) ListWorkers() ([]worker, error) { - return []worker{}, nil +func (b *boltAdapter) Init() (err error) { + return b.db.Update(func(tx *bolt.Tx) error { + _, err = tx.CreateBucketIfNotExists([]byte(_workerBucketKey)) + if err != nil { + return fmt.Errorf("create bucket %s error: %s", _workerBucketKey, err.Error()) + } + _, err = tx.CreateBucketIfNotExists([]byte(_statusBucketKey)) + if err != nil { + return fmt.Errorf("create bucket %s error: %s", _statusBucketKey, err.Error()) + } + return nil + }) } -func (b *boltAdapter) GetWorker(workerID string) (worker, error) { - return worker{}, nil +func (b *boltAdapter) ListWorkers() (ws []worker, err error) { + err = b.db.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket([]byte(_workerBucketKey)) + c := bucket.Cursor() + var w worker + for k, v := c.First(); k != nil; k, v = c.Next() { + jsonErr := json.Unmarshal(v, &w) + if jsonErr != nil { + err = fmt.Errorf("%s; %s", err.Error(), jsonErr) + continue + } + ws = append(ws, w) + } + return err + }) + return +} + +func (b *boltAdapter) GetWorker(workerID string) (w worker, err error) { + err = b.db.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket([]byte(_workerBucketKey)) + v := bucket.Get([]byte(workerID)) + if v == nil { + return fmt.Errorf("invalid workerID %s", workerID) + } + err := json.Unmarshal(v, &w) + return err + }) + return } func (b *boltAdapter) CreateWorker(w worker) (worker, error) { - return worker{}, nil + err := b.db.Update(func(tx *bolt.Tx) error { + bucket := tx.Bucket([]byte(_workerBucketKey)) + v, err := json.Marshal(w) + if err != nil { + return err + } + err = bucket.Put([]byte(w.ID), v) + return err + }) + return w, err } func (b *boltAdapter) UpdateMirrorStatus(workerID, mirrorID string, status mirrorStatus) (mirrorStatus, error) { - return mirrorStatus{}, nil + id := mirrorID + "/" + workerID + err := b.db.Update(func(tx *bolt.Tx) error { + bucket := tx.Bucket([]byte(_statusBucketKey)) + v, err := json.Marshal(status) + err = bucket.Put([]byte(id), v) + return err + }) + return status, err } -func (b *boltAdapter) GetMirrorStatus(workerID, mirrorID string) (mirrorStatus, error) { - return mirrorStatus{}, nil +func (b *boltAdapter) GetMirrorStatus(workerID, mirrorID string) (m mirrorStatus, err error) { + id := mirrorID + "/" + workerID + err = b.db.Update(func(tx *bolt.Tx) error { + bucket := tx.Bucket([]byte(_statusBucketKey)) + v := bucket.Get([]byte(id)) + if v == nil { + return fmt.Errorf("no mirror %s exists in worker %s", mirrorID, workerID) + } + err := json.Unmarshal(v, &m) + return err + }) + return } -func (b *boltAdapter) ListMirrorStatus(workerID string) ([]mirrorStatus, error) { - return []mirrorStatus{}, nil +func (b *boltAdapter) ListMirrorStatus(workerID string) (ms []mirrorStatus, err error) { + err = b.db.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket([]byte(_statusBucketKey)) + c := bucket.Cursor() + var m mirrorStatus + for k, v := c.First(); k != nil; k, v = c.Next() { + if wID := strings.Split(string(k), "/")[1]; wID == workerID { + jsonErr := json.Unmarshal(v, &m) + if jsonErr != nil { + err = fmt.Errorf("%s; %s", err.Error(), jsonErr) + continue + } + ms = append(ms, m) + } + } + return err + }) + return } -func (b *boltAdapter) ListAllMirrorStatus() ([]mirrorStatus, error) { - return []mirrorStatus{}, nil +func (b *boltAdapter) ListAllMirrorStatus() (ms []mirrorStatus, err error) { + err = b.db.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket([]byte(_statusBucketKey)) + c := bucket.Cursor() + var m mirrorStatus + for k, v := c.First(); k != nil; k, v = c.Next() { + jsonErr := json.Unmarshal(v, &m) + if jsonErr != nil { + err = fmt.Errorf("%s; %s", err.Error(), jsonErr) + continue + } + ms = append(ms, m) + } + return err + }) + return } func (b *boltAdapter) Close() error { diff --git a/manager/db_test.go b/manager/db_test.go new file mode 100644 index 0000000..e95dfe8 --- /dev/null +++ b/manager/db_test.go @@ -0,0 +1,117 @@ +package manager + +import ( + "encoding/json" + "io/ioutil" + "os" + "path/filepath" + "testing" + "time" + + . "github.com/smartystreets/goconvey/convey" + . "github.com/tuna/tunasync/internal" +) + +func TestBoltAdapter(t *testing.T) { + Convey("boltAdapter should work", t, func() { + tmpDir, err := ioutil.TempDir("", "tunasync") + defer os.RemoveAll(tmpDir) + So(err, ShouldBeNil) + + dbType, dbFile := "bolt", filepath.Join(tmpDir, "bolt.db") + boltDB, err := makeDBAdapter(dbType, dbFile) + So(err, ShouldBeNil) + + defer func() { + // close boltDB + err := boltDB.Close() + So(err, ShouldBeNil) + }() + + testWorkerIDs := []string{"test_worker1", "test_worker2"} + Convey("create worker", func() { + for _, id := range testWorkerIDs { + w := worker{ + ID: id, + Token: "token_" + id, + LastOnline: time.Now(), + } + w, err = boltDB.CreateWorker(w) + So(err, ShouldBeNil) + } + + Convey("get exists worker", func() { + _, err := boltDB.GetWorker(testWorkerIDs[0]) + So(err, ShouldBeNil) + }) + + Convey("list exist worker", func() { + ws, err := boltDB.ListWorkers() + So(err, ShouldBeNil) + So(len(ws), ShouldEqual, 2) + }) + + Convey("get inexist worker", func() { + _, err := boltDB.GetWorker("invalid workerID") + So(err, ShouldNotBeNil) + }) + }) + + Convey("update mirror status", func() { + status1 := mirrorStatus{ + Name: "arch-sync1", + Worker: testWorkerIDs[0], + IsMaster: true, + Status: Success, + LastUpdate: time.Now(), + Upstream: "mirrors.tuna.tsinghua.edu.cn", + Size: "3GB", + } + status2 := mirrorStatus{ + Name: "arch-sync2", + Worker: testWorkerIDs[1], + IsMaster: true, + Status: Success, + LastUpdate: time.Now(), + Upstream: "mirrors.tuna.tsinghua.edu.cn", + Size: "4GB", + } + + _, err := boltDB.UpdateMirrorStatus(status1.Worker, status1.Name, status1) + _, err = boltDB.UpdateMirrorStatus(status2.Worker, status2.Name, status2) + So(err, ShouldBeNil) + + Convey("get mirror status", func() { + m, err := boltDB.GetMirrorStatus(testWorkerIDs[0], status1.Name) + So(err, ShouldBeNil) + expectedJSON, err := json.Marshal(status1) + So(err, ShouldBeNil) + actualJSON, err := json.Marshal(m) + So(err, ShouldBeNil) + So(string(actualJSON), ShouldEqual, string(expectedJSON)) + }) + + Convey("list mirror status", func() { + ms, err := boltDB.ListMirrorStatus(testWorkerIDs[0]) + So(err, ShouldBeNil) + expectedJSON, err := json.Marshal([]mirrorStatus{status1}) + So(err, ShouldBeNil) + actualJSON, err := json.Marshal(ms) + So(err, ShouldBeNil) + So(string(actualJSON), ShouldEqual, string(expectedJSON)) + }) + + Convey("list all mirror status", func() { + ms, err := boltDB.ListAllMirrorStatus() + So(err, ShouldBeNil) + expectedJSON, err := json.Marshal([]mirrorStatus{status1, status2}) + So(err, ShouldBeNil) + actualJSON, err := json.Marshal(ms) + So(err, ShouldBeNil) + So(string(actualJSON), ShouldEqual, string(expectedJSON)) + }) + + }) + + }) +} diff --git a/manager/server_test.go b/manager/server_test.go index cfc229b..8470b24 100644 --- a/manager/server_test.go +++ b/manager/server_test.go @@ -157,6 +157,10 @@ type mockDBAdapter struct { statusStore map[string]mirrorStatus } +func (b *mockDBAdapter) Init() error { + return nil +} + func (b *mockDBAdapter) ListWorkers() ([]worker, error) { workers := make([]worker, len(b.workerStore)) idx := 0