diff --git a/api/src/main/java/org/apache/iceberg/expressions/AggregateUtil.java b/api/src/main/java/org/apache/iceberg/expressions/AggregateUtil.java
new file mode 100644
index 000000000000..1273e0f8c733
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/expressions/AggregateUtil.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.expressions;
+
+import java.util.List;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+
+/** Aggregate utility methods. */
+public class AggregateUtil {
+ private AggregateUtil() {}
+
+ /**
+ * Create a NestedField for this Aggregate Expression. This NestedField is used to build the
+ * pushed down aggregate schema.
+ *
+ *
e.g. SELECT COUNT(*), MAX(col1), MIN(col1), MAX(col2), MIN(col3) FROM table;
+ *
+ *
The returned NestedField for the aggregates are Types.NestedField.required(1, COUNT(*),
+ * Types.LongType.get()) Types.NestedField.required(2, MAX(col1), Types.IntegerType.get())
+ * Types.NestedField.required(3, MIN(col1), Types.IntegerType.get()) Types.NestedField.required(4,
+ * MAX(col2), Types.StringType.get()) Types.NestedField.required(5, MIN(col3),
+ * Types.StringType.get())
+ */
+ public static Types.NestedField buildAggregateNestedField(BoundAggregate aggregate, int index) {
+ return aggregate.nestedField(index);
+ }
+
+ /**
+ * Returns the column name this aggregate function is on. e.g. SELECT Max(col3) FROM table; This
+ * method returns col3
+ */
+ public static String getAggregateColumnName(BoundAggregate aggregate) {
+ return aggregate.columnName();
+ }
+
+ /**
+ * Returns the data type of this Aggregate Expression. The data type for COUNT is always Long. The
+ * data type for MAX and MIX are the same as the data type of the column this aggregate is applied
+ * on.
+ */
+ public static Type getAggregateType(BoundAggregate aggregate) {
+ return aggregate.type();
+ }
+
+ /**
+ * Returns the index of this Aggregate column in table schema. e.g. SELECT Max(col3) FROM table;
+ * Suppose the table has columns (col1, col2, col3), this method returns 2.
+ */
+ public static int columnIndexInTableSchema(
+ BoundAggregate aggregate, Table table, boolean caseSensitive) {
+ List columns = table.schema().columns();
+ for (int i = 0; i < columns.size(); i++) {
+ if (aggregate.columnName().equals("*")) {
+ return -1;
+ }
+ if (caseSensitive) {
+ if (aggregate.columnName().equals(columns.get(i).name())) {
+ return i;
+ }
+ } else {
+ if (aggregate.columnName().equalsIgnoreCase(columns.get(i).name())) {
+ return i;
+ }
+ }
+ }
+ throw new ValidationException(
+ "Aggregate is on an invalid table column %s", aggregate.columnName());
+ }
+}
diff --git a/api/src/main/java/org/apache/iceberg/expressions/BoundAggregate.java b/api/src/main/java/org/apache/iceberg/expressions/BoundAggregate.java
index 650271b3b78a..41b8eaa6278d 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/BoundAggregate.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/BoundAggregate.java
@@ -29,7 +29,8 @@ protected BoundAggregate(Operation op, BoundTerm term) {
@Override
public C eval(StructLike struct) {
- throw new UnsupportedOperationException(this.getClass().getName() + " does not implement eval");
+ throw new UnsupportedOperationException(
+ this.getClass().getName() + " does not implement eval.");
}
@Override
@@ -37,6 +38,29 @@ public BoundReference> ref() {
return term().ref();
}
+ public Types.NestedField nestedField(int index) {
+ if (op() == Operation.COUNT_STAR) {
+ return Types.NestedField.required(index, "COUNT(*)", Types.LongType.get());
+ } else {
+ if (term() instanceof BoundReference) {
+ if (op() == Operation.COUNT) {
+ return Types.NestedField.required(
+ index, "COUNT(" + term().ref().name() + ")", Types.LongType.get());
+ } else if (op() == Operation.MAX) {
+ return Types.NestedField.required(
+ index, "MAX(" + term().ref().name() + ")", term().type());
+ } else if (op() == Operation.MIN) {
+ return Types.NestedField.required(
+ index, "MIN(" + term().ref().name() + ")", term().type());
+ } else {
+ throw new UnsupportedOperationException(op() + " is not supported.");
+ }
+ } else {
+ throw new UnsupportedOperationException("Aggregate BoundTransform is not supported.");
+ }
+ }
+ }
+
public Type type() {
if (op() == Operation.COUNT || op() == Operation.COUNT_STAR) {
return Types.LongType.get();
@@ -44,4 +68,12 @@ public Type type() {
return term().type();
}
}
+
+ public String columnName() {
+ if (op() == Operation.COUNT_STAR) {
+ return "*";
+ } else {
+ return ref().name();
+ }
+ }
}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
index 85c71827d7ac..0559f38a995f 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
@@ -228,4 +228,15 @@ public Long streamFromTimestamp() {
.defaultValue(Long.MIN_VALUE)
.parse();
}
+
+ public boolean aggregatePushDown() {
+ boolean enable =
+ confParser
+ .booleanConf()
+ .option(SparkReadOptions.AGGREGATE_PUSH_DOWN_ENABLED)
+ .sessionConf(SparkSQLProperties.AGGREGATE_PUSH_DOWN_ENABLED)
+ .defaultValue(SparkSQLProperties.AGGREGATE_PUSH_DOWN_ENABLED_DEFAULT)
+ .parse();
+ return enable;
+ }
}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
index 96e09d70ef65..bc9797a82207 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
@@ -84,4 +84,6 @@ private SparkReadOptions() {}
public static final String VERSION_AS_OF = "versionAsOf";
public static final String TIMESTAMP_AS_OF = "timestampAsOf";
+
+ public static final String AGGREGATE_PUSH_DOWN_ENABLED = "aggregatePushDownEnabled";
}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
index fa8bd719f391..17a39336478e 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
@@ -42,4 +42,8 @@ private SparkSQLProperties() {}
// Controls whether to check the order of fields during writes
public static final String CHECK_ORDERING = "spark.sql.iceberg.check-ordering";
public static final boolean CHECK_ORDERING_DEFAULT = true;
+
+ // Controls whether to push down aggregate (MAX/MIN/COUNT) to Iceberg
+ public static final String AGGREGATE_PUSH_DOWN_ENABLED = "spark.sql.iceberg.aggregate_pushdown";
+ public static final boolean AGGREGATE_PUSH_DOWN_ENABLED_DEFAULT = true;
}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkAggregates.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkAggregates.java
new file mode 100644
index 000000000000..4b8c971380b5
--- /dev/null
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkAggregates.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expression.Operation;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.spark.sql.connector.expressions.NamedReference;
+import org.apache.spark.sql.connector.expressions.aggregate.AggregateFunc;
+import org.apache.spark.sql.connector.expressions.aggregate.Count;
+import org.apache.spark.sql.connector.expressions.aggregate.CountStar;
+import org.apache.spark.sql.connector.expressions.aggregate.Max;
+import org.apache.spark.sql.connector.expressions.aggregate.Min;
+
+public class SparkAggregates {
+
+ private SparkAggregates() {}
+
+ private static final Map, Operation> AGGREGATES =
+ ImmutableMap., Operation>builder()
+ .put(Count.class, Operation.COUNT)
+ .put(CountStar.class, Operation.COUNT_STAR)
+ .put(Max.class, Operation.MAX)
+ .put(Min.class, Operation.MIN)
+ .build();
+
+ public static Expression convert(AggregateFunc aggregate) {
+ Operation op = AGGREGATES.get(aggregate.getClass());
+ if (op != null) {
+ switch (op) {
+ case COUNT:
+ Count countAgg = (Count) aggregate;
+ assert (countAgg.column() instanceof NamedReference);
+ return Expressions.count(SparkUtil.toColumnName((NamedReference) countAgg.column()));
+ case COUNT_STAR:
+ return Expressions.countStar();
+ case MAX:
+ Max maxAgg = (Max) aggregate;
+ assert (maxAgg.column() instanceof NamedReference);
+ return Expressions.max(SparkUtil.toColumnName((NamedReference) maxAgg.column()));
+ case MIN:
+ Min minAgg = (Min) aggregate;
+ assert (minAgg.column() instanceof NamedReference);
+ return Expressions.min(SparkUtil.toColumnName((NamedReference) minAgg.column()));
+ }
+ }
+ return null;
+ }
+}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkLocalScan.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkLocalScan.java
new file mode 100644
index 000000000000..78f6a90458cd
--- /dev/null
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkLocalScan.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Arrays;
+import java.util.stream.Collectors;
+import org.apache.iceberg.Table;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.LocalScan;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+class SparkLocalScan implements LocalScan {
+
+ private final Table table;
+ private final StructType aggregateSchema;
+ private final InternalRow[] rows;
+
+ SparkLocalScan(Table table, StructType aggregateSchema, InternalRow[] rows) {
+ this.table = table;
+ this.aggregateSchema = aggregateSchema;
+ this.rows = rows;
+ }
+
+ @Override
+ public InternalRow[] rows() {
+ return rows;
+ }
+
+ @Override
+ public StructType readSchema() {
+ return aggregateSchema;
+ }
+
+ @Override
+ public String description() {
+ String aggregates =
+ Arrays.stream(aggregateSchema.fields())
+ .map(StructField::name)
+ .collect(Collectors.joining(", "));
+ return String.format("%s [aggregates=%s]", table, aggregates);
+ }
+}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPushedDownAggregateUtil.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPushedDownAggregateUtil.java
new file mode 100644
index 000000000000..5ea4c743cedc
--- /dev/null
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPushedDownAggregateUtil.java
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.List;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.MetricsModes;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.AggregateUtil;
+import org.apache.iceberg.expressions.BoundAggregate;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Literal;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkTableUtil;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.Decimal;
+import scala.collection.JavaConverters;
+
+/** Helper methods for working with Spark aggregate push down. */
+public class SparkPushedDownAggregateUtil {
+ static final int LOWER_BOUNDS_COLUMN_INDEX = 0;
+ static final int UPPER_BOUNDS_COLUMN_INDEX = 1;
+ static final int RECORD_COUNT_COLUMN_INDEX = 2;
+ static final int NULL_COUNT_COLUMN_INDEX = 3;
+
+ private SparkPushedDownAggregateUtil() {}
+
+ public static boolean metricsModeSupportsAggregatePushDown(
+ Table table, List aggregates) {
+ MetricsConfig config = MetricsConfig.forTable(table);
+ for (BoundAggregate aggregate : aggregates) {
+ String colName = AggregateUtil.getAggregateColumnName(aggregate);
+ if (!colName.equals("*")) {
+ MetricsModes.MetricsMode mode = config.columnMode(colName);
+ if (mode.toString().equals("none")) {
+ return false;
+ } else if (mode.toString().equals("counts")) {
+ if (aggregate.op() == Expression.Operation.MAX
+ || aggregate.op() == Expression.Operation.MIN) {
+ return false;
+ }
+ } else if (mode.toString().contains("truncate")) {
+ if (AggregateUtil.getAggregateType(aggregate).typeId() == Type.TypeID.STRING) {
+ if (aggregate.op() == Expression.Operation.MAX
+ || aggregate.op() == Expression.Operation.MIN) {
+ return false;
+ }
+ }
+ }
+ }
+ }
+
+ return true;
+ }
+
+ public static InternalRow[] constructInternalRowForPushedDownAggregate(
+ SparkSession spark,
+ Table table,
+ List aggregates,
+ List indexInTable) {
+ List