1 Star 0 Fork 0

dustin.wei/tianyi

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
serial.py 3.64 KB
一键复制 编辑 原始数据 按行查看 历史
Dustin Wei(韦伟) 提交于 2024-08-30 10:22 . first commit
# Copyright (c) Quectel Wireless Solution, Co., Ltd.All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright (c) Quectel Wireless Solution, Co., Ltd.All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from machine import UART
from machine import Timer
from queue import Queue
from usr.logging import getLogger
class Serial(object):
def __init__(self,
uart=2,
baudrate=115200,
databits=8,
parity=0,
stopbits=1,
flowctl=0,
rs485_direction_pin=""):
uart_port = getattr(UART, "UART%d" % int(uart))
self._uart = UART(uart_port, baudrate, databits, parity, stopbits, flowctl)
# init rs458 rx/tx pin
if rs485_direction_pin != "":
rs485_pin = getattr(UART, "GPIO%d" % int(rs485_direction_pin))
self._uart.control_485(rs485_pin, 0)
self._queue = Queue(maxsize=1)
self._timer = Timer(Timer.Timer1)
self._log = getLogger(__name__)
self._uart.set_callback(self._uart_cb)
self.log_enable(False)
self._log.debug("Uart Init Done!")
def _uart_cb(self, args):
self._log.debug("_uart_cb called with args:", args)
if self._queue.size() == 0:
self._log.debug("_uart_cb send a signal")
self._queue.put(None)
def _timer_cb(self, args):
self._log.debug("_timer_cb called with args:", args)
if self._queue.size() == 0:
self._log.debug("_timer_cb send a signal")
self._queue.put(None)
def log_enable(self, en):
if not isinstance(en, bool):
return False
if en:
self._log.set_level("debug")
else:
self._log.set_level("critical")
self._log.set_debug(en)
return True
def write(self, data):
self._uart.write(data)
def read(self, nbytes, timeout=0):
if nbytes == 0:
return ''
if self._uart.any() == 0 and timeout != 0:
timer_started = False
if timeout > 0: # < 0 for wait forever
self._log.debug("start a timeout timer:", timeout)
self._timer.start(period=timeout, mode=Timer.ONE_SHOT, callback=self._timer_cb)
timer_started = True
self._log.debug("wait for a signal")
self._queue.get()
if timer_started:
self._timer.stop()
r_data = self._uart.read(min(nbytes, self._uart.any())).decode()
if self._queue.size():
self._log.debug("clean an extra signal")
self._queue.get()
return r_data
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/dustin-wei/tianyi.git
git@gitee.com:dustin-wei/tianyi.git
dustin-wei
tianyi
tianyi
master

搜索帮助