From b00e4111e007b882e65da63a06949bae8537e71c Mon Sep 17 00:00:00 2001 From: Cao Xueliang Date: Mon, 28 Oct 2024 16:15:08 +0800 Subject: [PATCH] scheduler optimizes the ss function job.yaml: suite: test-pkgbuild testbox: vm-2p8g os: openeuler os_version: 22.03-lts-sp4 os_arch: aarch64 host-info: ss: linux: fork: linux-next commit: 8d387a5f172f26ff8c76096d5876b881dec6b7ce config: /home/xxx/yaml/build-linux/kconfig testbox: dc-32g The scheduler will split this job.yaml into two jobs: a host-info job and a pkgbuild job. The pkgbuild job builds the linux repo according to the ss field, and the host-info job then uses the pkgbuild products. Signed-off-by: Cao Xueliang --- src/lib/init_ready_queues.cr | 27 +++++++++++++-- src/lib/job.cr | 1 + src/scheduler/close_job.cr | 17 ++++++++++ src/scheduler/plugins/pkgbuild.cr | 55 ++++++++++++------------------- 4 files changed, 64 insertions(+), 36 deletions(-) diff --git a/src/lib/init_ready_queues.cr b/src/lib/init_ready_queues.cr index fcfc8d663..2aea3d815 100644 --- a/src/lib/init_ready_queues.cr +++ b/src/lib/init_ready_queues.cr @@ -95,6 +95,7 @@ class InitReadyQueues # {"local-test-2" => {"host_name" => "test-2", "is_remote" => "false"}, # "remote-test-1" => {"host_name" => "test-1", "is_remote" => "true"}}} all_keys = @redis.scan_each("/tbox/*") + #all_keys = ["/tbox/vm/local-caoxl"] @all_tbox = Hash(String, Hash(String, Hash(String, String))).new @log.info("init_common_tbox_from_redis #{all_keys}") TBOX_TYPES.each do |type| @@ -257,7 +258,7 @@ class InitReadyQueues "aggs": { "sorted_by_submit_time": { "top_hits": { - "_source": ["id", "os_arch", "my_account","os_project", "spec_file_name", "memory_minimum", "build_type", "max_duration", "use_remote_tbox"], + "_source": ["id", "os_arch", "my_account","os_project", "spec_file_name", "memory_minimum", "build_type", "max_duration", "use_remote_tbox", "ss_wait_jobs"], "sort": [ { "submit_time": { "order": "desc" } } ], @@ -288,7 +289,16 @@ class InitReadyQueues vals = 
my_account["sorted_by_submit_time"]["hits"]["hits"] vals.as_a.each do |val| _aggs[key] = [] of JSON::Any unless _aggs[key]? - _aggs[key] << val["_source"] + tmp_source = val["_source"].as_h + if tmp_source.has_key?("ss_wait_jobs") + filter_state = filter_by_ss(tmp_source) + if filter_state == 1 + _aggs[key] << val["_source"] + end + next + else + _aggs[key] << val["_source"] + end end end _item_arch["jobs"] = _aggs @@ -298,6 +308,19 @@ class InitReadyQueues return ret_aggs end + private def filter_by_ss(source) + ss_wait_jobs = source["ss_wait_jobs"].as_h + ss_wait_jobs_values = ss_wait_jobs.values + all_finished = ss_wait_jobs_values.all?{|value| value == "finished"} + return 1 if all_finished + + arr_failed = ["oom", "abnormal", "failed", "incomplete"] + include_failed = arr_failed.any?{|value| ss_wait_jobs_values.includes?(value)} + return -1 if include_failed + + return 0 + end + private def set_priority_weight(aggs) arr = Array(Hash(String, String)).new if aggs.is_a?(String) diff --git a/src/lib/job.cr b/src/lib/job.cr index 262117845..8ca2a489f 100644 --- a/src/lib/job.cr +++ b/src/lib/job.cr @@ -436,6 +436,7 @@ class JobHash install_os_packages boot_params on_fail + ss_wait_jobs waited hw vt diff --git a/src/scheduler/close_job.cr b/src/scheduler/close_job.cr index 6ac240045..41d193891 100644 --- a/src/scheduler/close_job.cr +++ b/src/scheduler/close_job.cr @@ -23,6 +23,21 @@ class Sched end end + def update_wait_job_by_ss(job) + job_waited = job.waited? + return unless job_waited + + job_waited.not_nil!.each do |k, v| + k_job = @es.get_job(k) + next unless k_job + next unless k_job.ss_wait_jobs? + + k_job.ss_wait_jobs.not_nil!.merge!({job.id => job.job_health}) + k_job.job_health = job.job_health if job.job_health != "success" + @es.set_job(k_job) + end + end + def close_job job_id = @env.params.query["job_id"]? mem = @env.params.query["mem"]? 
@@ -84,6 +99,8 @@ class Sched end set_job2watch(job, "close", job.job_health) + update_wait_job_by_ss(job) + response = @es.set_job(job) if response["_id"] == nil diff --git a/src/scheduler/plugins/pkgbuild.cr b/src/scheduler/plugins/pkgbuild.cr index 2fc504267..6329004c3 100644 --- a/src/scheduler/plugins/pkgbuild.cr +++ b/src/scheduler/plugins/pkgbuild.cr @@ -15,7 +15,7 @@ class PkgBuild < PluginsCommon # job id has been init in init_job_id function wait_id = job.id # add job to wait queue for waited job update current - wait_queue = add_job2queue(job) + job.added_by = ["pkgbuild"] # ss struct: # ss: @@ -23,6 +23,7 @@ class PkgBuild < PluginsCommon # commit: xxx # mysql: # commit: xxx + ss_wait_jobs = {} of String => String ss.each do |pkg_name, pkg_params| pbp = init_pkgbuild_params(job, pkg_name, pkg_params) cgz, exists = cgz_exists?(pbp) @@ -35,27 +36,17 @@ class PkgBuild < PluginsCommon submit_result = submit_pkgbuild_job(wait_id, pkg_name, pbp) waited_id = submit_result.first_value.not_nil! - # the same pkg job has been submitted - ret = add_waited2job(waited_id, {wait_id => "job_health"}) if submit_result.has_key?("latest") - raise "add waited to id2job failed, waited_job_id=#{waited_id}" if ret == -1 - - # cant find id2job in etcd if ret == 0, need update wait current from es - update_wait_current_from_es(job, waited_id, "job_health") if ret == 0 - - add_desired2queue(job, {waited_id => JSON.parse({"job_health" => "success"}.to_json)}) + # {"1" => "unknown", "2" => "unknown"} + ss_wait_jobs.merge!({"#{waited_id}" => "unknown"}) end + if ss_wait_jobs + job.ss_wait_jobs = ss_wait_jobs + end save_job2es(job) save_job2etcd(job) - if job.wait? 
- @log.info("#{job.id}, #{job.wait}") - else - # if no wait field, move wait to ready queue - wait2ready(wait_queue) - end rescue ex @log.error("pkgbuild handle job #{ex}") - wait2die(wait_queue) if wait_queue raise ex.to_s end @@ -102,7 +93,7 @@ class PkgBuild < PluginsCommon response = %x($LKP_SRC/sbin/submit #{job_yaml}) @log.info("submit pkgbuild job response: #{job_yaml}, #{response}") - response = response.split("\n")[-2] + response = response.split("\n")[-3] return {"latest" => $1} if response =~ /latest job id=(.*)/ id = $1 if response =~ /job id=(.*)/ @@ -120,6 +111,7 @@ class PkgBuild < PluginsCommon params = pkg_params || Hash(String, String).new repo_name = params["fork"]? || pkg_name upstream_repo = "#{pkg_name[0]}/#{pkg_name}/#{repo_name}" + upstream_commit = params["commit"]? || "HEAD" upstream_info = get_upstream_info(upstream_repo) pkgbuild_repo = "pkgbuild/#{upstream_info["pkgbuild_repo"][0]}" pkgbuild_repos = upstream_info["pkgbuild_repo"].as_a @@ -127,28 +119,24 @@ class PkgBuild < PluginsCommon next unless "#{repo}" =~ /(-git|linux)$/ pkgbuild_repo = "pkgbuild/#{repo}" end - # now support openeuler:20.03-fat and archlinux:02-23-fat - os_version = "#{job.os_version}".split("-pre")[0].split("-fat")[0] - docker_image = "#{job.os}:#{os_version}-fat" - if pkg_name == "linux" - testbox = "dc-32g" - else - testbox = "dc-16g" - end + + os = params["os"]? || job.os + os_version = params["os_version"]? || job.os_version + testbox = params["testbox"]? 
|| job.testbox build_job = JobHash.new(Hash(String, JSON::Any).new) - build_job.os = job.os - build_job.os_version = "#{os_version}-fat" + build_job.os = os + build_job.os_version = os_version build_job.os_arch = job.os_arch build_job.testbox = testbox build_job.os_mount = "container" - build_job.docker_image = docker_image - build_job.commit = "HEAD" + build_job.upstream_commit = upstream_commit build_job.upstream_repo = upstream_repo build_job.pkgbuild_repo = pkgbuild_repo build_job.upstream_url = upstream_info["url"][0].as_s build_job.upstream_dir = "upstream" build_job.pkgbuild_source = upstream_info["pkgbuild_source"][0].as_s if upstream_info["pkgbuild_source"]? + # update job.id when finished build_job build_job.waited = {job.id => "job_health"} build_job.services = { "SCHED_HOST" => ENV["SCHED_HOST"], @@ -173,18 +161,17 @@ class PkgBuild < PluginsCommon #link file File.symlink(ss_upload_filepath, pkg_dest_file) unless File.exists?(pkg_dest_file) end + build_job.config = filename + next end build_job.hash_any[k] = v end default = load_default_pkgbuild_yaml - if build_job.commit == "HEAD" - upstream_commit = get_head_commit(upstream_repo) - else - upstream_commit = build_job.commit + if build_job.upstream_commit == "HEAD" + build_job.upstream_commit = get_head_commit(upstream_repo) end - build_job.upstream_commit = upstream_commit build_job.import2hash(default) @log.info(build_job) -- Gitee