diff --git a/core/src/jmh/java/org/apache/iceberg/PartitionStatsUtilBenchmark.java b/core/src/jmh/java/org/apache/iceberg/PartitionStatsUtilBenchmark.java
new file mode 100644
index 000000000000..539494e34735
--- /dev/null
+++ b/core/src/jmh/java/org/apache/iceberg/PartitionStatsUtilBenchmark.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.types.Types;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Timeout;
+import org.openjdk.jmh.annotations.Warmup;
+
+@Fork(1)
+@State(Scope.Benchmark)
+@Warmup(iterations = 2)
+@Measurement(iterations = 5)
+@Timeout(time = 1000, timeUnit = TimeUnit.HOURS)
+@BenchmarkMode(Mode.SingleShotTime)
+public class PartitionStatsUtilBenchmark {
+
+  private static final Schema SCHEMA =
+      new Schema(
+          required(1, "c1", Types.IntegerType.get()),
+          optional(2, "c2", Types.StringType.get()),
+          optional(3, "c3", Types.StringType.get()));
+
+  private static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA).identity("c1").build();
+
+  // create 10k manifests
+  private static final int MANIFEST_COUNTER = 10000;
+
+  // each manifest covers 100 partition values
+  private static final int PARTITION_PER_MANIFEST = 100;
+
+  // 20 data files per partition, which results in 2k data files per manifest
+  private static final int DATA_FILES_PER_PARTITION_COUNT = 20;
+
+  private static final HadoopTables TABLES = new HadoopTables();
+
+  private static final String TABLE_IDENT = "tbl";
+
+  private Table table;
+
+  @Setup
+  public void setupBenchmark() {
+    this.table = TABLES.create(SCHEMA, SPEC, TABLE_IDENT);
+
+    for (int manifestCount = 0; manifestCount < MANIFEST_COUNTER; manifestCount++) {
+      AppendFiles appendFiles = table.newFastAppend();
+
+      for (int partition = 0; partition < PARTITION_PER_MANIFEST; partition++) {
+        StructLike partitionData = TestHelpers.Row.of(partition);
+        for (int fileOrdinal = 0; fileOrdinal < DATA_FILES_PER_PARTITION_COUNT; fileOrdinal++) {
+          appendFiles.appendFile(FileGenerationUtil.generateDataFile(table, partitionData));
+        }
+      }
+
+      appendFiles.commit();
+    }
+  }
+
+  @TearDown
+  public void tearDownBenchmark() {
+    TABLES.dropTable(TABLE_IDENT);
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void benchmarkPartitionStats() {
+    Collection<PartitionStats> partitionStats =
+        PartitionStatsUtil.computeStats(table, table.currentSnapshot());
+    assertThat(partitionStats).hasSize(PARTITION_PER_MANIFEST);
+
+    PartitionStatsUtil.sortStats(partitionStats, Partitioning.partitionType(table));
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/BaseScan.java b/core/src/main/java/org/apache/iceberg/BaseScan.java
index 8c309cf69e6d..804df01d31ba 100644
--- a/core/src/main/java/org/apache/iceberg/BaseScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseScan.java
@@ -289,4 +289,21 @@ private static Schema lazyColumnProjection(TableScanContext context, Schema sche
   public ThisT metricsReporter(MetricsReporter reporter) {
     return newRefinedScan(table, schema, context.reportWith(reporter));
   }
+
+  /**
+   * Returns the scan columns to project for the given manifest content type.
+   *
+   * @param content the manifest content type to scan.
+   * @return a list of column names corresponding to the specified manifest content type.
+   */
+  static List<String> scanColumns(ManifestContent content) {
+    switch (content) {
+      case DATA:
+        return BaseScan.SCAN_COLUMNS;
+      case DELETES:
+        return BaseScan.DELETE_SCAN_COLUMNS;
+      default:
+        throw new UnsupportedOperationException("Cannot read unknown manifest type: " + content);
+    }
+  }
 }
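For reference, a minimal sketch of how scanColumns is meant to be paired with a manifest reader so that column stats are never materialized (the `manifest` and `table` variables are illustrative placeholders; PartitionStatsUtil below does exactly this projection):

    // project only the scan columns for the manifest's content type, skipping stats columns
    ManifestReader<?> reader =
        ManifestFiles.open(manifest, table.io())
            .select(BaseScan.scanColumns(manifest.content()));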
diff --git a/core/src/main/java/org/apache/iceberg/PartitionStats.java b/core/src/main/java/org/apache/iceberg/PartitionStats.java
new file mode 100644
index 000000000000..e4cbd1f6b9bd
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/PartitionStats.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class PartitionStats implements StructLike {
+
+  private static final int STATS_COUNT = 12;
+
+  private StructLike partition;
+  private int specId;
+  private long dataRecordCount;
+  private int dataFileCount;
+  private long totalDataFileSizeInBytes;
+  private long positionDeleteRecordCount;
+  private int positionDeleteFileCount;
+  private long equalityDeleteRecordCount;
+  private int equalityDeleteFileCount;
+  private long totalRecordCount;
+  private Long lastUpdatedAt; // null by default
+  private Long lastUpdatedSnapshotId; // null by default
+
+  public PartitionStats(StructLike partition, int specId) {
+    this.partition = partition;
+    this.specId = specId;
+  }
+
+  public StructLike partition() {
+    return partition;
+  }
+
+  public int specId() {
+    return specId;
+  }
+
+  public long dataRecordCount() {
+    return dataRecordCount;
+  }
+
+  public int dataFileCount() {
+    return dataFileCount;
+  }
+
+  public long totalDataFileSizeInBytes() {
+    return totalDataFileSizeInBytes;
+  }
+
+  public long positionDeleteRecordCount() {
+    return positionDeleteRecordCount;
+  }
+
+  public int positionDeleteFileCount() {
+    return positionDeleteFileCount;
+  }
+
+  public long equalityDeleteRecordCount() {
+    return equalityDeleteRecordCount;
+  }
+
+  public int equalityDeleteFileCount() {
+    return equalityDeleteFileCount;
+  }
+
+  public long totalRecordCount() {
+    return totalRecordCount;
+  }
+
+  public Long lastUpdatedAt() {
+    return lastUpdatedAt;
+  }
+
+  public Long lastUpdatedSnapshotId() {
+    return lastUpdatedSnapshotId;
+  }
+
+  /**
+   * Updates the partition stats from the data/delete file.
+   *
+   * @param file the {@link ContentFile} from the manifest entry.
+   * @param snapshot the snapshot corresponding to the live entry.
+   */
+  public void liveEntry(ContentFile<?> file, Snapshot snapshot) {
+    Preconditions.checkArgument(specId == file.specId(), "Spec IDs must match");
+
+    switch (file.content()) {
+      case DATA:
+        this.dataRecordCount += file.recordCount();
+        this.dataFileCount += 1;
+        this.totalDataFileSizeInBytes += file.fileSizeInBytes();
+        break;
+      case POSITION_DELETES:
+        this.positionDeleteRecordCount += file.recordCount();
+        this.positionDeleteFileCount += 1;
+        break;
+      case EQUALITY_DELETES:
+        this.equalityDeleteRecordCount += file.recordCount();
+        this.equalityDeleteFileCount += 1;
+        break;
+      default:
+        throw new UnsupportedOperationException("Unsupported file content type: " + file.content());
+    }
+
+    if (snapshot != null) {
+      updateSnapshotInfo(snapshot.snapshotId(), snapshot.timestampMillis());
+    }
+
+    // Note: not computing TOTAL_RECORD_COUNT for now, as it requires scanning the data.
+  }
+
+  /**
+   * Updates the modified time and snapshot ID for the deleted manifest entry.
+   *
+   * @param snapshot the snapshot corresponding to the deleted manifest entry.
+   */
+  public void deletedEntry(Snapshot snapshot) {
+    if (snapshot != null) {
+      updateSnapshotInfo(snapshot.snapshotId(), snapshot.timestampMillis());
+    }
+  }
+
+  /**
+   * Appends statistics from the given entry to the current entry.
+   *
+   * @param entry the entry from which statistics will be sourced.
+   */
+  public void appendStats(PartitionStats entry) {
+    Preconditions.checkArgument(specId == entry.specId(), "Spec IDs must match");
+
+    this.dataRecordCount += entry.dataRecordCount;
+    this.dataFileCount += entry.dataFileCount;
+    this.totalDataFileSizeInBytes += entry.totalDataFileSizeInBytes;
+    this.positionDeleteRecordCount += entry.positionDeleteRecordCount;
+    this.positionDeleteFileCount += entry.positionDeleteFileCount;
+    this.equalityDeleteRecordCount += entry.equalityDeleteRecordCount;
+    this.equalityDeleteFileCount += entry.equalityDeleteFileCount;
+    this.totalRecordCount += entry.totalRecordCount;
+
+    if (entry.lastUpdatedAt != null) {
+      updateSnapshotInfo(entry.lastUpdatedSnapshotId, entry.lastUpdatedAt);
+    }
+  }
+
+  private void updateSnapshotInfo(long snapshotId, long updatedAt) {
+    if (lastUpdatedAt == null || lastUpdatedAt < updatedAt) {
+      this.lastUpdatedAt = updatedAt;
+      this.lastUpdatedSnapshotId = snapshotId;
+    }
+  }
+
+  @Override
+  public int size() {
+    return STATS_COUNT;
+  }
+
+  @Override
+  public <T> T get(int pos, Class<T> javaClass) {
+    switch (pos) {
+      case 0:
+        return javaClass.cast(partition);
+      case 1:
+        return javaClass.cast(specId);
+      case 2:
+        return javaClass.cast(dataRecordCount);
+      case 3:
+        return javaClass.cast(dataFileCount);
+      case 4:
+        return javaClass.cast(totalDataFileSizeInBytes);
+      case 5:
+        return javaClass.cast(positionDeleteRecordCount);
+      case 6:
+        return javaClass.cast(positionDeleteFileCount);
+      case 7:
+        return javaClass.cast(equalityDeleteRecordCount);
+      case 8:
+        return javaClass.cast(equalityDeleteFileCount);
+      case 9:
+        return javaClass.cast(totalRecordCount);
+      case 10:
+        return javaClass.cast(lastUpdatedAt);
+      case 11:
+        return javaClass.cast(lastUpdatedSnapshotId);
+      default:
+        throw new UnsupportedOperationException("Unknown position: " + pos);
+    }
+  }
+
+  @Override
+  public <T> void set(int pos, T value) {
+    switch (pos) {
+      case 0:
+        this.partition = (StructLike) value;
+        break;
+      case 1:
+        this.specId = (int) value;
+        break;
+      case 2:
+        this.dataRecordCount = (long) value;
+        break;
+      case 3:
+        this.dataFileCount = (int) value;
+        break;
+      case 4:
+        this.totalDataFileSizeInBytes = (long) value;
+        break;
+      case 5:
+        // optional field as per the spec; counters are initialized to 0
+        this.positionDeleteRecordCount = value == null ? 0L : (long) value;
+        break;
+      case 6:
+        // optional field as per the spec; counters are initialized to 0
+        this.positionDeleteFileCount = value == null ? 0 : (int) value;
+        break;
+      case 7:
+        // optional field as per the spec; counters are initialized to 0
+        this.equalityDeleteRecordCount = value == null ? 0L : (long) value;
+        break;
+      case 8:
+        // optional field as per the spec; counters are initialized to 0
+        this.equalityDeleteFileCount = value == null ? 0 : (int) value;
+        break;
+      case 9:
+        // optional field as per the spec; counters are initialized to 0
+        this.totalRecordCount = value == null ? 0L : (long) value;
+        break;
+      case 10:
+        this.lastUpdatedAt = (Long) value;
+        break;
+      case 11:
+        this.lastUpdatedSnapshotId = (Long) value;
+        break;
+      default:
+        throw new UnsupportedOperationException("Unknown position: " + pos);
+    }
+  }
+}
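A minimal sketch of how these mutators are meant to compose; `partitionKey`, `specId`, `dataFile`, `snapshot`, `olderSnapshot`, and `otherStats` are illustrative placeholders, not part of the patch:

    PartitionStats stats = new PartitionStats(partitionKey, specId);
    stats.liveEntry(dataFile, snapshot); // adds the file's record count, file count, and size
    stats.deletedEntry(olderSnapshot); // refreshes lastUpdatedAt only if this snapshot is newer
    stats.appendStats(otherStats); // merges counters from another entry with the same spec ID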
diff --git a/core/src/main/java/org/apache/iceberg/PartitionStatsUtil.java b/core/src/main/java/org/apache/iceberg/PartitionStatsUtil.java
new file mode 100644
index 000000000000..1fe4e6767fe6
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/PartitionStatsUtil.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Queues;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Types.StructType;
+import org.apache.iceberg.util.PartitionMap;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+
+public class PartitionStatsUtil {
+
+  private PartitionStatsUtil() {}
+
+  /**
+   * Computes the partition stats for the given snapshot of the table.
+   *
+   * @param table the table for which partition stats are to be computed.
+   * @param snapshot the snapshot for which partition stats are computed.
+   * @return the collection of {@link PartitionStats}
+   */
+  public static Collection<PartitionStats> computeStats(Table table, Snapshot snapshot) {
+    Preconditions.checkArgument(table != null, "table cannot be null");
+    Preconditions.checkArgument(Partitioning.isPartitioned(table), "table must be partitioned");
+    Preconditions.checkArgument(snapshot != null, "snapshot cannot be null");
+
+    StructType partitionType = Partitioning.partitionType(table);
+    List<ManifestFile> manifests = snapshot.allManifests(table.io());
+    Queue<PartitionMap<PartitionStats>> statsByManifest = Queues.newConcurrentLinkedQueue();
+    Tasks.foreach(manifests)
+        .stopOnFailure()
+        .throwFailureWhenFinished()
+        .executeWith(ThreadPools.getWorkerPool())
+        .run(manifest -> statsByManifest.add(collectStats(table, manifest, partitionType)));
+
+    return mergeStats(statsByManifest, table.specs());
+  }
+
+  /**
+   * Sorts the {@link PartitionStats} based on the partition data.
+   *
+   * @param stats collection of {@link PartitionStats} to be sorted.
+   * @param partitionType the unified partition type of the table.
+   * @return the sorted list of {@link PartitionStats}
+   */
+  public static List<PartitionStats> sortStats(
+      Collection<PartitionStats> stats, StructType partitionType) {
+    List<PartitionStats> entries = Lists.newArrayList(stats);
+    entries.sort(partitionStatsCmp(partitionType));
+    return entries;
+  }
+
+  private static Comparator<PartitionStats> partitionStatsCmp(StructType partitionType) {
+    return Comparator.comparing(PartitionStats::partition, Comparators.forType(partitionType));
+  }
+
+  private static PartitionMap<PartitionStats> collectStats(
+      Table table, ManifestFile manifest, StructType partitionType) {
+    try (ManifestReader<?> reader = openManifest(table, manifest)) {
+      PartitionMap<PartitionStats> statsMap = PartitionMap.create(table.specs());
+      int specId = manifest.partitionSpecId();
+      PartitionSpec spec = table.specs().get(specId);
+      PartitionData keyTemplate = new PartitionData(partitionType);
+
+      for (ManifestEntry<?> entry : reader.entries()) {
+        ContentFile<?> file = entry.file();
+        StructLike coercedPartition =
+            PartitionUtil.coercePartition(partitionType, spec, file.partition());
+        StructLike key = keyTemplate.copyFor(coercedPartition);
+        Snapshot snapshot = table.snapshot(entry.snapshotId());
+        PartitionStats stats =
+            statsMap.computeIfAbsent(specId, key, () -> new PartitionStats(key, specId));
+        if (entry.isLive()) {
+          stats.liveEntry(file, snapshot);
+        } else {
+          stats.deletedEntry(snapshot);
+        }
+      }
+
+      return statsMap;
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  private static ManifestReader<?> openManifest(Table table, ManifestFile manifest) {
+    List<String> projection = BaseScan.scanColumns(manifest.content());
+    return ManifestFiles.open(manifest, table.io()).select(projection);
+  }
+
+  private static Collection<PartitionStats> mergeStats(
+      Queue<PartitionMap<PartitionStats>> statsByManifest, Map<Integer, PartitionSpec> specs) {
+    PartitionMap<PartitionStats> statsMap = PartitionMap.create(specs);
+
+    for (PartitionMap<PartitionStats> stats : statsByManifest) {
+      stats.forEach(
+          (key, value) ->
+              statsMap.merge(
+                  key,
+                  value,
+                  (existingEntry, newEntry) -> {
+                    existingEntry.appendStats(newEntry);
+                    return existingEntry;
+                  }));
+    }
+
+    return statsMap.values();
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/Partitioning.java b/core/src/main/java/org/apache/iceberg/Partitioning.java
index 7e4fcae333d8..832e0b59fe50 100644
--- a/core/src/main/java/org/apache/iceberg/Partitioning.java
+++ b/core/src/main/java/org/apache/iceberg/Partitioning.java
@@ -242,6 +242,16 @@ public static StructType partitionType(Table table) {
     return buildPartitionProjectionType("table partition", specs, allFieldIds(specs));
   }
 
+  /**
+   * Checks whether any of the table's partition specs is partitioned.
+   *
+   * @param table the table to check.
+   * @return {@code true} if the table is partitioned, {@code false} otherwise.
+   */
+  public static boolean isPartitioned(Table table) {
+    return table.specs().values().stream().anyMatch(PartitionSpec::isPartitioned);
+  }
+
   private static StructType buildPartitionProjectionType(
       String typeName, Collection<PartitionSpec> specs, Set<Integer> projectedFieldIds) {
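Taken together, the intended call sequence for a partitioned table looks roughly like the sketch below; this is assembled from the methods above and is not an official API example:

    // compute stats for the current snapshot, then sort by the unified partition tuple
    if (Partitioning.isPartitioned(table)) {
      Collection<PartitionStats> stats =
          PartitionStatsUtil.computeStats(table, table.currentSnapshot());
      List<PartitionStats> sorted =
          PartitionStatsUtil.sortStats(stats, Partitioning.partitionType(table));
    }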
diff --git a/core/src/main/java/org/apache/iceberg/PartitionsTable.java b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
index 5ff796e95827..6d0fc8c235f9 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionsTable.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
@@ -201,7 +201,7 @@ private static CloseableIterable<ManifestEntry<? extends ContentFile<?>>> readEntries(
     return CloseableIterable.transform(
         ManifestFiles.open(manifest, table.io(), table.specs())
             .caseSensitive(scan.isCaseSensitive())
-            .select(scanColumns(manifest.content())) // don't select stats columns
+            .select(BaseScan.scanColumns(manifest.content())) // don't select stats columns
             .liveEntries(),
         t ->
             (ManifestEntry<? extends ContentFile<?>>)
@@ -209,17 +209,6 @@ private static CloseableIterable<ManifestEntry<? extends ContentFile<?>>> readEntries(
                 t.copyWithoutStats());
   }
 
-  private static List<String> scanColumns(ManifestContent content) {
-    switch (content) {
-      case DATA:
-        return BaseScan.SCAN_COLUMNS;
-      case DELETES:
-        return BaseScan.DELETE_SCAN_COLUMNS;
-      default:
-        throw new UnsupportedOperationException("Cannot read unknown manifest type: " + content);
-    }
-  }
-
   private static CloseableIterable<ManifestFile> filteredManifests(
       StaticTableScan scan, Table table, List<ManifestFile> manifestFilesList) {
     CloseableIterable<ManifestFile> manifestFiles =
diff --git a/core/src/test/java/org/apache/iceberg/FileGenerationUtil.java b/core/src/test/java/org/apache/iceberg/FileGenerationUtil.java
index e48f23ff9a0b..f66496ae6624 100644
--- a/core/src/test/java/org/apache/iceberg/FileGenerationUtil.java
+++ b/core/src/test/java/org/apache/iceberg/FileGenerationUtil.java
@@ -86,6 +86,21 @@ public static DeleteFile generatePositionDeleteFile(Table table, StructLike part
         .build();
   }
 
+  public static DeleteFile generateEqualityDeleteFile(Table table, StructLike partition) {
+    PartitionSpec spec = table.spec();
+    LocationProvider locations = table.locationProvider();
+    String path = locations.newDataLocation(spec, partition, generateFileName());
+    long fileSize = generateFileSize();
+    return FileMetadata.deleteFileBuilder(spec)
+        .ofEqualityDeletes()
+        .withPartition(partition)
+        .withPath(path)
+        .withFileSizeInBytes(fileSize)
+        .withFormat(FileFormat.PARQUET)
+        .withRecordCount(generateRowCount())
+        .build();
+  }
+
   public static DeleteFile generatePositionDeleteFile(Table table, DataFile dataFile) {
     PartitionSpec spec = table.spec();
     StructLike partition = dataFile.partition();
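The new generator mirrors generatePositionDeleteFile and is exercised by the test below; a usage sketch, with `table` as a placeholder:

    // generate a synthetic equality delete file for a given partition and commit it
    DeleteFile eqDeletes =
        FileGenerationUtil.generateEqualityDeleteFile(table, TestHelpers.Row.of("foo", "A"));
    table.newRowDelta().addDeletes(eqDeletes).commit();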
diff --git a/core/src/test/java/org/apache/iceberg/TestPartitionStatsUtil.java b/core/src/test/java/org/apache/iceberg/TestPartitionStatsUtil.java
new file mode 100644
index 000000000000..541fcd2ca22d
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestPartitionStatsUtil.java
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.assertj.core.groups.Tuple;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public class TestPartitionStatsUtil {
+  private static final Schema SCHEMA =
+      new Schema(
+          optional(1, "c1", Types.IntegerType.get()),
+          optional(2, "c2", Types.StringType.get()),
+          optional(3, "c3", Types.StringType.get()));
+
+  protected static final PartitionSpec SPEC =
+      PartitionSpec.builderFor(SCHEMA).identity("c2").identity("c3").build();
+
+  @TempDir public File temp;
+
+  @Test
+  public void testPartitionStatsOnEmptyTable() throws Exception {
+    Table testTable = TestTables.create(tempDir("empty_table"), "empty_table", SCHEMA, SPEC, 2);
+    assertThatThrownBy(
+            () -> PartitionStatsUtil.computeStats(testTable, testTable.currentSnapshot()))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("snapshot cannot be null");
+  }
+
+  @Test
+  public void testPartitionStatsOnUnPartitionedTable() throws Exception {
+    Table testTable =
+        TestTables.create(
+            tempDir("unpartitioned_table"),
+            "unpartitioned_table",
+            SCHEMA,
+            PartitionSpec.unpartitioned(),
+            2);
+
+    List<DataFile> files = prepareDataFiles(testTable);
+    AppendFiles appendFiles = testTable.newAppend();
+    files.forEach(appendFiles::appendFile);
+    appendFiles.commit();
+
+    assertThatThrownBy(
+            () -> PartitionStatsUtil.computeStats(testTable, testTable.currentSnapshot()))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("table must be partitioned");
+  }
+
+  @Test
+  public void testPartitionStats() throws Exception {
+    Table testTable =
+        TestTables.create(
+            tempDir("partition_stats_compute"), "partition_stats_compute", SCHEMA, SPEC, 2);
+
+    List<DataFile> files = prepareDataFiles(testTable);
+    for (int i = 0; i < 3; i++) {
+      // insert the same set of records three times to create new manifest files
+      AppendFiles appendFiles = testTable.newAppend();
+      files.forEach(appendFiles::appendFile);
+      appendFiles.commit();
+    }
+
+    Snapshot snapshot1 = testTable.currentSnapshot();
+    Types.StructType partitionType = Partitioning.partitionType(testTable);
+    computeAndValidatePartitionStats(
+        testTable,
+        Tuple.tuple(
+            partitionData(partitionType, "foo", "A"),
+            0,
+            3 * files.get(0).recordCount(),
+            3,
+            3 * files.get(0).fileSizeInBytes(),
+            0L,
+            0,
+            0L,
+            0,
+            0L,
+            snapshot1.timestampMillis(),
+            snapshot1.snapshotId()),
+        Tuple.tuple(
+            partitionData(partitionType, "foo", "B"),
+            0,
+            3 * files.get(1).recordCount(),
+            3,
+            3 * files.get(1).fileSizeInBytes(),
+            0L,
+            0,
+            0L,
+            0,
+            0L,
+            snapshot1.timestampMillis(),
+            snapshot1.snapshotId()),
+        Tuple.tuple(
+            partitionData(partitionType, "bar", "A"),
+            0,
+            3 * files.get(2).recordCount(),
+            3,
+            3 * files.get(2).fileSizeInBytes(),
+            0L,
+            0,
+            0L,
+            0,
+            0L,
+            snapshot1.timestampMillis(),
+            snapshot1.snapshotId()),
+        Tuple.tuple(
+            partitionData(partitionType, "bar", "B"),
+            0,
+            3 * files.get(3).recordCount(),
+            3,
+            3 * files.get(3).fileSizeInBytes(),
+            0L,
+            0,
+            0L,
+            0,
+            0L,
+            snapshot1.timestampMillis(),
+            snapshot1.snapshotId()));
+
+    DeleteFile deleteFile =
+        FileGenerationUtil.generatePositionDeleteFile(testTable, TestHelpers.Row.of("foo", "A"));
+    testTable.newRowDelta().addDeletes(deleteFile).commit();
+    Snapshot snapshot2 = testTable.currentSnapshot();
+
+    DeleteFile eqDelete =
+        FileGenerationUtil.generateEqualityDeleteFile(testTable, TestHelpers.Row.of("bar", "B"));
+    testTable.newRowDelta().addDeletes(eqDelete).commit();
+    Snapshot snapshot3 = testTable.currentSnapshot();
+
+    computeAndValidatePartitionStats(
+        testTable,
+        Tuple.tuple(
+            partitionData(partitionType, "foo", "A"),
+            0,
+            3 * files.get(0).recordCount(),
+            3,
+            3 * files.get(0).fileSizeInBytes(),
+            deleteFile.recordCount(), // position delete record count
+            1, // one position delete file
+            0L,
+            0,
+            0L,
+            snapshot2.timestampMillis(), // new snapshot from the pos delete commit
+            snapshot2.snapshotId()),
+        Tuple.tuple(
+            partitionData(partitionType, "foo", "B"),
+            0,
+            3 * files.get(1).recordCount(),
+            3,
+            3 * files.get(1).fileSizeInBytes(),
+            0L,
+            0,
+            0L,
+            0,
+            0L,
+            snapshot1.timestampMillis(),
+            snapshot1.snapshotId()),
+        Tuple.tuple(
+            partitionData(partitionType, "bar", "A"),
+            0,
+            3 * files.get(2).recordCount(),
+            3,
+            3 * files.get(2).fileSizeInBytes(),
+            0L,
+            0,
+            0L,
+            0,
+            0L,
+            snapshot1.timestampMillis(),
+            snapshot1.snapshotId()),
+        Tuple.tuple(
+            partitionData(partitionType, "bar", "B"),
+            0,
+            3 * files.get(3).recordCount(),
+            3,
+            3 * files.get(3).fileSizeInBytes(),
+            0L,
+            0,
+            eqDelete.recordCount(),
+            1, // one equality delete file
+            0L,
+            snapshot3.timestampMillis(), // new snapshot from the equality delete commit
+            snapshot3.snapshotId()));
+  }
+
+  @Test
+  @SuppressWarnings("MethodLength")
+  public void testPartitionStatsWithSchemaEvolution() throws Exception {
+    final PartitionSpec specBefore = PartitionSpec.builderFor(SCHEMA).identity("c2").build();
+
+    Table testTable =
+        TestTables.create(
+            tempDir("partition_stats_schema_evolve"),
+            "partition_stats_schema_evolve",
+            SCHEMA,
+            specBefore,
+            SortOrder.unsorted(),
+            2);
+
+    List<DataFile> dataFiles = prepareDataFilesOnePart(testTable);
+    for (int i = 0; i < 2; i++) {
+      AppendFiles appendFiles = testTable.newAppend();
+      dataFiles.forEach(appendFiles::appendFile);
+      appendFiles.commit();
+    }
+    Snapshot snapshot1 = testTable.currentSnapshot();
+    Types.StructType partitionType = Partitioning.partitionType(testTable);
+
+    computeAndValidatePartitionStats(
+        testTable,
+        Tuple.tuple(
+            partitionData(partitionType, "foo"),
+            0,
+            2 * dataFiles.get(0).recordCount(),
+            2,
+            2 * dataFiles.get(0).fileSizeInBytes(),
+            0L,
+            0,
+            0L,
+            0,
+            0L,
+            snapshot1.timestampMillis(),
+            snapshot1.snapshotId()),
+        Tuple.tuple(
+            partitionData(partitionType, "bar"),
+            0,
+            2 * dataFiles.get(1).recordCount(),
+            2,
+            2 * dataFiles.get(1).fileSizeInBytes(),
+            0L,
+            0,
+            0L,
+            0,
+            0L,
+            snapshot1.timestampMillis(),
+            snapshot1.snapshotId()));
+
+    // evolve the partition spec to include c3
+    testTable.updateSpec().addField("c3").commit();
+    List<DataFile> filesWithNewSpec = prepareDataFiles(testTable);
+    filesWithNewSpec.add(
+        FileGenerationUtil.generateDataFile(testTable, TestHelpers.Row.of("bar", null)));
+    partitionType = Partitioning.partitionType(testTable);
+
+    AppendFiles appendFiles = testTable.newAppend();
+    filesWithNewSpec.forEach(appendFiles::appendFile);
+    appendFiles.commit();
+    Snapshot snapshot2 = testTable.currentSnapshot();
+
+    computeAndValidatePartitionStats(
+        testTable,
+        Tuple.tuple(
+            partitionData(partitionType, "foo", null), // unified tuple
+            0, // old spec id as the record is unmodified
+            2 * dataFiles.get(0).recordCount(),
+            2,
+            2 * dataFiles.get(0).fileSizeInBytes(),
+            0L,
+            0,
+            0L,
+            0,
+            0L,
+            snapshot1.timestampMillis(),
+            snapshot1.snapshotId()),
+        Tuple.tuple(
+            partitionData(partitionType, "bar", null),
+            0, // old spec id for "bar, null" before evolution
+            2 * dataFiles.get(1).recordCount(),
+            2,
+            2 * dataFiles.get(1).fileSizeInBytes(),
+            0L,
+            0,
+            0L,
+            0,
+            0L,
+            snapshot1.timestampMillis(),
+            snapshot1.snapshotId()),
+        Tuple.tuple(
+            partitionData(partitionType, "bar", null),
+            1, // new spec id for "bar, null" after evolution
+            filesWithNewSpec.get(4).recordCount(),
+            1,
+            filesWithNewSpec.get(4).fileSizeInBytes(),
+            0L,
+            0,
+            0L,
+            0,
+            0L,
+            snapshot2.timestampMillis(), // new snapshot
+            snapshot2.snapshotId()),
+        Tuple.tuple(
+            partitionData(partitionType, "foo", "A"),
+            1, // new spec id
+            filesWithNewSpec.get(0).recordCount(),
+            1,
+            filesWithNewSpec.get(0).fileSizeInBytes(),
+            0L,
+            0,
+            0L,
+            0,
+            0L,
+            snapshot2.timestampMillis(), // new snapshot
+            snapshot2.snapshotId()),
+        Tuple.tuple(
+            partitionData(partitionType, "foo", "B"),
+            1,
+            filesWithNewSpec.get(1).recordCount(),
+            1,
+            filesWithNewSpec.get(1).fileSizeInBytes(),
+            0L,
+            0,
+            0L,
+            0,
+            0L,
+            snapshot2.timestampMillis(),
+            snapshot2.snapshotId()),
+        Tuple.tuple(
+            partitionData(partitionType, "bar", "A"),
+            1,
+            filesWithNewSpec.get(2).recordCount(),
+            1,
+            filesWithNewSpec.get(2).fileSizeInBytes(),
+            0L,
+            0,
+            0L,
+            0,
+            0L,
+            snapshot2.timestampMillis(),
+            snapshot2.snapshotId()),
+        Tuple.tuple(
+            partitionData(partitionType, "bar", "B"),
+            1,
+            filesWithNewSpec.get(3).recordCount(),
+            1,
+            filesWithNewSpec.get(3).fileSizeInBytes(),
+            0L,
+            0,
+            0L,
+            0,
+            0L,
+            snapshot2.timestampMillis(),
+            snapshot2.snapshotId()));
+  }
+
+  private static PartitionData partitionData(Types.StructType partitionType, String c2, String c3) {
+    PartitionData partitionData = new PartitionData(partitionType);
+    partitionData.set(0, c2);
+    partitionData.set(1, c3);
+    return partitionData;
+  }
+
+  private static PartitionData partitionData(Types.StructType partitionType, String c2) {
+    PartitionData partitionData = new PartitionData(partitionType);
+    partitionData.set(0, c2);
+    return partitionData;
+  }
+
+  private static List<DataFile> prepareDataFiles(Table table) {
+    List<DataFile> dataFiles = Lists.newArrayList();
+    dataFiles.add(FileGenerationUtil.generateDataFile(table, TestHelpers.Row.of("foo", "A")));
+    dataFiles.add(FileGenerationUtil.generateDataFile(table, TestHelpers.Row.of("foo", "B")));
+    dataFiles.add(FileGenerationUtil.generateDataFile(table, TestHelpers.Row.of("bar", "A")));
+    dataFiles.add(FileGenerationUtil.generateDataFile(table, TestHelpers.Row.of("bar", "B")));
+
+    return dataFiles;
+  }
+
+  private static List<DataFile> prepareDataFilesOnePart(Table table) {
+    List<DataFile> dataFiles = Lists.newArrayList();
+    dataFiles.add(FileGenerationUtil.generateDataFile(table, TestHelpers.Row.of("foo")));
+    dataFiles.add(FileGenerationUtil.generateDataFile(table, TestHelpers.Row.of("bar")));
+
+    return dataFiles;
+  }
+
+  private static void computeAndValidatePartitionStats(Table testTable, Tuple... expectedValues) {
+    // compute the partition stats and validate them against the expected values
+    Collection<PartitionStats> result =
+        PartitionStatsUtil.computeStats(testTable, testTable.currentSnapshot());
+
+    assertThat(result)
+        .extracting(
+            PartitionStats::partition,
+            PartitionStats::specId,
+            PartitionStats::dataRecordCount,
+            PartitionStats::dataFileCount,
+            PartitionStats::totalDataFileSizeInBytes,
+            PartitionStats::positionDeleteRecordCount,
+            PartitionStats::positionDeleteFileCount,
+            PartitionStats::equalityDeleteRecordCount,
+            PartitionStats::equalityDeleteFileCount,
+            PartitionStats::totalRecordCount,
+            PartitionStats::lastUpdatedAt,
+            PartitionStats::lastUpdatedSnapshotId)
+        .containsExactlyInAnyOrder(expectedValues);
+  }
+
+  private File tempDir(String folderName) throws IOException {
+    return java.nio.file.Files.createTempDirectory(temp.toPath(), folderName).toFile();
+  }
+}
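The schema-evolution expectations above hinge on partition coercion: files written before the spec change are projected onto the unified partition type, with the missing field read as null. A sketch of that projection, using the same PartitionUtil call as collectStats (`oldSpec` and `file` are placeholders):

    Types.StructType unifiedType = Partitioning.partitionType(table);
    StructLike coerced = PartitionUtil.coercePartition(unifiedType, oldSpec, file.partition());
    // for pre-evolution files, the added c3 field reads as null in the coerced tuple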