Merged
24 changes: 24 additions & 0 deletions presto-docs/src/main/sphinx/presto_cpp/properties-session.rst
@@ -544,3 +544,27 @@ output for each input batch.
If this is true, then the protocol::SpatialJoinNode is converted to a
velox::core::SpatialJoinNode. Otherwise, it is converted to a
velox::core::NestedLoopJoinNode.

``native_aggregation_compaction_bytes_threshold``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``bigint``
* **Default value:** ``0``

Native Execution only. Memory threshold in bytes for triggering string compaction
Contributor

Do you envision compaction applying to non-string data types? If this is only for strings, we could make the name specific to string compaction.

Also, this sentence is hard to follow from an end-user perspective:
"When total string storage exceeds this limit and the
unused memory ratio is high, compaction is triggered to reclaim dead strings."

It would be useful to describe how total string storage is calculated, so that it is easier to understand how to set this property. If it can be computed from Velox metrics (available through Prometheus), it would be great to share the computation.

Do you have a Velox blog article or documentation for this work? It would be great to link it here.

Contributor Author

@aditi-pandit, thanks for the great questions.

  1. Is this only for strings?
    Yes, currently this is specific to string compaction for the approx_most_frequent aggregate with StringView type during global aggregation. It may extend to other aggregates in the future, but for now, it's limited to this use case.

  2. How does the compaction mechanism work?
    This is not a general Velox string compaction mechanism. It's a per-aggregation-function compaction that operates on the accumulator object during global aggregation (addSingleGroupRawInput).

    For approx_most_frequent, the accumulator uses:
    - A Strings struct that stores non-inlined strings in memory blocks managed by HashStringAllocator
    - An ApproxMostFrequentStreamSummary data structure that maintains a fixed-capacity summary of the top-k frequent values

    When values are evicted from the summary (due to the stream summary algorithm), their strings remain in memory as "dead strings." The compaction mechanism:
    - Tracks activeBytes_ (bytes used by strings currently in the summary) and evictedBytes_ (bytes used by evicted/dead strings)
    - Triggers compaction when activeBytes_ + evictedBytes_ > compactionBytesThreshold AND evictedBytes_ > compactionBytesThreshold * compactionUnusedMemoryRatio
    - Copies only the active strings to a new Strings storage and frees the old storage
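
To make the trigger condition and the copy step above concrete, here is a minimal sketch. It is a toy model and not the Velox implementation: `ToyStringStore`, `shouldCompact`, and their members are hypothetical names, plain `std::string` storage stands in for the HashStringAllocator-backed Strings struct, and treating a threshold of 0 as "disabled" is an assumption based on the documented default.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

// Hypothetical helper (not Velox code): the two-part trigger condition.
// Assumption: a threshold of 0 means compaction is disabled.
inline bool shouldCompact(
    int64_t activeBytes,
    int64_t evictedBytes,
    int64_t compactionBytesThreshold,
    double compactionUnusedMemoryRatio) {
  assert(activeBytes >= 0 && evictedBytes >= 0);
  if (compactionBytesThreshold <= 0) {
    return false;
  }
  const bool overThreshold =
      activeBytes + evictedBytes > compactionBytesThreshold;
  const bool enoughDeadBytes = static_cast<double>(evictedBytes) >
      static_cast<double>(compactionBytesThreshold) *
          compactionUnusedMemoryRatio;
  return overThreshold && enoughDeadBytes;
}

// Hypothetical toy store (not Velox code): tracks live and dead string bytes.
struct ToyStringStore {
  struct Entry {
    std::string value;
    bool active;
  };

  std::vector<Entry> entries;
  int64_t activeBytes = 0;
  int64_t evictedBytes = 0;

  // Stores a string as active and returns its index.
  std::size_t add(std::string value) {
    activeBytes += static_cast<int64_t>(value.size());
    entries.push_back({std::move(value), true});
    return entries.size() - 1;
  }

  // Marks a string as dead; its bytes stay allocated until compact() runs.
  void evict(std::size_t index) {
    assert(index < entries.size() && entries[index].active);
    const auto size = static_cast<int64_t>(entries[index].value.size());
    entries[index].active = false;
    activeBytes -= size;
    evictedBytes += size;
  }

  // Copies only the active strings into fresh storage and drops the old
  // storage. Indices returned by add() are invalidated by this call.
  void compact() {
    std::vector<Entry> fresh;
    for (auto& entry : entries) {
      if (entry.active) {
        fresh.push_back(std::move(entry));
      }
    }
    entries = std::move(fresh);
    evictedBytes = 0;
  }
};
```

In this sketch, the caller checks `shouldCompact(store.activeBytes, store.evictedBytes, threshold, ratio)` after each eviction and calls `store.compact()` when it returns true.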

  3. How to set this property?
    The total string storage (activeBytes_ + evictedBytes_) depends on:
    - The capacity of the stream summary (user-specified parameter)
    - The size of the string values being aggregated
    - The churn rate of values being evicted

    Unfortunately, there's no direct Velox/Prometheus metric for this today. Users may need to estimate based on their data characteristics (e.g., average string size × summary capacity × expected churn).
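
The rough estimate mentioned above (average string size × summary capacity × expected churn) can be written out as a small helper. This is only a back-of-the-envelope sketch: `estimateStringBytes` is a hypothetical name, and "expected churn" is assumed here to be a multiplier for how many strings pass through each summary slot over the query's lifetime, which the discussion does not define precisely.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical estimate (not a Velox API and not backed by any metric):
// approximate total string bytes (active + evicted) before any compaction.
// Assumption: expectedChurn >= 1.0 is the average number of strings that
// occupy each summary slot while the aggregation runs.
inline int64_t estimateStringBytes(
    int64_t avgStringBytes,
    int64_t summaryCapacity,
    double expectedChurn) {
  assert(avgStringBytes >= 0 && summaryCapacity >= 0 && expectedChurn >= 1.0);
  return static_cast<int64_t>(
      static_cast<double>(avgStringBytes) *
      static_cast<double>(summaryCapacity) * expectedChurn);
}
```

For example, 64-byte strings, a summary capacity of 10,000, and an assumed churn of 4 give about 2.56 MB of string storage. A threshold set below that estimate would let compaction trigger at some point during the query; a threshold above it would likely never be reached.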

Contributor

"Users may need to estimate based on their data characteristics (e.g., average string size × summary capacity × expected churn)."

This would have been useful information to include in the documentation for the reader.

during global aggregation. When total string storage exceeds this limit and the
unused memory ratio is high, compaction is triggered to reclaim dead strings.
Disabled by default (0). Currently only applies to approx_most_frequent aggregate
with StringView type during global aggregation.

``native_aggregation_compaction_unused_memory_ratio``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``double``
* **Minimum value:** ``0``
* **Maximum value:** ``1``
* **Default value:** ``0.25``

Native Execution only. Ratio of unused (evicted) bytes to total bytes that triggers
Contributor

Here as well, it's difficult to understand how "unused (evicted) bytes" is computed. If it can be computed from Velox metrics (available through Prometheus), it would be great to share the computation.

compaction. The value is in the range of [0, 1). Currently only applies to
approx_most_frequent aggregate with StringView type during global aggregation.
@@ -88,6 +88,8 @@ public class NativeWorkerSessionPropertyProvider
public static final String NATIVE_INDEX_LOOKUP_JOIN_SPLIT_OUTPUT = "native_index_lookup_join_split_output";
public static final String NATIVE_UNNEST_SPLIT_OUTPUT = "native_unnest_split_output";
public static final String NATIVE_USE_VELOX_GEOSPATIAL_JOIN = "native_use_velox_geospatial_join";
public static final String NATIVE_AGGREGATION_COMPACTION_BYTES_THRESHOLD = "native_aggregation_compaction_bytes_threshold";
public static final String NATIVE_AGGREGATION_COMPACTION_UNUSED_MEMORY_RATIO = "native_aggregation_compaction_unused_memory_ratio";

private final List<PropertyMetadata<?>> sessionProperties;

@@ -432,6 +434,22 @@ public NativeWorkerSessionPropertyProvider(FeaturesConfig featuresConfig)
"velox::core::SpatialJoinNode. Otherwise, it is converted to a " +
"velox::core::NestedLoopJoinNode.",
true,
!nativeExecution),
longProperty(
NATIVE_AGGREGATION_COMPACTION_BYTES_THRESHOLD,
"Memory threshold in bytes for triggering string compaction during " +
"global aggregation. When total string storage exceeds this limit with " +
"high unused memory ratio, compaction is triggered to reclaim dead strings. " +
"Disabled by default (0). NOTE: Currently only applies to approx_most_frequent " +
"aggregate with StringView type during global aggregation.",
0L,
!nativeExecution),
doubleProperty(
NATIVE_AGGREGATION_COMPACTION_UNUSED_MEMORY_RATIO,
"Ratio of unused (evicted) bytes to total bytes that triggers compaction. " +
"The value is in the range of [0, 1). NOTE: Currently only applies to approx_most_frequent " +
"aggregate with StringView type during global aggregation.",
0.25,
!nativeExecution));
}

23 changes: 23 additions & 0 deletions presto-native-execution/presto_cpp/main/SessionProperties.cpp
@@ -586,6 +586,29 @@ SessionProperties::SessionProperties() {
false,
std::nullopt,
"true");

addSessionProperty(
kAggregationCompactionBytesThreshold,
"Memory threshold in bytes for triggering string compaction during global "
"aggregation. When total string storage exceeds this limit with high unused "
"memory ratio, compaction is triggered to reclaim dead strings. Disabled by "
"default (0). NOTE: Currently only applies to approx_most_frequent aggregate "
"with StringView type during global aggregation. May extend to other aggregates.",
BIGINT(),
false,
QueryConfig::kAggregationCompactionBytesThreshold,
std::to_string(c.aggregationCompactionBytesThreshold()));

addSessionProperty(
kAggregationCompactionUnusedMemoryRatio,
"Ratio of unused (evicted) bytes to total bytes that triggers compaction. "
"The value is in the range of [0, 1). Default is 0.25. NOTE: Currently only applies "
"to approx_most_frequent aggregate with StringView type during global "
"aggregation. May extend to other aggregates.",
DOUBLE(),
false,
QueryConfig::kAggregationCompactionUnusedMemoryRatio,
std::to_string(c.aggregationCompactionUnusedMemoryRatio()));
}

const std::string SessionProperties::toVeloxConfig(
18 changes: 18 additions & 0 deletions presto-native-execution/presto_cpp/main/SessionProperties.h
@@ -378,6 +378,24 @@ class SessionProperties {
static constexpr const char* kUseVeloxGeospatialJoin =
"native_use_velox_geospatial_join";

/// Memory threshold in bytes for triggering string compaction during global
/// aggregation. When total string storage exceeds this limit with high unused
/// memory ratio, compaction is triggered to reclaim dead strings. Disabled by
/// default (0).
///
/// NOTE: Currently only applies to approx_most_frequent aggregate with
/// StringView type during global aggregation. May extend to other aggregates.
static constexpr const char* kAggregationCompactionBytesThreshold =
"native_aggregation_compaction_bytes_threshold";

/// Ratio of unused (evicted) bytes to total bytes that triggers compaction.
/// The value is in the range of [0, 1). Default is 0.25.
///
/// NOTE: Currently only applies to approx_most_frequent aggregate with
/// StringView type during global aggregation. May extend to other aggregates.
static constexpr const char* kAggregationCompactionUnusedMemoryRatio =
"native_aggregation_compaction_unused_memory_ratio";

inline bool hasVeloxConfig(const std::string& key) {
auto sessionProperty = sessionProperties_.find(key);
if (sessionProperty == sessionProperties_.end()) {
@@ -127,7 +127,11 @@ TEST_F(SessionPropertiesTest, validateMapping) {
{SessionProperties::kUnnestSplitOutput,
core::QueryConfig::kUnnestSplitOutput},
{SessionProperties::kUseVeloxGeospatialJoin,
SessionProperties::kUseVeloxGeospatialJoin}};
SessionProperties::kUseVeloxGeospatialJoin},
{SessionProperties::kAggregationCompactionBytesThreshold,
core::QueryConfig::kAggregationCompactionBytesThreshold},
{SessionProperties::kAggregationCompactionUnusedMemoryRatio,
core::QueryConfig::kAggregationCompactionUnusedMemoryRatio}};

const auto sessionProperties = SessionProperties::instance();
for (const auto& [sessionProperty, expectedVeloxConfig] : expectedMappings) {