代码拉取完成,页面将自动刷新
import base64
import gzip
import io
import json
import sys
import time
from PyQt6 import QtWidgets
from PyQt6.QtCore import pyqtSignal
from PyQt6.QtGui import QStandardItemModel, QStandardItem, QPixmap, QImage
from PyQt6.QtWidgets import QAbstractItemView, QGraphicsPixmapItem, QGraphicsScene, QApplication
import imouse_api_1 as imouse_api
from main_form import Ui_MainWindow # 导入QTdesigner的样式设计
def base64_to_bmp(data: dict):
    """Extract the raw image bytes from a screenshot response.

    Despite the name, no image conversion happens here: the function only
    base64-decodes ``data['data']['img']`` and, when ``data['data']['gzip']``
    is truthy, gunzips the decoded bytes.

    Args:
        data: Response dictionary. The image is expected under
            ``data['data']['img']`` as base64 text; the optional
            ``data['data']['gzip']`` flag marks a gzip-compressed payload.

    Returns:
        The decoded (and, if flagged, decompressed) bytes, or ``None`` when
        the response does not carry an image.
    """
    payload = data.get('data') if isinstance(data, dict) else None
    if not isinstance(payload, dict) or 'img' not in payload:
        return None

    raw = base64.b64decode(payload['img'])
    # A missing flag means "not compressed" instead of raising KeyError.
    if payload.get('gzip', False):
        raw = gzip.decompress(raw)
    return raw
# with open("{}.bmp".format(time.strftime("%Y-%m-%d-%H%M%S", time.localtime())), 'wb') as f:
# f.write(imgdata)
class MyApp(QtWidgets.QMainWindow):
    """Main window of the demo client.

    The window owns an ``imouse_api.WsApi`` connection to ``127.0.0.1``, shows
    the known devices in a table, and maps each button of the Designer
    generated form (``Ui_MainWindow``) to one API call.

    The API delivers results through the ``_on_message`` / ``_on_binary``
    callbacks. According to the original author's notes those callbacks run on
    a thread other than the GUI thread, so they only re-emit the data through
    Qt signals; the connected slots (``_msgEvent`` / ``_binaryEvent``) do the
    actual widget work.
    """

    # Signals that carry callback data over to the slots that touch widgets.
    _signal_binaryEvent = pyqtSignal(bytes)
    _signal_msgEvent = pyqtSignal(dict)

    # Device table layout: header captions and the width applied to each column.
    _COLUMN_HEADERS = ('名称', '设备名', 'deviceid地址', '投屏名称', '绑定硬件', '设备型号', '系统版本', '状态')
    _COLUMN_WIDTHS = (100, 100, 130, 60, 80, 120, 60, 60)
    _DEVICEID_COLUMN = 2

    # Swipe directions, in the same order as the entries added to comboBox_5.
    _SWIPE_DIRECTIONS = ('left', 'right', 'up', 'down')

    # Layout of a binary screenshot frame as consumed by ``_binaryEvent``:
    # bytes [0, 261) hold the NUL-terminated device id, bytes [261, 265) hold a
    # little-endian integer the original code named ``isjpg`` (not needed
    # here), and the image data starts at offset 265.
    _DEVICEID_FIELD_LEN = 261
    _BINARY_HEADER_LEN = 265

    def __init__(self):
        """Build the UI, wire up buttons and signals, then start the API."""
        super().__init__()

        self._img = QImage()
        self.device_list = {}  # device list, keyed by deviceid
        self.get_devicemodel_list = {}  # supported device model list
        self.usb_list = {}  # USB hardware list

        self.__ui = Ui_MainWindow()  # form class generated by Qt Designer
        self.__ui.setupUi(self)
        ui = self.__ui

        self.devlist_tv = ui.tableView_devlist
        self.devlist_tv.verticalHeader().hide()  # hide the row numbers
        self.devlist_tv.setEditTriggers(QAbstractItemView.EditTrigger.NoEditTriggers)  # read-only
        self.devlist_tv.setSelectionBehavior(QAbstractItemView.SelectionBehavior.SelectRows)  # whole rows
        self.devlist_tv.setSelectionMode(QAbstractItemView.SelectionMode.SingleSelection)  # one row at a time
        self.devlist_model = QStandardItemModel(0, len(self._COLUMN_HEADERS))
        self.devlist_model.setHorizontalHeaderLabels(list(self._COLUMN_HEADERS))
        self.devlist_tv.setModel(self.devlist_model)

        button_actions = (
            (ui.button_get_device_list, 'get_device_list'),
            (ui.button_get_devicemodel_list, 'get_devicemodel_list'),
            (ui.button_get_usb_list, 'get_usb_list'),
            (ui.button_get_device_screenshot, 'get_device_screenshot'),
            (ui.button_loop_device_screenshot, 'loop_device_screenshot'),
            (ui.button_stop_screenshot, 'loop_stop_screenshot'),
            (ui.button_set_dev_usb_id, 'set_dev_usb_id'),
            (ui.button_del_dev, 'del_dev'),
            (ui.button_set_dev_name, 'set_dev_name'),
            (ui.button_send_key, 'send_key'),
            (ui.button_mouse_events, 'mouse_events'),
        )
        for button, action in button_actions:
            # ``clicked`` emits a ``checked`` bool; the first parameter absorbs
            # it so it can never overwrite the bound action name.
            button.clicked.connect(lambda _checked=False, action=action: self._button_click(action))

        ui.comboBox_4.addItems(['左键', '右键'])
        ui.comboBox_5.addItems(['左滑', '右滑', '上滑', '下滑'])

        self._signal_binaryEvent.connect(self._binaryEvent)
        self._signal_msgEvent.connect(self._msgEvent)

        # Create and start the API last: once it runs, callbacks may fire, and
        # the slots they reach need the widgets and state set up above.
        self.api = imouse_api.WsApi('127.0.0.1', on_message=self._on_message, on_binary=self._on_binary,
                                    QApplication=QApplication)
        self.api.start()

    def _button_click(self, fun):
        """Run the action named ``fun`` (one of the names wired in ``__init__``).

        The three list queries need no device. Every other action operates on
        the device selected in the table and is skipped when none is selected.
        """
        list_queries = {
            'get_device_list': self.api.get_device_list,  # fetch the device list
            'get_devicemodel_list': self.api.get_devicemodel_list,  # fetch supported models
            'get_usb_list': self.api.get_usb_list,  # fetch the USB hardware list
        }
        if fun in list_queries:
            list_queries[fun](False)
            return

        deviceid = self._get_selected_deviceid()
        if not deviceid:
            return

        if fun == 'get_device_screenshot':  # single screenshot
            self._request_screenshot(deviceid)
        elif fun == 'loop_device_screenshot':  # start continuous screenshots
            delayed = self.__ui.spinBox_delayed.value()
            self.api.loop_device_screenshot(deviceid=deviceid, time=delayed, stop=False, sync=False)
        elif fun == 'loop_stop_screenshot':  # stop continuous screenshots
            self.api.loop_device_screenshot(deviceid=deviceid, time=100, stop=True, sync=False)
        elif fun == 'set_dev_usb_id':  # bind / unbind USB hardware
            self._bind_usb(deviceid)
        elif fun == 'del_dev':  # delete the device
            self.api.del_dev(deviceid=deviceid, sync=False)
        elif fun == 'set_dev_name':  # rename the device
            self.api.change_dev_name(deviceid=deviceid, name=self.__ui.lineEdit.text(), sync=False)
        elif fun == 'send_key':  # keyboard input
            self.api.send_key(deviceid=deviceid, key=self.__ui.lineEdit_2.text(), sync=False)
        elif fun == 'mouse_events':  # mouse operation
            self._send_mouse_event(deviceid)

    def _request_screenshot(self, deviceid):
        """Request one screenshot and display it when the reply carries data."""
        ui = self.__ui
        # ``checkState().value`` is the raw Qt.CheckState number (0 unchecked,
        # 2 checked); it is forwarded unchanged, exactly as before.
        # NOTE(review): confirm the API really expects these raw numbers.
        use_gzip = ui.checkBox_gzip.checkState().value
        sync = ui.checkBox_sync.checkState().value
        binary = ui.checkBox_binary.checkState().value
        ret = self.api.get_device_screenshot(deviceid, use_gzip, sync, binary)
        if 'data' not in ret:  # no data means the call was asynchronous
            self._debug('异步调用' + ret['fun'])
            return
        self._debug('同步调用' + ret['fun'])
        img_data = base64_to_bmp(ret)
        if img_data is None:
            return
        self._show_img(img_data)

    def _bind_usb(self, deviceid):
        """Bind the device to the USB entry chosen in comboBox_3.

        The combo lists the keys of ``usb_list`` followed by an extra
        "unbind" entry; choosing that extra entry sends empty vid/pid.
        """
        index = self.__ui.comboBox_3.currentIndex()
        if index < 0:
            # Nothing chosen (e.g. list not loaded yet). Indexing with -1 would
            # silently pick the last entry or fail on an empty list.
            self._debug('请先选择要绑定的usb硬件')
            return
        entries = list(self.usb_list.values())
        vid = ''
        pid = ''
        if index < len(entries):
            vid = entries[index].get('vid', '')
            pid = entries[index].get('pid', '')
        self.api.change_dev_usb_id(deviceid=deviceid, vid=vid, pid=pid, sync=False)

    def _send_mouse_event(self, deviceid):
        """Send the mouse operation selected by the radio buttons."""
        ui = self.__ui
        button = 'left' if ui.comboBox_4.currentIndex() == 0 else 'right'

        try:
            x = int(ui.lineEdit_3.text())
            y = int(ui.lineEdit_4.text())
        except ValueError:
            # Any non-numeric coordinate falls back to the origin.
            x = 0
            y = 0

        # The swipe direction comes from comboBox_5 (the direction combo);
        # comboBox_4 only selects the mouse button.
        swipe_index = ui.comboBox_5.currentIndex()
        if 0 <= swipe_index < len(self._SWIPE_DIRECTIONS):
            direction = self._SWIPE_DIRECTIONS[swipe_index]
        else:
            direction = self._SWIPE_DIRECTIONS[0]

        if ui.radioButton_1.isChecked():
            self.api.click(deviceid=deviceid, x=x, y=y, button=button, sync=False)
        elif ui.radioButton_2.isChecked():
            self.api.swipe(deviceid=deviceid, direction=direction, button=button, sync=False)
        elif ui.radioButton_3.isChecked():
            self.api.mouse_down(deviceid=deviceid, button=button, sync=False)
        elif ui.radioButton_4.isChecked():
            self.api.mouse_up(deviceid=deviceid, button=button, sync=False)
        elif ui.radioButton_5.isChecked():
            self.api.mouse_movie(deviceid=deviceid, x=x, y=y, sync=False)
        elif ui.radioButton_6.isChecked():
            self.api.mouse_reset_pos(deviceid=deviceid, sync=False)

    def closeEvent(self, event):
        """Stop the API connection, then let Qt handle the close as usual."""
        self.api.stop()
        super().closeEvent(event)

    def _show_img(self, img_data):
        """Decode ``img_data`` and show it, scaled to fit, in the graphics view."""
        if not self._img.loadFromData(img_data):
            self._debug('图片数据解析失败')
            return
        item = QGraphicsPixmapItem(QPixmap.fromImage(self._img))  # pixmap item
        scene = QGraphicsScene()  # fresh scene holding just this frame
        scene.addItem(item)
        view = self.__ui.graphicsView
        view.setScene(scene)
        # Fit to the item actually placed in the scene.
        view.fitInView(item)

    def _stop_screenshot_click(self):
        """Stop continuous screenshots for the selected device."""
        deviceid = self._get_selected_deviceid()
        if not deviceid:
            return
        self.api.loop_device_screenshot(deviceid=deviceid, time=100, stop=True, sync=False)

    @staticmethod
    def _device_row(device: dict) -> list:
        """Return the eight cell texts of one device, in table column order."""
        vid = device.get('vid', '')
        binding = '{}|{}'.format(vid, device.get('pid', '')) if vid else '未绑定'
        cells = (
            device.get('name', '') or '未命名',
            device.get('username', ''),
            device.get('deviceid', ''),
            device.get('srvname', ''),
            binding,
            device.get('device_name', '未设置'),
            device.get('version', '未设置'),
            '离线' if device.get('state', 0) == 0 else '在线',
        )
        # QStandardItem has int-taking overloads; always hand it text.
        return [str(cell) for cell in cells]

    def _add_device_list(self, data: dict, is_update: bool = False):
        """Refresh the device table.

        Args:
            data: Device info dictionaries keyed by deviceid.
            is_update: When true, merge ``data`` into the current list;
                otherwise replace the current list with ``data``.
        """
        if is_update:
            self.device_list.update(data)
        else:
            self.device_list = data

        model = self.devlist_model
        for row, device in enumerate(self.device_list.values()):
            for column, text in enumerate(self._device_row(device)):
                model.setItem(row, column, QStandardItem(text))
        # Rows 0..n-1 now mirror the list exactly, so anything beyond is stale.
        # Truncating by count also drops leftover rows whose deviceid still
        # exists, which a membership test would keep as duplicates.
        model.setRowCount(len(self.device_list))

        for column, width in enumerate(self._COLUMN_WIDTHS):
            self.devlist_tv.setColumnWidth(column, width)
        self.devlist_tv.horizontalHeader().setStretchLastSection(True)

    def _add_usb_list(self, data: dict):
        """Store the USB hardware list and repopulate comboBox_3 from its keys."""
        self.usb_list = data
        combo = self.__ui.comboBox_3
        combo.clear()
        for key in self.usb_list:
            combo.addItem(key)
        combo.addItem('取消绑定')  # extra trailing entry used to unbind

    def _get_selected_deviceid(self):
        """Return the deviceid of the selected table row, or ``''`` if none."""
        view = self.__ui.tableView_devlist
        selected = view.selectedIndexes()
        if not selected:
            self._debug('请先选择要操作的设备')
            return ''
        # Selection is single-row, so every selected index shares one row.
        row = selected[0].row()
        return view.model().index(row, self._DEVICEID_COLUMN).data()

    def _debug(self, log: str):
        """Append ``log`` to the log pane, prefixed with the local time."""
        timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
        self.__ui.textEdit_log.append('{} {}'.format(timestamp, log))

    def _binaryEvent(self, data: bytes):
        """Slot for binary screenshot frames: show the image they carry."""
        # Cut at the NUL terminator before decoding so padding bytes after it
        # cannot make the decode fail.
        id_field = data[:self._DEVICEID_FIELD_LEN].split(b'\x00', 1)[0]
        deviceid = id_field.decode(encoding="utf-8")
        img_data = data[self._BINARY_HEADER_LEN:]
        print("收到设备{}的屏幕数据".format(deviceid))
        self._show_img(img_data)

    def _msgEvent(self, data: dict):
        """Slot for JSON messages: log the outcome and update the UI state."""
        fun = data['fun']
        if data.get('status', 0) != 0:
            self._debug('调用 {} 接口失败,原因:{}'.format(fun, data.get('message', '')))
            return
        self._debug('调用 {} 接口成功'.format(fun))

        if fun == 'connect':
            self._debug('连接成功')
            # Load all three lists right after connecting.
            self.api.get_device_list(False)
            self.api.get_devicemodel_list(False)
            self.api.get_usb_list(False)
        elif fun == 'connect_disconnect':
            self._debug('连接断开')
        elif fun == 'dev_connect':
            self._debug('有设备连接 {}'.format(data['data']))
            self._add_device_list({data['data']['deviceid']: data['data']}, True)
        elif fun == 'dev_disconnect':
            self._debug('有设备断开连接 {}'.format(data['data']))
            self._add_device_list({data['data']['deviceid']: data['data']}, True)
        elif fun == 'dev_rotate':
            self._debug('设备屏幕方向发生变化 {}'.format(data['data']))
        elif fun == 'mouse_collection_cfg_ret':
            self._debug('鼠标参数采集返回 {}'.format(data['data']))
        elif fun == 'collection':
            self._debug('手机打开采集页面回调 {}'.format(data['data']))
        elif fun == 'set_dev':
            self._debug('有设备配置发生改变 {}'.format(data['data']))
            self._add_device_list({data['data']['deviceid']: data['data']}, True)
        elif fun == 'get_device_list':
            self._debug('获取设备列表成功')
            self._add_device_list(data['data'])
        elif fun == 'get_devicemodel_list':
            self._debug('获取支持型号列表成功')
            self._debug(str(data['data']))
        elif fun == 'get_usb_list':
            self._debug('获取usb硬件列表成功')
            self._add_usb_list(data['data'])
        elif fun == 'del_dev':
            self._debug('删除设备{}成功'.format(data['data']['deviceid']))
            self.api.get_device_list(False)
        elif fun == 'get_device_screenshot':
            img_data = base64_to_bmp(data)
            if img_data is None:
                return
            self._show_img(img_data)

    def _on_message(self, data: dict):
        """API callback for JSON messages; forwards them to ``_msgEvent``."""
        if data['fun'] != 'get_device_screenshot':  # screenshot replies are too large to print
            print("收到回调消息{}".format(data))
        # Widgets must not be touched from this callback; hand over via signal.
        self._signal_msgEvent.emit(data)

    def _on_binary(self, data: bytes):
        """API callback for binary frames; forwards them to ``_binaryEvent``."""
        print("收到二进制消息")
        # Widgets must not be touched from this callback; hand over via signal.
        self._signal_binaryEvent.emit(data)
if __name__ == "__main__":
    # Script entry point: create the application, show the main window and
    # run the Qt event loop until the window is closed.
    app = QtWidgets.QApplication(sys.argv)
    window = MyApp()
    window.show()
    # Pass the event loop's return code on as the process exit status
    # instead of discarding it.
    sys.exit(app.exec())
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。