1 Star 0 Fork 0

laich/pymqttClient

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
dataApi.py 4.17 KB
一键复制 编辑 原始数据 按行查看 历史
laich 提交于 2023-08-04 14:14 . fix bug 01
# -*- coding: utf-8 -*-
# time: 2023/8/2 10:12
# author: hanson
import requests, os
import tkinter as tk
import json
import hashlib, urllib.parse
from configparser import ConfigParser
from time import strftime, localtime,time
"""
json_datas = json.loads(res.text)  # parse the response text as JSON
# Pretty-print the JSON data
formatted_data = json.dumps(json_datas, indent=4)
"""
# 1. Instantiate the parser shared by every config helper in this module.
# NOTE(review): the default BasicInterpolation treats '%' specially, so a value
# such as a password containing a bare '%' is rejected by conf.set(); consider
# ConfigParser(interpolation=None) if such values must be supported.
conf = ConfigParser()


def write_file():
    """Write the in-memory ``conf`` to ``./config.ini``, replacing its contents."""
    # ``with`` closes (and therefore flushes) the handle deterministically; a
    # bare open() leaves that to the garbage collector. UTF-8 matches the
    # encoding the file is read back with.
    with open('./config.ini', 'w', encoding='utf-8') as config_file:
        conf.write(config_file)


# First run: create config.ini with an empty [localdb] section so that later
# reads find every expected option.
if not os.path.exists('./config.ini'):
    conf['localdb'] = {'host': '', 'port': '', "username": "", "password": "", "clientid": ""}
    write_file()
def md5(strs):
    """Return the hexadecimal MD5 digest of ``strs`` encoded as UTF-8."""
    hasher = hashlib.md5()
    hasher.update(strs.encode("utf-8"))
    return hasher.hexdigest()
def getEncode(strs, type):
    """Ask the device server to encode ``strs`` and return the encoded text.

    POSTs ``strs`` as the ``jsonStr`` form field, together with ``type``, to
    the ``mqttAPI`` endpoint and returns the ``data`` member of the JSON reply
    converted with ``str``.

    Args:
        strs: Text to encode; the callers in this module pass a JSON document.
        type: Value forwarded unchanged as the ``type`` form field. The name
            shadows the builtin but is kept so keyword callers keep working.

    Returns:
        The reply's ``data`` value as a string, or ``None`` when the HTTP
        connection fails (after an info dialog has been shown).

    Raises:
        ValueError: If the reply body is not valid JSON.
        KeyError: If the reply has no ``data`` member.
    """
    # NOTE(review): no timeout is passed, so a server that accepts the
    # connection but never answers blocks the caller indefinitely.
    try:
        res = requests.post(url='http://192.168.6.43:8080/boke/device/mqttAPI',
                            headers={"Content-Type": "application/x-www-form-urlencoded"},
                            data={"jsonStr": strs, "type": type})
    except requests.exceptions.ConnectionError:
        # ``import tkinter as tk`` does not load the messagebox submodule, so
        # ``tk.messagebox`` only works if some other module imported it first.
        from tkinter import messagebox
        messagebox.showinfo(title='http getEncode', message='getEncode http 连接失败')
        return None
    json_datas = json.loads(res.text)
    return str(json_datas['data'])
def getTriJson(cmd, id, clientId, secret):
    """Send a trigger command for a device and return the server's reply text.

    Builds a JSON document from the arguments (``secret`` is sent as its MD5
    hex digest), has it encoded through ``getEncode`` and POSTs the encoded
    text to the ``triggerCmdData`` endpoint.

    Args:
        cmd: Command name placed in the ``cmd`` member.
        id: Value placed in the ``id`` member.
        clientId: Value placed in the ``clientId`` member.
        secret: Plain-text secret; only its MD5 digest is transmitted.

    Returns:
        The response body as text, or ``""`` when either HTTP request could
        not connect (an info dialog is shown in that case).
    """
    # json.dumps() quotes each value and escapes quotes, backslashes and
    # control characters, so unusual input can no longer break the document.
    # For plain values the output is byte-for-byte what the raw string
    # concatenation produced, including the uneven spacing after the colons.
    strs = (
        '{"cmd":' + json.dumps(cmd, ensure_ascii=False)
        + ',"clientId": ' + json.dumps(clientId, ensure_ascii=False)
        + ',"secret": ' + json.dumps(md5(secret), ensure_ascii=False)
        + ',"id":' + json.dumps(id, ensure_ascii=False)
        + '}'
    )
    es = getEncode(strs, 1)
    if es is None:
        # getEncode already reported the connection failure; do not fire a
        # second request with an empty payload (and a second dialog).
        return ""
    try:
        result = requests.post(url='http://192.168.6.43:8080/boke/device/triggerCmdData',
                               headers={"Content-Type": "application/x-www-form-urlencoded"},
                               data={"jsonStr": es})
    except requests.exceptions.ConnectionError:
        # ``import tkinter as tk`` does not load the messagebox submodule, so
        # import it explicitly before use.
        from tkinter import messagebox
        messagebox.showinfo(title='http getTriJson', message='getTriJson http 连接失败')
        return ""
    return result.text
def backTriJson(cmd, id, clientId, secret, state):
    """Report a command result back to the server and return its reply text.

    Builds a JSON document from the arguments (``secret`` is sent as its MD5
    hex digest), has it encoded through ``getEncode`` and POSTs the encoded
    text to the ``deviceBackCmd`` endpoint.

    Args:
        cmd: Command name placed in the ``backCmd`` member.
        id: Value placed in the ``id`` member.
        clientId: Value placed in the ``clientId`` member.
        secret: Plain-text secret; only its MD5 digest is transmitted.
        state: Inserted unquoted via ``str(state)`` as the ``state`` member,
            so it should render as a JSON literal (for example an integer).

    Returns:
        The response body as text, or ``None`` when either HTTP request could
        not connect (an info dialog is shown in that case).
    """
    # json.dumps() quotes and escapes each string value so quotes or
    # backslashes in the input cannot break the document; for plain values the
    # output is identical to the previous raw concatenation.
    strs = (
        '{"backCmd":' + json.dumps(cmd, ensure_ascii=False)
        + ',"clientId": ' + json.dumps(clientId, ensure_ascii=False)
        + ',"secret": ' + json.dumps(md5(secret), ensure_ascii=False)
        + ',"id":' + json.dumps(id, ensure_ascii=False)
        + ',"state":' + str(state)
        + '}'
    )
    es = getEncode(strs, 1)
    if es is None:
        # getEncode already reported the connection failure; skip the second
        # request instead of posting an empty payload.
        return None
    try:
        result = requests.post(url='http://192.168.6.43:8080/boke/device/deviceBackCmd',
                               headers={"Content-Type": "application/x-www-form-urlencoded"},
                               data={"jsonStr": es})
    except requests.exceptions.ConnectionError:
        # ``import tkinter as tk`` does not load the messagebox submodule, so
        # import it explicitly. The dialog now names this function rather than
        # the one its text was copied from.
        from tkinter import messagebox
        messagebox.showinfo(title='http backTriJson', message='backTriJson http 连接失败')
        return None
    return result.text
def make_device():
    """Request creation of a device and return the server's reply text.

    Sends a fixed JSON document (client, keyA, brand, num) plus the current
    ``time()`` value as ``req``; the document is encoded through ``getEncode``
    and POSTed to the ``make`` endpoint.

    Returns:
        The response body as text, or ``None`` when either HTTP request could
        not connect (an info dialog is shown in that case).
    """
    jsonStr = '{"client":"bk2023010188","keyA":"boke","brand":"bk","num":1,"req":"' + str(time()) + '"}'
    es = getEncode(jsonStr, 1)
    if es is None:
        # getEncode already reported the connection failure; skip the second
        # request instead of posting an empty payload.
        return None
    try:
        result = requests.post(url='http://192.168.6.43:8080/boke/device/make',
                               headers={"Content-Type": "application/x-www-form-urlencoded"},
                               data={"jsonStr": es})
    except requests.exceptions.ConnectionError:
        # ``import tkinter as tk`` does not load the messagebox submodule, so
        # import it explicitly. The dialog now names this function rather than
        # the one its text was copied from.
        from tkinter import messagebox
        messagebox.showinfo(title='http make_device', message='make_device http 连接失败')
        return None
    return result.text
def url_enc(s):
    """Percent-encode ``s`` for use in a URL, leaving ``/`` unescaped."""
    # safe="/" is quote()'s default, spelled out here for clarity.
    encoded = urllib.parse.quote(s, safe="/")
    return encoded
def url_dec(s):
    """Decode the percent-escapes in ``s`` and return the resulting text."""
    # UTF-8 with "replace" error handling are unquote()'s defaults.
    decoded = urllib.parse.unquote(s, encoding="utf-8", errors="replace")
    return decoded
def checkResult(strs):
    """Parse a server reply and split it into a success flag and a payload.

    Args:
        strs: JSON text containing a ``code`` member and either ``msg`` (when
            ``code`` is 400000) or ``data`` (any other code).

    Returns:
        ``(0, msg_json)`` when ``code`` equals 400000, after the message has
        been shown in an info dialog; otherwise ``(1, data_json)``. The second
        element is the member re-serialised with ``json.dumps``.

    Raises:
        ValueError: If ``strs`` is not valid JSON.
        KeyError: If an expected member is missing from the reply.
    """
    sjson = json.loads(strs)
    if sjson['code'] != 400000:
        return 1, json.dumps(sjson["data"])
    # ``import tkinter as tk`` does not load the messagebox submodule, so
    # ``tk.messagebox`` only works if some other module imported it first.
    from tkinter import messagebox
    messagebox.showinfo(title='请求失败', message="" + sjson["msg"])
    return 0, json.dumps(sjson["msg"])
def formatJson(sjson):
    """Re-serialise the JSON text ``sjson`` with a four-space indent."""
    parsed = json.loads(sjson)
    pretty = json.dumps(parsed, indent=4)
    return pretty
# conf.get("localdb", "password")
def readConfig():
    """Load ``./config.ini`` and return the ``localdb`` settings as a dict.

    Returns:
        A dict with the keys ``host``, ``port``, ``username``, ``password``
        and ``clientid``, each mapped to the string stored in the file.
    """
    # 2. Pull the file's contents into the shared parser.
    conf.read("./config.ini", encoding="utf-8")
    # Collect the options in a fixed order into a fresh dictionary.
    option_names = ("host", "port", "username", "password", "clientid")
    return {name: conf.get("localdb", name) for name in option_names}
def writeConfig(dicts):
    """Update options in the ``localdb`` section and save ``./config.ini``.

    Args:
        dicts: Mapping of option name to string value; every entry is stored
            in the ``localdb`` section, overwriting any existing value.
    """
    conf.read("./config.ini", encoding="utf-8")
    # Apply every supplied option to the section.
    for key, value in dicts.items():
        conf.set("localdb", key, value)
    # Open with 'w' so the file is truncated first: 'r+' overwrites in place
    # and leaves stale trailing bytes whenever the new content is shorter than
    # the old. ``with`` closes the handle, and UTF-8 matches the read above.
    with open("./config.ini", 'w', encoding="utf-8") as config_file:
        conf.write(config_file)
def getTimeStr():
    """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    now = localtime()
    stamp = strftime("%Y-%m-%d %H:%M:%S", now)
    return stamp
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/laich/pymqtt-client.git
git@gitee.com:laich/pymqtt-client.git
laich
pymqtt-client
pymqttClient
master

搜索帮助