feat(plugin-iceberg): Push down min/max/count based on file stats #27085

hantangwangd wants to merge 3 commits into prestodb:master from
Conversation
Reviewer's Guide

Adds an Iceberg aggregation pushdown optimizer that evaluates MIN/MAX/COUNT using Iceberg file statistics, exposes it via config and session properties, wires it into the Iceberg plan optimizer pipeline, and thoroughly tests behavior, edge cases, and performance (including NaN/Infinity handling and metrics mode constraints).

Sequence diagram for MIN/MAX/COUNT aggregation pushdown during planning

sequenceDiagram
participant Planner
participant IcebergPlanOptimizerProvider as IcebergPlanOptimizerProvider
participant IcebergAggregationOptimizer as IcebergAggregationOptimizer
participant Optimizer as Optimizer
participant IcebergTransactionManager as IcebergTransactionManager
participant IcebergMetadata as IcebergAbstractMetadata
participant IcebergTable as Table
participant AggregateEvaluator as AggregateEvaluator
Planner->>IcebergPlanOptimizerProvider: optimizePlan(plan, session)
IcebergPlanOptimizerProvider->>IcebergAggregationOptimizer: optimize(subplan, session, variableAllocator, idAllocator)
IcebergAggregationOptimizer->>IcebergSessionProperties: isAggregatePushDownEnabled(session)
IcebergAggregationOptimizer->>IcebergSessionProperties: isPushdownFilterEnabled(session)
IcebergAggregationOptimizer-->>IcebergPlanOptimizerProvider: return subplan if disabled
IcebergAggregationOptimizer->>Optimizer: new Optimizer(session, idAllocator, icebergTransactionManager, functionResolution)
IcebergAggregationOptimizer->>Optimizer: rewriteWith(this, subplan)
Optimizer->>Optimizer: visitAggregation(aggNode, context)
Optimizer->>Optimizer: findTableScan(aggNode.source)
Optimizer->>IcebergTransactionManager: get(tableHandle.getTransaction())
IcebergTransactionManager-->>Optimizer: IcebergAbstractMetadata
Optimizer->>IcebergMetadata: getTable(connectorSession, schemaTableName)
IcebergMetadata-->>Optimizer: IcebergTable
Optimizer->>Optimizer: isReducible(table, aggNode)
Optimizer->>IcebergUtil: getNonMetadataColumnConstraints(validPredicate)
IcebergUtil-->>Optimizer: TupleDomain
Optimizer->>ExpressionConverter: toIcebergExpression(predicate)
ExpressionConverter-->>Optimizer: Expression filter
Optimizer->>AggregateConverter: convert(aggregation)
AggregateConverter-->>Optimizer: Expression aggregateExpression
Optimizer->>Binder: bind(schema.asStruct(), expr, false)
Binder-->>Optimizer: BoundAggregate
Optimizer->>AggregateEvaluator: create(aggregates)
AggregateEvaluator-->>Optimizer: AggregateEvaluator
Optimizer->>IcebergTable: newScan().includeColumnStats()
IcebergTable-->>Optimizer: TableScan
Optimizer->>IcebergTable: currentSnapshot or snapshot(snapshotId)
IcebergTable-->>Optimizer: Snapshot
Optimizer->>TableScan: useSnapshot(snapshot.snapshotId())
Optimizer->>TableScan: filter(filter)
Optimizer->>TableScan: planFiles()
TableScan-->>Optimizer: Iterable FileScanTask
loop for each FileScanTask
Optimizer->>AggregateEvaluator: update(task.file())
end
Optimizer->>AggregateEvaluator: allAggregatorsValid()
AggregateEvaluator-->>Optimizer: boolean
Optimizer->>AggregateEvaluator: result()
AggregateEvaluator-->>Optimizer: StructLike
Optimizer->>IcebergUtil: getNativeValue(type, value)
IcebergUtil-->>Optimizer: nativeValue
Optimizer->>Optimizer: build ConstantExpression and Assignments
Optimizer->>ValuesNode: new ValuesNode(...outputVariables, constantRow)
ValuesNode-->>Optimizer: valuesNode
Optimizer->>ProjectNode: new ProjectNode(valuesNode, assignments)
ProjectNode-->>IcebergAggregationOptimizer: reducedPlan
IcebergAggregationOptimizer-->>IcebergPlanOptimizerProvider: reducedPlan
IcebergPlanOptimizerProvider-->>Planner: optimizedPlan
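The core idea in the sequence above can be modeled without the Iceberg API: every data file records per-column lower/upper bounds and a record count in its manifest entry, so MIN/MAX/COUNT can be folded over file-level stats instead of scanning rows. A minimal, self-contained sketch of that fold; the `FileStats` record and its values are hypothetical stand-ins, not Presto or Iceberg classes:

```java
import java.util.List;

public class StatsAggregation {
    // Hypothetical stand-in for the per-file stats Iceberg keeps in manifest entries.
    record FileStats(long recordCount, long lowerBound, long upperBound) {}

    record Result(long count, long min, long max) {}

    // Fold MIN/MAX/COUNT over file-level stats, mirroring what
    // AggregateEvaluator.update(file) does for each FileScanTask.
    static Result evaluate(List<FileStats> files) {
        long count = 0;
        long min = Long.MAX_VALUE;
        long max = Long.MIN_VALUE;
        for (FileStats f : files) {
            count += f.recordCount();            // COUNT(*) from record counts
            min = Math.min(min, f.lowerBound()); // MIN from lower bounds
            max = Math.max(max, f.upperBound()); // MAX from upper bounds
        }
        return new Result(count, min, max);
    }

    public static void main(String[] args) {
        Result r = evaluate(List.of(
                new FileStats(100, -5, 40),
                new FileStats(250, 7, 99)));
        System.out.println(r.count() + " " + r.min() + " " + r.max()); // 350 -5 99
    }
}
```

This is why the real optimizer bails out when row-level deletes are present: deleted rows still contribute to the file-level counts and bounds, so the fold would overcount.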
Class diagram for Iceberg aggregation pushdown optimizer and utilities

classDiagram
class IcebergPlanOptimizerProvider {
-Set~ConnectorPlanOptimizer~ logicalPlanOptimizers
}
class IcebergAggregationOptimizer {
+Logger LOGGER
-IcebergTransactionManager icebergTransactionManager
-StandardFunctionResolution functionResolution
+IcebergAggregationOptimizer(IcebergTransactionManager icebergTransactionManager, StandardFunctionResolution functionResolution)
+PlanNode optimize(PlanNode maxSubplan, ConnectorSession session, VariableAllocator variableAllocator, PlanNodeIdAllocator idAllocator)
}
class IcebergAggregationOptimizer_Optimizer {
-ConnectorSession connectorSession
-PlanNodeIdAllocator idAllocator
-IcebergTransactionManager icebergTransactionManager
-AggregateConverter aggregateConverter
-Map~Predicate~FunctionHandle~~, Expression.Operation~ allowedFunctions
+Optimizer(ConnectorSession connectorSession, PlanNodeIdAllocator idAllocator, IcebergTransactionManager icebergTransactionManager, StandardFunctionResolution functionResolution)
+PlanNode visitAggregation(AggregationNode node, RewriteContext~Void~ context)
-Optional~TableScanNode~ findTableScan(PlanNode source)
-boolean isReducible(Table table, AggregationNode node)
-PlanNode reduce(AggregationNode node, Schema schema, Table table, Optional~Long~ snapshotId, Expression filter)
-ConnectorMetadata getConnectorMetadata(TableHandle tableHandle)
-boolean metricsModeSupportsAggregatePushDown(Table table, List~BoundAggregate~wildcard, wildcard~~ aggregates)
}
class AggregateConverter {
-Map~Predicate~FunctionHandle~~, Expression.Operation~ allowedFunctions
+AggregateConverter(Map~Predicate~FunctionHandle~~, Expression.Operation~ allowedFunctions)
+Expression convert(AggregationNode.Aggregation aggregation)
}
class IcebergConfig {
-boolean aggregatePushDownEnabled
+boolean isAggregatePushDownEnabled()
+IcebergConfig setAggregatePushDownEnabled(boolean aggregatePushDownEnabled)
}
class IcebergSessionProperties {
+String AGGREGATE_PUSH_DOWN_ENABLED
+boolean isAggregatePushDownEnabled(ConnectorSession session)
}
class IcebergTransactionManager {
+ConnectorMetadata get(ConnectorTransactionHandle transactionHandle)
}
class IcebergAbstractMetadata {
}
class IcebergUtil {
+Object getNativeValue(Type type, Object value)
+Table getIcebergTable(ConnectorMetadata metadata, ConnectorSession session, SchemaTableName schemaTableName)
+TupleDomain~IcebergColumnHandle~ getNonMetadataColumnConstraints(TupleDomain~IcebergColumnHandle~ predicate)
}
class IcebergTableHandle {
+SchemaTableName getSchemaTableName()
+Optional~IcebergTableName~ getIcebergTableName()
}
class IcebergTableLayoutHandle {
+TupleDomain~IcebergColumnHandle~ getValidPredicate()
}
class AggregationNode {
+Map~VariableReferenceExpression, Aggregation~ getAggregations()
+List~VariableReferenceExpression~ getOutputVariables()
+List~VariableReferenceExpression~ getGroupingKeys()
}
class TableScanNode {
+TableHandle getTable()
}
class ProjectNode {
}
class ValuesNode {
}
class Table {
+Schema schema()
+Snapshot currentSnapshot()
+Snapshot snapshot(long snapshotId)
+TableScan newScan()
}
class BaseTable {
}
class TableScan {
+TableScan includeColumnStats()
+TableScan useSnapshot(long snapshotId)
+TableScan filter(Expression filter)
+CloseableIterable~FileScanTask~ planFiles()
}
class AggregateEvaluator {
+static AggregateEvaluator create(List~BoundAggregate~wildcard, wildcard~~ aggregates)
+List~BoundAggregate~wildcard, wildcard~~ aggregates()
+void update(DataFile file)
+boolean allAggregatorsValid()
+StructLike result()
+Types.StructType resultType()
}
class MetricsConfig {
+static MetricsConfig forTable(Table table)
+MetricsModes.MetricsMode columnMode(String columnName)
}
class MetricsModes_MetricsMode {
}
class MetricsModes_None {
}
class MetricsModes_Counts {
}
class MetricsModes_Truncate {
}
IcebergPlanOptimizerProvider --> IcebergAggregationOptimizer : registers
IcebergAggregationOptimizer *-- IcebergAggregationOptimizer_Optimizer : creates
IcebergAggregationOptimizer_Optimizer --> IcebergTransactionManager : uses
IcebergAggregationOptimizer_Optimizer --> AggregateConverter : uses
IcebergAggregationOptimizer_Optimizer --> IcebergUtil : uses
IcebergAggregationOptimizer_Optimizer --> AggregationNode : rewrites
IcebergAggregationOptimizer_Optimizer --> TableScanNode : finds
IcebergAggregationOptimizer_Optimizer --> ProjectNode : wraps
IcebergAggregationOptimizer_Optimizer --> ValuesNode : produces
IcebergAggregationOptimizer_Optimizer --> Table : reads
IcebergAggregationOptimizer_Optimizer --> AggregateEvaluator : evaluates
IcebergAggregationOptimizer_Optimizer --> MetricsConfig : checks
AggregateConverter --> AggregationNode : converts
IcebergConfig --> IcebergSessionProperties : providesDefault
IcebergSessionProperties --> IcebergAggregationOptimizer : controlsEnable
IcebergTransactionManager --> IcebergAbstractMetadata : returns
BaseTable --|> Table
MetricsModes_None --|> MetricsModes_MetricsMode
MetricsModes_Counts --|> MetricsModes_MetricsMode
MetricsModes_Truncate --|> MetricsModes_MetricsMode
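The `metricsModeSupportsAggregatePushDown` check in the class diagram gates pushdown on Iceberg's per-column metrics mode (`none`, `counts`, `truncate(n)`, `full`). A plausible simplified model of that gate: COUNT only needs value counts, while exact MIN/MAX need untruncated bounds. This is an assumption about the rule, not the actual implementation; in particular, truncation only loses precision for variable-length types, so the real check may be more permissive:

```java
import java.util.List;

public class MetricsModeGate {
    enum Mode { NONE, COUNTS, TRUNCATE, FULL }
    enum Agg { COUNT, MIN, MAX }

    // Simplified rule: COUNT needs only record/value counts; exact MIN/MAX
    // need full bounds, because truncated bounds are not exact extremes.
    static boolean supports(Mode mode, Agg agg) {
        switch (mode) {
            case NONE:
                return false;
            case COUNTS:
            case TRUNCATE:
                return agg == Agg.COUNT;
            case FULL:
                return true;
        }
        return false;
    }

    // Pushdown is only taken when every aggregate in the query is supported.
    static boolean supportsAll(Mode mode, List<Agg> aggs) {
        return aggs.stream().allMatch(a -> supports(mode, a));
    }

    public static void main(String[] args) {
        System.out.println(supportsAll(Mode.COUNTS, List.of(Agg.COUNT)));          // true
        System.out.println(supportsAll(Mode.COUNTS, List.of(Agg.COUNT, Agg.MIN))); // false
    }
}
```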
Flow diagram for aggregation pushdown decision and rewrite

flowchart TD
A["Start optimize in IcebergAggregationOptimizer"] --> B["Check session isAggregatePushDownEnabled"]
B -->|false| Z["Return original plan"]
B -->|true| C["Check session isPushdownFilterEnabled"]
C -->|true| Z
C -->|false| D["Traverse plan and visitAggregation"]
D --> E["Find underlying TableScanNode via findTableScan"]
E -->|not found| Z
E -->|found| F["Resolve Iceberg Table from IcebergTransactionManager and metadata"]
F --> G["isReducible(table, aggregationNode)?"]
G -->|false| Z
G -->|true| H["Build TupleDomain predicates and convert to Iceberg Expression filter"]
H --> I["Convert each MIN/MAX/COUNT aggregation via AggregateConverter"]
I -->|conversion fails| Z
I -->|success| J["Bind expressions to schema using Binder and collect BoundAggregate list"]
J -->|bind fails| Z
J -->|success| K["Create AggregateEvaluator with aggregates"]
K --> L["Check metricsModeSupportsAggregatePushDown for all columns"]
L -->|false| Z
L -->|true| M["Build TableScan: includeColumnStats, select snapshot, apply filter"]
M --> N["Plan files and iterate FileScanTask"]
N -->|row level deletes detected| Z
N -->|no deletes| O["AggregateEvaluator.update(file) for each task"]
O --> P["AggregateEvaluator.allAggregatorsValid?"]
P -->|false| Z
P -->|true| Q["Get StructLike result and resultType fields"]
Q --> R["For each output variable, extract field, convert with IcebergUtil.getNativeValue"]
R --> S["Build ConstantExpression values and Assignments"]
S --> T["Create ValuesNode with single constant row"]
T --> U["Create ProjectNode on top of ValuesNode"]
U --> V["Return rewritten aggregation plan as constants"]
Z --> V
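One edge case the reviewer's guide calls out is NaN/Infinity handling: deriving MIN/MAX from stored bounds is only safe if NaN is accounted for, because NaN is unordered under Java's comparison operators while `Math.min`/`Math.max` propagate it. A small stdlib-only demonstration of the pitfall:

```java
public class NanOrdering {
    public static void main(String[] args) {
        double nan = Double.NaN;
        // NaN is unordered: every relational comparison involving NaN is false,
        // so a bounds check like value < lowerBound silently skips NaN values.
        System.out.println(nan < 1.0);          // false
        System.out.println(nan > 1.0);          // false
        // Math.min/max propagate NaN rather than ignoring it.
        System.out.println(Math.min(nan, 1.0)); // NaN
        // Double.compare uses total ordering and ranks NaN above +Infinity.
        System.out.println(Double.compare(nan, Double.POSITIVE_INFINITY) > 0); // true
    }
}
```

Hence stats-based MIN/MAX must either know how many NaN values a file contains or fall back to a regular scan; +Infinity and -Infinity, by contrast, order normally and are safe in bounds.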
Hey - I've found 2 issues, and left some high-level feedback:

- In `TestIcebergLogicalPlanner.testInfinity`, the assertions for the REAL min/max use `fields.get(1)` twice; the second check should likely reference the min index (e.g., `fields.get(2)`) so the test actually validates both max and min.
- In `IcebergUtil.getNativeValue`, the `ByteBuffer` branch uses `((ByteBuffer) value).array()`, which can ignore the buffer's position/limit and also fail for non-array-backed buffers; consider using `Slices.wrappedBuffer((ByteBuffer) value)` or wrapping `array(), arrayOffset() + position(), remaining()` instead.
## Individual Comments
### Comment 1
<location path="presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java" line_range="1446-1447" />
<code_context>
+ else if (value instanceof BigDecimal) {
+ slice = encodeScaledValue((BigDecimal) value);
+ }
+ else if (value instanceof ByteBuffer) {
+ slice = Slices.wrappedBuffer(((ByteBuffer) value).array());
+ }
+ else if (value instanceof CharBuffer) {
</code_context>
<issue_to_address>
**issue (bug_risk):** Using ByteBuffer.array() can fail for non-array-backed buffers and ignores position/limit.
This assumes the buffer is array-backed and writable; `array()` can throw and also ignores `position`/`limit`, exposing the whole backing array. Please handle non-array-backed and direct buffers (e.g., use `hasArray()` and otherwise copy only `remaining()` bytes into a new array before wrapping).
</issue_to_address>
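The reviewer's suggested fix can be sketched with plain `java.nio`: respect `position`/`limit` and fall back to a bulk `get` for direct or read-only buffers. `Slices.wrappedBuffer` is Airlift API, so this sketch stops at producing a correctly sized `byte[]` that could then be wrapped; `remainingBytes` is a hypothetical helper name:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class ByteBufferBytes {
    // Extract exactly the remaining() bytes between position and limit,
    // without assuming the buffer is array-backed.
    static byte[] remainingBytes(ByteBuffer value) {
        if (value.hasArray()) {
            // Heap buffer: copy only the live window of the backing array.
            int offset = value.arrayOffset() + value.position();
            return Arrays.copyOfRange(value.array(), offset, offset + value.remaining());
        }
        // Direct or read-only buffer: bulk get from a duplicate so the
        // original buffer's position is left untouched.
        byte[] copy = new byte[value.remaining()];
        value.duplicate().get(copy);
        return copy;
    }

    public static void main(String[] args) {
        ByteBuffer heap = ByteBuffer.wrap(new byte[] {1, 2, 3, 4});
        heap.position(1); // stats values often arrive as a slice of a larger buffer
        System.out.println(Arrays.toString(remainingBytes(heap))); // [2, 3, 4]

        ByteBuffer direct = ByteBuffer.allocateDirect(3);
        direct.put(new byte[] {9, 8, 7}).flip();
        System.out.println(Arrays.toString(remainingBytes(direct))); // [9, 8, 7]
    }
}
```

Note how the naive `heap.array()` would have returned all four bytes here, silently including data before `position()`.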
### Comment 2
<location path="presto-docs/src/main/sphinx/connector/iceberg.rst" line_range="417" />
<code_context>
``iceberg.max-statistics-file-cache-size`` Maximum size in bytes that should be consumed by the ``256MB`` Yes Yes, only needed on coordinator
statistics file cache.
+
+``iceberg.aggregate-push-down-enabled`` Controls whether to push down aggregate (MIN/MAX/COUNT) to ``true`` Yes No
+ Iceberg based on data file stats.
======================================================= ============================================================= ================================== =================== =============================================
</code_context>
<issue_to_address>
**suggestion (typo):** Consider pluralizing "aggregate" to better match the list of functions (MIN/MAX/COUNT).
You could update this to "Controls whether to push down aggregates (MIN/MAX/COUNT)" or "aggregate functions (MIN/MAX/COUNT)" so the noun matches the list of functions.
```suggestion
``iceberg.aggregate-push-down-enabled`` Controls whether to push down aggregate functions (MIN/MAX/COUNT) to ``true`` Yes No
```
</issue_to_address>
steveburnett left a comment:

LGTM! (docs)

Pull branch, local doc build, looks good. Thanks!
Description
This PR is inspired by the aggregate pushdown optimization from Spark on Iceberg, which pushes down min/max/count to Iceberg. See: apache/iceberg#5872. After this change, MIN, MAX, and COUNT are calculated on the Iceberg side using the statistics in the manifest files.
For a detailed comparison between this optimization and the existing partition-based metadata optimization strategy, please see: #22080 (comment).
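For reference, the new behavior appears to be toggled through the catalog config property introduced in this PR (the docs diff gives its name; that the session-level spelling uses underscores is an assumption based on Presto conventions):

```properties
# etc/catalog/iceberg.properties — stats-based aggregate pushdown, enabled by default
iceberg.aggregate-push-down-enabled=true
```

At query time the matching session property would presumably be set with something like `SET SESSION iceberg.aggregate_push_down_enabled = false;` to disable the rewrite for a single session.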
Benchmark scenarios:
Benchmark result:
Motivation and Context
Fix issue: #21885
Impact
Test Plan
Contributor checklist
Release Notes