diff --git a/cplusplus/level1_single_api/11_llm_data_dist/CMakeLists.txt b/cplusplus/level1_single_api/11_llm_data_dist/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..603b6e968fa0e345dfaf701f92badbc45afa7514 --- /dev/null +++ b/cplusplus/level1_single_api/11_llm_data_dist/CMakeLists.txt @@ -0,0 +1,71 @@ +cmake_minimum_required(VERSION 3.5.1) +project(llm_datadist_sample) + +set(CMAKE_VERBOSE_MAKEFILE ON) +set(CMAKE_SKIP_INSTALL_ALL_DEPENDENCY TRUE) + +if (DEFINED ENV{ASCEND_INSTALL_PATH}) + set(ASCEND_PATH $ENV{ASCEND_INSTALL_PATH}) +else () + set(ASCEND_PATH /usr/local/Ascend/latest) +endif () + +set(INCLUDE_DIR ${ASCEND_PATH}/include) + +set(common_compile_options + --std=c++11 + -g + -Wall +) + +set(common_compile_definitions + _GLIBCXX_USE_CXX11_ABI=0 +) + +add_executable(prompt_sample "prompt_sample.cpp") + +target_compile_options(prompt_sample PRIVATE + ${common_compile_options} +) + +target_compile_definitions(prompt_sample PRIVATE + ${common_compile_definitions} +) + +target_include_directories(prompt_sample PRIVATE + ${INCLUDE_DIR} + ${INCLUDE_DIR}/external/ge_common +) + +target_link_directories(prompt_sample PRIVATE + ${ASCEND_PATH}/lib64 +) + +target_link_libraries(prompt_sample PRIVATE + llm_engine + graph +) + +add_executable(decoder_sample "decoder_sample.cpp") + +target_compile_options(decoder_sample PRIVATE + ${common_compile_options} +) + +target_compile_definitions(decoder_sample PRIVATE + ${common_compile_definitions} +) + +target_include_directories(decoder_sample PRIVATE + ${INCLUDE_DIR} + ${INCLUDE_DIR}/external/ge_common +) + +target_link_directories(decoder_sample PRIVATE + ${ASCEND_PATH}/lib64 +) + +target_link_libraries(decoder_sample PRIVATE + llm_engine + graph +) diff --git a/cplusplus/level1_single_api/11_llm_data_dist/decoder_sample.cpp b/cplusplus/level1_single_api/11_llm_data_dist/decoder_sample.cpp new file mode 100644 index 
0000000000000000000000000000000000000000..d0b6a57a14ade67ef95ac9536c728fca718df61e --- /dev/null +++ b/cplusplus/level1_single_api/11_llm_data_dist/decoder_sample.cpp @@ -0,0 +1,213 @@ +/** + * Copyright 2024 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include "llm_datadist/llm_datadist.h" + +using namespace llm_datadist; + +constexpr uint16_t PROMPT_LISTEN_PORT = 26000; +constexpr uint16_t PROMPT_CLUSTER_ID = 0; +constexpr uint16_t DECODER_CLUSTER_ID = 1; +constexpr uint32_t NUM_TENSORS = 4U; +constexpr int32_t WAIT_PROMPT_TIME = 5; +constexpr int32_t EXPECTED_ARG_CNT = 4; +constexpr uint32_t ARG_INDEX_DEVICE_ID = 1; +constexpr uint32_t ARG_INDEX_LOCAL_IP = 2; +constexpr uint32_t ARG_INDEX_REMOTE_IP = 3; + +int Initialize(LlmDataDist &llmDataDist, const std::string &deviceId) +{ + std::map options; + options[OPTION_DEVICE_ID] = deviceId.c_str(); + options[OPTION_BUF_POOL_CFG] = R"({ +"buf_pool_size": 2147483648 +})"; + auto ret = llmDataDist.Initialize(options); + if (ret != LLM_SUCCESS) { + printf("[ERROR] Initialize failed, ret = %u\n", ret); + return -1; + } + printf("[INFO] Initialize success\n"); + return LLM_SUCCESS; +} + +int Link(LlmDataDist &llmDataDist, const char *localIp, const char *remoteIp) +{ + std::vector rets; + std::vector clusters; + ClusterInfo clusterInfo; + IpInfo localIpInfo; + localIpInfo.ip = localIp; + IpInfo remoteIpInfo; + remoteIpInfo.ip = remoteIp; + 
remoteIpInfo.port = PROMPT_LISTEN_PORT; + clusterInfo.remote_cluster_id = PROMPT_CLUSTER_ID; + clusterInfo.local_ip_infos.emplace_back(std::move(localIpInfo)); + clusterInfo.remote_ip_infos.emplace_back(std::move(remoteIpInfo)); + clusters.emplace_back(std::move(clusterInfo)); + auto ret = llmDataDist.LinkLlmClusters(clusters, rets); + if (ret != LLM_SUCCESS) { + printf("[ERROR] LinkLlmClusters failed, ret = %u\n", ret); + return -1; + } + printf("[INFO] LinkLlmClusters success\n"); + return 0; +} + +int32_t PullCache(LlmDataDist &llmDataDist, Cache &cache) +{ + std::vector promptBlocks {1,2,3,6,5,4,7}; + std::vector decoderBlocks {1,2,3,6,5,4,7}; + CacheIndex cacheIndex{PROMPT_CLUSTER_ID, 1, 0}; + // 可以使用PullKvBlock拉取多块block的数据 + auto ret = llmDataDist.PullKvBlocks(cacheIndex, cache, promptBlocks, decoderBlocks); + if (ret != LLM_SUCCESS) { + printf("[ERROR] PullKvBlocks failed, ret = %u\n", ret); + return -1; + } + printf("[INFO] PullKvBlocks success\n"); + // 也可以使用PullKvCache拉取一个batch中的连续数据 + cacheIndex.batch_index = 0; + ret = llmDataDist.PullKvCache(cacheIndex, cache, 0); + if (ret != LLM_SUCCESS) { + printf("[ERROR] PullKvCache failed, ret = %u\n", ret); + return -1; + } + printf("[INFO] PullKvCache success\n"); + return 0; +} + +int32_t RunWithCache(LlmDataDist &llmDataDist, Cache &npuCache) +{ + Cache host_cache{}; + host_cache.cache_desc = npuCache.cache_desc; + host_cache.cache_desc.placement = CachePlacement::kHost; + auto buffers = std::vector>(host_cache.cache_desc.num_tensors, + std::vector(8 * 16)); + for (uint32_t i = 0; i < host_cache.cache_desc.num_tensors; ++i) { + std::iota(buffers[i].begin(), buffers[i].end(), 0); + host_cache.tensor_addrs.emplace_back(reinterpret_cast(buffers[i].data())); + } + + // 通过D2H拷贝npu cache的值到host + std::vector blockIndices(host_cache.cache_desc.shape.front()); + // 可以使用CopyKvBlocks一次拷贝多块内存 + std::iota(blockIndices.begin(), blockIndices.end(), 0); + auto ret = llmDataDist.CopyKvBlocks(host_cache, npuCache, 
blockIndices, {blockIndices}); + if (ret != LLM_SUCCESS) { + printf("[ERROR] CopyKvBlocks failed, ret = %u\n", ret); + return -1; + } + printf("[INFO] CopyKvBlocks success\n"); + // 也可以使用CopyKvCache拷贝一个batch中的连续数据 + ret = llmDataDist.CopyKvCache(npuCache, host_cache, 0, 0); + if (ret != LLM_SUCCESS) { + printf("[ERROR] CopyKvCache failed, ret = %u\n", ret); + return -1; + } + printf("[INFO] CopyKvCache success\n"); + // 打印数据 + printf("[INFO] print cache data:\n"); + for (size_t i = 0; i < buffers[0].size(); ++i) { + if ((i > 0) && (i % host_cache.cache_desc.shape.back() == 0)) { + printf("\n"); + } + printf("%d\t", buffers[0][i]); + } + printf("\n"); + return 0; +} + +void OnError(LlmDataDist &llmDataDist, Cache &cache) +{ + if (cache.cache_id > 0) { + (void) llmDataDist.DeallocateCache(cache.cache_id); + } + llmDataDist.Finalize(); +} + +int32_t RunDecoderSample(const char *deviceId, const char *localIp, const char *remoteIp) +{ + printf("[INFO] Decoder Sample start\n"); + // 1. 初始化 + LlmDataDist llmDataDist(DECODER_CLUSTER_ID, LlmRole::kDecoder); + if (Initialize(llmDataDist, deviceId) != 0) { + return -1; + } + // 2. 与prompt建链 + if (Link(llmDataDist, localIp, remoteIp) != 0) { + return -1; + } + + // 3. 申请device cache + CacheDesc kv_cache_desc{}; + kv_cache_desc.num_tensors = NUM_TENSORS; + kv_cache_desc.data_type = DT_INT32; + kv_cache_desc.shape = {8, 16}; + Cache cache{}; + auto ret = llmDataDist.AllocateCache(kv_cache_desc, cache); + if (ret != LLM_SUCCESS) { + printf("[ERROR] AllocateCache failed, ret = %u\n", ret); + OnError(llmDataDist, cache); + return -1; + } + printf("[INFO] AllocateCache success\n"); + for (size_t i = 0U; i < cache.tensor_addrs.size(); ++i) { + printf("[INFO] Tensor[%zu] addr = %p\n", i, reinterpret_cast(cache.tensor_addrs[i])); + } + + // 4. 等待Prompt写完cache,实际业务场景可通过合适方式实现通知 + std::this_thread::sleep_for(std::chrono::seconds(WAIT_PROMPT_TIME)); + // 5. 
从prompt拉取Cache + if (PullCache(llmDataDist, cache) != 0) { + OnError(llmDataDist, cache); + return -1; + } + // 6. 使用拉取后的Cache,如进行模型推理,此处简单打印下pull回来的数据 + if (RunWithCache(llmDataDist, cache) != 0) { + OnError(llmDataDist, cache); + return -1; + } + + // 7. 释放Cache与LlmDatadist + ret = llmDataDist.DeallocateCache(cache.cache_id); + if (ret != LLM_SUCCESS) { + printf("[ERROR] DeallocateCache failed, ret = %u\n", ret); + } else { + printf("[INFO] DeallocateCache success\n"); + } + llmDataDist.Finalize(); + printf("[INFO] Finalize success\n"); + printf("[INFO] Decoder Sample end\n"); + return 0; +} + +int main(int32_t argc, char **argv) +{ + if (argc != EXPECTED_ARG_CNT) { + printf("[ERROR] expect 3 args(deviceId, localIp, remoteIp), but got %d\n", argc - 1); + return -1; + } + const auto deviceId = argv[ARG_INDEX_DEVICE_ID]; + const auto localIp = argv[ARG_INDEX_LOCAL_IP]; + const auto remoteIp = argv[ARG_INDEX_REMOTE_IP]; + printf("[INFO] deviceId = %s, localIp = %s, remoteIp = %s\n", deviceId, localIp, remoteIp); + auto ret = RunDecoderSample(deviceId, localIp, remoteIp); + return ret; +} \ No newline at end of file diff --git a/cplusplus/level1_single_api/11_llm_data_dist/prompt_sample.cpp b/cplusplus/level1_single_api/11_llm_data_dist/prompt_sample.cpp new file mode 100644 index 0000000000000000000000000000000000000000..531006dfc3bcf6106dd9487fd5ec5091a98aeae4 --- /dev/null +++ b/cplusplus/level1_single_api/11_llm_data_dist/prompt_sample.cpp @@ -0,0 +1,144 @@ +/** + * Copyright 2024 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include "llm_datadist/llm_datadist.h" + +using namespace llm_datadist; + +constexpr uint16_t PROMPT_LISTEN_PORT = 26000; +constexpr uint16_t PROMPT_CLUSTER_ID = 0; +constexpr uint32_t NUM_TENSORS = 4U; +constexpr int32_t WAIT_TIME = 30; +constexpr int32_t EXPECTED_ARG_CNT = 4; +constexpr uint32_t ARG_INDEX_DEVICE_ID = 1; +constexpr uint32_t ARG_INDEX_LOCAL_IP = 2; + +int Initialize(LlmDataDist &llmDataDist, const std::string &deviceId, const std::string &localIp) +{ + std::map options; + options[OPTION_DEVICE_ID] = deviceId.c_str(); + options[OPTION_LISTEN_IP_INFO] = (localIp + ":" + std::to_string(PROMPT_LISTEN_PORT)).c_str(); + options[OPTION_BUF_POOL_CFG] = R"({ +"buf_cfg":[{"total_size":2097152,"blk_size":256,"max_buf_size":8192}], +"buf_pool_size": 2147483648 +})"; + options[OPTION_ENABLE_SET_ROLE] = "1"; + auto ret = llmDataDist.Initialize(options); + if (ret != LLM_SUCCESS) { + printf("[ERROR] Initialize failed, ret = %u\n", ret); + return -1; + } + printf("[INFO] Initialize success\n"); + return LLM_SUCCESS; +} + +int32_t RunWithCache(LlmDataDist &llmDataDist, Cache &npuCache) +{ + // 在本Sample中,通过CopyCache/CopyBlocksCache接口对其进行赋值 + // 1. 
申请host cache, 并赋值,用于后续Copy + Cache hostCache{}; + hostCache.cache_desc = npuCache.cache_desc; + hostCache.cache_desc.placement = CachePlacement::kHost; + auto buffers = std::vector>(hostCache.cache_desc.num_tensors, + std::vector(8 * 16)); + for (uint32_t i = 0; i < hostCache.cache_desc.num_tensors; ++i) { + std::iota(buffers[i].begin(), buffers[i].end(), 0); + hostCache.tensor_addrs.emplace_back(reinterpret_cast(buffers[i].data())); + } + + // 通过Copy H2D 为npu_cache赋值(CacheKvCache接口暂不支持H2D copy) + std::vector blockIndices(hostCache.cache_desc.shape.front()); + std::iota(blockIndices.begin(), blockIndices.end(), 0); + auto ret = llmDataDist.CopyKvBlocks(hostCache, npuCache, blockIndices, {blockIndices}); + if (ret != LLM_SUCCESS) { + printf("[ERROR] CopyKvBlocks failed, ret = %u\n", ret); + return -1; + } + printf("[INFO] CopyKvBlocks success\n"); + return 0; +} + +void OnError(LlmDataDist &llmDataDist, Cache &cache) +{ + if (cache.cache_id > 0) { + (void) llmDataDist.DeallocateCache(cache.cache_id); + } + llmDataDist.Finalize(); +} + +int32_t RunPromptSample(const char *deviceId, const char *localIp) +{ + printf("[INFO] Prompt Sample start\n"); + // 1. 初始化 + LlmDataDist llmDataDist(PROMPT_CLUSTER_ID, LlmRole::kPrompt); + if (Initialize(llmDataDist, deviceId, localIp) != 0) { + printf("[ERROR] Initialize LlmDataDist failed\n"); + return -1; + } + // 2. 申请device cache + CacheDesc cache_desc{}; + cache_desc.num_tensors = NUM_TENSORS; + cache_desc.data_type = DT_INT32; + cache_desc.shape = {8, 16}; + Cache cache{}; + auto ret = llmDataDist.AllocateCache(cache_desc, cache); + if (ret != LLM_SUCCESS) { + printf("[ERROR] AllocateCache failed, ret = %u\n", ret); + OnError(llmDataDist, cache); + return -1; + } + // 3. 
Allocate成功后,可以获取cache中各tensor的地址用于后续操作 + printf("[INFO] AllocateCache success\n"); + for (size_t i = 0U; i < cache.tensor_addrs.size(); ++i) { + printf("[INFO] Tensor[%zu] addr = %p\n", i, reinterpret_cast(cache.tensor_addrs[i])); + } + if (RunWithCache(llmDataDist, cache) != 0) { + printf("[ERROR] RunWithCache failed\n"); + OnError(llmDataDist, cache); + return -1; + } + + // 4. 等待Decoder拉取cache + std::this_thread::sleep_for(std::chrono::seconds(WAIT_TIME)); + + // 5. 释放Cache与LlmDatadist + ret = llmDataDist.DeallocateCache(cache.cache_id); + if (ret != LLM_SUCCESS) { + printf("[ERROR] DeallocateCache failed, ret = %u\n", ret); + } else { + printf("[INFO] DeallocateCache success\n"); + } + llmDataDist.Finalize(); + printf("[INFO] Finalize success\n"); + printf("[INFO] Prompt Sample end\n"); + return 0; +} + +int main(int32_t argc, char **argv) +{ + if (argc != EXPECTED_ARG_CNT) { + printf("[ERROR] expect 2 args(deviceId, localIp), but got %d\n", argc - 1); + return -1; + } + const auto deviceId = argv[ARG_INDEX_DEVICE_ID]; + const auto localIp = argv[ARG_INDEX_LOCAL_IP]; + printf("[INFO] deviceId = %s, localIp = %s\n", deviceId, localIp); + auto ret = RunPromptSample(deviceId, localIp); + return ret; +} \ No newline at end of file diff --git a/cplusplus/level1_single_api/11_llm_data_dist/readme.md b/cplusplus/level1_single_api/11_llm_data_dist/readme.md new file mode 100644 index 0000000000000000000000000000000000000000..121168723619f1e5b6108db238cd0e5985e7958f --- /dev/null +++ b/cplusplus/level1_single_api/11_llm_data_dist/readme.md @@ -0,0 +1,74 @@ +## 目录 + +- [样例介绍](#样例介绍) +- [目录结构](#目录结构) +- [环境要求](#环境要求) +- [程序编译](#程序编译) +- [样例运行](#样例运行) + + +## 样例介绍 + +功能:通过LLM-DataDist接口实现分离部署场景下KvCache和Cache的管理功能。 + + +## 目录结构 + +``` +├── prompt_sampe.cpp // prompt样例main函数 +├── decoder_sampe.cpp // decoder样例main函数 +├── CMakeLists.txt // 编译脚本 +``` + + +## 环境要求 + +- 操作系统及架构:Euleros x86系统、Euleros aarch64系统 +- 编译器:g++ +- 芯片:Atlas 训练系列产品、Atlas 推理系列产品(配置Ascend 310P AI处理器) +- 
python及依赖的库:python3.7.5 +- 已完成昇腾AI软件栈在运行环境上的部署 + +## 程序编译 + +1. 修改CMakeLists.txt文件中的安装包路径 + +2. 执行如下命令进行编译。 + + 依次执行: + + ``` + mkdir build && cd build + cmake .. && make + ``` + +3. 编译结束后,在**build**目录下生成可执行文件**prompt_sample**与**decoder_sample**。 + +## 样例运行 +1. 执行前准备: + + - 在Prompt与Decoder的主机分别执行以下命令,查询该主机的device ip信息 + ``` + for i in {0..7}; do hccn_tool -i $i -ip -g; done + ``` + **注: 如果出现hccn_tool命令找不到的情况,可在CANN包安装目录下搜索hccn_tool,找到可执行文件执行** +2. 配置环境变量 + - 若运行环境上安装的“Ascend-cann-toolkit”包,环境变量设置如下: + + ``` + . ${HOME}/Ascend/ascend-toolkit/set_env.sh + ``` + + “$HOME/Ascend”请替换相关软件包的实际安装路径。 + +3. 在运行环境执行可执行文件。 + + - 执行prompt_sample, 参数为device_id与local_ip其中device_id为prompt要使用的device_id, local_ip为prompt所在device的ip,如: + ``` + ./prompt_sample 0 10.10.10.1 + ``` + + - 执行decoder_sample, 参数为device_id、local_ip与remote_ip, 其中device_id为decoder要使用的device_id, local_ip为decoder所在device的ip,remote_ip为prompt所在device的ip,如: + ``` + ./decoder_sample 4 10.10.10.5 10.10.10.1 + ``` diff --git a/cplusplus/level1_single_api/README.md b/cplusplus/level1_single_api/README.md index 83feeca74c1d385e769bec09d7c3f8761c2d12b9..82be76df807f946d0115989e30b9240c74606716 100644 --- a/cplusplus/level1_single_api/README.md +++ b/cplusplus/level1_single_api/README.md @@ -16,4 +16,5 @@ This catalog is a sample of a single function interface. 
Each folder corresponds | [7_dvpp](./7_dvpp) | media data processing interface sample | | [8_graphrun](./8_graphrun) | graphrunrelated interface sample | | [9_feature_retrieval](./9_feature_retrieval) | feature vector search interface sample | -| [10_aoe_api](./10_aoe_api) | aoe interface sample | \ No newline at end of file +| [10_aoe_api](./10_aoe_api) | aoe interface sample | +| [11_llm_data_dist](./11_llm_data_dist) | LLM-DataDist sample | diff --git a/cplusplus/level1_single_api/README_CN.md b/cplusplus/level1_single_api/README_CN.md index 629654c73575528bfcccb3166e94ce523bcc2ee4..1b42dd547c48acbbdf1e281dc72205889fde0caa 100644 --- a/cplusplus/level1_single_api/README_CN.md +++ b/cplusplus/level1_single_api/README_CN.md @@ -16,4 +16,5 @@ | [7_dvpp](./7_dvpp) | 媒体数据处理相关接口样例 | | [8_graphrun](./8_graphrun) | graphrun相关接口样例 | | [9_feature_retrieval](./9_feature_retrieval) | 特征向量检索相关接口样例 | -| [10_aoe_api](./10_aoe_api) | aoe相关接口样例 | \ No newline at end of file +| [10_aoe_api](./10_aoe_api) | aoe相关接口样例 | +| [11_llm_data_dist](./11_llm_data_dist) | LLM-DataDist相关接口样例 |