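# (Stray web-page banner removed: "code fetch complete, the page will refresh automatically" - not part of the program.)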
# -*- coding: utf-8 -*-
"""
Created on Sun Mar 31 14:30:16 2019
@author: einsn
"""
import threading
import serial
import time
import winsound
class ComSetSwitch(threading.Thread):
    """Serial-port worker that writes provisioning values to a device.

    The thread reads the serial console line by line.  When it sees the
    boot banner (``MATCH_BOOT``) it reports that a device is starting; when
    it sees the password prompt (``MATCH_ENTER``) it sends
    ``ENTER_PASSWORD`` and switches to command mode.  In command mode it
    runs one ``ptool`` command per item selected in the UI, each in a
    short-lived ``CommandThread`` executing ``doCommand``, and reports the
    overall outcome to the UI.

    ``ui`` is expected to provide ``logQueue``, ``runStatusSignal``, the
    ``STATE_*`` constants, the ``win*`` check boxes / line edits and
    ``getAuthCode()`` used below (presumably a Qt window - verify against
    the caller).
    """

    # Console text that drives the state machine.
    MATCH_BOOT = "Board: RTL838x"
    MATCH_ENTER = "Password :"
    MATCH_PROMPT = "Realtek #"
    ENTER_PASSWORD = "s"
    LINE_END = "\n"

    # Reader states.
    STATE_IDLE = 0
    STATE_COMMAND = 1

    # Command identifiers, in execution order.  The misspelled names are
    # kept because they are part of the class's public surface.
    CMD_PRODUCT_CODE = 0
    CMD_SHOW_MODEL = 1
    CMD_HARDWARE_VERVERSION = 2
    CMD_POE_POWER = 3
    CMD_CLOULD_URL = 4
    CMD_OWNER_ID = 5
    CMD_SERIAL_NO = 6
    CMD_MAC_ADDRESS = 7
    CMD_AUTH_CODE = 8
    CMD_DONE = 9
    CMD_FAILED = 10

    # Display names used in log and status messages.
    CMD_NAMES = {
        CMD_PRODUCT_CODE: "产品代码",
        CMD_SHOW_MODEL: "设备型号",
        CMD_HARDWARE_VERVERSION: "硬件版本",
        CMD_POE_POWER: "POE额定功率",
        CMD_CLOULD_URL: "云端地址",
        CMD_OWNER_ID: "客户ID",
        CMD_SERIAL_NO: "序列号",
        CMD_MAC_ADDRESS: "MAC地址",
        CMD_AUTH_CODE: "云端认证码",
    }

    # Console command sent for each command identifier.
    _CMD_OP_STRINGS = {
        CMD_PRODUCT_CODE: "ptool devcode",
        CMD_SHOW_MODEL: "ptool model",
        CMD_HARDWARE_VERVERSION: "ptool hwver",
        CMD_POE_POWER: "ptool poepower",
        CMD_CLOULD_URL: "ptool serverurl",
        CMD_OWNER_ID: "ptool ownerid",
        CMD_SERIAL_NO: "ptool sn",
        CMD_MAC_ADDRESS: "ptool mac",
        CMD_AUTH_CODE: "ptool serverauth",
    }

    # Name of the UI line edit that holds the value for each command.
    # CMD_AUTH_CODE is absent: its value comes from ui.getAuthCode().
    _CMD_VALUE_WIDGETS = {
        CMD_PRODUCT_CODE: "winEditProductCode",
        CMD_SHOW_MODEL: "winEditShowModel",
        CMD_HARDWARE_VERVERSION: "winEditHardwareVersion",
        CMD_POE_POWER: "winEditPoePower",
        CMD_CLOULD_URL: "winEditCloudUrl",
        CMD_OWNER_ID: "winEditOwnerId",
        CMD_SERIAL_NO: "winEditSerialNo",
        CMD_MAC_ADDRESS: "winEditMacAddress",
    }

    SOUND_FAILED = 0
    SOUND_SUCCESS = 1

    def __init__(self, ui):
        """Create the worker; ``ui`` may be None (logging falls back to print)."""
        threading.Thread.__init__(self, name="comSetThread")
        self._running = threading.Event()
        self._running.clear()
        self.port = None
        self.state = self.STATE_IDLE
        self.output = []
        self.ui = ui
        self.cmdState = self.CMD_PRODUCT_CODE
        self.cmdThread = None
        # True once the final success/failure of the current run was announced.
        self._outcomeReported = False

    def __del__(self):
        print("thread del...")

    def sound(self, snd):
        """Play the failure (two short beeps) or success (one long beep) tone."""
        if snd == self.SOUND_FAILED:
            winsound.Beep(2000, 80)
            time.sleep(0.05)
            winsound.Beep(2000, 80)
        elif snd == self.SOUND_SUCCESS:
            winsound.Beep(1000, 200)

    def log(self, text):
        """Queue ``text`` on the UI log queue, or print it when there is no UI."""
        if self.ui is not None:
            self.ui.logQueue.put(text)
        else:
            print(text)

    def updateStatus(self, state, msg=""):
        """Emit ``(state, msg)`` on the UI status signal; no-op without a UI."""
        if self.ui is not None:
            self.ui.runStatusSignal.emit(state, msg)

    def openPort(self, name, baudrate):
        """Open serial port ``name``; return True on success, False otherwise."""
        try:
            # 0.5 s read timeout so readline() returns regularly even when
            # the device is silent.
            self.port = serial.Serial(name, baudrate, timeout=0.5)
        except Exception:
            # Any failure to open is reported as False, as before, but
            # KeyboardInterrupt/SystemExit are no longer swallowed.
            self.port = None
            return False
        return True

    def closePort(self):
        """Close the serial port if it is open."""
        if self.port:
            self.port.close()
            self.port = None

    def send(self, line):
        """Write ``line`` followed by ``LINE_END`` to the port, if open."""
        if self.port:
            self.port.write((line + self.LINE_END).encode())

    def readline(self, size=1024):
        """Read bytes up to and including a newline.

        Returns what was read so far when the port times out (an empty
        read) or when ``size`` bytes have been collected without a newline.
        Returns empty bytes when no port is open.
        """
        if self.port is None:
            return bytes()
        buf = bytearray()
        while len(buf) < size:
            chunk = self.port.read()
            if not chunk:
                # read timeout: hand back the partial line
                break
            buf += chunk
            if chunk == b"\x0a":
                break
        # Previously an over-long line fell off the loop and returned None,
        # which crashed the caller on len(); always return bytes instead.
        return bytes(buf)

    def sendCommand(self, cmd):
        """Reset the capture buffer and send ``cmd`` to the device."""
        self.output = []
        self.send(cmd)

    def readCommandResult(self):
        """Return the captured lines between the first line and the prompt.

        The first captured line is always skipped (presumably the echoed
        command - verify against the device), and collection stops at the
        first later line starting with ``MATCH_PROMPT``.
        """
        # Work on a snapshot: the reader thread may append concurrently.
        captured = list(self.output)
        result = []
        for line in captured[1:]:
            if line.startswith(self.MATCH_PROMPT):
                break
            result.append(line)
        return result

    def match(self, s, m):
        """Return True when ``s`` begins with ``m``."""
        return s[:len(m)] == m

    def start(self, name, baudrate=115200):
        """Open port ``name`` and start the reader thread.

        Returns False if the worker is already running or the port cannot
        be opened (the failure tone is played in the latter case).  As with
        any ``threading.Thread``, an instance can be started only once.
        """
        if self._running.is_set():
            return False
        if not self.openPort(name, baudrate):
            self.sound(self.SOUND_FAILED)
            return False
        self._running.set()
        super().start()
        return True

    def stop(self):
        """Ask the reader loop to exit; the port is closed by run()."""
        self._running.clear()

    def getNextCommand(self, cmd):
        """Return the first enabled command after ``cmd``, or ``CMD_DONE``.

        A command is enabled when its UI check box is checked.  The auth
        code has no check box of its own: it is written whenever the MAC
        address or the owner id is selected, because it depends on both.
        """
        ui = self.ui
        candidates = (
            (self.CMD_PRODUCT_CODE, lambda: ui.winProductCode.isChecked()),
            (self.CMD_SHOW_MODEL, lambda: ui.winShowModel.isChecked()),
            (self.CMD_HARDWARE_VERVERSION,
             lambda: ui.winHardwareVersion.isChecked()),
            (self.CMD_POE_POWER, lambda: ui.winPoePower.isChecked()),
            (self.CMD_CLOULD_URL, lambda: ui.winCloudUrl.isChecked()),
            (self.CMD_OWNER_ID, lambda: ui.winOwnerId.isChecked()),
            (self.CMD_SERIAL_NO, lambda: ui.winSerialNo.isChecked()),
            (self.CMD_MAC_ADDRESS, lambda: ui.winMacAddress.isChecked()),
            (self.CMD_AUTH_CODE,
             lambda: ui.winMacAddress.isChecked() or ui.winOwnerId.isChecked()),
        )
        for code, is_enabled in candidates:
            # The check box is only queried for commands after ``cmd``.
            if cmd < code and is_enabled():
                return code
        return self.CMD_DONE

    def firstCommand(self):
        """Return the first enabled command of a run."""
        return self.getNextCommand(-1)

    def nextCommand(self):
        """Return the enabled command that follows the current one."""
        return self.getNextCommand(self.cmdState)

    def getCommandOpString(self, cmd):
        """Return the console command for ``cmd`` ("" if unknown)."""
        return self._CMD_OP_STRINGS.get(cmd, "")

    def getCommandOpValue(self, cmd):
        """Return the value to write for ``cmd``, read from the UI ("" if unknown)."""
        if cmd == self.CMD_AUTH_CODE:
            return self.ui.getAuthCode()
        widget_name = self._CMD_VALUE_WIDGETS.get(cmd)
        if widget_name is None:
            return ""
        return getattr(self.ui, widget_name).text()

    def commandProcess(self):
        """Advance the command sequence by one step (called while polling).

        Starts a ``CommandThread`` for the current command when none is
        active; once that thread has finished, moves to the next command or
        to ``CMD_FAILED``.  When the sequence reaches ``CMD_DONE`` or
        ``CMD_FAILED`` the result is sent to the UI and the matching tone
        is played.
        """
        if self.cmdState < self.CMD_DONE:
            if self.cmdThread is None:
                self.updateStatus(
                    self.ui.STATE_DEVICE_PROCESSING,
                    msg="正在写入%s.." % self.CMD_NAMES[self.cmdState])
                self.cmdThread = CommandThread(
                    doCommand,
                    (self,
                     self.getCommandOpString(self.cmdState),
                     self.getCommandOpValue(self.cmdState)))
                self.cmdThread.start()
                return
            # Thread.isAlive() was removed in Python 3.9; is_alive() is the
            # supported spelling.
            if self.cmdThread.is_alive():
                # still busy with the current command
                return
            if self.cmdThread.getResult():
                self.cmdState = self.nextCommand()
            else:
                self.cmdState = self.CMD_FAILED
            self.cmdThread = None

        # NOTE(review): the original indentation of this part was lost.  It
        # is treated as function-level code, guarded so the outcome is
        # announced once per run instead of on every poll - confirm that a
        # repeating alarm was not intended.
        if self._outcomeReported:
            return
        if self.cmdState == self.CMD_DONE:
            self._outcomeReported = True
            self.updateStatus(self.ui.STATE_OPERATION_SUCCESS)
            self.sound(self.SOUND_SUCCESS)
        elif self.cmdState == self.CMD_FAILED:
            self._outcomeReported = True
            self.updateStatus(self.ui.STATE_OPERATION_FAILED)
            self.sound(self.SOUND_FAILED)

    def run(self):
        """Thread body: read console lines and drive the state machine."""
        if self.port is None:
            print("Serial port is not opened")
            self._running.clear()
            return
        try:
            while self._running.is_set():
                raw = self.readline()
                if not raw:
                    # read timeout: use the idle time to poll the command
                    if self.state == self.STATE_COMMAND:
                        self.commandProcess()
                    continue
                try:
                    sin = raw.decode()
                except UnicodeDecodeError:
                    # console noise that is not valid UTF-8 is ignored
                    sin = ""
                if self.match(sin, self.MATCH_BOOT):
                    self.state = self.STATE_IDLE
                    self.log("检测到设备正在启动...")
                    self.updateStatus(self.ui.STATE_DEVICE_CONNECTING)
                    self.sound(self.SOUND_SUCCESS)
                elif self.match(sin, self.MATCH_ENTER):
                    # sent without a line ending, exactly as before
                    self.port.write(self.ENTER_PASSWORD.encode())
                    self.log("进入产测模式成功")
                    self.updateStatus(self.ui.STATE_DEVICE_PROCESSING)
                    # takes 1 sec to enter command line
                    time.sleep(1)
                    self.state = self.STATE_COMMAND
                    self.cmdState = self.firstCommand()
                    self._outcomeReported = False
                elif self.state == self.STATE_COMMAND:
                    self.output.append(sin.strip())
                    self.commandProcess()
        finally:
            # Release the port on a stop request and also when the loop
            # dies with an exception (e.g. the adapter was unplugged).
            self.closePort()
class CommandThread(threading.Thread):
    """Thread that runs ``func(*args)`` and keeps its return value."""

    def __init__(self, func, args=()):
        """Store the callable and its positional arguments; does not start."""
        super().__init__(name="cmdThread")
        self.func = func
        self.args = args
        # Stays None until run() completes, so getResult() no longer has to
        # rely on catching a missing-attribute error.
        self.result = None

    def run(self):
        """Call the stored function and remember what it returned."""
        self.result = self.func(*self.args)

    def getResult(self):
        """Return the function's result, or None if it has not finished.

        None is also returned when the function raised, because ``run``
        then never assigns a result.
        """
        return self.result
def doCommand(comSet, cmd, value):
    """Write ``value`` with console command ``cmd`` and verify it by read-back.

    Sends ``"<cmd> <value>"``, waits, then sends ``cmd`` alone and compares
    the first line of the captured result with ``value``.

    Args:
        comSet: the ComSetSwitch that owns the port; its ``cmdState`` selects
            the display name used in the log messages.
        cmd: console command string, e.g. the result of getCommandOpString().
        value: text to write.

    Returns:
        True when the read-back equals ``value``, otherwise False.
    """
    # give the console time to settle before issuing the command
    time.sleep(1)
    cmdName = comSet.CMD_NAMES[comSet.cmdState]
    comSet.log("正在写入%s=%s" % (cmdName, value))
    comSet.sendCommand(cmd + " " + value)
    # The reply to the write is not inspected (the original stored it in an
    # unused variable); only the total 1 s delay before the read-back is kept.
    time.sleep(1.0)

    comSet.log("正在校验%s" % cmdName)
    comSet.sendCommand(cmd)
    time.sleep(0.5)
    ret = comSet.readCommandResult()
    if ret and ret[0] == value:
        comSet.log("命令操作成功,返回结果为:%s" % ret[0])
        return True
    comSet.log("命令操作失败,返回结果为:%s" % ret)
    return False
# (Stray web-page notice removed: a hosting-site content-moderation message about
# hidden content and how to appeal - not part of the program.)