From 37ce2c8d5e7b127aba67a9bb65c79201d14858bf Mon Sep 17 00:00:00 2001 From: Xiao Du Date: Mon, 5 Jan 2026 06:19:15 -0800 Subject: [PATCH] feat: Add session properties for aggregation compaction (#26874) Summary: This diff adds two new session properties to configure string compaction for the approx_most_frequent aggregate function during global aggregation: 1. native_aggregation_compaction_bytes_threshold (BIGINT, default: 0): Memory threshold in bytes for triggering string compaction. When total string storage exceeds this limit with high unused memory ratio, compaction is triggered to reclaim dead strings. Disabled by default (0). 2. native_aggregation_compaction_unused_memory_ratio (DOUBLE, default: 0.25): Ratio of unused (evicted) bytes to total bytes that triggers compaction. Value is in the range [0, 1). Reviewed By: xiaoxmeng Differential Revision: D89909129 --- .../sphinx/presto_cpp/properties-session.rst | 24 +++++++++++++++++++ .../NativeWorkerSessionPropertyProvider.java | 18 ++++++++++++++ .../presto_cpp/main/SessionProperties.cpp | 23 ++++++++++++++++++ .../presto_cpp/main/SessionProperties.h | 18 ++++++++++++++ .../main/tests/SessionPropertiesTest.cpp | 6 ++++- 5 files changed, 88 insertions(+), 1 deletion(-) diff --git a/presto-docs/src/main/sphinx/presto_cpp/properties-session.rst b/presto-docs/src/main/sphinx/presto_cpp/properties-session.rst index 4a9623a1af2be..cac0763034dba 100644 --- a/presto-docs/src/main/sphinx/presto_cpp/properties-session.rst +++ b/presto-docs/src/main/sphinx/presto_cpp/properties-session.rst @@ -544,3 +544,27 @@ output for each input batch. If this is true, then the protocol::SpatialJoinNode is converted to a velox::core::SpatialJoinNode. Otherwise, it is converted to a velox::core::NestedLoopJoinNode. + +``native_aggregation_compaction_bytes_threshold`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +* **Type:** ``bigint`` +* **Default value:** ``0`` + +Native Execution only. 
Memory threshold in bytes for triggering string compaction +during global aggregation. When total string storage exceeds this limit and the +unused memory ratio is high, compaction is triggered to reclaim dead strings. +Disabled by default (0). Currently only applies to approx_most_frequent aggregate +with StringView type during global aggregation. + +``native_aggregation_compaction_unused_memory_ratio`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +* **Type:** ``double`` +* **Minimum value:** ``0`` +* **Maximum value:** ``1`` +* **Default value:** ``0.25`` + +Native Execution only. Ratio of unused (evicted) bytes to total bytes that triggers +compaction. The value is in the range of [0, 1). Currently only applies to +approx_most_frequent aggregate with StringView type during global aggregation. diff --git a/presto-main-base/src/main/java/com/facebook/presto/sessionpropertyproviders/NativeWorkerSessionPropertyProvider.java b/presto-main-base/src/main/java/com/facebook/presto/sessionpropertyproviders/NativeWorkerSessionPropertyProvider.java index a08b474c6e958..30d6818998cf5 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/sessionpropertyproviders/NativeWorkerSessionPropertyProvider.java +++ b/presto-main-base/src/main/java/com/facebook/presto/sessionpropertyproviders/NativeWorkerSessionPropertyProvider.java @@ -88,6 +88,8 @@ public class NativeWorkerSessionPropertyProvider public static final String NATIVE_INDEX_LOOKUP_JOIN_SPLIT_OUTPUT = "native_index_lookup_join_split_output"; public static final String NATIVE_UNNEST_SPLIT_OUTPUT = "native_unnest_split_output"; public static final String NATIVE_USE_VELOX_GEOSPATIAL_JOIN = "native_use_velox_geospatial_join"; + public static final String NATIVE_AGGREGATION_COMPACTION_BYTES_THRESHOLD = "native_aggregation_compaction_bytes_threshold"; + public static final String NATIVE_AGGREGATION_COMPACTION_UNUSED_MEMORY_RATIO = "native_aggregation_compaction_unused_memory_ratio"; private final List> 
sessionProperties; @@ -432,6 +434,22 @@ public NativeWorkerSessionPropertyProvider(FeaturesConfig featuresConfig) "velox::core::SpatialJoinNode. Otherwise, it is converted to a " + "velox::core::NestedLoopJoinNode.", true, + !nativeExecution), + longProperty( + NATIVE_AGGREGATION_COMPACTION_BYTES_THRESHOLD, + "Memory threshold in bytes for triggering string compaction during " + + "global aggregation. When total string storage exceeds this limit with " + + "high unused memory ratio, compaction is triggered to reclaim dead strings. " + + "Disabled by default (0). NOTE: Currently only applies to approx_most_frequent " + + "aggregate with StringView type during global aggregation.", + 0L, + !nativeExecution), + doubleProperty( + NATIVE_AGGREGATION_COMPACTION_UNUSED_MEMORY_RATIO, + "Ratio of unused (evicted) bytes to total bytes that triggers compaction. " + + "The value is in the range of [0, 1). NOTE: Currently only applies to approx_most_frequent " + + "aggregate with StringView type during global aggregation.", + 0.25, !nativeExecution)); } diff --git a/presto-native-execution/presto_cpp/main/SessionProperties.cpp b/presto-native-execution/presto_cpp/main/SessionProperties.cpp index f5db261b46c50..39781c0e778c3 100644 --- a/presto-native-execution/presto_cpp/main/SessionProperties.cpp +++ b/presto-native-execution/presto_cpp/main/SessionProperties.cpp @@ -586,6 +586,29 @@ SessionProperties::SessionProperties() { false, std::nullopt, "true"); + + addSessionProperty( + kAggregationCompactionBytesThreshold, + "Memory threshold in bytes for triggering string compaction during global " + "aggregation. When total string storage exceeds this limit with high unused " + "memory ratio, compaction is triggered to reclaim dead strings. Disabled by " + "default (0). NOTE: Currently only applies to approx_most_frequent aggregate " + "with StringView type during global aggregation. 
May extend to other aggregates.", + BIGINT(), + false, + QueryConfig::kAggregationCompactionBytesThreshold, + std::to_string(c.aggregationCompactionBytesThreshold())); + + addSessionProperty( + kAggregationCompactionUnusedMemoryRatio, + "Ratio of unused (evicted) bytes to total bytes that triggers compaction. " + "The value is in the range of [0, 1). Default is 0.25. NOTE: Currently only applies " + "to approx_most_frequent aggregate with StringView type during global " + "aggregation. May extend to other aggregates.", + DOUBLE(), + false, + QueryConfig::kAggregationCompactionUnusedMemoryRatio, + std::to_string(c.aggregationCompactionUnusedMemoryRatio())); } const std::string SessionProperties::toVeloxConfig( diff --git a/presto-native-execution/presto_cpp/main/SessionProperties.h b/presto-native-execution/presto_cpp/main/SessionProperties.h index f8d9fd0b2295b..ebe416e1e595c 100644 --- a/presto-native-execution/presto_cpp/main/SessionProperties.h +++ b/presto-native-execution/presto_cpp/main/SessionProperties.h @@ -378,6 +378,24 @@ class SessionProperties { static constexpr const char* kUseVeloxGeospatialJoin = "native_use_velox_geospatial_join"; + /// Memory threshold in bytes for triggering string compaction during global + /// aggregation. When total string storage exceeds this limit with high unused + /// memory ratio, compaction is triggered to reclaim dead strings. Disabled by + /// default (0). + /// + /// NOTE: Currently only applies to approx_most_frequent aggregate with + /// StringView type during global aggregation. May extend to other aggregates. + static constexpr const char* kAggregationCompactionBytesThreshold = + "native_aggregation_compaction_bytes_threshold"; + + /// Ratio of unused (evicted) bytes to total bytes that triggers compaction. + /// The value is in the range of [0, 1). Default is 0.25. + /// + /// NOTE: Currently only applies to approx_most_frequent aggregate with + /// StringView type during global aggregation. 
May extend to other aggregates. + static constexpr const char* kAggregationCompactionUnusedMemoryRatio = + "native_aggregation_compaction_unused_memory_ratio"; + inline bool hasVeloxConfig(const std::string& key) { auto sessionProperty = sessionProperties_.find(key); if (sessionProperty == sessionProperties_.end()) { diff --git a/presto-native-execution/presto_cpp/main/tests/SessionPropertiesTest.cpp b/presto-native-execution/presto_cpp/main/tests/SessionPropertiesTest.cpp index 2c775431781a9..016d61e52ce26 100644 --- a/presto-native-execution/presto_cpp/main/tests/SessionPropertiesTest.cpp +++ b/presto-native-execution/presto_cpp/main/tests/SessionPropertiesTest.cpp @@ -127,7 +127,11 @@ TEST_F(SessionPropertiesTest, validateMapping) { {SessionProperties::kUnnestSplitOutput, core::QueryConfig::kUnnestSplitOutput}, {SessionProperties::kUseVeloxGeospatialJoin, - SessionProperties::kUseVeloxGeospatialJoin}}; + SessionProperties::kUseVeloxGeospatialJoin}, + {SessionProperties::kAggregationCompactionBytesThreshold, + core::QueryConfig::kAggregationCompactionBytesThreshold}, + {SessionProperties::kAggregationCompactionUnusedMemoryRatio, + core::QueryConfig::kAggregationCompactionUnusedMemoryRatio}}; const auto sessionProperties = SessionProperties::instance(); for (const auto& [sessionProperty, expectedVeloxConfig] : expectedMappings) {