1 Star 0 Fork 1

tt/LoRaClusterLaunch

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
env_test.py 2.17 KB
一键复制 编辑 原始数据 按行查看 历史
zhangpengju 提交于 2023-06-15 10:32 . add service file
import serial
import time
import glob
import threading
import logging
logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class EnvTest:
def __init__(self, command, time_sleep=1) -> None:
self.command=command+"\r\n"
self.time_sleep = time_sleep
def get_tty_path(self) -> list:
return glob.glob("/dev/ttyUSB*")
@staticmethod
def work(ttyUSB, command):
try:
ser = serial.Serial(ttyUSB, 115200)
time.sleep(2)
ser.write(command.encode())
n = 0
rec_num = 0
while True:
if n == 10:
logger.error(ttyUSB + "---" + "未接到二次返回值")
break
time.sleep(1)
r = ser.read_all()
if bool(r):
rec_num+=1
logger.info(ttyUSB + f"--{rec_num}--\n" + r.decode())
if rec_num == 2:
break
n+=1
except Exception as e:
logger.error(ttyUSB + "---" + str(e))
finally:
ser.close()
def start_threads(self):
logger.info("____START_______________________________________________________")
tty_paths = self.get_tty_path()
threads = []
# 根据 tty_paths 列表的个数开启对应数量的线程
for i, tty_path in enumerate(tty_paths):
thread = threading.Thread(target=self.work, args=(tty_path, self.command))
thread.start()
threads.append(thread)
time.sleep(self.time_sleep) # 线程启动间隔
# 等待所有线程执行完毕
for thread in threads:
thread.join()
logger.info("_____________________________________________________END______")
if __name__ == "__main__":
import sys
args = sys.argv
e = EnvTest(args[1], 0.5) # 第二个参数为线程启动时间间隔
#e.work("/dev/ttyUSB0", "AT+ADDR=?\r\n")
x = e.start_threads()
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/tt9090/lo-ra-cluster-launch.git
git@gitee.com:tt9090/lo-ra-cluster-launch.git
tt9090
lo-ra-cluster-launch
LoRaClusterLaunch
master

搜索帮助