1 Star 1 Fork 0

xujia/opcda2mqtt

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
MQTTTunnel.py 1.87 KB
一键复制 编辑 原始数据 按行查看 历史
想不出来 提交于 2024-12-16 10:48 . 初始化
# -*- coding: utf-8 -*-
import logging
import paho.mqtt.client as mqtt
from functools import partial
import json
class MQTTTunnel:
    """Small wrapper around a paho-mqtt client that publishes tag readings as JSON.

    The caller supplies a configuration mapping and a shared status object via
    :meth:`connect`; the status object's ``isMQTTConnected`` and ``RunStatus``
    attributes are updated as the connection comes up and is torn down.
    """

    def __init__(self):
        # paho-mqtt 2.x added a ``callback_api_version`` constructor argument.
        # Request the version-1 callback style explicitly when the enum
        # exists so that ``on_connect(client, userdata, flags, rc)`` keeps its
        # signature; on paho-mqtt 1.x the enum is absent and the plain
        # constructor is used.
        api_versions = getattr(mqtt, "CallbackAPIVersion", None)
        if api_versions is not None:
            self.mqtt_client = mqtt.Client(api_versions.VERSION1)
        else:
            self.mqtt_client = mqtt.Client()
        # Both are filled in by connect().
        self.globalStatus = None
        self.topic_to_subscribe = None

    def on_connect(self, client, userdata, flags, rc):
        """Connection callback: flag the shared status on success, log on failure.

        Args:
            client: The client instance that invoked the callback (unused).
            userdata: User data attached to the client (unused).
            flags: Broker response flags (unused).
            rc: Connection result code; ``0`` means the connection succeeded.
        """
        if rc == 0:
            self.globalStatus.isMQTTConnected = True
            self.globalStatus.RunStatus = True
            # NOTE: no subscription is made here; ``topic_to_subscribe`` is
            # stored by connect() but this class currently only publishes.
        else:
            logging.error("Failed to connect, return code " + str(rc))

    @staticmethod
    def on_message(client, userdata, msg):
        """Message callback: log the topic and UTF-8 decoded payload at debug level.

        Declared as a static method because it takes no ``self``; this lets it
        be registered either as ``self.on_message`` or ``MQTTTunnel.on_message``
        and still receive exactly ``(client, userdata, msg)``.

        Args:
            client: The client instance that invoked the callback (unused).
            userdata: User data attached to the client (unused).
            msg: Received message exposing ``topic`` and ``payload`` (bytes).
        """
        payload_str = msg.payload.decode('utf-8')
        logging.debug(msg.topic + " " + payload_str)

    def publish(self, topic, tag, data):
        """Publish one tag reading to ``topic`` as a JSON object.

        The JSON document has the keys ``Tag``, ``Value`` and ``Time``.

        Args:
            topic: MQTT topic to publish to.
            tag: Tag name (string) stored under ``"Tag"``.
            data: Sequence of length 3 or 4. For length 4 the value is taken
                from index 1 and the time from index 3; for length 3 the value
                is taken from index 0 and the time from index 2.
                NOTE(review): presumably these are OPC read results of the
                form (value, quality, time) / (name, value, quality, time) --
                confirm against the caller.

        Readings of any other length are logged as an error and skipped
        instead of failing with an unbound-variable error.
        """
        logging.info(tag + ":" + str(data))

        field_count = len(data)
        if field_count == 4:
            value, timestamp = data[1], data[3]
        elif field_count == 3:
            value, timestamp = data[0], data[2]
        else:
            logging.error(
                "Unexpected data length %d for tag %s; expected 3 or 4 items.",
                field_count,
                tag,
            )
            return

        json_data = json.dumps(
            {"Tag": tag, "Value": value, "Time": timestamp}, indent=3
        )
        result = self.mqtt_client.publish(topic, json_data)
        if result.rc != 0:
            logging.error("Failed to publish data to MQTT Broker.")

    def connect(self, config, globalStatus):
        """Configure credentials, connect to the broker and start the network loop.

        Args:
            config: Mapping providing ``"mqtttopic"``, ``"mqttuser"``,
                ``"mqttpassword"`` and ``"mqttserver"``.
            globalStatus: Shared status object; stored so that
                :meth:`on_connect` can set ``isMQTTConnected`` and
                ``RunStatus`` on it.
        """
        self.globalStatus = globalStatus
        self.topic_to_subscribe = config["mqtttopic"]
        self.mqtt_client.username_pw_set(config["mqttuser"], config["mqttpassword"])
        self.mqtt_client.on_connect = self.on_connect
        # Port 1883 with a 60 second keepalive.
        self.mqtt_client.connect(config["mqttserver"], 1883, 60)
        self.mqtt_client.loop_start()

    def disconnect(self, globalStatus):
        """Clear the status flags, stop the network loop and disconnect.

        Args:
            globalStatus: Status object whose ``isMQTTConnected`` and
                ``RunStatus`` attributes are set to ``False``.
        """
        globalStatus.isMQTTConnected = False
        globalStatus.RunStatus = False
        self.mqtt_client.loop_stop()
        self.mqtt_client.disconnect()
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/x-max/opcda2mqtt.git
git@gitee.com:x-max/opcda2mqtt.git
x-max
opcda2mqtt
opcda2mqtt
master

搜索帮助