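# NOTE: web-page banner text captured with this file (not source code): "Code pull completed; the page will refresh automatically."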
# Copyright (c) Quectel Wireless Solution, Co., Ltd.All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright (c) Quectel Wireless Solution, Co., Ltd.All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from machine import UART
from machine import Timer
from queue import Queue
from usr.logging import getLogger
class Serial(object):
    """Blocking serial-port helper built on ``machine.UART``.

    The UART receive callback and a one-shot timer callback both push a
    wake-up token into a one-slot queue; ``read()`` blocks on that queue
    until either data arrives or the timeout expires.
    """

    def __init__(self,
                 uart=2,
                 baudrate=115200,
                 databits=8,
                 parity=0,
                 stopbits=1,
                 flowctl=0,
                 rs485_direction_pin=""):
        """Open the UART and prepare the wake-up queue, timer and logger.

        Args:
            uart: UART port number; resolved to the ``UART.UART<n>`` constant.
            baudrate, databits, parity, stopbits, flowctl: passed unchanged
                to the ``UART`` constructor.
            rs485_direction_pin: GPIO number used as the RS485 direction
                pin (resolved to ``UART.GPIO<n>``). ``""`` or ``None``
                leaves RS485 direction control disabled.
        """
        uart_port = getattr(UART, "UART%d" % int(uart))
        self._uart = UART(uart_port, baudrate, databits, parity, stopbits, flowctl)
        # Init RS485 rx/tx direction pin. Pin number 0 is a valid value, so
        # only the "not configured" sentinels are skipped.
        # NOTE(review): the meaning of the second argument (0) is defined by
        # the UART driver - confirm against its documentation.
        if rs485_direction_pin not in ("", None):
            rs485_pin = getattr(UART, "GPIO%d" % int(rs485_direction_pin))
            self._uart.control_485(rs485_pin, 0)
        self._queue = Queue(maxsize=1)
        self._timer = Timer(Timer.Timer1)
        self._log = getLogger(__name__)
        # Set the log level before registering the callback so that a
        # callback firing right away already sees the intended level.
        self.log_enable(False)
        self._uart.set_callback(self._uart_cb)
        self._log.debug("Uart Init Done!")

    def _signal(self, source):
        """Post one wake-up token unless one is already pending.

        Args:
            source: name of the callback posting the token (for the log).
        """
        if self._queue.size() == 0:
            self._log.debug("%s send a signal" % source)
            self._queue.put(None)

    def _drain_signal(self):
        """Remove a pending wake-up token, if any, without blocking."""
        if self._queue.size():
            self._log.debug("clean an extra signal")
            self._queue.get()

    def _uart_cb(self, args):
        """UART callback: wake up a reader blocked in ``read()``."""
        self._log.debug("_uart_cb called with args:", args)
        self._signal("_uart_cb")

    def _timer_cb(self, args):
        """Timer callback: wake up a reader whose timeout has expired."""
        self._log.debug("_timer_cb called with args:", args)
        self._signal("_timer_cb")

    def log_enable(self, en):
        """Switch debug logging on or off.

        Args:
            en: ``True`` selects the "debug" level, ``False`` the
                "critical" level.

        Returns:
            ``True`` when applied, ``False`` when ``en`` is not a bool.
        """
        if not isinstance(en, bool):
            return False
        if en:
            self._log.set_level("debug")
        else:
            self._log.set_level("critical")
        self._log.set_debug(en)
        return True

    def write(self, data):
        """Send ``data`` through the UART.

        Returns:
            Whatever the underlying ``UART.write`` call returns.
        """
        return self._uart.write(data)

    def read(self, nbytes, timeout=0):
        """Read up to ``nbytes`` bytes and return them decoded as ``str``.

        Args:
            nbytes: maximum number of bytes to read; values <= 0 return ''.
            timeout: 0 returns immediately with whatever is buffered,
                a positive value is handed to the timer as its period,
                a negative value waits until the UART callback fires.

        Returns:
            The decoded data, or '' when nothing was available.
        """
        if nbytes <= 0:
            return ''

        # A callback delivered after the previous read finished leaves a
        # token behind; drop it so it cannot end this wait prematurely.
        # Draining happens BEFORE the any() check: data arriving afterwards
        # is either seen by any() or posts a fresh token.
        self._drain_signal()

        if self._uart.any() == 0 and timeout != 0:
            timer_started = False
            if timeout > 0:  # < 0 for wait forever
                self._log.debug("start a timeout timer:", timeout)
                self._timer.start(period=timeout, mode=Timer.ONE_SHOT, callback=self._timer_cb)
                timer_started = True
            self._log.debug("wait for a signal")
            try:
                self._queue.get()
            finally:
                # Never leave the timer armed, even if the wait fails.
                if timer_started:
                    self._timer.stop()

        available = self._uart.any()
        r_data = ''
        if available > 0:
            # Only touch the driver when there is something to read.
            r_data = self._uart.read(min(nbytes, available)).decode()

        # Both callbacks may have fired; keep the queue empty for next time.
        self._drain_signal()
        return r_data
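# NOTE: web-page notice captured with this file (not source code): "This location may contain content unsuitable for display, so the page does not show it. You can review and modify it using the relevant editing features."
# "If you confirm the content does not involve inappropriate language / pure advertising / violence / vulgar or pornographic material / infringement / piracy / false or worthless content, or content that violates national laws and regulations, you may click Submit to appeal, and we will handle it as soon as possible."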