From 7dd61ae8ca06d3e4b33d82446df65216ee03edd9 Mon Sep 17 00:00:00 2001 From: jiegec Date: Tue, 13 Oct 2020 22:41:58 +0800 Subject: [PATCH] Add kv abstration layer for bolt and redis --- manager/db.go | 181 +++++++++++++++++++++++++++++++++++++++++++- manager/db_bolt.go | 171 ++++++----------------------------------- manager/db_redis.go | 142 +++++----------------------------- 3 files changed, 219 insertions(+), 275 deletions(-) diff --git a/manager/db.go b/manager/db.go index c2811be..d254513 100644 --- a/manager/db.go +++ b/manager/db.go @@ -1,7 +1,9 @@ package manager import ( + "encoding/json" "fmt" + "strings" "time" "github.com/boltdb/bolt" @@ -25,6 +27,15 @@ type dbAdapter interface { Close() error } +type kvAdapter interface { + InitBucket(bucket string) error + Get(bucket string, key string) ([]byte, error) + GetAll(bucket string) (map[string][]byte, error) + Put(bucket string, key string, value []byte) error + Delete(bucket string, key string) error + Close() error +} + const ( _workerBucketKey = "workers" _statusBucketKey = "mirror_status" @@ -42,8 +53,11 @@ func makeDBAdapter(dbType string, dbFile string) (dbAdapter, error) { db: innerDB, dbFile: dbFile, } - err = db.Init() - return &db, err + kv := kvDBAdapter{ + db: &db, + } + err = kv.Init() + return &kv, err } else if dbType == "redis" { opt, err := redis.ParseURL(dbFile) if err != nil { @@ -53,9 +67,168 @@ func makeDBAdapter(dbType string, dbFile string) (dbAdapter, error) { db := redisAdapter{ db: innerDB, } - err = db.Init() - return &db, err + kv := kvDBAdapter{ + db: &db, + } + err = kv.Init() + return &kv, err } // unsupported db-type return nil, fmt.Errorf("unsupported db-type: %s", dbType) } + +type kvDBAdapter struct { + db kvAdapter +} + +func (b *kvDBAdapter) Init() error { + err := b.db.InitBucket(_workerBucketKey) + if err != nil { + return fmt.Errorf("create bucket %s error: %s", _workerBucketKey, err.Error()) + } + err = b.db.InitBucket(_statusBucketKey) + if err != nil { + return fmt.Errorf("create bucket %s error: %s", _workerBucketKey, err.Error()) + } + return err +} + +func (b *kvDBAdapter) ListWorkers() (ws []WorkerStatus, err error) { + var workers map[string][]byte + workers, err = b.db.GetAll(_workerBucketKey) + + var w WorkerStatus + for _, v := range workers { + jsonErr := json.Unmarshal(v, &w) + if jsonErr != nil { + err = fmt.Errorf("%s; %s", err.Error(), jsonErr) + continue + } + ws = append(ws, w) + } + return +} + +func (b *kvDBAdapter) GetWorker(workerID string) (w WorkerStatus, err error) { + var v []byte + v, err = b.db.Get(_workerBucketKey, workerID) + if v == nil { + err = fmt.Errorf("invalid workerID %s", workerID) + } else { + err = json.Unmarshal(v, &w) + } + return +} + +func (b *kvDBAdapter) DeleteWorker(workerID string) error { + return b.db.Delete(_workerBucketKey, workerID) +} + +func (b *kvDBAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) { + v, err := json.Marshal(w) + if err == nil { + err = b.db.Put(_workerBucketKey, w.ID, v) + } + return w, err +} + +func (b *kvDBAdapter) RefreshWorker(workerID string) (w WorkerStatus, err error) { + w, err = b.GetWorker(workerID) + if err == nil { + w.LastOnline = time.Now() + w, err = b.CreateWorker(w) + } + return w, err +} + +func (b *kvDBAdapter) UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error) { + id := mirrorID + "/" + workerID + v, err := json.Marshal(status) + if err == nil { + err = b.db.Put(_statusBucketKey, id, v) + } + return status, err +} + +func (b *kvDBAdapter) GetMirrorStatus(workerID, mirrorID string) (m MirrorStatus, err error) { + id := mirrorID + "/" + workerID + var v []byte + v, err = b.db.Get(_statusBucketKey, id) + if v == nil { + err = fmt.Errorf("no mirror '%s' exists in worker '%s'", mirrorID, workerID) + } else if err == nil { + err = json.Unmarshal(v, &m) + } + return +} + +func (b *kvDBAdapter) ListMirrorStatus(workerID string) (ms []MirrorStatus, err error) { + var vals map[string][]byte + vals, err = b.db.GetAll(_statusBucketKey) + if err != nil { + return + } + + for k, v := range vals { + if wID := strings.Split(k, "/")[1]; wID == workerID { + var m MirrorStatus + jsonErr := json.Unmarshal(v, &m) + if jsonErr != nil { + err = fmt.Errorf("%s; %s", err.Error(), jsonErr) + continue + } + ms = append(ms, m) + } + } + return +} + +func (b *kvDBAdapter) ListAllMirrorStatus() (ms []MirrorStatus, err error) { + var vals map[string][]byte + vals, err = b.db.GetAll(_statusBucketKey) + if err != nil { + return + } + + for _, v := range vals { + var m MirrorStatus + jsonErr := json.Unmarshal(v, &m) + if jsonErr != nil { + err = fmt.Errorf("%s; %s", err.Error(), jsonErr) + continue + } + ms = append(ms, m) + } + return +} + +func (b *kvDBAdapter) FlushDisabledJobs() (err error) { + var vals map[string][]byte + vals, err = b.db.GetAll(_statusBucketKey) + if err != nil { + return + } + + for k, v := range vals { + var m MirrorStatus + jsonErr := json.Unmarshal(v, &m) + if jsonErr != nil { + err = fmt.Errorf("%s; %s", err.Error(), jsonErr) + continue + } + if m.Status == Disabled || len(m.Name) == 0 { + deleteErr := b.db.Delete(_statusBucketKey, k) + if deleteErr != nil { + err = fmt.Errorf("%s; %s", err.Error(), deleteErr) + } + } + } + return +} + +func (b *kvDBAdapter) Close() error { + if b.db != nil { + return b.db.Close() + } + return nil +} diff --git a/manager/db_bolt.go b/manager/db_bolt.go index 0473b0f..b80372e 100644 --- a/manager/db_bolt.go +++ b/manager/db_bolt.go @@ -1,14 +1,9 @@ package manager import ( - "encoding/json" "fmt" - "strings" - "time" "github.com/boltdb/bolt" - - . "github.com/tuna/tunasync/internal" ) type boltAdapter struct { @@ -16,172 +11,56 @@ type boltAdapter struct { dbFile string } -func (b *boltAdapter) Init() (err error) { +func (b *boltAdapter) InitBucket(bucket string) (err error) { return b.db.Update(func(tx *bolt.Tx) error { - _, err = tx.CreateBucketIfNotExists([]byte(_workerBucketKey)) + _, err = tx.CreateBucketIfNotExists([]byte(bucket)) if err != nil { return fmt.Errorf("create bucket %s error: %s", _workerBucketKey, err.Error()) } - _, err = tx.CreateBucketIfNotExists([]byte(_statusBucketKey)) - if err != nil { - return fmt.Errorf("create bucket %s error: %s", _statusBucketKey, err.Error()) - } return nil }) } -func (b *boltAdapter) ListWorkers() (ws []WorkerStatus, err error) { +func (b *boltAdapter) Get(bucket string, key string) (v []byte, err error) { err = b.db.View(func(tx *bolt.Tx) error { - bucket := tx.Bucket([]byte(_workerBucketKey)) + bucket := tx.Bucket([]byte(bucket)) + v = bucket.Get([]byte(key)) + return nil + }) + return +} + +func (b *boltAdapter) GetAll(bucket string) (m map[string][]byte, err error) { + err = b.db.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket([]byte(bucket)) c := bucket.Cursor() - var w WorkerStatus + m = make(map[string][]byte) for k, v := c.First(); k != nil; k, v = c.Next() { - jsonErr := json.Unmarshal(v, &w) - if jsonErr != nil { - err = fmt.Errorf("%s; %s", err.Error(), jsonErr) - continue - } - ws = append(ws, w) + m[string(k)] = v } - return err + return nil }) return } -func (b *boltAdapter) GetWorker(workerID string) (w WorkerStatus, err error) { - err = b.db.View(func(tx *bolt.Tx) error { - bucket := tx.Bucket([]byte(_workerBucketKey)) - v := bucket.Get([]byte(workerID)) - if v == nil { - return fmt.Errorf("invalid workerID %s", workerID) - } - err := json.Unmarshal(v, &w) - return err - }) - return -} - -func (b *boltAdapter) DeleteWorker(workerID string) (err error) { - err = b.db.Update(func(tx *bolt.Tx) error { - bucket := tx.Bucket([]byte(_workerBucketKey)) - v := bucket.Get([]byte(workerID)) - if v == nil { - return fmt.Errorf("invalid workerID %s", workerID) - } - err := bucket.Delete([]byte(workerID)) - return err - }) - return -} - -func (b *boltAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) { +func (b *boltAdapter) Put(bucket string, key string, value []byte) error { err := b.db.Update(func(tx *bolt.Tx) error { - bucket := tx.Bucket([]byte(_workerBucketKey)) - v, err := json.Marshal(w) - if err != nil { - return err - } - err = bucket.Put([]byte(w.ID), v) + bucket := tx.Bucket([]byte(bucket)) + err := bucket.Put([]byte(key), value) return err }) - return w, err + return err } -func (b *boltAdapter) RefreshWorker(workerID string) (w WorkerStatus, err error) { - w, err = b.GetWorker(workerID) - if err == nil { - w.LastOnline = time.Now() - w, err = b.CreateWorker(w) - } - return w, err -} - -func (b *boltAdapter) UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error) { - id := mirrorID + "/" + workerID +func (b *boltAdapter) Delete(bucket string, key string) error { err := b.db.Update(func(tx *bolt.Tx) error { - bucket := tx.Bucket([]byte(_statusBucketKey)) - v, err := json.Marshal(status) - err = bucket.Put([]byte(id), v) + bucket := tx.Bucket([]byte(bucket)) + err := bucket.Delete([]byte(key)) return err }) - return status, err -} - -func (b *boltAdapter) GetMirrorStatus(workerID, mirrorID string) (m MirrorStatus, err error) { - id := mirrorID + "/" + workerID - err = b.db.Update(func(tx *bolt.Tx) error { - bucket := tx.Bucket([]byte(_statusBucketKey)) - v := bucket.Get([]byte(id)) - if v == nil { - return fmt.Errorf("no mirror '%s' exists in worker '%s'", mirrorID, workerID) - } - err := json.Unmarshal(v, &m) - return err - }) - return -} - -func (b *boltAdapter) ListMirrorStatus(workerID string) (ms []MirrorStatus, err error) { - err = b.db.View(func(tx *bolt.Tx) error { - bucket := tx.Bucket([]byte(_statusBucketKey)) - c := bucket.Cursor() - var m MirrorStatus - for k, v := c.First(); k != nil; k, v = c.Next() { - if wID := strings.Split(string(k), "/")[1]; wID == workerID { - jsonErr := json.Unmarshal(v, &m) - if jsonErr != nil { - err = fmt.Errorf("%s; %s", err.Error(), jsonErr) - continue - } - ms = append(ms, m) - } - } - return err - }) - return -} - -func (b *boltAdapter) ListAllMirrorStatus() (ms []MirrorStatus, err error) { - err = b.db.View(func(tx *bolt.Tx) error { - bucket := tx.Bucket([]byte(_statusBucketKey)) - c := bucket.Cursor() - var m MirrorStatus - for k, v := c.First(); k != nil; k, v = c.Next() { - jsonErr := json.Unmarshal(v, &m) - if jsonErr != nil { - err = fmt.Errorf("%s; %s", err.Error(), jsonErr) - continue - } - ms = append(ms, m) - } - return err - }) - return -} - -func (b *boltAdapter) FlushDisabledJobs() (err error) { - err = b.db.Update(func(tx *bolt.Tx) error { - bucket := tx.Bucket([]byte(_statusBucketKey)) - c := bucket.Cursor() - var m MirrorStatus - for k, v := c.First(); k != nil; k, v = c.Next() { - jsonErr := json.Unmarshal(v, &m) - if jsonErr != nil { - err = fmt.Errorf("%s; %s", err.Error(), jsonErr) - continue - } - if m.Status == Disabled || len(m.Name) == 0 { - err = c.Delete() - } - } - return err - }) - return + return err } func (b *boltAdapter) Close() error { - if b.db != nil { - return b.db.Close() - } - return nil + return b.db.Close() } diff --git a/manager/db_redis.go b/manager/db_redis.go index 324cf15..17005a0 100644 --- a/manager/db_redis.go +++ b/manager/db_redis.go @@ -2,13 +2,8 @@ package manager import ( "context" - "encoding/json" - "fmt" - "strings" - "time" "github.com/go-redis/redis/v8" - . "github.com/tuna/tunasync/internal" ) type redisAdapter struct { @@ -17,143 +12,40 @@ type redisAdapter struct { var ctx = context.Background() -func (b *redisAdapter) Init() (err error) { - return nil -} - -func (b *redisAdapter) ListWorkers() (ws []WorkerStatus, err error) { - var val map[string]string - val, err = b.db.HGetAll(ctx, _workerBucketKey).Result() - if err == nil { - var w WorkerStatus - for _, v := range val { - jsonErr := json.Unmarshal([]byte(v), &w) - if jsonErr != nil { - err = fmt.Errorf("%s; %s", err.Error(), jsonErr) - continue - } - ws = append(ws, w) - } - } +func (b *redisAdapter) InitBucket(bucket string) (err error) { + // no-op return } -func (b *redisAdapter) GetWorker(workerID string) (w WorkerStatus, err error) { +func (b *redisAdapter) Get(bucket string, key string) (v []byte, err error) { var val string - val, err = b.db.HGet(ctx, _workerBucketKey, workerID).Result() - if err == nil { - err = json.Unmarshal([]byte(val), &w) - } else { - err = fmt.Errorf("invalid workerID %s", workerID) - } + val, err = b.db.HGet(ctx, bucket, key).Result() + v = []byte(val) return } -func (b *redisAdapter) DeleteWorker(workerID string) (err error) { - _, err = b.db.HDel(ctx, _workerBucketKey, workerID).Result() - if err != nil { - err = fmt.Errorf("invalid workerID %s", workerID) - } - return -} - -func (b *redisAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) { - var v []byte - v, err := json.Marshal(w) - if err == nil { - _, err = b.db.HSet(ctx, _workerBucketKey, w.ID, string(v)).Result() - } - return w, err -} - -func (b *redisAdapter) RefreshWorker(workerID string) (w WorkerStatus, err error) { - w, err = b.GetWorker(workerID) - if err == nil { - w.LastOnline = time.Now() - w, err = b.CreateWorker(w) - } - return w, err -} - -func (b *redisAdapter) UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error) { - id := mirrorID + "/" + workerID - v, err := json.Marshal(status) - if err == nil { - _, err = b.db.HSet(ctx, _statusBucketKey, id, string(v)).Result() - } - return status, err -} - -func (b *redisAdapter) GetMirrorStatus(workerID, mirrorID string) (m MirrorStatus, err error) { - id := mirrorID + "/" + workerID - var val string - val, err = b.db.HGet(ctx, _statusBucketKey, id).Result() - if err == nil { - err = json.Unmarshal([]byte(val), &m) - } else { - err = fmt.Errorf("no mirror '%s' exists in worker '%s'", mirrorID, workerID) - } - return -} - -func (b *redisAdapter) ListMirrorStatus(workerID string) (ms []MirrorStatus, err error) { +func (b *redisAdapter) GetAll(bucket string) (m map[string][]byte, err error) { var val map[string]string - val, err = b.db.HGetAll(ctx, _statusBucketKey).Result() - if err == nil { - var m MirrorStatus + val, err = b.db.HGetAll(ctx, bucket).Result() + if err == nil && val != nil { + m = make(map[string][]byte) for k, v := range val { - if wID := strings.Split(string(k), "/")[1]; wID == workerID { - jsonErr := json.Unmarshal([]byte(v), &m) - if jsonErr != nil { - err = fmt.Errorf("%s; %s", err.Error(), jsonErr) - continue - } - ms = append(ms, m) - } + m[k] = []byte(v) } } return } -func (b *redisAdapter) ListAllMirrorStatus() (ms []MirrorStatus, err error) { - var val map[string]string - val, err = b.db.HGetAll(ctx, _statusBucketKey).Result() - if err == nil { - var m MirrorStatus - for _, v := range val { - jsonErr := json.Unmarshal([]byte(v), &m) - if jsonErr != nil { - err = fmt.Errorf("%s; %s", err.Error(), jsonErr) - continue - } - ms = append(ms, m) - } - } - return +func (b *redisAdapter) Put(bucket string, key string, value []byte) error { + _, err := b.db.HSet(ctx, bucket, key, string(value)).Result() + return err } -func (b *redisAdapter) FlushDisabledJobs() (err error) { - var val map[string]string - val, err = b.db.HGetAll(ctx, _statusBucketKey).Result() - if err == nil { - var m MirrorStatus - for k, v := range val { - jsonErr := json.Unmarshal([]byte(v), &m) - if jsonErr != nil { - err = fmt.Errorf("%s; %s", err.Error(), jsonErr) - continue - } - if m.Status == Disabled || len(m.Name) == 0 { - _, err = b.db.HDel(ctx, _statusBucketKey, k).Result() - } - } - } - return +func (b *redisAdapter) Delete(bucket string, key string) error { + _, err := b.db.HDel(ctx, bucket, key).Result() + return err } func (b *redisAdapter) Close() error { - if b.db != nil { - return b.db.Close() - } - return nil + return b.db.Close() }