代码拉取完成,页面将自动刷新
同步操作将从 zhangpengju/LoRaClusterLaunch 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
import serial
import time
import glob
import threading
import logging
logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class EnvTest:
def __init__(self, command, time_sleep=1) -> None:
self.command=command+"\r\n"
self.time_sleep = time_sleep
def get_tty_path(self) -> list:
return glob.glob("/dev/ttyUSB*")
@staticmethod
def work(ttyUSB, command):
try:
ser = serial.Serial(ttyUSB, 115200)
time.sleep(2)
ser.write(command.encode())
n = 0
rec_num = 0
while True:
if n == 10:
logger.error(ttyUSB + "---" + "未接到二次返回值")
break
time.sleep(1)
r = ser.read_all()
if bool(r):
rec_num+=1
logger.info(ttyUSB + f"--{rec_num}--\n" + r.decode())
if rec_num == 2:
break
n+=1
except Exception as e:
logger.error(ttyUSB + "---" + str(e))
finally:
ser.close()
def start_threads(self):
logger.info("____START_______________________________________________________")
tty_paths = self.get_tty_path()
threads = []
# 根据 tty_paths 列表的个数开启对应数量的线程
for i, tty_path in enumerate(tty_paths):
thread = threading.Thread(target=self.work, args=(tty_path, self.command))
thread.start()
threads.append(thread)
time.sleep(self.time_sleep) # 线程启动间隔
# 等待所有线程执行完毕
for thread in threads:
thread.join()
logger.info("_____________________________________________________END______")
if __name__ == "__main__":
import sys
args = sys.argv
e = EnvTest(args[1], 0.5) # 第二个参数为线程启动时间间隔
#e.work("/dev/ttyUSB0", "AT+ADDR=?\r\n")
x = e.start_threads()
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。