diff --git a/build.sh b/build.sh index b9c0cf709dd68dc75ca867cb3198a08b0186db02..2dcffeddd8d79c5462a35bdbbdcd4d6b1f1088d8 100755 --- a/build.sh +++ b/build.sh @@ -339,7 +339,7 @@ function run_example() { # sanitize path local old_ld_path=$LD_LIBRARY_PATH - local new_ld_path=$(echo $old_ld_path | tr ':' '\n' | grep -v "output/datasystem/service" | grep -v "output/datasystem/sdk" | tr '\n' ':') + local new_ld_path=$(echo $old_ld_path | tr ':' '\n' | grep -v "output/service" | grep -v "output/sdk" | tr '\n' ':') export LD_LIBRARY_PATH=$new_ld_path echo -e "---- Sanitize LD_LIBRARY_PATH from ${old_ld_path} to ${new_ld_path}" @@ -348,7 +348,7 @@ function run_example() { echo -e "---- [TIMER] Run example: $(($(date +%s)-$baseTime_s)) seconds" # clean unpackage files. - rm -rf "${INSTALL_DIR}/datasystem/service" "${INSTALL_DIR}/datasystem/sdk" + rm -rf "${INSTALL_DIR}/service" "${INSTALL_DIR}/sdk" fi } @@ -455,7 +455,7 @@ function run_manual_ut() echo -e "---- [TIMER] Run python llt: $(($(date +%s)-$baseTime_s)) seconds" # clean unpackage files. - rm -rf "${INSTALL_DIR}/datasystem/service" "${INSTALL_DIR}/datasystem/sdk" + rm -rf "${INSTALL_DIR}/service" "${INSTALL_DIR}/sdk" fi fi } @@ -566,7 +566,7 @@ function build_datasystem() cmake "${cmake_options[@]}" || go_die "-- build datasystem CMake project failed!" cmake --build "${BUILD_DIR}" -j "${BUILD_THREAD_NUM}" || go_die "-- datasystem cmake build failed!" cmake --install "${BUILD_DIR}" || go_die "-- datasystem cmake install failed!" - cp "${DATASYSTEM_DIR}/LOG_README" "${INSTALL_DIR}/datasystem/service/" + cp "${DATASYSTEM_DIR}/LOG_README" "${INSTALL_DIR}/service/" echo -e "---- [TIMER] Build source: $(($(date +%s)-$baseTime_s)) seconds" # erase symbol table if need. @@ -574,15 +574,15 @@ function build_datasystem() if [[ "${BUILD_TYPE}" = "Debug" ]]; then echo -e "WARNING: Build in debug mode and use strip tool to erase symbol table, it could be a problem when you use gdb." fi - strip_symbols "${INSTALL_DIR}/datasystem/sdk/cpp/lib" "${INSTALL_DIR}/datasystem/sdk/DATASYSTEM_SYM" + strip_symbols "${INSTALL_DIR}/sdk/cpp/lib" "${INSTALL_DIR}/sdk/DATASYSTEM_SYM" if is_on "${BUILD_HETERO}"; then - cp ${BUILD_DIR}/src/datasystem/common/device/ascend/plugin/libacl_plugin.so.sym "${INSTALL_DIR}/datasystem/sdk/DATASYSTEM_SYM" + cp ${BUILD_DIR}/src/datasystem/common/device/ascend/plugin/libacl_plugin.so.sym "${INSTALL_DIR}/sdk/DATASYSTEM_SYM" fi if is_on "${PACKAGE_PYTHON}"; then - cp ${BUILD_DIR}/python_lib/*.sym "${INSTALL_DIR}/datasystem/sdk/DATASYSTEM_SYM" + cp ${BUILD_DIR}/python_lib/*.sym "${INSTALL_DIR}/sdk/DATASYSTEM_SYM" fi - strip_symbols "${INSTALL_DIR}/datasystem/service" "${INSTALL_DIR}/datasystem/service/DATASYSTEM_SYM" - strip_symbols "${INSTALL_DIR}/datasystem/service/lib" "${INSTALL_DIR}/datasystem/service/DATASYSTEM_SYM" + strip_symbols "${INSTALL_DIR}/service" "${INSTALL_DIR}/service/DATASYSTEM_SYM" + strip_symbols "${INSTALL_DIR}/service/lib" "${INSTALL_DIR}/service/DATASYSTEM_SYM" fi # build example if -t is build or run @@ -594,7 +594,7 @@ function build_datasystem() # package cd "${INSTALL_DIR}" - tar --remove-files -zcf yr-datasystem-v$(cat "${BASE_DIR}/VERSION").tar.gz datasystem + tar --remove-files -zcf yr-datasystem-v$(cat "${BASE_DIR}/VERSION").tar.gz service sdk cd - echo -e "-- build datasystem success!" } diff --git a/cmake/package.cmake b/cmake/package.cmake index 41d23ec4a415e0fe8acfb59a9758ba58027699b3..09ef4ef5e8bbf294bf3ae1841248f1770cf31e7b 100644 --- a/cmake/package.cmake +++ b/cmake/package.cmake @@ -1,7 +1,7 @@ ############################################################ # Datasystem targets and config files. ############################################################ -set(DATASYSTEM_CONFIG_PATH datasystem/sdk/cpp/lib/cmake/${PROJECT_NAME}) +set(DATASYSTEM_CONFIG_PATH sdk/cpp/lib/cmake/${PROJECT_NAME}) configure_package_config_file(cmake/config.cmake.in ${CMAKE_CURRENT_BINARY_DIR}/${PROJECT_NAME}Config.cmake @@ -28,8 +28,8 @@ endif() ############################################################ # Datasystem header files and share libraries. ############################################################ -set(DATASYSTEM_SDK_USER_INCLUDEDIR datasystem/sdk/cpp/include) -set(DATASYSTEM_SDK_USER_LIBPATH datasystem/sdk/cpp/lib) +set(DATASYSTEM_SDK_USER_INCLUDEDIR sdk/cpp/include) +set(DATASYSTEM_SDK_USER_LIBPATH sdk/cpp/lib) if (NOT ENABLE_PERF) install(DIRECTORY ${CMAKE_SOURCE_DIR}/include/datasystem @@ -115,16 +115,68 @@ if (BUILD_PYTHON_API) package_python(datasystem PYTHON_SRC_DIR ${CMAKE_SOURCE_DIR}/python - CMAKE_INSTALL_PATH ${CMAKE_INSTALL_PREFIX}/datasystem/sdk + CMAKE_INSTALL_PATH ${CMAKE_INSTALL_PREFIX}/sdk DEPEND_TARGETS ${DEPEND_TARGETS} THIRDPATRY_LIBS_PATTERN ${PYTHON_LIB_PATTERNS}) endif () +############################################################ +# Datasystem go header files and share libraries. +############################################################ +if (BUILD_GO_API) + set(DATASYSTEM_GO_INCLUDEDIR sdk/go/include) + set(DATASYSTEM_GO_LIBPATH sdk/go/lib) + set(DATASYSTEM_GO_PATH sdk) + + install(DIRECTORY ${CMAKE_SOURCE_DIR}/go + DESTINATION ${DATASYSTEM_GO_PATH} + FILES_MATCHING PATTERN "*") + + install(FILES ${CMAKE_SOURCE_DIR}/src/datasystem/c_api/status_definition.h + ${CMAKE_SOURCE_DIR}/src/datasystem/c_api/state_cache_c_wrapper.h + ${CMAKE_SOURCE_DIR}/src/datasystem/c_api/stream_cache_c_wrapper.h + ${CMAKE_SOURCE_DIR}/src/datasystem/c_api/object_cache_c_wrapper.h + ${CMAKE_SOURCE_DIR}/src/datasystem/c_api/utilC.h + ${CMAKE_SOURCE_DIR}/src/datasystem/c_api/cipher.h + DESTINATION ${DATASYSTEM_GO_INCLUDEDIR}/datasystem/c_api) + + set(datasystem_c_INSTALL_LIBPATH ${DATASYSTEM_GO_LIBPATH}) + install_datasystem_target(datasystem_c) + + set(datasystem_INSTALL_LIBPATH ${DATASYSTEM_GO_LIBPATH}) + install_datasystem_target(datasystem) + + set(GO_LIB_PATTERNS + ${SDK_SPDLOG_LIB} + ${SDK_PROTOBUF_LIB} + ${SDK_PROTOC_LIB} + "${SecureC_LIB_PATH}/libsecurec.so" + "${TBB_LIB_PATH}/libtbb.so*" + "${OpenSSL_LIB_PATH}/libssl.so*" + "${OpenSSL_LIB_PATH}/libcrypto.so*" + "${gRPC_LIB_PATH}/libgrpc.so*" + "${gRPC_LIB_PATH}/libgrpc++.so*" + "${gRPC_LIB_PATH}/libgpr.so*" + "${gRPC_LIB_PATH}/libupb*" + "${gRPC_LIB_PATH}/libutf8*" + "${gRPC_LIB_PATH}/libaddress_sorting.so*" + "${SPDLOG_LIB_PATH}/libds-spdlog.so*" + ${RPC_LIB_PATH} + ) + + install_file_pattern( + PATH_PATTERN ${GO_LIB_PATTERNS} + DEST_DIR ${DATASYSTEM_GO_LIBPATH} + ) +endif () + + + ############################################################ # Datasystem bin and depends libs. ############################################################ -set(DATASYSTEM_SERVICE_BINPATH datasystem/service) -set(DATASYSTEM_SERVICE_LIBPATH datasystem/service/lib) +set(DATASYSTEM_SERVICE_BINPATH service) +set(DATASYSTEM_SERVICE_LIBPATH service/lib) set(datasystem_worker_INSTALL_BINPATH ${DATASYSTEM_SERVICE_BINPATH}) install_datasystem_target(datasystem_worker) diff --git a/cmake/scripts/PackageDatasystem.cmake.in b/cmake/scripts/PackageDatasystem.cmake.in index 96897fc93ffcf8bf498bd43fad3a8d18289f43bc..fcedd7508f469a65ef83706f659849713fbe75d0 100644 --- a/cmake/scripts/PackageDatasystem.cmake.in +++ b/cmake/scripts/PackageDatasystem.cmake.in @@ -16,7 +16,7 @@ if(EXISTS ${PYTHON_LIBPATH}) DESTINATION ${DATASYSTEM_WHEEL_PATH}/lib REGEX ".*sym$" EXCLUDE) endif() -file(GLOB_RECURSE FILE_LIST ${CMAKE_INSTALL_PREFIX}/datasystem/sdk/cpp/lib/*so*) +file(GLOB_RECURSE FILE_LIST ${CMAKE_INSTALL_PREFIX}/sdk/cpp/lib/*so*) file(WRITE "${DATASYSTEM_WHEEL_PATH}/sdk_lib_list" "") foreach(FILE ${FILE_LIST}) get_filename_component(FILENAME ${FILE} NAME) diff --git a/cmake/util.cmake b/cmake/util.cmake index 9d871e9146fc67bba611cb096504298746cdde2a..aa4365fceb265f0368a44cb4755663b43d86dbe7 100644 --- a/cmake/util.cmake +++ b/cmake/util.cmake @@ -736,9 +736,9 @@ function(PACKAGE_DATASYSTEM_WHEEL PACKAGE_NAME) # Store helm chart set(HELM_CHART_PATH ${CMAKE_SOURCE_DIR}/k8s/helm_chart) - set(SERVER_LIB ${CMAKE_INSTALL_PREFIX}/datasystem/service/lib) - set(OUTPUT_INCLUDE_DIF ${CMAKE_INSTALL_PREFIX}/datasystem/sdk/cpp/include) - set(SDK_LIB ${CMAKE_INSTALL_PREFIX}/datasystem/sdk/cpp/lib) + set(SERVER_LIB ${CMAKE_INSTALL_PREFIX}/service/lib) + set(OUTPUT_INCLUDE_DIF ${CMAKE_INSTALL_PREFIX}/sdk/cpp/include) + set(SDK_LIB ${CMAKE_INSTALL_PREFIX}/sdk/cpp/lib) set(PYTHON_SDK ${CMAKE_SOURCE_DIR}/python) set(PYTHON_LIBPATH ${CMAKE_BINARY_DIR}/python_lib) @@ -802,7 +802,7 @@ function(PACKAGE_DATASYSTEM_WHEEL PACKAGE_NAME) install(DIRECTORY ${CMAKE_SOURCE_DIR}/cli/cpp_template DESTINATION ${DATASYSTEM_WHEEL_PATH}) # Copy worker and worker_config to package lib path - install(FILES ${CMAKE_INSTALL_PREFIX}/datasystem/service/datasystem_worker ${CMAKE_SOURCE_DIR}/cli/deploy/conf/worker_config.json ${CMAKE_SOURCE_DIR}/cli/deploy/conf/cluster_config.json + install(FILES ${CMAKE_INSTALL_PREFIX}/service/datasystem_worker ${CMAKE_SOURCE_DIR}/cli/deploy/conf/worker_config.json ${CMAKE_SOURCE_DIR}/cli/deploy/conf/cluster_config.json DESTINATION ${DATASYSTEM_WHEEL_PATH}) find_package(Python3 COMPONENTS Interpreter Development) diff --git a/example/cpp/CMakeLists.txt b/example/cpp/CMakeLists.txt index 467fcd28a948b09a78faf77a80b7d153c2894cb6..5100987385386f974134ee3f39041cc5cdee0b53 100644 --- a/example/cpp/CMakeLists.txt +++ b/example/cpp/CMakeLists.txt @@ -16,14 +16,14 @@ if(EXISTS "${BASE_DIR}/config.cmake") include(${BASE_DIR}/config.cmake) endif() -if(NOT EXISTS "${INSTALL_DIR}/datasystem/sdk/cpp") +if(NOT EXISTS "${INSTALL_DIR}/sdk/cpp") file(ARCHIVE_EXTRACT INPUT ${INSTALL_DIR}/yr-datasystem-v${version}.tar.gz DESTINATION ${INSTALL_DIR} ) endif() -SET(CMAKE_PREFIX_PATH "${INSTALL_DIR}/datasystem/sdk/cpp") +SET(CMAKE_PREFIX_PATH "${INSTALL_DIR}/sdk/cpp") find_package(Datasystem ${Datasystem_version} REQUIRED) diff --git a/example/cpp/datasystem_example.cpp b/example/cpp/datasystem_example.cpp index 8519cbaa8488c028d156018329d076e9a402e06a..8d35e69d59cf07b667e4872c6352889acaeee584 100644 --- a/example/cpp/datasystem_example.cpp +++ b/example/cpp/datasystem_example.cpp @@ -206,7 +206,6 @@ int main(int argc, char *argv[]) } ConnectOptions connectOptions{ .host = ip, .port = port, .connectTimeoutMs = 3 * 1000 }; - connectOptions.enableExclusiveConnection = false; dsClient_ = std::make_shared(connectOptions); (void)Context::SetTraceId("init"); Status status = dsClient_->Init(); diff --git a/example/cpp/hetero_client_example.cpp b/example/cpp/hetero_client_example.cpp index 751ab5e7ab8596df569e59648d7067c88d632a38..f4a6d660f19567ed1c46caebf0d7da635ebc4550 100644 --- a/example/cpp/hetero_client_example.cpp +++ b/example/cpp/hetero_client_example.cpp @@ -142,7 +142,6 @@ int main(int argc, char *argv[]) .clientPublicKey = clientPublicKey, .clientPrivateKey = clientPrivateKey, .serverPublicKey = serverPublicKey }; - connectOpts.enableExclusiveConnection = false; client_ = std::make_shared(connectOpts); (void)Context::SetTraceId("init"); Status status = client_->Init(); diff --git a/example/cpp/kv_client_example.cpp b/example/cpp/kv_client_example.cpp index 245fd9fd52b97aadd131cc3170d6c3bb9f592f5f..a85ebc1684f931395acf1e30d700ba82c94c9442 100644 --- a/example/cpp/kv_client_example.cpp +++ b/example/cpp/kv_client_example.cpp @@ -156,7 +156,6 @@ int main(int argc, char *argv[]) .clientPublicKey = clientPublicKey, .clientPrivateKey = clientPrivateKey, .serverPublicKey = serverPublicKey }; - connectOpts.enableExclusiveConnection = false; client_ = std::make_shared(connectOpts); (void)Context::SetTraceId("init"); Status status = client_->Init(); diff --git a/example/cpp/object_client_example.cpp b/example/cpp/object_client_example.cpp index 535da96197845343556fd8970ebb541fe17423d8..ed2500a72e900b30b387e90d7435f72662050855 100644 --- a/example/cpp/object_client_example.cpp +++ b/example/cpp/object_client_example.cpp @@ -181,7 +181,6 @@ int main(int argc, char *argv[]) .clientPublicKey = clientPublicKey, .clientPrivateKey = clientPrivateKey, .serverPublicKey = serverPublicKey }; - connectOpts.enableExclusiveConnection = false; if (InitClient(connectOpts) != 0) { return -1; } diff --git a/example/cpp/stream_client_example.cpp b/example/cpp/stream_client_example.cpp index a861314174a9fabb92783ac35ba74ad81a9c6ad9..a0f7d76aded67228a44e173e37e2bf13f1d4cbed 100644 --- a/example/cpp/stream_client_example.cpp +++ b/example/cpp/stream_client_example.cpp @@ -109,7 +109,6 @@ int RunExample(const std::string &ip, const int32_t port, const std::string &cli .clientPublicKey = clientPublicKey, .clientPrivateKey = clientPrivateKey, .serverPublicKey = serverPublicKey }; - connectOpts.enableExclusiveConnection = false; auto client = std::make_shared(connectOpts); Status status = client->Init(); if (status.IsError()) { diff --git a/example/python/object_client_example.py b/example/python/object_client_example.py index 5aac0020ca2925749c1ffac2fd84030c9bafee92..71da3b7794691971a4a4ac37f2054473fbedb7ef 100644 --- a/example/python/object_client_example.py +++ b/example/python/object_client_example.py @@ -64,7 +64,7 @@ class ObjectClientExample: buf.unwlatch() # Get the key. - buffer_list = client.get([key], 0) + buffer_list = client.get([key], True) if value != buffer_list[0].immutable_data(): raise RuntimeError(f"Assert failed, expect {value}, but got {buffer_list[0].immutable_data()}") diff --git a/example/run-example.sh b/example/run-example.sh index 28bd5eed944b528a3d6eb641176d610125780a6a..cb73d9c6dc682ba91575a2bb3dd97bb8d0a33e76 100755 --- a/example/run-example.sh +++ b/example/run-example.sh @@ -52,7 +52,7 @@ trap cleanup EXIT INT TERM # run cpp example echo -e "---- Running cpp example..." -export LD_LIBRARY_PATH="${ds_output_dir}/datasystem/sdk/cpp/lib:${LD_LIBRARY_PATH}" +export LD_LIBRARY_PATH="${ds_output_dir}/sdk/cpp/lib:${LD_LIBRARY_PATH}" echo "Set LD_LIBRARY_PATH=${LD_LIBRARY_PATH} before cpp example test." ${example_cpp_dir}/stream_client_example "127.0.0.1" "${worker_port}" ${example_cpp_dir}/datasystem_example "127.0.0.1" "${worker_port}" diff --git a/scripts/modules/llt_util.sh b/scripts/modules/llt_util.sh index 8b409115040b83f280620ae5c0c40daa588f3f4e..84f384b4b9efc3e5fee93348a97877a8d5e1643d 100644 --- a/scripts/modules/llt_util.sh +++ b/scripts/modules/llt_util.sh @@ -139,7 +139,7 @@ function start_all() }) # generate worker config - ${DSCLI} generate_config -o "${deploy_dir}/datasystem/service" + ${DSCLI} generate_config -o "${deploy_dir}/service" worker_port=$(get_random_port "${DEFAULT_MIN_PORT}" "${DEFAULT_MAX_PORT}") WORKER_ADDRESS="127.0.0.1:${worker_port}" @@ -149,9 +149,9 @@ function start_all() -e '/"worker_address": {/,/}/ s|"value": "[^"]*"|"value": "'"${WORKER_ADDRESS}"'"|' \ -e '/"etcd_address": {/,/}/ s|"value": "[^"]*"|"value": "'"${WORKER_ETCD_ADDRESS}"'"|' \ -e '/"add_node_wait_time_s": {/,/}/ s|"value": "[^"]*"|"value": "0"|' \ - "${deploy_dir}/datasystem/service/worker_config.json" + "${deploy_dir}/service/worker_config.json" - ${DSCLI} start -d "${root_dir}" -f "${deploy_dir}/datasystem/service/worker_config.json" + ${DSCLI} start -d "${root_dir}" -f "${deploy_dir}/service/worker_config.json" WORKER_HEALTH_CHECK_PATH=$(find "${root_dir}" -type f -path "*/probe/healthy" 2>/dev/null | head -1) # wait worker ready @@ -165,7 +165,7 @@ function stop_all() exit 1; fi local deploy_dir=$1 - ${DSCLI} stop -f "${deploy_dir}/datasystem/service/worker_config.json" + ${DSCLI} stop -f "${deploy_dir}/service/worker_config.json" if ps -p ${etcd_pid} >/dev/null; then # interrupt signal will shutdown the etcd cluster echo "Shutting down etcd service pid: ${etcd_pid}" diff --git a/tests/python/test_device_oc_client.py b/tests/python/test_device_oc_client.py index 5934bf5361b480eb535684d6f71bd4741fa1e3ad..4e349ad46a3058e34414f4aa9e6c63aa6ebddbc1 100644 --- a/tests/python/test_device_oc_client.py +++ b/tests/python/test_device_oc_client.py @@ -48,7 +48,7 @@ class TestDeviceOcClientMethods(unittest.TestCase): @classmethod def setUpClass(cls): root_dir = os.path.dirname(os.path.abspath("..")) - worker_env_path = os.path.join(root_dir, 'output', 'datasystem', 'service', 'worker_config.json') + worker_env_path = os.path.join(root_dir, 'output', 'service', 'worker_config.json') with open(worker_env_path, "r") as f: config = json.load(f) diff --git a/tests/python/test_ds_client.py b/tests/python/test_ds_client.py index cb22205d5741e3a48fdfed8f2bb1aa8a80454b25..077244403badac4ba72f51fc0e90d12f324df6ce 100644 --- a/tests/python/test_ds_client.py +++ b/tests/python/test_ds_client.py @@ -46,7 +46,7 @@ class TestDsClientMethods(unittest.TestCase): @classmethod def setUpClass(cls): root_dir = os.path.dirname(os.path.abspath('..')) - worker_env_path = os.path.join(root_dir, 'output', 'datasystem', 'service', 'worker_config.json') + worker_env_path = os.path.join(root_dir, 'output', 'service', 'worker_config.json') with open(worker_env_path, "r") as f: config = json.load(f) diff --git a/tests/python/test_ds_tensor_client.py b/tests/python/test_ds_tensor_client.py index 594a305dbeacaa89b2f64776ee968add0812a804..97ca1e16b0c7f0eafdfd5c63f6954d1eed9c24af 100644 --- a/tests/python/test_ds_tensor_client.py +++ b/tests/python/test_ds_tensor_client.py @@ -89,7 +89,7 @@ class TestDsTensorClient(unittest.TestCase): @classmethod def setUpClass(cls): root_dir = os.path.dirname(os.path.abspath('..')) - worker_env_path = os.path.join(root_dir, 'output', 'datasystem', 'service', 'worker_config.json') + worker_env_path = os.path.join(root_dir, 'output', 'service', 'worker_config.json') with open(worker_env_path, "r") as f: config = json.load(f) diff --git a/tests/python/test_kv_cache_client.py b/tests/python/test_kv_cache_client.py index 59689484631752bfbd892193640eb308635d1cec..5af0f31933a94e4ea333db1ae1024245aad765fe 100644 --- a/tests/python/test_kv_cache_client.py +++ b/tests/python/test_kv_cache_client.py @@ -48,7 +48,7 @@ class TestKVClientMethods(unittest.TestCase): @classmethod def setUpClass(cls): root_dir = os.path.dirname(os.path.abspath('..')) - worker_env_path = os.path.join(root_dir, 'output', 'datasystem', 'service', 'worker_config.json') + worker_env_path = os.path.join(root_dir, 'output', 'service', 'worker_config.json') with open(worker_env_path, "r") as f: config = json.load(f) diff --git a/tests/python/test_oc_client.py b/tests/python/test_oc_client.py index abafd0038ac8273ae6881cd65b89d2ad9068608b..eb59cbca1ce1c94b6b55ad67012309d65e7e4908 100644 --- a/tests/python/test_oc_client.py +++ b/tests/python/test_oc_client.py @@ -34,7 +34,7 @@ class TestOcClientMethods(unittest.TestCase): @classmethod def setUpClass(cls): root_dir = os.path.dirname(os.path.abspath('..')) - worker_env_path = os.path.join(root_dir, "output", "datasystem", "service", "worker_config.json") + worker_env_path = os.path.join(root_dir, "output", "service", "worker_config.json") with open(worker_env_path, "r") as f: config = json.load(f) diff --git a/tests/python/test_sc_client.py b/tests/python/test_sc_client.py index 978596540e942e3d5c7faaebcfc12982beb0f8ca..53317cd7c6394634be90b0388c5b8e68190950ad 100644 --- a/tests/python/test_sc_client.py +++ b/tests/python/test_sc_client.py @@ -44,7 +44,7 @@ class TestScClientMethods(unittest.TestCase): logging.info("********************sc_client test start*********************") time.sleep(3) root_dir = os.path.dirname(os.path.abspath('..')) - worker_env_path = os.path.join(root_dir, "output", "datasystem", "service", "worker_config.json") + worker_env_path = os.path.join(root_dir, "output", "service", "worker_config.json") with open(worker_env_path, "r") as f: config = json.load(f) diff --git a/tests/st/CMakeLists.txt b/tests/st/CMakeLists.txt index 3fe5c7e96b3d81b219b2728822fe78dca8a83b6b..f409c35d2fff61f98996526e5229bde60aad46bf 100644 --- a/tests/st/CMakeLists.txt +++ b/tests/st/CMakeLists.txt @@ -32,6 +32,10 @@ set(DS_ST_DEPEND_LIBS httpclient common_persistence_api) +if (BUILD_GO_API) + list(APPEND DS_ST_DEPEND_LIBS + datasystem_c_static) +endif () #include dirs include_directories(${CMAKE_CURRENT_SOURCE_DIR}) @@ -65,6 +69,13 @@ if (NOT ENABLE_PERF) list(FILTER DS_TEST_ST_SRCS EXCLUDE REGEX .*/client/perf_client/.*) endif() +if (NOT BUILD_GO_API) + list(FILTER DS_TEST_ST_SRCS EXCLUDE REGEX .*/client_c_api/.*) + list(FILTER DS_ST_STREAM_CACHE_SRCS EXCLUDE REGEX .*/client_c_api/.*) + list(FILTER DS_ST_OBJECT_CACHE_SRCS EXCLUDE REGEX .*/client_c_api/.*) + list(FILTER DS_ST_KV_CACHE_SRCS EXCLUDE REGEX .*/client_c_api/.*) +endif () + set(ST_COMMON_SRCS test_main.cpp st_oc_service_impl.cpp diff --git a/tests/st/client_c_api/stream_cache/stream_cache_test.cpp b/tests/st/client_c_api/stream_cache/stream_cache_test.cpp new file mode 100644 index 0000000000000000000000000000000000000000..b3ceed107977188023da53f8c8f2e2b4d49b7551 --- /dev/null +++ b/tests/st/client_c_api/stream_cache/stream_cache_test.cpp @@ -0,0 +1,290 @@ +/** + * Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Description: test cases for c client api. + */ + +#include +#include +#include + +#include + +#include "common.h" +#include "datasystem/common/log/log.h" +#include "datasystem/common/util/random_data.h" +#include "datasystem/stream/element.h" +#include "datasystem/stream_client.h" + +namespace datasystem { +namespace st { +class StreamCacheTest : public ExternalClusterTest { +public: + void SetClusterSetupOptions(ExternalClusterOptions &opts) override + { + int numWorkers = 2; + opts.numWorkers = numWorkers; + opts.numMasters = 1; + opts.numEtcd = 1; + opts.workerGflagParams = "-shared_memory_size_mb=10000"; + opts.isStreamCacheCase = true; + } + + void SetUp() override + { + ClusterTest::SetUp(); + HostPort srcWorkerAddress; + DS_ASSERT_OK(cluster_->GetWorkerAddr(0, srcWorkerAddress)); + client0_ = CreateStreamCacheClient(srcWorkerAddress.Host(), srcWorkerAddress.Port(), 60000, "", "", "", "", ak_, + sk_, "", "", "", "", "true"); + ASSERT_EQ(StreamConnectWorker(client0_, false).code, K_OK); + DS_ASSERT_OK(cluster_->GetWorkerAddr(1, srcWorkerAddress)); + client1_ = CreateStreamCacheClient(srcWorkerAddress.Host(), srcWorkerAddress.Port(), 60000, "", "", "", "", ak_, + sk_, "", "", "", "", "true"); + ASSERT_EQ(StreamConnectWorker(client1_, false).code, K_OK); + } + + void TearDown() override + { + if (client0_ != nullptr) { + StreamFreeClient(client0_); + } + if (client1_ != nullptr) { + StreamFreeClient(client1_); + } + } + + StreamClient_p CreateStreamCacheClient(const std::string &workerHost, const int workerPort, const int timeOut, + const std::string &token, const std::string &clientPublicKey, + const std::string &clientPrivateKey, const std::string &serverPublicKey, + const std::string &accessKey, const std::string &secretKey, + const std::string &oAuthClientid, const std::string &oAuthClientSecret, + const std::string &oAuthUrl, const std::string &tenantId, + const std::string &enableCrossNodeConnection) + { + (void)oAuthClientid; + (void)oAuthClientSecret; + (void)oAuthUrl; + (void)token; + return StreamCreateClient(workerHost.c_str(), workerPort, timeOut, + clientPublicKey.c_str(), clientPublicKey.length(), clientPrivateKey.c_str(), + clientPrivateKey.length(), serverPublicKey.c_str(), serverPublicKey.length(), + accessKey.c_str(), accessKey.length(), secretKey.c_str(), secretKey.length(), + tenantId.c_str(), tenantId.length(), enableCrossNodeConnection.c_str()); + } + + void Subscribe(StreamClient_p clientPtr, const std::string &streamName, const std::string &subName, + Consumer_p *consumer) + { + auto rc = StreamSubscribe(clientPtr, streamName.c_str(), streamName.length(), subName.c_str(), subName.length(), + SubType::STREAM, false, false, SubscriptionConfig::SC_CACHE_CAPACITY, + SubscriptionConfig::SC_CACHE_LWM, consumer); + ASSERT_EQ(rc.code, K_OK); + } + + void CreateProducer(StreamClient_p clientPtr, const std::string &streamName, int64_t delayFlushTime, + int64_t pageSize, uint64_t maxStreamSize, bool autoCleanup, Producer_p *producer) + { + auto rc = StreamCreateProducer(clientPtr, streamName.c_str(), streamName.length(), delayFlushTime, pageSize, + maxStreamSize, autoCleanup, producer); + ASSERT_EQ(rc.code, K_OK); + } + + void CreateProducerWithConfig(StreamClient_p clientPtr, const std::string &streamName, int64_t delayFlushTime, + int64_t pageSize, uint64_t maxStreamSize, bool autoCleanup, + uint64_t retainForNumConsumers, bool encryptStream, uint64_t reserveSize, + Producer_p *producer) + { + auto rc = StreamCreateProducerWithConfig(clientPtr, streamName.c_str(), streamName.length(), delayFlushTime, + pageSize, maxStreamSize, autoCleanup, retainForNumConsumers, + encryptStream, reserveSize, producer); + ASSERT_EQ(rc.code, K_OK); + } + + void CreateElement(size_t elementSize, Element &element, std::string &writeElement) + { + writeElement = RandomData().GetRandomString(elementSize); + element = Element(reinterpret_cast(&writeElement[0]), elementSize); + } + + void CreateElements(size_t numEle, size_t elementSize, std::vector &elements, + std::vector &writeElements) + { + elements.clear(); + elements.resize(numEle); + writeElements.clear(); + writeElements.resize(numEle); + for (size_t i = 0; i < numEle; ++i) { + CreateElement(elementSize, elements[i], writeElements[i]); + } + } + + void SendElements(Producer_p producerPtr, std::vector &elements) + { + StatusC rc; + for (auto &ele : elements) { + rc = StreamProducerSend(producerPtr, ele.ptr, ele.size, ele.id); + ASSERT_EQ(rc.code, K_OK); + } + } + + void ReceiveElements(Consumer_p consumerPtr, std::vector &elements) + { + StreamElement *eles = nullptr; + uint64_t count = 0; + elements.clear(); + auto rc = StreamConsumerReceive(consumerPtr, timeout_, &eles, &count); + ASSERT_EQ(rc.code, K_OK); + elements.reserve(count); + for (uint64_t i = 0; i < count; ++i) { + elements.emplace_back(reinterpret_cast(eles[i].ptr), eles[i].size); + } + rc = StreamConsumerAck(consumerPtr, eles[count - 1].id); + ASSERT_EQ(rc.code, K_OK); + delete eles; + } + + void ReceiveElementsExpected(Consumer_p consumerPtr, uint32_t numExpect, std::vector &elements) + { + StreamElement *eles = nullptr; + uint64_t count = 0; + elements.clear(); + auto rc = StreamConsumerReceiveExpect(consumerPtr, numExpect, timeout_, &eles, &count); + ASSERT_EQ(rc.code, K_OK); + elements.reserve(count); + for (uint64_t i = 0; i < count; ++i) { + elements.emplace_back(reinterpret_cast(eles[i].ptr), eles[i].size); + } + rc = StreamConsumerAck(consumerPtr, eles[count - 1].id); + ASSERT_EQ(rc.code, K_OK); + delete eles; + } + +protected: + std::string ak_ = "QTWAOYTTINDUT2QVKYUC"; + std::string sk_ = "MFyfvK41ba2giqM7**********KGpownRZlmVmHc"; + StreamClient_p client0_{ nullptr }; + StreamClient_p client1_{ nullptr }; + int64_t delayFlushTime_{ 5 }; + int64_t pageSize_{ 1024 * 1024ul }; + uint64_t maxStreamSize_{ 1024 * 1024 * 1024ul }; + uint32_t timeout_ = 100; + uint64_t retainForNumConsumers = 0; + bool autoCleanup = false; + bool encryptStream = false; +}; + +TEST_F(StreamCacheTest, CreateProducerConsumer) +{ + Producer_p producer = nullptr; + std::string streamName = "CreateProducerConsumer"; + CreateProducer(client0_, streamName, delayFlushTime_, pageSize_, maxStreamSize_, autoCleanup, &producer); + std::string subName = "CreateProducerConsumerSub"; + Consumer_p consumer = nullptr; + Subscribe(client0_, streamName, subName, &consumer); +} + +TEST_F(StreamCacheTest, SendReceiveElements1) +{ + Producer_p producer = nullptr; + std::string streamName = "SendReceiveElements1"; + CreateProducer(client0_, streamName, delayFlushTime_, pageSize_, maxStreamSize_, autoCleanup, &producer); + std::string subName = "SendReceiveElements1Sub"; + Consumer_p consumer = nullptr; + Subscribe(client0_, streamName, subName, &consumer); + size_t numEle = 100; + size_t eleSize = 1024; + std::vector elements; + std::vector writeElements; + CreateElements(numEle, eleSize, elements, writeElements); + + SendElements(producer, elements); + auto rc = StreamProducerFlush(producer); + ASSERT_EQ(rc.code, K_OK); + std::vector outElements; + ReceiveElements(consumer, outElements); + ASSERT_EQ(writeElements.size(), outElements.size()); + for (size_t i = 0; i < writeElements.size(); ++i) { + ASSERT_EQ(writeElements[i], writeElements[i]); + } +} + +TEST_F(StreamCacheTest, SendReceiveElements2) +{ + Producer_p producer = nullptr; + std::string streamName = "SendReceiveElements2"; + CreateProducer(client0_, streamName, delayFlushTime_, pageSize_, maxStreamSize_, autoCleanup, &producer); + std::string subName = "SendReceiveElements2Sub"; + Consumer_p consumer = nullptr; + Subscribe(client0_, streamName, subName, &consumer); + size_t numEle = 100; + size_t eleSize = 1024; + std::vector writeElements; + for (size_t i = 0; i < numEle; ++i) { + std::vector elements; + std::vector writeElement; + CreateElements(1, eleSize, elements, writeElement); + SendElements(producer, elements); + auto rc = StreamProducerFlush(producer); + ASSERT_EQ(rc.code, K_OK); + writeElements.emplace_back(writeElement[0]); + } + std::vector outElements; + ReceiveElements(consumer, outElements); + ASSERT_EQ(writeElements.size(), outElements.size()); + for (size_t i = 0; i < writeElements.size(); ++i) { + ASSERT_EQ(writeElements[i], writeElements[i]); + } +} + +TEST_F(StreamCacheTest, SendReceiveElementsExpectFromRemote) +{ + Producer_p producer = nullptr; + std::string streamName = "SendReceiveElementsFromRemote"; + CreateProducer(client0_, streamName, delayFlushTime_, pageSize_, maxStreamSize_, autoCleanup, &producer); + std::string subName = "SendReceiveElementsFromRemoteSub"; + Consumer_p consumer = nullptr; + // connect to remote worker + Subscribe(client1_, streamName, subName, &consumer); + size_t numEle = 50; + size_t eleSize = 1024; + std::vector writeElements; + for (size_t i = 0; i < numEle; ++i) { + std::vector elements; + std::vector writeElement; + CreateElements(1, eleSize, elements, writeElement); + SendElements(producer, elements); + auto rc = StreamProducerFlush(producer); + ASSERT_EQ(rc.code, K_OK); + writeElements.emplace_back(writeElement[0]); + } + std::vector outElements; + int sleepTime = 1; + std::this_thread::sleep_for(std::chrono::seconds(sleepTime)); + for (size_t i = 0; i < numEle; ++i) { + std::vector outElement; + ReceiveElementsExpected(consumer, 1, outElement); + outElements.emplace_back(outElement[0]); + } + for (size_t i = 0; i < numEle; ++i) { + ASSERT_EQ(writeElements[i], writeElements[i]); + } + ReceiveElementsExpected(consumer, 1, outElements); + ASSERT_EQ(outElements.size(), 0u); +} +} // namespace st +} // namespace datasystem \ No newline at end of file