增加tstorage 时序数据库支持。

This commit is contained in:
zhuhoujiu 2025-03-07 11:02:43 +08:00
parent 7c6709c363
commit 1996542446
10 changed files with 586 additions and 50 deletions

View File

@ -36,10 +36,12 @@ Dsn = 'root:123456@tcp(127.0.0.1:3306)/open-hummingbird?charset=utf8mb4&parseTim
[Databases.Data]
[Databases.Data.Primary]
Type = 'tstorage'
DataSource = 'manifest/docker/db-data/tstorage-data/'
#Type = 'tdengine'
#Dsn = 'root:taosdata@ws(127.0.0.1:6041)/hummingbird'
#Type = 'leveldb'
#DataSource = 'manifest/docker/db-data/leveldb-core-data/'
Type = 'tdengine'
Dsn = 'root:taosdata@ws(127.0.0.1:6041)/hummingbird'
[MessageQueue]
Protocol = 'tcp'

1
go.mod
View File

@ -21,6 +21,7 @@ require (
github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c
github.com/kirinlabs/HttpRequest v1.1.1
github.com/mitchellh/mapstructure v1.4.3
github.com/nakabonne/tstorage v0.3.6
github.com/nicksnyder/go-i18n/v2 v2.2.0
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/pelletier/go-toml v1.9.5

View File

@ -556,7 +556,7 @@ func (p *deviceApp) DevicesReportMsgGather(ctx context.Context) error {
persistApp := resourceContainer.PersistItfFrom(p.dic.Get)
count, err = persistApp.SearchDeviceMsgCount(startTime, endTime)
if err != nil {
return err
}
var msgGather models.MsgGather
msgGather.Count = count

View File

@ -58,6 +58,8 @@ func (pst *persistApp) SaveDeviceThingModelData(req dtos.ThingModelMessage) erro
return pst.saveDeviceThingModelToLevelDB(req)
case constants.TDengine:
return pst.saveDeviceThingModelToTdengine(req)
case constants.Tstorage:
return pst.saveDeviceThingModelToTstorage(req)
default:
return nil
}
@ -406,6 +408,174 @@ func (pst *persistApp) saveDeviceThingModelToTdengine(req dtos.ThingModelMessage
return nil
}
func (pst *persistApp) saveDeviceThingModelToTstorage(req dtos.ThingModelMessage) error {
switch req.GetOpType() {
case thingmodel.OperationType_PROPERTY_REPORT:
propertyMsg, err := req.TransformMessageDataByProperty()
if err != nil {
return err
}
data := make(map[string]interface{})
for s, reportData := range propertyMsg.Data {
data[s] = reportData.Value
}
err = pst.dataDbClient.Insert(context.Background(), constants.DB_PREFIX+req.Cid, data)
if err != nil {
return err
}
case thingmodel.OperationType_EVENT_REPORT:
eventMsg, err := req.TransformMessageDataByEvent()
if err != nil {
return err
}
data := make(map[string]interface{})
data[eventMsg.Data.EventCode] = eventMsg.Data
err = pst.dataDbClient.Insert(context.Background(), constants.DB_PREFIX+req.Cid, data)
if err != nil {
return err
}
case thingmodel.OperationType_SERVICE_EXECUTE:
serviceMsg, err := req.TransformMessageDataByService()
if err != nil {
return err
}
v, _ := serviceMsg.Marshal()
data := make(map[string]interface{})
data[serviceMsg.Code] = string(v)
err = pst.dataDbClient.Insert(context.Background(), constants.DB_PREFIX+req.Cid, data)
if err != nil {
return err
}
case thingmodel.OperationType_PROPERTY_GET_RESPONSE:
msg, err := req.TransformMessageDataByGetProperty()
if err != nil {
return err
}
messageStore := resourceContainer.MessageStoreItfFrom(pst.dic.Get)
ack, ok := messageStore.LoadMsgChan(msg.MsgId)
if !ok {
//超时了。
return nil
}
if v, ok := ack.(*messagestore.MsgAckChan); ok {
v.TrySendDataAndCloseChan(msg.Data)
messageStore.DeleteMsgId(msg.MsgId)
}
case thingmodel.OperationType_PROPERTY_SET_RESPONSE:
msg, err := req.TransformMessageDataBySetProperty()
if err != nil {
return err
}
messageStore := resourceContainer.MessageStoreItfFrom(pst.dic.Get)
ack, ok := messageStore.LoadMsgChan(msg.MsgId)
if !ok {
//超时了。
return nil
}
if v, ok := ack.(*messagestore.MsgAckChan); ok {
v.TrySendDataAndCloseChan(msg.Data)
messageStore.DeleteMsgId(msg.MsgId)
}
case thingmodel.OperationType_DATA_BATCH_REPORT:
msg, err := req.TransformMessageDataByBatchReport()
if err != nil {
return err
}
data := make(map[string]interface{})
for code, property := range msg.Data.Properties {
data[code] = property.Value
}
for code, event := range msg.Data.Events {
var eventData dtos.EventData
eventData.OutputParams = event.OutputParams
eventData.EventCode = code
eventData.EventTime = msg.Time
data[code] = eventData
}
//批量写。
err = pst.dataDbClient.Insert(context.Background(), constants.DB_PREFIX+req.Cid, data)
if err != nil {
return err
}
return nil
case thingmodel.OperationType_SERVICE_EXECUTE_RESPONSE:
serviceMsg, err := req.TransformMessageDataByServiceExec()
if err != nil {
return err
}
device, err := pst.dbClient.DeviceById(req.Cid)
if err != nil {
return err
}
_, err = pst.dbClient.ProductById(device.ProductId)
if err != nil {
return err
}
//var find bool
//var callType constants.CallType
//
//for _, action := range product.Actions {
// if action.Code == serviceMsg.Code {
// find = true
// callType = action.CallType
// break
// }
//}
//
//if !find {
// return errors.New("")
//}
messageStore := resourceContainer.MessageStoreItfFrom(pst.dic.Get)
ack, ok := messageStore.LoadMsgChan(serviceMsg.MsgId)
if !ok {
//可能是超时了。
return nil
}
if v, ok := ack.(*messagestore.MsgAckChan); ok {
v.TrySendDataAndCloseChan(serviceMsg.OutputParams)
messageStore.DeleteMsgId(serviceMsg.MsgId)
}
//if callType == constants.CallTypeSync {
// messageStore := resourceContainer.MessageStoreItfFrom(pst.dic.Get)
// ack, ok := messageStore.LoadMsgChan(serviceMsg.MsgId)
// if !ok {
// //可能是超时了。
// return nil
// }
//
// if v, ok := ack.(*messagestore.MsgAckChan); ok {
// v.TrySendDataAndCloseChan(serviceMsg.OutputParams)
// messageStore.DeleteMsgId(serviceMsg.MsgId)
// }
//
//} else if callType == constants.CallTypeAsync {
// v, _ := serviceMsg.Marshal()
// data := make(map[string]interface{})
// data[serviceMsg.Code] = string(v)
// err = pst.dataDbClient.Insert(context.Background(), constants.DB_PREFIX+req.Cid, data)
// if err != nil {
// return err
// }
//}
}
return nil
}
func generatePropertyLeveldbKey(cid, code string, reportTime int64) string {
return cid + "-" + constants.Property + "-" + code + "-" + strconv.Itoa(int(reportTime))
}
@ -517,6 +687,96 @@ func (pst *persistApp) searchDeviceThingModelHistoryPropertyDataFromTDengine(req
return response, count, nil
}
func (pst *persistApp) searchDeviceThingModelHistoryPropertyDataFromTstorage(req dtos.ThingModelPropertyDataRequest) (interface{}, int, error) {
var count int
deviceInfo, err := pst.dbClient.DeviceById(req.DeviceId)
if err != nil {
return nil, count, err
}
var productInfo models.Product
productInfo, err = pst.dbClient.ProductById(deviceInfo.ProductId)
if err != nil {
return nil, count, err
}
var response []dtos.ReportData
for _, property := range productInfo.Properties {
if property.Code == req.Code {
req.Code = property.Code
response, count, err = pst.dataDbClient.GetDeviceProperty(req, deviceInfo)
if err != nil {
pst.lc.Errorf("GetDeviceProperty error %+v", err)
}
var typeSpecIntOrFloat models.TypeSpecIntOrFloat
if property.TypeSpec.Type == constants.SpecsTypeInt || property.TypeSpec.Type == constants.SpecsTypeFloat {
_ = json.Unmarshal([]byte(property.TypeSpec.Specs), &typeSpecIntOrFloat)
}
if typeSpecIntOrFloat.Unit == "" {
typeSpecIntOrFloat.Unit = "-"
}
break
}
}
return response, count, nil
}
func (pst *persistApp) searchDeviceThingModelPropertyDataFromTstorage(req dtos.ThingModelPropertyDataRequest) (interface{}, error) {
deviceInfo, err := pst.dbClient.DeviceById(req.DeviceId)
if err != nil {
return nil, err
}
var productInfo models.Product
response := make([]dtos.ThingModelDataResponse, 0)
productInfo, err = pst.dbClient.ProductById(deviceInfo.ProductId)
if err != nil {
return nil, err
}
if req.Code == "" {
for _, property := range productInfo.Properties {
req.Code = property.Code
ksv, _, err := pst.dataDbClient.GetDeviceProperty(req, deviceInfo)
if err != nil {
pst.lc.Errorf("GetDeviceProperty error %+v", err)
continue
}
var reportData dtos.ReportData
if len(ksv) > 0 {
reportData = ksv[0]
}
var unit string
if property.TypeSpec.Type == constants.SpecsTypeInt || property.TypeSpec.Type == constants.SpecsTypeFloat {
var typeSpecIntOrFloat models.TypeSpecIntOrFloat
_ = json.Unmarshal([]byte(property.TypeSpec.Specs), &typeSpecIntOrFloat)
unit = typeSpecIntOrFloat.Unit
} else if property.TypeSpec.Type == constants.SpecsTypeEnum {
//enum 的单位需要特殊处理一下
enumTypeSpec := make(map[string]string)
_ = json.Unmarshal([]byte(property.TypeSpec.Specs), &enumTypeSpec)
for key, value := range enumTypeSpec {
s := utils.InterfaceToString(reportData.Value)
if key == s {
unit = value
}
}
}
if unit == "" {
unit = "-"
}
response = append(response, dtos.ThingModelDataResponse{
ReportData: reportData,
Code: property.Code,
DataType: string(property.TypeSpec.Type),
Name: property.Name,
Unit: unit,
AccessMode: property.AccessMode,
})
}
}
return response, nil
}
func (pst *persistApp) searchDeviceThingModelPropertyDataFromTDengine(req dtos.ThingModelPropertyDataRequest) (interface{}, error) {
deviceInfo, err := pst.dbClient.DeviceById(req.DeviceId)
if err != nil {
@ -549,8 +809,6 @@ func (pst *persistApp) searchDeviceThingModelPropertyDataFromTDengine(req dtos.T
//enum 的单位需要特殊处理一下
enumTypeSpec := make(map[string]string)
_ = json.Unmarshal([]byte(property.TypeSpec.Specs), &enumTypeSpec)
//pst.lc.Info("reportDataType enumTypeSpec", enumTypeSpec)
for key, value := range enumTypeSpec {
s := utils.InterfaceToString(reportData.Value)
if key == s {
@ -580,9 +838,10 @@ func (pst *persistApp) SearchDeviceThingModelPropertyData(req dtos.ThingModelPro
switch pst.dataDbClient.GetDataDBType() {
case constants.LevelDB:
return pst.searchDeviceThingModelPropertyDataFromLevelDB(req)
case constants.TDengine:
return pst.searchDeviceThingModelPropertyDataFromTDengine(req)
case constants.Tstorage:
return pst.searchDeviceThingModelPropertyDataFromTstorage(req)
default:
return make([]interface{}, 0), nil
@ -619,6 +878,8 @@ func (pst *persistApp) SearchDeviceThingModelHistoryPropertyData(req dtos.ThingM
return pst.searchDeviceThingModelHistoryPropertyDataFromLevelDB(req)
case constants.TDengine:
return pst.searchDeviceThingModelHistoryPropertyDataFromTDengine(req)
case constants.Tstorage:
return pst.searchDeviceThingModelHistoryPropertyDataFromTstorage(req)
}
response := make([]interface{}, 0)
return response, 0, nil

View File

@ -6,20 +6,10 @@ package application
import (
"context"
resourceContainer "github.com/winc-link/hummingbird/internal/hummingbird/core/container"
"github.com/winc-link/hummingbird/internal/pkg/crontab"
"github.com/winc-link/hummingbird/internal/pkg/di"
"github.com/winc-link/hummingbird/internal/pkg/logger"
"time"
//"gitlab.com/tedge/edgex/internal/dtos"
//"gitlab.com/tedge/edgex/internal/pkg/constants"
//"gitlab.com/tedge/edgex/internal/pkg/container"
//pkgContainer "github.com/winc-link/hummingbird/internal/pkg/container"
"github.com/winc-link/hummingbird/internal/pkg/crontab"
//"gitlab.com/tedge/edgex/internal/pkg/di"
//"gitlab.com/tedge/edgex/internal/pkg/errort"
//"gitlab.com/tedge/edgex/internal/pkg/logger"
//resourceContainer "gitlab.com/tedge/edgex/internal/tedge/resource/container"
//"gitlab.com/tedge/edgex/internal/tools/atopclient"
)
func InitSchedule(dic *di.Container, lc logger.LoggingClient) {

View File

@ -15,6 +15,7 @@ import (
"github.com/winc-link/hummingbird/internal/hummingbird/core/infrastructure/sqlite"
"github.com/winc-link/hummingbird/internal/pkg/constants"
"github.com/winc-link/hummingbird/internal/tools/datadb/tdengine"
"github.com/winc-link/hummingbird/internal/tools/datadb/tstorage"
"github.com/winc-link/hummingbird/internal/pkg/di"
"github.com/winc-link/hummingbird/internal/pkg/logger"
@ -68,6 +69,10 @@ func (d Database) newDataDBClient(
return leveldb.NewClient(dtos.Configuration{
DataSource: dataDbInfo.DataSource,
}, lc)
case string(constants.Tstorage):
return tstorage.NewClient(dtos.Configuration{
DataSource: dataDbInfo.DataSource,
}, lc)
case string(constants.TDengine):
return tdengine.NewClient(dtos.Configuration{
Dsn: dataDbInfo.Dsn,

View File

@ -30,4 +30,5 @@ type DataType string
const (
LevelDB DataType = "leveldb"
TDengine DataType = "tdengine"
Tstorage DataType = "tstorage"
)

View File

@ -17,25 +17,46 @@ package utils
import "fmt"
func InStringSlice(key string, keys []string) bool {
ok := false
for _, item := range keys {
if key == item {
return true
}
}
return ok
ok := false
for _, item := range keys {
if key == item {
return true
}
}
return ok
}
func SliceStringUnique(strings []string) []string {
temp := map[string]struct{}{}
result := make([]string, 0, len(strings))
for _, item := range strings {
key := fmt.Sprint(item)
if _, ok := temp[key]; !ok {
temp[key] = struct{}{}
result = append(result, item)
}
}
return result
temp := map[string]struct{}{}
result := make([]string, 0, len(strings))
for _, item := range strings {
key := fmt.Sprint(item)
if _, ok := temp[key]; !ok {
temp[key] = struct{}{}
result = append(result, item)
}
}
return result
}
func ConvertToFloat64(i interface{}) float64 {
switch v := i.(type) {
case float64:
return v
case float32:
return float64(v)
case int:
return float64(v)
case int64:
return float64(v)
case int32:
return float64(v)
case string:
var f float64
_, err := fmt.Sscanf(v, "%f", &f)
if err == nil {
return f
}
}
return 0
}

View File

@ -318,32 +318,45 @@ func (c *Client) GetDeviceProperty(req dtos.ThingModelPropertyDataRequest, devic
firstTime = strconv.Itoa(int(req.Range[1]))
lastTime = strconv.Itoa(int(req.Range[0]))
}
iter := c.client.NewIterator(&util.Range{Start: []byte(baseKey + firstTime), Limit: []byte(baseKey + lastTime)}, &opt.ReadOptions{
DontFillCache: true,
})
var start int
var end int
if req.IsAll {
if iter.Last() {
for iter.Prev() {
var dbvalue dtos.ReportData
_ = json.Unmarshal(iter.Value(), &dbvalue)
response = append(response, dbvalue)
start = (req.Page-1)*req.PageSize + 1
end = (req.Page-1)*req.PageSize + req.PageSize
}
}
} else {
if iter.Last() {
count++
var dbvalue dtos.ReportData
_ = json.Unmarshal(iter.Value(), &dbvalue)
response = append(response, dbvalue)
for iter.Prev() {
var start int
var end int
start = (req.Page-1)*req.PageSize + 1
end = (req.Page-1)*req.PageSize + req.PageSize
if iter.Last() {
count++
if count >= start && count <= end {
if req.Page == 1 {
var dbvalue dtos.ReportData
_ = json.Unmarshal(iter.Value(), &dbvalue)
response = append(response, dbvalue)
}
for iter.Prev() {
count++
if count >= start && count <= end {
var dbvalue dtos.ReportData
_ = json.Unmarshal(iter.Value(), &dbvalue)
response = append(response, dbvalue)
}
}
}
}
iter.Release()
} else if req.First {
iter := c.client.NewIterator(util.BytesPrefix([]byte(baseKey)), &opt.ReadOptions{
DontFillCache: true,
@ -363,13 +376,27 @@ func (c *Client) GetDeviceProperty(req dtos.ThingModelPropertyDataRequest, devic
//break
}
iter.Release()
} else {
}
return response, count, nil
}
func (c *Client) GetDeviceMsgCountByGiveTime(deviceId string, startTime, endTime int64) (int, error) {
//TODO implement me
panic("implement me")
baseKey := deviceId + "-" + constants.Property + "-"
firstTime := strconv.Itoa(int(startTime))
lastTime := strconv.Itoa(int(endTime))
var count int
iter := c.client.NewIterator(&util.Range{Start: []byte(baseKey + firstTime), Limit: []byte(baseKey + lastTime)}, &opt.ReadOptions{
DontFillCache: true,
})
count++
if iter.Prev() {
count++
}
iter.Release()
return count, nil
}

View File

@ -0,0 +1,228 @@
package tstorage
import (
"context"
tstorage "github.com/nakabonne/tstorage"
"github.com/winc-link/hummingbird/internal/dtos"
interfaces "github.com/winc-link/hummingbird/internal/hummingbird/core/interface"
"github.com/winc-link/hummingbird/internal/models"
"github.com/winc-link/hummingbird/internal/pkg/constants"
"github.com/winc-link/hummingbird/internal/pkg/logger"
"github.com/winc-link/hummingbird/internal/pkg/utils"
"os"
"path/filepath"
"time"
)
type Client struct {
client tstorage.Storage
loggingClient logger.LoggingClient
}
func (c *Client) GetDataDBType() constants.DataType {
return constants.Tstorage
}
func (c *Client) CloseSession() {
c.client.Close()
}
func (c *Client) Insert(ctx context.Context, table string, data map[string]interface{}) (err error) {
metric := table
var rows []tstorage.Row
timestamp := time.Now().UnixMilli()
for code, value := range data {
var labels []tstorage.Label
labels = append(labels, tstorage.Label{
Name: "code", Value: code,
})
rows = append(rows, tstorage.Row{
Metric: metric,
Labels: labels,
DataPoint: tstorage.DataPoint{
Timestamp: timestamp,
Value: utils.ConvertToFloat64(value),
},
})
}
return c.client.InsertRows(rows)
}
func paginate(arr []*tstorage.DataPoint, page, pageSize int) []*tstorage.DataPoint {
arr = reverseArray(arr)
// 计算起始索引
start := (page - 1) * pageSize
end := start + pageSize
// 边界检查
if start >= len(arr) {
return []*tstorage.DataPoint{}
}
if end > len(arr) {
end = len(arr) // 不能超出数组范围
}
return arr[start:end]
}
// reverseArray 翻转数组
func reverseArray(arr []*tstorage.DataPoint) []*tstorage.DataPoint {
n := len(arr)
reversed := make([]*tstorage.DataPoint, n)
for i, v := range arr {
reversed[n-1-i] = v
}
return reversed
}
func (c *Client) GetDeviceProperty(req dtos.ThingModelPropertyDataRequest, device models.Device) ([]dtos.ReportData, int, error) {
var response []dtos.ReportData
var count int
if len(req.Range) == 2 {
var startTime, endTime int64
if req.Range[0] < req.Range[1] {
startTime = req.Range[0]
endTime = req.Range[1]
} else {
startTime = req.Range[1]
endTime = req.Range[0]
}
var labels []tstorage.Label
labels = append(labels, tstorage.Label{
Name: "code", Value: req.Code,
})
points, err := c.client.Select(constants.DB_PREFIX+device.Id, labels, startTime, endTime)
if err != nil {
c.loggingClient.Error("tstorage query data:", err)
return []dtos.ReportData{}, count, err
}
count = len(points)
paginateRes := paginate(points, req.Page, req.PageSize)
for _, re := range paginateRes {
response = append(response, dtos.ReportData{
Value: re.Value,
Time: re.Timestamp,
})
}
} else if req.Last {
var labels []tstorage.Label
labels = append(labels, tstorage.Label{
Name: "code", Value: req.Code,
})
var startTime, endTime int64
// 获取当前时间
now := time.Now()
// 计算半小时前的时间
past := now.Add(-30 * time.Minute)
// 转换为毫秒时间戳
endTime = now.UnixMilli()
startTime = past.UnixMilli()
points, err := c.client.Select(constants.DB_PREFIX+device.Id, labels, startTime, endTime)
if err != nil {
c.loggingClient.Error("tstorage query data:", err)
return []dtos.ReportData{}, count, nil
}
dataPoint := points[len(points)-1]
var reportData dtos.ReportData
reportData.Time = dataPoint.Timestamp
reportData.Value = dataPoint.Value
response = append(response, reportData)
}
return response, count, nil
}
func (c *Client) GetDeviceService(req dtos.ThingModelServiceDataRequest, device models.Device, product models.Product) ([]dtos.SaveServiceIssueData, int, error) {
// tstorage 不支持服务查询
var response []dtos.SaveServiceIssueData
var count int
return response, count, nil
}
func (c *Client) GetDeviceEvent(req dtos.ThingModelEventDataRequest, device models.Device, product models.Product) ([]dtos.EventData, int, error) {
// tstorage 不支持事件查询
var response []dtos.EventData
var count int
return response, count, nil
}
func (c *Client) CreateTable(ctx context.Context, stable, table string) (err error) {
return nil
}
func (c *Client) DropTable(ctx context.Context, table string) (err error) {
return nil
}
func (c *Client) CreateStable(ctx context.Context, product models.Product) (err error) {
return nil
}
func (c *Client) DropStable(ctx context.Context, table string) (err error) {
return nil
}
func (c *Client) AddDatabaseField(ctx context.Context, tableName string, specsType constants.SpecsType, code string, name string) (err error) {
return nil
}
func (c *Client) DelDatabaseField(ctx context.Context, tableName, code string) (err error) {
return nil
}
func (c *Client) ModifyDatabaseField(ctx context.Context, tableName string, specsType constants.SpecsType, code string, name string) (err error) {
return nil
}
func (c *Client) GetDevicePropertyCount(request dtos.ThingModelPropertyDataRequest) (int, error) {
//TODO implement me
panic("implement me")
}
func (c *Client) GetDeviceEventCount(req dtos.ThingModelEventDataRequest) (int, error) {
//TODO implement me
panic("implement me")
}
func (c *Client) GetDeviceMsgCountByGiveTime(deviceId string, startTime, endTime int64) (int, error) {
var labels []tstorage.Label
var count int
labels = append(labels, tstorage.Label{})
points, err := c.client.Select(constants.DB_PREFIX+deviceId, labels, startTime, endTime)
if err != nil {
c.loggingClient.Error("tstorage query data:", err)
return count, err
}
count = len(points)
return count, nil
}
func NewClient(config dtos.Configuration, lc logger.LoggingClient) (c interfaces.DataDBClient, errEdgeX error) {
dataSourceDir := filepath.Dir(config.DataSource)
_, fileErr := os.Stat(dataSourceDir)
if fileErr != nil || !os.IsExist(fileErr) {
_ = os.MkdirAll(dataSourceDir, os.ModePerm)
}
storage, err := tstorage.NewStorage(
tstorage.WithTimestampPrecision(tstorage.Milliseconds),
tstorage.WithDataPath(dataSourceDir),
//tstorage.WithRetention(365*24*time.Hour),
)
if err != nil {
return nil, err
}
return &Client{
client: storage,
loggingClient: lc,
}, nil
}