diff --git a/src/domain/collective_communication/algorithm/impl/hccl_aiv.cc b/src/domain/collective_communication/algorithm/impl/hccl_aiv.cc index 1e20a9361a10927347ec00a7a69cd8d859c310eb..61b70d4cf45dac4e8c9cdcf3e9a94bcbdeda57ba 100644 --- a/src/domain/collective_communication/algorithm/impl/hccl_aiv.cc +++ b/src/domain/collective_communication/algorithm/impl/hccl_aiv.cc @@ -8,6 +8,7 @@ * See LICENSE in the root of the software repository for the full text of the License. */ +#include "alg_profiling.h" #include #include #include @@ -606,7 +607,14 @@ HcclResult ExecuteKernelLaunchInner(const AivOpArgs &opArgs, const AivTopoArgs & CHK_PRT_RET(ret != HCCL_SUCCESS, HCCL_ERROR("[AIV][ExecuteKernelLaunch] errNo[0x%016llx] rtKernelLaunch aiv fail, " "return[%d]", HCCL_ERROR_CODE(HCCL_E_RUNTIME), ret), HCCL_E_RUNTIME); - + + TaskAivProfiler(opArgs.cmdType, aivProfilingInfo.tag, opArgs.count * sizeof(opArgs.dataType), + aivProfilingInfo.blockDim, topoArgs.rankSize, resourceArgs.buffersOut[topoArgs.rank], + resourceArgs.stream, algArgs.step, aivProfilingInfo.beginTime); + + CHK_PRT_RET(ret != HCCL_SUCCESS, HCCL_ERROR("[AIV][ExecuteKernelLaunch] errNo[0x%016llx] rtKernelLaunch aiv fail, " + "return[%d]", HCCL_ERROR_CODE(HCCL_E_RUNTIME), ret), HCCL_E_RUNTIME); + return HCCL_SUCCESS; } diff --git a/src/domain/collective_communication/common/debug/profiling/inc/profiler_base_pub.h b/src/domain/collective_communication/common/debug/profiling/inc/profiler_base_pub.h index 91b53302e3024b84dcd3d0a39b44bdf807626d5b..ce442383359868abb31f63a5adac89d126397df7 100644 --- a/src/domain/collective_communication/common/debug/profiling/inc/profiler_base_pub.h +++ b/src/domain/collective_communication/common/debug/profiling/inc/profiler_base_pub.h @@ -242,6 +242,7 @@ public: virtual HcclResult Save(u32 &streamID, u32 &taskID, TaskType &taskType, const TaskParaReduce ¶) = 0; virtual HcclResult Save(u32 &streamID, u32 &taskID, TaskType &taskType, const TaskParaNotify ¶) = 0; virtual HcclResult 
Save(u32 &streamID, u32 &taskID, const TaskParaAiv ¶) = 0; + virtual HcclResult Save(u32 &streamID, rtStream_t &stream, u32 &taskID, const TaskParaAiv ¶) = 0; virtual HcclResult Save(u32 &streamID, u32 &taskID) = 0; virtual HcclResult Save(u32 captureStreamID, u32 streamID, u32 taskID, TaskType &taskType, const TaskParaDMA ¶) = 0; virtual HcclResult Save(u32 captureStreamID, u32 streamID, u32 taskID, TaskType &taskType, const TaskParaReduce ¶) = 0; diff --git a/src/domain/collective_communication/common/debug/profiling/inc/task_exception_handler_pub.h b/src/domain/collective_communication/common/debug/profiling/inc/task_exception_handler_pub.h index ec5dd9ccb0c56c48839af76a250a042e3227b99c..305f5fd5c05f26a5fd83e77eb99c071f67eebd3b 100644 --- a/src/domain/collective_communication/common/debug/profiling/inc/task_exception_handler_pub.h +++ b/src/domain/collective_communication/common/debug/profiling/inc/task_exception_handler_pub.h @@ -54,6 +54,7 @@ struct ParaAiv{ }; struct TaskInfo { u32 streamID; + rtStream_t stream; u32 taskID; std::string tag; TaskType taskType; @@ -73,6 +74,7 @@ struct TaskInfo { TaskInfo(u32 &streamID, u32 &taskID, std::string &tag, TaskType &taskType, AlgType &algType, u32 &index, const TaskParaNotify ¶); TaskInfo(u32 &streamID, u32 &taskID, std::string &tag, const TaskParaAiv& para); + TaskInfo(u32 &streamID, rtStream_t &stream, u32 &taskID, string &tag, const TaskParaAiv& para) std::string GetBaseInfoStr(); // 防止tag字符串过长,base信息和para信息分开打印 std::string GetParaInfoStr(); std::string GetParaDMA(); @@ -122,6 +124,7 @@ public: HcclResult Save(u32 &streamID, u32 &taskID, TaskType &taskType, const TaskParaReduce ¶) override; HcclResult Save(u32 &streamID, u32 &taskID, TaskType &taskType, const TaskParaNotify ¶) override; HcclResult Save(u32 &streamID, u32 &taskID, const TaskParaAiv ¶) override; + HcclResult Save(u32 &streamID, rtStream_t &stream, u32 &taskID, const TaskParaAiv ¶) override; HcclResult Save(u32 &streamID, u32 &taskID) override; 
HcclResult SaveToLog(const TaskParaHost ¶Host) override; HcclResult Save(u32 captureStreamID, u32 streamID, u32 taskID, TaskType &taskType, const TaskParaDMA ¶) override; diff --git a/src/domain/collective_communication/common/debug/profiling/inc/task_profiling_pub.h b/src/domain/collective_communication/common/debug/profiling/inc/task_profiling_pub.h index bd25be16dc57e14d8f50af084cad487ac4dcd897..bd576b5332b686dd256115d06850806b9f116c7b 100644 --- a/src/domain/collective_communication/common/debug/profiling/inc/task_profiling_pub.h +++ b/src/domain/collective_communication/common/debug/profiling/inc/task_profiling_pub.h @@ -281,6 +281,7 @@ public: HcclResult Save(u32 &streamID, u32 &taskID, TaskType &taskType, const TaskParaReduce ¶Reduce) override; HcclResult Save(u32 &streamID, u32 &taskID, TaskType &taskType, const TaskParaNotify ¶Notify) override; HcclResult Save(u32 &streamID, u32 &taskID, const TaskParaAiv ¶Aiv) override; + HcclResult Save(u32 &streamID, rtStream_t &stream, u32 &taskID, const TaskParaAiv ¶) override; HcclResult Save(u32 &streamID, u32 &taskID) override; HcclResult SaveToLog(const TaskParaHost ¶Host) override; HcclResult Save(u32 captureStreamID, u32 streamID, u32 taskID, TaskType &taskType, const TaskParaDMA ¶) override; diff --git a/src/domain/collective_communication/common/debug/profiling/plugin_runner.cc b/src/domain/collective_communication/common/debug/profiling/plugin_runner.cc index a9a04946bc4f3f2b75277a8c76bb9e07e0f8b827..584a5119ebd1ee05bad13c17adfa46f4edb91608 100644 --- a/src/domain/collective_communication/common/debug/profiling/plugin_runner.cc +++ b/src/domain/collective_communication/common/debug/profiling/plugin_runner.cc @@ -164,8 +164,14 @@ void PluginRunner::operator () (rtStream_t stream, const TaskParaAiv ¶Aiv) c ret = hrtGetTaskIdAndStreamID(taskID, streamID); CHK_PRT_RET(ret != HCCL_SUCCESS, HCCL_ERROR("[PluginRunner][Operator]rtGet task id and stream id fail. 
return[%d]", ret),); - - if (profiler_ != nullptr) { + + TaskExceptionHandler *taskExceptionHandler = dynamic_cast(profiler_); + if (taskExceptionHandler != nullptr) { + taskExceptionHandler->Save(streamID, stream, taskID, paraAiv); + } else { profiler_->Save(streamID, taskID, paraAiv); } + // if (profiler_ != nullptr) { + // profiler_->Save(streamID, taskID, paraAiv); + // } } \ No newline at end of file diff --git a/src/domain/collective_communication/common/debug/profiling/task_exception_handler.cc b/src/domain/collective_communication/common/debug/profiling/task_exception_handler.cc index afb86dc0fb7f3d4aead6a2a89c3f18fd8f4ce1c5..b816b09d4b2a6c77aa813fd749cc4f94687d1d70 100644 --- a/src/domain/collective_communication/common/debug/profiling/task_exception_handler.cc +++ b/src/domain/collective_communication/common/debug/profiling/task_exception_handler.cc @@ -19,6 +19,8 @@ #include "sal_pub.h" #include "../../../algorithm/pub_inc/common.h" #include "runtime/rt_error_codes.h" +#include "/ssd_0/nfs/t00567959/community_cann_path/ascend-toolkit/8.2.RC1.alpha001/aarch64-linux/include/experiment/runtime/runtime/stream.h" + using namespace hccl; using namespace std; @@ -192,6 +194,19 @@ TaskInfo::TaskInfo(u32 &streamID, u32 &taskID, string &tag, const TaskParaAiv& p taskPara.Aiv.flagMem = para.flagMem; taskPara.Aiv.aivRdmaStep = para.aivRdmaStep; } + +TaskInfo::TaskInfo(u32 &streamID, rtStream_t &stream, u32 &taskID, string &tag, const TaskParaAiv& para) : + streamID(streamID), stream(stream), taskID(taskID), tag(tag), isAlgInfo(true) +{ + taskPara.Aiv.cmdType = para.cmdType; + taskPara.Aiv.tag = para.tag; + taskPara.Aiv.size = para.size; + taskPara.Aiv.blockDim = para.blockDim; + taskPara.Aiv.rankSize = para.rankSize; + taskPara.Aiv.flagMem = para.flagMem; + taskPara.Aiv.aivRdmaStep = para.aivRdmaStep; +} + CtxInfo::CtxInfo(TaskType &taskType, const TaskParaDMA ¶) : taskType(taskType) { @@ -1052,6 +1067,17 @@ void TaskExceptionHandler::PrintTaskAivInfo(const 
std::shared_ptr>> retryTaskMap; +extern std::mutex retryTaskMapMutex; + bool TaskExceptionHandler::DealExceptionTask(rtExceptionInfo *exceptionInfo) { std::unique_lock lock(taskMapMutex[exceptionInfo->deviceid]); @@ -1076,6 +1102,42 @@ bool TaskExceptionHandler::DealExceptionTask(rtExceptionInfo *exceptionInfo) return false; } + RetryTaskInfo retryTaskInfo; + retryTaskInfo.deviceId = exceptionInfo->deviceid; + retryTaskInfo.streamId = exceptionInfo->streamid; + retryTaskInfo.taskId = exceptionInfo->taskid; + retryTaskInfo.tag = exceptionTaskInfo.tag; + rtStreamGetSqid(exceptionTaskInfo.stream, &retryTaskInfo.sqId); + + auto it = retryTaskMap.find(retryTaskInfo.streamId); + if (it == retryTaskMap.end()) { + std::lock_guard lock(retryTaskMapMutex); + CHK_PRT_RET(retryTaskMap.size() >= maxStrCount, HCCL_ERROR("[Insert][retryTaskMap]retryTaskMap size is " + "bigger than max stream count[%u]. stream add fail", maxStrCount), HCCL_E_INTERNAL); + std::shared_ptr> tmpRetryTaskInfoQue = nullptr; + EXECEPTION_CATCH((tmpRetryTaskInfoQue = make_shared>()), return HCCL_E_PTR); + tmpRetryTaskInfoQue->push_back(retryTaskInfo); + retryTaskMap.insert({ retryTaskInfo.streamId, tmpRetryTaskInfoQue }); + } else { + std::lock_guard lock(retryTaskMapMutex); + it->second->push_back(retryTaskInfo); + if (it->second->size() > maxTaskCount) { + it->second->pop_front(); + } + } + + for (auto it = retryTaskMap.begin(); it != retryTaskMap.end(); it++) { + HCCL_ERROR("[DealExceptionTask_debug] retryTaskInfo stream_id is %d", it->first); + std::shared_ptr> dq = it->second; + for (auto dqIt = dq->begin(); dqIt != dq->end(); dqIt++) { + HCCL_ERROR("[DealExceptionTask_debug] retryTaskInfo taskInfo.streamId is %d", dqIt->streamId); + HCCL_ERROR("[DealExceptionTask_debug] retryTaskInfo taskInfo.sqId is %d", dqIt->sqId); + HCCL_ERROR("[DealExceptionTask_debug] retryTaskInfo taskInfo.taskId is %d", dqIt->taskId); + HCCL_ERROR("[DealExceptionTask_debug] retryTaskInfo taskInfo.deviceId is %d", 
dqIt->deviceId); + HCCL_ERROR("[DealExceptionTask_debug] retryTaskInfo taskInfo.tag is %s", dqIt->tag); + } + } + if (exceptionTaskInfo.isAlgInfo){ PrintTaskAivBuffer(queIt); PrintTaskAivInfo(queIt); @@ -1365,6 +1427,21 @@ HcclResult TaskExceptionHandler::Save(u32 &streamID, u32 &taskID, const TaskPara return HCCL_SUCCESS; } +HcclResult TaskExceptionHandler::Save(u32 &streamID, rtStream_t &stream, u32 &taskID, const TaskParaAiv ¶) +{ + CHK_PRT_RET(deviceLogicId_ >= MAX_MODULE_DEVICE_NUM, + HCCL_ERROR("[TaskExceptionHandler][Save]deviceLogicId_[%u] is bigger than MAX_MODULE_DEVICE_NUM[%u]", + deviceLogicId_, MAX_MODULE_DEVICE_NUM), HCCL_E_INTERNAL); + + std::string tag; + CHK_RET(ProfilerBase::GetTagByStream(streamID, tag)); + TaskInfo tmpTaskInfo(streamID, stream, taskID, tag, para); + CHK_RET(InsertTaskMap(streamID, tmpTaskInfo)); + CHK_RET(InsertRankInfo(tag)); + CHK_RET(InsertOpData(tag)); + return HCCL_SUCCESS; +} + HcclResult TaskExceptionHandler::Save(u32 &streamID, u32 &taskID, TaskType &taskType, const TaskParaReduce ¶) { return Save(streamID, streamID, taskID, taskType, para); diff --git a/src/domain/collective_communication/common/debug/profiling/task_overflow.cc b/src/domain/collective_communication/common/debug/profiling/task_overflow.cc index e2f3f0c5808113a1d5d0d715fed547d11120a345..e095372b0f1a31ef35a114bf8aa37ee48997d7f9 100644 --- a/src/domain/collective_communication/common/debug/profiling/task_overflow.cc +++ b/src/domain/collective_communication/common/debug/profiling/task_overflow.cc @@ -101,6 +101,11 @@ HcclResult TaskOverflow::Save(u32 &streamID, u32 &taskID, const TaskParaAiv &par return HCCL_SUCCESS; } +HcclResult TaskOverflow::Save(u32 &streamID, rtStream_t &stream, u32 &taskID, const TaskParaAiv ¶Aiv) +{ + return HCCL_SUCCESS; +} + HcclResult TaskOverflow::Save(u32 &streamID, u32 &taskID) { return HCCL_SUCCESS; diff --git a/src/domain/collective_communication/common/debug/profiling/task_overflow_pub.h 
b/src/domain/collective_communication/common/debug/profiling/task_overflow_pub.h index 10e33f4604d9af23f38f15455e00dbc323c5435d..8024b13c090b9ff246f31644982a41e18cafbc31 100644 --- a/src/domain/collective_communication/common/debug/profiling/task_overflow_pub.h +++ b/src/domain/collective_communication/common/debug/profiling/task_overflow_pub.h @@ -25,6 +25,7 @@ public: HcclResult Save(u32 &streamID, u32 &taskID, TaskType &taskType, const TaskParaReduce ¶Reduce) override; HcclResult Save(u32 &streamID, u32 &taskID, TaskType &taskType, const TaskParaNotify ¶Notify) override; HcclResult Save(u32 &streamID, u32 &taskID, const TaskParaAiv ¶Aiv) override; + HcclResult Save(u32 &streamID, rtStream_t &stream, u32 &taskID, const TaskParaAiv ¶Aiv) override; HcclResult Save(u32 &streamID, u32 &taskID) override; HcclResult Save(u32 captureStreamID, u32 streamID, u32 taskID, TaskType &taskType, const TaskParaDMA ¶DMA) override; HcclResult Save(u32 captureStreamID, u32 streamID, u32 taskID, TaskType &taskType, const TaskParaReduce ¶Reduce) override; diff --git a/src/domain/collective_communication/common/debug/profiling/task_profiling.cc b/src/domain/collective_communication/common/debug/profiling/task_profiling.cc index 2d9a823225347cf752c425fdc91626dcc5cc987d..7b7d37e19957ce95105ca1fdf21bdaf988c014d3 100644 --- a/src/domain/collective_communication/common/debug/profiling/task_profiling.cc +++ b/src/domain/collective_communication/common/debug/profiling/task_profiling.cc @@ -370,6 +370,10 @@ HcclResult TaskProfiling::Save(u32 &streamID, u32 &taskID, const TaskParaAiv &pa return HCCL_SUCCESS; } +HcclResult TaskProfiling::Save(u32 &streamID, rtStream_t &stream, u32 &taskID, const TaskParaAiv ¶) { + return HCCL_SUCCESS; +} + HcclResult TaskProfiling::Save(u32 &streamID, u32 &taskID) { return HCCL_SUCCESS; diff --git a/src/domain/collective_communication/framework/cluster_maintenance/recovery/operator_retry/c3_op_retry_pub.h 
/*
 * Copyright (c) 2025 Huawei Technologies Co., Ltd.
 * This file is a part of the CANN Open Software.
 * Licensed under CANN Open Software License Agreement Version 1.0 (the "License").
 * Please refer to the License for details. You may not use this file except in compliance with the License.
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED,
 * INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE.
 * See LICENSE in the root of the software repository for the full text of the License.
 */

#ifndef HCCL_C3_OP_RETRY_PUB_H
#define HCCL_C3_OP_RETRY_PUB_H

#include <map>
#include <string>

namespace hccl {
// States of the C3 operator-retry state machine, shared by the server and the agent side.
typedef enum {
    // server states
    C3_RETRY_STATE_SERVER_RUNNING = 0,
    C3_RETRY_STATE_SERVER_RETRY_OP,
    C3_RETRY_STATE_SERVER_STOP_OP,
    C3_RETRY_STATE_SERVER_OP_STOPPED,

    // agent states
    C3_RETRY_STATE_AGENT_RUNNING,
    C3_RETRY_STATE_AGENT_OP_ERROR,
    C3_RETRY_STATE_AGENT_OP_STOPPED,

    C3_RETRY_STATE_RESERVED,
} C3RetryState;

// Printable name of every C3RetryState value (used for logging).
const std::map<C3RetryState, std::string> C3_RETRY_STATE_STR_MAP {
    // server states
    {C3_RETRY_STATE_SERVER_RUNNING, "C3_RETRY_STATE_SERVER_RUNNING"},
    {C3_RETRY_STATE_SERVER_RETRY_OP, "C3_RETRY_STATE_SERVER_RETRY_OP"},
    {C3_RETRY_STATE_SERVER_STOP_OP, "C3_RETRY_STATE_SERVER_STOP_OP"},
    {C3_RETRY_STATE_SERVER_OP_STOPPED, "C3_RETRY_STATE_SERVER_OP_STOPPED"},

    // agent states
    {C3_RETRY_STATE_AGENT_RUNNING, "C3_RETRY_STATE_AGENT_RUNNING"},
    {C3_RETRY_STATE_AGENT_OP_ERROR, "C3_RETRY_STATE_AGENT_OP_ERROR"},
    {C3_RETRY_STATE_AGENT_OP_STOPPED, "C3_RETRY_STATE_AGENT_OP_STOPPED"},

    {C3_RETRY_STATE_RESERVED, "C3_RETRY_STATE_RESERVED"}
};

// Commands the server can issue to an agent.
typedef enum {
    C3_RETRY_CMD_RETRY_OP = 0,
    C3_RETRY_CMD_STOP_OP
} C3RetryCommand;

// Printable name of every C3RetryCommand value (used for logging).
const std::map<C3RetryCommand, std::string> C3_RETRY_COMMAND_STR_MAP {
    {C3_RETRY_CMD_RETRY_OP, "C3_RETRY_CMD_RETRY_OP"},
    {C3_RETRY_CMD_STOP_OP, "C3_RETRY_CMD_STOP_OP"}
};

// Identity of a failed task plus how many times it has already been re-executed.
// NOTE: this struct owns a std::string, so it is NOT trivially copyable and must never be
// transferred as raw bytes (sizeof-based send/recv); serialize it field by field instead.
struct C3RetryTaskInfo {
    int streamId = 0;
    int taskId = 0;
    int deviceId = 0;
    std::string tag;
    int retryCount = 0;
};

// A command together with the task it applies to.
// Members are value-initialized so that a default-constructed object that was never filled
// (e.g. because a receive failed) holds defined values instead of indeterminate ones.
struct C3RetryCommandInfo {
    C3RetryCommand command = C3_RETRY_CMD_RETRY_OP;
    C3RetryTaskInfo retryTaskInfo;
};
}
#endif
+ */ + +#include +#include "c3_opretry_agent.h" +#include "/ssd_0/nfs/t00567959/community_cann_path/ascend-toolkit/8.2.RC1.alpha001/aarch64-linux/include/experiment/ascend_hal/driver/ascend_hal.h" +#include "/ssd_0/nfs/t00567959/community_cann_path/ascend-toolkit/8.2.RC1.alpha001/aarch64-linux/include/experiment/ascend_hal/driver/ascend_hal_define.h" + +extern "C" { + drvError_t __attribute__((weak)) halSqCqConfig(uint32_t devId, struct halSqCqConfigInfo *info); +} + +struct RetryTaskInfo { + int streamId; + int taskId; + int deviceId; + std::string tag; +}; + +extern std::map>> retryTaskMap; +extern std::mutex retryTaskMapMutex; + +namespace hccl { +bool ConfigSqStatusByType(uint32_t devId, uint32_t sqId, drvSqCqPropType_t type, uint32_t value) +{ + halSqCqConfigInfo configInfo; + configInfo.tsId = 0; + configInfo.sqId = sqId; + configInfo.cqId = 0; + configInfo.type = DRV_NORMAL_TYPE; + + configInfo.prop = type; + configInfo.value[0] = value; + uint32_t ret = halSqCqConfig(devId, &configInfo); + if (ret != 0) { + HCCL_ERROR("halSqCqConfig %d failed. 
ret = %d, sqid:%d, type:%d, value:%d", type, ret, configInfo.sqId, + type, value); + return false; + } + return true; +} + +HcclResult CreateC3OpRetryAgentByState(C3RetryState state, C3RetryContext* c3RetryCtx) +{ + std::shared_ptr c3RetryPtr = nullptr; + switch (state) { + case C3_RETRY_STATE_AGENT_RUNNING: + EXECEPTION_CATCH(c3RetryPtr = std::make_shared(), return HCCL_E_PTR); + break; + case C3_RETRY_STATE_AGENT_OP_ERROR: + EXECEPTION_CATCH(c3RetryPtr = std::make_shared(), return HCCL_E_PTR); + break; + case C3_RETRY_STATE_AGENT_OP_STOPPED: + EXECEPTION_CATCH(c3RetryPtr = std::make_shared(), return HCCL_E_PTR); + break; + default: { + HCCL_ERROR("[OpRetry][Agent]CreateC3OpRetryAgentByState failed, state[%s] is invalid", + c3RetryCtx->GetCtxReadableC3State(state)); + return HCCL_E_NOT_SUPPORT; + } + } + c3RetryCtx->SetC3RetryState(state, c3RetryPtr); + return HCCL_SUCCESS; +} + +C3OpRetryAgentRunning::C3OpRetryAgentRunning() +{ + lastPollingRetryTaskMapTime_ = std::chrono::steady_clock::now(); + pollTimeout_ = std::chrono::seconds(POLL_RETRY_TASK_MAP_INTERVAL); // 轮询RetryTaskMap的间隔时间 +} + +std::map retryTaskRecord; // key为retryTask的streamid + taskid, value为该task已重执行的次数 + +HcclResult C3OpRetryAgentRunning::ProcessEvent(C3RetryContext* c3RetryCtx) +{ + C3RetryCommandInfo c3RetryCommandInfo; + C3RetryState nextState = C3_RETRY_STATE_RESERVED; + HcclResult ret; + + // 收到c3_server下发的C3_RETRY_CMD_RETRY_OP --> C3OpRetryAgentRunning + ret = WaitCommandFromC3Server(c3RetryCtx->c3AgentSocket_, c3RetryCommandInfo); + if (ret == HCCL_SUCCESS) { + if (c3RetryCommandInfo.command == C3_RETRY_CMD_RETRY_OP) { + // bool retryRet = false; + // retryRet = ConfigSqStatusByType(c3RetryCommandInfo.retryTaskInfo.deviceId, c3RetryCommandInfo.retryTaskInfo.streamId, + // DRV_SQCQ_PROP_SQ_DISABLE_TO_ENABLE, 1); + HCCL_ERROR("[C3OpRetryAgentRunning] c3RetryCommandInfo.command is %d.", C3_RETRY_CMD_RETRY_OP); + bool retryRet = true; + if (retryRet) { + std::string retryTaskId = 
std::to_string(c3RetryCommandInfo.retryTaskInfo.streamId) + + std::to_string(c3RetryCommandInfo.retryTaskInfo.taskId); + HCCL_ERROR("[C3OpRetryAgentRunning] device_id %d retry task %s success.", c3RetryCommandInfo.retryTaskInfo.deviceId, retryTaskId); + } + return HCCL_SUCCESS; + } + + // 收到c3_server下发的C3_RETRY_CMD_STOP_OP --> C3_RETRY_STATE_AGENT_OP_STOPPED + if (c3RetryCommandInfo.command == C3_RETRY_CMD_STOP_OP) { + HCCL_ERROR("[C3OpRetryAgentRunning] c3RetryCommandInfo.command is %d.", C3_RETRY_CMD_STOP_OP); + nextState = C3_RETRY_STATE_AGENT_OP_STOPPED; + CreateC3OpRetryAgentByState(nextState, c3RetryCtx); + return HCCL_SUCCESS; + } + } + + // 轮询到有task执行失败的消息 --> C3_RETRY_STATE_AGENT_OP_ERROR + std::chrono::steady_clock::time_point curTime = std::chrono::steady_clock::now(); + // 定期轮询RetryTaskMap是否非空 + const auto pollTime = std::chrono::duration_cast(curTime - lastPollingRetryTaskMapTime_); + if (pollTime < pollTimeout_) { + return HCCL_SUCCESS; + } + HCCL_ERROR("[C3OpRetryAgentRunning] Running."); + + // 转移状态前,打印retryTaskMap中的元素 + std::unique_lock lock1(retryTaskMapMutex); + for (auto it = retryTaskMap.begin(); it != retryTaskMap.end(); it++) { + HCCL_ERROR("[C3OpRetryAgentRunning] retryTaskInfo stream_id is %d", it->first); + std::shared_ptr> dq = it->second; + for (auto dqIt = dq->begin(); dqIt != dq->end(); dqIt++) { + HCCL_ERROR("[C3OpRetryAgentRunning] retryTaskInfo taskInfo.streamId is %d", dqIt->streamId); + HCCL_ERROR("[C3OpRetryAgentRunning] retryTaskInfo taskInfo.taskId is %d", dqIt->taskId); + HCCL_ERROR("[C3OpRetryAgentRunning] retryTaskInfo taskInfo.deviceId is %d", dqIt->deviceId); + HCCL_ERROR("[C3OpRetryAgentRunning] retryTaskInfo taskInfo.tag is %s", dqIt->tag); + } + } + lock1.unlock(); + + // std::unique_lock lock2(retryTaskMapMutex); + // for (auto it = retryTaskMap.begin(); it != retryTaskMap.end(); it++) { + // if (it->second->size() > 0) { + // nextState = C3_RETRY_STATE_AGENT_OP_ERROR; + // CreateC3OpRetryAgentByState(nextState, 
c3RetryCtx); + // break; + // } + // } + // lock2.unlock(); + + // 轮询间隔 + SaluSleep(C3_OP_RETRY_RUNNING_POLL_INTERVAL); + return HCCL_SUCCESS; +} + +HcclResult C3OpRetryAgentOpError::ProcessEvent(C3RetryContext* c3RetryCtx) +{ + C3RetryCommandInfo c3RetryCommandInfo; + C3RetryState nextState = C3_RETRY_STATE_RESERVED; + HcclResult ret; + + // 收到c3_server下发的C3_RETRY_CMD_RETRY_OP --> C3OpRetryAgentRunning + ret = WaitCommandFromC3Server(c3RetryCtx->c3AgentSocket_, c3RetryCommandInfo); + if (ret == HCCL_SUCCESS) { + if (c3RetryCommandInfo.command == C3_RETRY_CMD_RETRY_OP) { + // bool retryRet = false; + // retryRet = ConfigSqStatusByType(c3RetryCommandInfo.retryTaskInfo.deviceId, c3RetryCommandInfo.retryTaskInfo.streamId, + // DRV_SQCQ_PROP_SQ_DISABLE_TO_ENABLE, 1); + HCCL_ERROR("[C3OpRetryAgentOpError] c3RetryCommandInfo.command is %d.", C3_RETRY_CMD_RETRY_OP); + bool retryRet = true; + if (retryRet) { + std::string retryTaskId = std::to_string(c3RetryCommandInfo.retryTaskInfo.streamId) + + std::to_string(c3RetryCommandInfo.retryTaskInfo.taskId); + HCCL_ERROR("[C3OpRetryAgentOpError] device_id %d retry task %s success.", c3RetryCommandInfo.retryTaskInfo.deviceId, retryTaskId); + } + nextState = C3_RETRY_STATE_AGENT_RUNNING; + CreateC3OpRetryAgentByState(nextState, c3RetryCtx); + return HCCL_SUCCESS; + } + } + + // 收到c3_server下发的C3_RETRY_CMD_STOP_OP --> C3_RETRY_STATE_AGENT_OP_STOPPED + if (c3RetryCommandInfo.command == C3_RETRY_CMD_STOP_OP) { + HCCL_ERROR("[C3OpRetryAgentOpError] c3RetryCommandInfo.command is %d.", C3_RETRY_CMD_STOP_OP); + nextState = C3_RETRY_STATE_AGENT_OP_STOPPED; + CreateC3OpRetryAgentByState(nextState, c3RetryCtx); + } + + // 向c3_server发送task执行失败消息前,打印retryTaskMap中的元素 + std::unique_lock lock1(retryTaskMapMutex); + for (auto it = retryTaskMap.begin(); it != retryTaskMap.end(); it++) { + HCCL_ERROR("[C3OpRetryAgentOpError][Before Send Task Failed Msg] retryTaskInfo stream_id is %d", it->first); + std::shared_ptr> dq = it->second; + for (auto dqIt 
= dq->begin(); dqIt != dq->end(); dqIt++) { + HCCL_ERROR("[C3OpRetryAgentOpError][Before Send Task Failed Msg] retryTaskInfo taskInfo.streamId is %d", dqIt->streamId); + HCCL_ERROR("[C3OpRetryAgentOpError][Before Send Task Failed Msg] retryTaskInfo taskInfo.taskId is %d", dqIt->taskId); + HCCL_ERROR("[C3OpRetryAgentOpError][Before Send Task Failed Msg] retryTaskInfo taskInfo.deviceId is %d", dqIt->deviceId); + HCCL_ERROR("[C3OpRetryAgentOpError][Before Send Task Failed Msg] retryTaskInfo taskInfo.tag is %s", dqIt->tag); + } + } + lock1.unlock(); + + // 向c3_server发送task执行失败消息 --> C3_RETRY_STATE_AGENT_OP_ERROR + std::unique_lock lock2(retryTaskMapMutex); + for (auto it = retryTaskMap.begin(); it != retryTaskMap.end(); it++) { + std::shared_ptr> dq = it->second; + for (auto taskIt = dq->begin(); taskIt != dq->end(); ) { + C3RetryTaskInfo c3RetryTaskInfo; + std::string retryTaskId = std::to_string(taskIt->streamId) + std::to_string(taskIt->taskId); + auto taskRecordIt = retryTaskRecord.find(retryTaskId); + if (taskRecordIt != retryTaskRecord.end()) { + taskRecordIt->second++; + c3RetryTaskInfo.retryCount = taskRecordIt->second; + } else { + retryTaskRecord.insert(std::make_pair(retryTaskId, 0)); + c3RetryTaskInfo.retryCount = 0; + } + c3RetryTaskInfo.streamId = taskIt->streamId; + c3RetryTaskInfo.taskId = taskIt->taskId; + c3RetryTaskInfo.deviceId = taskIt->deviceId; + c3RetryTaskInfo.tag = taskIt->tag; + HCCL_ERROR("[C3OpRetryAgentOpError][Send Task Msg] retryTaskInfo c3RetryTaskInfo.streamId is %d", c3RetryTaskInfo.streamId); + HCCL_ERROR("[C3OpRetryAgentOpError][Send Task Msg] retryTaskInfo c3RetryTaskInfo.taskId is %d", c3RetryTaskInfo.taskId); + HCCL_ERROR("[C3OpRetryAgentOpError][Send Task Msg] retryTaskInfo c3RetryTaskInfo.deviceId is %d", c3RetryTaskInfo.deviceId); + HCCL_ERROR("[C3OpRetryAgentOpError][Send Task Msg] retryTaskInfo c3RetryTaskInfo.tag is %s", c3RetryTaskInfo.tag); + HCCL_ERROR("[C3OpRetryAgentOpError][Send Task Msg] retryTaskInfo 
c3RetryTaskInfo.retryCount is %d", c3RetryTaskInfo.retryCount); + SendC3RetryTaskInfoToServer(c3RetryCtx->c3AgentSocket_, c3RetryTaskInfo); + taskIt = dq->erase(taskIt); + } + } + lock2.unlock(); + + // 向c3_server发送task执行失败消息后,打印retryTaskMap中的元素 + std::unique_lock lock3(retryTaskMapMutex); + for (auto it = retryTaskMap.begin(); it != retryTaskMap.end(); it++) { + HCCL_ERROR("[C3OpRetryAgentOpError][After Send Task Failed Msg] retryTaskInfo stream_id is %d", it->first); + std::shared_ptr> dq = it->second; + if (dq->size() == 0) { + HCCL_ERROR("[C3OpRetryAgentOpError][After Send Task Failed Msg] retryTaskInfo is empty"); + continue; + } + for (auto dqIt = dq->begin(); dqIt != dq->end(); dqIt++) { + HCCL_ERROR("[C3OpRetryAgentOpError][After Send Task Failed Msg] retryTaskInfo taskInfo.streamId is %d", dqIt->streamId); + HCCL_ERROR("[C3OpRetryAgentOpError][After Send Task Failed Msg] retryTaskInfo taskInfo.taskId is %d", dqIt->taskId); + HCCL_ERROR("[C3OpRetryAgentOpError][After Send Task Failed Msg] retryTaskInfo taskInfo.deviceId is %d", dqIt->deviceId); + HCCL_ERROR("[C3OpRetryAgentOpError][After Send Task Failed Msg] retryTaskInfo taskInfo.tag is %s", dqIt->tag); + } + } + lock3.unlock(); + return HCCL_SUCCESS; +} + +HcclResult C3OpRetryAgentOpStopped::ProcessEvent(C3RetryContext* c3RetryCtx) +{ + HCCL_ERROR("[C3OpRetryAgentOpStopped]"); + return HCCL_E_INTERNAL; +} + +} // namespace hccl \ No newline at end of file diff --git a/src/domain/collective_communication/framework/cluster_maintenance/recovery/operator_retry/c3_opretry_agent.h b/src/domain/collective_communication/framework/cluster_maintenance/recovery/operator_retry/c3_opretry_agent.h new file mode 100644 index 0000000000000000000000000000000000000000..36e24edba5aedd21066b5b2f32fef215273bbbba --- /dev/null +++ b/src/domain/collective_communication/framework/cluster_maintenance/recovery/operator_retry/c3_opretry_agent.h @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2025 Huawei Technologies Co., Ltd. 
+ * This file is a part of the CANN Open Software. + * Licensed under CANN Open Software License Agreement Version 1.0 (the "License"). + * Please refer to the License for details. You may not use this file except in compliance with the License. + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, + * INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. + * See LICENSE in the root of the software repository for the full text of the License. + */ + +#ifndef HCCL_C3_RETRY_AGENT_H +#define HCCL_C3_RETRY_AGENT_H +#include +#include +#include +#include +#include +#include +#include "c3_opretry_base.h" +#include "task_exception_handler_pub.h" + +namespace hccl { +// C3_RETRY_STATE_AGENT_RUNNING, 正常运行状态 +class C3OpRetryAgentRunning : public C3OpRetryBase { +public: + C3OpRetryAgentRunning(); + HcclResult ProcessEvent(C3RetryContext* c3RetryCtx) override; +private: +protected: + std::chrono::steady_clock::time_point lastPollingRetryTaskMapTime_; // 上一次轮询RetryTaskMap的时间 + std::chrono::seconds pollTimeout_; // 轮询RetryTaskMap的超时时间 +}; + +// C3_RETRY_STATE_AGENT_OP_STOPPED, 停止尝试重执行报错的OP +class C3OpRetryAgentOpStopped : public C3OpRetryBase { +public: + HcclResult ProcessEvent(C3RetryContext* c3RetryCtx) override; +}; + +// C3_RETRY_STATE_AGENT_OP_ERROR, 非INPLACE_OP执行失败 +class C3OpRetryAgentOpError : public C3OpRetryBase { +public: + HcclResult ProcessEvent(C3RetryContext* c3RetryCtx) override; +}; +} +#endif \ No newline at end of file diff --git a/src/domain/collective_communication/framework/cluster_maintenance/recovery/operator_retry/c3_opretry_base.cc b/src/domain/collective_communication/framework/cluster_maintenance/recovery/operator_retry/c3_opretry_base.cc new file mode 100644 index 0000000000000000000000000000000000000000..2791351546796db103959d6b51c611eaa1b16af5 --- /dev/null +++ 
b/src/domain/collective_communication/framework/cluster_maintenance/recovery/operator_retry/c3_opretry_base.cc
@@ -0,0 +1,127 @@
+/*
+ * Copyright (c) 2025 Huawei Technologies Co., Ltd.
+ * This file is a part of the CANN Open Software.
+ * Licensed under CANN Open Software License Agreement Version 1.0 (the "License").
+ * Please refer to the License for details. You may not use this file except in compliance with the License.
+ * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED,
+ * INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE.
+ * See LICENSE in the root of the software repository for the full text of the License.
+ */
+
+#include "c3_opretry_base.h"
+
+namespace hccl {
+// NOTE(review): in this copy of the patch the template argument lists (after std::shared_ptr,
+// std::chrono::duration_cast and reinterpret_cast) are missing; those spots are kept exactly as found.
+
+// Runs the current state's ProcessEvent(), logs when it fails and returns its result unchanged.
+HcclResult C3OpRetryBase::Handle(C3RetryContext* c3RetryCtx)
+{
+    const HcclResult ret = ProcessEvent(c3RetryCtx);
+    if (ret != HCCL_SUCCESS) {
+        HCCL_ERROR("C3OpRetryBase::Handle Error.");
+    }
+    return ret;
+}
+
+// Receives exactly totalSize bytes from socket into data.
+// Returns HCCL_SUCCESS once everything has arrived, HCCL_E_AGAIN when the first IRecv delivers nothing,
+// HCCL_E_TIMEOUT once C3_OP_RETRY_SEND_RECV_TIMEOUT seconds have elapsed, HCCL_E_PTR for a null socket or
+// buffer, or the error reported by IRecv.
+HcclResult C3OpRetryBase::Recv(std::shared_ptr socket, void *data, u64 totalSize)
+{
+    // A null socket or buffer would otherwise be dereferenced below
+    CHK_PRT_RET(socket == nullptr, HCCL_ERROR("[OpRetry]Recv fail, socket is null"), HCCL_E_PTR);
+    CHK_PRT_RET(data == nullptr, HCCL_ERROR("[OpRetry]Recv fail, data is null"), HCCL_E_PTR);
+
+    const auto start = std::chrono::steady_clock::now();
+    const std::chrono::seconds timeout = std::chrono::seconds(C3_OP_RETRY_SEND_RECV_TIMEOUT);
+
+    u64 recvSize = 0;
+    while (true) {
+        // Give up once the overall deadline has passed
+        const auto elapsed =
+            std::chrono::duration_cast(std::chrono::steady_clock::now() - start);
+        CHK_PRT_RET(elapsed > timeout,
+            HCCL_ERROR("[OpRetry]Recv timeout, data[%p], recvSize[%llu Byte], totalSize[%llu Byte]", data, recvSize, totalSize),
+            HCCL_E_TIMEOUT);
+
+        u64 compSize = 0; // bytes received by this call
+        u64 resetSize = totalSize - recvSize; // bytes still outstanding
+        void* recvPtr = reinterpret_cast(reinterpret_cast(data) + recvSize);
+        HcclResult ret = socket->IRecv(recvPtr, resetSize, compSize);
+        CHK_PRT_RET(ret != HCCL_SUCCESS, , ret);
+
+        recvSize += compSize;
+        if (recvSize == 0) { // nothing received yet
+            return HCCL_E_AGAIN;
+        }
+        if (recvSize >= totalSize) { // all data received
+            return HCCL_SUCCESS;
+        }
+        // Partial data: keep waiting for the rest from this peer
+        HCCL_DEBUG("[OpRetry]Recv not complete, recvSize[%llu Byte], totalSize[%llu Byte]",
+            recvSize, totalSize);
+        // NOTE(review): this constant is documented in seconds, while the other SaluSleep call in this patch
+        // passes a value documented in us - confirm the intended pause.
+        SaluSleep(C3_OP_RETRY_SEND_RECV_TIMEOUT);
+    }
+}
+
+// Sends exactly size bytes from data over socket, resuming after partial sends.
+// Returns HCCL_SUCCESS when everything was sent, HCCL_E_TIMEOUT once C3_OP_RETRY_SEND_RECV_TIMEOUT seconds
+// have elapsed, HCCL_E_TCP_TRANSFER when ISend reports more bytes than were requested, HCCL_E_PTR for a null
+// socket or buffer, or the error reported by ISend.
+HcclResult C3OpRetryBase::Send(std::shared_ptr socket, void *data, u64 size)
+{
+    // A null socket or buffer would otherwise be dereferenced below
+    CHK_PRT_RET(socket == nullptr, HCCL_ERROR("[OpRetry][Send]fail, socket is null"), HCCL_E_PTR);
+    CHK_PRT_RET(data == nullptr, HCCL_ERROR("[OpRetry][Send]fail, data is null"), HCCL_E_PTR);
+
+    HCCL_DEBUG("[OpRetry][Send]start, para: data[%p], size[%llu Byte]", data, size);
+    const auto start = std::chrono::steady_clock::now();
+    const std::chrono::seconds timeout = std::chrono::seconds(C3_OP_RETRY_SEND_RECV_TIMEOUT);
+
+    u64 restSize = size; // bytes still to be sent
+    while (true) {
+        u64 sendDis = size - restSize;
+        void* dataPtr = reinterpret_cast(reinterpret_cast(data) + sendDis);
+        // Fail once the elapsed time exceeds the timeout
+        const auto elapsed =
+            std::chrono::duration_cast(std::chrono::steady_clock::now() - start);
+        CHK_PRT_RET(elapsed > timeout,
+            HCCL_WARNING("Send fail, Wait timeout for sockets send, dataPtr[%p], restSize[%llu Byte]", dataPtr, restSize),
+            HCCL_E_TIMEOUT);
+
+        u64 compSize = 0; // bytes sent by this call
+        HcclResult ret = socket->ISend(dataPtr, restSize, compSize);
+        CHK_PRT_RET(ret != HCCL_SUCCESS,
+            HCCL_WARNING("Send fail, dataPtr[%p], restSize[%llu Byte], compSize[%llu]", dataPtr, restSize, compSize), ret);
+
+        if (restSize == compSize) { // everything has been sent
+            HCCL_DEBUG("OpRetryBase send end");
+            return HCCL_SUCCESS;
+        } else if (restSize < compSize) {
+            HCCL_ERROR("Send fail, restSize[%llu Byte], compSize[%llu Byte]", restSize, compSize);
+            return HCCL_E_TCP_TRANSFER;
+        }
+        restSize -= compSize;
+    }
+}
+
+// Receives one C3RetryCommandInfo from socket and logs the command on success.
+HcclResult C3OpRetryBase::WaitCommandFromC3Server(std::shared_ptr socket, C3RetryCommandInfo &c3RetryCommandInfo)
+{
+    const HcclResult ret = Recv(socket, &c3RetryCommandInfo, sizeof(c3RetryCommandInfo));
+    if (ret == HCCL_SUCCESS) {
+        // Success is not an error, so it is reported at info level
+        HCCL_INFO("[C3OpRetry]WaitCommand success, command[%s]", GetReadableC3Cmd(c3RetryCommandInfo.command));
+    }
+    return ret;
+}
+
+// Sends one C3RetryTaskInfo over socket.
+HcclResult C3OpRetryBase::SendC3RetryTaskInfoToServer(std::shared_ptr socket, C3RetryTaskInfo &c3RetryTaskInfo)
+{
+    const HcclResult ret = Send(socket, &c3RetryTaskInfo, sizeof(c3RetryTaskInfo));
+    if (ret == HCCL_SUCCESS) {
+        HCCL_INFO("[C3OpRetry]SendC3RetryTaskInfoToServer success.");
+    }
+    return ret;
+}
+
+// Receives one C3RetryTaskInfo from socket.
+HcclResult C3OpRetryBase::WaitC3RetryTaskInfoFromAgent(std::shared_ptr socket, C3RetryTaskInfo &c3RetryTaskInfo)
+{
+    const HcclResult ret = Recv(socket, &c3RetryTaskInfo, sizeof(c3RetryTaskInfo));
+    if (ret == HCCL_SUCCESS) {
+        HCCL_INFO("[C3OpRetry]WaitC3RetryTaskInfoFromAgent success.");
+    }
+    return ret;
+}
+
+// Sends one C3RetryCommandInfo over socket and logs the command on success.
+HcclResult C3OpRetryBase::SendCommandToC3Agent(std::shared_ptr socket, C3RetryCommandInfo &c3RetryCommandInfo)
+{
+    const HcclResult ret = Send(socket, &c3RetryCommandInfo, sizeof(c3RetryCommandInfo));
+    if (ret == HCCL_SUCCESS) {
+        HCCL_INFO("[C3OpRetry]SendCommandToC3Agent success, command[%s]", GetReadableC3Cmd(c3RetryCommandInfo.command));
+    }
+    return ret;
+}
+}
\ No newline at end of file
diff --git a/src/domain/collective_communication/framework/cluster_maintenance/recovery/operator_retry/c3_opretry_base.h b/src/domain/collective_communication/framework/cluster_maintenance/recovery/operator_retry/c3_opretry_base.h
new file mode 100644
index 0000000000000000000000000000000000000000..c072b2a4140aae7bedfd564ce8173d951750b3d1
--- /dev/null
+++ b/src/domain/collective_communication/framework/cluster_maintenance/recovery/operator_retry/c3_opretry_base.h
@@ -0,0 +1,133 @@
+/*
+ * Copyright (c) 2025 Huawei Technologies Co., Ltd.
+ * This file is a part of the CANN Open Software.
+ * Licensed under CANN Open Software License Agreement Version 1.0 (the "License").
+ * Please refer to the License for details. You may not use this file except in compliance with the License.
+ * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED,
+ * INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. 
+ * See LICENSE in the root of the software repository for the full text of the License.
+ */
+
+#ifndef C3_OPRETRY_BASE_H
+#define C3_OPRETRY_BASE_H
+#include "c3_op_retry_pub.h"
+#include "hccl_op_retry_pub.h"
+
+namespace hccl {
+// NOTE(review): in this copy of the patch the template argument lists (after std::shared_ptr, std::map and
+// std::vector) are missing; those spots are kept exactly as found.
+constexpr int POLL_RETRY_TASK_MAP_INTERVAL = 1; // interval for polling RetryTaskMap
+constexpr int C3_OP_RETRY_RUNNING_POLL_INTERVAL = 100000; // polling interval of the retry state machine, in us
+constexpr int C3_OP_RETRY_KEEP_INTERVAL = 1; // keep-alive interval, in s
+constexpr int C3_OP_RETRY_SEND_RECV_TIMEOUT = 205; // timeout for send and receive, in s
+
+class C3RetryContext;
+
+// Base class of every C3 retry state. A state implements ProcessEvent(); Handle() wraps it with error logging.
+// The protected helpers exchange C3RetryTaskInfo / C3RetryCommandInfo structures over a socket.
+class C3OpRetryBase {
+public:
+    virtual HcclResult Handle(C3RetryContext* c3RetryCtx);
+    virtual HcclResult ProcessEvent(C3RetryContext* c3RetryCtx) = 0;
+
+    C3OpRetryBase() = default;
+    virtual ~C3OpRetryBase() = default;
+protected:
+    HcclResult WaitCommandFromC3Server(std::shared_ptr socket, C3RetryCommandInfo &c3RetryCommandInfo);
+    HcclResult SendC3RetryTaskInfoToServer(std::shared_ptr socket, C3RetryTaskInfo &c3RetryTaskInfo);
+    HcclResult WaitC3RetryTaskInfoFromAgent(std::shared_ptr socket, C3RetryTaskInfo &c3RetryTaskInfo);
+    HcclResult SendCommandToC3Agent(std::shared_ptr socket, C3RetryCommandInfo &c3RetryCommandInfo);
+private:
+    HcclResult Send(std::shared_ptr socket, void *data, u64 size);
+    HcclResult Recv(std::shared_ptr socket, void *data, u64 size);
+};
+
+// Per-agent record kept by the server context: the socket to that agent and the last task info read from it.
+using C3AgentRetryInfo = struct HcclC3RetryInfoDef {
+    std::shared_ptr socket{nullptr};
+    C3RetryTaskInfo c3RetryTaskInfo;
+};
+
+// Returns the name registered for c3RetryState in C3_RETRY_STATE_STR_MAP, or a fixed fallback text.
+inline const char *GetReadableC3State(C3RetryState c3RetryState) {
+    const auto found = C3_RETRY_STATE_STR_MAP.find(c3RetryState);
+    if (found == C3_RETRY_STATE_STR_MAP.end()) {
+        return "unkown state";
+    }
+    return found->second.c_str();
+}
+
+// Returns the name registered for c3RetryCommand in C3_RETRY_COMMAND_STR_MAP, or a fixed fallback text.
+inline const char *GetReadableC3Cmd(C3RetryCommand c3RetryCommand) {
+    const auto found = C3_RETRY_COMMAND_STR_MAP.find(c3RetryCommand);
+    if (found == C3_RETRY_COMMAND_STR_MAP.end()) {
+        return "unkown cmd";
+    }
+    return found->second.c_str();
+}
+
+// Context of one C3 retry state machine: it owns the current state handler and the data the handlers share.
+class C3RetryContext {
+public:
+    // C3Agent state machine: context initialisation
+    C3RetryContext(const std::string& group, std::shared_ptr socket, std::shared_ptr opStreamPtr,
+        std::shared_ptr c3RetryBase, const OpRetryAgentInfo& agentInfo)
+        : group_(group), deviceLogicId_(agentInfo.deviceLogicId), rankId_(agentInfo.userRank),
+          c3AgentSocket_(socket), c3OpStreamPtr_(opStreamPtr), c3RetryBase_(c3RetryBase), isRootRetryCtx_(false)
+    {
+    }
+
+    // C3Server state machine: context initialisation (one C3AgentRetryInfo per entry of sockets)
+    C3RetryContext(std::map> &sockets,
+        std::shared_ptr c3RetryBase, const OpRetryAgentInfo& agentInfo)
+        : rankId_(agentInfo.userRank), c3RetryBase_(c3RetryBase), isRootRetryCtx_(true)
+    {
+        for (const auto &entry : sockets) {
+            C3AgentRetryInfo agentRetryInfo;
+            agentRetryInfo.socket = entry.second;
+            c3ServerSockets_.insert(std::make_pair(entry.first, std::move(agentRetryInfo)));
+        }
+    }
+
+    C3RetryState GetC3RetryState() const {
+        return c3State_;
+    }
+
+    // Logs the transition, then installs nextC3State together with the handler object that implements it.
+    void SetC3RetryState(C3RetryState nextC3State, std::shared_ptr c3RetryBase) {
+        HCCL_RUN_INFO("[C3OpRetry][%s]State Transfer, cur state %s, next state %s",
+            GetCtxOpC3RetryMachineType(), GetCtxReadableC3State(c3State_), GetCtxReadableC3State(nextC3State));
+        c3State_ = nextC3State;
+        c3RetryBase_ = c3RetryBase;
+    }
+
+    // Entry point for external callers: runs the current state's handler once.
+    HcclResult Request() {
+        CHK_SMART_PTR_NULL(c3RetryBase_);
+        // ProcessEvent() may call SetC3RetryState() and replace c3RetryBase_; keep our own reference so the
+        // handler that is still executing is not destroyed underneath itself.
+        const auto handler = c3RetryBase_;
+        return handler->Handle(this);
+    }
+
+    // NOTE(review): rankId_ is an int that defaults to INVALID_UINT - confirm the intended type.
+    int GetRankId() const {
+        return rankId_;
+    }
+
+    // Same lookup as GetReadableC3State(), kept as a member for existing callers.
+    inline const char *GetCtxReadableC3State(C3RetryState c3RetryState) const {
+        return GetReadableC3State(c3RetryState);
+    }
+
+    // Same lookup as GetReadableC3Cmd(), kept as a member for existing callers.
+    inline const char *GetCtxReadableC3Cmd(C3RetryCommand c3RetryCommand) const {
+        return GetReadableC3Cmd(c3RetryCommand);
+    }
+
+    // "Server" for a context built with the server constructor, "Agent" otherwise.
+    // Returns a string literal: the pointer must stay valid after the call because callers pass it to logging.
+    const char *GetCtxOpC3RetryMachineType() const {
+        return isRootRetryCtx_ ? "Server" : "Agent";
+    }
+
+    std::string group_ = "";
+    int deviceLogicId_ = INVALID_INT;
+    int rankId_ = INVALID_UINT;
+
+    // data stored for the agent state machine
+    std::shared_ptr c3AgentSocket_ = nullptr;
+    std::shared_ptr c3OpStreamPtr_ = nullptr;
+
+    // data stored for the server state machine
+    std::map c3ServerSockets_;
+    std::vector needRetryServerRanks_;
+private:
+    std::shared_ptr c3RetryBase_ = nullptr;
+    C3RetryState c3State_ = C3_RETRY_STATE_RESERVED;
+    bool isRootRetryCtx_ = false;
+};
+}
+#endif
\ No newline at end of file
diff --git a/src/domain/collective_communication/framework/cluster_maintenance/recovery/operator_retry/c3_opretry_server.cc b/src/domain/collective_communication/framework/cluster_maintenance/recovery/operator_retry/c3_opretry_server.cc
new file mode 100644
index 0000000000000000000000000000000000000000..35efbdb71e28cfc47d7a3597b3ecc0fbf44b785c
--- /dev/null
+++ b/src/domain/collective_communication/framework/cluster_maintenance/recovery/operator_retry/c3_opretry_server.cc
@@ -0,0 +1,141 @@
+/*
+ * Copyright (c) 2025 Huawei Technologies Co., Ltd.
+ * This file is a part of the CANN Open Software.
+ * Licensed under CANN Open Software License Agreement Version 1.0 (the "License").
+ * Please refer to the License for details. You may not use this file except in compliance with the License.
+ * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED,
+ * INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE.
+ * See LICENSE in the root of the software repository for the full text of the License. 
+ */
+
+#include "c3_opretry_server.h"
+
+namespace hccl {
+// NOTE(review): in this copy of the patch the template argument lists (after std::shared_ptr, std::make_shared
+// and std::chrono::duration_cast) are missing; those spots are kept exactly as found.
+
+// Task info of the failure currently being handled; written by the RUNNING state, read by RETRY_OP / STOP_OP.
+// NOTE(review): this is one mutable global with no locking - confirm only a single server state machine
+// thread can touch it, or move it into C3RetryContext.
+C3RetryTaskInfo curC3RetryTaskInfo;
+
+// Creates the handler object for a server state and installs it in c3RetryCtx.
+// Returns HCCL_E_NOT_SUPPORT for a state that is not a server state and HCCL_E_PTR when c3RetryCtx is null
+// or the handler cannot be created.
+HcclResult CreateC3OpRetryServerByState(C3RetryState state, C3RetryContext* c3RetryCtx)
+{
+    CHK_PRT_RET(c3RetryCtx == nullptr,
+        HCCL_ERROR("[C3OpRetry][Server]CreateC3OpRetryServerByState failed, c3RetryCtx is null"), HCCL_E_PTR);
+
+    std::shared_ptr c3RetryPtr = nullptr;
+    switch (state) {
+        case C3_RETRY_STATE_SERVER_RUNNING:
+            EXECEPTION_CATCH(c3RetryPtr = std::make_shared(), return HCCL_E_PTR);
+            break;
+        case C3_RETRY_STATE_SERVER_RETRY_OP:
+            EXECEPTION_CATCH(c3RetryPtr = std::make_shared(), return HCCL_E_PTR);
+            break;
+        case C3_RETRY_STATE_SERVER_STOP_OP:
+            EXECEPTION_CATCH(c3RetryPtr = std::make_shared(), return HCCL_E_PTR);
+            break;
+        case C3_RETRY_STATE_SERVER_OP_STOPPED:
+            EXECEPTION_CATCH(c3RetryPtr = std::make_shared(), return HCCL_E_PTR);
+            break;
+        default: {
+            HCCL_ERROR("[C3OpRetry][Server]CreateC3OpRetryServerByState failed, state[%s] is invalid",
+                GetReadableC3State(state));
+            return HCCL_E_NOT_SUPPORT;
+        }
+    }
+    c3RetryCtx->SetC3RetryState(state, c3RetryPtr);
+    return HCCL_SUCCESS;
+}
+
+// RUNNING state: polls every agent socket once for a C3RetryTaskInfo report.
+// The first report received switches the state machine to RETRY_OP (retryCount below the limit) or STOP_OP.
+// Agents whose receive fails are remembered and skipped from then on. Sleeps one polling interval when no
+// report arrived.
+HcclResult C3OpRetryServerRunning::ProcessEvent(C3RetryContext* c3RetryCtx)
+{
+    CHK_PRT_RET(c3RetryCtx == nullptr,
+        HCCL_ERROR("[C3OpRetry][Server]C3OpRetryServerRunning failed, c3RetryCtx is null"), HCCL_E_PTR);
+
+    constexpr int maxRetryCount = 3; // a report with at least this many retries gets a stop command
+    const std::chrono::seconds timeout = std::chrono::seconds(C3_OP_RETRY_KEEP_INTERVAL);
+    for (auto &it : c3RetryCtx->c3ServerSockets_) {
+        const int &agentId = it.first;
+        // Skip peers that have already been closed
+        if (disableAgent_.find(agentId) != disableAgent_.end()) {
+            continue;
+        }
+        // Remember when this peer was first polled; used below to detect a silent peer
+        // (insert() leaves an existing timestamp untouched)
+        const std::chrono::steady_clock::time_point curTime = std::chrono::steady_clock::now();
+        const auto lastRecvIt = lastRecvTimes_.insert(std::make_pair(agentId, curTime)).first;
+
+        // Poll for a report from the agent state machine
+        C3RetryTaskInfo &taskInfo = it.second.c3RetryTaskInfo;
+        const HcclResult ret = WaitC3RetryTaskInfoFromAgent(it.second.socket, taskInfo);
+        if (ret == HCCL_SUCCESS) {
+            // Receiving a report is normal operation, so it is logged at info level
+            HCCL_INFO("[C3OpRetryServerRunning] c3RetryTaskInfo.streamId is %d", taskInfo.streamId);
+            HCCL_INFO("[C3OpRetryServerRunning] c3RetryTaskInfo.taskId is %d", taskInfo.taskId);
+            HCCL_INFO("[C3OpRetryServerRunning] c3RetryTaskInfo.deviceId is %d", taskInfo.deviceId);
+            HCCL_INFO("[C3OpRetryServerRunning] c3RetryTaskInfo.tag is %s", taskInfo.tag);
+            HCCL_INFO("[C3OpRetryServerRunning] c3RetryTaskInfo.retryCount is %d", taskInfo.retryCount);
+
+            // TODO: use a queue; after receiving the command the agent still has to look up its retryMap again
+            curC3RetryTaskInfo = taskInfo;
+            // Task failed, or failed again after fewer than maxRetryCount retries --> C3_RETRY_STATE_SERVER_RETRY_OP
+            // Task failed after maxRetryCount retries                              --> C3_RETRY_STATE_SERVER_STOP_OP
+            const C3RetryState nextState = (taskInfo.retryCount < maxRetryCount) ?
+                C3_RETRY_STATE_SERVER_RETRY_OP : C3_RETRY_STATE_SERVER_STOP_OP;
+            // Return straight away: SetC3RetryState() has replaced the handler stored in the context
+            CHK_RET(CreateC3OpRetryServerByState(nextState, c3RetryCtx));
+            return HCCL_SUCCESS;
+        }
+
+        if (ret == HCCL_E_AGAIN) { // no data received
+            // Check whether the peer has been silent for longer than the keep-alive interval
+            const auto elapsed = std::chrono::duration_cast(curTime - lastRecvIt->second);
+            if (elapsed > timeout) {
+                HCCL_WARNING("[C3OpRetry][Server]C3OpRetryServerRunning Recv Retry Frame from AgentId[%d] timeout",
+                    agentId);
+                lastRecvIt->second = curTime;
+            }
+        } else { // receive failed
+            disableAgent_.insert(agentId);
+            HCCL_RUN_INFO("[C3OpRetry][Server]C3OpRetryServerRunning WaitResponse from AgentId[%d] fail, ret[%d]", agentId, ret);
+        }
+    }
+
+    // Polling interval
+    SaluSleep(C3_OP_RETRY_RUNNING_POLL_INTERVAL);
+    return HCCL_SUCCESS;
+}
+
+// RETRY_OP state: sends C3_RETRY_CMD_RETRY_OP to every rank in needRetryServerRanks_, then goes back to
+// C3_RETRY_STATE_SERVER_RUNNING.
+HcclResult C3OpRetryServerRetryOp::ProcessEvent(C3RetryContext* c3RetryCtx)
+{
+    CHK_PRT_RET(c3RetryCtx == nullptr,
+        HCCL_ERROR("[C3OpRetry][Server]C3OpRetryServerRetryOp failed, c3RetryCtx is null"), HCCL_E_PTR);
+    if (c3RetryCtx->needRetryServerRanks_.empty()) {
+        HCCL_WARNING("[C3OpRetry][Server]needRetryServerRanks_ is empty, C3_RETRY_CMD_RETRY_OP is sent to no rank");
+    }
+
+    for (auto rank : c3RetryCtx->needRetryServerRanks_) {
+        // find() rather than operator[]: an unknown rank must not insert an empty entry whose null socket is
+        // then used for sending
+        const auto target = c3RetryCtx->c3ServerSockets_.find(rank);
+        CHK_PRT_RET(target == c3RetryCtx->c3ServerSockets_.end() || target->second.socket == nullptr,
+            HCCL_ERROR("[C3OpRetry][Server]no valid socket to send C3_RETRY_CMD_RETRY_OP, dst_rank is %d", rank),
+            HCCL_E_PTR);
+
+        C3RetryCommandInfo c3RetryCommandInfo;
+        c3RetryCommandInfo.command = C3_RETRY_CMD_RETRY_OP;
+        c3RetryCommandInfo.retryTaskInfo = curC3RetryTaskInfo;
+        const HcclResult ret = SendCommandToC3Agent(target->second.socket, c3RetryCommandInfo);
+        CHK_PRT_RET(ret != HCCL_SUCCESS,
+            HCCL_ERROR("[C3OpRetry][Server]SendCommandToC3Agent C3_RETRY_CMD_RETRY_OP fail, dst_rank is %d", rank),
+            ret);
+    }
+    CHK_RET(CreateC3OpRetryServerByState(C3_RETRY_STATE_SERVER_RUNNING, c3RetryCtx));
+    return HCCL_SUCCESS;
+}
+
+// STOP_OP state: sends C3_RETRY_CMD_STOP_OP to every rank in needRetryServerRanks_, then moves to
+// C3_RETRY_STATE_SERVER_OP_STOPPED.
+HcclResult C3OpRetryServerStopOp::ProcessEvent(C3RetryContext* c3RetryCtx)
+{
+    CHK_PRT_RET(c3RetryCtx == nullptr,
+        HCCL_ERROR("[C3OpRetry][Server]C3OpRetryServerStopOp failed, c3RetryCtx is null"), HCCL_E_PTR);
+    if (c3RetryCtx->needRetryServerRanks_.empty()) {
+        HCCL_WARNING("[C3OpRetry][Server]needRetryServerRanks_ is empty, C3_RETRY_CMD_STOP_OP is sent to no rank");
+    }
+
+    for (auto rank : c3RetryCtx->needRetryServerRanks_) {
+        // find() rather than operator[]: see C3OpRetryServerRetryOp::ProcessEvent
+        const auto target = c3RetryCtx->c3ServerSockets_.find(rank);
+        CHK_PRT_RET(target == c3RetryCtx->c3ServerSockets_.end() || target->second.socket == nullptr,
+            HCCL_ERROR("[C3OpRetry][Server]no valid socket to send C3_RETRY_CMD_STOP_OP, dst_rank is %d", rank),
+            HCCL_E_PTR);
+
+        C3RetryCommandInfo c3RetryCommandInfo;
+        c3RetryCommandInfo.command = C3_RETRY_CMD_STOP_OP;
+        c3RetryCommandInfo.retryTaskInfo = curC3RetryTaskInfo;
+        const HcclResult ret = SendCommandToC3Agent(target->second.socket, c3RetryCommandInfo);
+        CHK_PRT_RET(ret != HCCL_SUCCESS,
+            HCCL_ERROR("[C3OpRetry][Server]SendCommandToC3Agent C3_RETRY_CMD_STOP_OP fail, dst_rank is %d", rank),
+            ret);
+    }
+    CHK_RET(CreateC3OpRetryServerByState(C3_RETRY_STATE_SERVER_OP_STOPPED, c3RetryCtx));
+    return HCCL_SUCCESS;
+}
+
+// OP_STOPPED state: logs and returns a failure code.
+// NOTE(review): HCCL_E_PTR does not describe this condition - confirm the intended result code.
+HcclResult C3OpRetryServerOpStopped::ProcessEvent(C3RetryContext* c3RetryCtx)
+{
+    HCCL_ERROR("[C3OpRetryServerOpStopped]");
+    return HCCL_E_PTR;
+}
+
+}
\ No newline at end of file
diff --git a/src/domain/collective_communication/framework/cluster_maintenance/recovery/operator_retry/c3_opretry_server.h b/src/domain/collective_communication/framework/cluster_maintenance/recovery/operator_retry/c3_opretry_server.h
new file mode 100644
index 0000000000000000000000000000000000000000..da19251dc343b548dbd817927e8109b2a871a7a4
--- /dev/null
+++ b/src/domain/collective_communication/framework/cluster_maintenance/recovery/operator_retry/c3_opretry_server.h
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2025 Huawei Technologies Co., Ltd.
+ * This file is a part of the CANN Open Software. 
+ * Licensed under CANN Open Software License Agreement Version 1.0 (the "License").
+ * Please refer to the License for details. You may not use this file except in compliance with the License.
+ * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED,
+ * INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE.
+ * See LICENSE in the root of the software repository for the full text of the License.
+ */
+
+#ifndef HCCL_C3_RETRY_SERVER_H
+#define HCCL_C3_RETRY_SERVER_H
+// NOTE(review): the header name of the bare #include below is missing in this copy of the patch, as are the
+// template arguments of std::map and std::unordered_set further down.
+#include
+#include "c3_opretry_base.h"
+
+namespace hccl {
+// C3_RETRY_STATE_SERVER_RUNNING: normal running state
+class C3OpRetryServerRunning : public C3OpRetryBase {
+public:
+    HcclResult ProcessEvent(C3RetryContext* c3RetryCtx) override;
+protected:
+    // NOTE(review): presumably keyed by agent id with the time of the last poll/receive - confirm in the .cc
+    std::map lastRecvTimes_;
+    std::unordered_set disableAgent_; // peers that are already closed; no longer polled, to avoid flooding the log
+};
+
+// C3_RETRY_STATE_SERVER_RETRY_OP: issue RETRY_OP_CMD to c3_agent
+class C3OpRetryServerRetryOp : public C3OpRetryBase {
+public:
+    HcclResult ProcessEvent(C3RetryContext* c3RetryCtx) override;
+};
+
+// C3_RETRY_STATE_SERVER_STOP_OP: issue STOP_CMD to c3_agent
+class C3OpRetryServerStopOp : public C3OpRetryBase {
+public:
+    HcclResult ProcessEvent(C3RetryContext* c3RetryCtx) override;
+};
+
+// C3_RETRY_STATE_SERVER_OP_STOPPED: stop trying to re-execute the failing OP
+class C3OpRetryServerOpStopped : public C3OpRetryBase {
+public:
+    HcclResult ProcessEvent(C3RetryContext* c3RetryCtx) override;
+};
+}
+#endif
\ No newline at end of file
diff --git a/src/domain/collective_communication/framework/cluster_maintenance/recovery/operator_retry/opretry_manager.cc b/src/domain/collective_communication/framework/cluster_maintenance/recovery/operator_retry/opretry_manager.cc
index 4ae1bc43556c2d15edda52d34df16909bd122394..88a393801a75d0fd06715c21e1fb31e037e374f0 100644
--- a/src/domain/collective_communication/framework/cluster_maintenance/recovery/operator_retry/opretry_manager.cc
+++ 
b/src/domain/collective_communication/framework/cluster_maintenance/recovery/operator_retry/opretry_manager.cc
@@ -13,6 +13,8 @@
 #include "opretry_connection_pub.h"
 #include "opretry_agent.h"
 #include "opretry_server.h"
+#include "c3_opretry_agent.h"
+#include "c3_opretry_server.h"
 #include "adapter_rts_common.h"
 #include "sal_pub.h"
 
@@ -21,6 +23,7 @@ OpRetryManager::~OpRetryManager()
 {
     HCCL_DEBUG("Destory OpRetryManager");
     (void)DeInit();
+    // NOTE(review): C3 teardown is disabled here - confirm the monitor threads kept in c3AgentOpRetry_ and
+    // c3ServerOpRetry_ are stopped and joined somewhere before the maps are destroyed.
+    // (void)C3DeInit();
 }
 
 HcclResult OpRetryManager::Init()
@@ -31,6 +34,14 @@ HcclResult OpRetryManager::Init()
     return HCCL_SUCCESS;
 }
 
+// Disabled C3 counterpart of Init().
+// NOTE(review): if this is re-enabled, the guard below tests initialized_ rather than c3Initialized_ -
+// confirm which flag is intended.
+// HcclResult OpRetryManager::C3Init()
+// {
+//     CHK_PRT_RET(initialized_ == true, HCCL_WARNING("C3OpRetryManager has already initialized"), HCCL_SUCCESS);
+//     c3Initialized_ = true;
+//     HCCL_INFO("C3OpRetryManager Init success");
+//     return HCCL_SUCCESS;
+// }
+
 HcclResult OpRetryManager::DeInit()
 {
     std::unique_lock lock(ProcessLock_);
@@ -54,6 +65,29 @@ HcclResult OpRetryManager::DeInit()
     return HCCL_SUCCESS;
 }
 
+// Disabled C3 counterpart of DeInit(): would clear c3Initialized_, join every C3 monitor thread and
+// empty both C3 maps.
+// HcclResult OpRetryManager::C3DeInit()
+// {
+//     std::unique_lock lock(ProcessLock_);
+//     if (c3Initialized_) {
+//         c3Initialized_ = false;
+//         for (auto it = c3AgentOpRetry_.begin(); it != c3AgentOpRetry_.end(); ++it) {
+//             if (it->second.thread != nullptr && it->second.thread->joinable()) {
+//                 it->second.thread->join();
+//             }
+//         }
+//         c3AgentOpRetry_.clear();
+
+//         for (auto it = c3ServerOpRetry_.begin(); it != c3ServerOpRetry_.end(); ++it) {
+//             if (it->second.thread != nullptr && it->second.thread->joinable()) {
+//                 it->second.thread->join();
+//             }
+//         }
+//         c3ServerOpRetry_.clear();
+//         HCCL_INFO("C3OpRetryManager DeInit success");
+//     }
+//     return HCCL_SUCCESS;
+// }
+
 HcclResult OpRetryManager::RegisterOpRetryMachine(const std::string& group, u32 rankSize, bool isRoot,
     std::shared_ptr agentConnection, std::map > &serverConnections,
     std::shared_ptr h2dPtr, std::shared_ptr d2hPtr,
@@ -77,17 +111,55 @@ HcclResult OpRetryManager::RegisterOpRetryMachine(const std::string& group, u32
     }
 
     // 注册agent状态机
-    CHK_RET(RegisterAgentRetryMachine(group, agentConnection, h2dPtr, d2hPtr,
-        opStreamPtr, notifyResetCallback, setTransprotStatusCallback, isEnableBackupLink, agentInfo));
+    // NOTE(review): the existing agent/server state machines are commented out below and only the C3 ones
+    // are registered - confirm this replacement is intended rather than a temporary switch.
+    // CHK_RET(RegisterAgentRetryMachine(group, agentConnection, h2dPtr, d2hPtr,
+    //     opStreamPtr, notifyResetCallback, setTransprotStatusCallback, isEnableBackupLink, agentInfo));
 
     // 注册server状态机
+    // if (isRoot) {
+    //     CHK_RET(RegisterServerRetryMachine(group, serverConnections, agentInfo));
+    // }
+    // HCCL_INFO("[Register][RetryMachine]group[%s] register success", group.c_str());
+
+    // Register the c3_agent state machine
+    CHK_RET(RegisterC3AgentRetryMachine(group, agentConnection, opStreamPtr, agentInfo));
+
+    // Register the c3_server state machine
     if (isRoot) {
-        CHK_RET(RegisterServerRetryMachine(group, serverConnections, agentInfo));
+        CHK_RET(RegisterC3ServerRetryMachine(group, serverConnections, agentInfo));
     }
-    HCCL_INFO("[Register][RetryMachine]group[%s] register success", group.c_str());
+    HCCL_INFO("[Register][C3OpRetryMachine]group[%s] register success", group.c_str());
+
     return HCCL_SUCCESS;
 }
 
+// Disabled stand-alone registration path: it would open its own connections under the group name
+// "AIV_OP_RETRY" + group before registering the C3 agent/server state machines.
+// HcclResult OpRetryManager::RegisterC3OpRetryMachine(const std::string& group, u32 rankSize, bool isRoot,
+//     std::shared_ptr opStreamPtr, const OpRetryServerInfo& serverInfo, const OpRetryAgentInfo& agentInfo)
+// {
+//     std::unique_lock lock(ProcessLock_);
+//     CHK_SMART_PTR_NULL(opStreamPtr);
+//     CHK_PRT_RET(group.empty(),
+//         HCCL_ERROR("[OpRetryManager][RegisterC3OpRetryMachine]params invalid, group is empty"), HCCL_E_PARA);
+//     std::shared_ptr c3AgentConnection;
+//     std::map> c3ServerConnections;
+//     std::string aivOpRetryGroup = "AIV_OP_RETRY" + group;
+//     CHK_RET(OpRetryConnectionPub::Init(aivOpRetryGroup, rankSize, serverInfo, agentInfo));
+//     CHK_RET(OpRetryConnectionPub::GetConns(aivOpRetryGroup, isRoot, c3AgentConnection, c3ServerConnections));
+//     // initialise
+//     if (c3Initialized_ == false) {
+//         CHK_RET(C3Init());
+//     }
+
+//     // register the c3_agent state machine
+//     CHK_RET(RegisterC3AgentRetryMachine(group, 
c3AgentConnection, opStreamPtr, agentInfo));
+
+//     // register the c3_server state machine
+//     if (isRoot) {
+//         CHK_RET(RegisterC3ServerRetryMachine(group, c3ServerConnections, agentInfo));
+//     }
+//     HCCL_INFO("[Register][C3OpRetryMachine]group[%s] register success", group.c_str());
+//     return HCCL_SUCCESS;
+// }
+
 HcclResult OpRetryManager::RegisterAgentRetryMachine(const std::string& group, std::shared_ptr socket,
     std::shared_ptr h2dPtr, std::shared_ptr d2hPtr,
     std::shared_ptr opStreamPtr, OpRetryResetNotifyCallback notifyResetCallback,
@@ -146,6 +218,61 @@ HcclResult OpRetryManager::RegisterServerRetryMachine(const std::string& group,
     return HCCL_SUCCESS;
 }
 
+// NOTE(review): in this copy of the patch the template argument lists (after std::shared_ptr, std::map and
+// std::make_shared) are missing; those spots are kept exactly as found.
+
+// Creates the C3 agent state machine for group and starts its monitor thread.
+// A group that is already registered is skipped with HCCL_SUCCESS. Every step that can fail runs before the
+// map entry is created, and the entry is removed again if the thread cannot be allocated, so a failed call
+// never leaves a half-built entry that would make later calls take the "already registered" path.
+HcclResult OpRetryManager::RegisterC3AgentRetryMachine(const std::string& group, std::shared_ptr socket,
+    std::shared_ptr opStreamPtr, const OpRetryAgentInfo& agentInfo)
+{
+    if (c3AgentOpRetry_.find(group) != c3AgentOpRetry_.end()) {
+        HCCL_INFO("[Register][C3AgentRetryMachine]group[%s] has Registered to c3AgentOpRetry, skip", group.c_str());
+        return HCCL_SUCCESS;
+    }
+
+    HcclRtContext ctx = nullptr;
+    CHK_RET(hrtCtxGetCurrent(&ctx));
+
+    std::shared_ptr c3RetryPtr;
+    EXECEPTION_CATCH((c3RetryPtr = std::make_shared()), return HCCL_E_PTR);
+    C3RetryCtrl c3RetryCtrl;
+    EXECEPTION_CATCH((c3RetryCtrl.c3RetryCtx =
+        std::make_shared(group, socket, opStreamPtr, c3RetryPtr, agentInfo)), return HCCL_E_PTR);
+    c3RetryCtrl.startExec = true;
+
+    // The monitor thread keeps a reference to startExec, so the control block has to live in the map first
+    const auto inserted = c3AgentOpRetry_.insert(std::make_pair(group, std::move(c3RetryCtrl)));
+    C3RetryCtrl &ctrl = inserted.first->second;
+    ctrl.thread.reset(new (std::nothrow) std::thread(&OpRetryManager::C3RetryStateMonitor, this,
+        group, ctrl.c3RetryCtx, std::ref(ctrl.startExec), ctx));
+    if (ctrl.thread == nullptr) {
+        HCCL_ERROR("[Register][C3AgentRetryMachine]group[%s] create monitor thread failed", group.c_str());
+        c3AgentOpRetry_.erase(inserted.first);
+        return HCCL_E_PTR;
+    }
+    HCCL_INFO("[%s]group[%s] rank[%u], register to C3AgentOpRetry success", __func__, group.c_str(), agentInfo.userRank);
+    return HCCL_SUCCESS;
+}
+
+// Creates the C3 server state machine for group and starts its monitor thread.
+// Same failure handling as RegisterC3AgentRetryMachine; additionally rejects a null entry in serverConnections.
+HcclResult OpRetryManager::RegisterC3ServerRetryMachine(const std::string& group,
+    std::map> &serverConnections, const OpRetryAgentInfo& agentInfo)
+{
+    if (c3ServerOpRetry_.find(group) != c3ServerOpRetry_.end()) {
+        HCCL_INFO("[Register][C3ServerRetryMachine]group[%s] has Registered to C3ServerOpRetry, skip", group.c_str());
+        return HCCL_SUCCESS;
+    }
+
+    for (const auto &connection : serverConnections) {
+        CHK_SMART_PTR_NULL(connection.second);
+    }
+
+    HcclRtContext ctx = nullptr;
+    CHK_RET(hrtCtxGetCurrent(&ctx));
+
+    std::shared_ptr c3RetryPtr = nullptr;
+    EXECEPTION_CATCH((c3RetryPtr = std::make_shared()), return HCCL_E_PTR);
+    C3RetryCtrl c3RetryCtrl;
+    EXECEPTION_CATCH((c3RetryCtrl.c3RetryCtx =
+        std::make_shared(serverConnections, c3RetryPtr, agentInfo)), return HCCL_E_PTR);
+    c3RetryCtrl.startExec = true;
+
+    // The monitor thread keeps a reference to startExec, so the control block has to live in the map first
+    const auto inserted = c3ServerOpRetry_.insert(std::make_pair(group, std::move(c3RetryCtrl)));
+    C3RetryCtrl &ctrl = inserted.first->second;
+    ctrl.thread.reset(new (std::nothrow) std::thread(&OpRetryManager::C3RetryStateMonitor, this,
+        group, ctrl.c3RetryCtx, std::ref(ctrl.startExec), ctx));
+    if (ctrl.thread == nullptr) {
+        HCCL_ERROR("[Register][C3ServerRetryMachine]group[%s] create monitor thread failed", group.c_str());
+        c3ServerOpRetry_.erase(inserted.first);
+        return HCCL_E_PTR;
+    }
+    HCCL_INFO("[%s]group[%s] rank[%u], register to C3ServerOpRetry success", __func__, group.c_str(), agentInfo.userRank);
+    return HCCL_SUCCESS;
+}
+
 HcclResult OpRetryManager::UnRegisterOpRetryManager(const std::string& group)
 {
     std::unique_lock lock(ProcessLock_);
@@ -176,6 +303,36 @@ HcclResult OpRetryManager::UnRegisterOpRetryManager(const std::string& group)
     return HCCL_SUCCESS;
 }
 
+// HcclResult OpRetryManager::UnRegisterC3OpRetryManager(const std::string& group)
+// {
+//     std::unique_lock lock(ProcessLock_);
+//     CHK_PRT_RET(group.empty(),
+//         HCCL_ERROR("[OpRetryManager][UnRegisterC3OpRetryManager]params invalid, group is empty"), HCCL_E_PARA);
+//     HCCL_INFO("[UnRegister][C3OpRetryManager]group[%s] unregister start", group.c_str());
+//     CHK_PRT_RET(c3Initialized_ == false, HCCL_WARNING("C3OpRetryManager has been destroyed"), HCCL_SUCCESS);
+
+//     if 
(c3AgentOpRetry_.find(group) != c3AgentOpRetry_.end()) {
+//         c3AgentOpRetry_[group].startExec = false;
+//         if (c3AgentOpRetry_[group].thread != nullptr && c3AgentOpRetry_[group].thread->joinable()) {
+//             c3AgentOpRetry_[group].thread->join();
+//         }
+//         c3AgentOpRetry_.erase(group);
+//         HCCL_INFO("[UnRegister][C3OpRetryManager]group[%s] unregister agentOpRetry success", group.c_str());
+//     }
+
+//     if (c3ServerOpRetry_.find(group) != c3ServerOpRetry_.end()) {
+//         c3ServerOpRetry_[group].startExec = false;
+//         if (c3ServerOpRetry_[group].thread != nullptr && c3ServerOpRetry_[group].thread->joinable()) {
+//             c3ServerOpRetry_[group].thread->join();
+//         }
+//         c3ServerOpRetry_.erase(group);
+//         HCCL_INFO("[UnRegister][C3OpRetryManager]group[%s] unregister serverOpRetry success", group.c_str());
+//     }
+//     HCCL_INFO("[UnRegister][C3OpRetryManager]group[%s] unregister success", group.c_str());
+//     OpRetryConnectionPub::DeInit(group);
+//     return HCCL_SUCCESS;
+// }
+
 void OpRetryManager::RetryStateMonitor(const std::string &group, std::shared_ptr retryCtx,
     const bool &startExec, HcclRtContext rtCtx)
 {
@@ -198,6 +355,28 @@ void OpRetryManager::RetryStateMonitor(const std::string &group, std::shared_ptr
         group.c_str(), ret, initialized_, startExec);
 }
 
+// Thread body of one C3 retry state machine: makes rtCtx current for this thread, names the thread, then
+// keeps calling c3RetryCtx->Request() while initialized_ and startExec are both true. The first failing
+// request ends the thread.
+// NOTE(review): the template argument list after std::shared_ptr is missing in this copy of the patch.
+void OpRetryManager::C3RetryStateMonitor(const std::string &group, std::shared_ptr c3RetryCtx,
+    const bool &startExec, HcclRtContext rtCtx)
+{
+    CHK_SMART_PTR_RET_NULL(c3RetryCtx);
+    CHK_SMART_PTR_RET_NULL(rtCtx);
+    CHK_RET_NULL(hrtCtxSetCurrent(rtCtx));
+
+    // Give the current thread a name
+    SetThreadName("C3_AIV_OpRetry");
+
+    // The format string must consume exactly the arguments supplied: no IP info is passed here, and
+    // GetRankId() returns an int
+    HCCL_RUN_INFO("[%s]%s start, group[%s], rankId[%d]", __func__, c3RetryCtx->GetCtxOpC3RetryMachineType(),
+        group.c_str(), c3RetryCtx->GetRankId());
+
+    HcclResult ret = HCCL_SUCCESS;
+    while (initialized_ && startExec) {
+        ret = c3RetryCtx->Request();
+        CHK_PRT_RET(ret != HCCL_SUCCESS,
+            HCCL_ERROR("C3RetryStateMonitor group[%s] exec fail, ret[%d]", group.c_str(), ret), );
+    }
+    HCCL_INFO("C3RetryStateMonitor group[%s] exit, ret[%d], initialized_[%d], startExec[%d]",
+        group.c_str(), ret, initialized_, startExec);
+}
+
 HcclResult OpRetryManager::AddLinkInfoByIdentifier(s32 deviceLogicID, const std::string &identifier,
     const std::string &newTag, std::vector &remoteRankList, bool incre)
 {
diff --git a/src/domain/collective_communication/framework/cluster_maintenance/recovery/operator_retry/opretry_manager.h b/src/domain/collective_communication/framework/cluster_maintenance/recovery/operator_retry/opretry_manager.h
index 7c48f31267618ea0011f6d7f0f44f4baccfd4e01..a7d68d958e742649eb7dad93f7ea70b031798d85 100644
--- a/src/domain/collective_communication/framework/cluster_maintenance/recovery/operator_retry/opretry_manager.h
+++ b/src/domain/collective_communication/framework/cluster_maintenance/recovery/operator_retry/opretry_manager.h
@@ -13,6 +13,7 @@
 #include
 #include
 #include "opretry_base.h"
+#include "c3_opretry_base.h"
 
 namespace hccl {
 struct RetryCtrl {
@@ -21,6 +22,12 @@ struct RetryCtrl {
     bool startExec = false;
 };
 
+// Control block of one C3 retry state machine: its monitor thread, its context and the flag the thread polls.
+struct C3RetryCtrl {
+    std::unique_ptr thread;
+    std::shared_ptr c3RetryCtx;
+    bool startExec = false; // the monitor loop stops once this is false
+};
+
 class OpRetryManager
 {
 public:
@@ -34,6 +41,10 @@ public:
         const OpRetryServerInfo& serverInfo, const OpRetryAgentInfo& agentInfo);
     HcclResult UnRegisterOpRetryManager(const std::string& group);
 
+    // HcclResult OpRetryManager::RegisterC3OpRetryMachine(const std::string& group, u32 rankSize, bool isRoot,
+    //     std::shared_ptr opStreamPtr, const OpRetryServerInfo& serverInfo, const OpRetryAgentInfo& agentInfo);
+    // HcclResult UnRegisterC3OpRetryManager(const std::string& group);
+
     static HcclResult AddLinkInfoByIdentifier(s32 deviceLogicID, const std::string &identifier,
         const std::string &newTag, std::vector &remoteRankList, bool incre = false);
     static HcclResult GetLinkInfoByIdentifier(s32 deviceLogicID, const std::string &identifier,
@@ -50,13 +61,22 @@ private:
         const OpRetryAgentInfo& agentInfo);
     HcclResult RegisterServerRetryMachine(const std::string& group, 
std::map> &serverConnections, const OpRetryAgentInfo& agentInfo);
+    // C3 counterparts of the two Register*RetryMachine methods above
+    HcclResult RegisterC3AgentRetryMachine(const std::string& group, std::shared_ptr socket,
+        std::shared_ptr opStreamPtr, const OpRetryAgentInfo& agentInfo);
+    HcclResult RegisterC3ServerRetryMachine(const std::string& group,
+        std::map> &serverConnections, const OpRetryAgentInfo& agentInfo);
     void RetryStateMonitor(const std::string &group, std::shared_ptr retryCtx, const bool &startExec,
         HcclRtContext rtCtx_);
+    // Thread body of a C3 state machine; startExec is read through the reference on every loop iteration
+    void C3RetryStateMonitor(const std::string &group, std::shared_ptr c3RetryCtx,
+        const bool &startExec, HcclRtContext rtCtx_);
 
 private:
     std::map serverOpRetry;
     std::map agentOpRetry_;
+    // C3 state machines; NOTE(review): presumably keyed by group name like the two maps above - confirm
+    std::map c3ServerOpRetry_;
+    std::map c3AgentOpRetry_;
     bool initialized_ = false;
+    // NOTE(review): only referenced from commented-out code in this patch
+    bool c3Initialized_ = false;
     std::mutex ProcessLock_;
 };
 }
diff --git a/src/domain/collective_communication/framework/communicator/impl/hccl_communicator.cc b/src/domain/collective_communication/framework/communicator/impl/hccl_communicator.cc
index 375a8a2d2d1ca4ba385571c693f3be44693541a1..e11db6a753e2edca117e183c81aaa816758ee95b 100644
--- a/src/domain/collective_communication/framework/communicator/impl/hccl_communicator.cc
+++ b/src/domain/collective_communication/framework/communicator/impl/hccl_communicator.cc
@@ -143,6 +143,7 @@ HcclCommunicator::~HcclCommunicator()
     if (opRetryManager_ != nullptr) {
         OpRetryManager::DeleteLinkInfoByIdentifier(deviceLogicId_, identifier_);
         opRetryManager_->UnRegisterOpRetryManager(identifier_);
+        // NOTE(review): C3 unregistration is disabled - confirm the C3 monitor threads of this group are
+        // stopped elsewhere.
+        // opRetryManager_->UnRegisterC3OpRetryManager(identifier_);
         opRetryManager_ = nullptr;
     }
     resMap_.clear();
@@ -1314,6 +1315,9 @@ HcclResult HcclCommunicator::InitOpRetry()
             commConnections_.agentConnection, commConnections_.serverConnections,
             kfcControlTransferH2D_, kfcStatusTransferD2H_, opRetryStreamPtr_, notifyResetCallback,
             setTransportStatusCallback, IsEnableBackupLink(), serverInfo, agentInfo));
+
+        // CHK_RET(opRetryManager_->RegisterC3OpRetryMachine(identifier_, userRankSize_, commConnections_.isRoot,
+        //     opRetryStreamPtr_, serverInfo, agentInfo));
     }
     return HCCL_SUCCESS;
 }
diff --git a/src/domain/collective_communication/framework/op_base/src/op_base.cc b/src/domain/collective_communication/framework/op_base/src/op_base.cc
index b9a9f5a658c439bde3673665160f5dbb267e27e6..437bac5fe9294d28a8621da2919dd717d588f51e 100644
--- a/src/domain/collective_communication/framework/op_base/src/op_base.cc
+++ b/src/domain/collective_communication/framework/op_base/src/op_base.cc
@@ -186,6 +186,16 @@ HcclResult GetDeviceComm(uint32_t ndev, const HcclRootInfo &rootHandle, const s3
     return HCCL_SUCCESS;
 }
 
+// Identifies one task by stream, task and device id plus its tag.
+// NOTE(review): the three ints have no default initialiser.
+struct RetryTaskInfo {
+    int streamId;
+    int taskId;
+    int deviceId;
+    std::string tag;
+};
+
+// NOTE(review): the template arguments of this map are missing in this copy of the patch; presumably every
+// access is meant to hold retryTaskMapMutex - confirm at the use sites. Both objects have external linkage.
+std::map>> retryTaskMap;
+std::mutex retryTaskMapMutex;
+
 HcclResult HcclGetCommAll(uint32_t ndev, int32_t *devices, HcclComm *comms)
 {
     // 入参校验