diff --git a/presto-native-execution/presto_cpp/main/common/Configs.cpp b/presto-native-execution/presto_cpp/main/common/Configs.cpp index 96a968f0c4daf..052ffd84c4ff0 100644 --- a/presto-native-execution/presto_cpp/main/common/Configs.cpp +++ b/presto-native-execution/presto_cpp/main/common/Configs.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #if __has_include("filesystem") #include namespace fs = std::filesystem; @@ -143,6 +144,7 @@ SystemConfig::SystemConfig() { NUM_PROP(kMaxDriversPerTask, hardwareConcurrency()), NONE_PROP(kTaskWriterCount), NONE_PROP(kTaskPartitionedWriterCount), + NONE_PROP(kTaskMaxStorageBroadcastBytes), NUM_PROP(kConcurrentLifespansPerTask, 1), STR_PROP(kTaskMaxPartialAggregationMemory, "16MB"), NUM_PROP(kDriverMaxSplitPreload, 2), @@ -167,7 +169,9 @@ SystemConfig::SystemConfig() { NUM_PROP(kDriverStuckOperatorThresholdMs, 30 * 60 * 1000), NUM_PROP( kDriverCancelTasksWithStuckOperatorsThresholdMs, 40 * 60 * 1000), - NUM_PROP(kDriverNumStuckOperatorsToDetachWorker, std::round(0.5 * hardwareConcurrency())), + NUM_PROP( + kDriverNumStuckOperatorsToDetachWorker, + std::round(0.5 * hardwareConcurrency())), NUM_PROP(kSpillerNumCpuThreadsHwMultiplier, 1.0), STR_PROP(kSpillerFileCreateConfig, ""), STR_PROP(kSpillerDirectoryCreateConfig, ""), @@ -427,6 +431,10 @@ folly::Optional SystemConfig::taskPartitionedWriterCount() const { return optionalProperty(kTaskPartitionedWriterCount); } +folly::Optional SystemConfig::taskMaxStorageBroadcastBytes() const { + return optionalProperty(kTaskMaxStorageBroadcastBytes); +} + int32_t SystemConfig::concurrentLifespansPerTask() const { return optionalProperty(kConcurrentLifespansPerTask).value(); } diff --git a/presto-native-execution/presto_cpp/main/common/Configs.h b/presto-native-execution/presto_cpp/main/common/Configs.h index f205bfd552134..b22c8ee282b04 100644 --- a/presto-native-execution/presto_cpp/main/common/Configs.h +++ b/presto-native-execution/presto_cpp/main/common/Configs.h @@ -179,6 +179,14 @@ class SystemConfig : public ConfigBase { static constexpr std::string_view kTaskWriterCount{"task.writer-count"}; static constexpr std::string_view kTaskPartitionedWriterCount{ "task.partitioned-writer-count"}; + + /// Maximum number of bytes per task that can be broadcast to storage for + /// storage-based broadcast joins. This property is only applicable to + /// storage-based broadcast join operations, currently used in the Presto on + /// Spark native stack. When the broadcast data size exceeds this limit, the + /// query fails. + static constexpr std::string_view kTaskMaxStorageBroadcastBytes{ + "task.max-storage-broadcast-bytes"}; static constexpr std::string_view kConcurrentLifespansPerTask{ "task.concurrent-lifespans-per-task"}; static constexpr std::string_view kTaskMaxPartialAggregationMemory{ @@ -843,6 +851,8 @@ class SystemConfig : public ConfigBase { folly::Optional taskPartitionedWriterCount() const; + folly::Optional taskMaxStorageBroadcastBytes() const; + int32_t concurrentLifespansPerTask() const; double httpServerNumIoThreadsHwMultiplier() const; diff --git a/presto-native-execution/presto_cpp/main/operators/BroadcastFactory.cpp b/presto-native-execution/presto_cpp/main/operators/BroadcastFactory.cpp index 3b1375a74aad0..15f97d2a1997a 100644 --- a/presto-native-execution/presto_cpp/main/operators/BroadcastFactory.cpp +++ b/presto-native-execution/presto_cpp/main/operators/BroadcastFactory.cpp @@ -16,16 +16,29 @@ #include #include #include "presto_cpp/external/json/nlohmann/json.hpp" +#include "presto_cpp/main/common/Exception.h" #include "presto_cpp/main/thrift/ThriftIO.h" #include "presto_cpp/main/thrift/gen-cpp2/presto_native_types.h" +#include "presto_cpp/presto_protocol/core/presto_protocol_core.h" #include "velox/common/file/File.h" #include "velox/vector/FlatVector.h" using namespace facebook::velox::exec; using namespace facebook::velox; +using namespace facebook::presto; namespace facebook::presto::operators { +#define PRESTO_BROADCAST_LIMIT_EXCEEDED(errorMessage) \ + _VELOX_THROW( \ + ::facebook::velox::VeloxRuntimeError, \ + ::facebook::velox::error_source::kErrorSourceRuntime.c_str(), \ + ::facebook::presto::error_code::kExceededLocalBroadcastJoinMemoryLimit \ + .c_str(), \ + /* isRetriable */ false, \ + "{}", \ + errorMessage); + namespace { std::string makeUuid() { return boost::lexical_cast(boost::uuids::random_generator()()); @@ -40,11 +53,13 @@ BroadcastFactory::BroadcastFactory(const std::string& basePath) std::unique_ptr BroadcastFactory::createWriter( uint64_t writeBufferSize, + uint64_t maxBroadcastBytes, velox::memory::MemoryPool* pool, std::unique_ptr serdeOptions) { fileSystem_->mkdir(basePath_); return std::make_unique( fmt::format("{}/file_broadcast_{}", basePath_, makeUuid()), + maxBroadcastBytes, writeBufferSize, std::move(serdeOptions), pool); @@ -69,6 +84,7 @@ std::unique_ptr BroadcastFileInfo::deserialize( BroadcastFileWriter::BroadcastFileWriter( const std::string& pathPrefix, + uint64_t maxBroadcastBytes, uint64_t writeBufferSize, std::unique_ptr serdeOptions, velox::memory::MemoryPool* pool) @@ -79,7 +95,8 @@ BroadcastFileWriter::BroadcastFileWriter( "", std::move(serdeOptions), getNamedVectorSerde(VectorSerde::Kind::kPresto), - pool) {} + pool), + maxBroadcastBytes_(maxBroadcastBytes) {} void BroadcastFileWriter::write(const RowVectorPtr& rowVector) { const auto numRows = rowVector->size(); @@ -89,6 +106,20 @@ void BroadcastFileWriter::write(const RowVectorPtr& rowVector) { numRows_ += numRows; } +void BroadcastFileWriter::updateWriteStats( + uint64_t writtenBytes, + uint64_t /* flushTimeNs */, + uint64_t /* fileWriteTimeNs */) { + writtenBytes_ += writtenBytes; + if (FOLLY_UNLIKELY(writtenBytes_ > maxBroadcastBytes_)) { + PRESTO_BROADCAST_LIMIT_EXCEEDED(fmt::format( + "Storage broadcast join exceeded per task broadcast limit " + "writtenBytes_ {} vs maxBroadcastBytes_ {}", + succinctBytes(writtenBytes_), + succinctBytes(maxBroadcastBytes_))); + } +} + uint64_t BroadcastFileWriter::flush() { const auto pageBytes = serializer::SerializedPageFileWriter::flush(); if (pageBytes != 0) { diff --git a/presto-native-execution/presto_cpp/main/operators/BroadcastFactory.h b/presto-native-execution/presto_cpp/main/operators/BroadcastFactory.h index 983fa99efc534..8bc9e47420010 100644 --- a/presto-native-execution/presto_cpp/main/operators/BroadcastFactory.h +++ b/presto-native-execution/presto_cpp/main/operators/BroadcastFactory.h @@ -40,6 +40,7 @@ class BroadcastFileWriter : velox::serializer::SerializedPageFileWriter { public: BroadcastFileWriter( const std::string& pathPrefix, + uint64_t maxBroadcastBytes, uint64_t writeBufferSize, std::unique_ptr serdeOptions, velox::memory::MemoryPool* pool); @@ -58,6 +59,11 @@ class BroadcastFileWriter : velox::serializer::SerializedPageFileWriter { velox::RowVectorPtr fileStats(); private: + void updateWriteStats( + uint64_t writtenBytes, + uint64_t /* flushTimeNs */, + uint64_t /* fileWriteTimeNs */) override; + uint64_t flush() override; void closeFile() override; @@ -70,6 +76,9 @@ class BroadcastFileWriter : velox::serializer::SerializedPageFileWriter { // [serialized-thrift-footer][footer_size(8)] void writeFooter(); + const uint64_t maxBroadcastBytes_; + + uint64_t writtenBytes_{0}; int64_t numRows_{0}; std::vector pageSizes_; velox::RowVectorPtr fileStats_{nullptr}; @@ -120,6 +129,7 @@ class BroadcastFactory { std::unique_ptr createWriter( uint64_t writeBufferSize, + uint64_t maxBroadcastBytes, velox::memory::MemoryPool* pool, std::unique_ptr serdeOptions); diff --git a/presto-native-execution/presto_cpp/main/operators/BroadcastWrite.cpp b/presto-native-execution/presto_cpp/main/operators/BroadcastWrite.cpp index e39e7ea8a9b80..796a78e36f622 100644 --- a/presto-native-execution/presto_cpp/main/operators/BroadcastWrite.cpp +++ b/presto-native-execution/presto_cpp/main/operators/BroadcastWrite.cpp @@ -12,6 +12,7 @@ * limitations under the License. */ #include "presto_cpp/main/operators/BroadcastWrite.h" +#include "presto_cpp/main/common/Configs.h" #include "presto_cpp/main/operators/BroadcastFactory.h" using namespace facebook::velox::exec; @@ -55,10 +56,12 @@ class BroadcastWriteOperator : public Operator { serdeChannels_(calculateOutputChannels( planNode->inputType(), planNode->serdeRowType(), - planNode->serdeRowType())) { + planNode->serdeRowType())), + maxBroadcastBytes_(planNode->maxBroadcastBytes()) { auto fileBroadcast = BroadcastFactory(planNode->basePath()); fileBroadcastWriter_ = fileBroadcast.createWriter( 8 << 20, + planNode->maxBroadcastBytes(), operatorCtx_->pool(), getVectorSerdeOptions(ctx->queryConfig(), VectorSerde::Kind::kPresto)); } @@ -120,6 +123,7 @@ class BroadcastWriteOperator : public Operator { // Empty if column order in the serdeRowType_ is exactly the same as in input // or serdeRowType_ has no columns. const std::vector serdeChannels_; + const uint64_t maxBroadcastBytes_; std::unique_ptr fileBroadcastWriter_; bool finished_{false}; }; @@ -129,6 +133,7 @@ folly::dynamic BroadcastWriteNode::serialize() const { auto obj = PlanNode::serialize(); obj["broadcastWriteBasePath"] = ISerializable::serialize(basePath_); + obj["maxBroadcastBytes"] = maxBroadcastBytes_; obj["rowType"] = serdeRowType_->serialize(); obj["sources"] = ISerializable::serialize(sources_); return obj; @@ -141,6 +146,7 @@ velox::core::PlanNodePtr BroadcastWriteNode::create( deserializePlanNodeId(obj), ISerializable::deserialize( obj["broadcastWriteBasePath"], context), + obj["maxBroadcastBytes"].asInt(), ISerializable::deserialize(obj["rowType"]), ISerializable::deserialize>( obj["sources"], context)[0]); diff --git a/presto-native-execution/presto_cpp/main/operators/BroadcastWrite.h b/presto-native-execution/presto_cpp/main/operators/BroadcastWrite.h index ad9ec3a94d783..43af4bc10bb43 100644 --- a/presto-native-execution/presto_cpp/main/operators/BroadcastWrite.h +++ b/presto-native-execution/presto_cpp/main/operators/BroadcastWrite.h @@ -28,68 +28,15 @@ class BroadcastWriteNode : public velox::core::PlanNode { BroadcastWriteNode( const velox::core::PlanNodeId& id, const std::string& basePath, + uint64_t maxBroadcastBytes, velox::RowTypePtr serdeRowType, velox::core::PlanNodePtr source) : velox::core::PlanNode(id), basePath_{basePath}, - serdeRowType_{serdeRowType}, + maxBroadcastBytes_{maxBroadcastBytes}, + serdeRowType_{std::move(serdeRowType)}, sources_{std::move(source)} {} - class Builder { - public: - Builder() = default; - - explicit Builder(const BroadcastWriteNode& other) { - id_ = other.id(); - basePath_ = other.basePath(); - serdeRowType_ = other.serdeRowType(); - source_ = other.sources()[0]; - } - - Builder& id(velox::core::PlanNodeId id) { - id_ = std::move(id); - return *this; - } - - Builder& basePath(std::string basePath) { - basePath_ = std::move(basePath); - return *this; - } - - Builder& serdeRowType(velox::RowTypePtr serdeRowType) { - serdeRowType_ = std::move(serdeRowType); - return *this; - } - - Builder& source(velox::core::PlanNodePtr source) { - source_ = std::move(source); - return *this; - } - - std::shared_ptr build() const { - VELOX_USER_CHECK(id_.has_value(), "BroadcastWriteNode id is not set"); - VELOX_USER_CHECK( - basePath_.has_value(), "BroadcastWriteNode basePath is not set"); - VELOX_USER_CHECK( - serdeRowType_.has_value(), - "BroadcastWriteNode serdeRowType is not set"); - VELOX_USER_CHECK( - source_.has_value(), "BroadcastWriteNode source is not set"); - - return std::make_shared( - id_.value(), - basePath_.value(), - serdeRowType_.value(), - source_.value()); - } - - private: - std::optional id_; - std::optional basePath_; - std::optional serdeRowType_; - std::optional source_; - }; - folly::dynamic serialize() const override; static velox::core::PlanNodePtr create( @@ -113,6 +60,10 @@ class BroadcastWriteNode : public velox::core::PlanNode { return basePath_; } + uint64_t maxBroadcastBytes() const { + return maxBroadcastBytes_; + } + /// The desired schema of the serialized data. May include a subset of input /// columns, some columns may be duplicated, some columns may be missing, /// columns may appear in different order. @@ -128,6 +79,7 @@ class BroadcastWriteNode : public velox::core::PlanNode { void addDetails(std::stringstream& stream) const override {} const std::string basePath_; + const uint64_t maxBroadcastBytes_; const velox::RowTypePtr serdeRowType_; const std::vector sources_; }; diff --git a/presto-native-execution/presto_cpp/main/operators/tests/BroadcastTest.cpp b/presto-native-execution/presto_cpp/main/operators/tests/BroadcastTest.cpp index e060d36d48aa7..2c5da4ffd06c7 100644 --- a/presto-native-execution/presto_cpp/main/operators/tests/BroadcastTest.cpp +++ b/presto-native-execution/presto_cpp/main/operators/tests/BroadcastTest.cpp @@ -13,6 +13,7 @@ */ #include #include +#include "presto_cpp/main/common/Exception.h" #include "presto_cpp/main/operators/BroadcastExchangeSource.h" #include "presto_cpp/main/operators/BroadcastWrite.h" #include "presto_cpp/main/operators/tests/PlanBuilder.h" @@ -75,10 +76,12 @@ class BroadcastTest : public exec::test::OperatorTestBase, const std::string& basePath, const std::optional>& serdeLayout = std::nullopt) { - auto writerPlan = exec::test::PlanBuilder() - .values(data, true) - .addNode(addBroadcastWriteNode(basePath, serdeLayout)) - .planNode(); + auto writerPlan = + exec::test::PlanBuilder() + .values(data, true) + .addNode(addBroadcastWriteNode( + basePath, std::numeric_limits::max(), serdeLayout)) + .planNode(); auto serdeRowType = std::dynamic_pointer_cast(writerPlan) @@ -393,6 +396,7 @@ TEST_P(BroadcastTest, broadcastFileWriter) { { auto writer = std::make_unique( filePath + "_success", + std::numeric_limits::max(), 1024, getVectorSerdeOptions(GetParam().compressionKind), pool()); @@ -426,6 +430,7 @@ TEST_P(BroadcastTest, broadcastFileWriter) { { auto writer = std::make_unique( filePath + "_before_no_more_data", + std::numeric_limits::max(), 1024, getVectorSerdeOptions(GetParam().compressionKind), pool()); @@ -441,6 +446,7 @@ TEST_P(BroadcastTest, broadcastFileWriter) { { auto writer = std::make_unique( filePath + "_write_after_no_more", + std::numeric_limits::max(), 1024, getVectorSerdeOptions(GetParam().compressionKind), pool()); @@ -456,6 +462,7 @@ TEST_P(BroadcastTest, broadcastFileWriter) { { auto writer = std::make_unique( filePath + "_multiple_no_more", + std::numeric_limits::max(), 1024, getVectorSerdeOptions(GetParam().compressionKind), pool()); @@ -474,6 +481,7 @@ TEST_P(BroadcastTest, broadcastFileWriter) { { auto writer = std::make_unique( filePath + "_multiple_stats", + std::numeric_limits::max(), 1024, getVectorSerdeOptions(GetParam().compressionKind), pool()); @@ -515,6 +523,7 @@ TEST_P(BroadcastTest, broadcastFileWriter) { { auto writer = std::make_unique( filePath + "_no_data", + std::numeric_limits::max(), 1024, getVectorSerdeOptions(GetParam().compressionKind), pool()); @@ -532,6 +541,7 @@ TEST_P(BroadcastTest, broadcastFileWriter) { auto writer = std::make_unique( filePath + "_empty_data", + std::numeric_limits::max(), 1024, getVectorSerdeOptions(GetParam().compressionKind), pool()); @@ -584,6 +594,7 @@ TEST_P(BroadcastTest, endToEndWithDifferentWriterPageSizes) { // Create writer with specific buffer size directly auto writer = std::make_unique( filePath, + std::numeric_limits::max(), kPageSizes[i], getVectorSerdeOptions(GetParam().compressionKind), pool()); @@ -636,6 +647,95 @@ TEST_P(BroadcastTest, endToEndWithDifferentWriterPageSizes) { } } +TEST_P(BroadcastTest, exceedBroadcastFileWriterLimit) { + auto tempDirectoryPath = exec::test::TempDirectoryPath::create(); + auto fileSystem = + velox::filesystems::getFileSystem(tempDirectoryPath->getPath(), nullptr); + fileSystem->mkdir(tempDirectoryPath->getPath()); + + auto filePath = + fmt::format("{}/broadcast_limit_test", tempDirectoryPath->getPath()); + + auto testData = makeRowVector({ + makeFlatVector(100, [](auto row) { return row; }), + makeFlatVector(100, [](auto row) { return row * 10; }), + makeFlatVector( + 100, + [](auto row) { + return fmt::format("test_string_with_some_length_{}", row); + }), + }); + + auto writer = std::make_unique( + filePath, + 100, + 1024, + getVectorSerdeOptions(GetParam().compressionKind), + pool()); + + try { + writer->write(testData); + FAIL() << "Expected PrestoException to be thrown"; + } catch (const VeloxException& e) { + EXPECT_EQ(e.errorSource(), velox::error_source::kErrorSourceRuntime); + EXPECT_EQ( + e.errorCode(), + presto::error_code::kExceededLocalBroadcastJoinMemoryLimit); + EXPECT_TRUE( + e.message().find( + "Storage broadcast join exceeded per task broadcast limit") != + std::string::npos); + } +} + +TEST_P(BroadcastTest, broadcastJoinExceedLimit) { + auto tempDirectoryPath = exec::test::TempDirectoryPath::create(); + + // Create build side data (data to broadcast) + auto buildData = makeRowVector({ + makeFlatVector(100, [](auto row) { return row % 10; }), + makeFlatVector(100, [](auto row) { return row * 100; }), + makeFlatVector( + 100, + [](auto row) { + return fmt::format("build_side_string_with_length_{}", row); + }), + }); + + // Use a very small limit to trigger the exception + const uint64_t smallLimit = 100; // 100 bytes - too small for the data + + auto writerPlan = + exec::test::PlanBuilder() + .values({buildData}) + .addNode(addBroadcastWriteNode( + tempDirectoryPath->getPath(), smallLimit, std::nullopt)) + .planNode(); + + exec::CursorParameters params; + params.planNode = writerPlan; + + std::unordered_map configs; + configs[core::QueryConfig::kShuffleCompressionKind] = + common::compressionKindToString(GetParam().compressionKind); + params.queryCtx = core::QueryCtx::create( + executor_.get(), core::QueryConfig(std::move(configs))); + + try { + auto [writeTaskCursor, writeResults] = exec::test::readCursor(params); + FAIL() << "Expected PrestoException to be thrown during broadcast write"; + } catch (const VeloxException& e) { + EXPECT_EQ(e.errorSource(), velox::error_source::kErrorSourceRuntime); + EXPECT_EQ( + e.errorCode(), + presto::error_code::kExceededLocalBroadcastJoinMemoryLimit); + EXPECT_TRUE( + e.message().find( + "Storage broadcast join exceeded per task broadcast limit") != + std::string::npos); + } +} + INSTANTIATE_TEST_SUITE_P( BroadcastTest, BroadcastTest, diff --git a/presto-native-execution/presto_cpp/main/operators/tests/PlanBuilder.cpp b/presto-native-execution/presto_cpp/main/operators/tests/PlanBuilder.cpp index e5a1c470df219..382761cf2cf3f 100644 --- a/presto-native-execution/presto_cpp/main/operators/tests/PlanBuilder.cpp +++ b/presto-native-execution/presto_cpp/main/operators/tests/PlanBuilder.cpp @@ -90,6 +90,7 @@ std::function addShuffleWriteNode( std::function addBroadcastWriteNode( const std::string& basePath, + uint64_t maxBroadcastBytes, const std::optional>& outputLayout) { return [=](core::PlanNodeId nodeId, core::PlanNodePtr source) -> core::PlanNodePtr { @@ -105,7 +106,7 @@ std::function addBroadcastWriteNode( } return std::make_shared( - nodeId, basePath, outputType, std::move(source)); + nodeId, basePath, maxBroadcastBytes, outputType, std::move(source)); }; } } // namespace facebook::presto::operators diff --git a/presto-native-execution/presto_cpp/main/operators/tests/PlanBuilder.h b/presto-native-execution/presto_cpp/main/operators/tests/PlanBuilder.h index 89e88d70fb63e..90ba07809f832 100644 --- a/presto-native-execution/presto_cpp/main/operators/tests/PlanBuilder.h +++ b/presto-native-execution/presto_cpp/main/operators/tests/PlanBuilder.h @@ -46,10 +46,12 @@ std::function /// Add BroadcastWriteNode for writing broadcast data to files under /// specified basePath +/// @param maxBroadcastBytes Maximum size in bytes for broadcast data /// @param outputLayout Optional ordered list of input column names to use as /// serde layout. Input columns may appear in different order, some columns may /// be missing and other columns may be duplicated. addBroadcastWriteNode( const std::string& basePath, + uint64_t maxBroadcastBytes = std::numeric_limits::max(), const std::optional>& outputLayout = std::nullopt); } // namespace facebook::presto::operators diff --git a/presto-native-execution/presto_cpp/main/operators/tests/PlanNodeBuilderTest.cpp b/presto-native-execution/presto_cpp/main/operators/tests/PlanNodeBuilderTest.cpp index 11041d99abbed..8dd4642c2b868 100644 --- a/presto-native-execution/presto_cpp/main/operators/tests/PlanNodeBuilderTest.cpp +++ b/presto-native-execution/presto_cpp/main/operators/tests/PlanNodeBuilderTest.cpp @@ -34,6 +34,7 @@ const std::shared_ptr kShuffleRead = TEST(PlanNodeBuilderTest, testBroadcastWrite) { const core::PlanNodeId id = "broadcast_write_id"; const std::string basePath = "base_path"; + const uint64_t maxBroadcastBytes = std::numeric_limits::max(); const RowTypePtr serdeRowType = ROW({"write_c0", "write_c1"}, {BIGINT(), DOUBLE()}); @@ -41,21 +42,16 @@ TEST(PlanNodeBuilderTest, testBroadcastWrite) { [&](const std::shared_ptr& node) { EXPECT_EQ(node->id(), id); EXPECT_EQ(node->basePath(), basePath); + EXPECT_EQ(node->maxBroadcastBytes(), maxBroadcastBytes); EXPECT_EQ(node->serdeRowType(), serdeRowType); EXPECT_EQ( node->sources(), std::vector{kShuffleRead}); }; - const auto node = BroadcastWriteNode::Builder() - .id(id) - .basePath(basePath) - .serdeRowType(serdeRowType) - .source(kShuffleRead) - .build(); - verify(node); + const auto node = std::make_shared( + id, basePath, maxBroadcastBytes, serdeRowType, kShuffleRead); - const auto node2 = BroadcastWriteNode::Builder(*node).build(); - verify(node2); + verify(node); } TEST(PlanNodeBuilderTest, testPartitionAndSerialize) { diff --git a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp b/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp index e649d2425ed84..26e6e18e77706 100644 --- a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp +++ b/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp @@ -1287,15 +1287,15 @@ core::PlanNodePtr VeloxQueryPlanConverterBase::toVeloxQueryPlan( if (SessionProperties::instance()->useVeloxGeospatialJoin()) { return std::make_shared( - node->id, - joinType, - exprConverter_.toVeloxExpr(node->filter), - exprConverter_.toVeloxExpr(node->probeGeometryVariable), - exprConverter_.toVeloxExpr(node->buildGeometryVariable), - radiusVariable, - toVeloxQueryPlan(node->left, tableWriteInfo, taskId), - toVeloxQueryPlan(node->right, tableWriteInfo, taskId), - toRowType(node->outputVariables, typeParser_)); + node->id, + joinType, + exprConverter_.toVeloxExpr(node->filter), + exprConverter_.toVeloxExpr(node->probeGeometryVariable), + exprConverter_.toVeloxExpr(node->buildGeometryVariable), + radiusVariable, + toVeloxQueryPlan(node->left, tableWriteInfo, taskId), + toVeloxQueryPlan(node->right, tableWriteInfo, taskId), + toRowType(node->outputVariables, typeParser_)); } return std::make_shared( node->id, @@ -2260,11 +2260,15 @@ core::PlanFragment VeloxBatchQueryPlanConverter::toVeloxQueryPlan( if (partitionedOutputNode->isBroadcast()) { VELOX_USER_CHECK_NOT_NULL( broadcastBasePath_, "broadcastBasePath is required"); + const auto maxBroadcastBytesOpt = + SystemConfig::instance()->taskMaxStorageBroadcastBytes(); // TODO - Use original plan node with root node and aggregate operator // stats for additional nodes. auto broadcastWriteNode = std::make_shared( fmt::format("{}.bw", partitionedOutputNode->id()), *broadcastBasePath_, + maxBroadcastBytesOpt.has_value() ? maxBroadcastBytesOpt.value() + : std::numeric_limits::max(), partitionedOutputNode->outputType(), core::LocalPartitionNode::gather( "broadcast-write-gather", @@ -2412,7 +2416,8 @@ void parseIndexLookupCondition( auto isLookupVariable = [&](const protocol::VariableReferenceExpression& var) { if (lookupVariables.empty()) { - return true; // If empty, treat all variables as lookup variables for compatibility + return true; // If empty, treat all variables as lookup variables for + // compatibility } return std::find_if( lookupVariables.begin(), @@ -2462,7 +2467,8 @@ void parseIndexLookupCondition( const auto conditionColumnExpr = exprConverter.toVeloxExpr(contains->arguments[0]); - if (acceptConstant || !core::TypedExprs::isConstant(conditionColumnExpr)) { + if (acceptConstant || + !core::TypedExprs::isConstant(conditionColumnExpr)) { joinConditionPtrs.push_back( std::make_shared( keyColumnExpr, conditionColumnExpr));