From 43b0a00e575645effd3e542851f7f871c9fe0013 Mon Sep 17 00:00:00 2001 From: huangbin Date: Tue, 29 Apr 2025 21:34:17 +0800 Subject: [PATCH] Feature: add mspti sampling for comm ops. --- mspti_convert/convert_json2csv.py | 55 +++++++ mspti_convert/convert_mspti_timeline.py | 129 ++++++++++++++++ src/mspti/json_file_writer.h | 186 ++++++++++++++++++++++++ src/mspti/mspti_tracker.cpp | 97 ++++++++++++ src/mspti/mspti_tracker.hpp | 33 +++++ 5 files changed, 500 insertions(+) create mode 100644 mspti_convert/convert_json2csv.py create mode 100644 mspti_convert/convert_mspti_timeline.py create mode 100644 src/mspti/json_file_writer.h create mode 100644 src/mspti/mspti_tracker.cpp create mode 100644 src/mspti/mspti_tracker.hpp diff --git a/mspti_convert/convert_json2csv.py b/mspti_convert/convert_json2csv.py new file mode 100644 index 00000000..55b8ea24 --- /dev/null +++ b/mspti_convert/convert_json2csv.py @@ -0,0 +1,55 @@ +# coding=utf-8 +""" +Copyright (c) Huawei Technologies Co., Ltd. 2020-2028. All rights reserved. +Description: +FileName:convert_json2_csv.py +Author: h00568282/huangbin +Create Date: 2025/3/28 16:17 +Notes: + +""" +import os +import json +import pandas as pd +from util.logging_utils import get_default_logger + +logger = get_default_logger(__name__) + + +def convert_json2csv(json_path): + csv_path = f"{json_path[:-5]}.csv" + if os.path.exists(csv_path): + return + + try: + with open(json_path, 'r', encoding='utf-8') as file: + content = file.read() + content = content.replace(']\n[', ',').strip() + json_data = json.loads(content) + except: + logger.error("json data read error") + json_data = None + + if not json_data: + return + df = pd.json_normalize(json_data, sep='_') + + logger.info(f"save path: {csv_path}") + df.to_csv(csv_path, index=False) + + +def convert_jsons2csv(root_path): + json_files = [file for file in os.listdir(root_path) if file.endswith("json")] + + for json_file in json_files: + logger.info(f"{json_file}") + json_path = os.path.join(root_path, json_file) + convert_json2csv(json_path) + + +if __name__ == "__main__": + # json_path = "./data/json_data/hccl_activity.3.json" + # convert_json2csv(json_path) + + root_path = "./data/json_tp4dp1" + convert_jsons2csv(root_path) \ No newline at end of file diff --git a/mspti_convert/convert_mspti_timeline.py b/mspti_convert/convert_mspti_timeline.py new file mode 100644 index 00000000..d8bbcf8f --- /dev/null +++ b/mspti_convert/convert_mspti_timeline.py @@ -0,0 +1,129 @@ +# coding=utf-8 +""" +Copyright (c) Huawei Technologies Co., Ltd. 2020-2028. All rights reserved. +Description: +FileName:slow_node_detection.py +Author: h00568282/huangbin +Create Date: 2025/3/26 11:23 +Notes: + +""" +import os +import json +import pandas as pd +from convert_json2csv import convert_jsons2csv + +__all__ = ['convert_mspti_timeline'] + +MODE = { + 0: "Host", + 1: "Device" +} +OP_COLORS = { + 'HcclAllreduce': "good", + 'HcclAllReduce': "good", + 'HcclAllGather': "bad", + 'HcclBroadcast': "yellow", + 'HcclReduceScatter': "olive", + 'HcclSend': "good", + 'HcclReceive': "good", + 'HcclBatchSendRecv': "thread_state_runnable" +} + + +def create_args(row): + return { + "id": row["Id"], + "comm_group": row["comm_group"], + "count": row["count"] + } + + +def split_df(df): + """ + 根据 mode 列将 DataFrame 拆分为 host 和 device 两个 DataFrame + """ + df_host = df[df['SourceKind'] == 0] + df_device = df[df['SourceKind'] == 1] + return df_host, df_device + + +def process_df(data_df, device_id, id2name_dict: dict): + """ + 对 DataFrame 进行处理,包括分组聚合、列拆分、添加新列等操作 + """ + + data_df["Name"] = data_df['Id'].map(id2name_dict) + df = data_df.groupby('Id').agg({ + 'Timestamp': ['min', 'max'], + 'Kind': 'first', + 'SourceKind': 'first', + 'Name': 'first', + }).reset_index() + df.columns = ['Id', 'start', 'end', 'Kind', 'SourceKind', 'Name'] + df[['comm_op', 'comm_group', 'data_type', 'count']] = df['Name'].str.replace('comm:', '').str.split(',', + expand=True) + df = df.drop(columns=['Name']) + df['cat'] = "hccl" + df['name'] = df['comm_op'] + df['cname'] = df['comm_op'].map(OP_COLORS) + df['end'] = df['end'] / 1000. + df['start'] = df['start'] / 1000. + df['dur'] = df['end'] - df['start'] + df['ph'] = "X" + df['pid'] = f"rank_{device_id}" + df['tid'] = df["SourceKind"].map(MODE) + df['args'] = df.apply(create_args, axis=1) + result = df[['cat', 'name', 'ph', 'pid', 'tid', 'start', 'dur', 'cname', 'args']].rename( + columns={'start': 'ts'}).to_dict(orient='records') + return result + + +def process_files(root_path, debug: bool = False): + """ + 处理指定路径下的所有 CSV 文件 + """ + csv_files = [file for file in os.listdir(root_path) if file.endswith("csv") and "device" not in file] + all_ranks = [] + for csv_file in csv_files: + print(f"start file: {csv_file}") + csv_file_path = os.path.join(root_path, csv_file) + df = pd.read_csv(csv_file_path) + if debug: + df = df.head(12) + + id2name_dict = df[df['Name'].notna()].set_index('Id')['Name'].to_dict() + # df['name'] = df.groupby('id')['name'].transform(lambda x: x.ffill().bfill()) + df_host, df_device = split_df(df) + device_id = df_device['msptiObjecId_Ds_DeviceId'].unique()[0] + host_result = process_df(df_host, device_id, id2name_dict) + all_ranks.extend(host_result) + device_result = process_df(df_device, device_id, id2name_dict) + all_ranks.extend(device_result) + return all_ranks + + +def save_to_json(all_ranks, files_path): + """ + 将处理结果保存为 JSON 文件 + """ + output = { + "traceEvents": all_ranks, + "stackFrames": {} + } + json_output = json.dumps(output, indent=4) + with open(os.path.join(files_path, f'mspti_comm_ops_timeline.json'), 'w') as f: + f.write(json_output) + + +def convert_mspti_timeline(data_path: str): + convert_jsons2csv(data_path) + all_ranks = process_files(data_path) + save_to_json(all_ranks, data_path) + + +if __name__ == "__main__": + files_path = "D:\\startwork\\AOPS\\09-25年技术规划\\Code\\mspti_test-megatron-0224\\mspti_test-megatron-0224\\data\\log\\all_merge" + convert_jsons2csv(files_path) + all_ranks = process_files(files_path) + save_to_json(all_ranks, files_path) \ No newline at end of file diff --git a/src/mspti/json_file_writer.h b/src/mspti/json_file_writer.h new file mode 100644 index 00000000..dbaad69a --- /dev/null +++ b/src/mspti/json_file_writer.h @@ -0,0 +1,186 @@ +#pragma once +#include "mspti.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include + +class MSPTIHcclFileWriter +{ + private: + std::ofstream file; + std::mutex buffermtx; + std::mutex bufferMarkerMtx; + std::mutex threadmtx; + std::atomic opened; + std::unique_ptr> markerActivityBuffer; + std::thread writerThread; + std::condition_variable cv; + std::atomic stop; + Json::Value root = Json::Value(Json::ValueType::arrayValue); + + public: + MSPTIHcclFileWriter(const std::string &filename) + { + // obtain environment variable LOCAL_RANK + // to determine the rank of the process + // and append it to the filename + const char *savePath = std::getenv("METRIC_PATH"); + if (savePath == nullptr) + { + savePath = "/var/log"; + } + std::string savePathStr = savePath; + if (!savePathStr.empty() && savePathStr.back() != '/') + { + savePathStr += "/"; + } + std::string saveFilename = savePathStr + filename; + std::string filenameWithRank = saveFilename; + this->markerActivityBuffer = + std::make_unique>(); + + const char *localRankCStr = std::getenv("RANK"); + if (localRankCStr == nullptr) + { + localRankCStr = "-1"; + } + std::string localRank = + localRankCStr; // Now safe to construct std::string + auto rank = std::stoi(localRank); + if (saveFilename.length() >= 5 && + saveFilename.substr(saveFilename.length() - 5) == ".json") + { + std::string baseName = + saveFilename.substr(0, saveFilename.length() - 5); + filenameWithRank = baseName + "." + std::to_string(rank) + ".json"; + } + else + { + filenameWithRank = saveFilename + "." + std::to_string(rank); + } + std::cout << "Filename: " << filenameWithRank << std::endl; + this->file.open(filenameWithRank, std::ios::out | std::ios::app); + this->opened.store(true); + this->stop.store(false); + this->run(); + } + + void stopWriter() + { + if (this->file.is_open()) + { + { + std::unique_lock lock(this->threadmtx); + this->stop.store(true); + } + this->cv.notify_all(); + this->hcclActivityFormatToJson(); + if (this->writerThread.joinable()) + { + this->writerThread.join(); + } + this->file.close(); + this->opened.store(false); + } + } + + ~MSPTIHcclFileWriter() { this->stopWriter(); } + + bool fileExists(const std::string &fp) + { + std::ifstream file(fp.c_str()); + return file.good() && file.is_open(); + } + + void bufferMarkerActivity(msptiActivityMarker *activity) + { + std::lock_guard lock(this->bufferMarkerMtx); + this->markerActivityBuffer->push_back(*activity); + } + + void run() + { + // a thread to periodically flush + // the buffer to the file + // watch the conditional variable for signal + this->writerThread = std::thread( + [this]() + { + while (!this->stop.load()) + { + std::unique_lock lock(this->threadmtx); + if (this->cv.wait_for(lock, std::chrono::seconds(5)) == + std::cv_status::timeout) + { + this->hcclActivityFormatToJson(); + } + else if (this->stop.load()) + { + break; + }; + } + }); + } + + void hcclActivityFormatToJson() + { + std::lock_guard lock(this->buffermtx); + if (this->file.is_open()) + { + for (auto activity : *this->markerActivityBuffer) + { + Json::Value markerJson; + markerJson["Kind"] = activity.kind; + markerJson["SourceKind"] = activity.sourceKind; + markerJson["Timestamp"] = activity.timestamp; + markerJson["Id"] = activity.id; + // markerJson["Domain"] = ""; + markerJson["Flag"] = activity.flag; + Json::Value msptiObjecId; + if (activity.sourceKind == MSPTI_ACTIVITY_SOURCE_KIND_HOST) + { + Json::Value pt; + pt["ProcessId"] = activity.objectId.pt.processId; + pt["ThreadId"] = activity.objectId.pt.threadId; + Json::Value ds; + ds["DeviceId"] = activity.objectId.pt.processId; + ds["StreamId"] = activity.objectId.pt.threadId; + msptiObjecId["Pt"] = pt; + msptiObjecId["Ds"] = ds; + } + else if (activity.sourceKind == + MSPTI_ACTIVITY_SOURCE_KIND_DEVICE) + { + Json::Value ds; + ds["DeviceId"] = activity.objectId.ds.deviceId; + ds["StreamId"] = activity.objectId.ds.streamId; + Json::Value pt; + pt["ProcessId"] = activity.objectId.ds.deviceId; + pt["ThreadId"] = activity.objectId.ds.streamId; + msptiObjecId["Pt"] = pt; + msptiObjecId["Ds"] = ds; + } + markerJson["msptiObjectId"] = msptiObjecId; + markerJson["Name"] = activity.name; + this->root.append(markerJson); + } + if (this->root.size() > 0) + { + Json::StyledWriter writer; + this->file << writer.write(this->root); + this->root.clear(); + } + this->markerActivityBuffer->clear(); + } + else + { + std::cout << "File is not open" << std::endl; + } + } +}; \ No newline at end of file diff --git a/src/mspti/mspti_tracker.cpp b/src/mspti/mspti_tracker.cpp new file mode 100644 index 00000000..e8eceac3 --- /dev/null +++ b/src/mspti/mspti_tracker.cpp @@ -0,0 +1,97 @@ +#include "mspti_tracker.hpp" +#include +#include +#include + +constexpr size_t KB = 1 * 1024; +constexpr size_t MB = 1 * 1024 * KB; +constexpr size_t ALIGN_SIZE = 8; + +std::mutex MSPTITracker::mtx; + +inline uint8_t *align_buffer(uint8_t *buffer, size_t align) +{ + return reinterpret_cast( + (reinterpret_cast(buffer) + (align - 1)) & ~(align - 1)); +} + +MSPTITracker::MSPTITracker() +{ + std::cout << "Logging initialized from preloaded library." << std::endl; + hcclFileWriter = + std::make_unique("hccl_activity.json"); + msptiSubscribe(&subscriber, nullptr, nullptr); + msptiActivityRegisterCallbacks(UserBufferRequest, UserBufferComplete); + msptiActivityEnable(MSPTI_ACTIVITY_KIND_MARKER); +} + +MSPTITracker::~MSPTITracker() +{ + msptiActivityFlushAll(1); + msptiActivityDisable(MSPTI_ACTIVITY_KIND_MARKER); + finish(); + msptiUnsubscribe(subscriber); +} + +MSPTITracker &MSPTITracker::getInstance() +{ + static MSPTITracker instance; + return instance; +} + +void MSPTITracker::finish() +{ + std::cout << "Finishing MSPTI Tracker" << std::endl; + if (hcclFileWriter) + { + hcclFileWriter->stopWriter(); + } +} + +void MSPTITracker::readActivityMarker(msptiActivityMarker *activity) +{ + if (hcclFileWriter) + { + hcclFileWriter->bufferMarkerActivity(activity); + } +} + +void MSPTITracker::UserBufferRequest(uint8_t **buffer, size_t *size, + size_t *maxNumRecords) +{ + auto &instance = getInstance(); + std::lock_guard lock(mtx); + constexpr uint32_t SIZE = (uint32_t)MB * 1; + instance.requestedCount.fetch_add(1); + uint8_t *pBuffer = (uint8_t *)malloc(SIZE + ALIGN_SIZE); + *buffer = align_buffer(pBuffer, ALIGN_SIZE); + *size = MB * 1; + *maxNumRecords = 0; +} + +void MSPTITracker::UserBufferComplete(uint8_t *buffer, size_t size, + size_t validSize) +{ + auto &instance = getInstance(); + if (validSize > 0) + { + msptiActivity *pRecord = nullptr; + msptiResult status = MSPTI_SUCCESS; + do + { + std::lock_guard lock(mtx); + status = msptiActivityGetNextRecord(buffer, validSize, &pRecord); + if (status == MSPTI_SUCCESS && + pRecord->kind == MSPTI_ACTIVITY_KIND_MARKER) + { + instance.readActivityMarker( + reinterpret_cast(pRecord)); + } + else if (status == MSPTI_ERROR_MAX_LIMIT_REACHED) + { + break; + } + } while (status == MSPTI_SUCCESS); + } + free(buffer); +} \ No newline at end of file diff --git a/src/mspti/mspti_tracker.hpp b/src/mspti/mspti_tracker.hpp new file mode 100644 index 00000000..a1c75bcd --- /dev/null +++ b/src/mspti/mspti_tracker.hpp @@ -0,0 +1,33 @@ +#include "json_file_writer.h" +#include "mspti.h" +#include +#include +#include + +class MSPTITracker +{ + private: + static std::mutex mtx; + + msptiSubscriberHandle subscriber; + std::unique_ptr hcclFileWriter; + std::atomic requestedCount{0}; + + MSPTITracker(); + ~MSPTITracker(); + + public: + MSPTITracker(const MSPTITracker &) = delete; + MSPTITracker &operator=(const MSPTITracker &) = delete; + + static MSPTITracker &getInstance(); + + msptiSubscriberHandle *getSubscriber() { return &subscriber; } + void finish(); + void readActivityMarker(msptiActivityMarker *activity); + + static void UserBufferRequest(uint8_t **buffer, size_t *size, + size_t *maxNumRecords); + static void UserBufferComplete(uint8_t *buffer, size_t size, + size_t validSize); +}; \ No newline at end of file -- Gitee