From 84c7c086a5f2fca4b59686e7368b563e1158631c Mon Sep 17 00:00:00 2001 From: Xiao Du Date: Wed, 4 Mar 2026 10:58:46 -0800 Subject: [PATCH] feat: Add session property aggregation_memory_compaction_reclaim_enabled (#27221) Summary: Per title Reviewed By: xiaoxmeng Differential Revision: D94130609 --- .../src/main/sphinx/presto_cpp/properties-session.rst | 11 +++++++++++ .../NativeWorkerSessionPropertyProvider.java | 8 ++++++++ .../presto_cpp/main/SessionProperties.cpp | 11 +++++++++++ .../presto_cpp/main/SessionProperties.h | 7 +++++++ .../presto_cpp/main/tests/SessionPropertiesTest.cpp | 2 ++ 5 files changed, 39 insertions(+) diff --git a/presto-docs/src/main/sphinx/presto_cpp/properties-session.rst b/presto-docs/src/main/sphinx/presto_cpp/properties-session.rst index 6394964ecfaaf..fd2357cf3f19e 100644 --- a/presto-docs/src/main/sphinx/presto_cpp/properties-session.rst +++ b/presto-docs/src/main/sphinx/presto_cpp/properties-session.rst @@ -589,6 +589,17 @@ Native Execution only. Ratio of unused (evicted) bytes to total bytes that trigg compaction. The value is in the range of [0, 1). Currently only applies to approx_most_frequent aggregate with StringView type during global aggregation. +``native_aggregation_memory_compaction_reclaim_enabled`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +* **Type:** ``boolean`` +* **Default value:** ``false`` + +Native Execution only. If true, enables lightweight memory compaction before +spilling during memory reclaim in aggregation. When enabled, the aggregation +operator will try to compact aggregate function state (for example, free dead strings) +before resorting to spilling. + ``optimizer.optimize_top_n_rank`` ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/presto-main-base/src/main/java/com/facebook/presto/sessionpropertyproviders/NativeWorkerSessionPropertyProvider.java b/presto-main-base/src/main/java/com/facebook/presto/sessionpropertyproviders/NativeWorkerSessionPropertyProvider.java index 88d2007965575..ab3e522197617 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/sessionpropertyproviders/NativeWorkerSessionPropertyProvider.java +++ b/presto-main-base/src/main/java/com/facebook/presto/sessionpropertyproviders/NativeWorkerSessionPropertyProvider.java @@ -91,6 +91,7 @@ public class NativeWorkerSessionPropertyProvider public static final String NATIVE_USE_VELOX_GEOSPATIAL_JOIN = "native_use_velox_geospatial_join"; public static final String NATIVE_AGGREGATION_COMPACTION_BYTES_THRESHOLD = "native_aggregation_compaction_bytes_threshold"; public static final String NATIVE_AGGREGATION_COMPACTION_UNUSED_MEMORY_RATIO = "native_aggregation_compaction_unused_memory_ratio"; + public static final String NATIVE_AGGREGATION_MEMORY_COMPACTION_RECLAIM_ENABLED = "native_aggregation_memory_compaction_reclaim_enabled"; public static final String NATIVE_MERGE_JOIN_OUTPUT_BATCH_START_SIZE = "native_merge_join_output_batch_start_size"; private final List> sessionProperties; @@ -458,6 +459,13 @@ public NativeWorkerSessionPropertyProvider(FeaturesConfig featuresConfig) "aggregate with StringView type during global aggregation.", 0.25, !nativeExecution), + booleanProperty( + NATIVE_AGGREGATION_MEMORY_COMPACTION_RECLAIM_ENABLED, + "If true, enables lightweight memory compaction before spilling during " + + "memory reclaim in aggregation. When enabled, the aggregation operator " + + "will try to compact aggregate function state before resorting to spilling.", + false, + !nativeExecution), integerProperty( NATIVE_MERGE_JOIN_OUTPUT_BATCH_START_SIZE, "Initial output batch size in rows for MergeJoin operator. When non-zero, " + diff --git a/presto-native-execution/presto_cpp/main/SessionProperties.cpp b/presto-native-execution/presto_cpp/main/SessionProperties.cpp index edc3e5d844100..dfadaf2d8fe07 100644 --- a/presto-native-execution/presto_cpp/main/SessionProperties.cpp +++ b/presto-native-execution/presto_cpp/main/SessionProperties.cpp @@ -630,6 +630,17 @@ SessionProperties::SessionProperties() { false, QueryConfig::kAggregationCompactionUnusedMemoryRatio, std::to_string(c.aggregationCompactionUnusedMemoryRatio())); + + addSessionProperty( + kAggregationMemoryCompactionReclaimEnabled, + "If true, enables lightweight memory compaction before spilling during " + "memory reclaim in aggregation. When enabled, the aggregation operator " + "will try to compact aggregate function state (for example, free dead strings) " + "before resorting to spilling. Disabled by default.", + BOOLEAN(), + false, + QueryConfig::kAggregationMemoryCompactionReclaimEnabled, + boolToString(c.aggregationMemoryCompactionReclaimEnabled())); } const std::string SessionProperties::toVeloxConfig( diff --git a/presto-native-execution/presto_cpp/main/SessionProperties.h b/presto-native-execution/presto_cpp/main/SessionProperties.h index 77ab322e41721..2e4c454f009c7 100644 --- a/presto-native-execution/presto_cpp/main/SessionProperties.h +++ b/presto-native-execution/presto_cpp/main/SessionProperties.h @@ -409,6 +409,13 @@ class SessionProperties { static constexpr const char* kAggregationCompactionUnusedMemoryRatio = "native_aggregation_compaction_unused_memory_ratio"; + /// If true, enables lightweight memory compaction before spilling during + /// memory reclaim in aggregation. When enabled, the aggregation operator + /// will try to compact aggregate function state (e.g., free dead strings) + /// before resorting to spilling. Disabled by default. + static constexpr const char* kAggregationMemoryCompactionReclaimEnabled = + "native_aggregation_memory_compaction_reclaim_enabled"; + inline bool hasVeloxConfig(const std::string& key) { auto sessionProperty = sessionProperties_.find(key); if (sessionProperty == sessionProperties_.end()) { diff --git a/presto-native-execution/presto_cpp/main/tests/SessionPropertiesTest.cpp b/presto-native-execution/presto_cpp/main/tests/SessionPropertiesTest.cpp index 4d56b2903349b..0244760c21acb 100644 --- a/presto-native-execution/presto_cpp/main/tests/SessionPropertiesTest.cpp +++ b/presto-native-execution/presto_cpp/main/tests/SessionPropertiesTest.cpp @@ -134,6 +134,8 @@ TEST_F(SessionPropertiesTest, validateMapping) { core::QueryConfig::kAggregationCompactionBytesThreshold}, {SessionProperties::kAggregationCompactionUnusedMemoryRatio, core::QueryConfig::kAggregationCompactionUnusedMemoryRatio}, + {SessionProperties::kAggregationMemoryCompactionReclaimEnabled, + core::QueryConfig::kAggregationMemoryCompactionReclaimEnabled}, {SessionProperties::kMergeJoinOutputBatchStartSize, core::QueryConfig::kMergeJoinOutputBatchStartSize}};