From aded51acd7d2b47377c36e0d98ccfe1aa818f069 Mon Sep 17 00:00:00 2001 From: VectorSL <864733542@qq.com> Date: Thu, 15 Aug 2024 14:54:07 +0800 Subject: [PATCH] support multi stream --- .../common/session/kernel_graph_mgr.cc | 42 +++++++++++++++++++ mindspore/ccsrc/include/common/utils/utils.h | 6 +++ .../ascend/hal/hardware/acl_stream_assign.cc | 26 ++++++++++++ .../ascend/hal/hardware/acl_stream_assign.h | 1 + .../ascend/hal/hardware/ge_kernel_executor.cc | 4 ++ .../ascend/hal/hardware/ge_kernel_executor.h | 3 ++ .../runtime/graph_scheduler/graph_compiler.cc | 2 + .../ccsrc/runtime/hardware/device_context.h | 3 ++ 8 files changed, 87 insertions(+) diff --git a/mindspore/ccsrc/backend/common/session/kernel_graph_mgr.cc b/mindspore/ccsrc/backend/common/session/kernel_graph_mgr.cc index 726918cb35c..1ace747c25e 100644 --- a/mindspore/ccsrc/backend/common/session/kernel_graph_mgr.cc +++ b/mindspore/ccsrc/backend/common/session/kernel_graph_mgr.cc @@ -295,6 +295,10 @@ void LoadAnfKernelInfoFromJson(const nlohmann::json &kernel_infos_json) { } kernel_info->set_graph_id(kernel_info_value[kGraphId]); kernel_info->set_feature_map_flag(kernel_info_value[kIsFeatureMap]); + kernel_info->set_stream_id(kernel_info_value[kAttrStreamId]); + kernel_info->set_stream_distinction_label(kernel_info_value[kStreamDistinctionLabel]); + kernel_info->SetSomasResult(std::move(kernel_info_value[kSomasOutputResult]), + std::move(kernel_info_value[kSomasWorkspaceResult])); node->set_kernel_info(kernel_info); LoadAnfSelectKernelBuildInfo(kernel_info_value, node); @@ -436,6 +440,28 @@ nlohmann::json SaveGraphsId(const HashMap SaveVecPair(const std::vector> &vec) { + std::vector ret_json; + for (auto &i : vec) { + nlohmann::json iter_json; + iter_json.push_back(i.first); + iter_json.push_back(i.second); + ret_json.push_back(iter_json); + } + return ret_json; +} + +std::vector SaveMapPair(const std::map &m) { + std::vector ret_json; + for (auto &i : m) { + nlohmann::json iter_json; + iter_json.push_back(i.first); + iter_json.push_back(i.second); + ret_json.push_back(iter_json); + } + return ret_json; +} + std::vector SavePrevOutputs(const std::map> &save_map) { std::vector ret_json; for (const auto &i : save_map) { @@ -545,6 +571,11 @@ nlohmann::json SaveAnfKernelInfo(const AnfNodePtr &node) { single_json[kGraphId] = graph_id; const auto &is_feature_map = device_kernel_info->is_feature_map(); single_json[kIsFeatureMap] = is_feature_map; + uint32_t stream_id = device_kernel_info->stream_id(); + single_json[kAttrStreamId] = stream_id; + single_json[kStreamDistinctionLabel] = device_kernel_info->stream_distinction_label(); + single_json[kSomasOutputResult] = SaveVecPair(device_kernel_info->somas_output_result()); + single_json[kSomasWorkspaceResult] = SaveVecPair(device_kernel_info->somas_workspace_result()); if (node->isa() && common::AnfAlgo::HasNodeAttr(kAttrIsUBFusionOp, node->cast()) && common::AnfAlgo::GetNodeAttr(node->cast(), kAttrIsUBFusionOp)) { @@ -664,6 +695,9 @@ nlohmann::json GenKernelGraphJson(const KernelGraphPtr &kg, const std::vectoris_need_gil(); kg_json[kIsFromSingleOp] = kg->is_from_single_op(); kg_json[kLabelNum] = kg->label_num(); + kg_json[kEnableMultiStream] = kg->enable_multi_stream(); + kg_json[kSomasWholeBlockSize] = kg->somas_whole_block_size(); + kg_json[kSomasMergedBlocksMap] = SaveMapPair(kg->somas_merged_blocks_map()); #ifndef ENABLE_SECURITY kg_json[kSummaryNodeExist] = kg->summary_node_exist(); #endif @@ -2219,6 +2253,14 @@ void HandleGraphSimpleAttr(const nlohmann::json &graph_json, KernelGraph *graph) graph->set_is_from_single_op(graph_json[kIsFromSingleOp]); graph->set_subgraph_multi_call(graph_json[kHasSubgraphMultiCall]); graph->set_label_num(graph_json[kLabelNum]); + graph->set_enable_multi_stream(graph_json[kEnableMultiStream]); + if (graph->MutableSomasInfo() != nullptr) { + auto somas_info = graph->MutableSomasInfo(); + somas_info->whole_block_size_ = graph_json[kSomasWholeBlockSize]; + for (auto block : graph_json[kSomasMergedBlocksMap]) { + somas_info->merged_blocks_map_[block.at(0)] = block.at(1); + } + } #ifndef ENABLE_SECURITY // set summary_node of graph graph->set_summary_node_exist(graph_json[kSummaryNodeExist]); diff --git a/mindspore/ccsrc/include/common/utils/utils.h b/mindspore/ccsrc/include/common/utils/utils.h index 1b6f79a591c..5aa54bf9e71 100644 --- a/mindspore/ccsrc/include/common/utils/utils.h +++ b/mindspore/ccsrc/include/common/utils/utils.h @@ -168,6 +168,7 @@ constexpr auto kAttrDatadumpOriginalNames = "_datadump_original_names"; constexpr auto kAttrDatadumpIsMultiop = "_datadump_is_multiop"; constexpr auto kAttrNeedRecordEvent = "need_record_event"; constexpr auto kAttrStreamId = "stream_id"; +constexpr auto kStreamDistinctionLabel = "stream_distinction_label"; constexpr auto kAttrRecomputeId = "recompute_id"; constexpr auto kAttrRecordEvent = "record_event"; constexpr auto kAttrAccumulatedAttention = "AccumulatedAttention"; @@ -691,6 +692,7 @@ constexpr auto kJsonName = "json_name"; constexpr auto kHasSelectKernelBuildInfo = "has_select_kernel_build_info"; constexpr auto kBackendParamToFrontendParamIndex = "backend_param_to_frontend_param_index_"; constexpr auto kLabelNum = "label_num"; +constexpr auto kEnableMultiStream = "enable_multi_stream"; constexpr auto kParameterUniqueNameToName = "param_unique_name_to_name"; constexpr auto kRefInOutMap = "ref_in_out_map"; constexpr auto kRetryIntervalMilliSeconds = 500; @@ -705,6 +707,10 @@ constexpr auto kGraphOutputToFrontNodeMap = "graph_output_to_front_node_map"; constexpr auto kFrontNodeToGraphOutputMap = "front_node_to_graph_output_map"; constexpr auto kInlineSubGraphKernelsMap = "inline_sub_graph_kernels"; constexpr auto kConditionGatherToSwitchMap = "condition_gather_to_switch"; +constexpr auto kSomasOutputResult = "somas_output_result"; +constexpr auto kSomasWorkspaceResult = "somas_workspace_result"; +constexpr auto kSomasWholeBlockSize = "somas_whole_block_size"; +constexpr auto kSomasMergedBlocksMap = "somas_merged_blocks_map"; // recompute and parallel constexpr auto kRecomputeInsert = "recompute_insert"; diff --git a/mindspore/ccsrc/plugin/device/ascend/hal/hardware/acl_stream_assign.cc b/mindspore/ccsrc/plugin/device/ascend/hal/hardware/acl_stream_assign.cc index f85c1b38c55..2a6e6c68615 100644 --- a/mindspore/ccsrc/plugin/device/ascend/hal/hardware/acl_stream_assign.cc +++ b/mindspore/ccsrc/plugin/device/ascend/hal/hardware/acl_stream_assign.cc @@ -155,6 +155,32 @@ void AclStreamAssign::AssignStream(const NotNull &kernel_graph) } } +void AclStreamAssign::CreateEvent(const NotNull &kernel_graph) { + std::map event_send_map; + std::map event_recv_map; + auto nodes = kernel_graph->execution_order(); + for (auto &node : nodes) { + MS_EXCEPTION_IF_NULL(node); + auto name = common::AnfAlgo::GetCNodeName(node); + if (name == kStreamRecvOpName) { + auto event_id = common::AnfAlgo::GetNodeAttr(node, kAttrEventId); + event_recv_map[event_id] = node; + } + if (name == kStreamSendOpName) { + auto event_id = common::AnfAlgo::GetNodeAttr(node, kAttrEventId); + event_send_map[event_id] = node; + } + } + auto &resource_manager = AscendStreamMng::GetInstance(); + for (auto iter : event_send_map) { + auto event = resource_manager.ApplyRtEvent(); + auto send_node = iter.second; + common::AnfAlgo::SetNodeAttr(kAttrRecordEvent, MakeValue(reinterpret_cast(event)), send_node); + auto recv_node = event_recv_map.find(iter.first)->second; + common::AnfAlgo::SetNodeAttr(kAttrWaitEvent, MakeValue(reinterpret_cast(event)), recv_node); + } +} + void AclStreamAssign::GenKernelIoExecInfoMap( const NotNull &kernel_graph, mindspore::HashMap *kernel_io_exec_info_map) const { diff --git a/mindspore/ccsrc/plugin/device/ascend/hal/hardware/acl_stream_assign.h b/mindspore/ccsrc/plugin/device/ascend/hal/hardware/acl_stream_assign.h index 92b0108e2f1..9b177cc48d5 100644 --- a/mindspore/ccsrc/plugin/device/ascend/hal/hardware/acl_stream_assign.h +++ b/mindspore/ccsrc/plugin/device/ascend/hal/hardware/acl_stream_assign.h @@ -58,6 +58,7 @@ class AclStreamAssign { AclStreamAssign &operator=(const AclStreamAssign &) = delete; void AssignStream(const NotNull &kernel_graph); + void CreateEvent(const NotNull &kernel_graph); private: AclStreamAssign() = default; diff --git a/mindspore/ccsrc/plugin/device/ascend/hal/hardware/ge_kernel_executor.cc b/mindspore/ccsrc/plugin/device/ascend/hal/hardware/ge_kernel_executor.cc index b7c169afc2a..e55dc6f9ee2 100644 --- a/mindspore/ccsrc/plugin/device/ascend/hal/hardware/ge_kernel_executor.cc +++ b/mindspore/ccsrc/plugin/device/ascend/hal/hardware/ge_kernel_executor.cc @@ -1077,6 +1077,10 @@ void GeKernelExecutor::PreprocessBeforeRun(const FuncGraphPtr &graph) const { profiler::CollectHostInfo("Ascend", "PreprocessBeforeRun", "GePreprocess", 1, 0, 1); } +void GeKernelExecutor::CreateEventForCache(const KernelGraphPtr &kernel_graph) const { + AclStreamAssign::GetInstance().CreateEvent(NOT_NULL(kernel_graph)); +} + bool GeKernelExecutor::PySyncRuning(void *stream) const { MS_EXCEPTION_IF_NULL(res_manager_); auto ms_context = MsContext::GetInstance(); diff --git a/mindspore/ccsrc/plugin/device/ascend/hal/hardware/ge_kernel_executor.h b/mindspore/ccsrc/plugin/device/ascend/hal/hardware/ge_kernel_executor.h index 7138722b371..f6b85c02e11 100644 --- a/mindspore/ccsrc/plugin/device/ascend/hal/hardware/ge_kernel_executor.h +++ b/mindspore/ccsrc/plugin/device/ascend/hal/hardware/ge_kernel_executor.h @@ -54,6 +54,9 @@ class GeKernelExecutor : public KernelExecutor { // Adjust kernel graph before run graph, used in Graph Mode. void PreprocessBeforeRun(const FuncGraphPtr &graph) const override; + // Create event for graph from cache. + void CreateEventForCache(const KernelGraphPtr &kernel_graph) const override; + // Launch a kernel via 'KernelMod' of the kernel. bool LaunchKernel(const CNodePtr &kernel, const std::vector &inputs, const std::vector &workspace, const std::vector &outputs, diff --git a/mindspore/ccsrc/runtime/graph_scheduler/graph_compiler.cc b/mindspore/ccsrc/runtime/graph_scheduler/graph_compiler.cc index cc92a341fa2..c81cdde7e59 100644 --- a/mindspore/ccsrc/runtime/graph_scheduler/graph_compiler.cc +++ b/mindspore/ccsrc/runtime/graph_scheduler/graph_compiler.cc @@ -695,6 +695,8 @@ std::pair GraphCompiler::CompileGraphForKernelRunModeUseCache(con KernelGraphPtr graph = session_->ConstructKernelGraph(&all_graphs); GraphId graph_id = graph->graph_id(); use_cache_to_compile_graph_ = true; + // Create event before create kernelmod + device_context->GetKernelExecutor(false)->CreateEventForCache(graph); PROF_START(CreateKernel); device_context->GetKernelExecutor(false)->CreateKernel(graph->execution_order()); PROF_END(CreateKernel); diff --git a/mindspore/ccsrc/runtime/hardware/device_context.h b/mindspore/ccsrc/runtime/hardware/device_context.h index 0e66c1d36e4..1e205a16a97 100644 --- a/mindspore/ccsrc/runtime/hardware/device_context.h +++ b/mindspore/ccsrc/runtime/hardware/device_context.h @@ -381,6 +381,9 @@ class BACKEND_EXPORT KernelExecutor { // Adjust kernel graph before run graph. virtual void PreprocessBeforeRun(const FuncGraphPtr &graph) const {} + // Create event for graph from cache. + virtual void CreateEventForCache(const KernelGraphPtr &kernel_graph) const {} + // Launch a kernel via 'KernelMod' of the kernel, use KernelTensor input type. virtual bool LaunchKernel(const CNodePtr &kernel, const std::vector &inputs, const std::vector &workspace, const std::vector &outputs, -- Gitee