Add kv abstration layer for bolt and redis

This commit is contained in:
jiegec 2020-10-13 22:41:58 +08:00
parent 5880ed92dc
commit 7dd61ae8ca
3 changed files with 219 additions and 275 deletions

View File

@ -1,7 +1,9 @@
package manager package manager
import ( import (
"encoding/json"
"fmt" "fmt"
"strings"
"time" "time"
"github.com/boltdb/bolt" "github.com/boltdb/bolt"
@ -25,6 +27,15 @@ type dbAdapter interface {
Close() error Close() error
} }
type kvAdapter interface {
InitBucket(bucket string) error
Get(bucket string, key string) ([]byte, error)
GetAll(bucket string) (map[string][]byte, error)
Put(bucket string, key string, value []byte) error
Delete(bucket string, key string) error
Close() error
}
const ( const (
_workerBucketKey = "workers" _workerBucketKey = "workers"
_statusBucketKey = "mirror_status" _statusBucketKey = "mirror_status"
@ -42,8 +53,11 @@ func makeDBAdapter(dbType string, dbFile string) (dbAdapter, error) {
db: innerDB, db: innerDB,
dbFile: dbFile, dbFile: dbFile,
} }
err = db.Init() kv := kvDBAdapter{
return &db, err db: &db,
}
err = kv.Init()
return &kv, err
} else if dbType == "redis" { } else if dbType == "redis" {
opt, err := redis.ParseURL(dbFile) opt, err := redis.ParseURL(dbFile)
if err != nil { if err != nil {
@ -53,9 +67,168 @@ func makeDBAdapter(dbType string, dbFile string) (dbAdapter, error) {
db := redisAdapter{ db := redisAdapter{
db: innerDB, db: innerDB,
} }
err = db.Init() kv := kvDBAdapter{
return &db, err db: &db,
}
err = kv.Init()
return &kv, err
} }
// unsupported db-type // unsupported db-type
return nil, fmt.Errorf("unsupported db-type: %s", dbType) return nil, fmt.Errorf("unsupported db-type: %s", dbType)
} }
type kvDBAdapter struct {
db kvAdapter
}
func (b *kvDBAdapter) Init() error {
err := b.db.InitBucket(_workerBucketKey)
if err != nil {
return fmt.Errorf("create bucket %s error: %s", _workerBucketKey, err.Error())
}
err = b.db.InitBucket(_statusBucketKey)
if err != nil {
return fmt.Errorf("create bucket %s error: %s", _workerBucketKey, err.Error())
}
return err
}
func (b *kvDBAdapter) ListWorkers() (ws []WorkerStatus, err error) {
var workers map[string][]byte
workers, err = b.db.GetAll(_workerBucketKey)
var w WorkerStatus
for _, v := range workers {
jsonErr := json.Unmarshal(v, &w)
if jsonErr != nil {
err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
continue
}
ws = append(ws, w)
}
return
}
func (b *kvDBAdapter) GetWorker(workerID string) (w WorkerStatus, err error) {
var v []byte
v, err = b.db.Get(_workerBucketKey, workerID)
if v == nil {
err = fmt.Errorf("invalid workerID %s", workerID)
} else {
err = json.Unmarshal(v, &w)
}
return
}
func (b *kvDBAdapter) DeleteWorker(workerID string) error {
return b.db.Delete(_workerBucketKey, workerID)
}
func (b *kvDBAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
v, err := json.Marshal(w)
if err == nil {
err = b.db.Put(_workerBucketKey, w.ID, v)
}
return w, err
}
func (b *kvDBAdapter) RefreshWorker(workerID string) (w WorkerStatus, err error) {
w, err = b.GetWorker(workerID)
if err == nil {
w.LastOnline = time.Now()
w, err = b.CreateWorker(w)
}
return w, err
}
func (b *kvDBAdapter) UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error) {
id := mirrorID + "/" + workerID
v, err := json.Marshal(status)
if err == nil {
err = b.db.Put(_statusBucketKey, id, v)
}
return status, err
}
func (b *kvDBAdapter) GetMirrorStatus(workerID, mirrorID string) (m MirrorStatus, err error) {
id := mirrorID + "/" + workerID
var v []byte
v, err = b.db.Get(_statusBucketKey, id)
if v == nil {
err = fmt.Errorf("no mirror '%s' exists in worker '%s'", mirrorID, workerID)
} else if err == nil {
err = json.Unmarshal(v, &m)
}
return
}
func (b *kvDBAdapter) ListMirrorStatus(workerID string) (ms []MirrorStatus, err error) {
var vals map[string][]byte
vals, err = b.db.GetAll(_statusBucketKey)
if err != nil {
return
}
for k, v := range vals {
if wID := strings.Split(k, "/")[1]; wID == workerID {
var m MirrorStatus
jsonErr := json.Unmarshal(v, &m)
if jsonErr != nil {
err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
continue
}
ms = append(ms, m)
}
}
return
}
func (b *kvDBAdapter) ListAllMirrorStatus() (ms []MirrorStatus, err error) {
var vals map[string][]byte
vals, err = b.db.GetAll(_statusBucketKey)
if err != nil {
return
}
for _, v := range vals {
var m MirrorStatus
jsonErr := json.Unmarshal(v, &m)
if jsonErr != nil {
err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
continue
}
ms = append(ms, m)
}
return
}
func (b *kvDBAdapter) FlushDisabledJobs() (err error) {
var vals map[string][]byte
vals, err = b.db.GetAll(_statusBucketKey)
if err != nil {
return
}
for k, v := range vals {
var m MirrorStatus
jsonErr := json.Unmarshal(v, &m)
if jsonErr != nil {
err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
continue
}
if m.Status == Disabled || len(m.Name) == 0 {
deleteErr := b.db.Delete(_statusBucketKey, k)
if deleteErr != nil {
err = fmt.Errorf("%s; %s", err.Error(), deleteErr)
}
}
}
return
}
func (b *kvDBAdapter) Close() error {
if b.db != nil {
return b.db.Close()
}
return nil
}

View File

@ -1,14 +1,9 @@
package manager package manager
import ( import (
"encoding/json"
"fmt" "fmt"
"strings"
"time"
"github.com/boltdb/bolt" "github.com/boltdb/bolt"
. "github.com/tuna/tunasync/internal"
) )
type boltAdapter struct { type boltAdapter struct {
@ -16,172 +11,56 @@ type boltAdapter struct {
dbFile string dbFile string
} }
func (b *boltAdapter) Init() (err error) { func (b *boltAdapter) InitBucket(bucket string) (err error) {
return b.db.Update(func(tx *bolt.Tx) error { return b.db.Update(func(tx *bolt.Tx) error {
_, err = tx.CreateBucketIfNotExists([]byte(_workerBucketKey)) _, err = tx.CreateBucketIfNotExists([]byte(bucket))
if err != nil { if err != nil {
return fmt.Errorf("create bucket %s error: %s", _workerBucketKey, err.Error()) return fmt.Errorf("create bucket %s error: %s", _workerBucketKey, err.Error())
} }
_, err = tx.CreateBucketIfNotExists([]byte(_statusBucketKey))
if err != nil {
return fmt.Errorf("create bucket %s error: %s", _statusBucketKey, err.Error())
}
return nil return nil
}) })
} }
func (b *boltAdapter) ListWorkers() (ws []WorkerStatus, err error) { func (b *boltAdapter) Get(bucket string, key string) (v []byte, err error) {
err = b.db.View(func(tx *bolt.Tx) error { err = b.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(_workerBucketKey)) bucket := tx.Bucket([]byte(bucket))
v = bucket.Get([]byte(key))
return nil
})
return
}
func (b *boltAdapter) GetAll(bucket string) (m map[string][]byte, err error) {
err = b.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(bucket))
c := bucket.Cursor() c := bucket.Cursor()
var w WorkerStatus m = make(map[string][]byte)
for k, v := c.First(); k != nil; k, v = c.Next() { for k, v := c.First(); k != nil; k, v = c.Next() {
jsonErr := json.Unmarshal(v, &w) m[string(k)] = v
if jsonErr != nil {
err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
continue
}
ws = append(ws, w)
} }
return err return nil
}) })
return return
} }
func (b *boltAdapter) GetWorker(workerID string) (w WorkerStatus, err error) { func (b *boltAdapter) Put(bucket string, key string, value []byte) error {
err = b.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(_workerBucketKey))
v := bucket.Get([]byte(workerID))
if v == nil {
return fmt.Errorf("invalid workerID %s", workerID)
}
err := json.Unmarshal(v, &w)
return err
})
return
}
func (b *boltAdapter) DeleteWorker(workerID string) (err error) {
err = b.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(_workerBucketKey))
v := bucket.Get([]byte(workerID))
if v == nil {
return fmt.Errorf("invalid workerID %s", workerID)
}
err := bucket.Delete([]byte(workerID))
return err
})
return
}
func (b *boltAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
err := b.db.Update(func(tx *bolt.Tx) error { err := b.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(_workerBucketKey)) bucket := tx.Bucket([]byte(bucket))
v, err := json.Marshal(w) err := bucket.Put([]byte(key), value)
if err != nil {
return err
}
err = bucket.Put([]byte(w.ID), v)
return err return err
}) })
return w, err return err
} }
func (b *boltAdapter) RefreshWorker(workerID string) (w WorkerStatus, err error) { func (b *boltAdapter) Delete(bucket string, key string) error {
w, err = b.GetWorker(workerID)
if err == nil {
w.LastOnline = time.Now()
w, err = b.CreateWorker(w)
}
return w, err
}
func (b *boltAdapter) UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error) {
id := mirrorID + "/" + workerID
err := b.db.Update(func(tx *bolt.Tx) error { err := b.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(_statusBucketKey)) bucket := tx.Bucket([]byte(bucket))
v, err := json.Marshal(status) err := bucket.Delete([]byte(key))
err = bucket.Put([]byte(id), v)
return err return err
}) })
return status, err return err
}
func (b *boltAdapter) GetMirrorStatus(workerID, mirrorID string) (m MirrorStatus, err error) {
id := mirrorID + "/" + workerID
err = b.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(_statusBucketKey))
v := bucket.Get([]byte(id))
if v == nil {
return fmt.Errorf("no mirror '%s' exists in worker '%s'", mirrorID, workerID)
}
err := json.Unmarshal(v, &m)
return err
})
return
}
func (b *boltAdapter) ListMirrorStatus(workerID string) (ms []MirrorStatus, err error) {
err = b.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(_statusBucketKey))
c := bucket.Cursor()
var m MirrorStatus
for k, v := c.First(); k != nil; k, v = c.Next() {
if wID := strings.Split(string(k), "/")[1]; wID == workerID {
jsonErr := json.Unmarshal(v, &m)
if jsonErr != nil {
err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
continue
}
ms = append(ms, m)
}
}
return err
})
return
}
func (b *boltAdapter) ListAllMirrorStatus() (ms []MirrorStatus, err error) {
err = b.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(_statusBucketKey))
c := bucket.Cursor()
var m MirrorStatus
for k, v := c.First(); k != nil; k, v = c.Next() {
jsonErr := json.Unmarshal(v, &m)
if jsonErr != nil {
err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
continue
}
ms = append(ms, m)
}
return err
})
return
}
func (b *boltAdapter) FlushDisabledJobs() (err error) {
err = b.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(_statusBucketKey))
c := bucket.Cursor()
var m MirrorStatus
for k, v := c.First(); k != nil; k, v = c.Next() {
jsonErr := json.Unmarshal(v, &m)
if jsonErr != nil {
err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
continue
}
if m.Status == Disabled || len(m.Name) == 0 {
err = c.Delete()
}
}
return err
})
return
} }
func (b *boltAdapter) Close() error { func (b *boltAdapter) Close() error {
if b.db != nil { return b.db.Close()
return b.db.Close()
}
return nil
} }

View File

@ -2,13 +2,8 @@ package manager
import ( import (
"context" "context"
"encoding/json"
"fmt"
"strings"
"time"
"github.com/go-redis/redis/v8" "github.com/go-redis/redis/v8"
. "github.com/tuna/tunasync/internal"
) )
type redisAdapter struct { type redisAdapter struct {
@ -17,143 +12,40 @@ type redisAdapter struct {
var ctx = context.Background() var ctx = context.Background()
func (b *redisAdapter) Init() (err error) { func (b *redisAdapter) InitBucket(bucket string) (err error) {
return nil // no-op
}
func (b *redisAdapter) ListWorkers() (ws []WorkerStatus, err error) {
var val map[string]string
val, err = b.db.HGetAll(ctx, _workerBucketKey).Result()
if err == nil {
var w WorkerStatus
for _, v := range val {
jsonErr := json.Unmarshal([]byte(v), &w)
if jsonErr != nil {
err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
continue
}
ws = append(ws, w)
}
}
return return
} }
func (b *redisAdapter) GetWorker(workerID string) (w WorkerStatus, err error) { func (b *redisAdapter) Get(bucket string, key string) (v []byte, err error) {
var val string var val string
val, err = b.db.HGet(ctx, _workerBucketKey, workerID).Result() val, err = b.db.HGet(ctx, bucket, key).Result()
if err == nil { v = []byte(val)
err = json.Unmarshal([]byte(val), &w)
} else {
err = fmt.Errorf("invalid workerID %s", workerID)
}
return return
} }
func (b *redisAdapter) DeleteWorker(workerID string) (err error) { func (b *redisAdapter) GetAll(bucket string) (m map[string][]byte, err error) {
_, err = b.db.HDel(ctx, _workerBucketKey, workerID).Result()
if err != nil {
err = fmt.Errorf("invalid workerID %s", workerID)
}
return
}
func (b *redisAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
var v []byte
v, err := json.Marshal(w)
if err == nil {
_, err = b.db.HSet(ctx, _workerBucketKey, w.ID, string(v)).Result()
}
return w, err
}
func (b *redisAdapter) RefreshWorker(workerID string) (w WorkerStatus, err error) {
w, err = b.GetWorker(workerID)
if err == nil {
w.LastOnline = time.Now()
w, err = b.CreateWorker(w)
}
return w, err
}
func (b *redisAdapter) UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error) {
id := mirrorID + "/" + workerID
v, err := json.Marshal(status)
if err == nil {
_, err = b.db.HSet(ctx, _statusBucketKey, id, string(v)).Result()
}
return status, err
}
func (b *redisAdapter) GetMirrorStatus(workerID, mirrorID string) (m MirrorStatus, err error) {
id := mirrorID + "/" + workerID
var val string
val, err = b.db.HGet(ctx, _statusBucketKey, id).Result()
if err == nil {
err = json.Unmarshal([]byte(val), &m)
} else {
err = fmt.Errorf("no mirror '%s' exists in worker '%s'", mirrorID, workerID)
}
return
}
func (b *redisAdapter) ListMirrorStatus(workerID string) (ms []MirrorStatus, err error) {
var val map[string]string var val map[string]string
val, err = b.db.HGetAll(ctx, _statusBucketKey).Result() val, err = b.db.HGetAll(ctx, bucket).Result()
if err == nil { if err == nil && val != nil {
var m MirrorStatus m = make(map[string][]byte)
for k, v := range val { for k, v := range val {
if wID := strings.Split(string(k), "/")[1]; wID == workerID { m[k] = []byte(v)
jsonErr := json.Unmarshal([]byte(v), &m)
if jsonErr != nil {
err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
continue
}
ms = append(ms, m)
}
} }
} }
return return
} }
func (b *redisAdapter) ListAllMirrorStatus() (ms []MirrorStatus, err error) { func (b *redisAdapter) Put(bucket string, key string, value []byte) error {
var val map[string]string _, err := b.db.HSet(ctx, bucket, key, string(value)).Result()
val, err = b.db.HGetAll(ctx, _statusBucketKey).Result() return err
if err == nil {
var m MirrorStatus
for _, v := range val {
jsonErr := json.Unmarshal([]byte(v), &m)
if jsonErr != nil {
err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
continue
}
ms = append(ms, m)
}
}
return
} }
func (b *redisAdapter) FlushDisabledJobs() (err error) { func (b *redisAdapter) Delete(bucket string, key string) error {
var val map[string]string _, err := b.db.HDel(ctx, bucket, key).Result()
val, err = b.db.HGetAll(ctx, _statusBucketKey).Result() return err
if err == nil {
var m MirrorStatus
for k, v := range val {
jsonErr := json.Unmarshal([]byte(v), &m)
if jsonErr != nil {
err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
continue
}
if m.Status == Disabled || len(m.Name) == 0 {
_, err = b.db.HDel(ctx, _statusBucketKey, k).Result()
}
}
}
return
} }
func (b *redisAdapter) Close() error { func (b *redisAdapter) Close() error {
if b.db != nil { return b.db.Close()
return b.db.Close()
}
return nil
} }