代码拉取完成,页面将自动刷新
同步操作将从 Hao/RTT-T 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
# This is a sample Python script.
# Press Shift+F10 to execute it or replace it with your code.
# Press Double Shift to search everywhere for classes, files, tool windows, actions, and settings.
import PySimpleGUI as sg
import time
from datetime import datetime
import threading
import pylink
import re
import json
import logging
from queue import Queue
import base64
#base64_main_icon = b'iVBORw0KGgoAAAANSUhEUgAAADAAAAAwCAYAAABXAvmHAAAAAXNSR0IArs4c6QAAB79JREFUaEPVmguQVXUdxz/3/dh79+6DZbFGncB1NqyIIR9l9tAINUXFqTShGSWpIIWZpEhlypmwl48KnNKMFDEnaiIgFSxGpyQ3HhGITRsYBugir1127+u87ml+57F7uXtfu967DL+ZM/fsOf/H7/v///6/x/eshzNcPGe4/pxOAGcDc4GznMsLHAbeBn4H7KxmcU8HgBuAzwE3VVDwT8BvgF+WazeWAGYDXwU+IgoFIl6zrSPsaZ8cpnVimGy/YV0n31Tp2Z0meVR39V4PXFcKxFgB+B6wRJSINPvouLyR869oJNLsL7m4ezf30/X4Uff9XcCDxRrXG8Ak4CFgpkzeOSPB+65rKqt4oZJP3fy6++giYFvh+3oCmAysAS7weOHiuW00TggQnxAg2lJ65QsV/NezfexYfVwevwxcCaTy29QLwApgQSn78AU9g2AEUHtnhNZJIcKNvqJd/rzsLXr2ZOTdncDysQDwV+Cj1bjB/DZtHWEmfTxOxxWNp3Td99IArzx6RJ49D1xdKwAxoN25ggXKXg8sBI4BP3F+5V6ucc4VAMLAOcCngfPdMVrfE6LzqgQTL4tbj3r/p/LHJQfd11HA2g6RkZrQh4BZgPjyzpGucIn24mp+CxwAFDzchsn7pe3ka5qYdksryoDB2oUH0DI5efxJ4KWRAvgMMAf4fL4SEY+H9/r9NHiGr8NWTUMxTSb5fLzLN2TbOdPEwKRbN+g1zUJMO4BfgTXkItPkrIZxfi6dP57tq45z4g1lVAA+63gTa7J7YjE6/H4u8PuZ7C/tTW7q7WWDorCooYFlcdsUCmWXkuVv2Qz/zeV4WtU5OQToKWAZ8BjwMel3zkUNHNhqOaAR7cB3gG9Lr4k+H79IJLgkWGjuxQ3jGwMDPJJKcWs0yorGUw9lfo8+XWO/YoP4UVblVcMyExExEzlLAkJSD1eqBvAt4H7pNScS4eeJRFmTn9PXxw5NY3YkwqdCITYpCt9PJrk2FGJpPE63rvNMJsNrus5343FmheX8DsnOVL/1x2OKxipVc1/cB8giiiMQF1r1DlwMdEnrSwIBNre2Wj19M2YQeuIJ1MWL0VevPkWBW/v6WJPNlgUpLxMeDysSiWEA0jmD7owdo57XdJZlVXes+cDPHMcxxQE0OE8pL/Rr4GZpdaS9ffCQhtevx3vhhSjz5mFs2DBM2fuTSfboOgcNg55cjsOGgRjc2T4f43w+rg6FmB4K8YESZ+ewptCjWgeVxRmFV3RDbruBD4s3LbY6xQDMADZK4/vice5qaLD6BRYsIHDvveS2bSM700ptai6aadKdSSK/+3M55qQGd9Q1pWFzFgMgp/9useN1zc1WB+955xFatw5PSwvK/PkYa9fWXHl3wINqlmOabT4rFY2V9nn4u1hztTuwRxIwcZd3xyTYQmDpUgKi+ObNKLMlra+f9Bs6r2fT1gRbdINvZmyTciL2YDh2HxbugGg8IC9l9WUXREKrVuGbPh11yRL0J5+sn/bOyP9M9SMhLmmaXJkczBquAZ4tnLwQgOTv+6RRT3s7jU6EjXR14Tn3XMv25QzUW15LJ1FNOx7MTWfptmPDl4qVl4UApNzb8sFAgC2O6/TEYkT27oVUivSUKdZvveU/mRSpnOWB+Kmiska1yst73LiUP38hgE8AL14WDLKxpcVq5506lfBzz9XV+xQuyN5siqRhA8g7yEU9UUUAntZWgg8/TG7XLrQHi5alNd+QmgKouXZVDPhOAEwDtksKvLutrYqp6tNEUgpJLUR+kFXZoFlnoCgzUWhCEwGLBkhNmFAf7aoYNd8L3ZHOstP2QsLirazkRiX0npBGsgOyE6dDdqcHMJza4IZkhqP2vVSCw1KAYqmE1frppiauL0h5xwKMbpq8mrZiKSdMk5lDgeyUOsDVpRgAyZNv+UIkYhUwYy1HNJU3VTuJe0bVeESxciEhfIvadDEANwo7LFG4e/z4wWg8VkD2ZdMMGDYvuiitsN2OB48DtxfToRgAqf/2Ay3LGxu5LSosxthIflEj6YOkEY5I/j68AClDq0gF9JWpgQAvOynFWEA4qGQ5ptup9A+zKutt97kVkAqxqJSqyITXlBw8Nla7UGb1vwgISzEiANLYosSFOvl9c7NVFtZTDigZjut2MS/1sNTFwAuAVIglpRwz927gL8KoXBoM8oKT3NUDRK+u8YZi5/1/0HQeGCroS9q+q0clanGQ1Lo9GuXHZfid0QKTgCWBS0Q8jngeR8TriPcpK5UASOc7JC2Xmy9HozxUQxCZnMG/HSolL++XqeSjyNcrKS/vqwEg7eYBj8rNVUJUxWJMCQi5PHpxGbk+02S5orHJtnmRYRR6uVmqBSBjCGMsxOu0sMfDwoYG7oxGafLK19HqRdKyt1UF4YDWaboVbQ/lBkleYeAWVT9a9Tvgjim5xQNOfUqn328FusuDQYulriQnDZ1uJcuLqspG3WCrTVyJ7HbGLekuS409kh3IH0MY468BcsgtEbbt2nCYFq+XVucSIxN27pCu8ZZh8A9dp2tIaXxwyLB5T/kkVZmXLIJitADcoeRzzywP3GhCU6UdyHsvacEm54u8JGqjlncKIH9i2Q2p6KSmcC+xK+E03UuKJcnp5V8KaiK1BFAThUY6yBkP4P87aclPY0yGLQAAAABJRU5ErkJggg=='
THREAD_RUN_INTERVAL_S = 0.002
RTT_DATA_WIN = '-DB_OUT-'+sg.WRITE_ONLY_KEY
button_colors = {
'default': ('grey0', 'grey100'),
'active': ('grey0', 'green4')
}
default_color = button_colors['default']
active_color = button_colors['active']
jk_thread_lock = threading.Lock()
mul_scroll_end = False
real_time_save_file_name = False
def batch_dequeue(q):
return ''.join(q.get() for _ in range(q.qsize()))
class JlinkThread(threading.Thread):
gui_wakeup_tick_th = 0
def __init__(self, window, jlink, rtt_data_q, timestamp = False, thread_start = False):
super().__init__(daemon=True)
self._thread_start = thread_start
self.jlink = jlink
self.window = window
self.timestamp = timestamp
self.rtt_data_queue = rtt_data_q
self.event = threading.Event()
self.event.clear()
def read_rtt_data(self, rtt_data_none_cnt, rtt_data_list):
rtt_data_list_temp = self.jlink.rtt_read(0,4096)
if len(rtt_data_list_temp):
rtt_data_list = rtt_data_list + rtt_data_list_temp
rtt_data_none_cnt = 0
else:
rtt_data_none_cnt = rtt_data_none_cnt + 1
return rtt_data_list, rtt_data_none_cnt
def handle_timeout(self, rtt_data_none_cnt, gui_wake_tick):
time_to = False
if self.gui_wakeup_tick_th != 0:
if rtt_data_none_cnt >= self.gui_wakeup_tick_th:
rtt_data_none_cnt = 0
time_to = True
else:
gui_wake_tick = gui_wake_tick + 1
if gui_wake_tick >= 10/(THREAD_RUN_INTERVAL_S*1000):
gui_wake_tick = 0
time_to = True
return time_to, gui_wake_tick, rtt_data_none_cnt
def process_rtt_data(self, rtt_data_list):
if len(rtt_data_list):
str_data = ''
if self.timestamp == True:
str_data = '[' + datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[0:-3] + '] '
str_data += ''.join([chr(v) for v in rtt_data_list])
jk_thread_lock.acquire()
try:
self.rtt_data_queue.put(str_data)
finally:
jk_thread_lock.release()
rtt_data_list = []
return rtt_data_list
def handle_exception(self, e):
logging.debug(f'Thread:rtt error [{e}]\n')
print(str(e))
self.window.write_event_value('-JLINK_READ_DATA_THREAD-', 'J-Link connect lost')
self._thread_start = False
def log_data(self, rtt_data_list):
print(rtt_data_list)
logging.debug('Thread:rtt data length [' + str(len(rtt_data_list)) + ']')
logging.debug('Thread:rtt data [' + ''.join([chr(i) for i in rtt_data_list]) +']')
logging.debug('Thread:GUI wakeup tick threshold {}'.format(self.gui_wakeup_tick_th))
logging.debug('Thread:J-Link thread wait......')
self.event.wait()
self.event.clear()
def run(self):
rtt_data_list = []
gui_wake_tick = 0
rtt_data_none_cnt = 0
time_to = False
while True:
if self._thread_start:
try:
rtt_data_list, rtt_data_none_cnt = self.read_rtt_data(rtt_data_none_cnt, rtt_data_list)
time_to, gui_wake_tick, rtt_data_none_cnt = self.handle_timeout(rtt_data_none_cnt, gui_wake_tick)
if time_to:
time_to = False
rtt_data_list = self.process_rtt_data(rtt_data_list)
time.sleep(THREAD_RUN_INTERVAL_S)
except Exception as e:
self.handle_exception(e)
except:
self.handle_exception('Unknown error')
else:
self.log_data(rtt_data_list)
def jlink_thread_ctr(self, state):
self._thread_start = state
if state:
self.event.set()
logging.debug('Thread:J-Link thread enable')
else:
self.event.clear()
logging.debug('Thread:J-Link thread disble')
def jlink_timestamp_set(self, state):
self.timestamp = state
def jlink_reset(jlink):
try:
jlink.rtt_stop()
except:
print("rtt stop error\n")
pass
if jlink.opened():
jlink.close()
def open_jlink(jlink):
try:
jlink.open()
ser_num = jlink.serial_number
jlink.close()
return ser_num
except pylink.errors.JLinkException as e:
sg.popup(e)
return None
def set_jlink_params(window, jlink, ser_num):
window['-SN-'].update(str(ser_num))
window[RTT_DATA_WIN].write('LOG:J-Link SN:'+str(ser_num)+'\n')
try:
jlink.open(str(ser_num))
jk_interface = window['-JK_INTER-'].get()
if jk_interface == 'SWD':
jlink.set_tif(pylink.enums.JLinkInterfaces.SWD)
jlink.set_speed(int(window['-SPEED-'].get()))
jlink.connect(window['-CHIP_SEL-'].get())
jlink.reset(ms=10, halt=False)
window[RTT_DATA_WIN].write('LOG:J-Link interface:' + jk_interface + '\n')
window[RTT_DATA_WIN].write('LOG:J-Link speed:' + window['-SPEED-'].get() + 'KHz''\n')
except ValueError:
logging.debug('Speed setting error')
sg.popup('Speed setting error')
jlink_reset(jlink)
except Exception as e:
logging.debug(f'J-Link Operation error[{e}]')
sg.popup(f'J-Link Operation error[{e}]')
jlink_reset(jlink)
def update_window_params(window, jlink, jlink_thread, rtt_data_queue):
window[RTT_DATA_WIN].write('LOG:J-Link connect success!\n')
window['-JK_CONNECT-'].update(button_color=active_color)
window['-JK_CONNECT-'].update('Disconnect J-Link')
#Check timeout time
try:
time_ms = int(window['-RX_TIMEOUT-'].get())
except:
sg.popup('Receive timeout time is set incorrectly')
time_ms = 0
window['-RX_TIMEOUT-'].update('0')
window[RTT_DATA_WIN].write('LOG:RTT rx data timout:' + window['-RX_TIMEOUT-'].get() + '\n')
jlink_thread.gui_wakeup_tick_th = int(time_ms / (THREAD_RUN_INTERVAL_S * 10 ** 3))
if not jlink_thread.is_alive():
jlink_thread.start()
jlink_thread.jlink_thread_ctr(True)
jlink.rtt_start()
rtt_data_queue.queue.clear()
window['-RX_TIMEOUT-'].update(disabled=True)
window['-SN-'].update(disabled=True)
window['-SPEED-'].update(disabled=True)
def reset_jlink(window, jlink, jlink_thread):
jlink_thread.jlink_thread_ctr(False)
jlink_reset(jlink)
window['-JK_CONNECT-'].update(button_color=default_color)
window['-JK_CONNECT-'].update('Connect J-Link')
window['-RX_TIMEOUT-'].update(disabled=False)
window['-SN-'].update(disabled=False)
window['-SPEED-'].update(disabled=False)
def jlink_connect(window, values, jlink, jlink_thread, rtt_data_queue):
if window['-JK_CONNECT-'].get_text() == 'Connect J-Link':
ser_num = open_jlink(jlink)
if ser_num is None:
return
set_jlink_params(window, jlink, ser_num)
if jlink.connected():
update_window_params(window, jlink, jlink_thread, rtt_data_queue)
else:
reset_jlink(window, jlink, jlink_thread)
def clear(window, values, jlink, jlink_thread, rtt_data_queue):
window[RTT_DATA_WIN].update('')
def timestamp_ctr(window, values, jlink, jlink_thread, rtt_data_queue):
if window['-TIMESTAMP_CTR-'].get_text() == 'Open Timestamp':
window['-TIMESTAMP_CTR-'].update(button_color=active_color)
window['-TIMESTAMP_CTR-'].update('Close Timestamp')
jlink_thread.jlink_timestamp_set(True)
else:
window['-TIMESTAMP_CTR-'].update(button_color=default_color)
window['-TIMESTAMP_CTR-'].update('Open Timestamp')
jlink_thread.jlink_timestamp_set(False)
def scroll_to_the_end(window, values, jlink, jlink_thread, rtt_data_queue):
window[RTT_DATA_WIN].update(autoscroll=True)
return True
def data_save(window, values, jlink, jlink_thread, rtt_data_queue):
file_name = r'log_' + datetime.now().strftime('%Y-%m-%d-%H-%M-%S') + '.txt'
rtt_data_str = re.sub('(\r\n)+','\n',window[RTT_DATA_WIN].get())
with open(file_name, 'w', encoding='utf-8') as file:
file.write(rtt_data_str)
sg.popup_no_wait('File saved successfully')
def rt_data_save(window, values, jlink, jlink_thread, rtt_data_queue):
global real_time_save_file_name
if window['-RT_DATA_SAVE-'].get_text() == 'Open Real-time saving data':
window['-RT_DATA_SAVE-'].update(button_color=active_color)
window['-RT_DATA_SAVE-'].update('Close Real-time saving data')
real_time_save_file_name = r'real_time_log_' + datetime.now().strftime('%Y-%m-%d-%H-%M-%S') + '.txt'
else:
window['-RT_DATA_SAVE-'].update(button_color=default_color)
window['-RT_DATA_SAVE-'].update('Open Real-time saving data')
def timeout(window, values, jlink, jlink_thread, rtt_data_queue):
if rtt_data_queue.qsize() > 0:
jk_thread_lock.acquire()
try:
rtt_data = batch_dequeue(rtt_data_queue)
finally:
jk_thread_lock.release()
window[RTT_DATA_WIN].write(rtt_data)
if window['-RT_DATA_SAVE-'].get_text() == 'Close Real-time saving data':
real_time_save_data = ''
real_time_save_data += re.sub('(\r\n)+', '\n', rtt_data)
try:
with open(real_time_save_file_name, 'a', encoding='utf-8') as f:
f.write(real_time_save_data)
real_time_save_data = ''
except Exception as e:
real_time_save_data = ''
logging.debug(f'Write real time data to file error[{e}]\n')
except:
real_time_save_data = ''
logging.debug('Write real time data to file error')
sg.Print("Write real time data to file error")
def jlink_read_data_thread(window, values, jlink, jlink_thread, rtt_data_queue):
if values.get('-JLINK_READ_DATA_THREAD-', '') != 'J-Link connect lost':
window[RTT_DATA_WIN].write(values.get('-JLINK_READ_DATA_THREAD-', ''))
if window['-RT_DATA_SAVE-'].get_text() == 'Close Real-time saving data':
real_time_save_data = real_time_save_data + re.sub('(\r\n)+','\n',values.get('-JLINK_READ_DATA_THREAD-', ''))
else:
logging.debug('J-Link connect lost')
jlink_reset(jlink)
rtt_data_queue.queue.clear()
jlink_thread.jlink_thread_ctr(False)
window['-DB_OUT-' + sg.WRITE_ONLY_KEY].write('LOG:J-Link connection has been lost' + '\n')
window['-JK_CONNECT-'].update(button_color=default_color)
window['-JK_CONNECT-'].update('Connect J-Link')
window['-RX_TIMEOUT-'].update(disabled=False)
window['-SN-'].update(disabled=False)
window['-SPEED-'].update(disabled=False)
def sd_data_bt(window, values, jlink, jlink_thread, rtt_data_queue):
data_cmd = window['-SD_DATA_TXT-'].get()
if data_cmd.startswith('cmd:'):
if data_cmd.find('time syn') >= 0:
if jlink.opened():
try:
time_str = 'time syn:' + datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
time_list = [ord(i) for i in time_str]
jlink.rtt_write(0,time_list)
logging.debug('Data send success(cmd:time syn):'+ time_str)
except Exception as e:
logging.debug('Data send failt(cmd:time syn)[' + str(e) + ']\n')
except:
logging.debug('Data send failt(cmd:time syn)')
else:
if jlink.opened():
try:
time_list = [ord(i) for i in data_cmd]
jlink.rtt_write(0, time_list)
logging.debug(f'Data send success: {data_cmd}')
except Exception as e:
logging.debug('Data send error[' + str(e) + ']\n')
except:
logging.debug('Data send error')
def ico_to_code(png_path):
with open(png_path, "rb") as image_file:
# 读取图片内容
image_data = image_file.read()
# 将图片内容转换为base64编码的字符串
encoded_string = base64.b64encode(image_data).decode('utf-8')
return encoded_string
def initialize_window(json_data):
chip_list = json_data['chip model']
jk_interface_list = json_data['jlink interface']
jk_speed = json_data['Speed']
rx_timeout = json_data['Rx Timeout']
sn = json_data['SN']
font_type = json_data['font'][0]
font_size = json_data['font size']
try:
data_window_w = int(json_data['data window width'])
except:
data_window_w = 83
font = font_type + ' '
right_click_menu = ['',['Clear','Scroll to the end']]
cfg_frame_layout = [
[sg.Button('Connect J-Link',pad=((45,3),(16,16)),key='-JK_CONNECT-',size=(18,2),button_color=('grey0','grey100'),font=(font+str(12)))],\
[sg.Button('Open Timestamp',key='-TIMESTAMP_CTR-',pad=((45,3),(16,16)),button_color=('grey0','grey100'),size=(18,2),font=(font+str(12)))],\
[sg.Button('Open Real-time saving data',key='-RT_DATA_SAVE-',pad=((45,3),(16,16)),size=(18,2),button_color=('grey0','grey100'),font=(font+str(12)))],\
[sg.Button('Save all data',key='-DATA_SAVE-',pad=((45,3),(16,16)),size=(18,2),button_color=('grey0','grey100'),font=(font+str(12)))],\
[sg.Text('Chip Model'),sg.Combo(chip_list,chip_list[0],size=(16,1),pad=((23,3),(20,16)),key='-CHIP_SEL-')],\
[sg.Text('J-Link Interface'),sg.Combo(jk_interface_list,jk_interface_list[0],size=(16,1),key='-JK_INTER-',pad=((0,3),(20,16)))],\
[sg.Text('RX Timout',pad=(10,20)),sg.InputText(rx_timeout,key='-RX_TIMEOUT-',size=(18,1),pad=((26,3),(20,16)),tooltip='0 means no timeout,Unit ms')],\
[sg.Text('J-Link S/N'),sg.InputText(sn,key='-SN-',pad=((24,3),(20,16)),size=(18,1))],\
[sg.Text('Speed(KHz)'), sg.InputText(jk_speed,pad=((14,3),(20,16)),key='-SPEED-',size=(18,1))],\
]
sd_bt_frame_layout = [[sg.Button('Send',key='-SD_DATA_BT-',pad=((45,28),(6,6)),size=(18,2),button_color=('grey0','grey100'),font=(font+str(12)))]]
sd_data_frame_layout = [[sg.InputText('cmd:time syn',key='-SD_DATA_TXT-',size=(50,1),pad=((9,8),(10,15)),font=(font+str(20)),tooltip='Text type')]]
layout = [
[sg.Frame('Config', cfg_frame_layout, vertical_alignment='top',key='-CFG-'),sg.Multiline(autoscroll=True,key=RTT_DATA_WIN,size=(data_window_w,31), \
pad=(10,10),right_click_menu=right_click_menu,font=(font+font_size+' bold'),expand_x=True,expand_y=True)],
[sg.Frame('Send data',sd_bt_frame_layout,vertical_alignment='top'), sg.Frame('Text Data',sd_data_frame_layout,vertical_alignment='top')],
]
main_icon = ico_to_code('apple.png')
main_icon = base64.b64decode(main_icon)
window = sg.Window('RTT Tool 1.6', layout,finalize=True,element_padding=(10,1),return_keyboard_events=False,icon=main_icon,resizable=True)
window.set_min_size(window.size)
#window['-CFG-'].expand(True, True, True)
window[RTT_DATA_WIN].expand(True, True, True)
return window
def handle_event(event, values, window, jlink, jlink_thread, rtt_data_queue, real_time_save_file_name, mul_scroll_end):
if event == sg.WIN_CLOSED:
jlink_thread.jlink_thread_ctr(False)
jlink_reset(jlink)
return False
#print(event, values)
#Scroll bar in the middle position, as the data in the text box increases, y1 increases and y2 decreases
y1, y2 = window[RTT_DATA_WIN].get_vscroll_position()
if mul_scroll_end == True and bool(1 - y2):
mul_scroll_end = False
window[RTT_DATA_WIN].update(autoscroll=False)
elif mul_scroll_end == False and y2 == 1:
mul_scroll_end = True
window[RTT_DATA_WIN].update(autoscroll=True)
event_dict = {
'-JK_CONNECT-': jlink_connect,
'Clear': clear,
'-TIMESTAMP_CTR-': timestamp_ctr,
'Scroll to the end': scroll_to_the_end,
'-DATA_SAVE-': data_save,
'-RT_DATA_SAVE-': rt_data_save,
'__TIMEOUT__': timeout,
'-JLINK_READ_DATA_THREAD-': jlink_read_data_thread,
'-SD_DATA_BT-': sd_data_bt
}
if event in event_dict:
event_dict[event](window, values, jlink, jlink_thread, rtt_data_queue)
return True
def main():
with open('config.json', 'r') as f:
json_data = json.load(f)
window = initialize_window(json_data)
rtt_data_queue = Queue()
jlink = pylink.JLink()
jlink_thread = JlinkThread(window, jlink, rtt_data_queue)
real_time_save_file_name = ''
mul_scroll_end = True
while True:
event, values = window.read(timeout=100)
if not handle_event(event, values, window, jlink, jlink_thread, rtt_data_queue, real_time_save_file_name, mul_scroll_end):
break
window.close()
if __name__ == '__main__':
main()
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。