1 Star 0 Fork 13

wudoo/ruyangmao

forked from 请叫我code哥/ruyangmao 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
start.py 10.70 KB
一键复制 编辑 原始数据 按行查看 历史
请叫我code哥 提交于 2019-10-23 15:22 . xc
# -*- encoding=utf8 -*-
__author__ = "code哥"
import subprocess
import threading
import time as py_time
import random
import shutil
import json
from airtest.core.api import *
from poco.drivers.android.uiautomation import AndroidUiautomationPoco
from apscheduler.schedulers.blocking import BlockingScheduler
from conf.app_conf import global_var_mode, APPMode
from conf.app_list import apps_info
from conf.app_conf import APP_DEV
from conn_pool import get_redis, \
REDIS_APP_EXECUTION_TIME_KEY, \
REDIS_APP_EXECUTION_CURRENT_INFO, \
REDIS_NEXT_DAY_EXECUTION_TIME, \
REDIS_SCRIPT_DEVICE_EXECUTION_HEALTH
# Shared Redis client and the single blocking scheduler used by every job below.
rs = get_redis()
scheduler = BlockingScheduler()

# Rebound by main_start() once a device connection exists.
poco = None
# Wi-Fi device table; filled by read_dic_wifi().
global_var_dic = {}

# Approximate per-app run window in minutes.
# Example: (50, 60) means a random value between 50 and 60 minutes is drawn
# and then scaled by the app's weight to obtain its run time.
TIME_SCOPE = (5, 5) if APP_DEV else (50, 60)

# The whole app list is executed this many times per run (2 by default).
LOOP_MAX = 2
def _select_installed_apps(dev):
    """Return the entries of ``apps_info`` whose package is installed on *dev*."""
    # Ask the device for its package list once instead of once per configured
    # app, and use a set for the membership test.
    installed = set(dev.list_app())
    return [app for app in apps_info if app.get('package', '') in installed]


def _build_execution_plan(install_apps):
    """Build LOOP_MAX rounds of per-app run descriptors.

    *install_apps* is shuffled in place before each round, so every round
    has its own random order.
    """
    plan = []
    for loop_index in range(LOOP_MAX):
        # Randomise the execution order for this round.
        random.shuffle(install_apps)
        print('app的执行顺序:%s' % ([item.get('name', '') for item in install_apps],))
        time_list = []
        for child_index, item in enumerate(install_apps):
            weight = item.get('weight', 1)
            # Run time in seconds: a random number of minutes inside
            # TIME_SCOPE, scaled by the app's weight.
            run_time = weight * 60 * random.randint(TIME_SCOPE[0], TIME_SCOPE[1])
            print('执行时间:%s分钟' % (run_time / 60,))
            time_list.append({'name': item.get('name', ''),
                              'package': item.get('package', ''),
                              'script': item.get('script', ''),
                              'time': run_time,
                              'weight': weight,
                              'creatTime': py_time.time(),
                              'loopIndex': loop_index,
                              'childIndex': child_index})
        plan.append(time_list)
    return plan


def main_start():
    """Run one full cycle: reset state, schedule the next run, execute all apps.

    Steps, in order:
      1. drop every scheduler job, flush Redis and delete yesterday's images;
      2. pick a random start time between 06:10:10 and 08:59:59, store it in
         Redis and register this function as cron job ``job_main``;
      3. load the Wi-Fi device table and connect to the device;
      4. build the execution plan for the installed apps and store it in Redis;
      5. run every planned app in sequence via run_app_py(), which blocks for
         each app's run time.
    """
    global poco

    # Clear the previous scheduler jobs.
    scheduler.remove_all_jobs()
    # Clear the previous day's Redis data.
    # NOTE(review): flushall() wipes every key the client can reach, not only
    # this program's keys - confirm the Redis instance is dedicated to it.
    rs.flushall()
    # Remove all images from the previous day.
    img_path = '{path}/static/source_img'.format(path=os.path.dirname(os.path.abspath(__file__)))
    if os.path.exists(img_path):
        shutil.rmtree(img_path)

    # Schedule the next run at a random time of day.
    hour = str(random.randint(6, 8))
    minute = str(random.randint(10, 59))
    second = str(random.randint(10, 59))
    print('下次程序启动时间:{hour}时{minute}分{second}秒'.format(hour=hour, minute=minute, second=second))
    rs.set(REDIS_NEXT_DAY_EXECUTION_TIME,
           json.dumps({'nextStartHour': hour,
                       'nextStartMinute': minute,
                       'nextStartSecond': second}))
    scheduler.add_job(main_start, "cron", day_of_week='*', hour=hour, minute=minute, second=second, id='job_main')

    # Load the Wi-Fi device table.
    read_dic_wifi()

    # Connect to the device.
    poco = AndroidUiautomationPoco()
    device_info = device()
    dev = connect_device("Android://127.0.0.1:5037/{uuid}".format(uuid=device_info.uuid))
    if global_var_mode == APPMode.USB:
        # NOTE(review): this second construction looks redundant with the one
        # above; kept so the connection sequence is unchanged.
        poco = AndroidUiautomationPoco()
    else:
        # Only the first entry of the Wi-Fi table is used.
        for value1 in global_var_dic.values():
            conn_info1 = 'Android://127.0.0.1:5037/{clientIp}:{port}?cap_method=JAVACAP&&ori_method=ADBORI'.format(
                clientIp=value1.get('client', ''),
                port=value1.get('port', ''))
            devices1 = connect_device(conn_info1)
            poco = AndroidUiautomationPoco(devices1, use_airtest_input=True, screenshot_each_action=False)
            break

    install_apps = _select_installed_apps(dev)
    all_install_apps = _build_execution_plan(install_apps)

    # Store the run time and order of every app in Redis.
    rs.set(REDIS_APP_EXECUTION_TIME_KEY, json.dumps(all_install_apps))

    for loop_index, item_list in enumerate(all_install_apps):
        for child_index, item in enumerate(item_list):
            run_app_py(item, item.get('time', 0), loop_index, child_index)
def read_dic_wifi():
    """Load ``conf/devices_list_wifi.conf`` into the global ``global_var_dic``.

    The file is located relative to this module and must contain a JSON
    document; it is decoded as UTF-8.

    :return: the parsed document (the same object now bound to
        ``global_var_dic``)
    """
    global global_var_dic
    path = '{path}/conf/devices_list_wifi.conf'.format(path=os.path.dirname(os.path.abspath(__file__)))
    # The with-statement closes the file; no explicit close() is needed.
    with open(path, encoding='utf-8') as f:
        global_var_dic = json.load(f)
    return global_var_dic
def android_devices() -> list:
    """Return the identifiers of the devices to drive.

    In USB mode the identifiers are the non-blank lines of
    ``conf/devices_list.conf`` with surrounding whitespace removed. Otherwise
    the Wi-Fi table is reloaded and each key, minus its ``rym_`` prefix, is
    returned.

    :return: list of device identifier strings (possibly empty)
    """
    if global_var_mode == APPMode.USB:
        path = '{path}/conf/devices_list.conf'.format(path=os.path.dirname(os.path.abspath(__file__)))
        with open(path) as f:
            # strip() removes line endings and stray spaces, so an identifier
            # never carries whitespace into the shell commands built from it.
            return [line.strip() for line in f if line.strip()]

    # Wi-Fi mode: refresh the table, then derive the identifiers from its keys.
    read_dic_wifi()
    print('wifi global_var_dic : ' + ','.join(global_var_dic))
    devices_list = []
    prefix = 'rym_'
    for key, value in global_var_dic.items():
        print('wifi ==>> ' + key + ':' + json.dumps(value))
        # Remove only the leading prefix, so that start() can rebuild the
        # exact key as 'rym_' + device.
        devices = key[len(prefix):] if key.startswith(prefix) else key
        if devices:
            devices_list.append(devices)
    return devices_list
def run_app_py(item, run_time, loop_index, child_index):
    """Run one app's script on every device for *run_time* seconds.

    :param item: dict describing the app; ``name``, ``package`` and ``script``
        are read
    :param run_time: how long to let the script run, in seconds
    :param loop_index: index of the current round, recorded in Redis
    :param child_index: index of the app inside the round, recorded in Redis

    The function blocks for the whole run. The health-check job is removed
    and the scripts are stopped even if the wait is interrupted.
    """
    name = item.get('name', '')
    package = item.get('package', '')
    script = item.get('script', '')
    job_id = '%s_task' % (script,)

    devices = android_devices()

    # Stop any instance of this script that is still running.
    for serial in devices:
        stop(name, script, serial)
    print('执行时间:%s分钟' % (run_time / 60,))

    # Start the script on each device and record it, so that each device can
    # be checked individually later.
    health_list = []
    for serial in devices:
        print('启动设备:' + serial)
        health_list.append({'name': name,
                            'package': package,
                            'script': script,
                            'device': serial,
                            'health': 1})
        start(name, script, serial)
        # NOTE(review): assumed to sit inside the loop (pause after each
        # launch); the original indentation was not available - confirm.
        sleep(10)

    # Store the health state of every device running the current script.
    rs.set(REDIS_SCRIPT_DEVICE_EXECUTION_HEALTH, json.dumps(health_list))

    # Run the health check every 5 seconds. replace_existing avoids a
    # conflicting-id error if a job with this id was left behind.
    scheduler.add_job(script_job, trigger='interval', seconds=5, id=job_id, replace_existing=True)
    try:
        # Record begin time, duration and expected end time of this run.
        begin_time = py_time.time()
        rs.set(REDIS_APP_EXECUTION_CURRENT_INFO,
               json.dumps({'name': name,
                           'package': package,
                           'script': script,
                           'loopIndex': loop_index,
                           'childIndex': child_index,
                           'beginTime': begin_time,
                           'runTime': run_time,
                           'endTime': begin_time + run_time}))
        # Let the scripts run.
        sleep(run_time)
    finally:
        try:
            scheduler.remove_job(job_id)
        finally:
            # Stop the scripts; the device list is re-read because it may
            # have changed during the run.
            for serial in android_devices():
                stop(name, script, serial)
def script_job():
    """Check that every recorded script process is alive and restart dead ones.

    Reads the health list stored under REDIS_SCRIPT_DEVICE_EXECUTION_HEALTH,
    looks for a process matching each entry's script and device, restarts the
    script when none is found, and writes the updated list back.
    """
    raw = rs.get(REDIS_SCRIPT_DEVICE_EXECUTION_HEALTH)
    health_list = []
    if raw:
        # The stored value may arrive as bytes - decode before parsing.
        # json.loads() no longer accepts an ``encoding`` argument.
        if isinstance(raw, bytes):
            raw = raw.decode('utf-8')
        health_list = json.loads(raw)

    health_list_tmp = []
    for health in health_list:
        name = health.get('name', '')
        script = health.get('script', '')
        device = health.get('device', '')
        # Look up the pid(s) of the script process for this device.
        command = "ps -ef | grep %s.py | grep %s | grep -v grep | awk '{print $2}'" % (script, device,)
        process = subprocess.Popen(command, shell=True, close_fds=True,
                                   stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        # communicate() drains both pipes while waiting, so a full pipe
        # buffer cannot block the child.
        out, _ = process.communicate()
        res = out.decode('utf-8')
        print('health res: %s - %s' % (device, str(res)), )
        if not res.strip():
            # No pid found: mark unhealthy and restart the script.
            health['health'] = 0
            stop(name, script, device)
            start(name, script, device)
        else:
            health['health'] = 1
        health_list_tmp.append(health)

    # Store the health state of every device running the current script.
    rs.set(REDIS_SCRIPT_DEVICE_EXECUTION_HEALTH, json.dumps(health_list_tmp))
def start(name, script, device):
    """Launch ``src/<script>.py`` in the background for *device*.

    :param name: label passed through to execute() for its log line
    :param script: script file name without the ``.py`` suffix
    :param device: device identifier; also used to look up ``rym_<device>``
        in the Wi-Fi table for the client address and port
    """
    mode = 'USB' if global_var_mode == APPMode.USB else 'WIFI'
    endpoint = global_var_dic.get('rym_' + str(device), {})
    base_dir = os.path.dirname(os.path.abspath(__file__))

    # Run detached in the background; output is appended to run.log.
    command = 'nohup python3 -u {path}/src/{script}.py {device} {mode} {clientIp} {port} >> {path}/run.log 2>&1 &'.format(
        path=base_dir,
        script=script,
        device=device,
        mode=mode,
        clientIp=endpoint.get('client', ''),
        port=endpoint.get('port', ''),
    )
    print('command - {command}'.format(command=command))
    execute(command, name)
def stop(name, script, device):
    """Send SIGTERM to every process whose ``ps`` line matches *script* and *device*.

    :param name: label passed through to execute() for its log line
    :param script: script file name without the ``.py`` suffix
    :param device: device identifier the script was started with
    """
    stages = [
        'ps -ef',
        'grep %s.py' % (script,),
        'grep %s' % (device,),
        'grep -v grep',          # drop the grep processes themselves
        "awk '{print $2}'",      # second column of ps -ef is the pid
        'xargs kill -TERM',
    ]
    command = ' | '.join(stages)
    print('command - {command}'.format(command=command))
    execute(command, name)
def stop_app_phone():
    """Stop every configured app: its script on each device, then the app itself."""
    # Read the device list once instead of once per app.
    devices_list = android_devices()
    for app in apps_info:
        for devices in devices_list:
            stop(app.get('name', ''), app.get('script', ''), devices)
        # NOTE(review): stop_app() does not take the device, so it is called
        # once per app here; the original nesting was not available - confirm.
        stop_app(app.get('package', ''))
def execute(command, name):
    """Run *command* through the shell, wait for it, and log it.

    :param command: shell command line
    :param name: label included in the log line
    :return: the shell's exit status

    Waiting for the shell to exit reaps the child process and guarantees
    that a kill pipeline has finished before the caller launches a
    replacement. Commands that end in ``&`` still return immediately because
    only the launching shell is waited for.
    """
    status = subprocess.call(command, shell=True)
    time_str = py_time.strftime("%Y-%m-%d %H:%M:%S", py_time.localtime())
    print('[{time_str}] [{name}] shell {cmd}'.format(time_str=time_str, name=name, cmd=command))
    return status
def start_scheduler():
    """
    Start the scheduler.

    On KeyboardInterrupt or SystemExit the scheduler is shut down and the
    exception is printed instead of being re-raised.
    :return:
    """
    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit) as exc:
        scheduler.shutdown()
        print('scheduler fail {e}'.format(e=exc))
    else:
        print('scheduler success')
def stop_scheduler():
    """Remove all jobs and shut the scheduler down; print any error instead of raising."""
    try:
        scheduler.remove_all_jobs()
        scheduler.shutdown()
    except Exception as err:
        print(err)
    else:
        print('scheduler shutdown success')
class MyThread(threading.Thread):
    """Thread whose run() calls main_start()."""

    def __init__(self):
        super().__init__()

    def run(self):
        main_start()
def main():
    """Start main_start() on a worker thread, then start the scheduler."""
    worker = MyThread()
    worker.start()
    # The worker is deliberately not joined: the scheduler is started right
    # away while the first run proceeds on the worker thread.
    start_scheduler()


if __name__ == '__main__':
    main()
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/wudoo/ruyangmao.git
git@gitee.com:wudoo/ruyangmao.git
wudoo
ruyangmao
ruyangmao
master

搜索帮助