1 Star 0 Fork 10

Ink/TTEmulatorPortFinder

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
emulator_port.py 7.60 KB
一键复制 编辑 原始数据 按行查看 历史
tp7309 提交于 2018-08-29 19:46 . update search policy for multidevice
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import re
import json
from src.utils import ttutil
from src.utils import ioutil
import gettext
import psutil
import webbrowser
import copy
# lang_zh_CN = gettext.translation('emulator_port', 'src/locale', ['zh_CN'])
# lang_zh_CN.install()
# _ = lang_zh_CN.gettext
domains = ['emulator_port']
languageDir = os.path.abspath('src/locale')
for domain in domains:
gettext.bindtextdomain(domain, languageDir)
gettext.textdomain(domain)
_ = gettext.gettext
_CONFIG_PATH = os.path.abspath(
os.path.join(os.path.abspath(__file__), os.path.pardir, 'config.json'))
_FEEDBACK_URL = 'https://gitee.com/tp7309/TTEmulatorPortFinder/issues'
_OFFICIAL_EMULATOR = 'official_emulator'
_EMULATORS = {}
class Emulator(object):
def __init__(self, d):
self.type = d['type']
self.name = d['name']
# handle with different emulators with same process name
self.unique_id = d.get('unique_id')
self.process_name = d['process_name']
self.default_ports = d['default_ports']
self.re_port = d.get('re_port')
self.desc = d.get('desc')
# 2: never tips
# 1: tip when unknown emulator found
# 0: tip when known emulator port change and unknown emulator found
self.upload_tip_level = \
d.get('upload_tip_level') if d.get('upload_tip_level') else 0
def get_processes(emulator):
return [x for x in psutil.process_iter()
if emulator.unique_id == get_process_unique_id(x)]
def get_ports(emulator):
ps = get_processes(emulator)
print(
_("{name}({type}) processes:").format(
name=emulator.name, type=emulator.type))
print(ps)
ports = []
if len(ps) >= 1 and len(ps) <= len(emulator.default_ports):
for default_port in emulator.default_ports:
print(default_port)
result = ttutil.check_adb_connectable_by_port(
default_port, auto_disconnect=False)
if result:
ports.append(default_port)
if ports:
return ports
for p in ps:
connections = [
x.laddr for x in p.connections() if x.status == psutil.CONN_LISTEN
]
print(connections)
if emulator.re_port:
ports += [
int(x.port) for x in connections
if re.match(emulator.re_port, str(x.port)) and x.port > 2000
]
else:
ports += [int(x.port) for x in connections if x.port > 2000]
ports = ttutil.check_adb_connectable_by_ports(ports)
return ports
def read_config(src=_CONFIG_PATH):
with open(src, 'r', encoding='utf-8') as f:
config = json.load(f)
for e in config['emulators']:
_EMULATORS[e['unique_id']] = Emulator(e)
def write_config(config, dest=_CONFIG_PATH):
with open(dest, 'w', encoding='utf-8') as f:
json.dump(config, f, sort_keys=True, indent=4, ensure_ascii=False)
def update_config(changed_emulators, dest=_CONFIG_PATH):
if not changed_emulators:
return
with open(dest, 'r', encoding='utf-8') as f:
config = json.load(f)
for e in changed_emulators:
pr = ttutil.props(e)
old_emulators = config['emulators']
if pr['unique_id'] in _EMULATORS:
for index, old_e in enumerate(old_emulators):
# keep order
if old_e['unique_id'] == pr['unique_id']:
old_emulators[index] = pr
break
else:
config['emulators'].append(pr)
write_config(config, dest=dest)
def is_known_port_changed(emulator, new_ports):
if not emulator or emulator.upload_tip_level >= 1:
return False
old_ports = emulator.default_ports
if not old_ports:
old_ports = []
if not new_ports:
new_ports = []
if len(new_ports) > len(old_ports):
for port in old_ports:
if port not in new_ports:
return True
else:
for port in new_ports:
if port not in old_ports:
return True
return False
def get_process_unique_id(p):
relative_path = None
try:
# read system process propertity may throw an exception.
path = os.path.dirname(p.exe())
path = os.path.basename(os.path.dirname(path)) + \
'/' + os.path.basename(path)
relative_path = path
except (PermissionError, psutil.AccessDenied):
pass
return "%s/%s" % (relative_path, p.name())
def check_known_emulators():
emulators = {}
filtered_emulator = []
ischanged = False
for p in psutil.process_iter():
unique_id = get_process_unique_id(p)
if unique_id in filtered_emulator:
continue
filtered_emulator.append(unique_id)
for t, e in _EMULATORS.items():
if e.unique_id == unique_id:
print('..............................')
result = get_ports(e)
if result:
if not emulators.get(e.unique_id):
emulators[e.unique_id] = copy.deepcopy(e)
emulators[e.unique_id].default_ports = result
if not ischanged:
ischanged = is_known_port_changed(
e, emulators[e.unique_id].default_ports)
return ischanged, emulators.values()
def check_unknown_emulators():
emulator_map = {}
for p in psutil.process_iter():
unique_id = get_process_unique_id(p)
existed_emulator = _EMULATORS.get(unique_id)
connections = [
x.laddr for x in p.connections() if x.status == psutil.CONN_LISTEN
]
for conn in connections:
if conn.port > 2000 and ttutil.check_adb_connectable_by_port(
conn.port):
if not emulator_map.get(unique_id):
if not existed_emulator:
args = {
'type': p.name(),
'name': p.name(),
'unique_id': get_process_unique_id(p),
'process_name': p.name(),
'default_ports': [conn.port]
}
e = Emulator(args)
else:
e = existed_emulator
emulator_map[e.unique_id] = e
if conn.port not in emulator_map[e.unique_id].default_ports:
emulator_map[e.unique_id].default_ports.append(
conn.port)
return emulator_map.values()
def dump(emulators):
for e in emulators:
print(json.dumps(e.__dict__))
def main():
read_config()
ischanged, result = check_known_emulators()
if ttutil.check_has_connected_devices():
print('\n')
print(_("one active emulator found"))
ttutil.sh('adb devices')
if result:
print('\n')
print(_("emulator has been found!"))
dump(result)
search_all = ioutil.read_input('continue search(y/n)?', timeout=5,)
if not search_all == 'y':
print('wait timeout, exit')
return
if not result:
print(_('no known emulator found, try to find new emulator'))
result = check_unknown_emulators()
if len(result) <= 0:
print(_("no adb connectable emulator found"))
return
dump(result)
# update config
update_config(result)
print(
_("new emulator config has been found! please paste '{config}' content to\n{url} by new issue!"
).format(config=_CONFIG_PATH, url=_FEEDBACK_URL))
webbrowser.open_new_tab(_FEEDBACK_URL)
if __name__ == '__main__':
main()
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/inkdevelopercatalog/TTEmulatorPortFinder.git
git@gitee.com:inkdevelopercatalog/TTEmulatorPortFinder.git
inkdevelopercatalog
TTEmulatorPortFinder
TTEmulatorPortFinder
master

搜索帮助