diff --git a/0031-add-startup-parameter-of-enable-vq-balance.patch b/0031-add-startup-parameter-of-enable-vq-balance.patch new file mode 100644 index 0000000000000000000000000000000000000000..3b77b62b611cc7a5d9c2404706671d7475dc5f2d --- /dev/null +++ b/0031-add-startup-parameter-of-enable-vq-balance.patch @@ -0,0 +1,177 @@ +From b7a1e0708f97f10e273dddc9602951dc55208922 Mon Sep 17 00:00:00 2001 +From: ShenYage +Date: Fri, 12 Sep 2025 18:37:19 +0800 +Subject: [PATCH 1/3] add startup parameter of enable-vq-balance + +Signed-off-by: ShenYage +--- + include/spdk/event.h | 2 ++ + include/spdk/vhost.h | 3 +++ + lib/event/app.c | 20 +++++++++++++++++++- + lib/vhost/spdk_vhost.map | 1 + + lib/vhost/vhost.c | 14 ++++++++++++++ + mk/spdk.lib_deps.mk | 2 +- + 6 files changed, 40 insertions(+), 2 deletions(-) + +diff --git a/include/spdk/event.h b/include/spdk/event.h +index 312fa05..1e394de 100644 +--- a/include/spdk/event.h ++++ b/include/spdk/event.h +@@ -157,6 +157,8 @@ struct spdk_app_opts { + * After that, new added fields should be put after opts_size. 
+ */ + size_t opts_size; ++ ++ bool enable_vq_balance; + }; + + /** +diff --git a/include/spdk/vhost.h b/include/spdk/vhost.h +index 588acd3..5ebe8c7 100644 +--- a/include/spdk/vhost.h ++++ b/include/spdk/vhost.h +@@ -338,6 +338,9 @@ int spdk_vhost_dev_remove(struct spdk_vhost_dev *vdev); + */ + void spdk_vhost_set_backend_interrupt_coalescing(bool enable); + ++void spdk_set_enable_vq_balance(bool is_enable); ++bool spdk_is_enable_vq_balance(void); ++ + #ifdef __cplusplus + } + #endif +diff --git a/lib/event/app.c b/lib/event/app.c +index 03f9692..10f1839 100644 +--- a/lib/event/app.c ++++ b/lib/event/app.c +@@ -44,6 +44,7 @@ + #include "spdk/string.h" + #include "spdk/rpc.h" + #include "spdk/util.h" ++#include "spdk/vhost.h" + + #define SPDK_APP_DEFAULT_LOG_LEVEL SPDK_LOG_NOTICE + #define SPDK_APP_DEFAULT_LOG_PRINT_LEVEL SPDK_LOG_INFO +@@ -60,6 +61,7 @@ struct spdk_app { + const char *json_config_file; + bool json_config_ignore_errors; + bool stopped; ++ bool enable_vq_balance; + const char *rpc_addr; + int shm_id; + spdk_app_shutdown_cb shutdown_cb; +@@ -139,6 +141,8 @@ static const struct option g_cmdline_options[] = { + {"iova-mode", required_argument, NULL, IOVA_MODE_OPT_IDX}, + #define BASE_VIRTADDR_OPT_IDX 265 + {"base-virtaddr", required_argument, NULL, BASE_VIRTADDR_OPT_IDX}, ++#define VQ_BALANCE_OPT_IDX 501 ++ {"enable-vq-balance", no_argument, NULL, VQ_BALANCE_OPT_IDX}, + }; + + static void +@@ -445,10 +449,11 @@ app_copy_opts(struct spdk_app_opts *opts, struct spdk_app_opts *opts_user, size_ + SET_FIELD(env_context); + SET_FIELD(log); + SET_FIELD(base_virtaddr); ++ SET_FIELD(enable_vq_balance); + + /* You should not remove this statement, but need to update the assert statement + * if you add a new field, and also add a corresponding SET_FIELD statement */ +- SPDK_STATIC_ASSERT(sizeof(struct spdk_app_opts) == 200, "Incorrect size"); ++ SPDK_STATIC_ASSERT(sizeof(struct spdk_app_opts) == 208, "Incorrect size"); + + #undef SET_FIELD + } +@@ -524,6 +529,14 
@@ spdk_app_start(struct spdk_app_opts *opts_user, spdk_msg_fn start_fn, + g_spdk_app.shutdown_cb = opts->shutdown_cb; + g_spdk_app.rc = 0; + g_spdk_app.stopped = false; ++ g_spdk_app.enable_vq_balance = opts->enable_vq_balance; ++ if (g_spdk_app.enable_vq_balance) { ++ SPDK_NOTICELOG("vq balance is enabled\n"); ++ spdk_set_enable_vq_balance(true); ++ } else { ++ SPDK_NOTICELOG("vq balance is disabled\n"); ++ spdk_set_enable_vq_balance(false); ++ } + + spdk_log_set_level(SPDK_APP_DEFAULT_LOG_LEVEL); + +@@ -663,6 +676,7 @@ usage(void (*app_usage)(void)) + printf(" -p, --main-core main (primary) core for DPDK\n"); + printf(" -r, --rpc-socket RPC listen address (default %s)\n", SPDK_DEFAULT_RPC_ADDR); + printf(" -s, --mem-size memory size in MB for DPDK (default: "); ++ printf(" --enable-vq-balance Enable vq balance feature, default is disabled\n"); + #ifndef __linux__ + if (g_default_opts.mem_size <= 0) { + printf("all hugepage memory)\n"); +@@ -795,6 +809,11 @@ spdk_app_parse_args(int argc, char **argv, struct spdk_app_opts *opts, + goto out; + } + break; ++ case VQ_BALANCE_OPT_IDX: ++#ifdef __aarch64__ ++ opts->enable_vq_balance = true; ++#endif ++ break; + case MAIN_CORE_OPT_IDX: + opts->main_core = spdk_strtol(optarg, 0); + if (opts->main_core < 0) { +diff --git a/lib/vhost/spdk_vhost.map b/lib/vhost/spdk_vhost.map +index 2dce13b..3924a6c 100644 +--- a/lib/vhost/spdk_vhost.map ++++ b/lib/vhost/spdk_vhost.map +@@ -23,6 +23,7 @@ + spdk_vhost_scsi_dev_remove_tgt; + spdk_vhost_blk_construct; + spdk_vhost_dev_remove; ++ spdk_set_enable_vq_balance; + + local: *; + }; +diff --git a/lib/vhost/vhost.c b/lib/vhost/vhost.c +index 2e5cf36..7b918f9 100644 +--- a/lib/vhost/vhost.c ++++ b/lib/vhost/vhost.c +@@ -54,6 +54,8 @@ static struct spdk_thread *g_vhost_init_thread; + + static spdk_vhost_fini_cb g_fini_cpl_cb; + ++static bool g_enable_vq_balance = false; ++ + /** + * DPDK calls our callbacks synchronously but the work those callbacks + * perform needs to be async. 
Luckily, all DPDK callbacks are called on +@@ -1747,5 +1749,17 @@ spdk_vhost_config_json(struct spdk_json_write_ctx *w) + spdk_json_write_array_end(w); + } + ++void ++spdk_set_enable_vq_balance(bool is_enable) ++{ ++ g_enable_vq_balance = is_enable; ++} ++ ++bool ++spdk_is_enable_vq_balance(void) ++{ ++ return g_enable_vq_balance; ++} ++ + SPDK_LOG_REGISTER_COMPONENT(vhost) + SPDK_LOG_REGISTER_COMPONENT(vhost_ring) +diff --git a/mk/spdk.lib_deps.mk b/mk/spdk.lib_deps.mk +index a6c7825..8756288 100644 +--- a/mk/spdk.lib_deps.mk ++++ b/mk/spdk.lib_deps.mk +@@ -81,7 +81,7 @@ DEPDIRS-trace := log util $(JSON_LIBS) + + DEPDIRS-bdev := log util thread $(JSON_LIBS) notify trace + DEPDIRS-blobfs := log thread blob trace +-DEPDIRS-event := log util thread $(JSON_LIBS) trace ++DEPDIRS-event := log util thread $(JSON_LIBS) trace vhost + + DEPDIRS-ftl := log util thread trace bdev + DEPDIRS-nbd := log util thread $(JSON_LIBS) bdev +-- +2.33.0 + diff --git a/0032-nvme-support-to-set-the-number-of-IO-queues-to-reque.patch b/0032-nvme-support-to-set-the-number-of-IO-queues-to-reque.patch new file mode 100644 index 0000000000000000000000000000000000000000..c2d6c0ec2ff34c32d290b814f71db6d19abdf8ac --- /dev/null +++ b/0032-nvme-support-to-set-the-number-of-IO-queues-to-reque.patch @@ -0,0 +1,152 @@ +From e8802c215f6a2332a9ee143ab68e627ccc6733a9 Mon Sep 17 00:00:00 2001 +From: ShenYage +Date: Fri, 12 Sep 2025 19:34:50 +0800 +Subject: [PATCH 2/3] nvme: support to set the number of IO queues to request + +Signed-off-by: ShenYage +--- + module/bdev/nvme/bdev_nvme.c | 4 ++++ + module/bdev/nvme/bdev_nvme.h | 1 + + module/bdev/nvme/bdev_nvme_rpc.c | 13 +++++++++++-- + scripts/rpc.py | 4 +++- + scripts/rpc/bdev.py | 6 +++++- + 5 files changed, 24 insertions(+), 4 deletions(-) + +diff --git a/module/bdev/nvme/bdev_nvme.c b/module/bdev/nvme/bdev_nvme.c +index 425436f..f140f29 100644 +--- a/module/bdev/nvme/bdev_nvme.c ++++ b/module/bdev/nvme/bdev_nvme.c +@@ -2018,6 +2018,7 @@ 
bdev_nvme_create(struct spdk_nvme_transport_id *trid, + const char **names, + uint32_t count, + const char *hostnqn, ++ uint32_t num_io_queues, + uint32_t prchk_flags, + spdk_bdev_create_nvme_fn cb_fn, + void *cb_ctx) +@@ -2056,6 +2057,9 @@ bdev_nvme_create(struct spdk_nvme_transport_id *trid, + } + + spdk_nvme_ctrlr_get_default_ctrlr_opts(&ctx->opts, sizeof(ctx->opts)); ++ if (num_io_queues) { ++ ctx->opts.num_io_queues = num_io_queues; ++ } + ctx->opts.transport_retry_count = g_opts.retry_count; + ctx->opts.keep_alive_timeout_ms = g_opts.keep_alive_timeout_ms; + +diff --git a/module/bdev/nvme/bdev_nvme.h b/module/bdev/nvme/bdev_nvme.h +index e789371..29dc0d8 100644 +--- a/module/bdev/nvme/bdev_nvme.h ++++ b/module/bdev/nvme/bdev_nvme.h +@@ -75,6 +75,7 @@ int bdev_nvme_create(struct spdk_nvme_transport_id *trid, + const char **names, + uint32_t count, + const char *hostnqn, ++ uint32_t num_io_queues, + uint32_t prchk_flags, + spdk_bdev_create_nvme_fn cb_fn, + void *cb_ctx); +diff --git a/module/bdev/nvme/bdev_nvme_rpc.c b/module/bdev/nvme/bdev_nvme_rpc.c +index 16b9f41..6ce5cbb 100644 +--- a/module/bdev/nvme/bdev_nvme_rpc.c ++++ b/module/bdev/nvme/bdev_nvme_rpc.c +@@ -176,6 +176,7 @@ struct rpc_bdev_nvme_attach_controller { + char *hostsvcid; + bool prchk_reftag; + bool prchk_guard; ++ uint32_t num_io_queues; + }; + + static void +@@ -207,7 +208,8 @@ static const struct spdk_json_object_decoder rpc_bdev_nvme_attach_controller_dec + {"hostsvcid", offsetof(struct rpc_bdev_nvme_attach_controller, hostsvcid), spdk_json_decode_string, true}, + + {"prchk_reftag", offsetof(struct rpc_bdev_nvme_attach_controller, prchk_reftag), spdk_json_decode_bool, true}, +- {"prchk_guard", offsetof(struct rpc_bdev_nvme_attach_controller, prchk_guard), spdk_json_decode_bool, true} ++ {"prchk_guard", offsetof(struct rpc_bdev_nvme_attach_controller, prchk_guard), spdk_json_decode_bool, true}, ++ {"num_io_queues", offsetof(struct rpc_bdev_nvme_attach_controller, num_io_queues), 
spdk_json_decode_uint32, true} + }; + + #define NVME_MAX_BDEVS_PER_RPC 128 +@@ -263,6 +265,7 @@ rpc_bdev_nvme_attach_controller(struct spdk_jsonrpc_request *request, + return; + } + ++ ctx->req.num_io_queues = SPDK_NVME_MAX_IO_QUEUES; + if (spdk_json_decode_object(params, rpc_bdev_nvme_attach_controller_decoders, + SPDK_COUNTOF(rpc_bdev_nvme_attach_controller_decoders), + &ctx->req)) { +@@ -372,10 +375,16 @@ rpc_bdev_nvme_attach_controller(struct spdk_jsonrpc_request *request, + prchk_flags |= SPDK_NVME_IO_FLAGS_PRCHK_GUARD; + } + ++ if (ctx->req.num_io_queues == 0 || ctx->req.num_io_queues > UINT16_MAX + 1) { ++ spdk_jsonrpc_send_error_response_fmt(request, -EINVAL, "num_io_queues out of bounds, min: %u max: %u", ++ 1, UINT16_MAX + 1); ++ goto cleanup; ++ } ++ + ctx->request = request; + ctx->count = NVME_MAX_BDEVS_PER_RPC; + rc = bdev_nvme_create(&trid, &hostid, ctx->req.name, ctx->names, ctx->count, ctx->req.hostnqn, +- prchk_flags, rpc_bdev_nvme_attach_controller_done, ctx); ++ ctx->req.num_io_queues, prchk_flags, rpc_bdev_nvme_attach_controller_done, ctx); + if (rc) { + spdk_jsonrpc_send_error_response(request, rc, spdk_strerror(-rc)); + goto cleanup; +diff --git a/scripts/rpc.py b/scripts/rpc.py +index 99c8036..9440db1 100755 +--- a/scripts/rpc.py ++++ b/scripts/rpc.py +@@ -510,7 +510,8 @@ if __name__ == "__main__": + hostaddr=args.hostaddr, + hostsvcid=args.hostsvcid, + prchk_reftag=args.prchk_reftag, +- prchk_guard=args.prchk_guard)) ++ prchk_guard=args.prchk_guard, ++ num_io_queues=args.num_io_queues)) + + p = subparsers.add_parser('bdev_nvme_attach_controller', aliases=['construct_nvme_bdev'], + help='Add bdevs with nvme backend') +@@ -535,6 +536,7 @@ if __name__ == "__main__": + help='Enable checking of PI reference tag for I/O processing.', action='store_true') + p.add_argument('-g', '--prchk-guard', + help='Enable checking of PI guard for I/O processing.', action='store_true') ++ p.add_argument('--num-io-queues', type=int, help='Set the number of IO 
queues to request.') + p.set_defaults(func=bdev_nvme_attach_controller) + + def bdev_nvme_get_controllers(args): +diff --git a/scripts/rpc/bdev.py b/scripts/rpc/bdev.py +index fcd0129..d7c5d7b 100644 +--- a/scripts/rpc/bdev.py ++++ b/scripts/rpc/bdev.py +@@ -488,7 +488,7 @@ def bdev_nvme_set_hotplug(client, enable, period_us=None): + @deprecated_alias('construct_nvme_bdev') + def bdev_nvme_attach_controller(client, name, trtype, traddr, adrfam=None, trsvcid=None, + priority=None, subnqn=None, hostnqn=None, hostaddr=None, +- hostsvcid=None, prchk_reftag=None, prchk_guard=None): ++ hostsvcid=None, prchk_reftag=None, prchk_guard=None, num_io_queues=None): + """Construct block device for each NVMe namespace in the attached controller. + + Args: +@@ -504,6 +504,7 @@ def bdev_nvme_attach_controller(client, name, trtype, traddr, adrfam=None, trsvc + hostsvcid: host transport service ID (port number for IP-based transports, NULL for PCIe or FC; optional) + prchk_reftag: Enable checking of PI reference tag for I/O processing (optional) + prchk_guard: Enable checking of PI guard for I/O processing (optional) ++ num_io_queues: The number of IO queues to request. (optional) + + Returns: + Names of created block devices. 
+@@ -539,6 +540,9 @@ def bdev_nvme_attach_controller(client, name, trtype, traddr, adrfam=None, trsvc + if prchk_guard: + params['prchk_guard'] = prchk_guard + ++ if num_io_queues: ++ params['num_io_queues'] = num_io_queues ++ + return client.call('bdev_nvme_attach_controller', params) + + +-- +2.33.0 + diff --git a/0033-thread-allocate-spdk_thread-for-nvme-io-queues-when-.patch b/0033-thread-allocate-spdk_thread-for-nvme-io-queues-when-.patch new file mode 100644 index 0000000000000000000000000000000000000000..3ce15920ba02820a09ae4f24235eb0d726e17589 --- /dev/null +++ b/0033-thread-allocate-spdk_thread-for-nvme-io-queues-when-.patch @@ -0,0 +1,379 @@ +From fff6688bb9a7c85275efa0db37983f3e1e4f2641 Mon Sep 17 00:00:00 2001 +From: ShenYage +Date: Fri, 12 Sep 2025 23:59:07 +0800 +Subject: [PATCH 3/3] thread: allocate spdk_thread for nvme io queues when + vq_balance is enabled + +Signed-off-by: ShenYage +--- + include/spdk/thread.h | 11 ++ + include/spdk_internal/thread.h | 2 + + lib/event/app.c | 4 + + lib/event/reactor.c | 11 +- + lib/nvme/nvme_ctrlr.c | 6 ++ + lib/thread/spdk_thread.map | 5 + + lib/thread/thread.c | 188 +++++++++++++++++++++++++++++++++ + lib/vhost/spdk_vhost.map | 1 + + mk/spdk.lib_deps.mk | 2 +- + 9 files changed, 225 insertions(+), 5 deletions(-) + +diff --git a/include/spdk/thread.h b/include/spdk/thread.h +index 4b7e650..81b364c 100644 +--- a/include/spdk/thread.h ++++ b/include/spdk/thread.h +@@ -816,6 +816,17 @@ int spdk_interrupt_mode_enable(void); + */ + bool spdk_interrupt_mode_is_enabled(void); + ++#define MAX_THREAD_LIST_STR_LEN 512 ++#define MAX_PRE_THREAD_NUM 256 ++#define MAX_PRE_THREAD_NAME_LEN 32 ++ ++void spdk_init_thread_list(void); ++void spdk_free_thread_list(void); ++void spdk_thread_list_add_new(uint32_t lcore, struct spdk_thread *thread); ++int spdk_pre_allocate_thread(uint32_t thread_num); ++ ++void spdk_set_need_pre_allocate_thread(bool need); ++ + #ifdef __cplusplus + } + #endif +diff --git a/include/spdk_internal/thread.h 
b/include/spdk_internal/thread.h +index 5bab452..9713b27 100644 +--- a/include/spdk_internal/thread.h ++++ b/include/spdk_internal/thread.h +@@ -123,6 +123,8 @@ struct spdk_thread { + + TAILQ_HEAD(, spdk_io_channel) io_channels; + TAILQ_ENTRY(spdk_thread) tailq; ++ STAILQ_ENTRY(spdk_thread) stailq; ++ uint32_t total_ref; + + char name[SPDK_MAX_THREAD_NAME_LEN + 1]; + struct spdk_cpuset cpumask; +diff --git a/lib/event/app.c b/lib/event/app.c +index 10f1839..2cbb36f 100644 +--- a/lib/event/app.c ++++ b/lib/event/app.c +@@ -533,6 +533,7 @@ spdk_app_start(struct spdk_app_opts *opts_user, spdk_msg_fn start_fn, + if (g_spdk_app.enable_vq_balance) { + SPDK_NOTICELOG("vq balance is enabled\n"); + spdk_set_enable_vq_balance(true); ++ spdk_set_need_pre_allocate_thread(true); + } else { + SPDK_NOTICELOG("vq balance is disabled\n"); + spdk_set_enable_vq_balance(false); +@@ -565,6 +566,8 @@ spdk_app_start(struct spdk_app_opts *opts_user, spdk_msg_fn start_fn, + return 1; + } + ++ spdk_init_thread_list(); ++ spdk_thread_list_add_new(spdk_env_get_current_core(), g_app_thread); + /* + * Disable and ignore trace setup if setting num_entries + * to be 0. +@@ -603,6 +606,7 @@ spdk_app_fini(void) + spdk_trace_cleanup(); + spdk_reactors_fini(); + spdk_env_fini(); ++ spdk_free_thread_list(); + spdk_log_close(); + } + +diff --git a/lib/event/reactor.c b/lib/event/reactor.c +index 724371c..ad9a990 100644 +--- a/lib/event/reactor.c ++++ b/lib/event/reactor.c +@@ -42,6 +42,7 @@ + #include "spdk/util.h" + #include "spdk/string.h" + #include "spdk/fd_group.h" ++#include "spdk/vhost.h" + + #ifdef __linux__ + #include +@@ -1034,12 +1035,14 @@ spdk_reactors_start(void) + } + + /* For now, for each reactor spawn one thread. 
*/ +- snprintf(thread_name, sizeof(thread_name), "reactor_%u", reactor->lcore); ++ if (!spdk_is_enable_vq_balance()) { ++ snprintf(thread_name, sizeof(thread_name), "reactor_%u", reactor->lcore); + +- spdk_cpuset_zero(&tmp_cpumask); +- spdk_cpuset_set_cpu(&tmp_cpumask, i, true); ++ spdk_cpuset_zero(&tmp_cpumask); ++ spdk_cpuset_set_cpu(&tmp_cpumask, i, true); + +- spdk_thread_create(thread_name, &tmp_cpumask); ++ spdk_thread_create(thread_name, &tmp_cpumask); ++ } + } + spdk_cpuset_set_cpu(&g_reactor_core_mask, i, true); + } +diff --git a/lib/nvme/nvme_ctrlr.c b/lib/nvme/nvme_ctrlr.c +index ff27771..d47f5b5 100644 +--- a/lib/nvme/nvme_ctrlr.c ++++ b/lib/nvme/nvme_ctrlr.c +@@ -38,6 +38,7 @@ + + #include "spdk/env.h" + #include "spdk/string.h" ++#include "spdk/thread.h" + + struct nvme_active_ns_ctx; + +@@ -2250,6 +2251,11 @@ nvme_ctrlr_set_num_queues_done(void *arg, const struct spdk_nvme_cpl *cpl) + ctrlr->opts.num_io_queues = spdk_min(min_allocated, ctrlr->opts.num_io_queues); + } + ++ if (spdk_pre_allocate_thread(ctrlr->opts.num_io_queues)) { ++ nvme_ctrlr_set_state(ctrlr, NVME_CTRLR_STATE_ERROR, NVME_TIMEOUT_INFINITE); ++ return; ++ } ++ + ctrlr->free_io_qids = spdk_bit_array_create(ctrlr->opts.num_io_queues + 1); + if (ctrlr->free_io_qids == NULL) { + nvme_ctrlr_set_state(ctrlr, NVME_CTRLR_STATE_ERROR, NVME_TIMEOUT_INFINITE); +diff --git a/lib/thread/spdk_thread.map b/lib/thread/spdk_thread.map +index ea00244..6925861 100644 +--- a/lib/thread/spdk_thread.map ++++ b/lib/thread/spdk_thread.map +@@ -52,6 +52,11 @@ + spdk_thread_get_interrupt_fd; + spdk_interrupt_mode_enable; + spdk_interrupt_mode_is_enabled; ++ spdk_init_thread_list; ++ spdk_free_thread_list; ++ spdk_thread_list_add_new; ++ spdk_pre_allocate_thread; ++ spdk_set_need_pre_allocate_thread; + + # internal functions in spdk_internal/thread.h + spdk_poller_state_str; +diff --git a/lib/thread/thread.c b/lib/thread/thread.c +index 08a1284..d6f6c7c 100644 +--- a/lib/thread/thread.c ++++ 
b/lib/thread/thread.c +@@ -2037,4 +2037,192 @@ spdk_interrupt_mode_is_enabled(void) + return g_interrupt_mode; + } + ++static bool g_need_pre_allocate_thread = false; ++ ++typedef STAILQ_HEAD(, spdk_thread) spdk_reactor_thread_t; ++static spdk_reactor_thread_t *g_spdk_reactor_thread_list; ++ ++void spdk_init_thread_list(void) ++{ ++ uint32_t i, last_core; ++ ++ last_core = spdk_env_get_last_core(); ++ /* core start with 0, so here need add 1 */ ++ g_spdk_reactor_thread_list = calloc(last_core + 1, sizeof(spdk_reactor_thread_t)); ++ SPDK_ENV_FOREACH_CORE(i) { ++ STAILQ_INIT(&g_spdk_reactor_thread_list[i]); ++ } ++} ++ ++void spdk_free_thread_list(void) ++{ ++ if (g_spdk_reactor_thread_list) { ++ free(g_spdk_reactor_thread_list); ++ g_spdk_reactor_thread_list = NULL; ++ SPDK_NOTICELOG("vhost exit, free reactor_thread_list success.\n"); ++ } ++} ++ ++void spdk_thread_list_add_new(uint32_t lcore, struct spdk_thread *thread) ++{ ++ struct spdk_thread *each_thread = NULL; ++ ++ STAILQ_FOREACH(each_thread, &g_spdk_reactor_thread_list[lcore], stailq) { ++ if (each_thread == thread) { ++ return; ++ } ++ } ++ ++ STAILQ_INSERT_TAIL(&g_spdk_reactor_thread_list[lcore], thread, stailq); ++} ++ ++static uint32_t get_strncat_length(uint32_t src_len, uint32_t dst_len) ++{ ++ if (src_len > MAX_THREAD_LIST_STR_LEN - dst_len - 1) { ++ return MAX_THREAD_LIST_STR_LEN - dst_len - 1; ++ } ++ return src_len; ++} ++ ++static void calc_every_core_thread_num(uint32_t *thread_per_core, uint32_t total_core, uint32_t total_thread_num) ++{ ++ uint32_t i, average, remain; ++ ++ average = total_thread_num / total_core; ++ remain = total_thread_num % total_core; ++ ++ SPDK_ENV_FOREACH_CORE(i) { ++ thread_per_core[i] = average; ++ if (remain > 0) { ++ thread_per_core[i]++; ++ remain--; ++ } ++ } ++} ++ ++static uint32_t get_allocate_thread_num(uint32_t lcore) ++{ ++ struct spdk_thread *thread; ++ uint32_t count = 0; ++ ++ STAILQ_FOREACH(thread, &g_spdk_reactor_thread_list[lcore], stailq) { ++ if 
(thread) { ++ count++; ++ } ++ } ++ ++ return count; ++} ++ ++static int pre_allocate_thread_for_each_reactor(uint32_t *thread_per_core) ++{ ++ uint32_t i, allocated_num; ++ char thread_name[MAX_PRE_THREAD_NAME_LEN]; ++ struct spdk_cpuset tmp_cpumask = {}; ++ struct spdk_thread *alloc_thread = NULL; ++ ++ SPDK_ENV_FOREACH_CORE(i) { ++ allocated_num = get_allocate_thread_num(i); ++ SPDK_NOTICELOG("core %u current allocated_num = %u\n", i, allocated_num); ++ for (uint32_t n = 0; n < thread_per_core[i] - allocated_num; n++) { ++ (void)snprintf(thread_name, sizeof(thread_name), "reactor_%u_%u", i, n + allocated_num); ++ spdk_cpuset_zero(&tmp_cpumask); ++ spdk_cpuset_set_cpu(&tmp_cpumask, i, true); ++ alloc_thread = spdk_thread_create(thread_name, &tmp_cpumask); ++ if (alloc_thread == NULL) { ++ SPDK_ERRLOG("failed to create spdk thread\n"); ++ return -1; ++ } ++ ++ if (alloc_thread->id > MAX_PRE_THREAD_NUM) { ++ SPDK_ERRLOG("expect thread id < %d, but current preallocate thread id is %" PRIu64 "\n", ++ MAX_PRE_THREAD_NUM, alloc_thread->id); ++ } ++ ++ alloc_thread->total_ref = 0; ++ spdk_thread_list_add_new(i, alloc_thread); ++ } ++ } ++ ++ return 0; ++} ++ ++static void log_preallocate_thread_list(void) ++{ ++ uint32_t i; ++ char output_str[MAX_THREAD_LIST_STR_LEN] = {0}; ++ char tmp_str[MAX_THREAD_LIST_STR_LEN] = {0}; ++ struct spdk_thread *each_thread = NULL; ++ uint32_t len; ++ ++ SPDK_ENV_FOREACH_CORE(i) { ++ (void)snprintf(output_str, sizeof(output_str), "ractor_%u, thread_list [ ", i); ++ STAILQ_FOREACH(each_thread, &g_spdk_reactor_thread_list[i], stailq) { ++ (void)snprintf(tmp_str, sizeof(tmp_str), "%ld ", each_thread->id); ++ len = get_strncat_length(strlen(tmp_str), strlen(output_str)); ++ strncat(output_str, tmp_str, len); ++ } ++ len = get_strncat_length(strlen(tmp_str), strlen("]")); ++ strncat(output_str, "]", len); ++ SPDK_NOTICELOG("%s\n", output_str); ++ } ++} ++ ++int spdk_pre_allocate_thread(uint32_t thread_num) ++{ ++ uint32_t total_core, 
last_core; ++ uint32_t *thread_per_core; ++ static bool is_allocate = false; ++ int rc = -1; ++ ++ if (!g_need_pre_allocate_thread) { ++ return 0; ++ } ++ ++ if (is_allocate) { ++ return 0; ++ } ++ ++ if (thread_num == 0 || thread_num > MAX_PRE_THREAD_NUM) { ++ SPDK_ERRLOG("expect thread_num > 0 or thread_num <= %u, but now is %u\n", ++ MAX_PRE_THREAD_NUM, thread_num); ++ return rc; ++ } ++ ++ total_core = spdk_env_get_core_count(); ++ SPDK_NOTICELOG("current total core num is %u\n", total_core); ++ if (total_core == 0 || total_core > thread_num) { ++ SPDK_ERRLOG("expect total_core > 0 or total_core <= thread_num, " ++ "but current core is %u, thread_num is %u\n", ++ total_core, thread_num); ++ return rc; ++ } ++ ++ last_core = spdk_env_get_last_core(); ++ SPDK_NOTICELOG("current last core num is %u\n", last_core); ++ thread_per_core = (uint32_t *)calloc(last_core + 1, sizeof(uint32_t)); ++ if (thread_per_core == NULL) { ++ SPDK_ERRLOG("malloc for thread_per_core failed.\n"); ++ return rc; ++ } ++ ++ SPDK_NOTICELOG("current thread num is %u\n", thread_num); ++ ++ calc_every_core_thread_num(thread_per_core, total_core, thread_num); ++ ++ rc = pre_allocate_thread_for_each_reactor(thread_per_core); ++ log_preallocate_thread_list(); ++ free(thread_per_core); ++ if (rc == 0) { ++ is_allocate = true; ++ } ++ ++ return rc; ++} ++ ++void spdk_set_need_pre_allocate_thread(bool need) ++{ ++ g_need_pre_allocate_thread = need; ++} ++ + SPDK_LOG_REGISTER_COMPONENT(thread) +diff --git a/lib/vhost/spdk_vhost.map b/lib/vhost/spdk_vhost.map +index 3924a6c..0f596fa 100644 +--- a/lib/vhost/spdk_vhost.map ++++ b/lib/vhost/spdk_vhost.map +@@ -24,6 +24,7 @@ + spdk_vhost_blk_construct; + spdk_vhost_dev_remove; + spdk_set_enable_vq_balance; ++ spdk_is_enable_vq_balance; + + local: *; + }; +diff --git a/mk/spdk.lib_deps.mk b/mk/spdk.lib_deps.mk +index 8756288..2c28160 100644 +--- a/mk/spdk.lib_deps.mk ++++ b/mk/spdk.lib_deps.mk +@@ -59,7 +59,7 @@ DEPDIRS-rdma := log util + 
DEPDIRS-reduce := log util + DEPDIRS-thread := log util + +-DEPDIRS-nvme := log sock util ++DEPDIRS-nvme := log sock util thread + ifeq ($(CONFIG_RDMA),y) + DEPDIRS-nvme += rdma + endif +-- +2.33.0 + diff --git a/0034-vhost_blk-allocate-vq-poller-for-each-virtqueue-for-.patch b/0034-vhost_blk-allocate-vq-poller-for-each-virtqueue-for-.patch new file mode 100644 index 0000000000000000000000000000000000000000..7926521f00f920175e44cf3784f11aa4b1bdd7c0 --- /dev/null +++ b/0034-vhost_blk-allocate-vq-poller-for-each-virtqueue-for-.patch @@ -0,0 +1,927 @@ +From 02e1e9cd0effba232fb3d30f7780c3418eab3a93 Mon Sep 17 00:00:00 2001 +From: ShenYage +Date: Mon, 15 Sep 2025 09:04:42 +0800 +Subject: [PATCH] vhost_blk: allocate vq poller for each virtqueue for + vhost_blk disks + +Signed-off-by: ShenYage +--- + include/spdk/thread.h | 12 ++ + include/spdk/vhost.h | 1 + + include/spdk_internal/thread.h | 1 + + lib/event/app.c | 1 + + lib/thread/spdk_thread.map | 7 + + lib/thread/thread.c | 66 +++++++ + lib/vhost/spdk_vhost.map | 1 + + lib/vhost/vhost.c | 151 +++++++++++++++- + lib/vhost/vhost_blk.c | 321 ++++++++++++++++++++++++++++++++- + lib/vhost/vhost_internal.h | 7 + + lib/vhost/vhost_rpc.c | 12 ++ + 11 files changed, 569 insertions(+), 11 deletions(-) + +diff --git a/include/spdk/thread.h b/include/spdk/thread.h +index 81b364c..668398a 100644 +--- a/include/spdk/thread.h ++++ b/include/spdk/thread.h +@@ -820,6 +820,8 @@ bool spdk_interrupt_mode_is_enabled(void); + #define MAX_PRE_THREAD_NUM 256 + #define MAX_PRE_THREAD_NAME_LEN 32 + ++typedef STAILQ_HEAD(, spdk_poller) vq_poller_list_head_t; ++ + void spdk_init_thread_list(void); + void spdk_free_thread_list(void); + void spdk_thread_list_add_new(uint32_t lcore, struct spdk_thread *thread); +@@ -827,6 +829,16 @@ int spdk_pre_allocate_thread(uint32_t thread_num); + + void spdk_set_need_pre_allocate_thread(bool need); + ++struct spdk_thread *spdk_get_shared_thread_by_lcore(uint32_t lcore); ++void spdk_put_shared_thread(struct 
spdk_thread *thread); ++ ++void spdk_init_bvsession_vq_poller_list(vq_poller_list_head_t *head); ++void spdk_insert_bvsession_vq_poller_list(vq_poller_list_head_t *head, ++ struct spdk_poller *poller); ++bool spdk_bvsession_vq_poller_list_is_empty(vq_poller_list_head_t *head); ++struct spdk_poller *spdk_pop_bvsession_vq_poller_list(vq_poller_list_head_t *head); ++void spdk_vhost_vq_put_io_channel(struct spdk_io_channel *io_channel); ++ + #ifdef __cplusplus + } + #endif +diff --git a/include/spdk/vhost.h b/include/spdk/vhost.h +index 5ebe8c7..0f84d6f 100644 +--- a/include/spdk/vhost.h ++++ b/include/spdk/vhost.h +@@ -340,6 +340,7 @@ void spdk_vhost_set_backend_interrupt_coalescing(bool enable); + + void spdk_set_enable_vq_balance(bool is_enable); + bool spdk_is_enable_vq_balance(void); ++void spdk_vq_allocate_lcore_fini(void); + + #ifdef __cplusplus + } +diff --git a/include/spdk_internal/thread.h b/include/spdk_internal/thread.h +index 9713b27..a9e2fec 100644 +--- a/include/spdk_internal/thread.h ++++ b/include/spdk_internal/thread.h +@@ -77,6 +77,7 @@ struct spdk_poller { + int timerfd; + + char name[SPDK_MAX_POLLER_NAME_LEN + 1]; ++ STAILQ_ENTRY(spdk_poller) stailq; + }; + + enum spdk_thread_state { +diff --git a/lib/event/app.c b/lib/event/app.c +index 2cbb36f..dd16ed2 100644 +--- a/lib/event/app.c ++++ b/lib/event/app.c +@@ -607,6 +607,7 @@ spdk_app_fini(void) + spdk_reactors_fini(); + spdk_env_fini(); + spdk_free_thread_list(); ++ spdk_vq_allocate_lcore_fini(); + spdk_log_close(); + } + +diff --git a/lib/thread/spdk_thread.map b/lib/thread/spdk_thread.map +index 6925861..8526df6 100644 +--- a/lib/thread/spdk_thread.map ++++ b/lib/thread/spdk_thread.map +@@ -57,6 +57,13 @@ + spdk_thread_list_add_new; + spdk_pre_allocate_thread; + spdk_set_need_pre_allocate_thread; ++ spdk_get_shared_thread_by_lcore; ++ spdk_put_shared_thread; ++ spdk_init_bvsession_vq_poller_list; ++ spdk_insert_bvsession_vq_poller_list; ++ spdk_bvsession_vq_poller_list_is_empty; ++ 
spdk_pop_bvsession_vq_poller_list; ++ spdk_vhost_vq_put_io_channel; + + # internal functions in spdk_internal/thread.h + spdk_poller_state_str; +diff --git a/lib/thread/thread.c b/lib/thread/thread.c +index d6f6c7c..9b4a167 100644 +--- a/lib/thread/thread.c ++++ b/lib/thread/thread.c +@@ -2225,4 +2225,70 @@ void spdk_set_need_pre_allocate_thread(bool need) + g_need_pre_allocate_thread = need; + } + ++static pthread_mutex_t g_spdk_thread_allocate_mutex = PTHREAD_MUTEX_INITIALIZER; ++ ++struct spdk_thread * ++spdk_get_shared_thread_by_lcore(uint32_t lcore) ++{ ++ struct spdk_thread *allocated_thread = NULL; ++ struct spdk_thread *each_thread = NULL; ++ uint32_t min_threads = UINT32_MAX; ++ ++ if (STAILQ_EMPTY(&g_spdk_reactor_thread_list[lcore])) { ++ SPDK_ERRLOG("no pre-allocated spdk thread for core %u.\n", lcore); ++ return NULL; ++ } ++ ++ pthread_mutex_lock(&g_spdk_thread_allocate_mutex); ++ STAILQ_FOREACH(each_thread, &g_spdk_reactor_thread_list[lcore], stailq) { ++ if (each_thread->total_ref < min_threads) { ++ min_threads = each_thread->total_ref; ++ allocated_thread = each_thread; ++ } ++ } ++ allocated_thread->total_ref++; ++ pthread_mutex_unlock(&g_spdk_thread_allocate_mutex); ++ ++ return allocated_thread; ++} ++ ++void spdk_put_shared_thread(struct spdk_thread *thread) ++{ ++ pthread_mutex_lock(&g_spdk_thread_allocate_mutex); ++ thread->total_ref--; ++ pthread_mutex_unlock(&g_spdk_thread_allocate_mutex); ++} ++ ++void spdk_init_bvsession_vq_poller_list(vq_poller_list_head_t *head) ++{ ++ STAILQ_INIT(head); ++} ++ ++void spdk_insert_bvsession_vq_poller_list(vq_poller_list_head_t *head, ++ struct spdk_poller *poller) ++{ ++ STAILQ_INSERT_TAIL(head, poller, stailq); ++} ++ ++bool spdk_bvsession_vq_poller_list_is_empty(vq_poller_list_head_t *head) ++{ ++ return STAILQ_EMPTY(head); ++} ++ ++struct spdk_poller *spdk_pop_bvsession_vq_poller_list(vq_poller_list_head_t *head) ++{ ++ struct spdk_poller *poller = STAILQ_FIRST(head); ++ STAILQ_REMOVE_HEAD(head, 
stailq); ++ return poller; ++} ++ ++void spdk_vhost_vq_put_io_channel(struct spdk_io_channel *io_channel) ++{ ++ io_channel->ref--; ++ if (io_channel->ref == 0) { ++ io_channel->destroy_ref++; ++ spdk_thread_send_msg(io_channel->thread, put_io_channel, io_channel); ++ } ++} ++ + SPDK_LOG_REGISTER_COMPONENT(thread) +diff --git a/lib/vhost/spdk_vhost.map b/lib/vhost/spdk_vhost.map +index 0f596fa..a68f48e 100644 +--- a/lib/vhost/spdk_vhost.map ++++ b/lib/vhost/spdk_vhost.map +@@ -25,6 +25,7 @@ + spdk_vhost_dev_remove; + spdk_set_enable_vq_balance; + spdk_is_enable_vq_balance; ++ spdk_vq_allocate_lcore_fini; + + local: *; + }; +diff --git a/lib/vhost/vhost.c b/lib/vhost/vhost.c +index 7b918f9..549235d 100644 +--- a/lib/vhost/vhost.c ++++ b/lib/vhost/vhost.c +@@ -40,7 +40,9 @@ + #include "spdk/memory.h" + #include "spdk/barrier.h" + #include "spdk/vhost.h" ++#include "spdk/event.h" + #include "vhost_internal.h" ++#include "spdk_internal/event.h" + + bool g_packed_ring_recovery = false; + +@@ -55,6 +57,8 @@ static struct spdk_thread *g_vhost_init_thread; + static spdk_vhost_fini_cb g_fini_cpl_cb; + + static bool g_enable_vq_balance = false; ++static uint32_t *g_num_vqs_each_core = NULL; ++static pthread_mutex_t g_vq_allocate_lcore_mutex = PTHREAD_MUTEX_INITIALIZER; + + /** + * DPDK calls our callbacks synchronously but the work those callbacks +@@ -1338,6 +1342,8 @@ vhost_stop_device_cb(int vid) + return rc; + } + ++static uint32_t vq_allocate_lcore(void); ++ + int + vhost_start_device_cb(int vid) + { +@@ -1346,6 +1352,7 @@ vhost_start_device_cb(int vid) + int rc = -1; + uint16_t i; + bool packed_ring; ++ uint32_t lcore; + + pthread_mutex_lock(&g_vhost_mutex); + +@@ -1377,7 +1384,14 @@ vhost_start_device_cb(int vid) + memset(vsession->virtqueue, 0, sizeof(vsession->virtqueue)); + for (i = 0; i < SPDK_VHOST_MAX_VQUEUES; i++) { + struct spdk_vhost_virtqueue *q = &vsession->virtqueue[i]; +- ++ struct spdk_thread *tmp_thread = q->thread; ++ struct spdk_io_channel 
*tmp_channel = q->io_channel; ++ ++ memset(&vsession->virtqueue[i], 0, sizeof(vsession->virtqueue[i])); ++ q->thread = tmp_thread; ++ q->io_channel = tmp_channel; ++ q->alloc_io_channel_is_failed = false; ++ q->task_cnt = 0; + q->vsession = vsession; + q->vring_idx = -1; + if (rte_vhost_get_vhost_vring(vid, i, &q->vring)) { +@@ -1458,6 +1472,15 @@ vhost_start_device_cb(int vid) + if (q->vring.desc != NULL && q->vring.size > 0) { + rte_vhost_vring_call(vsession->vid, q->vring_idx); + } ++ ++ /* allocate lcore and thread for vq */ ++ if (spdk_is_enable_vq_balance() && vsession->virtqueue[i].thread == NULL) { ++ lcore = vq_allocate_lcore(); ++ vsession->virtqueue[i].thread = spdk_get_shared_thread_by_lcore(lcore); ++ SPDK_NOTICELOG("blk(%s)-virtqueue[%u]: lcore(%u), thread(%ld)\n", ++ vsession->vdev->name, i, lcore, ++ spdk_thread_get_id(vsession->virtqueue[i].thread)); ++ } + } + + vhost_session_set_coalescing(vdev, vsession, NULL); +@@ -1571,6 +1594,9 @@ vhost_new_connection_cb(int vid, const char *ifname) + return 0; + } + ++static void vhost_vq_put_io_channels(struct spdk_vhost_session *vsession); ++static void free_vhost_vq_thread(struct spdk_vhost_session *vsession); ++ + int + vhost_destroy_connection_cb(int vid) + { +@@ -1589,6 +1615,11 @@ vhost_destroy_connection_cb(int vid) + rc = _stop_session(vsession); + } + ++ if (spdk_is_enable_vq_balance()) { ++ vhost_vq_put_io_channels(vsession); ++ free_vhost_vq_thread(vsession); ++ } ++ + TAILQ_REMOVE(&vsession->vdev->vsessions, vsession, tailq); + free(vsession->name); + free(vsession); +@@ -1615,6 +1646,20 @@ spdk_vhost_unlock(void) + pthread_mutex_unlock(&g_vhost_mutex); + } + ++static int ++vq_allocate_lcore_init(void) ++{ ++ uint32_t last_core; ++ ++ last_core = spdk_env_get_last_core(); ++ g_num_vqs_each_core = calloc(last_core + 1, sizeof(uint32_t)); ++ if (!g_num_vqs_each_core) { ++ SPDK_ERRLOG("cound not allocate array size=%u for g_num_vqs_each_core.\n", last_core + 1); ++ return -1; ++ } ++ return 0; ++} 
++
+ void
+ spdk_vhost_init(spdk_vhost_init_cb init_cb)
+ {
+@@ -1651,6 +1696,11 @@ spdk_vhost_init(spdk_vhost_init_cb init_cb)
+ * created.
+ */
+ spdk_for_each_thread(vhost_setup_core_mask, init_cb, vhost_setup_core_mask_done);
++
++ if (vq_allocate_lcore_init() != 0) {
++ ret = -1;
++ goto out;
++ }
+ return;
+ out:
+ init_cb(ret);
+@@ -1761,5 +1811,104 @@ spdk_is_enable_vq_balance(void)
+ return g_enable_vq_balance;
+ }
+
++void
++spdk_vq_allocate_lcore_fini(void)
++{
++ if (g_num_vqs_each_core) {
++ free(g_num_vqs_each_core);
++ g_num_vqs_each_core = NULL;
++ SPDK_NOTICELOG("vhost exit, free g_num_vqs_each_core success.\n");
++ }
++}
++
++int
++get_vsession_task_cnt(struct spdk_vhost_session *vsession)
++{
++ uint32_t i;
++ int task_cnt = 0;
++
++ for (i = 0; i < vsession->max_queues; i++) {
++ task_cnt += vsession->virtqueue[i].task_cnt;
++ }
++
++ return task_cnt;
++}
++
++static uint32_t
++get_least_allocated_lcore(uint32_t *allocate_each_core)
++{
++ uint32_t i;
++ uint32_t selected_core = spdk_env_get_first_core();
++ uint32_t min = UINT32_MAX;
++
++ SPDK_ENV_FOREACH_CORE(i) {
++ if (!spdk_cpuset_get_cpu(spdk_app_get_core_mask(), i)) {
++ continue;
++ }
++
++ if (allocate_each_core[i] < min) {
++ selected_core = i;
++ min = allocate_each_core[i];
++ }
++ }
++
++ allocate_each_core[selected_core]++;
++ return selected_core;
++}
++
++static uint32_t
++vq_allocate_lcore(void)
++{
++ uint32_t selected_core;
++
++ pthread_mutex_lock(&g_vq_allocate_lcore_mutex);
++ selected_core = get_least_allocated_lcore(g_num_vqs_each_core);
++ pthread_mutex_unlock(&g_vq_allocate_lcore_mutex);
++
++ return selected_core;
++}
++
++static void
++vhost_vq_put_io_channels(struct spdk_vhost_session *vsession)
++{
++ uint32_t i;
++ struct spdk_vhost_virtqueue *vq;
++
++ for (i = 0; i < vsession->max_queues; i++) {
++ vq = &vsession->virtqueue[i];
++ if (vq->io_channel == NULL) {
++ continue;
++ }
++ spdk_vhost_vq_put_io_channel(vq->io_channel);
++ vq->io_channel = NULL;
++ }
++}
++ ++static void ++vq_free_lcore(uint32_t lcore) ++{ ++ pthread_mutex_lock(&g_vq_allocate_lcore_mutex); ++ g_num_vqs_each_core[lcore]--; ++ pthread_mutex_unlock(&g_vq_allocate_lcore_mutex); ++} ++ ++static void ++free_vhost_vq_thread(struct spdk_vhost_session *vsession) ++{ ++ uint32_t i; ++ struct spdk_lw_thread *lw_thread; ++ ++ for (i = 0; i < vsession->max_queues; i++) { ++ if (vsession->virtqueue[i].thread != NULL) { ++ lw_thread = spdk_thread_get_ctx(vsession->virtqueue[i].thread); ++ SPDK_NOTICELOG("blk(%s)-stopping: virtqueue[%u] lcore(%u), thread(%ld)\n", ++ vsession->vdev->name, i, lw_thread->lcore, ++ spdk_thread_get_id(vsession->virtqueue[i].thread)); ++ vsession->virtqueue[i].thread = NULL; ++ vq_free_lcore(lw_thread->lcore); ++ } ++ } ++} ++ + SPDK_LOG_REGISTER_COMPONENT(vhost) + SPDK_LOG_REGISTER_COMPONENT(vhost_ring) +diff --git a/lib/vhost/vhost_blk.c b/lib/vhost/vhost_blk.c +index b72de52..1ca005c 100644 +--- a/lib/vhost/vhost_blk.c ++++ b/lib/vhost/vhost_blk.c +@@ -105,6 +105,8 @@ struct spdk_vhost_blk_session { + struct spdk_poller *requestq_poller; + struct spdk_io_channel *io_channel; + struct spdk_poller *stop_poller; ++ vq_poller_list_head_t vq_poller_list[MAX_PRE_THREAD_NUM]; ++ rte_atomic32_t vq_poller_nums; + }; + + /* forward declaration */ +@@ -124,8 +126,8 @@ to_blk_session(struct spdk_vhost_session *vsession) + static void + blk_task_finish(struct spdk_vhost_blk_task *task) + { +- assert(task->bvsession->vsession.task_cnt > 0); +- task->bvsession->vsession.task_cnt--; ++ assert(task->vq->task_cnt > 0); ++ task->vq->task_cnt--; + task->used = false; + } + +@@ -459,6 +461,7 @@ process_blk_request(struct spdk_vhost_blk_task *task, + uint64_t flush_bytes; + uint32_t payload_len; + int rc; ++ struct spdk_io_channel *io_channel; + + iov = &task->iovs[0]; + if (spdk_unlikely(iov->iov_len != sizeof(*req))) { +@@ -491,6 +494,12 @@ process_blk_request(struct spdk_vhost_blk_task *task, + type &= ~VIRTIO_BLK_T_BARRIER; + #endif + ++ if 
(spdk_is_enable_vq_balance()) { ++ io_channel = task->vq->io_channel; ++ } else { ++ io_channel = bvsession->io_channel; ++ } ++ + switch (type) { + case VIRTIO_BLK_T_IN: + case VIRTIO_BLK_T_OUT: +@@ -503,12 +512,12 @@ process_blk_request(struct spdk_vhost_blk_task *task, + + if (type == VIRTIO_BLK_T_IN) { + task->used_len = payload_len + sizeof(*task->status); +- rc = spdk_bdev_readv(bvdev->bdev_desc, bvsession->io_channel, ++ rc = spdk_bdev_readv(bvdev->bdev_desc, io_channel, + &task->iovs[1], task->iovcnt, req->sector * 512, + payload_len, blk_request_complete_cb, task); + } else if (!bvdev->readonly) { + task->used_len = sizeof(*task->status); +- rc = spdk_bdev_writev(bvdev->bdev_desc, bvsession->io_channel, ++ rc = spdk_bdev_writev(bvdev->bdev_desc, io_channel, + &task->iovs[1], task->iovcnt, req->sector * 512, + payload_len, blk_request_complete_cb, task); + } else { +@@ -540,7 +549,7 @@ process_blk_request(struct spdk_vhost_blk_task *task, + return -1; + } + +- rc = spdk_bdev_unmap(bvdev->bdev_desc, bvsession->io_channel, ++ rc = spdk_bdev_unmap(bvdev->bdev_desc, io_channel, + desc->sector * 512, desc->num_sectors * 512, + blk_request_complete_cb, task); + if (rc) { +@@ -570,7 +579,7 @@ process_blk_request(struct spdk_vhost_blk_task *task, + (uint64_t)desc->sector * 512, (uint64_t)desc->num_sectors * 512); + } + +- rc = spdk_bdev_write_zeroes(bvdev->bdev_desc, bvsession->io_channel, ++ rc = spdk_bdev_write_zeroes(bvdev->bdev_desc, io_channel, + desc->sector * 512, desc->num_sectors * 512, + blk_request_complete_cb, task); + if (rc) { +@@ -590,7 +599,7 @@ process_blk_request(struct spdk_vhost_blk_task *task, + invalid_blk_request(task, VIRTIO_BLK_S_IOERR); + return -1; + } +- rc = spdk_bdev_flush(bvdev->bdev_desc, bvsession->io_channel, ++ rc = spdk_bdev_flush(bvdev->bdev_desc, io_channel, + 0, flush_bytes, + blk_request_complete_cb, task); + if (rc) { +@@ -639,7 +648,7 @@ process_blk_task(struct spdk_vhost_virtqueue *vq, uint16_t req_idx) + return; + } + +- 
task->bvsession->vsession.task_cnt++; ++ task->vq->task_cnt++; + + blk_task_init(task); + +@@ -825,6 +834,20 @@ process_vq(struct spdk_vhost_blk_session *bvsession, struct spdk_vhost_virtqueue + uint16_t reqs[SPDK_VHOST_VQ_MAX_SUBMISSIONS]; + uint16_t reqs_cnt, i; + ++ if (spdk_is_enable_vq_balance() && vq->alloc_io_channel_is_failed) { ++ return; ++ } ++ ++ if (spdk_is_enable_vq_balance() && !vq->io_channel) { ++ vq->io_channel = spdk_bdev_get_io_channel(bvsession->bvdev->bdev_desc); ++ SPDK_DEBUGLOG("%s alloc io_channel(%p)\n", bvsession->bvdev->vdev.name, vq->io_channel); ++ if (!vq->io_channel) { ++ SPDK_ERRLOG("%s: get-iochannel failed\n", bvsession->bvdev->vdev.name); ++ vq->alloc_io_channel_is_failed = true; ++ return; ++ } ++ } ++ + submit_inflight_desc(bvsession, vq); + + reqs_cnt = vhost_vq_avail_ring_get(vq, reqs, SPDK_COUNTOF(reqs)); +@@ -975,6 +998,7 @@ _no_bdev_vdev_vq_worker(struct spdk_vhost_virtqueue *vq) + struct spdk_vhost_session *vsession = vq->vsession; + struct spdk_vhost_blk_session *bvsession = to_blk_session(vsession); + bool packed_ring; ++ int task_cnt; + + packed_ring = vq->packed.packed_ring; + if (packed_ring) { +@@ -985,7 +1009,8 @@ _no_bdev_vdev_vq_worker(struct spdk_vhost_virtqueue *vq) + + vhost_session_vq_used_signal(vq); + +- if (vsession->task_cnt == 0 && bvsession->io_channel) { ++ task_cnt = get_vsession_task_cnt(vsession); ++ if (task_cnt == 0 && bvsession->io_channel) { + spdk_put_io_channel(bvsession->io_channel); + bvsession->io_channel = NULL; + } +@@ -1307,9 +1332,17 @@ out: + return rc; + } + ++static int ++vhost_blk_start_cb_async(struct spdk_vhost_dev *vdev, ++ struct spdk_vhost_session *vsession, void *unused); ++ + static int + vhost_blk_start(struct spdk_vhost_session *vsession) + { ++ if (spdk_is_enable_vq_balance()) { ++ return vhost_session_send_event(vsession, vhost_blk_start_cb_async, ++ 3, "start session"); ++ } + return vhost_session_send_event(vsession, vhost_blk_start_cb, + 3, "start session"); + } +@@ 
-1320,8 +1353,9 @@ destroy_session_poller_cb(void *arg) + struct spdk_vhost_blk_session *bvsession = arg; + struct spdk_vhost_session *vsession = &bvsession->vsession; + int i; ++ int task_cnt = get_vsession_task_cnt(vsession); + +- if (vsession->task_cnt > 0) { ++ if (task_cnt > 0) { + return SPDK_POLLER_BUSY; + } + +@@ -1367,9 +1401,17 @@ vhost_blk_stop_cb(struct spdk_vhost_dev *vdev, + return 0; + } + ++static int ++vhost_blk_stop_cb_async(struct spdk_vhost_dev *vdev, ++ struct spdk_vhost_session *vsession, void *unused); ++ + static int + vhost_blk_stop(struct spdk_vhost_session *vsession) + { ++ if (spdk_is_enable_vq_balance()) { ++ return vhost_session_send_event(vsession, vhost_blk_stop_cb_async, ++ 3, "stop session"); ++ } + return vhost_session_send_event(vsession, vhost_blk_stop_cb, + 3, "stop session"); + } +@@ -1603,5 +1645,264 @@ vhost_blk_destroy(struct spdk_vhost_dev *vdev) + return 0; + } + ++#define MAX_VDEV_VQ_WORK_NAME_LEN 128 ++#define CHECK_SESSION_POLLER_PERIOD_US 1000 ++#define VSESSION_STOP_RETRY_COUNT_MS 4000 ++ ++typedef struct spdk_vhost_dev_async_ctx { ++ struct spdk_vhost_blk_session *bvsession; ++ struct spdk_poller *poller; ++ struct spdk_vhost_session *vsession; ++} spdk_vhost_dev_async_ctx_t; ++ ++typedef struct alloc_vq_poller_in_thread_ctx { ++ struct spdk_vhost_blk_session *bvsession; ++ struct spdk_vhost_virtqueue *vq; ++ spdk_poller_fn fn; ++} alloc_vq_poller_in_thread_ctx_t; ++ ++ ++static bool ++is_start_vq_pollers_done(struct spdk_vhost_blk_session *bvsession) ++{ ++ struct spdk_vhost_session *vsession; ++ ++ vsession = &bvsession->vsession; ++ if (rte_atomic32_read(&bvsession->vq_poller_nums) < vsession->max_queues) { ++ return false; ++ } ++ ++ return true; ++} ++ ++static int ++start_session_poller_cb(void* arg) ++{ ++ spdk_vhost_dev_async_ctx_t *start_ctx; ++ struct spdk_vhost_session *vsession; ++ ++ start_ctx = (spdk_vhost_dev_async_ctx_t *)arg; ++ if (!is_start_vq_pollers_done(start_ctx->bvsession)) { ++ return 
SPDK_POLLER_BUSY; ++ } ++ ++ vsession = start_ctx->vsession; ++ SPDK_NOTICELOG("blk(%s)-starting: all thread finish own work\n", vsession->vdev->name); ++ spdk_poller_unregister(&start_ctx->poller); ++ ++ vhost_session_start_done(start_ctx->vsession, 0); ++ free(start_ctx); ++ ++ return SPDK_POLLER_BUSY; ++} ++ ++static void ++alloc_vq_poller_in_thread(void *arg) ++{ ++ alloc_vq_poller_in_thread_ctx_t *ctx; ++ struct spdk_vhost_blk_session *bvsession; ++ struct spdk_vhost_session *vsession; ++ uint64_t thread_id; ++ char vdev_vq_work_name[MAX_VDEV_VQ_WORK_NAME_LEN] = {0}; ++ struct spdk_poller *poller; ++ ++ ctx = (alloc_vq_poller_in_thread_ctx_t *)arg; ++ bvsession = ctx->bvsession; ++ vsession = &bvsession->vsession; ++ ++ thread_id = spdk_thread_get_id(spdk_get_thread()); ++ (void)snprintf(vdev_vq_work_name, MAX_VDEV_VQ_WORK_NAME_LEN, "vdev_vq_work_%s_%"PRIu64"", ++ vsession->vdev->name, thread_id); ++ poller = spdk_poller_register_named((spdk_poller_fn)ctx->fn, ++ ctx->vq, 0, vdev_vq_work_name); ++ spdk_insert_bvsession_vq_poller_list(&bvsession->vq_poller_list[thread_id], poller); ++ rte_atomic32_inc(&bvsession->vq_poller_nums); ++ free(ctx); ++} ++ ++static int ++alloc_poller_for_vq(struct spdk_vhost_session *vsession, struct spdk_vhost_blk_dev *bvdev) ++{ ++ int i; ++ struct spdk_vhost_blk_session *bvsession = to_blk_session(vsession); ++ alloc_vq_poller_in_thread_ctx_t *alloc_ctx; ++ ++ for (i = 0; i < vsession->max_queues; i++) { ++ alloc_ctx = (alloc_vq_poller_in_thread_ctx_t *)malloc(sizeof(*alloc_ctx)); ++ if (alloc_ctx == NULL) { ++ SPDK_DEBUGLOG("fail to malloc for alloc_ctx(%d)", i); ++ return -1; ++ } ++ alloc_ctx->bvsession = bvsession; ++ alloc_ctx->vq = &vsession->virtqueue[i]; ++ alloc_ctx->fn = bvdev->bdev ? 
vdev_vq_worker : no_bdev_vdev_worker; ++ spdk_thread_send_msg(vsession->virtqueue[i].thread, alloc_vq_poller_in_thread, alloc_ctx); ++ } ++ ++ return 0; ++} ++ ++static int ++vhost_blk_start_cb_async(struct spdk_vhost_dev *vdev, ++ struct spdk_vhost_session *vsession, void *unused) ++{ ++ struct spdk_vhost_blk_session *bvsession = to_blk_session(vsession); ++ struct spdk_vhost_blk_dev *bvdev; ++ int i, rc = 0; ++ spdk_vhost_dev_async_ctx_t *start_ctx; ++ ++ bvdev = to_blk_dev(vdev); ++ if (bvdev == NULL) { ++ SPDK_ERRLOG("Unexpect vhost blk dev is NULL, vhost dev(%s) cannot work well\n", vdev->name); ++ rc = -1; ++ goto out; ++ } ++ bvsession->bvdev = bvdev; ++ ++ /* validate all I/O queues are in a contiguous index range */ ++ for (i = 0; i < vsession->max_queues; i++) { ++ if (vsession->virtqueue[i].vring.desc == NULL) { ++ SPDK_ERRLOG("%s: queue %d is empty\n", vsession->name, i); ++ rc = -1; ++ goto out; ++ } ++ } ++ ++ SPDK_NOTICELOG("start alloc task pool for %s\n", vsession->name); ++ rc = alloc_task_pool(bvsession); ++ if (rc != 0) { ++ SPDK_ERRLOG("%s: failed to alloc task pool.\n", vsession->name); ++ goto out; ++ } ++ SPDK_NOTICELOG("start alloc task pool for %s\n", vsession->name); ++ ++ start_ctx = (spdk_vhost_dev_async_ctx_t *)malloc(sizeof(*start_ctx)); ++ if (start_ctx == NULL) { ++ SPDK_ERRLOG("fail to malloc for start_ctx\n"); ++ rc = -1; ++ free_task_pool(bvsession); ++ goto out; ++ } ++ ++ for (i = 0; i < MAX_PRE_THREAD_NUM; i++) { ++ spdk_init_bvsession_vq_poller_list(&bvsession->vq_poller_list[i]); ++ } ++ rte_atomic32_init(&bvsession->vq_poller_nums); ++ ++ rc = alloc_poller_for_vq(vsession, bvdev); ++ if (rc != 0) { ++ SPDK_ERRLOG("fail to alloc poller for vq\n"); ++ free_task_pool(bvsession); ++ goto out; ++ } ++ ++ /* register major poller to check all reactors finish own work */ ++ start_ctx->vsession = vsession; ++ start_ctx->bvsession = bvsession; ++ start_ctx->poller = SPDK_POLLER_REGISTER(start_session_poller_cb, start_ctx, ++ 
CHECK_SESSION_POLLER_PERIOD_US); ++ ++ return 0; ++out: ++ vhost_session_start_done(vsession, rc); ++ return rc; ++} ++ ++static void ++unregister_vq_poller_in_thread(void *arg) ++{ ++ struct spdk_vhost_blk_session *bvsession; ++ uint64_t thread_id; ++ struct spdk_poller *poller; ++ vq_poller_list_head_t *head; ++ ++ bvsession = (struct spdk_vhost_blk_session *)arg; ++ thread_id = spdk_thread_get_id(spdk_get_thread()); ++ head = &bvsession->vq_poller_list[thread_id]; ++ while (!spdk_bvsession_vq_poller_list_is_empty(head)) { ++ poller = spdk_pop_bvsession_vq_poller_list(head); ++ spdk_poller_unregister(&poller); ++ rte_atomic32_dec(&bvsession->vq_poller_nums); ++ } ++} ++ ++static bool ++is_stop_session_done(struct spdk_vhost_blk_session *bvsession) ++{ ++ uint32_t i; ++ uint64_t thread_id; ++ struct spdk_vhost_session *vsession; ++ vq_poller_list_head_t *head; ++ ++ vsession = &bvsession->vsession; ++ for (i = 0; i < vsession->max_queues; i++) { ++ thread_id = spdk_thread_get_id(vsession->virtqueue[i].thread); ++ head = &bvsession->vq_poller_list[thread_id]; ++ if (!spdk_bvsession_vq_poller_list_is_empty(head)) { ++ return false; ++ } ++ } ++ ++ return true; ++} ++ ++static int ++stop_session_poller_cb(void *arg) ++{ ++ spdk_vhost_dev_async_ctx_t *stop_ctx = arg; ++ struct spdk_vhost_blk_session *bvsession = stop_ctx->bvsession; ++ struct spdk_vhost_session *vsession = &bvsession->vsession; ++ ++ if (!is_stop_session_done(bvsession)) { ++ return SPDK_POLLER_BUSY; ++ } ++ ++ SPDK_NOTICELOG("blk(%s)-stoping: all thread finish own work\n", vsession->vdev->name); ++ spdk_poller_unregister(&stop_ctx->poller); ++ free(stop_ctx); ++ ++ /* vhost_session_send_event timeout is 3 seconds, here set retry within 4 seconds */ ++ bvsession->stop_poller = SPDK_POLLER_REGISTER(destroy_session_poller_cb, bvsession, ++ CHECK_SESSION_POLLER_PERIOD_US); ++ ++ return SPDK_POLLER_BUSY; ++} ++ ++static int ++vhost_blk_stop_cb_async(struct spdk_vhost_dev *vdev, ++ struct 
spdk_vhost_session *vsession, void *unused) ++{ ++ int i; ++ struct spdk_vhost_blk_session *bvsession = to_blk_session(vsession); ++ spdk_vhost_dev_async_ctx_t *stop_ctx; ++ ++ stop_ctx = (spdk_vhost_dev_async_ctx_t *)malloc(sizeof(*stop_ctx)); ++ if (stop_ctx == NULL) { ++ SPDK_ERRLOG("fail to malloc for stop ctx\n"); ++ vhost_session_stop_done(vsession, -1); ++ return -1; ++ } ++ ++ if (vsession->virtqueue[0].intr) { ++ vhost_blk_session_unregister_interrupts(bvsession); ++ } ++ ++ /* send msg to vq thread, let every thread register sub-poller */ ++ for (i = 0; i < vsession->max_queues; i++) { ++ if (vsession->virtqueue[i].thread) { ++ spdk_thread_send_msg(vsession->virtqueue[i].thread, unregister_vq_poller_in_thread, bvsession); ++ } else { ++ SPDK_NOTICELOG("(%s) virtqueue %d thread has been freed\n", vsession->vdev->name, i); ++ } ++ } ++ ++ /* register major poller to check all sub-poller finish own work */ ++ stop_ctx->bvsession = bvsession; ++ stop_ctx->poller = SPDK_POLLER_REGISTER(stop_session_poller_cb, stop_ctx, ++ CHECK_SESSION_POLLER_PERIOD_US); ++ ++ return 0; ++} ++ + SPDK_LOG_REGISTER_COMPONENT(vhost_blk) + SPDK_LOG_REGISTER_COMPONENT(vhost_blk_data) +diff --git a/lib/vhost/vhost_internal.h b/lib/vhost/vhost_internal.h +index 3cf027b..f91c8b5 100644 +--- a/lib/vhost/vhost_internal.h ++++ b/lib/vhost/vhost_internal.h +@@ -131,6 +131,11 @@ struct spdk_vhost_virtqueue { + struct spdk_vhost_session *vsession; + + struct spdk_interrupt *intr; ++ ++ struct spdk_thread *thread; ++ struct spdk_io_channel *io_channel; ++ bool alloc_io_channel_is_failed; ++ int task_cnt; + } __attribute((aligned(SPDK_CACHE_LINE_SIZE))); + + struct spdk_vhost_session { +@@ -502,4 +507,6 @@ int remove_vhost_controller(struct spdk_vhost_dev *vdev); + + bool spdk_vhost_get_backend_interrupt_coalescing(void); + ++int get_vsession_task_cnt(struct spdk_vhost_session *vsession); ++ + #endif /* SPDK_VHOST_INTERNAL_H */ +diff --git a/lib/vhost/vhost_rpc.c b/lib/vhost/vhost_rpc.c 
+index b1555f2..5507ecf 100644 +--- a/lib/vhost/vhost_rpc.c ++++ b/lib/vhost/vhost_rpc.c +@@ -360,6 +360,7 @@ static void + _rpc_get_vhost_controller(struct spdk_json_write_ctx *w, struct spdk_vhost_dev *vdev) + { + uint32_t delay_base_us, iops_threshold; ++ struct spdk_vhost_session *vsession; + + spdk_vhost_get_coalescing(vdev, &delay_base_us, &iops_threshold); + +@@ -376,6 +377,17 @@ _rpc_get_vhost_controller(struct spdk_json_write_ctx *w, struct spdk_vhost_dev * + vhost_dump_info_json(vdev, w); + spdk_json_write_object_end(w); + ++ spdk_json_write_named_array_begin(w, "vsession"); ++ TAILQ_FOREACH(vsession, &vdev->vsessions, tailq) { ++ if (vsession->initialized) { ++ spdk_json_write_object_begin(w); ++ spdk_json_write_named_string(w, "name", vsession->name); ++ spdk_json_write_named_uint32(w, "task_cnt", get_vsession_task_cnt(vsession)); ++ spdk_json_write_object_end(w); ++ } ++ } ++ spdk_json_write_array_end(w); ++ + spdk_json_write_object_end(w); + } + +-- +2.33.0 + diff --git a/spdk.spec b/spdk.spec index 81941eb9b47f121911f0237b09b0cb92a36fdd6b..41ad7a72d8ba255c08ac92ee69ce6ac4ef78c2e2 100644 --- a/spdk.spec +++ b/spdk.spec @@ -3,7 +3,7 @@ Name: spdk Version: 21.01.1 -Release: 17 +Release: 18 Summary: Set of libraries and utilities for high performance user-mode storage License: BSD and MIT URL: http://spdk.io @@ -38,6 +38,10 @@ Patch27: 0027--nvme-cuse-Add-ctrlr_lock-for-cuse-register-and-unreg.patch Patch28: 0028-fixed-use-after-free-detected-by-Coverity.patch Patch29: 0029-scripts-Do-msr-existence-check-only-on-x86_64-machin.patch Patch30: 0030-vhost-add-vhost-interrupt-coalescing-based-on-virtio.patch +Patch31: 0031-add-startup-parameter-of-enable-vq-balance.patch +Patch32: 0032-nvme-support-to-set-the-number-of-IO-queues-to-reque.patch +Patch33: 0033-thread-allocate-spdk_thread-for-nvme-io-queues-when-.patch +Patch34: 0034-vhost_blk-allocate-vq-poller-for-each-virtqueue-for-.patch %define package_version %{version}-%{release} @@ -208,6 +212,9 @@ mv 
doc/output/html/ %{install_docdir} %changelog +* Fri Sep 19 2025 shenyage - 21.01.1-18 +- vhost_blk: support multiple reactors to handle vqs of vhost_blk disks + * Sat May 10 2025 jiaqingtong - 21.01.1-17 - vhost: add vhost interrupt coalescing model