diff --git a/0001-make-thread-detach-to-avoid-resource-leak.patch b/0001-make-thread-detach-to-avoid-resource-leak.patch new file mode 100644 index 0000000000000000000000000000000000000000..b66699e7182393c28020b8b2ce699654ea836939 --- /dev/null +++ b/0001-make-thread-detach-to-avoid-resource-leak.patch @@ -0,0 +1,32 @@ +From 1ef7a43907ac6fc521cedd2b4744be4d102efd32 Mon Sep 17 00:00:00 2001 +From: WangFengTu +Date: Thu, 31 Dec 2020 14:05:25 +0800 +Subject: [PATCH 1/9] make thread detach to avoid resource leak + +Signed-off-by: WangFengTu +--- + src/daemon/modules/image/oci/registry/registry.c | 8 ++++++++ + 1 file changed, 8 insertions(+) + +diff --git a/src/daemon/modules/image/oci/registry/registry.c b/src/daemon/modules/image/oci/registry/registry.c +index 391af4fb..3fba2039 100644 +--- a/src/daemon/modules/image/oci/registry/registry.c ++++ b/src/daemon/modules/image/oci/registry/registry.c +@@ -1389,6 +1389,14 @@ static void *register_layers_in_thread(void *arg) + size_t i = 0; + struct timespec ts = {0}; + ++ ret = pthread_detach(pthread_self()); ++ if (ret != 0) { ++ ERROR("Set thread detach fail"); ++ goto out; ++ } ++ ++ prctl(PR_SET_NAME, "register_layer"); ++ + for (i = 0; i < desc->layers_len; i++) { + mutex_lock(&desc->mutex); + while (wait_fetch_complete(&infos[i])) { +-- +2.25.1 + diff --git a/0002-devmapper-fix-udev-wait-thread-resource-leak.patch b/0002-devmapper-fix-udev-wait-thread-resource-leak.patch new file mode 100644 index 0000000000000000000000000000000000000000..58eae8eb94b17530db72607e48f9886b50372926 --- /dev/null +++ b/0002-devmapper-fix-udev-wait-thread-resource-leak.patch @@ -0,0 +1,134 @@ +From 025416aae9f7eaaa8fe5ad52ecbbf6692505186b Mon Sep 17 00:00:00 2001 +From: gaohuatao +Date: Thu, 31 Dec 2020 14:31:12 +0800 +Subject: [PATCH 2/9] devmapper: fix udev wait thread resource leak + +Signed-off-by: gaohuatao +--- + .../graphdriver/devmapper/driver_devmapper.c | 2 +- + .../graphdriver/devmapper/wrapper_devmapper.c | 35 +++++++++++-------- + 2 files changed, 22 insertions(+), 15 deletions(-) + +diff --git a/src/daemon/modules/image/oci/storage/layer_store/graphdriver/devmapper/driver_devmapper.c b/src/daemon/modules/image/oci/storage/layer_store/graphdriver/devmapper/driver_devmapper.c +index ab60845d..f2586f0d 100644 +--- a/src/daemon/modules/image/oci/storage/layer_store/graphdriver/devmapper/driver_devmapper.c ++++ b/src/daemon/modules/image/oci/storage/layer_store/graphdriver/devmapper/driver_devmapper.c +@@ -216,7 +216,7 @@ char *devmapper_mount_layer(const char *id, const struct graphdriver *driver, + } + + if (mount_device(id, mnt_point_dir, mount_opts, driver->devset) != 0) { +- ERROR("Mount device:%s to path:%s failed", id, mnt_parent_dir); ++ ERROR("Mount device:%s to path:%s failed", id, mnt_point_dir); + ret = -1; + goto out; + } +diff --git a/src/daemon/modules/image/oci/storage/layer_store/graphdriver/devmapper/wrapper_devmapper.c b/src/daemon/modules/image/oci/storage/layer_store/graphdriver/devmapper/wrapper_devmapper.c +index 112e2b73..1dcdf595 100644 +--- a/src/daemon/modules/image/oci/storage/layer_store/graphdriver/devmapper/wrapper_devmapper.c ++++ b/src/daemon/modules/image/oci/storage/layer_store/graphdriver/devmapper/wrapper_devmapper.c +@@ -358,28 +358,32 @@ out: + + static void *udev_wait_process(void *data) + { ++ int ret = 0; + udev_wait_pth_t *uwait = (udev_wait_pth_t *)data; + +- if (dm_udev_wait(uwait->cookie) != 1) { +- pthread_mutex_lock(&uwait->udev_mutex); +- uwait->state = ERR_UDEV_WAIT; +- pthread_mutex_unlock(&uwait->udev_mutex); +- DAEMON_CLEAR_ERRMSG(); +- pthread_exit((void *)ERR_UDEV_WAIT); ++ if (pthread_detach(pthread_self()) != 0) { ++ CRIT("Start: set thread detach fail"); ++ goto out; + } + ++ ret = dm_udev_wait(uwait->cookie); + pthread_mutex_lock(&uwait->udev_mutex); +- uwait->state = DEV_OK; ++ if (ret != 1) { ++ uwait->state = ERR_UDEV_WAIT; ++ } else { ++ uwait->state = DEV_OK; ++ } + pthread_mutex_unlock(&uwait->udev_mutex); ++ ++out: + DAEMON_CLEAR_ERRMSG(); +- pthread_exit((void *)0); ++ return NULL; + } + + // UdevWait waits for any processes that are waiting for udev to complete the specified cookie. + void dev_udev_wait(uint32_t cookie) + { + pthread_t tid; +- int thread_result = 0; + udev_wait_pth_t *uwait = NULL; + float timeout = 0; + struct timeval start, end; +@@ -403,7 +407,7 @@ void dev_udev_wait(uint32_t cookie) + } + + if (pthread_create(&tid, NULL, udev_wait_process, uwait) != 0) { +- ERROR("devmapper: create udev wait process thread failed"); ++ ERROR("devmapper: create udev wait process thread error:%s", strerror(errno)); + goto free_out; + } + +@@ -419,15 +423,14 @@ void dev_udev_wait(uint32_t cookie) + ERROR("devmapper: get time failed"); + goto free_out; + } ++ + timeout = (end.tv_sec - start.tv_sec) + (end.tv_usec - start.tv_usec) / 1000000; // seconds + if (timeout >= (float)dm_udev_wait_timeout) { + if (dm_udev_complete(cookie) != 1) { + ERROR("Failed to complete udev cookie %u on udev wait timeout", cookie); + goto free_out; + } +- INFO("devmapper: udev wait join thread start..."); +- pthread_join(tid, (void *)&thread_result); +- INFO("devmapper: udev wait join thread end exit %d", thread_result); ++ ERROR("Wait on udev cookie time out"); + break; + } + } +@@ -482,6 +485,7 @@ int dev_delete_device_force(const char *name) + } + + udev: ++ DEBUG("Start udev wait on delete device force"); + dev_udev_wait(cookie); + + out: +@@ -536,6 +540,7 @@ int dev_remove_device_deferred(const char *name) + } + + udev: ++ DEBUG("Start udev wait on remove device deferred"); + dev_udev_wait(cookie); + out: + if (dmt != NULL) { +@@ -825,6 +830,7 @@ int dev_resume_device(const char *dm_name) + ret = -1; + } + ++ DEBUG("Start udev wait on resume device"); + dev_udev_wait(cookie); + + out: +@@ -886,7 +892,8 @@ int dev_active_device(const char *pool_name, const char *name, int device_id, ui + ERROR("devicemapper: error running deviceCreate (ActivateDevice) %d", ret); + ret = -1; + } +- ++ ++ DEBUG("Start udev wait on create device"); + dev_udev_wait(cookie); + out: + if (dmt != NULL) { +-- +2.25.1 + diff --git a/0003-clean-code-fix-clean-code.patch b/0003-clean-code-fix-clean-code.patch new file mode 100644 index 0000000000000000000000000000000000000000..8f58e8bcaf20ae41769e3bdfbd1807941017a7d4 --- /dev/null +++ b/0003-clean-code-fix-clean-code.patch @@ -0,0 +1,61 @@ +From 200f49ff353ee8266505316659493ffc4082c803 Mon Sep 17 00:00:00 2001 +From: lifeng68 +Date: Tue, 5 Jan 2021 18:48:20 +0800 +Subject: [PATCH 3/9] clean code: fix clean code + +Signed-off-by: lifeng68 +--- + .../layer_store/graphdriver/devmapper/wrapper_devmapper.c | 2 +- + src/utils/console/console.c | 2 +- + src/utils/cutils/utils.c | 2 -- + 3 files changed, 2 insertions(+), 4 deletions(-) + +diff --git a/src/daemon/modules/image/oci/storage/layer_store/graphdriver/devmapper/wrapper_devmapper.c b/src/daemon/modules/image/oci/storage/layer_store/graphdriver/devmapper/wrapper_devmapper.c +index 1dcdf595..5748ec54 100644 +--- a/src/daemon/modules/image/oci/storage/layer_store/graphdriver/devmapper/wrapper_devmapper.c ++++ b/src/daemon/modules/image/oci/storage/layer_store/graphdriver/devmapper/wrapper_devmapper.c +@@ -892,7 +892,7 @@ int dev_active_device(const char *pool_name, const char *name, int device_id, ui + ERROR("devicemapper: error running deviceCreate (ActivateDevice) %d", ret); + ret = -1; + } +- ++ + DEBUG("Start udev wait on create device"); + dev_udev_wait(cookie); + out: +diff --git a/src/utils/console/console.c b/src/utils/console/console.c +index cb748196..7fda519c 100644 +--- a/src/utils/console/console.c ++++ b/src/utils/console/console.c +@@ -68,7 +68,7 @@ static int console_cb_tty_stdin_with_escape(int fd, uint32_t events, void *cbdat + } + + if (c == 'q' && ts->saw_tty_exit) { +- ret = 1; ++ ret = EPOLL_LOOP_HANDLE_CLOSE; + goto out; + } + +diff --git a/src/utils/cutils/utils.c b/src/utils/cutils/utils.c +index 1e777dc3..9107f540 100644 +--- a/src/utils/cutils/utils.c ++++ b/src/utils/cutils/utils.c +@@ -493,7 +493,6 @@ out: + static void set_stderr_buf(char **stderr_buf, const char *format, ...) + { + char errbuf[BUFSIZ + 1] = { 0 }; +- char *jerr = NULL; + + UTIL_FREE_AND_SET_NULL(*stderr_buf); + +@@ -511,7 +510,6 @@ static void set_stderr_buf(char **stderr_buf, const char *format, ...) + if (*stderr_buf == NULL) { + *stderr_buf = util_strdup_s(errbuf); + } +- free(jerr); + } + + static int open_devnull(void) +-- +2.25.1 + diff --git a/0004-judge-isula-load-file-exists.patch b/0004-judge-isula-load-file-exists.patch new file mode 100644 index 0000000000000000000000000000000000000000..ea1b089579c988b152e48c81ce5bd08d124e1ddb --- /dev/null +++ b/0004-judge-isula-load-file-exists.patch @@ -0,0 +1,29 @@ +From c0b6c4187a3c66bef8b75a63e699df1be57d05b4 Mon Sep 17 00:00:00 2001 +From: gaohuatao +Date: Mon, 11 Jan 2021 18:29:26 +0800 +Subject: [PATCH 4/9] judge isula load file exists + +Signed-off-by: gaohuatao +--- + src/cmd/isula/images/load.c | 5 +++++ + 1 file changed, 5 insertions(+) + +diff --git a/src/cmd/isula/images/load.c b/src/cmd/isula/images/load.c +index 343d8d6d..0fb8014e 100644 +--- a/src/cmd/isula/images/load.c ++++ b/src/cmd/isula/images/load.c +@@ -162,6 +162,11 @@ int cmd_load_main(int argc, const char **argv) + g_cmd_load_args.file = file; + } + ++ if (!util_file_exists(g_cmd_load_args.file)) { ++ COMMAND_ERROR("File %s is not exist", g_cmd_load_args.file); ++ exit(exit_code); ++ } ++ + ret = client_load_image(&g_cmd_load_args); + if (ret) { + exit(exit_code); +-- +2.25.1 + diff --git a/0005-modify-image_load.sh-CI-to-test-file-not-exist.patch b/0005-modify-image_load.sh-CI-to-test-file-not-exist.patch new file mode 100644 index 0000000000000000000000000000000000000000..a6b05c797999a08d96ac1ef254cb57109571ec30 --- /dev/null +++ b/0005-modify-image_load.sh-CI-to-test-file-not-exist.patch @@ -0,0 +1,29 @@ +From e151821a1e092995836631b273bddc339cadffbe Mon Sep 17 00:00:00 2001 +From: gaohuatao +Date: Mon, 11 Jan 2021 18:33:39 +0800 +Subject: [PATCH 5/9] modify image_load.sh CI to test file not exist + +Signed-off-by: gaohuatao +--- + CI/test_cases/image_cases/image_load.sh | 5 +++++ + 1 file changed, 5 insertions(+) + +diff --git a/CI/test_cases/image_cases/image_load.sh b/CI/test_cases/image_cases/image_load.sh +index 8415e036..bf41f2af 100755 +--- a/CI/test_cases/image_cases/image_load.sh ++++ b/CI/test_cases/image_cases/image_load.sh +@@ -30,6 +30,11 @@ function test_image_load() + local test="isula load image test => (${FUNCNAME[@]})" + + msg_info "${test} starting..." ++ ++ # file is not exist, expect fail ++ isula load -i xxx.tar ++ [[ $? -eq 0 ]] && msg_err "${FUNCNAME[0]}:${LINENO} - image tar file not exist test failed" && ((ret++)) ++ + + # single image without --tag + isula load -i $single_image +-- +2.25.1 + diff --git a/0006-do-not-pause-container-when-copy.patch b/0006-do-not-pause-container-when-copy.patch new file mode 100644 index 0000000000000000000000000000000000000000..cfcac6c8d6c58244bfa8ec147488b012f5cbd644 --- /dev/null +++ b/0006-do-not-pause-container-when-copy.patch @@ -0,0 +1,1918 @@ +From b69da83db290057dde5dbe34e153fb0895e456e2 Mon Sep 17 00:00:00 2001 +From: WangFengTu +Date: Tue, 29 Dec 2020 10:16:13 +0800 +Subject: [PATCH 6/9] do not pause container when copy + +and use libarchive to do unpack/tar instead of +execute tar command. Once not pause container, +we need to chroot to container's rootfs first +to avoid symlink attrack when copy. + +Signed-off-by: WangFengTu +--- + src/cmd/isula/main.c | 20 + + src/cmd/isula/stream/cp.c | 3 +- + .../executor/container_cb/execution_stream.c | 133 ++-- + src/daemon/modules/image/oci/oci_load.c | 8 +- + .../graphdriver/devmapper/driver_devmapper.c | 6 +- + .../graphdriver/overlay2/driver_overlay2.c | 6 +- + src/utils/tar/isulad_tar.c | 405 +----------- + src/utils/tar/isulad_tar.h | 10 +- + src/utils/tar/util_archive.c | 611 ++++++++++++++++-- + src/utils/tar/util_archive.h | 15 +- + 10 files changed, 679 insertions(+), 538 deletions(-) + +diff --git a/src/cmd/isula/main.c b/src/cmd/isula/main.c +index d4a66695..a69df5d5 100644 +--- a/src/cmd/isula/main.c ++++ b/src/cmd/isula/main.c +@@ -14,6 +14,7 @@ + ******************************************************************************/ + + #include ++#include + + #include "isula_commands.h" + #include "create.h" +@@ -202,8 +203,27 @@ struct command g_commands[] = { + { NULL, false, NULL, NULL, NULL, NULL } // End of the list + }; + ++static int set_locale() ++{ ++ int ret = 0; ++ ++ /* Change from the standard (C) to en_US.UTF-8 locale, so libarchive can handle filename conversions.*/ ++ if (setlocale(LC_CTYPE, "en_US.UTF-8") == NULL) { ++ fprintf(stderr, "Could not set locale to en_US.UTF-8:%s", strerror(errno)); ++ ret = -1; ++ goto out; ++ } ++ ++out: ++ return ret; ++} ++ + int main(int argc, char **argv) + { ++ if (set_locale() != 0) { ++ exit(ECOMMON); ++ } ++ + if (connect_client_ops_init()) { + return ECOMMON; + } +diff --git a/src/cmd/isula/stream/cp.c b/src/cmd/isula/stream/cp.c +index 4ebca2b3..e954ed3d 100644 +--- a/src/cmd/isula/stream/cp.c ++++ b/src/cmd/isula/stream/cp.c +@@ -27,6 +27,7 @@ + #include "path.h" + #include "isula_connect.h" + #include "isulad_tar.h" ++#include "util_archive.h" + #include "command_parser.h" + #include "connect.h" + #include "io_wrapper.h" +@@ -124,7 +125,7 @@ static int client_copy_from_container(const struct client_arguments *args, const + srcinfo->path = util_strdup_s(srcpath); + srcinfo->isdir = S_ISDIR(response->stat->mode); + +- nret = archive_copy_to(&response->reader, false, srcinfo, resolved, &archive_err); ++ nret = archive_copy_to(&response->reader, srcinfo, resolved, &archive_err); + if (nret != 0) { + ret = nret; + } +diff --git a/src/daemon/executor/container_cb/execution_stream.c b/src/daemon/executor/container_cb/execution_stream.c +index fde5d41d..7d165fb9 100644 +--- a/src/daemon/executor/container_cb/execution_stream.c ++++ b/src/daemon/executor/container_cb/execution_stream.c +@@ -46,6 +46,7 @@ + #include "image_api.h" + #include "path.h" + #include "isulad_tar.h" ++#include "util_archive.h" + #include "container_api.h" + #include "error.h" + #include "isula_libutils/logger_json_file.h" +@@ -384,9 +385,18 @@ out: + return ret; + } + ++static char *get_tar_path(const char *srcdir, const char *srcbase, const char *container_fs) ++{ ++ if (!util_has_prefix(srcdir, container_fs)) { ++ ERROR("srcdir %s does not contain %s", srcdir, container_fs); ++ return NULL; ++ } ++ return util_path_join(srcdir + strlen(container_fs), srcbase); ++} ++ + static int archive_and_send_copy_data(const stream_func_wrapper *stream, + struct isulad_copy_from_container_response *response, const char *resolvedpath, +- const char *abspath) ++ const char *abspath, const char *container_fs) + { + int ret = -1; + int nret; +@@ -399,6 +409,7 @@ static int archive_and_send_copy_data(const stream_func_wrapper *stream, + char *buf = NULL; + char cleaned[PATH_MAX + 2] = { 0 }; + struct io_read_wrapper reader = { 0 }; ++ char *tar_path = NULL; + + buf = util_common_calloc_s(buf_len); + if (buf == NULL) { +@@ -422,7 +433,15 @@ static int archive_and_send_copy_data(const stream_func_wrapper *stream, + ERROR("split %s failed", abspath); + goto cleanup; + } +- nret = archive_path(srcdir, srcbase, absbase, false, &reader); ++ ++ tar_path = get_tar_path(srcdir, srcbase, container_fs); ++ if (tar_path == NULL) { ++ goto cleanup; ++ } ++ ++ DEBUG("archive chroot tar stream container_fs(%s) srcdir(%s) relative(%s) srcbase(%s) absbase(%s)", ++ container_fs, srcdir, tar_path, srcbase, absbase); ++ nret = archive_chroot_tar_stream(container_fs, tar_path, srcbase, absbase, &reader); + if (nret != 0) { + ERROR("Archive %s failed", resolvedpath); + goto cleanup; +@@ -445,6 +464,7 @@ static int archive_and_send_copy_data(const stream_func_wrapper *stream, + + ret = 0; + cleanup: ++ free(tar_path); + free(buf); + free(srcdir); + free(srcbase); +@@ -583,58 +603,6 @@ static container_path_stat *resolve_and_stat_path(const char *rootpath, const ch + return stat; + } + +-static int pause_container(const container_t *cont) +-{ +- int ret = 0; +- rt_pause_params_t params = { 0 }; +- const char *id = cont->common_config->id; +- +- params.rootpath = cont->root_path; +- params.state = cont->state_path; +- if (runtime_pause(id, cont->runtime, ¶ms)) { +- ERROR("Failed to pause container:%s", id); +- ret = -1; +- goto out; +- } +- +- container_state_set_paused(cont->state); +- +- if (container_state_to_disk(cont)) { +- ERROR("Failed to save container \"%s\" to disk", id); +- ret = -1; +- goto out; +- } +- +-out: +- return ret; +-} +- +-static int resume_container(const container_t *cont) +-{ +- int ret = 0; +- rt_resume_params_t params = { 0 }; +- const char *id = cont->common_config->id; +- +- params.rootpath = cont->root_path; +- params.state = cont->state_path; +- if (runtime_resume(id, cont->runtime, ¶ms)) { +- ERROR("Failed to resume container:%s", id); +- ret = -1; +- goto out; +- } +- +- container_state_reset_paused(cont->state); +- +- if (container_state_to_disk(cont)) { +- ERROR("Failed to save container \"%s\" to disk", id); +- ret = -1; +- goto out; +- } +- +-out: +- return ret; +-} +- + static int copy_from_container_cb(const struct isulad_copy_from_container_request *request, + const stream_func_wrapper *stream, char **err) + { +@@ -645,7 +613,6 @@ static int copy_from_container_cb(const struct isulad_copy_from_container_reques + container_path_stat *stat = NULL; + container_t *cont = NULL; + struct isulad_copy_from_container_response *response = NULL; +- bool need_pause = false; + + DAEMON_CLEAR_ERRMSG(); + if (request == NULL || stream == NULL || err == NULL) { +@@ -665,19 +632,10 @@ static int copy_from_container_cb(const struct isulad_copy_from_container_reques + goto unlock_container; + } + +- need_pause = container_is_running(cont->state) && !container_is_paused(cont->state); +- if (need_pause) { +- if (pause_container(cont) != 0) { +- ERROR("can't copy to a container which is cannot be paused"); +- isulad_set_error_message("can't copy to a container which is cannot be paused"); +- goto unlock_container; +- } +- } +- + nret = im_mount_container_rootfs(cont->common_config->image_type, cont->common_config->image, + cont->common_config->id); + if (nret != 0) { +- goto unpause_container; ++ goto unlock_container; + } + + stat = resolve_and_stat_path(cont->common_config->base_fs, request->srcpath, &resolvedpath, &abspath); +@@ -692,7 +650,7 @@ static int copy_from_container_cb(const struct isulad_copy_from_container_reques + goto cleanup_rootfs; + } + +- nret = archive_and_send_copy_data(stream, response, resolvedpath, abspath); ++ nret = archive_and_send_copy_data(stream, response, resolvedpath, abspath, cont->common_config->base_fs); + if (nret < 0) { + ERROR("Failed to send archive data"); + goto cleanup_rootfs; +@@ -705,10 +663,6 @@ cleanup_rootfs: + cont->common_config->id) != 0) { + WARN("Can not umount rootfs of container: %s", cont->common_config->id); + } +-unpause_container: +- if (need_pause && resume_container(cont) != 0) { +- ERROR("can't resume container which has been paused before copy"); +- } + unlock_container: + container_unlock(cont); + container_unref(cont); +@@ -777,15 +731,16 @@ static ssize_t extract_stream_to_io_read(void *content, void *buf, size_t buf_le + return (ssize_t)(copy.data_len); + } + +-int read_and_extract_archive(stream_func_wrapper *stream, const char *resolved_path, const char *transform) ++static int read_and_extract_archive(stream_func_wrapper *stream, const char *container_fs, ++ const char *dstdir_in_container, const char *src_rebase, ++ const char *dst_rebase) + { + int ret = -1; + char *err = NULL; + struct io_read_wrapper content = { 0 }; +- + content.context = stream; + content.read = extract_stream_to_io_read; +- ret = archive_untar(&content, false, resolved_path, transform, &err); ++ ret = archive_chroot_untar_stream(&content, container_fs, dstdir_in_container, src_rebase, dst_rebase, &err); + if (ret != 0) { + ERROR("Can not untar to container: %s", (err != NULL) ? err : "unknown"); + isulad_set_error_message("Can not untar to container: %s", (err != NULL) ? err : "unknown"); +@@ -795,7 +750,7 @@ int read_and_extract_archive(stream_func_wrapper *stream, const char *resolved_p + } + + static char *copy_to_container_get_dstdir(const container_t *cont, const container_copy_to_request *request, +- char **transform) ++ char **src_base, char **dst_base) + { + char *dstdir = NULL; + char *error = NULL; +@@ -836,7 +791,7 @@ static char *copy_to_container_get_dstdir(const container_t *cont, const contain + srcinfo.path = request->src_path; + srcinfo.rebase_name = request->src_rebase_name; + +- dstdir = prepare_archive_copy(&srcinfo, dstinfo, transform, &error); ++ dstdir = prepare_archive_copy(&srcinfo, dstinfo, src_base, dst_base, &error); + if (dstdir == NULL) { + if (error == NULL) { + ERROR("Can not prepare archive copy"); +@@ -930,9 +885,9 @@ static int copy_to_container_cb(const container_copy_to_request *request, stream + char *resolvedpath = NULL; + char *abspath = NULL; + char *dstdir = NULL; +- char *transform = NULL; ++ char *src_base = NULL; ++ char *dst_base = NULL; + container_t *cont = NULL; +- bool need_pause = false; + + DAEMON_CLEAR_ERRMSG(); + if (request == NULL || stream == NULL || err == NULL) { +@@ -952,22 +907,13 @@ static int copy_to_container_cb(const container_copy_to_request *request, stream + goto unlock_container; + } + +- need_pause = container_is_running(cont->state) && !container_is_paused(cont->state); +- if (need_pause) { +- if (pause_container(cont) != 0) { +- ERROR("can't copy to a container which is cannot be paused"); +- isulad_set_error_message("can't copy to a container which is cannot be paused"); +- goto unlock_container; +- } +- } +- + nret = im_mount_container_rootfs(cont->common_config->image_type, cont->common_config->image, + cont->common_config->id); + if (nret != 0) { +- goto unpause_container; ++ goto unlock_container; + } + +- dstdir = copy_to_container_get_dstdir(cont, request, &transform); ++ dstdir = copy_to_container_get_dstdir(cont, request, &src_base, &dst_base); + if (dstdir == NULL) { + goto cleanup_rootfs; + } +@@ -982,7 +928,8 @@ static int copy_to_container_cb(const container_copy_to_request *request, stream + goto cleanup_rootfs; + } + +- nret = read_and_extract_archive(stream, resolvedpath, transform); ++ nret = read_and_extract_archive(stream, cont->common_config->base_fs, ++ dstdir, src_base, dst_base); + if (nret < 0) { + ERROR("Failed to send archive data"); + goto cleanup_rootfs; +@@ -997,11 +944,6 @@ cleanup_rootfs: + WARN("Can not umount rootfs of container: %s", cont->common_config->id); + } + +-unpause_container: +- if (need_pause && resume_container(cont) != 0) { +- ERROR("can't resume container which has been paused before copy"); +- } +- + unlock_container: + container_unlock(cont); + container_unref(cont); +@@ -1013,7 +955,8 @@ pack_response: + free(resolvedpath); + free(abspath); + free(dstdir); +- free(transform); ++ free(src_base); ++ free(dst_base); + return ret; + } + +diff --git a/src/daemon/modules/image/oci/oci_load.c b/src/daemon/modules/image/oci/oci_load.c +index 80647253..a8eecfe9 100644 +--- a/src/daemon/modules/image/oci/oci_load.c ++++ b/src/daemon/modules/image/oci/oci_load.c +@@ -1061,6 +1061,7 @@ int oci_do_load(const im_load_request *request) + load_image_t *im = NULL; + char *digest = NULL; + char *dstdir = NULL; ++ char *err = NULL; + + if (request == NULL || request->file == NULL) { + ERROR("Invalid input arguments, cannot load image"); +@@ -1082,9 +1083,9 @@ int oci_do_load(const im_load_request *request) + } + + options.whiteout_format = NONE_WHITEOUT_FORMATE; +- if (archive_unpack(&reader, dstdir, &options) != 0) { +- ERROR("Failed to unpack to :%s", dstdir); +- isulad_try_set_error_message("Failed to unpack to :%s", dstdir); ++ if (archive_unpack(&reader, dstdir, &options, &err) != 0) { ++ ERROR("Failed to unpack to %s: %s", dstdir, err); ++ isulad_try_set_error_message("Failed to unpack to %s: %s", dstdir, err); + ret = -1; + goto out; + } +@@ -1167,5 +1168,6 @@ out: + WARN("failed to remove directory %s", dstdir); + } + free(dstdir); ++ free(err); + return ret; + } +diff --git a/src/daemon/modules/image/oci/storage/layer_store/graphdriver/devmapper/driver_devmapper.c b/src/daemon/modules/image/oci/storage/layer_store/graphdriver/devmapper/driver_devmapper.c +index f2586f0d..e91ffe05 100644 +--- a/src/daemon/modules/image/oci/storage/layer_store/graphdriver/devmapper/driver_devmapper.c ++++ b/src/daemon/modules/image/oci/storage/layer_store/graphdriver/devmapper/driver_devmapper.c +@@ -319,6 +319,7 @@ int devmapper_apply_diff(const char *id, const struct graphdriver *driver, const + char *layer_fs = NULL; + int ret = 0; + struct archive_options options = { 0 }; ++ char *err = NULL; + + if (!util_valid_str(id) || driver == NULL || content == NULL) { + ERROR("invalid argument to apply diff with id(%s)", id); +@@ -340,8 +341,8 @@ int devmapper_apply_diff(const char *id, const struct graphdriver *driver, const + } + + options.whiteout_format = REMOVE_WHITEOUT_FORMATE; +- if (archive_unpack(content, layer_fs, &options) != 0) { +- ERROR("devmapper: failed to unpack to :%s", layer_fs); ++ if (archive_unpack(content, layer_fs, &options, &err) != 0) { ++ ERROR("devmapper: failed to unpack to %s: %s", layer_fs, err); + ret = -1; + goto out; + } +@@ -355,6 +356,7 @@ int devmapper_apply_diff(const char *id, const struct graphdriver *driver, const + out: + free_driver_mount_opts(mount_opts); + free(layer_fs); ++ free(err); + return ret; + } + +diff --git a/src/daemon/modules/image/oci/storage/layer_store/graphdriver/overlay2/driver_overlay2.c b/src/daemon/modules/image/oci/storage/layer_store/graphdriver/overlay2/driver_overlay2.c +index 6cdabe54..659d9d52 100644 +--- a/src/daemon/modules/image/oci/storage/layer_store/graphdriver/overlay2/driver_overlay2.c ++++ b/src/daemon/modules/image/oci/storage/layer_store/graphdriver/overlay2/driver_overlay2.c +@@ -1657,6 +1657,7 @@ int overlay2_apply_diff(const char *id, const struct graphdriver *driver, const + char *layer_dir = NULL; + char *layer_diff = NULL; + struct archive_options options = { 0 }; ++ char *err = NULL; + + if (id == NULL || driver == NULL || content == NULL) { + ERROR("invalid argument"); +@@ -1680,14 +1681,15 @@ int overlay2_apply_diff(const char *id, const struct graphdriver *driver, const + + options.whiteout_format = OVERLAY_WHITEOUT_FORMATE; + +- ret = archive_unpack(content, layer_diff, &options); ++ ret = archive_unpack(content, layer_diff, &options, &err); + if (ret != 0) { +- ERROR("Failed to unpack to :%s", layer_diff); ++ ERROR("Failed to unpack to %s: %s", layer_diff, err); + ret = -1; + goto out; + } + + out: ++ free(err); + free(layer_dir); + free(layer_diff); + return ret; +diff --git a/src/utils/tar/isulad_tar.c b/src/utils/tar/isulad_tar.c +index 5edf2ac3..03277373 100644 +--- a/src/utils/tar/isulad_tar.c ++++ b/src/utils/tar/isulad_tar.c +@@ -31,17 +31,7 @@ + #include "isula_libutils/log.h" + #include "error.h" + #include "isula_libutils/json_common.h" +-#include "io_wrapper.h" +-#include "utils_file.h" +-#include "utils_verify.h" +- +-#define TAR_MAX_OPTS 50 +-#define TAR_CMD "tar" +-#define TAR_TRANSFORM_OPT "--transform" +-#define TAR_CREATE_OPT "-c" +-#define TAR_EXACT_OPT "-x" +-#define TAR_CHDIR_OPT "-C" +-#define TAR_GZIP_OPT "-z" ++#include "util_archive.h" + + static void set_char_to_separator(char *p) + { +@@ -126,110 +116,6 @@ int gzip(const char *filename, size_t len) + return status; + } + +-struct archive_context { +- int stdin_fd; +- int stdout_fd; +- int stderr_fd; +- pid_t pid; +-}; +- +-static ssize_t archive_context_read(void *context, void *buf, size_t len) +-{ +- struct archive_context *ctx = (struct archive_context *)context; +- if (ctx == NULL) { +- return -1; +- } +- if (ctx->stdout_fd >= 0) { +- return util_read_nointr(ctx->stdout_fd, buf, len); +- } +- return 0; +-} +- +-static ssize_t archive_context_write(const void *context, const void *buf, size_t len) +-{ +- struct archive_context *ctx = (struct archive_context *)context; +- if (ctx == NULL) { +- return -1; +- } +- if (ctx->stdin_fd >= 0) { +- return util_write_nointr(ctx->stdin_fd, buf, len); +- } +- return 0; +-} +- +-static int close_wait_pid(struct archive_context *ctx, int *status) +-{ +- int ret = 0; +- +- // close stdin and stdout first, this will make sure the process of tar exit. +- if (ctx->stdin_fd >= 0) { +- close(ctx->stdin_fd); +- } +- +- if (ctx->stdout_fd >= 0) { +- close(ctx->stdout_fd); +- } +- +- if (ctx->pid > 0) { +- if (waitpid(ctx->pid, status, 0) != ctx->pid) { +- ERROR("Failed to wait pid %u", ctx->pid); +- ret = -1; +- } +- } +- +- return ret; +-} +- +-static int archive_context_close(void *context, char **err) +-{ +- int ret = 0; +- int status = 0; +- char *reason = NULL; +- ssize_t size_read = 0; +- char buffer[BUFSIZ + 1] = { 0 }; +- struct archive_context *ctx = (struct archive_context *)context; +- char *marshaled = NULL; +- +- if (ctx == NULL) { +- return 0; +- } +- +- ret = close_wait_pid(ctx, &status); +- +- if (WIFSIGNALED((unsigned int)status)) { +- status = WTERMSIG(status); +- reason = "signaled"; +- } else if (WIFEXITED(status)) { +- status = WEXITSTATUS(status); +- reason = "exited"; +- } else { +- reason = "unknown"; +- } +- +- if (ctx->stderr_fd >= 0) { +- size_read = util_read_nointr(ctx->stderr_fd, buffer, BUFSIZ); +- if (size_read > 0) { +- reason = buffer; +- marshaled = util_marshal_string(buffer); +- if (marshaled == NULL) { +- ERROR("Can not marshal json buffer: %s", buffer); +- } else { +- reason = marshaled; +- } +- } +- close(ctx->stderr_fd); +- } +- +- if (size_read > 0 || status != 0) { +- format_errorf(err, "tar exited with status %d: %s", status, reason); +- ret = -1; +- } +- +- free(marshaled); +- free(ctx); +- return ret; +-} +- + static int get_rebase_name(const char *path, const char *real_path, char **resolved_path, char **rebase_name) + { + int nret; +@@ -502,50 +388,8 @@ static bool asserts_directory(const char *path) + return util_has_trailing_path_separator(path) || util_specify_current_dir(path); + } + +-static char *format_transform_of_tar(const char *srcbase, const char *dstbase) +-{ +- char *transform = NULL; +- const char *src_escaped = srcbase; +- const char *dst_escaped = dstbase; +- int nret; +- size_t len; +- +- if (srcbase == NULL || dstbase == NULL) { +- return NULL; +- } +- +- // escape "/" by "." to avoid generating leading / in tar archive which is dangerous to host when untar. +- // this means tar or untar with leading / is forbidden and may got error, take care of this when coding. +- if (strcmp(srcbase, "/") == 0) { +- src_escaped = "."; +- } +- +- if (strcmp(dstbase, "/") == 0) { +- dst_escaped = "."; +- } +- +- len = strlen(src_escaped) + strlen(dst_escaped) + 5; +- if (len > PATH_MAX) { +- ERROR("Invalid path length"); +- return NULL; +- } +- +- transform = util_common_calloc_s(len); +- if (transform == NULL) { +- ERROR("Out of memory"); +- return NULL; +- } +- nret = snprintf(transform, len, "s/%s/%s/", src_escaped, dst_escaped); +- if (nret < 0 || (size_t)nret >= len) { +- ERROR("Failed to print string"); +- free(transform); +- return NULL; +- } +- return transform; +-} +- + char *prepare_archive_copy(const struct archive_copy_info *srcinfo, const struct archive_copy_info *dstinfo, +- char **transform, char **err) ++ char **src_base, char **dst_base, char **err) + { + char *dstdir = NULL; + char *srcbase = NULL; +@@ -573,7 +417,8 @@ char *prepare_archive_copy(const struct archive_copy_info *srcinfo, const struct + free(srcbase); + srcbase = util_strdup_s(srcinfo->rebase_name); + } +- *transform = format_transform_of_tar(srcbase, dstbase); ++ *src_base = util_strdup_s(srcbase); ++ *dst_base = util_strdup_s(dstbase); + } else if (srcinfo->isdir) { + // dst does not exist and src is a directory, untar the content to parent of dest, + // and rename basename of src name to dest's basename. +@@ -581,7 +426,8 @@ char *prepare_archive_copy(const struct archive_copy_info *srcinfo, const struct + free(srcbase); + srcbase = util_strdup_s(srcinfo->rebase_name); + } +- *transform = format_transform_of_tar(srcbase, dstbase); ++ *src_base = util_strdup_s(srcbase); ++ *dst_base = util_strdup_s(dstbase); + } else if (asserts_directory(dstinfo->path)) { + // dst does not exist and is want to be created as a directory, but src is not a directory, report error. + format_errorf(err, "no such directory, can not copy file"); +@@ -594,7 +440,8 @@ char *prepare_archive_copy(const struct archive_copy_info *srcinfo, const struct + free(srcbase); + srcbase = util_strdup_s(srcinfo->rebase_name); + } +- *transform = format_transform_of_tar(srcbase, dstbase); ++ *src_base = util_strdup_s(srcbase); ++ *dst_base = util_strdup_s(dstbase); + } + + cleanup: +@@ -603,125 +450,14 @@ cleanup: + return dstdir; + } + +-static void close_pipe_fd(int pipe_fd[]) +-{ +- if (pipe_fd[0] != -1) { +- close(pipe_fd[0]); +- pipe_fd[0] = -1; +- } +- if (pipe_fd[1] != -1) { +- close(pipe_fd[1]); +- pipe_fd[1] = -1; +- } +-} +- +-int archive_untar(const struct io_read_wrapper *content, bool compression, const char *dstdir, const char *transform, +- char **err) +-{ +- int stdin_pipe[2] = { -1, -1 }; +- int stderr_pipe[2] = { -1, -1 }; +- int ret = -1; +- int cret = 0; +- pid_t pid; +- struct archive_context *ctx = NULL; +- char *buf = NULL; +- size_t buf_len = ARCHIVE_BLOCK_SIZE; +- ssize_t read_len; +- const char *params[TAR_MAX_OPTS] = { NULL }; +- +- buf = util_common_calloc_s(buf_len); +- if (buf == NULL) { +- ERROR("Out of memory"); +- return -1; +- } +- +- if (pipe(stderr_pipe) != 0) { +- ERROR("Failed to create pipe: %s", strerror(errno)); +- goto cleanup; +- } +- if (pipe(stdin_pipe) != 0) { +- ERROR("Failed to create pipe: %s", strerror(errno)); +- goto cleanup; +- } +- +- pid = fork(); +- if (pid == (pid_t) -1) { +- ERROR("Failed to fork: %s", strerror(errno)); +- goto cleanup; +- } +- +- if (pid == (pid_t)0) { +- int i = 0; +- // child process, dup2 stderr[1] to stderr, stdout[0] to stdin. +- close(stderr_pipe[0]); +- dup2(stderr_pipe[1], 2); +- close(stdin_pipe[1]); +- dup2(stdin_pipe[0], 0); +- +- params[i++] = TAR_CMD; +- params[i++] = TAR_EXACT_OPT; +- if (compression) { +- params[i++] = TAR_GZIP_OPT; +- } +- params[i++] = TAR_CHDIR_OPT; +- params[i++] = dstdir; +- if (transform != NULL) { +- params[i++] = TAR_TRANSFORM_OPT; +- params[i++] = transform; +- } +- +- execvp(TAR_CMD, (char * const *)params); +- +- fprintf(stderr, "Failed to exec tar: %s", strerror(errno)); +- exit(EXIT_FAILURE); +- } +- +- close(stderr_pipe[1]); +- stderr_pipe[1] = -1; +- close(stdin_pipe[0]); +- stdin_pipe[0] = -1; +- +- ctx = util_common_calloc_s(sizeof(struct archive_context)); +- if (ctx == NULL) { +- goto cleanup; +- } +- +- ctx->pid = pid; +- ctx->stdin_fd = stdin_pipe[1]; +- stdin_pipe[1] = -1; +- ctx->stdout_fd = -1; +- ctx->stderr_fd = stderr_pipe[0]; +- stderr_pipe[0] = -1; +- +- read_len = content->read(content->context, buf, buf_len); +- while (read_len > 0) { +- ssize_t writed_len = archive_context_write(ctx, buf, (size_t)read_len); +- if (writed_len < 0) { +- DEBUG("Tar may exited: %s", strerror(errno)); +- break; +- } +- read_len = content->read(content->context, buf, buf_len); +- } +- +- ret = 0; +- +-cleanup: +- free(buf); +- cret = archive_context_close(ctx, err); +- ret = (cret != 0) ? cret : ret; +- close_pipe_fd(stderr_pipe); +- close_pipe_fd(stdin_pipe); +- +- return ret; +-} +- +-int archive_copy_to(const struct io_read_wrapper *content, bool compression, const struct archive_copy_info *srcinfo, ++int archive_copy_to(const struct io_read_wrapper *content, const struct archive_copy_info *srcinfo, + const char *dstpath, char **err) + { + int ret = -1; + struct archive_copy_info *dstinfo = NULL; + char *dstdir = NULL; +- char *transform = NULL; ++ char *src_base = NULL; ++ char *dst_base = NULL; + + dstinfo = copy_info_destination_path(dstpath, err); + if (dstinfo == NULL) { +@@ -729,128 +465,23 @@ int archive_copy_to(const struct io_read_wrapper *content, bool compression, con + return -1; + } + +- dstdir = prepare_archive_copy(srcinfo, dstinfo, &transform, err); ++ dstdir = prepare_archive_copy(srcinfo, dstinfo, &src_base, &dst_base, err); + if (dstdir == NULL) { + ERROR("Can not prepare archive copy"); + goto cleanup; + } + +- ret = archive_untar(content, compression, dstdir, transform, err); ++ ret = archive_chroot_untar_stream(content, dstdir, ".", src_base, dst_base, err); + + cleanup: + free_archive_copy_info(dstinfo); + free(dstdir); +- free(transform); +- return ret; +-} +- +-static void close_archive_pipes_fd(int *pipes, size_t pipe_size) +-{ +- size_t i = 0; +- +- for (i = 0; i < pipe_size; i++) { +- if (pipes[i] >= 0) { +- close(pipes[i]); +- pipes[i] = -1; +- } +- } +-} +- +-/* +- * Archive file or directory. +- * param src : file or directory to compression. +- * param compression : using gzip compression or not +- * param exclude_base : exclude source basename in the archived file or not +- * return : zero if archive success, non-zero if not. +- */ +-int archive_path(const char *srcdir, const char *srcbase, const char *rebase_name, bool compression, +- struct io_read_wrapper *archive_reader) +-{ +- int stderr_pipe[2] = { -1, -1 }; +- int stdout_pipe[2] = { -1, -1 }; +- int ret = -1; +- pid_t pid; +- struct archive_context *ctx = NULL; +- char *transform = NULL; +- const char *params[TAR_MAX_OPTS] = { NULL }; +- +- transform = format_transform_of_tar(srcbase, rebase_name); +- +- if (pipe(stderr_pipe) != 0) { +- ERROR("Failed to create pipe: %s", strerror(errno)); +- goto free_out; +- } +- if (pipe(stdout_pipe) != 0) { +- ERROR("Failed to create pipe: %s", strerror(errno)); +- goto free_out; +- } +- +- pid = fork(); +- if (pid == (pid_t) -1) { +- ERROR("Failed to fork: %s", strerror(errno)); +- goto free_out; +- } +- +- if (pid == (pid_t)0) { +- int i = 0; +- // child process, dup2 stderr[1] to stderr, stdout[1] to stdout. +- close(stderr_pipe[0]); +- close(stdout_pipe[0]); +- dup2(stderr_pipe[1], 2); +- dup2(stdout_pipe[1], 1); +- +- params[i++] = TAR_CMD; +- params[i++] = TAR_CREATE_OPT; +- if (compression) { +- params[i++] = TAR_GZIP_OPT; +- } +- params[i++] = TAR_CHDIR_OPT; +- params[i++] = srcdir; +- if (transform != NULL) { +- params[i++] = TAR_TRANSFORM_OPT; +- params[i++] = transform; +- } +- params[i++] = srcbase; +- +- execvp(TAR_CMD, (char * const *)params); +- +- fprintf(stderr, "Failed to exec tar: %s", strerror(errno)); +- exit(EXIT_FAILURE); +- } +- +- close(stderr_pipe[1]); +- stderr_pipe[1] = -1; +- close(stdout_pipe[1]); +- stdout_pipe[1] = -1; +- +- ctx = util_common_calloc_s(sizeof(struct archive_context)); +- if (ctx == NULL) { +- goto free_out; +- } +- +- ctx->stdin_fd = -1; +- ctx->stdout_fd = stdout_pipe[0]; +- stdout_pipe[0] = -1; +- ctx->stderr_fd = stderr_pipe[0]; +- stderr_pipe[0] = -1; +- ctx->pid = pid; +- +- archive_reader->close = archive_context_close; +- archive_reader->context = ctx; +- ctx = NULL; +- archive_reader->read = archive_context_read; +- +- ret = 0; +-free_out: +- free(transform); +- close_archive_pipes_fd(stderr_pipe, 2); +- close_archive_pipes_fd(stdout_pipe, 2); +- free(ctx); +- ++ free(src_base); ++ free(dst_base); + return ret; + } + +-int tar_resource_rebase(const char *path, const char *rebase, struct io_read_wrapper *archive_reader, char **err) ++static int tar_resource_rebase(const char *path, const char *rebase, struct io_read_wrapper *archive_reader, char **err) + { + int ret = -1; + int nret; +@@ -868,8 +499,8 @@ int tar_resource_rebase(const char *path, const char *rebase, struct io_read_wra + goto cleanup; + } + +- DEBUG("Copying %s from %s", srcbase, srcdir); +- nret = archive_path(srcdir, srcbase, rebase, false, archive_reader); ++ DEBUG("chroot tar stream srcdir(%s) srcbase(%s) rebase(%s)", srcdir, srcbase, rebase); ++ nret = archive_chroot_tar_stream(srcdir, srcbase, srcbase, rebase, archive_reader); + if (nret < 0) { + ERROR("Can not archive path: %s", path); + goto cleanup; +diff --git a/src/utils/tar/isulad_tar.h b/src/utils/tar/isulad_tar.h +index e2b78463..c773fe9b 100644 +--- a/src/utils/tar/isulad_tar.h ++++ b/src/utils/tar/isulad_tar.h +@@ -57,19 +57,13 @@ int gzip(const char *filename, size_t len); + struct archive_copy_info *copy_info_source_path(const char *path, bool follow_link, char **err); + + char *prepare_archive_copy(const struct archive_copy_info *srcinfo, const struct archive_copy_info *dstinfo, +- char **transform, char **err); ++ char **src_base, char **dst_base, char **err); + + int tar_resource(const struct archive_copy_info *info, struct io_read_wrapper *archive_reader, char **err); + +-int archive_untar(const struct io_read_wrapper *content, bool compression, const char *dstdir, const char *transform, +- char **err); +- +-int archive_copy_to(const struct io_read_wrapper *content, bool compression, const struct archive_copy_info *srcinfo, ++int archive_copy_to(const struct io_read_wrapper *content, const struct archive_copy_info *srcinfo, + const char *dstpath, char **err); + +-int archive_path(const char *srcdir, const char *srcbase, const char *rebase_name, bool compression, +- struct io_read_wrapper *archive_reader); +- + #ifdef __cplusplus + } + #endif +diff --git a/src/utils/tar/util_archive.c b/src/utils/tar/util_archive.c +index 234e661e..7a28286a 100644 +--- a/src/utils/tar/util_archive.c ++++ b/src/utils/tar/util_archive.c +@@ -26,6 +26,7 @@ + #include + #include + #include ++#include + + #include "stdbool.h" + #include "utils.h" +@@ -33,11 +34,14 @@ + #include "io_wrapper.h" + #include "utils_file.h" + #include "map.h" ++#include "path.h" ++#include "error.h" + + struct archive; + struct archive_entry; + + #define ARCHIVE_READ_BUFFER_SIZE (10 * 1024) ++#define ARCHIVE_WRITE_BUFFER_SIZE (10 * 1024) + #define TAR_DEFAULT_MODE 0600 + #define TAR_DEFAULT_FLAG (O_WRONLY | O_CREAT | O_TRUNC) + +@@ -45,6 +49,13 @@ struct archive_entry; + #define WHITEOUT_META_PREFIX ".wh..wh." + #define WHITEOUT_OPAQUEDIR ".wh..wh..opq" + ++struct archive_context { ++ int stdin_fd; ++ int stdout_fd; ++ int stderr_fd; ++ pid_t pid; ++}; ++ + struct archive_content_data { + const struct io_read_wrapper *content; + char buff[ARCHIVE_READ_BUFFER_SIZE]; +@@ -286,8 +297,104 @@ static whiteout_convert_call_back_t get_whiteout_convert_cb(whiteout_format_type + return NULL; + } + +-int archive_unpack_handler(const struct io_read_wrapper *content, const char *dstdir, +- const struct archive_options *options) ++static char *to_relative_path(const char *path) ++{ ++ char *dst_path = NULL; ++ ++ if (path != NULL && path[0] == '/') { ++ if (strcmp(path, "/") == 0) { ++ dst_path = util_strdup_s("."); ++ } else { ++ dst_path = util_strdup_s(path + 1); ++ } ++ } else { ++ dst_path = util_strdup_s(path); ++ } ++ ++ return dst_path; ++} ++ ++static int rebase_pathname(struct archive_entry *entry, const char *src_base, const char *dst_base) ++{ ++ int nret = 0; ++ const char *pathname = archive_entry_pathname(entry); ++ char path[PATH_MAX] = { 0 }; ++ ++ if (src_base == NULL || dst_base == NULL || !util_has_prefix(pathname, src_base)) { ++ return 0; ++ } ++ ++ nret = snprintf(path, sizeof(path), "%s%s", dst_base, pathname + strlen(src_base)); ++ if (nret < 0 || (size_t)nret >= sizeof(path)) { ++ ERROR("snprintf %s%s failed", dst_base, pathname + strlen(src_base)); ++ fprintf(stderr, "snprintf %s%s failed", dst_base, pathname + strlen(src_base)); ++ return -1; ++ } ++ ++ archive_entry_set_pathname(entry, path); ++ ++ return 0; ++} ++ ++static char *update_entry_for_pathname(struct archive_entry *entry, const char *src_base, const char *dst_base) ++{ ++ char *dst_path = NULL; ++ const char *pathname = NULL; ++ ++ if (rebase_pathname(entry, src_base, dst_base) != 0) { ++ return NULL; ++ } ++ ++ pathname = archive_entry_pathname(entry); ++ if (pathname == NULL) { ++ ERROR("Failed to get archive entry path name"); ++ fprintf(stderr, "Failed to get archive entry path name"); ++ return NULL; ++ } ++ ++ // if path in archive is absolute, we need to translate it to relative because ++ // libarchive can not support absolute path when unpack ++ dst_path = to_relative_path(pathname); ++ if (dst_path == NULL) { ++ ERROR("translate %s to relative path failed", pathname); ++ fprintf(stderr, "translate %s to relative path failed", pathname); ++ goto out; ++ } ++ ++ archive_entry_set_pathname(entry, dst_path); ++out: ++ ++ return dst_path; ++} ++ ++static int rebase_hardlink(struct archive_entry *entry, const char *src_base, const char *dst_base) ++{ ++ int nret = 0; ++ const char *linkname = NULL; ++ char path[PATH_MAX] = { 0 }; ++ ++ linkname = archive_entry_hardlink(entry); ++ if (linkname == NULL) { ++ return 0; ++ } ++ ++ if (src_base == NULL || dst_base == NULL || !util_has_prefix(linkname, src_base)) { ++ return 0; ++ } ++ ++ nret = snprintf(path, sizeof(path), "%s%s", dst_base, linkname + strlen(src_base)); ++ if (nret < 0 || (size_t)nret >= sizeof(path)) { ++ ERROR("snprintf %s%s failed", dst_base, linkname + strlen(src_base)); ++ fprintf(stderr, "snprintf %s%s failed", dst_base, linkname + strlen(src_base)); ++ return -1; ++ } ++ ++ archive_entry_set_hardlink(entry, path); ++ ++ return 0; ++} ++ ++int archive_unpack_handler(const struct io_read_wrapper *content, const struct archive_options *options) + { + int ret = 0; + struct archive *a = NULL; +@@ -302,6 +409,7 @@ int archive_unpack_handler(const struct io_read_wrapper *content, const char *ds + unpacked_path_map = map_new(MAP_STR_BOOL, MAP_DEFAULT_CMP_FUNC, MAP_DEFAULT_FREE_FUNC); + if (unpacked_path_map == NULL) { + ERROR("Out of memory"); ++ fprintf(stderr, "Out of memory"); + ret = -1; + goto out; + } +@@ -309,6 +417,7 @@ int archive_unpack_handler(const struct io_read_wrapper *content, const char *ds + mydata = util_common_calloc_s(sizeof(struct archive_content_data)); + if (mydata == NULL) { + ERROR("Memory out"); ++ fprintf(stderr, "Memory out"); + ret = -1; + goto out; + } +@@ -327,6 +436,7 @@ int archive_unpack_handler(const struct io_read_wrapper *content, const char *ds + a = archive_read_new(); + if (a == NULL) { + ERROR("archive read new failed"); ++ fprintf(stderr, "archive read new failed"); + ret = -1; + goto out; + } +@@ -336,6 +446,7 @@ int archive_unpack_handler(const struct io_read_wrapper *content, const char *ds + ext = archive_write_disk_new(); + if (ext == NULL) { + ERROR("archive write disk new failed"); ++ fprintf(stderr, "archive write disk new failed"); + ret = -1; + goto out; + } +@@ -345,6 +456,7 @@ int archive_unpack_handler(const struct io_read_wrapper *content, const char *ds + ret = archive_read_open(a, mydata, NULL, read_content, NULL); + if (ret != 0) { + SYSERROR("Failed to open archive"); ++ fprintf(stderr, "Failed to open archive: %s", strerror(errno)); + ret = -1; + goto out; + } +@@ -354,7 +466,6 @@ int archive_unpack_handler(const struct io_read_wrapper *content, const char *ds + for (;;) { + free(dst_path); + dst_path = NULL; +- + ret = archive_read_next_header(a, &entry); + + if (ret == ARCHIVE_EOF) { +@@ -363,20 +474,23 @@ int archive_unpack_handler(const struct io_read_wrapper *content, const char *ds + + if (ret != ARCHIVE_OK) { + ERROR("Warning reading tar header: %s", archive_error_string(a)); ++ fprintf(stderr, "Warning reading tar header: %s", archive_error_string(a)); + ret = -1; + goto out; + } + +- const char *pathname = archive_entry_pathname(entry); +- if (pathname == NULL) { +- ERROR("Failed to get archive entry path name"); ++ dst_path = update_entry_for_pathname(entry, options->src_base, options->dst_base); ++ if (dst_path == NULL) { ++ ERROR("Failed to update pathname"); ++ fprintf(stderr, "Failed to update pathname"); + ret = -1; + goto out; + } + +- dst_path = util_path_join(dstdir, pathname); +- if (dst_path == NULL) { +- ERROR("Failed to get archive entry dst path %s/%s", dstdir, pathname); ++ ret = rebase_hardlink(entry, options->src_base, options->dst_base); ++ if (ret != 0) { ++ ERROR("Failed to rebase hardlink"); ++ fprintf(stderr, "Failed to rebase hardlink"); + ret = -1; + goto out; + } +@@ -385,22 +499,17 @@ int archive_unpack_handler(const struct io_read_wrapper *content, const char *ds + continue; + } + +- // if path in archive is absolute, we need to translate it to relative because +- // libarchive can not support absolute path when unpack +- pathname = archive_entry_pathname(entry); +- if (pathname != NULL && pathname[0] == '/') { +- archive_entry_set_pathname(entry, pathname + 1); +- } +- + ret = archive_write_header(ext, entry); + if (ret != ARCHIVE_OK) { + ERROR("Fail to handle tar header: %s", archive_error_string(ext)); ++ fprintf(stderr, "Fail to handle tar header: %s", archive_error_string(ext)); + ret = -1; + goto out; + } else if (archive_entry_size(entry) > 0) { + ret = copy_data(a, ext); + if (ret != ARCHIVE_OK) { + ERROR("Failed to do copy tar data: %s", archive_error_string(ext)); ++ fprintf(stderr, "Failed to do copy tar data: %s", archive_error_string(ext)); + ret = -1; + goto out; + } +@@ -408,6 +517,7 @@ int archive_unpack_handler(const struct io_read_wrapper *content, const char *ds + ret = archive_write_finish_entry(ext); + if (ret != ARCHIVE_OK) { + ERROR("Failed to freeing archive entry: %s\n", archive_error_string(ext)); ++ fprintf(stderr, "Failed to freeing archive entry: %s\n", archive_error_string(ext)); + ret = -1; + goto out; + } +@@ -415,6 +525,7 @@ int archive_unpack_handler(const struct io_read_wrapper *content, const char *ds + bool b = true; + if (!map_replace(unpacked_path_map, (void *)dst_path, (void *)(&b))) { + ERROR("Failed to replace unpacked path map element"); ++ fprintf(stderr, "Failed to replace unpacked path map element"); + ret = -1; + goto out; + } +@@ -433,11 +544,32 @@ out: + return ret; + } + +-int archive_unpack(const struct io_read_wrapper *content, const char *dstdir, const struct archive_options *options) ++static void close_archive_pipes_fd(int *pipes, size_t pipe_size) ++{ ++ size_t i = 0; ++ ++ for (i = 0; i < pipe_size; i++) { ++ if (pipes[i] >= 0) { ++ close(pipes[i]); ++ pipes[i] = -1; ++ } ++ } ++} ++ ++int archive_unpack(const struct io_read_wrapper *content, const char *dstdir, const struct archive_options *options, ++ char **errmsg) + { + int ret = 0; + pid_t pid = -1; +- int keepfds[] = { -1, -1 }; ++ int keepfds[] = { -1, -1, -1 }; ++ int pipe_stderr[2] = { -1, -1 }; ++ char errbuf[BUFSIZ] = { 0 }; ++ ++ if (pipe2(pipe_stderr, O_CLOEXEC) != 0) { ++ ERROR("Failed to create pipe"); ++ ret = -1; ++ goto cleanup; ++ } + + pid = fork(); + if (pid == (pid_t) -1) { +@@ -448,26 +580,37 @@ int archive_unpack(const struct io_read_wrapper *content, const char *dstdir, co + if (pid == (pid_t)0) { + keepfds[0] = isula_libutils_get_log_fd(); + keepfds[1] = *(int *)(content->context); +- ret = util_check_inherited_exclude_fds(true, keepfds, 2); ++ keepfds[2] = pipe_stderr[1]; ++ ret = util_check_inherited_exclude_fds(true, keepfds, 3); + if (ret != 0) { + ERROR("Failed to close fds."); ++ fprintf(stderr, "Failed to close fds."); ++ ret = -1; ++ goto child_out; ++ } ++ ++ // child process, dup2 pipe_for_read[1] to stderr, ++ if (dup2(pipe_stderr[1], 2) < 0) { ++ ERROR("Dup fd error: %s", strerror(errno)); + ret = -1; + goto child_out; + } + + if (chroot(dstdir) != 0) { + SYSERROR("Failed to chroot to %s", dstdir); ++ fprintf(stderr, "Failed to chroot to %s: %s", dstdir, strerror(errno)); + ret = -1; + goto child_out; + } + + if (chdir("/") != 0) { + SYSERROR("Failed to chroot to /"); ++ fprintf(stderr, "Failed to chroot to /: %s", strerror(errno)); + ret = -1; + goto child_out; + } + +- ret = archive_unpack_handler(content, "/", options); ++ ret = archive_unpack_handler(content, options); + + child_out: + if (ret != 0) { +@@ -476,13 +619,23 @@ child_out: + exit(EXIT_SUCCESS); + } + } ++ close(pipe_stderr[1]); ++ pipe_stderr[1] = -1; + + ret = util_wait_for_pid(pid); + if (ret != 0) { + ERROR("Wait archive_untar_handler failed"); ++ fcntl(pipe_stderr[0], F_SETFL, O_NONBLOCK); ++ if (read(pipe_stderr[0], errbuf, BUFSIZ) < 0) { ++ ERROR("read error message from child failed"); ++ } + } + + cleanup: ++ close_archive_pipes_fd(pipe_stderr, 2); ++ if (errmsg != NULL && strlen(errbuf) != 0) { ++ *errmsg = util_strdup_s(errbuf); ++ } + return ret; + } + +@@ -569,19 +722,19 @@ static int copy_data_between_archives(struct archive *ar, struct archive *aw) + } + } + +-int update_entry_for_hardlink(map_t *map_link, struct archive_entry *entry) ++int update_entry_for_hardlink(map_t *map_link, struct archive_entry *entry, const char *src_base, const char *dst_base) + { + const char *path = archive_entry_pathname(entry); + char *linkname = NULL; + unsigned int nlink = archive_entry_nlink(entry); + int ino = archive_entry_ino(entry); ++ const char *hardlink = archive_entry_hardlink(entry); + +- // hardlink is regular file, not type AE_IFLNK +- if (archive_entry_filetype(entry) != AE_IFREG) { +- return 0; ++ if (hardlink != NULL && rebase_hardlink(entry, src_base, dst_base) != 0) { ++ return -1; + } + +- // no hardlink ++ // try to use hardlink to reduce tar size + if (nlink <= 1) { + return 0; + } +@@ -610,11 +763,12 @@ static void link_kvfree(void *key, void *value) + return; + } + +-int tar_handler(struct archive *r, struct archive *w) ++int tar_handler(struct archive *r, struct archive *w, const char *src_base, const char *dst_base) + { + int ret = ARCHIVE_OK; + struct archive_entry *entry = NULL; + map_t *map_link = NULL; ++ char *pathname = NULL; + + map_link = map_new(MAP_INT_STR, MAP_DEFAULT_CMP_FUNC, link_kvfree); + if (map_link == NULL) { +@@ -636,11 +790,18 @@ int tar_handler(struct archive *r, struct archive *w) + break; + } + +- if (update_entry_for_hardlink(map_link, entry) != 0) { ++ pathname = update_entry_for_pathname(entry, src_base, dst_base); ++ if (pathname == NULL) { + ret = ARCHIVE_FAILED; + break; + } ++ free(pathname); ++ pathname = NULL; + ++ if (update_entry_for_hardlink(map_link, entry, src_base, dst_base) != 0) { ++ ret = ARCHIVE_FAILED; ++ break; ++ } + ret = archive_write_header(w, entry); + if (ret != ARCHIVE_OK) { + ERROR("Fail to write tar header: %s", archive_error_string(w)); +@@ -680,7 +841,29 @@ int tar_handler(struct archive *r, struct archive *w) + return ret; + } + +-static int tar_all(int fd) ++static ssize_t stream_write_data(struct archive *a, void *client_data, const void *buffer, size_t length) ++{ ++ struct io_write_wrapper *writer = (struct io_write_wrapper *)client_data; ++ size_t written_length = 0; ++ size_t size = 0; ++ while (length > written_length) { ++ if (length - written_length > ARCHIVE_WRITE_BUFFER_SIZE) { ++ size = ARCHIVE_WRITE_BUFFER_SIZE; ++ } else { ++ size = length - written_length; ++ } ++ if (!writer->write_func(writer->context, (const char *)buffer + written_length, size)) { ++ ERROR("write stream failed"); ++ return -1; ++ } ++ written_length += size; ++ } ++ ++ return size; ++} ++ ++static int tar_all(const struct io_write_wrapper *writer, const char *tar_dir, ++ const char *src_base, const char *dst_base) + { + struct archive *r = NULL; + struct archive *w = NULL; +@@ -689,12 +872,13 @@ static int tar_all(int fd) + r = archive_read_disk_new(); + if (r == NULL) { + ERROR("archive read disk new failed"); ++ fprintf(stderr, "archive read disk new failed"); + return -1; + } + archive_read_disk_set_standard_lookup(r); + archive_read_disk_set_symlink_physical(r); + archive_read_disk_set_behavior(r, ARCHIVE_READDISK_NO_TRAVERSE_MOUNTS); +- ret = archive_read_disk_open(r, "."); ++ ret = archive_read_disk_open(r, tar_dir); + if (ret != ARCHIVE_OK) { + ERROR("open archive read failed: %s", archive_error_string(r)); + fprintf(stderr, "open archive read failed: %s\n", archive_error_string(r)); +@@ -704,19 +888,20 @@ static int tar_all(int fd) + w = archive_write_new(); + if (w == NULL) { + ERROR("archive write new failed"); ++ fprintf(stderr, "archive write new failed"); + ret = ARCHIVE_FAILED; + goto out; + } + archive_write_set_format_pax(w); + archive_write_set_options(w, "xattrheader=SCHILY"); +- ret = archive_write_open_fd(w, fd); ++ ret = archive_write_open(w, (void*)writer, NULL, stream_write_data, NULL); + if (ret != ARCHIVE_OK) { + ERROR("open archive write failed: %s", archive_error_string(w)); + fprintf(stderr, "open archive write failed: %s\n", archive_error_string(w)); + goto out; + } + +- ret = tar_handler(r, w); ++ ret = tar_handler(r, w, src_base, dst_base); + + out: + archive_free(r); +@@ -725,8 +910,14 @@ out: + return (ret == ARCHIVE_OK) ? 0 : -1; + } + ++static ssize_t fd_write(void *context, const void *data, size_t len) ++{ ++ return util_write_nointr(*(int*)context, data, len); ++} ++ + int archive_chroot_tar(char *path, char *file, char **errmsg) + { ++ struct io_write_wrapper pipe_context = { 0 }; + int ret = 0; + pid_t pid; + int pipe_for_read[2] = { -1, -1 }; +@@ -744,8 +935,6 @@ int archive_chroot_tar(char *path, char *file, char **errmsg) + if (pid == (pid_t) -1) { + ERROR("Failed to fork()"); + ret = -1; +- close(pipe_for_read[0]); +- close(pipe_for_read[1]); + goto cleanup; + } + +@@ -788,7 +977,9 @@ int archive_chroot_tar(char *path, char *file, char **errmsg) + goto child_out; + } + +- ret = tar_all(fd); ++ pipe_context.context = (void*)&fd; ++ pipe_context.write_func = fd_write; ++ ret = tar_all(&pipe_context, ".", ".", NULL); + + child_out: + +@@ -798,6 +989,8 @@ child_out: + exit(EXIT_SUCCESS); + } + } ++ close(pipe_for_read[1]); ++ pipe_for_read[1] = -1; + + ret = util_wait_for_pid(pid); + if (ret != 0) { +@@ -806,17 +999,357 @@ child_out: + if (read(pipe_for_read[0], errbuf, BUFSIZ) < 0) { + ERROR("read error message from child failed"); + } +- close(pipe_for_read[0]); +- pipe_for_read[0] = -1; + } + +- close(pipe_for_read[1]); +- pipe_for_read[1] = -1; +- + cleanup: ++ close_archive_pipes_fd(pipe_for_read, 2); + if (errmsg != NULL && strlen(errbuf) != 0) { + *errmsg = util_strdup_s(errbuf); + } + + return ret; + } ++ ++static ssize_t pipe_read(void *context, void *buf, size_t len) ++{ ++ return util_read_nointr(*(int*)context, buf, len); ++} ++ ++static ssize_t archive_context_write(const void *context, const void *buf, size_t len) ++{ ++ struct archive_context *ctx = (struct archive_context *)context; ++ if (ctx == NULL) { ++ return -1; ++ } ++ if (ctx->stdin_fd >= 0) { ++ return util_write_nointr(ctx->stdin_fd, buf, len); ++ } ++ return 0; ++} ++ ++static ssize_t pipe_write(void *context, const void *data, size_t len) ++{ ++ return util_write_nointr(*(int*)context, data, len); ++} ++ ++static ssize_t archive_context_read(void *context, void *buf, size_t len) ++{ ++ struct archive_context *ctx = (struct archive_context *)context; ++ if (ctx == NULL) { ++ return -1; ++ } ++ if (ctx->stdout_fd >= 0) { ++ return util_read_nointr(ctx->stdout_fd, buf, len); ++ } ++ return 0; ++} ++ ++static int close_wait_pid(struct archive_context *ctx, int *status) ++{ ++ int ret = 0; ++ ++ // close stdin and stdout first, this will make sure the process of tar exit. ++ if (ctx->stdin_fd >= 0) { ++ close(ctx->stdin_fd); ++ } ++ ++ if (ctx->stdout_fd >= 0) { ++ close(ctx->stdout_fd); ++ } ++ ++ if (ctx->pid > 0) { ++ if (waitpid(ctx->pid, status, 0) != ctx->pid) { ++ ERROR("Failed to wait pid %u", ctx->pid); ++ ret = -1; ++ } ++ } ++ ++ return ret; ++} ++ ++static int archive_context_close(void *context, char **err) ++{ ++ int ret = 0; ++ int status = 0; ++ char *reason = NULL; ++ ssize_t size_read = 0; ++ char buffer[BUFSIZ + 1] = { 0 }; ++ struct archive_context *ctx = (struct archive_context *)context; ++ char *marshaled = NULL; ++ ++ if (ctx == NULL) { ++ return 0; ++ } ++ ++ ret = close_wait_pid(ctx, &status); ++ ++ if (WIFSIGNALED((unsigned int)status)) { ++ status = WTERMSIG(status); ++ reason = "signaled"; ++ } else if (WIFEXITED(status)) { ++ status = WEXITSTATUS(status); ++ reason = "exited"; ++ } else { ++ reason = "unknown"; ++ } ++ if (ctx->stderr_fd >= 0) { ++ size_read = util_read_nointr(ctx->stderr_fd, buffer, BUFSIZ); ++ if (size_read > 0) { ++ reason = buffer; ++ marshaled = util_marshal_string(buffer); ++ if (marshaled == NULL) { ++ ERROR("Can not marshal json buffer: %s", buffer); ++ } else { ++ reason = marshaled; ++ } ++ } ++ close(ctx->stderr_fd); ++ } ++ ++ if (size_read > 0 || status != 0) { ++ format_errorf(err, "tar exited with status %d: %s", status, reason); ++ ret = -1; ++ } ++ ++ free(marshaled); ++ free(ctx); ++ return ret; ++} ++ ++int archive_chroot_untar_stream(const struct io_read_wrapper *context, const char *chroot_dir, ++ const char *untar_dir, const char *src_base, const char *dst_base, ++ char **errmsg) ++{ ++ struct io_read_wrapper pipe_context = { 0 }; ++ int pipe_stream[2] = { -1, -1 }; ++ int pipe_stderr[2] = { -1, -1 }; ++ int keepfds[] = { -1, -1, -1 }; ++ int ret = -1; ++ int cret = 0; ++ pid_t pid; ++ struct archive_context *ctx = NULL; ++ char *buf = NULL; ++ size_t buf_len = ARCHIVE_BLOCK_SIZE; ++ ssize_t read_len; ++ struct archive_options options = { ++ .whiteout_format = NONE_WHITEOUT_FORMATE, ++ .src_base = src_base, ++ .dst_base = dst_base ++ }; ++ ++ buf = util_common_calloc_s(buf_len); ++ if (buf == NULL) { ++ ERROR("Out of memory"); ++ return -1; ++ } ++ ++ if (pipe(pipe_stderr) != 0) { ++ ERROR("Failed to create pipe: %s", strerror(errno)); ++ goto cleanup; ++ } ++ if (pipe(pipe_stream) != 0) { ++ ERROR("Failed to create pipe: %s", strerror(errno)); ++ goto cleanup; ++ } ++ ++ pid = fork(); ++ if (pid == (pid_t) -1) { ++ ERROR("Failed to fork: %s", strerror(errno)); ++ goto cleanup; ++ } ++ ++ if (pid == (pid_t)0) { ++ keepfds[0] = isula_libutils_get_log_fd(); ++ keepfds[1] = pipe_stderr[1]; ++ keepfds[2] = pipe_stream[0]; ++ ret = util_check_inherited_exclude_fds(true, keepfds, 3); ++ if (ret != 0) { ++ ERROR("Failed to close fds."); ++ ret = -1; ++ goto child_out; ++ } ++ ++ // child process, dup2 pipe_stderr[1] to stderr, ++ if (dup2(pipe_stderr[1], 2) < 0) { ++ ERROR("Dup fd error: %s", strerror(errno)); ++ ret = -1; ++ goto child_out; ++ } ++ ++ if (chroot(chroot_dir) != 0) { ++ SYSERROR("Failed to chroot to %s", chroot_dir); ++ ret = -1; ++ goto child_out; ++ } ++ ++ if (chdir("/") != 0 || chdir(untar_dir) != 0) { ++ SYSERROR("Failed to chdir to %s", untar_dir); ++ fprintf(stderr, "Failed to chdir to %s", untar_dir); ++ ret = -1; ++ goto child_out; ++ } ++ ++ pipe_context.context = (void*)&pipe_stream[0]; ++ pipe_context.read = pipe_read; ++ ret = archive_unpack_handler(&pipe_context, &options); ++ ++child_out: ++ if (ret != 0) { ++ exit(EXIT_FAILURE); ++ } else { ++ exit(EXIT_SUCCESS); ++ } ++ } ++ ++ close(pipe_stderr[1]); ++ pipe_stderr[1] = -1; ++ close(pipe_stream[0]); ++ pipe_stream[0] = -1; ++ ++ ctx = util_common_calloc_s(sizeof(struct archive_context)); ++ if (ctx == NULL) { ++ goto cleanup; ++ } ++ ++ ctx->pid = pid; ++ ctx->stdin_fd = pipe_stream[1]; ++ pipe_stream[1] = -1; ++ ctx->stdout_fd = -1; ++ ctx->stderr_fd = pipe_stderr[0]; ++ pipe_stderr[0] = -1; ++ ++ read_len = context->read(context->context, buf, buf_len); ++ while (read_len > 0) { ++ ssize_t writed_len = archive_context_write(ctx, buf, (size_t)read_len); ++ if (writed_len < 0) { ++ DEBUG("Tar may exited: %s", strerror(errno)); ++ break; ++ } ++ read_len = context->read(context->context, buf, buf_len); ++ } ++ ++ ret = 0; ++ ++cleanup: ++ free(buf); ++ cret = archive_context_close(ctx, errmsg); ++ ret = (cret != 0) ? cret : ret; ++ close_archive_pipes_fd(pipe_stderr, 2); ++ close_archive_pipes_fd(pipe_stream, 2); ++ ++ return ret; ++} ++ ++int archive_chroot_tar_stream(const char *chroot_dir, const char *tar_path, const char *src_base, ++ const char *dst_base, struct io_read_wrapper *reader) ++{ ++ struct io_write_wrapper pipe_context = { 0 }; ++ int keepfds[] = { -1, -1, -1 }; ++ int pipe_stderr[2] = { -1, -1 }; ++ int pipe_stream[2] = { -1, -1 }; ++ int ret = -1; ++ pid_t pid; ++ struct archive_context *ctx = NULL; ++ ++ if (pipe(pipe_stderr) != 0) { ++ ERROR("Failed to create pipe: %s", strerror(errno)); ++ goto free_out; ++ } ++ if (pipe(pipe_stream) != 0) { ++ ERROR("Failed to create pipe: %s", strerror(errno)); ++ goto free_out; ++ } ++ ++ pid = fork(); ++ if (pid == (pid_t) - 1) { ++ ERROR("Failed to fork: %s", strerror(errno)); ++ goto free_out; ++ } ++ ++ if (pid == (pid_t)0) { ++ char *tar_dir_name = NULL; ++ char *tar_base_name = NULL; ++ ++ keepfds[0] = isula_libutils_get_log_fd(); ++ keepfds[1] = pipe_stderr[1]; ++ keepfds[2] = pipe_stream[1]; ++ ret = util_check_inherited_exclude_fds(true, keepfds, 3); ++ if (ret != 0) { ++ ERROR("Failed to close fds."); ++ ret = -1; ++ goto child_out; ++ } ++ ++ // child process, dup2 pipe_stderr[1] to stderr, ++ if (dup2(pipe_stderr[1], 2) < 0) { ++ ERROR("Dup fd error: %s", strerror(errno)); ++ ret = -1; ++ goto child_out; ++ } ++ ++ if (chroot(chroot_dir) != 0) { ++ ERROR("Failed to chroot to %s", chroot_dir); ++ fprintf(stderr, "Failed to chroot to %s\n", chroot_dir); ++ ret = -1; ++ goto child_out; ++ } ++ ++ if (util_split_dir_and_base_name(tar_path, &tar_dir_name, &tar_base_name) != 0) { ++ ERROR("Failed to split %s", tar_path); ++ fprintf(stderr, "Failed to split %s\n", tar_path); ++ ret = -1; ++ goto child_out; ++ } ++ ++ if (chdir("/") != 0 || chdir(tar_dir_name) != 0) { ++ ERROR("Failed to chdir to %s", tar_dir_name); ++ fprintf(stderr, "Failed to chdir to %s\n", tar_dir_name); ++ ret = -1; ++ goto child_out; ++ } ++ ++ pipe_context.context = (void*)&pipe_stream[1]; ++ pipe_context.write_func = pipe_write; ++ ret = tar_all(&pipe_context, tar_base_name, src_base, dst_base); ++ ++child_out: ++ free(tar_dir_name); ++ free(tar_base_name); ++ ++ if (ret != 0) { ++ exit(EXIT_FAILURE); ++ } else { ++ exit(EXIT_SUCCESS); ++ } ++ } ++ ++ close(pipe_stderr[1]); ++ pipe_stderr[1] = -1; ++ close(pipe_stream[1]); ++ pipe_stream[1] = -1; ++ ++ ctx = util_common_calloc_s(sizeof(struct archive_context)); ++ if (ctx == NULL) { ++ goto free_out; ++ } ++ ++ ctx->stdin_fd = -1; ++ ctx->stdout_fd = pipe_stream[0]; ++ pipe_stream[0] = -1; ++ ctx->stderr_fd = pipe_stderr[0]; ++ pipe_stderr[0] = -1; ++ ctx->pid = pid; ++ ++ reader->close = archive_context_close; ++ reader->context = ctx; ++ ctx = NULL; ++ reader->read = archive_context_read; ++ ++ ret = 0; ++free_out: ++ close_archive_pipes_fd(pipe_stderr, 2); ++ close_archive_pipes_fd(pipe_stream, 2); ++ free(ctx); ++ ++ return ret; ++} +diff --git a/src/utils/tar/util_archive.h b/src/utils/tar/util_archive.h +index 0e05a363..55fd7683 100644 +--- a/src/utils/tar/util_archive.h ++++ b/src/utils/tar/util_archive.h +@@ -24,6 +24,8 @@ + + #include "io_wrapper.h" + ++#define ARCHIVE_BLOCK_SIZE (32 * 1024) ++ + struct io_read_wrapper; + + #ifdef __cplusplus +@@ -38,14 +40,25 @@ typedef enum { + + struct archive_options { + whiteout_format_type whiteout_format; ++ ++ // rename archive entry's name from src_base to dst_base ++ const char *src_base; ++ const char *dst_base; + }; + +-int archive_unpack(const struct io_read_wrapper *content, const char *dstdir, const struct archive_options *options); ++int archive_unpack(const struct io_read_wrapper *content, const char *dstdir, const struct archive_options *options, ++ char **errmsg); + + bool valid_archive_format(const char *file); + + int archive_chroot_tar(char *path, char *file, char **errmsg); + ++int archive_chroot_tar_stream(const char *chroot_dir, const char *tar_path, const char *src_base, ++ const char *dst_base, struct io_read_wrapper *content); ++int archive_chroot_untar_stream(const struct io_read_wrapper *content, const char *chroot_dir, ++ const char *untar_dir, const char *src_base, const char *dst_base, ++ char **errmsg); ++ + #ifdef __cplusplus + } + #endif +-- +2.25.1 + diff --git a/0007-add-testcases-for-isula-cp.patch b/0007-add-testcases-for-isula-cp.patch new file mode 100644 index 0000000000000000000000000000000000000000..e70c5db33bd30f719066fd9c4a7a8c59004f7f37 --- /dev/null +++ b/0007-add-testcases-for-isula-cp.patch @@ -0,0 +1,157 @@ +From 085b93daf8f080f21b304058da3af404be9ac61d Mon Sep 17 00:00:00 2001 +From: WangFengTu +Date: Fri, 8 Jan 2021 14:02:00 +0800 +Subject: [PATCH 7/9] add testcases for isula cp + +Signed-off-by: WangFengTu +--- + CI/test_cases/container_cases/cp.sh | 93 ++++++++++++++++++++++++++++- + 1 file changed, 90 insertions(+), 3 deletions(-) + +diff --git a/CI/test_cases/container_cases/cp.sh b/CI/test_cases/container_cases/cp.sh +index dfbd222f..67a36909 100644 +--- a/CI/test_cases/container_cases/cp.sh ++++ b/CI/test_cases/container_cases/cp.sh +@@ -163,6 +163,7 @@ test_cp_file_to_container() + return ${ret} + } + ++ + test_cp_dir_to_container() + { + local ret=0 +@@ -194,6 +195,66 @@ test_cp_dir_to_container() + isula exec $containername /bin/sh -c "ls $dstfile/passwd" + [[ $? -ne 0 ]] && msg_err "${FUNCNAME[0]}:${LINENO} - failed to do copy" && ((ret++)) + ++ # test copy dir with hardlink ++ rm -rf $cpfiles/a ++ mkdir -p $cpfiles/a/a $cpfiles/a/b ++ echo "test_hardlink_a" > $cpfiles/a/a/a ++ ln $cpfiles/a/a/a $cpfiles/a/b/b ++ isula cp $cpfiles/a $containername:/c ++ [[ $? -ne 0 ]] && msg_err "${FUNCNAME[0]}:${LINENO} - failed to do copy" && ((ret++)) ++ ++ isula exec -ti $containername cat /c/a/a | grep "test_hardlink_a" ++ [[ $? -ne 0 ]] && msg_err "${FUNCNAME[0]}:${LINENO} - copy hardlink a not right" && ((ret++)) ++ ++ isula exec -ti $containername cat /c/b/b | grep "test_hardlink_a" ++ [[ $? -ne 0 ]] && msg_err "${FUNCNAME[0]}:${LINENO} - copy hardlink b not right" && ((ret++)) ++ rm -rf $cpfiles/a ++ ++ # test copy dir to file ++ mkdir -p $cpfiles/dst ++ isula exec -ti $containername sh -c 'touch /dst' ++ [[ $? -ne 0 ]] && msg_err "${FUNCNAME[0]}:${LINENO} - failed to touch file in container" && ((ret++)) ++ ++ isula cp $cpfiles/dst $containername:/ ++ [[ $? -ne 0 ]] && msg_err "${FUNCNAME[0]}:${LINENO} - copy dir to container failed" && ((ret++)) ++ ++ isula exec -ti $containername stat / | grep directory ++ [[ $? -ne 0 ]] && msg_err "${FUNCNAME[0]}:${LINENO} - file should be replaced to be dir" && ((ret++)) ++ rm -rf $cpfiles/dir ++ ++ # test copy current dir file ++ touch $cpfiles/current ++ cd $cpfiles ++ isula cp . $containername:/current1 ++ [[ $? -ne 0 ]] && msg_err "${FUNCNAME[0]}:${LINENO} - failed to cp current1 file" && ((ret++)) ++ ++ isula exec -ti $containername stat /current1 ++ [[ $? -ne 0 ]] && msg_err "${FUNCNAME[0]}:${LINENO} - file current1 not exist" && ((ret++)) ++ ++ isula cp ./ $containername:/current2 ++ [[ $? -ne 0 ]] && msg_err "${FUNCNAME[0]}:${LINENO} - failed to cp current2 file" && ((ret++)) ++ ++ isula exec -ti $containername stat /current2 ++ [[ $? -ne 0 ]] && msg_err "${FUNCNAME[0]}:${LINENO} - file current2 not exist" && ((ret++)) ++ cd - ++ rm -f $cpfiles/current ++ ++ # test copy perm ++ mkdir -p $cpfiles/perm && chmod 700 $cpfiles/perm ++ isula cp $cpfiles/perm $containername:/ ++ [[ $? -ne 0 ]] && msg_err "${FUNCNAME[0]}:${LINENO} - failed to cp dir to container" && ((ret++)) ++ ++ isula exec -ti $containername stat /perm | grep "Access: (0700/drwx" ++ [[ $? -ne 0 ]] && msg_err "${FUNCNAME[0]}:${LINENO} - copy perm not right" && ((ret++)) ++ rm -f $cpfiles/perm ++ ++ # test copy hardlink ++ rm -rf $cpfiles/cp_dir ++ mkdir $cpfiles/cp_dir && cd $cpfiles/cp_dir && echo hello > norm_file && ln norm_file norm_file_link && cd - ++ isula cp $cpfiles/cp_dir $containername:/home/ ++ [[ $? -ne 0 ]] && msg_err "${FUNCNAME[0]}:${LINENO} - copy hardlink failed" && ((ret++)) ++ rm -rf $cpfiles/cp_dir ++ + return ${ret} + } + +@@ -227,6 +288,17 @@ test_cp_symlink_to_container() + isula exec $containername /bin/sh -c "cat $cpfiles/target | grep root" + [[ $? -ne 0 ]] && msg_err "${FUNCNAME[0]}:${LINENO} - failed to do copy" && ((ret++)) + ++ # test cp symlink with dir which have the same name prefix ++ rm -rf $cpfiles/abc $cpfiles/a ++ ln -s $cpfiles/abc $cpfiles/a ++ ++ isula cp $cpfiles/a $containername:/b ++ [[ $? -ne 0 ]] && msg_err "${FUNCNAME[0]}:${LINENO} - failed to copy symlink" && ((ret++)) ++ ++ isula exec -ti $containername readlink /b | grep "$cpfiles/abc" ++ [[ $? -ne 0 ]] && msg_err "${FUNCNAME[0]}:${LINENO} - invalid symlink" && ((ret++)) ++ rm -f $cpfiles/abc $cpfiles/a ++ + return ${ret} + } + +@@ -256,14 +328,21 @@ function cp_test_t() + + msg_info "${test} starting..." + +- isula pull ${image} +- [[ $? -ne 0 ]] && msg_err "${FUNCNAME[0]}:${LINENO} - failed to pull image: ${image}" && return ${FAILURE} ++ local isulad_pid=$(cat /var/run/isulad.pid) ++ local fd_num1=$(ls -l /proc/$isulad_pid/fd | wc -l) ++ [[ $fd_num1 -eq 0 ]] && msg_err "${FUNCNAME[0]}:${LINENO} - can not get fd number" && ((ret++)) ++ ++ isula inspect ${image} ++ if [ x"$?" != x"0" ];then ++ isula pull ${image} ++ [[ $? -ne 0 ]] && msg_err "${FUNCNAME[0]}:${LINENO} - failed to pull image: ${image}" && return ${FAILURE} ++ fi + + isula images | grep busybox + [[ $? -ne 0 ]] && msg_err "${FUNCNAME[0]}:${LINENO} - missing list image: ${image}" && ((ret++)) + + containername=test_cmd_cp +- isula run -n $containername -itd $image ++ isula run -n $containername -itd $image + [[ $? -ne 0 ]] && msg_err "${FUNCNAME[0]}:${LINENO} - failed to run container: ${image}" && ((ret++)) + + rm -rf $cpfiles +@@ -274,6 +353,7 @@ function cp_test_t() + test_cp_file_from_container $containername || ((ret++)) + test_cp_dir_from_container $containername || ((ret++)) + test_cp_file_to_container $containername || ((ret++)) ++ test_cp_dir_to_container $containername || ((ret++)) + test_cp_symlink_to_container $containername || ((ret++)) + test_cp_symlink_from_container $containername || ((ret++)) + +@@ -281,6 +361,13 @@ function cp_test_t() + [[ $? -ne 0 ]] && msg_err "${FUNCNAME[0]}:${LINENO} - failed to rm container: ${containername}" && ((ret++)) + + rm -rf $cpfiles ++ ++ local fd_num2=$(ls -l /proc/$isulad_pid/fd | wc -l) ++ [[ $fd_num2 -eq 0 ]] && msg_err "${FUNCNAME[0]}:${LINENO} - can not get fd number" && ((ret++)) ++ ++ # make sure fd not increase after test ++ [[ $fd_num1 -ne $fd_num2 ]] && msg_err "${FUNCNAME[0]}:${LINENO} - fd number not right" && ((ret++)) ++ + echo "test end" + return ${ret} + } +-- +2.25.1 + diff --git a/0008-image_cb-rename-the-function-isula_-docker_-to-do_.patch b/0008-image_cb-rename-the-function-isula_-docker_-to-do_.patch new file mode 100644 index 0000000000000000000000000000000000000000..a3cbbe7c3e53663e6998d2fe08e8856e4da5cee2 --- /dev/null +++ b/0008-image_cb-rename-the-function-isula_-docker_-to-do_.patch @@ -0,0 +1,107 @@ +From c8d14980e145a7d400aa6c5b449a59952a422801 Mon Sep 17 00:00:00 2001 +From: Li Feng +Date: Fri, 15 Jan 2021 10:34:43 +0800 +Subject: [PATCH 8/9] image_cb: rename the function {isula_/docker_} to do_ + +Signed-off-by: Li Feng +--- + src/daemon/executor/image_cb/image_cb.c | 20 ++++++++++---------- + 1 file changed, 10 insertions(+), 10 deletions(-) + +diff --git a/src/daemon/executor/image_cb/image_cb.c b/src/daemon/executor/image_cb/image_cb.c +index 6ab8067f..156cf88c 100644 +--- a/src/daemon/executor/image_cb/image_cb.c ++++ b/src/daemon/executor/image_cb/image_cb.c +@@ -54,7 +54,7 @@ + #include "utils_timestamp.h" + #include "utils_verify.h" + +-static int isula_import_image(const char *file, const char *tag, char **id) ++static int do_import_image(const char *file, const char *tag, char **id) + { + int ret = 0; + im_import_request *request = NULL; +@@ -114,7 +114,7 @@ static int import_cb(const image_import_request *request, image_import_response + + EVENT("Image Event: {Object: %s, Type: Importing}", request->file); + +- ret = isula_import_image(request->file, request->tag, &id); ++ ret = do_import_image(request->file, request->tag, &id); + if (ret != 0) { + ERROR("Failed to import docker image %s with tag %s", request->file, request->tag); + cc = EINVALIDARGS; +@@ -140,7 +140,7 @@ out: + return (ret < 0) ? ECOMMON : ret; + } + +-static int docker_load_image(const char *file, const char *tag, const char *type) ++static int do_load_image(const char *file, const char *tag, const char *type) + { + int ret = 0; + im_load_request *request = NULL; +@@ -210,7 +210,7 @@ static int image_load_cb(const image_load_image_request *request, image_load_ima + + EVENT("Image Event: {Object: %s, Type: Loading}", request->file); + +- ret = docker_load_image(request->file, request->tag, request->type); ++ ret = do_load_image(request->file, request->tag, request->type); + if (ret != 0) { + ERROR("Failed to load docker image %s with tag %s and type %s", request->file, request->tag, request->type); + cc = EINVALIDARGS; +@@ -233,7 +233,7 @@ out: + return (ret < 0) ? ECOMMON : ret; + } + +-static int docker_login(const char *username, const char *password, const char *server, const char *type) ++static int do_login(const char *username, const char *password, const char *server, const char *type) + { + int ret = 0; + im_login_request *request = NULL; +@@ -290,7 +290,7 @@ static int login_cb(const image_login_request *request, image_login_response **r + + EVENT("Image Event: {Object: %s, Type: Logining}", request->server); + +- ret = docker_login(request->username, request->password, request->server, request->type); ++ ret = do_login(request->username, request->password, request->server, request->type); + if (ret != 0) { + ERROR("Failed to login %s", request->server); + cc = EINVALIDARGS; +@@ -312,7 +312,7 @@ out: + return (ret < 0) ? ECOMMON : ret; + } + +-static int docker_logout(const char *server, const char *type) ++static int do_logout(const char *server, const char *type) + { + int ret = 0; + im_logout_request *request = NULL; +@@ -367,7 +367,7 @@ static int logout_cb(const image_logout_request *request, image_logout_response + + EVENT("Image Event: {Object: %s, Type: Logouting}", request->server); + +- ret = docker_logout(request->server, request->type); ++ ret = do_logout(request->server, request->type); + if (ret != 0) { + ERROR("Failed to logout %s", request->server); + cc = EINVALIDARGS; +@@ -442,7 +442,7 @@ out: + } + + /* tag image */ +-static int tag_image(const char *src_name, const char *dest_name) ++static int do_tag_image(const char *src_name, const char *dest_name) + { + int ret = 0; + im_tag_request *im_request = NULL; +@@ -524,7 +524,7 @@ static int image_tag_cb(const image_tag_image_request *request, image_tag_image_ + + EVENT("Image Event: {Object: %s, Type: Tagging}", src_name); + +- ret = tag_image(src_name, dest_name); ++ ret = do_tag_image(src_name, dest_name); + if (ret != 0) { + cc = ISULAD_ERR_EXEC; + goto out; +-- +2.25.1 + diff --git a/0009-fix-small-probability-of-coredump-in-CRI-streaming-s.patch b/0009-fix-small-probability-of-coredump-in-CRI-streaming-s.patch new file mode 100644 index 0000000000000000000000000000000000000000..a96a1e9792b3b8e04c59820f793a579acf231972 --- /dev/null +++ b/0009-fix-small-probability-of-coredump-in-CRI-streaming-s.patch @@ -0,0 +1,1078 @@ +From 0295f347d6394294cb2c81741ece78548d4cafc6 Mon Sep 17 00:00:00 2001 +From: wujing +Date: Thu, 14 Jan 2021 10:53:07 +0800 +Subject: [PATCH 9/9] fix small probability of coredump in CRI streaming + services in high concurrency scenarios + +Signed-off-by: wujing +--- + .../cri/cri_container_manager_service_impl.cc | 16 +- + src/daemon/entry/cri/request_cache.cc | 74 ++++++--- + src/daemon/entry/cri/request_cache.h | 29 +++- + .../cri/websocket/service/attach_serve.cc | 60 ++++--- + .../cri/websocket/service/attach_serve.h | 3 +- + .../entry/cri/websocket/service/exec_serve.cc | 71 +++++---- + .../entry/cri/websocket/service/exec_serve.h | 3 +- + .../entry/cri/websocket/service/ws_server.cc | 148 ++++++++++-------- + .../entry/cri/websocket/service/ws_server.h | 20 ++- + src/utils/cpputils/read_write_lock.cc | 59 +++++++ + src/utils/cpputils/read_write_lock.h | 90 +++++++++++ + src/utils/cpputils/stoppable_thread.cc | 4 - + 12 files changed, 392 insertions(+), 185 deletions(-) + create mode 100644 src/utils/cpputils/read_write_lock.cc + create mode 100644 src/utils/cpputils/read_write_lock.h + +diff --git a/src/daemon/entry/cri/cri_container_manager_service_impl.cc b/src/daemon/entry/cri/cri_container_manager_service_impl.cc +index 45ecf9f2..812469ee 100644 +--- a/src/daemon/entry/cri/cri_container_manager_service_impl.cc ++++ b/src/daemon/entry/cri/cri_container_manager_service_impl.cc +@@ -1251,15 +1251,9 @@ void ContainerManagerServiceImpl::Exec(const runtime::v1alpha2::ExecRequest &req + return; + } + RequestCache *cache = RequestCache::GetInstance(); +- runtime::v1alpha2::ExecRequest *execReq = new (std::nothrow) runtime::v1alpha2::ExecRequest(req); +- if (execReq == nullptr) { +- error.SetError("Out of memory"); +- return; +- } +- std::string token = cache->Insert(const_cast(execReq)); ++ std::string token = cache->InsertExecRequest(req); + if (token.empty()) { + error.SetError("failed to get a unique token!"); +- delete execReq; + return; + } + std::string url = BuildURL("exec", token); +@@ -1303,15 +1297,9 @@ void ContainerManagerServiceImpl::Attach(const runtime::v1alpha2::AttachRequest + return; + } + RequestCache *cache = RequestCache::GetInstance(); +- runtime::v1alpha2::AttachRequest *attachReq = new (std::nothrow) runtime::v1alpha2::AttachRequest(req); +- if (attachReq == nullptr) { +- error.SetError("Out of memory"); +- return; +- } +- std::string token = cache->Insert(const_cast(attachReq)); ++ std::string token = cache->InsertAttachRequest(req); + if (token.empty()) { + error.SetError("failed to get a unique token!"); +- delete attachReq; + return; + } + std::string url = BuildURL("attach", token); +diff --git a/src/daemon/entry/cri/request_cache.cc b/src/daemon/entry/cri/request_cache.cc +index a3cb3771..b502715a 100644 +--- a/src/daemon/entry/cri/request_cache.cc ++++ b/src/daemon/entry/cri/request_cache.cc +@@ -41,12 +41,26 @@ RequestCache *RequestCache::GetInstance() noexcept + return cache; + } + +-std::string RequestCache::Insert(::google::protobuf::Message *req) ++std::string RequestCache::InsertExecRequest(const runtime::v1alpha2::ExecRequest &req) + { +- if (req == nullptr) { +- ERROR("invalid request"); ++ std::lock_guard lock(m_mutex); ++ // Remove expired entries. ++ GarbageCollection(); ++ // If the cache is full, reject the request. ++ if (m_ll.size() == MaxInFlight) { ++ ERROR("too many cache in flight!"); + return ""; + } ++ auto token = UniqueToken(); ++ CacheEntry tmp; ++ tmp.SetValue(token, &req, nullptr, std::chrono::system_clock::now() + std::chrono::minutes(1)); ++ m_ll.push_front(tmp); ++ m_tokens.insert(std::make_pair(token, tmp)); ++ return token; ++} ++ ++std::string RequestCache::InsertAttachRequest(const runtime::v1alpha2::AttachRequest &req) ++{ + std::lock_guard lock(m_mutex); + // Remove expired entries. + GarbageCollection(); +@@ -56,7 +70,8 @@ std::string RequestCache::Insert(::google::protobuf::Message *req) + return ""; + } + auto token = UniqueToken(); +- CacheEntry tmp { token, req, std::chrono::system_clock::now() + std::chrono::minutes(1) }; ++ CacheEntry tmp; ++ tmp.SetValue(token, nullptr, &req, std::chrono::system_clock::now() + std::chrono::minutes(1)); + m_ll.push_front(tmp); + m_tokens.insert(std::make_pair(token, tmp)); + return token; +@@ -64,16 +79,12 @@ std::string RequestCache::Insert(::google::protobuf::Message *req) + + void RequestCache::GarbageCollection() + { +- std::chrono::system_clock::time_point now = std::chrono::system_clock::now(); ++ auto now = std::chrono::system_clock::now(); + while (!m_ll.empty()) { + CacheEntry oldest = m_ll.back(); + if (now < oldest.expireTime) { + return; + } +- if (oldest.req != nullptr) { +- delete oldest.req; +- oldest.req = nullptr; +- } + m_ll.pop_back(); + m_tokens.erase(oldest.token); + } +@@ -124,34 +135,59 @@ std::string RequestCache::UniqueToken() + ERROR("create unique token failed!"); + return ""; + } ++ + bool RequestCache::IsValidToken(const std::string &token) + { ++ std::lock_guard lock(m_mutex); ++ + return static_cast(m_tokens.count(token)); + } + + // Consume the token (remove it from the cache) and return the cached request, if found. +-::google::protobuf::Message *RequestCache::Consume(const std::string &token, bool &found) ++runtime::v1alpha2::ExecRequest RequestCache::ConsumeExecRequest(const std::string &token) + { + std::lock_guard lock(m_mutex); + +- found = false; +- if (!IsValidToken(token)) { ++ if (m_tokens.count(token) == 0 || m_tokens[token].execRequest.size() == 0) { + ERROR("Invalid token"); +- return nullptr; ++ return runtime::v1alpha2::ExecRequest(); + } + + CacheEntry ele = m_tokens[token]; + for (auto it = m_ll.begin(); it != m_ll.end(); it++) { +- if (it->token == ele.token) { ++ if (it->token == token) { + m_ll.erase(it); + break; + } + } + m_tokens.erase(token); +- std::chrono::system_clock::time_point now = std::chrono::system_clock::now(); +- if (now > ele.expireTime) { +- return nullptr; ++ if (std::chrono::system_clock::now() > ele.expireTime) { ++ return runtime::v1alpha2::ExecRequest(); + } +- found = true; +- return ele.req; ++ ++ return ele.execRequest.at(0); + } ++ ++runtime::v1alpha2::AttachRequest RequestCache::ConsumeAttachRequest(const std::string &token) ++{ ++ std::lock_guard lock(m_mutex); ++ ++ if (m_tokens.count(token) == 0 || m_tokens[token].attachRequest.size() == 0) { ++ ERROR("Invalid token"); ++ return runtime::v1alpha2::AttachRequest(); ++ } ++ ++ CacheEntry ele = m_tokens[token]; ++ for (auto it = m_ll.begin(); it != m_ll.end(); it++) { ++ if (it->token == token) { ++ m_ll.erase(it); ++ break; ++ } ++ } ++ m_tokens.erase(token); ++ if (std::chrono::system_clock::now() > ele.expireTime) { ++ return runtime::v1alpha2::AttachRequest(); ++ } ++ ++ return ele.attachRequest.at(0); ++} +\ No newline at end of file +diff --git a/src/daemon/entry/cri/request_cache.h b/src/daemon/entry/cri/request_cache.h +index 024f3ba7..0f86a85e 100644 +--- a/src/daemon/entry/cri/request_cache.h ++++ b/src/daemon/entry/cri/request_cache.h +@@ -21,19 +21,38 @@ + #include + #include + #include ++#include + #include ++#include "api.pb.h" + +-typedef struct sCacheEntry { ++struct CacheEntry { + std::string token; +- ::google::protobuf::Message *req; ++ std::vector execRequest; ++ std::vector attachRequest; + std::chrono::system_clock::time_point expireTime; +-} CacheEntry, *pCacheEntry; ++ ++ void SetValue(const std::string &t, ++ const runtime::v1alpha2::ExecRequest *execReq, ++ const runtime::v1alpha2::AttachRequest *attachReq, ++ std::chrono::system_clock::time_point et) ++ { ++ token = t; ++ if (execReq != nullptr) { ++ execRequest.push_back(*execReq); ++ } else if (attachReq != nullptr) { ++ attachRequest.push_back(*attachReq); ++ } ++ expireTime = et; ++ } ++}; + + class RequestCache { + public: + static RequestCache *GetInstance() noexcept; +- std::string Insert(::google::protobuf::Message *req); +- ::google::protobuf::Message *Consume(const std::string &token, bool &found); ++ std::string InsertExecRequest(const runtime::v1alpha2::ExecRequest &req); ++ std::string InsertAttachRequest(const runtime::v1alpha2::AttachRequest &req); ++ runtime::v1alpha2::ExecRequest ConsumeExecRequest(const std::string &token); ++ runtime::v1alpha2::AttachRequest ConsumeAttachRequest(const std::string &token); + bool IsValidToken(const std::string &token); + + private: +diff --git a/src/daemon/entry/cri/websocket/service/attach_serve.cc b/src/daemon/entry/cri/websocket/service/attach_serve.cc +index caf02c74..01c6b9cf 100644 +--- a/src/daemon/entry/cri/websocket/service/attach_serve.cc ++++ b/src/daemon/entry/cri/websocket/service/attach_serve.cc +@@ -18,54 +18,50 @@ + + int AttachServe::Execute(struct lws *wsi, const std::string &token, int read_pipe_fd) + { +- RequestCache *cache = RequestCache::GetInstance(); +- bool found = false; +- auto cachedRequest = cache->Consume(token, found); +- if (!found) { +- ERROR("invalid token :%s", token.c_str()); +- return -1; +- } +- runtime::v1alpha2::AttachRequest *request = dynamic_cast(cachedRequest); +- if (request == nullptr) { +- ERROR("failed to get exec request!"); +- return -1; +- } +- +- container_attach_request *container_req = nullptr; +- container_attach_response *container_res = nullptr; +- + service_executor_t *cb = get_service_executor(); + if (cb == nullptr || cb->container.attach == nullptr) { + return -1; + } +- int tret = 0; +- tret = RequestFromCri(request, &container_req); +- if (tret != 0) { +- ERROR("Failed to transform grpc request!"); ++ ++ container_attach_request *container_req = nullptr; ++ if (GetContainerRequest(token, &container_req) != 0) { ++ ERROR("Failed to get contaner request"); + return -1; + } ++ + struct io_write_wrapper stringWriter = { 0 }; + stringWriter.context = (void *)wsi; + stringWriter.write_func = WsWriteStdoutToClient; + stringWriter.close_func = closeWsConnect; + container_req->attach_stderr = false; ++ ++ container_attach_response *container_res = nullptr; + int ret = cb->container.attach(container_req, &container_res, container_req->attach_stdin ? read_pipe_fd : -1, + &stringWriter, nullptr); ++ if (ret != 0) { ++ ERROR("Failed to attach container: %s", container_req->container_id); ++ } ++ + free_container_attach_request(container_req); + free_container_attach_response(container_res); + +- if (request != nullptr) { +- delete request; +- request = nullptr; +- } +- if (tret != 0) { +- ERROR("Failed to translate response to grpc, operation is %s", ret ? "failed" : "success"); ++ return ret; ++} ++ ++int AttachServe::GetContainerRequest(const std::string &token, container_attach_request **container_req) ++{ ++ RequestCache *cache = RequestCache::GetInstance(); ++ auto request = cache->ConsumeAttachRequest(token); ++ ++ int ret = RequestFromCri(request, container_req); ++ if (ret != 0) { ++ ERROR("Failed to transform grpc request!"); + } + + return ret; + } + +-int AttachServe::RequestFromCri(const runtime::v1alpha2::AttachRequest *grequest, container_attach_request **request) ++int AttachServe::RequestFromCri(const runtime::v1alpha2::AttachRequest &grequest, container_attach_request **request) + { + container_attach_request *tmpreq = nullptr; + +@@ -75,12 +71,12 @@ int AttachServe::RequestFromCri(const runtime::v1alpha2::AttachRequest *grequest + return -1; + } + +- if (!grequest->container_id().empty()) { +- tmpreq->container_id = util_strdup_s(grequest->container_id().c_str()); ++ if (!grequest.container_id().empty()) { ++ tmpreq->container_id = util_strdup_s(grequest.container_id().c_str()); + } +- tmpreq->attach_stdin = grequest->stdin(); +- tmpreq->attach_stdout = grequest->stdout(); +- tmpreq->attach_stderr = grequest->stderr(); ++ tmpreq->attach_stdin = grequest.stdin(); ++ tmpreq->attach_stdout = grequest.stdout(); ++ tmpreq->attach_stderr = grequest.stderr(); + + *request = tmpreq; + +diff --git a/src/daemon/entry/cri/websocket/service/attach_serve.h b/src/daemon/entry/cri/websocket/service/attach_serve.h +index 7d57b9a3..00e2b34e 100644 +--- a/src/daemon/entry/cri/websocket/service/attach_serve.h ++++ b/src/daemon/entry/cri/websocket/service/attach_serve.h +@@ -35,8 +35,9 @@ public: + virtual ~AttachServe() = default; + int Execute(struct lws *wsi, const std::string &token, int read_pipe_fd) override; + private: +- int RequestFromCri(const runtime::v1alpha2::AttachRequest *grequest, ++ int RequestFromCri(const runtime::v1alpha2::AttachRequest &grequest, + container_attach_request **request); ++ int GetContainerRequest(const std::string &token, container_attach_request **container_req); + }; + #endif // DAEMON_ENTRY_CRI_WEBSOCKET_SERVICE_ATTACH_SERVE_H + +diff --git a/src/daemon/entry/cri/websocket/service/exec_serve.cc b/src/daemon/entry/cri/websocket/service/exec_serve.cc +index b1a3759d..855d28b8 100644 +--- a/src/daemon/entry/cri/websocket/service/exec_serve.cc ++++ b/src/daemon/entry/cri/websocket/service/exec_serve.cc +@@ -19,37 +19,25 @@ + + int ExecServe::Execute(struct lws *wsi, const std::string &token, int read_pipe_fd) + { +- RequestCache *cache = RequestCache::GetInstance(); +- bool found = false; +- auto cachedRequest = cache->Consume(token, found); +- if (!found) { +- ERROR("invalid token :%s", token.c_str()); +- return -1; +- } +- runtime::v1alpha2::ExecRequest *request = dynamic_cast(cachedRequest); +- if (request == nullptr) { +- ERROR("failed to get exec request!"); +- return -1; +- } +- +- container_exec_request *container_req = nullptr; +- container_exec_response *container_res = nullptr; +- + service_executor_t *cb = get_service_executor(); + if (cb == nullptr || cb->container.exec == nullptr) { + return -1; + } +- int tret = RequestFromCri(request, &container_req); +- if (tret != 0) { +- ERROR("Failed to transform grpc request!"); ++ ++ container_exec_request *container_req = nullptr; ++ if (GetContainerRequest(token, &container_req) != 0) { ++ ERROR("Failed to get contaner request"); + return -1; + } ++ + struct io_write_wrapper StdoutstringWriter = { 0 }; + StdoutstringWriter.context = (void *)wsi; + StdoutstringWriter.write_func = WsWriteStdoutToClient; + struct io_write_wrapper StderrstringWriter = { 0 }; + StderrstringWriter.context = (void *)wsi; + StderrstringWriter.write_func = WsWriteStderrToClient; ++ ++ container_exec_response *container_res = nullptr; + int ret = cb->container.exec(container_req, &container_res, container_req->attach_stdin ? read_pipe_fd : -1, + container_req->attach_stdout ? &StdoutstringWriter : nullptr, + container_req->attach_stderr ? &StderrstringWriter : nullptr); +@@ -66,19 +54,29 @@ int ExecServe::Execute(struct lws *wsi, const std::string &token, int read_pipe_ + std::string exit_info = "Exit code :" + std::to_string((int)container_res->exit_code) + "\n"; + WsWriteStdoutToClient(wsi, exit_info.c_str(), exit_info.length()); + } ++ + free_container_exec_request(container_req); + free_container_exec_response(container_res); +- if (request != nullptr) { +- delete request; +- request = nullptr; +- } + + (void)closeWsConnect((void*)wsi, nullptr); + + return ret; + } + +-int ExecServe::RequestFromCri(const runtime::v1alpha2::ExecRequest *grequest, container_exec_request **request) ++int ExecServe::GetContainerRequest(const std::string &token, container_exec_request **container_req) ++{ ++ RequestCache *cache = RequestCache::GetInstance(); ++ auto request = cache->ConsumeExecRequest(token); ++ ++ int ret = RequestFromCri(request, container_req); ++ if (ret != 0) { ++ ERROR("Failed to transform grpc request!"); ++ } ++ ++ return ret; ++} ++ ++int ExecServe::RequestFromCri(const runtime::v1alpha2::ExecRequest &grequest, container_exec_request **request) + { + container_exec_request *tmpreq = nullptr; + +@@ -88,32 +86,33 @@ int ExecServe::RequestFromCri(const runtime::v1alpha2::ExecRequest *grequest, co + return -1; + } + +- tmpreq->tty = grequest->tty(); +- tmpreq->attach_stdin = grequest->stdin(); +- tmpreq->attach_stdout = grequest->stdout(); +- tmpreq->attach_stderr = grequest->stderr(); ++ tmpreq->tty = grequest.tty(); ++ tmpreq->attach_stdin = grequest.stdin(); ++ tmpreq->attach_stdout = grequest.stdout(); ++ tmpreq->attach_stderr = grequest.stderr(); + +- if (!grequest->container_id().empty()) { +- tmpreq->container_id = util_strdup_s(grequest->container_id().c_str()); ++ if (!grequest.container_id().empty()) { ++ tmpreq->container_id = util_strdup_s(grequest.container_id().c_str()); + } + +- if (grequest->cmd_size() > 0) { +- if ((size_t)grequest->cmd_size() > SIZE_MAX / sizeof(char *)) { ++ if (grequest.cmd_size() > 0) { ++ if ((size_t)grequest.cmd_size() > SIZE_MAX / sizeof(char *)) { + ERROR("Too many arguments!"); + free_container_exec_request(tmpreq); + return -1; + } +- tmpreq->argv = (char **)util_common_calloc_s(sizeof(char *) * grequest->cmd_size()); ++ tmpreq->argv = (char **)util_common_calloc_s(sizeof(char *) * grequest.cmd_size()); + if (tmpreq->argv == nullptr) { + ERROR("Out of memory!"); + free_container_exec_request(tmpreq); + return -1; + } +- for (int i = 0; i < grequest->cmd_size(); i++) { +- tmpreq->argv[i] = util_strdup_s(grequest->cmd(i).c_str()); ++ for (int i = 0; i < grequest.cmd_size(); i++) { ++ tmpreq->argv[i] = util_strdup_s(grequest.cmd(i).c_str()); + } +- tmpreq->argv_len = (size_t)grequest->cmd_size(); ++ tmpreq->argv_len = (size_t)grequest.cmd_size(); + } ++ + *request = tmpreq; + return 0; + } +diff --git a/src/daemon/entry/cri/websocket/service/exec_serve.h b/src/daemon/entry/cri/websocket/service/exec_serve.h +index ef474018..b29c3e1e 100644 +--- a/src/daemon/entry/cri/websocket/service/exec_serve.h ++++ b/src/daemon/entry/cri/websocket/service/exec_serve.h +@@ -40,6 +40,7 @@ public: + int Execute(struct lws *wsi, const std::string &token, int read_pipe_fd) override; + + private: +- int RequestFromCri(const runtime::v1alpha2::ExecRequest *grequest, container_exec_request **request); ++ int RequestFromCri(const runtime::v1alpha2::ExecRequest &grequest, container_exec_request **request); ++ int GetContainerRequest(const std::string &token, container_exec_request **request); + }; + #endif // DAEMON_ENTRY_CRI_WEBSOCKET_SERVICE_EXEC_SERVE_H +diff --git a/src/daemon/entry/cri/websocket/service/ws_server.cc b/src/daemon/entry/cri/websocket/service/ws_server.cc +index c7e1b538..795d2c1e 100644 +--- a/src/daemon/entry/cri/websocket/service/ws_server.cc ++++ b/src/daemon/entry/cri/websocket/service/ws_server.cc +@@ -28,22 +28,19 @@ + + struct lws_context *WebsocketServer::m_context = nullptr; + std::atomic WebsocketServer::m_instance; +-std::mutex WebsocketServer::m_mutex; +-std::unordered_map WebsocketServer::m_wsis; ++RWMutex WebsocketServer::m_mutex; ++std::unordered_map WebsocketServer::m_wsis; ++std::unordered_set WebsocketServer::m_activeSession; ++ + WebsocketServer *WebsocketServer::GetInstance() noexcept + { +- WebsocketServer *server = m_instance.load(std::memory_order_relaxed); +- std::atomic_thread_fence(std::memory_order_acquire); +- if (server == nullptr) { +- std::lock_guard lock(m_mutex); +- server = m_instance.load(std::memory_order_relaxed); +- if (server == nullptr) { +- server = new WebsocketServer; +- std::atomic_thread_fence(std::memory_order_release); +- m_instance.store(server, std::memory_order_relaxed); +- } +- } +- return server; ++ static std::once_flag flag; ++ ++ std::call_once(flag, [] { ++ m_instance = new WebsocketServer; ++ }); ++ ++ return m_instance; + } + + WebsocketServer::WebsocketServer() +@@ -62,14 +59,14 @@ url::URLDatum WebsocketServer::GetWebsocketUrl() + return m_url; + } + +-std::unordered_map &WebsocketServer::GetWsisData() ++std::unordered_map &WebsocketServer::GetWsisData() + { + return m_wsis; + } + +-void WebsocketServer::LockAllWsSession() ++void WebsocketServer::ReadLockAllWsSession() + { +- m_mutex.lock(); ++ m_mutex.rdlock(); + } + + void WebsocketServer::UnlockAllWsSession() +@@ -160,7 +157,7 @@ void WebsocketServer::RegisterCallback(const std::string &path, + + void WebsocketServer::CloseAllWsSession() + { +- std::lock_guard lock(m_mutex); ++ WriteGuard lock(m_mutex); + for (auto it = m_wsis.begin(); it != m_wsis.end(); ++it) { + free(it->second.buf); + close(it->second.pipes.at(0)); +@@ -172,15 +169,10 @@ void WebsocketServer::CloseAllWsSession() + m_wsis.clear(); + } + +-void WebsocketServer::CloseWsSession(struct lws *wsi) ++void WebsocketServer::CloseWsSession(int socketID) + { +- const int WAIT_PERIOD_MS = 50; +- +- auto it = m_wsis.find(wsi); ++ auto it = m_wsis.find(socketID); + if (it != m_wsis.end()) { +- while (it->second.GetProcessingStatus()) { +- std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_PERIOD_MS)); +- } + free(it->second.buf); + close(it->second.pipes.at(0)); + close(it->second.pipes.at(1)); +@@ -191,6 +183,21 @@ void WebsocketServer::CloseWsSession(struct lws *wsi) + } + } + ++void WebsocketServer::RecordSession(struct lws *wsi) ++{ ++ m_activeSession.insert(wsi); ++} ++ ++void WebsocketServer::RemoveSession(struct lws *wsi) ++{ ++ m_activeSession.erase(wsi); ++} ++ ++bool WebsocketServer::IsValidSession(struct lws *wsi) ++{ ++ return m_activeSession.count(wsi) != 0; ++} ++ + int WebsocketServer::DumpHandshakeInfo(struct lws *wsi) noexcept + { + int read_pipe_fd[PIPE_FD_NUM]; +@@ -200,15 +207,17 @@ int WebsocketServer::DumpHandshakeInfo(struct lws *wsi) noexcept + + session_data session; + session.pipes = std::array { read_pipe_fd[0], read_pipe_fd[1] }; +- m_wsis.insert(std::make_pair(wsi, session)); +- m_wsis[wsi].buf = (unsigned char *)util_common_calloc_s(LWS_PRE + MAX_MSG_BUFFER_SIZE + 1); +- if (m_wsis[wsi].buf == nullptr) { ++ ++ int socketID = lws_get_socket_fd(wsi); ++ m_wsis.insert(std::make_pair(socketID, std::move(session))); ++ m_wsis[socketID].buf = (unsigned char *)util_common_calloc_s(LWS_PRE + MAX_MSG_BUFFER_SIZE + 1); ++ if (m_wsis[socketID].buf == nullptr) { + ERROR("Out of memory"); + return -1; + } +- m_wsis[wsi].buf_mutex = new std::mutex; +- m_wsis[wsi].sended_mutex = new std::mutex; +- m_wsis[wsi].SetProcessingStatus(false); ++ m_wsis[socketID].buf_mutex = new std::mutex; ++ m_wsis[socketID].sended_mutex = new std::mutex; ++ m_wsis[socketID].SetProcessingStatus(false); + + int len; + char buf[MAX_BUF_LEN] { 0 }; +@@ -216,7 +225,7 @@ int WebsocketServer::DumpHandshakeInfo(struct lws *wsi) noexcept + lws_hdr_copy(wsi, buf, sizeof(buf), WSI_TOKEN_GET_URI); + if (strlen(buf) == 0) { + ERROR("invalid url"); +- CloseWsSession(wsi); ++ CloseWsSession(socketID); + return -1; + } + +@@ -228,14 +237,15 @@ int WebsocketServer::DumpHandshakeInfo(struct lws *wsi) noexcept + !m_handler.IsValidMethod(vec.at(1)) || + !cache->IsValidToken(vec.at(2))) { + ERROR("invalid url(%s): incorrect format!", buf); +- CloseWsSession(wsi); ++ CloseWsSession(socketID); + return -1; + } + + std::thread streamTh([ = ]() { +- StreamTask(&m_handler, wsi, vec.at(1), vec.at(2), m_wsis[wsi].pipes.at(0)).Run(); ++ StreamTask(&m_handler, wsi, vec.at(1), vec.at(2), m_wsis[socketID].pipes.at(0)).Run(); + }); + streamTh.detach(); ++ RecordSession(wsi); + int n = 0; + const unsigned char *c = nullptr; + do { +@@ -260,7 +270,7 @@ int WebsocketServer::DumpHandshakeInfo(struct lws *wsi) noexcept + + int WebsocketServer::Wswrite(struct lws *wsi, void *in, size_t len) + { +- auto it = m_wsis.find(wsi); ++ auto it = m_wsis.find(lws_get_socket_fd(wsi)); + if (it != m_wsis.end()) { + if (it->second.close) { + DEBUG("websocket session disconnected"); +@@ -286,9 +296,9 @@ int WebsocketServer::Wswrite(struct lws *wsi, void *in, size_t len) + return 0; + } + +-void WebsocketServer::Receive(struct lws *wsi, void *in, size_t len) ++void WebsocketServer::Receive(int socketID, void *in, size_t len) + { +- if (m_wsis.find(wsi) == m_wsis.end()) { ++ if (m_wsis.find(socketID) == m_wsis.end()) { + ERROR("invailed websocket session!"); + return; + } +@@ -298,20 +308,20 @@ void WebsocketServer::Receive(struct lws *wsi, void *in, size_t len) + return; + } + +- if (write(m_wsis[wsi].pipes.at(1), (void *)((char *)in + 1), len - 1) < 0) { ++ if (write(m_wsis[socketID].pipes.at(1), (void *)((char *)in + 1), len - 1) < 0) { + ERROR("sub write over!"); + return; + } + } + +-void WebsocketServer::SetLwsSendedFlag(struct lws *wsi, bool sended) ++void WebsocketServer::SetLwsSendedFlag(int socketID, bool sended) + { +- auto it = m_wsis.find(wsi); +- if (it != m_wsis.end()) { +- it->second.sended_mutex->lock(); +- it->second.sended = sended; +- it->second.sended_mutex->unlock(); ++ if (m_wsis.count(socketID) == 0) { ++ return; + } ++ m_wsis[socketID].sended_mutex->lock(); ++ m_wsis[socketID].sended = sended; ++ m_wsis[socketID].sended_mutex->unlock(); + } + + int WebsocketServer::Callback(struct lws *wsi, enum lws_callback_reasons reason, +@@ -323,7 +333,7 @@ int WebsocketServer::Callback(struct lws *wsi, enum lws_callback_reasons reason, + // asking to upgrade the connection to a websocket one. + return -1; + case LWS_CALLBACK_FILTER_PROTOCOL_CONNECTION: { +- std::lock_guard lock(m_mutex); ++ WriteGuard lock(m_mutex); + if (WebsocketServer::GetInstance()->DumpHandshakeInfo(wsi)) { + // return non-zero here and kill the connection + return -1; +@@ -335,22 +345,27 @@ int WebsocketServer::Callback(struct lws *wsi, enum lws_callback_reasons reason, + } + break; + case LWS_CALLBACK_SERVER_WRITEABLE: { +- std::lock_guard lock(m_mutex); ++ ReadGuard lock(m_mutex); ++ int socketID = lws_get_socket_fd(wsi); + if (WebsocketServer::GetInstance()->Wswrite(wsi, in, len)) { +- WebsocketServer::GetInstance()->SetLwsSendedFlag(wsi, true); ++ WebsocketServer::GetInstance()->SetLwsSendedFlag(socketID, true); ++ // return nonzero from the user callback to close the connection ++ // and callback with the reason of LWS_CALLBACK_CLOSED + return -1; + } +- WebsocketServer::GetInstance()->SetLwsSendedFlag(wsi, true); ++ WebsocketServer::GetInstance()->SetLwsSendedFlag(socketID, true); + } + break; + case LWS_CALLBACK_RECEIVE: { +- std::lock_guard lock(m_mutex); +- WebsocketServer::GetInstance()->Receive(wsi, (char *)in, len); ++ ReadGuard lock(m_mutex); ++ WebsocketServer::GetInstance()->Receive(lws_get_socket_fd(wsi), (char *)in, len); + } + break; + case LWS_CALLBACK_CLOSED: { +- std::lock_guard lock(m_mutex); +- WebsocketServer::GetInstance()->CloseWsSession(wsi); ++ WriteGuard lock(m_mutex); ++ DEBUG("connection has been closed"); ++ WebsocketServer::GetInstance()->RemoveSession(wsi); ++ WebsocketServer::GetInstance()->CloseWsSession(lws_get_socket_fd(wsi)); + } + break; + default: +@@ -363,8 +378,7 @@ void WebsocketServer::ServiceWorkThread(int threadid) + { + int n = 0; + while (n >= 0 && !m_force_exit) { +- n = lws_service(m_context, 50); +- std::this_thread::sleep_for(std::chrono::milliseconds(1)); ++ n = lws_service(m_context, 0); + } + } + +@@ -396,20 +410,19 @@ void WebsocketServer::Wait() + } + + namespace { +-auto PrepareWsiSession(struct lws *wsi) -> session_data * ++auto PrepareWsiSession(int socketID) -> session_data * + { + WebsocketServer *server = WebsocketServer::GetInstance(); +- server->LockAllWsSession(); ++ server->ReadLockAllWsSession(); + +- auto itor = server->GetWsisData().find(wsi); ++ auto itor = server->GetWsisData().find(socketID); + if (itor == server->GetWsisData().end()) { + ERROR("invalid session!"); + server->UnlockAllWsSession(); + return nullptr; + } +- itor->second.SetProcessingStatus(true); ++ server->SetLwsSendedFlag(socketID, false); + server->UnlockAllWsSession(); +- server->SetLwsSendedFlag(wsi, false); + + return &itor->second; + } +@@ -450,15 +463,13 @@ void EnsureWrited(struct lws *wsi, session_data *session) + } + std::this_thread::sleep_for(std::chrono::milliseconds(TRIGGER_PERIOD_MS)); + } +- +- session->SetProcessingStatus(false); + } + + ssize_t WsWriteToClient(void *context, const void *data, size_t len, WebsocketChannel channel) + { + struct lws *wsi = static_cast(context); + +- session_data *session = PrepareWsiSession(wsi); ++ session_data *session = PrepareWsiSession(lws_get_socket_fd(wsi)); + if (session == nullptr) { + return 0; + } +@@ -487,15 +498,20 @@ int closeWsConnect(void *context, char **err) + struct lws *wsi = static_cast(context); + + WebsocketServer *server = WebsocketServer::GetInstance(); +- auto it = server->GetWsisData().find(wsi); ++ server->ReadLockAllWsSession(); ++ auto it = server->GetWsisData().find(lws_get_socket_fd(wsi)); + if (it == server->GetWsisData().end()) { ++ server->UnlockAllWsSession(); + ERROR("websocket session not exist"); + return -1; + } ++ + it->second.close = true; + // close websocket session +- lws_callback_on_writable(wsi); ++ if (server->IsValidSession(wsi)) { ++ lws_callback_on_writable(wsi); ++ } ++ server->UnlockAllWsSession(); ++ + return 0; + } +- +- +diff --git a/src/daemon/entry/cri/websocket/service/ws_server.h b/src/daemon/entry/cri/websocket/service/ws_server.h +index 1370c552..cb431f7f 100644 +--- a/src/daemon/entry/cri/websocket/service/ws_server.h ++++ b/src/daemon/entry/cri/websocket/service/ws_server.h +@@ -17,6 +17,7 @@ + #define DAEMON_ENTRY_CRI_WEBSOCKET_SERVICE_WS_SERVER_H + #include + #include ++#include + #include + #include + #include +@@ -26,6 +27,7 @@ + #include "route_callback_register.h" + #include "url.h" + #include "errors.h" ++#include "read_write_lock.h" + + #define MAX_ECHO_PAYLOAD 4096 + #define MAX_ARRAY_LEN 2 +@@ -71,10 +73,11 @@ public: + void Shutdown(); + void RegisterCallback(const std::string &path, std::shared_ptr callback); + url::URLDatum GetWebsocketUrl(); +- std::unordered_map &GetWsisData(); +- void SetLwsSendedFlag(struct lws *wsi, bool sended); +- void LockAllWsSession(); ++ std::unordered_map &GetWsisData(); ++ void SetLwsSendedFlag(int socketID, bool sended); ++ void ReadLockAllWsSession(); + void UnlockAllWsSession(); ++ bool IsValidSession(struct lws *wsi); + + private: + WebsocketServer(); +@@ -85,17 +88,19 @@ private: + std::vector split(std::string str, char r); + static void EmitLog(int level, const char *line); + int CreateContext(); +- inline void Receive(struct lws *client, void *in, size_t len); ++ inline void Receive(int socketID, void *in, size_t len); + int Wswrite(struct lws *wsi, void *in, size_t len); + inline int DumpHandshakeInfo(struct lws *wsi) noexcept; + static int Callback(struct lws *wsi, enum lws_callback_reasons reason, + void *user, void *in, size_t len); + void ServiceWorkThread(int threadid); +- void CloseWsSession(struct lws *wsi); ++ void CloseWsSession(int socketID); + void CloseAllWsSession(); ++ void RecordSession(struct lws *wsi); ++ void RemoveSession(struct lws *wsi); + + private: +- static std::mutex m_mutex; ++ static RWMutex m_mutex; + static struct lws_context *m_context; + volatile int m_force_exit = 0; + std::thread m_pthread_service; +@@ -104,7 +109,8 @@ private: + { NULL, NULL, 0, 0 } + }; + RouteCallbackRegister m_handler; +- static std::unordered_map m_wsis; ++ static std::unordered_map m_wsis; ++ static std::unordered_set m_activeSession; + url::URLDatum m_url; + int m_listenPort; + }; +diff --git a/src/utils/cpputils/read_write_lock.cc b/src/utils/cpputils/read_write_lock.cc +new file mode 100644 +index 00000000..c9f94dc8 +--- /dev/null ++++ b/src/utils/cpputils/read_write_lock.cc +@@ -0,0 +1,59 @@ ++/****************************************************************************** ++ * Copyright (c) Huawei Technologies Co., Ltd. 2021. All rights reserved. ++ * iSulad licensed under the Mulan PSL v2. ++ * You can use this software according to the terms and conditions of the Mulan PSL v2. ++ * You may obtain a copy of Mulan PSL v2 at: ++ * http://license.coscl.org.cn/MulanPSL2 ++ * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR ++ * IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR ++ * PURPOSE. ++ * See the Mulan PSL v2 for more details. ++ * Author: wujing ++ * Create: 2021-01-18 ++ * Description: provide read write lock implementation ++ *********************************************************************************/ ++ ++#include "read_write_lock.h" ++ ++void RWMutex::rdlock() ++{ ++ std::unique_lock autoLock(m_mutex); ++ ++m_waiting_readers; ++ m_read_cond.wait(autoLock, [&]() { ++ return m_waiting_writers == 0 && m_status >= 0; ++ }); ++ --m_waiting_readers; ++ ++m_status; ++} ++ ++void RWMutex::wrlock() ++{ ++ std::unique_lock autoLock(m_mutex); ++ ++m_waiting_writers; ++ m_write_cond.wait(autoLock, [&]() { ++ return m_status == 0; ++ }); ++ --m_waiting_writers; ++ --m_status; ++} ++ ++void RWMutex::unlock() ++{ ++ std::unique_lock autoLock(m_mutex); ++ ++ if (m_status == -1) { // one writer ++ m_status = 0; ++ } else if (m_status > 0) { // one or multiple readers ++ --m_status; ++ } else { // neither readers nor writers ++ return; ++ } ++ ++ if (m_waiting_writers > 0) { ++ if (m_status == 0) { ++ m_write_cond.notify_one(); ++ } ++ } else { ++ m_read_cond.notify_all(); ++ } ++} +diff --git a/src/utils/cpputils/read_write_lock.h b/src/utils/cpputils/read_write_lock.h +new file mode 100644 +index 00000000..0149e3a5 +--- /dev/null ++++ b/src/utils/cpputils/read_write_lock.h +@@ -0,0 +1,90 @@ ++/****************************************************************************** ++ * Copyright (c) Huawei Technologies Co., Ltd. 2021. All rights reserved. ++ * iSulad licensed under the Mulan PSL v2. ++ * You can use this software according to the terms and conditions of the Mulan PSL v2. ++ * You may obtain a copy of Mulan PSL v2 at: ++ * http://license.coscl.org.cn/MulanPSL2 ++ * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR ++ * IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR ++ * PURPOSE. ++ * See the Mulan PSL v2 for more details. ++ * Author: wujing ++ * Create: 2021-01-18 ++ * Description: provide read write lock definition ++ *********************************************************************************/ ++#ifndef UTILS_CPPUTILS_READ_WRITE_LOCK_H ++#define UTILS_CPPUTILS_READ_WRITE_LOCK_H ++ ++#include ++#include ++#include ++#include ++ ++class RWMutex { ++public: ++ RWMutex() = default; ++ ~RWMutex() = default; ++ RWMutex(const RWMutex &) = delete; ++ RWMutex(RWMutex &&) = delete; ++ RWMutex &operator = (const RWMutex &) = delete; ++ RWMutex &operator = (RWMutex &&) = delete; ++ ++ void rdlock(); ++ void wrlock(); ++ void unlock(); ++ ++private: ++ volatile long m_status {0}; ++ volatile long m_waiting_readers {0}; ++ volatile long m_waiting_writers {0}; ++ std::mutex m_mutex; ++ std::condition_variable m_read_cond; ++ std::condition_variable m_write_cond; ++}; ++ ++template ++class ReadGuard { ++public: ++ explicit ReadGuard(RWMutexType &lock) : m_lock(lock) ++ { ++ m_lock.rdlock(); ++ } ++ virtual ~ReadGuard() ++ { ++ m_lock.unlock(); ++ } ++ ++ ReadGuard() = delete; ++ ReadGuard(const ReadGuard &) = delete; ++ ReadGuard &operator=(const ReadGuard &) = delete; ++ ReadGuard(const ReadGuard &&) = delete; ++ ReadGuard &operator = (const ReadGuard &&) = delete; ++ ++private: ++ RWMutexType &m_lock; ++}; ++ ++ ++template ++class WriteGuard { ++public: ++ explicit WriteGuard(RWMutexType &lock) : m_lock(lock) ++ { ++ m_lock.wrlock(); ++ } ++ virtual ~WriteGuard() ++ { ++ m_lock.unlock(); ++ } ++ ++ WriteGuard() = delete; ++ WriteGuard(const WriteGuard &) = delete; ++ WriteGuard &operator=(const WriteGuard &) = delete; ++ WriteGuard(const WriteGuard &&) = delete; ++ WriteGuard &operator = (const WriteGuard &&) = delete; ++ ++private: ++ RWMutexType &m_lock; ++}; ++ ++#endif // UTILS_CPPUTILS_READ_WRITE_LOCK_H +diff --git a/src/utils/cpputils/stoppable_thread.cc b/src/utils/cpputils/stoppable_thread.cc +index 0d15aa01..68f6d9b2 100644 +--- a/src/utils/cpputils/stoppable_thread.cc ++++ b/src/utils/cpputils/stoppable_thread.cc +@@ -22,7 +22,6 @@ StoppableThread &StoppableThread::operator=(StoppableThread &&obj) + return *this; + } + +- + bool StoppableThread::stopRequested() + { + if (m_future_obj.wait_for(std::chrono::milliseconds(0)) == std::future_status::timeout) { +@@ -35,6 +34,3 @@ void StoppableThread::stop() + { + m_exit_signal.set_value(); + } +- +- +- +-- +2.25.1 + diff --git a/iSulad.spec b/iSulad.spec index be05e2fc80b10e375c4621f0b39ffba3439836fd..50374776afca5c0c5963d8bced1026ef500ea6f2 100644 --- a/iSulad.spec +++ b/iSulad.spec @@ -1,5 +1,5 @@ %global _version 2.0.8 -%global _release 20201230.155843.git6557a6eb +%global _release 20210118.195254.git077e10f2 %global is_systemd 1 Name: iSulad @@ -12,6 +12,16 @@ Source: https://gitee.com/openeuler/iSulad/repository/archive/v%{version}.tar BuildRoot: {_tmppath}/iSulad-%{version} ExclusiveArch: x86_64 aarch64 +Patch1: 0001-make-thread-detach-to-avoid-resource-leak.patch +Patch2: 0002-devmapper-fix-udev-wait-thread-resource-leak.patch +Patch3: 0003-clean-code-fix-clean-code.patch +Patch4: 0004-judge-isula-load-file-exists.patch +Patch5: 0005-modify-image_load.sh-CI-to-test-file-not-exist.patch +Patch6: 0006-do-not-pause-container-when-copy.patch +Patch7: 0007-add-testcases-for-isula-cp.patch +Patch8: 0008-image_cb-rename-the-function-isula_-docker_-to-do_.patch +Patch9: 0009-fix-small-probability-of-coredump-in-CRI-streaming-s.patch + %ifarch x86_64 aarch64 Provides: libhttpclient.so()(64bit) Provides: libisula.so()(64bit) @@ -213,6 +223,12 @@ fi %endif %changelog +* Mon Jan 18 2020 lifeng - 2.0.8-20210118.195254.git077e10f2 +- Type: sync from upstream +- ID: NA +- SUG: NA +- DESC: update from master + * Wed Dec 30 2020 lifeng - 2.0.8-20201230.155843.git6557a6eb - Type: update to v2.0.8 - ID: NA