2 Star 3 Fork 0

夏令/DearSH

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
main.py 20.62 KB
一键复制 编辑 原始数据 按行查看 历史
夏令 提交于 2023-03-01 00:32 . init
import binascii
import datetime
from math import sin
import time
import serial
import threading
import dearpygui.dearpygui as dpg
import serial.tools.list_ports
class signal:
def __init__(self, value: int) -> None:
self.value = value
# 描述/ 别名
self.desc = "未定义描述"
# 位数,用于归一化
self.bits = 10
# 用于监视选择
self.check = False
have_table = False
ser: serial.Serial = serial.Serial()
devices: list = []
serial_ports: list[str] = [i.device for i in serial.tools.list_ports.comports()]
ser_buffer: str = ""
rx_buffer: str = ""
tx_buffer: str = ""
rx_data: list = []
rx_data_list: list[list[signal]] = []
sig_data: list = []
sig_data_list: list = []
#######################################
class Log:
LEVEL = {
0: "DEBUG",
1: "DEBUG",
2: "ERROR"
}
def __init__(self, level, msg, model) -> None:
self.level: str = level,
self.msg: str = msg,
self.model = model,
self.time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
log_info_list: list[Log] = []
log: str = ""
log_info_list_lens_buffer = 0
def show_log():
with dpg.window(label="日志", pos=(50, 50), width=350, height=100):
with dpg.group(horizontal=True, horizontal_spacing=20):
dpg.add_combo(["csv", "json", "xls", "txt"], default_value="json", width=100)
dpg.add_button(label="导出")
dpg.add_button(label="刷新")
with dpg.table(header_row=True, resizable=True, policy=dpg.mvTable_SizingStretchProp,
borders_outerH=True, borders_innerV=True, borders_innerH=True, borders_outerV=True) as log_table:
dpg.add_table_column(label="级别")
dpg.add_table_column(label="模块")
dpg.add_table_column(label="时间")
dpg.add_table_column(label="信息")
# dpg.add_table_column(label="Header 3")
for i in log_info_list:
with dpg.table_row():
with dpg.table_cell():
dpg.add_text(default_value=i.level)
with dpg.table_cell():
dpg.add_text(default_value=i.model)
with dpg.table_cell():
dpg.add_text(default_value=i.time)
with dpg.table_cell():
dpg.add_text(default_value=i.msg)
dpg.create_context()
dpg.create_viewport(title='DearSH', width=1200, height=700)
def logging(level):
def outwrapper(func):
def wrapper(sender, app_data, user_data):
start = time.time()
func(sender, app_data, user_data)
end = time.time()
log = Log(level=level, msg=f"run_time: {end - start : .2f}s sender:{sender} , app_data:{app_data} , user_data:{user_data}", model=func.__name__)
log_info_list.append(log)
print(log.msg)
return wrapper
return outwrapper
@logging(Log.LEVEL[0])
def get_ports(sender, app_data, user_data):
serial_ports = [i.device for i in serial.tools.list_ports.comports()]
dpg.configure_item("port_name_combo", items=serial_ports)
dpg.set_value("port_name_combo", serial_ports[0] if len(serial_ports) > 0 else "")
@logging(Log.LEVEL[0])
def connect(sender, app_data, user_data):
ser.port = dpg.get_value("port_name_combo")
ser.baudrate = dpg.get_value("baudrate_combo")
if ser.is_open and ser.port != "":
ser.close()
dpg.set_item_label(sender, "连接")
else:
try:
ser.open()
dpg.set_item_label(sender, "断开连接")
except:
print('An exception occurred')
with dpg.window(pos=(200, 200), modal=True, popup=True) as msgbox:
with dpg.group(horizontal=True, horizontal_spacing=20):
dpg.add_loading_indicator()
with dpg.group():
dpg.add_text('Error !', color=(255, 0, 0))
dpg.add_text('串口连接失败\r\n\r\n请检查设备是否正常连接或端口被其他应用占用')
@logging(Log.LEVEL[0])
def clear_rx_text(sender, app_data, user_data):
global rx_buffer
rx_buffer = ""
dpg.set_value("rx_text", rx_buffer)
@logging(Log.LEVEL[0])
def clear_tx_text(sender, app_data, user_data):
global tx_buffer
tx_buffer = ""
dpg.set_value("tx_text", tx_buffer)
def tx_change_decode(sender, app_data, user_data):
pass
def rx_change_decode(sender, app_data, user_data):
pass
# 循环接收数据
# def watch_rx(sender , app_data , user_data):
# watch_window()
# creating data
sindatax = []
sindatay = []
for i in range(0, 500):
sindatax.append(i / 1000)
sindatay.append(0.5 + 0.5 * sin(50 * i / 1000))
@logging(Log.LEVEL[0])
def on_send_serial(sender, app_data, user_data):
global tx_buffer
end_text = r"\r\n" if dpg.get_value("add_newline_checkbox") else ""
main_text = dpg.get_value("send_text")
full_text = f"{main_text}{end_text}"
if ser.is_open:
ser.write(bytes(full_text.encode()))
tx_buffer = full_text + tx_buffer
dpg.set_value(item="send_text", value="")
dpg.set_value(item="tx_text", value=tx_buffer)
# def watch_window():
# with dpg.window(label="watch",pos=(50,50) ,width=600 , height= 400):
# with dpg.group(horizontal=True):
# with dpg.child_window(width= 400):
# with dpg.table(header_row=True, resizable=True, policy=dpg.mvTable_SizingStretchProp,
# borders_outerH=True, borders_innerV=True, borders_innerH=True, borders_outerV=True):
# dpg.add_table_column(label="index")
# dpg.add_table_column(label="watch")
# # dpg.add_table_column(label="Header 3")
# # 数据栈 长度200
# data = [signal(i) for i in range(0,100)]
# data_set = []
# for i in data:
# with dpg.table_row():
# with dpg.table_cell():
# dpg.add_progress_bar(default_value= i.value/i.bits*10 ,width=-1 ,overlay=f"value:{i.value}")
# with dpg.table_cell():
# dpg.add_text(default_value= i.value)
# with dpg.table_cell():
# dpg.add_input_int(default_value= i.bits , min_value=2 , max_value=10)
# with dpg.child_window():
# # create plot
# with dpg.plot(label="Line Series"):
# # optionally create legend
# dpg.add_plot_legend()
# # REQUIRED: create x and y axes
# dpg.add_plot_axis(dpg.mvXAxis, label="x")
# dpg.add_plot_axis(dpg.mvYAxis, label="y", tag="y_axis")
# # series belong to a y axis
# dpg.add_line_series(sindatax, sindatay, label="0.5 + 0.5 * sin(x)", parent="y_axis")
# for i in rx_data:
# dpg.add_line_series(y =i.value , x= sindatax , label= i.desc )
def set_bits(sender, app_data, user_data):
user_data.bits = app_data
def set_check(sender, app_data, user_data):
user_data.check = app_data
def set_desc(sender, app_data, user_data):
user_data.desc = app_data
# 实时循环
def realtime_update_data():
global rx_buffer, rx_data, rx_data_list, have_table, sig_data, sig_data_list
while True:
if ser.is_open:
try:
ser_buffer = str(ser.readline(), encoding="utf-8")
rx_buffer = rx_buffer + ser_buffer
# 按照编码方式更新到接收区页面
if dpg.get_value("rx_change_decode") == "Ascii":
# rx_buffer = binascii.unhexlify(rx_buffer.encode())
pass
else:
# rx_buffer = binascii.hexlify(rx_buffer.encode())
rx_buffer = bytes(rx_buffer,encoding="utf-8").hex()
print(rx_buffer)
dpg.set_value("rx_text", rx_buffer)
except:
print("is_error")
# 解析成数据集
if ser_buffer.startswith(str(dpg.get_value("start_flag"))):
rx_data_buffer = ser_buffer.lstrip(dpg.get_value("start_flag"))
rx_data_buffer = rx_data_buffer.rstrip(dpg.get_value("end_flag"))
rx_data = [int(i) for i in rx_data_buffer.split(dpg.get_value("split_flag"))]
# rx_data_list.append(rx_data)
# dpg.delete_item(item="rx_table" ,children_only=True)
if len(sig_data_list) > 200:
sig_data_list.pop(0)
sig_data_list.append(rx_data)
if have_table == False:
sig_data = [signal(i) for i in rx_data]
with dpg.table(header_row=True, resizable=True, policy=dpg.mvTable_SizingStretchProp,
borders_outerH=True, borders_innerV=True, borders_innerH=True, borders_outerV=True, parent="rx_table"):
dpg.add_table_column(label="数据")
dpg.add_table_column(label="数据位")
dpg.add_table_column(label="监视")
dpg.add_table_column(label="描述")
# data_set = []
index = 0
for i in sig_data:
with dpg.table_row():
with dpg.table_cell():
# dpg.add_progress_bar(default_value= i.value/((i.bits**2)-1) ,tag=f"a{index}")
dpg.add_slider_int(default_value=i.value, max_value=2**i.bits-1, tag=f"a{index}", width=-1)
with dpg.tooltip(f"a{index}"):
dpg.add_simple_plot(default_value=(0.3, 0.9, 0.5, 0.3), height=100, width=300, tag=f"p{index}")
with dpg.table_cell():
dpg.add_input_int(default_value=i.bits, min_value=1, max_value=10, tag=f"c{index}", user_data=i, callback=set_bits)
with dpg.table_cell():
dpg.add_checkbox(default_value=i.check, tag=f"b{index}", user_data=i, callback=set_check)
with dpg.table_cell():
dpg.add_input_text(default_value=i.desc, tag=f"d{index}", user_data=i, callback=set_desc, width=-1)
index += 1
have_table = True
else:
index = 0
for i, j in zip(sig_data, rx_data):
i.value = j
dpg.set_value(f"a{index}", i.value)
dpg.configure_item(f"a{index}", max_value=2**i.bits-1)
# dpg.set_value(f"b{index}",i.value)
dpg.set_value(f"c{index}", i.bits)
dpg.set_value(f"b{index}", i.check)
dpg.set_value(f"d{index}", i.desc)
# append new number into simple_polt
dpg.set_value(f"p{index}", [k[index] for k in sig_data_list])
index += 1
# 注册主题样式
with dpg.theme() as global_theme:
with dpg.theme_component(dpg.mvAll):
# dpg.add_theme_color(dpg.mvThemeCol_FrameBg, (25, 14, 23), category=dpg.mvThemeCat_Core)
dpg.add_theme_style(dpg.mvStyleVar_FrameRounding, 10, category=dpg.mvThemeCat_Core)
with dpg.theme_component(dpg.mvInputInt):
# dpg.add_theme_color(dpg.mvThemeCol_FrameBg, (14, 25, 23), category=dpg.mvThemeCat_Core)
dpg.add_theme_style(dpg.mvStyleVar_FrameRounding, 15, category=dpg.mvThemeCat_Core)
dpg.bind_theme(global_theme)
def print_me(args):
pass
# 注册字体
with dpg.font_registry():
with dpg.font("VonwaonBitmap-12px.ttf", 12) as font1:
# add the default font range
dpg.add_font_range_hint(dpg.mvFontRangeHint_Default)
dpg.add_font_range_hint(dpg.mvFontRangeHint_Chinese_Full)
dpg.add_font_range_hint(dpg.mvFontRangeHint_Chinese_Simplified_Common)
# helper to add range of characters
# Options:
# mvFontRangeHint_Japanese
# mvFontRangeHint_Korean
# mvFontRangeHint_Chinese_Full
# mvFontRangeHint_Chinese_Simplified_Common
# mvFontRangeHint_Cyrillic
# mvFontRangeHint_Thai
# mvFontRangeHint_Vietnamese
dpg.add_font_range_hint(dpg.mvFontRangeHint_Japanese)
# add specific range of glyphs
dpg.add_font_range(0x3100, 0x3ff0)
# add specific glyphs
dpg.add_font_chars([0x3105, 0x3107, 0x3108])
# remap や to %
dpg.add_char_remap(0x3084, 0x0025)
with dpg.viewport_menu_bar():
with dpg.menu(label="文件"):
dpg.add_menu_item(label="Save", callback=print_me)
dpg.add_menu_item(label="Save As", callback=print_me)
with dpg.menu(label="Settings"):
dpg.add_menu_item(label="Setting 1", callback=print_me, check=True)
dpg.add_menu_item(label="Setting 2", callback=print_me)
dpg.add_menu_item(label="帮助", callback=print_me)
with dpg.menu(label="Widget Items"):
dpg.add_checkbox(label="Pick Me", callback=print_me)
dpg.add_button(label="Press Me", callback=print_me)
dpg.add_color_picker(label="Color Me", callback=print_me)
# with dpg.menu(label="工具"):
# dpg.add_menu_item( label="Watch" , callback= watch_rx)
dpg.add_menu_item(label="日志", callback=show_log)
with dpg.window(label="Window", tag="primary_window"):
dpg.bind_font(font1)
with dpg.child_window(tag="state_bar", border=False):
dpg.add_progress_bar(default_value=0.02, width=-1, height=5, tag="progress_bar")
with dpg.child_window(label="main_winow", height=-5, tag="main_window", before="state_bar"):
with dpg.group(horizontal=True):
with dpg.child_window(label="left_winow", width=300, tag="left_winow"):
with dpg.collapsing_header(label="连接选项", default_open=True):
with dpg.group(indent=20):
dpg.add_button(callback=get_ports, label="搜索", width=80)
dpg.add_combo(items=serial_ports, default_value=serial_ports[0] if len(serial_ports) > 0 else "", label="端口名", tag="port_name_combo")
dpg.add_combo(items=serial.Serial.BAUDRATES, label="波特率", default_value=9600, tag="baudrate_combo")
dpg.add_combo(items=list(serial.PARITY_NAMES.values()), label="校验位", default_value="None")
# dpg.add_combo(items=serial.Serial.BYTESIZES ,label="bytes_ize" ,default_value=8)
dpg.add_slider_int(label="数据位", default_value=8, min_value=5, max_value=8)
dpg.add_combo(items=serial.Serial.STOPBITS, label="停止位", default_value=1)
dpg.add_button(callback=connect, label="连接", width=-1)
with dpg.collapsing_header(label="接收选项", default_open=True):
with dpg.group(indent=20):
dpg.add_radio_button(("Ascii", "Hex"), tag="rx_change_decode" , callback=rx_change_decode, horizontal=True, default_value="Ascii")
dpg.add_checkbox(label="自动换行")
dpg.add_checkbox(label="显示时间")
dpg.add_checkbox(label="自动保存")
dpg.add_checkbox(label="自动回复")
dpg.add_slider_float(label="delay_sce", width=-1, format=f"延迟 %.3f s", max_value=5, default_value=0.5)
dpg.add_combo(items=["", "^"], label="起始符", default_value="^", tag="start_flag")
dpg.add_combo(items=[" ", ","], label="分隔符", default_value=",", tag="split_flag")
dpg.add_combo(items=["\r\n", "\n"], label="结束符", default_value="\r\n", tag="end_flag")
dpg.add_button(label="清理接收", width=-1, callback=clear_rx_text)
with dpg.collapsing_header(label="发送选项", default_open=True):
with dpg.group(indent=20):
dpg.add_radio_button(("Ascii", "Hex"), tag="tx_change_decode" ,callback=tx_change_decode, horizontal=True, default_value="Ascii")
dpg.add_checkbox(label="自动计数")
dpg.add_checkbox(label="附加换行符", tag="add_newline_checkbox", default_value=True)
dpg.add_checkbox(label="重复发送")
dpg.add_slider_float(label="发送间隔", width=-1, format=f"间隔 %.3f s", max_value=5, default_value=0.5)
dpg.add_button(label="清理发送", width=-1, callback=clear_tx_text)
with dpg.child_window(label="right_winow", tag="right_winow", width=500):
with dpg.collapsing_header(label="发送", default_open=True):
with dpg.child_window(label="tx_winow", tag="tx_winow", height=200):
with dpg.group(horizontal=True, tag="send_group"):
dpg.add_input_text(width=-60, tag="send_text")
with dpg.tooltip("send_text"):
dpg.add_text("指令说明:\r\n\r\n- 第一位:D = 0 , E = 1\r\n- 第二位 0-9 数字\r\n- 第三位及以后为 参数数值\r\n\r\n- 11号引脚置为1 [ E11 ]\r\n- 7号引脚置为0 [ D70 ]")
dpg.add_button(label="发送", callback=on_send_serial, width=40)
with dpg.child_window():
dpg.add_text(default_value=ser_buffer, label="tx", tag="tx_text", tracked=True, track_offset=1)
with dpg.collapsing_header(label="接收", default_open=True):
with dpg.child_window(label="rx_winow", tag="rx_winow"):
with dpg.child_window():
dpg.add_text(default_value=ser_buffer, label="接收", tag="rx_text", tracked=True, track_offset=1)
with dpg.child_window():
with dpg.collapsing_header(label="接收表视图", default_open=True, tag="rx_table"):
pass
with dpg.collapsing_header(label="示波器", default_open=True, tag="rx_plot"):
sindatax = []
sindatay = []
for i in range(0, 100):
sindatax.append(i / 100)
sindatay.append(0.5 + 0.5 * sin(50 * i / 100))
with dpg.plot(label="Multi Axes Plot", width=-1):
dpg.add_plot_legend()
# create x axis
dpg.add_plot_axis(dpg.mvXAxis, label="x")
# create y axis 1
dpg.add_plot_axis(dpg.mvYAxis, label="y1")
dpg.add_line_series(sindatax, sindatay, label="y1 lines", parent=dpg.last_item())
# create y axis 2
dpg.add_plot_axis(dpg.mvYAxis, label="y2")
dpg.add_stem_series(sindatax, sindatay, label="y2 stem", parent=dpg.last_item())
# create y axis 3
dpg.add_plot_axis(dpg.mvYAxis, label="y3 scatter")
dpg.add_scatter_series(sindatax, sindatay, label="y3", parent=dpg.last_item())
dpg.set_primary_window("primary_window", True)
dpg.setup_dearpygui()
dpg.show_viewport()
thread = threading.Thread(target=realtime_update_data)
thread.start()
# below replaces, start_dearpygui()
count: int = 0
while dpg.is_dearpygui_running():
if ser.is_open:
count += 1
dpg.set_value(item="progress_bar", value=count % 100 * 0.01)
# data = [signal(i) for i in rx_data]
# # data_set = []
# try:
# dpg.set_value(item="table_value",value = i.value)
# except:
# print("erro")
dpg.render_dearpygui_frame()
dpg.start_dearpygui()
dpg.destroy_context()
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/xialing78/dear-sh.git
git@gitee.com:xialing78/dear-sh.git
xialing78
dear-sh
DearSH
master

搜索帮助