From 7753aee86677b1e8b551f6396d7c3152dc7c35ba Mon Sep 17 00:00:00 2001 From: Sergey Pershin Date: Fri, 30 Jan 2026 15:03:40 -0800 Subject: [PATCH] Add 'native_partitioned_output_eager_flush' session property --- .../src/main/sphinx/presto_cpp/properties-session.rst | 9 +++++++++ .../NativeWorkerSessionPropertyProvider.java | 6 ++++++ .../presto_cpp/main/SessionProperties.cpp | 9 +++++++++ .../presto_cpp/main/SessionProperties.h | 5 +++++ .../presto_cpp/main/tests/SessionPropertiesTest.cpp | 2 ++ 5 files changed, 31 insertions(+) diff --git a/presto-docs/src/main/sphinx/presto_cpp/properties-session.rst b/presto-docs/src/main/sphinx/presto_cpp/properties-session.rst index cac0763034dba..826fbd7c9e3e5 100644 --- a/presto-docs/src/main/sphinx/presto_cpp/properties-session.rst +++ b/presto-docs/src/main/sphinx/presto_cpp/properties-session.rst @@ -300,6 +300,15 @@ The maximum bytes to buffer per PartitionedOutput operator to avoid creating tin For PartitionedOutputNode::Kind::kPartitioned, PartitionedOutput operator would buffer up to that number of bytes / number of destinations for each destination before producing a SerializedPage. Default is 32MB. +``native_partitioned_output_eager_flush`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +* **Type:** ``boolean`` +* **Default value:** ``false`` + +Native Execution only. If true, the PartitionedOutput operator will flush rows eagerly, without waiting +until buffers reach a certain size. Default is false. + ``native_max_local_exchange_partition_count`` ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/presto-main-base/src/main/java/com/facebook/presto/sessionpropertyproviders/NativeWorkerSessionPropertyProvider.java b/presto-main-base/src/main/java/com/facebook/presto/sessionpropertyproviders/NativeWorkerSessionPropertyProvider.java index 30d6818998cf5..499e1f892d4b9 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/sessionpropertyproviders/NativeWorkerSessionPropertyProvider.java +++ b/presto-main-base/src/main/java/com/facebook/presto/sessionpropertyproviders/NativeWorkerSessionPropertyProvider.java @@ -62,6 +62,7 @@ public class NativeWorkerSessionPropertyProvider public static final String NATIVE_MAX_EXTENDED_PARTIAL_AGGREGATION_MEMORY = "native_max_extended_partial_aggregation_memory"; public static final String NATIVE_MAX_SPILL_BYTES = "native_max_spill_bytes"; public static final String NATIVE_MAX_PAGE_PARTITIONING_BUFFER_SIZE = "native_max_page_partitioning_buffer_size"; + public static final String NATIVE_PARTITIONED_OUTPUT_EAGER_FLUSH = "native_partitioned_output_eager_flush"; public static final String NATIVE_MAX_OUTPUT_BUFFER_SIZE = "native_max_output_buffer_size"; public static final String NATIVE_QUERY_TRACE_ENABLED = "native_query_trace_enabled"; public static final String NATIVE_QUERY_TRACE_DIR = "native_query_trace_dir"; @@ -317,6 +318,11 @@ public NativeWorkerSessionPropertyProvider(FeaturesConfig featuresConfig) "producing a SerializedPage.", 24L << 20, !nativeExecution), + booleanProperty(NATIVE_PARTITIONED_OUTPUT_EAGER_FLUSH, + "Native Execution only. If true, the PartitionedOutput operator will flush rows eagerly, without " + + "waiting until buffers reach certain size. Default is false.", + false, + !nativeExecution), integerProperty( NATIVE_MAX_LOCAL_EXCHANGE_PARTITION_COUNT, "Maximum number of partitions created by a local exchange. " + diff --git a/presto-native-execution/presto_cpp/main/SessionProperties.cpp b/presto-native-execution/presto_cpp/main/SessionProperties.cpp index 39781c0e778c3..92677e46b886e 100644 --- a/presto-native-execution/presto_cpp/main/SessionProperties.cpp +++ b/presto-native-execution/presto_cpp/main/SessionProperties.cpp @@ -351,6 +351,15 @@ SessionProperties::SessionProperties() { QueryConfig::kMaxPartitionedOutputBufferSize, std::to_string(c.maxPartitionedOutputBufferSize())); + addSessionProperty( + kPartitionedOutputEagerFlush, + "If true, the PartitionedOutput operator will flush rows eagerly, without" + " waiting until buffers reach a certain size. Default is false.", + BOOLEAN(), + false, + QueryConfig::kPartitionedOutputEagerFlush, + "false"); + // If `legacy_timestamp` is true, the coordinator expects timestamp // conversions without a timezone to be converted to the user's // session_timezone. diff --git a/presto-native-execution/presto_cpp/main/SessionProperties.h b/presto-native-execution/presto_cpp/main/SessionProperties.h index ebe416e1e595c..e73fd6bbacfac 100644 --- a/presto-native-execution/presto_cpp/main/SessionProperties.h +++ b/presto-native-execution/presto_cpp/main/SessionProperties.h @@ -272,6 +272,11 @@ class SessionProperties { static constexpr const char* kMaxPartitionedOutputBufferSize = "native_max_page_partitioning_buffer_size"; + /// If true, the PartitionedOutput operator will flush rows eagerly, without + /// waiting until buffers reach certain size. Default is false. + static constexpr const char* kPartitionedOutputEagerFlush = + "native_partitioned_output_eager_flush"; + /// Maximum number of partitions created by a local exchange. /// Affects concurrency for pipelines containing LocalPartitionNode. static constexpr const char* kMaxLocalExchangePartitionCount = diff --git a/presto-native-execution/presto_cpp/main/tests/SessionPropertiesTest.cpp b/presto-native-execution/presto_cpp/main/tests/SessionPropertiesTest.cpp index 016d61e52ce26..fc550dfe63dc3 100644 --- a/presto-native-execution/presto_cpp/main/tests/SessionPropertiesTest.cpp +++ b/presto-native-execution/presto_cpp/main/tests/SessionPropertiesTest.cpp @@ -86,6 +86,8 @@ TEST_F(SessionPropertiesTest, validateMapping) { core::QueryConfig::kMaxOutputBufferSize}, {SessionProperties::kMaxPartitionedOutputBufferSize, core::QueryConfig::kMaxPartitionedOutputBufferSize}, + {SessionProperties::kPartitionedOutputEagerFlush, + core::QueryConfig::kPartitionedOutputEagerFlush}, {SessionProperties::kLegacyTimestamp, core::QueryConfig::kAdjustTimestampToTimezone}, {SessionProperties::kDriverCpuTimeSliceLimitMs,