diff --git a/presto-native-execution/presto_cpp/main/PrestoServer.cpp b/presto-native-execution/presto_cpp/main/PrestoServer.cpp index 8b195c6c833e7..988895a824109 100644 --- a/presto-native-execution/presto_cpp/main/PrestoServer.cpp +++ b/presto-native-execution/presto_cpp/main/PrestoServer.cpp @@ -38,8 +38,8 @@ #include "presto_cpp/main/operators/BroadcastWrite.h" #include "presto_cpp/main/operators/LocalPersistentShuffle.h" #include "presto_cpp/main/operators/PartitionAndSerialize.h" +#include "presto_cpp/main/operators/ShuffleExchangeSource.h" #include "presto_cpp/main/operators/ShuffleRead.h" -#include "presto_cpp/main/operators/CompactRowExchangeSource.h" #include "presto_cpp/main/types/PrestoToVeloxQueryPlan.h" #include "presto_cpp/main/types/VeloxPlanConversion.h" #include "velox/common/base/Counters.h" @@ -451,7 +451,7 @@ void PrestoServer::run() { }); velox::exec::ExchangeSource::registerFactory( - operators::CompactRowExchangeSource::createExchangeSource); + operators::ShuffleExchangeSource::createExchangeSource); // Batch broadcast exchange source. velox::exec::ExchangeSource::registerFactory( diff --git a/presto-native-execution/presto_cpp/main/operators/CMakeLists.txt b/presto-native-execution/presto_cpp/main/operators/CMakeLists.txt index 41a37024e8e21..104bf74db3825 100644 --- a/presto-native-execution/presto_cpp/main/operators/CMakeLists.txt +++ b/presto-native-execution/presto_cpp/main/operators/CMakeLists.txt @@ -14,9 +14,9 @@ add_library( presto_operators PartitionAndSerialize.cpp + ShuffleExchangeSource.cpp ShuffleRead.cpp ShuffleWrite.cpp - CompactRowExchangeSource LocalPersistentShuffle.cpp BroadcastWrite.cpp BroadcastFactory.cpp diff --git a/presto-native-execution/presto_cpp/main/operators/CompactRowExchangeSource.cpp b/presto-native-execution/presto_cpp/main/operators/ShuffleExchangeSource.cpp similarity index 86% rename from presto-native-execution/presto_cpp/main/operators/CompactRowExchangeSource.cpp rename to presto-native-execution/presto_cpp/main/operators/ShuffleExchangeSource.cpp index 092a1631b186f..cdd86eec79853 100644 --- a/presto-native-execution/presto_cpp/main/operators/CompactRowExchangeSource.cpp +++ b/presto-native-execution/presto_cpp/main/operators/ShuffleExchangeSource.cpp @@ -15,7 +15,7 @@ #include #include "presto_cpp/main/common/Configs.h" -#include "presto_cpp/main/operators/CompactRowExchangeSource.h" +#include "presto_cpp/main/operators/ShuffleExchangeSource.h" #include "velox/serializers/RowSerializer.h" namespace facebook::presto::operators { @@ -29,8 +29,8 @@ namespace facebook::presto::operators { VELOX_FAIL("ShuffleReader::{} failed: {}", methodName, e.what()); \ } -folly::SemiFuture -CompactRowExchangeSource::request( +folly::SemiFuture +ShuffleExchangeSource::request( uint32_t /*maxBytes*/, std::chrono::microseconds /*maxWait*/) { auto nextBatch = [this]() { @@ -48,8 +48,7 @@ CompactRowExchangeSource::request( VELOX_CHECK_LE(totalBytes, std::numeric_limits::max()); ++numBatches_; queue_->enqueueLocked( - std::make_unique( - std::move(batch)), + std::make_unique(std::move(batch)), promises); } } @@ -61,7 +60,7 @@ CompactRowExchangeSource::request( }) .deferError( [](folly::exception_wrapper e) mutable - -> CompactRowExchangeSource::Response { + -> ShuffleExchangeSource::Response { VELOX_FAIL("ShuffleReader::{} failed: {}", "next", e.what()); }); }; @@ -69,9 +68,8 @@ CompactRowExchangeSource::request( CALL_SHUFFLE(return nextBatch(), "next"); } -folly::SemiFuture -CompactRowExchangeSource::requestDataSizes( - std::chrono::microseconds /*maxWait*/) { +folly::SemiFuture +ShuffleExchangeSource::requestDataSizes(std::chrono::microseconds /*maxWait*/) { std::vector remainingBytes; if (!atEnd_) { // Use default value of ExchangeClient::getAveragePageSize() for now. @@ -82,7 +80,7 @@ CompactRowExchangeSource::requestDataSizes( return folly::makeSemiFuture(Response{0, atEnd_, std::move(remainingBytes)}); } -folly::F14FastMap CompactRowExchangeSource::stats() const { +folly::F14FastMap ShuffleExchangeSource::stats() const { return shuffleReader_->stats(); } @@ -101,7 +99,7 @@ std::optional getSerializedShuffleInfo(folly::Uri& uri) { // static std::shared_ptr -CompactRowExchangeSource::createExchangeSource( +ShuffleExchangeSource::createExchangeSource( const std::string& url, int32_t destination, const std::shared_ptr& queue, @@ -123,7 +121,7 @@ CompactRowExchangeSource::createExchangeSource( "shuffle.name is not provided in config.properties to create a shuffle " "interface."); auto shuffleFactory = ShuffleInterfaceFactory::factory(shuffleName); - return std::make_shared( + return std::make_shared( uri.host(), destination, queue, diff --git a/presto-native-execution/presto_cpp/main/operators/CompactRowExchangeSource.h b/presto-native-execution/presto_cpp/main/operators/ShuffleExchangeSource.h similarity index 91% rename from presto-native-execution/presto_cpp/main/operators/CompactRowExchangeSource.h rename to presto-native-execution/presto_cpp/main/operators/ShuffleExchangeSource.h index 134914b56266a..a44d03f396128 100644 --- a/presto-native-execution/presto_cpp/main/operators/CompactRowExchangeSource.h +++ b/presto-native-execution/presto_cpp/main/operators/ShuffleExchangeSource.h @@ -21,16 +21,16 @@ namespace facebook::presto::operators { -class CompactRowBatch : public velox::exec::SerializedPage { +class ShuffleRowBatch : public velox::exec::SerializedPage { public: - explicit CompactRowBatch( + explicit ShuffleRowBatch( std::unique_ptr rowBatch) : velox::exec:: SerializedPage{folly::IOBuf::wrapBuffer( rowBatch->data->as(), rowBatch->data->size()), nullptr, rowBatch->rows.size()}, rowBatch_{std::move(rowBatch)} {} - ~CompactRowBatch() override {} + ~ShuffleRowBatch() override {} const std::vector& rows() const { return rowBatch_->rows; @@ -40,9 +40,9 @@ class CompactRowBatch : public velox::exec::SerializedPage { const std::unique_ptr rowBatch_; }; -class CompactRowExchangeSource : public velox::exec::ExchangeSource { +class ShuffleExchangeSource : public velox::exec::ExchangeSource { public: - CompactRowExchangeSource( + ShuffleExchangeSource( const std::string& taskId, int destination, const std::shared_ptr& queue, diff --git a/presto-native-execution/presto_cpp/main/operators/ShuffleRead.cpp b/presto-native-execution/presto_cpp/main/operators/ShuffleRead.cpp index eb6a9261b2b1a..d644c7ce0ac5b 100644 --- a/presto-native-execution/presto_cpp/main/operators/ShuffleRead.cpp +++ b/presto-native-execution/presto_cpp/main/operators/ShuffleRead.cpp @@ -12,14 +12,13 @@ * limitations under the License. */ #include "presto_cpp/main/operators/ShuffleRead.h" +#include "presto_cpp/main/operators/ShuffleExchangeSource.h" +#include "velox/common/Casts.h" #include "velox/exec/Exchange.h" #include "velox/row/CompactRow.h" -#include "velox/serializers/CompactRowSerializer.h" -#include "velox/serializers/RowSerializer.h" using namespace facebook::velox::exec; using namespace facebook::velox; -using facebook::velox::serializer::RowIteratorImpl; namespace facebook::presto::operators { velox::core::PlanNodeId deserializePlanNodeId(const folly::dynamic& obj) { @@ -27,73 +26,9 @@ velox::core::PlanNodeId deserializePlanNodeId(const folly::dynamic& obj) { } namespace { -std::unique_ptr shuffleRowIteratorFactory( - ByteInputStream* source, - const VectorSerde::Options* /*unused*/) { - return std::make_unique(source, source->size()); -} - -class ShuffleVectorSerde : public VectorSerde { +class ShuffleRead : public Exchange { public: - ShuffleVectorSerde() : VectorSerde(VectorSerde::Kind::kCompactRow) {} - - void estimateSerializedSize( - const BaseVector* /* vector */, - const folly::Range& /* ranges */, - vector_size_t** /* sizes */) override { - // Not used. - VELOX_UNREACHABLE(); - } - - std::unique_ptr createIterativeSerializer( - RowTypePtr /* type */, - int32_t /* numRows */, - StreamArena* /* streamArena */, - const Options* /* options */) override { - // Not used. - VELOX_UNREACHABLE(); - } - - void deserialize( - ByteInputStream* source, - velox::memory::MemoryPool* pool, - RowTypePtr type, - RowVectorPtr* result, - const Options* /* options */) override { - VELOX_UNSUPPORTED("ShuffleVectorSerde::deserialize is not supported"); - } - - void deserialize( - ByteInputStream* source, - std::unique_ptr& sourceRowIterator, - uint64_t maxRows, - RowTypePtr type, - RowVectorPtr* result, - velox::memory::MemoryPool* pool, - const Options* options) override { - std::vector serializedRows; - std::vector> serializedBuffers; - velox::serializer::RowDeserializer::deserialize( - source, - maxRows, - sourceRowIterator, - serializedRows, - serializedBuffers, - shuffleRowIteratorFactory, - options); - - if (serializedRows.empty()) { - *result = BaseVector::create(type, 0, pool); - return; - } - - *result = row::CompactRow::deserialize(serializedRows, type, pool); - } -}; - -class ShuffleReadOperator : public Exchange { - public: - ShuffleReadOperator( + ShuffleRead( int32_t operatorId, DriverCtx* ctx, const std::shared_ptr& shuffleReadNode, @@ -104,19 +39,82 @@ class ShuffleReadOperator : public Exchange { std::make_shared( shuffleReadNode->id(), shuffleReadNode->outputType(), - velox::VectorSerde::Kind::kCompactRow), + VectorSerde::Kind::kCompactRow), exchangeClient, - "ShuffleRead"), - serde_(std::make_unique()) {} + "ShuffleRead") {} + + RowVectorPtr getOutput() override; protected: VectorSerde* getSerde() override { - return serde_.get(); + VELOX_UNSUPPORTED("ShuffleReadOperator doesn't use serde"); } private: - std::unique_ptr serde_; + size_t nextRow_{0}; + // Reusable buffers. + std::vector rows_; }; + +RowVectorPtr ShuffleRead::getOutput() { + if (currentPages_.empty()) { + return nullptr; + } + + SCOPE_EXIT { + if (nextRow_ == rows_.size()) { + currentPages_.clear(); + rows_.clear(); + nextRow_ = 0; + } + }; + + uint64_t rawInputBytes{0}; + if (rows_.empty()) { + VELOX_CHECK_EQ(nextRow_, 0); + size_t numRows{0}; + for (const auto& page : currentPages_) { + rawInputBytes += page->size(); + numRows += page->numRows().value(); + } + rows_.reserve(numRows); + for (const auto& page : currentPages_) { + auto* batch = checked_pointer_cast(page.get()); + const auto& rows = batch->rows(); + for (const auto& row : rows) { + rows_.emplace_back(row); + } + } + } + VELOX_CHECK_LE(nextRow_, rows_.size()); + if (rows_.empty()) { + return nullptr; + } + + auto numOutputRows = kInitialOutputRows; + if (estimatedRowSize_.has_value()) { + numOutputRows = std::max( + (preferredOutputBatchBytes_ / estimatedRowSize_.value()), + kInitialOutputRows); + } + numOutputRows = std::min(numOutputRows, rows_.size() - nextRow_); + + // Create a view of the rows to deserialize from nextRow_ to nextRow_ + + // numOutputRows. + if (numOutputRows == rows_.size()) { + result_ = row::CompactRow::deserialize(rows_, outputType_, pool()); + } else { + std::vector outputRows( + rows_.begin() + nextRow_, rows_.begin() + nextRow_ + numOutputRows); + result_ = row::CompactRow::deserialize(outputRows, outputType_, pool()); + } + nextRow_ += numOutputRows; + estimatedRowSize_ = std::max( + result_->estimateFlatSize() / numOutputRows, + estimatedRowSize_.value_or(1L)); + recordInputStats(rawInputBytes); + return result_; +} } // namespace folly::dynamic ShuffleReadNode::serialize() const { @@ -140,7 +138,7 @@ std::unique_ptr ShuffleReadTranslator::toOperator( std::shared_ptr exchangeClient) { if (auto shuffleReadNode = std::dynamic_pointer_cast(node)) { - return std::make_unique( + return std::make_unique( id, ctx, shuffleReadNode, exchangeClient); } return nullptr; diff --git a/presto-native-execution/presto_cpp/main/operators/ShuffleWrite.cpp b/presto-native-execution/presto_cpp/main/operators/ShuffleWrite.cpp index dbb99c0e62711..dd25fa2f54eac 100644 --- a/presto-native-execution/presto_cpp/main/operators/ShuffleWrite.cpp +++ b/presto-native-execution/presto_cpp/main/operators/ShuffleWrite.cpp @@ -32,9 +32,9 @@ velox::core::PlanNodeId deserializePlanNodeId(const folly::dynamic& obj) { VELOX_FAIL("ShuffleWriter::{} failed: {}", methodName, e.what()); \ } -class ShuffleWriteOperator : public Operator { +class ShuffleWrite : public Operator { public: - ShuffleWriteOperator( + ShuffleWrite( int32_t operatorId, DriverCtx* FOLLY_NONNULL ctx, const std::shared_ptr& planNode) @@ -73,9 +73,10 @@ class ShuffleWriteOperator : public Operator { constexpr int kReplicateNullsAndAny = 3; checkCreateShuffleWriter(); - auto partitions = input->childAt(kPartition)->as>(); - auto serializedKeys = input->childAt(kKey)->as>(); - auto serializedData = input->childAt(kData)->as>(); + auto* partitions = input->childAt(kPartition)->as>(); + auto* serializedKeys = input->childAt(kKey)->as>(); + auto* serializedData = + input->childAt(kData)->as>(); SimpleVector* replicate = nullptr; if (input->type()->size() == 4) { replicate = @@ -194,7 +195,7 @@ std::unique_ptr ShuffleWriteTranslator::toOperator( const core::PlanNodePtr& node) { if (auto shuffleWriteNode = std::dynamic_pointer_cast(node)) { - return std::make_unique(id, ctx, shuffleWriteNode); + return std::make_unique(id, ctx, shuffleWriteNode); } return nullptr; } diff --git a/presto-native-execution/presto_cpp/main/operators/tests/CMakeLists.txt b/presto-native-execution/presto_cpp/main/operators/tests/CMakeLists.txt index 13095c885a03d..5a082aa4c59a1 100644 --- a/presto-native-execution/presto_cpp/main/operators/tests/CMakeLists.txt +++ b/presto-native-execution/presto_cpp/main/operators/tests/CMakeLists.txt @@ -15,7 +15,7 @@ target_link_libraries(presto_operators_plan_builder velox_core) add_executable( presto_operators_test - PlanNodeSerdeTest.cpp CompactRowShuffleTest.cpp BroadcastTest.cpp + PlanNodeSerdeTest.cpp ShuffleTest.cpp BroadcastTest.cpp BinarySortableSerializerTest.cpp PlanNodeBuilderTest.cpp) add_test(presto_operators_test presto_operators_test) diff --git a/presto-native-execution/presto_cpp/main/operators/tests/CompactRowShuffleTest.cpp b/presto-native-execution/presto_cpp/main/operators/tests/ShuffleTest.cpp similarity index 94% rename from presto-native-execution/presto_cpp/main/operators/tests/CompactRowShuffleTest.cpp rename to presto-native-execution/presto_cpp/main/operators/tests/ShuffleTest.cpp index 5d1b42398114c..ade4100b6e4de 100644 --- a/presto-native-execution/presto_cpp/main/operators/tests/CompactRowShuffleTest.cpp +++ b/presto-native-execution/presto_cpp/main/operators/tests/ShuffleTest.cpp @@ -14,9 +14,9 @@ #include #include "folly/init/Init.h" #include "presto_cpp/external/json/nlohmann/json.hpp" -#include "presto_cpp/main/operators/CompactRowExchangeSource.h" #include "presto_cpp/main/operators/LocalPersistentShuffle.h" #include "presto_cpp/main/operators/PartitionAndSerialize.h" +#include "presto_cpp/main/operators/ShuffleExchangeSource.h" #include "presto_cpp/main/operators/ShuffleRead.h" #include "presto_cpp/main/operators/ShuffleWrite.h" #include "presto_cpp/main/operators/tests/PlanBuilder.h" @@ -94,7 +94,8 @@ class TestShuffleWriter : public ShuffleWriter { maxKeyBytes_(maxKeyBytes), inProgressSizes_(numPartitions, 0), readyPartitions_( - std::make_shared>>>()), + std::make_shared< + std::vector>>>()), serializedSortKeys_( std::make_shared>>()) { inProgressPartitions_.resize(numPartitions_); @@ -120,7 +121,8 @@ class TestShuffleWriter : public ShuffleWriter { auto size = sizeof(TRowSize) + rowSize; // Check if there is enough space in the buffer. - if (readBatch && inProgressSizes_[partition] + size >= maxBytesPerPartition_) { + if (readBatch && + inProgressSizes_[partition] + size >= maxBytesPerPartition_) { readBatch->data->setSize(inProgressSizes_[partition]); (*readyPartitions_)[partition].emplace_back(std::move(readBatch)); inProgressPartitions_[partition].reset(); @@ -130,7 +132,8 @@ class TestShuffleWriter : public ShuffleWriter { if (readBatch == nullptr) { auto buffer = AlignedBuffer::allocate(maxBytesPerPartition_, pool_); VELOX_CHECK_NOT_NULL(buffer); - readBatch = std::make_unique(std::vector{}, std::move(buffer)); + readBatch = std::make_unique( + std::vector{}, std::move(buffer)); inProgressPartitions_[partition] = std::move(readBatch); inProgressSizes_[partition] = 0; } @@ -141,7 +144,8 @@ class TestShuffleWriter : public ShuffleWriter { *(TRowSize*)(rawBuffer) = folly::Endian::big(rowSize); ::memcpy(rawBuffer + sizeof(TRowSize), data.data(), rowSize); - readBatch->rows.push_back(data); + readBatch->rows.push_back( + std::string_view(rawBuffer + sizeof(TRowSize), rowSize)); inProgressSizes_[partition] += size; if (!key.empty()) { @@ -168,7 +172,8 @@ class TestShuffleWriter : public ShuffleWriter { {exec::ExchangeClient::kBackgroundCpuTimeMs, kFakeBackgroundCpuTimeMs}}; } - std::shared_ptr>>>& readyPartitions() { + std::shared_ptr>>>& + readyPartitions() { return readyPartitions_; } @@ -215,7 +220,8 @@ class TestShuffleWriter : public ShuffleWriter { /// Tracks the total size of each in-progress partition in /// inProgressPartitions_ std::vector inProgressSizes_; - std::shared_ptr>>> readyPartitions_; + std::shared_ptr>>> + readyPartitions_; std::shared_ptr>> serializedSortKeys_; }; @@ -223,7 +229,8 @@ class TestShuffleReader : public ShuffleReader { public: TestShuffleReader( const int32_t partition, - const std::shared_ptr>>>& + const std::shared_ptr< + std::vector>>>& readyPartitions) : partition_(partition), readyPartitions_(readyPartitions) {} @@ -233,7 +240,6 @@ class TestShuffleReader : public ShuffleReader { if ((*readyPartitions_)[partition_].empty()) { return folly::makeSemiFuture(std::unique_ptr(nullptr)); } - auto readBatch = std::move((*readyPartitions_)[partition_].back()); (*readyPartitions_)[partition_].pop_back(); return folly::makeSemiFuture>( @@ -252,7 +258,8 @@ class TestShuffleReader : public ShuffleReader { private: const int32_t partition_; - const std::shared_ptr>>>& readyPartitions_; + const std::shared_ptr>>>& + readyPartitions_; }; class TestShuffleFactory : public ShuffleInterfaceFactory { @@ -281,13 +288,12 @@ void registerExchangeSource(const std::string& shuffleName) { const std::string& taskId, int destination, const std::shared_ptr& queue, - memory::MemoryPool* pool) - -> std::shared_ptr { + memory::MemoryPool* pool) -> std::shared_ptr { if (strncmp(taskId.c_str(), "batch://", 8) == 0) { auto uri = folly::Uri(taskId); for (auto& pair : uri.getQueryParams()) { if (pair.first == "shuffleInfo") { - return std::make_shared( + return std::make_shared( taskId, destination, queue, @@ -304,7 +310,7 @@ void registerExchangeSource(const std::string& shuffleName) { } } // namespace -class CompactRowShuffleTest : public exec::test::OperatorTestBase { +class ShuffleTest : public exec::test::OperatorTestBase { public: std::string testShuffleInfo( uint32_t numPartitions, @@ -359,6 +365,7 @@ class CompactRowShuffleTest : public exec::test::OperatorTestBase { } void TearDown() override { + TestShuffleWriter::reset(); exec::test::waitForAllTasksToBeDeleted(); exec::test::OperatorTestBase::TearDown(); } @@ -822,7 +829,7 @@ class CompactRowShuffleTest : public exec::test::OperatorTestBase { } }; -TEST_F(CompactRowShuffleTest, operators) { +TEST_F(ShuffleTest, operators) { auto data = makeRowVector({ makeFlatVector({1, 2, 3, 4}), makeFlatVector({10, 20, 30, 40}), @@ -845,7 +852,7 @@ TEST_F(CompactRowShuffleTest, operators) { TestShuffleWriter::reset(); } -DEBUG_ONLY_TEST_F(CompactRowShuffleTest, shuffleWriterExceptions) { +DEBUG_ONLY_TEST_F(ShuffleTest, shuffleWriterExceptions) { auto data = makeRowVector({ makeFlatVector({1, 2, 3, 4}), makeFlatVector({10, 20, 30, 40}), @@ -872,12 +879,9 @@ DEBUG_ONLY_TEST_F(CompactRowShuffleTest, shuffleWriterExceptions) { VELOX_ASSERT_THROW( exec::test::readCursor(params), "ShuffleWriter::collect failed"); - - TestShuffleWriter::reset(); - exec::test::waitForAllTasksToBeDeleted(); } -DEBUG_ONLY_TEST_F(CompactRowShuffleTest, shuffleReaderExceptions) { +DEBUG_ONLY_TEST_F(ShuffleTest, shuffleReaderExceptions) { auto data = makeRowVector({ makeFlatVector({1, 2, 3, 4}), makeFlatVector({10, 20, 30, 40}), @@ -918,12 +922,9 @@ DEBUG_ONLY_TEST_F(CompactRowShuffleTest, shuffleReaderExceptions) { VELOX_ASSERT_THROW( runShuffleReadTask(params, info), "ShuffleReader::next failed"); } - - TestShuffleWriter::reset(); - exec::test::waitForAllTasksToBeDeleted(); } -TEST_F(CompactRowShuffleTest, endToEnd) { +TEST_F(ShuffleTest, endToEnd) { size_t numPartitions = 5; size_t numMapDrivers = 2; @@ -946,10 +947,9 @@ TEST_F(CompactRowShuffleTest, endToEnd) { numMapDrivers, {data}, kFakeBackgroundCpuTimeMs * Timestamp::kNanosecondsInMillisecond); - TestShuffleWriter::reset(); } -TEST_F(CompactRowShuffleTest, endToEndWithSortedShuffle) { +TEST_F(ShuffleTest, endToEndWithSortedShuffle) { size_t numPartitions = 2; size_t numMapDrivers = 1; @@ -990,10 +990,9 @@ TEST_F(CompactRowShuffleTest, endToEndWithSortedShuffle) { ordering, fields, expectedSortingOrder); - TestShuffleWriter::reset(); } -TEST_F(CompactRowShuffleTest, endToEndWithSortedShuffleRowLimit) { +TEST_F(ShuffleTest, endToEndWithSortedShuffleRowLimit) { size_t numPartitions = 3; size_t numMapDrivers = 1; @@ -1048,10 +1047,9 @@ TEST_F(CompactRowShuffleTest, endToEndWithSortedShuffleRowLimit) { fields, expectedSortingOrder, std::move(queryConfig)); - TestShuffleWriter::reset(); } -TEST_F(CompactRowShuffleTest, endToEndWithReplicateNullAndAny) { +TEST_F(ShuffleTest, endToEndWithReplicateNullAndAny) { size_t numPartitions = 9; size_t numMapDrivers = 2; @@ -1074,10 +1072,9 @@ TEST_F(CompactRowShuffleTest, endToEndWithReplicateNullAndAny) { numMapDrivers, {data}, kFakeBackgroundCpuTimeMs * Timestamp::kNanosecondsInMillisecond); - TestShuffleWriter::reset(); } -TEST_F(CompactRowShuffleTest, replicateNullsAndAny) { +TEST_F(ShuffleTest, replicateNullsAndAny) { // No nulls. Expect to replicate first row. auto data = makeRowVector({ makeFlatVector({1, 2, 3, 4}), @@ -1107,7 +1104,7 @@ TEST_F(CompactRowShuffleTest, replicateNullsAndAny) { data, makeFlatVector({true, false, true, true, false})); } -TEST_F(CompactRowShuffleTest, persistentShuffleDeser) { +TEST_F(ShuffleTest, persistentShuffleDeser) { std::string serializedWriteInfo = "{\n" " \"rootPath\": \"abc\",\n" @@ -1169,7 +1166,7 @@ TEST_F(CompactRowShuffleTest, persistentShuffleDeser) { nlohmann::detail::type_error); } -TEST_F(CompactRowShuffleTest, persistentShuffle) { +TEST_F(ShuffleTest, persistentShuffle) { uint32_t numPartitions = 1; uint32_t numMapDrivers = 1; @@ -1202,43 +1199,43 @@ TEST_F(CompactRowShuffleTest, persistentShuffle) { cleanupDirectory(rootPath); } -TEST_F(CompactRowShuffleTest, persistentShuffleFuzz) { +TEST_F(ShuffleTest, persistentShuffleFuzz) { fuzzerTest(false, 1); fuzzerTest(false, 3); fuzzerTest(false, 7); } -TEST_F(CompactRowShuffleTest, persistentShuffleFuzzWithReplicateNullsAndAny) { +TEST_F(ShuffleTest, persistentShuffleFuzzWithReplicateNullsAndAny) { fuzzerTest(true, 1); fuzzerTest(true, 3); fuzzerTest(true, 7); } -TEST_F(CompactRowShuffleTest, partitionAndSerializeOutputByteLimit) { +TEST_F(ShuffleTest, partitionAndSerializeOutputByteLimit) { partitionAndSerializeWithThresholds(10'000, 1, 10, 10); } -TEST_F(CompactRowShuffleTest, partitionAndSerializeOutputRowLimit) { +TEST_F(ShuffleTest, partitionAndSerializeOutputRowLimit) { partitionAndSerializeWithThresholds(5, 1'000'000'000, 10, 2); } -TEST_F(CompactRowShuffleTest, partitionAndSerializeOutputRowLimitWithSort) { +TEST_F(ShuffleTest, partitionAndSerializeOutputRowLimitWithSort) { partitionAndSerializeWithThresholds(5, 1'000'000'000, 10, 2, true); } -TEST_F(CompactRowShuffleTest, partitionAndSerializeOutputByteLimitWithSort) { +TEST_F(ShuffleTest, partitionAndSerializeOutputByteLimitWithSort) { partitionAndSerializeWithThresholds(10'000, 100, 10, 10, true); } -TEST_F(CompactRowShuffleTest, partitionAndSerializeNoLimit) { +TEST_F(ShuffleTest, partitionAndSerializeNoLimit) { partitionAndSerializeWithThresholds(1'000, 1'000'000'000, 5, 1); } -TEST_F(CompactRowShuffleTest, partitionAndSerializeBothLimited) { +TEST_F(ShuffleTest, partitionAndSerializeBothLimited) { partitionAndSerializeWithThresholds(1, 1'000'000, 5, 5); } -TEST_F(CompactRowShuffleTest, partitionAndSerializeOperator) { +TEST_F(ShuffleTest, partitionAndSerializeOperator) { auto data = makeRowVector({ makeFlatVector(1'000, [](auto row) { return row; }), makeFlatVector(1'000, [](auto row) { return row * 10; }), @@ -1252,7 +1249,7 @@ TEST_F(CompactRowShuffleTest, partitionAndSerializeOperator) { testPartitionAndSerialize(plan, data); } -TEST_F(CompactRowShuffleTest, partitionAndSerializeWithLargeInput) { +TEST_F(ShuffleTest, partitionAndSerializeWithLargeInput) { auto data = makeRowVector( {makeFlatVector(20'000, [](auto row) { return row; })}); @@ -1264,7 +1261,7 @@ TEST_F(CompactRowShuffleTest, partitionAndSerializeWithLargeInput) { testPartitionAndSerialize(plan, data); } -TEST_F(CompactRowShuffleTest, partitionAndSerializeWithDifferentColumnOrder) { +TEST_F(ShuffleTest, partitionAndSerializeWithDifferentColumnOrder) { auto data = makeRowVector({ makeFlatVector(1'000, [](auto row) { return row; }), makeFlatVector(1'000, [](auto row) { return row * 10; }), @@ -1299,9 +1296,7 @@ TEST_F(CompactRowShuffleTest, partitionAndSerializeWithDifferentColumnOrder) { testPartitionAndSerialize(plan, expected); } -TEST_F( - CompactRowShuffleTest, - partitionAndSerializeOperatorWhenSinglePartition) { +TEST_F(ShuffleTest, partitionAndSerializeOperatorWhenSinglePartition) { auto data = makeRowVector({ makeFlatVector(1'000, [](auto row) { return row; }), makeFlatVector(1'000, [](auto row) { return row * 10; }), @@ -1315,7 +1310,7 @@ TEST_F( testPartitionAndSerialize(plan, data); } -TEST_F(CompactRowShuffleTest, shuffleWriterToString) { +TEST_F(ShuffleTest, shuffleWriterToString) { auto data = makeRowVector({ makeFlatVector(1'000, [](auto row) { return row; }), makeFlatVector(1'000, [](auto row) { return row * 10; }), @@ -1338,7 +1333,7 @@ TEST_F(CompactRowShuffleTest, shuffleWriterToString) { " -> partition:INTEGER, key:VARBINARY, data:VARBINARY\n"); } -TEST_F(CompactRowShuffleTest, partitionAndSerializeToString) { +TEST_F(ShuffleTest, partitionAndSerializeToString) { auto data = makeRowVector({ makeFlatVector(1'000, [](auto row) { return row; }), makeFlatVector(1'000, [](auto row) { return row * 10; }), @@ -1382,7 +1377,7 @@ class DummyShuffleInterfaceFactory : public ShuffleInterfaceFactory { } }; -TEST_F(CompactRowShuffleTest, shuffleInterfaceRegistration) { +TEST_F(ShuffleTest, shuffleInterfaceRegistration) { const std::string kShuffleName = "dummy-shuffle"; EXPECT_TRUE(ShuffleInterfaceFactory::registerFactory( kShuffleName, std::make_unique()));