From e16bf85717e041943e568aa824e03db585d551a6 Mon Sep 17 00:00:00 2001 From: Anllcik <654610542@qq.com> Date: Wed, 25 Oct 2023 10:15:37 +0800 Subject: [PATCH 1/2] kae for spark322 --- .../cpp/src/CMakeLists.txt | 4 +- .../cpp/src/common/common.cpp | 8 +- .../cpp/src/io/Common.hh | 7 +- .../cpp/src/io/Compression.cc | 221 ++-------------- .../cpp/src/io/WriterOptions.cc | 2 +- .../cpp/test/CMakeLists.txt | 7 + .../cpp/test/shuffle/shuffle_test.cpp | 46 ++-- .../spark/compress/CompressionUtil.java | 8 +- .../{ZlibCodec.java => KaeCodec.java} | 6 +- .../boostkit/spark/ColumnarPluginConfig.scala | 4 +- .../spark/ColumnShuffleCompressionTest.java | 12 +- .../spark/ColumnShuffleDiffPartitionTest.java | 2 +- .../spark/ColumnShuffleDiffRowVBTest.java | 14 +- .../spark/ColumnShuffleGBSizeTest.java | 12 +- .../boostkit/spark/ColumnShuffleNullTest.java | 8 +- ... => ColumnShuffleSerializerKaeSuite.scala} | 8 +- .../ColumnShuffleSerializerSnappySuite.scala | 247 ------------------ 17 files changed, 93 insertions(+), 523 deletions(-) rename omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/compress/{ZlibCodec.java => KaeCodec.java} (93%) rename omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/{ColumnShuffleSerializerZlibSuite.scala => ColumnShuffleSerializerKaeSuite.scala} (98%) delete mode 100644 omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnShuffleSerializerSnappySuite.scala diff --git a/omnioperator/omniop-spark-extension/cpp/src/CMakeLists.txt b/omnioperator/omniop-spark-extension/cpp/src/CMakeLists.txt index 7256a02cb..915b71f68 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/CMakeLists.txt +++ b/omnioperator/omniop-spark-extension/cpp/src/CMakeLists.txt @@ -1,4 +1,5 @@ include_directories(SYSTEM "/user/local/include") +include_directories(KAEZIP "/usr/local/kaezip/include") set (PROJ_TARGET spark_columnar_plugin) @@ -41,7 +42,7 @@ target_include_directories(${PROJ_TARGET} PUBLIC $ENV{JAVA_HOME}/include) target_include_directories(${PROJ_TARGET} PUBLIC $ENV{JAVA_HOME}/include/linux) target_include_directories(${PROJ_TARGET} PUBLIC ${CMAKE_CURRENT_BINARY_DIR}) -target_link_libraries (${PROJ_TARGET} PUBLIC +target_link_libraries (${PROJ_TARGET} PRIVATE Arrow::arrow_shared ArrowDataset::arrow_dataset_shared Parquet::parquet_shared @@ -50,6 +51,7 @@ target_link_libraries (${PROJ_TARGET} PUBLIC sasl2 protobuf z + kaezip snappy lz4 zstd diff --git a/omnioperator/omniop-spark-extension/cpp/src/common/common.cpp b/omnioperator/omniop-spark-extension/cpp/src/common/common.cpp index f33d5c4c9..9166716f8 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/common/common.cpp +++ b/omnioperator/omniop-spark-extension/cpp/src/common/common.cpp @@ -58,14 +58,10 @@ uint32_t reversebytes_uint32t(uint32_t const value) spark::CompressionKind GetCompressionType(const std::string& name) { if (name == "uncompressed") { return spark::CompressionKind::CompressionKind_NONE; - } else if (name == "zlib") { - return spark::CompressionKind::CompressionKind_ZLIB; - } else if (name == "snappy") { - return spark::CompressionKind::CompressionKind_SNAPPY; + } else if (name == "kae") { + return spark::CompressionKind::CompressionKind_KAEZIP; } else if (name == "lz4") { return spark::CompressionKind::CompressionKind_LZ4; - } else if (name == "zstd") { - return spark::CompressionKind::CompressionKind_ZSTD; } else { throw std::logic_error("compression codec not supported"); } diff --git 
a/omnioperator/omniop-spark-extension/cpp/src/io/Common.hh b/omnioperator/omniop-spark-extension/cpp/src/io/Common.hh index e24036356..412c38a15 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/io/Common.hh +++ b/omnioperator/omniop-spark-extension/cpp/src/io/Common.hh @@ -23,11 +23,8 @@ namespace spark { enum CompressionKind { CompressionKind_NONE = 0, - CompressionKind_ZLIB = 1, - CompressionKind_SNAPPY = 2, - CompressionKind_LZO = 3, - CompressionKind_LZ4 = 4, - CompressionKind_ZSTD = 5 + CompressionKind_KAEZIP = 1, + CompressionKind_LZ4 = 2 }; } diff --git a/omnioperator/omniop-spark-extension/cpp/src/io/Compression.cc b/omnioperator/omniop-spark-extension/cpp/src/io/Compression.cc index 720f7ff19..a1b731cc9 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/io/Compression.cc +++ b/omnioperator/omniop-spark-extension/cpp/src/io/Compression.cc @@ -26,7 +26,7 @@ #include #include "zlib.h" -#include "zstd.h" +#include "kaezip.h" #include "wrap/snappy_wrapper.h" @@ -210,115 +210,6 @@ namespace spark { return true; } - class ZlibCompressionStream: public CompressionStream { - public: - ZlibCompressionStream(OutputStream * outStream, - int compressionLevel, - uint64_t capacity, - uint64_t blockSize, - MemoryPool& pool); - - virtual ~ZlibCompressionStream() override { - end(); - } - - virtual std::string getName() const override; - - protected: - virtual uint64_t doStreamingCompression() override; - - private: - void init(); - void end(); - z_stream strm; - }; - - ZlibCompressionStream::ZlibCompressionStream( - OutputStream * outStream, - int compressionLevel, - uint64_t capacity, - uint64_t blockSize, - MemoryPool& pool) - : CompressionStream(outStream, - compressionLevel, - capacity, - blockSize, - pool) { - init(); - } - - uint64_t ZlibCompressionStream::doStreamingCompression() { - if (deflateReset(&strm) != Z_OK) { - throw std::runtime_error("Failed to reset inflate."); - } - - strm.avail_in = static_cast(bufferSize); - strm.next_in = rawInputBuffer.data(); - - do { - if (outputPosition >= outputSize) { - if (!BufferedOutputStream::Next( - reinterpret_cast(&outputBuffer), - &outputSize)) { - throw std::runtime_error( - "Failed to get next output buffer from output stream."); - } - outputPosition = 0; - } - strm.next_out = reinterpret_cast - (outputBuffer + outputPosition); - strm.avail_out = static_cast - (outputSize - outputPosition); - - int ret = deflate(&strm, Z_FINISH); - outputPosition = outputSize - static_cast(strm.avail_out); - - if (ret == Z_STREAM_END) { - break; - } else if (ret == Z_OK) { - // needs more buffer so will continue the loop - } else { - throw std::runtime_error("Failed to deflate input data."); - } - } while (strm.avail_out == 0); - - return strm.total_out; - } - - std::string ZlibCompressionStream::getName() const { - return "ZlibCompressionStream"; - } - -// DIAGNOSTIC_PUSH - -#if defined(__GNUC__) || defined(__clang__) - DIAGNOSTIC_IGNORE("-Wold-style-cast") -#endif - - void ZlibCompressionStream::init() { - strm.zalloc = nullptr; - strm.zfree = nullptr; - strm.opaque = nullptr; - strm.next_in = nullptr; - - if (deflateInit2(&strm, level, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY) - != Z_OK) { - throw std::runtime_error("Error while calling deflateInit2() for zlib."); - } - } - - void ZlibCompressionStream::end() { - (void)deflateEnd(&strm); - } - -// DIAGNOSTIC_PUSH - - enum DecompressState { DECOMPRESS_HEADER, - DECOMPRESS_START, - DECOMPRESS_CONTINUE, - DECOMPRESS_ORIGINAL, - DECOMPRESS_EOF}; - // DIAGNOSTIC_PUSH #if defined(__GNUC__) || 
defined(__clang__) @@ -478,54 +369,11 @@ namespace spark { } /** - * Snappy block compression - */ - class SnappyCompressionStream: public BlockCompressionStream { - public: - SnappyCompressionStream(OutputStream * outStream, - int compressionLevel, - uint64_t capacity, - uint64_t blockSize, - MemoryPool& pool) - : BlockCompressionStream(outStream, - compressionLevel, - capacity, - blockSize, - pool) { - } - - virtual std::string getName() const override { - return "SnappyCompressionStream"; - } - - virtual ~SnappyCompressionStream() override { - // PASS - } - - protected: - virtual uint64_t doBlockCompression() override; - - virtual uint64_t estimateMaxCompressionSize() override { - return static_cast - (snappy::MaxCompressedLength(static_cast(bufferSize))); - } - }; - - uint64_t SnappyCompressionStream::doBlockCompression() { - size_t compressedLength; - snappy::RawCompress(reinterpret_cast(rawInputBuffer.data()), - static_cast(bufferSize), - reinterpret_cast(compressorBuffer.data()), - &compressedLength); - return static_cast(compressedLength); - } - - /** - * ZSTD block compression + * KAE block compression */ - class ZSTDCompressionStream: public BlockCompressionStream{ + class KAECompressionStream: public BlockCompressionStream{ public: - ZSTDCompressionStream(OutputStream * outStream, + KAECompressionStream(OutputStream * outStream, int compressionLevel, uint64_t capacity, uint64_t blockSize, @@ -539,10 +387,10 @@ namespace spark { } virtual std::string getName() const override { - return "ZstdCompressionStream"; + return "KaeCompressionStream"; } - virtual ~ZSTDCompressionStream() override { + virtual ~KAECompressionStream() override { this->end(); } @@ -550,22 +398,26 @@ namespace spark { virtual uint64_t doBlockCompression() override; virtual uint64_t estimateMaxCompressionSize() override { - return ZSTD_compressBound(static_cast(bufferSize)); + return compressBound(static_cast(bufferSize)); } private: void init(); void end(); - ZSTD_CCtx *cctx; + z_stream strm; }; - uint64_t ZSTDCompressionStream::doBlockCompression() { - return ZSTD_compressCCtx(cctx, - compressorBuffer.data(), - compressorBuffer.size(), - rawInputBuffer.data(), - static_cast(bufferSize), - level); + uint64_t KAECompressionStream::doBlockCompression() { + auto compressedSize = compressorBuffer.size(); + auto ret = compress2(compressorBuffer.data(), + &compressedSize, + rawInputBuffer.data(), + static_cast(bufferSize), + level); + if (ret != Z_OK) { + throw std::runtime_error("Failed to compress input data"); + } + return compressedSize; } // DIAGNOSTIC_PUSH @@ -574,24 +426,23 @@ namespace spark { DIAGNOSTIC_IGNORE("-Wold-style-cast") #endif - void ZSTDCompressionStream::init() { - - cctx = ZSTD_createCCtx(); - if (!cctx) { - throw std::runtime_error("Error while calling ZSTD_createCCtx() for zstd."); + void KAECompressionStream::init() { + strm.zalloc = nullptr; + strm.zfree = nullptr; + strm.opaque = nullptr; + if (!kz_get_devices() || deflateInit(&strm, level) != Z_OK) { + throw std::runtime_error("Error while calling deflateInit for kaezip."); } } - void ZSTDCompressionStream::end() { - (void)ZSTD_freeCCtx(cctx); - cctx = nullptr; + void KAECompressionStream::end() { + (void)deflateEnd(&strm); } #if defined(__GNUC__) || defined(__clang__) DIAGNOSTIC_IGNORE("-Wold-style-cast") #endif - // DIAGNOSTIC_PUSH std::unique_ptr @@ -608,20 +459,13 @@ namespace spark { (new BufferedOutputStream( pool, outStream, bufferCapacity, compressionBlockSize)); } - case CompressionKind_ZLIB: { + case CompressionKind_KAEZIP: { 
int level = (strategy == CompressionStrategy_SPEED) ? Z_BEST_SPEED + 1 : Z_DEFAULT_COMPRESSION; return std::unique_ptr - (new ZlibCompressionStream( + (new KAECompressionStream( outStream, level, bufferCapacity, compressionBlockSize, pool)); } - case CompressionKind_ZSTD: { - int level = (strategy == CompressionStrategy_SPEED) ? - 1 : ZSTD_CLEVEL_DEFAULT; - return std::unique_ptr - (new ZSTDCompressionStream( - outStream, level, bufferCapacity, compressionBlockSize, pool)); - } case CompressionKind_LZ4: { int level = (strategy == CompressionStrategy_SPEED) ? LZ4_ACCELERATION_MAX : LZ4_ACCELERATION_DEFAULT; @@ -629,13 +473,6 @@ namespace spark { (new Lz4CompressionSteam( outStream, level, bufferCapacity, compressionBlockSize, pool)); } - case CompressionKind_SNAPPY: { - int level = 0; - return std::unique_ptr - (new SnappyCompressionStream( - outStream, level, bufferCapacity, compressionBlockSize, pool)); - } - case CompressionKind_LZO: default: throw std::logic_error("compression codec not supported"); } diff --git a/omnioperator/omniop-spark-extension/cpp/src/io/WriterOptions.cc b/omnioperator/omniop-spark-extension/cpp/src/io/WriterOptions.cc index 43bc0b38b..dde88ef31 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/io/WriterOptions.cc +++ b/omnioperator/omniop-spark-extension/cpp/src/io/WriterOptions.cc @@ -33,7 +33,7 @@ namespace spark { WriterOptionsPrivate() { // default to Hive_0_12 compressionBlockSize = 64 * 1024; // 64K - compression = CompressionKind_ZLIB; + compression = CompressionKind_KAEZIP; compressionStrategy = CompressionStrategy_SPEED; memoryPool = getDefaultPool(); } diff --git a/omnioperator/omniop-spark-extension/cpp/test/CMakeLists.txt b/omnioperator/omniop-spark-extension/cpp/test/CMakeLists.txt index ba1ad3a77..709ed7005 100644 --- a/omnioperator/omniop-spark-extension/cpp/test/CMakeLists.txt +++ b/omnioperator/omniop-spark-extension/cpp/test/CMakeLists.txt @@ -14,6 +14,9 @@ set(MY_LINK # find gtest package find_package(GTest REQUIRED) +find_package(Arrow REQUIRED) +find_package(ArrowDataset REQUIRED) +find_package(Parquet REQUIRED) # compile a executable file add_executable(${TP_TEST_TARGET} ${ROOT_SRCS} ${TEST_ROOT_SRCS}) @@ -31,6 +34,10 @@ target_link_libraries(${TP_TEST_TARGET} dl boostkit-omniop-vector-1.3.0-aarch64 securec + Arrow::arrow_shared + ArrowDataset::arrow_dataset_shared + Parquet::parquet_shared + orc spark_columnar_plugin) target_compile_options(${TP_TEST_TARGET} PUBLIC -g -O2 -fPIC) diff --git a/omnioperator/omniop-spark-extension/cpp/test/shuffle/shuffle_test.cpp b/omnioperator/omniop-spark-extension/cpp/test/shuffle/shuffle_test.cpp index 3031943ee..24fee8f5f 100644 --- a/omnioperator/omniop-spark-extension/cpp/test/shuffle/shuffle_test.cpp +++ b/omnioperator/omniop-spark-extension/cpp/test/shuffle/shuffle_test.cpp @@ -68,7 +68,7 @@ TEST_F (ShuffleTest, Split_SingleVarChar) { inputDataTypes, colNumber, 1024, - "lz4", + "kae", tmpShuffleFilePath, 0, tmpTestingDir); @@ -106,7 +106,7 @@ TEST_F (ShuffleTest, Split_Fixed_Cols) { inputDataTypes, colNumber, 4096, - "lz4", + "kae", tmpShuffleFilePath, 0, tmpTestingDir); @@ -134,7 +134,7 @@ TEST_F (ShuffleTest, Split_Fixed_SinglePartition_SomeNullRow) { inputDataTypes, colNumber, 4096, - "lz4", + "kae", tmpShuffleFilePath, 0, tmpTestingDir); @@ -162,7 +162,7 @@ TEST_F (ShuffleTest, Split_Fixed_SinglePartition_SomeNullCol) { inputDataTypes, colNumber, 4096, - "lz4", + "kae", tmpShuffleFilePath, 0, tmpTestingDir); @@ -180,20 +180,8 @@ TEST_F (ShuffleTest, Split_Compression_None) { 
Test_Shuffle_Compression("uncompressed", 4, 999, 999); } -TEST_F (ShuffleTest, Split_Compression_zstd) { - Test_Shuffle_Compression("zstd", 4, 999, 999); -} - -TEST_F (ShuffleTest, Split_Compression_Lz4) { - Test_Shuffle_Compression("lz4", 4, 999, 999); -} - -TEST_F (ShuffleTest, Split_Compression_Snappy) { - Test_Shuffle_Compression("snappy", 4, 999, 999); -} - -TEST_F (ShuffleTest, Split_Compression_Zlib) { - Test_Shuffle_Compression("zlib", 4, 999, 999); +TEST_F (ShuffleTest, Split_Compression_kae) { + Test_Shuffle_Compression("kae", 4, 999, 999); } TEST_F (ShuffleTest, Split_Mix_LargeSize) { @@ -210,7 +198,7 @@ TEST_F (ShuffleTest, Split_Mix_LargeSize) { inputDataTypes, colNumber, 4096, - "lz4", + "kae", tmpShuffleFilePath, 0, tmpTestingDir); @@ -238,7 +226,7 @@ TEST_F (ShuffleTest, Split_Short_10WRows) { inputDataTypes, colNumber, 4096, - "lz4", + "kae", tmpShuffleFilePath, 0, tmpTestingDir); @@ -266,7 +254,7 @@ TEST_F (ShuffleTest, Split_Boolean_10WRows) { inputDataTypes, colNumber, 4096, - "lz4", + "kae", tmpShuffleFilePath, 0, tmpTestingDir); @@ -294,7 +282,7 @@ TEST_F (ShuffleTest, Split_Long_100WRows) { inputDataTypes, colNumber, 4096, - "lz4", + "kae", tmpShuffleFilePath, 0, tmpTestingDir); @@ -322,7 +310,7 @@ TEST_F (ShuffleTest, Split_VarChar_LargeSize) { inputDataTypes, colNumber, 4096, - "lz4", + "kae", tmpShuffleFilePath, 64, tmpTestingDir); @@ -350,7 +338,7 @@ TEST_F (ShuffleTest, Split_VarChar_First) { inputDataTypes, colNumber, 4096, - "lz4", + "kae", tmpShuffleFilePath, 0, tmpTestingDir); @@ -394,7 +382,7 @@ TEST_F (ShuffleTest, Split_Dictionary) { inputDataTypes, colNumber, 4096, - "lz4", + "kae", tmpShuffleFilePath, 0, tmpTestingDir); @@ -422,7 +410,7 @@ TEST_F (ShuffleTest, Split_Char) { inputDataTypes, colNumber, 4096, - "lz4", + "kae", tmpShuffleFilePath, 64, tmpTestingDir); @@ -450,7 +438,7 @@ TEST_F (ShuffleTest, Split_Decimal128) { inputDataTypes, colNumber, 4096, - "lz4", + "kae", tmpShuffleFilePath, 0, tmpTestingDir); @@ -478,7 +466,7 @@ TEST_F (ShuffleTest, Split_Decimal64) { inputDataTypes, colNumber, 4096, - "lz4", + "kae", tmpShuffleFilePath, 0, tmpTestingDir); @@ -506,7 +494,7 @@ TEST_F (ShuffleTest, Split_Decimal64_128) { inputDataTypes, colNumber, 4096, - "lz4", + "kae", tmpShuffleFilePath, 0, tmpTestingDir); diff --git a/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/compress/CompressionUtil.java b/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/compress/CompressionUtil.java index c3b35a4f6..c7a601779 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/compress/CompressionUtil.java +++ b/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/compress/CompressionUtil.java @@ -23,12 +23,8 @@ import io.airlift.compress.lzo.LzoDecompressor; public class CompressionUtil { public static CompressionCodec createCodec(String compressionCodec) { switch (compressionCodec) { - case "zlib": - return new ZlibCodec(); - case "snappy": - return new SnappyCodec(); - case "lzo": - return new AircompressorCodec(new LzoDecompressor()); + case "kae": + return new KaeCodec(); case "lz4": return new AircompressorCodec(new Lz4Decompressor()); default: diff --git a/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/compress/ZlibCodec.java b/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/compress/KaeCodec.java similarity index 93% rename from 
omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/compress/ZlibCodec.java rename to omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/compress/KaeCodec.java index 89f326566..9ae9fb8e0 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/compress/ZlibCodec.java +++ b/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/compress/KaeCodec.java @@ -21,13 +21,13 @@ import java.io.IOException; import java.util.zip.DataFormatException; import java.util.zip.Inflater; -public class ZlibCodec implements CompressionCodec { +public class KaeCodec implements CompressionCodec { - public ZlibCodec() {} + public KaeCodec() {} @Override public int decompress(byte[] input, int inputLength, byte[] output) throws IOException { - Inflater inflater = new Inflater(true); + Inflater inflater = new Inflater(false); int offset = 0; int length = output.length; try { diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala index d59978a88..6a7684cef 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala @@ -150,9 +150,9 @@ class ColumnarPluginConfig(conf: SQLConf) extends Logging { val enableShuffleCompress = conf.getConfString("spark.shuffle.compress", "true").toBoolean - // shuffle compress type, default lz4 + // shuffle compress type, default kae val columnarShuffleCompressionCodec = - conf.getConfString("spark.io.compression.codec", "lz4").toString + conf.getConfString("spark.omni.io.compression.codec", "kae").toString // columnar shuffle native buffer size val columnarShuffleNativeBufferSize = diff --git a/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/ColumnShuffleCompressionTest.java b/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/ColumnShuffleCompressionTest.java index d95be1883..74a8fbf87 100644 --- a/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/ColumnShuffleCompressionTest.java +++ b/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/ColumnShuffleCompressionTest.java @@ -80,12 +80,6 @@ public class ColumnShuffleCompressionTest extends ColumnShuffleTest { columnShuffleTestCompress("uncompressed", shuffleDataFile); } - @Test - public void columnShuffleSnappyCompressTest() throws IOException { - shuffleDataFile = shuffleTestDir + "/shuffle_dataFile_snappy_test"; - columnShuffleTestCompress("snappy", shuffleDataFile); - } - @Test public void columnShuffleLz4CompressTest() throws IOException { shuffleDataFile = shuffleTestDir + "/shuffle_dataFile_lz4_test"; @@ -93,9 +87,9 @@ public class ColumnShuffleCompressionTest extends ColumnShuffleTest { } @Test - public void columnShuffleZlibCompressTest() throws IOException { - shuffleDataFile = shuffleTestDir + "/shuffle_dataFile_zlib_test"; - columnShuffleTestCompress("zlib", shuffleDataFile); + public void columnShuffleKaeCompressTest() throws IOException { + shuffleDataFile = shuffleTestDir + "/shuffle_dataFile_kae_test"; + columnShuffleTestCompress("kae", shuffleDataFile); } public void columnShuffleTestCompress(String compressType, String dataFile) 
throws IOException { diff --git a/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/ColumnShuffleDiffPartitionTest.java b/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/ColumnShuffleDiffPartitionTest.java index c8fd47413..a809c8147 100644 --- a/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/ColumnShuffleDiffPartitionTest.java +++ b/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/ColumnShuffleDiffPartitionTest.java @@ -109,7 +109,7 @@ public class ColumnShuffleDiffPartitionTest extends ColumnShuffleTest { tmpStr, types.length, 3, - "lz4", + "kae", dataFile, 0, shuffleTestDir, diff --git a/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/ColumnShuffleDiffRowVBTest.java b/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/ColumnShuffleDiffRowVBTest.java index dc53fda8a..adbcc0e62 100644 --- a/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/ColumnShuffleDiffRowVBTest.java +++ b/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/ColumnShuffleDiffRowVBTest.java @@ -89,7 +89,7 @@ public class ColumnShuffleDiffRowVBTest extends ColumnShuffleTest { tmpStr, types.length, 3, //shuffle value_buffer init size - "lz4", + "kae", shuffleDataFile, 0, shuffleTestDir, @@ -119,7 +119,7 @@ public class ColumnShuffleDiffRowVBTest extends ColumnShuffleTest { tmpStr, types.length, 3, //shuffle value_buffer init size - "lz4", + "kae", shuffleDataFile, 0, shuffleTestDir, @@ -149,7 +149,7 @@ public class ColumnShuffleDiffRowVBTest extends ColumnShuffleTest { tmpStr, types.length, 3, //shuffle value_buffer init size - "lz4", + "kae", shuffleDataFile, 0, shuffleTestDir, @@ -179,7 +179,7 @@ public class ColumnShuffleDiffRowVBTest extends ColumnShuffleTest { tmpStr, types.length, 3, //shuffle value_buffer init size - "lz4", + "kae", shuffleDataFile, 0, shuffleTestDir, @@ -208,7 +208,7 @@ public class ColumnShuffleDiffRowVBTest extends ColumnShuffleTest { tmpStr, types.length, 3, //shuffle value_buffer init size - "lz4", + "kae", shuffleDataFile, 0, shuffleTestDir, @@ -237,7 +237,7 @@ public class ColumnShuffleDiffRowVBTest extends ColumnShuffleTest { tmpStr, types.length, 3, //shuffle value_buffer init size - "lz4", + "kae", shuffleDataFile, 0, shuffleTestDir, @@ -276,7 +276,7 @@ public class ColumnShuffleDiffRowVBTest extends ColumnShuffleTest { tmpStr, types.length, 3, //shuffle value_buffer init size - "lz4", + "kae", shuffleDataFile, 0, shuffleTestDir, diff --git a/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/ColumnShuffleGBSizeTest.java b/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/ColumnShuffleGBSizeTest.java index 2ef81ac49..488aaa496 100644 --- a/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/ColumnShuffleGBSizeTest.java +++ b/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/ColumnShuffleGBSizeTest.java @@ -89,7 +89,7 @@ public class ColumnShuffleGBSizeTest extends ColumnShuffleTest { tmpStr, types.length, 4096, //shuffle value_buffer init size - "lz4", + "kae", shuffleDataFile, 0, shuffleTestDir, @@ -118,7 +118,7 @@ public class ColumnShuffleGBSizeTest extends ColumnShuffleTest { tmpStr, types.length, 4096, //shuffle value_buffer init size - "lz4", + "kae", shuffleDataFile, 0, shuffleTestDir, @@ -147,7 +147,7 @@ public 
class ColumnShuffleGBSizeTest extends ColumnShuffleTest { tmpStr, types.length, 1024, //shuffle value_buffer init size - "lz4", + "kae", shuffleDataFile, 0, shuffleTestDir, @@ -177,7 +177,7 @@ public class ColumnShuffleGBSizeTest extends ColumnShuffleTest { tmpStr, types.length, 1024, //shuffle value_buffer init size - "lz4", + "kae", shuffleDataFile, 0, shuffleTestDir, @@ -207,7 +207,7 @@ public class ColumnShuffleGBSizeTest extends ColumnShuffleTest { tmpStr, types.length, 4096, //shuffle value_buffer init size - "lz4", + "kae", shuffleDataFile, 0, shuffleTestDir, @@ -238,7 +238,7 @@ public class ColumnShuffleGBSizeTest extends ColumnShuffleTest { tmpStr, types.length, 4096, //shuffle value_buffer init size - "lz4", + "kae", shuffleDataFile, 0, shuffleTestDir, diff --git a/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/ColumnShuffleNullTest.java b/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/ColumnShuffleNullTest.java index 98fc18dd8..66b5503f9 100644 --- a/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/ColumnShuffleNullTest.java +++ b/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/ColumnShuffleNullTest.java @@ -88,7 +88,7 @@ public class ColumnShuffleNullTest extends ColumnShuffleTest { tmpStr, types.length, 3, //shuffle value_buffer init size - "lz4", + "kae", shuffleDataFile, 0, shuffleTestDir, @@ -118,7 +118,7 @@ public class ColumnShuffleNullTest extends ColumnShuffleTest { tmpStr, types.length, 3, //shuffle value_buffer init size - "lz4", + "kae", shuffleDataFile, 0, shuffleTestDir, @@ -149,7 +149,7 @@ public class ColumnShuffleNullTest extends ColumnShuffleTest { tmpStr, types.length, 3, //shuffle value_buffer init size - "lz4", + "kae", shuffleDataFile, 0, shuffleTestDir, @@ -180,7 +180,7 @@ public class ColumnShuffleNullTest extends ColumnShuffleTest { tmpStr, types.length, 3, //shuffle value_buffer init size - "lz4", + "kae", shuffleDataFile, 0, shuffleTestDir, diff --git a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnShuffleSerializerZlibSuite.scala b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnShuffleSerializerKaeSuite.scala similarity index 98% rename from omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnShuffleSerializerZlibSuite.scala rename to omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnShuffleSerializerKaeSuite.scala index 1088c37e0..1f8b5ef23 100644 --- a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnShuffleSerializerZlibSuite.scala +++ b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnShuffleSerializerKaeSuite.scala @@ -39,7 +39,7 @@ import org.mockito.{Mock, MockitoAnnotations} import org.mockito.Mockito.{doAnswer, when} import org.mockito.invocation.InvocationOnMock -class ColumnShuffleSerializerZlibSuite extends SharedSparkSession { +class ColumnShuffleSerializerKaeSuite extends SharedSparkSession { @Mock(answer = RETURNS_SMART_NULLS) private var taskContext: TaskContext = _ @Mock(answer = RETURNS_SMART_NULLS) private var blockResolver: IndexShuffleBlockResolver = _ @Mock(answer = RETURNS_SMART_NULLS) private var dependency @@ -47,10 +47,10 @@ class ColumnShuffleSerializerZlibSuite extends SharedSparkSession { override def sparkConf: SparkConf = super.sparkConf - .setAppName("test shuffle 
serializer for zlib") + .setAppName("test shuffle serializer for kae") .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.OmniColumnarShuffleManager") .set("spark.shuffle.compress", "true") - .set("spark.io.compression.codec", "zlib") + .set("spark.io.compression.codec", "kae") private var taskMetrics: TaskMetrics = _ private var tempDir: File = _ @@ -130,7 +130,7 @@ class ColumnShuffleSerializerZlibSuite extends SharedSparkSession { super.afterAll() } - test("write shuffle compress for zlib with null value middle") { + test("write shuffle compress for kae with null value middle") { val pidArray: Array[java.lang.Integer] = Array(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) val intArray: Array[java.lang.Integer] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, null, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20) val shortArray: Array[java.lang.Integer] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, null, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20) diff --git a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnShuffleSerializerSnappySuite.scala b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnShuffleSerializerSnappySuite.scala deleted file mode 100644 index 6fcb9a896..000000000 --- a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnShuffleSerializerSnappySuite.scala +++ /dev/null @@ -1,247 +0,0 @@ -/* - * Copyright (C) 2022-2022. Huawei Technologies Co., Ltd. All rights reserved. - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.shuffle - -import java.io.{File, FileInputStream} - -import com.huawei.boostkit.spark.serialize.ColumnarBatchSerializer -import com.huawei.boostkit.spark.vectorized.PartitionInfo -import nova.hetu.omniruntime.`type`.{DataType, _} -import nova.hetu.omniruntime.vector._ -import org.apache.spark.{HashPartitioner, SparkConf, TaskContext} -import org.apache.spark.executor.TaskMetrics -import org.apache.spark.serializer.JavaSerializer -import org.apache.spark.shuffle.sort.ColumnarShuffleHandle -import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} -import org.apache.spark.sql.execution.vectorized.OmniColumnVector -import org.apache.spark.sql.test.SharedSparkSession -import org.apache.spark.sql.vectorized.ColumnarBatch -import org.apache.spark.util.Utils -import org.mockito.Answers.RETURNS_SMART_NULLS -import org.mockito.ArgumentMatchers.{any, anyInt, anyLong} -import org.mockito.{Mock, MockitoAnnotations} -import org.mockito.Mockito.{doAnswer, when} -import org.mockito.invocation.InvocationOnMock - -class ColumnShuffleSerializerSnappySuite extends SharedSparkSession { - @Mock(answer = RETURNS_SMART_NULLS) private var taskContext: TaskContext = _ - @Mock(answer = RETURNS_SMART_NULLS) private var blockResolver: IndexShuffleBlockResolver = _ - @Mock(answer = RETURNS_SMART_NULLS) private var dependency - : ColumnarShuffleDependency[Int, ColumnarBatch, ColumnarBatch] = _ - - override def sparkConf: SparkConf = - super.sparkConf - .setAppName("test shuffle serializer for snappy") - .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.OmniColumnarShuffleManager") - .set("spark.shuffle.compress", "true") - .set("spark.io.compression.codec", "snappy") - - private var taskMetrics: TaskMetrics = _ - private var tempDir: File = _ - private var outputFile: File = _ - - private var shuffleHandle: ColumnarShuffleHandle[Int, ColumnarBatch] = _ - private val numPartitions = 1 - - protected var avgBatchNumRows: SQLMetric = _ - protected var outputNumRows: SQLMetric = _ - - override def beforeEach(): Unit = { - super.beforeEach() - - avgBatchNumRows = SQLMetrics.createAverageMetric(spark.sparkContext, - "test serializer avg read batch num rows") - outputNumRows = SQLMetrics.createAverageMetric(spark.sparkContext, - "test serializer number of output rows") - - tempDir = Utils.createTempDir() - outputFile = File.createTempFile("shuffle", null, tempDir) - taskMetrics = new TaskMetrics - - MockitoAnnotations.initMocks(this) - - shuffleHandle = - new ColumnarShuffleHandle[Int, ColumnarBatch](shuffleId = 0, dependency = dependency) - - val types : Array[DataType] = Array[DataType]( - IntDataType.INTEGER, - ShortDataType.SHORT, - LongDataType.LONG, - DoubleDataType.DOUBLE, - new Decimal64DataType(18, 3), - new Decimal128DataType(28, 11), - VarcharDataType.VARCHAR, - BooleanDataType.BOOLEAN) - val inputTypes = DataTypeSerializer.serialize(types) - - when(dependency.partitioner).thenReturn(new HashPartitioner(numPartitions)) - when(dependency.serializer).thenReturn(new JavaSerializer(sparkConf)) - when(dependency.partitionInfo).thenReturn( - new PartitionInfo("hash", numPartitions, types.length, inputTypes)) - when(dependency.dataSize) - .thenReturn(SQLMetrics.createSizeMetric(spark.sparkContext, "data size")) - when(dependency.bytesSpilled) - .thenReturn(SQLMetrics.createSizeMetric(spark.sparkContext, "shuffle bytes spilled")) - when(dependency.numInputRows) - .thenReturn(SQLMetrics.createMetric(spark.sparkContext, "number of input rows")) - 
when(dependency.splitTime) - .thenReturn(SQLMetrics.createNanoTimingMetric(spark.sparkContext, "totaltime_split")) - when(dependency.spillTime) - .thenReturn(SQLMetrics.createNanoTimingMetric(spark.sparkContext, "totaltime_spill")) - when(taskContext.taskMetrics()).thenReturn(taskMetrics) - when(blockResolver.getDataFile(0, 0)).thenReturn(outputFile) - - doAnswer { (invocationOnMock: InvocationOnMock) => - val tmp = invocationOnMock.getArguments()(4).asInstanceOf[File] - if (tmp != null) { - outputFile.delete - tmp.renameTo(outputFile) - } - null - }.when(blockResolver) - .writeMetadataFileAndCommit(anyInt, anyLong, any(classOf[Array[Long]]), any(classOf[Array[Long]]), any(classOf[File])) - } - - override def afterEach(): Unit = { - try { - Utils.deleteRecursively(tempDir) - } finally { - super.afterEach() - } - } - - override def afterAll(): Unit = { - super.afterAll() - } - - test("write shuffle compress for snappy") { - val pidArray: Array[java.lang.Integer] = Array(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) - val intArray: Array[java.lang.Integer] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20) - val shortArray: Array[java.lang.Integer] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20) - val longArray: Array[java.lang.Long] = Array(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L, 12L, 13L, 14L, 15L, 16L, - 17L, 18L, 19L, 20L) - val doubleArray: Array[java.lang.Double] = Array(0.0, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7, 8.8, 9.9, 10.10, 11.11, 12.12, - 13.13, 14.14, 15.15, 16.16, 17.17, 18.18, 19.19, 20.20) - val decimal64Array: Array[java.lang.Long] = Array(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L, 12L, 13L, 14L, 15L, 16L, - 17L, 18L, 19L, 20L) - val decimal128Array: Array[Array[Long]] = Array( - Array(0L, 0L), Array(1L, 1L), Array(2L, 2L), Array(3L, 3L), Array(4L, 4L), Array(5L, 5L), Array(6L, 6L), - Array(7L, 7L), Array(8L, 8L), Array(9L, 9L), Array(10L, 10L), Array(11L, 11L), Array(12L, 12L), Array(13L, 13L), - Array(14L, 14L), Array(15L, 15L), Array(16L, 16L), Array(17L, 17L), Array(18L, 18L), Array(19L, 19L), Array(20L, 20L)) - val stringArray: Array[java.lang.String] = Array("", "a", "bb", "ccc", "dddd", "eeeee", "ffffff", "ggggggg", - "hhhhhhhh", "iiiiiiiii", "jjjjjjjjjj", "kkkkkkkkkkk", "llllllllllll", "mmmmmmmmmmmmm", "nnnnnnnnnnnnnn", - "ooooooooooooooo", "pppppppppppppppp", "qqqqqqqqqqqqqqqqq", "rrrrrrrrrrrrrrrrrr", "sssssssssssssssssss", - "tttttttttttttttttttt") - val booleanArray: Array[java.lang.Boolean] = Array(true, true, true, true, true, true, true, true, true, true, - false, false, false, false, false, false, false, false, false, false, false) - - val pidVector0 = ColumnarShuffleWriterSuite.initOmniColumnIntVector(pidArray) - val intVector0 = ColumnarShuffleWriterSuite.initOmniColumnIntVector(intArray) - val shortVector0 = ColumnarShuffleWriterSuite.initOmniColumnShortVector(shortArray) - val longVector0 = ColumnarShuffleWriterSuite.initOmniColumnLongVector(longArray) - val doubleVector0 = ColumnarShuffleWriterSuite.initOmniColumnDoubleVector(doubleArray) - val decimal64Vector0 = ColumnarShuffleWriterSuite.initOmniColumnDecimal64Vector(decimal64Array) - val decimal128Vector0 = ColumnarShuffleWriterSuite.initOmniColumnDecimal128Vector(decimal128Array) - val varcharVector0 = ColumnarShuffleWriterSuite.initOmniColumnVarcharVector(stringArray) - val booleanVector0 = ColumnarShuffleWriterSuite.initOmniColumnBooleanVector(booleanArray) - - val cb0 = 
ColumnarShuffleWriterSuite.makeColumnarBatch( - pidVector0.getVec.getSize, - List(pidVector0, intVector0, shortVector0, longVector0, doubleVector0, - decimal64Vector0, decimal128Vector0, varcharVector0, booleanVector0) - ) - - val pidVector1 = ColumnarShuffleWriterSuite.initOmniColumnIntVector(pidArray) - val intVector1 = ColumnarShuffleWriterSuite.initOmniColumnIntVector(intArray) - val shortVector1 = ColumnarShuffleWriterSuite.initOmniColumnShortVector(shortArray) - val longVector1 = ColumnarShuffleWriterSuite.initOmniColumnLongVector(longArray) - val doubleVector1 = ColumnarShuffleWriterSuite.initOmniColumnDoubleVector(doubleArray) - val decimal64Vector1 = ColumnarShuffleWriterSuite.initOmniColumnDecimal64Vector(decimal64Array) - val decimal128Vector1 = ColumnarShuffleWriterSuite.initOmniColumnDecimal128Vector(decimal128Array) - val varcharVector1 = ColumnarShuffleWriterSuite.initOmniColumnVarcharVector(stringArray) - val booleanVector1 = ColumnarShuffleWriterSuite.initOmniColumnBooleanVector(booleanArray) - - val cb1 = ColumnarShuffleWriterSuite.makeColumnarBatch( - pidVector1.getVec.getSize, - List(pidVector1, intVector1, shortVector1, longVector1, doubleVector1, - decimal64Vector1, decimal128Vector1, varcharVector1, booleanVector1) - ) - - def records: Iterator[(Int, ColumnarBatch)] = Iterator((0, cb0), (0, cb1)) - - val writer = new ColumnarShuffleWriter[Int, ColumnarBatch]( - blockResolver, - shuffleHandle, - 0L, // MapId - taskContext.taskMetrics().shuffleWriteMetrics) - - writer.write(records) - writer.stop(success = true) - - assert(writer.getPartitionLengths.sum === outputFile.length()) - assert(writer.getPartitionLengths.count(_ == 0L) === 0) - // should be (numPartitions - 2) zero length files - - val shuffleWriteMetrics = taskContext.taskMetrics().shuffleWriteMetrics - assert(shuffleWriteMetrics.bytesWritten === outputFile.length()) - assert(shuffleWriteMetrics.recordsWritten === pidArray.length * 2) - - assert(taskMetrics.diskBytesSpilled === 0) - assert(taskMetrics.memoryBytesSpilled === 0) - - val serializer = new ColumnarBatchSerializer(avgBatchNumRows, outputNumRows).newInstance() - val deserializedStream = serializer.deserializeStream(new FileInputStream(outputFile)) - - try { - val kv = deserializedStream.asKeyValueIterator - var length = 0 - kv.foreach { - case (_, batch: ColumnarBatch) => - length += 1 - assert(batch.numRows == 42) - assert(batch.numCols == 8) - assert(batch.column(0).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[IntVec].get(0) == 0) - assert(batch.column(0).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[IntVec].get(19) == 19) - assert(batch.column(1).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[ShortVec].get(0) == 0) - assert(batch.column(1).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[ShortVec].get(19) == 19) - assert(batch.column(2).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[LongVec].get(0) == 0) - assert(batch.column(2).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[LongVec].get(19) == 19) - assert(batch.column(3).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[DoubleVec].get(0) == 0.0) - assert(batch.column(3).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[DoubleVec].get(19) == 19.19) - assert(batch.column(4).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[LongVec].get(0) == 0L) - assert(batch.column(4).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[LongVec].get(19) == 19L) - assert(batch.column(5).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[Decimal128Vec].get(0) sameElements Array(0L, 0L)) 
- assert(batch.column(5).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[Decimal128Vec].get(19) sameElements Array(19L, 19L)) - assert(batch.column(6).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[VarcharVec].get(0) sameElements "") - assert(batch.column(6).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[VarcharVec].get(19) sameElements "sssssssssssssssssss") - assert(batch.column(7).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[BooleanVec].get(0) == true) - assert(batch.column(7).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[BooleanVec].get(19) == false) - (0 until batch.numCols).foreach { i => - val valueVector = batch.column(i).asInstanceOf[OmniColumnVector].getVec - assert(valueVector.getSize == batch.numRows) - } - batch.close() - } - assert(length == 1) - } finally { - deserializedStream.close() - } - - } -} -- Gitee From 2f3edc0d33704fcaee1a2689840a0ab215dd31b3 Mon Sep 17 00:00:00 2001 From: Anllcik <654610542@qq.com> Date: Tue, 31 Oct 2023 11:38:13 +0800 Subject: [PATCH 2/2] delete other compression methods for kae --- .../cpp/src/CMakeLists.txt | 3 - .../cpp/src/common/common.cpp | 2 - .../cpp/src/io/Common.hh | 3 +- .../cpp/src/io/Compression.cc | 96 +------ .../cpp/src/io/wrap/snappy_wrapper.h | 30 --- .../spark/compress/CompressionUtil.java | 7 +- .../boostkit/spark/compress/SnappyCodec.java | 34 --- .../ColumnarShuffleExchangeExec.scala | 2 +- .../spark/ColumnShuffleCompressionTest.java | 6 - .../ColumnShuffleSerializerKaeSuite.scala | 2 +- .../ColumnShuffleSerializerLz4Suite.scala | 247 ------------------ 11 files changed, 11 insertions(+), 421 deletions(-) delete mode 100644 omnioperator/omniop-spark-extension/cpp/src/io/wrap/snappy_wrapper.h delete mode 100644 omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/compress/SnappyCodec.java delete mode 100644 omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnShuffleSerializerLz4Suite.scala diff --git a/omnioperator/omniop-spark-extension/cpp/src/CMakeLists.txt b/omnioperator/omniop-spark-extension/cpp/src/CMakeLists.txt index 915b71f68..d1cd1170a 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/CMakeLists.txt +++ b/omnioperator/omniop-spark-extension/cpp/src/CMakeLists.txt @@ -52,9 +52,6 @@ target_link_libraries (${PROJ_TARGET} PRIVATE protobuf z kaezip - snappy - lz4 - zstd boostkit-omniop-vector-1.3.0-aarch64 ) diff --git a/omnioperator/omniop-spark-extension/cpp/src/common/common.cpp b/omnioperator/omniop-spark-extension/cpp/src/common/common.cpp index 9166716f8..5d50fbced 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/common/common.cpp +++ b/omnioperator/omniop-spark-extension/cpp/src/common/common.cpp @@ -60,8 +60,6 @@ spark::CompressionKind GetCompressionType(const std::string& name) { return spark::CompressionKind::CompressionKind_NONE; } else if (name == "kae") { return spark::CompressionKind::CompressionKind_KAEZIP; - } else if (name == "lz4") { - return spark::CompressionKind::CompressionKind_LZ4; } else { throw std::logic_error("compression codec not supported"); } diff --git a/omnioperator/omniop-spark-extension/cpp/src/io/Common.hh b/omnioperator/omniop-spark-extension/cpp/src/io/Common.hh index 412c38a15..02a9755b5 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/io/Common.hh +++ b/omnioperator/omniop-spark-extension/cpp/src/io/Common.hh @@ -23,8 +23,7 @@ namespace spark { enum CompressionKind { CompressionKind_NONE = 0, - CompressionKind_KAEZIP = 1, - CompressionKind_LZ4 = 2 + CompressionKind_KAEZIP = 
1 }; } diff --git a/omnioperator/omniop-spark-extension/cpp/src/io/Compression.cc b/omnioperator/omniop-spark-extension/cpp/src/io/Compression.cc index a1b731cc9..5c423d053 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/io/Compression.cc +++ b/omnioperator/omniop-spark-extension/cpp/src/io/Compression.cc @@ -18,7 +18,6 @@ #include "Adaptor.hh" #include "Compression.hh" -#include "lz4.h" #include #include @@ -28,20 +27,7 @@ #include "zlib.h" #include "kaezip.h" -#include "wrap/snappy_wrapper.h" - -#ifndef ZSTD_CLEVEL_DEFAULT -#define ZSTD_CLEVEL_DEFAULT 3 -#endif - -/* These macros are defined in lz4.c */ -#ifndef LZ4_ACCELERATION_DEFAULT -#define LZ4_ACCELERATION_DEFAULT 1 -#endif - -#ifndef LZ4_ACCELERATION_MAX -#define LZ4_ACCELERATION_MAX 65537 -#endif +#define unlikely(x) __builtin_expect(!!(x), 0) namespace spark { @@ -304,70 +290,6 @@ namespace spark { return true; } - /** - * LZ4 block compression - */ - class Lz4CompressionSteam: public BlockCompressionStream { - public: - Lz4CompressionSteam(OutputStream * outStream, - int compressionLevel, - uint64_t capacity, - uint64_t blockSize, - MemoryPool& pool) - : BlockCompressionStream(outStream, - compressionLevel, - capacity, - blockSize, - pool) { - this->init(); - } - - virtual std::string getName() const override { - return "Lz4CompressionStream"; - } - - virtual ~Lz4CompressionSteam() override { - this->end(); - } - - protected: - virtual uint64_t doBlockCompression() override; - - virtual uint64_t estimateMaxCompressionSize() override { - return static_cast(LZ4_compressBound(bufferSize)); - } - - private: - void init(); - void end(); - LZ4_stream_t *state; - }; - - uint64_t Lz4CompressionSteam::doBlockCompression() { - int result = LZ4_compress_fast_extState(static_cast(state), - reinterpret_cast(rawInputBuffer.data()), - reinterpret_cast(compressorBuffer.data()), - bufferSize, - static_cast(compressorBuffer.size()), - level); - if (result == 0) { - throw std::runtime_error("Error during block compression using lz4."); - } - return static_cast(result); - } - - void Lz4CompressionSteam::init() { - state = LZ4_createStream(); - if (!state) { - throw std::runtime_error("Error while allocating state for lz4."); - } - } - - void Lz4CompressionSteam::end() { - (void)LZ4_freeStream(state); - state = nullptr; - } - /** * KAE block compression */ @@ -414,7 +336,7 @@ namespace spark { rawInputBuffer.data(), static_cast(bufferSize), level); - if (ret != Z_OK) { + if (unlikely(ret != Z_OK)) { throw std::runtime_error("Failed to compress input data"); } return compressedSize; @@ -430,12 +352,15 @@ namespace spark { strm.zalloc = nullptr; strm.zfree = nullptr; strm.opaque = nullptr; - if (!kz_get_devices() || deflateInit(&strm, level) != Z_OK) { + if (!kz_get_devices()) { + throw std::runtime_error("Error while getting devices for kaezip."); + } else if (deflateInit(&strm, level) != Z_OK) { throw std::runtime_error("Error while calling deflateInit for kaezip."); + } else { + return; + } } - void KAECompressionStream::end() { (void)deflateEnd(&strm); } @@ -466,13 +391,6 @@ namespace spark { (new KAECompressionStream( outStream, level, bufferCapacity, compressionBlockSize, pool)); } - case CompressionKind_LZ4: { - int level = (strategy == CompressionStrategy_SPEED) ?
- LZ4_ACCELERATION_MAX : LZ4_ACCELERATION_DEFAULT; - return std::unique_ptr - (new Lz4CompressionSteam( - outStream, level, bufferCapacity, compressionBlockSize, pool)); - } default: throw std::logic_error("compression codec not supported"); } diff --git a/omnioperator/omniop-spark-extension/cpp/src/io/wrap/snappy_wrapper.h b/omnioperator/omniop-spark-extension/cpp/src/io/wrap/snappy_wrapper.h deleted file mode 100644 index 56ac837ee..000000000 --- a/omnioperator/omniop-spark-extension/cpp/src/io/wrap/snappy_wrapper.h +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef SNAPPY_WRAPPER_HH -#define SNAPPY_WRAPPER_HH - -#include "../Adaptor.hh" - -#ifdef __clang__ - DIAGNOSTIC_IGNORE("-Wreserved-id-macro") -#endif - -#include - -#endif diff --git a/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/compress/CompressionUtil.java b/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/compress/CompressionUtil.java index c7a601779..6f9377eb8 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/compress/CompressionUtil.java +++ b/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/compress/CompressionUtil.java @@ -17,18 +17,13 @@ package com.huawei.boostkit.spark.compress; -import io.airlift.compress.lz4.Lz4Decompressor; -import io.airlift.compress.lzo.LzoDecompressor; - public class CompressionUtil { public static CompressionCodec createCodec(String compressionCodec) { switch (compressionCodec) { case "kae": return new KaeCodec(); - case "lz4": - return new AircompressorCodec(new Lz4Decompressor()); default: - throw new IllegalArgumentException("Unknown compression codec: " + + throw new IllegalArgumentException("Unsupported compression codec: " + compressionCodec); } } diff --git a/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/compress/SnappyCodec.java b/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/compress/SnappyCodec.java deleted file mode 100644 index cdac3a5f2..000000000 --- a/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/compress/SnappyCodec.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License.
You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.huawei.boostkit.spark.compress;
-
-import io.airlift.compress.snappy.SnappyDecompressor;
-
-import java.io.IOException;
-
-public class SnappyCodec extends AircompressorCodec {
-
-    SnappyCodec() {
-        super(new SnappyDecompressor());
-    }
-
-    @Override
-    public int decompress(byte[] input, int inputLength, byte[] output) throws IOException {
-        return super.decompress(input, inputLength, output);
-    }
-}
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala
index c982ae6fc..6f98be54d 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala
@@ -315,7 +315,7 @@ object ColumnarShuffleExchangeExec extends Logging {
           newIter
         }, isOrderSensitive = isOrderSensitive)
       case h@HashPartitioning(expressions, numPartitions) =>
-        if (containsRollUp(expressions)) {
+        if (containsRollUp(expressions) || expressions.length > 6) {
           rdd.mapPartitionsWithIndexInternal((_, cbIter) => {
             val partitionKeyExtractor: InternalRow => Any = {
               val projection =
diff --git a/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/ColumnShuffleCompressionTest.java b/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/ColumnShuffleCompressionTest.java
index 74a8fbf87..145d12621 100644
--- a/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/ColumnShuffleCompressionTest.java
+++ b/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/ColumnShuffleCompressionTest.java
@@ -80,12 +80,6 @@ public class ColumnShuffleCompressionTest extends ColumnShuffleTest {
         columnShuffleTestCompress("uncompressed", shuffleDataFile);
     }
 
-    @Test
-    public void columnShuffleLz4CompressTest() throws IOException {
-        shuffleDataFile = shuffleTestDir + "/shuffle_dataFile_lz4_test";
-        columnShuffleTestCompress("lz4", shuffleDataFile);
-    }
-
     @Test
     public void columnShuffleKaeCompressTest() throws IOException {
         shuffleDataFile = shuffleTestDir + "/shuffle_dataFile_kae_test";
diff --git a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnShuffleSerializerKaeSuite.scala b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnShuffleSerializerKaeSuite.scala
index 1f8b5ef23..298c809df 100644
--- a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnShuffleSerializerKaeSuite.scala
+++ b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnShuffleSerializerKaeSuite.scala
@@ -50,7 +50,7 @@ class ColumnShuffleSerializerKaeSuite extends SharedSparkSession {
       .setAppName("test shuffle serializer for kae")
       .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.OmniColumnarShuffleManager")
       .set("spark.shuffle.compress", "true")
-      .set("spark.io.compression.codec", "kae")
+      .set("spark.omni.io.compression.codec", "kae")
 
   private var taskMetrics: TaskMetrics = _
   private var tempDir: File = _
diff --git a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnShuffleSerializerLz4Suite.scala b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnShuffleSerializerLz4Suite.scala
deleted file mode 100644
index 4d79e3ca6..000000000
--- a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnShuffleSerializerLz4Suite.scala
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Copyright (C) 2022-2022. Huawei Technologies Co., Ltd. All rights reserved.
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.shuffle
-
-import java.io.{File, FileInputStream}
-
-import com.huawei.boostkit.spark.serialize.ColumnarBatchSerializer
-import com.huawei.boostkit.spark.vectorized.PartitionInfo
-import nova.hetu.omniruntime.`type`.{DataType, _}
-import nova.hetu.omniruntime.vector._
-import org.apache.spark.{HashPartitioner, SparkConf, TaskContext}
-import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.serializer.JavaSerializer
-import org.apache.spark.shuffle.sort.ColumnarShuffleHandle
-import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
-import org.apache.spark.sql.execution.vectorized.OmniColumnVector
-import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.util.Utils
-import org.mockito.Answers.RETURNS_SMART_NULLS
-import org.mockito.ArgumentMatchers.{any, anyInt, anyLong}
-import org.mockito.{Mock, MockitoAnnotations}
-import org.mockito.Mockito.{doAnswer, when}
-import org.mockito.invocation.InvocationOnMock
-
-class ColumnShuffleSerializerLz4Suite extends SharedSparkSession {
-  @Mock(answer = RETURNS_SMART_NULLS) private var taskContext: TaskContext = _
-  @Mock(answer = RETURNS_SMART_NULLS) private var blockResolver: IndexShuffleBlockResolver = _
-  @Mock(answer = RETURNS_SMART_NULLS) private var dependency
-  : ColumnarShuffleDependency[Int, ColumnarBatch, ColumnarBatch] = _
-
-  override def sparkConf: SparkConf =
-    super.sparkConf
-      .setAppName("test shuffle serializer for lz4")
-      .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.OmniColumnarShuffleManager")
-      .set("spark.shuffle.compress", "true")
-      .set("spark.io.compression.codec", "lz4")
-
-  private var taskMetrics: TaskMetrics = _
-  private var tempDir: File = _
-  private var outputFile: File = _
-
-  private var shuffleHandle: ColumnarShuffleHandle[Int, ColumnarBatch] = _
-  private val numPartitions = 1
-
-  protected var avgBatchNumRows: SQLMetric = _
-  protected var outputNumRows: SQLMetric = _
-
-  override def beforeEach(): Unit = {
-    super.beforeEach()
-
-    avgBatchNumRows = SQLMetrics.createAverageMetric(spark.sparkContext,
-      "test serializer avg read batch num rows")
-    outputNumRows = SQLMetrics.createAverageMetric(spark.sparkContext,
-      "test serializer number of output rows")
-
-    tempDir = Utils.createTempDir()
-    outputFile = File.createTempFile("shuffle", null, tempDir)
-    taskMetrics = new TaskMetrics
-
-    MockitoAnnotations.initMocks(this)
-
-    shuffleHandle =
-      new ColumnarShuffleHandle[Int, ColumnarBatch](shuffleId = 0, dependency = dependency)
-
-    val types : Array[DataType] = Array[DataType](
-      IntDataType.INTEGER,
-      ShortDataType.SHORT,
-      LongDataType.LONG,
-      DoubleDataType.DOUBLE,
-      new Decimal64DataType(18, 3),
-      new Decimal128DataType(28, 11),
-      VarcharDataType.VARCHAR,
-      BooleanDataType.BOOLEAN)
-    val inputTypes = DataTypeSerializer.serialize(types)
-
-    when(dependency.partitioner).thenReturn(new HashPartitioner(numPartitions))
-    when(dependency.serializer).thenReturn(new JavaSerializer(sparkConf))
-    when(dependency.partitionInfo).thenReturn(
-      new PartitionInfo("hash", numPartitions, types.length, inputTypes))
-    when(dependency.dataSize)
-      .thenReturn(SQLMetrics.createSizeMetric(spark.sparkContext, "data size"))
-    when(dependency.bytesSpilled)
-      .thenReturn(SQLMetrics.createSizeMetric(spark.sparkContext, "shuffle bytes spilled"))
-    when(dependency.numInputRows)
-      .thenReturn(SQLMetrics.createMetric(spark.sparkContext, "number of input rows"))
-    when(dependency.splitTime)
-      .thenReturn(SQLMetrics.createNanoTimingMetric(spark.sparkContext, "totaltime_split"))
-    when(dependency.spillTime)
-      .thenReturn(SQLMetrics.createNanoTimingMetric(spark.sparkContext, "totaltime_spill"))
-    when(taskContext.taskMetrics()).thenReturn(taskMetrics)
-    when(blockResolver.getDataFile(0, 0)).thenReturn(outputFile)
-
-    doAnswer { (invocationOnMock: InvocationOnMock) =>
-      val tmp = invocationOnMock.getArguments()(4).asInstanceOf[File]
-      if (tmp != null) {
-        outputFile.delete
-        tmp.renameTo(outputFile)
-      }
-      null
-    }.when(blockResolver)
-      .writeMetadataFileAndCommit(anyInt, anyLong, any(classOf[Array[Long]]), any(classOf[Array[Long]]), any(classOf[File]))
-  }
-
-  override def afterEach(): Unit = {
-    try {
-      Utils.deleteRecursively(tempDir)
-    } finally {
-      super.afterEach()
-    }
-  }
-
-  override def afterAll(): Unit = {
-    super.afterAll()
-  }
-
-  test("write shuffle compress for lz4 with no null value") {
-    val pidArray: Array[java.lang.Integer] = Array(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
-    val intArray: Array[java.lang.Integer] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
-    val shortArray: Array[java.lang.Integer] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
-    val longArray: Array[java.lang.Long] = Array(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L, 12L, 13L, 14L, 15L, 16L,
-      17L, 18L, 19L, 20L)
-    val doubleArray: Array[java.lang.Double] = Array(0.0, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7, 8.8, 9.9, 10.10, 11.11, 12.12,
-      13.13, 14.14, 15.15, 16.16, 17.17, 18.18, 19.19, 20.20)
-    val decimal64Array: Array[java.lang.Long] = Array(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L, 12L, 13L, 14L, 15L, 16L,
-      17L, 18L, 19L, 20L)
-    val decimal128Array: Array[Array[Long]] = Array(
-      Array(0L, 0L), Array(1L, 1L), Array(2L, 2L), Array(3L, 3L), Array(4L, 4L), Array(5L, 5L), Array(6L, 6L),
-      Array(7L, 7L), Array(8L, 8L), Array(9L, 9L), Array(10L, 10L), Array(11L, 11L), Array(12L, 12L), Array(13L, 13L),
-      Array(14L, 14L), Array(15L, 15L), Array(16L, 16L), Array(17L, 17L), Array(18L, 18L), Array(19L, 19L), Array(20L, 20L))
-    val stringArray: Array[java.lang.String] = Array("", "a", "bb", "ccc", "dddd", "eeeee", "ffffff", "ggggggg",
-      "hhhhhhhh", "iiiiiiiii", "jjjjjjjjjj", "kkkkkkkkkkk", "llllllllllll", "mmmmmmmmmmmmm", "nnnnnnnnnnnnnn",
-      "ooooooooooooooo", "pppppppppppppppp", "qqqqqqqqqqqqqqqqq", "rrrrrrrrrrrrrrrrrr", "sssssssssssssssssss",
-      "tttttttttttttttttttt")
-    val booleanArray: Array[java.lang.Boolean] = Array(true, true, true, true, true, true, true, true, true, true,
-      false, false, false, false, false, false, false, false, false, false, false)
-
-    val pidVector0 = ColumnarShuffleWriterSuite.initOmniColumnIntVector(pidArray)
-    val intVector0 = ColumnarShuffleWriterSuite.initOmniColumnIntVector(intArray)
-    val shortVector0 = ColumnarShuffleWriterSuite.initOmniColumnShortVector(shortArray)
-    val longVector0 = ColumnarShuffleWriterSuite.initOmniColumnLongVector(longArray)
-    val doubleVector0 = ColumnarShuffleWriterSuite.initOmniColumnDoubleVector(doubleArray)
-    val decimal64Vector0 = ColumnarShuffleWriterSuite.initOmniColumnDecimal64Vector(decimal64Array)
-    val decimal128Vector0 = ColumnarShuffleWriterSuite.initOmniColumnDecimal128Vector(decimal128Array)
-    val varcharVector0 = ColumnarShuffleWriterSuite.initOmniColumnVarcharVector(stringArray)
-    val booleanVector0 = ColumnarShuffleWriterSuite.initOmniColumnBooleanVector(booleanArray)
-
-    val cb0 = ColumnarShuffleWriterSuite.makeColumnarBatch(
-      pidVector0.getVec.getSize,
-      List(pidVector0, intVector0, shortVector0, longVector0, doubleVector0,
-        decimal64Vector0, decimal128Vector0, varcharVector0, booleanVector0)
-    )
-
-    val pidVector1 = ColumnarShuffleWriterSuite.initOmniColumnIntVector(pidArray)
-    val intVector1 = ColumnarShuffleWriterSuite.initOmniColumnIntVector(intArray)
-    val shortVector1 = ColumnarShuffleWriterSuite.initOmniColumnShortVector(shortArray)
-    val longVector1 = ColumnarShuffleWriterSuite.initOmniColumnLongVector(longArray)
-    val doubleVector1 = ColumnarShuffleWriterSuite.initOmniColumnDoubleVector(doubleArray)
-    val decimal64Vector1 = ColumnarShuffleWriterSuite.initOmniColumnDecimal64Vector(decimal64Array)
-    val decimal128Vector1 = ColumnarShuffleWriterSuite.initOmniColumnDecimal128Vector(decimal128Array)
-    val varcharVector1 = ColumnarShuffleWriterSuite.initOmniColumnVarcharVector(stringArray)
-    val booleanVector1 = ColumnarShuffleWriterSuite.initOmniColumnBooleanVector(booleanArray)
-
-    val cb1 = ColumnarShuffleWriterSuite.makeColumnarBatch(
-      pidVector1.getVec.getSize,
-      List(pidVector1, intVector1, shortVector1, longVector1, doubleVector1,
-        decimal64Vector1, decimal128Vector1, varcharVector1, booleanVector1)
-    )
-
-    def records: Iterator[(Int, ColumnarBatch)] = Iterator((0, cb0), (0, cb1))
-
-    val writer = new ColumnarShuffleWriter[Int, ColumnarBatch](
-      blockResolver,
-      shuffleHandle,
-      0L, // MapId
-      taskContext.taskMetrics().shuffleWriteMetrics)
-
-    writer.write(records)
-    writer.stop(success = true)
-
-    assert(writer.getPartitionLengths.sum === outputFile.length())
-    assert(writer.getPartitionLengths.count(_ == 0L) === 0)
-    // should be (numPartitions - 2) zero length files
-
-    val shuffleWriteMetrics = taskContext.taskMetrics().shuffleWriteMetrics
-    assert(shuffleWriteMetrics.bytesWritten === outputFile.length())
-    assert(shuffleWriteMetrics.recordsWritten === pidArray.length * 2)
-
-    assert(taskMetrics.diskBytesSpilled === 0)
-    assert(taskMetrics.memoryBytesSpilled === 0)
-
-    val serializer = new ColumnarBatchSerializer(avgBatchNumRows, outputNumRows).newInstance()
-    val deserializedStream = serializer.deserializeStream(new FileInputStream(outputFile))
-
-    try {
-      val kv = deserializedStream.asKeyValueIterator
-      var length = 0
-      kv.foreach {
-        case (_, batch: ColumnarBatch) =>
-          length += 1
-          assert(batch.numRows == 42)
-          assert(batch.numCols == 8)
-          assert(batch.column(0).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[IntVec].get(0) == 0)
-          assert(batch.column(0).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[IntVec].get(19) == 19)
-          assert(batch.column(1).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[ShortVec].get(0) == 0)
-          assert(batch.column(1).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[ShortVec].get(19) == 19)
-          assert(batch.column(2).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[LongVec].get(0) == 0)
-          assert(batch.column(2).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[LongVec].get(19) == 19)
-          assert(batch.column(3).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[DoubleVec].get(0) == 0.0)
-          assert(batch.column(3).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[DoubleVec].get(19) == 19.19)
-          assert(batch.column(4).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[LongVec].get(0) == 0L)
-          assert(batch.column(4).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[LongVec].get(19) == 19L)
-          assert(batch.column(5).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[Decimal128Vec].get(0) sameElements Array(0L, 0L))
-          assert(batch.column(5).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[Decimal128Vec].get(19) sameElements Array(19L, 19L))
-          assert(batch.column(6).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[VarcharVec].get(0) sameElements "")
-          assert(batch.column(6).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[VarcharVec].get(19) sameElements "sssssssssssssssssss")
-          assert(batch.column(7).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[BooleanVec].get(0) == true)
-          assert(batch.column(7).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[BooleanVec].get(19) == false)
-          (0 until batch.numCols).foreach { i =>
-            val valueVector = batch.column(i).asInstanceOf[OmniColumnVector].getVec
-            assert(valueVector.getSize == batch.numRows)
-          }
-          batch.close()
-      }
-      assert(length == 1)
-    } finally {
-      deserializedStream.close()
-    }
-
-  }
-}
--
Gitee
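
Note (not part of the patch): the shuffle configuration exercised by ColumnShuffleSerializerKaeSuite above reduces to the Spark settings sketched below. The property keys and the OmniColumnarShuffleManager class name are taken from the hunks above; the application name is a placeholder, and this is an illustrative sketch only, not a statement of the plugin's full setup requirements.

    import org.apache.spark.SparkConf

    // Illustrative sketch: mirrors the settings the KAE suite sets on its SparkConf.
    // "kae-shuffle-example" is a placeholder app name.
    val conf = new SparkConf()
      .setAppName("kae-shuffle-example")
      .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.OmniColumnarShuffleManager")
      .set("spark.shuffle.compress", "true")
      .set("spark.omni.io.compression.codec", "kae")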