Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions api/src/main/java/org/apache/iceberg/TableScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,4 @@ default TableScan appendsAfter(long fromSnapshotId) {
* @return the Snapshot this scan will use
*/
Snapshot snapshot();

/**
 * Derives a new {@link TableScan} from this scan's configuration that will have column stats.
 *
 * <p>This default implementation does not provide that capability and always throws.
 *
 * @return a new scan based on this with column stats
 * @throws UnsupportedOperationException always, in this default implementation
 */
default TableScan withColStats() {
  String reason = "scan with colStats is not supported";
  throw new UnsupportedOperationException(reason);
}
}
5 changes: 0 additions & 5 deletions core/src/main/java/org/apache/iceberg/BaseTableScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,4 @@ public CloseableIterable<CombinedScanTask> planTasks() {
return TableScanUtil.planTasks(
splitFiles, targetSplitSize(), splitLookback(), splitOpenFileCost());
}

/**
 * Builds a refined scan from this scan's table and table schema, using a copy of the current
 * context produced by {@code withColStats(true)}.
 *
 * @return the scan returned by {@code newRefinedScan} for the updated context
 */
@Override
public TableScan withColStats() {
  // Named flag so the intent of the boolean argument is visible at the call site.
  final boolean includeStats = true;
  return newRefinedScan(table(), tableSchema(), context().withColStats(includeStats));
}
}
17 changes: 0 additions & 17 deletions core/src/main/java/org/apache/iceberg/TableScanContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -374,21 +374,4 @@ TableScanContext reportWith(MetricsReporter reporter) {
fromSnapshotInclusive,
reporter);
}

/**
 * Returns a new {@code TableScanContext} that copies every field of this context except one:
 * the given {@code stats} value is passed as the fifth constructor argument.
 *
 * <p>NOTE(review): the fifth constructor slot is presumably the column-stats flag; confirm
 * against the constructor's parameter order, since the arguments are purely positional.
 *
 * @param stats value to pass in the fifth constructor position
 * @return a new context; this instance is not modified by this method
 */
TableScanContext withColStats(boolean stats) {
return new TableScanContext(
snapshotId,
rowFilter,
ignoreResiduals,
caseSensitive,
// the only value not copied from this context's own fields
stats,
projectedSchema,
selectedColumns,
options,
fromSnapshotId,
toSnapshotId,
planExecutor,
fromSnapshotInclusive,
metricsReporter);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,11 @@ public void testAggregatePushDownInMergeOnReadDelete() {
String select = "SELECT max(data), min(data), count(data) FROM %s";

List<Object[]> explain = sql("EXPLAIN " + select, tableName);
String explainString = explain.get(0)[0].toString();
String explainString = explain.get(0)[0].toString().toLowerCase(Locale.ROOT);
boolean explainContainsPushDownAggregates = false;
if (explainString.contains("max(data)".toLowerCase(Locale.ROOT))
|| explainString.contains("min(data)".toLowerCase(Locale.ROOT))
|| explainString.contains("count(data)".toLowerCase(Locale.ROOT))) {
if (explainString.contains("max(data)")
|| explainString.contains("min(data)")
|| explainString.contains("count(data)")) {
explainContainsPushDownAggregates = true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public static Expression convert(AggregateFunc aggregate) {
case COUNT:
Count countAgg = (Count) aggregate;
if (countAgg.isDistinct()) {
// manifest file doesn't have count distinct so this can't be converted to push down
// manifest file doesn't have count distinct so this can't be pushed down
return null;
}

Expand All @@ -57,15 +57,18 @@ public static Expression convert(AggregateFunc aggregate) {
} else {
return null;
}

case COUNT_STAR:
return Expressions.countStar();

case MAX:
Max maxAgg = (Max) aggregate;
if (maxAgg.column() instanceof NamedReference) {
return Expressions.max(SparkUtil.toColumnName((NamedReference) maxAgg.column()));
} else {
return null;
}

case MIN:
Min minAgg = (Min) aggregate;
if (minAgg.column() instanceof NamedReference) {
Expand All @@ -75,6 +78,7 @@ public static Expression convert(AggregateFunc aggregate) {
}
}
}

return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ private SparkSQLProperties() {}

// Controls whether aggregates (MAX/MIN/COUNT) are pushed down to Iceberg
public static final String AGGREGATE_PUSH_DOWN_ENABLED =
"spark.sql.iceberg.aggregate-push-down-enabled";
"spark.sql.iceberg.aggregate-push-down.enabled";
public static final boolean AGGREGATE_PUSH_DOWN_ENABLED_DEFAULT = true;

// Controls write distribution mode
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,28 @@
*/
package org.apache.iceberg.spark.source;

import java.util.Arrays;
import java.util.stream.Collectors;
import java.util.List;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.read.LocalScan;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

class SparkLocalScan implements LocalScan {

private final Table table;
private final StructType readSchema;
private final InternalRow[] rows;
private final List<Expression> filterExpressions;

SparkLocalScan(Table table, StructType readSchema, InternalRow[] rows) {
SparkLocalScan(
Table table, StructType readSchema, InternalRow[] rows, List<Expression> filterExpressions) {
this.table = table;
this.readSchema = readSchema;
this.rows = rows;
this.filterExpressions = filterExpressions;
}

@Override
Expand All @@ -50,8 +54,13 @@ public StructType readSchema() {

@Override
public String description() {
String fields =
Arrays.stream(readSchema.fields()).map(StructField::name).collect(Collectors.joining(", "));
return String.format("%s [%s]", table, fields);
return String.format("%s [filters=%s]", table, Spark3Util.describe(filterExpressions));
}

/**
 * Renders this local scan as a single line containing the table, the read schema converted
 * through {@code SparkSchemaUtil.convert(...).asStruct()}, and the filter expressions.
 *
 * @return the formatted description string
 */
@Override
public String toString() {
  final String template = "IcebergLocalScan(table=%s, type=%s, filters=%s)";
  // Held as Object: it is only ever rendered through the %s conversion below.
  Object structType = SparkSchemaUtil.convert(readSchema).asStruct();
  return String.format(template, table, structType, filterExpressions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkAggregates;
import org.apache.iceberg.spark.SparkFilters;
import org.apache.iceberg.spark.SparkReadConf;
import org.apache.iceberg.spark.SparkReadOptions;
Expand Down Expand Up @@ -188,13 +189,12 @@ public boolean pushAggregation(Aggregation aggregation) {
if (expr != null) {
Expression bound = Binder.bind(schema.asStruct(), expr, caseSensitive);
expressions.add((BoundAggregate<?, ?>) bound);
} else {
LOG.info(
"Skipping aggregate pushdown: AggregateFunc {} can't be converted to iceberg expression",
aggregateFunc);
return false;
}
} catch (UnsupportedOperationException e) {
LOG.info(
"Skipping aggregate pushdown: AggregateFunc {} can't be converted to iceberg Expression",
aggregateFunc,
e);
return false;
} catch (IllegalArgumentException e) {
LOG.info("Skipping aggregate pushdown: Bind failed for AggregateFunc {}", aggregateFunc, e);
return false;
Expand All @@ -207,7 +207,7 @@ public boolean pushAggregation(Aggregation aggregation) {
return false;
}

TableScan scan = table.newScan().withColStats();
TableScan scan = table.newScan().includeColumnStats();
Snapshot snapshot = readSnapshot();
if (snapshot == null) {
LOG.info("Skipping aggregate pushdown: table snapshot is null");
Expand Down Expand Up @@ -242,7 +242,8 @@ public boolean pushAggregation(Aggregation aggregation) {
StructLike structLike = aggregateEvaluator.result();
pushedAggregateRows[0] =
new StructInternalRow(aggregateEvaluator.resultType()).setStruct(structLike);
localScan = new SparkLocalScan(table, pushedAggregateSchema, pushedAggregateRows);
localScan =
new SparkLocalScan(table, pushedAggregateSchema, pushedAggregateRows, filterExpressions);

return true;
}
Expand Down
Loading