diff --git a/core/src/main/java/org/apache/iceberg/PartitionStats.java b/core/src/main/java/org/apache/iceberg/PartitionStats.java
index 4f3870de7bbd..d050094f06d6 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionStats.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionStats.java
@@ -95,7 +95,9 @@ public Long lastUpdatedSnapshotId() {
*
* @param file the {@link ContentFile} from the manifest entry.
* @param snapshot the snapshot corresponding to the live entry.
+ * @deprecated since 1.10.0, visibility will be reduced in 1.11.0
*/
+ @Deprecated // will become package-private
public void liveEntry(ContentFile<?> file, Snapshot snapshot) {
Preconditions.checkArgument(specId == file.specId(), "Spec IDs must match");
@@ -128,18 +130,54 @@ public void liveEntry(ContentFile<?> file, Snapshot snapshot) {
* Updates the modified time and snapshot ID for the deleted manifest entry.
*
* @param snapshot the snapshot corresponding to the deleted manifest entry.
+ * @deprecated since 1.10.0, visibility will be reduced in 1.11.0
*/
+ @Deprecated // will become package-private
public void deletedEntry(Snapshot snapshot) {
if (snapshot != null) {
updateSnapshotInfo(snapshot.snapshotId(), snapshot.timestampMillis());
}
}
+ /**
+ * Decrements the counters for the given file, as it was included in the previous stats, and
+ * updates the modified time and snapshot ID for the deleted manifest entry.
+ *
+ * @param file the {@link ContentFile} from the manifest entry.
+ * @param snapshot the snapshot corresponding to the deleted manifest entry.
+ */
+ void deletedEntryForIncrementalCompute(ContentFile<?> file, Snapshot snapshot) {
+ Preconditions.checkArgument(specId == file.specId(), "Spec IDs must match");
+
+ switch (file.content()) {
+ case DATA:
+ this.dataRecordCount -= file.recordCount();
+ this.dataFileCount -= 1;
+ this.totalDataFileSizeInBytes -= file.fileSizeInBytes();
+ break;
+ case POSITION_DELETES:
+ this.positionDeleteRecordCount -= file.recordCount();
+ this.positionDeleteFileCount -= 1;
+ break;
+ case EQUALITY_DELETES:
+ this.equalityDeleteRecordCount -= file.recordCount();
+ this.equalityDeleteFileCount -= 1;
+ break;
+ default:
+ throw new UnsupportedOperationException("Unsupported file content type: " + file.content());
+ }
+
+ if (snapshot != null) {
+ updateSnapshotInfo(snapshot.snapshotId(), snapshot.timestampMillis());
+ }
+ }
+
/**
* Appends statistics from given entry to current entry.
*
* @param entry the entry from which statistics will be sourced.
+ * @deprecated since 1.10.0, visibility will be reduced in 1.11.0
*/
+ @Deprecated // will become package-private
public void appendStats(PartitionStats entry) {
Preconditions.checkArgument(specId == entry.specId(), "Spec IDs must match");
diff --git a/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java b/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java
index 1216f9d1bc89..aeb6b8b4c7e7 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java
@@ -29,24 +29,33 @@
import java.util.Locale;
import java.util.Map;
import java.util.Queue;
+import java.util.Set;
import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Predicate;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Queues;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.types.Types.IntegerType;
import org.apache.iceberg.types.Types.LongType;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.types.Types.StructType;
+import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.PartitionMap;
import org.apache.iceberg.util.PartitionUtil;
+import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Computes, writes and reads the {@link PartitionStatisticsFile}. Uses generic readers and writers
@@ -56,6 +65,8 @@ public class PartitionStatsHandler {
private PartitionStatsHandler() {}
+ private static final Logger LOG = LoggerFactory.getLogger(PartitionStatsHandler.class);
+
public static final int PARTITION_FIELD_ID = 0;
public static final String PARTITION_FIELD_NAME = "partition";
public static final NestedField SPEC_ID = NestedField.required(1, "spec_id", IntegerType.get());
@@ -106,13 +117,19 @@ public static Schema schema(StructType unifiedPartitionType) {
}
/**
- * Computes and writes the {@link PartitionStatisticsFile} for a given table's current snapshot.
+ * Computes the stats incrementally, from the latest snapshot that has a partition stats file up
+ * to the given table's current snapshot, and writes the merged result into a
+ * {@link PartitionStatisticsFile} for the current snapshot.
+ *
+ * <p>Does a full compute if a previous statistics file does not exist.
*
* @param table The {@link Table} for which the partition statistics is computed.
* @return {@link PartitionStatisticsFile} for the current snapshot, or null if no statistics are
* present.
*/
public static PartitionStatisticsFile computeAndWriteStatsFile(Table table) throws IOException {
+ Preconditions.checkArgument(table != null, "Invalid table: null");
+
if (table.currentSnapshot() == null) {
return null;
}
@@ -121,7 +138,11 @@ public static PartitionStatisticsFile computeAndWriteStatsFile(Table table) thro
}
/**
- * Computes and writes the {@link PartitionStatisticsFile} for a given table and snapshot.
+ * Computes the stats incrementally, from the latest snapshot that has a partition stats file up
+ * to the given snapshot, and writes the merged result into a
+ * {@link PartitionStatisticsFile} for that snapshot.
+ *
+ * <p>Does a full compute if a previous statistics file does not exist.
*
* @param table The {@link Table} for which the partition statistics is computed.
* @param snapshotId snapshot for which partition statistics are computed.
@@ -130,15 +151,28 @@ public static PartitionStatisticsFile computeAndWriteStatsFile(Table table) thro
*/
public static PartitionStatisticsFile computeAndWriteStatsFile(Table table, long snapshotId)
throws IOException {
+ Preconditions.checkArgument(table != null, "Invalid table: null");
+ Preconditions.checkArgument(Partitioning.isPartitioned(table), "Table must be partitioned");
Snapshot snapshot = table.snapshot(snapshotId);
Preconditions.checkArgument(snapshot != null, "Snapshot not found: %s", snapshotId);
- Collection<PartitionStats> stats = computeStats(table, snapshot);
+ StructType partitionType = Partitioning.partitionType(table);
+
+ Collection<PartitionStats> stats;
+ PartitionStatisticsFile statisticsFile = latestStatsFile(table, snapshot.snapshotId());
+ if (statisticsFile == null) {
+ LOG.info(
+ "Using full compute as previous statistics file is not present for incremental compute.");
+ stats = computeStats(table, snapshot, file -> true, false /* incremental */).values();
+ } else {
+ stats = computeAndMergeStatsIncremental(table, snapshot, partitionType, statisticsFile);
+ }
+
if (stats.isEmpty()) {
+ // empty branch case
return null;
}
- StructType partitionType = Partitioning.partitionType(table);
List<PartitionStats> sortedStats = sortStatsByPartition(stats, partitionType);
return writePartitionStatsFile(
table, snapshot.snapshotId(), schema(partitionType), sortedStats);
@@ -174,6 +208,9 @@ static PartitionStatisticsFile writePartitionStatsFile(
*/
public static CloseableIterable<PartitionStats> readPartitionStatsFile(
Schema schema, InputFile inputFile) {
+ Preconditions.checkArgument(schema != null, "Invalid schema: null");
+ Preconditions.checkArgument(inputFile != null, "Invalid input file: null");
+
FileFormat fileFormat = FileFormat.fromFileName(inputFile.location());
Preconditions.checkArgument(
fileFormat != null, "Unable to determine format of file: %s", inputFile.location());
@@ -230,34 +267,108 @@ private static PartitionStats recordToPartitionStats(StructLike record) {
return stats;
}
- private static Collection<PartitionStats> computeStats(Table table, Snapshot snapshot) {
- Preconditions.checkArgument(table != null, "table cannot be null");
- Preconditions.checkArgument(Partitioning.isPartitioned(table), "table must be partitioned");
- Preconditions.checkArgument(snapshot != null, "snapshot cannot be null");
+ private static Collection<PartitionStats> computeAndMergeStatsIncremental(
+ Table table,
+ Snapshot snapshot,
+ StructType partitionType,
+ PartitionStatisticsFile previousStatsFile)
+ throws IOException {
+ PartitionMap<PartitionStats> statsMap = PartitionMap.create(table.specs());
+ // read previous stats, note that partition field will be read as GenericRecord
+ try (CloseableIterable<PartitionStats> oldStats =
+ readPartitionStatsFile(schema(partitionType), Files.localInput(previousStatsFile.path()))) {
+ oldStats.forEach(
+ partitionStats ->
+ statsMap.put(partitionStats.specId(), partitionStats.partition(), partitionStats));
+ }
+
+ // incrementally compute the new stats, partition field will be written as PartitionData
+ PartitionMap<PartitionStats> incrementalStatsMap =
+ computeStatsDiff(table, table.snapshot(previousStatsFile.snapshotId()), snapshot);
+
+ // convert PartitionData into GenericRecord and merge stats
+ incrementalStatsMap.forEach(
+ (key, value) ->
+ statsMap.merge(
+ Pair.of(key.first(), partitionDataToRecord((PartitionData) key.second())),
+ value,
+ (existingEntry, newEntry) -> {
+ existingEntry.appendStats(newEntry);
+ return existingEntry;
+ }));
+
+ return statsMap.values();
+ }
+
+ private static GenericRecord partitionDataToRecord(PartitionData data) {
+ GenericRecord record = GenericRecord.create(data.getPartitionType());
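+ // copy each partition value into the record so the resulting key matches the GenericRecord-based keys read from the previous stats file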
+ for (int index = 0; index < record.size(); index++) {
+ record.set(index, data.get(index));
+ }
+
+ return record;
+ }
+
+ @VisibleForTesting
+ static PartitionStatisticsFile latestStatsFile(Table table, long snapshotId) {
+ List<PartitionStatisticsFile> partitionStatisticsFiles = table.partitionStatisticsFiles();
+ if (partitionStatisticsFiles.isEmpty()) {
+ return null;
+ }
+
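+ // index the stats files by snapshot ID, then walk the ancestors of the given snapshot to find the most recent one with stats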
+ Map<Long, PartitionStatisticsFile> stats =
+ partitionStatisticsFiles.stream()
+ .collect(Collectors.toMap(PartitionStatisticsFile::snapshotId, file -> file));
+ for (Snapshot snapshot : SnapshotUtil.ancestorsOf(snapshotId, table::snapshot)) {
+ if (stats.containsKey(snapshot.snapshotId())) {
+ return stats.get(snapshot.snapshotId());
+ }
+ }
+ // This is unlikely to happen.
+ throw new RuntimeException(
+ "Unable to find previous stats with valid snapshot. Invalidate partition stats for all the snapshots to use full compute.");
+ }
+
+ private static PartitionMap<PartitionStats> computeStatsDiff(
+ Table table, Snapshot fromSnapshot, Snapshot toSnapshot) {
+ Set<Long> snapshotIdsRange =
+ Sets.newHashSet(
+ SnapshotUtil.ancestorIdsBetween(
+ toSnapshot.snapshotId(), fromSnapshot.snapshotId(), table::snapshot));
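+ // scan only the manifests committed after the previous stats snapshot (exclusive), up to and including the target snapshot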
+ Predicate<ManifestFile> manifestFilePredicate =
+ manifestFile -> snapshotIdsRange.contains(manifestFile.snapshotId());
+ return computeStats(table, toSnapshot, manifestFilePredicate, true /* incremental */);
+ }
+
+ private static PartitionMap<PartitionStats> computeStats(
Table table, Snapshot snapshot, Predicate<ManifestFile> predicate, boolean incremental) {
StructType partitionType = Partitioning.partitionType(table);
- List<ManifestFile> manifests = snapshot.allManifests(table.io());
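+ // the predicate selects the manifests to scan: all manifests for a full compute, only newly committed manifests for an incremental compute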
+ List<ManifestFile> manifests =
+ snapshot.allManifests(table.io()).stream().filter(predicate).collect(Collectors.toList());
+
Queue<PartitionMap<PartitionStats>> statsByManifest = Queues.newConcurrentLinkedQueue();
Tasks.foreach(manifests)
.stopOnFailure()
.throwFailureWhenFinished()
.executeWith(ThreadPools.getWorkerPool())
- .run(manifest -> statsByManifest.add(collectStats(table, manifest, partitionType)));
+ .run(
+ manifest ->
+ statsByManifest.add(
+ collectStatsForManifest(table, manifest, partitionType, incremental)));
- return mergeStats(statsByManifest, table.specs());
- }
+ PartitionMap<PartitionStats> statsMap = PartitionMap.create(table.specs());
+ for (PartitionMap<PartitionStats> stats : statsByManifest) {
+ mergePartitionMap(stats, statsMap);
+ }
- private static List<PartitionStats> sortStatsByPartition(
- Collection<PartitionStats> stats, StructType partitionType) {
- List<PartitionStats> entries = Lists.newArrayList(stats);
- entries.sort(
- Comparator.comparing(PartitionStats::partition, Comparators.forType(partitionType)));
- return entries;
+ return statsMap;
}
- private static PartitionMap<PartitionStats> collectStats(
- Table table, ManifestFile manifest, StructType partitionType) {
- try (ManifestReader<?> reader = openManifest(table, manifest)) {
+ private static PartitionMap<PartitionStats> collectStatsForManifest(
+ Table table, ManifestFile manifest, StructType partitionType, boolean incremental) {
+ List<String> projection = BaseScan.scanColumns(manifest.content());
+ try (ManifestReader<?> reader = ManifestFiles.open(manifest, table.io()).select(projection)) {
PartitionMap<PartitionStats> statsMap = PartitionMap.create(table.specs());
int specId = manifest.partitionSpecId();
PartitionSpec spec = table.specs().get(specId);
@@ -275,9 +386,17 @@ private static PartitionMap<PartitionStats> collectStats(
((PartitionData) file.partition()).copy(),
() -> new PartitionStats(key, specId));
if (entry.isLive()) {
- stats.liveEntry(file, snapshot);
+ // Live entries can be either added or existing. Consider only added entries for
+ // incremental compute, as existing entries were already included in the previous compute.
+ if (!incremental || entry.status() == ManifestEntry.Status.ADDED) {
+ stats.liveEntry(file, snapshot);
+ }
} else {
- stats.deletedEntry(snapshot);
+ if (incremental) {
+ stats.deletedEntryForIncrementalCompute(file, snapshot);
+ } else {
+ stats.deletedEntry(snapshot);
+ }
}
}
@@ -287,27 +406,24 @@ private static PartitionMap<PartitionStats> collectStats(
}
}
- private static ManifestReader<?> openManifest(Table table, ManifestFile manifest) {
- List<String> projection = BaseScan.scanColumns(manifest.content());
- return ManifestFiles.open(manifest, table.io()).select(projection);
+ private static void mergePartitionMap(
+ PartitionMap<PartitionStats> fromMap, PartitionMap<PartitionStats> toMap) {
+ fromMap.forEach(
+ (key, value) ->
+ toMap.merge(
+ key,
+ value,
+ (existingEntry, newEntry) -> {
+ existingEntry.appendStats(newEntry);
+ return existingEntry;
+ }));
}
- private static Collection<PartitionStats> mergeStats(
- Queue<PartitionMap<PartitionStats>> statsByManifest, Map<Integer, PartitionSpec> specs) {
- PartitionMap<PartitionStats> statsMap = PartitionMap.create(specs);
-
- for (PartitionMap<PartitionStats> stats : statsByManifest) {
- stats.forEach(
- (key, value) ->
- statsMap.merge(
- key,
- value,
- (existingEntry, newEntry) -> {
- existingEntry.appendStats(newEntry);
- return existingEntry;
- }));
- }
-
- return statsMap.values();
+ private static List<PartitionStats> sortStatsByPartition(
+ Collection<PartitionStats> stats, StructType partitionType) {
+ List<PartitionStats> entries = Lists.newArrayList(stats);
+ entries.sort(
+ Comparator.comparing(PartitionStats::partition, Comparators.forType(partitionType)));
+ return entries;
}
}
diff --git a/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java b/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java
index b68ef93e1c6d..a00ad76e1daa 100644
--- a/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java
@@ -123,7 +123,7 @@ public void testPartitionStatsOnUnPartitionedTable() throws Exception {
assertThatThrownBy(() -> PartitionStatsHandler.computeAndWriteStatsFile(testTable))
.isInstanceOf(IllegalArgumentException.class)
- .hasMessage("table must be partitioned");
+ .hasMessage("Table must be partitioned");
}
@Test
@@ -441,6 +441,41 @@ public void testPartitionStats() throws Exception {
snapshot1.snapshotId()));
}
+ @Test
+ public void testLatestStatsFile() throws Exception {
+ Table testTable =
+ TestTables.create(tempDir("stats_file"), "stats_file", SCHEMA, SPEC, 2, fileFormatProperty);
+
+ DataFile dataFile =
+ FileGenerationUtil.generateDataFile(testTable, TestHelpers.Row.of("foo", "A"));
+ testTable.newAppend().appendFile(dataFile).commit();
+
+ PartitionStatisticsFile statisticsFile =
+ PartitionStatsHandler.computeAndWriteStatsFile(
+ testTable, testTable.currentSnapshot().snapshotId());
+ testTable.updatePartitionStatistics().setPartitionStatistics(statisticsFile).commit();
+
+ PartitionStatisticsFile latestStatsFile =
+ PartitionStatsHandler.latestStatsFile(testTable, testTable.currentSnapshot().snapshotId());
+ assertThat(latestStatsFile).isEqualTo(statisticsFile);
+
+ // another commit but without stats file
+ testTable.newAppend().appendFile(dataFile).commit();
+ // should point to last stats file
+ latestStatsFile =
+ PartitionStatsHandler.latestStatsFile(testTable, testTable.currentSnapshot().snapshotId());
+ assertThat(latestStatsFile).isEqualTo(statisticsFile);
+
+ // compute stats
+ statisticsFile =
+ PartitionStatsHandler.computeAndWriteStatsFile(
+ testTable, testTable.currentSnapshot().snapshotId());
+ testTable.updatePartitionStatistics().setPartitionStatistics(statisticsFile).commit();
+ latestStatsFile =
+ PartitionStatsHandler.latestStatsFile(testTable, testTable.currentSnapshot().snapshotId());
+ assertThat(latestStatsFile).isEqualTo(statisticsFile);
+ }
+
private static StructLike partitionRecord(
Types.StructType partitionType, String val1, String val2) {
GenericRecord record = GenericRecord.create(partitionType);
diff --git a/orc/src/test/java/org/apache/iceberg/TestOrcPartitionStatsHandler.java b/orc/src/test/java/org/apache/iceberg/TestOrcPartitionStatsHandler.java
index f25ddccaa21a..c26c0042976a 100644
--- a/orc/src/test/java/org/apache/iceberg/TestOrcPartitionStatsHandler.java
+++ b/orc/src/test/java/org/apache/iceberg/TestOrcPartitionStatsHandler.java
@@ -46,4 +46,11 @@ public void testPartitionStats() throws Exception {
.isInstanceOf(UnsupportedOperationException.class)
.hasMessage("Cannot write using unregistered internal data format: ORC");
}
+
+ @Override
+ public void testLatestStatsFile() throws Exception {
+ assertThatThrownBy(super::testLatestStatsFile)
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessage("Cannot write using unregistered internal data format: ORC");
+ }
}
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index a67aa3d5e73b..da44b7c6bdfe 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -22,19 +22,28 @@
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assumptions.assumeThat;
+import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.EnvironmentContext;
+import org.apache.iceberg.Files;
import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.PartitionStatisticsFile;
+import org.apache.iceberg.PartitionStats;
+import org.apache.iceberg.PartitionStatsHandler;
+import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.Schema;
import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.NamedReference;
import org.apache.iceberg.expressions.Zorder;
+import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.ExtendedParser;
@@ -134,6 +143,49 @@ public void testRewriteDataFilesOnPartitionTable() {
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
}
+ @TestTemplate
+ public void testPartitionStatsIncrementalCompute() throws IOException {
+ createPartitionTable();
+ // create 5 files for each partition (c2 = 'foo' and c2 = 'bar')
+ insertData(10);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ PartitionStatisticsFile statisticsFile = PartitionStatsHandler.computeAndWriteStatsFile(table);
+ table.updatePartitionStatistics().setPartitionStatistics(statisticsFile).commit();
+
+ Schema dataSchema = PartitionStatsHandler.schema(Partitioning.partitionType(table));
+ List<PartitionStats> statsBeforeCompaction;
+ try (CloseableIterable<PartitionStats> recordIterator =
+ PartitionStatsHandler.readPartitionStatsFile(
+ dataSchema, Files.localInput(statisticsFile.path()))) {
+ statsBeforeCompaction = Lists.newArrayList(recordIterator);
+ }
+
+ sql("CALL %s.system.rewrite_data_files(table => '%s')", catalogName, tableIdent);
+
+ table.refresh();
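+ // recompute stats after compaction; a stats file already exists, so this exercises the incremental path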
+ statisticsFile =
+ PartitionStatsHandler.computeAndWriteStatsFile(table, table.currentSnapshot().snapshotId());
+ table.updatePartitionStatistics().setPartitionStatistics(statisticsFile).commit();
+ List<PartitionStats> statsAfterCompaction;
+ try (CloseableIterable<PartitionStats> recordIterator =
+ PartitionStatsHandler.readPartitionStatsFile(
+ dataSchema, Files.localInput(statisticsFile.path()))) {
+ statsAfterCompaction = Lists.newArrayList(recordIterator);
+ }
+
+ for (int index = 0; index < statsBeforeCompaction.size(); index++) {
+ PartitionStats statsAfter = statsAfterCompaction.get(index);
+ PartitionStats statsBefore = statsBeforeCompaction.get(index);
+
+ assertThat(statsAfter.partition()).isEqualTo(statsBefore.partition());
+ // data count should match after compaction
+ assertThat(statsAfter.dataRecordCount()).isEqualTo(statsBefore.dataRecordCount());
+ // file count should not match, as compaction rewrites each partition into a single file
+ assertThat(statsAfter.dataFileCount()).isNotEqualTo(statsBefore.dataFileCount());
+ }
+ }
+
@TestTemplate
public void testRewriteDataFilesOnNonPartitionTable() {
createTable();
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
index 8515bf347ebb..a940186b97dd 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
@@ -22,13 +22,22 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import java.io.IOException;
import java.sql.Date;
import java.sql.Timestamp;
import java.util.List;
+import org.apache.iceberg.Files;
import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.PartitionStatisticsFile;
+import org.apache.iceberg.PartitionStats;
+import org.apache.iceberg.PartitionStatsHandler;
+import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
@@ -388,4 +397,52 @@ public void testWriteManifestWithSpecId() {
ImmutableList.of(row(0), row(1)),
sql("SELECT partition_spec_id FROM %s.manifests order by 1 asc", tableName));
}
+
+ @TestTemplate
+ public void testPartitionStatsIncrementalCompute() throws IOException {
+ sql(
+ "CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg PARTITIONED BY (data)",
+ tableName);
+
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ PartitionStatisticsFile statisticsFile = PartitionStatsHandler.computeAndWriteStatsFile(table);
+ table.updatePartitionStatistics().setPartitionStatistics(statisticsFile).commit();
+
+ Schema dataSchema = PartitionStatsHandler.schema(Partitioning.partitionType(table));
+ List<PartitionStats> statsBeforeRewrite;
+ try (CloseableIterable<PartitionStats> recordIterator =
+ PartitionStatsHandler.readPartitionStatsFile(
+ dataSchema, Files.localInput(statisticsFile.path()))) {
+ statsBeforeRewrite = Lists.newArrayList(recordIterator);
+ }
+
+ sql(
+ "CALL %s.system.rewrite_manifests(usE_cAcHiNg => false, tAbLe => '%s')",
+ catalogName, tableIdent);
+
+ table.refresh();
+ statisticsFile =
+ PartitionStatsHandler.computeAndWriteStatsFile(table, table.currentSnapshot().snapshotId());
+ table.updatePartitionStatistics().setPartitionStatistics(statisticsFile).commit();
+ List<PartitionStats> statsAfterRewrite;
+ try (CloseableIterable<PartitionStats> recordIterator =
+ PartitionStatsHandler.readPartitionStatsFile(
+ dataSchema, Files.localInput(statisticsFile.path()))) {
+ statsAfterRewrite = Lists.newArrayList(recordIterator);
+ }
+
+ for (int index = 0; index < statsBeforeRewrite.size(); index++) {
+ PartitionStats statsAfter = statsAfterRewrite.get(index);
+ PartitionStats statsBefore = statsBeforeRewrite.get(index);
+
+ assertThat(statsAfter.partition()).isEqualTo(statsBefore.partition());
+ // data count should match
+ assertThat(statsAfter.dataRecordCount()).isEqualTo(statsBefore.dataRecordCount());
+ // file count should also match, as rewriting manifests does not change data files
+ assertThat(statsAfter.dataFileCount()).isEqualTo(statsBefore.dataFileCount());
+ }
+ }
}