From 0b760d0a2108fd8546c9b522719a14004ccb21cd Mon Sep 17 00:00:00 2001 From: x30027624 Date: Wed, 2 Apr 2025 10:19:09 +0800 Subject: [PATCH] optimizing shuffle split to partition-wise --- .../cpp/src/shuffle/splitter.cpp | 301 ++++++++++-------- .../cpp/src/shuffle/splitter.h | 5 + 2 files changed, 166 insertions(+), 140 deletions(-) diff --git a/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.cpp b/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.cpp index 4265d4c03..37fb83acd 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.cpp +++ b/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.cpp @@ -24,6 +24,28 @@ using namespace omniruntime::vec; SplitOptions SplitOptions::Defaults() { return SplitOptions(); } +void Splitter::BuildPartition2Row(int32_t num_rows) +{ + row_offset_row_id_.resize(num_rows); + partition_row_offset_base_.resize(num_partitions_ + 1); + for (auto pid = 1; pid <= num_partitions_; ++pid) { + partition_row_offset_base_[pid] = partition_row_offset_base_[pid - 1] + partition_id_cnt_cur_[pid - 1]; + } + for (auto row = 0; row < num_rows; ++row) { + auto pid = partition_id_[row]; + row_offset_row_id_[partition_row_offset_base_[pid]++] = row; + } + for (auto pid = 0; pid < num_partitions_; ++pid) { + partition_row_offset_base_[pid] -= partition_id_cnt_cur_[pid]; + } + partition_used_.clear(); + for (auto pid = 0; pid != num_partitions_; ++pid) { + if (partition_id_cnt_cur_[pid] > 0) { + partition_used_.push_back(pid); + } + } +} + // 计算分区id,每个batch初始化 int Splitter::ComputeAndCountPartitionId(VectorBatch& vb) { auto num_rows = vb.GetRowCount(); @@ -130,46 +152,38 @@ int Splitter::SplitFixedWidthValueBuffer(VectorBatch& vb) { if (vb.Get(col_idx_vb)->GetEncoding() == OMNI_DICTIONARY) { LogsDebug("Dictionary Columnar process!"); - auto ids_addr = VectorHelper::UnsafeGetValues(vb.Get(col_idx_vb)); + auto ids_addr = static_cast(VectorHelper::UnsafeGetValues(vb.Get(col_idx_vb))); auto src_addr = reinterpret_cast(VectorHelper::UnsafeGetDictionary(vb.Get(col_idx_vb))); + auto process = [&](const ShuffleTypeId shuffleTypeId) { + const auto shuffle_size = (1 << shuffleTypeId); + for (auto &pid: partition_used_) { + auto dstPidBase = reinterpret_cast(dst_addrs[pid]) + partition_buffer_idx_base_[pid]; + auto pos = partition_row_offset_base_[pid]; + auto end = partition_row_offset_base_[pid + 1]; + auto count = end - pos; + for (; pos < end; ++pos) { + auto rowId = row_offset_row_id_[pos]; + *dstPidBase++ = reinterpret_cast(src_addr)[ids_addr[rowId]]; + } + partition_fixed_width_buffers_[col][pid][1]->size_ += shuffle_size * count; + partition_buffer_idx_offset_[pid] += count; + } + }; switch (column_type_id_[col_idx_schema]) { -#define PROCESS(SHUFFLE_TYPE, CTYPE) \ - case SHUFFLE_TYPE: \ - { \ - auto shuffle_size = (1 << SHUFFLE_TYPE); \ - for (auto row = 0; row < num_rows; ++row) { \ - auto pid = partition_id_[row]; \ - auto dst_offset = \ - partition_buffer_idx_base_[pid] + partition_buffer_idx_offset_[pid]; \ - reinterpret_cast(dst_addrs[pid])[dst_offset] = \ - reinterpret_cast(src_addr)[reinterpret_cast(ids_addr)[row]]; \ - partition_fixed_width_buffers_[col][pid][1]->size_ += shuffle_size; \ - partition_buffer_idx_offset_[pid]++; \ - } \ - } \ - break; - PROCESS(SHUFFLE_1BYTE, uint8_t) - PROCESS(SHUFFLE_2BYTE, uint16_t) - PROCESS(SHUFFLE_4BYTE, uint32_t) - PROCESS(SHUFFLE_8BYTE, uint64_t) -#undef PROCESS + case SHUFFLE_1BYTE: + process.operator()(SHUFFLE_1BYTE); + break; + case SHUFFLE_2BYTE: + process.operator()(SHUFFLE_2BYTE); + break; + case SHUFFLE_4BYTE: + process.operator()(SHUFFLE_4BYTE); + break; + case SHUFFLE_8BYTE: + process.operator()(SHUFFLE_8BYTE); + break; case SHUFFLE_DECIMAL128: - { - auto shuffle_size = (1 << SHUFFLE_DECIMAL128); - for (auto row = 0; row < num_rows; ++row) { - auto pid = partition_id_[row]; - auto dst_offset = - partition_buffer_idx_base_[pid] + partition_buffer_idx_offset_[pid]; - // 前64位取值、赋值 - reinterpret_cast(dst_addrs[pid])[dst_offset << 1] = - reinterpret_cast(src_addr)[reinterpret_cast(ids_addr)[row] << 1]; - // 后64位取值、赋值 - reinterpret_cast(dst_addrs[pid])[(dst_offset << 1) | 1] = - reinterpret_cast(src_addr)[(reinterpret_cast(ids_addr)[row] << 1) | 1]; - partition_fixed_width_buffers_[col][pid][1]->size_ += shuffle_size; //decimal128 16Bytes - partition_buffer_idx_offset_[pid]++; - } - } + process.operator()(SHUFFLE_DECIMAL128); break; default: { LogsError("SplitFixedWidthValueBuffer not match this type: %d", column_type_id_[col_idx_schema]); @@ -178,42 +192,37 @@ int Splitter::SplitFixedWidthValueBuffer(VectorBatch& vb) { } } else { auto src_addr = reinterpret_cast(VectorHelper::UnsafeGetValues(vb.Get(col_idx_vb))); + auto process = [&](const ShuffleTypeId shuffleTypeId) { + const auto shuffle_size = (1 << shuffleTypeId); + for (auto &pid: partition_used_) { + auto dst_offset = partition_buffer_idx_base_[pid] + partition_buffer_idx_offset_[pid]; + auto dstPidBase = reinterpret_cast(dst_addrs[pid]) + dst_offset; + auto pos = partition_row_offset_base_[pid]; + auto end = partition_row_offset_base_[pid + 1]; + auto count = end - pos; + for (; pos < end; ++pos) { + auto rowId = row_offset_row_id_[pos]; + *dstPidBase++ = reinterpret_cast(src_addr)[rowId]; + } + partition_fixed_width_buffers_[col][pid][1]->size_ += shuffle_size * count; + partition_buffer_idx_offset_[pid] += count; + } + }; switch (column_type_id_[col_idx_schema]) { -#define PROCESS(SHUFFLE_TYPE, CTYPE) \ - case SHUFFLE_TYPE: \ - { \ - auto shuffle_size = (1 << SHUFFLE_TYPE); \ - for (auto row = 0; row < num_rows; ++row) { \ - auto pid = partition_id_[row]; \ - auto dst_offset = \ - partition_buffer_idx_base_[pid] + partition_buffer_idx_offset_[pid]; \ - reinterpret_cast(dst_addrs[pid])[dst_offset] = \ - reinterpret_cast(src_addr)[row]; \ - partition_fixed_width_buffers_[col][pid][1]->size_ += shuffle_size; \ - partition_buffer_idx_offset_[pid]++; \ - } \ - } \ - break; - PROCESS(SHUFFLE_1BYTE, uint8_t) - PROCESS(SHUFFLE_2BYTE, uint16_t) - PROCESS(SHUFFLE_4BYTE, uint32_t) - PROCESS(SHUFFLE_8BYTE, uint64_t) -#undef PROCESS + case SHUFFLE_1BYTE: + process.operator()(SHUFFLE_1BYTE); + break; + case SHUFFLE_2BYTE: + process.operator()(SHUFFLE_2BYTE); + break; + case SHUFFLE_4BYTE: + process.operator()(SHUFFLE_4BYTE); + break; + case SHUFFLE_8BYTE: + process.operator()(SHUFFLE_8BYTE); + break; case SHUFFLE_DECIMAL128: - { - auto shuffle_size = (1 << SHUFFLE_DECIMAL128); - for (auto row = 0; row < num_rows; ++row) { - auto pid = partition_id_[row]; - auto dst_offset = - partition_buffer_idx_base_[pid] + partition_buffer_idx_offset_[pid]; - reinterpret_cast(dst_addrs[pid])[dst_offset << 1] = - reinterpret_cast(src_addr)[row << 1]; // 前64位取值、赋值 - reinterpret_cast(dst_addrs[pid])[(dst_offset << 1) | 1] = - reinterpret_cast(src_addr)[(row << 1) | 1]; // 后64位取值、赋值 - partition_fixed_width_buffers_[col][pid][1]->size_ += shuffle_size; //decimal128 16Bytes - partition_buffer_idx_offset_[pid]++; - } - } + process.operator()(SHUFFLE_DECIMAL128); break; default: { LogsError("ERROR: SplitFixedWidthValueBuffer not match this type: %d", column_type_id_[col_idx_schema]); @@ -239,84 +248,92 @@ void Splitter::SplitBinaryVector(BaseVector *varcharVector, int col_schema) { auto vc = reinterpret_cast> *>( varcharVector); cached_vectorbatch_size_ += num_rows * (sizeof(bool) + sizeof(int32_t)); - for (auto row = 0; row < num_rows; ++row) { - auto pid = partition_id_[row]; + for (auto &pid: partition_used_) { uint8_t *dst = nullptr; uint32_t str_len = 0; - if constexpr (hasNull) { - if (!vc->IsNull(row)) { - std::string_view value = vc->GetValue(row); + auto index = 0; + auto pos = partition_row_offset_base_[pid]; + auto end = partition_row_offset_base_[pid + 1]; + for (; pos < end; ++pos, ++index) { + auto rowId = row_offset_row_id_[pos]; + if constexpr (hasNull) { + if (!vc->IsNull(rowId)) { + std::string_view value = vc->GetValue(rowId); + dst = reinterpret_cast(reinterpret_cast(value.data())); + str_len = static_cast(value.length()); + } + } else { + std::string_view value = vc->GetValue(rowId); dst = reinterpret_cast(reinterpret_cast(value.data())); str_len = static_cast(value.length()); } - } else { - std::string_view value = vc->GetValue(row); - dst = reinterpret_cast(reinterpret_cast(value.data())); - str_len = static_cast(value.length()); - } - if constexpr (hasNull) { - is_null = vc->IsNull(row); - } - cached_vectorbatch_size_ += str_len; // 累计变长部分cache数据 - VCLocation cl((uint64_t) dst, str_len, is_null); - if ((vc_partition_array_buffers_[pid][col_schema].size() != 0) && - (vc_partition_array_buffers_[pid][col_schema].back().getVcList().size() < - options_.spill_batch_row_num)) { - if constexpr(hasNull) { - HandleNull(vc_partition_array_buffers_[pid][col_schema].back(), is_null); - } - vc_partition_array_buffers_[pid][col_schema].back().getVcList().push_back(cl); - vc_partition_array_buffers_[pid][col_schema].back().vcb_total_len += str_len; - } else { - VCBatchInfo svc(options_.spill_batch_row_num); - svc.getVcList().push_back(cl); - svc.vcb_total_len += str_len; if constexpr (hasNull) { - HandleNull(svc, is_null); + is_null = vc->IsNull(rowId); + } + cached_vectorbatch_size_ += str_len; // 累计变长部分cache数据 + if ((vc_partition_array_buffers_[pid][col_schema].size() != 0) && + (vc_partition_array_buffers_[pid][col_schema].back().getVcList().size() < + options_.spill_batch_row_num)) { + if constexpr (hasNull) { + HandleNull(vc_partition_array_buffers_[pid][col_schema].back(), is_null); + } + vc_partition_array_buffers_[pid][col_schema].back().getVcList().emplace_back((uint64_t)dst, str_len, is_null); + vc_partition_array_buffers_[pid][col_schema].back().vcb_total_len += str_len; + } else { + VCBatchInfo svc(options_.spill_batch_row_num); + svc.getVcList().emplace_back((uint64_t)dst, str_len, is_null); + svc.vcb_total_len += str_len; + if constexpr (hasNull) { + HandleNull(svc, is_null); + } + vc_partition_array_buffers_[pid][col_schema].emplace_back(svc); } - vc_partition_array_buffers_[pid][col_schema].push_back(svc); } } } else { auto vc = reinterpret_cast> *>(varcharVector); cached_vectorbatch_size_ += num_rows * (sizeof(bool) + sizeof(int32_t)) + sizeof(int32_t); - for (auto row = 0; row < num_rows; ++row) { - auto pid = partition_id_[row]; + for (auto &pid: partition_used_) { + auto &vc_partition_array = vc_partition_array_buffers_[pid]; uint8_t *dst = nullptr; uint32_t str_len = 0; - if constexpr (hasNull) { - if (!vc->IsNull(row)) { - std::string_view value = vc->GetValue(row); + auto index = 0; + auto pos = partition_row_offset_base_[pid]; + auto end = partition_row_offset_base_[pid + 1]; + for (; pos < end; ++pos, ++index) { + auto rowId = row_offset_row_id_[pos]; + if constexpr (hasNull) { + if (!vc->IsNull(rowId)) { + std::string_view value = vc->GetValue(rowId); + dst = reinterpret_cast(reinterpret_cast(value.data())); + str_len = static_cast(value.length()); + } + } else { + std::string_view value = vc->GetValue(rowId); dst = reinterpret_cast(reinterpret_cast(value.data())); str_len = static_cast(value.length()); } - } else { - std::string_view value = vc->GetValue(row); - dst = reinterpret_cast(reinterpret_cast(value.data())); - str_len = static_cast(value.length()); - } - - if constexpr (hasNull) { - is_null = vc->IsNull(row); - } - cached_vectorbatch_size_ += str_len; // 累计变长部分cache数据 - VCLocation cl((uint64_t) dst, str_len, is_null); - if ((vc_partition_array_buffers_[pid][col_schema].size() != 0) && - (vc_partition_array_buffers_[pid][col_schema].back().getVcList().size() < - options_.spill_batch_row_num)) { - if constexpr(hasNull) { - HandleNull(vc_partition_array_buffers_[pid][col_schema].back(), is_null); + if constexpr (hasNull) { + is_null = vc->IsNull(rowId); } - vc_partition_array_buffers_[pid][col_schema].back().getVcList().push_back(cl); - vc_partition_array_buffers_[pid][col_schema].back().vcb_total_len += str_len; - } else { - VCBatchInfo svc(options_.spill_batch_row_num); - svc.getVcList().push_back(cl); - if constexpr(hasNull) { - HandleNull(svc, is_null); + cached_vectorbatch_size_ += str_len; // 累计变长部分cache数据 + if ((vc_partition_array[col_schema].size() != 0) && + (vc_partition_array[col_schema].back().getVcList().size() < + options_.spill_batch_row_num)) { + if constexpr (hasNull) { + HandleNull(vc_partition_array[col_schema].back(), is_null); + } + vc_partition_array[col_schema].back().getVcList().emplace_back((uint64_t)dst, str_len, is_null); + vc_partition_array[col_schema].back().vcb_total_len += str_len; + } else { + VCBatchInfo svc(options_.spill_batch_row_num); + svc.getVcList().emplace_back((uint64_t)dst, str_len, is_null); + if constexpr (hasNull) { + HandleNull(svc, is_null); + } + svc.vcb_total_len += str_len; + vc_partition_array[col_schema].emplace_back(svc); } - svc.vcb_total_len += str_len; - vc_partition_array_buffers_[pid][col_schema].push_back(svc); } } } @@ -358,7 +375,9 @@ int Splitter::SplitFixedWidthValidityBuffer(VectorBatch& vb){ for (auto pid = 0; pid < num_partitions_; ++pid) { if (partition_id_cnt_cur_[pid] > 0 && dst_addrs[pid] == nullptr) { // init bitmap if it's null - auto new_size = partition_id_cnt_cur_[pid] > options_.buffer_size ? partition_id_cnt_cur_[pid] : options_.buffer_size; + auto new_size = partition_id_cnt_cur_[pid] > options_.buffer_size + ? partition_id_cnt_cur_[pid] + : options_.buffer_size; auto ptr_tmp = static_cast(options_.allocator->Alloc(new_size)); if (nullptr == ptr_tmp) { throw std::runtime_error("Allocator for ValidityBuffer Failed! "); @@ -373,15 +392,15 @@ int Splitter::SplitFixedWidthValidityBuffer(VectorBatch& vb){ } // 计算并填充数据 - auto src_addr = const_cast((uint8_t *)( - reinterpret_cast(omniruntime::vec::unsafe::UnsafeBaseVector::GetNulls(vb.Get(col_idx))))); - memset_s(partition_buffer_idx_offset_, num_partitions_ * sizeof(int32_t), 0, num_partitions_ * sizeof(int32_t)); - const auto num_rows = vb.GetRowCount(); - for (auto row = 0; row < num_rows; ++row) { - auto pid = partition_id_[row]; - auto dst_offset = partition_buffer_idx_base_[pid] + partition_buffer_idx_offset_[pid]; - dst_addrs[pid][dst_offset] = omniruntime::BitUtil::IsBitSet(src_addr, row); - partition_buffer_idx_offset_[pid]++; + auto src_addr = unsafe::UnsafeBaseVector::GetNulls(vb.Get(col_idx)); + for (auto &pid: partition_used_) { + auto dstPidBase = dst_addrs[pid] + partition_buffer_idx_base_[pid]; + auto pos = partition_row_offset_base_[pid]; + auto end = partition_row_offset_base_[pid + 1]; + for (; pos < end; ++pos) { + auto rowId = row_offset_row_id_[pos]; + *dstPidBase++ = omniruntime::BitUtil::IsBitSet(src_addr, rowId); + } } } } @@ -449,6 +468,8 @@ int Splitter::DoSplit(VectorBatch& vb) { } } } + BuildPartition2Row(vb.GetRowCount()); + SplitFixedWidthValueBuffer(vb); SplitFixedWidthValidityBuffer(vb); diff --git a/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.h b/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.h index 9f0e8fa58..d30eccf4d 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.h +++ b/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.h @@ -126,6 +126,9 @@ class Splitter { std::vector partition_id_; // 记录当前vb每一行的pid int32_t *partition_id_cnt_cur_; // 统计不同partition记录的行数(当前处理中的vb) uint64_t *partition_id_cnt_cache_; // 统计不同partition记录的行数,cache住的 + std::vector row_offset_row_id_; + std::vector partition_used_; + std::vector partition_row_offset_base_; // column number uint32_t num_row_splited_; // cached row number uint64_t cached_vectorbatch_size_; // cache total vectorbatch size in bytes @@ -160,6 +163,8 @@ class Splitter { spark::ProtoRowBatch *protoRowBatch = new ProtoRowBatch(); private: + void BuildPartition2Row(int32_t row_count); + void ReleaseVarcharVector() { std::set::iterator it; -- Gitee