Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions api/src/main/java/org/apache/iceberg/TableScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,4 @@ default TableScan appendsAfter(long fromSnapshotId) {
* @return the Snapshot this scan will use
*/
Snapshot snapshot();

/**
* Create a new {@link TableScan} from this scan's configuration that will have column stats
*
* @return a new scan based on this with column stats
*/
default TableScan withColStats() {
throw new UnsupportedOperationException("scan with colStats is not supported");
}
}
5 changes: 0 additions & 5 deletions core/src/main/java/org/apache/iceberg/BaseTableScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,4 @@ public CloseableIterable<CombinedScanTask> planTasks() {
return TableScanUtil.planTasks(
splitFiles, targetSplitSize(), splitLookback(), splitOpenFileCost());
}

@Override
public TableScan withColStats() {
return newRefinedScan(table(), tableSchema(), context().withColStats(true));
}
}
17 changes: 0 additions & 17 deletions core/src/main/java/org/apache/iceberg/TableScanContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -374,21 +374,4 @@ TableScanContext reportWith(MetricsReporter reporter) {
fromSnapshotInclusive,
reporter);
}

TableScanContext withColStats(boolean stats) {
return new TableScanContext(
snapshotId,
rowFilter,
ignoreResiduals,
caseSensitive,
stats,
projectedSchema,
selectedColumns,
options,
fromSnapshotId,
toSnapshotId,
planExecutor,
fromSnapshotInclusive,
metricsReporter);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,11 @@ public void testAggregatePushDownInMergeOnReadDelete() {
String select = "SELECT max(data), min(data), count(data) FROM %s";

List<Object[]> explain = sql("EXPLAIN " + select, tableName);
String explainString = explain.get(0)[0].toString();
String explainString = explain.get(0)[0].toString().toLowerCase(Locale.ROOT);
boolean explainContainsPushDownAggregates = false;
if (explainString.contains("max(data)".toLowerCase(Locale.ROOT))
|| explainString.contains("min(data)".toLowerCase(Locale.ROOT))
|| explainString.contains("count(data)".toLowerCase(Locale.ROOT))) {
if (explainString.contains("max(data)")
|| explainString.contains("min(data)")
|| explainString.contains("count(data)")) {
explainContainsPushDownAggregates = true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ private SparkSQLProperties() {}

// Controls whether to push down aggregate (MAX/MIN/COUNT) to Iceberg
public static final String AGGREGATE_PUSH_DOWN_ENABLED =
"spark.sql.iceberg.aggregate-push-down-enabled";
"spark.sql.iceberg.aggregate-push-down.enabled";
Comment thread
rdblue marked this conversation as resolved.
public static final boolean AGGREGATE_PUSH_DOWN_ENABLED_DEFAULT = true;

// Controls write distribution mode
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
*/
package org.apache.iceberg.spark.source;

import java.util.Arrays;
import java.util.stream.Collectors;
import java.util.List;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.read.LocalScan;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

class SparkLocalScan implements LocalScan {
Expand All @@ -32,10 +32,14 @@ class SparkLocalScan implements LocalScan {
private final StructType readSchema;
private final InternalRow[] rows;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Shall we group all the fields together? There is now an empty line before the filters field.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

SparkLocalScan(Table table, StructType readSchema, InternalRow[] rows) {
private final List<Expression> filterExpressions;

SparkLocalScan(
Table table, StructType readSchema, InternalRow[] rows, List<Expression> filterExpressions) {
this.table = table;
this.readSchema = readSchema;
this.rows = rows;
this.filterExpressions = filterExpressions;
}

@Override
Expand All @@ -50,8 +54,11 @@ public StructType readSchema() {

@Override
public String description() {
String fields =
Arrays.stream(readSchema.fields()).map(StructField::name).collect(Collectors.joining(", "));
return String.format("%s [%s]", table, fields);
return String.format("%s [filters=%s]", table, Spark3Util.describe(filterExpressions));
}

@Override
public String toString() {
return description();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkAggregates;
import org.apache.iceberg.spark.SparkFilters;
import org.apache.iceberg.spark.SparkReadConf;
import org.apache.iceberg.spark.SparkReadOptions;
Expand Down Expand Up @@ -188,13 +189,12 @@ public boolean pushAggregation(Aggregation aggregation) {
if (expr != null) {
Expression bound = Binder.bind(schema.asStruct(), expr, caseSensitive);
expressions.add((BoundAggregate<?, ?>) bound);
} else {
LOG.info(
"Skipping aggregate pushdown: AggregateFunc {} can't be converted to iceberg Expression",

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: "to iceberg Expression" -> "to Iceberg expression", or simply "to Iceberg".

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

aggregateFunc);
return false;
}
} catch (UnsupportedOperationException e) {
LOG.info(
"Skipping aggregate pushdown: AggregateFunc {} can't be converted to iceberg Expression",
aggregateFunc,
e);
return false;
} catch (IllegalArgumentException e) {
LOG.info("Skipping aggregate pushdown: Bind failed for AggregateFunc {}", aggregateFunc, e);
return false;
Expand All @@ -207,7 +207,7 @@ public boolean pushAggregation(Aggregation aggregation) {
return false;
}

TableScan scan = table.newScan().withColStats();
TableScan scan = table.newScan().includeColumnStats();
Snapshot snapshot = readSnapshot();
if (snapshot == null) {
LOG.info("Skipping aggregate pushdown: table snapshot is null");
Expand Down Expand Up @@ -242,7 +242,8 @@ public boolean pushAggregation(Aggregation aggregation) {
StructLike structLike = aggregateEvaluator.result();
pushedAggregateRows[0] =
new StructInternalRow(aggregateEvaluator.resultType()).setStruct(structLike);
localScan = new SparkLocalScan(table, pushedAggregateSchema, pushedAggregateRows);
localScan =
new SparkLocalScan(table, pushedAggregateSchema, pushedAggregateRows, filterExpressions);

return true;
}
Expand Down
Loading