diff --git a/omnioperator/omniop-spark-extension/cpp/src/CMakeLists.txt b/omnioperator/omniop-spark-extension/cpp/src/CMakeLists.txt index 7256a02cbaa0928592d0701e5b117f833321c72c..d1cd1170a3c02baaa1cb04927bf6ef21d2de2d6c 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/CMakeLists.txt +++ b/omnioperator/omniop-spark-extension/cpp/src/CMakeLists.txt @@ -1,4 +1,5 @@ include_directories(SYSTEM "/user/local/include") +include_directories(KAEZIP "/usr/local/kaezip/include") set (PROJ_TARGET spark_columnar_plugin) @@ -41,7 +42,7 @@ target_include_directories(${PROJ_TARGET} PUBLIC $ENV{JAVA_HOME}/include) target_include_directories(${PROJ_TARGET} PUBLIC $ENV{JAVA_HOME}/include/linux) target_include_directories(${PROJ_TARGET} PUBLIC ${CMAKE_CURRENT_BINARY_DIR}) -target_link_libraries (${PROJ_TARGET} PUBLIC +target_link_libraries (${PROJ_TARGET} PRIVATE Arrow::arrow_shared ArrowDataset::arrow_dataset_shared Parquet::parquet_shared @@ -50,9 +51,7 @@ target_link_libraries (${PROJ_TARGET} PUBLIC sasl2 protobuf z - snappy - lz4 - zstd + kaezip boostkit-omniop-vector-1.3.0-aarch64 ) diff --git a/omnioperator/omniop-spark-extension/cpp/src/common/common.cpp b/omnioperator/omniop-spark-extension/cpp/src/common/common.cpp index f33d5c4c9df9695c2464b622587dea9e3546c39c..5d50fbced2d68a6853d3b9e2889543958ee548bf 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/common/common.cpp +++ b/omnioperator/omniop-spark-extension/cpp/src/common/common.cpp @@ -58,14 +58,8 @@ uint32_t reversebytes_uint32t(uint32_t const value) spark::CompressionKind GetCompressionType(const std::string& name) { if (name == "uncompressed") { return spark::CompressionKind::CompressionKind_NONE; - } else if (name == "zlib") { - return spark::CompressionKind::CompressionKind_ZLIB; - } else if (name == "snappy") { - return spark::CompressionKind::CompressionKind_SNAPPY; - } else if (name == "lz4") { - return spark::CompressionKind::CompressionKind_LZ4; - } else if (name == "zstd") { - return spark::CompressionKind::CompressionKind_ZSTD; + } else if (name == "kae") { + return spark::CompressionKind::CompressionKind_KAEZIP; } else { throw std::logic_error("compression codec not supported"); } diff --git a/omnioperator/omniop-spark-extension/cpp/src/io/Common.hh b/omnioperator/omniop-spark-extension/cpp/src/io/Common.hh index e240363567544263de2d8daed503b079ae649fae..02a9755b57a4f2a2f6055f98fa3efc4cba5a2cd2 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/io/Common.hh +++ b/omnioperator/omniop-spark-extension/cpp/src/io/Common.hh @@ -23,11 +23,7 @@ namespace spark { enum CompressionKind { CompressionKind_NONE = 0, - CompressionKind_ZLIB = 1, - CompressionKind_SNAPPY = 2, - CompressionKind_LZO = 3, - CompressionKind_LZ4 = 4, - CompressionKind_ZSTD = 5 + CompressionKind_KAEZIP = 1 }; } diff --git a/omnioperator/omniop-spark-extension/cpp/src/io/Compression.cc b/omnioperator/omniop-spark-extension/cpp/src/io/Compression.cc index 720f7ff1951b389fa230c1e074bdb6e9619f644b..5c423d05389c64bae8edfacad2f049e4d4adeed7 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/io/Compression.cc +++ b/omnioperator/omniop-spark-extension/cpp/src/io/Compression.cc @@ -18,7 +18,6 @@ #include "Adaptor.hh" #include "Compression.hh" -#include "lz4.h" #include #include @@ -26,22 +25,9 @@ #include #include "zlib.h" -#include "zstd.h" +#include "kaezip.h" -#include "wrap/snappy_wrapper.h" - -#ifndef ZSTD_CLEVEL_DEFAULT -#define ZSTD_CLEVEL_DEFAULT 3 -#endif - -/* These macros are defined in lz4.c */ -#ifndef LZ4_ACCELERATION_DEFAULT -#define 
LZ4_ACCELERATION_DEFAULT 1 -#endif - -#ifndef LZ4_ACCELERATION_MAX -#define LZ4_ACCELERATION_MAX 65537 -#endif +##define unlikely(x) __builtin_expect(!!(x), 0) namespace spark { @@ -210,115 +196,6 @@ namespace spark { return true; } - class ZlibCompressionStream: public CompressionStream { - public: - ZlibCompressionStream(OutputStream * outStream, - int compressionLevel, - uint64_t capacity, - uint64_t blockSize, - MemoryPool& pool); - - virtual ~ZlibCompressionStream() override { - end(); - } - - virtual std::string getName() const override; - - protected: - virtual uint64_t doStreamingCompression() override; - - private: - void init(); - void end(); - z_stream strm; - }; - - ZlibCompressionStream::ZlibCompressionStream( - OutputStream * outStream, - int compressionLevel, - uint64_t capacity, - uint64_t blockSize, - MemoryPool& pool) - : CompressionStream(outStream, - compressionLevel, - capacity, - blockSize, - pool) { - init(); - } - - uint64_t ZlibCompressionStream::doStreamingCompression() { - if (deflateReset(&strm) != Z_OK) { - throw std::runtime_error("Failed to reset inflate."); - } - - strm.avail_in = static_cast(bufferSize); - strm.next_in = rawInputBuffer.data(); - - do { - if (outputPosition >= outputSize) { - if (!BufferedOutputStream::Next( - reinterpret_cast(&outputBuffer), - &outputSize)) { - throw std::runtime_error( - "Failed to get next output buffer from output stream."); - } - outputPosition = 0; - } - strm.next_out = reinterpret_cast - (outputBuffer + outputPosition); - strm.avail_out = static_cast - (outputSize - outputPosition); - - int ret = deflate(&strm, Z_FINISH); - outputPosition = outputSize - static_cast(strm.avail_out); - - if (ret == Z_STREAM_END) { - break; - } else if (ret == Z_OK) { - // needs more buffer so will continue the loop - } else { - throw std::runtime_error("Failed to deflate input data."); - } - } while (strm.avail_out == 0); - - return strm.total_out; - } - - std::string ZlibCompressionStream::getName() const { - return "ZlibCompressionStream"; - } - -// DIAGNOSTIC_PUSH - -#if defined(__GNUC__) || defined(__clang__) - DIAGNOSTIC_IGNORE("-Wold-style-cast") -#endif - - void ZlibCompressionStream::init() { - strm.zalloc = nullptr; - strm.zfree = nullptr; - strm.opaque = nullptr; - strm.next_in = nullptr; - - if (deflateInit2(&strm, level, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY) - != Z_OK) { - throw std::runtime_error("Error while calling deflateInit2() for zlib."); - } - } - - void ZlibCompressionStream::end() { - (void)deflateEnd(&strm); - } - -// DIAGNOSTIC_PUSH - - enum DecompressState { DECOMPRESS_HEADER, - DECOMPRESS_START, - DECOMPRESS_CONTINUE, - DECOMPRESS_ORIGINAL, - DECOMPRESS_EOF}; - // DIAGNOSTIC_PUSH #if defined(__GNUC__) || defined(__clang__) @@ -414,118 +291,11 @@ namespace spark { } /** - * LZ4 block compression - */ - class Lz4CompressionSteam: public BlockCompressionStream { - public: - Lz4CompressionSteam(OutputStream * outStream, - int compressionLevel, - uint64_t capacity, - uint64_t blockSize, - MemoryPool& pool) - : BlockCompressionStream(outStream, - compressionLevel, - capacity, - blockSize, - pool) { - this->init(); - } - - virtual std::string getName() const override { - return "Lz4CompressionStream"; - } - - virtual ~Lz4CompressionSteam() override { - this->end(); - } - - protected: - virtual uint64_t doBlockCompression() override; - - virtual uint64_t estimateMaxCompressionSize() override { - return static_cast(LZ4_compressBound(bufferSize)); - } - - private: - void init(); - void end(); - LZ4_stream_t *state; - 
}; - - uint64_t Lz4CompressionSteam::doBlockCompression() { - int result = LZ4_compress_fast_extState(static_cast(state), - reinterpret_cast(rawInputBuffer.data()), - reinterpret_cast(compressorBuffer.data()), - bufferSize, - static_cast(compressorBuffer.size()), - level); - if (result == 0) { - throw std::runtime_error("Error during block compression using lz4."); - } - return static_cast(result); - } - - void Lz4CompressionSteam::init() { - state = LZ4_createStream(); - if (!state) { - throw std::runtime_error("Error while allocating state for lz4."); - } - } - - void Lz4CompressionSteam::end() { - (void)LZ4_freeStream(state); - state = nullptr; - } - - /** - * Snappy block compression + * KAE block compression */ - class SnappyCompressionStream: public BlockCompressionStream { + class KAECompressionStream: public BlockCompressionStream{ public: - SnappyCompressionStream(OutputStream * outStream, - int compressionLevel, - uint64_t capacity, - uint64_t blockSize, - MemoryPool& pool) - : BlockCompressionStream(outStream, - compressionLevel, - capacity, - blockSize, - pool) { - } - - virtual std::string getName() const override { - return "SnappyCompressionStream"; - } - - virtual ~SnappyCompressionStream() override { - // PASS - } - - protected: - virtual uint64_t doBlockCompression() override; - - virtual uint64_t estimateMaxCompressionSize() override { - return static_cast - (snappy::MaxCompressedLength(static_cast(bufferSize))); - } - }; - - uint64_t SnappyCompressionStream::doBlockCompression() { - size_t compressedLength; - snappy::RawCompress(reinterpret_cast(rawInputBuffer.data()), - static_cast(bufferSize), - reinterpret_cast(compressorBuffer.data()), - &compressedLength); - return static_cast(compressedLength); - } - - /** - * ZSTD block compression - */ - class ZSTDCompressionStream: public BlockCompressionStream{ - public: - ZSTDCompressionStream(OutputStream * outStream, + KAECompressionStream(OutputStream * outStream, int compressionLevel, uint64_t capacity, uint64_t blockSize, @@ -539,10 +309,10 @@ namespace spark { } virtual std::string getName() const override { - return "ZstdCompressionStream"; + return "KaeCompressionStream"; } - virtual ~ZSTDCompressionStream() override { + virtual ~KAECompressionStream() override { this->end(); } @@ -550,22 +320,26 @@ namespace spark { virtual uint64_t doBlockCompression() override; virtual uint64_t estimateMaxCompressionSize() override { - return ZSTD_compressBound(static_cast(bufferSize)); + return compressBound(static_cast(bufferSize)); } private: void init(); void end(); - ZSTD_CCtx *cctx; + z_stream strm; }; - uint64_t ZSTDCompressionStream::doBlockCompression() { - return ZSTD_compressCCtx(cctx, - compressorBuffer.data(), - compressorBuffer.size(), - rawInputBuffer.data(), - static_cast(bufferSize), - level); + uint64_t KAECompressionStream::doBlockCompression() { + auto compressedSize = compressorBuffer.size(); + auto ret = compress2(compressorBuffer.data(), + &compressedSize, + rawInputBuffer.data(), + static_cast(bufferSize), + level); + if (unlikely(ret != Z_OK)) { + throw std::runtime_error("Failed to compress input data"); + } + return compressedSize; } // DIAGNOSTIC_PUSH @@ -574,24 +348,26 @@ namespace spark { DIAGNOSTIC_IGNORE("-Wold-style-cast") #endif - void ZSTDCompressionStream::init() { - - cctx = ZSTD_createCCtx(); - if (!cctx) { - throw std::runtime_error("Error while calling ZSTD_createCCtx() for zstd."); + void KAECompressionStream::init() { + strm.zalloc = nullptr; + strm.zfree = nullptr; + strm.opaque = nullptr; 
+ if (!kz_get_devices()) { + throw std::runtime_error("Error while getting devices for kaezip."); + } else if (deflateInit(&strm, level) != Z_OK) { + throw std::runtime_error("Error while calling deflateInit for kaezip."); + } else { + return; } } - - void ZSTDCompressionStream::end() { - (void)ZSTD_freeCCtx(cctx); - cctx = nullptr; + void KAECompressionStream::end() { + (void)deflateEnd(&strm); } #if defined(__GNUC__) || defined(__clang__) DIAGNOSTIC_IGNORE("-Wold-style-cast") #endif - // DIAGNOSTIC_PUSH std::unique_ptr @@ -608,34 +384,13 @@ namespace spark { (new BufferedOutputStream( pool, outStream, bufferCapacity, compressionBlockSize)); } - case CompressionKind_ZLIB: { + case CompressionKind_KAEZIP: { int level = (strategy == CompressionStrategy_SPEED) ? Z_BEST_SPEED + 1 : Z_DEFAULT_COMPRESSION; return std::unique_ptr - (new ZlibCompressionStream( + (new KAECompressionStream( outStream, level, bufferCapacity, compressionBlockSize, pool)); } - case CompressionKind_ZSTD: { - int level = (strategy == CompressionStrategy_SPEED) ? - 1 : ZSTD_CLEVEL_DEFAULT; - return std::unique_ptr - (new ZSTDCompressionStream( - outStream, level, bufferCapacity, compressionBlockSize, pool)); - } - case CompressionKind_LZ4: { - int level = (strategy == CompressionStrategy_SPEED) ? - LZ4_ACCELERATION_MAX : LZ4_ACCELERATION_DEFAULT; - return std::unique_ptr - (new Lz4CompressionSteam( - outStream, level, bufferCapacity, compressionBlockSize, pool)); - } - case CompressionKind_SNAPPY: { - int level = 0; - return std::unique_ptr - (new SnappyCompressionStream( - outStream, level, bufferCapacity, compressionBlockSize, pool)); - } - case CompressionKind_LZO: default: throw std::logic_error("compression codec not supported"); } diff --git a/omnioperator/omniop-spark-extension/cpp/src/io/WriterOptions.cc b/omnioperator/omniop-spark-extension/cpp/src/io/WriterOptions.cc index 43bc0b38b09d74721460fac7b20065977f509b2c..dde88ef31c7685e1df2921901d61d964473f35ac 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/io/WriterOptions.cc +++ b/omnioperator/omniop-spark-extension/cpp/src/io/WriterOptions.cc @@ -33,7 +33,7 @@ namespace spark { WriterOptionsPrivate() { // default to Hive_0_12 compressionBlockSize = 64 * 1024; // 64K - compression = CompressionKind_ZLIB; + compression = CompressionKind_KAEZIP; compressionStrategy = CompressionStrategy_SPEED; memoryPool = getDefaultPool(); } diff --git a/omnioperator/omniop-spark-extension/cpp/src/io/wrap/snappy_wrapper.h b/omnioperator/omniop-spark-extension/cpp/src/io/wrap/snappy_wrapper.h deleted file mode 100644 index 56ac837ee0de85282bebf1f046c2de075933e30e..0000000000000000000000000000000000000000 --- a/omnioperator/omniop-spark-extension/cpp/src/io/wrap/snappy_wrapper.h +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef SNAPPY_WRAPPER_HH -#define SNAPPY_WRAPPER_HH - -#include "../Adaptor.hh" - -#ifdef __clang__ - DIAGNOSTIC_IGNORE("-Wreserved-id-macro") -#endif - -#include - -#endif diff --git a/omnioperator/omniop-spark-extension/cpp/test/CMakeLists.txt b/omnioperator/omniop-spark-extension/cpp/test/CMakeLists.txt index ba1ad3a773c35a101cf728f00a19ba30b0dae607..709ed700591a3410a1f2128a43d1eea2b75ad4a7 100644 --- a/omnioperator/omniop-spark-extension/cpp/test/CMakeLists.txt +++ b/omnioperator/omniop-spark-extension/cpp/test/CMakeLists.txt @@ -14,6 +14,9 @@ set(MY_LINK # find gtest package find_package(GTest REQUIRED) +find_package(Arrow REQUIRED) +find_package(ArrowDataset REQUIRED) +find_package(Parquet REQUIRED) # compile a executable file add_executable(${TP_TEST_TARGET} ${ROOT_SRCS} ${TEST_ROOT_SRCS}) @@ -31,6 +34,10 @@ target_link_libraries(${TP_TEST_TARGET} dl boostkit-omniop-vector-1.3.0-aarch64 securec + Arrow::arrow_shared + ArrowDataset::arrow_dataset_shared + Parquet::parquet_shared + orc spark_columnar_plugin) target_compile_options(${TP_TEST_TARGET} PUBLIC -g -O2 -fPIC) diff --git a/omnioperator/omniop-spark-extension/cpp/test/shuffle/shuffle_test.cpp b/omnioperator/omniop-spark-extension/cpp/test/shuffle/shuffle_test.cpp index 3031943eeae22b591de4c4b3693eb1e1744b3ac3..24fee8f5fc95acc6d997be14ab89b43e9b639de2 100644 --- a/omnioperator/omniop-spark-extension/cpp/test/shuffle/shuffle_test.cpp +++ b/omnioperator/omniop-spark-extension/cpp/test/shuffle/shuffle_test.cpp @@ -68,7 +68,7 @@ TEST_F (ShuffleTest, Split_SingleVarChar) { inputDataTypes, colNumber, 1024, - "lz4", + "kae", tmpShuffleFilePath, 0, tmpTestingDir); @@ -106,7 +106,7 @@ TEST_F (ShuffleTest, Split_Fixed_Cols) { inputDataTypes, colNumber, 4096, - "lz4", + "kae", tmpShuffleFilePath, 0, tmpTestingDir); @@ -134,7 +134,7 @@ TEST_F (ShuffleTest, Split_Fixed_SinglePartition_SomeNullRow) { inputDataTypes, colNumber, 4096, - "lz4", + "kae", tmpShuffleFilePath, 0, tmpTestingDir); @@ -162,7 +162,7 @@ TEST_F (ShuffleTest, Split_Fixed_SinglePartition_SomeNullCol) { inputDataTypes, colNumber, 4096, - "lz4", + "kae", tmpShuffleFilePath, 0, tmpTestingDir); @@ -180,20 +180,8 @@ TEST_F (ShuffleTest, Split_Compression_None) { Test_Shuffle_Compression("uncompressed", 4, 999, 999); } -TEST_F (ShuffleTest, Split_Compression_zstd) { - Test_Shuffle_Compression("zstd", 4, 999, 999); -} - -TEST_F (ShuffleTest, Split_Compression_Lz4) { - Test_Shuffle_Compression("lz4", 4, 999, 999); -} - -TEST_F (ShuffleTest, Split_Compression_Snappy) { - Test_Shuffle_Compression("snappy", 4, 999, 999); -} - -TEST_F (ShuffleTest, Split_Compression_Zlib) { - Test_Shuffle_Compression("zlib", 4, 999, 999); +TEST_F (ShuffleTest, Split_Compression_kae) { + Test_Shuffle_Compression("kae", 4, 999, 999); } TEST_F (ShuffleTest, Split_Mix_LargeSize) { @@ -210,7 +198,7 @@ TEST_F (ShuffleTest, Split_Mix_LargeSize) { inputDataTypes, colNumber, 4096, - "lz4", + "kae", tmpShuffleFilePath, 0, tmpTestingDir); @@ -238,7 +226,7 @@ TEST_F (ShuffleTest, Split_Short_10WRows) { inputDataTypes, colNumber, 4096, - "lz4", + "kae", tmpShuffleFilePath, 0, tmpTestingDir); @@ -266,7 +254,7 @@ TEST_F (ShuffleTest, Split_Boolean_10WRows) { inputDataTypes, colNumber, 4096, - "lz4", + "kae", tmpShuffleFilePath, 0, tmpTestingDir); @@ -294,7 +282,7 @@ TEST_F (ShuffleTest, Split_Long_100WRows) { inputDataTypes, colNumber, 4096, - "lz4", + "kae", tmpShuffleFilePath, 0, tmpTestingDir); @@ 
-322,7 +310,7 @@ TEST_F (ShuffleTest, Split_VarChar_LargeSize) { inputDataTypes, colNumber, 4096, - "lz4", + "kae", tmpShuffleFilePath, 64, tmpTestingDir); @@ -350,7 +338,7 @@ TEST_F (ShuffleTest, Split_VarChar_First) { inputDataTypes, colNumber, 4096, - "lz4", + "kae", tmpShuffleFilePath, 0, tmpTestingDir); @@ -394,7 +382,7 @@ TEST_F (ShuffleTest, Split_Dictionary) { inputDataTypes, colNumber, 4096, - "lz4", + "kae", tmpShuffleFilePath, 0, tmpTestingDir); @@ -422,7 +410,7 @@ TEST_F (ShuffleTest, Split_Char) { inputDataTypes, colNumber, 4096, - "lz4", + "kae", tmpShuffleFilePath, 64, tmpTestingDir); @@ -450,7 +438,7 @@ TEST_F (ShuffleTest, Split_Decimal128) { inputDataTypes, colNumber, 4096, - "lz4", + "kae", tmpShuffleFilePath, 0, tmpTestingDir); @@ -478,7 +466,7 @@ TEST_F (ShuffleTest, Split_Decimal64) { inputDataTypes, colNumber, 4096, - "lz4", + "kae", tmpShuffleFilePath, 0, tmpTestingDir); @@ -506,7 +494,7 @@ TEST_F (ShuffleTest, Split_Decimal64_128) { inputDataTypes, colNumber, 4096, - "lz4", + "kae", tmpShuffleFilePath, 0, tmpTestingDir); diff --git a/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/compress/CompressionUtil.java b/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/compress/CompressionUtil.java index c3b35a4f650fe84cd358e4079ae63b548d60f35e..6f9377eb8f9bd84c7c42d3f3c0872d7c90905cc3 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/compress/CompressionUtil.java +++ b/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/compress/CompressionUtil.java @@ -17,22 +17,13 @@ package com.huawei.boostkit.spark.compress; -import io.airlift.compress.lz4.Lz4Decompressor; -import io.airlift.compress.lzo.LzoDecompressor; - public class CompressionUtil { public static CompressionCodec createCodec(String compressionCodec) { switch (compressionCodec) { - case "zlib": - return new ZlibCodec(); - case "snappy": - return new SnappyCodec(); - case "lzo": - return new AircompressorCodec(new LzoDecompressor()); - case "lz4": - return new AircompressorCodec(new Lz4Decompressor()); + case "kae": + return new KaeCodec(); default: - throw new IllegalArgumentException("Unknown compression codec: " + + throw new IllegalArgumentException("Unsupport compression codec: " + compressionCodec); } } diff --git a/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/compress/ZlibCodec.java b/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/compress/KaeCodec.java similarity index 93% rename from omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/compress/ZlibCodec.java rename to omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/compress/KaeCodec.java index 89f3265667352c0fd5d5c8f2fdd994d220d06055..9ae9fb8e09c404f6cd2998f0876fe888cd48344b 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/compress/ZlibCodec.java +++ b/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/compress/KaeCodec.java @@ -21,13 +21,13 @@ import java.io.IOException; import java.util.zip.DataFormatException; import java.util.zip.Inflater; -public class ZlibCodec implements CompressionCodec { +public class KaeCodec implements CompressionCodec { - public ZlibCodec() {} + public KaeCodec() {} @Override public int decompress(byte[] input, int inputLength, byte[] output) throws IOException { - Inflater inflater = new 
Inflater(true); + Inflater inflater = new Inflater(false); int offset = 0; int length = output.length; try { diff --git a/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/compress/SnappyCodec.java b/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/compress/SnappyCodec.java deleted file mode 100644 index cdac3a5f217473ea9127950c3c21728398704b18..0000000000000000000000000000000000000000 --- a/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/compress/SnappyCodec.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.huawei.boostkit.spark.compress; - -import io.airlift.compress.snappy.SnappyDecompressor; - -import java.io.IOException; - -public class SnappyCodec extends AircompressorCodec { - - SnappyCodec() { - super(new SnappyDecompressor()); - } - - @Override - public int decompress(byte[] input, int inputLength, byte[] output) throws IOException { - return super.decompress(input, inputLength, output); - } -} diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala index d59978a884f5152d5f1b2ea53aa58174a3747f3f..6a7684cefd2bf9cb17d0bc1168b47fd9e529cabe 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala @@ -150,9 +150,9 @@ class ColumnarPluginConfig(conf: SQLConf) extends Logging { val enableShuffleCompress = conf.getConfString("spark.shuffle.compress", "true").toBoolean - // shuffle compress type, default lz4 + // shuffle compress type, default kae val columnarShuffleCompressionCodec = - conf.getConfString("spark.io.compression.codec", "lz4").toString + conf.getConfString("spark.omni.io.compression.codec", "kae").toString // columnar shuffle native buffer size val columnarShuffleNativeBufferSize = diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala index c982ae6fc3c9914daa285c27aa331c0c6f11fb41..6f98be54dcf6747a58227decd47198d2f0de09eb 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala @@ -315,7 +315,7 @@ object ColumnarShuffleExchangeExec 
extends Logging { newIter }, isOrderSensitive = isOrderSensitive) case h@HashPartitioning(expressions, numPartitions) => - if (containsRollUp(expressions)) { + if (containsRollUp(expressions) || expressions.length > 6) { rdd.mapPartitionsWithIndexInternal((_, cbIter) => { val partitionKeyExtractor: InternalRow => Any = { val projection = diff --git a/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/ColumnShuffleCompressionTest.java b/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/ColumnShuffleCompressionTest.java index d95be18832b926500b599821b6b6fd0baa8861c5..145d126214fb3cb5a2563b0b119649d6f5610a98 100644 --- a/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/ColumnShuffleCompressionTest.java +++ b/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/ColumnShuffleCompressionTest.java @@ -81,21 +81,9 @@ public class ColumnShuffleCompressionTest extends ColumnShuffleTest { } @Test - public void columnShuffleSnappyCompressTest() throws IOException { - shuffleDataFile = shuffleTestDir + "/shuffle_dataFile_snappy_test"; - columnShuffleTestCompress("snappy", shuffleDataFile); - } - - @Test - public void columnShuffleLz4CompressTest() throws IOException { - shuffleDataFile = shuffleTestDir + "/shuffle_dataFile_lz4_test"; - columnShuffleTestCompress("lz4", shuffleDataFile); - } - - @Test - public void columnShuffleZlibCompressTest() throws IOException { - shuffleDataFile = shuffleTestDir + "/shuffle_dataFile_zlib_test"; - columnShuffleTestCompress("zlib", shuffleDataFile); + public void columnShuffleKaeCompressTest() throws IOException { + shuffleDataFile = shuffleTestDir + "/shuffle_dataFile_kae_test"; + columnShuffleTestCompress("kae", shuffleDataFile); } public void columnShuffleTestCompress(String compressType, String dataFile) throws IOException { diff --git a/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/ColumnShuffleDiffPartitionTest.java b/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/ColumnShuffleDiffPartitionTest.java index c8fd474137a93ea8831d3dc3ab432e409018cc55..a809c8147b0fe0ce56dcf79b1976f67d487951c0 100644 --- a/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/ColumnShuffleDiffPartitionTest.java +++ b/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/ColumnShuffleDiffPartitionTest.java @@ -109,7 +109,7 @@ public class ColumnShuffleDiffPartitionTest extends ColumnShuffleTest { tmpStr, types.length, 3, - "lz4", + "kae", dataFile, 0, shuffleTestDir, diff --git a/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/ColumnShuffleDiffRowVBTest.java b/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/ColumnShuffleDiffRowVBTest.java index dc53fda8a1a04a15bf7ffb9919926d4812208fc0..adbcc0e62195b5a160234f385a09784f7d4699df 100644 --- a/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/ColumnShuffleDiffRowVBTest.java +++ b/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/ColumnShuffleDiffRowVBTest.java @@ -89,7 +89,7 @@ public class ColumnShuffleDiffRowVBTest extends ColumnShuffleTest { tmpStr, types.length, 3, //shuffle value_buffer init size - "lz4", + "kae", shuffleDataFile, 0, shuffleTestDir, @@ -119,7 +119,7 @@ public class ColumnShuffleDiffRowVBTest extends ColumnShuffleTest { tmpStr, types.length, 3, //shuffle 
value_buffer init size - "lz4", + "kae", shuffleDataFile, 0, shuffleTestDir, @@ -149,7 +149,7 @@ public class ColumnShuffleDiffRowVBTest extends ColumnShuffleTest { tmpStr, types.length, 3, //shuffle value_buffer init size - "lz4", + "kae", shuffleDataFile, 0, shuffleTestDir, @@ -179,7 +179,7 @@ public class ColumnShuffleDiffRowVBTest extends ColumnShuffleTest { tmpStr, types.length, 3, //shuffle value_buffer init size - "lz4", + "kae", shuffleDataFile, 0, shuffleTestDir, @@ -208,7 +208,7 @@ public class ColumnShuffleDiffRowVBTest extends ColumnShuffleTest { tmpStr, types.length, 3, //shuffle value_buffer init size - "lz4", + "kae", shuffleDataFile, 0, shuffleTestDir, @@ -237,7 +237,7 @@ public class ColumnShuffleDiffRowVBTest extends ColumnShuffleTest { tmpStr, types.length, 3, //shuffle value_buffer init size - "lz4", + "kae", shuffleDataFile, 0, shuffleTestDir, @@ -276,7 +276,7 @@ public class ColumnShuffleDiffRowVBTest extends ColumnShuffleTest { tmpStr, types.length, 3, //shuffle value_buffer init size - "lz4", + "kae", shuffleDataFile, 0, shuffleTestDir, diff --git a/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/ColumnShuffleGBSizeTest.java b/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/ColumnShuffleGBSizeTest.java index 2ef81ac49e545aa617136b9d4f3e7e769ea34652..488aaa496860132bfc8bc1d5b63ae7d4178c1d08 100644 --- a/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/ColumnShuffleGBSizeTest.java +++ b/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/ColumnShuffleGBSizeTest.java @@ -89,7 +89,7 @@ public class ColumnShuffleGBSizeTest extends ColumnShuffleTest { tmpStr, types.length, 4096, //shuffle value_buffer init size - "lz4", + "kae", shuffleDataFile, 0, shuffleTestDir, @@ -118,7 +118,7 @@ public class ColumnShuffleGBSizeTest extends ColumnShuffleTest { tmpStr, types.length, 4096, //shuffle value_buffer init size - "lz4", + "kae", shuffleDataFile, 0, shuffleTestDir, @@ -147,7 +147,7 @@ public class ColumnShuffleGBSizeTest extends ColumnShuffleTest { tmpStr, types.length, 1024, //shuffle value_buffer init size - "lz4", + "kae", shuffleDataFile, 0, shuffleTestDir, @@ -177,7 +177,7 @@ public class ColumnShuffleGBSizeTest extends ColumnShuffleTest { tmpStr, types.length, 1024, //shuffle value_buffer init size - "lz4", + "kae", shuffleDataFile, 0, shuffleTestDir, @@ -207,7 +207,7 @@ public class ColumnShuffleGBSizeTest extends ColumnShuffleTest { tmpStr, types.length, 4096, //shuffle value_buffer init size - "lz4", + "kae", shuffleDataFile, 0, shuffleTestDir, @@ -238,7 +238,7 @@ public class ColumnShuffleGBSizeTest extends ColumnShuffleTest { tmpStr, types.length, 4096, //shuffle value_buffer init size - "lz4", + "kae", shuffleDataFile, 0, shuffleTestDir, diff --git a/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/ColumnShuffleNullTest.java b/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/ColumnShuffleNullTest.java index 98fc18dd8f3237928cc066887e6fcb2205686692..66b5503f9f601a023972b5f2d8dfee8e990e9976 100644 --- a/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/ColumnShuffleNullTest.java +++ b/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/ColumnShuffleNullTest.java @@ -88,7 +88,7 @@ public class ColumnShuffleNullTest extends ColumnShuffleTest { tmpStr, types.length, 3, //shuffle value_buffer init size - "lz4", + "kae", 
shuffleDataFile, 0, shuffleTestDir, @@ -118,7 +118,7 @@ public class ColumnShuffleNullTest extends ColumnShuffleTest { tmpStr, types.length, 3, //shuffle value_buffer init size - "lz4", + "kae", shuffleDataFile, 0, shuffleTestDir, @@ -149,7 +149,7 @@ public class ColumnShuffleNullTest extends ColumnShuffleTest { tmpStr, types.length, 3, //shuffle value_buffer init size - "lz4", + "kae", shuffleDataFile, 0, shuffleTestDir, @@ -180,7 +180,7 @@ public class ColumnShuffleNullTest extends ColumnShuffleTest { tmpStr, types.length, 3, //shuffle value_buffer init size - "lz4", + "kae", shuffleDataFile, 0, shuffleTestDir, diff --git a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnShuffleSerializerZlibSuite.scala b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnShuffleSerializerKaeSuite.scala similarity index 98% rename from omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnShuffleSerializerZlibSuite.scala rename to omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnShuffleSerializerKaeSuite.scala index 1088c37e078dab3fd0b17a1a4e5d57222d01eb2a..298c809df5bfa5e540102ab521a71f4ea4d64c1a 100644 --- a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnShuffleSerializerZlibSuite.scala +++ b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnShuffleSerializerKaeSuite.scala @@ -39,7 +39,7 @@ import org.mockito.{Mock, MockitoAnnotations} import org.mockito.Mockito.{doAnswer, when} import org.mockito.invocation.InvocationOnMock -class ColumnShuffleSerializerZlibSuite extends SharedSparkSession { +class ColumnShuffleSerializerKaeSuite extends SharedSparkSession { @Mock(answer = RETURNS_SMART_NULLS) private var taskContext: TaskContext = _ @Mock(answer = RETURNS_SMART_NULLS) private var blockResolver: IndexShuffleBlockResolver = _ @Mock(answer = RETURNS_SMART_NULLS) private var dependency @@ -47,10 +47,10 @@ class ColumnShuffleSerializerZlibSuite extends SharedSparkSession { override def sparkConf: SparkConf = super.sparkConf - .setAppName("test shuffle serializer for zlib") + .setAppName("test shuffle serializer for kae") .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.OmniColumnarShuffleManager") .set("spark.shuffle.compress", "true") - .set("spark.io.compression.codec", "zlib") + .set("spark.omni.io.compression.codec", "kae") private var taskMetrics: TaskMetrics = _ private var tempDir: File = _ @@ -130,7 +130,7 @@ class ColumnShuffleSerializerZlibSuite extends SharedSparkSession { super.afterAll() } - test("write shuffle compress for zlib with null value middle") { + test("write shuffle compress for kae with null value middle") { val pidArray: Array[java.lang.Integer] = Array(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) val intArray: Array[java.lang.Integer] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, null, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20) val shortArray: Array[java.lang.Integer] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, null, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20) diff --git a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnShuffleSerializerLz4Suite.scala b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnShuffleSerializerLz4Suite.scala deleted file mode 100644 index 4d79e3ca6e0af0c74c5be6e04126c75bae9f21db..0000000000000000000000000000000000000000 --- 
a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnShuffleSerializerLz4Suite.scala +++ /dev/null @@ -1,247 +0,0 @@ -/* - * Copyright (C) 2022-2022. Huawei Technologies Co., Ltd. All rights reserved. - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.shuffle - -import java.io.{File, FileInputStream} - -import com.huawei.boostkit.spark.serialize.ColumnarBatchSerializer -import com.huawei.boostkit.spark.vectorized.PartitionInfo -import nova.hetu.omniruntime.`type`.{DataType, _} -import nova.hetu.omniruntime.vector._ -import org.apache.spark.{HashPartitioner, SparkConf, TaskContext} -import org.apache.spark.executor.TaskMetrics -import org.apache.spark.serializer.JavaSerializer -import org.apache.spark.shuffle.sort.ColumnarShuffleHandle -import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} -import org.apache.spark.sql.execution.vectorized.OmniColumnVector -import org.apache.spark.sql.test.SharedSparkSession -import org.apache.spark.sql.vectorized.ColumnarBatch -import org.apache.spark.util.Utils -import org.mockito.Answers.RETURNS_SMART_NULLS -import org.mockito.ArgumentMatchers.{any, anyInt, anyLong} -import org.mockito.{Mock, MockitoAnnotations} -import org.mockito.Mockito.{doAnswer, when} -import org.mockito.invocation.InvocationOnMock - -class ColumnShuffleSerializerLz4Suite extends SharedSparkSession { - @Mock(answer = RETURNS_SMART_NULLS) private var taskContext: TaskContext = _ - @Mock(answer = RETURNS_SMART_NULLS) private var blockResolver: IndexShuffleBlockResolver = _ - @Mock(answer = RETURNS_SMART_NULLS) private var dependency - : ColumnarShuffleDependency[Int, ColumnarBatch, ColumnarBatch] = _ - - override def sparkConf: SparkConf = - super.sparkConf - .setAppName("test shuffle serializer for lz4") - .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.OmniColumnarShuffleManager") - .set("spark.shuffle.compress", "true") - .set("spark.io.compression.codec", "lz4") - - private var taskMetrics: TaskMetrics = _ - private var tempDir: File = _ - private var outputFile: File = _ - - private var shuffleHandle: ColumnarShuffleHandle[Int, ColumnarBatch] = _ - private val numPartitions = 1 - - protected var avgBatchNumRows: SQLMetric = _ - protected var outputNumRows: SQLMetric = _ - - override def beforeEach(): Unit = { - super.beforeEach() - - avgBatchNumRows = SQLMetrics.createAverageMetric(spark.sparkContext, - "test serializer avg read batch num rows") - outputNumRows = SQLMetrics.createAverageMetric(spark.sparkContext, - "test serializer number of output rows") - - tempDir = Utils.createTempDir() - outputFile = File.createTempFile("shuffle", null, tempDir) - taskMetrics = new TaskMetrics - - MockitoAnnotations.initMocks(this) - - shuffleHandle = - new 
ColumnarShuffleHandle[Int, ColumnarBatch](shuffleId = 0, dependency = dependency) - - val types : Array[DataType] = Array[DataType]( - IntDataType.INTEGER, - ShortDataType.SHORT, - LongDataType.LONG, - DoubleDataType.DOUBLE, - new Decimal64DataType(18, 3), - new Decimal128DataType(28, 11), - VarcharDataType.VARCHAR, - BooleanDataType.BOOLEAN) - val inputTypes = DataTypeSerializer.serialize(types) - - when(dependency.partitioner).thenReturn(new HashPartitioner(numPartitions)) - when(dependency.serializer).thenReturn(new JavaSerializer(sparkConf)) - when(dependency.partitionInfo).thenReturn( - new PartitionInfo("hash", numPartitions, types.length, inputTypes)) - when(dependency.dataSize) - .thenReturn(SQLMetrics.createSizeMetric(spark.sparkContext, "data size")) - when(dependency.bytesSpilled) - .thenReturn(SQLMetrics.createSizeMetric(spark.sparkContext, "shuffle bytes spilled")) - when(dependency.numInputRows) - .thenReturn(SQLMetrics.createMetric(spark.sparkContext, "number of input rows")) - when(dependency.splitTime) - .thenReturn(SQLMetrics.createNanoTimingMetric(spark.sparkContext, "totaltime_split")) - when(dependency.spillTime) - .thenReturn(SQLMetrics.createNanoTimingMetric(spark.sparkContext, "totaltime_spill")) - when(taskContext.taskMetrics()).thenReturn(taskMetrics) - when(blockResolver.getDataFile(0, 0)).thenReturn(outputFile) - - doAnswer { (invocationOnMock: InvocationOnMock) => - val tmp = invocationOnMock.getArguments()(4).asInstanceOf[File] - if (tmp != null) { - outputFile.delete - tmp.renameTo(outputFile) - } - null - }.when(blockResolver) - .writeMetadataFileAndCommit(anyInt, anyLong, any(classOf[Array[Long]]), any(classOf[Array[Long]]), any(classOf[File])) - } - - override def afterEach(): Unit = { - try { - Utils.deleteRecursively(tempDir) - } finally { - super.afterEach() - } - } - - override def afterAll(): Unit = { - super.afterAll() - } - - test("write shuffle compress for lz4 with no null value") { - val pidArray: Array[java.lang.Integer] = Array(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) - val intArray: Array[java.lang.Integer] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20) - val shortArray: Array[java.lang.Integer] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20) - val longArray: Array[java.lang.Long] = Array(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L, 12L, 13L, 14L, 15L, 16L, - 17L, 18L, 19L, 20L) - val doubleArray: Array[java.lang.Double] = Array(0.0, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7, 8.8, 9.9, 10.10, 11.11, 12.12, - 13.13, 14.14, 15.15, 16.16, 17.17, 18.18, 19.19, 20.20) - val decimal64Array: Array[java.lang.Long] = Array(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L, 12L, 13L, 14L, 15L, 16L, - 17L, 18L, 19L, 20L) - val decimal128Array: Array[Array[Long]] = Array( - Array(0L, 0L), Array(1L, 1L), Array(2L, 2L), Array(3L, 3L), Array(4L, 4L), Array(5L, 5L), Array(6L, 6L), - Array(7L, 7L), Array(8L, 8L), Array(9L, 9L), Array(10L, 10L), Array(11L, 11L), Array(12L, 12L), Array(13L, 13L), - Array(14L, 14L), Array(15L, 15L), Array(16L, 16L), Array(17L, 17L), Array(18L, 18L), Array(19L, 19L), Array(20L, 20L)) - val stringArray: Array[java.lang.String] = Array("", "a", "bb", "ccc", "dddd", "eeeee", "ffffff", "ggggggg", - "hhhhhhhh", "iiiiiiiii", "jjjjjjjjjj", "kkkkkkkkkkk", "llllllllllll", "mmmmmmmmmmmmm", "nnnnnnnnnnnnnn", - "ooooooooooooooo", "pppppppppppppppp", "qqqqqqqqqqqqqqqqq", "rrrrrrrrrrrrrrrrrr", "sssssssssssssssssss", - "tttttttttttttttttttt") - val 
booleanArray: Array[java.lang.Boolean] = Array(true, true, true, true, true, true, true, true, true, true, - false, false, false, false, false, false, false, false, false, false, false) - - val pidVector0 = ColumnarShuffleWriterSuite.initOmniColumnIntVector(pidArray) - val intVector0 = ColumnarShuffleWriterSuite.initOmniColumnIntVector(intArray) - val shortVector0 = ColumnarShuffleWriterSuite.initOmniColumnShortVector(shortArray) - val longVector0 = ColumnarShuffleWriterSuite.initOmniColumnLongVector(longArray) - val doubleVector0 = ColumnarShuffleWriterSuite.initOmniColumnDoubleVector(doubleArray) - val decimal64Vector0 = ColumnarShuffleWriterSuite.initOmniColumnDecimal64Vector(decimal64Array) - val decimal128Vector0 = ColumnarShuffleWriterSuite.initOmniColumnDecimal128Vector(decimal128Array) - val varcharVector0 = ColumnarShuffleWriterSuite.initOmniColumnVarcharVector(stringArray) - val booleanVector0 = ColumnarShuffleWriterSuite.initOmniColumnBooleanVector(booleanArray) - - val cb0 = ColumnarShuffleWriterSuite.makeColumnarBatch( - pidVector0.getVec.getSize, - List(pidVector0, intVector0, shortVector0, longVector0, doubleVector0, - decimal64Vector0, decimal128Vector0, varcharVector0, booleanVector0) - ) - - val pidVector1 = ColumnarShuffleWriterSuite.initOmniColumnIntVector(pidArray) - val intVector1 = ColumnarShuffleWriterSuite.initOmniColumnIntVector(intArray) - val shortVector1 = ColumnarShuffleWriterSuite.initOmniColumnShortVector(shortArray) - val longVector1 = ColumnarShuffleWriterSuite.initOmniColumnLongVector(longArray) - val doubleVector1 = ColumnarShuffleWriterSuite.initOmniColumnDoubleVector(doubleArray) - val decimal64Vector1 = ColumnarShuffleWriterSuite.initOmniColumnDecimal64Vector(decimal64Array) - val decimal128Vector1 = ColumnarShuffleWriterSuite.initOmniColumnDecimal128Vector(decimal128Array) - val varcharVector1 = ColumnarShuffleWriterSuite.initOmniColumnVarcharVector(stringArray) - val booleanVector1 = ColumnarShuffleWriterSuite.initOmniColumnBooleanVector(booleanArray) - - val cb1 = ColumnarShuffleWriterSuite.makeColumnarBatch( - pidVector1.getVec.getSize, - List(pidVector1, intVector1, shortVector1, longVector1, doubleVector1, - decimal64Vector1, decimal128Vector1, varcharVector1, booleanVector1) - ) - - def records: Iterator[(Int, ColumnarBatch)] = Iterator((0, cb0), (0, cb1)) - - val writer = new ColumnarShuffleWriter[Int, ColumnarBatch]( - blockResolver, - shuffleHandle, - 0L, // MapId - taskContext.taskMetrics().shuffleWriteMetrics) - - writer.write(records) - writer.stop(success = true) - - assert(writer.getPartitionLengths.sum === outputFile.length()) - assert(writer.getPartitionLengths.count(_ == 0L) === 0) - // should be (numPartitions - 2) zero length files - - val shuffleWriteMetrics = taskContext.taskMetrics().shuffleWriteMetrics - assert(shuffleWriteMetrics.bytesWritten === outputFile.length()) - assert(shuffleWriteMetrics.recordsWritten === pidArray.length * 2) - - assert(taskMetrics.diskBytesSpilled === 0) - assert(taskMetrics.memoryBytesSpilled === 0) - - val serializer = new ColumnarBatchSerializer(avgBatchNumRows, outputNumRows).newInstance() - val deserializedStream = serializer.deserializeStream(new FileInputStream(outputFile)) - - try { - val kv = deserializedStream.asKeyValueIterator - var length = 0 - kv.foreach { - case (_, batch: ColumnarBatch) => - length += 1 - assert(batch.numRows == 42) - assert(batch.numCols == 8) - assert(batch.column(0).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[IntVec].get(0) == 0) - 
assert(batch.column(0).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[IntVec].get(19) == 19) - assert(batch.column(1).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[ShortVec].get(0) == 0) - assert(batch.column(1).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[ShortVec].get(19) == 19) - assert(batch.column(2).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[LongVec].get(0) == 0) - assert(batch.column(2).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[LongVec].get(19) == 19) - assert(batch.column(3).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[DoubleVec].get(0) == 0.0) - assert(batch.column(3).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[DoubleVec].get(19) == 19.19) - assert(batch.column(4).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[LongVec].get(0) == 0L) - assert(batch.column(4).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[LongVec].get(19) == 19L) - assert(batch.column(5).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[Decimal128Vec].get(0) sameElements Array(0L, 0L)) - assert(batch.column(5).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[Decimal128Vec].get(19) sameElements Array(19L, 19L)) - assert(batch.column(6).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[VarcharVec].get(0) sameElements "") - assert(batch.column(6).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[VarcharVec].get(19) sameElements "sssssssssssssssssss") - assert(batch.column(7).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[BooleanVec].get(0) == true) - assert(batch.column(7).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[BooleanVec].get(19) == false) - (0 until batch.numCols).foreach { i => - val valueVector = batch.column(i).asInstanceOf[OmniColumnVector].getVec - assert(valueVector.getSize == batch.numRows) - } - batch.close() - } - assert(length == 1) - } finally { - deserializedStream.close() - } - - } -} diff --git a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnShuffleSerializerSnappySuite.scala b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnShuffleSerializerSnappySuite.scala deleted file mode 100644 index 6fcb9a8969bb6f0c3050301a46cbcca110b0b909..0000000000000000000000000000000000000000 --- a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnShuffleSerializerSnappySuite.scala +++ /dev/null @@ -1,247 +0,0 @@ -/* - * Copyright (C) 2022-2022. Huawei Technologies Co., Ltd. All rights reserved. - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.shuffle - -import java.io.{File, FileInputStream} - -import com.huawei.boostkit.spark.serialize.ColumnarBatchSerializer -import com.huawei.boostkit.spark.vectorized.PartitionInfo -import nova.hetu.omniruntime.`type`.{DataType, _} -import nova.hetu.omniruntime.vector._ -import org.apache.spark.{HashPartitioner, SparkConf, TaskContext} -import org.apache.spark.executor.TaskMetrics -import org.apache.spark.serializer.JavaSerializer -import org.apache.spark.shuffle.sort.ColumnarShuffleHandle -import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} -import org.apache.spark.sql.execution.vectorized.OmniColumnVector -import org.apache.spark.sql.test.SharedSparkSession -import org.apache.spark.sql.vectorized.ColumnarBatch -import org.apache.spark.util.Utils -import org.mockito.Answers.RETURNS_SMART_NULLS -import org.mockito.ArgumentMatchers.{any, anyInt, anyLong} -import org.mockito.{Mock, MockitoAnnotations} -import org.mockito.Mockito.{doAnswer, when} -import org.mockito.invocation.InvocationOnMock - -class ColumnShuffleSerializerSnappySuite extends SharedSparkSession { - @Mock(answer = RETURNS_SMART_NULLS) private var taskContext: TaskContext = _ - @Mock(answer = RETURNS_SMART_NULLS) private var blockResolver: IndexShuffleBlockResolver = _ - @Mock(answer = RETURNS_SMART_NULLS) private var dependency - : ColumnarShuffleDependency[Int, ColumnarBatch, ColumnarBatch] = _ - - override def sparkConf: SparkConf = - super.sparkConf - .setAppName("test shuffle serializer for snappy") - .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.OmniColumnarShuffleManager") - .set("spark.shuffle.compress", "true") - .set("spark.io.compression.codec", "snappy") - - private var taskMetrics: TaskMetrics = _ - private var tempDir: File = _ - private var outputFile: File = _ - - private var shuffleHandle: ColumnarShuffleHandle[Int, ColumnarBatch] = _ - private val numPartitions = 1 - - protected var avgBatchNumRows: SQLMetric = _ - protected var outputNumRows: SQLMetric = _ - - override def beforeEach(): Unit = { - super.beforeEach() - - avgBatchNumRows = SQLMetrics.createAverageMetric(spark.sparkContext, - "test serializer avg read batch num rows") - outputNumRows = SQLMetrics.createAverageMetric(spark.sparkContext, - "test serializer number of output rows") - - tempDir = Utils.createTempDir() - outputFile = File.createTempFile("shuffle", null, tempDir) - taskMetrics = new TaskMetrics - - MockitoAnnotations.initMocks(this) - - shuffleHandle = - new ColumnarShuffleHandle[Int, ColumnarBatch](shuffleId = 0, dependency = dependency) - - val types : Array[DataType] = Array[DataType]( - IntDataType.INTEGER, - ShortDataType.SHORT, - LongDataType.LONG, - DoubleDataType.DOUBLE, - new Decimal64DataType(18, 3), - new Decimal128DataType(28, 11), - VarcharDataType.VARCHAR, - BooleanDataType.BOOLEAN) - val inputTypes = DataTypeSerializer.serialize(types) - - when(dependency.partitioner).thenReturn(new HashPartitioner(numPartitions)) - when(dependency.serializer).thenReturn(new JavaSerializer(sparkConf)) - when(dependency.partitionInfo).thenReturn( - new PartitionInfo("hash", numPartitions, types.length, inputTypes)) - when(dependency.dataSize) - .thenReturn(SQLMetrics.createSizeMetric(spark.sparkContext, "data size")) - when(dependency.bytesSpilled) - .thenReturn(SQLMetrics.createSizeMetric(spark.sparkContext, "shuffle bytes spilled")) - when(dependency.numInputRows) - .thenReturn(SQLMetrics.createMetric(spark.sparkContext, "number of input rows")) - 
when(dependency.splitTime) - .thenReturn(SQLMetrics.createNanoTimingMetric(spark.sparkContext, "totaltime_split")) - when(dependency.spillTime) - .thenReturn(SQLMetrics.createNanoTimingMetric(spark.sparkContext, "totaltime_spill")) - when(taskContext.taskMetrics()).thenReturn(taskMetrics) - when(blockResolver.getDataFile(0, 0)).thenReturn(outputFile) - - doAnswer { (invocationOnMock: InvocationOnMock) => - val tmp = invocationOnMock.getArguments()(4).asInstanceOf[File] - if (tmp != null) { - outputFile.delete - tmp.renameTo(outputFile) - } - null - }.when(blockResolver) - .writeMetadataFileAndCommit(anyInt, anyLong, any(classOf[Array[Long]]), any(classOf[Array[Long]]), any(classOf[File])) - } - - override def afterEach(): Unit = { - try { - Utils.deleteRecursively(tempDir) - } finally { - super.afterEach() - } - } - - override def afterAll(): Unit = { - super.afterAll() - } - - test("write shuffle compress for snappy") { - val pidArray: Array[java.lang.Integer] = Array(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) - val intArray: Array[java.lang.Integer] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20) - val shortArray: Array[java.lang.Integer] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20) - val longArray: Array[java.lang.Long] = Array(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L, 12L, 13L, 14L, 15L, 16L, - 17L, 18L, 19L, 20L) - val doubleArray: Array[java.lang.Double] = Array(0.0, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7, 8.8, 9.9, 10.10, 11.11, 12.12, - 13.13, 14.14, 15.15, 16.16, 17.17, 18.18, 19.19, 20.20) - val decimal64Array: Array[java.lang.Long] = Array(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L, 12L, 13L, 14L, 15L, 16L, - 17L, 18L, 19L, 20L) - val decimal128Array: Array[Array[Long]] = Array( - Array(0L, 0L), Array(1L, 1L), Array(2L, 2L), Array(3L, 3L), Array(4L, 4L), Array(5L, 5L), Array(6L, 6L), - Array(7L, 7L), Array(8L, 8L), Array(9L, 9L), Array(10L, 10L), Array(11L, 11L), Array(12L, 12L), Array(13L, 13L), - Array(14L, 14L), Array(15L, 15L), Array(16L, 16L), Array(17L, 17L), Array(18L, 18L), Array(19L, 19L), Array(20L, 20L)) - val stringArray: Array[java.lang.String] = Array("", "a", "bb", "ccc", "dddd", "eeeee", "ffffff", "ggggggg", - "hhhhhhhh", "iiiiiiiii", "jjjjjjjjjj", "kkkkkkkkkkk", "llllllllllll", "mmmmmmmmmmmmm", "nnnnnnnnnnnnnn", - "ooooooooooooooo", "pppppppppppppppp", "qqqqqqqqqqqqqqqqq", "rrrrrrrrrrrrrrrrrr", "sssssssssssssssssss", - "tttttttttttttttttttt") - val booleanArray: Array[java.lang.Boolean] = Array(true, true, true, true, true, true, true, true, true, true, - false, false, false, false, false, false, false, false, false, false, false) - - val pidVector0 = ColumnarShuffleWriterSuite.initOmniColumnIntVector(pidArray) - val intVector0 = ColumnarShuffleWriterSuite.initOmniColumnIntVector(intArray) - val shortVector0 = ColumnarShuffleWriterSuite.initOmniColumnShortVector(shortArray) - val longVector0 = ColumnarShuffleWriterSuite.initOmniColumnLongVector(longArray) - val doubleVector0 = ColumnarShuffleWriterSuite.initOmniColumnDoubleVector(doubleArray) - val decimal64Vector0 = ColumnarShuffleWriterSuite.initOmniColumnDecimal64Vector(decimal64Array) - val decimal128Vector0 = ColumnarShuffleWriterSuite.initOmniColumnDecimal128Vector(decimal128Array) - val varcharVector0 = ColumnarShuffleWriterSuite.initOmniColumnVarcharVector(stringArray) - val booleanVector0 = ColumnarShuffleWriterSuite.initOmniColumnBooleanVector(booleanArray) - - val cb0 = 
ColumnarShuffleWriterSuite.makeColumnarBatch( - pidVector0.getVec.getSize, - List(pidVector0, intVector0, shortVector0, longVector0, doubleVector0, - decimal64Vector0, decimal128Vector0, varcharVector0, booleanVector0) - ) - - val pidVector1 = ColumnarShuffleWriterSuite.initOmniColumnIntVector(pidArray) - val intVector1 = ColumnarShuffleWriterSuite.initOmniColumnIntVector(intArray) - val shortVector1 = ColumnarShuffleWriterSuite.initOmniColumnShortVector(shortArray) - val longVector1 = ColumnarShuffleWriterSuite.initOmniColumnLongVector(longArray) - val doubleVector1 = ColumnarShuffleWriterSuite.initOmniColumnDoubleVector(doubleArray) - val decimal64Vector1 = ColumnarShuffleWriterSuite.initOmniColumnDecimal64Vector(decimal64Array) - val decimal128Vector1 = ColumnarShuffleWriterSuite.initOmniColumnDecimal128Vector(decimal128Array) - val varcharVector1 = ColumnarShuffleWriterSuite.initOmniColumnVarcharVector(stringArray) - val booleanVector1 = ColumnarShuffleWriterSuite.initOmniColumnBooleanVector(booleanArray) - - val cb1 = ColumnarShuffleWriterSuite.makeColumnarBatch( - pidVector1.getVec.getSize, - List(pidVector1, intVector1, shortVector1, longVector1, doubleVector1, - decimal64Vector1, decimal128Vector1, varcharVector1, booleanVector1) - ) - - def records: Iterator[(Int, ColumnarBatch)] = Iterator((0, cb0), (0, cb1)) - - val writer = new ColumnarShuffleWriter[Int, ColumnarBatch]( - blockResolver, - shuffleHandle, - 0L, // MapId - taskContext.taskMetrics().shuffleWriteMetrics) - - writer.write(records) - writer.stop(success = true) - - assert(writer.getPartitionLengths.sum === outputFile.length()) - assert(writer.getPartitionLengths.count(_ == 0L) === 0) - // should be (numPartitions - 2) zero length files - - val shuffleWriteMetrics = taskContext.taskMetrics().shuffleWriteMetrics - assert(shuffleWriteMetrics.bytesWritten === outputFile.length()) - assert(shuffleWriteMetrics.recordsWritten === pidArray.length * 2) - - assert(taskMetrics.diskBytesSpilled === 0) - assert(taskMetrics.memoryBytesSpilled === 0) - - val serializer = new ColumnarBatchSerializer(avgBatchNumRows, outputNumRows).newInstance() - val deserializedStream = serializer.deserializeStream(new FileInputStream(outputFile)) - - try { - val kv = deserializedStream.asKeyValueIterator - var length = 0 - kv.foreach { - case (_, batch: ColumnarBatch) => - length += 1 - assert(batch.numRows == 42) - assert(batch.numCols == 8) - assert(batch.column(0).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[IntVec].get(0) == 0) - assert(batch.column(0).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[IntVec].get(19) == 19) - assert(batch.column(1).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[ShortVec].get(0) == 0) - assert(batch.column(1).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[ShortVec].get(19) == 19) - assert(batch.column(2).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[LongVec].get(0) == 0) - assert(batch.column(2).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[LongVec].get(19) == 19) - assert(batch.column(3).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[DoubleVec].get(0) == 0.0) - assert(batch.column(3).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[DoubleVec].get(19) == 19.19) - assert(batch.column(4).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[LongVec].get(0) == 0L) - assert(batch.column(4).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[LongVec].get(19) == 19L) - assert(batch.column(5).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[Decimal128Vec].get(0) sameElements Array(0L, 0L)) 
- assert(batch.column(5).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[Decimal128Vec].get(19) sameElements Array(19L, 19L)) - assert(batch.column(6).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[VarcharVec].get(0) sameElements "") - assert(batch.column(6).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[VarcharVec].get(19) sameElements "sssssssssssssssssss") - assert(batch.column(7).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[BooleanVec].get(0) == true) - assert(batch.column(7).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[BooleanVec].get(19) == false) - (0 until batch.numCols).foreach { i => - val valueVector = batch.column(i).asInstanceOf[OmniColumnVector].getVec - assert(valueVector.getSize == batch.numRows) - } - batch.close() - } - assert(length == 1) - } finally { - deserializedStream.close() - } - - } -}
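
Reviewer note (illustration only, not part of the patch): the rewritten C++ writer compresses each shuffle block with the zlib-style compress2() declared in kaezip.h, and the Java reader (KaeCodec) inflates it with java.util.zip.Inflater(false), so both sides agree on a standard zlib-wrapped block. The sketch below shows that round trip using only the stock zlib API (compressBound/compress2/uncompress), which is what the calls in Compression.cc suggest kaezip exposes; linking against libkaezip (as src/CMakeLists.txt now does) is assumed to route the same calls through the KAE accelerator, but the example runs unchanged against plain zlib.

```cpp
// Minimal sketch of the block round trip KAECompressionStream/KaeCodec rely on.
// Assumes only the standard zlib API; kaezip is assumed to be call-compatible.
#include <zlib.h>

#include <cstring>
#include <stdexcept>
#include <string>
#include <vector>

int main() {
    const std::string input = "example shuffle block payload";

    // Worst-case output size, mirroring estimateMaxCompressionSize() in the patch.
    uLongf compressedLen = compressBound(static_cast<uLong>(input.size()));
    std::vector<Bytef> compressed(compressedLen);

    // compress2() emits a zlib-wrapped stream, which is why the Java side
    // constructs Inflater(false) (expect a zlib header, not raw deflate).
    if (compress2(compressed.data(), &compressedLen,
                  reinterpret_cast<const Bytef*>(input.data()),
                  static_cast<uLong>(input.size()), Z_BEST_SPEED) != Z_OK) {
        throw std::runtime_error("Failed to compress input data");
    }

    // Decompress into a buffer sized from the known original length.
    uLongf decompressedLen = static_cast<uLongf>(input.size());
    std::vector<Bytef> decompressed(decompressedLen);
    if (uncompress(decompressed.data(), &decompressedLen,
                   compressed.data(), compressedLen) != Z_OK) {
        throw std::runtime_error("Failed to decompress data");
    }

    // Round trip succeeds when the output matches the original payload.
    return std::memcmp(decompressed.data(), input.data(), input.size()) == 0 ? 0 : 1;
}
```

Because the on-disk block format stays plain zlib, a reader without a KAE device can still inflate shuffle data written this way in software; kz_get_devices() appears to act only as a writer-side availability check in the patch.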