diff --git a/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java b/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java
new file mode 100644
index 000000000000..1216f9d1bc89
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Queue;
+import java.util.UUID;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Queues;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Types.IntegerType;
+import org.apache.iceberg.types.Types.LongType;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.types.Types.StructType;
+import org.apache.iceberg.util.PartitionMap;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+
+/**
+ * Computes, writes and reads the {@link PartitionStatisticsFile}. Uses generic readers and writers
+ * to support writing and reading of the stats in the table's default format.
+ */
+public class PartitionStatsHandler {
+
+  private PartitionStatsHandler() {}
+
+  public static final int PARTITION_FIELD_ID = 0;
+  public static final String PARTITION_FIELD_NAME = "partition";
+  public static final NestedField SPEC_ID = NestedField.required(1, "spec_id", IntegerType.get());
+  public static final NestedField DATA_RECORD_COUNT =
+      NestedField.required(2, "data_record_count", LongType.get());
+  public static final NestedField DATA_FILE_COUNT =
+      NestedField.required(3, "data_file_count", IntegerType.get());
+  public static final NestedField TOTAL_DATA_FILE_SIZE_IN_BYTES =
+      NestedField.required(4, "total_data_file_size_in_bytes", LongType.get());
+  public static final NestedField POSITION_DELETE_RECORD_COUNT =
+      NestedField.optional(5, "position_delete_record_count", LongType.get());
+  public static final NestedField POSITION_DELETE_FILE_COUNT =
+      NestedField.optional(6, "position_delete_file_count", IntegerType.get());
+  public static final NestedField EQUALITY_DELETE_RECORD_COUNT =
+      NestedField.optional(7, "equality_delete_record_count", LongType.get());
+  public static final NestedField EQUALITY_DELETE_FILE_COUNT =
+      NestedField.optional(8, "equality_delete_file_count", IntegerType.get());
+  public static final NestedField TOTAL_RECORD_COUNT =
+      NestedField.optional(9, "total_record_count", LongType.get());
+  public static final NestedField LAST_UPDATED_AT =
+      NestedField.optional(10, "last_updated_at", LongType.get());
+  public static final NestedField LAST_UPDATED_SNAPSHOT_ID =
+      NestedField.optional(11, "last_updated_snapshot_id", LongType.get());
+
+  /**
+   * Generates the partition stats file schema based on a combined partition type which considers
+   * all specs in a table.
+   *
+   * @param unifiedPartitionType unified partition schema type, which can be computed with {@link
+   *     Partitioning#partitionType(Table)}.
+   * @return a schema that corresponds to the provided unified partition type.
+   */
+  public static Schema schema(StructType unifiedPartitionType) {
+    Preconditions.checkState(!unifiedPartitionType.fields().isEmpty(), "Table must be partitioned");
+    return new Schema(
+        NestedField.required(PARTITION_FIELD_ID, PARTITION_FIELD_NAME, unifiedPartitionType),
+        SPEC_ID,
+        DATA_RECORD_COUNT,
+        DATA_FILE_COUNT,
+        TOTAL_DATA_FILE_SIZE_IN_BYTES,
+        POSITION_DELETE_RECORD_COUNT,
+        POSITION_DELETE_FILE_COUNT,
+        EQUALITY_DELETE_RECORD_COUNT,
+        EQUALITY_DELETE_FILE_COUNT,
+        TOTAL_RECORD_COUNT,
+        LAST_UPDATED_AT,
+        LAST_UPDATED_SNAPSHOT_ID);
+  }
+
+  /**
+   * Computes and writes the {@link PartitionStatisticsFile} for a given table's current snapshot.
+   *
+   * @param table the {@link Table} for which partition statistics are computed.
+   * @return {@link PartitionStatisticsFile} for the current snapshot, or null if no statistics are
+   *     present.
+   */
+  public static PartitionStatisticsFile computeAndWriteStatsFile(Table table) throws IOException {
+    if (table.currentSnapshot() == null) {
+      return null;
+    }
+
+    return computeAndWriteStatsFile(table, table.currentSnapshot().snapshotId());
+  }
+
+  /**
+   * Computes and writes the {@link PartitionStatisticsFile} for a given table and snapshot.
+   *
+   * @param table the {@link Table} for which partition statistics are computed.
+   * @param snapshotId snapshot for which partition statistics are computed.
+   * @return {@link PartitionStatisticsFile} for the given snapshot, or null if no statistics are
+   *     present.
+   */
+  public static PartitionStatisticsFile computeAndWriteStatsFile(Table table, long snapshotId)
+      throws IOException {
+    Snapshot snapshot = table.snapshot(snapshotId);
+    Preconditions.checkArgument(snapshot != null, "Snapshot not found: %s", snapshotId);
+
+    Collection<PartitionStats> stats = computeStats(table, snapshot);
+    if (stats.isEmpty()) {
+      return null;
+    }
+
+    StructType partitionType = Partitioning.partitionType(table);
+    List<PartitionStats> sortedStats = sortStatsByPartition(stats, partitionType);
+    return writePartitionStatsFile(
+        table, snapshot.snapshotId(), schema(partitionType), sortedStats);
+  }
+
+  @VisibleForTesting
+  static PartitionStatisticsFile writePartitionStatsFile(
+      Table table, long snapshotId, Schema dataSchema, Iterable<PartitionStats> records)
+      throws IOException {
+    FileFormat fileFormat =
+        FileFormat.fromString(
+            table.properties().getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT));
+
+    OutputFile outputFile = newPartitionStatsFile(table, fileFormat, snapshotId);
+
+    try (FileAppender<StructLike> writer =
+        InternalData.write(fileFormat, outputFile).schema(dataSchema).build()) {
+      records.iterator().forEachRemaining(writer::add);
+    }
+
+    return ImmutableGenericPartitionStatisticsFile.builder()
+        .snapshotId(snapshotId)
+        .path(outputFile.location())
+        .fileSizeInBytes(outputFile.toInputFile().getLength())
+        .build();
+  }
+
+  /**
+   * Reads partition statistics from the specified {@link InputFile} using the given schema.
+   *
+   * @param schema the {@link Schema} of the partition statistics file.
+   * @param inputFile an {@link InputFile} pointing to the partition stats file.
+   */
+  public static CloseableIterable<PartitionStats> readPartitionStatsFile(
+      Schema schema, InputFile inputFile) {
+    FileFormat fileFormat = FileFormat.fromFileName(inputFile.location());
+    Preconditions.checkArgument(
+        fileFormat != null, "Unable to determine format of file: %s", inputFile.location());
+
+    CloseableIterable<StructLike> records =
+        InternalData.read(fileFormat, inputFile).project(schema).build();
+    return CloseableIterable.transform(records, PartitionStatsHandler::recordToPartitionStats);
+  }
+
+  private static OutputFile newPartitionStatsFile(
+      Table table, FileFormat fileFormat, long snapshotId) {
+    Preconditions.checkArgument(
+        table instanceof HasTableOperations,
+        "Table must have operations to retrieve metadata location");
+
+    return table
+        .io()
+        .newOutputFile(
+            ((HasTableOperations) table)
+                .operations()
+                .metadataFileLocation(
+                    fileFormat.addExtension(
+                        String.format(
+                            Locale.ROOT, "partition-stats-%d-%s", snapshotId, UUID.randomUUID()))));
+  }
+
+  private static PartitionStats recordToPartitionStats(StructLike record) {
+    PartitionStats stats =
+        new PartitionStats(
+            record.get(PARTITION_FIELD_ID, StructLike.class),
+            record.get(SPEC_ID.fieldId(), Integer.class));
+    stats.set(DATA_RECORD_COUNT.fieldId(), record.get(DATA_RECORD_COUNT.fieldId(), Long.class));
+    stats.set(DATA_FILE_COUNT.fieldId(), record.get(DATA_FILE_COUNT.fieldId(), Integer.class));
+    stats.set(
+        TOTAL_DATA_FILE_SIZE_IN_BYTES.fieldId(),
+        record.get(TOTAL_DATA_FILE_SIZE_IN_BYTES.fieldId(), Long.class));
+    stats.set(
+        POSITION_DELETE_RECORD_COUNT.fieldId(),
+        record.get(POSITION_DELETE_RECORD_COUNT.fieldId(), Long.class));
+    stats.set(
+        POSITION_DELETE_FILE_COUNT.fieldId(),
+        record.get(POSITION_DELETE_FILE_COUNT.fieldId(), Integer.class));
+    stats.set(
+        EQUALITY_DELETE_RECORD_COUNT.fieldId(),
+        record.get(EQUALITY_DELETE_RECORD_COUNT.fieldId(), Long.class));
+    stats.set(
+        EQUALITY_DELETE_FILE_COUNT.fieldId(),
+        record.get(EQUALITY_DELETE_FILE_COUNT.fieldId(), Integer.class));
+    stats.set(TOTAL_RECORD_COUNT.fieldId(), record.get(TOTAL_RECORD_COUNT.fieldId(), Long.class));
+    stats.set(LAST_UPDATED_AT.fieldId(), record.get(LAST_UPDATED_AT.fieldId(), Long.class));
+    stats.set(
+        LAST_UPDATED_SNAPSHOT_ID.fieldId(),
+        record.get(LAST_UPDATED_SNAPSHOT_ID.fieldId(), Long.class));
+    return stats;
+  }
+
+  private static Collection<PartitionStats> computeStats(Table table, Snapshot snapshot) {
+    Preconditions.checkArgument(table != null, "table cannot be null");
+    Preconditions.checkArgument(Partitioning.isPartitioned(table), "table must be partitioned");
+    Preconditions.checkArgument(snapshot != null, "snapshot cannot be null");
+
+    StructType partitionType = Partitioning.partitionType(table);
+    List<ManifestFile> manifests = snapshot.allManifests(table.io());
+    Queue<PartitionMap<PartitionStats>> statsByManifest = Queues.newConcurrentLinkedQueue();
+    Tasks.foreach(manifests)
+        .stopOnFailure()
+        .throwFailureWhenFinished()
+        .executeWith(ThreadPools.getWorkerPool())
+        .run(manifest -> statsByManifest.add(collectStats(table, manifest, partitionType)));
+
+    return mergeStats(statsByManifest, table.specs());
+  }
+
+  private static List<PartitionStats> sortStatsByPartition(
+      Collection<PartitionStats> stats, StructType partitionType) {
+    List<PartitionStats> entries = Lists.newArrayList(stats);
+    entries.sort(
+        Comparator.comparing(PartitionStats::partition, Comparators.forType(partitionType)));
+    return entries;
+  }
+
+  private static PartitionMap<PartitionStats> collectStats(
+      Table table, ManifestFile manifest, StructType partitionType) {
+    try (ManifestReader<?> reader = openManifest(table, manifest)) {
+      PartitionMap<PartitionStats> statsMap = PartitionMap.create(table.specs());
+      int specId = manifest.partitionSpecId();
+      PartitionSpec spec = table.specs().get(specId);
+      PartitionData keyTemplate = new PartitionData(partitionType);
+
+      for (ManifestEntry<?> entry : reader.entries()) {
+        ContentFile<?> file = entry.file();
+        StructLike coercedPartition =
+            PartitionUtil.coercePartition(partitionType, spec, file.partition());
+        StructLike key = keyTemplate.copyFor(coercedPartition);
+        Snapshot snapshot = table.snapshot(entry.snapshotId());
+        PartitionStats stats =
+            statsMap.computeIfAbsent(
+                specId,
+                ((PartitionData) file.partition()).copy(),
+                () -> new PartitionStats(key, specId));
+        if (entry.isLive()) {
+          stats.liveEntry(file, snapshot);
+        } else {
+          stats.deletedEntry(snapshot);
+        }
+      }
+
+      return statsMap;
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  private static ManifestReader<?> openManifest(Table table, ManifestFile manifest) {
+    List<String> projection = BaseScan.scanColumns(manifest.content());
+    return ManifestFiles.open(manifest, table.io()).select(projection);
+  }
+
+  private static Collection<PartitionStats> mergeStats(
+      Queue<PartitionMap<PartitionStats>> statsByManifest, Map<Integer, PartitionSpec> specs) {
+    PartitionMap<PartitionStats> statsMap = PartitionMap.create(specs);
+
+    for (PartitionMap<PartitionStats> stats : statsByManifest) {
+      stats.forEach(
+          (key, value) ->
+              statsMap.merge(
+                  key,
+                  value,
+                  (existingEntry, newEntry) -> {
+                    existingEntry.appendStats(newEntry);
+                    return existingEntry;
+                  }));
+    }
+
+    return statsMap.values();
+  }
+}
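Taken together, the new handler covers a three-step lifecycle: compute and write the stats file, register it in table metadata, and read it back through the unified schema. A minimal sketch of that flow follows (illustrative, not part of the patch); it assumes an already-loaded, partitioned `Table` with at least one snapshot, the class and method names are placeholders, and `Files.localInput` matches the local-filesystem setup the reworked benchmark below uses — for non-local tables, `table.io().newInputFile(...)` would be the equivalent.

```java
import java.io.IOException;
import java.util.List;
import org.apache.iceberg.Files;
import org.apache.iceberg.PartitionStatisticsFile;
import org.apache.iceberg.PartitionStats;
import org.apache.iceberg.PartitionStatsHandler;
import org.apache.iceberg.Partitioning;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

public class PartitionStatsRoundTrip {

  // Computes stats for the table's current snapshot, commits the resulting
  // file to table metadata, and returns the rows read back from it.
  static List<PartitionStats> roundTrip(Table table) throws IOException {
    // Writes in the table's default file format; null means the table has no snapshot.
    PartitionStatisticsFile statsFile = PartitionStatsHandler.computeAndWriteStatsFile(table);
    if (statsFile == null) {
      return List.of();
    }

    // computeAndWriteStatsFile only writes the file; registering it in
    // table metadata is a separate commit by the caller.
    table.updatePartitionStatistics().setPartitionStatistics(statsFile).commit();

    // Project with the unified partition schema derived from all specs.
    Schema dataSchema = PartitionStatsHandler.schema(Partitioning.partitionType(table));
    try (CloseableIterable<PartitionStats> rows =
        PartitionStatsHandler.readPartitionStatsFile(
            dataSchema, Files.localInput(statsFile.path()))) {
      return Lists.newArrayList(rows);
    }
  }
}
```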
diff --git a/core/src/main/java/org/apache/iceberg/PartitionStatsUtil.java b/core/src/main/java/org/apache/iceberg/PartitionStatsUtil.java
index 4cf1902937f3..ceb0fb9005d7 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionStatsUtil.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionStatsUtil.java
@@ -35,6 +35,11 @@
 import org.apache.iceberg.util.Tasks;
 import org.apache.iceberg.util.ThreadPools;
 
+/**
+ * @deprecated since 1.10.0, will be removed in 1.11.0; use {@link
+ *     org.apache.iceberg.PartitionStatsHandler} directly
+ */
+@Deprecated
 public class PartitionStatsUtil {
 
   private PartitionStatsUtil() {}
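Since `computeStats` and `sortStats` disappear with the deprecated class in 1.11.0, call sites collapse into the single handler entry point — the same swap the reworked benchmark below makes. A migration sketch (the wrapper method is illustrative; the commented-out calls reflect the deprecated 1.10-era API as shown in the benchmark diff):

```java
import java.io.IOException;
import org.apache.iceberg.PartitionStatisticsFile;
import org.apache.iceberg.PartitionStatsHandler;
import org.apache.iceberg.Table;

public class PartitionStatsMigration {

  static PartitionStatisticsFile refreshStats(Table table) throws IOException {
    // Before (deprecated, removed in 1.11.0): compute and sort in memory,
    // then write the file yourself:
    //   Collection<PartitionStats> stats =
    //       PartitionStatsUtil.computeStats(table, table.currentSnapshot());
    //   List<PartitionStats> sorted =
    //       PartitionStatsUtil.sortStats(stats, Partitioning.partitionType(table));

    // After: one call computes, sorts, and writes the stats file in the
    // table's default format (null if the table has no current snapshot).
    return PartitionStatsHandler.computeAndWriteStatsFile(table);
  }
}
```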
diff --git a/data/src/test/java/org/apache/iceberg/data/TestPartitionStatsHandler.java b/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java
similarity index 72%
rename from data/src/test/java/org/apache/iceberg/data/TestPartitionStatsHandler.java
rename to core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java
index f0fb0455e57a..b68ef93e1c6d 100644
--- a/data/src/test/java/org/apache/iceberg/data/TestPartitionStatsHandler.java
+++ b/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java
@@ -16,19 +16,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iceberg.data;
-
-import static org.apache.iceberg.data.PartitionStatsHandler.DATA_FILE_COUNT;
-import static org.apache.iceberg.data.PartitionStatsHandler.DATA_RECORD_COUNT;
-import static org.apache.iceberg.data.PartitionStatsHandler.EQUALITY_DELETE_FILE_COUNT;
-import static org.apache.iceberg.data.PartitionStatsHandler.EQUALITY_DELETE_RECORD_COUNT;
-import static org.apache.iceberg.data.PartitionStatsHandler.LAST_UPDATED_AT;
-import static org.apache.iceberg.data.PartitionStatsHandler.LAST_UPDATED_SNAPSHOT_ID;
-import static org.apache.iceberg.data.PartitionStatsHandler.PARTITION_FIELD_ID;
-import static org.apache.iceberg.data.PartitionStatsHandler.POSITION_DELETE_FILE_COUNT;
-import static org.apache.iceberg.data.PartitionStatsHandler.POSITION_DELETE_RECORD_COUNT;
-import static org.apache.iceberg.data.PartitionStatsHandler.TOTAL_DATA_FILE_SIZE_IN_BYTES;
-import static org.apache.iceberg.data.PartitionStatsHandler.TOTAL_RECORD_COUNT;
+package org.apache.iceberg;
+
+import static org.apache.iceberg.PartitionStatsHandler.DATA_FILE_COUNT;
+import static org.apache.iceberg.PartitionStatsHandler.DATA_RECORD_COUNT;
+import static org.apache.iceberg.PartitionStatsHandler.EQUALITY_DELETE_FILE_COUNT;
+import static org.apache.iceberg.PartitionStatsHandler.EQUALITY_DELETE_RECORD_COUNT;
+import static org.apache.iceberg.PartitionStatsHandler.LAST_UPDATED_AT;
+import static org.apache.iceberg.PartitionStatsHandler.LAST_UPDATED_SNAPSHOT_ID;
+import static org.apache.iceberg.PartitionStatsHandler.PARTITION_FIELD_ID;
+import static org.apache.iceberg.PartitionStatsHandler.POSITION_DELETE_FILE_COUNT;
+import static org.apache.iceberg.PartitionStatsHandler.POSITION_DELETE_RECORD_COUNT;
+import static org.apache.iceberg.PartitionStatsHandler.TOTAL_DATA_FILE_SIZE_IN_BYTES;
+import static org.apache.iceberg.PartitionStatsHandler.TOTAL_RECORD_COUNT;
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -42,48 +42,27 @@
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.DeleteFile;
-import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.Files;
-import org.apache.iceberg.Parameter;
-import org.apache.iceberg.ParameterizedTestExtension;
-import org.apache.iceberg.Parameters;
-import org.apache.iceberg.PartitionData;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.PartitionStatisticsFile;
-import org.apache.iceberg.PartitionStats;
-import org.apache.iceberg.Partitioning;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.Snapshot;
-import org.apache.iceberg.SortOrder;
-import org.apache.iceberg.StructLike;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.TestHelpers;
-import org.apache.iceberg.TestTables;
-import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.expressions.Literal;
 import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Comparators;
 import org.apache.iceberg.types.Types;
-import org.assertj.core.api.Assumptions;
 import org.assertj.core.groups.Tuple;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.TestTemplate;
-import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.io.TempDir;
 
-@ExtendWith(ParameterizedTestExtension.class)
-public class TestPartitionStatsHandler {
+public abstract class PartitionStatsHandlerTestBase {
+
+  public abstract FileFormat format();
+
   private static final Schema SCHEMA =
       new Schema(
           optional(1, "c1", Types.IntegerType.get()),
@@ -97,22 +76,22 @@ public class TestPartitionStatsHandler {
 
   private static final Random RANDOM = ThreadLocalRandom.current();
 
-  @Parameters(name = "fileFormat = {0}")
-  public static List<Object> parameters() {
-    return Arrays.asList(FileFormat.PARQUET, FileFormat.ORC, FileFormat.AVRO);
-  }
-
-  @Parameter private FileFormat format;
+  private final Map<String, String> fileFormatProperty =
+      ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format().name());
 
   @Test
   public void testPartitionStatsOnEmptyTable() throws Exception {
-    Table testTable = TestTables.create(tempDir("empty_table"), "empty_table", SCHEMA, SPEC, 2);
+    Table testTable =
+        TestTables.create(
+            tempDir("empty_table"), "empty_table", SCHEMA, SPEC, 2, fileFormatProperty);
     assertThat(PartitionStatsHandler.computeAndWriteStatsFile(testTable)).isNull();
   }
 
   @Test
   public void testPartitionStatsOnEmptyBranch() throws Exception {
-    Table testTable = TestTables.create(tempDir("empty_branch"), "empty_branch", SCHEMA, SPEC, 2);
+    Table testTable =
+        TestTables.create(
+            tempDir("empty_branch"), "empty_branch", SCHEMA, SPEC, 2, fileFormatProperty);
     testTable.manageSnapshots().createBranch("b1").commit();
     long branchSnapshot = testTable.refs().get("b1").snapshotId();
     assertThat(PartitionStatsHandler.computeAndWriteStatsFile(testTable, branchSnapshot)).isNull();
@@ -121,7 +100,8 @@ public void testPartitionStatsOnEmptyBranch() throws Exception {
   @Test
   public void testPartitionStatsOnInvalidSnapshot() throws Exception {
     Table testTable =
-        TestTables.create(tempDir("invalid_snapshot"), "invalid_snapshot", SCHEMA, SPEC, 2);
+        TestTables.create(
+            tempDir("invalid_snapshot"), "invalid_snapshot", SCHEMA, SPEC, 2, fileFormatProperty);
     assertThatThrownBy(() -> PartitionStatsHandler.computeAndWriteStatsFile(testTable, 42L))
         .isInstanceOf(IllegalArgumentException.class)
         .hasMessage("Snapshot not found: 42");
@@ -135,10 +115,10 @@ public void testPartitionStatsOnUnPartitionedTable() throws Exception {
             "unpartitioned_table",
             SCHEMA,
             PartitionSpec.unpartitioned(),
-            2);
+            2,
+            fileFormatProperty);
 
-    List<Record> records = prepareRecords(testTable.schema());
-    DataFile dataFile = FileHelpers.writeDataFile(testTable, outputFile(), records);
+    DataFile dataFile = FileGenerationUtil.generateDataFile(testTable, TestHelpers.Row.of());
     testTable.newAppend().appendFile(dataFile).commit();
 
     assertThatThrownBy(() -> PartitionStatsHandler.computeAndWriteStatsFile(testTable))
@@ -189,7 +169,7 @@ public void testAllDatatypePartitionWriting() throws Exception {
 
     Table testTable =
         TestTables.create(
-            tempDir("test_all_type"), "test_all_type", schema, spec, SortOrder.unsorted(), 2);
+            tempDir("test_all_type"), "test_all_type", schema, spec, 2, fileFormatProperty);
 
     Types.StructType partitionSchema = Partitioning.partitionType(testTable);
     Schema dataSchema = PartitionStatsHandler.schema(partitionSchema);
@@ -244,8 +224,8 @@ public void testOptionalFieldsWriting() throws Exception {
             "test_partition_stats_optional",
             SCHEMA,
             spec,
-            SortOrder.unsorted(),
-            2);
+            2,
+            fileFormatProperty);
 
     Types.StructType partitionSchema = Partitioning.partitionType(testTable);
     Schema dataSchema = PartitionStatsHandler.schema(partitionSchema);
@@ -305,34 +285,25 @@ public void testOptionalFieldsWriting() throws Exception {
   }
 
   @SuppressWarnings("checkstyle:MethodLength")
-  @TestTemplate // Tests for all the table formats (PARQUET, ORC, AVRO)
+  @Test
   public void testPartitionStats() throws Exception {
-    Assumptions.assumeThat(format)
-        .as("ORC internal readers and writers are not supported")
-        .isNotEqualTo(FileFormat.ORC);
-
     Table testTable =
         TestTables.create(
-            tempDir("partition_stats_" + format.name()),
-            "partition_stats_compute_" + format.name(),
+            tempDir("partition_stats_compute"),
+            "partition_stats_compute",
             SCHEMA,
             SPEC,
             2,
-            ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()));
+            fileFormatProperty);
 
-    List<Record> records = prepareRecords(testTable.schema());
     DataFile dataFile1 =
-        FileHelpers.writeDataFile(
-            testTable, outputFile(), TestHelpers.Row.of("foo", "A"), records.subList(0, 3));
+        FileGenerationUtil.generateDataFile(testTable, TestHelpers.Row.of("foo", "A"));
     DataFile dataFile2 =
-        FileHelpers.writeDataFile(
-            testTable, outputFile(), TestHelpers.Row.of("foo", "B"), records.subList(3, 4));
+        FileGenerationUtil.generateDataFile(testTable, TestHelpers.Row.of("foo", "B"));
    DataFile dataFile3 =
-        FileHelpers.writeDataFile(
-            testTable, outputFile(), TestHelpers.Row.of("bar", "A"), records.subList(4, 5));
+        FileGenerationUtil.generateDataFile(testTable, TestHelpers.Row.of("bar", "A"));
    DataFile dataFile4 =
-        FileHelpers.writeDataFile(
-            testTable, outputFile(), TestHelpers.Row.of("bar", "B"), records.subList(5, 7));
+        FileGenerationUtil.generateDataFile(testTable, TestHelpers.Row.of("bar", "B"));
 
     for (int i = 0; i < 3; i++) {
       // insert same set of seven records thrice to have a new manifest files
@@ -355,7 +326,7 @@ public void testPartitionStats() throws Exception {
             Tuple.tuple(
                 partitionRecord(partitionType, "foo", "A"),
                 0,
-                9L,
+                3 * dataFile1.recordCount(),
                 3,
                 3 * dataFile1.fileSizeInBytes(),
                 0L,
@@ -368,7 +339,7 @@
             Tuple.tuple(
                 partitionRecord(partitionType, "foo", "B"),
                 0,
-                3L,
+                3 * dataFile2.recordCount(),
                 3,
                 3 * dataFile2.fileSizeInBytes(),
                 0L,
@@ -381,7 +352,7 @@
             Tuple.tuple(
                 partitionRecord(partitionType, "bar", "A"),
                 0,
-                3L,
+                3 * dataFile3.recordCount(),
                 3,
                 3 * dataFile3.fileSizeInBytes(),
                 0L,
@@ -394,7 +365,7 @@
             Tuple.tuple(
                 partitionRecord(partitionType, "bar", "B"),
                 0,
-                6L,
+                3 * dataFile4.recordCount(),
                 3,
                 3 * dataFile4.fileSizeInBytes(),
                 0L,
@@ -405,7 +376,7 @@ public void testPartitionStats() throws Exception {
             snapshot1.timestampMillis(),
             snapshot1.snapshotId()));
 
-    DeleteFile posDeletes = commitPositionDeletes(testTable, dataFile1);
+    DeleteFile posDeletes = commitPositionDeletes(testTable);
     Snapshot snapshot2 = testTable.currentSnapshot();
 
     DeleteFile eqDeletes = commitEqualityDeletes(testTable);
@@ -419,7 +390,7 @@
             Tuple.tuple(
                 partitionRecord(partitionType, "foo", "A"),
                 0,
-                9L,
+                3 * dataFile1.recordCount(),
                 3,
                 3 * dataFile1.fileSizeInBytes(),
                 0L,
@@ -432,7 +403,7 @@
             Tuple.tuple(
                 partitionRecord(partitionType, "foo", "B"),
                 0,
-                3L,
+                3 * dataFile2.recordCount(),
                 3,
                 3 * dataFile2.fileSizeInBytes(),
                 0L,
@@ -445,7 +416,7 @@
             Tuple.tuple(
                 partitionRecord(partitionType, "bar", "A"),
                 0,
-                3L,
+                3 * dataFile3.recordCount(),
                 3,
                 3 * dataFile3.fileSizeInBytes(),
                 posDeletes.recordCount(),
@@ -458,7 +429,7 @@
             Tuple.tuple(
                 partitionRecord(partitionType, "bar", "B"),
                 0,
-                6L,
+                3 * dataFile4.recordCount(),
                 3,
                 3 * dataFile4.fileSizeInBytes(),
                 0L,
@@ -470,10 +441,6 @@
             snapshot1.snapshotId()));
   }
 
-  private OutputFile outputFile() throws IOException {
-    return Files.localOutput(File.createTempFile("data", null, tempDir("stats")));
-  }
-
   private static StructLike partitionRecord(
       Types.StructType partitionType, String val1, String val2) {
     GenericRecord record = GenericRecord.create(partitionType);
@@ -482,24 +449,6 @@ private static StructLike partitionRecord(
     return record;
   }
 
-  private static List<Record> prepareRecords(Schema schema) {
-    GenericRecord record = GenericRecord.create(schema);
-    List<Record> records = Lists.newArrayList();
-    // foo 4 records, bar 3 records
-    // foo, A -> 3 records
-    records.add(record.copy("c1", 0, "c2", "foo", "c3", "A"));
-    records.add(record.copy("c1", 1, "c2", "foo", "c3", "A"));
-    records.add(record.copy("c1", 2, "c2", "foo", "c3", "A"));
-    // foo, B -> 1 record
-    records.add(record.copy("c1", 3, "c2", "foo", "c3", "B"));
-    // bar, A -> 1 record
-    records.add(record.copy("c1", 4, "c2", "bar", "c3", "A"));
-    // bar, B -> 2 records
-    records.add(record.copy("c1", 5, "c2", "bar", "c3", "B"));
-    records.add(record.copy("c1", 6, "c2", "bar", "c3", "B"));
-    return records;
-  }
-
   private static void computeAndValidatePartitionStats(
       Table testTable, Schema recordSchema, Tuple... expectedValues) throws IOException {
     // compute and commit partition stats file
@@ -533,49 +482,17 @@ private static void computeAndValidatePartitionStats(
         .containsExactlyInAnyOrder(expectedValues);
   }
 
-  private DeleteFile commitEqualityDeletes(Table testTable) throws IOException {
-    Schema deleteRowSchema = testTable.schema().select("c1");
-    Record dataDelete = GenericRecord.create(deleteRowSchema);
-    List<Record> dataDeletes =
-        Lists.newArrayList(dataDelete.copy("c1", 1), dataDelete.copy("c1", 2));
-
-    DeleteFile eqDeletes =
-        FileHelpers.writeDeleteFile(
-            testTable,
-            Files.localOutput(File.createTempFile("junit", null, tempDir("eq_delete"))),
-            TestHelpers.Row.of("foo", "A"),
-            dataDeletes,
-            deleteRowSchema);
-    testTable.newRowDelta().addDeletes(eqDeletes).commit();
-    return eqDeletes;
+  private DeleteFile commitEqualityDeletes(Table testTable) {
+    DeleteFile eqDelete =
+        FileGenerationUtil.generateEqualityDeleteFile(testTable, TestHelpers.Row.of("foo", "A"));
+    testTable.newRowDelta().addDeletes(eqDelete).commit();
+    return eqDelete;
   }
 
-  private DeleteFile commitPositionDeletes(Table testTable, DataFile dataFile1) throws IOException {
-    List<PositionDelete<?>> deletes = Lists.newArrayList();
-    for (long i = 0; i < 2; i++) {
-      deletes.add(
-          positionDelete(testTable.schema(), dataFile1.location(), i, (int) i, String.valueOf(i)));
-    }
-
-    DeleteFile posDeletes =
-        FileHelpers.writePosDeleteFile(
-            testTable,
-            Files.localOutput(File.createTempFile("junit", null, tempDir("pos_delete"))),
-            TestHelpers.Row.of("bar", "A"),
-            deletes);
-    testTable.newRowDelta().addDeletes(posDeletes).commit();
-    return posDeletes;
-  }
-
-  private static PositionDelete<GenericRecord> positionDelete(
-      Schema tableSchema, CharSequence path, Long position, Object... values) {
-    PositionDelete<GenericRecord> posDelete = PositionDelete.create();
-    GenericRecord nested = GenericRecord.create(tableSchema);
-    for (int i = 0; i < values.length; i++) {
-      nested.set(i, values[i]);
-    }
-
-    posDelete.set(path, position, nested);
+  private DeleteFile commitPositionDeletes(Table testTable) {
+    DeleteFile posDelete =
+        FileGenerationUtil.generatePositionDeleteFile(testTable, TestHelpers.Row.of("bar", "A"));
+    testTable.newRowDelta().addDeletes(posDelete).commit();
     return posDelete;
   }
diff --git a/core/src/test/java/org/apache/iceberg/TestPartitionStatsUtil.java b/core/src/test/java/org/apache/iceberg/TestPartitionStatsUtil.java
index b4308ff28c50..98e75f262655 100644
--- a/core/src/test/java/org/apache/iceberg/TestPartitionStatsUtil.java
+++ b/core/src/test/java/org/apache/iceberg/TestPartitionStatsUtil.java
@@ -33,6 +33,10 @@
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
+/**
+ * @deprecated since 1.10.0, will be removed in 1.11.0; covered by {@link PartitionStatsHandlerTestBase}
+ */
+@Deprecated
 public class TestPartitionStatsUtil {
   private static final Schema SCHEMA =
       new Schema(
diff --git a/core/src/test/java/org/apache/iceberg/avro/TestAvroPartitionStatsHandler.java b/core/src/test/java/org/apache/iceberg/avro/TestAvroPartitionStatsHandler.java
new file mode 100644
index 000000000000..1da6fccc4e16
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/avro/TestAvroPartitionStatsHandler.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.avro;
+
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionStatsHandlerTestBase;
+
+public class TestAvroPartitionStatsHandler extends PartitionStatsHandlerTestBase {
+
+  public FileFormat format() {
+    return FileFormat.AVRO;
+  }
+}
diff --git a/core/src/jmh/java/org/apache/iceberg/PartitionStatsUtilBenchmark.java b/data/src/jmh/java/org/apache/iceberg/PartitionStatsHandlerBenchmark.java
similarity index 81%
rename from core/src/jmh/java/org/apache/iceberg/PartitionStatsUtilBenchmark.java
rename to data/src/jmh/java/org/apache/iceberg/PartitionStatsHandlerBenchmark.java
index 539494e34735..ec0bfa9306d5 100644
--- a/core/src/jmh/java/org/apache/iceberg/PartitionStatsUtilBenchmark.java
+++ b/data/src/jmh/java/org/apache/iceberg/PartitionStatsHandlerBenchmark.java
@@ -22,9 +22,12 @@
 import static org.apache.iceberg.types.Types.NestedField.required;
 import static org.assertj.core.api.Assertions.assertThat;
 
-import java.util.Collection;
+import java.io.IOException;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -45,7 +48,7 @@
 @Measurement(iterations = 5)
 @Timeout(time = 1000, timeUnit = TimeUnit.HOURS)
 @BenchmarkMode(Mode.SingleShotTime)
-public class PartitionStatsUtilBenchmark {
+public class PartitionStatsHandlerBenchmark {
 
   private static final Schema SCHEMA =
       new Schema(
@@ -95,11 +98,17 @@ public void tearDownBenchmark() {
 
   @Benchmark
   @Threads(1)
-  public void benchmarkPartitionStats() {
-    Collection<PartitionStats> partitionStats =
-        PartitionStatsUtil.computeStats(table, table.currentSnapshot());
-    assertThat(partitionStats).hasSize(PARTITION_PER_MANIFEST);
+  public void benchmarkPartitionStats() throws IOException {
+    PartitionStatisticsFile statisticsFile = PartitionStatsHandler.computeAndWriteStatsFile(table);
+
+    List<PartitionStats> stats;
+    try (CloseableIterable<PartitionStats> recordIterator =
+        PartitionStatsHandler.readPartitionStatsFile(
+            PartitionStatsHandler.schema(Partitioning.partitionType(table)),
+            Files.localInput(statisticsFile.path()))) {
+      stats = Lists.newArrayList(recordIterator);
+    }
 
-    PartitionStatsUtil.sortStats(partitionStats, Partitioning.partitionType(table));
+    assertThat(stats).hasSize(PARTITION_PER_MANIFEST);
   }
 }
diff --git a/data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java b/data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java
index 7a465a35ad57..e4901f4e8cca 100644
--- a/data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java
+++ b/data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java
@@ -56,7 +56,11 @@
 /**
  * Computes, writes and reads the {@link PartitionStatisticsFile}. Uses generic readers and writers
  * to support writing and reading of the stats in table default format.
+ *
+ * @deprecated since 1.10.0, will be removed in 1.11.0; use {@link
+ *     org.apache.iceberg.PartitionStatsHandler} from the core module
  */
+@Deprecated
 public class PartitionStatsHandler {
 
   private PartitionStatsHandler() {}
diff --git a/orc/src/test/java/org/apache/iceberg/TestOrcPartitionStatsHandler.java b/orc/src/test/java/org/apache/iceberg/TestOrcPartitionStatsHandler.java
new file mode 100644
index 000000000000..f25ddccaa21a
--- /dev/null
+++ b/orc/src/test/java/org/apache/iceberg/TestOrcPartitionStatsHandler.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+public class TestOrcPartitionStatsHandler extends PartitionStatsHandlerTestBase {
+
+  public FileFormat format() {
+    return FileFormat.ORC;
+  }
+
+  @Override
+  public void testAllDatatypePartitionWriting() throws Exception {
+    assertThatThrownBy(super::testAllDatatypePartitionWriting)
+        .isInstanceOf(UnsupportedOperationException.class)
+        .hasMessage("Cannot write using unregistered internal data format: ORC");
+  }
+
+  @Override
+  public void testOptionalFieldsWriting() throws Exception {
+    assertThatThrownBy(super::testOptionalFieldsWriting)
+        .isInstanceOf(UnsupportedOperationException.class)
+        .hasMessage("Cannot write using unregistered internal data format: ORC");
+  }
+
+  @Override
+  public void testPartitionStats() throws Exception {
+    assertThatThrownBy(super::testPartitionStats)
+        .isInstanceOf(UnsupportedOperationException.class)
+        .hasMessage("Cannot write using unregistered internal data format: ORC");
+  }
+}
diff --git a/parquet/src/test/java/org/apache/iceberg/TestParquetPartitionStatsHandler.java b/parquet/src/test/java/org/apache/iceberg/TestParquetPartitionStatsHandler.java
new file mode 100644
index 000000000000..29bec7bca9d0
--- /dev/null
+++ b/parquet/src/test/java/org/apache/iceberg/TestParquetPartitionStatsHandler.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+public class TestParquetPartitionStatsHandler extends PartitionStatsHandlerTestBase {
+
+  public FileFormat format() {
+    return FileFormat.PARQUET;
+  }
+}
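The ORC subclass above exists to pin down a known gap rather than to exercise ORC: core's `InternalData` dispatch has no ORC reader or writer registered, so the write path fails before any bytes are produced. A sketch of that failure mode — the temp-file plumbing is illustrative, and the assumption, consistent with the message the ORC tests assert, is that `InternalData.write` rejects unregistered formats up front:

```java
import java.io.File;
import java.io.IOException;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
import org.apache.iceberg.InternalData;
import org.apache.iceberg.io.OutputFile;

public class OrcInternalDataGap {

  public static void main(String[] args) throws IOException {
    File tmp = File.createTempFile("partition-stats", ".orc");
    OutputFile out = Files.localOutput(tmp);
    try {
      // PartitionStatsHandler funnels all writes through InternalData, and
      // no internal-data writer is registered for ORC.
      InternalData.write(FileFormat.ORC, out);
    } catch (UnsupportedOperationException e) {
      // Expected: "Cannot write using unregistered internal data format: ORC"
      System.out.println(e.getMessage());
    }
  }
}
```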