1 Star 0 Fork 6

shiyang/modules

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
quecthing.py 28.10 KB
一键复制 编辑 原始数据 按行查看 历史
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768
# Copyright (c) Quectel Wireless Solution, Co., Ltd.All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import uos
import usys
import uzlib
import ql_fs
import ujson
import utime
import osTimer
import uhashlib
import ubinascii
import app_fota_download
from misc import Power
from queue import Queue
from usr.modules.logging import getLogger
from usr.modules.common import CloudObservable, CloudObjectModel
try:
import quecIot
except ImportError:
quecIot = None
log = getLogger(__name__)
# Human-readable descriptions of the (event, errcode) pairs delivered to the
# QuecThing event callback. Layout: {event_code: {errcode: message}}.
# It is only used for logging: the callback looks a message up with
# EVENT_CODE.get(event, {}).get(errcode, ""), so unknown codes log an empty text.
EVENT_CODE = {
    # Event 1: device authentication results.
    1: {
        10200: "Device authentication succeeded.",
        10420: "Bad request data (connection failed).",
        10422: "Device authenticated (connection failed).",
        10423: "No product information found (connection failed).",
        10424: "PAYLOAD parsing failed (connection failed).",
        10425: "Signature verification failed (connection failed).",
        10426: "Bad authentication version (connection failed).",
        10427: "Invalid hash information (connection failed).",
        10430: "PK changed (connection failed).",
        10431: "Invalid DK (connection failed).",
        10432: "PK does not match authentication version (connection failed).",
        10450: "Device internal error (connection failed).",
        10466: "Boot server address not found (connection failed).",
        10500: "Device authentication failed (an unknown exception occurred in the system).",
        10300: "Other errors.",
    },
    # Event 2: access (connection) results.
    2: {
        10200: "Access is successful.",
        10430: "Incorrect device key (connection failed).",
        10431: "Device is disabled (connection failed).",
        10450: "Device internal error (connection failed).",
        10471: "Implementation version not supported (connection failed).",
        10473: "Abnormal access heartbeat (connection timed out).",
        10474: "Network exception (connection timed out).",
        10475: "Server changes.",
        10476: "Abnormal connection to AP.",
        10500: "Access failed (an unknown exception occurred in the system).",
    },
    # Event 3: subscription results.
    3: {
        10200: "Subscription succeeded.",
        10300: "Subscription failed.",
    },
    # Event 4: uplink (send) results. The event callback turns these codes
    # into the True/False publish result that post_data() waits for.
    4: {
        10200: "Transparent data sent successfully.",
        10210: "Object model data sent successfully.",
        10220: "Positioning data sent successfully.",
        10300: "Failed to send transparent data.",
        10310: "Failed to send object model data.",
        10320: "Failed to send positioning data.",
    },
    # Event 5: downlink (receive) notifications.
    5: {
        10200: "Receive transparent data.",
        10210: "Receive data from the object model.",
        10211: "Received object model query command.",
        10473: "Received data but the length exceeds the module buffer limit, receive failed.",
        10428: "The device receives too much buffer and causes current limit.",
    },
    # Event 6: logout / disconnection.
    6: {
        10200: "Logout succeeded (disconnection succeeded).",
    },
    # Event 7: OTA progress notifications.
    7: {
        10700: "New OTA plain.",
        10701: "The module starts to download.",
        10702: "Package download.",
        10703: "Package download complete.",
        10704: "Package update.",
        10705: "Firmware update complete.",
        10706: "Failed to update firmware.",
        10707: "Received confirmation broadcast.",
    },
    # Event 8: throttling / rate-limit notifications.
    8: {
        10428: "High-frequency messages on the device cause current throttling.",
        10429: "Exceeds the number of activations per device or daily requests current limit.",
    }
}
class QuecObjectModel(CloudObjectModel):
    """QuecCloud object model built from a JSON description file.

    Extends CloudObjectModel. The ``properties``, ``events``, ``services``
    containers and ``om_file`` are not created here; they are presumably
    provided by ``CloudObjectModel.__init__`` (confirm against the parent).

    After ``init()`` has run:

    properties:
        One attribute per property code. Each attribute holds a single-item
        dict ``{code: initial_value}``, e.g.
            properties.energy       -> {"energy": 0}
            properties.power_switch -> {"power_switch": True}
            properties.phone_num    -> {"phone_num": ""}
    events:
        One attribute per event code. Each attribute holds
        ``{event_code: {property_code: initial_value, ...}}`` assembled from
        the properties referenced by the event's "outputData", e.g.
            events.sos_alert -> {"sos_alert": {"local_time": 0}}
    services:
        One attribute per service code, holding
        ``{"output": {...}, "input": {...}}`` assembled the same way from
        the service's "outputData" / "inputData".
    id_code:
        Object model id -> code, e.g. {4: "energy", 9: "power_switch"}.
        Contains properties, events and services.
    code_id:
        Object model code -> id, e.g. {"energy": 4, "power_switch": 9}.
    struct_code_id:
        For struct properties only: code -> {member code: member id}, e.g.
            {"sos_alert": {"local_time": 19}}
    """

    def __init__(self, om_file="/usr/quec_object_model.json"):
        """Create the lookup tables and immediately load ``om_file``."""
        super().__init__(om_file)
        self.code_id = {}
        self.id_code = {}
        self.struct_code_id = {}
        self.init()

    def __init_value(self, om_type):
        """Return the initial value for a lower-cased object model data type.

        Unknown types yield None. A new list/dict is created on every call so
        that properties never share a mutable default.
        """
        if om_type in ("int", "enum", "date"):
            return 0
        if om_type in ("float", "double"):
            return 0.0
        if om_type == "bool":
            return True
        if om_type == "text":
            return ""
        if om_type == "array":
            return []
        if om_type == "struct":
            return {}
        return None

    def __register_id(self, om_item):
        """Record the id <-> code mapping of one model item in both tables."""
        self.id_code[om_item["id"]] = om_item["code"]
        self.code_id[om_item["code"]] = om_item["id"]

    def __get_property(self, om_item):
        """Register one property description and return (code, initial value).

        For struct properties the initial value is a dict with one initial
        value per member, and the member ids are stored in struct_code_id.
        """
        om_item_key = om_item["code"]
        om_item_type = om_item["dataType"].lower()
        om_item_val = self.__init_value(om_item_type)
        self.__register_id(om_item)
        if om_item_type == "struct":
            om_item_struct = om_item["specs"]
            om_item_val = {i["code"]: self.__init_value(i["dataType"].lower()) for i in om_item_struct}
            self.struct_code_id[om_item_key] = {i["code"]: i["id"] for i in om_item_struct}
        return om_item_key, om_item_val

    def __ref_properties(self, om_refs, owner_code):
        """Merge the initial values of every property referenced by om_refs.

        Each reference is a dict whose "$ref" value ends in "/<property id>".
        A reference that is missing, is not numeric, or does not point at a
        property that has already been loaded is logged and skipped, so one
        bad entry in the model file does not abort the whole load.

        Parameter:
            om_refs: list of reference dicts (None or empty gives {})
            owner_code: code of the event/service, used in the log message only
        Return:
            {property_code: initial_value, ...}
        """
        merged = {}
        for om_ref in om_refs or []:
            ref = om_ref.get("$ref", "")
            try:
                property_id = int(ref.split("/")[-1])
            except ValueError:
                log.error("Object model item %s has an invalid $ref: %s" % (owner_code, ref))
                continue
            om_property_key = self.id_code.get(property_id)
            # id_code also contains event and service ids, so the id must be
            # checked against the loaded properties as well.
            if om_property_key is None or not hasattr(self.properties, om_property_key):
                log.error("Object model item %s references unknown property id: %s" % (owner_code, property_id))
                continue
            merged.update(getattr(self.properties, om_property_key))
        return merged

    def __init_properties(self, om_properties):
        """Store every property on self.properties as {code: initial_value}."""
        for om_property in om_properties:
            om_property_key, om_property_val = self.__get_property(om_property)
            setattr(self.properties, om_property_key, {om_property_key: om_property_val})

    def __init_events(self, om_events):
        """Store every event on self.events as {code: {output properties}}."""
        for om_event in om_events:
            om_event_key = om_event["code"]
            self.__register_id(om_event)
            om_event_val = self.__ref_properties(om_event.get("outputData", []), om_event_key)
            setattr(self.events, om_event_key, {om_event_key: om_event_val})

    def __init_services(self, om_services):
        """Store every service on self.services as {"output": ..., "input": ...}."""
        for om_service in om_services:
            om_service_key = om_service["code"]
            self.__register_id(om_service)
            om_service_output_val = self.__ref_properties(om_service.get("outputData", []), om_service_key)
            om_service_input_val = self.__ref_properties(om_service.get("inputData", []), om_service_key)
            setattr(self.services, om_service_key, {"output": om_service_output_val, "input": om_service_input_val})

    def init(self):
        """Load the JSON object model file and populate all lookup tables.

        Properties are loaded first because events and services resolve
        their "$ref" entries against the already loaded properties.
        """
        with open(self.om_file, "rb") as f:
            cloud_object_model = ujson.load(f)
        self.__init_properties(cloud_object_model.get("properties", []))
        self.__init_events(cloud_object_model.get("events", []))
        self.__init_services(cloud_object_model.get("services", []))
class QuecThing(CloudObservable):
    """QuecCloud IoT client.

    Extends CloudObservable and provides:
        1. Cloud connect and disconnect
        2. Publishing data to the cloud
        3. Forwarding cloud downlink data to observers via the event callback

    Run step:
        1. cloud = QuecThing(pk, ps, dk, ds, server)
        2. cloud.addObserver(RemoteSubscribe)
        3. cloud.set_object_model(QuecObjectModel)
        4. cloud.init()
        5. cloud.post_data(data)
        6. cloud.close()
    """

    def __init__(self, pk, ps, dk, ds, server, life_time=120, mcu_name="", mcu_version="", mode=1):
        """Initialise the parent class and store the cloud connection parameters.

        Parameter:
            pk, ps: passed to quecIot.setProductinfo()
            dk, ds: passed to quecIot.setDkDs() when at least one is set
            server, mode: passed to quecIot.setServer(mode, server)
            life_time: passed to quecIot.setLifetime()
            mcu_name, mcu_version: passed to quecIot.setMcuVersion()
        """
        super().__init__()
        self.__pk = pk
        self.__ps = ps
        self.__dk = dk
        self.__ds = ds
        self.__server = server
        self.__life_time = life_time
        self.__mcu_name = mcu_name
        self.__mcu_version = mcu_version
        self.__mode = mode
        self.__object_model = None
        self.__ota = QuecOTA()
        self.__post_result_wait_queue = Queue(maxsize=16)
        self.__quec_timer = osTimer()

    def __rm_empty_data(self, data):
        """Remove, in place, every item of data whose value is falsy."""
        # Collect the keys first: deleting from a dict while iterating over
        # it is not safe.
        for k in [k for k, v in data.items() if not v]:
            del data[k]

    def __quec_timer_cb(self, args):
        """osTimer callback: unblock __get_post_res() with a False result."""
        self.__put_post_res(False)

    def __get_post_res(self):
        """Wait for the publish result reported by the event callback.

        A 10 second one-shot timer pushes False into the queue so that the
        blocking get() cannot wait forever.
        """
        self.__quec_timer.start(1000 * 10, 0, self.__quec_timer_cb)
        res = self.__post_result_wait_queue.get()
        self.__quec_timer.stop()
        return res

    def __put_post_res(self, res):
        """Save a publish result to the queue, dropping the oldest when full."""
        if self.__post_result_wait_queue.size() >= 16:
            self.__post_result_wait_queue.get()
        self.__post_result_wait_queue.put(res)

    def __member_ids(self, codes):
        """Map property codes to ids for the members of an event/service.

        Struct properties map to their {member code: member id} dict, every
        other property maps to its own object model id.
        """
        ids = {}
        for code in codes:
            if isinstance(getattr(self.__object_model.properties, code).get(code), dict):
                ids[code] = self.__object_model.struct_code_id.get(code, None)
            else:
                ids[code] = self.__object_model.code_id[code]
        return ids

    def __data_format(self, k, v):
        """Convert one named item into the id-keyed form used for publishing.

        The names are translated with the registered QuecObjectModel.

        Parameter:
            k: object model name (property, event or service code)
            v: object model value
        Return:
            False when k is not part of the object model, otherwise
            {
                object_model_id: object_model_value
            }
        e.g.:
            k:
                "sos_alert"
            v:
                {"local_time": 1649995898000}
            return data:
                {
                    6: {
                        19: 1649995898000
                    }
                }
        """
        om = self.__object_model
        struct_info = {}
        if hasattr(om.properties, k):
            if om.struct_code_id.get(k):
                struct_info = om.struct_code_id.get(k)
        elif hasattr(om.events, k):
            struct_info = self.__member_ids(getattr(om.events, k)[k].keys())
        elif hasattr(om.services, k):
            struct_info = self.__member_ids(getattr(om.services, k)["output"].keys())
        else:
            return False
        k_id = om.code_id[k]
        log.debug("__data_format struct_info: %s" % str(struct_info))

        if isinstance(v, dict):
            nv = {}
            for ik, iv in v.items():
                ik_info = struct_info.get(ik)
                if isinstance(ik_info, int):
                    # Plain member: replace the name by its id.
                    nv[ik_info] = iv
                elif isinstance(ik_info, dict):
                    # Struct member: translate the member names one level down.
                    if isinstance(iv, dict):
                        nv[om.code_id[ik]] = {ik_info[ivk]: ivv for ivk, ivv in iv.items()}
                    else:
                        nv[om.code_id[ik]] = iv
                else:
                    # Unknown member: pass it through untouched.
                    nv[ik] = iv
            v = nv
        return {k_id: v}

    def __parse_downlink(self, errcode, eventdata):
        """Turn a receive event (event code 5) into observer notifications.

        Return:
            list of (topic, payload) tuples, empty for unhandled errcodes.
        """
        id_code = self.__object_model.id_code
        if errcode == 10200:
            # TODO: Data Type Passthrough (Not Support Now).
            return [("raw_data", eventdata)]
        if errcode == 10210:
            dl_data = []
            for k, v in eventdata.items():
                name = id_code.get(k, k)
                if isinstance(v, bytes):
                    value = v.decode()
                elif isinstance(v, dict):
                    value = {id_code.get(v_k, v_k): v_v for v_k, v_v in v.items()}
                else:
                    value = v
                # An id missing from the model stays a non-string key; only a
                # string can name a service attribute, so check that first.
                if isinstance(name, str) and hasattr(self.__object_model.services, name):
                    dl_data.append(("thing_services", {"data": value, "service": name}))
                else:
                    dl_data.append((name, value))
            return [("object_model", dl_data)]
        if errcode == 10211:
            # eventdata[0] is pkgId.
            object_model_ids = eventdata[1]
            object_model_val = [id_code[i] for i in object_model_ids if id_code.get(i)]
            return [("query", object_model_val)]
        return []

    def __parse_ota(self, errcode, eventdata):
        """Handle an OTA event (event code 7) and build observer notifications.

        The reported "ota_status" tuples carry 1 for a new plan, 2 for the
        download/update progress codes, 3 for success and 4 for failure.

        SECURITY NOTE(review): eventdata is evaluated with eval(), which
        executes whatever expression is delivered. It is kept as is because
        the payload format is not visible in this file; replace it with a
        strict parser once the format is confirmed.

        Return:
            list of (topic, payload) tuples, empty for unhandled errcodes.
        """
        res_datas = []
        if errcode == 10700:
            if eventdata:
                file_info = eval(eventdata)
                log.info("OTA File Info: componentNo: %s, sourceVersion: %s, targetVersion: %s, "
                         "batteryLimit: %s, minSignalIntensity: %s, useSpace: %s" % file_info)
                res_datas.append(("object_model", [("ota_status", (file_info[0], 1, file_info[2]))]))
                ota_cfg = {
                    "componentNo": file_info[0],
                    "sourceVersion": file_info[1],
                    "targetVersion": file_info[2],
                    "batteryLimit": file_info[3],
                    "minSignalIntensity": file_info[4],
                    "useSpace": file_info[5],
                }
                res_datas.append(("ota_plain", [("ota_cfg", ota_cfg)]))
        elif errcode in (10701, 10702, 10703, 10704):
            res_datas.append(("object_model", [("ota_status", (None, 2, None))]))
            if errcode == 10701:
                # file_info: (componentNo, length, MD5)
                file_info = eval(eventdata)
                self.__ota.set_ota_info(file_info[1], file_info[2])
            elif errcode == 10703:
                # file_info: (componentNo, length, startaddr, piece_length)
                file_info = eval(eventdata)
                self.__ota.start_ota(file_info[2], file_info[3])
        elif errcode == 10705:
            res_datas.append(("object_model", [("ota_status", (None, 3, None))]))
        elif errcode == 10706:
            res_datas.append(("object_model", [("ota_status", (None, 4, None))]))
        return res_datas

    def __event_cb(self, data):
        """QuecCloud downlink message callback.

        Parameter:
            data: response info, all event info see `EVENT_CODE`
                data format: (`event_code`, `errcode`, `event_data`)
                - `event_code`: event code
                - `errcode`: detail code
                - `event_data`: event data info, data type: bytes or dict
                  (optional, defaults to b"")
        """
        event = data[0]
        errcode = data[1]
        eventdata = data[2] if len(data) > 2 else b""
        log.info("[Event-ErrCode-Msg][%s][%s][%s] EventData[%s]" % (event, errcode, EVENT_CODE.get(event, {}).get(errcode, ""), eventdata))

        res_datas = []
        if event == 4:
            # Publish results: hand them to the post_data() call waiting in
            # __get_post_res().
            if errcode in (10200, 10210, 10220):
                self.__put_post_res(True)
            elif errcode in (10300, 10310, 10320):
                self.__put_post_res(False)
        elif event == 5:
            res_datas = self.__parse_downlink(errcode, eventdata)
        elif event == 7:
            res_datas = self.__parse_ota(errcode, eventdata)

        for res_data in res_datas:
            self.notifyObservers(self, *res_data)

    def set_object_model(self, object_model):
        """Register a QuecObjectModel instance to this class.

        Return:
            True: registered
            False: object_model is empty or not a QuecObjectModel
        """
        if object_model and isinstance(object_model, QuecObjectModel):
            self.__object_model = object_model
            return True
        return False

    def init(self, enforce=False):
        """Connect to QuecCloud.

        Parameter:
            enforce:
                True: always run the connect sequence
                False: return True right away when already connected
        Return:
            True: Success
            False: Failed
        """
        log.debug(
            "[init start] enforce: %s QuecThing Work State: %s, quecIot.getConnmode(): %s"
            % (enforce, quecIot.getWorkState(), quecIot.getConnmode())
        )
        # NOTE(review): this writes PS and DS to the debug log; consider
        # masking them if logs can leave the device.
        log.debug("[init start] PK: %s, PS: %s, DK: %s, DS: %s, SERVER: %s" % (self.__pk, self.__ps, self.__dk, self.__ds, self.__server))
        if enforce is False:
            if self.get_status():
                return True

        quecIot.init()
        quecIot.setEventCB(self.__event_cb)
        quecIot.setProductinfo(self.__pk, self.__ps)
        if self.__dk or self.__ds:
            quecIot.setDkDs(self.__dk, self.__ds)
        quecIot.setServer(self.__mode, self.__server)
        quecIot.setLifetime(self.__life_time)
        quecIot.setMcuVersion(self.__mcu_name, self.__mcu_version)
        quecIot.setConnmode(1)

        # Poll the work state for at most 10 * 200 ms.
        count = 0
        while quecIot.getWorkState() != 8 and count < 10:
            utime.sleep_ms(200)
            count += 1

        # A DK without a DS: read the pair back from the module, retrying up
        # to three times with a growing delay (1 s, 2 s, 3 s).
        if not self.__ds and self.__dk:
            count = 0
            while count < 3:
                dkds = quecIot.getDkDs()
                if dkds:
                    self.__dk, self.__ds = dkds
                    log.debug("dk: %s, ds: %s" % dkds)
                    break
                count += 1
                utime.sleep(count)

        log.debug("[init over] QuecThing Work State: %s, quecIot.getConnmode(): %s" % (quecIot.getWorkState(), quecIot.getConnmode()))
        return self.get_status()

    def close(self):
        """Disconnect from QuecCloud; returns the result of quecIot.setConnmode(0)."""
        return quecIot.setConnmode(0)

    def get_status(self):
        """Get the QuecCloud connect status.

        Return:
            True -- connected (work state 8 and connect mode 1)
            False -- not connected
        """
        return quecIot.getWorkState() == 8 and quecIot.getConnmode() == 1

    def post_data(self, data):
        """Publish object model properties, events and location data.

        Items are sent one by one and the loop stops at the first failure.
        Keys that are part of the object model are sent with
        quecIot.phymodelReport() (skipped when the value is None); the keys
        "gps" and "non_gps" are sent as location reports; any other key is
        ignored.

        NOTE(review): items that were sent successfully are left in `data`.
        Only items whose value is falsy are removed before returning.

        Parameter:
            data format:
            {
                "phone_num": "123456789",
                "energy": 100,
                "gps": [
                    "$GNGGA,XXX"
                    "$GNVTG,XXX"
                    "$GNRMC,XXX"
                ],
            }
        Return:
            True: Success
            False: Failed
        """
        res = True
        for k, v in data.items():
            om_data = self.__data_format(k, v)
            log.debug("post_data om_data: %s" % str(om_data))
            if om_data is not False:
                if v is None:
                    continue
                sent = quecIot.phymodelReport(2, om_data)
            elif k == "gps":
                sent = quecIot.locReportOutside(v)
            elif k == "non_gps":
                sent = quecIot.locReportInside(v)
            else:
                continue

            # The request was accepted; now wait for the result reported by
            # the event callback (or the timeout).
            if not sent or not self.__get_post_res():
                res = False
                break

        self.__rm_empty_data(data)
        return res

    def device_report(self):
        """Report device info items 1 to 12 with quecIot.devInfoReport()."""
        return quecIot.devInfoReport(list(range(1, 13)))

    def ota_request(self, mp_mode=0):
        """Publish an mcu and firmware ota plan request.

        Parameter:
            mp_mode: 0 or 1, passed to quecIot.otaRequest()
        Return:
            False when mp_mode is not 0 or 1, otherwise the result of
            quecIot.otaRequest()
        """
        return quecIot.otaRequest(mp_mode) if mp_mode in (0, 1) else False

    def ota_action(self, action=1, module=None):
        """Publish an ota upgrade action.

        Parameter:
            action: value passed to quecIot.otaAction(), 0 to 3 accepted
                - 0: cancel upgrade
                - 1: confirm upgrade
                - 2, 3: also accepted; QuecOTA sends 2 after a downloaded
                  piece that does not complete the file and 3 once the
                  whole file has been downloaded
            module: unused
        Return:
            False when action is not in 0..3, otherwise the result of
            quecIot.otaAction()
        """
        return quecIot.otaAction(action) if action in (0, 1, 2, 3) else False
class QuecOTA(object):
    """Download, verify and unpack an OTA package delivered through quecIot.

    The package is read piece by piece with quecIot.mcuFWDataRead() and
    appended to /usr/sotaFile.tar.gz while an MD5 is computed over the
    received bytes. Once the expected size has been received the MD5 is
    compared, the archive is unpacked below /usr/.updater/usr/, the files
    are registered with app_fota_download and the device is restarted.
    """

    def __init__(self):
        self.__file_size = 0
        self.__file_md5 = ""
        self.__download_size = 0
        self.__file_hash = uhashlib.md5()
        self.__ota_file = "/usr/sotaFile.tar.gz"
        self.__updater_dir = "/usr/.updater/usr/"

    def __write_ota_file(self, data):
        """Append data to the package file and feed it into the running MD5."""
        with open(self.__ota_file, "ab+") as package:
            package.write(data)
            self.__file_hash.update(data)

    def __get_file_size(self, data):
        """Decode a NUL-padded ASCII octal size field; an empty field is 0."""
        digits = data.decode("ascii").rstrip("\0")
        return int(digits, 8) if digits else 0

    def __get_file_name(self, name):
        """Decode a NUL-padded ASCII name field."""
        return name.decode("ascii").rstrip("\0")

    def __check_md5(self):
        """Compare the MD5 of the received bytes with the announced MD5."""
        file_md5 = ubinascii.hexlify(self.__file_hash.digest()).decode("ascii")
        log.debug("DMP Calc MD5 Value: %s, Device Calc MD5 Value: %s" % (self.__file_md5, file_md5))
        if self.__file_md5 == file_md5:
            log.debug("MD5 Verification Success.")
            return True
        log.error("MD5 Verification Failed")
        return False

    def __download(self, start_addr, piece_size):
        """Read one piece in chunks of at most 4096 bytes and store it.

        Return:
            3 when the total received size equals the announced file size,
            otherwise 2. The same value is reported once through
            quecIot.otaAction() before returning.
        """
        res = 2
        max_chunk = 4096
        while piece_size > 0:
            chunk_size = min(max_chunk, piece_size)
            self.__write_ota_file(quecIot.mcuFWDataRead(start_addr, chunk_size))
            log.debug("Download File Size: %s" % chunk_size)
            piece_size -= chunk_size
            start_addr += chunk_size
            self.__download_size += chunk_size
            if self.__download_size == self.__file_size:
                log.debug("File Download Success, Update Start.")
                res = 3
                break
        quecIot.otaAction(res)
        return res

    def __upgrade(self):
        """Unpack the downloaded archive and register the files for update.

        The first 10 bytes of the package are skipped and the rest is
        decompressed with uzlib.DecompIO(..., -15); presumably that skips the
        gzip header of the .tar.gz file (TODO confirm). The decompressed
        stream is read in 0x200 byte blocks: a header block supplies the
        entry name (bytes 0..99) and the octal size (bytes 124..134),
        followed by the content blocks.

        Return:
            True on success, False when any exception occurred.
        """
        block_size = 0x200
        with open(self.__ota_file, "rb+") as ota_file:
            ota_file.seek(10)
            unzipFp = uzlib.DecompIO(ota_file, -15)
            log.debug("[OTA Upgrade] Unzip file success.")
            ql_fs.mkdirs(self.__updater_dir)
            unpacked = []
            try:
                while True:
                    header = unzipFp.read(block_size)
                    if not header:
                        log.debug("[OTA Upgrade] Read file size zore.")
                        break
                    size = self.__get_file_size(header[124:135])
                    file_name = self.__get_file_name(header[:100])
                    log.debug("[OTA Upgrade] File Name: %s, File Size: %s" % (file_name, size))
                    target = self.__updater_dir + file_name
                    if size:
                        log.debug("File %s write size %s" % (target, size))
                        with open(target, "wb+") as fp:
                            remaining = size
                            while remaining > 0:
                                content = unzipFp.read(block_size)
                                # The last block is padded; keep only the
                                # bytes that belong to the file.
                                keep = min(block_size, remaining)
                                fp.write(content[:keep])
                                remaining -= keep
                        unpacked.append({"file_name": "/usr/" + file_name, "size": size})
                    elif len(file_name):
                        # Named entry without content: created as a directory.
                        log.debug("[OTA Upgrade] Create file: %s" % target)
                        ql_fs.mkdirs(target)
                    else:
                        # Neither name nor size: nothing more to unpack.
                        log.debug("[OTA Upgrade] Have no file unzip.")
                        break

                for entry in unpacked:
                    app_fota_download.update_download_stat("/usr/.updater" + entry["file_name"], entry["file_name"], entry["size"])
                log.debug("Remove %s" % self.__ota_file)
                uos.remove(self.__ota_file)
                app_fota_download.set_update_flag()
            except Exception as e:
                usys.print_exception(e)
                return False
        return True

    def set_ota_info(self, size, md5):
        """Store the announced total package size and its MD5 hex digest."""
        self.__file_md5 = md5
        self.__file_size = size

    def start_ota(self, start_addr, piece_size):
        """Download one piece; on the final piece verify, unpack and restart.

        Nothing more happens while the package is incomplete. Once it is
        complete the device is restarted whether or not the MD5 check and
        the unpacking succeeded.
        """
        if self.__download(start_addr, piece_size) != 3:
            return
        if self.__check_md5():
            if self.__upgrade():
                log.debug("File Update Success, Power Restart.")
            else:
                log.debug("File Update Failed, Power Restart.")
        Power.powerRestart()
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/shy1991/modules.git
git@gitee.com:shy1991/modules.git
shy1991
modules
modules
master

搜索帮助