# handler.py
import os
import hashlib
import hmac
import urllib.parse
import http.client
import json
import re
import shlex
import time
from subprocess import getstatusoutput
from copy import deepcopy
from datetime import datetime
import pytz
from flask import current_app
from celery import chord
from celeryservice import celeryconfig
from messenger.utils.requests_util import query_request, create_request, update_request
from messenger.utils.response_util import RET
from celeryservice.lib import TaskAuthHandler
from celeryservice.sub_tasks import job_result_callback, run_case
from messenger.utils.pssh import ConnectionApi
from messenger.utils.shell import ShellCmdApi
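
# Handlers that execute test jobs for the messenger/celeryservice layer:
# RunJob carries the shared logic for creating and updating job records,
# RunSuite dispatches the cases of a single suite to one run_case task,
# RunTemplate fans a template's cases out to several run_case tasks joined
# by a Celery chord, and RunAt prepares the PXE/openQA environment and
# triggers an openQA run for a daily-build release.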


class RunJob(TaskAuthHandler):
    def __init__(self, body, promise, user, logger) -> None:
        self._body = body
        self.promise = promise
        self.app_context = current_app.app_context()
        super().__init__(user, logger)
        self._body.update({
            "status": "PENDING",
            "start_time": self.start_time,
            "running_time": 0,
        })

    @property
    def pmachine_pool(self):
        return query_request(
            "/api/v1/accessable-machines",
            {
                "machine_group_id": self._body.get("machine_group_id"),
                "machine_purpose": "run_job",
                "machine_type": "physical",
                "frame": self._body.get("frame"),
                "get_object": False,
            },
            self.user.get("auth")
        )

    def _create_job(self, multiple: bool, is_suite_job: bool):
        if self._body.get("id"):
            del self._body["id"]
        self._body.update({
            "multiple": multiple,
            "is_suite_job": is_suite_job,
        })
        job = create_request(
            "/api/v1/job",
            self._body,
            self.user.get("auth")
        )
        if not job:
            raise RuntimeError(
                "Failed to create job:%s."
                % self._body.get("name")
            )
        self._body.update(job)
        self._body.pop("milestone")

    def _update_job(self, **kwargs):
        self.next_period()
        self._body.update({
            "running_time": self.running_time,
            **kwargs,
        })
        update_body = deepcopy(self._body)
        update_body.pop("id")
        if isinstance(update_body.get("master"), list):
            update_body.update(
                {
                    "master": ','.join(update_body.get("master"))
                }
            )
        update_request(
            "/api/v1/job/{}".format(
                self._body.get("id")
            ),
            update_body,
            self.user.get("auth")
        )
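

# RunSuite: create a single-suite job, query the suite's usable automated
# cases, and hand them to one asynchronous run_case task.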
class RunSuite(RunJob):
    def run(self):
        self._create_job(multiple=False, is_suite_job=True)
        try:
            suite = query_request(
                "/api/v1/suite/{}".format(
                    self._body.get("suite_id")
                ),
                None,
                self.user.get("auth")
            )
            if not suite:
                raise RuntimeError(
                    "test suite of id: {} does not exist, please check testcase repo.".format(
                        self._body.get("suite_id"))
                )
            cases = query_request(
                "/api/v1/case/preciseget",
                {
                    "suite_id": suite.get("id"),
                    "automatic": 1,
                    "usabled": 1,
                },
                self.user.get("auth")
            )
            if not cases:
                raise RuntimeError(
                    "the automated and usable testcases of suite {} do not exist".format(suite.get("name"))
                )
            env_params = {
                "machine_type": suite.get("machine_type"),
                "machine_num": suite.get("machine_num"),
                "add_network_interface": suite.get("add_network_interface"),
                "add_disk": suite.get("add_disk"),
            }
            suites_cases = [
                (
                    suite.get("name"),
                    case.get("name")
                ) for case in cases
            ]
            _task = run_case.delay(
                self.user,
                self._body,
                env_params,
                suites_cases,
                self.pmachine_pool,
            )
            self._update_job(tid=_task.task_id)
        except RuntimeError as e:
            self.logger.error(str(e))
            self._update_job(
                result="fail",
                remark=str(e),
                end_time=datetime.now(tz=pytz.timezone('Asia/Shanghai')),
                status="BLOCK",
            )
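

# RunTemplate: create a job from a template, group its cases by machine
# environment, start one run_case task per group, and gather the results
# through a Celery chord callback.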
class RunTemplate(RunJob):
    def __init__(self, body, promise, user, logger) -> None:
        super().__init__(body, promise, user, logger)
        self._template = query_request(
            "/api/v1/template/{}".format(
                self._body.get("template_id")
            ),
            None,
            self.user.get("auth")
        )
        if not self._template:
            raise RuntimeError(
                "template with id {} does not exist".format(
                    self._body.get("template_id")
                )
            )
        _git_repo = self._template.get("git_repo")
        self._body.update({
            "milestone_id": self._template.get("milestone_id"),
            "git_repo_id": _git_repo.get("id") if _git_repo else None,
            "total": len(self._template.get("cases")),
            "success_cases": 0,
            "fail_cases": 0,
        })

    def run(self):
        self._create_job(multiple=True, is_suite_job=False)
        try:
            if self._body.get("id") and self._body.get("taskmilestone_id"):
                resp = self._callback_task_job_init()
                if resp.get("error_code") != RET.OK:
                    self.logger.warn(
                        "cannot callback job_id to taskmilestone table: " + resp.get("error_msg")
                    )
            self._update_job(
                status="CLASSIFYING",
            )
            classify_cases = self._sort()
            tasks = []
            for cases in classify_cases:
                env_params = {
                    "machine_type": cases.get("type"),
                    "machine_num": cases.get("machine"),
                    "add_network_interface": cases.get("network"),
                    "add_disk": cases.get("disk"),
                }
                tasks.append(
                    run_case.s(
                        self.user,
                        self._body,
                        env_params,
                        cases.get("suites_cases"),
                        self.pmachine_pool,
                    )
                )
            chord_task = chord(tasks)(
                job_result_callback.s(
                    auth=self.user.get("auth"),
                    job_id=self._body.get("id"),
                    taskmilestone_id=self._body.get("taskmilestone_id")
                )
            )
            self._update_job(
                status="RUNNING",
                tid=chord_task.task_id,
            )
        except RuntimeError as e:
            self.logger.error(str(e))
            self._update_job(
                result="fail",
                remark=str(e),
                end_time=datetime.now(tz=pytz.timezone('Asia/Shanghai')),
                status="BLOCK",
            )

    def _callback_task_job_init(self):
        return update_request(
            "/api/v1/task/milestones/{}".format(
                self._body.get("taskmilestone_id")
            ),
            {
                "job_id": self._body.get("id"),
            },
            self.user.get("auth")
        )

    def _sort(self):
        cases = self._template.get("cases")
        if not cases:
            raise RuntimeError(
                "template {} has no related cases.".format(
                    self._template.get("name")
                )
            )
        machine_type = set()
        machine_num = set()
        add_network = set()
        add_disk = set()
        for case in cases:
            machine_type.add(case.get("machine_type"))
            machine_num.add(case.get("machine_num"))
            add_network.add(case.get("add_network_interface"))
            add_disk.add(case.get("add_disk"))
        classify_cases = []
        for m_type in machine_type:
            for machine in machine_num:
                for network in add_network:
                    for disk in add_disk:
                        cs = {}
                        cl = []
                        for case in cases:
                            # collect the cases matching this combination of environment attributes
                            if (
                                case["machine_type"] == m_type
                                and case["machine_num"] == machine
                                and case["add_network_interface"] == network
                                and case["add_disk"] == disk
                            ):
                                cl.append([case["suite"], case["name"]])
                        if cl:
                            cs["type"] = m_type
                            cs["machine"] = machine
                            cs["network"] = network
                            cs["disk"] = disk
                            cs["suites_cases"] = cl
                            classify_cases.append(cs)
        return classify_cases
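

# RunAt: download the daily-build artifacts, publish them to the PXE server
# (repo, tftpboot, EFI and mugen image directories), copy the ISO/qcow2 to
# the openQA worker, and trigger the 'base_test' openQA template.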
class RunAt(RunJob):
    def __init__(self, body, promise, user, logger) -> None:
        super().__init__(body, promise, user, logger)
        self._pxe_repo_path = self._body.get("pxe_repo_path")
        self._pxe_tftpboot_path = self._body.get("pxe_tftpboot_path")
        self._pxe_efi_path = self._body.get("pxe_efi_path")
        self._con = ConnectionApi(
            ip=celeryconfig.pxe_ip,
            user=celeryconfig.pxe_ssh_user,
            port=celeryconfig.pxe_ssh_port,
            pkey=celeryconfig.pxe_pkey
        )
        self._con_openqa = ConnectionApi(
            ip=self._body.get("ip"),
            port=self._body.get("port"),
            user=self._body.get("user"),
            passwd=self._body.get("password"),
        )
        exitcode, self._release_note = getstatusoutput("curl {}".format(self._body.get("release_url")))
        if exitcode:
            self.logger.info("release url is unreachable")
            raise RuntimeError(
                "at release_url is not correct, please check it"
            )
        self._release_path = self._release_note.split("dailybuild")[1].replace("//", "")
        self._iso_path = os.path.join(celeryconfig.iso_local_path, self._release_path)
        self._iso_url = os.path.join(celeryconfig.iso_local_path, self._release_path)
        self._iso_name = self._release_note.split("/")[-1]
        self._release_date = re.findall(r'\d+-\d+-\d+-\d+-\d+-\d+', self._release_path.split("/")[1])[0]
        self._release_path_item = self._release_path.split("/")

    def _download(self, path, source_url, remote_sha256_file, local_name):
        ShellCmdApi(
            "cd {} && wget -c {} && wget -c {}".format(path, remote_sha256_file, source_url),
            self._con
        ).exec()
        for _ in range(int(celeryconfig.waiting_donwload_time)):
            time.sleep(30)
            # read the expected checksum from the downloaded .sha256sum file
            exitcode, remote_sha256 = ShellCmdApi(
                "cat %s | awk '{print $1}'" % (local_name + ".sha256sum"),
                self._con
            ).exec()
            # compute the checksum of the downloaded file itself
            exitcode, local_sha256 = ShellCmdApi(
                "sha256sum %s | awk '{print $1}'" % local_name,
                self._con
            ).exec()
            if local_sha256 == remote_sha256:
                ShellCmdApi(
                    "chmod 777 -R {}".format(shlex.quote(path)),
                    self._con
                ).exec()
                break
            else:
                continue

    def _wget_source(self):
        # clean up the release home directory first, no matter whether it already exists
        home_path = os.path.join(
            celeryconfig.iso_local_path, self._release_path_item[0], self._release_path_item[1], ""
        )
        url_home_path = os.path.join(
            celeryconfig.source_iso_addr, self._release_path_item[0], self._release_path_item[1]
        )
        ShellCmdApi(
            "rm -rf {}*".format(shlex.quote(home_path)),
            self._con
        ).exec()
        if "Desktop" in self._release_path:
            self.logger.info("desktop iso prepare begin")
            workstation_path = os.path.join(home_path, "workstation", self._body.get("frame"))
            ShellCmdApi(
                "mkdir -p {}".format(shlex.quote(workstation_path)),
                self._con
            ).exec()
            self._download(
                workstation_path,
                os.path.join(celeryconfig.source_iso_addr, self._release_path),
                os.path.join(celeryconfig.source_iso_addr, self._release_path + ".sha256sum"),
                os.path.join(celeryconfig.iso_local_path, self._release_path)
            )
        elif "netinst" in self._release_path:
            self.logger.info("net iso prepare")
        else:
            self.logger.info("standard iso prepare begin")
            iso_path = os.path.join(home_path, "ISO", self._body.get("frame"))
            virt_path = os.path.join(home_path, "virtual_machine_img", self._body.get("frame"))
            virt_url = os.path.join(
                url_home_path,
                "virtual_machine_img",
                self._body.get("frame"),
                self._iso_name.replace("-dvd.iso", ".qcow2.xz")
            )
            docker_path = os.path.join(home_path, "docker_img", self._body.get("frame"))
            docker_url = os.path.join(
                url_home_path,
                "docker_img",
                self._body.get("frame"),
                "openEuler-docker." + self._body.get("frame") + ".tar.xz"
            )
            stratovirt_path = os.path.join(home_path, "stratovirt_img", self._body.get("frame"))
            strat_bin_url = os.path.join(
                url_home_path,
                "stratovirt_img",
                self._body.get("frame"),
                "vmlinux.bin"
            )
            strat_img_url = os.path.join(
                url_home_path,
                "stratovirt_img",
                self._body.get("frame"),
                self._body.get("product") + "-stratovirt-" + self._body.get("frame") + ".img.xz"
            )
            ShellCmdApi(
                "mkdir -p {} && mkdir -p {} && mkdir -p {} && mkdir -p {}".format(
                    shlex.quote(iso_path),
                    shlex.quote(virt_path),
                    shlex.quote(docker_path),
                    shlex.quote(stratovirt_path)
                ),
                self._con
            ).exec()
            self._download(
                iso_path,
                os.path.join(celeryconfig.source_iso_addr, self._release_path),
                os.path.join(celeryconfig.source_iso_addr, self._release_path + ".sha256sum"),
                os.path.join(iso_path, self._iso_name)
            )
            self._download(
                virt_path,
                virt_url,
                virt_url + ".sha256sum",
                os.path.join(virt_path, self._iso_name.replace("-dvd.iso", ".qcow2.xz"))
            )
            self._download(
                docker_path,
                docker_url,
                docker_url + ".sha256sum",
                os.path.join(docker_path, "openEuler-docker." + self._body.get("frame") + ".tar.xz")
            )
            self._download(
                stratovirt_path,
                strat_bin_url,
                strat_bin_url + ".sha256sum",
                os.path.join(stratovirt_path, "vmlinux.bin")
            )
            self._download(
                stratovirt_path,
                strat_img_url,
                strat_img_url + ".sha256sum",
                os.path.join(
                    stratovirt_path,
                    self._body.get("product") + "-stratovirt-" + self._body.get("frame") + ".img.xz"
                )
            )
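
    # The _pxe_* helpers below publish the downloaded artifacts on the PXE
    # server: copy the docker and stratovirt images into the mugen test
    # directories, record the ISO download URL for PXE installs, rebuild the
    # local repo directory from the ISO, and refresh the tftpboot/EFI files.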
    def _pxe_mugen_docker(self):
        docker_name = "openEuler-docker." + self._body.get("frame") + ".tar.xz"
        at_docker_path = os.path.join(
            celeryconfig.mugen_path_docker,
            self._body.get("product"),
            docker_name
        )
        local_docker_path = os.path.join(
            celeryconfig.iso_local_path,
            self._release_path_item[0],
            self._release_path_item[1],
            "docker_img",
            self._body.get("frame"),
            docker_name
        )
        ShellCmdApi(
            "rm -rf {} && cp -a {} {} && chmod 755 {} -R".format(
                shlex.quote(at_docker_path),
                shlex.quote(local_docker_path),
                shlex.quote(os.path.dirname(at_docker_path)),
                shlex.quote(celeryconfig.mugen_path_docker)
            ),
            self._con
        ).exec()

    def _pxe_mugen_stratovirt(self):
        at_stratovirt_path = os.path.join(
            celeryconfig.mugen_path_stra,
            self._body.get("product"),
            self._body.get("frame"),
            ""
        )
        local_stratovirt_path = os.path.join(
            celeryconfig.iso_local_path,
            self._release_path_item[0],
            self._release_path_item[1],
            "stratovirt_img",
            self._body.get("frame"),
            ""
        )
        ShellCmdApi(
            "rm -rf {}* && cp -raf {} {} && chmod 755 {} -R".format(
                shlex.quote(at_stratovirt_path),
                shlex.quote(local_stratovirt_path),
                shlex.quote(os.path.dirname(at_stratovirt_path)),
                shlex.quote(at_stratovirt_path)
            ),
            self._con
        ).exec()

    def _pxe_release_iso(self):
        if self._body.get("frame") == "x86_64":
            pxe_frame = "openeuler_X86"
        elif self._body.get("frame") == "aarch64":
            pxe_frame = "openeuler_ARM64"
        else:
            raise RuntimeError("unknown frame is set")
        if "netinst" in self._body.get("product"):
            pxe_release_iso = "pxe_release_netinst"
        else:
            pxe_release_iso = "pxe_release_iso"
        ShellCmdApi(
            "echo {} > {}".format(
                shlex.quote(celeryconfig.iso_web_addr + self._release_note.split("dailybuild")[1]),
                shlex.quote(
                    os.path.join(
                        "/var/www/html/pxe_release_iso",
                        self._body.get("product"),
                        pxe_frame,
                        pxe_release_iso
                    )
                )
            ),
            self._con
        ).exec()

    def _pxe_repo(self):
        mount_dir = os.path.join(self._pxe_repo_path, "mnt")
        iso_path = os.path.join(celeryconfig.iso_local_path, self._release_path)
        ShellCmdApi(
            (
                "umount {} && rm -rf {} && mkdir -p {} && cp {} {} && "
                "mount {} {} && sleep 3 && cp -r {} {} && umount {} && chmod 777 {} -R"
            ).format(
                mount_dir,
                self._pxe_repo_path,
                mount_dir,
                iso_path,
                os.path.join(self._pxe_repo_path, ""),
                os.path.join(self._pxe_repo_path, self._iso_name),
                mount_dir,
                mount_dir,
                os.path.join(self._pxe_repo_path, "latest"),
                mount_dir,
                self._pxe_repo_path
            ),
            self._con
        ).exec()

    def _pxe_tftpboot(self):
        ShellCmdApi(
            "rm -rf {} && mkdir -p {} && cp -r {} {} && chmod 777 {} -R && cp -ar {} {}".format(
                shlex.quote(self._pxe_tftpboot_path),
                shlex.quote(
                    os.path.join(
                        self._pxe_tftpboot_path,
                        "latest"
                    )
                ),
                shlex.quote(
                    os.path.join(self._pxe_repo_path, "latest/images")
                ),
                shlex.quote(
                    os.path.join(
                        self._pxe_tftpboot_path,
                        "latest",
                        ""
                    )
                ),
                shlex.quote(
                    self._pxe_tftpboot_path
                ),
                shlex.quote(
                    os.path.join(
                        self._pxe_repo_path,
                        "latest/EFI/BOOT/grub*efi"
                    )
                ),
                shlex.quote(
                    os.path.join(
                        self._pxe_efi_path,
                        ""
                    )
                )
            ),
            self._con
        ).exec()
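
    # openQA helpers: _update_headers builds the X-API-Key/X-API-Microtime/
    # X-API-Hash authentication headers, and _start_template POSTs the job
    # parameters (TEST, DISTRI, VERSION, ARCH, FLAVOR, ...) to schedule a run.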
    def _update_headers(self):
        headers = {}
        headers['Content-type'] = 'application/x-www-form-urlencoded'
        headers['Accept'] = 'application/json'
        headers['X-API-Key'] = celeryconfig.api_key
        timestamp = time.time()
        api_hash = hmac.new(
            celeryconfig.api_secret.encode(),
            '{0}{1}'.format(celeryconfig.at_request_url, timestamp).encode(),
            hashlib.sha1
        )
        headers['X-API-Microtime'] = str(timestamp)
        headers['X-API-Hash'] = api_hash.hexdigest()
        return headers

    def _start_template(self, template, **kwargs):
        server = self._body.get("ip")
        port = 80
        url = celeryconfig.at_request_url
        form = {'TEST': template}
        form.update(kwargs)
        # the release name looks like DISTRI-VERSION(-...)-ARCH-FLAVOR;
        # everything between the first and the last two fields belongs to VERSION
        arr = form['release'].split('-')
        if 4 <= len(arr) <= 7:
            form['DISTRI'] = arr[0]
            form['VERSION'] = '-'.join(arr[1:-2])
            form['ARCH'] = arr[-2]
            form['FLAVOR'] = arr[-1]
        else:
            form['DISTRI'] = 'openeuler'
            form['VERSION'] = '1.0'
            form['ARCH'] = 'aarch64'
            form['FLAVOR'] = 'dvd'
        ids = None
        try:
            headers = self._update_headers()
            params = urllib.parse.urlencode(form)
            con = http.client.HTTPConnection(server, port, timeout=10)
            con.request('POST', url, params, headers=headers)
            r = con.getresponse()
            self.logger.info('openqa response:{} {}'.format(r.status, r.reason))
            r_str = r.read()
            r_dict = json.loads(r_str)
            ids = r_dict.get("ids")
        except http.client.HTTPException as e:
            self.logger.error(e)
        return ids

    def run(self):
        try:
            iso_home_path = os.path.join(celeryconfig.iso_local_path, self._release_path.split("/")[0])
            exitcode, local_date = ShellCmdApi(
                "ls %s | grep '^openeuler-' | tail -n 1 | awk -F 'openeuler-' '{print $2}'"
                % iso_home_path, self._con
            ).exec()
            if local_date != "" and self._release_date < local_date:
                raise RuntimeError("release date is old, don't run at")
            exitcode, output = ShellCmdApi(
                "mkdir -p {} {} {}".format(
                    shlex.quote(self._pxe_repo_path),
                    shlex.quote(self._pxe_tftpboot_path),
                    shlex.quote(self._pxe_efi_path)
                ),
                self._con
            ).exec()
            if exitcode:
                raise RuntimeError("failed to prepare pxe dir")
            self._wget_source()
            self._pxe_mugen_docker()
            self._pxe_mugen_stratovirt()
            self._pxe_release_iso()
            self._pxe_repo()
            self._pxe_tftpboot()
            self.logger.info("pxe environment preparation is finished")
            exitcode, out = ShellCmdApi(
                "rm -rf {} && wget -c {} -O {}".format(
                    os.path.join(celeryconfig.at_iso_dir, self._iso_name),
                    os.path.join(celeryconfig.iso_web_addr, self._release_path),
                    os.path.join(celeryconfig.at_iso_dir, self._iso_name)
                ),
                self._con_openqa
            ).exec()
            if not exitcode:
                self.logger.info("openqa has downloaded iso {}".format(self._iso_name))
            if "netinst" not in self._iso_name and "Desktop" not in self._iso_name:
                exitcode, out = ShellCmdApi(
                    "rm -rf {}* && wget -c {} -O {} && xz -d {}".format(
                        os.path.join(celeryconfig.at_qcow2_dir, self._iso_name.replace("-dvd.iso", ".qcow2")),
                        os.path.join(
                            celeryconfig.iso_web_addr,
                            self._release_path_item[0],
                            self._release_path_item[1],
                            "virtual_machine_img",
                            self._body.get("frame"),
                            self._iso_name.replace("-dvd.iso", ".qcow2.xz")
                        ),
                        os.path.join(celeryconfig.at_iso_dir, self._iso_name.replace("-dvd.iso", ".qcow2.xz")),
                        os.path.join(celeryconfig.at_iso_dir, self._iso_name.replace("-dvd.iso", ".qcow2.xz"))
                    ),
                    self._con_openqa
                ).exec()
                if not exitcode:
                    self.logger.info("openqa has downloaded qcow2 {}".format(self._iso_name))
            ShellCmdApi(
                "chmod 777 /var/lib/openqa/share/factory -R;"
                "chown geekotest:geekotest /var/lib/openqa/share/factory -R",
                self._con_openqa
            ).exec()
            ids = self._start_template('base_test', release=self._iso_name, build=self._release_date)
        except RuntimeError as e:
            self.logger.error(str(e))
            self._update_job(
                result="fail",
                remark=str(e),
                end_time=datetime.now(tz=pytz.timezone('Asia/Shanghai')),
                status="BLOCK",
            )