From 764912e54ebbb8e06e4b7dc7f8c81a364550a435 Mon Sep 17 00:00:00 2001
From: KeKe
Date: Sat, 21 Dec 2024 19:00:36 +0800
Subject: [PATCH] =?UTF-8?q?dcc=20=E6=94=AF=E6=8C=81build?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

(cherry picked from commit c269a669d89a00a7511d647118d6672a5fb2ee93)
---
 src/executor/executor.c                       | 956 +++++++++++++++++-
 src/executor/executor.h                       |  94 +-
 src/executor/executor_defs.h                  |   4 +-
 src/executor/executor_utils.c                 | 100 ++
 src/executor/executor_utils.h                 |  14 +
 src/interface/dcc_interface.h                 |   6 +
 src/server/srv_api.c                          |  57 +-
 src/server/srv_logger.c                       |   8 +-
 src/storage/gstor/gstor_executor.c            |  86 +-
 src/storage/gstor/gstor_executor.h            |   6 +-
 src/storage/gstor/gstor_instance.h            |   4 +
 .../gstor/zekernel/kernel/backup/bak_common.c |   8 +
 src/storage/storage.c                         |  27 +-
 src/storage/storage.h                         |  11 +-
 14 files changed, 1345 insertions(+), 36 deletions(-)

diff --git a/src/executor/executor.c b/src/executor/executor.c
index b8556bd..763b515 100644
--- a/src/executor/executor.c
+++ b/src/executor/executor.c
@@ -46,6 +46,11 @@ static exc_cb_consensus_proc_t g_cb_consensus_proc_notify = NULL;
 static dcc_cb_status_notify_t g_cb_status_notify = NULL;
 static mem_pool_t *g_exc_mem_pool = NULL;

+// build info
+static exc_build_info_t g_build_info = {0}; // state shared by the build threads and the dcf message callback
+
+static volatile bool32 g_truncate_stopped = CM_FALSE; // CM_TRUE while a leader build runs; exc_dcf_truncate() skips truncation
+
 #define DCC_SEQUENCE_START "0000000000"

 static void exc_dealing_put(msg_entry_t* entry);
@@ -107,13 +112,13 @@ static status_t exc_set_dcf_param(param_value_t* dcf_config)
     }
     int len = sprintf_s(dcf_data_path, EXC_PATH_MAX_SIZE, "%s/dcf_data", (char *)data_path.str_val);
     if (len < 0 || len > EXC_PATH_MAX_SIZE) {
-        LOG_RUN_ERR("[EXC] Setting data path crosses the max size.");
+        LOG_RUN_ERR("[EXC] Setting dcf data path fail, len=%d.", len);
         return CM_ERROR;
     }

     int ret = dcf_set_param("DATA_PATH", dcf_data_path);
     if (ret != CM_SUCCESS) {
-        LOG_RUN_ERR("[EXC] Setting data path is failed.");
+        LOG_RUN_ERR("[EXC] Setting
dcf data path is failed.");
         return CM_ERROR;
     }

@@ -143,6 +148,12 @@ static status_t exc_set_dcf_applied_index(void)
     }

     if (!eof) {
+        if (stg_value.len >= CM_MAX_NUM_PART_BUFF) {
+            LOG_RUN_ERR("[EXC] exc get dcf applied index value string len is too large, len=%u.", stg_value.len);
+            return CM_ERROR;
+        }
+        char tmp_str[CM_MAX_NUM_PART_BUFF] = {0}; // zero-filled, so the copy below is always NUL-terminated
+        MEMS_RETURN_IFERR(memcpy_s(tmp_str, CM_MAX_NUM_PART_BUFF, stg_value.str, stg_value.len));
-        CM_RETURN_IFERR(cm_str2uint64(stg_value.str, &applied_index));
+        CM_RETURN_IFERR(cm_str2uint64(tmp_str, &applied_index)); // parse the bounded copy, not the raw length-delimited buffer
         if (dcf_set_applied_index(EXC_STREAM_ID_DEFAULT, applied_index) != CM_SUCCESS) {
             CM_THROW_ERROR(ERR_EXC_INIT_FAILED, "it sets local applied index");
@@ -384,7 +395,7 @@ static bool32 exc_need_truncate(uint64 min_applied_idx, uint64 *first_index_kept
     if (total_disk_size != 0 && ((double)avail_disk_size) / total_disk_size <= EXC_DISK_AVAIL_RATE) {
         *first_index_kept = g_set_stg_applied_idx;
         g_min_applied_idx_frozen_cnt = 0;
-        LOG_DEBUG_WAR("[EXC] exc need truncate, set first_index_kept as stg_applied_idx:%llu",
+        LOG_RUN_WAR("[EXC] exc need truncate, set first_index_kept as stg_applied_idx:%llu",
             g_set_stg_applied_idx);
         return CM_TRUE;
     }
@@ -401,6 +412,10 @@ static status_t exc_dcf_truncate(void)
     if (dcf_get_cluster_min_applied_idx(EXC_STREAM_ID_DEFAULT, (unsigned long long*)&min_applied_idx) != CM_SUCCESS) {
         return CM_ERROR;
     }
+    if (g_truncate_stopped) { // set by the leader build thread; no log truncation while a build is running
+        LOG_DEBUG_INF("[EXC] truncate is stopped now, maybe in building.");
+        return CM_SUCCESS;
+    }
     if (exc_need_truncate(min_applied_idx, &first_index_kept) && first_index_kept <= g_set_stg_applied_idx) {
         int ret = dcf_truncate(EXC_STREAM_ID_DEFAULT, first_index_kept);
         if (ret != CM_SUCCESS) {
@@ -487,7 +502,8 @@ int exc_cb_consensus_follow_notify(unsigned int stream_id, unsigned long long in
     uint32 total_size = size + CM_SEQUENCE_OFFSET;
     msg_entry_t *entry = exc_add_entry(buf, total_size, index, key);
     if (entry == NULL) {
-        LOG_RUN_ERR("[EXC] Add entry failed when it executes consensus-notify function.");
+        LOG_RUN_ERR("[EXC]
Add entry failed when it executes consensus-notify function, total_size = %u, index=%llu",
+            total_size, index);
         return CM_ERROR;
     }

@@ -534,6 +550,854 @@ int exc_cb_status_notify(unsigned int stream_id, dcf_role_t new_role)
     return CM_SUCCESS;
 }

+status_t exc_join_datadir_and_subdir(const char *subdir, char *joined_dir) // joined_dir must hold CM_FILE_NAME_BUFFER_SIZE bytes
+{
+    param_value_t data_path;
+    char real_data_path[CM_FILE_NAME_BUFFER_SIZE] = {0};
+    CM_RETURN_IFERR(srv_get_param(DCC_PARAM_DATA_PATH, &data_path));
+    if (CM_IS_EMPTY_STR(data_path.str_val)) {
+        LOG_RUN_ERR("[EXC]data_path is empty.");
+        return CM_ERROR;
+    }
+    CM_RETURN_IFERR(realpath_file(data_path.str_val, real_data_path, CM_FILE_NAME_BUFFER_SIZE));
+
+    PRTS_RETURN_IFERR(snprintf_s(joined_dir, CM_FILE_NAME_BUFFER_SIZE, CM_FILE_NAME_BUFFER_SIZE - 1, "%s/%s",
+        real_data_path, subdir));
+
+    return CM_SUCCESS;
+}
+
+static status_t exc_follower_set_build_status_to_file(uint32 status) // records the follower build state as decimal text
+{
+    char file_name[CM_FILE_NAME_BUFFER_SIZE] = {0};
+    if (exc_join_datadir_and_subdir(DCC_BUILD_STATUS_FILE, file_name) != CM_SUCCESS) {
+        LOG_RUN_ERR("[EXC] set_build_status: join datapath and file=%s failed.", DCC_BUILD_STATUS_FILE);
+        return CM_ERROR;
+    }
+
+    char buf[CM_BUFLEN_32] = {0};
+    PRTS_RETURN_IFERR(snprintf_s(buf, CM_BUFLEN_32, CM_BUFLEN_32 - 1, "%u", status));
+
+    int fd;
+    status_t ret = cm_create_file(file_name, O_RDWR | O_BINARY | O_APPEND | O_SYNC, &fd);
+    if (ret != CM_SUCCESS) {
+        LOG_RUN_ERR("[EXC] create build status file failed.");
+        return CM_ERROR;
+    }
+
+    ret = cm_write_file(fd, (const void *)buf, (int32)strlen(buf)); // length of the formatted text, not sizeof(uint32)
+    if (ret != CM_SUCCESS) {
+        cm_close_file(fd);
+        LOG_RUN_ERR("[EXC] write build status file failed.");
+        return CM_ERROR;
+    }
+
+    cm_close_file(fd);
+    LOG_RUN_INF("[EXC] write build status=%u to file=%s success.", status, file_name);
+    return CM_SUCCESS;
+}
+
+status_t exc_follower_remove_build_status_file(void)
+{
+    char file_name[CM_FILE_NAME_BUFFER_SIZE] = {0};
+    if (exc_join_datadir_and_subdir(DCC_BUILD_STATUS_FILE,
file_name) != CM_SUCCESS) {
+        LOG_RUN_ERR("[EXC] remove_build_status_file: join datapath and file=%s failed.", DCC_BUILD_STATUS_FILE);
+        return CM_ERROR;
+    }
+
+    status_t ret = CM_SUCCESS;
+    if (cm_file_exist(file_name)) { // a missing status file is not an error
+        ret = cm_remove_file(file_name);
+    }
+    LOG_RUN_INF("[EXC] remove build status file=%s end, ret=%d.", file_name, ret);
+    return ret;
+}
+
+void exc_remove_subdir_of_datadir(const char *subdir) // best effort: failures are only logged
+{
+    LOG_RUN_INF("[EXC]remove subdir=%s start...", subdir);
+    char rm_path[CM_FILE_NAME_BUFFER_SIZE] = {0};
+    if (exc_join_datadir_and_subdir(subdir, rm_path) != CM_SUCCESS) {
+        LOG_RUN_ERR("[EXC]remove: join datapath and subdir=%s failed.", subdir);
+        return;
+    }
+
+    if (cm_dir_exist(rm_path)) {
+        (void)exc_remove_dir(rm_path);
+    }
+    LOG_RUN_INF("[EXC]remove subdir=%s end.", subdir);
+}
+
+status_t exc_send_build_cmd(exc_build_cmd_t cmd, uint32 dest_node, uint32 serial_number) // sends a header-only build message
+{
+    if (dest_node == EXC_INVALID_NODE_ID) {
+        LOG_RUN_ERR("[EXC] send_build_cmd, dest_node is invalid, cmd:%d", cmd);
+        return CM_ERROR;
+    }
+    LOG_RUN_INF("[EXC] send_build_cmd, cmd=%d, dest_node=%u", cmd, dest_node);
+    bool32 diff_endian = (bool32)dcf_is_diff_endian(DCC_STREAM_ID, dest_node);
+    exc_build_msg_head_t head = {0}; // the whole header is sent; zero the fields that are not set below
+    head.version = (uint32)(diff_endian ? cs_reverse_uint32(EXC_BUILD_CUR_VERSION) : EXC_BUILD_CUR_VERSION);
+    head.cmd = diff_endian ? (exc_build_cmd_t)cs_reverse_uint32((uint32)cmd) : cmd;
+    head.serial_number = diff_endian ?
cs_reverse_uint32(serial_number) : serial_number;
+    return (status_t)dcf_send_msg(DCC_STREAM_ID, dest_node, (const char *)&head, sizeof(exc_build_msg_head_t));
+}
+
+status_t exc_send_big_build_cmd_by_body(exc_build_cmd_t cmd, uint32 dest_node, uint32 serial_number, const char *str)
+{
+    if (dest_node == EXC_INVALID_NODE_ID) {
+        LOG_RUN_ERR("[EXC] exc_send_big_build_cmd_by_body, dest_node is invalid, cmd:%d", cmd);
+        return CM_ERROR;
+    }
+    LOG_RUN_INF("[EXC] exc_send_big_build_cmd_by_body, cmd=%d, dest_node=%u", cmd, dest_node);
+    bool32 diff_endian = (bool32)dcf_is_diff_endian(DCC_STREAM_ID, dest_node);
+    exc_build_msg_t msg = {0}; // sizeof(msg) bytes are sent; zero the header fields and body bytes not set below
+    msg.head.version = (uint32)(diff_endian ? cs_reverse_uint32(EXC_BUILD_CUR_VERSION) : EXC_BUILD_CUR_VERSION);
+    msg.head.cmd = diff_endian ? (exc_build_cmd_t)cs_reverse_uint32((uint32)cmd) : cmd;
+    msg.head.serial_number = diff_endian ? cs_reverse_uint32(serial_number) : serial_number;
+    uint32 size = (uint32)strlen(str);
+    msg.head.cur_size = diff_endian ? cs_reverse_uint32(size) : size;
+    if (snprintf_s(msg.body, BUILD_PKT_MAX_BODY_SIZE,
+        BUILD_PKT_MAX_BODY_SIZE - 1, "%s", str) == -1) { // also rejects a str that does not fit into the body
+        LOG_RUN_ERR("[EXC] save big_build_cmd str=%s failed.", str);
+        return CM_ERROR;
+    }
+    return (status_t)dcf_send_msg(DCC_STREAM_ID, dest_node, (const char *)&msg, sizeof(exc_build_msg_t));
+}
+
+void exc_init_build_msg(exc_build_msg_t *msg, int32 file_size, bool32 diff_endian) // sets version, cmd and filesize only
+{
+    msg->head.version = (uint32)(diff_endian ? cs_reverse_uint32(EXC_BUILD_CUR_VERSION) : EXC_BUILD_CUR_VERSION);
+    msg->head.cmd = (exc_build_cmd_t)(diff_endian ? cs_reverse_uint32((uint32)BUILD_PKT_SEND) : BUILD_PKT_SEND);
+    msg->head.filesize = (uint32)(diff_endian ?
cs_reverse_int32(file_size) : file_size);
+}
+
+status_t exc_send_one_build_file(const char *path, const char *file_name) // streams path/file_name to the follower in packets
+{
+    if (g_build_info.build_status == BUILD_CANCEL) {
+        LOG_RUN_ERR("[EXC] build cancel, no need to send build file");
+        return CM_ERROR;
+    }
+    if (CM_STR_EQUAL(file_name, ".") || CM_STR_EQUAL(file_name, "..")) {
+        LOG_RUN_INF("[EXC] path=%s file=%s, no need send.", path, file_name);
+        return CM_SUCCESS;
+    }
+
+    char full_file_name[CM_FILE_NAME_BUFFER_SIZE] = {0};
+    PRTS_RETURN_IFERR(snprintf_s(full_file_name, CM_FILE_NAME_BUFFER_SIZE, CM_FILE_NAME_BUFFER_SIZE - 1, "%s/%s",
+        path, file_name));
+    exc_build_msg_t msg = {0}; // sizeof(msg) is sent per packet; zero it so a short packet carries no stack garbage
+    PRTS_RETURN_IFERR(snprintf_s(msg.head.filename, CM_MAX_NAME_LEN, CM_MAX_NAME_LEN - 1, "%s", file_name)); // before open: no fd to leak
+    int32 fd = -1;
+    CM_RETURN_IFERR(cm_open_file(full_file_name, O_RDONLY | O_BINARY, &fd));
+    int32 file_size = (int32)cm_file_size(fd);
+    if (file_size < 0) {
+        cm_close_file(fd);
+        LOG_RUN_ERR("[EXC] backup file=%s size=%d error.", file_name, file_size);
+        return CM_ERROR;
+    }
+
+    uint32 dest_node = g_build_info.follower_id;
+    bool32 diff_endian = (bool32)dcf_is_diff_endian(DCC_STREAM_ID, dest_node);
+    exc_init_build_msg(&msg, file_size, diff_endian); // leaves head.filename untouched
+    uint32 offset = 0;
+    int32 read_size;
+    uint32 remain_size = (uint32)file_size;
+    while (remain_size > 0) { // NOTE(review): an empty file sends no packet at all
+        uint32 cur_size = (remain_size >= BUILD_PKT_MAX_BODY_SIZE) ? BUILD_PKT_MAX_BODY_SIZE : remain_size;
+        if (cm_pread_file(fd, msg.body, cur_size, offset, &read_size) != CM_SUCCESS ||
+            (uint32)read_size != cur_size) {
+            cm_close_file(fd);
+            LOG_RUN_ERR("[EXC] read file=%s size=%d failed, offset=%u.", file_name, file_size, offset);
+            return CM_ERROR;
+        }
+        msg.head.cur_size = diff_endian ? cs_reverse_uint32(cur_size) : cur_size;
+        msg.head.cur_offset = diff_endian ?
cs_reverse_uint32(offset) : offset;
+        if (g_build_info.send_serial_number > g_build_info.recv_serial_number + BUILD_PKT_CREDIT_NUM) {
+            (void)cm_event_timedwait(&g_build_info.send_event, CM_SLEEP_50_FIXED); // waits once, then sends regardless
+        }
+        uint32 serial_number = g_build_info.send_serial_number++;
+        msg.head.serial_number = diff_endian ? cs_reverse_uint32(serial_number) : serial_number;
+        if (dcf_send_msg(DCC_STREAM_ID, dest_node, (const char *)&msg, sizeof(exc_build_msg_t)) != CM_SUCCESS) {
+            cm_close_file(fd);
+            LOG_RUN_ERR("[EXC] send file=%s size=%d failed, offset=%u.", file_name, file_size, offset);
+            return CM_ERROR;
+        }
+
+        offset += cur_size;
+        remain_size -= cur_size;
+    }
+
+    cm_close_file(fd);
+    LOG_RUN_INF("[EXC] send build path=%s file=%s size=%d success.", path, file_name, file_size);
+    return CM_SUCCESS;
+}
+
+status_t exc_make_subdir_of_datadir(const char *subdir) // creates <data_path>/<subdir>
+{
+    char make_path[CM_FILE_NAME_BUFFER_SIZE] = {0};
+    if (exc_join_datadir_and_subdir(subdir, make_path) != CM_SUCCESS) {
+        LOG_RUN_ERR("[EXC] make: join datapath and subdir=%s failed.", subdir); // report the directory actually requested
+        return CM_ERROR;
+    }
+    if (cm_create_dir(make_path) != CM_SUCCESS) {
+        LOG_RUN_ERR("[EXC] create_dir=%s failed.", make_path);
+        return CM_ERROR;
+    }
+    LOG_RUN_INF("[EXC] create_dir=%s success.", make_path);
+    return CM_SUCCESS;
+}
+
+status_t exc_follower_build_start_proc(void) // recreates the backup dir and asks the leader to start a build
+{
+    exc_remove_subdir_of_datadir(DCC_BACKUP_DIR);
+    if (exc_make_subdir_of_datadir(DCC_BACKUP_DIR) != CM_SUCCESS) {
+        LOG_RUN_ERR("[EXC] exc_make_subdir_of_datadir failed.");
+        return CM_ERROR;
+    }
+    if (exc_send_build_cmd(BUILD_START_REQ, g_build_info.leader_id, 0) != CM_SUCCESS) {
+        LOG_RUN_ERR("[EXC] send_build_cmd BUILD_START_REQ failed.");
+        return CM_ERROR;
+    }
+    g_build_info.build_status = FOLLOWER_BUILD_PKT_RECV;
+    g_build_info.last_update_time = cm_clock_now_ms(); // starting point for the packet-receive timeout
+
+    LOG_RUN_INF("[EXC] follower_build_start_proc ok.");
+    return CM_SUCCESS;
+}
+
+status_t exc_follower_build_pkt_recv_end_proc(void)
+{
+    LOG_RUN_INF("[EXC] shutdown db
start...");
+    db_shutdown(); // storage is stopped before its directories are removed below
+    LOG_RUN_INF("[EXC] shutdown db success.");
+    exc_remove_subdir_of_datadir(DCC_GSTOR_DIR);
+    exc_remove_subdir_of_datadir(DCC_DCFDATA_DIR);
+    char bak_path[CM_FILE_NAME_BUFFER_SIZE] = {0};
+    if (exc_join_datadir_and_subdir(DCC_BACKUP_DIR, bak_path) != CM_SUCCESS) {
+        LOG_RUN_ERR("[EXC] restore: join datapath and subdir=%s failed.", DCC_BACKUP_DIR);
+        return CM_ERROR;
+    }
+
+    char new_path[CM_FILE_NAME_BUFFER_SIZE] = {0};
+    if (exc_join_datadir_and_subdir(DCC_DATA_DIR, new_path) != CM_SUCCESS) {
+        LOG_RUN_ERR("[EXC] new_path: join datapath and subdir=%s failed.", DCC_DATA_DIR);
+        return CM_ERROR;
+    }
+    LOG_RUN_INF("[EXC] restore start...");
+    const char *old_path = (const char *)g_build_info.old_restore_path; // saved from the BUILD_PKT_SEND_END body in exc_cb_process_msg()
+    if (exc_restore(bak_path, old_path, (const char *)new_path) != CM_SUCCESS) {
+        LOG_RUN_ERR("[EXC] restore failed, bak_path=%s, old_path=%s, new_path=%s.", bak_path, old_path, new_path);
+        return CM_ERROR;
+    }
+    LOG_RUN_INF("[EXC] restore success, bak_path=%s, old_path=%s, new_path=%s.", bak_path, old_path, new_path);
+    if (exc_send_build_cmd(BUILD_OK_REQ, g_build_info.leader_id, 0) != CM_SUCCESS) {
+        LOG_RUN_ERR("[EXC] send_build_cmd BUILD_OK_REQ failed.");
+        return CM_ERROR;
+    }
+    if (g_build_info.build_status == BUILD_CANCEL) { // a cancel that arrived during the restore still fails this step
+        LOG_RUN_ERR("[EXC] build_cancel, pkt_recv_proc failed.");
+        return CM_ERROR;
+    }
+    LOG_RUN_INF("[EXC] exc_follower_build_pkt_recv_end_proc ok.");
+    return CM_SUCCESS;
+}
+
+void exc_follower_build_start(thread_t *thread) // handler for state FOLLOWER_BUILD_START
+{
+    (void)exc_follower_set_build_status_to_file(FOLLOWER_BUILD_START);
+    if (exc_follower_build_start_proc() != CM_SUCCESS) {
+        LOG_RUN_ERR("[EXC] follower_build_start_proc failed.");
+        thread->closed = 1; // ends the loop in exc_follower_build_proc()
+    } else {
+        (void)exc_follower_set_build_status_to_file(FOLLOWER_BUILD_PKT_RECV);
+    }
+}
+
+void exc_follower_build_pkt_recv(thread_t *thread) // handler for state FOLLOWER_BUILD_PKT_RECV: only checks the timeout
+{
+    LOG_DEBUG_INF("[EXC] follower build status= %d", FOLLOWER_BUILD_PKT_RECV);
+    if ((cm_clock_now_ms() - g_build_info.last_update_time) /
MILLISECS_PER_SECOND >
+        FOLLOWER_BUILD_PKT_RECV_TIMEOUT) {
+        LOG_RUN_ERR("[EXC] wait build pkt from leader timeout.");
+        thread->closed = 1; // ends the loop in exc_follower_build_proc()
+    }
+}
+
+void exc_follower_build_pkt_recv_end(thread_t *thread) // handler for state FOLLOWER_BUILD_PKT_RECV_END: restore, then report
+{
+    (void)exc_follower_set_build_status_to_file(FOLLOWER_BUILD_PKT_RECV_END);
+    if (exc_follower_build_pkt_recv_end_proc() != CM_SUCCESS) {
+        LOG_RUN_ERR("[EXC] follower_build_pkt_recv_end_proc failed, send cancel leader build msg.");
+        (void)exc_send_build_cmd(BUILD_CANCEL_REQ, g_build_info.leader_id, 0);
+        thread->closed = 1;
+    } else {
+        g_build_info.build_status = FOLLOWER_BUILD_OK_REQ_SEND; // NOTE(review): set after BUILD_OK_REQ was already sent by the proc above
+        g_build_info.last_update_time = cm_clock_now_ms();
+        (void)exc_follower_set_build_status_to_file(FOLLOWER_BUILD_OK_REQ_SEND);
+    }
+}
+
+void exc_follower_build_ok_req_send(thread_t *thread) // handler for state FOLLOWER_BUILD_OK_REQ_SEND: waits for BUILD_OK_ACK
+{
+    if ((cm_clock_now_ms() - g_build_info.last_update_time) / MILLISECS_PER_SECOND >
+        FOLLOWER_BUILD_OK_REQ_SEND_TIMEOUT) {
+        LOG_RUN_WAR("[EXC] wait build ok ack timeout, directly exit and try restart");
+        (void)dcf_stop();
+        (void)exc_follower_remove_build_status_file();
+        exit(0); // terminates the whole process; the restart itself is not done here
+    }
+}
+
+void exc_follower_build_ok_ack_recv(thread_t *thread) // handler for state FOLLOWER_BUILD_OK_ACK_RECV: never returns
+{
+    (void)exc_follower_set_build_status_to_file(FOLLOWER_BUILD_OK_ACK_RECV);
+    LOG_RUN_INF("[EXC] build ok ack received, exit and restart.");
+    (void)dcf_stop();
+    (void)exc_follower_remove_build_status_file();
+    exit(0);
+}
+
+void exc_follower_build_proc(thread_t *thread) // follower build thread: polls build_status and runs the matching handler
+{
+    cm_set_thread_name("exc_follower_build");
+    LOG_RUN_INF("[EXC] follower_build thread started, tid:%lu, close:%u", thread->id, thread->closed);
+
+    while (!thread->closed) {
+        if (g_build_info.build_status == FOLLOWER_BUILD_START) {
+            exc_follower_build_start(thread);
+        }
+
+        if (g_build_info.build_status == FOLLOWER_BUILD_PKT_RECV) {
+            exc_follower_build_pkt_recv(thread);
+        }
+
+        if (g_build_info.build_status == FOLLOWER_BUILD_PKT_RECV_END) { // set by exc_cb_process_msg() on BUILD_PKT_SEND_END
+            exc_follower_build_pkt_recv_end(thread);
+        }
+
+        if (g_build_info.build_status == FOLLOWER_BUILD_OK_REQ_SEND) {
+
exc_follower_build_ok_req_send(thread);
+        }
+
+        if (g_build_info.build_status == FOLLOWER_BUILD_OK_ACK_RECV) {
+            exc_follower_build_ok_ack_recv(thread);
+        }
+
+        uint32 now_leader = exc_get_leader_id();
+        if (g_build_info.leader_id != now_leader) {
+            LOG_RUN_INF("[EXC] leader=%u changed to %u now, give up building.", g_build_info.leader_id, now_leader);
+            break;
+        }
+
+        if (g_build_info.build_status == BUILD_CANCEL) {
+            LOG_RUN_INF("[EXC] follower build status changed to cancel, give up building.");
+            cm_sleep(EXC_3X_FIXED * MILLISECS_PER_SECOND);
+            break;
+        }
+
+        cm_sleep(CM_SLEEP_1_FIXED);
+    }
+
+    g_build_info.build_status = BUILD_NONE;
+    (void)exc_follower_remove_build_status_file();
+    dcf_set_exception(DCC_STREAM_ID, DCF_EXCEPTION_MISSING_LOG); // rebuild if build fail.
+    LOG_RUN_INF("[EXC] follower_build thread closed, tid:%lu, close:%u", thread->id, thread->closed);
+}
+
+void exc_clear_build_file_info(void) // marks every receive slot free; it does not close any fd
+{
+    for (uint32 i = 0; i < BUILD_FILE_MAX_NUM; i++) {
+        g_build_info.build_file[i].filename[0] = '\0';
+        g_build_info.build_file[i].fd = -1;
+        g_build_info.build_file[i].is_write_end = CM_FALSE;
+    }
+}
+
+int exc_cb_exception_notify(unsigned int stream_id, dcf_exception_t exception) // dcf exception callback; stream_id is unused
+{
+    LOG_RUN_INF("[EXC] dcf exception %d report, build_status=%d.", exception, g_build_info.build_status);
+    if (exception == DCF_EXCEPTION_MISSING_LOG && g_build_info.build_status == BUILD_NONE) {
+        cm_close_thread(&g_build_info.thread);
+        exc_clear_build_file_info();
+        g_build_info.build_status = FOLLOWER_BUILD_START;
+        g_build_info.leader_id = exc_get_leader_id();
+        CM_MFENCE;
+        LOG_RUN_INF("[EXC] dcf log loss and full build is required!");
+        CM_RETURN_IFERR(cm_create_thread(exc_follower_build_proc, 0, NULL, &g_build_info.thread)); // NOTE(review): on failure build_status stays FOLLOWER_BUILD_START
+    } else {
+        dcf_set_exception(DCC_STREAM_ID, DCF_RUNNING_NORMAL);
+        LOG_RUN_INF("[EXC] dcf exception has been used, clear it.");
+    }
+    return CM_SUCCESS;
+}
+
+status_t exc_send_backup_file(const char *path)
+{
+#ifdef WIN32
+    intptr_t handle;
+
struct _finddata_t file_data;
+    char file_name[CM_MAX_PATH_LEN] = {0};
+    char *prefix = (char *)"*";
+
+    PRTS_RETURN_IFERR(snprintf_s(file_name, CM_MAX_PATH_LEN, CM_MAX_PATH_LEN - 1, "%s/%s", path, prefix));
+
+    handle = (intptr_t)_findfirst(file_name, &file_data);
+    if (-1L == handle) {
+        return CM_ERROR;
+    }
+    if (exc_send_one_build_file(path, (char *)file_data.name) != CM_SUCCESS) { // "." and ".." are skipped inside the callee
+        _findclose(handle);
+        return CM_ERROR;
+    }
+    while (_findnext(handle, &file_data) == 0) {
+        if (exc_send_one_build_file(path, (char *)file_data.name) != CM_SUCCESS) {
+            _findclose(handle);
+            return CM_ERROR;
+        }
+    }
+    _findclose(handle);
+#else
+    DIR *dir_ptr = NULL;
+    struct dirent *dirent_ptr = NULL;
+
+    dir_ptr = opendir(path);
+    if (dir_ptr == NULL) {
+        return CM_ERROR;
+    }
+
+    dirent_ptr = readdir(dir_ptr);
+    while (dirent_ptr != NULL) { // every entry is sent; the first failure aborts the whole transfer
+        if (exc_send_one_build_file(path, (char *)dirent_ptr->d_name) != CM_SUCCESS) {
+            (void)closedir(dir_ptr);
+            return CM_ERROR;
+        }
+        dirent_ptr = readdir(dir_ptr);
+    }
+    (void)closedir(dir_ptr);
+#endif
+    return CM_SUCCESS;
+}
+
+status_t exc_leader_build_pkt_send_proc(void) // leader side: pause replication, back up, send all backup files, then the end marker
+{
+    g_truncate_stopped = CM_TRUE; // reset to CM_FALSE at the end of exc_leader_build_proc()
+    if (dcf_pause_rep(DCC_STREAM_ID, g_build_info.follower_id, DCF_MAX_PAUSE_TIME) != CM_SUCCESS) {
+        LOG_RUN_ERR("[EXC] pause rep to follower_id=%u failed.", g_build_info.follower_id);
+        return CM_ERROR;
+    }
+    exc_remove_subdir_of_datadir(DCC_BACKUP_DIR);
+    CM_MFENCE;
+
+    char bak_path[CM_FILE_NAME_BUFFER_SIZE] = {0};
+    if (exc_join_datadir_and_subdir(DCC_BACKUP_DIR, bak_path) != CM_SUCCESS) {
+        LOG_RUN_ERR("[EXC] send: join datapath and subdir=%s failed.", DCC_BACKUP_DIR);
+        return CM_ERROR;
+    }
+    LOG_RUN_INF("[EXC] backup start...");
+    if (exc_backup(bak_path)) { // any non-zero result is treated as failure
+        LOG_RUN_ERR("[EXC] backup failed, bak_path=%s.", bak_path);
+        return CM_ERROR;
+    }
+    LOG_RUN_INF("[EXC] backup success, bak_path=%s.", bak_path);
+
+    CM_RETURN_IFERR(exc_send_backup_file(bak_path));
+
+    char old_path[CM_FILE_NAME_BUFFER_SIZE] = {0};
+    if
(exc_join_datadir_and_subdir(DCC_DATA_DIR, old_path) != CM_SUCCESS) {
+        LOG_RUN_ERR("[EXC] old_path: join datapath and subdir=%s failed.", DCC_DATA_DIR);
+        return CM_ERROR;
+    }
+    CM_RETURN_IFERR(exc_send_big_build_cmd_by_body(BUILD_PKT_SEND_END, g_build_info.follower_id, 0, old_path)); // body = leader's data dir path
+    if (g_build_info.build_status == BUILD_CANCEL) {
+        LOG_RUN_ERR("[EXC] build_cancel, send_proc failed");
+        return CM_ERROR;
+    }
+    LOG_RUN_INF("[EXC] leader_build_pkt_send_proc ok.");
+    return CM_SUCCESS;
+}
+
+void exc_leader_build_proc(thread_t *thread) // leader build thread: polls build_status until the build ends or is abandoned
+{
+    cm_set_thread_name("exc_leader_build");
+    LOG_RUN_INF("[EXC]leader_build thread started, tid:%lu, close:%u", thread->id, thread->closed);
+    if (cm_event_init(&g_build_info.send_event) != CM_SUCCESS) {
+        LOG_RUN_ERR("[EXC] leader_build send_event init failed."); // NOTE(review): the thread continues with an uninitialised event
+    }
+
+    while (!thread->closed) {
+        if (g_build_info.build_status == LEADER_BUILD_PKT_SEND) {
+            if (exc_leader_build_pkt_send_proc() != CM_SUCCESS) {
+                LOG_RUN_ERR("[EXC] leader_build_pkt_send_proc failed, retry.");
+                break;
+            }
+            g_build_info.build_status = LEADER_BUILD_PKT_SEND_END;
+            g_build_info.last_update_time = cm_clock_now_ms(); // starting point for the follower-restore timeout
+        }
+
+        if (g_build_info.build_status == LEADER_BUILD_PKT_SEND_END) {
+            if ((cm_clock_now_ms() - g_build_info.last_update_time) / MILLISECS_PER_SECOND >
+                LEADER_WAIT_FOLLOWER_RESTORE_TIMEOUT) {
+                LOG_RUN_ERR("[EXC] wait follower restore timeout.");
+                break;
+            }
+        }
+
+        if (g_build_info.build_status == LEADER_BUILD_OK_REQ_RECV) { // set by exc_cb_process_msg() on BUILD_OK_REQ
+            if (exc_send_build_cmd(BUILD_OK_ACK, g_build_info.follower_id, 0) != CM_SUCCESS) {
+                LOG_RUN_ERR("[EXC] send_build_cmd BUILD_OK_ACK failed");
+            }
+            LOG_RUN_INF("[EXC] send BUILD_OK_ACK cmd to follower=%u success and build end.", g_build_info.follower_id); // NOTE(review): logged even if the send failed
+            break;
+        }
+
+        if (!exc_is_leader()) {
+            LOG_RUN_INF("[EXC] I am not leader now, give up building.");
+            break;
+        }
+
+        if (g_build_info.build_status == BUILD_CANCEL) {
+            LOG_RUN_INF("[EXC] leader build status changed to cancel, give up building.");
+            break;
+        }
+
+        cm_sleep(CM_SLEEP_1_FIXED);
+    }
+
+    cm_event_destory(&g_build_info.send_event);
+
+    if (dcf_pause_rep(DCC_STREAM_ID, g_build_info.follower_id, DCF_MIN_PAUSE_TIME) != CM_SUCCESS) { // counterpart of the DCF_MAX_PAUSE_TIME call in exc_leader_build_pkt_send_proc()
+        LOG_RUN_ERR("[EXC] cancel pause rep to node=%u failed.", g_build_info.follower_id);
+    }
+    g_build_info.build_status = BUILD_NONE;
+    g_truncate_stopped = CM_FALSE; // lets exc_dcf_truncate() truncate again
+    LOG_RUN_INF("[EXC] leader_build follower=%u end, thread closed, tid:%lu, close:%u",
+        g_build_info.follower_id, thread->id, thread->closed);
+}
+
+bool32 exc_is_build_file_exist(const char *file_name) // CM_TRUE if a receive slot already carries this name
+{
+    for (uint32 i = 0; i < BUILD_FILE_MAX_NUM; i++) {
+        if (strcmp((const char *)g_build_info.build_file[i].filename, file_name) == 0) { // an empty file_name matches any free slot
+            return CM_TRUE;
+        }
+    }
+    return CM_FALSE;
+}
+
+status_t exc_create_build_file(exc_build_msg_t *buf, int32 *fd, uint32 *pos) // claims a free slot and creates the file in the backup dir
+{
+    uint32 i;
+    for (i = 0; i < BUILD_FILE_MAX_NUM; i++) {
+        if (g_build_info.build_file[i].filename[0] == '\0') { // an empty name marks a free slot
+            break;
+        }
+    }
+    if (i >= BUILD_FILE_MAX_NUM) {
+        LOG_RUN_ERR("[EXC] create_build_file=%s failed, no pos now.", buf->head.filename);
+        return CM_ERROR;
+    }
+
+    char bak_path[CM_FILE_NAME_BUFFER_SIZE] = {0};
+    if (exc_join_datadir_and_subdir(DCC_BACKUP_DIR, bak_path) != CM_SUCCESS) {
+        LOG_RUN_ERR("[EXC] create: join datapath and subdir= %s failed.", DCC_BACKUP_DIR);
+        return CM_ERROR;
+    }
+
+    char full_file_name[CM_FILE_NAME_BUFFER_SIZE] = {0};
+    PRTS_RETURN_IFERR(snprintf_s(full_file_name, CM_FILE_NAME_BUFFER_SIZE, CM_FILE_NAME_BUFFER_SIZE - 1, "%s/%s",
+        bak_path, buf->head.filename)); // NOTE(review): the name comes from the received packet and is used unchecked
+
+    PRTS_RETURN_IFERR(snprintf_s((char *)g_build_info.build_file[i].filename, CM_MAX_NAME_LEN, CM_MAX_NAME_LEN - 1,
+        "%s", buf->head.filename));
+    g_build_info.build_file[i].is_write_end = CM_FALSE;
+    if (cm_open_file(full_file_name, O_CREAT | O_TRUNC | O_RDWR | O_BINARY, fd) != CM_SUCCESS || *fd < 0) {
+        LOG_RUN_ERR("[EXC] create build file=%s failed, fd=%d.", full_file_name, *fd);
+        g_build_info.build_file[i].filename[0] = '\0'; // release the slot again
+        return CM_ERROR;
+    }
+
g_build_info.build_file[i].fd = *fd;
+    *pos = i;
+    LOG_RUN_INF("[EXC] create_build_file=%s success, fd=%d, i=%u.", buf->head.filename, *fd, i);
+    return CM_SUCCESS;
+}
+
+status_t exc_find_build_file_fd(exc_build_msg_t *buf, int32 *fd, uint32 *pos) // looks up the open slot for buf->head.filename
+{
+    uint32 i;
+    for (i = 0; i < BUILD_FILE_MAX_NUM; i++) {
+        if (strcmp((const char *)g_build_info.build_file[i].filename, buf->head.filename) == 0) {
+            break;
+        }
+    }
+    if (i >= BUILD_FILE_MAX_NUM) {
+        LOG_RUN_ERR("[EXC] find_build_file=%s failed, offset=%u.", buf->head.filename, buf->head.cur_offset);
+        return CM_ERROR;
+    }
+    if (g_build_info.build_file[i].fd < 0) {
+        LOG_RUN_ERR("[EXC] find_build_file=%s fd=%d error.", buf->head.filename, g_build_info.build_file[i].fd);
+        return CM_ERROR;
+    }
+
+    *fd = g_build_info.build_file[i].fd;
+    *pos = i;
+    return CM_SUCCESS;
+}
+
+status_t exc_write_build_file(exc_build_msg_t *buf) // appends one received packet to its file and acks it
+{
+    int32 fd = -1;
+    uint32 pos = 0;
+    if (buf->head.cur_offset == 0) { // offset 0 means first packet of a new file
+        if (exc_is_build_file_exist(buf->head.filename)) {
+            LOG_RUN_WAR("[EXC] build file=%s is already exist, ignore this pkt.", buf->head.filename);
+            return CM_ERROR;
+        } else {
+            CM_RETURN_IFERR(exc_create_build_file(buf, &fd, &pos));
+        }
+    } else {
+        CM_RETURN_IFERR(exc_find_build_file_fd(buf, &fd, &pos));
+    }
+
+    uint32 file_size = (uint32)cm_file_size(fd);
+    if (buf->head.cur_offset != file_size || buf->head.cur_size > BUILD_PKT_MAX_BODY_SIZE) { // in-order packets only; never read past body
+        LOG_RUN_ERR("[EXC] build_file=%s offset=%u or size=%u error.",
+            buf->head.filename, buf->head.cur_offset, file_size);
+        return CM_ERROR;
+    }
+    if (cm_pwrite_file(fd, buf->body, buf->head.cur_size, buf->head.cur_offset) != CM_SUCCESS) {
+        LOG_RUN_ERR("[EXC] build_file=%s offset=%u write error.",
+            buf->head.filename, buf->head.cur_offset);
+        return CM_ERROR;
+    }
+
+    if (buf->head.cur_offset + buf->head.cur_size == buf->head.filesize) { // last packet of this file
+        g_build_info.build_file[pos].is_write_end = CM_TRUE;
+        g_build_info.last_update_time = cm_clock_now_ms();
+        LOG_RUN_INF("[EXC] build file=%s size=%u write end success.", buf->head.filename,
buf->head.filesize);
+        if (exc_send_build_cmd(BUILD_PKT_ACK, g_build_info.leader_id, buf->head.serial_number) != CM_SUCCESS) {
+            LOG_RUN_ERR("[EXC] end: send_build_cmd BUILD_PKT_ACK failed, serial_number=%u.", buf->head.serial_number);
+        }
+        return CM_SUCCESS;
+    }
+
+    if (buf->head.serial_number % BUILD_PKTS_PER_ACK == 0) { // otherwise ack only every BUILD_PKTS_PER_ACK-th packet
+        if (exc_send_build_cmd(BUILD_PKT_ACK, g_build_info.leader_id, buf->head.serial_number) != CM_SUCCESS) {
+            LOG_RUN_ERR("[EXC] send_build_cmd BUILD_PKT_ACK failed, serial_number=%u.", buf->head.serial_number);
+        }
+    }
+
+    return CM_SUCCESS;
+}
+
+bool32 exc_check_all_build_file_write_ok(void) // syncs and closes every received file; CM_FALSE if one is incomplete
+{
+    LOG_RUN_INF("[EXC] check_all_build_file_write start...");
+    uint32 file_num = 0;
+    for (uint32 i = 0; i < BUILD_FILE_MAX_NUM; i++) {
+        if (g_build_info.build_file[i].filename[0] == '\0') {
+            continue;
+        }
+        if (!g_build_info.build_file[i].is_write_end) {
+            LOG_RUN_ERR("[EXC] check_all_build_file_write failed, file_name=%s.", g_build_info.build_file[i].filename);
+            return CM_FALSE;
+        }
+        int32 fd = g_build_info.build_file[i].fd;
+        status_t status = cm_fdatasync_file(fd);
+        cm_close_file(fd); // NOTE(review): the slot keeps the closed fd value until exc_clear_build_file_info() runs
+        if (status != CM_SUCCESS) {
+            LOG_RUN_ERR("[EXC] build_file=%s fd=%d fdatasync failed.", g_build_info.build_file[i].filename, fd);
+            return CM_FALSE;
+        }
+        file_num++;
+        g_build_info.last_update_time = cm_clock_now_ms();
+        LOG_RUN_INF("[EXC] sync and close build file = %s fd=%d success.", g_build_info.build_file[i].filename, fd);
+    }
+    LOG_RUN_INF("[EXC] check_all_build_file_write success, file_num=%u.", file_num);
+    return CM_TRUE;
+}
+
+bool32 exc_is_build_msg_valid(exc_build_cmd_t cmd) // checks that cmd is acceptable in the current build_status
+{
+    switch (cmd) {
+        case BUILD_START_REQ:
+            if (g_build_info.build_status != BUILD_NONE) {
+                LOG_RUN_ERR("[EXC] maybe follower=%u building now, wait...", g_build_info.follower_id);
+                return CM_FALSE;
+            }
+            break;
+        case BUILD_PKT_SEND:
+        case BUILD_PKT_SEND_END:
+            if (g_build_info.build_status != FOLLOWER_BUILD_PKT_RECV &&
+                g_build_info.build_status != FOLLOWER_BUILD_START) {
+                return CM_FALSE;
+            }
+            break;
+        case BUILD_PKT_ACK:
+            if (g_build_info.build_status != LEADER_BUILD_PKT_SEND &&
+                g_build_info.build_status != LEADER_BUILD_PKT_SEND_END) {
+                return CM_FALSE;
+            }
+            break;
+        case BUILD_OK_REQ:
+            if (g_build_info.build_status != LEADER_BUILD_PKT_SEND_END) {
+                return CM_FALSE;
+            }
+            break;
+        case BUILD_OK_ACK:
+            if (g_build_info.build_status != FOLLOWER_BUILD_OK_REQ_SEND) {
+                return CM_FALSE;
+            }
+            break;
+        case BUILD_CANCEL_REQ: // a cancel is accepted in every state
+            break;
+        default:
+            LOG_RUN_ERR("[EXC] recv msg_cmd=%u is not support now.", cmd);
+            return CM_FALSE;
+    }
+
+    return CM_TRUE;
+}
+
+static bool32 exc_is_need_build_cancel(unsigned int src_node) // decides whether an invalid message should cancel the running build
+{
+    if (g_build_info.build_status != BUILD_NONE) {
+        if (exc_is_leader()) {
+            if (g_build_info.follower_id == src_node) { // leader: only the follower being built can cancel
+                LOG_RUN_WAR("[EXC] i am leader, msg invalid, src_node = %u, build_id = %u, build_status = %u", src_node,
+                    g_build_info.follower_id, g_build_info.build_status);
+                return CM_TRUE;
+            }
+        } else {
+            LOG_RUN_WAR("[EXC] i am not leader, msg invalid and build cancel, src_node = %u", src_node);
+            return CM_TRUE;
+        }
+    }
+    return CM_FALSE;
+}
+
+status_t exc_build_start_req(uint32 src_node) // leader side of BUILD_START_REQ: starts exc_leader_build_proc for src_node
+{
+    cm_close_thread(&g_build_info.thread);
+    g_build_info.build_status = LEADER_BUILD_PKT_SEND;
+    g_build_info.follower_id = src_node;
+    CM_MFENCE;
+    return cm_create_thread(exc_leader_build_proc, 0, NULL, &g_build_info.thread); // NOTE(review): on failure build_status stays LEADER_BUILD_PKT_SEND
+}
+
+int exc_cb_process_msg(unsigned int stream_id, unsigned int src_node, const char *msg, unsigned int msg_size)
+{
+    exc_build_msg_t *buf = (exc_build_msg_t *)msg; // NOTE(review): msg_size is only logged, never checked against the header/body size
+    LOG_RUN_INF("[EXC] recv process msg, src=%u, msg_cmd=%u, msg_size=%u, serial_number=%u, build_status=%d.",
+        src_node, buf->head.cmd, msg_size, buf->head.serial_number, g_build_info.build_status);
+
+    if (exc_is_build_msg_valid(buf->head.cmd) != CM_TRUE) {
+        LOG_RUN_ERR("[EXC] cmd=%u does not match with build status=%d", buf->head.cmd, g_build_info.build_status);
+        (void)exc_send_build_cmd(BUILD_CANCEL_REQ, src_node, 0); // tell the sender to give up its side of the build
+
if (exc_is_need_build_cancel(src_node) == CM_TRUE) {
+            g_build_info.build_status = BUILD_CANCEL;
+        }
+        return CM_ERROR;
+    }
+
+    switch (buf->head.cmd) {
+        case BUILD_START_REQ:
+            CM_RETURN_IFERR(exc_build_start_req(src_node));
+            break;
+        case BUILD_PKT_SEND:
+            LOG_RUN_INF("[EXC] recv BUILD_PKT_SEND, filename=%s, offset=%u, size=%u, filesize=%u.",
+                buf->head.filename, buf->head.cur_offset, buf->head.cur_size, buf->head.filesize);
+            CM_RETURN_IFERR(exc_write_build_file(buf));
+            break;
+        case BUILD_PKT_ACK:
+            g_build_info.recv_serial_number = buf->head.serial_number;
+            if (g_build_info.send_serial_number <= g_build_info.recv_serial_number + BUILD_PKT_CREDIT_NUM) {
+                cm_event_notify(&g_build_info.send_event); // wakes the sender waiting in exc_send_one_build_file()
+            }
+            break;
+        case BUILD_PKT_SEND_END:
+            CM_RETURN_IF_FALSE(exc_check_all_build_file_write_ok());
+            if (snprintf_s((char *)g_build_info.old_restore_path, CM_FILE_NAME_BUFFER_SIZE,
+                CM_FILE_NAME_BUFFER_SIZE - 1, "%s", buf->body) == -1) { // body carries the leader's data dir path
+                LOG_RUN_ERR("[EXC] save old_restore_path failed, body=%s.", buf->body);
+                return CM_ERROR;
+            }
+            CM_MFENCE;
+            g_build_info.build_status = FOLLOWER_BUILD_PKT_RECV_END; // picked up by exc_follower_build_proc()
+            break;
+        case BUILD_OK_REQ:
+            g_build_info.build_status = LEADER_BUILD_OK_REQ_RECV;
+            break;
+        case BUILD_OK_ACK:
+            g_build_info.build_status = FOLLOWER_BUILD_OK_ACK_RECV;
+            break;
+        case BUILD_CANCEL_REQ:
+            if (g_build_info.build_status != BUILD_NONE) {
+                g_build_info.build_status = BUILD_CANCEL;
+            }
+            break;
+        default:
+            return CM_ERROR;
+    }
+
+    return CM_SUCCESS;
+}
+
+void exc_rename_subdir_of_datadir(const char *src_dir, const char *dst_dir) // renames <data_path>/src_dir to <data_path>/dst_dir
+{
+    LOG_RUN_INF("[EXC] rename src_dir=%s to dst_dir=%s start...", src_dir, dst_dir);
+    char old_dir[CM_FILE_NAME_BUFFER_SIZE] = {0}; // exc_join_datadir_and_subdir() writes up to CM_FILE_NAME_BUFFER_SIZE bytes
+    char new_dir[CM_FILE_NAME_BUFFER_SIZE] = {0};
+    if (exc_join_datadir_and_subdir(src_dir, old_dir) != CM_SUCCESS) {
+        LOG_RUN_ERR("[EXC] rename: join datapath and src_dir=%s failed.", src_dir);
+        return;
+    }
+    if (exc_join_datadir_and_subdir(dst_dir, new_dir) != CM_SUCCESS) {
+        LOG_RUN_ERR("[EXC]
rename:join datapath and dst_dir=%s failed.", dst_dir); + return; + } + + if (cm_dir_exist(new_dir)) { + (void)exc_remove_dir(new_dir); + } + + if (cm_rename_file(old_dir, new_dir) != CM_SUCCESS) { + LOG_RUN_ERR("[EXC] rename src_dir=%s to dst_dir=%s failed.", src_dir, dst_dir); /* failed step: report at error level like the other failure paths here */ + return; + } + LOG_RUN_INF("[EXC] rename src_dir=%s to dst_dir=%s end.", src_dir, dst_dir); +} + +void exc_try_self_recovery(void) +{ + char build_file[CM_FILE_NAME_BUFFER_SIZE] = {0}; + if (exc_join_datadir_and_subdir(DCC_BUILD_STATUS_FILE, build_file) != CM_SUCCESS) { + LOG_RUN_ERR("[EXC] try_self_recovery: join datapath and file=%s failed.", DCC_BUILD_STATUS_FILE); + return; + } + + if (cm_file_exist(build_file)) { + exc_rename_subdir_of_datadir(DCC_GSTOR_DIR, DCC_GSTOR_DIR_BK); + exc_rename_subdir_of_datadir(DCC_DCFDATA_DIR, DCC_DCFDATA_DIR_BK); + (void)cm_remove_file(build_file); + LOG_RUN_INF("[EXC] build status file exist, try_self_recovery."); + return; + } + + char first_init[CM_MAX_PATH_LEN] = {0}; + if (exc_join_datadir_and_subdir(DCC_FIRST_INIT_DIR, first_init) != CM_SUCCESS) { + LOG_RUN_ERR("[EXC] try_self_recovery: join datapath and first_init failed."); + return; + } + + if (cm_dir_exist(first_init)) { + exc_rename_subdir_of_datadir(DCC_GSTOR_DIR, DCC_GSTOR_DIR_BK); + exc_rename_subdir_of_datadir(DCC_DCFDATA_DIR, DCC_DCFDATA_DIR_BK); + LOG_RUN_INF("[EXC] first_init dir exist, try_self_recovery."); + return; + } +} + static int exc_register_logger_cb_func(void) { int ret; @@ -566,6 +1430,16 @@ static status_t exc_dcf_start(void) return CM_ERROR; } + if (dcf_register_exception_report(exc_cb_exception_notify) != CM_SUCCESS) { + LOG_RUN_ERR("[EXC] Register exception_notify callback function failed."); + return CM_ERROR; + } + + if (dcf_register_msg_proc(exc_cb_process_msg) != CM_SUCCESS) { + LOG_RUN_ERR("[EXC] Register msg_proc callback function failed."); + return CM_ERROR; + } + // register log callback func if (exc_register_logger_cb_func() != CM_SUCCESS) { LOG_RUN_ERR("[EXC] 
Register logger callback function failed."); @@ -1181,6 +2055,80 @@ bool32 exc_is_idle(void) return CM_FALSE; } +status_t exc_check_first_init(void) +{ + char dcc_db[CM_MAX_PATH_LEN] = {0}; + if (exc_join_datadir_and_subdir(DCC_GSTOR_DIR, dcc_db) != CM_SUCCESS) { + LOG_RUN_ERR("[EXC] exc_check_first_init: join datapath and gstor failed."); + return CM_ERROR; + } + + if (cm_dir_exist(dcc_db)) { + LOG_RUN_INF("[EXC] dcc_db dir=%s is exist, not first init.", dcc_db); + return CM_SUCCESS; + } + + char first_init[CM_MAX_PATH_LEN] = {0}; + if (exc_join_datadir_and_subdir(DCC_FIRST_INIT_DIR, first_init) != CM_SUCCESS) { + LOG_RUN_ERR("[EXC] exc_check_first_init: join datapath and first_init failed."); + return CM_ERROR; + } + + if (!cm_dir_exist(first_init)) { + if (cm_create_dir(first_init) != CM_SUCCESS) { + LOG_RUN_ERR("[EXC] exc_check_first_init: create_dir=%s failed.", first_init); + return CM_ERROR; + } + } + + LOG_RUN_INF("[EXC] this is first init, and create dir=%s success.", DCC_FIRST_INIT_DIR); + return CM_SUCCESS; +} + +status_t exc_init_done_tryclean(void) +{ + char gstor_bk[CM_MAX_PATH_LEN] = {0}; + if (exc_join_datadir_and_subdir(DCC_GSTOR_DIR_BK, gstor_bk) != CM_SUCCESS) { + LOG_RUN_ERR("[EXC] exc_init_done: join datapath and gstor_bk failed."); + return CM_ERROR; + } + + if (cm_dir_exist(gstor_bk)) { + if (exc_remove_dir(gstor_bk) != CM_SUCCESS) { + LOG_RUN_ERR("[EXC] exc_init_done: remove gstor_bk failed."); + return CM_ERROR; + } + } + + char dcf_data_bk[CM_MAX_PATH_LEN] = {0}; + if (exc_join_datadir_and_subdir(DCC_DCFDATA_DIR_BK, dcf_data_bk) != CM_SUCCESS) { + LOG_RUN_ERR("[EXC] exc_init_done: join datapath and dcf_data_bk failed."); + return CM_ERROR; + } + + if (cm_dir_exist(dcf_data_bk)) { + if (exc_remove_dir(dcf_data_bk) != CM_SUCCESS) { + LOG_RUN_ERR("[EXC] exc_init_done: remove dcf_data_bk failed."); + return CM_ERROR; + } + } + + char first_init[CM_MAX_PATH_LEN] = {0}; + if (exc_join_datadir_and_subdir(DCC_FIRST_INIT_DIR, first_init) != 
CM_SUCCESS) { + LOG_RUN_ERR("[EXC] exc_init_done: join datapath and first_init failed."); + return CM_ERROR; + } + + if (cm_dir_exist(first_init)) { + if (exc_remove_dir(first_init) != CM_SUCCESS) { + LOG_RUN_ERR("[EXC] exc_init_done: remove first_init failed."); + return CM_ERROR; + } + } + + return CM_SUCCESS; +} + #ifdef __cplusplus } #endif diff --git a/src/executor/executor.h b/src/executor/executor.h index 11deddb..dc71901 100644 --- a/src/executor/executor.h +++ b/src/executor/executor.h @@ -84,7 +84,96 @@ typedef struct st_exc_lease_info_t { #define ENTRY_K(entry) (&(entry)->kvp.key) #define ENTRY_V(entry) (&(entry)->kvp.value) - +#define DCC_BACKUP_DIR "dcc_backup" +#define DCC_GSTOR_DIR "gstor" +#define DCC_DCFDATA_DIR "dcf_data" +#define DCC_DATA_DIR "gstor/data" +#define DCC_BUILD_STATUS_FILE "build.status" /* if this file exists at startup, exc_try_self_recovery renames gstor/dcf_data to their *_backup names and removes it */ +#define DCC_FIRST_INIT_DIR "dcc_first_init" /* created by exc_check_first_init when the gstor dir is absent; removed by exc_init_done_tryclean */ +#define DCC_GSTOR_DIR_BK "gstor_backup" /* rename target for DCC_GSTOR_DIR in exc_try_self_recovery */ +#define DCC_DCFDATA_DIR_BK "dcf_data_backup" /* rename target for DCC_DCFDATA_DIR in exc_try_self_recovery */ + +typedef enum e_exc_build_cmd { + BUILD_START_REQ = 1, // follower->leader:build start request + BUILD_PKT_SEND = 2, // leader->follower:build pkt send + BUILD_PKT_ACK = 3, // follower->leader:build pkt ack + BUILD_PKT_SEND_END = 4, // leader->follower:build pkt send end + BUILD_OK_REQ = 5, // follower->leader:build ok request + BUILD_OK_ACK = 6, // leader->follower:build ok ack + BUILD_CANCEL_REQ = 7, // leader<->follower:cancel build request + } exc_build_cmd_t; + +typedef enum e_exc_build_status { + BUILD_NONE = 0, + + // follower build status + FOLLOWER_BUILD_START = 1, + FOLLOWER_BUILD_PKT_RECV = 2, + FOLLOWER_BUILD_PKT_RECV_END = 3, + FOLLOWER_BUILD_OK_REQ_SEND = 4, + FOLLOWER_BUILD_OK_ACK_RECV = 5, + + // leader build status + LEADER_BUILD_PKT_SEND = 6, + LEADER_BUILD_PKT_SEND_END = 7, + LEADER_BUILD_OK_REQ_RECV = 8, + + // common build status + BUILD_CANCEL = 9, +} exc_build_status_t; + +typedef enum en_exc_build_version { + EXC_BUILD_VERSION_1 = 1, + // add new versions here in the future if needed +} 
exc_build_version_t; + +#define EXC_BUILD_CUR_VERSION EXC_BUILD_VERSION_1 + +#define FOLLOWER_BUILD_PKT_RECV_TIMEOUT 300 +#define FOLLOWER_BUILD_OK_REQ_SEND_TIMEOUT 10 +#define LEADER_WAIT_FOLLOWER_RESTORE_TIMEOUT 300 + +#define BUILD_PKT_MAX_BODY_SIZE SIZE_K(60) +#define BUILD_FILE_MAX_NUM 64 + +#define BUILD_PKT_CREDIT_NUM 100 +#define BUILD_PKTS_PER_ACK 10 + +typedef struct st_exc_build_file_info_t { + int32 fd; + bool32 is_write_end; + char filename[CM_MAX_NAME_LEN]; +} exc_build_file_info_t; + +typedef struct st_exc_build_info_t { + volatile uint32 send_serial_number; + volatile uint32 recv_serial_number; + volatile uint32 leader_id; + volatile uint32 follower_id; + volatile exc_build_status_t build_status; + volatile timespec_t last_update_time; + thread_t thread; + cm_event_t send_event; + volatile char old_restore_path[CM_FILE_NAME_BUFFER_SIZE]; + volatile exc_build_file_info_t build_file[BUILD_FILE_MAX_NUM]; +} exc_build_info_t; + +typedef struct st_exc_build_msg_head_t { + uint32 version; + exc_build_cmd_t cmd; + uint32 cur_size; + uint32 cur_offset; + uint32 filesize; + uint32 serial_number; + char reserved[8]; // reserved for future use + char filename[CM_MAX_NAME_LEN]; +} exc_build_msg_head_t; + +typedef struct st_exc_build_msg_t { + exc_build_msg_head_t head; + char body[BUILD_PKT_MAX_BODY_SIZE]; +} exc_build_msg_t; + static inline void exc_entry_inc_ref(msg_entry_t *entry) { (void)cm_atomic32_inc(&entry->ref_count); @@ -149,6 +238,9 @@ status_t exc_lease_renew(void *handle, const text_t *buf, unsigned long long wri status_t exc_lease_query(void *handle, const text_t *leasename, exc_lease_info_t *lease_info); void exc_dealing_del(msg_entry_t* entry); +void exc_try_self_recovery(void); +status_t exc_check_first_init(void); +status_t exc_init_done_tryclean(void); #ifdef __cplusplus } diff --git a/src/executor/executor_defs.h b/src/executor/executor_defs.h index 04cd761..c83a1a1 100644 --- a/src/executor/executor_defs.h +++ b/src/executor/executor_defs.h 
@@ -42,9 +42,9 @@ extern "C" { #define EXC_STREAM_ID_DEFAULT (1) #define EXC_PATH_MAX_SIZE (MAX_PARAM_VALUE_LEN + 1) #define EXC_DCF_CFG_SIZE SIZE_K(4) -#define EXC_DCF_TRUNCATE_SIZE (1000) +#define EXC_DCF_TRUNCATE_SIZE (100000) #define EXC_DCF_APPLY_IDX_FROZEN_CNT_THOLD (10) -#define EXC_DISK_AVAIL_RATE (0.02) +#define EXC_DISK_AVAIL_RATE (0.2) #define EXC_DCF_APPLY_INDEX_SIZE (3000) #define EXC_DCF_ROLE_NOT_LEADER_ERRORNO (603) #define EXC_THREAD_SLEEP_TIME (10) diff --git a/src/executor/executor_utils.c b/src/executor/executor_utils.c index 90b8073..c6b8a08 100644 --- a/src/executor/executor_utils.c +++ b/src/executor/executor_utils.c @@ -26,6 +26,7 @@ #include "storage.h" #include "util_error.h" #include "util_defs.h" +#include "dcf_interface.h" #ifdef __cplusplus extern "C" { @@ -145,6 +146,110 @@ void exc_wr_handle_write_commit(uint32 table_id, text_t *key, text_t *val) exc_wr_handle_commit(); } +/* Back up the db through the writer handle EXC_WR_HANDLE; bak_format is passed to db_bakup unchanged. */ +int exc_backup(const char *bak_format) +{ + return (int)db_bakup(EXC_WR_HANDLE, bak_format); +} + +/* Restore the db from restore_path: start it in NOMOUNT mode, run db_restore, then shut it down again on every path that follows a successful startup. */ +int exc_restore(const char *restore_path, const char *old_path, const char *new_path) +{ + if (db_startup(STARTUP_MODE_NOMOUNT) != CM_SUCCESS) { + LOG_DEBUG_ERR("[EXC] db_startup with nomount mode failed"); + return CM_ERROR; + } + + void *handle = NULL; + int ret = db_alloc(&handle); + if (ret != CM_SUCCESS) { + db_shutdown(); + LOG_DEBUG_ERR("[EXC] db_alloc handle for restore failed"); + return CM_ERROR; + } + + ret = db_restore(handle, restore_path, old_path, new_path); + if (ret != CM_SUCCESS) { + db_free(handle); + db_shutdown(); + LOG_DEBUG_ERR("[EXC] db_restore failed"); + return CM_ERROR; + } + + db_free(handle); + db_shutdown(); + return CM_SUCCESS; +} + +/* Build "<path>/<filename>" in buf; every secure-string call is bounded by the caller's buf_size and a failing call makes the function return early. */ +status_t exc_path_join(char *buf, uint32 buf_size, const char *path, const char *filename) +{ + MEMS_RETURN_IFERR(strcpy_sp(buf, buf_size, path)); + MEMS_RETURN_IFERR(strcat_sp(buf, buf_size, "/")); + MEMS_RETURN_IFERR(strcat_sp(buf, buf_size, filename)); + + return CM_SUCCESS; +} + +status_t exc_remove_dir(const 
char *path) +{ + LOG_RUN_INF("[EXC] remove directory %s...", path); +#ifndef WIN32 + struct dirent *dirp = NULL; + char filepath[CM_FILE_NAME_BUFFER_SIZE] = {0}; + + DIR *dir = opendir(path); + if (dir == NULL) { + return CM_ERROR; + } + + while ((dirp = readdir(dir)) != NULL) { + if ((strcmp(dirp->d_name, ".") == 0) || (strcmp(dirp->d_name, "..") == 0)) { + continue; + } + + if (exc_path_join(filepath, CM_FILE_NAME_BUFFER_SIZE, path, dirp->d_name) != CM_SUCCESS) { + LOG_RUN_ERR("[EXC]splic dir/file %s to path %s failed", dirp->d_name, path); + (void)closedir(dir); + return CM_ERROR; + } + + if (cm_dir_exist(filepath)) { + if (exc_remove_dir(filepath) == CM_SUCCESS) { + continue; + } + (void)closedir(dir); + return CM_ERROR; + } + + if (cm_remove_file(filepath) != CM_SUCCESS) { + (void)closedir(dir); + return CM_ERROR; + } + } + (void)closedir(dir); + return cm_remove_file(path); +#else + LOG_RUN_ERR("[EXC]win32 not support rm dir now."); + return CM_ERROR; +#endif +} + +uint32 exc_get_leader_id(void) +{ + uint32 node_id = EXC_INVALID_NODE_ID; + char ip[CM_MAX_IP_LEN]; + uint32 port; + int ret = dcf_query_leader_info(DCC_STREAM_ID, ip, CM_MAX_IP_LEN, &port, &node_id); + if (ret != CM_SUCCESS) { + LOG_RUN_ERR("[EXC] get_leader_id: error_no:%d, error_msg:%s", + dcf_get_errorno(), + dcf_get_error(dcf_get_errorno())); + return EXC_INVALID_NODE_ID; + } + return node_id; +} + #ifdef __cplusplus } #endif \ No newline at end of file diff --git a/src/executor/executor_utils.h b/src/executor/executor_utils.h index 83e600b..396c4a1 100644 --- a/src/executor/executor_utils.h +++ b/src/executor/executor_utils.h @@ -42,6 +42,10 @@ extern "C" { #define EXC_DCC_LEASE_KV_TABLE ((char *)"SYS_LEASE_KV") #define EXC_DCC_RESERVED_KV_TABLE ((char *)"SYS_DCC_KV") +#define DCC_STREAM_ID 1 +#define EXC_INVALID_NODE_ID 0 +#define EXC_3X_FIXED 3 + typedef struct st_exc_util_handle { void *handle; uint32 opened_tabled_id; @@ -63,6 +67,16 @@ void exc_wr_handle_commit(void); void 
exc_wr_handle_write_commit(uint32 table_id, text_t *key, text_t *val); +int exc_backup(const char *bak_format); + +int exc_restore(const char *restore_path, const char *old_path, const char *new_path); + +status_t exc_path_join(char *buf, uint32 buf_size, const char *path, const char *filename); + +status_t exc_remove_dir(const char *path); + +uint32 exc_get_leader_id(void); + #ifdef __cplusplus } #endif diff --git a/src/interface/dcc_interface.h b/src/interface/dcc_interface.h index b6c92e5..193013e 100644 --- a/src/interface/dcc_interface.h +++ b/src/interface/dcc_interface.h @@ -337,6 +337,12 @@ EXPORT_API int srv_dcc_set_election_priority(unsigned long long priority); */ EXPORT_API int srv_dcc_promote_leader(unsigned int node_id, unsigned int wait_timeout_ms); +EXPORT_API int srv_dcc_backup(const char *bak_format); + +EXPORT_API int srv_dcc_restore(const char *restore_path); + +EXPORT_API int srv_dcc_set_dcf_param(const char *param_name, const char *param_value); + #ifdef __cplusplus } #endif diff --git a/src/server/srv_api.c b/src/server/srv_api.c index 4c73cd5..f972781 100644 --- a/src/server/srv_api.c +++ b/src/server/srv_api.c @@ -49,7 +49,6 @@ static latch_t g_dcc_latch = {0}; #define SRV_WAIT_COMMIT_TIMEOUT_DEFAULT (5000) // ms #define SRV_WAIT_COMMIT_EVENT_TIMEOUT (50) // ms -#define DCC_STREAM_ID (1) #define DCC_SPLIT_STRING " " #define DCC_ENCLOSE_CHAR 0 #define DCC_CMD_PARAMETER_CNT 16 @@ -238,10 +237,13 @@ static status_t srv_instance_init() LOG_RUN_ERR("[API] init profile stat failed"); return CM_ERROR; } - + exc_try_self_recovery(); + LOG_RUN_INF("[API] dcc check if need try_self_recovery end."); + CM_RETURN_IFERR(exc_check_first_init()); // stg start - if (db_startup() != CM_SUCCESS) { + if (db_startup(STARTUP_MODE_OPEN) != CM_SUCCESS) { LOG_RUN_ERR("[API] db_startup failed"); + exc_try_self_recovery(); return CM_ERROR; } LOG_RUN_INF("[API] dcc db_startup succeed."); @@ -250,8 +252,10 @@ static status_t srv_instance_init() if (exc_init() != 
CM_SUCCESS) { db_shutdown(); LOG_RUN_ERR("[API] executor module init failed"); + exc_try_self_recovery(); return CM_ERROR; } + CM_RETURN_IFERR(exc_init_done_tryclean()); LOG_RUN_INF("[API] dcc init executor succeed."); if (srv_new_instance() != CM_SUCCESS) { @@ -756,15 +760,8 @@ int srv_dcc_query_leader_info(unsigned int *node_id) cm_reset_error(); CHECK_SRV_STATUS(DCC_SRV_RUNNING); CM_CHECK_NULL_PTR(node_id); - - char ip[CM_MAX_IP_LEN]; - uint32 port; - int ret = dcf_query_leader_info(DCC_STREAM_ID, ip, CM_MAX_IP_LEN, &port, node_id); - if (ret != CM_SUCCESS) { - CM_THROW_ERROR(ERR_DCF_INTERNAL, ""); - LOG_RUN_ERR("[API] dcf_query_leader_info: error_no:%d, error_msg:%s", - dcf_get_errorno(), - dcf_get_error(dcf_get_errorno())); + *node_id = exc_get_leader_id(); + if (*node_id == EXC_INVALID_NODE_ID) { return CM_ERROR; } return CM_SUCCESS; @@ -851,6 +848,46 @@ int srv_dcc_promote_leader(unsigned int node_id, unsigned int wait_timeout_ms) return CM_SUCCESS; } +/* Public API: back up the db via exc_backup while the server is running; bak_format must not be NULL because the storage layer takes strlen() of it. */ +int srv_dcc_backup(const char *bak_format) +{ + cm_reset_error(); + CHECK_SRV_STATUS(DCC_SRV_RUNNING); + CM_CHECK_NULL_PTR(bak_format); + LOG_OPER("[API] dcc backup"); + int ret = exc_backup(bak_format); + if (ret != CM_SUCCESS) { + LOG_DEBUG_ERR("[API] dcc backup failed: error_no:%d, error_msg:%s", + dcf_get_errorno(), dcf_get_error(dcf_get_errorno())); + return CM_ERROR; + } + return CM_SUCCESS; +} + +/* Public API: restore the db from restore_path via exc_restore, passing NULL for old_path/new_path; restore_path must not be NULL because the storage layer takes strlen() of it. */ +int srv_dcc_restore(const char *restore_path) +{ + cm_reset_error(); + CM_CHECK_NULL_PTR(restore_path); + LOG_OPER("[API] dcc restore"); + int ret = exc_restore(restore_path, NULL, NULL); + if (ret != CM_SUCCESS) { + LOG_DEBUG_ERR("[API] dcc restore failed: error_no:%d, error_msg:%s", + dcf_get_errorno(), dcf_get_error(dcf_get_errorno())); + return CM_ERROR; + } + return CM_SUCCESS; +} + +int srv_dcc_set_dcf_param(const char *param_name, const char *param_value) +{ + CM_CHECK_NULL_PTR(param_name); + cm_reset_error(); + init_dcc_errno_desc(); + + return dcf_set_param(param_name, param_value); +} + #ifdef __cplusplus } #endif diff --git a/src/server/srv_logger.c 
b/src/server/srv_logger.c index 6e58698..7555f4f 100644 --- a/src/server/srv_logger.c +++ b/src/server/srv_logger.c @@ -40,7 +40,7 @@ static status_t init_logger_param(log_param_t *log_param) CM_RETURN_IFERR(srv_get_param(DCC_PARAM_DATA_PATH, &param_value)); CM_RETURN_IFERR(realpath_file(param_value.str_val, real_path, CM_FILE_NAME_BUFFER_SIZE)); PRTS_RETURN_IFERR(snprintf_s(log_param->log_home, CM_MAX_LOG_HOME_LEN, CM_MAX_LOG_HOME_LEN - 1, "%s/%s", - real_path, "log")); + real_path, "dcc_log")); } else { CM_RETURN_IFERR(realpath_file(param_value.str_val, real_path, CM_FILE_NAME_BUFFER_SIZE)); PRTS_RETURN_IFERR(snprintf_s(log_param->log_home, CM_MAX_LOG_HOME_LEN, CM_MAX_LOG_HOME_LEN - 1, @@ -89,11 +89,11 @@ status_t init_logger(void) log_param->log_instance_startup = CM_FALSE; CM_RETURN_IFERR(init_logger_param(log_param)); PRTS_RETURN_IFERR(snprintf_s(file_name, CM_FULL_PATH_BUFFER_SIZE, CM_FULL_PATH_BUFFER_SIZE - 1, "%s/run/%s", - log_param->log_home, "dcc.rlog")); + log_param->log_home, "run.log")); CM_RETURN_IFERR(cm_log_init(LOG_RUN, file_name)); PRTS_RETURN_IFERR(snprintf_s(file_name, CM_FULL_PATH_BUFFER_SIZE, CM_FULL_PATH_BUFFER_SIZE - 1, "%s/debug/%s", - log_param->log_home, "dcc.dlog")); + log_param->log_home, "debug.log")); CM_RETURN_IFERR(cm_log_init(LOG_DEBUG, file_name)); PRTS_RETURN_IFERR(snprintf_s(file_name, CM_FULL_PATH_BUFFER_SIZE, CM_FULL_PATH_BUFFER_SIZE - 1, "%s/oper/%s", @@ -113,7 +113,7 @@ status_t init_logger(void) CM_RETURN_IFERR(cm_log_init(LOG_TRACE, file_name)); PRTS_RETURN_IFERR(snprintf_s(file_name, CM_FULL_PATH_BUFFER_SIZE, CM_FULL_PATH_BUFFER_SIZE - 1, "%s/profile/%s", - log_param->log_home, "dcc.plog")); + log_param->log_home, "profile.log")); CM_RETURN_IFERR(cm_log_init(LOG_PROFILE, file_name)); log_param->log_instance_startup = CM_TRUE; diff --git a/src/storage/gstor/gstor_executor.c b/src/storage/gstor/gstor_executor.c index 3e32239..864a1bc 100644 --- a/src/storage/gstor/gstor_executor.c +++ b/src/storage/gstor/gstor_executor.c @@ 
-572,12 +572,12 @@ static status_t gstor_init_loggers(void) // RUN PRTS_RETURN_IFERR(snprintf_s(file_name, GS_FILE_NAME_BUFFER_SIZE, GS_MAX_FILE_NAME_LEN, "%s/run/%s", - log_param->log_home, "gstor.rlog")); + log_param->log_home, "gstor_run.log")); cm_log_init(LOG_RUN, file_name); // DEBUG PRTS_RETURN_IFERR(snprintf_s(file_name, GS_FILE_NAME_BUFFER_SIZE, GS_MAX_FILE_NAME_LEN, "%s/debug/%s", - log_param->log_home, "gstor.dlog")); + log_param->log_home, "gstor_debug.log")); cm_log_init(LOG_DEBUG, file_name); // ALARM @@ -697,7 +697,7 @@ void gstor_shutdown(void) gstor_deinit_config(); } -int gstor_startup(char *data_path) +int gstor_startup(char *data_path, unsigned int startup_mode) { do { GS_BREAK_IF_ERROR(cm_start_timer(g_timer())); @@ -705,13 +705,15 @@ int gstor_startup(char *data_path) GS_BREAK_IF_ERROR(gstor_lock_db()); GS_BREAK_IF_ERROR(alck_init_ctx(&g_instance->kernel)); GS_BREAK_IF_ERROR(knl_startup(&g_instance->kernel)); - GS_BREAK_IF_ERROR(gstor_start_db(&g_instance->kernel)); - GS_LOG_RUN_INF("gstore started successfully!"); + if (startup_mode == STARTUP_MODE_OPEN) { + GS_BREAK_IF_ERROR(gstor_start_db(&g_instance->kernel)); + } + GS_LOG_RUN_INF("gstore started successfully with startup_mode:%d!", startup_mode); return GS_SUCCESS; } while (GS_FALSE); gstor_shutdown(); - GS_LOG_RUN_INF("gstore started failed!"); + GS_LOG_RUN_INF("gstore started failed with startup_mode:%d!", startup_mode); return GS_ERROR; } @@ -1116,6 +1118,78 @@ int gstor_attach_pending_rm(void *handle) return GS_SUCCESS; } +int gstor_backup(void *handle, const char *bak_format) +{ + knl_backup_t backup = { 0 }; + knl_backup_t *param_backup = &backup; + param_backup->type = BACKUP_MODE_FULL; + param_backup->device = DEVICE_DISK; + param_backup->format.str = (char *)bak_format; + param_backup->format.len = strlen(bak_format); + param_backup->finish_scn = DB_CURR_SCN(EC_SESSION(handle)); + param_backup->target_info.target = TARGET_ALL; + param_backup->target_info.backup_arch_mode = 
ARCHIVELOG_ALL; + param_backup->crypt_info.encrypt_alg = ENCRYPT_NONE; + int ret = knl_backup(EC_SESSION(handle), param_backup); + return ret; +} + +int gstor_restore(void *handle, const char *restore_path, const char *old_path, const char *new_path) +{ + knl_session_t *session = (knl_session_t *)EC_SESSION(handle); + knl_attr_t *attr = &session->kernel->attr; + int ret = GS_SUCCESS; + + if (old_path == NULL || new_path == NULL) { + GS_LOG_RUN_INF("old_path or new_path null, no need convert_restore_path!"); + } else { + char convert_value[GS_FILE_NAME_BUFFER_SIZE * 2] = {0}; + const int buffer_size = GS_FILE_NAME_BUFFER_SIZE * 2; + PRTS_RETURN_IFERR(snprintf_s(convert_value, buffer_size, buffer_size - 1, "%s,%s", old_path, new_path)); + + ret = knl_get_convert_params("DB_FILE_NAME_CONVERT", convert_value, &attr->data_file_convert, "home"); + if (ret != GS_SUCCESS) { + GS_LOG_RUN_ERR("gstor DB_FILE_NAME_CONVERT failed with retcode(%d)!", ret); + return GS_ERROR; + } + + ret = knl_get_convert_params("LOG_FILE_NAME_CONVERT", convert_value, &attr->log_file_convert, "home"); + if (ret != GS_SUCCESS) { + GS_LOG_RUN_ERR("gstor LOG_FILE_NAME_CONVERT failed with retcode(%d)!", ret); + return GS_ERROR; + } + GS_LOG_RUN_INF("gstor convert_restore_path success, convert_value=%s", convert_value); + } + + knl_restore_t restore = { 0 }; + knl_restore_t *param_restore = &restore; + param_restore->type = RESTORE_FROM_PATH; + param_restore->device = DEVICE_DISK; + param_restore->path.str = (char *)restore_path; + param_restore->path.len = strlen(restore_path); + param_restore->file_type = RESTORE_ALL; + param_restore->crypt_info.encrypt_alg = ENCRYPT_NONE; + if (EC_SESSION(handle)->kernel->db.status != DB_STATUS_NOMOUNT) { + GS_LOG_RUN_ERR("gstore restore failed since db status(%d) not DB_STATUS_NOMOUNT!", + EC_SESSION(handle)->kernel->db.status); + return GS_ERROR; + } + + ret = knl_restore(EC_SESSION(handle), param_restore); + if (ret != GS_SUCCESS) { + GS_LOG_RUN_ERR("gstore 
restore failed with retcode(%d)!", ret); + return GS_ERROR; + } + + knl_recover_t recover = { 0 }; + recover.action = RECOVER_NORMAL; + ret = knl_recover(EC_SESSION(handle), &recover); + if (ret != GS_SUCCESS) { + GS_LOG_RUN_ERR("gstore recover failed with retcode(%d)!", ret); + } + return ret; +} + #ifdef __cplusplus } #endif \ No newline at end of file diff --git a/src/storage/gstor/gstor_executor.h b/src/storage/gstor/gstor_executor.h index 5c5c3fd..289c00c 100644 --- a/src/storage/gstor/gstor_executor.h +++ b/src/storage/gstor/gstor_executor.h @@ -36,7 +36,7 @@ extern "C" { EXPORT_API void gstor_shutdown(void); -EXPORT_API int gstor_startup(char *data_path); +EXPORT_API int gstor_startup(char *data_path, unsigned int startup_mode); EXPORT_API int gstor_alloc(void **handle); @@ -71,6 +71,10 @@ EXPORT_API int gstor_commit(void *handle); EXPORT_API int gstor_rollback(void *handle); +EXPORT_API int gstor_backup(void *handle, const char *bak_format); + +EXPORT_API int gstor_restore(void *handle, const char *restore_path, const char *old_path, const char *new_path); + EXPORT_API int gstor_vm_alloc(void *handle, unsigned int *vmid); EXPORT_API int gstor_vm_open(void *handle, unsigned int vmid, void **page); EXPORT_API void gstor_vm_close(void *handle, unsigned int vmid); diff --git a/src/storage/gstor/gstor_instance.h b/src/storage/gstor/gstor_instance.h index f1dfe2a..531d0e9 100644 --- a/src/storage/gstor/gstor_instance.h +++ b/src/storage/gstor/gstor_instance.h @@ -34,6 +34,10 @@ extern "C" { #endif +typedef enum en_startup_mode { + STARTUP_MODE_OPEN = 0, + STARTUP_MODE_NOMOUNT = 1, +} startup_mode_t; typedef enum en_shutdown_mode { SHUTDOWN_MODE_NORMAL = 0, diff --git a/src/storage/gstor/zekernel/kernel/backup/bak_common.c b/src/storage/gstor/zekernel/kernel/backup/bak_common.c index ed67ce1..f8a51e5 100644 --- a/src/storage/gstor/zekernel/kernel/backup/bak_common.c +++ b/src/storage/gstor/zekernel/kernel/backup/bak_common.c @@ -1969,6 +1969,10 @@ status_t 
bak_set_exclude_space(knl_session_t *session, bak_t *bak, galist_t *exc uint32 spc_id; errno_t ret; + if (exclude_spcs == NULL) { + return GS_SUCCESS; + } + ret = memset_sp(bak->exclude_spcs, sizeof(bool32) * GS_MAX_SPACES, 0, sizeof(bool32) * GS_MAX_SPACES); knl_securec_check(ret); @@ -1998,6 +2002,10 @@ status_t bak_set_include_space(knl_session_t *session, bak_t *bak, galist_t *inc ret = memset_sp(bak->include_spcs, sizeof(bool32) * GS_MAX_SPACES, 0, sizeof(bool32) * GS_MAX_SPACES); knl_securec_check(ret); + if (include_spcs == NULL) { + return GS_SUCCESS; + } + for (uint32 i = 0; i < include_spcs->count; i++) { spc_name = (text_t *)cm_galist_get(include_spcs, i); if (spc_get_space_id(session, spc_name, &spc_id) != GS_SUCCESS) { diff --git a/src/storage/storage.c b/src/storage/storage.c index 1a3d29e..e846270 100644 --- a/src/storage/storage.c +++ b/src/storage/storage.c @@ -23,13 +23,14 @@ */ #include "cm_text.h" -#include "storage.h" #include "srv_param.h" #include "db_handle.h" #include "gstor_adpt.h" +#include "gstor_executor.h" +#include "storage.h" typedef void(*db_shutdown_t)(void); -typedef int(*db_startup_t)(char *path); +typedef int(*db_startup_t)(char *path, db_startup_mode_t startup_mode); typedef void(*db_free_t)(void *handle); typedef void(*db_clean_t)(void *handle); typedef int(*db_alloc_t)(void **handle); @@ -43,6 +44,8 @@ typedef int(*db_get_t)(void *handle, char *key, uint32 key_len, char **val, uint typedef int(*db_cursor_next_t)(void *handle, bool32 *eof); typedef int(*db_open_cursor_t)(void *handle, char *key, uint32 key_len, uint32 flags, bool32 *eof); typedef int(*db_cursor_fetch_t)(void *handle, char **key, uint32 *key_len, char **val, uint32 *val_len); +typedef int(*db_backup_t)(void *handle, const char *bak_format); +typedef int(*db_restore_t)(void *handle, const char *restore_path, const char *old_path, const char *new_path); typedef struct st_db { db_put_t put; @@ -60,6 +63,8 @@ typedef struct st_db { db_open_cursor_t open_cursor; 
db_cursor_next_t cursor_next; db_cursor_fetch_t cursor_fetch; + db_backup_t backup; + db_restore_t restore; }db_t; typedef enum en_dbtype { @@ -70,7 +75,7 @@ typedef enum en_dbtype { static const db_t g_dbs[] = { { gstor_put, gstor_del, gstor_get, gstor_free, gstor_alloc, gstor_open_table, gstor_clean, gstor_begin, gstor_commit, gstor_startup, gstor_shutdown, gstor_rollback, gstor_open_cursor, gstor_cursor_next, - gstor_cursor_fetch }, + gstor_cursor_fetch, gstor_backup, gstor_restore}, }; static const db_t *g_curr_db = NULL; @@ -111,7 +116,7 @@ static inline void deinit_g_handle_pool(void) } } -status_t db_startup(void) +status_t db_startup(db_startup_mode_t startup_mode) { param_value_t data_path, dbtype; char real_data_path[CM_FILE_NAME_BUFFER_SIZE] = {0}; @@ -132,7 +137,7 @@ status_t db_startup(void) return CM_ERROR; } - if (g_dbs[dbtype.uint32_val].startup(real_data_path) != CM_SUCCESS) { + if (g_dbs[dbtype.uint32_val].startup(real_data_path, startup_mode) != CM_SUCCESS) { LOG_RUN_ERR("[STG] db %u startup failed", dbtype.uint32_val); return CM_ERROR; } @@ -291,4 +296,16 @@ status_t db_rollback(void *handle) { STG_CHECK_DB_STARTUP; return STG_HANDLE->rollback(((db_handle_t*)handle)->handle); +} + +status_t db_bakup(void *handle, const char *bak_format) +{ + STG_CHECK_DB_STARTUP; + return STG_HANDLE->backup(((db_handle_t*)handle)->handle, bak_format); +} + +status_t db_restore(void *handle, const char *restore_path, const char *old_path, const char *new_path) +{ + STG_CHECK_DB_STARTUP; + return STG_HANDLE->restore(((db_handle_t*)handle)->handle, restore_path, old_path, new_path); } \ No newline at end of file diff --git a/src/storage/storage.h b/src/storage/storage.h index d8db5d0..ab5f6fd 100644 --- a/src/storage/storage.h +++ b/src/storage/storage.h @@ -34,9 +34,14 @@ extern "C" { #endif +typedef enum en_db_startup_mode { + STARTUP_MODE_OPEN = 0, + STARTUP_MODE_NOMOUNT = 1, +} db_startup_mode_t; + void db_shutdown(void); -status_t db_startup(void); +status_t 
db_startup(db_startup_mode_t startup_mode); void db_free(void *handle); @@ -62,6 +67,10 @@ status_t db_commit(void *handle); status_t db_rollback(void *handle); +status_t db_bakup(void *handle, const char *bak_format); + +status_t db_restore(void *handle, const char *restore_path, const char *old_path, const char *new_path); + #ifdef __cplusplus } #endif -- Gitee