From b5c327a0a517d14afed79fcb8dcf0d65ecaaa96f Mon Sep 17 00:00:00 2001 From: godLei6 <603785348@qq.com> Date: Sun, 24 Apr 2022 20:17:38 +0800 Subject: [PATCH 1/4] =?UTF-8?q?dev:=20=E5=B0=86nats=E7=9A=84=E4=BD=BF?= =?UTF-8?q?=E7=94=A8=E6=94=B9=E9=80=A0=E4=B8=BAjs=E6=A8=A1=E5=BC=8F?= =?UTF-8?q?=E5=B9=B6=E6=95=B4=E7=90=86=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- deploy/nats/nats-server.conf | 5 ++ .../docker-compose.yml => docker-compose.yml | 2 + go.mod | 2 +- deploy/init.sh => init.sh | 21 +++----- shared/devices/topic.go | 50 +++++++++++++++---- src/ddsvr/ddExport/dd.go | 2 +- src/ddsvr/ddExport/msg.go | 2 + src/ddsvr/internal/event/deviceSub.go | 15 ++++-- .../internal/repo/event/innerLink/nats.go | 31 ++++++++++-- src/dmsvr/dmDef/dm.go | 2 +- src/dmsvr/etc/dm.yaml | 3 +- .../service/deviceSend/tempParam_test.go | 1 + .../event/eventDevSub/devicePublish.go | 2 +- .../internal/repo/event/innerLink/nats.go | 39 ++++++++++++--- 14 files changed, 134 insertions(+), 43 deletions(-) create mode 100644 deploy/nats/nats-server.conf rename deploy/docker-compose.yml => docker-compose.yml (97%) rename deploy/init.sh => init.sh (83%) diff --git a/deploy/nats/nats-server.conf b/deploy/nats/nats-server.conf new file mode 100644 index 000000000..1a63a0484 --- /dev/null +++ b/deploy/nats/nats-server.conf @@ -0,0 +1,5 @@ +listen: 0.0.0.0:4222 +server_name: nats1 +jetstream { + store_dir: datastore +} diff --git a/deploy/docker-compose.yml b/docker-compose.yml similarity index 97% rename from deploy/docker-compose.yml rename to docker-compose.yml index 3480080ba..70eb3a3ab 100644 --- a/deploy/docker-compose.yml +++ b/docker-compose.yml @@ -14,6 +14,8 @@ services: - "4222:4222" - "6222:6222" - "8222:8222" + volumes: + - "/opt/things/conf/nats/nats-server.conf:/nats-server.conf" networks: ithings_net: aliases: diff --git a/go.mod b/go.mod index c7dd08489..626fd6520 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/jinzhu/copier v0.3.2 github.com/mojocn/base64Captcha v1.3.4 github.com/nats-io/nats-server/v2 v2.7.4 // indirect - github.com/nats-io/nats.go v1.13.1-0.20220308171302-2f2f6968e98d + github.com/nats-io/nats.go v1.14.0 github.com/silenceper/wechat/v2 v2.0.5 github.com/spf13/cast v1.3.1 github.com/zeromicro/go-zero v1.3.2 diff --git a/deploy/init.sh b/init.sh similarity index 83% rename from deploy/init.sh rename to init.sh index cd07ec156..bb8860c5d 100644 --- a/deploy/init.sh +++ b/init.sh @@ -5,7 +5,7 @@ echo "well come to go-things,we need init docker with docker-compose first" function init_docker(){ echo "init docker" - cp -rf docker/* /etc/docker/ + cp -rf $confPath/docker/* /etc/docker/ curl -sSL https://get.daocloud.io/docker | sh sudo systemctl start docker docker run hello-world @@ -22,18 +22,11 @@ function init_conf_path(){ #预创建配置所需文件夹 thingsPath="/opt/things" confPath="$thingsPath/conf" - emqxPath="$confPath/emqx" - mysqlPath="$confPath/mysql" - - if [ ! -d "$emqxPath" ]; then - mkdir -p "$emqxPath" - fi - if [ ! -d "$mysqlPath" ]; then - mkdir -p "$mysqlPath" + if [ ! -d "$confPath" ]; then + mkdir -p "$confPath" fi - #将emqx和mysql所在工程内的配置拷贝到物理机目标位置 - cp -rf emqx/* $emqxPath - cp -rf mysql/* $mysqlPath + #将docker映射的所在工程内的配置拷贝到物理机目标位置 + cp -rf ./deploy/* confPath } function init_mysql_db_table(){ @@ -53,14 +46,12 @@ function init_mysql_db_table(){ fi done } - +init_conf_path type docker >/dev/null 2>&1 || init_docker; type docker-compose >/dev/null 2>&1 || init_docker_compose; echo "docker with docker-compose init success" echo "now buid and start go-things needs mirror image" echo "docker-compose -f $CURDIR/docker-compose.yml up -d" >> /etc/rc.local - -init_conf_path sleep 1 echo "start docker compose " docker-compose up -d diff --git a/shared/devices/topic.go b/shared/devices/topic.go index 658dca98f..f17a9284d 100644 --- a/shared/devices/topic.go +++ b/shared/devices/topic.go @@ -31,15 +31,29 @@ ${productID}/${deviceName}/event 发布 ${productID}/${deviceName}/xxxxx 订阅和发布 //自定义 暂不做支持 */ -func GetDeviceInfo(topic string) (productId, deviceName string, err error) { +type DIRECTION int + +const ( + UNKNOW DIRECTION = iota //未知 + UP //上行 + DOWN //下行 +) + +type TopicInfo struct { + ProductID string + DeviceName string + Direction DIRECTION +} + +func GetTopicInfo(topic string) (topicInfo *TopicInfo, err error) { keys := strings.Split(topic, "/") return parseTopic(keys) } //通过topic的第一个字段来获取处理函数 -func parseTopic(topics []string) (productId, deviceName string, err error) { +func parseTopic(topics []string) (topicInfo *TopicInfo, err error) { if len(topics) < 2 { - return "", "", errors.Parameter.AddDetail("topic is err") + return nil, errors.Parameter.AddDetail("topic is err") } switch topics[0] { case "$thing", "$ota", "$shadow", "$broadcast": @@ -49,16 +63,34 @@ func parseTopic(topics []string) (productId, deviceName string, err error) { } } -func parsePose(productPos int, topics []string) (productId, deviceName string, err error) { +func parsePose(productPos int, topics []string) (topicInfo *TopicInfo, err error) { if len(topics) < (productPos + 2) { - return "", "", errors.Parameter.AddDetail("topic is err") + return nil, errors.Parameter.AddDetail("topic is err") } - return topics[productPos], topics[productPos+1], err + return &TopicInfo{ + ProductID: topics[productPos], + DeviceName: topics[productPos+1], + }, err } -func parseLast(topics []string) (productId, deviceName string, err error) { +func parseLast(topics []string) (topicInfo *TopicInfo, err error) { if len(topics) < 2 { - return "", "", errors.Parameter.AddDetail("topic is err") + return nil, errors.Parameter.AddDetail("topic is err") + } + return &TopicInfo{ + ProductID: topics[len(topics)-2], + DeviceName: topics[len(topics)-1], + Direction: getDirection(topics[1]), + }, err +} + +func getDirection(dir string) DIRECTION { + switch dir { + case "up", "report", "rxd": + return UP + case "down", "update", "txd": + return DOWN + default: + return UNKNOW } - return topics[len(topics)-2], topics[len(topics)-1], err } diff --git a/src/ddsvr/ddExport/dd.go b/src/ddsvr/ddExport/dd.go index b4d576123..82ba8bb80 100644 --- a/src/ddsvr/ddExport/dd.go +++ b/src/ddsvr/ddExport/dd.go @@ -1,5 +1,5 @@ package ddExport const ( - SvrName = "dd.rpc" + SvrName = "ddRpc" ) diff --git a/src/ddsvr/ddExport/msg.go b/src/ddsvr/ddExport/msg.go index 13867c703..d37ecc6df 100644 --- a/src/ddsvr/ddExport/msg.go +++ b/src/ddsvr/ddExport/msg.go @@ -38,6 +38,8 @@ const ( //topic 定义 const ( + ThingsConsumeName = "things_consume" + ThingsStreamName = "things_msg" // TopicDevPublish dd模块收到设备的发布消息后向内部推送以下topic 最后两个是产品id和设备名称 TopicDevPublish = "dd.thing.device.clients.publish.%s.%s" TopicDevPublishAll = "dd.thing.device.clients.publish.>" diff --git a/src/ddsvr/internal/event/deviceSub.go b/src/ddsvr/internal/event/deviceSub.go index 185420a28..51db6cf4f 100644 --- a/src/ddsvr/internal/event/deviceSub.go +++ b/src/ddsvr/internal/event/deviceSub.go @@ -27,20 +27,25 @@ func NewDeviceSubServer(svcCtx *svc.ServiceContext, ctx context.Context) *Device // Publish 设备发布的信息通过nats转发给内部服务 func (s *DeviceSubServer) Publish(topic string, payload []byte) error { - s.Info("DeviceSubServer", "Publish", topic, string(payload)) - productId, deviceName, err := devices.GetDeviceInfo(topic) + s.Infof("DeviceSubServer|Publish|topic:%v payload:%v", topic, string(payload)) + topicInfo, err := devices.GetTopicInfo(topic) if err != nil { return err } + if topicInfo.Direction == devices.DOWN { + //服务器端下发的消息直接忽略 + return nil + } pub := ddExport.DevPublish{ Timestamp: time.Now().UnixMilli(), Topic: topic, Payload: payload, - ProductID: productId, - DeviceName: deviceName, + ProductID: topicInfo.ProductID, + DeviceName: topicInfo.DeviceName, } pubStr, _ := json.Marshal(pub) - return s.svcCtx.InnerLink.Publish(s.ctx, fmt.Sprintf(ddExport.TopicDevPublish, productId, deviceName), pubStr) + return s.svcCtx.InnerLink.Publish(s.ctx, + fmt.Sprintf(ddExport.TopicDevPublish, topicInfo.ProductID, topicInfo.DeviceName), pubStr) } func (s *DeviceSubServer) Connected(info *ddExport.DevConn) error { diff --git a/src/ddsvr/internal/repo/event/innerLink/nats.go b/src/ddsvr/internal/repo/event/innerLink/nats.go index 878d4fc28..509ce3a56 100644 --- a/src/ddsvr/internal/repo/event/innerLink/nats.go +++ b/src/ddsvr/internal/repo/event/innerLink/nats.go @@ -10,7 +10,7 @@ import ( type ( NatsClient struct { - client *nats.Conn + client nats.JetStreamContext } ) @@ -25,11 +25,36 @@ func NewNatsClient(conf conf.NatsConf) (InnerLink, error) { if err != nil { return nil, err } - return &NatsClient{client: nc}, nil + js, err := nc.JetStream() + if err != nil { + return nil, err + } + _, err = js.AddStream(&nats.StreamConfig{ + Name: ddExport.ThingsStreamName, + Subjects: []string{ + ddExport.TopicInnerPublish, + ddExport.TopicDevPublishAll, + ddExport.TopicDevConnected, + ddExport.TopicDevDisconnected, + }, + }) + if err != nil { + return nil, err + } + _, err = js.AddConsumer(ddExport.ThingsStreamName, &nats.ConsumerConfig{ + Durable: ddExport.ThingsConsumeName, + AckPolicy: nats.AckExplicitPolicy, + }) + if err != nil { + return nil, err + } + return &NatsClient{client: js}, nil } func (n *NatsClient) Publish(ctx context.Context, topic string, payload []byte) error { - return n.client.Publish(topic, events.NewEventMsg(ctx, payload)) + + _, err := n.client.Publish(topic, events.NewEventMsg(ctx, payload)) + return err } func (n *NatsClient) Subscribe(handle Handle) error { _, err := n.client.QueueSubscribe(ddExport.TopicInnerPublish, ddExport.SvrName, func(msg *nats.Msg) { diff --git a/src/dmsvr/dmDef/dm.go b/src/dmsvr/dmDef/dm.go index 74566c6b3..063c5f7c6 100644 --- a/src/dmsvr/dmDef/dm.go +++ b/src/dmsvr/dmDef/dm.go @@ -1,5 +1,5 @@ package dmDef const ( - SvrName = "dm.rpc" + SvrName = "dmRpc" ) diff --git a/src/dmsvr/etc/dm.yaml b/src/dmsvr/etc/dm.yaml index d252ea8b6..1a6583f39 100644 --- a/src/dmsvr/etc/dm.yaml +++ b/src/dmsvr/etc/dm.yaml @@ -16,4 +16,5 @@ TDengine: DataSource: root:taosdata@http(localhost:6041)/test?readBufferSize=52428800 AuthWhite: IpRange: - - 127.0.0.1 \ No newline at end of file + - 127.0.0.1 + - 172.18.0.1 diff --git a/src/dmsvr/internal/domain/service/deviceSend/tempParam_test.go b/src/dmsvr/internal/domain/service/deviceSend/tempParam_test.go index bc47c7a5e..bea3797ca 100644 --- a/src/dmsvr/internal/domain/service/deviceSend/tempParam_test.go +++ b/src/dmsvr/internal/domain/service/deviceSend/tempParam_test.go @@ -6,6 +6,7 @@ import ( "fmt" "github.com/i-Things/things/shared/utils" "github.com/i-Things/things/src/dmsvr/internal/domain/service/deviceSend" + "github.com/i-Things/things/src/dmsvr/internal/domain/templateModel" "testing" ) diff --git a/src/dmsvr/internal/event/eventDevSub/devicePublish.go b/src/dmsvr/internal/event/eventDevSub/devicePublish.go index 8add92948..359c87466 100644 --- a/src/dmsvr/internal/event/eventDevSub/devicePublish.go +++ b/src/dmsvr/internal/event/eventDevSub/devicePublish.go @@ -63,7 +63,7 @@ func (l *PublishLogic) DeviceResp(msg *device.PublishMsg, err error, data map[st l.Errorf("DeviceResp|PublishToDev failure err:%v", er) return } - l.Infof("DeviceResp|topic:%v payload:%v", topic, payload) + l.Infof("PublishLogic|DeviceResp|topic:%v payload:%v", topic, string(payload)) //l.svcCtx.DevClient.DeviceResp(l.dreq.Method, l.dreq.ClientToken, l.topics, err, data) } diff --git a/src/dmsvr/internal/repo/event/innerLink/nats.go b/src/dmsvr/internal/repo/event/innerLink/nats.go index 2cab9b826..f1c4de28d 100644 --- a/src/dmsvr/internal/repo/event/innerLink/nats.go +++ b/src/dmsvr/internal/repo/event/innerLink/nats.go @@ -19,7 +19,7 @@ import ( type ( NatsClient struct { - client *nats.Conn + client nats.JetStreamContext } ) @@ -34,11 +34,35 @@ func NewNatsClient(conf conf.NatsConf) (*NatsClient, error) { if err != nil { return nil, err } - return &NatsClient{client: nc}, nil + js, err := nc.JetStream() + if err != nil { + return nil, err + } + _, err = js.AddStream(&nats.StreamConfig{ + Name: ddExport.ThingsStreamName, + Subjects: []string{ + ddExport.TopicInnerPublish, + ddExport.TopicDevPublishAll, + ddExport.TopicDevConnected, + ddExport.TopicDevDisconnected, + }, + }) + if err != nil { + return nil, err + } + _, err = js.AddConsumer(ddExport.ThingsStreamName, &nats.ConsumerConfig{ + Durable: ddExport.ThingsConsumeName, + AckPolicy: nats.AckExplicitPolicy, + }) + if err != nil { + return nil, err + } + return &NatsClient{client: js}, nil } func (n *NatsClient) PublishToDev(ctx context.Context, topic string, payload []byte) error { - return n.client.Publish(ddExport.TopicInnerPublish, ddExport.PublishToDev(ctx, topic, payload)) + _, err := n.client.Publish(ddExport.TopicInnerPublish, ddExport.PublishToDev(ctx, topic, payload)) + return err } func (n *NatsClient) SubscribeDevSync(ctx context.Context, topic string) (*SubDev, error) { @@ -64,7 +88,8 @@ func (n *NatsClient) Subscribe(handle Handle) error { return } err = handle(ctx).Publish(ele) - logx.WithContext(ctx).Info(ddExport.TopicDevPublishAll, msg.Subject, string(msg.Data), err) + logx.WithContext(ctx).Infof("%s|topic:%v,subject:%v,data:%v,err:%v", utils.FuncName(), + ddExport.TopicDevPublishAll, msg.Subject, string(msg.Data), err) }) if err != nil { return err @@ -83,7 +108,8 @@ func (n *NatsClient) Subscribe(handle Handle) error { return } err = handle(ctx).Connected(ele) - logx.WithContext(ctx).Info(ddExport.TopicDevConnected, msg.Subject, string(msg.Data), err) + logx.WithContext(ctx).Infof("%s|topic:%v,subject:%v,data:%v,err:%v", utils.FuncName(), + ddExport.TopicDevConnected, msg.Subject, string(msg.Data), err) }) if err != nil { return err @@ -102,7 +128,8 @@ func (n *NatsClient) Subscribe(handle Handle) error { return } err = handle(ctx).Disconnected(ele) - logx.WithContext(ctx).Info(ddExport.TopicDevDisconnected, msg.Subject, string(msg.Data), err) + logx.WithContext(ctx).Infof("%s|topic:%v,subject:%v,data:%v,err:%v", utils.FuncName(), + ddExport.TopicDevDisconnected, msg.Subject, string(msg.Data), err) }) if err != nil { return err -- Gitee From b22f15aee26bfc7182c03dff07748d78778993d1 Mon Sep 17 00:00:00 2001 From: godLei6 <603785348@qq.com> Date: Sun, 24 Apr 2022 23:14:20 +0800 Subject: [PATCH 2/4] =?UTF-8?q?dev:=20=E6=B7=BB=E5=8A=A0=E6=A8=A1=E6=9D=BF?= =?UTF-8?q?=E7=BC=93=E5=AD=98,=E5=8F=8A=E4=BF=AE=E6=94=B9=E9=80=9A?= =?UTF-8?q?=E7=9F=A5,=E5=B0=9A=E6=9C=AA=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go.mod | 1 + src/ddsvr/ddExport/dd.go | 2 +- src/dmsvr/dm.go | 31 ++++-- src/dmsvr/dmDef/dm.go | 5 +- .../domain/templateModel/templateRepo.go | 23 +++++ .../domain/templateModel/templateValidate.go | 8 +- .../event/dataUpdateEvent/dataUpdate.go | 27 +++++ .../devicePublish.go | 15 +-- .../deviceSubscribe.go | 2 +- src/dmsvr/internal/logic/assemble.go | 3 +- .../internal/logic/getdevicedatalogic.go | 8 +- .../internal/logic/getproducttemplatelogic.go | 2 +- src/dmsvr/internal/logic/managedevicelogic.go | 17 +--- .../internal/logic/manageproductlogic.go | 12 +-- .../logic/manageproducttemplatelogic.go | 51 ++++------ src/dmsvr/internal/logic/sendactionlogic.go | 10 +- src/dmsvr/internal/logic/sendpropertylogic.go | 10 +- .../repo/event/dataUpdate/dataUpdate.go | 22 +++++ .../internal/repo/event/dataUpdate/nats.go | 90 +++++++++++++++++ .../repo/event/innerLink/innerLink.go | 4 +- src/dmsvr/internal/repo/mysql/templateRepo.go | 99 +++++++++++++++++++ src/dmsvr/internal/repo/readme.md | 1 + src/dmsvr/internal/svc/servicecontext.go | 83 +++++++--------- 23 files changed, 370 insertions(+), 156 deletions(-) create mode 100644 src/dmsvr/internal/domain/templateModel/templateRepo.go create mode 100644 src/dmsvr/internal/event/dataUpdateEvent/dataUpdate.go rename src/dmsvr/internal/event/{eventDevSub => deviceMsgEvent}/devicePublish.go (95%) rename src/dmsvr/internal/event/{eventDevSub => deviceMsgEvent}/deviceSubscribe.go (98%) create mode 100644 src/dmsvr/internal/repo/event/dataUpdate/dataUpdate.go create mode 100644 src/dmsvr/internal/repo/event/dataUpdate/nats.go create mode 100644 src/dmsvr/internal/repo/mysql/templateRepo.go create mode 100644 src/dmsvr/internal/repo/readme.md diff --git a/go.mod b/go.mod index 626fd6520..411d6daf1 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.14 require ( github.com/Masterminds/squirrel v1.5.2 + github.com/dgraph-io/ristretto v0.1.0 github.com/dgrijalva/jwt-go v3.2.0+incompatible github.com/eclipse/paho.mqtt.golang v1.3.5 github.com/golang/protobuf v1.5.2 diff --git a/src/ddsvr/ddExport/dd.go b/src/ddsvr/ddExport/dd.go index 82ba8bb80..9ef930160 100644 --- a/src/ddsvr/ddExport/dd.go +++ b/src/ddsvr/ddExport/dd.go @@ -1,5 +1,5 @@ package ddExport const ( - SvrName = "ddRpc" + SvrName = "dd_rpc" ) diff --git a/src/dmsvr/dm.go b/src/dmsvr/dm.go index b46278f68..05a0e84ba 100644 --- a/src/dmsvr/dm.go +++ b/src/dmsvr/dm.go @@ -5,8 +5,11 @@ import ( "flag" "fmt" "github.com/i-Things/things/shared/errors" + "github.com/i-Things/things/shared/utils" "github.com/i-Things/things/src/dmsvr/internal/config" - "github.com/i-Things/things/src/dmsvr/internal/event/eventDevSub" + "github.com/i-Things/things/src/dmsvr/internal/event/dataUpdateEvent" + "github.com/i-Things/things/src/dmsvr/internal/event/deviceMsgEvent" + "github.com/i-Things/things/src/dmsvr/internal/repo/event/dataUpdate" "github.com/i-Things/things/src/dmsvr/internal/repo/event/innerLink" "github.com/i-Things/things/src/dmsvr/internal/server" "github.com/i-Things/things/src/dmsvr/internal/svc" @@ -15,6 +18,7 @@ import ( "github.com/zeromicro/go-zero/zrpc" "google.golang.org/grpc" "google.golang.org/grpc/reflection" + "log" _ "net/http/pprof" ) @@ -22,18 +26,12 @@ var configFile = flag.String("f", "etc/dm.yaml", "the config file") func main() { flag.Parse() - //go device.NewDevice() - //device.TestMongo() var c config.Config conf.MustLoad(*configFile, &c) svcCtx := svc.NewServiceContext(c) - svcCtx.InnerLink.Subscribe(func(ctx context.Context) innerLink.InnerSubHandle { - return eventDevSub.NewDeviceMsgHandle(ctx, svcCtx) - }) + Subscribe(svcCtx) //grpc服务初始化 - srv := server.NewDmServer(svcCtx) - s := zrpc.MustNewServer(c.RpcServerConf, func(grpcServer *grpc.Server) { dm.RegisterDmServer(grpcServer, srv) reflection.Register(grpcServer) @@ -44,3 +42,20 @@ func main() { fmt.Printf("Starting rpc server at %s...\n", c.ListenOn) s.Start() } + +func Subscribe(svcCtx *svc.ServiceContext) { + err := svcCtx.InnerLink.Subscribe(func(ctx context.Context) innerLink.InnerSubEvent { + return deviceMsgEvent.NewDeviceMsgHandle(ctx, svcCtx) + }) + if err != nil { + log.Fatalf("%v|InnerLink.Subscribe|err:%v", + utils.FuncName(), err) + } + err = svcCtx.DataUpdate.Subscribe(func(ctx context.Context) dataUpdate.DataUpdateSubEvent { + return dataUpdateEvent.NewPublishLogic(ctx, svcCtx) + }) + if err != nil { + log.Fatalf("%v|DataUpdate.Subscribe|err:%v", + utils.FuncName(), err) + } +} diff --git a/src/dmsvr/dmDef/dm.go b/src/dmsvr/dmDef/dm.go index 063c5f7c6..8f0c278bc 100644 --- a/src/dmsvr/dmDef/dm.go +++ b/src/dmsvr/dmDef/dm.go @@ -1,5 +1,8 @@ package dmDef const ( - SvrName = "dmRpc" + SvrName = "dm_rpc" + DmUpdateConsumeName = "dm_rpc_update_consume" + DmUpdateStreamName = "dm_rpc_update_msg" + TopicUpdate = "dm.update" ) diff --git a/src/dmsvr/internal/domain/templateModel/templateRepo.go b/src/dmsvr/internal/domain/templateModel/templateRepo.go new file mode 100644 index 000000000..e1aa56dfe --- /dev/null +++ b/src/dmsvr/internal/domain/templateModel/templateRepo.go @@ -0,0 +1,23 @@ +// Package templateModel 这个文件定义和模板相关的接口及dto定义 +package templateModel + +import ( + "context" + "time" +) + +type ( + TemplateInfo struct { + ProductID string // 产品id + Template string // 数据模板 + CreatedTime time.Time + } + TemplateRepo interface { + Insert(ctx context.Context, productID string, template *Template) error + GetTemplate(ctx context.Context, productID string) (*Template, error) + GetTemplateInfo(ctx context.Context, productID string) (*TemplateInfo, error) + Update(ctx context.Context, productID string, template *Template) error + Delete(ctx context.Context, productID string) error + ClearCache(ctx context.Context, productID string) error + } +) diff --git a/src/dmsvr/internal/domain/templateModel/templateValidate.go b/src/dmsvr/internal/domain/templateModel/templateValidate.go index ee07feeb0..0878279ee 100644 --- a/src/dmsvr/internal/domain/templateModel/templateValidate.go +++ b/src/dmsvr/internal/domain/templateModel/templateValidate.go @@ -19,7 +19,7 @@ const ( ParamsLen = 20 ) -func ValidateWithFmt(templateStr []byte) ([]byte, error) { +func ValidateWithFmt(templateStr []byte) (*Template, error) { template := Template{} err := json.Unmarshal(templateStr, &template) if err != nil { @@ -29,11 +29,7 @@ func ValidateWithFmt(templateStr []byte) ([]byte, error) { if err != nil { return nil, err } - newTemplate, err := json.Marshal(&template) - if err != nil { - return nil, errors.Parameter.WithMsg("模板的json格式不对") - } - return newTemplate, err + return &template, err } func (t *Template) ValidateWithFmt() error { diff --git a/src/dmsvr/internal/event/dataUpdateEvent/dataUpdate.go b/src/dmsvr/internal/event/dataUpdateEvent/dataUpdate.go new file mode 100644 index 000000000..43e105a97 --- /dev/null +++ b/src/dmsvr/internal/event/dataUpdateEvent/dataUpdate.go @@ -0,0 +1,27 @@ +package dataUpdateEvent + +import ( + "context" + "github.com/i-Things/things/src/dmsvr/internal/domain/templateModel" + "github.com/i-Things/things/src/dmsvr/internal/svc" + "github.com/zeromicro/go-zero/core/logx" +) + +type DataUpdateLogic struct { + ctx context.Context + svcCtx *svc.ServiceContext + logx.Logger +} + +func NewPublishLogic(ctx context.Context, svcCtx *svc.ServiceContext) *DataUpdateLogic { + return &DataUpdateLogic{ + ctx: ctx, + svcCtx: svcCtx, + Logger: logx.WithContext(ctx), + } +} + +func (d DataUpdateLogic) TempModelClearCache(info *templateModel.TemplateInfo) error { + d.Infof("DataUpdateLogic|TempModelClearCache|productID:%v", info.ProductID) + return d.svcCtx.TemplateRepo.ClearCache(d.ctx, info.ProductID) +} diff --git a/src/dmsvr/internal/event/eventDevSub/devicePublish.go b/src/dmsvr/internal/event/deviceMsgEvent/devicePublish.go similarity index 95% rename from src/dmsvr/internal/event/eventDevSub/devicePublish.go rename to src/dmsvr/internal/event/deviceMsgEvent/devicePublish.go index 359c87466..145068cc8 100644 --- a/src/dmsvr/internal/event/eventDevSub/devicePublish.go +++ b/src/dmsvr/internal/event/deviceMsgEvent/devicePublish.go @@ -1,4 +1,4 @@ -package eventDevSub +package deviceMsgEvent import ( "context" @@ -9,7 +9,6 @@ import ( "github.com/i-Things/things/src/dmsvr/internal/domain/service/deviceData" "github.com/i-Things/things/src/dmsvr/internal/domain/service/deviceSend" "github.com/i-Things/things/src/dmsvr/internal/domain/templateModel" - "github.com/i-Things/things/src/dmsvr/internal/repo/mysql" "github.com/i-Things/things/src/dmsvr/internal/svc" "github.com/zeromicro/go-zero/core/logx" "strings" @@ -19,7 +18,6 @@ type PublishLogic struct { ctx context.Context svcCtx *svc.ServiceContext logx.Logger - pt *mysql.ProductTemplate template *templateModel.Template topics []string dreq deviceSend.DeviceReq @@ -36,14 +34,7 @@ func NewPublishLogic(ctx context.Context, svcCtx *svc.ServiceContext) *PublishLo func (l *PublishLogic) initMsg(msg *device.PublishMsg) error { var err error - if err != nil { - return err - } - l.pt, err = l.svcCtx.ProductTemplate.FindOne(msg.ProductID) - if err != nil { - return err - } - l.template, err = templateModel.NewTemplate([]byte(l.pt.Template)) + l.template, err = l.svcCtx.TemplateRepo.GetTemplate(l.ctx, msg.ProductID) if err != nil { return err } @@ -205,7 +196,7 @@ func (l *PublishLogic) Handle(msg *device.PublishMsg) (err error) { err = l.HandleThing(msg) case "$ota": err = l.HandleOta(msg) - case l.pt.ProductID: + case msg.ProductID: err = l.HandleDefault(msg) default: err = errors.Parameter.AddDetailf("not suppot topic :%s", msg.Topic) diff --git a/src/dmsvr/internal/event/eventDevSub/deviceSubscribe.go b/src/dmsvr/internal/event/deviceMsgEvent/deviceSubscribe.go similarity index 98% rename from src/dmsvr/internal/event/eventDevSub/deviceSubscribe.go rename to src/dmsvr/internal/event/deviceMsgEvent/deviceSubscribe.go index 6c78964ec..d03af350f 100644 --- a/src/dmsvr/internal/event/eventDevSub/deviceSubscribe.go +++ b/src/dmsvr/internal/event/deviceMsgEvent/deviceSubscribe.go @@ -1,4 +1,4 @@ -package eventDevSub +package deviceMsgEvent //设备的发布,连接及断连处理 import ( diff --git a/src/dmsvr/internal/logic/assemble.go b/src/dmsvr/internal/logic/assemble.go index 69a94ede3..d810ef110 100644 --- a/src/dmsvr/internal/logic/assemble.go +++ b/src/dmsvr/internal/logic/assemble.go @@ -5,6 +5,7 @@ import ( "github.com/golang/protobuf/ptypes/wrappers" "github.com/i-Things/things/src/dmsvr/dm" "github.com/i-Things/things/src/dmsvr/internal/domain/device" + "github.com/i-Things/things/src/dmsvr/internal/domain/templateModel" mysql "github.com/i-Things/things/src/dmsvr/internal/repo/mysql" ) @@ -15,7 +16,7 @@ func GetNullTime(time sql.NullTime) int64 { return time.Time.Unix() } -func ToProductTemplate(pt *mysql.ProductTemplate) *dm.ProductTemplate { +func ToProductTemplate(pt *templateModel.TemplateInfo) *dm.ProductTemplate { return &dm.ProductTemplate{ CreatedTime: pt.CreatedTime.Unix(), ProductID: pt.ProductID, diff --git a/src/dmsvr/internal/logic/getdevicedatalogic.go b/src/dmsvr/internal/logic/getdevicedatalogic.go index d8eda64d5..9b42535c6 100644 --- a/src/dmsvr/internal/logic/getdevicedatalogic.go +++ b/src/dmsvr/internal/logic/getdevicedatalogic.go @@ -6,7 +6,6 @@ import ( "github.com/i-Things/things/shared/def" "github.com/i-Things/things/shared/errors" "github.com/i-Things/things/src/dmsvr/dm" - "github.com/i-Things/things/src/dmsvr/internal/domain/templateModel" "github.com/i-Things/things/src/dmsvr/internal/svc" "github.com/zeromicro/go-zero/core/logx" ) @@ -30,12 +29,7 @@ func (l *GetDeviceDataLogic) HandleDatas(in *dm.GetDeviceDataReq) (*dm.GetDevice dmDatas []*dm.DeviceData total int ) - - tempInfo, err := l.svcCtx.ProductTemplate.FindOne(in.ProductID) - if err != nil { - return nil, errors.Database.AddDetail(err) - } - temp, err := templateModel.NewTemplate([]byte(tempInfo.Template)) + temp, err := l.svcCtx.TemplateRepo.GetTemplate(l.ctx, in.ProductID) if err != nil { return nil, errors.System.AddDetail(err) } diff --git a/src/dmsvr/internal/logic/getproducttemplatelogic.go b/src/dmsvr/internal/logic/getproducttemplatelogic.go index 7a1f69845..89078cb33 100644 --- a/src/dmsvr/internal/logic/getproducttemplatelogic.go +++ b/src/dmsvr/internal/logic/getproducttemplatelogic.go @@ -25,7 +25,7 @@ func NewGetProductTemplateLogic(ctx context.Context, svcCtx *svc.ServiceContext) // 获取产品信息 func (l *GetProductTemplateLogic) GetProductTemplate(in *dm.GetProductTemplateReq) (*dm.ProductTemplate, error) { - pt, err := l.svcCtx.ProductTemplate.FindOne(in.ProductID) + pt, err := l.svcCtx.TemplateRepo.GetTemplateInfo(l.ctx, in.ProductID) if err != nil { return nil, err } diff --git a/src/dmsvr/internal/logic/managedevicelogic.go b/src/dmsvr/internal/logic/managedevicelogic.go index bc1206ff9..4296f3765 100644 --- a/src/dmsvr/internal/logic/managedevicelogic.go +++ b/src/dmsvr/internal/logic/managedevicelogic.go @@ -7,7 +7,6 @@ import ( "github.com/i-Things/things/shared/errors" "github.com/i-Things/things/shared/utils" "github.com/i-Things/things/src/dmsvr/internal/domain/device" - "github.com/i-Things/things/src/dmsvr/internal/domain/templateModel" mysql "github.com/i-Things/things/src/dmsvr/internal/repo/mysql" "github.com/spf13/cast" "time" @@ -77,15 +76,11 @@ func (l *ManageDeviceLogic) AddDevice(in *dm.ManageDeviceReq) (*dm.DeviceInfo, e } else if find == false { return nil, errors.Parameter.AddDetail("not find product id:" + cast.ToString(in.Info.ProductID)) } - pt, err := l.svcCtx.ProductTemplate.FindOne(in.Info.ProductID) - if err != nil { - return nil, errors.Database.AddDetail(err.Error()) - } - dt, err := templateModel.NewTemplate([]byte(pt.Template)) + pt, err := l.svcCtx.TemplateRepo.GetTemplate(l.ctx, in.Info.ProductID) if err != nil { return nil, errors.System.AddDetail(err.Error()) } - err = l.svcCtx.DeviceDataRepo.InitDevice(l.ctx, dt, in.Info.ProductID, in.Info.DeviceName) + err = l.svcCtx.DeviceDataRepo.InitDevice(l.ctx, pt, in.Info.ProductID, in.Info.DeviceName) if err != nil { return nil, errors.Database.AddDetail(err.Error()) } @@ -155,13 +150,9 @@ func (l *ManageDeviceLogic) DelDevice(in *dm.ManageDeviceReq) (*dm.DeviceInfo, e return nil, errors.System.AddDetail(err.Error()) } { //删除时序数据库中的表数据 - pt, err := l.svcCtx.ProductTemplate.FindOne(in.Info.ProductID) + template, err := l.svcCtx.TemplateRepo.GetTemplate(l.ctx, in.Info.ProductID) if err != nil { - return nil, err - } - template, err := templateModel.NewTemplate([]byte(pt.Template)) - if err != nil { - return nil, err + return nil, errors.System.AddDetail(err.Error()) } err = l.svcCtx.DeviceDataRepo.DropDevice(l.ctx, template, in.Info.ProductID, in.Info.DeviceName) if err != nil { diff --git a/src/dmsvr/internal/logic/manageproductlogic.go b/src/dmsvr/internal/logic/manageproductlogic.go index bfaaf6f0b..9c84686e9 100644 --- a/src/dmsvr/internal/logic/manageproductlogic.go +++ b/src/dmsvr/internal/logic/manageproductlogic.go @@ -200,16 +200,16 @@ func (l *ManageProductLogic) ModifyProduct(in *dm.ManageProductReq) (*dm.Product } func (l *ManageProductLogic) DelProduct(in *dm.ManageProductReq) (*dm.ProductInfo, error) { - pt, err := l.svcCtx.ProductTemplate.FindOne(in.Info.ProductID) + pt, err := l.svcCtx.TemplateRepo.GetTemplate(l.ctx, in.Info.ProductID) if err != nil { - return nil, err + return nil, errors.System.AddDetail(err.Error()) } - template, err := templateModel.NewTemplate([]byte(pt.Template)) + err = l.svcCtx.DeviceDataRepo.DropProduct(l.ctx, pt, in.Info.ProductID) if err != nil { - return nil, err + l.Errorf("DelProduct|DropProduct|err=%+v", err) + return nil, errors.Database.AddDetail(err.Error()) } - - l.svcCtx.DeviceDataRepo.DropProduct(l.ctx, template, in.Info.ProductID) + l.svcCtx.TemplateRepo.ClearCache(l.ctx, in.Info.ProductID) err = l.svcCtx.DmDB.Delete(in.Info.ProductID) if err != nil { l.Errorf("DelProduct|Delete|err=%+v", err) diff --git a/src/dmsvr/internal/logic/manageproducttemplatelogic.go b/src/dmsvr/internal/logic/manageproducttemplatelogic.go index dbe545b73..c8f517279 100644 --- a/src/dmsvr/internal/logic/manageproducttemplatelogic.go +++ b/src/dmsvr/internal/logic/manageproducttemplatelogic.go @@ -4,13 +4,11 @@ import ( "context" "github.com/i-Things/things/shared/errors" "github.com/i-Things/things/shared/utils" + "github.com/i-Things/things/src/dmsvr/dm" "github.com/i-Things/things/src/dmsvr/internal/domain/templateModel" "github.com/i-Things/things/src/dmsvr/internal/repo/mysql" - "github.com/spf13/cast" - "time" - - "github.com/i-Things/things/src/dmsvr/dm" "github.com/i-Things/things/src/dmsvr/internal/svc" + "github.com/spf13/cast" "github.com/zeromicro/go-zero/core/logx" ) @@ -29,19 +27,10 @@ func NewManageProductTemplateLogic(ctx context.Context, svcCtx *svc.ServiceConte } } -func (l *ManageProductTemplateLogic) ModifyProductTemplate(in *dm.ManageProductTemplateReq, pt *mysql.ProductTemplate) (*dm.ProductTemplate, error) { +func (l *ManageProductTemplateLogic) ModifyProductTemplate(in *dm.ManageProductTemplateReq, oldT *templateModel.Template) (*dm.ProductTemplate, error) { l.Infof("ManageProductTemplate|ModifyProductTemplate|ProductID:%v", in.Info.ProductID) - newTempMode, err := templateModel.ValidateWithFmt([]byte(in.Info.Template)) - if err != nil { - return nil, err - } - newT, err := templateModel.NewTemplate(newTempMode) - if err != nil { - return nil, err - } - oldT, err := templateModel.NewTemplate([]byte(pt.Template)) + newT, err := templateModel.ValidateWithFmt([]byte(in.Info.Template)) if err != nil { - l.Errorf("%s new old template failure,err:%v,old:%v", utils.FuncName(), err, pt.Template) return nil, err } err = templateModel.CheckModify(oldT, newT) @@ -52,29 +41,28 @@ func (l *ManageProductTemplateLogic) ModifyProductTemplate(in *dm.ManageProductT l.Errorf("%s ModifyProduct failure,err:%v", utils.FuncName(), err) return nil, errors.Database.AddDetail(err) } - pt.Template = string(newTempMode) - err = l.svcCtx.ProductTemplate.Update(pt) + err = l.svcCtx.TemplateRepo.Update(l.ctx, in.Info.ProductID, newT) if err != nil { l.Errorf("ModifyProductTemplate|ProductTemplate|Update|err=%+v", err) return nil, errors.System.AddDetail(err.Error()) } + pt, err := l.svcCtx.TemplateRepo.GetTemplateInfo(l.ctx, in.Info.ProductID) + if err != nil { + return nil, err + } return ToProductTemplate(pt), nil } func (l *ManageProductTemplateLogic) AddProductTemplate(in *dm.ManageProductTemplateReq) (*dm.ProductTemplate, error) { l.Infof("ManageProductTemplate|AddProductTemplate|ProductID:%v", in.Info.ProductID) - pi, err := l.svcCtx.ProductInfo.FindOne(in.Info.ProductID) + _, err := l.svcCtx.ProductInfo.FindOne(in.Info.ProductID) if err != nil { if err == mysql.ErrNotFound { return nil, errors.Parameter.AddDetail("not find ProductID id:" + cast.ToString(in.Info.ProductID)) } return nil, errors.Database.AddDetail(err.Error()) } - newTempMode, err := templateModel.ValidateWithFmt([]byte(in.Info.Template)) - if err != nil { - return nil, err - } - t, err := templateModel.NewTemplate(newTempMode) + t, err := templateModel.ValidateWithFmt([]byte(in.Info.Template)) if err != nil { return nil, err } @@ -82,20 +70,21 @@ func (l *ManageProductTemplateLogic) AddProductTemplate(in *dm.ManageProductTemp l.Errorf("%s InitProduct failure,err:%v", utils.FuncName(), err) return nil, errors.Database.AddDetail(err) } - - pt := &mysql.ProductTemplate{ - ProductID: pi.ProductID, - Template: string(newTempMode), - CreatedTime: time.Now(), + err = l.svcCtx.TemplateRepo.Insert(l.ctx, in.Info.ProductID, t) + if err != nil { + return nil, err } - l.svcCtx.ProductTemplate.Insert(pt) - return ToProductTemplate(pt), nil + pt, err := l.svcCtx.TemplateRepo.GetTemplateInfo(l.ctx, in.Info.ProductID) + if err != nil { + return nil, err + } + return ToProductTemplate(pt), err } // 产品模板管理 func (l *ManageProductTemplateLogic) ManageProductTemplate(in *dm.ManageProductTemplateReq) (*dm.ProductTemplate, error) { l.Infof("ManageProductTemplate|req=%+v", in) - pt, err := l.svcCtx.ProductTemplate.FindOne(in.Info.ProductID) + pt, err := l.svcCtx.TemplateRepo.GetTemplate(l.ctx, in.Info.ProductID) if err != nil { if err == mysql.ErrNotFound { return l.AddProductTemplate(in) diff --git a/src/dmsvr/internal/logic/sendactionlogic.go b/src/dmsvr/internal/logic/sendactionlogic.go index b3422734e..c9d362489 100644 --- a/src/dmsvr/internal/logic/sendactionlogic.go +++ b/src/dmsvr/internal/logic/sendactionlogic.go @@ -7,7 +7,6 @@ import ( "github.com/i-Things/things/shared/errors" "github.com/i-Things/things/src/dmsvr/internal/domain/service/deviceSend" "github.com/i-Things/things/src/dmsvr/internal/domain/templateModel" - "github.com/i-Things/things/src/dmsvr/internal/repo/mysql" "time" "github.com/i-Things/things/src/dmsvr/dm" @@ -19,7 +18,6 @@ import ( type SendActionLogic struct { ctx context.Context svcCtx *svc.ServiceContext - pt *mysql.ProductTemplate template *templateModel.Template logx.Logger } @@ -33,13 +31,9 @@ func NewSendActionLogic(ctx context.Context, svcCtx *svc.ServiceContext) *SendAc } func (l *SendActionLogic) initMsg(productID string) error { var err error - l.pt, err = l.svcCtx.ProductTemplate.FindOne(productID) + l.template, err = l.svcCtx.TemplateRepo.GetTemplate(l.ctx, productID) if err != nil { - return err - } - l.template, err = templateModel.NewTemplate([]byte(l.pt.Template)) - if err != nil { - return err + return errors.System.AddDetail(err.Error()) } return nil } diff --git a/src/dmsvr/internal/logic/sendpropertylogic.go b/src/dmsvr/internal/logic/sendpropertylogic.go index 7713aec66..799846060 100644 --- a/src/dmsvr/internal/logic/sendpropertylogic.go +++ b/src/dmsvr/internal/logic/sendpropertylogic.go @@ -8,7 +8,6 @@ import ( "github.com/i-Things/things/shared/errors" "github.com/i-Things/things/src/dmsvr/internal/domain/service/deviceSend" "github.com/i-Things/things/src/dmsvr/internal/domain/templateModel" - "github.com/i-Things/things/src/dmsvr/internal/repo/mysql" "time" "github.com/i-Things/things/src/dmsvr/dm" @@ -21,7 +20,6 @@ type SendPropertyLogic struct { ctx context.Context svcCtx *svc.ServiceContext logx.Logger - pt *mysql.ProductTemplate template *templateModel.Template } @@ -35,13 +33,9 @@ func NewSendPropertyLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Send func (l *SendPropertyLogic) initMsg(productID string) error { var err error - l.pt, err = l.svcCtx.ProductTemplate.FindOne(productID) + l.template, err = l.svcCtx.TemplateRepo.GetTemplate(l.ctx, productID) if err != nil { - return err - } - l.template, err = templateModel.NewTemplate([]byte(l.pt.Template)) - if err != nil { - return err + return errors.System.AddDetail(err.Error()) } return nil } diff --git a/src/dmsvr/internal/repo/event/dataUpdate/dataUpdate.go b/src/dmsvr/internal/repo/event/dataUpdate/dataUpdate.go new file mode 100644 index 000000000..6b1beb450 --- /dev/null +++ b/src/dmsvr/internal/repo/event/dataUpdate/dataUpdate.go @@ -0,0 +1,22 @@ +package dataUpdate + +import ( + "context" + "github.com/i-Things/things/src/dmsvr/internal/config" + "github.com/i-Things/things/src/dmsvr/internal/domain/templateModel" +) + +type ( + DataUpdate interface { + TempModelUpdate(ctx context.Context, info *templateModel.TemplateInfo) error + Subscribe(handle Handle) error + } + Handle func(ctx context.Context) DataUpdateSubEvent + DataUpdateSubEvent interface { + TempModelClearCache(info *templateModel.TemplateInfo) error + } +) + +func NewDataUpdate(conf config.InnerLinkConf) (DataUpdate, error) { + return NewNatsClient(conf.Nats) +} diff --git a/src/dmsvr/internal/repo/event/dataUpdate/nats.go b/src/dmsvr/internal/repo/event/dataUpdate/nats.go new file mode 100644 index 000000000..4e4be70b5 --- /dev/null +++ b/src/dmsvr/internal/repo/event/dataUpdate/nats.go @@ -0,0 +1,90 @@ +package dataUpdate + +import ( + "context" + "encoding/json" + "github.com/i-Things/things/shared/conf" + "github.com/i-Things/things/shared/events" + "github.com/i-Things/things/shared/utils" + "github.com/i-Things/things/src/ddsvr/ddExport" + "github.com/i-Things/things/src/dmsvr/dmDef" + "github.com/i-Things/things/src/dmsvr/internal/domain/templateModel" + "github.com/nats-io/nats.go" + "github.com/zeromicro/go-zero/core/logx" +) + +type ( + NatsClient struct { + client nats.JetStreamContext + } +) + +func NewNatsClient(conf conf.NatsConf) (*NatsClient, error) { + connectOpts := nats.Options{ + Url: conf.Url, + User: conf.User, + Password: conf.Pass, + Token: conf.Token, + } + nc, err := connectOpts.Connect() + if err != nil { + return nil, err + } + js, err := nc.JetStream() + if err != nil { + return nil, err + } + _, err = js.AddStream(&nats.StreamConfig{ + Name: dmDef.DmUpdateStreamName, + Subjects: []string{ + dmDef.TopicUpdate, + }, + }) + if err != nil { + return nil, err + } + _, err = js.AddConsumer(dmDef.DmUpdateStreamName, &nats.ConsumerConfig{ + Durable: dmDef.DmUpdateConsumeName, + AckPolicy: nats.AckExplicitPolicy, + }) + if err != nil { + return nil, err + } + return &NatsClient{client: js}, nil +} + +func (n *NatsClient) TempModelUpdate(ctx context.Context, info *templateModel.TemplateInfo) error { + data, err := json.Marshal(info) + if err != nil { + return err + } + _, err = n.client.Publish(dmDef.TopicUpdate, events.NewEventMsg(ctx, data)) + return err +} + +func (n *NatsClient) Subscribe(handle Handle) error { + _, err := n.client.Subscribe(dmDef.DmUpdateStreamName, func(msg *nats.Msg) { + msg.Ack() + emsg := events.GetEventMsg(msg.Data) + if emsg == nil { + logx.Errorf("%v|GetEventMsg|subject:%v,data:%v", + utils.FuncName(), msg.Subject, string(msg.Data)) + return + } + ctx := emsg.GetCtx() + tempInfo := templateModel.TemplateInfo{} + err := json.Unmarshal(emsg.GetData(), &tempInfo) + if err != nil { + logx.Errorf("%v|Unmarshal|subject:%v,data:%v", + utils.FuncName(), msg.Subject, string(msg.Data)) + return + } + err = handle(ctx).TempModelClearCache(&tempInfo) + logx.WithContext(ctx).Infof("%s|topic:%v,subject:%v,data:%v,err:%v", utils.FuncName(), + ddExport.TopicDevPublishAll, msg.Subject, string(msg.Data), err) + }) + if err != nil { + return err + } + return nil +} diff --git a/src/dmsvr/internal/repo/event/innerLink/innerLink.go b/src/dmsvr/internal/repo/event/innerLink/innerLink.go index 87825a7b0..7dfef8766 100644 --- a/src/dmsvr/internal/repo/event/innerLink/innerLink.go +++ b/src/dmsvr/internal/repo/event/innerLink/innerLink.go @@ -14,8 +14,8 @@ type ( ReqToDeviceSync(ctx context.Context, reqTopic, respTopic string, req *deviceSend.DeviceReq, productID, deviceName string) (*deviceSend.DeviceResp, error) } - Handle func(ctx context.Context) InnerSubHandle - InnerSubHandle interface { + Handle func(ctx context.Context) InnerSubEvent + InnerSubEvent interface { Publish(out *device.PublishMsg) error Connected(out *device.ConnectMsg) error Disconnected(out *device.ConnectMsg) error diff --git a/src/dmsvr/internal/repo/mysql/templateRepo.go b/src/dmsvr/internal/repo/mysql/templateRepo.go new file mode 100644 index 000000000..fcf525bc7 --- /dev/null +++ b/src/dmsvr/internal/repo/mysql/templateRepo.go @@ -0,0 +1,99 @@ +package mysql + +import ( + "context" + "encoding/json" + "github.com/dgraph-io/ristretto" + "github.com/i-Things/things/shared/errors" + "github.com/i-Things/things/src/dmsvr/internal/domain/templateModel" + "time" +) + +const ( + expirtTime = time.Hour +) + +type TemplateRepo struct { + db ProductTemplateModel + cache *ristretto.Cache +} + +func NewTemplateRepo(t ProductTemplateModel) templateModel.TemplateRepo { + cache, _ := ristretto.NewCache(&ristretto.Config{ + NumCounters: 1e7, // number of keys to track frequency of (10M). + MaxCost: 1 << 30, // maximum cost of cache (1GB). + BufferItems: 64, // number of keys per Get buffer. + }) + return &TemplateRepo{ + db: t, + cache: cache, + } +} + +func (t TemplateRepo) Insert(ctx context.Context, productID string, template *templateModel.Template) error { + templateStr, err := json.Marshal(template) + if err != nil { + return errors.Parameter.WithMsg("模板的json格式不对") + } + _, err = t.db.Insert(&ProductTemplate{ + ProductID: productID, + Template: string(templateStr), + CreatedTime: time.Now(), + }) + t.cache.SetWithTTL(productID, template, 1, expirtTime) + return err +} + +func (t TemplateRepo) GetTemplateInfo(ctx context.Context, productID string) (*templateModel.TemplateInfo, error) { + temp, err := t.db.FindOne(productID) + if err != nil { + return nil, err + } + return &templateModel.TemplateInfo{ + ProductID: temp.ProductID, + Template: temp.Template, + CreatedTime: temp.CreatedTime, + }, nil +} + +func (t TemplateRepo) GetTemplate(ctx context.Context, productID string) (*templateModel.Template, error) { + temp, ok := t.cache.Get(productID) + if ok { + return temp.(*templateModel.Template), nil + } + templateInfo, err := t.db.FindOne(productID) + if err != nil { + return nil, err + } + tempModel, err := templateModel.NewTemplate([]byte(templateInfo.Template)) + if err != nil { + return nil, err + } + t.cache.SetWithTTL(productID, tempModel, 1, expirtTime) + return tempModel, nil +} + +func (t TemplateRepo) Update(ctx context.Context, productID string, template *templateModel.Template) error { + templateStr, err := json.Marshal(template) + if err != nil { + return errors.Parameter.WithMsg("模板的json格式不对") + } + t.cache.Del(productID) + err = t.db.Update(&ProductTemplate{ + ProductID: productID, + Template: string(templateStr), + CreatedTime: time.Now(), + }) + return err +} + +func (t TemplateRepo) ClearCache(ctx context.Context, productID string) error { + t.cache.Del(productID) + return nil +} + +func (t TemplateRepo) Delete(ctx context.Context, productID string) error { + t.cache.Del(productID) + err := t.db.Delete(productID) + return err +} diff --git a/src/dmsvr/internal/repo/readme.md b/src/dmsvr/internal/repo/readme.md new file mode 100644 index 000000000..af390c071 --- /dev/null +++ b/src/dmsvr/internal/repo/readme.md @@ -0,0 +1 @@ +当前包是仓储层,如果是只涉及一种数据库则在包中即可,如果涉及多种数据库复合,则直接在当前包组装即可 \ No newline at end of file diff --git a/src/dmsvr/internal/svc/servicecontext.go b/src/dmsvr/internal/svc/servicecontext.go index efe8873cc..bded4249f 100644 --- a/src/dmsvr/internal/svc/servicecontext.go +++ b/src/dmsvr/internal/svc/servicecontext.go @@ -5,6 +5,8 @@ import ( "github.com/i-Things/things/src/dmsvr/internal/config" "github.com/i-Things/things/src/dmsvr/internal/domain/device" "github.com/i-Things/things/src/dmsvr/internal/domain/service/deviceData" + "github.com/i-Things/things/src/dmsvr/internal/domain/templateModel" + "github.com/i-Things/things/src/dmsvr/internal/repo/event/dataUpdate" "github.com/i-Things/things/src/dmsvr/internal/repo/event/innerLink" mysql "github.com/i-Things/things/src/dmsvr/internal/repo/mysql" "github.com/i-Things/things/src/dmsvr/internal/repo/tdengine/deviceDataRepo" @@ -16,47 +18,20 @@ import ( ) type ServiceContext struct { - Config config.Config - DeviceInfo mysql.DeviceInfoModel - ProductInfo mysql.ProductInfoModel - ProductTemplate mysql.ProductTemplateModel - DmDB mysql.DmModel - DeviceID *utils.SnowFlake - ProductID *utils.SnowFlake - InnerLink innerLink.InnerLink - Store kv.Store - DeviceDataRepo deviceData.DeviceDataRepo - DeviceLogRepo device.LogRepo + Config config.Config + DeviceInfo mysql.DeviceInfoModel + ProductInfo mysql.ProductInfoModel + DmDB mysql.DmModel + DeviceID *utils.SnowFlake + ProductID *utils.SnowFlake + InnerLink innerLink.InnerLink + DataUpdate dataUpdate.DataUpdate + Store kv.Store + DeviceDataRepo deviceData.DeviceDataRepo + DeviceLogRepo device.LogRepo + TemplateRepo templateModel.TemplateRepo } -//func TestTD(taos *TDengine.Td) { -// taos.Exec("create database if not exists test") -// taos.Exec("create table if not exists tb1 (ts timestamp, a int)") -// _, err := taos.Exec("insert into tb1 values(now, 0)(now+1s,1)(now+2s,2)(now+3s,3)") -// if err != nil { -// fmt.Println("failed to insert, err:", err) -// return -// } -// rows, err := taos.Query("select * from tb1") -// if err != nil { -// fmt.Println("failed to select from table, err:", err) -// return -// } -// defer rows.Close() -// for rows.Next() { -// var r struct { -// ts time.Time -// a int -// } -// err := rows.Scan(&r.ts, &r.a) -// if err != nil { -// fmt.Println("scan error:\n", err) -// return -// } -// fmt.Println("get data:", r.ts, r.a) -// } -//} - func NewServiceContext(c config.Config) *ServiceContext { deviceData := deviceDataRepo.NewDeviceDataRepo(c.TDengine.DataSource) deviceLog := deviceLogRepo.NewDeviceLogRepo(c.TDengine.DataSource) @@ -66,6 +41,8 @@ func NewServiceContext(c config.Config) *ServiceContext { di := mysql.NewDeviceInfoModel(conn, c.CacheRedis) pi := mysql.NewProductInfoModel(conn, c.CacheRedis) pt := mysql.NewProductTemplateModel(conn, c.CacheRedis) + tr := mysql.NewTemplateRepo(pt) + DmDB := mysql.NewDmModel(conn, c.CacheRedis) store := kv.NewStore(c.CacheRedis) nodeId := utils.GetNodeID(c.CacheRedis, c.Name) @@ -76,17 +53,23 @@ func NewServiceContext(c config.Config) *ServiceContext { logx.Error("NewInnerLink err", err) os.Exit(-1) } + du, err := dataUpdate.NewDataUpdate(c.InnerLink) + if err != nil { + logx.Error("NewDataUpdate err", err) + os.Exit(-1) + } return &ServiceContext{ - Config: c, - DeviceInfo: di, - ProductInfo: pi, - ProductTemplate: pt, - DmDB: DmDB, - DeviceID: DeviceID, - ProductID: ProductID, - InnerLink: il, - Store: store, - DeviceDataRepo: deviceData, - DeviceLogRepo: deviceLog, + Config: c, + DeviceInfo: di, + ProductInfo: pi, + TemplateRepo: tr, + DmDB: DmDB, + DeviceID: DeviceID, + ProductID: ProductID, + InnerLink: il, + DataUpdate: du, + Store: store, + DeviceDataRepo: deviceData, + DeviceLogRepo: deviceLog, } } -- Gitee From 8746348be522de6a3df9aafcd9c7330c61ab6ecc Mon Sep 17 00:00:00 2001 From: godLei6 <603785348@qq.com> Date: Tue, 26 Apr 2022 22:40:35 +0800 Subject: [PATCH 3/4] =?UTF-8?q?dev:=20=E6=B7=BB=E5=8A=A0=E6=A8=A1=E6=9D=BF?= =?UTF-8?q?=E7=BC=93=E5=AD=98,=E5=8F=8A=E4=BF=AE=E6=94=B9=E9=80=9A?= =?UTF-8?q?=E7=9F=A5,=E5=B0=9A=E6=9C=AA=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- {src/ddsvr/ddExport => shared/devices}/msg.go | 23 +-- shared/utils/json.go | 8 + src/ddsvr/dd/topic.go | 5 + src/ddsvr/ddExport/dd.go | 5 - src/ddsvr/internal/event/deviceSub.go | 20 +-- .../internal/repo/event/devLink/devLink.go | 6 +- src/ddsvr/internal/repo/event/devLink/mqtt.go | 23 ++- .../repo/event/innerLink/innerLink.go | 13 +- .../internal/repo/event/innerLink/nats.go | 60 ++++++-- src/dmsvr/dmDef/dm.go | 8 - src/dmsvr/internal/domain/device/deviceMsg.go | 6 +- .../event/deviceMsgEvent/deviceSubscribe.go | 6 +- .../internal/logic/manageproductlogic.go | 4 + .../logic/manageproducttemplatelogic.go | 8 + .../internal/repo/event/dataUpdate/nats.go | 31 ++-- .../internal/repo/event/innerLink/nats.go | 140 +++++++++++++----- 16 files changed, 241 insertions(+), 125 deletions(-) rename {src/ddsvr/ddExport => shared/devices}/msg.go (61%) create mode 100644 src/ddsvr/dd/topic.go delete mode 100644 src/ddsvr/ddExport/dd.go delete mode 100644 src/dmsvr/dmDef/dm.go diff --git a/src/ddsvr/ddExport/msg.go b/shared/devices/msg.go similarity index 61% rename from src/ddsvr/ddExport/msg.go rename to shared/devices/msg.go index d37ecc6df..8d5f0ea97 100644 --- a/src/ddsvr/ddExport/msg.go +++ b/shared/devices/msg.go @@ -1,4 +1,4 @@ -package ddExport +package devices import ( "context" @@ -31,27 +31,6 @@ type ( } ) -const ( - ActionLogin = "onLogin" - ActionLogout = "onLogout" -) - -//topic 定义 -const ( - ThingsConsumeName = "things_consume" - ThingsStreamName = "things_msg" - // TopicDevPublish dd模块收到设备的发布消息后向内部推送以下topic 最后两个是产品id和设备名称 - TopicDevPublish = "dd.thing.device.clients.publish.%s.%s" - TopicDevPublishAll = "dd.thing.device.clients.publish.>" - - // TopicDevConnected dd模块收到设备的登录消息后向内部推送以下topic - TopicDevConnected = "dd.thing.device.clients.connected" - // TopicDevDisconnected dd模块收到设备的登出消息后向内部推送以下topic - TopicDevDisconnected = "dd.thing.device.clients.disconnected" - // TopicInnerPublish dd模块订阅以下topic,收到内部的发布消息后向设备推送 - TopicInnerPublish = "dd.thing.inner.publish" -) - //发送给设备的数据组包 func PublishToDev(ctx context.Context, topic string, payload []byte) []byte { pub := InnerPublish{ diff --git a/shared/utils/json.go b/shared/utils/json.go index 63b76b078..55167606b 100644 --- a/shared/utils/json.go +++ b/shared/utils/json.go @@ -10,3 +10,11 @@ func Unmarshal(data []byte, v interface{}) error { decoder.UseNumber() return decoder.Decode(v) } + +func GetJson(v interface{}) string { + js, err := json.Marshal(v) + if err != nil { + return "" + } + return string(js) +} diff --git a/src/ddsvr/dd/topic.go b/src/ddsvr/dd/topic.go new file mode 100644 index 000000000..c52bbb9bd --- /dev/null +++ b/src/ddsvr/dd/topic.go @@ -0,0 +1,5 @@ +package dd + +const ( + ThingsDDDeliverGroup = "things_dd_group" +) diff --git a/src/ddsvr/ddExport/dd.go b/src/ddsvr/ddExport/dd.go deleted file mode 100644 index 9ef930160..000000000 --- a/src/ddsvr/ddExport/dd.go +++ /dev/null @@ -1,5 +0,0 @@ -package ddExport - -const ( - SvrName = "dd_rpc" -) diff --git a/src/ddsvr/internal/event/deviceSub.go b/src/ddsvr/internal/event/deviceSub.go index 51db6cf4f..909ed3be2 100644 --- a/src/ddsvr/internal/event/deviceSub.go +++ b/src/ddsvr/internal/event/deviceSub.go @@ -2,10 +2,8 @@ package event import ( "context" - "encoding/json" - "fmt" "github.com/i-Things/things/shared/devices" - "github.com/i-Things/things/src/ddsvr/ddExport" + "github.com/i-Things/things/src/ddsvr/internal/repo/event/innerLink" "github.com/i-Things/things/src/ddsvr/internal/svc" "github.com/zeromicro/go-zero/core/logx" "time" @@ -36,25 +34,21 @@ func (s *DeviceSubServer) Publish(topic string, payload []byte) error { //服务器端下发的消息直接忽略 return nil } - pub := ddExport.DevPublish{ + pub := devices.DevPublish{ Timestamp: time.Now().UnixMilli(), Topic: topic, Payload: payload, ProductID: topicInfo.ProductID, DeviceName: topicInfo.DeviceName, } - pubStr, _ := json.Marshal(pub) - return s.svcCtx.InnerLink.Publish(s.ctx, - fmt.Sprintf(ddExport.TopicDevPublish, topicInfo.ProductID, topicInfo.DeviceName), pubStr) + return s.svcCtx.InnerLink.PubDevPublish(s.ctx, pub) } -func (s *DeviceSubServer) Connected(info *ddExport.DevConn) error { +func (s *DeviceSubServer) Connected(info *devices.DevConn) error { s.Info("Connected", info) - str, _ := json.Marshal(info) - return s.svcCtx.InnerLink.Publish(s.ctx, ddExport.TopicDevConnected, str) + return s.svcCtx.InnerLink.PubConn(s.ctx, innerLink.Connect, info) } -func (s *DeviceSubServer) Disconnected(info *ddExport.DevConn) error { +func (s *DeviceSubServer) Disconnected(info *devices.DevConn) error { s.Info("Disconnected", info) - str, _ := json.Marshal(info) - return s.svcCtx.InnerLink.Publish(s.ctx, ddExport.TopicDevDisconnected, str) + return s.svcCtx.InnerLink.PubConn(s.ctx, innerLink.DisConnect, info) } diff --git a/src/ddsvr/internal/repo/event/devLink/devLink.go b/src/ddsvr/internal/repo/event/devLink/devLink.go index 27afd7971..b1989115d 100644 --- a/src/ddsvr/internal/repo/event/devLink/devLink.go +++ b/src/ddsvr/internal/repo/event/devLink/devLink.go @@ -4,7 +4,7 @@ import ( "context" "fmt" mqtt "github.com/eclipse/paho.mqtt.golang" - "github.com/i-Things/things/src/ddsvr/ddExport" + "github.com/i-Things/things/shared/devices" "github.com/i-Things/things/src/ddsvr/internal/config" ) @@ -20,8 +20,8 @@ type ( Handle func(ctx context.Context) DevSubHandle DevSubHandle interface { Publish(topic string, payload []byte) error - Connected(out *ddExport.DevConn) error - Disconnected(out *ddExport.DevConn) error + Connected(out *devices.DevConn) error + Disconnected(out *devices.DevConn) error } ) diff --git a/src/ddsvr/internal/repo/event/devLink/mqtt.go b/src/ddsvr/internal/repo/event/devLink/mqtt.go index 77529dd7d..baa2dc68d 100644 --- a/src/ddsvr/internal/repo/event/devLink/mqtt.go +++ b/src/ddsvr/internal/repo/event/devLink/mqtt.go @@ -7,7 +7,8 @@ import ( mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/hashicorp/go-uuid" "github.com/i-Things/things/shared/conf" - "github.com/i-Things/things/src/ddsvr/ddExport" + "github.com/i-Things/things/shared/devices" + "github.com/i-Things/things/shared/utils" "github.com/zeromicro/go-zero/core/logx" "strings" "time" @@ -27,6 +28,11 @@ type ( } ) +const ( + ActionLogin = "onLogin" + ActionLogout = "onLogout" +) + func NewEmqClient(conf *conf.MqttConf) (DevLink, error) { opts := mqtt.NewClientOptions() for _, broker := range conf.Brokers { @@ -69,7 +75,7 @@ func (d *MqttClient) SubScribe(handle Handle) error { logx.Error(err) return } - do := ddExport.DevConn{ + do := devices.DevConn{ UserName: msg.UserName, Timestamp: msg.Ts, //毫秒时间戳 Address: msg.Address, @@ -77,15 +83,17 @@ func (d *MqttClient) SubScribe(handle Handle) error { Reason: msg.Reason, } if strings.HasSuffix(message.Topic(), "/disconnected") { - logx.WithContext(ctx).Info("disconnected", string(message.Payload()), message.Topic(), err) - do.Action = ddExport.ActionLogout + logx.WithContext(ctx).Infof("%s|disconnected|topic:%v,message:%v,err:%v", + utils.FuncName(), message.Topic(), string(message.Payload()), err) + do.Action = ActionLogout err = handle(ctx).Disconnected(&do) if err != nil { logx.Error(err) } } else { - do.Action = ddExport.ActionLogin - logx.WithContext(ctx).Info("connected", string(message.Payload()), message.Topic(), err) + do.Action = ActionLogin + logx.WithContext(ctx).Infof("%s|connected|topic:%v,message:%v,err:%v", + utils.FuncName(), message.Topic(), string(message.Payload()), err) err = handle(ctx).Connected(&do) if err != nil { logx.Error(err) @@ -99,7 +107,8 @@ func (d *MqttClient) SubScribe(handle Handle) error { 1, func(client mqtt.Client, message mqtt.Message) { ctx, _ := context.WithTimeout(context.Background(), 5*time.Second) err := handle(ctx).Publish(message.Topic(), message.Payload()) - logx.WithContext(ctx).Info("publish", message.Topic(), string(message.Payload()), err) + logx.WithContext(ctx).Infof("%s|publish|topic:%v,message:%v,err:%v", + utils.FuncName(), message.Topic(), string(message.Payload()), err) }).Error() return err diff --git a/src/ddsvr/internal/repo/event/innerLink/innerLink.go b/src/ddsvr/internal/repo/event/innerLink/innerLink.go index 8f6cc7628..82cf18d5f 100644 --- a/src/ddsvr/internal/repo/event/innerLink/innerLink.go +++ b/src/ddsvr/internal/repo/event/innerLink/innerLink.go @@ -3,12 +3,23 @@ package innerLink import ( //"github.com/i-Things/things/src/ddsvr/internal/domain" "context" + "github.com/i-Things/things/shared/devices" "github.com/i-Things/things/src/ddsvr/internal/config" ) +type ConnType int8 + +const ( + Connect ConnType = iota + DisConnect +) + type ( InnerLink interface { - Publish(ctx context.Context, topic string, payload []byte) error + //向内部发布设备发布的消息 + PubDevPublish(ctx context.Context, publishMsg devices.DevPublish) error + //向内部发布连接及断连消息 + PubConn(ctx context.Context, conn ConnType, info *devices.DevConn) error Subscribe(handle Handle) error } Handle func(ctx context.Context) InnerSubHandle diff --git a/src/ddsvr/internal/repo/event/innerLink/nats.go b/src/ddsvr/internal/repo/event/innerLink/nats.go index 509ce3a56..cea612800 100644 --- a/src/ddsvr/internal/repo/event/innerLink/nats.go +++ b/src/ddsvr/internal/repo/event/innerLink/nats.go @@ -2,9 +2,12 @@ package innerLink import ( "context" + "encoding/json" + "fmt" "github.com/i-Things/things/shared/conf" + "github.com/i-Things/things/shared/devices" "github.com/i-Things/things/shared/events" - "github.com/i-Things/things/src/ddsvr/ddExport" + "github.com/i-Things/things/src/ddsvr/dd" "github.com/nats-io/nats.go" ) @@ -14,6 +17,22 @@ type ( } ) +const ( + ThingsQueueConsumeName = "things_dd_queue_consume" + //topic 定义 + ThingsStreamName = "thing_msg" + // TopicDevPublish dd模块收到设备的发布消息后向内部推送以下topic 最后两个是产品id和设备名称 + TopicDevPublish = "dd.thing.device.clients.publish.%s.%s" + + // TopicDevConnected dd模块收到设备的登录消息后向内部推送以下topic + TopicDevConnected = "dd.thing.device.clients.connected" + // TopicDevDisconnected dd模块收到设备的登出消息后向内部推送以下topic + TopicDevDisconnected = "dd.thing.device.clients.disconnected" + // TopicInnerPublish dd模块订阅以下topic,收到内部的发布消息后向设备推送 + TopicInnerPublish = "dd.thing.inner.publish" + TopicThing = "dd.thing.device.clients.>" +) + func NewNatsClient(conf conf.NatsConf) (InnerLink, error) { connectOpts := nats.Options{ Url: conf.Url, @@ -30,20 +49,22 @@ func NewNatsClient(conf conf.NatsConf) (InnerLink, error) { return nil, err } _, err = js.AddStream(&nats.StreamConfig{ - Name: ddExport.ThingsStreamName, + Name: ThingsStreamName, Subjects: []string{ - ddExport.TopicInnerPublish, - ddExport.TopicDevPublishAll, - ddExport.TopicDevConnected, - ddExport.TopicDevDisconnected, + TopicThing, }, }) if err != nil { return nil, err } - _, err = js.AddConsumer(ddExport.ThingsStreamName, &nats.ConsumerConfig{ - Durable: ddExport.ThingsConsumeName, + _, err = js.AddConsumer(ThingsStreamName, &nats.ConsumerConfig{ + Durable: ThingsQueueConsumeName, AckPolicy: nats.AckExplicitPolicy, + //MaxRequestBatch: 10, + //MaxRequestExpires: 2 * time.Second, + DeliverPolicy: nats.DeliverLastPolicy, + DeliverSubject: nats.NewInbox(), + DeliverGroup: dd.ThingsDDDeliverGroup, }) if err != nil { return nil, err @@ -51,15 +72,32 @@ func NewNatsClient(conf conf.NatsConf) (InnerLink, error) { return &NatsClient{client: js}, nil } -func (n *NatsClient) Publish(ctx context.Context, topic string, payload []byte) error { +func (n *NatsClient) PubDevPublish(ctx context.Context, publishMsg devices.DevPublish) error { + pubStr, _ := json.Marshal(publishMsg) + return n.publish(ctx, + fmt.Sprintf(TopicDevPublish, publishMsg.ProductID, publishMsg.DeviceName), pubStr) +} + +func (n *NatsClient) PubConn(ctx context.Context, conn ConnType, info *devices.DevConn) error { + str, _ := json.Marshal(info) + switch conn { + case Connect: + return n.publish(ctx, TopicDevConnected, str) + case DisConnect: + return n.publish(ctx, TopicDevDisconnected, str) + default: + panic("not support conn type") + } +} +func (n *NatsClient) publish(ctx context.Context, topic string, payload []byte) error { _, err := n.client.Publish(topic, events.NewEventMsg(ctx, payload)) return err } func (n *NatsClient) Subscribe(handle Handle) error { - _, err := n.client.QueueSubscribe(ddExport.TopicInnerPublish, ddExport.SvrName, func(msg *nats.Msg) { + _, err := n.client.QueueSubscribe(TopicInnerPublish, dd.ThingsDDDeliverGroup, func(msg *nats.Msg) { msg.Ack() - ctx, topic, payload := ddExport.GetPublish(msg.Data) + ctx, topic, payload := devices.GetPublish(msg.Data) handle(ctx).PublishToDev(topic, payload) }) return err diff --git a/src/dmsvr/dmDef/dm.go b/src/dmsvr/dmDef/dm.go deleted file mode 100644 index 8f0c278bc..000000000 --- a/src/dmsvr/dmDef/dm.go +++ /dev/null @@ -1,8 +0,0 @@ -package dmDef - -const ( - SvrName = "dm_rpc" - DmUpdateConsumeName = "dm_rpc_update_consume" - DmUpdateStreamName = "dm_rpc_update_msg" - TopicUpdate = "dm.update" -) diff --git a/src/dmsvr/internal/domain/device/deviceMsg.go b/src/dmsvr/internal/domain/device/deviceMsg.go index 8b9905c19..c55731788 100644 --- a/src/dmsvr/internal/domain/device/deviceMsg.go +++ b/src/dmsvr/internal/domain/device/deviceMsg.go @@ -4,7 +4,7 @@ package device import ( "context" "encoding/json" - "github.com/i-Things/things/src/ddsvr/ddExport" + "github.com/i-Things/things/shared/devices" "github.com/zeromicro/go-zero/core/logx" "time" ) @@ -28,7 +28,7 @@ type ConnectMsg struct { } func GetDevConnMsg(ctx context.Context, data []byte) (*ConnectMsg, error) { - logInfo := ddExport.DevConn{} + logInfo := devices.DevConn{} err := json.Unmarshal(data, &logInfo) if err != nil { logx.WithContext(ctx).Error("getDevConnMsg", string(data), err) @@ -46,7 +46,7 @@ func GetDevConnMsg(ctx context.Context, data []byte) (*ConnectMsg, error) { } func GetDevPublish(ctx context.Context, data []byte) (*PublishMsg, error) { - pubInfo := ddExport.DevPublish{} + pubInfo := devices.DevPublish{} err := json.Unmarshal(data, &pubInfo) if err != nil { logx.WithContext(ctx).Error("GetDevPublish", string(data), err) diff --git a/src/dmsvr/internal/event/deviceMsgEvent/deviceSubscribe.go b/src/dmsvr/internal/event/deviceMsgEvent/deviceSubscribe.go index d03af350f..48c4c0869 100644 --- a/src/dmsvr/internal/event/deviceMsgEvent/deviceSubscribe.go +++ b/src/dmsvr/internal/event/deviceMsgEvent/deviceSubscribe.go @@ -25,12 +25,12 @@ func NewDeviceMsgHandle(ctx context.Context, svcCtx *svc.ServiceContext) *Device } func (l *DeviceMsgHandle) Publish(msg *device.PublishMsg) error { - l.Infof("DevReqLogic|req=%+v", msg) + l.Infof("DevReqLogic|req=%+v", utils.GetJson(msg)) return NewPublishLogic(l.ctx, l.svcCtx).Handle(msg) } func (l *DeviceMsgHandle) Connected(msg *device.ConnectMsg) error { - l.Infof("ConnectLogic|req=%+v", msg) + l.Infof("ConnectLogic|req=%+v", utils.GetJson(msg)) //todo 这里需要查询下数据库,避免数据错误 ld, err := device.GetClientIDInfo(msg.ClientID) if err != nil { @@ -53,7 +53,7 @@ func (l *DeviceMsgHandle) Connected(msg *device.ConnectMsg) error { } func (l *DeviceMsgHandle) Disconnected(msg *device.ConnectMsg) error { - l.Infof("DisconnectLogic|req=%+v", msg) + l.Infof("DisconnectLogic|req=%+v", utils.GetJson(msg)) ld, err := device.GetClientIDInfo(msg.ClientID) if err != nil { return err diff --git a/src/dmsvr/internal/logic/manageproductlogic.go b/src/dmsvr/internal/logic/manageproductlogic.go index 9c84686e9..6ccb240e3 100644 --- a/src/dmsvr/internal/logic/manageproductlogic.go +++ b/src/dmsvr/internal/logic/manageproductlogic.go @@ -215,6 +215,10 @@ func (l *ManageProductLogic) DelProduct(in *dm.ManageProductReq) (*dm.ProductInf l.Errorf("DelProduct|Delete|err=%+v", err) return nil, errors.Database.AddDetail(err.Error()) } + err = l.svcCtx.DataUpdate.TempModelUpdate(l.ctx, &templateModel.TemplateInfo{ProductID: in.Info.ProductID}) + if err != nil { + return nil, err + } return &dm.ProductInfo{}, nil } diff --git a/src/dmsvr/internal/logic/manageproducttemplatelogic.go b/src/dmsvr/internal/logic/manageproducttemplatelogic.go index c8f517279..fed254e52 100644 --- a/src/dmsvr/internal/logic/manageproducttemplatelogic.go +++ b/src/dmsvr/internal/logic/manageproducttemplatelogic.go @@ -46,6 +46,10 @@ func (l *ManageProductTemplateLogic) ModifyProductTemplate(in *dm.ManageProductT l.Errorf("ModifyProductTemplate|ProductTemplate|Update|err=%+v", err) return nil, errors.System.AddDetail(err.Error()) } + err = l.svcCtx.DataUpdate.TempModelUpdate(l.ctx, &templateModel.TemplateInfo{ProductID: in.Info.ProductID}) + if err != nil { + return nil, err + } pt, err := l.svcCtx.TemplateRepo.GetTemplateInfo(l.ctx, in.Info.ProductID) if err != nil { return nil, err @@ -74,6 +78,10 @@ func (l *ManageProductTemplateLogic) AddProductTemplate(in *dm.ManageProductTemp if err != nil { return nil, err } + err = l.svcCtx.DataUpdate.TempModelUpdate(l.ctx, &templateModel.TemplateInfo{ProductID: in.Info.ProductID}) + if err != nil { + return nil, err + } pt, err := l.svcCtx.TemplateRepo.GetTemplateInfo(l.ctx, in.Info.ProductID) if err != nil { return nil, err diff --git a/src/dmsvr/internal/repo/event/dataUpdate/nats.go b/src/dmsvr/internal/repo/event/dataUpdate/nats.go index 4e4be70b5..e9acd15fb 100644 --- a/src/dmsvr/internal/repo/event/dataUpdate/nats.go +++ b/src/dmsvr/internal/repo/event/dataUpdate/nats.go @@ -6,8 +6,6 @@ import ( "github.com/i-Things/things/shared/conf" "github.com/i-Things/things/shared/events" "github.com/i-Things/things/shared/utils" - "github.com/i-Things/things/src/ddsvr/ddExport" - "github.com/i-Things/things/src/dmsvr/dmDef" "github.com/i-Things/things/src/dmsvr/internal/domain/templateModel" "github.com/nats-io/nats.go" "github.com/zeromicro/go-zero/core/logx" @@ -19,6 +17,13 @@ type ( } ) +const ( + DmUpdateConsumeName = "dm_rpc_update_consume" + DmUpdateStreamName = "dm_rpc_update_msg" + TopicUpdate = "dm.update" + DmUpdateDeliverGroup = "dm_rpc_update_group" +) + func NewNatsClient(conf conf.NatsConf) (*NatsClient, error) { connectOpts := nats.Options{ Url: conf.Url, @@ -35,17 +40,19 @@ func NewNatsClient(conf conf.NatsConf) (*NatsClient, error) { return nil, err } _, err = js.AddStream(&nats.StreamConfig{ - Name: dmDef.DmUpdateStreamName, + Name: DmUpdateStreamName, Subjects: []string{ - dmDef.TopicUpdate, + TopicUpdate, }, }) if err != nil { return nil, err } - _, err = js.AddConsumer(dmDef.DmUpdateStreamName, &nats.ConsumerConfig{ - Durable: dmDef.DmUpdateConsumeName, - AckPolicy: nats.AckExplicitPolicy, + _, err = js.AddConsumer(DmUpdateStreamName, &nats.ConsumerConfig{ + Durable: DmUpdateConsumeName, + AckPolicy: nats.AckExplicitPolicy, + DeliverSubject: nats.NewInbox(), + //DeliverGroup: ThingsDeliverGroup, }) if err != nil { return nil, err @@ -58,12 +65,14 @@ func (n *NatsClient) TempModelUpdate(ctx context.Context, info *templateModel.Te if err != nil { return err } - _, err = n.client.Publish(dmDef.TopicUpdate, events.NewEventMsg(ctx, data)) + _, err = n.client.Publish(TopicUpdate, events.NewEventMsg(ctx, data)) + logx.WithContext(ctx).Infof("%s|info:%v,err:%v", utils.FuncName(), + info, err) return err } func (n *NatsClient) Subscribe(handle Handle) error { - _, err := n.client.Subscribe(dmDef.DmUpdateStreamName, func(msg *nats.Msg) { + _, err := n.client.Subscribe(TopicUpdate, func(msg *nats.Msg) { msg.Ack() emsg := events.GetEventMsg(msg.Data) if emsg == nil { @@ -81,8 +90,8 @@ func (n *NatsClient) Subscribe(handle Handle) error { } err = handle(ctx).TempModelClearCache(&tempInfo) logx.WithContext(ctx).Infof("%s|topic:%v,subject:%v,data:%v,err:%v", utils.FuncName(), - ddExport.TopicDevPublishAll, msg.Subject, string(msg.Data), err) - }) + TopicUpdate, msg.Subject, string(msg.Data), err) + }, nats.Durable(DmUpdateConsumeName), nats.BindStream(DmUpdateStreamName)) if err != nil { return err } diff --git a/src/dmsvr/internal/repo/event/innerLink/nats.go b/src/dmsvr/internal/repo/event/innerLink/nats.go index f1c4de28d..0588941bc 100644 --- a/src/dmsvr/internal/repo/event/innerLink/nats.go +++ b/src/dmsvr/internal/repo/event/innerLink/nats.go @@ -5,11 +5,10 @@ import ( "encoding/json" "fmt" "github.com/i-Things/things/shared/conf" + "github.com/i-Things/things/shared/devices" "github.com/i-Things/things/shared/errors" "github.com/i-Things/things/shared/events" "github.com/i-Things/things/shared/utils" - "github.com/i-Things/things/src/ddsvr/ddExport" - "github.com/i-Things/things/src/dmsvr/dmDef" "github.com/i-Things/things/src/dmsvr/internal/domain/device" deviceSend "github.com/i-Things/things/src/dmsvr/internal/domain/service/deviceSend" "github.com/nats-io/nats.go" @@ -23,6 +22,27 @@ type ( } ) +//topic 定义 +const ( + ThingsStreamName = "thing_msg" + // TopicDevPublish dd模块收到设备的发布消息后向内部推送以下topic 最后两个是产品id和设备名称 + TopicDevPublish = "dd.thing.device.clients.publish.%s.%s" + TopicDevPublishAll = "dd.thing.device.clients.publish.>" + + // TopicDevConnected dd模块收到设备的登录消息后向内部推送以下topic + TopicDevConnected = "dd.thing.device.clients.connected" + // TopicDevDisconnected dd模块收到设备的登出消息后向内部推送以下topic + TopicDevDisconnected = "dd.thing.device.clients.disconnected" + // TopicInnerPublish dd模块订阅以下topic,收到内部的发布消息后向设备推送 + TopicInnerPublish = "dd.thing.inner.publish" + TopicThing = "dd.thing.device.clients.>" +) +const ( + ThingsDeliverGroup = "things_dm_group" + ThingsQueueConsumeName = "things_dm_queue_consume" + ThingsAllConsumeName = "things_dm_all_consume" +) + func NewNatsClient(conf conf.NatsConf) (*NatsClient, error) { connectOpts := nats.Options{ Url: conf.Url, @@ -39,20 +59,39 @@ func NewNatsClient(conf conf.NatsConf) (*NatsClient, error) { return nil, err } _, err = js.AddStream(&nats.StreamConfig{ - Name: ddExport.ThingsStreamName, + Name: ThingsStreamName, Subjects: []string{ - ddExport.TopicInnerPublish, - ddExport.TopicDevPublishAll, - ddExport.TopicDevConnected, - ddExport.TopicDevDisconnected, + TopicThing, }, }) if err != nil { return nil, err } - _, err = js.AddConsumer(ddExport.ThingsStreamName, &nats.ConsumerConfig{ - Durable: ddExport.ThingsConsumeName, + _, err = js.AddConsumer(ThingsStreamName, &nats.ConsumerConfig{ + Durable: ThingsQueueConsumeName, AckPolicy: nats.AckExplicitPolicy, + //MaxRequestBatch: 10, + //MaxRequestExpires: 2 * time.Second, + DeliverPolicy: nats.DeliverLastPolicy, + DeliverSubject: nats.NewInbox(), + DeliverGroup: ThingsDeliverGroup, + }) + _, err = js.AddConsumer(ThingsStreamName, &nats.ConsumerConfig{ + Durable: ThingsQueueConsumeName + "2", + AckPolicy: nats.AckExplicitPolicy, + //MaxRequestBatch: 10, + //MaxRequestExpires: 2 * time.Second, + DeliverPolicy: nats.DeliverLastPolicy, + DeliverSubject: nats.NewInbox(), + DeliverGroup: ThingsDeliverGroup + "2", + }) + _, err = js.AddConsumer(ThingsStreamName, &nats.ConsumerConfig{ + Durable: ThingsAllConsumeName, + AckPolicy: nats.AckExplicitPolicy, + //MaxRequestBatch: 10, + //MaxRequestExpires: 2 * time.Second, + DeliverPolicy: nats.DeliverNewPolicy, + DeliverSubject: nats.NewInbox(), }) if err != nil { return nil, err @@ -61,12 +100,13 @@ func NewNatsClient(conf conf.NatsConf) (*NatsClient, error) { } func (n *NatsClient) PublishToDev(ctx context.Context, topic string, payload []byte) error { - _, err := n.client.Publish(ddExport.TopicInnerPublish, ddExport.PublishToDev(ctx, topic, payload)) + _, err := n.client.Publish(TopicInnerPublish, devices.PublishToDev(ctx, topic, payload)) return err } func (n *NatsClient) SubscribeDevSync(ctx context.Context, topic string) (*SubDev, error) { - subscription, err := n.client.SubscribeSync(topic) + subscription, err := n.client.SubscribeSync(topic, nats.Durable(ThingsAllConsumeName), + nats.BindStream(ThingsStreamName)) if err != nil { return nil, err } @@ -74,27 +114,51 @@ func (n *NatsClient) SubscribeDevSync(ctx context.Context, topic string) (*SubDe } func (n *NatsClient) Subscribe(handle Handle) error { - _, err := n.client.QueueSubscribe(ddExport.TopicDevPublishAll, dmDef.SvrName, func(msg *nats.Msg) { - msg.Ack() - emsg := events.GetEventMsg(msg.Data) - if emsg == nil { - logx.Error(msg.Subject, string(msg.Data)) - return - } - ctx := emsg.GetCtx() - ele, err := device.GetDevPublish(ctx, emsg.GetData()) - if err != nil { - logx.WithContext(ctx).Error(msg.Subject, string(msg.Data), err) - return - } - err = handle(ctx).Publish(ele) - logx.WithContext(ctx).Infof("%s|topic:%v,subject:%v,data:%v,err:%v", utils.FuncName(), - ddExport.TopicDevPublishAll, msg.Subject, string(msg.Data), err) - }) - if err != nil { - return err - } - _, err = n.client.QueueSubscribe(ddExport.TopicDevConnected, dmDef.SvrName, func(msg *nats.Msg) { + //_, err := n.client.QueueSubscribe(TopicDevPublishAll, ThingsDeliverGroup, func(msg *nats.Msg) { + // msg.Ack() + // emsg := events.GetEventMsg(msg.Data) + // if emsg == nil { + // logx.Error(msg.Subject, string(msg.Data)) + // return + // } + // ctx := emsg.GetCtx() + // ele, err := device.GetDevPublish(ctx, emsg.GetData()) + // if err != nil { + // logx.WithContext(ctx).Error(msg.Subject, string(msg.Data), err) + // return + // } + // err = handle(ctx).Publish(ele) + // logx.WithContext(ctx).Infof("%s|topic:%v,subject:%v,data:%v,err:%v", utils.FuncName(), + // TopicDevPublishAll, msg.Subject, string(msg.Data), err) + //}, nats.Durable(ThingsQueueConsumeName), nats.BindStream(ThingsStreamName)) + //if err != nil { + // return err + //} + //_, err = n.client.QueueSubscribe(TopicDevPublishAll, ThingsDeliverGroup+"2", func(msg *nats.Msg) { + // err := msg.Ack() + // if err != nil { + // logx.Error(msg.Subject, string(msg.Data), err) + // return + // } + // emsg := events.GetEventMsg(msg.Data) + // if emsg == nil { + // logx.Error(msg.Subject, string(msg.Data)) + // return + // } + // ctx := emsg.GetCtx() + // ele, err := device.GetDevPublish(ctx, emsg.GetData()) + // if err != nil { + // logx.WithContext(ctx).Error(msg.Subject, string(msg.Data), err) + // return + // } + // err = handle(ctx).Publish(ele) + // logx.WithContext(ctx).Infof("%s|topic:%v,subject:%v,data:%v,err:%v", utils.FuncName(), + // TopicDevPublishAll, msg.Subject, string(msg.Data), err) + //}, nats.Durable(ThingsQueueConsumeName+"2"), nats.BindStream(ThingsStreamName)) + //if err != nil { + // return err + //} + _, err := n.client.QueueSubscribe(TopicDevConnected, ThingsDeliverGroup, func(msg *nats.Msg) { msg.Ack() emsg := events.GetEventMsg(msg.Data) if emsg == nil { @@ -109,12 +173,12 @@ func (n *NatsClient) Subscribe(handle Handle) error { } err = handle(ctx).Connected(ele) logx.WithContext(ctx).Infof("%s|topic:%v,subject:%v,data:%v,err:%v", utils.FuncName(), - ddExport.TopicDevConnected, msg.Subject, string(msg.Data), err) - }) + TopicDevConnected, msg.Subject, string(msg.Data), err) + }, nats.Durable(ThingsQueueConsumeName), nats.BindStream(ThingsStreamName)) if err != nil { return err } - _, err = n.client.QueueSubscribe(ddExport.TopicDevDisconnected, dmDef.SvrName, func(msg *nats.Msg) { + _, err = n.client.QueueSubscribe(TopicDevDisconnected, ThingsDeliverGroup, func(msg *nats.Msg) { msg.Ack() emsg := events.GetEventMsg(msg.Data) if emsg == nil { @@ -129,8 +193,8 @@ func (n *NatsClient) Subscribe(handle Handle) error { } err = handle(ctx).Disconnected(ele) logx.WithContext(ctx).Infof("%s|topic:%v,subject:%v,data:%v,err:%v", utils.FuncName(), - ddExport.TopicDevDisconnected, msg.Subject, string(msg.Data), err) - }) + TopicDevDisconnected, msg.Subject, string(msg.Data), err) + }, nats.Durable(ThingsQueueConsumeName), nats.BindStream(ThingsStreamName)) if err != nil { return err } @@ -144,7 +208,7 @@ func (n *NatsClient) ReqToDeviceSync(ctx context.Context, reqTopic, respTopic st if err != nil { return nil, err } - handle, err := n.SubscribeDevSync(ctx, fmt.Sprintf(ddExport.TopicDevPublish, productID, deviceName)) + handle, err := n.SubscribeDevSync(ctx, fmt.Sprintf(TopicDevPublish, productID, deviceName)) if err != nil { return nil, err } -- Gitee From 6a46fd929c9ff7406f989921be9c8ab1de7a4165 Mon Sep 17 00:00:00 2001 From: godLei6 <603785348@qq.com> Date: Wed, 27 Apr 2022 21:56:26 +0800 Subject: [PATCH 4/4] =?UTF-8?q?dev:=20=E5=AE=8C=E6=88=90=E4=BA=A4=E4=BB=98?= =?UTF-8?q?=E7=89=88=E6=9C=AC=E5=BC=80=E5=8F=91,=E5=8A=9F=E8=83=BD?= =?UTF-8?q?=E5=BC=80=E5=8F=91=E7=BB=93=E6=9D=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- shared/devices/msg.go | 13 +- shared/events/msgQuque.go | 1 - shared/events/nats.go | 24 +++ .../internal/repo/event/innerLink/nats.go | 41 +---- .../internal/logic/manageproductlogic.go | 7 +- .../logic/manageproducttemplatelogic.go | 7 +- .../internal/repo/event/dataUpdate/nats.go | 57 ++---- .../internal/repo/event/innerLink/nats.go | 166 ++++-------------- 8 files changed, 96 insertions(+), 220 deletions(-) delete mode 100644 shared/events/msgQuque.go create mode 100644 shared/events/nats.go diff --git a/shared/devices/msg.go b/shared/devices/msg.go index 8d5f0ea97..f0d34403d 100644 --- a/shared/devices/msg.go +++ b/shared/devices/msg.go @@ -1,9 +1,7 @@ package devices import ( - "context" "encoding/json" - "github.com/i-Things/things/shared/events" ) type ( @@ -32,19 +30,18 @@ type ( ) //发送给设备的数据组包 -func PublishToDev(ctx context.Context, topic string, payload []byte) []byte { +func PublishToDev(topic string, payload []byte) []byte { pub := InnerPublish{ Topic: topic, Payload: payload, } data, _ := json.Marshal(pub) - return events.NewEventMsg(ctx, data) + return data } //收到发送给设备的数据,解包 -func GetPublish(data []byte) (ctx context.Context, topic string, payload []byte) { +func GetPublish(data []byte) (topic string, payload []byte) { pub := InnerPublish{} - msg := events.GetEventMsg(data) - _ = json.Unmarshal(msg.GetData(), &pub) - return msg.GetCtx(), pub.Topic, pub.Payload + _ = json.Unmarshal(data, &pub) + return pub.Topic, pub.Payload } diff --git a/shared/events/msgQuque.go b/shared/events/msgQuque.go deleted file mode 100644 index b3adf695c..000000000 --- a/shared/events/msgQuque.go +++ /dev/null @@ -1 +0,0 @@ -package events diff --git a/shared/events/nats.go b/shared/events/nats.go new file mode 100644 index 000000000..a384bd44b --- /dev/null +++ b/shared/events/nats.go @@ -0,0 +1,24 @@ +package events + +import ( + "context" + "github.com/nats-io/nats.go" + "github.com/zeromicro/go-zero/core/logx" +) + +type HandleFunc func(ctx context.Context, msg []byte) error + +func NatsSubscription(handle HandleFunc) func(msg *nats.Msg) { + return func(msg *nats.Msg) { + msg.Ack() + emsg := GetEventMsg(msg.Data) + if emsg == nil { + logx.Error(msg.Subject, string(msg.Data)) + return + } + ctx := emsg.GetCtx() + err := handle(ctx, emsg.GetData()) + logx.WithContext(ctx).Infof("nats subscription|subject:%v,data:%v,err:%v", + msg.Subject, string(msg.Data), err) + } +} diff --git a/src/ddsvr/internal/repo/event/innerLink/nats.go b/src/ddsvr/internal/repo/event/innerLink/nats.go index cea612800..b6e2910a3 100644 --- a/src/ddsvr/internal/repo/event/innerLink/nats.go +++ b/src/ddsvr/internal/repo/event/innerLink/nats.go @@ -13,7 +13,7 @@ import ( type ( NatsClient struct { - client nats.JetStreamContext + client *nats.Conn } ) @@ -44,32 +44,7 @@ func NewNatsClient(conf conf.NatsConf) (InnerLink, error) { if err != nil { return nil, err } - js, err := nc.JetStream() - if err != nil { - return nil, err - } - _, err = js.AddStream(&nats.StreamConfig{ - Name: ThingsStreamName, - Subjects: []string{ - TopicThing, - }, - }) - if err != nil { - return nil, err - } - _, err = js.AddConsumer(ThingsStreamName, &nats.ConsumerConfig{ - Durable: ThingsQueueConsumeName, - AckPolicy: nats.AckExplicitPolicy, - //MaxRequestBatch: 10, - //MaxRequestExpires: 2 * time.Second, - DeliverPolicy: nats.DeliverLastPolicy, - DeliverSubject: nats.NewInbox(), - DeliverGroup: dd.ThingsDDDeliverGroup, - }) - if err != nil { - return nil, err - } - return &NatsClient{client: js}, nil + return &NatsClient{client: nc}, nil } func (n *NatsClient) PubDevPublish(ctx context.Context, publishMsg devices.DevPublish) error { @@ -91,14 +66,14 @@ func (n *NatsClient) PubConn(ctx context.Context, conn ConnType, info *devices.D } func (n *NatsClient) publish(ctx context.Context, topic string, payload []byte) error { - _, err := n.client.Publish(topic, events.NewEventMsg(ctx, payload)) + err := n.client.Publish(topic, events.NewEventMsg(ctx, payload)) return err } func (n *NatsClient) Subscribe(handle Handle) error { - _, err := n.client.QueueSubscribe(TopicInnerPublish, dd.ThingsDDDeliverGroup, func(msg *nats.Msg) { - msg.Ack() - ctx, topic, payload := devices.GetPublish(msg.Data) - handle(ctx).PublishToDev(topic, payload) - }) + _, err := n.client.QueueSubscribe(TopicInnerPublish, dd.ThingsDDDeliverGroup, + events.NatsSubscription(func(ctx context.Context, msg []byte) error { + topic, payload := devices.GetPublish(msg) + return handle(ctx).PublishToDev(topic, payload) + })) return err } diff --git a/src/dmsvr/internal/logic/manageproductlogic.go b/src/dmsvr/internal/logic/manageproductlogic.go index 6ccb240e3..161a947fb 100644 --- a/src/dmsvr/internal/logic/manageproductlogic.go +++ b/src/dmsvr/internal/logic/manageproductlogic.go @@ -56,9 +56,14 @@ func (l *ManageProductLogic) AddProduct(in *dm.ManageProductReq) (*dm.ProductInf } pi, pt := l.InsertProduct(in) t, _ := templateModel.NewTemplate([]byte(pt.Template)) + if err := l.svcCtx.DeviceLogRepo.InitProduct( + l.ctx, pi.ProductID); err != nil { + l.Errorf("%s|DeviceLogRepo|InitProduct| failure,err:%v", utils.FuncName(), err) + return nil, errors.Database.AddDetail(err) + } if err := l.svcCtx.DeviceDataRepo.InitProduct( l.ctx, t, pi.ProductID); err != nil { - l.Errorf("%s InitProduct failure,err:%v", utils.FuncName(), err) + l.Errorf("%s|DeviceDataRepo|InitProduct| failure,err:%v", utils.FuncName(), err) return nil, errors.Database.AddDetail(err) } err = l.svcCtx.DmDB.Insert(pi, pt) diff --git a/src/dmsvr/internal/logic/manageproducttemplatelogic.go b/src/dmsvr/internal/logic/manageproducttemplatelogic.go index fed254e52..5ad582f9c 100644 --- a/src/dmsvr/internal/logic/manageproducttemplatelogic.go +++ b/src/dmsvr/internal/logic/manageproducttemplatelogic.go @@ -70,8 +70,13 @@ func (l *ManageProductTemplateLogic) AddProductTemplate(in *dm.ManageProductTemp if err != nil { return nil, err } + if err := l.svcCtx.DeviceLogRepo.InitProduct( + l.ctx, in.Info.ProductID); err != nil { + l.Errorf("%s|DeviceLogRepo|InitProduct| failure,err:%v", utils.FuncName(), err) + return nil, errors.Database.AddDetail(err) + } if err := l.svcCtx.DeviceDataRepo.InitProduct(l.ctx, t, in.Info.ProductID); err != nil { - l.Errorf("%s InitProduct failure,err:%v", utils.FuncName(), err) + l.Errorf("%s|DeviceDataRepo|InitProduct| failure,err:%v", utils.FuncName(), err) return nil, errors.Database.AddDetail(err) } err = l.svcCtx.TemplateRepo.Insert(l.ctx, in.Info.ProductID, t) diff --git a/src/dmsvr/internal/repo/event/dataUpdate/nats.go b/src/dmsvr/internal/repo/event/dataUpdate/nats.go index e9acd15fb..9bd72e477 100644 --- a/src/dmsvr/internal/repo/event/dataUpdate/nats.go +++ b/src/dmsvr/internal/repo/event/dataUpdate/nats.go @@ -13,7 +13,7 @@ import ( type ( NatsClient struct { - client nats.JetStreamContext + client *nats.Conn } ) @@ -35,29 +35,7 @@ func NewNatsClient(conf conf.NatsConf) (*NatsClient, error) { if err != nil { return nil, err } - js, err := nc.JetStream() - if err != nil { - return nil, err - } - _, err = js.AddStream(&nats.StreamConfig{ - Name: DmUpdateStreamName, - Subjects: []string{ - TopicUpdate, - }, - }) - if err != nil { - return nil, err - } - _, err = js.AddConsumer(DmUpdateStreamName, &nats.ConsumerConfig{ - Durable: DmUpdateConsumeName, - AckPolicy: nats.AckExplicitPolicy, - DeliverSubject: nats.NewInbox(), - //DeliverGroup: ThingsDeliverGroup, - }) - if err != nil { - return nil, err - } - return &NatsClient{client: js}, nil + return &NatsClient{client: nc}, nil } func (n *NatsClient) TempModelUpdate(ctx context.Context, info *templateModel.TemplateInfo) error { @@ -65,33 +43,22 @@ func (n *NatsClient) TempModelUpdate(ctx context.Context, info *templateModel.Te if err != nil { return err } - _, err = n.client.Publish(TopicUpdate, events.NewEventMsg(ctx, data)) + err = n.client.Publish(TopicUpdate, events.NewEventMsg(ctx, data)) logx.WithContext(ctx).Infof("%s|info:%v,err:%v", utils.FuncName(), info, err) return err } func (n *NatsClient) Subscribe(handle Handle) error { - _, err := n.client.Subscribe(TopicUpdate, func(msg *nats.Msg) { - msg.Ack() - emsg := events.GetEventMsg(msg.Data) - if emsg == nil { - logx.Errorf("%v|GetEventMsg|subject:%v,data:%v", - utils.FuncName(), msg.Subject, string(msg.Data)) - return - } - ctx := emsg.GetCtx() - tempInfo := templateModel.TemplateInfo{} - err := json.Unmarshal(emsg.GetData(), &tempInfo) - if err != nil { - logx.Errorf("%v|Unmarshal|subject:%v,data:%v", - utils.FuncName(), msg.Subject, string(msg.Data)) - return - } - err = handle(ctx).TempModelClearCache(&tempInfo) - logx.WithContext(ctx).Infof("%s|topic:%v,subject:%v,data:%v,err:%v", utils.FuncName(), - TopicUpdate, msg.Subject, string(msg.Data), err) - }, nats.Durable(DmUpdateConsumeName), nats.BindStream(DmUpdateStreamName)) + _, err := n.client.Subscribe(TopicUpdate, + events.NatsSubscription(func(ctx context.Context, msg []byte) error { + tempInfo := templateModel.TemplateInfo{} + err := json.Unmarshal(msg, &tempInfo) + if err != nil { + return err + } + return handle(ctx).TempModelClearCache(&tempInfo) + })) if err != nil { return err } diff --git a/src/dmsvr/internal/repo/event/innerLink/nats.go b/src/dmsvr/internal/repo/event/innerLink/nats.go index 0588941bc..750919108 100644 --- a/src/dmsvr/internal/repo/event/innerLink/nats.go +++ b/src/dmsvr/internal/repo/event/innerLink/nats.go @@ -10,7 +10,7 @@ import ( "github.com/i-Things/things/shared/events" "github.com/i-Things/things/shared/utils" "github.com/i-Things/things/src/dmsvr/internal/domain/device" - deviceSend "github.com/i-Things/things/src/dmsvr/internal/domain/service/deviceSend" + "github.com/i-Things/things/src/dmsvr/internal/domain/service/deviceSend" "github.com/nats-io/nats.go" "github.com/zeromicro/go-zero/core/logx" "time" @@ -18,13 +18,12 @@ import ( type ( NatsClient struct { - client nats.JetStreamContext + client *nats.Conn } ) //topic 定义 const ( - ThingsStreamName = "thing_msg" // TopicDevPublish dd模块收到设备的发布消息后向内部推送以下topic 最后两个是产品id和设备名称 TopicDevPublish = "dd.thing.device.clients.publish.%s.%s" TopicDevPublishAll = "dd.thing.device.clients.publish.>" @@ -35,12 +34,9 @@ const ( TopicDevDisconnected = "dd.thing.device.clients.disconnected" // TopicInnerPublish dd模块订阅以下topic,收到内部的发布消息后向设备推送 TopicInnerPublish = "dd.thing.inner.publish" - TopicThing = "dd.thing.device.clients.>" ) const ( - ThingsDeliverGroup = "things_dm_group" - ThingsQueueConsumeName = "things_dm_queue_consume" - ThingsAllConsumeName = "things_dm_all_consume" + ThingsDeliverGroup = "things_dm_group" ) func NewNatsClient(conf conf.NatsConf) (*NatsClient, error) { @@ -54,59 +50,17 @@ func NewNatsClient(conf conf.NatsConf) (*NatsClient, error) { if err != nil { return nil, err } - js, err := nc.JetStream() - if err != nil { - return nil, err - } - _, err = js.AddStream(&nats.StreamConfig{ - Name: ThingsStreamName, - Subjects: []string{ - TopicThing, - }, - }) - if err != nil { - return nil, err - } - _, err = js.AddConsumer(ThingsStreamName, &nats.ConsumerConfig{ - Durable: ThingsQueueConsumeName, - AckPolicy: nats.AckExplicitPolicy, - //MaxRequestBatch: 10, - //MaxRequestExpires: 2 * time.Second, - DeliverPolicy: nats.DeliverLastPolicy, - DeliverSubject: nats.NewInbox(), - DeliverGroup: ThingsDeliverGroup, - }) - _, err = js.AddConsumer(ThingsStreamName, &nats.ConsumerConfig{ - Durable: ThingsQueueConsumeName + "2", - AckPolicy: nats.AckExplicitPolicy, - //MaxRequestBatch: 10, - //MaxRequestExpires: 2 * time.Second, - DeliverPolicy: nats.DeliverLastPolicy, - DeliverSubject: nats.NewInbox(), - DeliverGroup: ThingsDeliverGroup + "2", - }) - _, err = js.AddConsumer(ThingsStreamName, &nats.ConsumerConfig{ - Durable: ThingsAllConsumeName, - AckPolicy: nats.AckExplicitPolicy, - //MaxRequestBatch: 10, - //MaxRequestExpires: 2 * time.Second, - DeliverPolicy: nats.DeliverNewPolicy, - DeliverSubject: nats.NewInbox(), - }) - if err != nil { - return nil, err - } - return &NatsClient{client: js}, nil + return &NatsClient{client: nc}, nil } func (n *NatsClient) PublishToDev(ctx context.Context, topic string, payload []byte) error { - _, err := n.client.Publish(TopicInnerPublish, devices.PublishToDev(ctx, topic, payload)) + msg := events.NewEventMsg(ctx, devices.PublishToDev(topic, payload)) + err := n.client.Publish(TopicInnerPublish, msg) return err } func (n *NatsClient) SubscribeDevSync(ctx context.Context, topic string) (*SubDev, error) { - subscription, err := n.client.SubscribeSync(topic, nats.Durable(ThingsAllConsumeName), - nats.BindStream(ThingsStreamName)) + subscription, err := n.client.SubscribeSync(topic) if err != nil { return nil, err } @@ -114,87 +68,37 @@ func (n *NatsClient) SubscribeDevSync(ctx context.Context, topic string) (*SubDe } func (n *NatsClient) Subscribe(handle Handle) error { - //_, err := n.client.QueueSubscribe(TopicDevPublishAll, ThingsDeliverGroup, func(msg *nats.Msg) { - // msg.Ack() - // emsg := events.GetEventMsg(msg.Data) - // if emsg == nil { - // logx.Error(msg.Subject, string(msg.Data)) - // return - // } - // ctx := emsg.GetCtx() - // ele, err := device.GetDevPublish(ctx, emsg.GetData()) - // if err != nil { - // logx.WithContext(ctx).Error(msg.Subject, string(msg.Data), err) - // return - // } - // err = handle(ctx).Publish(ele) - // logx.WithContext(ctx).Infof("%s|topic:%v,subject:%v,data:%v,err:%v", utils.FuncName(), - // TopicDevPublishAll, msg.Subject, string(msg.Data), err) - //}, nats.Durable(ThingsQueueConsumeName), nats.BindStream(ThingsStreamName)) - //if err != nil { - // return err - //} - //_, err = n.client.QueueSubscribe(TopicDevPublishAll, ThingsDeliverGroup+"2", func(msg *nats.Msg) { - // err := msg.Ack() - // if err != nil { - // logx.Error(msg.Subject, string(msg.Data), err) - // return - // } - // emsg := events.GetEventMsg(msg.Data) - // if emsg == nil { - // logx.Error(msg.Subject, string(msg.Data)) - // return - // } - // ctx := emsg.GetCtx() - // ele, err := device.GetDevPublish(ctx, emsg.GetData()) - // if err != nil { - // logx.WithContext(ctx).Error(msg.Subject, string(msg.Data), err) - // return - // } - // err = handle(ctx).Publish(ele) - // logx.WithContext(ctx).Infof("%s|topic:%v,subject:%v,data:%v,err:%v", utils.FuncName(), - // TopicDevPublishAll, msg.Subject, string(msg.Data), err) - //}, nats.Durable(ThingsQueueConsumeName+"2"), nats.BindStream(ThingsStreamName)) - //if err != nil { - // return err - //} - _, err := n.client.QueueSubscribe(TopicDevConnected, ThingsDeliverGroup, func(msg *nats.Msg) { - msg.Ack() - emsg := events.GetEventMsg(msg.Data) - if emsg == nil { - logx.Error(msg.Subject, string(msg.Data)) - return - } - ctx := emsg.GetCtx() - ele, err := device.GetDevConnMsg(ctx, emsg.GetData()) - if err != nil { - logx.WithContext(ctx).Error(msg.Subject, string(msg.Data), err) - return - } - err = handle(ctx).Connected(ele) - logx.WithContext(ctx).Infof("%s|topic:%v,subject:%v,data:%v,err:%v", utils.FuncName(), - TopicDevConnected, msg.Subject, string(msg.Data), err) - }, nats.Durable(ThingsQueueConsumeName), nats.BindStream(ThingsStreamName)) + _, err := n.client.QueueSubscribe(TopicDevPublishAll, ThingsDeliverGroup, + events.NatsSubscription(func(ctx context.Context, msg []byte) error { + ele, err := device.GetDevPublish(ctx, msg) + if err != nil { + return err + } + err = handle(ctx).Publish(ele) + return err + })) if err != nil { return err } - _, err = n.client.QueueSubscribe(TopicDevDisconnected, ThingsDeliverGroup, func(msg *nats.Msg) { - msg.Ack() - emsg := events.GetEventMsg(msg.Data) - if emsg == nil { - logx.Error(msg.Subject, string(msg.Data)) - return - } - ctx := emsg.GetCtx() - ele, err := device.GetDevConnMsg(ctx, emsg.GetData()) - if err != nil { - logx.WithContext(ctx).Error(msg.Subject, string(msg.Data), err) - return - } - err = handle(ctx).Disconnected(ele) - logx.WithContext(ctx).Infof("%s|topic:%v,subject:%v,data:%v,err:%v", utils.FuncName(), - TopicDevDisconnected, msg.Subject, string(msg.Data), err) - }, nats.Durable(ThingsQueueConsumeName), nats.BindStream(ThingsStreamName)) + _, err = n.client.QueueSubscribe(TopicDevConnected, ThingsDeliverGroup, + events.NatsSubscription(func(ctx context.Context, msg []byte) error { + ele, err := device.GetDevConnMsg(ctx, msg) + if err != nil { + return err + } + return handle(ctx).Connected(ele) + })) + if err != nil { + return err + } + _, err = n.client.QueueSubscribe(TopicDevDisconnected, ThingsDeliverGroup, + events.NatsSubscription(func(ctx context.Context, msg []byte) error { + ele, err := device.GetDevConnMsg(ctx, msg) + if err != nil { + return err + } + return handle(ctx).Disconnected(ele) + })) if err != nil { return err } -- Gitee