# [web-page residue, not part of this module] Code pull complete; the page will refresh automatically.
# -*- coding: utf-8 -*-
# 项目 : all_auto
# 文件 : my_page_fabu.py
# 作者 : zhangchen
# 时间 : 2021/5/28 3:22 下午
# IDE : PyCharm
import platform
import time
import paramiko
from filelock import FileLock
import drivers.android
import os
import requests
import pytest
from lib.loggers import log
'''
全局配置,与case配合使用,每次case运行,返回首页即可运行下一个模块
需要第一条case运行完成后返回到主页
注:此文件不要随意更改,共享文件
'''
def _decode_unicode_escapes(text):
    """Return *text* with backslash escape sequences turned into characters.

    Characters that are already real non-ASCII text are preserved: they are
    first re-encoded as escape sequences (``backslashreplace``) so that the
    ``unicode_escape`` decoder restores them instead of producing mojibake.
    If the text contains a malformed escape, it is returned unchanged rather
    than aborting test collection.
    """
    try:
        return text.encode("latin-1", "backslashreplace").decode("unicode_escape")
    except UnicodeDecodeError:
        return text


def pytest_collection_modifyitems(items):
    """Make escaped Unicode in collected test names and node ids readable.

    Args:
        items: the collected test items; each one has its ``name`` and
            ``_nodeid`` rewritten in place.
    """
    for item in items:
        item.name = _decode_unicode_escapes(item.name)
        item._nodeid = _decode_unicode_escapes(item.nodeid)
# def _capture_screenshot(nodeid):
# '''截图保存为base64'''
# scrname = nodeid[nodeid.index('[')+1:nodeid.index(']')]
# global driver
# driver.screenshot('report/' + scrname + '.png')
# return scrname + '.png'
# Initialise the device under test
@pytest.fixture(scope="session", autouse=True)
def driver(tmp_path_factory, worker_id):
    """Session fixture that connects one Android device for this test session.

    When ``worker_id`` is ``"master"`` the first device reported by
    ``drivers.android.devices()`` is used directly.  Otherwise the devices are
    shared out through a ``devices.json`` file in the parent of the base temp
    directory (presumably common to all workers -- TODO confirm), guarded by a
    file lock: the first caller takes the first device and stores the
    remaining serials; each later caller pops the next serial from the file.

    The connected device is also published as the module-level global
    ``device``.

    Returns:
        The object returned by ``drivers.android.device_android``.

    Raises:
        RuntimeError: if no device is attached, or if the shared pool has
            already been handed out completely.
    """
    global device
    devices_list = drivers.android.devices()
    if not devices_list:
        # The original bare ``raise`` had no active exception to re-raise;
        # it produced a RuntimeError with an unrelated message.
        raise RuntimeError('No Device')

    if worker_id == "master":
        log.info('单台测试')
        device = drivers.android.device_android(devices_list[0])
        return device

    root_tmp_dir = tmp_path_factory.getbasetemp().parent
    fn = root_tmp_dir / "devices.json"
    log.info(fn)
    with FileLock(str(fn) + ".lock"):
        log.info(fn.is_file())
        if fn.is_file():
            devices_ls = fn.read_text()
            if devices_ls:
                device_serial, devices_l = _devices_list_pop(devices_ls)
            else:
                device_serial, devices_l = '', []
            if not device_serial:
                # A drained pool is stored as "[]", which parses to an empty
                # serial.  Fail loudly instead of returning an Exception
                # object or connecting to a device named ''.
                raise RuntimeError('No Device')
            device = drivers.android.device_android(device_serial)
            fn.write_text(str(devices_l))
        else:
            # First caller: take the first device and publish the rest.
            device = drivers.android.device_android(devices_list[0])
            devices_list.pop(0)
            fn.write_text(str(devices_list))
    log.info(device)
    return device
def _devices_list_pop(dl):
d = dl[1:-1].replace("'", '').strip().split(', ')
ds = d[0]
d.pop(0)
return ds, d
class Failed:
    """Holder for a class-level ``skip`` flag."""

    # Skip test cases (presumably switched on elsewhere after a failure --
    # TODO confirm against the code that reads this flag).
    skip = False
def sub_report(data, timeout=10):
    """Send the report payload to the server through its HTTP interface.

    Args:
        data: JSON-serialisable payload; it is printed and then posted as the
            request body.
        timeout: seconds to wait for the server before giving up.  Without a
            timeout an unresponsive server would block the run indefinitely.

    Returns:
        The ``requests`` response object (the original discarded it).

    Raises:
        requests.RequestException: on connection errors or timeout.
    """
    print(data)
    return requests.post(
        "http://qa-api.3imx.cn/ci/sub_report", json=data, timeout=timeout
    )
def getCurrPwd():
    """Return the absolute path of the current working directory."""
    current_dir = os.path.abspath(os.curdir)
    return current_dir
def write_sh(time_new):
    """Upload the generated report to the report host (192.168.2.98).

    On Windows, ``report/report.html`` is pushed over SFTP and stored as
    ``index_<time_new>.html``.  On Linux, a ``sendreport.sh`` script that
    copies ``report/report`` with ``scp`` is written and then executed.
    Other platforms do nothing.

    Args:
        time_new: suffix for the remote file name (used only on Windows;
            the Linux branch does not reference it).
    """
    # NOTE(review): the host credentials are hard-coded in this function;
    # consider moving them to configuration or the environment.
    system = platform.system().lower()
    if system == 'windows':
        t = paramiko.Transport(('192.168.2.98', 22))
        try:
            t.connect(username='root', password='360che')
            sftp = paramiko.SFTPClient.from_transport(t)
            try:
                sftp.put(r'%s/report/report.html' % (getCurrPwd()),
                         '/data/static/qa.kcimg.cn/app_report/index_%s.html' % (time_new))
            finally:
                sftp.close()
        finally:
            # The transport owns the network connection; close it even when
            # the upload fails.
            t.close()
    elif system == 'linux':
        import subprocess

        with open("sendreport.sh", "w") as f:
            # Plain "\n" line endings: a "\r" after the shebang would become
            # part of the interpreter path.
            f.write("#!/bin/sh\n")
            f.write("sshpass -p '360che' scp -r %s/report/report root@192.168.2.98:"
                    "/data/static/qa.kcimg.cn/app_report\n" % (getCurrPwd()))
        # The file is flushed and closed once the ``with`` block exits, so no
        # wait is needed.  Run it through ``sh`` explicitly: the script is
        # neither marked executable nor normally found on PATH.
        subprocess.run(["sh", "sendreport.sh"])
def red_conf(path="conf.ini"):
    """Read the run parameters from a local ``key=value`` file.

    Every line containing ``=`` is inspected; when the line contains one of
    the key names, the text after the last ``=`` is taken as its value.
    If a key appears on several lines, the last one wins.

    Args:
        path: file to read (GBK-encoded); defaults to ``conf.ini`` in the
            current working directory.

    Returns:
        ``(task_id, build_id, case_id, time_new)`` as strings without the
        trailing newline.

    Raises:
        ValueError: if any of the four keys is absent from the file.
    """
    wanted = ("task_id", "build_id", "case_id", "time_new")
    found = {}
    with open(path, 'r', encoding="gbk") as conf:
        for line in conf:
            if "=" not in line:
                continue
            value = line.split("=")[-1].replace("\n", "")
            for key in wanted:
                if key in line:
                    found[key] = value

    missing = [key for key in wanted if key not in found]
    if missing:
        raise ValueError("%s is missing required key(s): %s" % (path, ", ".join(missing)))
    return tuple(found[key] for key in wanted)
# @pytest.hookimpl(hookwrapper=True)
# def pytest_terminal_summary(terminalreporter, exitstatus, config):
# '''收集测试结果'''
# result = {}
# result["sum"] = terminalreporter._numcollected
# result["pass"] = len(terminalreporter.stats.get('passed', []))
# result['fail'] = len(terminalreporter.stats.get('failed', []))
# result['error'] = len(terminalreporter.stats.get('error', []))
# if len(terminalreporter.stats.get('passed', [])) == 0:
# result['passrate'] = '0.00%'
# else:
# result['passrate'] = '通过率:%.2f' % (
# len(terminalreporter.stats.get('passed', [])) / terminalreporter._numcollected * 100) + '%'
# # 调用red_conf中的参数
# task_id, build_id, case_id, time_new = red_conf()
# info = "http://qa-static.3imx.cn/app_report/index_%s.html" % time_new
# data = {
# "task_id": task_id,
# "build_id": build_id,
# "case_id": case_id,
# "type": "2",
# "total": result["sum"],
# "succ": result["pass"],
# "fail": result["fail"],
# "err": result["error"],
# "info": info
# }
# sub_report(data)
#
# return result
@pytest.hookimpl(hookwrapper=True)
def pytest_terminal_summary(terminalreporter):  # type: (TerminalReporter) -> generator
    """Tally the run's outcomes and submit them to the reporting service.

    After the wrapped hook implementations have run, the passed / failed /
    error reports held by ``terminalreporter.stats`` are counted, the summary
    is printed, and a payload built from those counts plus the ids read by
    ``red_conf()`` is handed to ``sub_report()``.
    """
    yield

    stats = terminalreporter.stats
    error_count = len(stats.get('error', []))
    pass_count = len(stats.get('passed', []))
    fail_count = len(stats.get('failed', []))
    total = pass_count + fail_count + error_count

    if pass_count == 0:
        passrate = '0.00%'
    else:
        passrate = '通过率:%.2f' % (pass_count / total * 100) + '%'

    result = {
        "sum": total,
        "pass": pass_count,
        'fail': fail_count,
        'error': error_count,
        'passrate': passrate,
    }
    print(result)

    times = time.time()
    # NOTE(review): time_new is read but unused; the report URL is built from
    # the current timestamp instead -- confirm this matches the uploaded name.
    task_id, build_id, case_id, time_new = red_conf()
    info = "http://qa-static.3imx.cn/app_report/index_%s.html" % str(times)
    data = {
        "task_id": task_id,
        "build_id": build_id,
        "case_id": case_id,
        "type": "2",
        "total": result["sum"],
        "succ": result["pass"],
        "fail": result["fail"],
        "err": result["error"],
        "info": info,
    }
    sub_report(data)
    return result
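# [web-page residue, not part of this module] This section may contain content unsuitable for display, so the page does not show it. You can review and modify it with the relevant editing function.
# [web-page residue, not part of this module] If you confirm the content involves no inappropriate language / pure advertising or traffic diversion / violence / vulgar or pornographic material / infringement / piracy / false or worthless content, and nothing that violates national laws and regulations, you can click submit to appeal and we will handle it as soon as possible.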