1 Star 0 Fork 7

zhangll/serial-monitor

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
AsyncDataReceiver.py 2.92 KB
一键复制 编辑 原始数据 按行查看 历史
devis 提交于 2021-09-23 10:41 . 完成串口转发功能
# -*- coding: UTF-8 -*-
# Copyright 2020 YangZhongxi
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
import os
from PyQt5 import QtCore
import time
import queue
from FileWriter import FileWriter
class AsyncDataReceiver(QtCore.QThread):
dataListenSignal = QtCore.pyqtSignal(str)
def __init__(self, filename, listener):
super(AsyncDataReceiver, self).__init__()
self.DataBuffer = []
self.isRun = False
self.dataQueue = queue.Queue() # 队列,用于存储实时数据
self.mutex = QtCore.QMutex()
self.writer = FileWriter(filename)
self.dataListenSignal.connect(listener)
def Start(self):
if self.isRun:
return False
self.isRun = True
if not self.writer.open():
self.isRun = False
print("打开文件失败")
return False
self.start() # 开启线程
return True
def Stop(self):
self.isRun = False # 将纯种标志位设为false,以便线程退出循环
self.wait(1000)
def AddData(self, port, data): # 添加数据, 因为dataQueue是多线程访问,需要加锁
"""
添加一条数据
:param port: 串口名
:param data: 数据内容
:return: 无
"""
d = (time.time(), port, data) # 串口数据:时间,串口号,字节数组
self.mutex.lock() # 加锁
self.dataQueue.put(d) # 将数据放到队列中
self.mutex.unlock() # 解锁
def onWork(self): # 侦听业务处理函数,将队列中的所有数据都存入文件,并将数据通过dataListener发送出去
message = ""
while self.dataQueue.qsize() > 0: # 将队列中所有数据组成一个字符串,通过信号发射出去
data = self.dataQueue.get()
message += time.strftime('%Y-%m-%d %H:%M:%S',
time.localtime(data[0])) + " [" + data[1] + "] " + data[2].hex() + "\n"
if len(message) > 0:
message = message.upper()
self.writer.write(message)
self.dataListenSignal.emit(message) # 将字符串数据抛出
message = ""
def run(self):
print("启动线程")
while self.isRun:
time.sleep(0.1)
self.mutex.lock() # 加锁
self.onWork()
self.mutex.unlock() # 解锁
print("线程退出")
def __del__(self):
print("线程销毁")
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/zhangll990/serial-monitor.git
git@gitee.com:zhangll990/serial-monitor.git
zhangll990
serial-monitor
serial-monitor
master

搜索帮助