代码拉取完成,页面将自动刷新
from Ui.QxdmTranslatorDlg import Ui_qxdmTranslatorDlg
from PyQt5.QtWidgets import QDialog
from PyQt5.QtCore import QSettings
import os
import _thread
import time
import socket
from qxdm import QxdmHelp
socket_port = 12345
class CQxdmTranslatorDlg(QDialog, Ui_qxdmTranslatorDlg):
def __init__(self):
super(CQxdmTranslatorDlg, self).__init__()
self.setupUi(self)
self.ini_file = os.getcwd() + "\\config.ini"
self.pSetting = QSettings(self.ini_file, QSettings.IniFormat)
self.qxdm = QxdmHelp()
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._is_binding = False
self.running = False
self.enable_control(self.running)
self.host_ip_input.setText(self.pSetting.value(r"qxdm_settings/host_ip"))
self.qxdmSerialPort.setText(self.pSetting.value(r"qxdm_settings/serial_port"))
mode = int(self.pSetting.value(r"qxdm_settings/mode"))
self.nrNsaMode.setChecked(1 - mode)
self.nrSaMode.setChecked(mode)
self.actionServer.clicked.connect(self.action_monitor)
self.nrSaMode.clicked.connect(self.sa_mode_selected)
self.nrNsaMode.clicked.connect(self.nsa_mode_selected)
def closeEvent(self, event):
self.running = False
self.pSetting.setValue(r"qxdm_settings/host_ip", self.host_ip_input.text())
self.pSetting.setValue(r"qxdm_settings/serial_port", self.qxdmSerialPort.text())
mode = 0
if self.nrSaMode.isChecked():
mode = 1
self.pSetting.setValue(r"qxdm_settings/mode", mode)
# self.qxdm.quit()
def enable_control(self, enable):
self.host_ip_input.setDisabled(enable)
self.qxdmSerialPort.setDisabled(enable)
self.nrSaMode.setDisabled(enable)
self.nrNsaMode.setDisabled(enable)
def action_monitor(self):
print('action_monitor')
if self.running is True:
self.running = False
time.sleep(0.01)
self.actionServer.setText("START")
else:
self.actionServer.setText("TERMINATE")
ip_addr = self.host_ip_input.text()
self.running = True
qxdm_serial_port = self.pSetting.value(r"qxdm_settings/serial_port")
_thread.start_new_thread(self.start_tcp_server, (ip_addr, qxdm_serial_port,))
self.enable_control(self.running)
def sa_mode_selected(self):
pass
def nsa_mode_selected(self):
pass
def start_tcp_server(self, ip_addr, serial_port):
print('Enter start_tcp_server')
connect2phone = self.qxdm.connect_to_phone(serial_port)
if connect2phone is False:
print("Connected to Phone Failure")
return
print("Connected to Phone Success")
if self.nrSaMode.isChecked():
self.qxdm.nr_only()
else:
self.qxdm.multi_mode()
server_address = (ip_addr, socket_port)
if self._is_binding is False:
self.sock.bind(server_address)
self._is_binding = True
self.sock.listen(10)
print('starting listen on ip %s, port %s' % server_address)
while self.running is True:
curr_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
print(curr_time + " >> waiting for connection...")
server, addr = self.sock.accept()
curr_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
msg = curr_time + ' >> address: ' + str(addr[0]) + ' Port: ' + str(addr[1]) + ' connected.'
print(msg)
data = 'OK\r\n'
# server.send(data.encode())
while self.running is True:
if self.running is False:
server.close()
return
try:
rec = server.recv(128).decode()
curr_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
print(curr_time + ' >> got msg: ' + rec)
if rec == 'mode online':
self.qxdm.online()
time.sleep(0.2)
server.send(data.encode())
elif rec == 'mode lpm':
self.qxdm.lpm()
time.sleep(0.2)
server.send(data.encode())
elif rec == 'at':
time.sleep(0.1)
server.send(data.encode())
elif len(rec) == 0:
break
# except ConnectionAbortedError or ConnectionResetError:
except:
break
server.close()
msg = curr_time + ' >> address: ' + str(addr[0]) + ' Port: ' + str(addr[1]) + ' disconnected.'
print(msg)
print('Exited from start_tcp_server')
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。