diff --git a/core/src/main/java/org/apache/iceberg/BaseScan.java b/core/src/main/java/org/apache/iceberg/BaseScan.java index 8c309cf69e6d..8504e1e33ecd 100644 --- a/core/src/main/java/org/apache/iceberg/BaseScan.java +++ b/core/src/main/java/org/apache/iceberg/BaseScan.java @@ -289,4 +289,21 @@ private static Schema lazyColumnProjection(TableScanContext context, Schema sche public ThisT metricsReporter(MetricsReporter reporter) { return newRefinedScan(table, schema, context.reportWith(reporter)); } + + /** + * Retrieves a list of column names based on the type of manifest content provided. + * + * @param content the manifest content type to scan. + * @return a list of column names corresponding to the specified manifest content type. + */ + public static List scanColumns(ManifestContent content) { + switch (content) { + case DATA: + return BaseScan.SCAN_COLUMNS; + case DELETES: + return BaseScan.DELETE_SCAN_COLUMNS; + default: + throw new UnsupportedOperationException("Cannot read unknown manifest type: " + content); + } + } } diff --git a/core/src/main/java/org/apache/iceberg/PartitionStats.java b/core/src/main/java/org/apache/iceberg/PartitionStats.java new file mode 100644 index 000000000000..a7c16c6eb59b --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/PartitionStats.java @@ -0,0 +1,339 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg; + +import java.util.Objects; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +public class PartitionStats implements StructLike { + + private Record partition; // PartitionData as Record + private int specId; + private long dataRecordCount; + private int dataFileCount; + private long totalDataFileSizeInBytes; + private long positionDeleteRecordCount; + private int positionDeleteFileCount; + private long equalityDeleteRecordCount; + private int equalityDeleteFileCount; + private long totalRecordCount; + private Long lastUpdatedAt; // null by default + private Long lastUpdatedSnapshotId; // null by default + + private static final int STATS_COUNT = 12; + + public PartitionStats(Record partition) { + this.partition = partition; + } + + public Record partition() { + return partition; + } + + public int specId() { + return specId; + } + + public void setSpecId(int specId) { + this.specId = specId; + } + + public long dataRecordCount() { + return dataRecordCount; + } + + public void setDataRecordCount(long dataRecordCount) { + this.dataRecordCount = dataRecordCount; + } + + public int dataFileCount() { + return dataFileCount; + } + + public void setDataFileCount(int dataFileCount) { + this.dataFileCount = dataFileCount; + } + + public long totalDataFileSizeInBytes() { + return totalDataFileSizeInBytes; + } + + public void setTotalDataFileSizeInBytes(long totalDataFileSizeInBytes) { + this.totalDataFileSizeInBytes = totalDataFileSizeInBytes; + } + + public long positionDeleteRecordCount() { + return positionDeleteRecordCount; + } + + public void setPositionDeleteRecordCount(long positionDeleteRecordCount) { + this.positionDeleteRecordCount = positionDeleteRecordCount; + } + + public int positionDeleteFileCount() { + return positionDeleteFileCount; + } + + public void setPositionDeleteFileCount(int positionDeleteFileCount) { + this.positionDeleteFileCount = positionDeleteFileCount; + } + + public long equalityDeleteRecordCount() { + return equalityDeleteRecordCount; + } + + public void setEqualityDeleteRecordCount(long equalityDeleteRecordCount) { + this.equalityDeleteRecordCount = equalityDeleteRecordCount; + } + + public int equalityDeleteFileCount() { + return equalityDeleteFileCount; + } + + public void setEqualityDeleteFileCount(int equalityDeleteFileCount) { + this.equalityDeleteFileCount = equalityDeleteFileCount; + } + + public long totalRecordCount() { + return totalRecordCount; + } + + public void setTotalRecordCount(long totalRecordCount) { + this.totalRecordCount = totalRecordCount; + } + + public Long lastUpdatedAt() { + return lastUpdatedAt; + } + + public void setLastUpdatedAt(Long lastUpdatedAt) { + this.lastUpdatedAt = lastUpdatedAt; + } + + public Long lastUpdatedSnapshotId() { + return lastUpdatedSnapshotId; + } + + public void setLastUpdatedSnapshotId(Long lastUpdatedSnapshotId) { + this.lastUpdatedSnapshotId = lastUpdatedSnapshotId; + } + + /** + * Updates the partition stats from the data/delete file. + * + * @param file the ContentFile from the manifest entry. + * @param snapshot the snapshot corresponding to the live entry. 
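+   * <p>Each call captures the stats of a single content file (a file count of 1 plus that file's
+   * record count and size); per-partition totals are produced by merging entries via {@link
+   * #appendStats(PartitionStats)}.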
+ */ + public void liveEntry(ContentFile file, Snapshot snapshot) { + Preconditions.checkState(file != null, "content file cannot be null"); + + specId = file.specId(); + + switch (file.content()) { + case DATA: + dataRecordCount = file.recordCount(); + dataFileCount = 1; + totalDataFileSizeInBytes = file.fileSizeInBytes(); + break; + case POSITION_DELETES: + positionDeleteRecordCount = file.recordCount(); + positionDeleteFileCount = 1; + break; + case EQUALITY_DELETES: + equalityDeleteRecordCount = file.recordCount(); + equalityDeleteFileCount = 1; + break; + default: + throw new UnsupportedOperationException("Unsupported file content type: " + file.content()); + } + + if (snapshot != null) { + lastUpdatedSnapshotId = snapshot.snapshotId(); + lastUpdatedAt = snapshot.timestampMillis(); + } + + // Note: Not computing the `TOTAL_RECORD_COUNT` for now as it needs scanning the data. + } + + /** + * Updates the modified time and snapshot ID for the deleted manifest entry. + * + * @param snapshot the snapshot corresponding to the deleted manifest entry. + */ + public void deletedEntry(Snapshot snapshot) { + if (snapshot != null && lastUpdatedAt != null && snapshot.timestampMillis() > lastUpdatedAt) { + lastUpdatedAt = snapshot.timestampMillis(); + lastUpdatedSnapshotId = snapshot.snapshotId(); + } + } + + /** + * Appends statistics from given entry to current entry. + * + * @param entry the entry from which statistics will be sourced. + */ + public void appendStats(PartitionStats entry) { + Preconditions.checkState(entry != null, "entry to update from cannot be null"); + + specId = Math.max(specId, entry.specId); + dataRecordCount += entry.dataRecordCount; + dataFileCount += entry.dataFileCount; + totalDataFileSizeInBytes += entry.totalDataFileSizeInBytes; + positionDeleteRecordCount += entry.positionDeleteRecordCount; + positionDeleteFileCount += entry.positionDeleteFileCount; + equalityDeleteRecordCount += entry.equalityDeleteRecordCount; + equalityDeleteFileCount += entry.equalityDeleteFileCount; + totalRecordCount += entry.totalRecordCount; + + if (entry.lastUpdatedAt != null) { + if (lastUpdatedAt == null || (lastUpdatedAt < entry.lastUpdatedAt)) { + lastUpdatedAt = entry.lastUpdatedAt; + lastUpdatedSnapshotId = entry.lastUpdatedSnapshotId; + } + } + } + + @Override + public int size() { + return STATS_COUNT; + } + + @Override + public T get(int pos, Class javaClass) { + switch (pos) { + case 0: + return javaClass.cast(partition); + case 1: + return javaClass.cast(specId); + case 2: + return javaClass.cast(dataRecordCount); + case 3: + return javaClass.cast(dataFileCount); + case 4: + return javaClass.cast(totalDataFileSizeInBytes); + case 5: + return javaClass.cast(positionDeleteRecordCount); + case 6: + return javaClass.cast(positionDeleteFileCount); + case 7: + return javaClass.cast(equalityDeleteRecordCount); + case 8: + return javaClass.cast(equalityDeleteFileCount); + case 9: + return javaClass.cast(totalRecordCount); + case 10: + return javaClass.cast(lastUpdatedAt); + case 11: + return javaClass.cast(lastUpdatedSnapshotId); + default: + throw new UnsupportedOperationException("Unknown position: " + pos); + } + } + + @Override + public void set(int pos, T value) { + switch (pos) { + case 0: + partition = (Record) value; + break; + case 1: + specId = (int) value; + break; + case 2: + dataRecordCount = (long) value; + break; + case 3: + dataFileCount = (int) value; + break; + case 4: + totalDataFileSizeInBytes = (long) value; + break; + case 5: + // optional field as per spec, 
implementation initialize to 0 for counters + positionDeleteRecordCount = value == null ? 0L : (long) value; + break; + case 6: + // optional field as per spec, implementation initialize to 0 for counters + positionDeleteFileCount = value == null ? 0 : (int) value; + break; + case 7: + // optional field as per spec, implementation initialize to 0 for counters + equalityDeleteRecordCount = value == null ? 0L : (long) value; + break; + case 8: + // optional field as per spec, implementation initialize to 0 for counters + equalityDeleteFileCount = value == null ? 0 : (int) value; + break; + case 9: + // optional field as per spec, implementation initialize to 0 for counters + totalRecordCount = value == null ? 0L : (long) value; + break; + case 10: + lastUpdatedAt = (Long) value; + break; + case 11: + lastUpdatedSnapshotId = (Long) value; + break; + default: + throw new UnsupportedOperationException("Unknown position: " + pos); + } + } + + @Override + @SuppressWarnings("checkstyle:CyclomaticComplexity") + public boolean equals(Object other) { + if (this == other) { + return true; + } else if (!(other instanceof PartitionStats)) { + return false; + } + + PartitionStats that = (PartitionStats) other; + return Objects.equals(partition, that.partition) + && specId == that.specId + && dataRecordCount == that.dataRecordCount + && dataFileCount == that.dataFileCount + && totalDataFileSizeInBytes == that.totalDataFileSizeInBytes + && positionDeleteRecordCount == that.positionDeleteRecordCount + && positionDeleteFileCount == that.positionDeleteFileCount + && equalityDeleteRecordCount == that.equalityDeleteRecordCount + && equalityDeleteFileCount == that.equalityDeleteFileCount + && totalRecordCount == that.totalRecordCount + && Objects.equals(lastUpdatedAt, that.lastUpdatedAt) + && Objects.equals(lastUpdatedSnapshotId, that.lastUpdatedSnapshotId); + } + + @Override + public int hashCode() { + return Objects.hash( + partition, + specId, + dataRecordCount, + dataFileCount, + totalDataFileSizeInBytes, + positionDeleteRecordCount, + positionDeleteFileCount, + equalityDeleteRecordCount, + equalityDeleteFileCount, + totalRecordCount, + lastUpdatedAt, + lastUpdatedSnapshotId); + } +} diff --git a/core/src/main/java/org/apache/iceberg/PartitionStatsUtil.java b/core/src/main/java/org/apache/iceberg/PartitionStatsUtil.java new file mode 100644 index 000000000000..ac50f6d78fe2 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/PartitionStatsUtil.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Comparators; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.PartitionUtil; +import org.apache.iceberg.util.Tasks; +import org.apache.iceberg.util.ThreadPools; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PartitionStatsUtil { + + private PartitionStatsUtil() {} + + private static final Logger LOG = LoggerFactory.getLogger(PartitionStatsUtil.class); + + /** + * Computes the partition stats for the given snapshot of the table. + * + * @param table the table for which partition stats to be computed. + * @param snapshot the snapshot for which partition stats is computed. + * @return iterable {@link PartitionStats} + */ + public static Iterable computeStats(Table table, Snapshot snapshot) { + Preconditions.checkState(table != null, "table cannot be null"); + Preconditions.checkState(snapshot != null, "snapshot cannot be null"); + + Types.StructType partitionType = Partitioning.partitionType(table); + Map partitionEntryMap = Maps.newConcurrentMap(); + + List manifestFiles = snapshot.allManifests(table.io()); + Tasks.foreach(manifestFiles) + .stopOnFailure() + .executeWith(ThreadPools.getWorkerPool()) + .onFailure( + (file, thrown) -> + LOG.warn( + "Failed to compute the partition stats for the manifest file: {}", + file.path(), + thrown)) + .run( + manifest -> { + try (CloseableIterable entries = + PartitionStatsUtil.fromManifest(table, manifest, partitionType)) { + entries.forEach( + entry -> { + Record partitionKey = entry.partition(); + partitionEntryMap.merge( + partitionKey, + entry, + (existingEntry, newEntry) -> { + existingEntry.appendStats(newEntry); + return existingEntry; + }); + }); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + + return partitionEntryMap.values(); + } + + /** + * Sorts the {@link PartitionStats} based on the partition data. + * + * @param stats iterable {@link PartitionStats} which needs to be sorted. + * @param partitionType unified partition schema. 
+ * @return Iterator of {@link PartitionStats} + */ + public static Iterator sortStats( + Iterable stats, Types.StructType partitionType) { + List entries = Lists.newArrayList(stats.iterator()); + entries.sort( + Comparator.comparing(PartitionStats::partition, Comparators.forType(partitionType))); + return entries.iterator(); + } + + private static CloseableIterable fromManifest( + Table table, ManifestFile manifest, Types.StructType partitionType) { + return CloseableIterable.transform( + ManifestFiles.open(manifest, table.io(), table.specs()) + .select(BaseScan.scanColumns(manifest.content())) + .entries(), + entry -> { + // partition data as per unified partition spec + Record partitionData = coercedPartitionData(entry.file(), table.specs(), partitionType); + PartitionStats partitionStats = new PartitionStats(partitionData); + if (entry.isLive()) { + partitionStats.liveEntry(entry.file(), table.snapshot(entry.snapshotId())); + } else { + partitionStats.deletedEntry(table.snapshot(entry.snapshotId())); + } + + return partitionStats; + }); + } + + private static Record coercedPartitionData( + ContentFile file, Map specs, Types.StructType partitionType) { + // keep the partition data as per the unified spec by coercing + StructLike partition = + PartitionUtil.coercePartition(partitionType, specs.get(file.specId()), file.partition()); + GenericRecord record = GenericRecord.create(partitionType); + for (int index = 0; index < partitionType.fields().size(); index++) { + Object val = + partition.get(index, partitionType.fields().get(index).type().typeId().javaClass()); + if (val != null) { + record.set(index, val); + } + } + + return record; + } +} diff --git a/core/src/main/java/org/apache/iceberg/PartitionsTable.java b/core/src/main/java/org/apache/iceberg/PartitionsTable.java index 5ff796e95827..6d0fc8c235f9 100644 --- a/core/src/main/java/org/apache/iceberg/PartitionsTable.java +++ b/core/src/main/java/org/apache/iceberg/PartitionsTable.java @@ -201,7 +201,7 @@ private static CloseableIterable> readEntries( return CloseableIterable.transform( ManifestFiles.open(manifest, table.io(), table.specs()) .caseSensitive(scan.isCaseSensitive()) - .select(scanColumns(manifest.content())) // don't select stats columns + .select(BaseScan.scanColumns(manifest.content())) // don't select stats columns .liveEntries(), t -> (ManifestEntry>) @@ -209,17 +209,6 @@ private static CloseableIterable> readEntries( t.copyWithoutStats()); } - private static List scanColumns(ManifestContent content) { - switch (content) { - case DATA: - return BaseScan.SCAN_COLUMNS; - case DELETES: - return BaseScan.DELETE_SCAN_COLUMNS; - default: - throw new UnsupportedOperationException("Cannot read unknown manifest type: " + content); - } - } - private static CloseableIterable filteredManifests( StaticTableScan scan, Table table, List manifestFilesList) { CloseableIterable manifestFiles = diff --git a/core/src/main/java/org/apache/iceberg/data/IdentityPartitionConverters.java b/core/src/main/java/org/apache/iceberg/data/IdentityPartitionConverters.java index 4cb41263152d..114820443382 100644 --- a/core/src/main/java/org/apache/iceberg/data/IdentityPartitionConverters.java +++ b/core/src/main/java/org/apache/iceberg/data/IdentityPartitionConverters.java @@ -18,9 +18,12 @@ */ package org.apache.iceberg.data; +import java.nio.ByteBuffer; +import java.util.UUID; import org.apache.avro.generic.GenericData; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.ByteBuffers; import 
org.apache.iceberg.util.DateTimeUtil; public class IdentityPartitionConverters { @@ -48,6 +51,18 @@ public static Object convertConstant(Type type, Object value) { case FIXED: if (value instanceof GenericData.Fixed) { return ((GenericData.Fixed) value).bytes(); + } else if (value instanceof ByteBuffer) { + return ByteBuffers.toByteArray((ByteBuffer) value); + } + return value; + case UUID: + if (value instanceof ByteBuffer) { + return ByteBuffers.toByteArray((ByteBuffer) value); + } + return value; + case BINARY: + if (value instanceof byte[]) { + return ByteBuffer.wrap((byte[]) value); } return value; default: diff --git a/core/src/main/java/org/apache/iceberg/data/PartitionStatsRecord.java b/core/src/main/java/org/apache/iceberg/data/PartitionStatsRecord.java new file mode 100644 index 000000000000..4f4cb9da68e7 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/data/PartitionStatsRecord.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.data; + +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.PartitionStats; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.relocated.com.google.common.base.Objects; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.types.Types.StructType; + +/** + * Wraps the {@link PartitionStats} as {@link Record}. Used by generic file format writers and + * readers. 
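+ *
+ * <p>Positional {@code get}/{@code set} calls delegate to the wrapped {@link PartitionStats};
+ * field-name access is resolved through a cached name-to-position map.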
+ */ +public class PartitionStatsRecord implements Record, StructLike { + private static final LoadingCache> NAME_MAP_CACHE = + Caffeine.newBuilder() + .weakKeys() + .build( + struct -> { + Map idToPos = Maps.newHashMap(); + List fields = struct.fields(); + for (int index = 0; index < fields.size(); index += 1) { + idToPos.put(fields.get(index).name(), index); + } + return idToPos; + }); + + private final StructType struct; + private final PartitionStats partitionStats; + private final Map nameToPos; + + public static PartitionStatsRecord create(Schema schema, PartitionStats partitionStats) { + return new PartitionStatsRecord(schema.asStruct(), partitionStats); + } + + public static PartitionStatsRecord create(StructType struct, PartitionStats partitionStats) { + return new PartitionStatsRecord(struct, partitionStats); + } + + public PartitionStats unwrap() { + return partitionStats; + } + + private PartitionStatsRecord(StructType struct, PartitionStats partitionStats) { + this.struct = struct; + this.partitionStats = partitionStats; + this.nameToPos = NAME_MAP_CACHE.get(struct); + } + + private PartitionStatsRecord(PartitionStatsRecord toCopy) { + this.struct = toCopy.struct; + this.partitionStats = toCopy.partitionStats; + this.nameToPos = toCopy.nameToPos; + } + + private PartitionStatsRecord(PartitionStatsRecord toCopy, Map overwrite) { + this.struct = toCopy.struct; + this.partitionStats = toCopy.partitionStats; + this.nameToPos = toCopy.nameToPos; + for (Map.Entry entry : overwrite.entrySet()) { + setField(entry.getKey(), entry.getValue()); + } + } + + @Override + public StructType struct() { + return struct; + } + + @Override + public Object getField(String name) { + Integer pos = nameToPos.get(name); + if (pos != null) { + return partitionStats.get(pos, Object.class); + } + + return null; + } + + @Override + public void setField(String name, Object value) { + Integer pos = nameToPos.get(name); + Preconditions.checkArgument(pos != null, "Cannot set unknown field named: %s", name); + partitionStats.set(pos, value); + } + + @Override + public int size() { + return partitionStats.size(); + } + + @Override + public Object get(int pos) { + return partitionStats.get(pos, Object.class); + } + + @Override + public T get(int pos, Class javaClass) { + Object value = get(pos); + if (value == null || javaClass.isInstance(value)) { + return javaClass.cast(value); + } else { + throw new IllegalStateException("Not an instance of " + javaClass.getName() + ": " + value); + } + } + + @Override + public void set(int pos, T value) { + partitionStats.set(pos, value); + } + + @Override + public PartitionStatsRecord copy() { + return new PartitionStatsRecord(this); + } + + @Override + public PartitionStatsRecord copy(Map overwriteValues) { + return new PartitionStatsRecord(this, overwriteValues); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("Record("); + for (int index = 0; index < partitionStats.size(); index += 1) { + if (index != 0) { + sb.append(", "); + } + sb.append(partitionStats.get(index, Object.class)); + } + sb.append(")"); + return sb.toString(); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } else if (!(other instanceof PartitionStatsRecord)) { + return false; + } + + PartitionStatsRecord that = (PartitionStatsRecord) other; + return this.partitionStats.equals(that.partitionStats); + } + + @Override + public int hashCode() { + return Objects.hashCode(partitionStats); + } +} diff --git 
a/core/src/test/java/org/apache/iceberg/TestTables.java b/core/src/test/java/org/apache/iceberg/TestTables.java index eeff5db8e5a6..ccccc91369c0 100644 --- a/core/src/test/java/org/apache/iceberg/TestTables.java +++ b/core/src/test/java/org/apache/iceberg/TestTables.java @@ -52,6 +52,26 @@ public static TestTable create( return create(temp, name, schema, spec, SortOrder.unsorted(), formatVersion); } + public static TestTable create( + File temp, + String name, + Schema schema, + PartitionSpec spec, + int formatVersion, + Map properties) { + TestTableOperations ops = new TestTableOperations(name, temp); + if (ops.current() != null) { + throw new AlreadyExistsException("Table %s already exists at location: %s", name, temp); + } + + ops.commit( + null, + newTableMetadata( + schema, spec, SortOrder.unsorted(), temp.toString(), properties, formatVersion)); + + return new TestTable(ops, name); + } + public static TestTable create( File temp, String name, diff --git a/data/src/jmh/java/org/apache/iceberg/PartitionStatsGeneratorBenchmark.java b/data/src/jmh/java/org/apache/iceberg/PartitionStatsGeneratorBenchmark.java new file mode 100644 index 000000000000..8d1c282023be --- /dev/null +++ b/data/src/jmh/java/org/apache/iceberg/PartitionStatsGeneratorBenchmark.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Comparator; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; +import java.util.stream.Stream; +import org.apache.iceberg.data.PartitionStatsHandler; +import org.apache.iceberg.data.PartitionStatsRecord; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.types.Types; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Timeout; +import org.openjdk.jmh.annotations.Warmup; + +@Fork(1) +@State(Scope.Benchmark) +@Warmup(iterations = 2) +@Measurement(iterations = 5) +@Timeout(time = 1000, timeUnit = TimeUnit.HOURS) +@BenchmarkMode(Mode.SingleShotTime) +public class PartitionStatsGeneratorBenchmark { + + private static final Schema SCHEMA = + new Schema( + required(1, "c1", Types.IntegerType.get()), + optional(2, "c2", Types.StringType.get()), + optional(3, "c3", Types.StringType.get())); + + protected static final PartitionSpec SPEC = + PartitionSpec.builderFor(SCHEMA).identity("c1").build(); + + private String baseDir; + + // Create 10k manifests + private static final int MANIFEST_COUNTER = 10000; + + // each manifest with 100 partition values + private static final int PARTITION_PER_MANIFEST = 100; + + // 20 data files per partition, which results in 2k data files per manifest + private static final int DATA_FILES_PER_PARTITION_COUNT = 20; + + private Table table; + + @Setup + public void setupBenchmark() { + baseDir = + Paths.get(new File(System.getProperty("java.io.tmpdir")).getAbsolutePath()).toString(); + table = TestTables.create(new File(baseDir), "foo", SCHEMA, SPEC, SortOrder.unsorted(), 2); + + IntStream.range(0, MANIFEST_COUNTER) + .forEach( + manifestCount -> { + AppendFiles appendFiles = table.newAppend(); + + IntStream.range(0, PARTITION_PER_MANIFEST) + .forEach( + partitionOrdinal -> { + StructLike partition = TestHelpers.Row.of(partitionOrdinal); + IntStream.range(0, DATA_FILES_PER_PARTITION_COUNT) + .forEach( + fileOrdinal -> + appendFiles.appendFile( + FileGenerationUtil.generateDataFile(table, partition))); + }); + + appendFiles.commit(); + }); + } + + @TearDown + public void tearDownBenchmark() throws IOException { + if (baseDir != null) { + try (Stream walk = java.nio.file.Files.walk(Paths.get(baseDir))) { + walk.sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete); + } + baseDir = null; + } + } + + @Benchmark + @Threads(1) + public void benchmarkPartitionStats() throws IOException { + Snapshot currentSnapshot = table.currentSnapshot(); + + PartitionStatisticsFile result = PartitionStatsHandler.computeAndWritePartitionStatsFile(table); + table.updatePartitionStatistics().setPartitionStatistics(result).commit(); + assertThat(result.snapshotId()).isEqualTo(currentSnapshot.snapshotId()); + + // validate row count + try 
(CloseableIterable recordIterator = + PartitionStatsHandler.readPartitionStatsFile( + PartitionStatsHandler.schema(Partitioning.partitionType(table)), + Files.localInput(result.path()))) { + assertThat(recordIterator).hasSize(PARTITION_PER_MANIFEST); + } + } +} diff --git a/data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java b/data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java new file mode 100644 index 000000000000..411f5f5f5d4f --- /dev/null +++ b/data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.data; + +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; + +import java.io.Closeable; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Iterator; +import java.util.function.BiFunction; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.HasTableOperations; +import org.apache.iceberg.ImmutableGenericPartitionStatisticsFile; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.PartitionStatisticsFile; +import org.apache.iceberg.PartitionStats; +import org.apache.iceberg.PartitionStatsUtil; +import org.apache.iceberg.Partitioning; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.data.avro.DataReader; +import org.apache.iceberg.data.orc.GenericOrcReader; +import org.apache.iceberg.data.parquet.GenericParquetReaders; +import org.apache.iceberg.encryption.EncryptedFiles; +import org.apache.iceberg.encryption.EncryptionKeyMetadata; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.FileWriterFactory; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.orc.ORC; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.SnapshotUtil; + +/** + * Computes, writes and reads the {@link PartitionStatisticsFile}. Uses generic readers and writers + * to support writing and reading of the stats in table default format. 
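+ *
+ * <p>A minimal usage sketch (assuming a loaded {@link Table} with at least one snapshot):
+ *
+ * <pre>{@code
+ * PartitionStatisticsFile statsFile =
+ *     PartitionStatsHandler.computeAndWritePartitionStatsFile(table);
+ * table.updatePartitionStatistics().setPartitionStatistics(statsFile).commit();
+ * }</pre>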
+ */ +public final class PartitionStatsHandler { + + private PartitionStatsHandler() {} + + public enum Column { + PARTITION(0), + SPEC_ID(1), + DATA_RECORD_COUNT(2), + DATA_FILE_COUNT(3), + TOTAL_DATA_FILE_SIZE_IN_BYTES(4), + POSITION_DELETE_RECORD_COUNT(5), + POSITION_DELETE_FILE_COUNT(6), + EQUALITY_DELETE_RECORD_COUNT(7), + EQUALITY_DELETE_FILE_COUNT(8), + TOTAL_RECORD_COUNT(9), + LAST_UPDATED_AT(10), + LAST_UPDATED_SNAPSHOT_ID(11); + + private final int id; + + Column(int id) { + this.id = id; + } + + public int id() { + return id; + } + } + + /** + * Generates the Partition Stats Files Schema based on a given partition type. + * + *
<p>
Note: Provide the unified partition tuple as mentioned in the spec. + * + * @param partitionType the struct type that defines the structure of the partition. + * @return a schema that corresponds to the provided unified partition type. + */ + public static Schema schema(Types.StructType partitionType) { + Preconditions.checkState( + !partitionType.fields().isEmpty(), "getting schema for an unpartitioned table"); + + return new Schema( + Types.NestedField.required(1, Column.PARTITION.name(), partitionType), + Types.NestedField.required(2, Column.SPEC_ID.name(), Types.IntegerType.get()), + Types.NestedField.required(3, Column.DATA_RECORD_COUNT.name(), Types.LongType.get()), + Types.NestedField.required(4, Column.DATA_FILE_COUNT.name(), Types.IntegerType.get()), + Types.NestedField.required( + 5, Column.TOTAL_DATA_FILE_SIZE_IN_BYTES.name(), Types.LongType.get()), + Types.NestedField.optional( + 6, Column.POSITION_DELETE_RECORD_COUNT.name(), Types.LongType.get()), + Types.NestedField.optional( + 7, Column.POSITION_DELETE_FILE_COUNT.name(), Types.IntegerType.get()), + Types.NestedField.optional( + 8, Column.EQUALITY_DELETE_RECORD_COUNT.name(), Types.LongType.get()), + Types.NestedField.optional( + 9, Column.EQUALITY_DELETE_FILE_COUNT.name(), Types.IntegerType.get()), + Types.NestedField.optional(10, Column.TOTAL_RECORD_COUNT.name(), Types.LongType.get()), + Types.NestedField.optional(11, Column.LAST_UPDATED_AT.name(), Types.LongType.get()), + Types.NestedField.optional( + 12, Column.LAST_UPDATED_SNAPSHOT_ID.name(), Types.LongType.get())); + } + + /** + * Computes and writes the {@link PartitionStatisticsFile} for a given table's current snapshot. + * + * @param table The {@link Table} for which the partition statistics is computed. + * @return {@link PartitionStatisticsFile} for the current snapshot. + */ + public static PartitionStatisticsFile computeAndWritePartitionStatsFile(Table table) { + return computeAndWritePartitionStatsFile(table, null); + } + + /** + * Computes and writes the {@link PartitionStatisticsFile} for a given table and branch. + * + * @param table The {@link Table} for which the partition statistics is computed. + * @param branch A branch information to select the required snapshot. + * @return {@link PartitionStatisticsFile} for the given branch. 
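+   *     Returns {@code null} when the table has no snapshot yet; throws {@link
+   *     IllegalArgumentException} when the given branch cannot be resolved to a snapshot.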
+ */ + public static PartitionStatisticsFile computeAndWritePartitionStatsFile( + Table table, String branch) { + Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch); + if (currentSnapshot == null) { + Preconditions.checkArgument( + branch == null, "Couldn't find the snapshot for the branch %s", branch); + return null; + } + + Types.StructType partitionType = Partitioning.partitionType(table); + Schema schema = schema(partitionType); + + Iterable stats = PartitionStatsUtil.computeStats(table, currentSnapshot); + Iterator sortedStats = PartitionStatsUtil.sortStats(stats, partitionType); + Iterator convertedRecords = statsToRecords(sortedStats, schema); + return writePartitionStatsFile(table, currentSnapshot.snapshotId(), schema, convertedRecords); + } + + @VisibleForTesting + static ImmutableGenericPartitionStatisticsFile writePartitionStatsFile( + Table table, long snapshotId, Schema dataSchema, Iterator records) { + OutputFile outputFile = newPartitionStatsFile(table, snapshotId); + FileWriterFactory factory = + GenericFileWriterFactory.builderFor(table) + .dataSchema(dataSchema) + .dataFileFormat(fileFormat(outputFile.location())) + .build(); + DataWriter writer = + factory.newDataWriter( + EncryptedFiles.encryptedOutput(outputFile, EncryptionKeyMetadata.EMPTY), + PartitionSpec.unpartitioned(), + null); + try (Closeable toClose = writer) { + records.forEachRemaining(writer::write); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + + return ImmutableGenericPartitionStatisticsFile.builder() + .snapshotId(snapshotId) + .path(outputFile.location()) + .fileSizeInBytes(outputFile.toInputFile().getLength()) + .build(); + } + + /** + * Reads partition statistics from the specified {@link InputFile} using given schema. + * + * @param schema The {@link Schema} of the partition statistics file. + * @param inputFile An {@link InputFile} pointing to the partition stats file. 
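+   * @return a {@link CloseableIterable} of {@link PartitionStatsRecord} entries read from the
+   *     stats file in its detected format (Parquet, ORC, or Avro).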
+ */ + public static CloseableIterable readPartitionStatsFile( + Schema schema, InputFile inputFile) { + CloseableIterable records; + FileFormat fileFormat = fileFormat(inputFile.location()); + switch (fileFormat) { + case PARQUET: + records = + Parquet.read(inputFile) + .project(schema) + .createReaderFunc( + fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema)) + .build(); + break; + case ORC: + records = + ORC.read(inputFile) + .project(schema) + .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(schema, fileSchema)) + .build(); + break; + case AVRO: + records = Avro.read(inputFile).project(schema).createReaderFunc(DataReader::create).build(); + break; + default: + throw new UnsupportedOperationException("Unsupported file format:" + fileFormat.name()); + } + + return CloseableIterable.transform( + records, PartitionStatsHandler::recordToPartitionStatsRecord); + } + + private static FileFormat fileFormat(String fileLocation) { + return FileFormat.fromString(fileLocation.substring(fileLocation.lastIndexOf(".") + 1)); + } + + private static OutputFile newPartitionStatsFile(Table table, long snapshotId) { + FileFormat fileFormat = + fileFormat( + table.properties().getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT)); + return table + .io() + .newOutputFile( + ((HasTableOperations) table) + .operations() + .metadataFileLocation( + fileFormat.addExtension(String.format("partition-stats-%d", snapshotId)))); + } + + private static PartitionStatsRecord recordToPartitionStatsRecord(Record record) { + PartitionStats partitionStats = + new PartitionStats(record.get(Column.PARTITION.id(), Record.class)); + partitionStats.setSpecId(record.get(Column.SPEC_ID.id(), Integer.class)); + partitionStats.setDataRecordCount(record.get(Column.DATA_RECORD_COUNT.id(), Long.class)); + partitionStats.setDataFileCount(record.get(Column.DATA_FILE_COUNT.id(), Integer.class)); + partitionStats.setTotalDataFileSizeInBytes( + record.get(Column.TOTAL_DATA_FILE_SIZE_IN_BYTES.id(), Long.class)); + partitionStats.setPositionDeleteRecordCount( + record.get(Column.POSITION_DELETE_RECORD_COUNT.id(), Long.class)); + partitionStats.setPositionDeleteFileCount( + record.get(Column.POSITION_DELETE_FILE_COUNT.id(), Integer.class)); + partitionStats.setEqualityDeleteRecordCount( + record.get(Column.EQUALITY_DELETE_RECORD_COUNT.id(), Long.class)); + partitionStats.setEqualityDeleteFileCount( + record.get(Column.EQUALITY_DELETE_FILE_COUNT.id(), Integer.class)); + partitionStats.setTotalRecordCount(record.get(Column.TOTAL_RECORD_COUNT.id(), Long.class)); + partitionStats.setLastUpdatedAt(record.get(Column.LAST_UPDATED_AT.id(), Long.class)); + partitionStats.setLastUpdatedSnapshotId( + record.get(Column.LAST_UPDATED_SNAPSHOT_ID.id(), Long.class)); + + return PartitionStatsRecord.create(record.struct(), partitionStats); + } + + @VisibleForTesting + static Iterator statsToRecords( + Iterator partitionStatsIterator, Schema recordSchema) { + return new TransformIteratorWithBiFunction<>( + partitionStatsIterator, + (partitionStats, schema) -> { + PartitionStatsRecord record = PartitionStatsRecord.create(schema, partitionStats); + record.set( + Column.PARTITION.id(), + convertPartitionValues(record.get(Column.PARTITION.id(), Record.class))); + return record; + }, + recordSchema); + } + + private static Record convertPartitionValues(Record partitionRecord) { + if (partitionRecord == null) { + return null; + } + + GenericRecord converted = GenericRecord.create(partitionRecord.struct()); + for (int index = 0; 
index < partitionRecord.size(); index++) { + Object val = partitionRecord.get(index); + if (val != null) { + converted.set( + index, + IdentityPartitionConverters.convertConstant( + partitionRecord.struct().fields().get(index).type(), val)); + } + } + + return converted; + } + + private static class TransformIteratorWithBiFunction implements Iterator { + private final Iterator iterator; + private final BiFunction transformer; + private final U additionalInput; + + TransformIteratorWithBiFunction( + Iterator iterator, BiFunction transformer, U additionalInput) { + this.iterator = iterator; + this.transformer = transformer; + this.additionalInput = additionalInput; + } + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public R next() { + T nextElement = iterator.next(); + return transformer.apply(nextElement, additionalInput); + } + } +} diff --git a/data/src/test/java/org/apache/iceberg/data/TestPartitionStatsHandler.java b/data/src/test/java/org/apache/iceberg/data/TestPartitionStatsHandler.java new file mode 100644 index 000000000000..1a8ce4304308 --- /dev/null +++ b/data/src/test/java/org/apache/iceberg/data/TestPartitionStatsHandler.java @@ -0,0 +1,712 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.data; + +import static org.apache.iceberg.data.PartitionStatsHandler.Column; +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Files; +import org.apache.iceberg.Parameter; +import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.Parameters; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.PartitionStatisticsFile; +import org.apache.iceberg.PartitionStats; +import org.apache.iceberg.Partitioning; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.TestHelpers; +import org.apache.iceberg.TestTables; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; +import org.assertj.core.groups.Tuple; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; + +@ExtendWith(ParameterizedTestExtension.class) +public class TestPartitionStatsHandler { + private static final Schema SCHEMA = + new Schema( + optional(1, "c1", Types.IntegerType.get()), + optional(2, "c2", Types.StringType.get()), + optional(3, "c3", Types.StringType.get())); + + protected static final PartitionSpec SPEC = + PartitionSpec.builderFor(SCHEMA).identity("c2").identity("c3").build(); + + @TempDir public File temp; + + private static final Random RANDOM = ThreadLocalRandom.current(); + + @Parameters(name = "fileFormat = {0}") + public static List parameters() { + return Arrays.asList(FileFormat.PARQUET, FileFormat.ORC, FileFormat.AVRO); + } + + @Parameter private FileFormat format; + + @Test + public void testPartitionStatsOnEmptyTable() throws Exception { + Table testTable = TestTables.create(tempDir("empty_table"), "empty_table", SCHEMA, SPEC, 2); + assertThat(PartitionStatsHandler.computeAndWritePartitionStatsFile(testTable)).isNull(); + } + + @Test + public void testPartitionStatsOnEmptyBranch() throws Exception { + Table testTable = TestTables.create(tempDir("empty_branch"), "empty_branch", SCHEMA, SPEC, 2); + testTable.manageSnapshots().createBranch("b1").commit(); + PartitionStatisticsFile partitionStatisticsFile = + PartitionStatsHandler.computeAndWritePartitionStatsFile(testTable, "b1"); + // creates an empty stats file since the dummy snapshot exist + assertThat(partitionStatisticsFile.fileSizeInBytes()).isEqualTo(0L); + assertThat(partitionStatisticsFile.snapshotId()) + .isEqualTo(testTable.refs().get("b1").snapshotId()); + } + + @Test + public 
void testPartitionStatsOnInvalidSnapshot() throws Exception { + Table testTable = + TestTables.create(tempDir("invalid_snapshot"), "invalid_snapshot", SCHEMA, SPEC, 2); + assertThatThrownBy( + () -> + PartitionStatsHandler.computeAndWritePartitionStatsFile( + testTable, "INVALID_BRANCH")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Couldn't find the snapshot for the branch INVALID_BRANCH"); + } + + @Test + public void testPartitionStatsOnUnPartitionedTable() throws Exception { + Table testTable = + TestTables.create( + tempDir("unpartitioned_table"), + "unpartitioned_table", + SCHEMA, + PartitionSpec.unpartitioned(), + 2); + + List records = prepareRecords(testTable.schema()); + DataFile dataFile = FileHelpers.writeDataFile(testTable, outputFile(), records); + testTable.newAppend().appendFile(dataFile).commit(); + + assertThatThrownBy(() -> PartitionStatsHandler.computeAndWritePartitionStatsFile(testTable)) + .isInstanceOf(IllegalStateException.class) + .hasMessage("getting schema for an unpartitioned table"); + } + + @SuppressWarnings("checkstyle:MethodLength") + @TestTemplate // Tests for all the table formats (PARQUET, ORC, AVRO) + public void testPartitionStats() throws Exception { + Table testTable = + TestTables.create( + tempDir("partition_stats_" + format.name()), + "partition_stats_compute_" + format.name(), + SCHEMA, + SPEC, + 2, + ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name())); + + List records = prepareRecords(testTable.schema()); + DataFile dataFile1 = + FileHelpers.writeDataFile( + testTable, outputFile(), TestHelpers.Row.of("foo", "A"), records.subList(0, 3)); + DataFile dataFile2 = + FileHelpers.writeDataFile( + testTable, outputFile(), TestHelpers.Row.of("foo", "B"), records.subList(3, 4)); + DataFile dataFile3 = + FileHelpers.writeDataFile( + testTable, outputFile(), TestHelpers.Row.of("bar", "A"), records.subList(4, 5)); + DataFile dataFile4 = + FileHelpers.writeDataFile( + testTable, outputFile(), TestHelpers.Row.of("bar", "B"), records.subList(5, 7)); + + for (int i = 0; i < 3; i++) { + // insert same set of seven records thrice to have a new manifest files + testTable + .newAppend() + .appendFile(dataFile1) + .appendFile(dataFile2) + .appendFile(dataFile3) + .appendFile(dataFile4) + .commit(); + } + + Snapshot snapshot1 = testTable.currentSnapshot(); + Schema recordSchema = PartitionStatsHandler.schema(Partitioning.partitionType(testTable)); + Types.StructType partitionType = + recordSchema.findField(Column.PARTITION.name()).type().asStructType(); + computeAndValidatePartitionStats( + testTable, + recordSchema, + Tuple.tuple( + partitionRecord(partitionType, "foo", "A"), + 0, + 9L, + 3, + 3 * dataFile1.fileSizeInBytes(), + 0L, + 0, + 0L, + 0, + 0L, + snapshot1.timestampMillis(), + snapshot1.snapshotId()), + Tuple.tuple( + partitionRecord(partitionType, "foo", "B"), + 0, + 3L, + 3, + 3 * dataFile2.fileSizeInBytes(), + 0L, + 0, + 0L, + 0, + 0L, + snapshot1.timestampMillis(), + snapshot1.snapshotId()), + Tuple.tuple( + partitionRecord(partitionType, "bar", "A"), + 0, + 3L, + 3, + 3 * dataFile3.fileSizeInBytes(), + 0L, + 0, + 0L, + 0, + 0L, + snapshot1.timestampMillis(), + snapshot1.snapshotId()), + Tuple.tuple( + partitionRecord(partitionType, "bar", "B"), + 0, + 6L, + 3, + 3 * dataFile4.fileSizeInBytes(), + 0L, + 0, + 0L, + 0, + 0L, + snapshot1.timestampMillis(), + snapshot1.snapshotId())); + + DeleteFile posDeletes = commitPositionDeletes(testTable, dataFile1); + Snapshot snapshot2 = testTable.currentSnapshot(); + + DeleteFile 
eqDeletes = commitEqualityDeletes(testTable);
+    Snapshot snapshot3 = testTable.currentSnapshot();
+
+    recordSchema = PartitionStatsHandler.schema(Partitioning.partitionType(testTable));
+    partitionType = recordSchema.findField(Column.PARTITION.name()).type().asStructType();
+    computeAndValidatePartitionStats(
+        testTable,
+        recordSchema,
+        Tuple.tuple(
+            partitionRecord(partitionType, "foo", "A"),
+            0,
+            9L,
+            3,
+            3 * dataFile1.fileSizeInBytes(),
+            0L,
+            0,
+            eqDeletes.recordCount(),
+            1,
+            0L,
+            snapshot3.timestampMillis(),
+            snapshot3.snapshotId()),
+        Tuple.tuple(
+            partitionRecord(partitionType, "foo", "B"),
+            0,
+            3L,
+            3,
+            3 * dataFile2.fileSizeInBytes(),
+            0L,
+            0,
+            0L,
+            0,
+            0L,
+            snapshot1.timestampMillis(),
+            snapshot1.snapshotId()),
+        Tuple.tuple(
+            partitionRecord(partitionType, "bar", "A"),
+            0,
+            3L,
+            3,
+            3 * dataFile3.fileSizeInBytes(),
+            posDeletes.recordCount(),
+            1,
+            0L,
+            0,
+            0L,
+            snapshot2.timestampMillis(),
+            snapshot2.snapshotId()),
+        Tuple.tuple(
+            partitionRecord(partitionType, "bar", "B"),
+            0,
+            6L,
+            3,
+            3 * dataFile4.fileSizeInBytes(),
+            0L,
+            0,
+            0L,
+            0,
+            0L,
+            snapshot1.timestampMillis(),
+            snapshot1.snapshotId()));
+  }
+
+  @SuppressWarnings("checkstyle:MethodLength")
+  @Test
+  public void testPartitionStatsWithSchemaEvolution() throws Exception {
+    final PartitionSpec specBefore = PartitionSpec.builderFor(SCHEMA).identity("c2").build();
+
+    Table testTable =
+        TestTables.create(
+            tempDir("partition_stats_schema_evolve"),
+            "partition_stats_schema_evolve",
+            SCHEMA,
+            specBefore,
+            SortOrder.unsorted(),
+            2);
+    // foo -> 4 records, bar -> 3 records
+    List<Record> records = prepareRecords(testTable.schema());
+    DataFile dataFile1 =
+        FileHelpers.writeDataFile(
+            testTable, outputFile(), TestHelpers.Row.of("foo"), records.subList(0, 4));
+    DataFile dataFile2 =
+        FileHelpers.writeDataFile(
+            testTable, outputFile(), TestHelpers.Row.of("bar"), records.subList(4, 7));
+
+    for (int i = 0; i < 2; i++) {
+      // insert the same set of seven records twice to create new manifest files
+      testTable.newAppend().appendFile(dataFile1).appendFile(dataFile2).commit();
+    }
+
+    Snapshot snapshot1 = testTable.currentSnapshot();
+    Schema recordSchema = PartitionStatsHandler.schema(Partitioning.partitionType(testTable));
+    Types.StructType partitionType =
+        recordSchema.findField(Column.PARTITION.name()).type().asStructType();
+
+    computeAndValidatePartitionStats(
+        testTable,
+        recordSchema,
+        Tuple.tuple(
+            partitionRecord(partitionType, "foo"),
+            0,
+            8L,
+            2,
+            2 * dataFile1.fileSizeInBytes(),
+            0L,
+            0,
+            0L,
+            0,
+            0L,
+            snapshot1.timestampMillis(),
+            snapshot1.snapshotId()),
+        Tuple.tuple(
+            partitionRecord(partitionType, "bar"),
+            0,
+            6L,
+            2,
+            2 * dataFile2.fileSizeInBytes(),
+            0L,
+            0,
+            0L,
+            0,
+            0L,
+            snapshot1.timestampMillis(),
+            snapshot1.snapshotId()));
+
+    // Evolve the partition spec to include c3
+    testTable.updateSpec().addField("c3").commit();
+    records = prepareRecords(testTable.schema());
+    // append a record with a null partition value
+    GenericRecord record = GenericRecord.create(testTable.schema());
+    records.add(record.copy("c1", 0, "c2", "bar", "c3", null));
+
+    DataFile dataFile3 =
+        FileHelpers.writeDataFile(
+            testTable, outputFile(), TestHelpers.Row.of("foo", "A"), records.subList(0, 3));
+    DataFile dataFile4 =
+        FileHelpers.writeDataFile(
+            testTable, outputFile(), TestHelpers.Row.of("foo", "B"), records.subList(3, 4));
+    DataFile dataFile5 =
+        FileHelpers.writeDataFile(
+            testTable, outputFile(), TestHelpers.Row.of("bar", "A"), records.subList(4, 5));
+    DataFile dataFile6 =
+        FileHelpers.writeDataFile(
+            testTable, outputFile(), TestHelpers.Row.of("bar", "B"), records.subList(5, 7));
+    DataFile dataFile7 =
+        FileHelpers.writeDataFile(
+            testTable, outputFile(), TestHelpers.Row.of("bar", null), records.subList(7, 8));
+
+    testTable
+        .newAppend()
+        .appendFile(dataFile3)
+        .appendFile(dataFile4)
+        .appendFile(dataFile5)
+        .appendFile(dataFile6)
+        .appendFile(dataFile7)
+        .commit();
+
+    Snapshot snapshot2 = testTable.currentSnapshot();
+    recordSchema = PartitionStatsHandler.schema(Partitioning.partitionType(testTable));
+    partitionType = recordSchema.findField(Column.PARTITION.name()).type().asStructType();
+    computeAndValidatePartitionStats(
+        testTable,
+        recordSchema,
+        Tuple.tuple(
+            partitionRecord(partitionType, "foo", null),
+            0, // old spec id as the record is unmodified
+            8L,
+            2,
+            2 * dataFile1.fileSizeInBytes(),
+            0L,
+            0,
+            0L,
+            0,
+            0L,
+            snapshot1.timestampMillis(),
+            snapshot1.snapshotId()),
+        Tuple.tuple(
+            partitionRecord(partitionType, "bar", null),
+            1,
+            7L,
+            3,
+            2 * dataFile2.fileSizeInBytes() + dataFile7.fileSizeInBytes(),
+            0L,
+            0,
+            0L,
+            0,
+            0L,
+            snapshot2.timestampMillis(),
+            snapshot2.snapshotId()),
+        Tuple.tuple(
+            partitionRecord(partitionType, "foo", "A"),
+            1,
+            3L,
+            1,
+            dataFile3.fileSizeInBytes(),
+            0L,
+            0,
+            0L,
+            0,
+            0L,
+            snapshot2.timestampMillis(),
+            snapshot2.snapshotId()),
+        Tuple.tuple(
+            partitionRecord(partitionType, "foo", "B"),
+            1,
+            1L,
+            1,
+            dataFile4.fileSizeInBytes(),
+            0L,
+            0,
+            0L,
+            0,
+            0L,
+            snapshot2.timestampMillis(),
+            snapshot2.snapshotId()),
+        Tuple.tuple(
+            partitionRecord(partitionType, "bar", "A"),
+            1,
+            1L,
+            1,
+            dataFile5.fileSizeInBytes(),
+            0L,
+            0,
+            0L,
+            0,
+            0L,
+            snapshot2.timestampMillis(),
+            snapshot2.snapshotId()),
+        Tuple.tuple(
+            partitionRecord(partitionType, "bar", "B"),
+            1,
+            2L,
+            1,
+            dataFile6.fileSizeInBytes(),
+            0L,
+            0,
+            0L,
+            0,
+            0L,
+            snapshot2.timestampMillis(),
+            snapshot2.snapshotId()));
+  }
+
+  @Test
+  public void testAllDatatypePartitionWriting() throws Exception {
+    Schema schema =
+        new Schema(
+            required(100, "id", Types.LongType.get()),
+            optional(101, "data", Types.StringType.get()),
+            optional(113, "bytes", Types.BinaryType.get()));
+
+    PartitionSpec spec = PartitionSpec.builderFor(schema).identity("bytes").build();
+    Table testTable =
+        TestTables.create(
+            tempDir("test_all_type"), "test_all_type", schema, spec, SortOrder.unsorted(), 2);
+
+    Types.StructType partitionSchema = Partitioning.partitionType(testTable);
+    Schema dataSchema = PartitionStatsHandler.schema(partitionSchema);
+
+    Record partitionData =
+        GenericRecord.create(dataSchema.findField(Column.PARTITION.name()).type().asStructType());
+    partitionData.set(0, new byte[] {1, 2, 3});
+
+    PartitionStats partitionStats = new PartitionStats(partitionData);
+    partitionStats.set(Column.SPEC_ID.id(), RANDOM.nextInt(10));
+    partitionStats.set(Column.DATA_RECORD_COUNT.id(), RANDOM.nextLong());
+    partitionStats.set(Column.DATA_FILE_COUNT.id(), RANDOM.nextInt());
+    partitionStats.set(Column.TOTAL_DATA_FILE_SIZE_IN_BYTES.id(), 1024L * RANDOM.nextInt(20));
+
+    Iterator<PartitionStatsRecord> convertedRecords =
+        PartitionStatsHandler.statsToRecords(
+            Collections.singletonList(partitionStats).iterator(), dataSchema);
+
+    List<PartitionStatsRecord> expectedRecords = Lists.newArrayList(convertedRecords);
+
+    PartitionStatisticsFile statisticsFile =
+        PartitionStatsHandler.writePartitionStatsFile(
+            testTable, 42L, dataSchema, expectedRecords.iterator());
+
+    List<PartitionStatsRecord> writtenRecords;
+    try (CloseableIterable<PartitionStatsRecord> recordIterator =
+        PartitionStatsHandler.readPartitionStatsFile(
+            dataSchema, Files.localInput(statisticsFile.path()))) {
+      writtenRecords = Lists.newArrayList(recordIterator);
+    }
+    assertThat(writtenRecords).isEqualTo(expectedRecords);
+  }
+
+  @Test
+  public void testOptionalFieldsWriting() throws Exception {
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c1").build();
+    Table testTable =
+        TestTables.create(
+            tempDir("test_partition_stats_optional"),
+            "test_partition_stats_optional",
+            SCHEMA,
+            spec,
+            SortOrder.unsorted(),
+            2);
+
+    Types.StructType partitionSchema = Partitioning.partitionType(testTable);
+    Schema dataSchema = PartitionStatsHandler.schema(partitionSchema);
+
+    ImmutableList.Builder<PartitionStats> partitionListBuilder = ImmutableList.builder();
+
+    for (int i = 0; i < 5; i++) {
+      GenericRecord partitionData =
+          GenericRecord.create(
+              dataSchema.findField(Column.PARTITION.name()).type().asStructType());
+      partitionData.set(0, RANDOM.nextInt());
+
+      PartitionStats partitionStats = new PartitionStats(partitionData);
+
+      partitionStats.set(Column.PARTITION.ordinal(), partitionData);
+      partitionStats.set(Column.SPEC_ID.ordinal(), RANDOM.nextInt(10));
+      partitionStats.set(Column.DATA_RECORD_COUNT.ordinal(), RANDOM.nextLong());
+      partitionStats.set(Column.DATA_FILE_COUNT.ordinal(), RANDOM.nextInt());
+      partitionStats.set(
+          Column.TOTAL_DATA_FILE_SIZE_IN_BYTES.ordinal(), 1024L * RANDOM.nextInt(20));
+      partitionStats.set(Column.POSITION_DELETE_RECORD_COUNT.ordinal(), null);
+      partitionStats.set(Column.POSITION_DELETE_FILE_COUNT.ordinal(), null);
+      partitionStats.set(Column.EQUALITY_DELETE_RECORD_COUNT.ordinal(), null);
+      partitionStats.set(Column.EQUALITY_DELETE_FILE_COUNT.ordinal(), null);
+      partitionStats.set(Column.TOTAL_RECORD_COUNT.ordinal(), null);
+      partitionStats.set(Column.LAST_UPDATED_AT.ordinal(), null);
+      partitionStats.set(Column.LAST_UPDATED_SNAPSHOT_ID.ordinal(), null);
+
+      partitionListBuilder.add(partitionStats);
+    }
+
+    Iterator<PartitionStatsRecord> convertedRecords =
+        PartitionStatsHandler.statsToRecords(partitionListBuilder.build().iterator(), dataSchema);
+
+    List<PartitionStatsRecord> expectedRecords = Lists.newArrayList(convertedRecords);
+
+    PartitionStatisticsFile statisticsFile =
+        PartitionStatsHandler.writePartitionStatsFile(
+            testTable, 42L, dataSchema, expectedRecords.iterator());
+
+    List<PartitionStatsRecord> writtenRecords;
+    try (CloseableIterable<PartitionStatsRecord> recordIterator =
+        PartitionStatsHandler.readPartitionStatsFile(
+            dataSchema, Files.localInput(statisticsFile.path()))) {
+      writtenRecords = Lists.newArrayList(recordIterator);
+    }
+    assertThat(writtenRecords).isEqualTo(expectedRecords);
+    assertThat(expectedRecords.get(0).unwrap())
+        .extracting(
+            PartitionStats::positionDeleteRecordCount,
+            PartitionStats::positionDeleteFileCount,
+            PartitionStats::equalityDeleteRecordCount,
+            PartitionStats::equalityDeleteFileCount,
+            PartitionStats::totalRecordCount,
+            PartitionStats::lastUpdatedAt,
+            PartitionStats::lastUpdatedSnapshotId)
+        .isEqualTo(
+            Arrays.asList(
+                0L, 0, 0L, 0, 0L, null, null)); // null counters should be initialized to zero.
+  }
+
+  private OutputFile outputFile() throws IOException {
+    return Files.localOutput(File.createTempFile("data", null, tempDir("stats")));
+  }
+
+  private static Record partitionRecord(Types.StructType partitionType, String c2, String c3) {
+    Record partitionData = GenericRecord.create(partitionType);
+    partitionData.set(0, c2);
+    partitionData.set(1, c3);
+    return partitionData;
+  }
+
+  private static Record partitionRecord(Types.StructType partitionType, String c2) {
+    Record partitionData = GenericRecord.create(partitionType);
+    partitionData.set(0, c2);
+    return partitionData;
+  }
+
+  private static List<Record> prepareRecords(Schema schema) {
+    GenericRecord record = GenericRecord.create(schema);
+    List<Record> records = Lists.newArrayList();
+    // foo -> 4 records, bar -> 3 records
+    // foo, A -> 3 records
+    records.add(record.copy("c1", 0, "c2", "foo", "c3", "A"));
+    records.add(record.copy("c1", 1, "c2", "foo", "c3", "A"));
+    records.add(record.copy("c1", 2, "c2", "foo", "c3", "A"));
+    // foo, B -> 1 record
+    records.add(record.copy("c1", 3, "c2", "foo", "c3", "B"));
+    // bar, A -> 1 record
+    records.add(record.copy("c1", 4, "c2", "bar", "c3", "A"));
+    // bar, B -> 2 records
+    records.add(record.copy("c1", 5, "c2", "bar", "c3", "B"));
+    records.add(record.copy("c1", 6, "c2", "bar", "c3", "B"));
+    return records;
+  }
+
+  private static void computeAndValidatePartitionStats(
+      Table testTable, Schema recordSchema, Tuple... expectedValues) throws IOException {
+    // compute and commit the partition stats file
+    Snapshot currentSnapshot = testTable.currentSnapshot();
+    PartitionStatisticsFile result =
+        PartitionStatsHandler.computeAndWritePartitionStatsFile(testTable);
+    testTable.updatePartitionStatistics().setPartitionStatistics(result).commit();
+    assertThat(result.snapshotId()).isEqualTo(currentSnapshot.snapshotId());
+
+    // read the partition entries from the stats file
+    List<PartitionStatsRecord> partitionStats;
+    try (CloseableIterable<PartitionStatsRecord> recordIterator =
+        PartitionStatsHandler.readPartitionStatsFile(
+            recordSchema, Files.localInput(result.path()))) {
+      partitionStats = Lists.newArrayList(recordIterator);
+    }
+    assertThat(partitionStats)
+        .extracting(PartitionStatsRecord::unwrap)
+        .extracting(
+            PartitionStats::partition,
+            PartitionStats::specId,
+            PartitionStats::dataRecordCount,
+            PartitionStats::dataFileCount,
+            PartitionStats::totalDataFileSizeInBytes,
+            PartitionStats::positionDeleteRecordCount,
+            PartitionStats::positionDeleteFileCount,
+            PartitionStats::equalityDeleteRecordCount,
+            PartitionStats::equalityDeleteFileCount,
+            PartitionStats::totalRecordCount,
+            PartitionStats::lastUpdatedAt,
+            PartitionStats::lastUpdatedSnapshotId)
+        .containsExactlyInAnyOrder(expectedValues);
+  }
+
+  private DeleteFile commitEqualityDeletes(Table testTable) throws IOException {
+    Schema deleteRowSchema = testTable.schema().select("c1");
+    Record dataDelete = GenericRecord.create(deleteRowSchema);
+    List<Record> dataDeletes =
+        Lists.newArrayList(dataDelete.copy("c1", 1), dataDelete.copy("c1", 2));
+
+    DeleteFile eqDeletes =
+        FileHelpers.writeDeleteFile(
+            testTable,
+            Files.localOutput(File.createTempFile("junit", null, tempDir("eq_delete"))),
+            TestHelpers.Row.of("foo", "A"),
+            dataDeletes,
+            deleteRowSchema);
+
+    testTable.newRowDelta().addDeletes(eqDeletes).commit();
+    return eqDeletes;
+  }
+
+  private DeleteFile commitPositionDeletes(Table testTable, DataFile dataFile1) throws IOException {
+    List<PositionDelete<?>> deletes = Lists.newArrayList();
+    for (long i = 0; i < 2; i++) {
+      deletes.add(
+          positionDelete(testTable.schema(), dataFile1.path(), i, (int) i, String.valueOf(i)));
+    }
+    DeleteFile posDeletes =
+        FileHelpers.writePosDeleteFile(
+            testTable,
+            Files.localOutput(File.createTempFile("junit", null, tempDir("pos_delete"))),
+            TestHelpers.Row.of("bar", "A"),
+            deletes);
+    testTable.newRowDelta().addDeletes(posDeletes).commit();
+    return posDeletes;
+  }
+
+  private static PositionDelete<GenericRecord> positionDelete(
+      Schema tableSchema, CharSequence path, Long position, Object... values) {
+    PositionDelete<GenericRecord> posDelete = PositionDelete.create();
+    GenericRecord nested = GenericRecord.create(tableSchema);
+    for (int i = 0; i < values.length; i++) {
+      nested.set(i, values[i]);
+    }
+    posDelete.set(path, position, nested);
+    return posDelete;
+  }
+
+  private File tempDir(String folderName) throws IOException {
+    return java.nio.file.Files.createTempDirectory(temp.toPath(), folderName).toFile();
+  }
+}
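A minimal usage sketch of the handler API exercised by the tests above, not part of the patch: it assumes an already-loaded Table named "table", a locally readable stats file, and the same imports used in the test class; only the PartitionStatsHandler, Table, and Files calls shown in the tests are relied on.

// Sketch (assumptions: "table" is a loaded Table; the stats file is on local storage)
// 1. compute partition stats for the current snapshot and write the stats file
PartitionStatisticsFile statsFile =
    PartitionStatsHandler.computeAndWritePartitionStatsFile(table);
// 2. register the stats file in table metadata
table.updatePartitionStatistics().setPartitionStatistics(statsFile).commit();
// 3. read the entries back using the schema derived from the table's partition type
Schema statsSchema = PartitionStatsHandler.schema(Partitioning.partitionType(table));
try (CloseableIterable<PartitionStatsRecord> statsRecords =
    PartitionStatsHandler.readPartitionStatsFile(
        statsSchema, Files.localInput(statsFile.path()))) {
  statsRecords.forEach(statsRecord -> System.out.println(statsRecord.unwrap().partition()));
}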