Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,15 @@ The maximum bytes to buffer per PartitionedOutput operator to avoid creating tin
For PartitionedOutputNode::Kind::kPartitioned, PartitionedOutput operator would buffer up to that number of
bytes / number of destinations for each destination before producing a SerializedPage. Default is 32MB.

``native_partitioned_output_eager_flush``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``boolean``
* **Default value:** ``false``

Native Execution only. If true, the PartitionedOutput operator will flush rows eagerly, without waiting
until buffers reach a certain size. Default is false.

``native_max_local_exchange_partition_count``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class NativeWorkerSessionPropertyProvider
public static final String NATIVE_MAX_EXTENDED_PARTIAL_AGGREGATION_MEMORY = "native_max_extended_partial_aggregation_memory";
public static final String NATIVE_MAX_SPILL_BYTES = "native_max_spill_bytes";
public static final String NATIVE_MAX_PAGE_PARTITIONING_BUFFER_SIZE = "native_max_page_partitioning_buffer_size";
public static final String NATIVE_PARTITIONED_OUTPUT_EAGER_FLUSH = "native_partitioned_output_eager_flush";
public static final String NATIVE_MAX_OUTPUT_BUFFER_SIZE = "native_max_output_buffer_size";
public static final String NATIVE_QUERY_TRACE_ENABLED = "native_query_trace_enabled";
public static final String NATIVE_QUERY_TRACE_DIR = "native_query_trace_dir";
Expand Down Expand Up @@ -317,6 +318,11 @@ public NativeWorkerSessionPropertyProvider(FeaturesConfig featuresConfig)
"producing a SerializedPage.",
24L << 20,
!nativeExecution),
booleanProperty(NATIVE_PARTITIONED_OUTPUT_EAGER_FLUSH,
"Native Execution only. If true, the PartitionedOutput operator will flush rows eagerly, without " +
"waiting until buffers reach certain size. Default is false.",
false,
!nativeExecution),
integerProperty(
NATIVE_MAX_LOCAL_EXCHANGE_PARTITION_COUNT,
"Maximum number of partitions created by a local exchange. " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,15 @@ SessionProperties::SessionProperties() {
QueryConfig::kMaxPartitionedOutputBufferSize,
std::to_string(c.maxPartitionedOutputBufferSize()));

addSessionProperty(
kPartitionedOutputEagerFlush,
"If true, the PartitionedOutput operator will flush rows eagerly, without"
" waiting until buffers reach a certain size. Default is false.",
BOOLEAN(),
false,
QueryConfig::kPartitionedOutputEagerFlush,
"false");

// If `legacy_timestamp` is true, the coordinator expects timestamp
// conversions without a timezone to be converted to the user's
// session_timezone.
Expand Down
5 changes: 5 additions & 0 deletions presto-native-execution/presto_cpp/main/SessionProperties.h
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,11 @@ class SessionProperties {
static constexpr const char* kMaxPartitionedOutputBufferSize =
"native_max_page_partitioning_buffer_size";

/// If true, the PartitionedOutput operator will flush rows eagerly, without
/// waiting until buffers reach certain size. Default is false.
static constexpr const char* kPartitionedOutputEagerFlush =
"native_partitioned_output_eager_flush";

/// Maximum number of partitions created by a local exchange.
/// Affects concurrency for pipelines containing LocalPartitionNode.
static constexpr const char* kMaxLocalExchangePartitionCount =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ TEST_F(SessionPropertiesTest, validateMapping) {
core::QueryConfig::kMaxOutputBufferSize},
{SessionProperties::kMaxPartitionedOutputBufferSize,
core::QueryConfig::kMaxPartitionedOutputBufferSize},
{SessionProperties::kPartitionedOutputEagerFlush,
core::QueryConfig::kPartitionedOutputEagerFlush},
{SessionProperties::kLegacyTimestamp,
core::QueryConfig::kAdjustTimestampToTimezone},
{SessionProperties::kDriverCpuTimeSliceLimitMs,
Expand Down
Loading