From 8ed6cc1691294424e09c1783b26f25dee05cb68f Mon Sep 17 00:00:00 2001 From: ajantha-bhat Date: Wed, 2 Aug 2023 21:05:29 +0530 Subject: [PATCH 1/3] Api: Track partition statistics via TableMetadata Tracking `PartitionStatisticsFile` in a same way as how `StatisticsFile` is already tracked. --- .palantir/revapi.yml | 4 + .../iceberg/PartitionStatisticsFile.java | 41 +++++ .../main/java/org/apache/iceberg/Table.java | 14 ++ .../java/org/apache/iceberg/Transaction.java | 11 ++ .../iceberg/UpdatePartitionStatistics.java | 40 +++++ .../org/apache/iceberg/BaseMetadataTable.java | 5 + .../org/apache/iceberg/BaseReadOnlyTable.java | 6 + .../java/org/apache/iceberg/BaseTable.java | 10 ++ .../org/apache/iceberg/BaseTransaction.java | 19 +++ .../iceberg/CommitCallbackTransaction.java | 5 + .../apache/iceberg/FileCleanupStrategy.java | 14 +- .../GenericPartitionStatisticsFile.java | 24 +++ .../iceberg/IncrementalFileCleanup.java | 3 +- .../org/apache/iceberg/MetadataUpdate.java | 41 +++++ .../apache/iceberg/MetadataUpdateParser.java | 44 +++++ .../PartitionStatisticsFileParser.java | 61 +++++++ .../apache/iceberg/ReachableFileCleanup.java | 3 +- .../org/apache/iceberg/ReachableFileUtil.java | 52 ++++++ .../org/apache/iceberg/SerializableTable.java | 10 ++ .../iceberg/SetPartitionStatistics.java | 74 +++++++++ .../org/apache/iceberg/TableMetadata.java | 39 +++++ .../apache/iceberg/TableMetadataParser.java | 32 ++++ .../iceberg/TestMetadataUpdateParser.java | 77 +++++++++ .../apache/iceberg/TestRemoveSnapshots.java | 118 ++++++++++++- .../iceberg/TestSetPartitionStatistics.java | 137 +++++++++++++++ .../org/apache/iceberg/TestTableMetadata.java | 157 ++++++++++++++++++ ...TableMetadataPartitionStatisticsFiles.json | 61 +++++++ 27 files changed, 1092 insertions(+), 10 deletions(-) create mode 100644 api/src/main/java/org/apache/iceberg/PartitionStatisticsFile.java create mode 100644 api/src/main/java/org/apache/iceberg/UpdatePartitionStatistics.java create mode 100644 core/src/main/java/org/apache/iceberg/GenericPartitionStatisticsFile.java create mode 100644 core/src/main/java/org/apache/iceberg/PartitionStatisticsFileParser.java create mode 100644 core/src/main/java/org/apache/iceberg/SetPartitionStatistics.java create mode 100644 core/src/test/java/org/apache/iceberg/TestSetPartitionStatistics.java create mode 100644 core/src/test/resources/TableMetadataPartitionStatisticsFiles.json diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index 283697418b6b..98023912c715 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -873,6 +873,10 @@ acceptedBreaks: new: "method void org.apache.iceberg.encryption.Ciphers::()" justification: "Static utility class - should not have public constructor" "1.4.0": + org.apache.iceberg:iceberg-api: + - code: "java.method.addedToInterface" + new: "method java.util.List org.apache.iceberg.Table::partitionStatisticsFiles()" + justification: "Track partition stats from TableMetadata" org.apache.iceberg:iceberg-core: - code: "java.field.serialVersionUIDChanged" new: "field org.apache.iceberg.util.SerializableMap.serialVersionUID" diff --git a/api/src/main/java/org/apache/iceberg/PartitionStatisticsFile.java b/api/src/main/java/org/apache/iceberg/PartitionStatisticsFile.java new file mode 100644 index 000000000000..f4217a4dc56b --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/PartitionStatisticsFile.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.io.Serializable; + +/** + * Represents a partition statistics file in the table default format, that can be used to read + * table data more efficiently. + * + *
Statistics are informational. A reader can choose to ignore statistics information. Statistics + * support is not required to read the table correctly. + */ +public interface PartitionStatisticsFile extends Serializable { + /** ID of the Iceberg table's snapshot the partition statistics file is associated with. */ + long snapshotId(); + + /** + * Returns fully qualified path to the file, suitable for constructing a Hadoop Path. Never null. + */ + String path(); + + /** Returns the size of the partition statistics file in bytes. */ + long fileSizeInBytes(); +} diff --git a/api/src/main/java/org/apache/iceberg/Table.java b/api/src/main/java/org/apache/iceberg/Table.java index 5ab1def51ca0..98ea8a46371c 100644 --- a/api/src/main/java/org/apache/iceberg/Table.java +++ b/api/src/main/java/org/apache/iceberg/Table.java @@ -286,6 +286,17 @@ default UpdateStatistics updateStatistics() { "Updating statistics is not supported by " + getClass().getName()); } + /** + * Create a new {@link UpdatePartitionStatistics update partition statistics API} to add or remove + * partition statistics files in this table. + * + * @return a new {@link UpdatePartitionStatistics} + */ + default UpdatePartitionStatistics updatePartitionStatistics() { + throw new UnsupportedOperationException( + "Updating partition statistics is not supported by " + getClass().getName()); + } + /** * Create a new {@link ExpireSnapshots expire API} to manage snapshots in this table and commit. * @@ -327,6 +338,9 @@ default UpdateStatistics updateStatistics() { */ List statisticsFiles(); + /** Returns the current partition statistics files for the table. */ + List partitionStatisticsFiles(); + /** * Returns the current refs for the table * diff --git a/api/src/main/java/org/apache/iceberg/Transaction.java b/api/src/main/java/org/apache/iceberg/Transaction.java index aeec1f589d06..fd84a974013d 100644 --- a/api/src/main/java/org/apache/iceberg/Transaction.java +++ b/api/src/main/java/org/apache/iceberg/Transaction.java @@ -148,6 +148,17 @@ default UpdateStatistics updateStatistics() { "Updating statistics is not supported by " + getClass().getName()); } + /** + * Create a new {@link UpdatePartitionStatistics update partition statistics API} to add or remove + * partition statistics files in this table. + * + * @return a new {@link UpdatePartitionStatistics} + */ + default UpdatePartitionStatistics updatePartitionStatistics() { + throw new UnsupportedOperationException( + "Updating partition statistics is not supported by " + getClass().getName()); + } + /** * Create a new {@link ExpireSnapshots expire API} to manage snapshots in this table. * diff --git a/api/src/main/java/org/apache/iceberg/UpdatePartitionStatistics.java b/api/src/main/java/org/apache/iceberg/UpdatePartitionStatistics.java new file mode 100644 index 000000000000..a36400036d1c --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/UpdatePartitionStatistics.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.util.List; + +/** API for updating partition statistics files in a table. */ +public interface UpdatePartitionStatistics extends PendingUpdate> { + /** + * Set the table's partition statistics file for given snapshot, replacing the previous partition + * statistics file for the snapshot if any exists. + * + * @return this for method chaining + */ + UpdatePartitionStatistics setPartitionStatistics( + long snapshotId, PartitionStatisticsFile partitionStatisticsFile); + + /** + * Remove the table's partition statistics file for given snapshot. + * + * @return this for method chaining + */ + UpdatePartitionStatistics removePartitionStatistics(long snapshotId); +} diff --git a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java index 124115496b3e..b5b63871bbdd 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java @@ -195,6 +195,11 @@ public List statisticsFiles() { return ImmutableList.of(); } + @Override + public List partitionStatisticsFiles() { + return ImmutableList.of(); + } + @Override public Map refs() { return table().refs(); diff --git a/core/src/main/java/org/apache/iceberg/BaseReadOnlyTable.java b/core/src/main/java/org/apache/iceberg/BaseReadOnlyTable.java index 4acbf2a16396..243c292ebc25 100644 --- a/core/src/main/java/org/apache/iceberg/BaseReadOnlyTable.java +++ b/core/src/main/java/org/apache/iceberg/BaseReadOnlyTable.java @@ -100,6 +100,12 @@ public UpdateStatistics updateStatistics() { "Cannot update statistics of a " + descriptor + " table"); } + @Override + public UpdatePartitionStatistics updatePartitionStatistics() { + throw new UnsupportedOperationException( + "Cannot update partition statistics of a " + descriptor + " table"); + } + @Override public ExpireSnapshots expireSnapshots() { throw new UnsupportedOperationException( diff --git a/core/src/main/java/org/apache/iceberg/BaseTable.java b/core/src/main/java/org/apache/iceberg/BaseTable.java index d4cf1f4b76f4..a44adc4e9035 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTable.java +++ b/core/src/main/java/org/apache/iceberg/BaseTable.java @@ -220,6 +220,11 @@ public UpdateStatistics updateStatistics() { return new SetStatistics(ops); } + @Override + public UpdatePartitionStatistics updatePartitionStatistics() { + return new SetPartitionStatistics(ops); + } + @Override public ExpireSnapshots expireSnapshots() { return new RemoveSnapshots(ops); @@ -255,6 +260,11 @@ public List statisticsFiles() { return ops.current().statisticsFiles(); } + @Override + public List partitionStatisticsFiles() { + return ops.current().partitionStatisticsFiles(); + } + @Override public Map refs() { return ops.current().refs(); diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java index 30103fd87fe2..2f051466bb8b 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java +++ 
b/core/src/main/java/org/apache/iceberg/BaseTransaction.java @@ -248,6 +248,15 @@ public UpdateStatistics updateStatistics() { return updateStatistics; } + @Override + public UpdatePartitionStatistics updatePartitionStatistics() { + checkLastOperationCommitted("UpdatePartitionStatistics"); + UpdatePartitionStatistics updatePartitionStatistics = + new SetPartitionStatistics(transactionOps); + updates.add(updatePartitionStatistics); + return updatePartitionStatistics; + } + @Override public ExpireSnapshots expireSnapshots() { checkLastOperationCommitted("ExpireSnapshots"); @@ -733,6 +742,11 @@ public UpdateStatistics updateStatistics() { return BaseTransaction.this.updateStatistics(); } + @Override + public UpdatePartitionStatistics updatePartitionStatistics() { + return BaseTransaction.this.updatePartitionStatistics(); + } + @Override public ExpireSnapshots expireSnapshots() { return BaseTransaction.this.expireSnapshots(); @@ -769,6 +783,11 @@ public List statisticsFiles() { return current.statisticsFiles(); } + @Override + public List partitionStatisticsFiles() { + return current.partitionStatisticsFiles(); + } + @Override public Map refs() { return current.refs(); diff --git a/core/src/main/java/org/apache/iceberg/CommitCallbackTransaction.java b/core/src/main/java/org/apache/iceberg/CommitCallbackTransaction.java index 9dfcf8c52867..e80f6bcdac95 100644 --- a/core/src/main/java/org/apache/iceberg/CommitCallbackTransaction.java +++ b/core/src/main/java/org/apache/iceberg/CommitCallbackTransaction.java @@ -106,6 +106,11 @@ public UpdateStatistics updateStatistics() { return wrapped.updateStatistics(); } + @Override + public UpdatePartitionStatistics updatePartitionStatistics() { + return wrapped.updatePartitionStatistics(); + } + @Override public ExpireSnapshots expireSnapshots() { return wrapped.expireSnapshots(); diff --git a/core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java b/core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java index c0a113898abe..6457f574b766 100644 --- a/core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java +++ b/core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java @@ -21,7 +21,6 @@ import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.function.Consumer; -import java.util.stream.Collectors; import org.apache.iceberg.avro.Avro; import org.apache.iceberg.exceptions.NotFoundException; import org.apache.iceberg.io.CloseableIterable; @@ -98,10 +97,15 @@ private Set statsFileLocations(TableMetadata tableMetadata) { Set statsFileLocations = Sets.newHashSet(); if (tableMetadata.statisticsFiles() != null) { - statsFileLocations = - tableMetadata.statisticsFiles().stream() - .map(StatisticsFile::path) - .collect(Collectors.toSet()); + tableMetadata.statisticsFiles().stream() + .map(StatisticsFile::path) + .forEach(statsFileLocations::add); + } + + if (tableMetadata.partitionStatisticsFiles() != null) { + tableMetadata.partitionStatisticsFiles().stream() + .map(PartitionStatisticsFile::path) + .forEach(statsFileLocations::add); } return statsFileLocations; diff --git a/core/src/main/java/org/apache/iceberg/GenericPartitionStatisticsFile.java b/core/src/main/java/org/apache/iceberg/GenericPartitionStatisticsFile.java new file mode 100644 index 000000000000..e8f815552927 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/GenericPartitionStatisticsFile.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import org.immutables.value.Value; + +@Value.Immutable +public interface GenericPartitionStatisticsFile extends PartitionStatisticsFile {} diff --git a/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java b/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java index f0a46f1576a6..6b4923830afe 100644 --- a/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java +++ b/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java @@ -262,7 +262,8 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira deleteFiles(manifestsToDelete, "manifest"); deleteFiles(manifestListsToDelete, "manifest list"); - if (!beforeExpiration.statisticsFiles().isEmpty()) { + if (!beforeExpiration.statisticsFiles().isEmpty() + || !beforeExpiration.partitionStatisticsFiles().isEmpty()) { Set expiredStatisticsFilesLocations = expiredStatisticsFilesLocations(beforeExpiration, afterExpiration); deleteFiles(expiredStatisticsFilesLocations, "statistics files"); diff --git a/core/src/main/java/org/apache/iceberg/MetadataUpdate.java b/core/src/main/java/org/apache/iceberg/MetadataUpdate.java index 363aabbff24f..b35f4a181939 100644 --- a/core/src/main/java/org/apache/iceberg/MetadataUpdate.java +++ b/core/src/main/java/org/apache/iceberg/MetadataUpdate.java @@ -243,6 +243,47 @@ public void applyTo(TableMetadata.Builder metadataBuilder) { } } + class SetPartitionStatistics implements MetadataUpdate { + private final long snapshotId; + private final PartitionStatisticsFile partitionStatisticsFile; + + public SetPartitionStatistics( + long snapshotId, PartitionStatisticsFile partitionStatisticsFile) { + this.snapshotId = snapshotId; + this.partitionStatisticsFile = partitionStatisticsFile; + } + + public long snapshotId() { + return snapshotId; + } + + public PartitionStatisticsFile partitionStatisticsFile() { + return partitionStatisticsFile; + } + + @Override + public void applyTo(TableMetadata.Builder metadataBuilder) { + metadataBuilder.setPartitionStatistics(snapshotId, partitionStatisticsFile); + } + } + + class RemovePartitionStatistics implements MetadataUpdate { + private final long snapshotId; + + public RemovePartitionStatistics(long snapshotId) { + this.snapshotId = snapshotId; + } + + public long snapshotId() { + return snapshotId; + } + + @Override + public void applyTo(TableMetadata.Builder metadataBuilder) { + metadataBuilder.removePartitionStatistics(snapshotId); + } + } + class AddSnapshot implements MetadataUpdate { private final Snapshot snapshot; diff --git a/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java b/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java index e88f749c80f0..cffe26b098ee 100644 --- a/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java +++ 
b/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java @@ -57,6 +57,8 @@ private MetadataUpdateParser() {} static final String REMOVE_STATISTICS = "remove-statistics"; static final String ADD_VIEW_VERSION = "add-view-version"; static final String SET_CURRENT_VIEW_VERSION = "set-current-view-version"; + static final String SET_PARTITION_STATISTICS = "set-partition-statistics"; + static final String REMOVE_PARTITION_STATISTICS = "remove-partition-statistics"; // AssignUUID private static final String UUID = "uuid"; @@ -86,6 +88,9 @@ private MetadataUpdateParser() {} // SetStatistics private static final String STATISTICS = "statistics"; + // SetPartitionStatistics + private static final String PARTITION_STATISTICS = "partition-statistics"; + // AddSnapshot private static final String SNAPSHOT = "snapshot"; @@ -133,6 +138,8 @@ private MetadataUpdateParser() {} .put(MetadataUpdate.SetDefaultSortOrder.class, SET_DEFAULT_SORT_ORDER) .put(MetadataUpdate.SetStatistics.class, SET_STATISTICS) .put(MetadataUpdate.RemoveStatistics.class, REMOVE_STATISTICS) + .put(MetadataUpdate.SetPartitionStatistics.class, SET_PARTITION_STATISTICS) + .put(MetadataUpdate.RemovePartitionStatistics.class, REMOVE_PARTITION_STATISTICS) .put(MetadataUpdate.AddSnapshot.class, ADD_SNAPSHOT) .put(MetadataUpdate.RemoveSnapshot.class, REMOVE_SNAPSHOTS) .put(MetadataUpdate.RemoveSnapshotRef.class, REMOVE_SNAPSHOT_REF) @@ -198,6 +205,14 @@ public static void toJson(MetadataUpdate metadataUpdate, JsonGenerator generator case REMOVE_STATISTICS: writeRemoveStatistics((MetadataUpdate.RemoveStatistics) metadataUpdate, generator); break; + case SET_PARTITION_STATISTICS: + writeSetPartitionStatistics( + (MetadataUpdate.SetPartitionStatistics) metadataUpdate, generator); + break; + case REMOVE_PARTITION_STATISTICS: + writeRemovePartitionStatistics( + (MetadataUpdate.RemovePartitionStatistics) metadataUpdate, generator); + break; case ADD_SNAPSHOT: writeAddSnapshot((MetadataUpdate.AddSnapshot) metadataUpdate, generator); break; @@ -275,6 +290,10 @@ public static MetadataUpdate fromJson(JsonNode jsonNode) { return readSetStatistics(jsonNode); case REMOVE_STATISTICS: return readRemoveStatistics(jsonNode); + case SET_PARTITION_STATISTICS: + return readSetPartitionStatistics(jsonNode); + case REMOVE_PARTITION_STATISTICS: + return readRemovePartitionStatistics(jsonNode); case ADD_SNAPSHOT: return readAddSnapshot(jsonNode); case REMOVE_SNAPSHOTS: @@ -355,6 +374,18 @@ private static void writeRemoveStatistics( gen.writeNumberField(SNAPSHOT_ID, update.snapshotId()); } + private static void writeSetPartitionStatistics( + MetadataUpdate.SetPartitionStatistics update, JsonGenerator gen) throws IOException { + gen.writeNumberField(SNAPSHOT_ID, update.snapshotId()); + gen.writeFieldName(PARTITION_STATISTICS); + PartitionStatisticsFileParser.toJson(update.partitionStatisticsFile(), gen); + } + + private static void writeRemovePartitionStatistics( + MetadataUpdate.RemovePartitionStatistics update, JsonGenerator gen) throws IOException { + gen.writeNumberField(SNAPSHOT_ID, update.snapshotId()); + } + private static void writeAddSnapshot(MetadataUpdate.AddSnapshot update, JsonGenerator gen) throws IOException { gen.writeFieldName(SNAPSHOT); @@ -478,6 +509,19 @@ private static MetadataUpdate readRemoveStatistics(JsonNode node) { return new MetadataUpdate.RemoveStatistics(snapshotId); } + private static MetadataUpdate readSetPartitionStatistics(JsonNode node) { + long snapshotId = JsonUtil.getLong(SNAPSHOT_ID, node); + JsonNode 
partitionStatisticsFileNode = JsonUtil.get(PARTITION_STATISTICS, node); + PartitionStatisticsFile partitionStatisticsFile = + PartitionStatisticsFileParser.fromJson(partitionStatisticsFileNode); + return new MetadataUpdate.SetPartitionStatistics(snapshotId, partitionStatisticsFile); + } + + private static MetadataUpdate readRemovePartitionStatistics(JsonNode node) { + long snapshotId = JsonUtil.getLong(SNAPSHOT_ID, node); + return new MetadataUpdate.RemovePartitionStatistics(snapshotId); + } + private static MetadataUpdate readAddSnapshot(JsonNode node) { Snapshot snapshot = SnapshotParser.fromJson(JsonUtil.get(SNAPSHOT, node)); return new MetadataUpdate.AddSnapshot(snapshot); diff --git a/core/src/main/java/org/apache/iceberg/PartitionStatisticsFileParser.java b/core/src/main/java/org/apache/iceberg/PartitionStatisticsFileParser.java new file mode 100644 index 000000000000..97288c98a26d --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/PartitionStatisticsFileParser.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import org.apache.iceberg.util.JsonUtil; + +public class PartitionStatisticsFileParser { + + private static final String SNAPSHOT_ID = "snapshot-id"; + private static final String STATISTICS_PATH = "statistics-path"; + private static final String FILE_SIZE_IN_BYTES = "file-size-in-bytes"; + + private PartitionStatisticsFileParser() {} + + public static String toJson(PartitionStatisticsFile partitionStatisticsFile) { + return toJson(partitionStatisticsFile, false); + } + + public static String toJson(PartitionStatisticsFile partitionStatisticsFile, boolean pretty) { + return JsonUtil.generate(gen -> toJson(partitionStatisticsFile, gen), pretty); + } + + public static void toJson(PartitionStatisticsFile statisticsFile, JsonGenerator generator) + throws IOException { + generator.writeStartObject(); + generator.writeNumberField(SNAPSHOT_ID, statisticsFile.snapshotId()); + generator.writeStringField(STATISTICS_PATH, statisticsFile.path()); + generator.writeNumberField(FILE_SIZE_IN_BYTES, statisticsFile.fileSizeInBytes()); + generator.writeEndObject(); + } + + static PartitionStatisticsFile fromJson(JsonNode node) { + long snapshotId = JsonUtil.getLong(SNAPSHOT_ID, node); + String path = JsonUtil.getString(STATISTICS_PATH, node); + long fileSizeInBytes = JsonUtil.getLong(FILE_SIZE_IN_BYTES, node); + return ImmutableGenericPartitionStatisticsFile.builder() + .snapshotId(snapshotId) + .path(path) + .fileSizeInBytes(fileSizeInBytes) + .build(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java b/core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java index 6b15e9147dc0..9db2ef4d90c6 100644 --- a/core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java +++ b/core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java @@ -82,7 +82,8 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira deleteFiles(manifestListsToDelete, "manifest list"); - if (!beforeExpiration.statisticsFiles().isEmpty()) { + if (!beforeExpiration.statisticsFiles().isEmpty() + || !beforeExpiration.partitionStatisticsFiles().isEmpty()) { deleteFiles( expiredStatisticsFilesLocations(beforeExpiration, afterExpiration), "statistics files"); } diff --git a/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java b/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java index bd23a221ab22..773f20d1234b 100644 --- a/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java +++ b/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java @@ -137,7 +137,9 @@ public static List manifestListLocations(Table table, Set snapshot * * @param table table for which statistics files needs to be listed * @return the location of statistics files + * @deprecated use the {@code allStatisticsFilesLocations(table)} instead. */ + @Deprecated public static List statisticsFilesLocations(Table table) { return statisticsFilesLocations(table, statisticsFile -> true); } @@ -148,7 +150,9 @@ public static List statisticsFilesLocations(Table table) { * @param table table for which statistics files needs to be listed * @param predicate predicate for filtering the statistics files * @return the location of statistics files + * @deprecated use the {@code allStatisticsFilesLocations(table, snapshotIds)} instead. 
*/ + @Deprecated public static List statisticsFilesLocations( Table table, Predicate predicate) { return table.statisticsFiles().stream() @@ -156,4 +160,52 @@ public static List statisticsFilesLocations( .map(StatisticsFile::path) .collect(Collectors.toList()); } + + /** + * Returns locations of statistics files in a table. + * + * @param table table for which statistics files needs to be listed + * @return the location of statistics files + */ + public static List allStatisticsFilesLocations(Table table) { + return allStatisticsFilesLocations(table, null); + } + + /** + * Returns locations of statistics files for a table for given snapshots. + * + * @param table table for which statistics files needs to be listed + * @param snapshotIds snapshot IDs for which statistics files needs to be listed + * @return the location of statistics files + */ + public static List allStatisticsFilesLocations(Table table, Set snapshotIds) { + List statsFileLocations = Lists.newArrayList(); + + Predicate statisticsFilePredicate; + Predicate partitionStatisticsFilePredicate; + if (snapshotIds == null) { + statisticsFilePredicate = statisticsFile -> true; + partitionStatisticsFilePredicate = partitionStatisticsFile -> true; + } else { + statisticsFilePredicate = statisticsFile -> snapshotIds.contains(statisticsFile.snapshotId()); + partitionStatisticsFilePredicate = + partitionStatisticsFile -> snapshotIds.contains(partitionStatisticsFile.snapshotId()); + } + + if (table.statisticsFiles() != null) { + table.statisticsFiles().stream() + .filter(statisticsFilePredicate) + .map(StatisticsFile::path) + .forEach(statsFileLocations::add); + } + + if (table.partitionStatisticsFiles() != null) { + table.partitionStatisticsFiles().stream() + .filter(partitionStatisticsFilePredicate) + .map(PartitionStatisticsFile::path) + .forEach(statsFileLocations::add); + } + + return statsFileLocations; + } } diff --git a/core/src/main/java/org/apache/iceberg/SerializableTable.java b/core/src/main/java/org/apache/iceberg/SerializableTable.java index 778a993c5144..5a98ddbaf993 100644 --- a/core/src/main/java/org/apache/iceberg/SerializableTable.java +++ b/core/src/main/java/org/apache/iceberg/SerializableTable.java @@ -251,6 +251,11 @@ public List statisticsFiles() { return lazyTable().statisticsFiles(); } + @Override + public List partitionStatisticsFiles() { + return lazyTable().partitionStatisticsFiles(); + } + @Override public Map refs() { return refs; @@ -361,6 +366,11 @@ public UpdateStatistics updateStatistics() { throw new UnsupportedOperationException(errorMsg("updateStatistics")); } + @Override + public UpdatePartitionStatistics updatePartitionStatistics() { + throw new UnsupportedOperationException(errorMsg("updatePartitionStatistics")); + } + @Override public ExpireSnapshots expireSnapshots() { throw new UnsupportedOperationException(errorMsg("expireSnapshots")); diff --git a/core/src/main/java/org/apache/iceberg/SetPartitionStatistics.java b/core/src/main/java/org/apache/iceberg/SetPartitionStatistics.java new file mode 100644 index 000000000000..09b41a9244a8 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/SetPartitionStatistics.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; + +public class SetPartitionStatistics implements UpdatePartitionStatistics { + private final TableOperations ops; + private final Map> partitionStatisticsToSet = + Maps.newHashMap(); + + public SetPartitionStatistics(TableOperations ops) { + this.ops = ops; + } + + @Override + public UpdatePartitionStatistics setPartitionStatistics( + long snapshotId, PartitionStatisticsFile partitionStatisticsFile) { + Preconditions.checkArgument(snapshotId == partitionStatisticsFile.snapshotId()); + partitionStatisticsToSet.put(snapshotId, Optional.of(partitionStatisticsFile)); + return this; + } + + @Override + public UpdatePartitionStatistics removePartitionStatistics(long snapshotId) { + partitionStatisticsToSet.put(snapshotId, Optional.empty()); + return this; + } + + @Override + public List apply() { + return internalApply(ops.current()).partitionStatisticsFiles(); + } + + @Override + public void commit() { + TableMetadata base = ops.current(); + TableMetadata newMetadata = internalApply(base); + ops.commit(base, newMetadata); + } + + private TableMetadata internalApply(TableMetadata base) { + TableMetadata.Builder builder = TableMetadata.buildFrom(base); + partitionStatisticsToSet.forEach( + (snapshotId, statistics) -> { + if (statistics.isPresent()) { + builder.setPartitionStatistics(snapshotId, statistics.get()); + } else { + builder.removePartitionStatistics(snapshotId); + } + }); + return builder.build(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java index 56ff1f772461..c2a82218b8b3 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadata.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java @@ -257,6 +257,7 @@ public String toString() { private final List snapshotLog; private final List previousFiles; private final List statisticsFiles; + private final List partitionStatisticsFiles; private final List changes; private SerializableSupplier> snapshotsSupplier; private volatile List snapshots; @@ -288,6 +289,7 @@ public String toString() { List previousFiles, Map refs, List statisticsFiles, + List partitionStatisticsFiles, List changes) { Preconditions.checkArgument( specs != null && !specs.isEmpty(), "Partition specs cannot be null or empty"); @@ -338,6 +340,7 @@ public String toString() { this.sortOrdersById = indexSortOrders(sortOrders); this.refs = validateRefs(currentSnapshotId, refs, snapshotsById); this.statisticsFiles = ImmutableList.copyOf(statisticsFiles); + this.partitionStatisticsFiles = ImmutableList.copyOf(partitionStatisticsFiles); HistoryEntry last = null; for (HistoryEntry logEntry : snapshotLog) { @@ -537,6 +540,10 @@ public List 
statisticsFiles() { return statisticsFiles; } + public List partitionStatisticsFiles() { + return partitionStatisticsFiles; + } + public List snapshotLog() { return snapshotLog; } @@ -878,6 +885,7 @@ public static class Builder { private SerializableSupplier> snapshotsSupplier; private final Map refs; private final Map> statisticsFiles; + private final Map> partitionStatisticsFiles; // change tracking private final List changes; @@ -915,6 +923,7 @@ private Builder() { this.previousFiles = Lists.newArrayList(); this.refs = Maps.newHashMap(); this.statisticsFiles = Maps.newHashMap(); + this.partitionStatisticsFiles = Maps.newHashMap(); this.snapshotsById = Maps.newHashMap(); this.schemasById = Maps.newHashMap(); this.specsById = Maps.newHashMap(); @@ -948,6 +957,9 @@ private Builder(TableMetadata base) { this.refs = Maps.newHashMap(base.refs); this.statisticsFiles = base.statisticsFiles.stream().collect(Collectors.groupingBy(StatisticsFile::snapshotId)); + this.partitionStatisticsFiles = + base.partitionStatisticsFiles.stream() + .collect(Collectors.groupingBy(PartitionStatisticsFile::snapshotId)); this.snapshotsById = Maps.newHashMap(base.snapshotsById); this.schemasById = Maps.newHashMap(base.schemasById); @@ -1288,6 +1300,29 @@ public Builder suppressHistoricalSnapshots() { return this; } + public Builder setPartitionStatistics( + long snapshotId, PartitionStatisticsFile partitionStatisticsFile) { + Preconditions.checkNotNull(partitionStatisticsFile, "partition statistics file is null"); + Preconditions.checkArgument( + snapshotId == partitionStatisticsFile.snapshotId(), + "snapshotId does not match: %s vs %s", + snapshotId, + partitionStatisticsFile.snapshotId()); + partitionStatisticsFiles.put(snapshotId, ImmutableList.of(partitionStatisticsFile)); + changes.add(new MetadataUpdate.SetPartitionStatistics(snapshotId, partitionStatisticsFile)); + return this; + } + + public Builder removePartitionStatistics(long snapshotId) { + Preconditions.checkNotNull(snapshotId, "snapshotId is null"); + if (partitionStatisticsFiles.remove(snapshotId) == null) { + return this; + } + + changes.add(new MetadataUpdate.RemovePartitionStatistics(snapshotId)); + return this; + } + public Builder removeSnapshots(List snapshotsToRemove) { Set idsToRemove = snapshotsToRemove.stream().map(Snapshot::snapshotId).collect(Collectors.toSet()); @@ -1318,6 +1353,7 @@ private Builder rewriteSnapshotsInternal(Collection idsToRemove, boolean s changes.add(new MetadataUpdate.RemoveSnapshot(snapshotId)); } removeStatistics(snapshotId); + removePartitionStatistics(snapshotId); } else { retainedSnapshots.add(snapshot); } @@ -1443,6 +1479,9 @@ public TableMetadata build() { ImmutableList.copyOf(metadataHistory), ImmutableMap.copyOf(refs), statisticsFiles.values().stream().flatMap(List::stream).collect(Collectors.toList()), + partitionStatisticsFiles.values().stream() + .flatMap(List::stream) + .collect(Collectors.toList()), discardChanges ? 
ImmutableList.of() : ImmutableList.copyOf(changes)); } diff --git a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java index 473f8497cbb4..0212738dea9c 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java @@ -109,6 +109,7 @@ private TableMetadataParser() {} static final String METADATA_FILE = "metadata-file"; static final String METADATA_LOG = "metadata-log"; static final String STATISTICS = "statistics"; + static final String PARTITION_STATISTICS = "partition-statistics"; public static void overwrite(TableMetadata metadata, OutputFile outputFile) { internalWrite(metadata, outputFile, true); @@ -231,6 +232,12 @@ public static void toJson(TableMetadata metadata, JsonGenerator generator) throw } generator.writeEndArray(); + generator.writeArrayFieldStart(PARTITION_STATISTICS); + for (PartitionStatisticsFile partitionStatisticsFile : metadata.partitionStatisticsFiles()) { + PartitionStatisticsFileParser.toJson(partitionStatisticsFile, generator); + } + generator.writeEndArray(); + generator.writeArrayFieldStart(SNAPSHOT_LOG); for (HistoryEntry logEntry : metadata.snapshotLog()) { generator.writeStartObject(); @@ -481,6 +488,13 @@ public static TableMetadata fromJson(String metadataLocation, JsonNode node) { statisticsFiles = ImmutableList.of(); } + List partitionStatisticsFiles; + if (node.has(PARTITION_STATISTICS)) { + partitionStatisticsFiles = partitionStatisticsFilesFromJson(node.get(PARTITION_STATISTICS)); + } else { + partitionStatisticsFiles = ImmutableList.of(); + } + ImmutableList.Builder entries = ImmutableList.builder(); if (node.has(SNAPSHOT_LOG)) { Iterator logIterator = node.get(SNAPSHOT_LOG).elements(); @@ -528,6 +542,7 @@ public static TableMetadata fromJson(String metadataLocation, JsonNode node) { metadataEntries.build(), refs, statisticsFiles, + partitionStatisticsFiles, ImmutableList.of() /* no changes from the file */); } @@ -561,4 +576,21 @@ private static List statisticsFilesFromJson(JsonNode statisticsF return statisticsFilesBuilder.build(); } + + private static List partitionStatisticsFilesFromJson( + JsonNode partitionStatisticsFilesList) { + Preconditions.checkArgument( + partitionStatisticsFilesList.isArray(), + "Cannot parse partition statistics files from non-array: %s", + partitionStatisticsFilesList); + + ImmutableList.Builder partitionStatisticsFileBuilder = + ImmutableList.builder(); + for (JsonNode partitionStatisticsFile : partitionStatisticsFilesList) { + partitionStatisticsFileBuilder.add( + PartitionStatisticsFileParser.fromJson(partitionStatisticsFile)); + } + + return partitionStatisticsFileBuilder.build(); + } } diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java b/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java index 483b51d48651..c19c824f9623 100644 --- a/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java +++ b/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java @@ -884,6 +884,48 @@ public void testSetCurrentViewVersionToJson() { Assertions.assertThat(MetadataUpdateParser.toJson(update)).isEqualTo(expected); } + @Test + public void testSetPartitionStatistics() { + String json = + "{\"action\":\"set-partition-statistics\",\"snapshot-id\":1940541653261589030," + + "\"partition-statistics\":{\"snapshot-id\":1940541653261589030," + + "\"statistics-path\":\"s3://bucket/warehouse/stats1.parquet\"," + + 
"\"file-size-in-bytes\":43}}"; + + long snapshotId = 1940541653261589030L; + MetadataUpdate expected = + new MetadataUpdate.SetPartitionStatistics( + snapshotId, + ImmutableGenericPartitionStatisticsFile.builder() + .snapshotId(snapshotId) + .path("s3://bucket/warehouse/stats1" + ".parquet") + .fileSizeInBytes(43L) + .build()); + assertEquals( + MetadataUpdateParser.SET_PARTITION_STATISTICS, + expected, + MetadataUpdateParser.fromJson(json)); + Assert.assertEquals( + "Set partition statistics should convert to the correct JSON value", + json, + MetadataUpdateParser.toJson(expected)); + } + + @Test + public void testRemovePartitionStatistics() { + String json = + "{\"action\":\"remove-partition-statistics\",\"snapshot-id\":1940541653261589030}"; + MetadataUpdate expected = new MetadataUpdate.RemovePartitionStatistics(1940541653261589030L); + assertEquals( + MetadataUpdateParser.REMOVE_PARTITION_STATISTICS, + expected, + MetadataUpdateParser.fromJson(json)); + Assert.assertEquals( + "Remove partition statistics should convert to the correct JSON value", + json, + MetadataUpdateParser.toJson(expected)); + } + public void assertEquals( String action, MetadataUpdate expectedUpdate, MetadataUpdate actualUpdate) { switch (action) { @@ -935,6 +977,16 @@ public void assertEquals( (MetadataUpdate.RemoveStatistics) expectedUpdate, (MetadataUpdate.RemoveStatistics) actualUpdate); break; + case MetadataUpdateParser.SET_PARTITION_STATISTICS: + assertEqualsSetPartitionStatistics( + (MetadataUpdate.SetPartitionStatistics) expectedUpdate, + (MetadataUpdate.SetPartitionStatistics) actualUpdate); + break; + case MetadataUpdateParser.REMOVE_PARTITION_STATISTICS: + assertEqualsRemovePartitionStatistics( + (MetadataUpdate.RemovePartitionStatistics) expectedUpdate, + (MetadataUpdate.RemovePartitionStatistics) actualUpdate); + break; case MetadataUpdateParser.ADD_SNAPSHOT: assertEqualsAddSnapshot( (MetadataUpdate.AddSnapshot) expectedUpdate, (MetadataUpdate.AddSnapshot) actualUpdate); @@ -1134,6 +1186,31 @@ private static void assertEqualsRemoveStatistics( "Snapshots to remove should be the same", expected.snapshotId(), actual.snapshotId()); } + private static void assertEqualsSetPartitionStatistics( + MetadataUpdate.SetPartitionStatistics expected, + MetadataUpdate.SetPartitionStatistics actual) { + Assert.assertEquals("Snapshot IDs should be equal", expected.snapshotId(), actual.snapshotId()); + Assert.assertEquals( + "Partition Statistics files snapshot IDs should be equal", + expected.partitionStatisticsFile().snapshotId(), + actual.partitionStatisticsFile().snapshotId()); + Assert.assertEquals( + "Partition statistics files paths should be equal", + expected.partitionStatisticsFile().path(), + actual.partitionStatisticsFile().path()); + Assert.assertEquals( + "Partition statistics file size should be equal", + expected.partitionStatisticsFile().fileSizeInBytes(), + actual.partitionStatisticsFile().fileSizeInBytes()); + } + + private static void assertEqualsRemovePartitionStatistics( + MetadataUpdate.RemovePartitionStatistics expected, + MetadataUpdate.RemovePartitionStatistics actual) { + Assert.assertEquals( + "Snapshots to remove should be the same", expected.snapshotId(), actual.snapshotId()); + } + private static void assertEqualsAddSnapshot( MetadataUpdate.AddSnapshot expected, MetadataUpdate.AddSnapshot actual) { Assert.assertEquals( diff --git a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java b/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java index 
fc3aa6c91685..b6f7d7e8c503 100644 --- a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java +++ b/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java @@ -20,6 +20,7 @@ import java.io.File; import java.io.IOException; +import java.io.UncheckedIOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.List; @@ -32,6 +33,7 @@ import org.apache.iceberg.ManifestEntry.Status; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.PositionOutputStream; import org.apache.iceberg.puffin.Blob; import org.apache.iceberg.puffin.Puffin; import org.apache.iceberg.puffin.PuffinWriter; @@ -1277,8 +1279,8 @@ public void testExpireWithStatisticsFiles() throws IOException { .as("Should contain only the statistics file of snapshot2") .isEqualTo(Lists.newArrayList(statisticsFile2.snapshotId())); - Assertions.assertThat(new File(statsFileLocation1).exists()).isFalse(); - Assertions.assertThat(new File(statsFileLocation2).exists()).isTrue(); + Assertions.assertThat(new File(statsFileLocation1)).doesNotExist(); + Assertions.assertThat(new File(statsFileLocation2)).exists(); } @Test @@ -1315,7 +1317,84 @@ public void testExpireWithStatisticsFilesWithReuse() throws IOException { .as("Should contain only the statistics file of snapshot2") .isEqualTo(Lists.newArrayList(statisticsFile2.snapshotId())); // the reused stats file should exist. - Assertions.assertThat(new File(statsFileLocation1).exists()).isTrue(); + Assertions.assertThat(new File(statsFileLocation1)).exists(); + } + + @Test + public void testExpireWithPartitionStatisticsFiles() throws IOException { + table.newAppend().appendFile(FILE_A).commit(); + String statsFileLocation1 = statsFileLocation(table.location()); + PartitionStatisticsFile statisticsFile1 = + writePartitionStatsFile( + table.currentSnapshot().snapshotId(), + table.currentSnapshot().sequenceNumber(), + statsFileLocation1, + table.io()); + commitPartitionStats(table, statisticsFile1); + + table.newAppend().appendFile(FILE_B).commit(); + String statsFileLocation2 = statsFileLocation(table.location()); + PartitionStatisticsFile statisticsFile2 = + writePartitionStatsFile( + table.currentSnapshot().snapshotId(), + table.currentSnapshot().sequenceNumber(), + statsFileLocation2, + table.io()); + commitPartitionStats(table, statisticsFile2); + Assert.assertEquals( + "Should have 2 partition statistics file", 2, table.partitionStatisticsFiles().size()); + + long tAfterCommits = waitUntilAfter(table.currentSnapshot().timestampMillis()); + removeSnapshots(table).expireOlderThan(tAfterCommits).commit(); + + // only the current snapshot and its stats file should be retained + Assert.assertEquals("Should keep 1 snapshot", 1, Iterables.size(table.snapshots())); + Assertions.assertThat(table.partitionStatisticsFiles()) + .hasSize(1) + .extracting(PartitionStatisticsFile::snapshotId) + .as("Should contain only the statistics file of snapshot2") + .isEqualTo(Lists.newArrayList(statisticsFile2.snapshotId())); + + Assertions.assertThat(new File(statsFileLocation1)).doesNotExist(); + Assertions.assertThat(new File(statsFileLocation2)).exists(); + } + + @Test + public void testExpireWithPartitionStatisticsFilesWithReuse() throws IOException { + table.newAppend().appendFile(FILE_A).commit(); + String statsFileLocation1 = statsFileLocation(table.location()); + PartitionStatisticsFile statisticsFile1 = + writePartitionStatsFile( + table.currentSnapshot().snapshotId(), + 
table.currentSnapshot().sequenceNumber(), + statsFileLocation1, + table.io()); + commitPartitionStats(table, statisticsFile1); + + table.newAppend().appendFile(FILE_B).commit(); + // If an expired snapshot's stats file is reused for some reason by the live snapshots, + // that stats file should not get deleted from the file system as the live snapshots still + // reference it. + PartitionStatisticsFile statisticsFile2 = + reusePartitionStatsFile(table.currentSnapshot().snapshotId(), statisticsFile1); + commitPartitionStats(table, statisticsFile2); + + Assert.assertEquals( + "Should have 2 partition statistics file", 2, table.partitionStatisticsFiles().size()); + + long tAfterCommits = waitUntilAfter(table.currentSnapshot().timestampMillis()); + removeSnapshots(table).expireOlderThan(tAfterCommits).commit(); + + // only the current snapshot and its stats file (reused from previous snapshot) should be + // retained + Assert.assertEquals("Should keep 1 snapshot", 1, Iterables.size(table.snapshots())); + Assertions.assertThat(table.partitionStatisticsFiles()) + .hasSize(1) + .extracting(PartitionStatisticsFile::snapshotId) + .as("Should contain only the statistics file of snapshot2") + .isEqualTo(Lists.newArrayList(statisticsFile2.snapshotId())); + // the reused stats file should exist. + Assertions.assertThat(new File(statsFileLocation1)).exists(); } @Test @@ -1639,4 +1718,37 @@ private String statsFileLocation(String tableLocation) { String statsFileName = "stats-file-" + UUID.randomUUID(); return tableLocation + "/metadata/" + statsFileName; } + + private static PartitionStatisticsFile writePartitionStatsFile( + long snapshotId, long snapshotSequenceNumber, String statsLocation, FileIO fileIO) { + PositionOutputStream positionOutputStream; + try { + positionOutputStream = fileIO.newOutputFile(statsLocation).create(); + positionOutputStream.close(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + + return ImmutableGenericPartitionStatisticsFile.builder() + .snapshotId(snapshotId) + .fileSizeInBytes(snapshotSequenceNumber) + .path(statsLocation) + .build(); + } + + private static PartitionStatisticsFile reusePartitionStatsFile( + long snapshotId, PartitionStatisticsFile statisticsFile) { + return ImmutableGenericPartitionStatisticsFile.builder() + .path(statisticsFile.path()) + .fileSizeInBytes(statisticsFile.fileSizeInBytes()) + .snapshotId(snapshotId) + .build(); + } + + private static void commitPartitionStats(Table table, PartitionStatisticsFile statisticsFile) { + table + .updatePartitionStatistics() + .setPartitionStatistics(statisticsFile.snapshotId(), statisticsFile) + .commit(); + } } diff --git a/core/src/test/java/org/apache/iceberg/TestSetPartitionStatistics.java b/core/src/test/java/org/apache/iceberg/TestSetPartitionStatistics.java new file mode 100644 index 000000000000..a5ead00061db --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/TestSetPartitionStatistics.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class TestSetPartitionStatistics extends TableTestBase { + @Parameterized.Parameters(name = "formatVersion = {0}") + public static Object[] parameters() { + return new Object[] {1, 2}; + } + + public TestSetPartitionStatistics(int formatVersion) { + super(formatVersion); + } + + @Test + public void testEmptyUpdateStatistics() { + assertTableMetadataVersion(0); + TableMetadata base = readMetadata(); + + table.updatePartitionStatistics().commit(); + + Assert.assertSame( + "Base metadata should not change when commit is created", base, table.ops().current()); + assertTableMetadataVersion(1); + } + + @Test + public void testEmptyTransactionalUpdateStatistics() { + assertTableMetadataVersion(0); + TableMetadata base = readMetadata(); + + Transaction transaction = table.newTransaction(); + transaction.updatePartitionStatistics().commit(); + transaction.commitTransaction(); + + Assert.assertSame( + "Base metadata should not change when commit is created", base, table.ops().current()); + assertTableMetadataVersion(0); + } + + @Test + public void testUpdateStatistics() { + // Create a snapshot + table.newFastAppend().commit(); + assertTableMetadataVersion(1); + + TableMetadata base = readMetadata(); + long snapshotId = base.currentSnapshot().snapshotId(); + PartitionStatisticsFile partitionStatisticsFile = + ImmutableGenericPartitionStatisticsFile.builder() + .snapshotId(snapshotId) + .path("/some/partition/statistics/file.parquet") + .fileSizeInBytes(42L) + .build(); + + table + .updatePartitionStatistics() + .setPartitionStatistics(snapshotId, partitionStatisticsFile) + .commit(); + + TableMetadata metadata = readMetadata(); + assertTableMetadataVersion(2); + Assert.assertEquals( + "Table snapshot should be the same after setting partition statistics file", + snapshotId, + metadata.currentSnapshot().snapshotId()); + Assert.assertEquals( + "Table metadata should have partition statistics files", + ImmutableList.of(partitionStatisticsFile), + metadata.partitionStatisticsFiles()); + } + + @Test + public void testRemoveStatistics() { + // Create a snapshot + table.newFastAppend().commit(); + assertTableMetadataVersion(1); + + TableMetadata base = readMetadata(); + long snapshotId = base.currentSnapshot().snapshotId(); + PartitionStatisticsFile partitionStatisticsFile = + ImmutableGenericPartitionStatisticsFile.builder() + .snapshotId(snapshotId) + .path("/some/partition/statistics/file.parquet") + .fileSizeInBytes(42L) + .build(); + + table + .updatePartitionStatistics() + .setPartitionStatistics(snapshotId, partitionStatisticsFile) + .commit(); + + TableMetadata metadata = readMetadata(); + assertTableMetadataVersion(2); + Assert.assertEquals( + "Table metadata should have partition statistics files", + ImmutableList.of(partitionStatisticsFile), + metadata.partitionStatisticsFiles()); + + 
table.updatePartitionStatistics().removePartitionStatistics(snapshotId).commit(); + + metadata = readMetadata(); + assertTableMetadataVersion(3); + Assert.assertEquals( + "Table metadata should have no partition statistics files", + ImmutableList.of(), + metadata.partitionStatisticsFiles()); + } + + private void assertTableMetadataVersion(int expected) { + Assert.assertEquals( + String.format("Table should be on version %s", expected), expected, (int) version()); + } +} diff --git a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java index 7ff21e4c389b..ccb9410bc1bb 100644 --- a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java +++ b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java @@ -92,6 +92,7 @@ public class TestTableMetadata { public TableOperations ops = new LocalTableOperations(temp); @Test + @SuppressWarnings("MethodLength") public void testJsonConversion() throws Exception { long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600); @@ -145,6 +146,14 @@ public void testJsonConversion() throws Exception { new GenericBlobMetadata( "some-stats", 11L, 2, ImmutableList.of(4), ImmutableMap.of())))); + List partitionStatisticsFiles = + ImmutableList.of( + ImmutableGenericPartitionStatisticsFile.builder() + .snapshotId(11L) + .path("/some/partition/stats/file.parquet") + .fileSizeInBytes(42L) + .build()); + TableMetadata expected = new TableMetadata( null, @@ -169,6 +178,7 @@ public void testJsonConversion() throws Exception { ImmutableList.of(), refs, statisticsFiles, + partitionStatisticsFiles, ImmutableList.of()); String asJson = TableMetadataParser.toJson(expected); @@ -235,6 +245,10 @@ public void testJsonConversion() throws Exception { metadata.snapshot(previousSnapshotId).schemaId()); Assert.assertEquals( "Statistics files should match", statisticsFiles, metadata.statisticsFiles()); + Assert.assertEquals( + "Partition statistics files should match", + partitionStatisticsFiles, + metadata.partitionStatisticsFiles()); Assert.assertEquals("Refs map should match", refs, metadata.refs()); } @@ -291,6 +305,7 @@ public void testBackwardCompat() throws Exception { ImmutableList.of(), ImmutableMap.of(), ImmutableList.of(), + ImmutableList.of(), ImmutableList.of()); String asJson = toJsonWithoutSpecAndSchemaList(expected); @@ -432,6 +447,7 @@ public void testInvalidMainBranch() throws IOException { ImmutableList.of(), refs, ImmutableList.of(), + ImmutableList.of(), ImmutableList.of())) .isInstanceOf(IllegalArgumentException.class) .hasMessageStartingWith("Current snapshot ID does not match main branch"); @@ -476,6 +492,7 @@ public void testMainWithoutCurrent() throws IOException { ImmutableList.of(), refs, ImmutableList.of(), + ImmutableList.of(), ImmutableList.of())) .isInstanceOf(IllegalArgumentException.class) .hasMessageStartingWith("Current snapshot is not set, but main branch exists"); @@ -515,6 +532,7 @@ public void testBranchSnapshotMissing() { ImmutableList.of(), refs, ImmutableList.of(), + ImmutableList.of(), ImmutableList.of())) .isInstanceOf(IllegalArgumentException.class) .hasMessageEndingWith("does not exist in the existing snapshots list"); @@ -618,6 +636,7 @@ public void testJsonWithPreviousMetadataLog() throws Exception { ImmutableList.copyOf(previousMetadataLog), ImmutableMap.of(), ImmutableList.of(), + ImmutableList.of(), ImmutableList.of()); String asJson = TableMetadataParser.toJson(base); @@ -697,6 +716,7 @@ public void 
testAddPreviousMetadataRemoveNone() throws IOException { ImmutableList.copyOf(previousMetadataLog), ImmutableMap.of(), ImmutableList.of(), + ImmutableList.of(), ImmutableList.of()); previousMetadataLog.add(latestPreviousMetadata); @@ -794,6 +814,7 @@ public void testAddPreviousMetadataRemoveOne() throws IOException { ImmutableList.copyOf(previousMetadataLog), ImmutableMap.of(), ImmutableList.of(), + ImmutableList.of(), ImmutableList.of()); previousMetadataLog.add(latestPreviousMetadata); @@ -897,6 +918,7 @@ public void testAddPreviousMetadataRemoveMultiple() throws IOException { ImmutableList.copyOf(previousMetadataLog), ImmutableMap.of(), ImmutableList.of(), + ImmutableList.of(), ImmutableList.of()); previousMetadataLog.add(latestPreviousMetadata); @@ -945,6 +967,7 @@ public void testV2UUIDValidation() { ImmutableList.of(), ImmutableMap.of(), ImmutableList.of(), + ImmutableList.of(), ImmutableList.of())) .isInstanceOf(IllegalArgumentException.class) .hasMessage("UUID is required in format v2"); @@ -978,6 +1001,7 @@ public void testVersionValidation() { ImmutableList.of(), ImmutableMap.of(), ImmutableList.of(), + ImmutableList.of(), ImmutableList.of())) .isInstanceOf(IllegalArgumentException.class) .hasMessage("Unsupported format version: v" + unsupportedVersion); @@ -1322,6 +1346,122 @@ public void testRemoveStatistics() { Assert.assertEquals("Statistics file path", "/some/path/to/stats/file2", statisticsFile.path()); } + @Test + public void testPartitionStatistics() { + Schema schema = new Schema(Types.NestedField.required(10, "x", Types.StringType.get())); + + TableMetadata meta = + TableMetadata.newTableMetadata( + schema, PartitionSpec.unpartitioned(), null, ImmutableMap.of()); + Assert.assertEquals( + "Should default to no partition statistics files", + ImmutableList.of(), + meta.partitionStatisticsFiles()); + } + + @Test + public void testSetPartitionStatistics() { + Schema schema = new Schema(Types.NestedField.required(10, "x", Types.StringType.get())); + + TableMetadata meta = + TableMetadata.newTableMetadata( + schema, PartitionSpec.unpartitioned(), null, ImmutableMap.of()); + + TableMetadata withPartitionStatistics = + TableMetadata.buildFrom(meta) + .setPartitionStatistics( + 43, + ImmutableGenericPartitionStatisticsFile.builder() + .snapshotId(43) + .path("/some/path/to/partition/stats/file" + ".parquet") + .fileSizeInBytes(42L) + .build()) + .build(); + + Assertions.assertThat(withPartitionStatistics.partitionStatisticsFiles()) + .as("There should be one partition statistics file registered") + .hasSize(1); + PartitionStatisticsFile partitionStatisticsFile = + Iterables.getOnlyElement(withPartitionStatistics.partitionStatisticsFiles()); + Assert.assertEquals("Statistics file snapshot", 43L, partitionStatisticsFile.snapshotId()); + Assert.assertEquals( + "Statistics file path", + "/some/path/to/partition/stats/file.parquet", + partitionStatisticsFile.path()); + Assert.assertEquals( + "Statistics file size in bytes", 42L, partitionStatisticsFile.fileSizeInBytes()); + + TableMetadata withStatisticsReplaced = + TableMetadata.buildFrom(withPartitionStatistics) + .setPartitionStatistics( + 43, + ImmutableGenericPartitionStatisticsFile.builder() + .snapshotId(43) + .path("/some/path/to/partition/stats/file2" + ".parquet") + .fileSizeInBytes(48L) + .build()) + .build(); + + Assertions.assertThat(withStatisticsReplaced.partitionStatisticsFiles()) + .as("There should be one statistics file registered") + .hasSize(1); + partitionStatisticsFile = + 
Iterables.getOnlyElement(withStatisticsReplaced.partitionStatisticsFiles()); + Assert.assertEquals("Statistics file snapshot", 43L, partitionStatisticsFile.snapshotId()); + Assert.assertEquals( + "Statistics file path", + "/some/path/to/partition/stats/file2.parquet", + partitionStatisticsFile.path()); + Assert.assertEquals( + "Statistics file size in bytes", 48L, partitionStatisticsFile.fileSizeInBytes()); + } + + @Test + public void testRemovePartitionStatistics() { + Schema schema = new Schema(Types.NestedField.required(10, "x", Types.StringType.get())); + + TableMetadata meta = + TableMetadata.buildFrom( + TableMetadata.newTableMetadata( + schema, PartitionSpec.unpartitioned(), null, ImmutableMap.of())) + .setPartitionStatistics( + 43, + ImmutableGenericPartitionStatisticsFile.builder() + .snapshotId(43) + .path("/some/path/to/partition/stats/file1" + ".parquet") + .fileSizeInBytes(48L) + .build()) + .setPartitionStatistics( + 44, + ImmutableGenericPartitionStatisticsFile.builder() + .snapshotId(44) + .path("/some/path/to/partition/stats/file2" + ".parquet") + .fileSizeInBytes(49L) + .build()) + .build(); + + Assert.assertSame( + "Should detect no partition statistics to remove", + meta, + TableMetadata.buildFrom(meta).removePartitionStatistics(42L).build()); + + TableMetadata withOneRemoved = + TableMetadata.buildFrom(meta).removePartitionStatistics(43).build(); + + Assertions.assertThat(withOneRemoved.partitionStatisticsFiles()) + .as("There should be one partition statistics file retained") + .hasSize(1); + PartitionStatisticsFile partitionStatisticsFile = + Iterables.getOnlyElement(withOneRemoved.partitionStatisticsFiles()); + Assert.assertEquals("Statistics file snapshot", 44L, partitionStatisticsFile.snapshotId()); + Assert.assertEquals( + "Statistics file path", + "/some/path/to/partition/stats/file2.parquet", + partitionStatisticsFile.path()); + Assert.assertEquals( + "Statistics file size in bytes", 49L, partitionStatisticsFile.fileSizeInBytes()); + } + @Test public void testParseSchemaIdentifierFields() throws Exception { String data = readTableMetadataInputFile("TableMetadataV2Valid.json"); @@ -1533,6 +1673,23 @@ public void testParseStatisticsFiles() throws Exception { Iterables.getOnlyElement(parsed.statisticsFiles())); } + @Test + public void testParsePartitionStatisticsFiles() throws Exception { + String data = readTableMetadataInputFile("TableMetadataPartitionStatisticsFiles.json"); + TableMetadata parsed = TableMetadataParser.fromJson(data); + Assertions.assertThat(parsed.partitionStatisticsFiles()) + .as("parsed partition statistics files") + .hasSize(1); + Assert.assertEquals( + "parsed partition statistics file", + ImmutableGenericPartitionStatisticsFile.builder() + .snapshotId(3055729675574597004L) + .path("s3://a/b/partition-stats.parquet") + .fileSizeInBytes(43L) + .build(), + Iterables.getOnlyElement(parsed.partitionStatisticsFiles())); + } + @Test public void testNoReservedPropertyForTableMetadataCreation() { Schema schema = new Schema(Types.NestedField.required(10, "x", Types.StringType.get())); diff --git a/core/src/test/resources/TableMetadataPartitionStatisticsFiles.json b/core/src/test/resources/TableMetadataPartitionStatisticsFiles.json new file mode 100644 index 000000000000..af8f99b37e73 --- /dev/null +++ b/core/src/test/resources/TableMetadataPartitionStatisticsFiles.json @@ -0,0 +1,61 @@ +{ + "format-version": 2, + "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1", + "location": "s3://bucket/test/location", + "last-sequence-number": 34, + 
"last-updated-ms": 1602638573590, + "last-column-id": 3, + "current-schema-id": 0, + "schemas": [ + { + "type": "struct", + "schema-id": 0, + "fields": [ + { + "id": 1, + "name": "x", + "required": true, + "type": "long" + } + ] + } + ], + "default-spec-id": 0, + "partition-specs": [ + { + "spec-id": 0, + "fields": [] + } + ], + "last-partition-id": 1000, + "default-sort-order-id": 0, + "sort-orders": [ + { + "order-id": 0, + "fields": [] + } + ], + "properties": {}, + "current-snapshot-id": 3055729675574597004, + "snapshots": [ + { + "snapshot-id": 3055729675574597004, + "timestamp-ms": 1555100955770, + "sequence-number": 1, + "summary": { + "operation": "append" + }, + "manifest-list": "s3://a/b/2.avro", + "schema-id": 0 + } + ], + "partition-statistics": [ + { + "snapshot-id": 3055729675574597004, + "statistics-path": "s3://a/b/partition-stats.parquet", + "file-size-in-bytes": 43 + } + ], + "snapshot-log": [], + "metadata-log": [] +} \ No newline at end of file From 3124544119974de6fa4d2bf5378c0ca9b7fe146c Mon Sep 17 00:00:00 2001 From: ajantha-bhat Date: Thu, 7 Dec 2023 16:12:20 +0530 Subject: [PATCH 2/3] Address comments --- .palantir/revapi.yml | 4 -- .../iceberg/PartitionStatisticsFile.java | 7 +-- .../main/java/org/apache/iceberg/Table.java | 5 +- .../iceberg/UpdatePartitionStatistics.java | 3 +- .../apache/iceberg/FileCleanupStrategy.java | 5 ++ .../iceberg/IncrementalFileCleanup.java | 3 +- .../org/apache/iceberg/MetadataUpdate.java | 9 ++-- .../apache/iceberg/MetadataUpdateParser.java | 4 +- .../apache/iceberg/ReachableFileCleanup.java | 3 +- .../org/apache/iceberg/ReachableFileUtil.java | 52 ------------------- .../iceberg/SetPartitionStatistics.java | 28 ++++------ .../org/apache/iceberg/TableMetadata.java | 32 ++++++------ .../iceberg/TestMetadataUpdateParser.java | 3 +- .../apache/iceberg/TestRemoveSnapshots.java | 24 +++------ .../iceberg/TestSetPartitionStatistics.java | 10 +--- .../org/apache/iceberg/TestTableMetadata.java | 4 -- ...TableMetadataPartitionStatisticsFiles.json | 2 +- 17 files changed, 56 insertions(+), 142 deletions(-) diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index 98023912c715..283697418b6b 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -873,10 +873,6 @@ acceptedBreaks: new: "method void org.apache.iceberg.encryption.Ciphers::()" justification: "Static utility class - should not have public constructor" "1.4.0": - org.apache.iceberg:iceberg-api: - - code: "java.method.addedToInterface" - new: "method java.util.List org.apache.iceberg.Table::partitionStatisticsFiles()" - justification: "Track partition stats from TableMetadata" org.apache.iceberg:iceberg-core: - code: "java.field.serialVersionUIDChanged" new: "field org.apache.iceberg.util.SerializableMap.serialVersionUID" diff --git a/api/src/main/java/org/apache/iceberg/PartitionStatisticsFile.java b/api/src/main/java/org/apache/iceberg/PartitionStatisticsFile.java index f4217a4dc56b..e804748b8780 100644 --- a/api/src/main/java/org/apache/iceberg/PartitionStatisticsFile.java +++ b/api/src/main/java/org/apache/iceberg/PartitionStatisticsFile.java @@ -21,8 +21,7 @@ import java.io.Serializable; /** - * Represents a partition statistics file in the table default format, that can be used to read - * table data more efficiently. + * Represents a partition statistics file that can be used to read table data more efficiently. * *
<p>
Statistics are informational. A reader can choose to ignore statistics information. Statistics * support is not required to read the table correctly. @@ -31,9 +30,7 @@ public interface PartitionStatisticsFile extends Serializable { /** ID of the Iceberg table's snapshot the partition statistics file is associated with. */ long snapshotId(); - /** - * Returns fully qualified path to the file, suitable for constructing a Hadoop Path. Never null. - */ + /** Returns fully qualified path to the file. Never null. */ String path(); /** Returns the size of the partition statistics file in bytes. */ diff --git a/api/src/main/java/org/apache/iceberg/Table.java b/api/src/main/java/org/apache/iceberg/Table.java index 98ea8a46371c..6da9bbf2f4fe 100644 --- a/api/src/main/java/org/apache/iceberg/Table.java +++ b/api/src/main/java/org/apache/iceberg/Table.java @@ -24,6 +24,7 @@ import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.LocationProvider; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; /** Represents a table. */ public interface Table { @@ -339,7 +340,9 @@ default UpdatePartitionStatistics updatePartitionStatistics() { List statisticsFiles(); /** Returns the current partition statistics files for the table. */ - List partitionStatisticsFiles(); + default List partitionStatisticsFiles() { + return ImmutableList.of(); + } /** * Returns the current refs for the table diff --git a/api/src/main/java/org/apache/iceberg/UpdatePartitionStatistics.java b/api/src/main/java/org/apache/iceberg/UpdatePartitionStatistics.java index a36400036d1c..6590799d347e 100644 --- a/api/src/main/java/org/apache/iceberg/UpdatePartitionStatistics.java +++ b/api/src/main/java/org/apache/iceberg/UpdatePartitionStatistics.java @@ -28,8 +28,7 @@ public interface UpdatePartitionStatistics extends PendingUpdate pathsToDelete, String fileType) { .run(deleteFunc::accept); } + protected boolean hasAnyStatisticsFiles(TableMetadata tableMetadata) { + return !tableMetadata.statisticsFiles().isEmpty() + || !tableMetadata.partitionStatisticsFiles().isEmpty(); + } + protected Set expiredStatisticsFilesLocations( TableMetadata beforeExpiration, TableMetadata afterExpiration) { Set statsFileLocationsBeforeExpiration = statsFileLocations(beforeExpiration); diff --git a/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java b/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java index 6b4923830afe..60ad46e8e864 100644 --- a/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java +++ b/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java @@ -262,8 +262,7 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira deleteFiles(manifestsToDelete, "manifest"); deleteFiles(manifestListsToDelete, "manifest list"); - if (!beforeExpiration.statisticsFiles().isEmpty() - || !beforeExpiration.partitionStatisticsFiles().isEmpty()) { + if (hasAnyStatisticsFiles(beforeExpiration)) { Set expiredStatisticsFilesLocations = expiredStatisticsFilesLocations(beforeExpiration, afterExpiration); deleteFiles(expiredStatisticsFilesLocations, "statistics files"); diff --git a/core/src/main/java/org/apache/iceberg/MetadataUpdate.java b/core/src/main/java/org/apache/iceberg/MetadataUpdate.java index b35f4a181939..d133b76901da 100644 --- a/core/src/main/java/org/apache/iceberg/MetadataUpdate.java +++ b/core/src/main/java/org/apache/iceberg/MetadataUpdate.java @@ -244,17 +244,14 @@ public void 
applyTo(TableMetadata.Builder metadataBuilder) { } class SetPartitionStatistics implements MetadataUpdate { - private final long snapshotId; private final PartitionStatisticsFile partitionStatisticsFile; - public SetPartitionStatistics( - long snapshotId, PartitionStatisticsFile partitionStatisticsFile) { - this.snapshotId = snapshotId; + public SetPartitionStatistics(PartitionStatisticsFile partitionStatisticsFile) { this.partitionStatisticsFile = partitionStatisticsFile; } public long snapshotId() { - return snapshotId; + return partitionStatisticsFile.snapshotId(); } public PartitionStatisticsFile partitionStatisticsFile() { @@ -263,7 +260,7 @@ public PartitionStatisticsFile partitionStatisticsFile() { @Override public void applyTo(TableMetadata.Builder metadataBuilder) { - metadataBuilder.setPartitionStatistics(snapshotId, partitionStatisticsFile); + metadataBuilder.setPartitionStatistics(partitionStatisticsFile); } } diff --git a/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java b/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java index cffe26b098ee..8cdfd3c72b6e 100644 --- a/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java +++ b/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java @@ -376,7 +376,6 @@ private static void writeRemoveStatistics( private static void writeSetPartitionStatistics( MetadataUpdate.SetPartitionStatistics update, JsonGenerator gen) throws IOException { - gen.writeNumberField(SNAPSHOT_ID, update.snapshotId()); gen.writeFieldName(PARTITION_STATISTICS); PartitionStatisticsFileParser.toJson(update.partitionStatisticsFile(), gen); } @@ -510,11 +509,10 @@ private static MetadataUpdate readRemoveStatistics(JsonNode node) { } private static MetadataUpdate readSetPartitionStatistics(JsonNode node) { - long snapshotId = JsonUtil.getLong(SNAPSHOT_ID, node); JsonNode partitionStatisticsFileNode = JsonUtil.get(PARTITION_STATISTICS, node); PartitionStatisticsFile partitionStatisticsFile = PartitionStatisticsFileParser.fromJson(partitionStatisticsFileNode); - return new MetadataUpdate.SetPartitionStatistics(snapshotId, partitionStatisticsFile); + return new MetadataUpdate.SetPartitionStatistics(partitionStatisticsFile); } private static MetadataUpdate readRemovePartitionStatistics(JsonNode node) { diff --git a/core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java b/core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java index 9db2ef4d90c6..dd4239196996 100644 --- a/core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java +++ b/core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java @@ -82,8 +82,7 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira deleteFiles(manifestListsToDelete, "manifest list"); - if (!beforeExpiration.statisticsFiles().isEmpty() - || !beforeExpiration.partitionStatisticsFiles().isEmpty()) { + if (hasAnyStatisticsFiles(beforeExpiration)) { deleteFiles( expiredStatisticsFilesLocations(beforeExpiration, afterExpiration), "statistics files"); } diff --git a/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java b/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java index 773f20d1234b..bd23a221ab22 100644 --- a/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java +++ b/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java @@ -137,9 +137,7 @@ public static List manifestListLocations(Table table, Set snapshot * * @param table table for which statistics files needs to be listed * @return the location of statistics 
files - * @deprecated use the {@code allStatisticsFilesLocations(table)} instead. */ - @Deprecated public static List statisticsFilesLocations(Table table) { return statisticsFilesLocations(table, statisticsFile -> true); } @@ -150,9 +148,7 @@ public static List statisticsFilesLocations(Table table) { * @param table table for which statistics files needs to be listed * @param predicate predicate for filtering the statistics files * @return the location of statistics files - * @deprecated use the {@code allStatisticsFilesLocations(table, snapshotIds)} instead. */ - @Deprecated public static List statisticsFilesLocations( Table table, Predicate predicate) { return table.statisticsFiles().stream() @@ -160,52 +156,4 @@ public static List statisticsFilesLocations( .map(StatisticsFile::path) .collect(Collectors.toList()); } - - /** - * Returns locations of statistics files in a table. - * - * @param table table for which statistics files needs to be listed - * @return the location of statistics files - */ - public static List allStatisticsFilesLocations(Table table) { - return allStatisticsFilesLocations(table, null); - } - - /** - * Returns locations of statistics files for a table for given snapshots. - * - * @param table table for which statistics files needs to be listed - * @param snapshotIds snapshot IDs for which statistics files needs to be listed - * @return the location of statistics files - */ - public static List allStatisticsFilesLocations(Table table, Set snapshotIds) { - List statsFileLocations = Lists.newArrayList(); - - Predicate statisticsFilePredicate; - Predicate partitionStatisticsFilePredicate; - if (snapshotIds == null) { - statisticsFilePredicate = statisticsFile -> true; - partitionStatisticsFilePredicate = partitionStatisticsFile -> true; - } else { - statisticsFilePredicate = statisticsFile -> snapshotIds.contains(statisticsFile.snapshotId()); - partitionStatisticsFilePredicate = - partitionStatisticsFile -> snapshotIds.contains(partitionStatisticsFile.snapshotId()); - } - - if (table.statisticsFiles() != null) { - table.statisticsFiles().stream() - .filter(statisticsFilePredicate) - .map(StatisticsFile::path) - .forEach(statsFileLocations::add); - } - - if (table.partitionStatisticsFiles() != null) { - table.partitionStatisticsFiles().stream() - .filter(partitionStatisticsFilePredicate) - .map(PartitionStatisticsFile::path) - .forEach(statsFileLocations::add); - } - - return statsFileLocations; - } } diff --git a/core/src/main/java/org/apache/iceberg/SetPartitionStatistics.java b/core/src/main/java/org/apache/iceberg/SetPartitionStatistics.java index 09b41a9244a8..93a1d19ea04a 100644 --- a/core/src/main/java/org/apache/iceberg/SetPartitionStatistics.java +++ b/core/src/main/java/org/apache/iceberg/SetPartitionStatistics.java @@ -19,15 +19,14 @@ package org.apache.iceberg; import java.util.List; -import java.util.Map; -import java.util.Optional; +import java.util.Set; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; public class SetPartitionStatistics implements UpdatePartitionStatistics { private final TableOperations ops; - private final Map> partitionStatisticsToSet = - Maps.newHashMap(); + private final Set partitionStatisticsToSet = Sets.newHashSet(); + private final Set partitionStatisticsToRemove = Sets.newHashSet(); public SetPartitionStatistics(TableOperations ops) { this.ops = ops; @@ -35,15 +34,16 
@@ public SetPartitionStatistics(TableOperations ops) { @Override public UpdatePartitionStatistics setPartitionStatistics( - long snapshotId, PartitionStatisticsFile partitionStatisticsFile) { - Preconditions.checkArgument(snapshotId == partitionStatisticsFile.snapshotId()); - partitionStatisticsToSet.put(snapshotId, Optional.of(partitionStatisticsFile)); + PartitionStatisticsFile partitionStatisticsFile) { + Preconditions.checkArgument( + null != partitionStatisticsFile, "partition statistics file must not be null"); + partitionStatisticsToSet.add(partitionStatisticsFile); return this; } @Override public UpdatePartitionStatistics removePartitionStatistics(long snapshotId) { - partitionStatisticsToSet.put(snapshotId, Optional.empty()); + partitionStatisticsToRemove.add(snapshotId); return this; } @@ -61,14 +61,8 @@ public void commit() { private TableMetadata internalApply(TableMetadata base) { TableMetadata.Builder builder = TableMetadata.buildFrom(base); - partitionStatisticsToSet.forEach( - (snapshotId, statistics) -> { - if (statistics.isPresent()) { - builder.setPartitionStatistics(snapshotId, statistics.get()); - } else { - builder.removePartitionStatistics(snapshotId); - } - }); + partitionStatisticsToSet.forEach(builder::setPartitionStatistics); + partitionStatisticsToRemove.forEach(builder::removePartitionStatistics); return builder.build(); } } diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java index c2a82218b8b3..801b7b7eac3a 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadata.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java @@ -955,18 +955,25 @@ private Builder(TableMetadata base) { this.previousFileLocation = base.metadataFileLocation; this.previousFiles = base.previousFiles; this.refs = Maps.newHashMap(base.refs); - this.statisticsFiles = - base.statisticsFiles.stream().collect(Collectors.groupingBy(StatisticsFile::snapshotId)); - this.partitionStatisticsFiles = - base.partitionStatisticsFiles.stream() - .collect(Collectors.groupingBy(PartitionStatisticsFile::snapshotId)); - + this.statisticsFiles = statsFileBySnapshotID(base); + this.partitionStatisticsFiles = partitionStatsFileBySnapshotID(base); this.snapshotsById = Maps.newHashMap(base.snapshotsById); this.schemasById = Maps.newHashMap(base.schemasById); this.specsById = Maps.newHashMap(base.specsById); this.sortOrdersById = Maps.newHashMap(base.sortOrdersById); } + private static Map> statsFileBySnapshotID(TableMetadata base) { + return base.statisticsFiles.stream() + .collect(Collectors.groupingBy(StatisticsFile::snapshotId)); + } + + private static Map> partitionStatsFileBySnapshotID( + TableMetadata base) { + return base.partitionStatisticsFiles.stream() + .collect(Collectors.groupingBy(PartitionStatisticsFile::snapshotId)); + } + public Builder withMetadataLocation(String newMetadataLocation) { this.metadataLocation = newMetadataLocation; return this; @@ -1300,16 +1307,11 @@ public Builder suppressHistoricalSnapshots() { return this; } - public Builder setPartitionStatistics( - long snapshotId, PartitionStatisticsFile partitionStatisticsFile) { + public Builder setPartitionStatistics(PartitionStatisticsFile partitionStatisticsFile) { Preconditions.checkNotNull(partitionStatisticsFile, "partition statistics file is null"); - Preconditions.checkArgument( - snapshotId == partitionStatisticsFile.snapshotId(), - "snapshotId does not match: %s vs %s", - snapshotId, - partitionStatisticsFile.snapshotId()); - 
partitionStatisticsFiles.put(snapshotId, ImmutableList.of(partitionStatisticsFile)); - changes.add(new MetadataUpdate.SetPartitionStatistics(snapshotId, partitionStatisticsFile)); + partitionStatisticsFiles.put( + partitionStatisticsFile.snapshotId(), ImmutableList.of(partitionStatisticsFile)); + changes.add(new MetadataUpdate.SetPartitionStatistics(partitionStatisticsFile)); return this; } diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java b/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java index c19c824f9623..64e01f8cd8ff 100644 --- a/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java +++ b/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java @@ -887,7 +887,7 @@ public void testSetCurrentViewVersionToJson() { @Test public void testSetPartitionStatistics() { String json = - "{\"action\":\"set-partition-statistics\",\"snapshot-id\":1940541653261589030," + "{\"action\":\"set-partition-statistics\"," + "\"partition-statistics\":{\"snapshot-id\":1940541653261589030," + "\"statistics-path\":\"s3://bucket/warehouse/stats1.parquet\"," + "\"file-size-in-bytes\":43}}"; @@ -895,7 +895,6 @@ public void testSetPartitionStatistics() { long snapshotId = 1940541653261589030L; MetadataUpdate expected = new MetadataUpdate.SetPartitionStatistics( - snapshotId, ImmutableGenericPartitionStatisticsFile.builder() .snapshotId(snapshotId) .path("s3://bucket/warehouse/stats1" + ".parquet") diff --git a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java b/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java index b6f7d7e8c503..1a270e5b00fe 100644 --- a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java +++ b/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java @@ -1326,20 +1326,14 @@ public void testExpireWithPartitionStatisticsFiles() throws IOException { String statsFileLocation1 = statsFileLocation(table.location()); PartitionStatisticsFile statisticsFile1 = writePartitionStatsFile( - table.currentSnapshot().snapshotId(), - table.currentSnapshot().sequenceNumber(), - statsFileLocation1, - table.io()); + table.currentSnapshot().snapshotId(), statsFileLocation1, table.io()); commitPartitionStats(table, statisticsFile1); table.newAppend().appendFile(FILE_B).commit(); String statsFileLocation2 = statsFileLocation(table.location()); PartitionStatisticsFile statisticsFile2 = writePartitionStatsFile( - table.currentSnapshot().snapshotId(), - table.currentSnapshot().sequenceNumber(), - statsFileLocation2, - table.io()); + table.currentSnapshot().snapshotId(), statsFileLocation2, table.io()); commitPartitionStats(table, statisticsFile2); Assert.assertEquals( "Should have 2 partition statistics file", 2, table.partitionStatisticsFiles().size()); @@ -1365,10 +1359,7 @@ public void testExpireWithPartitionStatisticsFilesWithReuse() throws IOException String statsFileLocation1 = statsFileLocation(table.location()); PartitionStatisticsFile statisticsFile1 = writePartitionStatsFile( - table.currentSnapshot().snapshotId(), - table.currentSnapshot().sequenceNumber(), - statsFileLocation1, - table.io()); + table.currentSnapshot().snapshotId(), statsFileLocation1, table.io()); commitPartitionStats(table, statisticsFile1); table.newAppend().appendFile(FILE_B).commit(); @@ -1720,7 +1711,7 @@ private String statsFileLocation(String tableLocation) { } private static PartitionStatisticsFile writePartitionStatsFile( - long snapshotId, long snapshotSequenceNumber, String statsLocation, FileIO fileIO) { + long 
snapshotId, String statsLocation, FileIO fileIO) { PositionOutputStream positionOutputStream; try { positionOutputStream = fileIO.newOutputFile(statsLocation).create(); @@ -1731,7 +1722,7 @@ private static PartitionStatisticsFile writePartitionStatsFile( return ImmutableGenericPartitionStatisticsFile.builder() .snapshotId(snapshotId) - .fileSizeInBytes(snapshotSequenceNumber) + .fileSizeInBytes(42L) .path(statsLocation) .build(); } @@ -1746,9 +1737,6 @@ private static PartitionStatisticsFile reusePartitionStatsFile( } private static void commitPartitionStats(Table table, PartitionStatisticsFile statisticsFile) { - table - .updatePartitionStatistics() - .setPartitionStatistics(statisticsFile.snapshotId(), statisticsFile) - .commit(); + table.updatePartitionStatistics().setPartitionStatistics(statisticsFile).commit(); } } diff --git a/core/src/test/java/org/apache/iceberg/TestSetPartitionStatistics.java b/core/src/test/java/org/apache/iceberg/TestSetPartitionStatistics.java index a5ead00061db..2ab5a141133f 100644 --- a/core/src/test/java/org/apache/iceberg/TestSetPartitionStatistics.java +++ b/core/src/test/java/org/apache/iceberg/TestSetPartitionStatistics.java @@ -76,10 +76,7 @@ public void testUpdateStatistics() { .fileSizeInBytes(42L) .build(); - table - .updatePartitionStatistics() - .setPartitionStatistics(snapshotId, partitionStatisticsFile) - .commit(); + table.updatePartitionStatistics().setPartitionStatistics(partitionStatisticsFile).commit(); TableMetadata metadata = readMetadata(); assertTableMetadataVersion(2); @@ -108,10 +105,7 @@ public void testRemoveStatistics() { .fileSizeInBytes(42L) .build(); - table - .updatePartitionStatistics() - .setPartitionStatistics(snapshotId, partitionStatisticsFile) - .commit(); + table.updatePartitionStatistics().setPartitionStatistics(partitionStatisticsFile).commit(); TableMetadata metadata = readMetadata(); assertTableMetadataVersion(2); diff --git a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java index ccb9410bc1bb..0e5b32595708 100644 --- a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java +++ b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java @@ -1370,7 +1370,6 @@ public void testSetPartitionStatistics() { TableMetadata withPartitionStatistics = TableMetadata.buildFrom(meta) .setPartitionStatistics( - 43, ImmutableGenericPartitionStatisticsFile.builder() .snapshotId(43) .path("/some/path/to/partition/stats/file" + ".parquet") @@ -1394,7 +1393,6 @@ public void testSetPartitionStatistics() { TableMetadata withStatisticsReplaced = TableMetadata.buildFrom(withPartitionStatistics) .setPartitionStatistics( - 43, ImmutableGenericPartitionStatisticsFile.builder() .snapshotId(43) .path("/some/path/to/partition/stats/file2" + ".parquet") @@ -1425,14 +1423,12 @@ public void testRemovePartitionStatistics() { TableMetadata.newTableMetadata( schema, PartitionSpec.unpartitioned(), null, ImmutableMap.of())) .setPartitionStatistics( - 43, ImmutableGenericPartitionStatisticsFile.builder() .snapshotId(43) .path("/some/path/to/partition/stats/file1" + ".parquet") .fileSizeInBytes(48L) .build()) .setPartitionStatistics( - 44, ImmutableGenericPartitionStatisticsFile.builder() .snapshotId(44) .path("/some/path/to/partition/stats/file2" + ".parquet") diff --git a/core/src/test/resources/TableMetadataPartitionStatisticsFiles.json b/core/src/test/resources/TableMetadataPartitionStatisticsFiles.json index af8f99b37e73..df145e6636a3 100644 --- 
a/core/src/test/resources/TableMetadataPartitionStatisticsFiles.json +++ b/core/src/test/resources/TableMetadataPartitionStatisticsFiles.json @@ -58,4 +58,4 @@ ], "snapshot-log": [], "metadata-log": [] -} \ No newline at end of file +} From 5bd45e1c4f8c604a0a0c2dbe755297dd0468748a Mon Sep 17 00:00:00 2001 From: ajantha-bhat Date: Fri, 15 Dec 2023 18:33:16 +0530 Subject: [PATCH 3/3] Address new comments --- .../iceberg/PartitionStatisticsFile.java | 4 +-- .../iceberg/SetPartitionStatistics.java | 20 +++++------ .../org/apache/iceberg/TableMetadata.java | 35 ++++++++----------- .../apache/iceberg/TableMetadataParser.java | 19 +++++----- 4 files changed, 34 insertions(+), 44 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/PartitionStatisticsFile.java b/api/src/main/java/org/apache/iceberg/PartitionStatisticsFile.java index e804748b8780..aef0d6c32ded 100644 --- a/api/src/main/java/org/apache/iceberg/PartitionStatisticsFile.java +++ b/api/src/main/java/org/apache/iceberg/PartitionStatisticsFile.java @@ -18,15 +18,13 @@ */ package org.apache.iceberg; -import java.io.Serializable; - /** * Represents a partition statistics file that can be used to read table data more efficiently. * *
<p>
Statistics are informational. A reader can choose to ignore statistics information. Statistics * support is not required to read the table correctly. */ -public interface PartitionStatisticsFile extends Serializable { +public interface PartitionStatisticsFile { /** ID of the Iceberg table's snapshot the partition statistics file is associated with. */ long snapshotId(); diff --git a/core/src/main/java/org/apache/iceberg/SetPartitionStatistics.java b/core/src/main/java/org/apache/iceberg/SetPartitionStatistics.java index 93a1d19ea04a..aceb72e6a697 100644 --- a/core/src/main/java/org/apache/iceberg/SetPartitionStatistics.java +++ b/core/src/main/java/org/apache/iceberg/SetPartitionStatistics.java @@ -19,31 +19,31 @@ package org.apache.iceberg; import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; public class SetPartitionStatistics implements UpdatePartitionStatistics { private final TableOperations ops; - private final Set partitionStatisticsToSet = Sets.newHashSet(); - private final Set partitionStatisticsToRemove = Sets.newHashSet(); + private final Map statsToSet = Maps.newHashMap(); + private final Set statsToRemove = Sets.newHashSet(); public SetPartitionStatistics(TableOperations ops) { this.ops = ops; } @Override - public UpdatePartitionStatistics setPartitionStatistics( - PartitionStatisticsFile partitionStatisticsFile) { - Preconditions.checkArgument( - null != partitionStatisticsFile, "partition statistics file must not be null"); - partitionStatisticsToSet.add(partitionStatisticsFile); + public UpdatePartitionStatistics setPartitionStatistics(PartitionStatisticsFile file) { + Preconditions.checkArgument(null != file, "partition statistics file must not be null"); + statsToSet.put(file.snapshotId(), file); return this; } @Override public UpdatePartitionStatistics removePartitionStatistics(long snapshotId) { - partitionStatisticsToRemove.add(snapshotId); + statsToRemove.add(snapshotId); return this; } @@ -61,8 +61,8 @@ public void commit() { private TableMetadata internalApply(TableMetadata base) { TableMetadata.Builder builder = TableMetadata.buildFrom(base); - partitionStatisticsToSet.forEach(builder::setPartitionStatistics); - partitionStatisticsToRemove.forEach(builder::removePartitionStatistics); + statsToSet.values().forEach(builder::setPartitionStatistics); + statsToRemove.forEach(builder::removePartitionStatistics); return builder.build(); } } diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java index 801b7b7eac3a..b9061a3107ac 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadata.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java @@ -955,25 +955,14 @@ private Builder(TableMetadata base) { this.previousFileLocation = base.metadataFileLocation; this.previousFiles = base.previousFiles; this.refs = Maps.newHashMap(base.refs); - this.statisticsFiles = statsFileBySnapshotID(base); - this.partitionStatisticsFiles = partitionStatsFileBySnapshotID(base); + this.statisticsFiles = indexStatistics(base.statisticsFiles); + this.partitionStatisticsFiles = indexPartitionStatistics(base.partitionStatisticsFiles); this.snapshotsById = Maps.newHashMap(base.snapshotsById); this.schemasById = Maps.newHashMap(base.schemasById); this.specsById = 
Maps.newHashMap(base.specsById); this.sortOrdersById = Maps.newHashMap(base.sortOrdersById); } - private static Map> statsFileBySnapshotID(TableMetadata base) { - return base.statisticsFiles.stream() - .collect(Collectors.groupingBy(StatisticsFile::snapshotId)); - } - - private static Map> partitionStatsFileBySnapshotID( - TableMetadata base) { - return base.partitionStatisticsFiles.stream() - .collect(Collectors.groupingBy(PartitionStatisticsFile::snapshotId)); - } - public Builder withMetadataLocation(String newMetadataLocation) { this.metadataLocation = newMetadataLocation; return this; @@ -1281,7 +1270,6 @@ public Builder setStatistics(long snapshotId, StatisticsFile statisticsFile) { } public Builder removeStatistics(long snapshotId) { - Preconditions.checkNotNull(snapshotId, "snapshotId is null"); if (statisticsFiles.remove(snapshotId) == null) { return this; } @@ -1307,16 +1295,14 @@ public Builder suppressHistoricalSnapshots() { return this; } - public Builder setPartitionStatistics(PartitionStatisticsFile partitionStatisticsFile) { - Preconditions.checkNotNull(partitionStatisticsFile, "partition statistics file is null"); - partitionStatisticsFiles.put( - partitionStatisticsFile.snapshotId(), ImmutableList.of(partitionStatisticsFile)); - changes.add(new MetadataUpdate.SetPartitionStatistics(partitionStatisticsFile)); + public Builder setPartitionStatistics(PartitionStatisticsFile file) { + Preconditions.checkNotNull(file, "partition statistics file is null"); + partitionStatisticsFiles.put(file.snapshotId(), ImmutableList.of(file)); + changes.add(new MetadataUpdate.SetPartitionStatistics(file)); return this; } public Builder removePartitionStatistics(long snapshotId) { - Preconditions.checkNotNull(snapshotId, "snapshotId is null"); if (partitionStatisticsFiles.remove(snapshotId) == null) { return this; } @@ -1776,6 +1762,15 @@ private static List updateSnapshotLog( return newSnapshotLog; } + private static Map> indexStatistics(List files) { + return files.stream().collect(Collectors.groupingBy(StatisticsFile::snapshotId)); + } + + private static Map> indexPartitionStatistics( + List files) { + return files.stream().collect(Collectors.groupingBy(PartitionStatisticsFile::snapshotId)); + } + private boolean isAddedSnapshot(long snapshotId) { return changes(MetadataUpdate.AddSnapshot.class) .anyMatch(add -> add.snapshot().snapshotId() == snapshotId); diff --git a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java index 0212738dea9c..8bda184142cd 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java @@ -490,7 +490,7 @@ public static TableMetadata fromJson(String metadataLocation, JsonNode node) { List partitionStatisticsFiles; if (node.has(PARTITION_STATISTICS)) { - partitionStatisticsFiles = partitionStatisticsFilesFromJson(node.get(PARTITION_STATISTICS)); + partitionStatisticsFiles = partitionStatsFilesFromJson(node.get(PARTITION_STATISTICS)); } else { partitionStatisticsFiles = ImmutableList.of(); } @@ -577,20 +577,17 @@ private static List statisticsFilesFromJson(JsonNode statisticsF return statisticsFilesBuilder.build(); } - private static List partitionStatisticsFilesFromJson( - JsonNode partitionStatisticsFilesList) { + private static List partitionStatsFilesFromJson(JsonNode filesList) { Preconditions.checkArgument( - partitionStatisticsFilesList.isArray(), + filesList.isArray(), "Cannot parse partition 
statistics files from non-array: %s", - partitionStatisticsFilesList); + filesList); - ImmutableList.Builder partitionStatisticsFileBuilder = - ImmutableList.builder(); - for (JsonNode partitionStatisticsFile : partitionStatisticsFilesList) { - partitionStatisticsFileBuilder.add( - PartitionStatisticsFileParser.fromJson(partitionStatisticsFile)); + ImmutableList.Builder statsFileBuilder = ImmutableList.builder(); + for (JsonNode partitionStatsFile : filesList) { + statsFileBuilder.add(PartitionStatisticsFileParser.fromJson(partitionStatsFile)); } - return partitionStatisticsFileBuilder.build(); + return statsFileBuilder.build(); } }
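
Note for reviewers: the following is a minimal usage sketch of the API as it stands after patch 3, not part of the series itself. It assumes an existing Table instance with at least one committed snapshot; the class name, statistics path, and file size are placeholders, and in practice the statistics file would be produced by a partition-stats writer rather than built by hand. It also illustrates the design choice from patch 2: the snapshot ID travels with the PartitionStatisticsFile, so setPartitionStatistics() no longer takes a separate snapshot ID argument.

    import org.apache.iceberg.ImmutableGenericPartitionStatisticsFile;
    import org.apache.iceberg.PartitionStatisticsFile;
    import org.apache.iceberg.Table;

    class PartitionStatsExample {
      // Registers a placeholder partition statistics file for the table's current
      // snapshot and then removes it again, exercising the new API end to end.
      static void roundTrip(Table table) {
        long snapshotId = table.currentSnapshot().snapshotId();

        PartitionStatisticsFile statsFile =
            ImmutableGenericPartitionStatisticsFile.builder()
                .snapshotId(snapshotId)
                .path("/tmp/partition-stats.parquet") // placeholder location
                .fileSizeInBytes(42L)                 // placeholder size
                .build();

        // The snapshot ID is read from the file itself, so only the
        // PartitionStatisticsFile is passed (see patch 2).
        table.updatePartitionStatistics().setPartitionStatistics(statsFile).commit();

        // Removing the entry only updates table metadata; physical deletion is
        // handled separately, e.g. by the snapshot-expiration cleanup touched
        // in this series.
        table.updatePartitionStatistics().removePartitionStatistics(snapshotId).commit();
      }
    }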