From 1996542446dbacf2f469d2e3bc33993aec71a5a1 Mon Sep 17 00:00:00 2001 From: zhuhoujiu Date: Fri, 7 Mar 2025 11:02:43 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0tstorage=20=E6=97=B6=E5=BA=8F?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=BA=93=E6=94=AF=E6=8C=81=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/hummingbird-core/res/configuration.toml | 6 +- go.mod | 1 + .../core/application/deviceapp/deviceapp.go | 2 +- .../application/persistence/thinkmodel.go | 267 +++++++++++++++++- .../hummingbird/core/application/schedule.go | 12 +- .../core/bootstrap/database/database.go | 5 + internal/pkg/constants/db.go | 1 + internal/pkg/utils/function.go | 55 ++-- internal/tools/datadb/leveldb/client.go | 59 ++-- internal/tools/datadb/tstorage/client.go | 228 +++++++++++++++ 10 files changed, 586 insertions(+), 50 deletions(-) create mode 100644 internal/tools/datadb/tstorage/client.go diff --git a/cmd/hummingbird-core/res/configuration.toml b/cmd/hummingbird-core/res/configuration.toml index 13a54a5..ec770db 100644 --- a/cmd/hummingbird-core/res/configuration.toml +++ b/cmd/hummingbird-core/res/configuration.toml @@ -36,10 +36,12 @@ Dsn = 'root:123456@tcp(127.0.0.1:3306)/open-hummingbird?charset=utf8mb4&parseTim [Databases.Data] [Databases.Data.Primary] +Type = 'tstorage' +DataSource = 'manifest/docker/db-data/tstorage-data/' +#Type = 'tdengine' +#Dsn = 'root:taosdata@ws(127.0.0.1:6041)/hummingbird' #Type = 'leveldb' #DataSource = 'manifest/docker/db-data/leveldb-core-data/' -Type = 'tdengine' -Dsn = 'root:taosdata@ws(127.0.0.1:6041)/hummingbird' [MessageQueue] Protocol = 'tcp' diff --git a/go.mod b/go.mod index fbd7eea..10cc6a1 100644 --- a/go.mod +++ b/go.mod @@ -21,6 +21,7 @@ require ( github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c github.com/kirinlabs/HttpRequest v1.1.1 github.com/mitchellh/mapstructure v1.4.3 + github.com/nakabonne/tstorage v0.3.6 github.com/nicksnyder/go-i18n/v2 v2.2.0 github.com/patrickmn/go-cache v2.1.0+incompatible github.com/pelletier/go-toml v1.9.5 diff --git a/internal/hummingbird/core/application/deviceapp/deviceapp.go b/internal/hummingbird/core/application/deviceapp/deviceapp.go index 27d5099..2a9493a 100644 --- a/internal/hummingbird/core/application/deviceapp/deviceapp.go +++ b/internal/hummingbird/core/application/deviceapp/deviceapp.go @@ -556,7 +556,7 @@ func (p *deviceApp) DevicesReportMsgGather(ctx context.Context) error { persistApp := resourceContainer.PersistItfFrom(p.dic.Get) count, err = persistApp.SearchDeviceMsgCount(startTime, endTime) if err != nil { - + return err } var msgGather models.MsgGather msgGather.Count = count diff --git a/internal/hummingbird/core/application/persistence/thinkmodel.go b/internal/hummingbird/core/application/persistence/thinkmodel.go index 0cd1e70..599c349 100644 --- a/internal/hummingbird/core/application/persistence/thinkmodel.go +++ b/internal/hummingbird/core/application/persistence/thinkmodel.go @@ -58,6 +58,8 @@ func (pst *persistApp) SaveDeviceThingModelData(req dtos.ThingModelMessage) erro return pst.saveDeviceThingModelToLevelDB(req) case constants.TDengine: return pst.saveDeviceThingModelToTdengine(req) + case constants.Tstorage: + return pst.saveDeviceThingModelToTstorage(req) default: return nil } @@ -406,6 +408,174 @@ func (pst *persistApp) saveDeviceThingModelToTdengine(req dtos.ThingModelMessage return nil } +func (pst *persistApp) saveDeviceThingModelToTstorage(req dtos.ThingModelMessage) error { + switch req.GetOpType() { + case thingmodel.OperationType_PROPERTY_REPORT: + propertyMsg, err := req.TransformMessageDataByProperty() + if err != nil { + return err + } + data := make(map[string]interface{}) + for s, reportData := range propertyMsg.Data { + data[s] = reportData.Value + } + + err = pst.dataDbClient.Insert(context.Background(), constants.DB_PREFIX+req.Cid, data) + if err != nil { + return err + } + + case thingmodel.OperationType_EVENT_REPORT: + eventMsg, err := req.TransformMessageDataByEvent() + if err != nil { + return err + } + data := make(map[string]interface{}) + data[eventMsg.Data.EventCode] = eventMsg.Data + err = pst.dataDbClient.Insert(context.Background(), constants.DB_PREFIX+req.Cid, data) + if err != nil { + return err + } + + case thingmodel.OperationType_SERVICE_EXECUTE: + serviceMsg, err := req.TransformMessageDataByService() + if err != nil { + return err + } + v, _ := serviceMsg.Marshal() + data := make(map[string]interface{}) + data[serviceMsg.Code] = string(v) + err = pst.dataDbClient.Insert(context.Background(), constants.DB_PREFIX+req.Cid, data) + if err != nil { + return err + } + + case thingmodel.OperationType_PROPERTY_GET_RESPONSE: + msg, err := req.TransformMessageDataByGetProperty() + if err != nil { + return err + } + messageStore := resourceContainer.MessageStoreItfFrom(pst.dic.Get) + ack, ok := messageStore.LoadMsgChan(msg.MsgId) + if !ok { + //超时了。 + return nil + } + if v, ok := ack.(*messagestore.MsgAckChan); ok { + v.TrySendDataAndCloseChan(msg.Data) + messageStore.DeleteMsgId(msg.MsgId) + } + case thingmodel.OperationType_PROPERTY_SET_RESPONSE: + msg, err := req.TransformMessageDataBySetProperty() + if err != nil { + return err + } + messageStore := resourceContainer.MessageStoreItfFrom(pst.dic.Get) + ack, ok := messageStore.LoadMsgChan(msg.MsgId) + if !ok { + //超时了。 + return nil + } + if v, ok := ack.(*messagestore.MsgAckChan); ok { + v.TrySendDataAndCloseChan(msg.Data) + messageStore.DeleteMsgId(msg.MsgId) + } + case thingmodel.OperationType_DATA_BATCH_REPORT: + + msg, err := req.TransformMessageDataByBatchReport() + if err != nil { + return err + } + data := make(map[string]interface{}) + + for code, property := range msg.Data.Properties { + data[code] = property.Value + } + for code, event := range msg.Data.Events { + var eventData dtos.EventData + eventData.OutputParams = event.OutputParams + eventData.EventCode = code + eventData.EventTime = msg.Time + data[code] = eventData + + } + //批量写。 + err = pst.dataDbClient.Insert(context.Background(), constants.DB_PREFIX+req.Cid, data) + + if err != nil { + return err + } + return nil + case thingmodel.OperationType_SERVICE_EXECUTE_RESPONSE: + serviceMsg, err := req.TransformMessageDataByServiceExec() + if err != nil { + return err + } + + device, err := pst.dbClient.DeviceById(req.Cid) + if err != nil { + return err + } + + _, err = pst.dbClient.ProductById(device.ProductId) + if err != nil { + return err + } + + //var find bool + //var callType constants.CallType + // + //for _, action := range product.Actions { + // if action.Code == serviceMsg.Code { + // find = true + // callType = action.CallType + // break + // } + //} + // + //if !find { + // return errors.New("") + //} + + messageStore := resourceContainer.MessageStoreItfFrom(pst.dic.Get) + ack, ok := messageStore.LoadMsgChan(serviceMsg.MsgId) + if !ok { + //可能是超时了。 + return nil + } + + if v, ok := ack.(*messagestore.MsgAckChan); ok { + v.TrySendDataAndCloseChan(serviceMsg.OutputParams) + messageStore.DeleteMsgId(serviceMsg.MsgId) + } + + //if callType == constants.CallTypeSync { + // messageStore := resourceContainer.MessageStoreItfFrom(pst.dic.Get) + // ack, ok := messageStore.LoadMsgChan(serviceMsg.MsgId) + // if !ok { + // //可能是超时了。 + // return nil + // } + // + // if v, ok := ack.(*messagestore.MsgAckChan); ok { + // v.TrySendDataAndCloseChan(serviceMsg.OutputParams) + // messageStore.DeleteMsgId(serviceMsg.MsgId) + // } + // + //} else if callType == constants.CallTypeAsync { + // v, _ := serviceMsg.Marshal() + // data := make(map[string]interface{}) + // data[serviceMsg.Code] = string(v) + // err = pst.dataDbClient.Insert(context.Background(), constants.DB_PREFIX+req.Cid, data) + // if err != nil { + // return err + // } + //} + + } + return nil +} + func generatePropertyLeveldbKey(cid, code string, reportTime int64) string { return cid + "-" + constants.Property + "-" + code + "-" + strconv.Itoa(int(reportTime)) } @@ -517,6 +687,96 @@ func (pst *persistApp) searchDeviceThingModelHistoryPropertyDataFromTDengine(req return response, count, nil } +func (pst *persistApp) searchDeviceThingModelHistoryPropertyDataFromTstorage(req dtos.ThingModelPropertyDataRequest) (interface{}, int, error) { + var count int + deviceInfo, err := pst.dbClient.DeviceById(req.DeviceId) + if err != nil { + return nil, count, err + } + var productInfo models.Product + productInfo, err = pst.dbClient.ProductById(deviceInfo.ProductId) + if err != nil { + return nil, count, err + } + var response []dtos.ReportData + + for _, property := range productInfo.Properties { + if property.Code == req.Code { + req.Code = property.Code + response, count, err = pst.dataDbClient.GetDeviceProperty(req, deviceInfo) + if err != nil { + pst.lc.Errorf("GetDeviceProperty error %+v", err) + } + var typeSpecIntOrFloat models.TypeSpecIntOrFloat + if property.TypeSpec.Type == constants.SpecsTypeInt || property.TypeSpec.Type == constants.SpecsTypeFloat { + _ = json.Unmarshal([]byte(property.TypeSpec.Specs), &typeSpecIntOrFloat) + } + + if typeSpecIntOrFloat.Unit == "" { + typeSpecIntOrFloat.Unit = "-" + } + break + } + } + return response, count, nil +} + +func (pst *persistApp) searchDeviceThingModelPropertyDataFromTstorage(req dtos.ThingModelPropertyDataRequest) (interface{}, error) { + deviceInfo, err := pst.dbClient.DeviceById(req.DeviceId) + if err != nil { + return nil, err + } + var productInfo models.Product + response := make([]dtos.ThingModelDataResponse, 0) + productInfo, err = pst.dbClient.ProductById(deviceInfo.ProductId) + if err != nil { + return nil, err + } + if req.Code == "" { + for _, property := range productInfo.Properties { + req.Code = property.Code + ksv, _, err := pst.dataDbClient.GetDeviceProperty(req, deviceInfo) + if err != nil { + pst.lc.Errorf("GetDeviceProperty error %+v", err) + continue + } + var reportData dtos.ReportData + if len(ksv) > 0 { + reportData = ksv[0] + } + var unit string + if property.TypeSpec.Type == constants.SpecsTypeInt || property.TypeSpec.Type == constants.SpecsTypeFloat { + var typeSpecIntOrFloat models.TypeSpecIntOrFloat + _ = json.Unmarshal([]byte(property.TypeSpec.Specs), &typeSpecIntOrFloat) + unit = typeSpecIntOrFloat.Unit + } else if property.TypeSpec.Type == constants.SpecsTypeEnum { + //enum 的单位需要特殊处理一下 + enumTypeSpec := make(map[string]string) + _ = json.Unmarshal([]byte(property.TypeSpec.Specs), &enumTypeSpec) + for key, value := range enumTypeSpec { + s := utils.InterfaceToString(reportData.Value) + if key == s { + unit = value + } + } + } + + if unit == "" { + unit = "-" + } + response = append(response, dtos.ThingModelDataResponse{ + ReportData: reportData, + Code: property.Code, + DataType: string(property.TypeSpec.Type), + Name: property.Name, + Unit: unit, + AccessMode: property.AccessMode, + }) + } + } + return response, nil +} + func (pst *persistApp) searchDeviceThingModelPropertyDataFromTDengine(req dtos.ThingModelPropertyDataRequest) (interface{}, error) { deviceInfo, err := pst.dbClient.DeviceById(req.DeviceId) if err != nil { @@ -549,8 +809,6 @@ func (pst *persistApp) searchDeviceThingModelPropertyDataFromTDengine(req dtos.T //enum 的单位需要特殊处理一下 enumTypeSpec := make(map[string]string) _ = json.Unmarshal([]byte(property.TypeSpec.Specs), &enumTypeSpec) - //pst.lc.Info("reportDataType enumTypeSpec", enumTypeSpec) - for key, value := range enumTypeSpec { s := utils.InterfaceToString(reportData.Value) if key == s { @@ -580,9 +838,10 @@ func (pst *persistApp) SearchDeviceThingModelPropertyData(req dtos.ThingModelPro switch pst.dataDbClient.GetDataDBType() { case constants.LevelDB: return pst.searchDeviceThingModelPropertyDataFromLevelDB(req) - case constants.TDengine: return pst.searchDeviceThingModelPropertyDataFromTDengine(req) + case constants.Tstorage: + return pst.searchDeviceThingModelPropertyDataFromTstorage(req) default: return make([]interface{}, 0), nil @@ -619,6 +878,8 @@ func (pst *persistApp) SearchDeviceThingModelHistoryPropertyData(req dtos.ThingM return pst.searchDeviceThingModelHistoryPropertyDataFromLevelDB(req) case constants.TDengine: return pst.searchDeviceThingModelHistoryPropertyDataFromTDengine(req) + case constants.Tstorage: + return pst.searchDeviceThingModelHistoryPropertyDataFromTstorage(req) } response := make([]interface{}, 0) return response, 0, nil diff --git a/internal/hummingbird/core/application/schedule.go b/internal/hummingbird/core/application/schedule.go index ae0af66..1757e2b 100644 --- a/internal/hummingbird/core/application/schedule.go +++ b/internal/hummingbird/core/application/schedule.go @@ -6,20 +6,10 @@ package application import ( "context" resourceContainer "github.com/winc-link/hummingbird/internal/hummingbird/core/container" + "github.com/winc-link/hummingbird/internal/pkg/crontab" "github.com/winc-link/hummingbird/internal/pkg/di" "github.com/winc-link/hummingbird/internal/pkg/logger" "time" - - //"gitlab.com/tedge/edgex/internal/dtos" - //"gitlab.com/tedge/edgex/internal/pkg/constants" - //"gitlab.com/tedge/edgex/internal/pkg/container" - //pkgContainer "github.com/winc-link/hummingbird/internal/pkg/container" - "github.com/winc-link/hummingbird/internal/pkg/crontab" - //"gitlab.com/tedge/edgex/internal/pkg/di" - //"gitlab.com/tedge/edgex/internal/pkg/errort" - //"gitlab.com/tedge/edgex/internal/pkg/logger" - //resourceContainer "gitlab.com/tedge/edgex/internal/tedge/resource/container" - //"gitlab.com/tedge/edgex/internal/tools/atopclient" ) func InitSchedule(dic *di.Container, lc logger.LoggingClient) { diff --git a/internal/hummingbird/core/bootstrap/database/database.go b/internal/hummingbird/core/bootstrap/database/database.go index c72bd92..ffec49b 100644 --- a/internal/hummingbird/core/bootstrap/database/database.go +++ b/internal/hummingbird/core/bootstrap/database/database.go @@ -15,6 +15,7 @@ import ( "github.com/winc-link/hummingbird/internal/hummingbird/core/infrastructure/sqlite" "github.com/winc-link/hummingbird/internal/pkg/constants" "github.com/winc-link/hummingbird/internal/tools/datadb/tdengine" + "github.com/winc-link/hummingbird/internal/tools/datadb/tstorage" "github.com/winc-link/hummingbird/internal/pkg/di" "github.com/winc-link/hummingbird/internal/pkg/logger" @@ -68,6 +69,10 @@ func (d Database) newDataDBClient( return leveldb.NewClient(dtos.Configuration{ DataSource: dataDbInfo.DataSource, }, lc) + case string(constants.Tstorage): + return tstorage.NewClient(dtos.Configuration{ + DataSource: dataDbInfo.DataSource, + }, lc) case string(constants.TDengine): return tdengine.NewClient(dtos.Configuration{ Dsn: dataDbInfo.Dsn, diff --git a/internal/pkg/constants/db.go b/internal/pkg/constants/db.go index 927614d..a6140b7 100644 --- a/internal/pkg/constants/db.go +++ b/internal/pkg/constants/db.go @@ -30,4 +30,5 @@ type DataType string const ( LevelDB DataType = "leveldb" TDengine DataType = "tdengine" + Tstorage DataType = "tstorage" ) diff --git a/internal/pkg/utils/function.go b/internal/pkg/utils/function.go index 95a1535..d212b6a 100644 --- a/internal/pkg/utils/function.go +++ b/internal/pkg/utils/function.go @@ -17,25 +17,46 @@ package utils import "fmt" func InStringSlice(key string, keys []string) bool { - ok := false - for _, item := range keys { - if key == item { - return true - } - } - return ok + ok := false + for _, item := range keys { + if key == item { + return true + } + } + return ok } func SliceStringUnique(strings []string) []string { - temp := map[string]struct{}{} - result := make([]string, 0, len(strings)) - for _, item := range strings { - key := fmt.Sprint(item) - if _, ok := temp[key]; !ok { - temp[key] = struct{}{} - result = append(result, item) - } - } - return result + temp := map[string]struct{}{} + result := make([]string, 0, len(strings)) + for _, item := range strings { + key := fmt.Sprint(item) + if _, ok := temp[key]; !ok { + temp[key] = struct{}{} + result = append(result, item) + } + } + return result } +func ConvertToFloat64(i interface{}) float64 { + switch v := i.(type) { + case float64: + return v + case float32: + return float64(v) + case int: + return float64(v) + case int64: + return float64(v) + case int32: + return float64(v) + case string: + var f float64 + _, err := fmt.Sscanf(v, "%f", &f) + if err == nil { + return f + } + } + return 0 +} diff --git a/internal/tools/datadb/leveldb/client.go b/internal/tools/datadb/leveldb/client.go index d41b774..d5ae47e 100644 --- a/internal/tools/datadb/leveldb/client.go +++ b/internal/tools/datadb/leveldb/client.go @@ -318,32 +318,45 @@ func (c *Client) GetDeviceProperty(req dtos.ThingModelPropertyDataRequest, devic firstTime = strconv.Itoa(int(req.Range[1])) lastTime = strconv.Itoa(int(req.Range[0])) } - iter := c.client.NewIterator(&util.Range{Start: []byte(baseKey + firstTime), Limit: []byte(baseKey + lastTime)}, &opt.ReadOptions{ DontFillCache: true, }) - var start int - var end int + if req.IsAll { + if iter.Last() { + for iter.Prev() { + var dbvalue dtos.ReportData + _ = json.Unmarshal(iter.Value(), &dbvalue) + response = append(response, dbvalue) - start = (req.Page-1)*req.PageSize + 1 - end = (req.Page-1)*req.PageSize + req.PageSize + } + } + } else { - if iter.Last() { - count++ - var dbvalue dtos.ReportData - _ = json.Unmarshal(iter.Value(), &dbvalue) - response = append(response, dbvalue) - for iter.Prev() { + var start int + var end int + + start = (req.Page-1)*req.PageSize + 1 + end = (req.Page-1)*req.PageSize + req.PageSize + if iter.Last() { count++ - if count >= start && count <= end { + if req.Page == 1 { var dbvalue dtos.ReportData _ = json.Unmarshal(iter.Value(), &dbvalue) response = append(response, dbvalue) } + for iter.Prev() { + count++ + if count >= start && count <= end { + var dbvalue dtos.ReportData + _ = json.Unmarshal(iter.Value(), &dbvalue) + response = append(response, dbvalue) + } + } } } iter.Release() + } else if req.First { iter := c.client.NewIterator(util.BytesPrefix([]byte(baseKey)), &opt.ReadOptions{ DontFillCache: true, @@ -363,13 +376,27 @@ func (c *Client) GetDeviceProperty(req dtos.ThingModelPropertyDataRequest, devic //break } iter.Release() - } else { - } return response, count, nil } func (c *Client) GetDeviceMsgCountByGiveTime(deviceId string, startTime, endTime int64) (int, error) { - //TODO implement me - panic("implement me") + baseKey := deviceId + "-" + constants.Property + "-" + firstTime := strconv.Itoa(int(startTime)) + lastTime := strconv.Itoa(int(endTime)) + + var count int + + iter := c.client.NewIterator(&util.Range{Start: []byte(baseKey + firstTime), Limit: []byte(baseKey + lastTime)}, &opt.ReadOptions{ + DontFillCache: true, + }) + + count++ + if iter.Prev() { + count++ + } + + iter.Release() + + return count, nil } diff --git a/internal/tools/datadb/tstorage/client.go b/internal/tools/datadb/tstorage/client.go new file mode 100644 index 0000000..1f1f13a --- /dev/null +++ b/internal/tools/datadb/tstorage/client.go @@ -0,0 +1,228 @@ +package tstorage + +import ( + "context" + tstorage "github.com/nakabonne/tstorage" + "github.com/winc-link/hummingbird/internal/dtos" + interfaces "github.com/winc-link/hummingbird/internal/hummingbird/core/interface" + "github.com/winc-link/hummingbird/internal/models" + "github.com/winc-link/hummingbird/internal/pkg/constants" + "github.com/winc-link/hummingbird/internal/pkg/logger" + "github.com/winc-link/hummingbird/internal/pkg/utils" + "os" + "path/filepath" + "time" +) + +type Client struct { + client tstorage.Storage + loggingClient logger.LoggingClient +} + +func (c *Client) GetDataDBType() constants.DataType { + return constants.Tstorage +} + +func (c *Client) CloseSession() { + c.client.Close() +} + +func (c *Client) Insert(ctx context.Context, table string, data map[string]interface{}) (err error) { + metric := table + var rows []tstorage.Row + + timestamp := time.Now().UnixMilli() + + for code, value := range data { + var labels []tstorage.Label + labels = append(labels, tstorage.Label{ + Name: "code", Value: code, + }) + + rows = append(rows, tstorage.Row{ + Metric: metric, + Labels: labels, + DataPoint: tstorage.DataPoint{ + Timestamp: timestamp, + Value: utils.ConvertToFloat64(value), + }, + }) + } + return c.client.InsertRows(rows) +} + +func paginate(arr []*tstorage.DataPoint, page, pageSize int) []*tstorage.DataPoint { + arr = reverseArray(arr) + // 计算起始索引 + start := (page - 1) * pageSize + end := start + pageSize + + // 边界检查 + if start >= len(arr) { + return []*tstorage.DataPoint{} + } + if end > len(arr) { + end = len(arr) // 不能超出数组范围 + } + + return arr[start:end] +} + +// reverseArray 翻转数组 +func reverseArray(arr []*tstorage.DataPoint) []*tstorage.DataPoint { + n := len(arr) + reversed := make([]*tstorage.DataPoint, n) + for i, v := range arr { + reversed[n-1-i] = v + } + return reversed +} + +func (c *Client) GetDeviceProperty(req dtos.ThingModelPropertyDataRequest, device models.Device) ([]dtos.ReportData, int, error) { + var response []dtos.ReportData + var count int + if len(req.Range) == 2 { + var startTime, endTime int64 + if req.Range[0] < req.Range[1] { + startTime = req.Range[0] + endTime = req.Range[1] + } else { + startTime = req.Range[1] + endTime = req.Range[0] + } + var labels []tstorage.Label + + labels = append(labels, tstorage.Label{ + Name: "code", Value: req.Code, + }) + points, err := c.client.Select(constants.DB_PREFIX+device.Id, labels, startTime, endTime) + if err != nil { + c.loggingClient.Error("tstorage query data:", err) + return []dtos.ReportData{}, count, err + } + count = len(points) + + paginateRes := paginate(points, req.Page, req.PageSize) + for _, re := range paginateRes { + response = append(response, dtos.ReportData{ + Value: re.Value, + Time: re.Timestamp, + }) + } + + } else if req.Last { + var labels []tstorage.Label + labels = append(labels, tstorage.Label{ + Name: "code", Value: req.Code, + }) + var startTime, endTime int64 + // 获取当前时间 + now := time.Now() + // 计算半小时前的时间 + past := now.Add(-30 * time.Minute) + // 转换为毫秒时间戳 + endTime = now.UnixMilli() + startTime = past.UnixMilli() + + points, err := c.client.Select(constants.DB_PREFIX+device.Id, labels, startTime, endTime) + if err != nil { + c.loggingClient.Error("tstorage query data:", err) + return []dtos.ReportData{}, count, nil + } + + dataPoint := points[len(points)-1] + var reportData dtos.ReportData + reportData.Time = dataPoint.Timestamp + reportData.Value = dataPoint.Value + response = append(response, reportData) + + } + return response, count, nil +} + +func (c *Client) GetDeviceService(req dtos.ThingModelServiceDataRequest, device models.Device, product models.Product) ([]dtos.SaveServiceIssueData, int, error) { + // tstorage 不支持服务查询 + var response []dtos.SaveServiceIssueData + var count int + return response, count, nil +} + +func (c *Client) GetDeviceEvent(req dtos.ThingModelEventDataRequest, device models.Device, product models.Product) ([]dtos.EventData, int, error) { + // tstorage 不支持事件查询 + var response []dtos.EventData + var count int + return response, count, nil +} + +func (c *Client) CreateTable(ctx context.Context, stable, table string) (err error) { + return nil +} + +func (c *Client) DropTable(ctx context.Context, table string) (err error) { + return nil +} + +func (c *Client) CreateStable(ctx context.Context, product models.Product) (err error) { + return nil +} + +func (c *Client) DropStable(ctx context.Context, table string) (err error) { + return nil +} + +func (c *Client) AddDatabaseField(ctx context.Context, tableName string, specsType constants.SpecsType, code string, name string) (err error) { + return nil +} + +func (c *Client) DelDatabaseField(ctx context.Context, tableName, code string) (err error) { + return nil +} + +func (c *Client) ModifyDatabaseField(ctx context.Context, tableName string, specsType constants.SpecsType, code string, name string) (err error) { + return nil +} + +func (c *Client) GetDevicePropertyCount(request dtos.ThingModelPropertyDataRequest) (int, error) { + //TODO implement me + panic("implement me") +} + +func (c *Client) GetDeviceEventCount(req dtos.ThingModelEventDataRequest) (int, error) { + //TODO implement me + panic("implement me") +} + +func (c *Client) GetDeviceMsgCountByGiveTime(deviceId string, startTime, endTime int64) (int, error) { + var labels []tstorage.Label + var count int + labels = append(labels, tstorage.Label{}) + points, err := c.client.Select(constants.DB_PREFIX+deviceId, labels, startTime, endTime) + if err != nil { + c.loggingClient.Error("tstorage query data:", err) + return count, err + } + count = len(points) + return count, nil +} + +func NewClient(config dtos.Configuration, lc logger.LoggingClient) (c interfaces.DataDBClient, errEdgeX error) { + + dataSourceDir := filepath.Dir(config.DataSource) + _, fileErr := os.Stat(dataSourceDir) + if fileErr != nil || !os.IsExist(fileErr) { + _ = os.MkdirAll(dataSourceDir, os.ModePerm) + } + storage, err := tstorage.NewStorage( + tstorage.WithTimestampPrecision(tstorage.Milliseconds), + tstorage.WithDataPath(dataSourceDir), + //tstorage.WithRetention(365*24*time.Hour), + ) + if err != nil { + return nil, err + } + + return &Client{ + client: storage, + loggingClient: lc, + }, nil +}