diff --git a/presto-docs/src/main/sphinx/presto_cpp/properties-session.rst b/presto-docs/src/main/sphinx/presto_cpp/properties-session.rst index 826fbd7c9e3e5..cf47613188040 100644 --- a/presto-docs/src/main/sphinx/presto_cpp/properties-session.rst +++ b/presto-docs/src/main/sphinx/presto_cpp/properties-session.rst @@ -481,6 +481,17 @@ In streaming aggregation, wait until there are enough output rows to produce a batch of the size specified by this property. If set to ``0``, then ``Operator::outputBatchRows`` is used as the minimum number of output batch rows. +``native_merge_join_output_batch_start_size`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +* **Type:** ``integer`` +* **Default value:** ``0`` + +Native Execution only. Initial output batch size in rows for MergeJoin operator. +When non-zero, the batch size starts at this value and is dynamically adjusted +based on the average row size of previous output batches. When zero (default), +dynamic adjustment is disabled and the batch size is fixed at ``preferred_output_batch_rows``. 
+ ``native_request_data_sizes_max_wait_sec`` ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/presto-main-base/src/main/java/com/facebook/presto/sessionpropertyproviders/NativeWorkerSessionPropertyProvider.java b/presto-main-base/src/main/java/com/facebook/presto/sessionpropertyproviders/NativeWorkerSessionPropertyProvider.java index 499e1f892d4b9..88d2007965575 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/sessionpropertyproviders/NativeWorkerSessionPropertyProvider.java +++ b/presto-main-base/src/main/java/com/facebook/presto/sessionpropertyproviders/NativeWorkerSessionPropertyProvider.java @@ -91,6 +91,7 @@ public class NativeWorkerSessionPropertyProvider public static final String NATIVE_USE_VELOX_GEOSPATIAL_JOIN = "native_use_velox_geospatial_join"; public static final String NATIVE_AGGREGATION_COMPACTION_BYTES_THRESHOLD = "native_aggregation_compaction_bytes_threshold"; public static final String NATIVE_AGGREGATION_COMPACTION_UNUSED_MEMORY_RATIO = "native_aggregation_compaction_unused_memory_ratio"; + public static final String NATIVE_MERGE_JOIN_OUTPUT_BATCH_START_SIZE = "native_merge_join_output_batch_start_size"; private final List<PropertyMetadata<?>> sessionProperties; @@ -456,6 +457,14 @@ public NativeWorkerSessionPropertyProvider(FeaturesConfig featuresConfig) "The value is in the range of [0, 1). NOTE: Currently only applies to approx_most_frequent " + "aggregate with StringView type during global aggregation.", 0.25, + !nativeExecution), + integerProperty( + NATIVE_MERGE_JOIN_OUTPUT_BATCH_START_SIZE, + "Initial output batch size in rows for MergeJoin operator. When non-zero, " + + "the batch size starts at this value and is dynamically adjusted based on " + + "the average row size of previous output batches. 
When zero (default), " + + "dynamic adjustment is disabled and the batch size is fixed at preferred_output_batch_rows.", + 0, !nativeExecution)); } diff --git a/presto-native-execution/presto_cpp/main/SessionProperties.cpp b/presto-native-execution/presto_cpp/main/SessionProperties.cpp index 92677e46b886e..edc3e5d844100 100644 --- a/presto-native-execution/presto_cpp/main/SessionProperties.cpp +++ b/presto-native-execution/presto_cpp/main/SessionProperties.cpp @@ -578,6 +578,18 @@ SessionProperties::SessionProperties() { QueryConfig::kMaxOutputBatchRows, std::to_string(c.maxOutputBatchRows())); + addSessionProperty( + kMergeJoinOutputBatchStartSize, + "Initial output batch size in rows for MergeJoin operator. When non-zero, " + "the batch size starts at this value and is dynamically adjusted based on " + "the average row size of previous output batches. When zero (default), " + "dynamic adjustment is disabled and the batch size is fixed at " + "preferredOutputBatchRows.", + INTEGER(), + false, + QueryConfig::kMergeJoinOutputBatchStartSize, + std::to_string(c.mergeJoinOutputBatchStartSize())); + addSessionProperty( kRowSizeTrackingMode, "Enable (reader) row size tracker as a fallback to file level row size estimates.", diff --git a/presto-native-execution/presto_cpp/main/SessionProperties.h b/presto-native-execution/presto_cpp/main/SessionProperties.h index e73fd6bbacfac..77ab322e41721 100644 --- a/presto-native-execution/presto_cpp/main/SessionProperties.h +++ b/presto-native-execution/presto_cpp/main/SessionProperties.h @@ -373,6 +373,14 @@ class SessionProperties { /// output rows. static constexpr const char* kMaxOutputBatchRows = "max_output_batch_rows"; + /// Initial output batch size in rows for MergeJoin operator. When non-zero, + /// the batch size starts at this value and is dynamically adjusted based on + /// the average row size of previous output batches. 
When zero (default), + /// dynamic adjustment is disabled and the batch size is fixed at + /// preferredOutputBatchRows. + static constexpr const char* kMergeJoinOutputBatchStartSize = + "native_merge_join_output_batch_start_size"; + /// Enable (reader) row size tracker as a fallback to file level row size /// estimates. static constexpr const char* kRowSizeTrackingMode = "row_size_tracking_mode"; diff --git a/presto-native-execution/presto_cpp/main/tests/SessionPropertiesTest.cpp b/presto-native-execution/presto_cpp/main/tests/SessionPropertiesTest.cpp index fc550dfe63dc3..4d56b2903349b 100644 --- a/presto-native-execution/presto_cpp/main/tests/SessionPropertiesTest.cpp +++ b/presto-native-execution/presto_cpp/main/tests/SessionPropertiesTest.cpp @@ -133,7 +133,9 @@ TEST_F(SessionPropertiesTest, validateMapping) { {SessionProperties::kAggregationCompactionBytesThreshold, core::QueryConfig::kAggregationCompactionBytesThreshold}, {SessionProperties::kAggregationCompactionUnusedMemoryRatio, - core::QueryConfig::kAggregationCompactionUnusedMemoryRatio}}; + core::QueryConfig::kAggregationCompactionUnusedMemoryRatio}, + {SessionProperties::kMergeJoinOutputBatchStartSize, + core::QueryConfig::kMergeJoinOutputBatchStartSize}}; const auto sessionProperties = SessionProperties::instance(); for (const auto& [sessionProperty, expectedVeloxConfig] : expectedMappings) {