Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions presto-docs/src/main/sphinx/presto_cpp/properties-session.rst
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,17 @@ In streaming aggregation, wait until there are enough output rows
to produce a batch of the size specified by this property. If set to ``0``, then
``Operator::outputBatchRows`` is used as the minimum number of output batch rows.

``native_merge_join_output_batch_start_size``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``integer``
* **Default value:** ``0``

Native Execution only. Initial output batch size in rows for MergeJoin operator.
When non-zero, the batch size starts at this value and is dynamically adjusted
based on the average row size of previous output batches. When zero (default),
dynamic adjustment is disabled and the batch size is fixed at ``preferred_output_batch_rows``.

``native_request_data_sizes_max_wait_sec``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public class NativeWorkerSessionPropertyProvider
public static final String NATIVE_USE_VELOX_GEOSPATIAL_JOIN = "native_use_velox_geospatial_join";
public static final String NATIVE_AGGREGATION_COMPACTION_BYTES_THRESHOLD = "native_aggregation_compaction_bytes_threshold";
public static final String NATIVE_AGGREGATION_COMPACTION_UNUSED_MEMORY_RATIO = "native_aggregation_compaction_unused_memory_ratio";
public static final String NATIVE_MERGE_JOIN_OUTPUT_BATCH_START_SIZE = "native_merge_join_output_batch_start_size";

private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -456,6 +457,14 @@ public NativeWorkerSessionPropertyProvider(FeaturesConfig featuresConfig)
"The value is in the range of [0, 1). NOTE: Currently only applies to approx_most_frequent " +
"aggregate with StringView type during global aggregation.",
0.25,
!nativeExecution),
integerProperty(
NATIVE_MERGE_JOIN_OUTPUT_BATCH_START_SIZE,
"Initial output batch size in rows for MergeJoin operator. When non-zero, " +
"the batch size starts at this value and is dynamically adjusted based on " +
"the average row size of previous output batches. When zero (default), " +
"dynamic adjustment is disabled and the batch size is fixed at preferred_output_batch_rows.",
0,
!nativeExecution));
}

Expand Down
12 changes: 12 additions & 0 deletions presto-native-execution/presto_cpp/main/SessionProperties.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,18 @@ SessionProperties::SessionProperties() {
QueryConfig::kMaxOutputBatchRows,
std::to_string(c.maxOutputBatchRows()));

addSessionProperty(
kMergeJoinOutputBatchStartSize,
"Initial output batch size in rows for MergeJoin operator. When non-zero, "
"the batch size starts at this value and is dynamically adjusted based on "
"the average row size of previous output batches. When zero (default), "
"dynamic adjustment is disabled and the batch size is fixed at "
"preferredOutputBatchRows.",
INTEGER(),
false,
QueryConfig::kMergeJoinOutputBatchStartSize,
std::to_string(c.mergeJoinOutputBatchStartSize()));

addSessionProperty(
kRowSizeTrackingMode,
"Enable (reader) row size tracker as a fallback to file level row size estimates.",
Expand Down
8 changes: 8 additions & 0 deletions presto-native-execution/presto_cpp/main/SessionProperties.h
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,14 @@ class SessionProperties {
/// output rows.
static constexpr const char* kMaxOutputBatchRows = "max_output_batch_rows";

/// Initial output batch size in rows for MergeJoin operator. When non-zero,
/// the batch size starts at this value and is dynamically adjusted based on
/// the average row size of previous output batches. When zero (default),
/// dynamic adjustment is disabled and the batch size is fixed at
/// preferredOutputBatchRows.
static constexpr const char* kMergeJoinOutputBatchStartSize =
"native_merge_join_output_batch_start_size";

/// Enable (reader) row size tracker as a fallback to file level row size
/// estimates.
static constexpr const char* kRowSizeTrackingMode = "row_size_tracking_mode";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,9 @@ TEST_F(SessionPropertiesTest, validateMapping) {
{SessionProperties::kAggregationCompactionBytesThreshold,
core::QueryConfig::kAggregationCompactionBytesThreshold},
{SessionProperties::kAggregationCompactionUnusedMemoryRatio,
core::QueryConfig::kAggregationCompactionUnusedMemoryRatio}};
core::QueryConfig::kAggregationCompactionUnusedMemoryRatio},
{SessionProperties::kMergeJoinOutputBatchStartSize,
core::QueryConfig::kMergeJoinOutputBatchStartSize}};

const auto sessionProperties = SessionProperties::instance();
for (const auto& [sessionProperty, expectedVeloxConfig] : expectedMappings) {
Expand Down
Loading