diff --git a/api/src/main/java/org/apache/iceberg/ContentFile.java b/api/src/main/java/org/apache/iceberg/ContentFile.java
new file mode 100644
index 000000000000..262ea1733027
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/ContentFile.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Superinterface of {@link DataFile} and {@link DeleteFile} that exposes common methods.
+ *
+ * @param <F> the concrete Java class of a ContentFile instance.
+ */
+public interface ContentFile<F> {
+  /**
+   * @return type of content stored in the file; one of DATA, POSITION_DELETES, or EQUALITY_DELETES
+   */
+  FileContent content();
+
+  /**
+   * @return fully qualified path to the file, suitable for constructing a Hadoop Path
+   */
+  CharSequence path();
+
+  /**
+   * @return format of the file
+   */
+  FileFormat format();
+
+  /**
+   * @return partition for this file as a {@link StructLike}
+   */
+  StructLike partition();
+
+  /**
+   * @return the number of top-level records in the file
+   */
+  long recordCount();
+
+  /**
+   * @return the file size in bytes
+   */
+  long fileSizeInBytes();
+
+  /**
+   * @return if collected, map from column ID to the size of the column in bytes, null otherwise
+   */
+  Map<Integer, Long> columnSizes();
+
+  /**
+   * @return if collected, map from column ID to the count of its non-null values, null otherwise
+   */
+  Map<Integer, Long> valueCounts();
+
+  /**
+   * @return if collected, map from column ID to its null value count, null otherwise
+   */
+  Map<Integer, Long> nullValueCounts();
+
+  /**
+   * @return if collected, map from column ID to value lower bounds, null otherwise
+   */
+  Map<Integer, ByteBuffer> lowerBounds();
+
+  /**
+   * @return if collected, map from column ID to value upper bounds, null otherwise
+   */
+  Map<Integer, ByteBuffer> upperBounds();
+
+  /**
+   * @return metadata about how this file is encrypted, or null if the file is stored in plain
+   * text.
+   */
+  ByteBuffer keyMetadata();
+
+  /**
+   * @return List of recommended split locations, if applicable, null otherwise.
+   * When available, this information is used for planning scan tasks whose boundaries
+   * are determined by these offsets. The returned list must be sorted in ascending order.
+   */
+  List<Long> splitOffsets();
+
+
+  /**
+   * Copies this file. Manifest readers can reuse file instances; use
+   * this method to copy data when collecting files from tasks.
+   *
+   * @return a copy of this data file
+   */
+  F copy();
+
+  /**
+   * Copies this file without file stats. Manifest readers can reuse file instances; use
+   * this method to copy data without stats when collecting files.
+   *
+   * @return a copy of this data file, without lower bounds, upper bounds, value counts, or null value counts
+   */
+  F copyWithoutStats();
+}
diff --git a/api/src/main/java/org/apache/iceberg/DataFile.java b/api/src/main/java/org/apache/iceberg/DataFile.java
index d9a44413b38d..747057b52f03 100644
--- a/api/src/main/java/org/apache/iceberg/DataFile.java
+++ b/api/src/main/java/org/apache/iceberg/DataFile.java
@@ -19,9 +19,6 @@
 
 package org.apache.iceberg;
 
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.types.Types.BinaryType;
 import org.apache.iceberg.types.Types.IntegerType;
@@ -35,9 +32,9 @@
 import static org.apache.iceberg.types.Types.NestedField.required;
 
 /**
- * Interface for files listed in a table manifest.
+ * Interface for data files listed in a table manifest.
  */
-public interface DataFile {
+public interface DataFile extends ContentFile<DataFile> {
   // fields for adding delete data files
   Types.NestedField CONTENT = optional(134, "content", IntegerType.get(),
       "Contents of the file: 0=data, 1=position deletes, 2=equality deletes");
@@ -86,86 +83,8 @@ static StructType getType(StructType partitionType) {
   /**
    * @return the content stored in the file; one of DATA, POSITION_DELETES, or EQUALITY_DELETES
    */
+  @Override
   default FileContent content() {
     return FileContent.DATA;
   }
-
-  /**
-   * @return fully qualified path to the file, suitable for constructing a Hadoop Path
-   */
-  CharSequence path();
-
-  /**
-   * @return format of the data file
-   */
-  FileFormat format();
-
-  /**
-   * @return partition data for this file as a {@link StructLike}
-   */
-  StructLike partition();
-
-  /**
-   * @return the number of top-level records in the data file
-   */
-  long recordCount();
-
-  /**
-   * @return the data file size in bytes
-   */
-  long fileSizeInBytes();
-
-  /**
-   * @return if collected, map from column ID to the size of the column in bytes, null otherwise
-   */
-  Map<Integer, Long> columnSizes();
-
-  /**
-   * @return if collected, map from column ID to the count of its non-null values, null otherwise
-   */
-  Map<Integer, Long> valueCounts();
-
-  /**
-   * @return if collected, map from column ID to its null value count, null otherwise
-   */
-  Map<Integer, Long> nullValueCounts();
-
-  /**
-   * @return if collected, map from column ID to value lower bounds, null otherwise
-   */
-  Map<Integer, ByteBuffer> lowerBounds();
-
-  /**
-   * @return if collected, map from column ID to value upper bounds, null otherwise
-   */
-  Map<Integer, ByteBuffer> upperBounds();
-
-  /**
-   * @return metadata about how this file is encrypted, or null if the file is stored in plain
-   * text.
-   */
-  ByteBuffer keyMetadata();
-
-  /**
-   * @return List of recommended split locations, if applicable, null otherwise.
-   * When available, this information is used for planning scan tasks whose boundaries
-   * are determined by these offsets. The returned list must be sorted in ascending order.
-   */
-  List<Long> splitOffsets();
-
-  /**
-   * Copies this {@link DataFile data file}. Manifest readers can reuse data file instances; use
-   * this method to copy data when collecting files from tasks.
-   *
-   * @return a copy of this data file
-   */
-  DataFile copy();
-
-  /**
-   * Copies this {@link DataFile data file} without file stats. Manifest readers can reuse data file instances; use
-   * this method to copy data without stats when collecting files.
-   *
-   * @return a copy of this data file, without lower bounds, upper bounds, value counts, or null value counts
-   */
-  DataFile copyWithoutStats();
 }
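// Reviewer sketch (illustrative, not part of this patch; imports elided): the self-type
// parameter F is what lets copy() return the concrete file type, so call sites collect
// DataFile or DeleteFile instances without casts:
static List<DataFile> collectFiles(ManifestFile manifest, FileIO io) throws IOException {
  List<DataFile> collected = new ArrayList<>();
  try (ManifestReader reader = ManifestFiles.read(manifest, io)) {
    for (DataFile file : reader) {
      collected.add(file.copy()); // readers may reuse instances; copy before holding a reference
    }
  }
  return collected;
}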
diff --git a/api/src/main/java/org/apache/iceberg/DeleteFile.java b/api/src/main/java/org/apache/iceberg/DeleteFile.java
new file mode 100644
index 000000000000..9adc0fb547c8
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/DeleteFile.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.List;
+
+/**
+ * Interface for delete files listed in a table delete manifest.
+ */
+public interface DeleteFile extends ContentFile<DeleteFile> {
+  /**
+   * @return List of recommended split locations, if applicable, null otherwise.
+   * When available, this information is used for planning scan tasks whose boundaries
+   * are determined by these offsets. The returned list must be sorted in ascending order.
+   */
+  @Override
+  default List<Long> splitOffsets() {
+    return null;
+  }
+}
diff --git a/api/src/main/java/org/apache/iceberg/expressions/InclusiveMetricsEvaluator.java b/api/src/main/java/org/apache/iceberg/expressions/InclusiveMetricsEvaluator.java
index 40695861a643..148ac4f94cab 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/InclusiveMetricsEvaluator.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/InclusiveMetricsEvaluator.java
@@ -25,6 +25,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
+import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.expressions.ExpressionVisitors.BoundExpressionVisitor;
@@ -40,7 +41,7 @@
  * <p>
  * This evaluation is inclusive: it returns true if a file may match and false if it cannot match.
  * <p>
- * Files are passed to {@link #eval(DataFile)}, which returns true if the file may contain matching
+ * Files are passed to {@link #eval(ContentFile)}, which returns true if the file may contain matching
 * rows and false if the file cannot contain matching rows. Files may be skipped if and only if the
 * return value of {@code eval} is false.
 */
@@ -70,7 +71,7 @@ public InclusiveMetricsEvaluator(Schema schema, Expression unbound, boolean caseSensitive) {
    * @param file a data file
    * @return false if the file cannot contain rows that match the expression, true otherwise.
    */
-  public boolean eval(DataFile file) {
+  public boolean eval(ContentFile<?> file) {
     // TODO: detect the case where a column is missing from the file using file's max field id.
     return visitor().eval(file);
   }
@@ -84,7 +85,7 @@ private class MetricsEvalVisitor extends BoundExpressionVisitor<Boolean> {
     private Map<Integer, ByteBuffer> lowerBounds = null;
     private Map<Integer, ByteBuffer> upperBounds = null;
 
-    private boolean eval(DataFile file) {
+    private boolean eval(ContentFile<?> file) {
       if (file.recordCount() == 0) {
         return ROWS_CANNOT_MATCH;
       }
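// Reviewer sketch (illustrative, not part of this patch): widening eval to ContentFile<?>
// means the same column-bounds pruning now applies to delete files as well as data files:
InclusiveMetricsEvaluator metrics =
    new InclusiveMetricsEvaluator(table.schema(), Expressions.equal("id", 5));
if (!metrics.eval(file)) {
  // the file's lower/upper bounds prove no row can match, so it can be skipped;
  // `file` may be a DataFile or a DeleteFile here
}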
diff --git a/core/src/main/java/org/apache/iceberg/BaseFile.java b/core/src/main/java/org/apache/iceberg/BaseFile.java
new file mode 100644
index 000000000000..7b0341831a10
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/BaseFile.java
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.specific.SpecificData;
+import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ByteBuffers;
+
+/**
+ * Base class for both {@link DataFile} and {@link DeleteFile}.
+ */
+abstract class BaseFile<F>
+    implements ContentFile<F>, IndexedRecord, StructLike, SpecificData.SchemaConstructable, Serializable {
+  static final Types.StructType EMPTY_STRUCT_TYPE = Types.StructType.of();
+  static final PartitionData EMPTY_PARTITION_DATA = new PartitionData(EMPTY_STRUCT_TYPE) {
+    @Override
+    public PartitionData copy() {
+      return this; // this does not change
+    }
+  };
+
+  private int[] fromProjectionPos;
+  private Types.StructType partitionType;
+
+  private FileContent content = FileContent.DATA;
+  private String filePath = null;
+  private FileFormat format = null;
+  private PartitionData partitionData = null;
+  private Long recordCount = null;
+  private long fileSizeInBytes = -1L;
+
+  // optional fields
+  private Map<Integer, Long> columnSizes = null;
+  private Map<Integer, Long> valueCounts = null;
+  private Map<Integer, Long> nullValueCounts = null;
+  private Map<Integer, ByteBuffer> lowerBounds = null;
+  private Map<Integer, ByteBuffer> upperBounds = null;
+  private List<Long> splitOffsets = null;
+  private byte[] keyMetadata = null;
+
+  // cached schema
+  private transient Schema avroSchema = null;
+
+  /**
+   * Used by Avro reflection to instantiate this class when reading manifest files.
+   */
+  BaseFile(Schema avroSchema) {
+    this.avroSchema = avroSchema;
+
+    Types.StructType schema = AvroSchemaUtil.convert(avroSchema).asNestedType().asStructType();
+
+    // partition type may be null if the field was not projected
+    Type partType = schema.fieldType("partition");
+    if (partType != null) {
+      this.partitionType = partType.asNestedType().asStructType();
+    } else {
+      this.partitionType = EMPTY_STRUCT_TYPE;
+    }
+
+    List<Types.NestedField> fields = schema.fields();
+    List<Types.NestedField> allFields = DataFile.getType(partitionType).fields();
+    this.fromProjectionPos = new int[fields.size()];
+    for (int i = 0; i < fromProjectionPos.length; i += 1) {
+      boolean found = false;
+      for (int j = 0; j < allFields.size(); j += 1) {
+        if (fields.get(i).fieldId() == allFields.get(j).fieldId()) {
+          found = true;
+          fromProjectionPos[i] = j;
+        }
+      }
+
+      if (!found) {
+        throw new IllegalArgumentException("Cannot find projected field: " + fields.get(i));
+      }
+    }
+
+    this.partitionData = new PartitionData(partitionType);
+  }
+
+  BaseFile(FileContent content, String filePath, FileFormat format,
+           PartitionData partition, long fileSizeInBytes, long recordCount,
+           Map<Integer, Long> columnSizes, Map<Integer, Long> valueCounts, Map<Integer, Long> nullValueCounts,
+           Map<Integer, ByteBuffer> lowerBounds, Map<Integer, ByteBuffer> upperBounds, List<Long> splitOffsets,
+           ByteBuffer keyMetadata) {
+    this.content = content;
+    this.filePath = filePath;
+    this.format = format;
+
+    // this constructor is used by DataFiles.Builder, which passes null for unpartitioned data
+    if (partition == null) {
+      this.partitionData = EMPTY_PARTITION_DATA;
+      this.partitionType = EMPTY_PARTITION_DATA.getPartitionType();
+    } else {
+      this.partitionData = partition;
+      this.partitionType = partition.getPartitionType();
+    }
+
+    // this will throw NPE if metrics.recordCount is null
+    this.recordCount = recordCount;
+    this.fileSizeInBytes = fileSizeInBytes;
+    this.columnSizes = columnSizes;
+    this.valueCounts = valueCounts;
+    this.nullValueCounts = nullValueCounts;
+    this.lowerBounds = SerializableByteBufferMap.wrap(lowerBounds);
+    this.upperBounds = SerializableByteBufferMap.wrap(upperBounds);
+    this.splitOffsets = copy(splitOffsets);
+    this.keyMetadata = ByteBuffers.toByteArray(keyMetadata);
+  }
+
+  /**
+   * Copy constructor.
+   *
+   * @param toCopy a generic data file to copy.
+   * @param fullCopy whether to copy all fields or to drop column-level stats
+   */
+  BaseFile(BaseFile<F> toCopy, boolean fullCopy) {
+    this.content = toCopy.content;
+    this.filePath = toCopy.filePath;
+    this.format = toCopy.format;
+    this.partitionData = toCopy.partitionData.copy();
+    this.partitionType = toCopy.partitionType;
+    this.recordCount = toCopy.recordCount;
+    this.fileSizeInBytes = toCopy.fileSizeInBytes;
+    if (fullCopy) {
+      // TODO: support lazy conversion to/from map
+      this.columnSizes = copy(toCopy.columnSizes);
+      this.valueCounts = copy(toCopy.valueCounts);
+      this.nullValueCounts = copy(toCopy.nullValueCounts);
+      this.lowerBounds = SerializableByteBufferMap.wrap(copy(toCopy.lowerBounds));
+      this.upperBounds = SerializableByteBufferMap.wrap(copy(toCopy.upperBounds));
+    } else {
+      this.columnSizes = null;
+      this.valueCounts = null;
+      this.nullValueCounts = null;
+      this.lowerBounds = null;
+      this.upperBounds = null;
+    }
+    this.fromProjectionPos = toCopy.fromProjectionPos;
+    this.keyMetadata = toCopy.keyMetadata == null ? null : Arrays.copyOf(toCopy.keyMetadata, toCopy.keyMetadata.length);
+    this.splitOffsets = copy(toCopy.splitOffsets);
+  }
+
+  /**
+   * Constructor for Java serialization.
+   */
+  BaseFile() {
+  }
+
+  protected abstract Schema getAvroSchema(Types.StructType partitionStruct);
+
+  @Override
+  public Schema getSchema() {
+    if (avroSchema == null) {
+      this.avroSchema = getAvroSchema(partitionType);
+    }
+    return avroSchema;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void put(int i, Object value) {
+    int pos = i;
+    // if the schema was projected, map the incoming ordinal to the expected one
+    if (fromProjectionPos != null) {
+      pos = fromProjectionPos[i];
+    }
+    switch (pos) {
+      case 0:
+        this.content = value != null ? FileContent.values()[(Integer) value] : FileContent.DATA;
+        return;
+      case 1:
+        // always coerce to String for Serializable
+        this.filePath = value.toString();
+        return;
+      case 2:
+        this.format = FileFormat.valueOf(value.toString());
+        return;
+      case 3:
+        this.partitionData = (PartitionData) value;
+        return;
+      case 4:
+        this.recordCount = (Long) value;
+        return;
+      case 5:
+        this.fileSizeInBytes = (Long) value;
+        return;
+      case 6:
+        this.columnSizes = (Map<Integer, Long>) value;
+        return;
+      case 7:
+        this.valueCounts = (Map<Integer, Long>) value;
+        return;
+      case 8:
+        this.nullValueCounts = (Map<Integer, Long>) value;
+        return;
+      case 9:
+        this.lowerBounds = SerializableByteBufferMap.wrap((Map<Integer, ByteBuffer>) value);
+        return;
+      case 10:
+        this.upperBounds = SerializableByteBufferMap.wrap((Map<Integer, ByteBuffer>) value);
+        return;
+      case 11:
+        this.keyMetadata = ByteBuffers.toByteArray((ByteBuffer) value);
+        return;
+      case 12:
+        this.splitOffsets = (List<Long>) value;
+        return;
+      default:
+        // ignore the object, it must be from a newer version of the format
+    }
+  }
+
+  @Override
+  public <T> void set(int pos, T value) {
+    put(pos, value);
+  }
+
+  @Override
+  public Object get(int i) {
+    int pos = i;
+    // if the schema was projected, map the incoming ordinal to the expected one
+    if (fromProjectionPos != null) {
+      pos = fromProjectionPos[i];
+    }
+    switch (pos) {
+      case 0:
+        return content.id();
+      case 1:
+        return filePath;
+      case 2:
+        return format != null ? format.toString() : null;
+      case 3:
+        return partitionData;
+      case 4:
+        return recordCount;
+      case 5:
+        return fileSizeInBytes;
+      case 6:
+        return columnSizes;
+      case 7:
+        return valueCounts;
+      case 8:
+        return nullValueCounts;
+      case 9:
+        return lowerBounds;
+      case 10:
+        return upperBounds;
+      case 11:
+        return keyMetadata != null ? ByteBuffer.wrap(keyMetadata) : null;
+      case 12:
+        return splitOffsets;
+      default:
+        throw new UnsupportedOperationException("Unknown field ordinal: " + pos);
+    }
+  }
+
+  @Override
+  public <T> T get(int pos, Class<T> javaClass) {
+    return javaClass.cast(get(pos));
+  }
+
+  @Override
+  public int size() {
+    return DataFile.getType(EMPTY_STRUCT_TYPE).fields().size();
+  }
+
+  public FileContent content() {
+    return content;
+  }
+
+  public CharSequence path() {
+    return filePath;
+  }
+
+  public FileFormat format() {
+    return format;
+  }
+
+  public StructLike partition() {
+    return partitionData;
+  }
+
+  public long recordCount() {
+    return recordCount;
+  }
+
+  public long fileSizeInBytes() {
+    return fileSizeInBytes;
+  }
+
+  public Map<Integer, Long> columnSizes() {
+    return columnSizes;
+  }
+
+  public Map<Integer, Long> valueCounts() {
+    return valueCounts;
+  }
+
+  public Map<Integer, Long> nullValueCounts() {
+    return nullValueCounts;
+  }
+
+  public Map<Integer, ByteBuffer> lowerBounds() {
+    return lowerBounds;
+  }
+
+  public Map<Integer, ByteBuffer> upperBounds() {
+    return upperBounds;
+  }
+
+  public ByteBuffer keyMetadata() {
+    return keyMetadata != null ? ByteBuffer.wrap(keyMetadata) : null;
+  }
+
+  public List<Long> splitOffsets() {
+    return splitOffsets;
+  }
+
+  private static <K, V> Map<K, V> copy(Map<K, V> map) {
+    if (map != null) {
+      Map<K, V> copy = Maps.newHashMapWithExpectedSize(map.size());
+      copy.putAll(map);
+      return Collections.unmodifiableMap(copy);
+    }
+    return null;
+  }
+
+  private static <E> List<E> copy(List<E> list) {
+    if (list != null) {
+      List<E> copy = Lists.newArrayListWithExpectedSize(list.size());
+      copy.addAll(list);
+      return Collections.unmodifiableList(copy);
+    }
+    return null;
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("content", content.toString().toLowerCase(Locale.ROOT))
+        .add("file_path", filePath)
+        .add("file_format", format)
+        .add("partition", partitionData)
+        .add("record_count", recordCount)
+        .add("file_size_in_bytes", fileSizeInBytes)
+        .add("column_sizes", columnSizes)
+        .add("value_counts", valueCounts)
+        .add("null_value_counts", nullValueCounts)
+        .add("lower_bounds", lowerBounds)
+        .add("upper_bounds", upperBounds)
+        .add("key_metadata", keyMetadata == null ? "null" : "(redacted)")
+        .add("split_offsets", splitOffsets == null ? "null" : splitOffsets)
+        .toString();
+  }
+}
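// Reviewer note (illustrative, not part of this patch): the copy()/copyWithoutStats()
// split that BaseFile's copy constructor implements is a memory trade-off when many
// files are held at once:
DataFile full = file.copy();              // keeps column sizes, value counts, and bounds
DataFile slim = file.copyWithoutStats();  // drops the per-column stats maps (they become null)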
diff --git a/core/src/main/java/org/apache/iceberg/BaseManifestReader.java b/core/src/main/java/org/apache/iceberg/BaseManifestReader.java
index d253555a5afd..7217fc06eb7b 100644
--- a/core/src/main/java/org/apache/iceberg/BaseManifestReader.java
+++ b/core/src/main/java/org/apache/iceberg/BaseManifestReader.java
@@ -47,17 +47,18 @@
 /**
  * Base reader for data and delete manifest files.
  *
- * @param <T> The Java class of files returned by this reader.
+ * @param <F> The Java class of files returned by this reader.
  * @param <ThisT> The Java class of this reader, returned by configuration methods.
  */
-abstract class BaseManifestReader<T, ThisT> extends CloseableGroup implements CloseableIterable<T> {
+abstract class BaseManifestReader<F extends ContentFile<F>, ThisT>
+    extends CloseableGroup implements CloseableIterable<F> {
   static final ImmutableList<String> ALL_COLUMNS = ImmutableList.of("*");
   private static final Set<String> STATS_COLUMNS = Sets.newHashSet(
       "value_counts", "null_value_counts", "lower_bounds", "upper_bounds");
 
   protected enum FileType {
     DATA_FILES(GenericDataFile.class.getName()),
-    DELETE_FILES("...");
+    DELETE_FILES(GenericDeleteFile.class.getName());
 
     private final String fileClass;
 
@@ -95,7 +96,7 @@ protected BaseManifestReader(InputFile file, Map<Integer, PartitionSpec> specsById,
     this.content = content;
 
     try {
-      try (AvroIterable<ManifestEntry> headerReader = Avro.read(file)
+      try (AvroIterable<ManifestEntry<F>> headerReader = Avro.read(file)
           .project(ManifestEntry.getSchema(Types.StructType.of()).select("status"))
           .build()) {
         this.metadata = headerReader.getMetadata();
@@ -163,7 +164,7 @@ public ThisT caseSensitive(boolean isCaseSensitive) {
     return self();
   }
 
-  CloseableIterable<ManifestEntry> entries() {
+  CloseableIterable<ManifestEntry<F>> entries() {
     if ((rowFilter != null && rowFilter != Expressions.alwaysTrue()) ||
         (partFilter != null && partFilter != Expressions.alwaysTrue())) {
       Evaluator evaluator = evaluator();
@@ -183,13 +184,13 @@ CloseableIterable<ManifestEntry> entries() {
     }
   }
 
-  private CloseableIterable<ManifestEntry> open(Schema projection) {
+  private CloseableIterable<ManifestEntry<F>> open(Schema projection) {
     FileFormat format = FileFormat.fromFileName(file.location());
     Preconditions.checkArgument(format != null, "Unable to determine format of manifest: %s", file);
 
     switch (format) {
       case AVRO:
-        AvroIterable<ManifestEntry> reader = Avro.read(file)
+        AvroIterable<ManifestEntry<F>> reader = Avro.read(file)
             .project(ManifestEntry.wrapFileSchema(projection.asStruct()))
             .rename("manifest_entry", GenericManifestEntry.class.getName())
             .rename("partition", PartitionData.class.getName())
@@ -209,7 +210,7 @@ private CloseableIterable<ManifestEntry> open(Schema projection) {
     }
   }
 
-  CloseableIterable<ManifestEntry> liveEntries() {
+  CloseableIterable<ManifestEntry<F>> liveEntries() {
     return CloseableIterable.filter(entries(),
         entry -> entry != null && entry.status() != ManifestEntry.Status.DELETED);
   }
@@ -218,13 +219,11 @@ CloseableIterable<ManifestEntry> liveEntries() {
    * @return an Iterator of DataFile. Makes defensive copies of files before returning
    */
   @Override
-  @SuppressWarnings("unchecked")
-  public CloseableIterator<T> iterator() {
+  public CloseableIterator<F> iterator() {
     if (dropStats(rowFilter, columns)) {
-      return (CloseableIterator<T>) CloseableIterable.transform(liveEntries(), e -> e.file().copyWithoutStats())
-          .iterator();
+      return CloseableIterable.transform(liveEntries(), e -> e.file().copyWithoutStats()).iterator();
     } else {
-      return (CloseableIterator<T>) CloseableIterable.transform(liveEntries(), e -> e.file().copy()).iterator();
+      return CloseableIterable.transform(liveEntries(), e -> e.file().copy()).iterator();
     }
   }
 
diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
index 5fd02f5fe0e0..8d0951ece2f7 100644
--- a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
@@ -262,7 +262,7 @@ private static void deleteFiles(FileIO io, Set<ManifestFile> allManifests) {
         .onFailure((item, exc) -> LOG.warn("Failed to get deleted files: this may cause orphaned data files", exc))
         .run(manifest -> {
           try (ManifestReader reader = ManifestFiles.read(manifest, io)) {
-            for (ManifestEntry entry : reader.entries()) {
+            for (ManifestEntry<DataFile> entry : reader.entries()) {
               // intern the file path because the weak key map uses identity (==) instead of equals
               String path = entry.file().path().toString().intern();
               Boolean alreadyDeleted = deletedFiles.putIfAbsent(path, true);
diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
index 405d3fe9e82a..24c3c2dbd40d 100644
--- a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
+++ b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
@@ -287,7 +287,7 @@ private int activeFilesCount(Iterable<ManifestFile> manifests) {
     return activeFilesCount;
   }
 
-  private void appendEntry(ManifestEntry entry, Object key, int partitionSpecId) {
+  private void appendEntry(ManifestEntry<DataFile> entry, Object key, int partitionSpecId) {
     Preconditions.checkNotNull(entry, "Manifest entry cannot be null");
     Preconditions.checkNotNull(key, "Key cannot be null");
 
@@ -323,13 +323,13 @@ long getManifestTargetSizeBytes() {
 
   class WriterWrapper {
     private final PartitionSpec spec;
-    private ManifestWriter writer;
+    private ManifestWriter<DataFile> writer;
 
     WriterWrapper(PartitionSpec spec) {
       this.spec = spec;
     }
 
-    synchronized void addEntry(ManifestEntry entry) {
+    synchronized void addEntry(ManifestEntry<DataFile> entry) {
       if (writer == null) {
        writer = newManifestWriter(spec);
       } else if (writer.length() >= getManifestTargetSizeBytes()) {
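// Reviewer sketch (illustrative, not part of this patch; IOException handling elided):
// entry handling in these call sites is now compile-time checked end to end. entries()
// is package-private, so this pattern only applies inside org.apache.iceberg, as in
// BaseSnapshot just below:
try (CloseableIterable<ManifestEntry<DataFile>> entries = reader.entries()) {
  for (ManifestEntry<DataFile> entry : entries) {
    if (entry.status() != ManifestEntry.Status.DELETED) {
      DataFile file = entry.file(); // no cast needed; file() returns F
    }
  }
}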
diff --git a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java
index 91466c89d476..4da4b37be432 100644
--- a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java
+++ b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java
@@ -158,10 +158,10 @@ private void cacheChanges() {
     // read only manifests that were created by this snapshot
     Iterable<ManifestFile> changedManifests = Iterables.filter(manifests(),
         manifest -> Objects.equal(manifest.snapshotId(), snapshotId));
-    try (CloseableIterable<ManifestEntry> entries = new ManifestGroup(io, changedManifests)
+    try (CloseableIterable<ManifestEntry<DataFile>> entries = new ManifestGroup(io, changedManifests)
         .ignoreExisting()
         .entries()) {
-      for (ManifestEntry entry : entries) {
+      for (ManifestEntry<DataFile> entry : entries) {
         switch (entry.status()) {
           case ADDED:
             adds.add(entry.file().copy());
diff --git a/core/src/main/java/org/apache/iceberg/DeleteManifestReader.java b/core/src/main/java/org/apache/iceberg/DeleteManifestReader.java
new file mode 100644
index 000000000000..dfeee6b99416
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/DeleteManifestReader.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.Map;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+
+/**
+ * Reader for manifest files.
+ * <p>
+ * Create readers using {@link ManifestFiles#readDeleteManifest(ManifestFile, FileIO, Map)}.
+ */
+public class DeleteManifestReader extends BaseManifestReader<DeleteFile, DeleteManifestReader> {
+  protected DeleteManifestReader(InputFile file, Map<Integer, PartitionSpec> specsById, InheritableMetadata metadata) {
+    super(file, specsById, metadata, FileType.DELETE_FILES);
+  }
+
+  @Override
+  protected DeleteManifestReader self() {
+    return this;
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/FileHistory.java b/core/src/main/java/org/apache/iceberg/FileHistory.java
index 89d8fd367059..b6645f70f371 100644
--- a/core/src/main/java/org/apache/iceberg/FileHistory.java
+++ b/core/src/main/java/org/apache/iceberg/FileHistory.java
@@ -80,7 +80,7 @@ public Builder before(long timestampMillis) {
     }
 
     @SuppressWarnings("unchecked")
-    public Iterable<ManifestEntry> build() {
+    public Iterable<ManifestEntry<DataFile>> build() {
       Iterable<Snapshot> snapshots = table.snapshots();
 
       if (startTime != null) {
@@ -100,11 +100,11 @@ public Iterable<ManifestEntry> build() {
       // a manifest group will only read each manifest once
       ManifestGroup group = new ManifestGroup(((HasTableOperations) table).operations().io(), manifests);
 
-      List<ManifestEntry> results = Lists.newArrayList();
-      try (CloseableIterable<ManifestEntry> entries = group.select(HISTORY_COLUMNS).entries()) {
+      List<ManifestEntry<DataFile>> results = Lists.newArrayList();
+      try (CloseableIterable<ManifestEntry<DataFile>> entries = group.select(HISTORY_COLUMNS).entries()) {
         // TODO: replace this with an IN predicate
         CharSequenceWrapper locationWrapper = CharSequenceWrapper.wrap(null);
-        for (ManifestEntry entry : entries) {
+        for (ManifestEntry<DataFile> entry : entries) {
           if (entry != null && locations.contains(locationWrapper.set(entry.file().path()))) {
             results.add(entry.copy());
           }
diff --git a/core/src/main/java/org/apache/iceberg/FindFiles.java b/core/src/main/java/org/apache/iceberg/FindFiles.java
index dfe202114540..94e6f8afcf6a 100644
--- a/core/src/main/java/org/apache/iceberg/FindFiles.java
+++ b/core/src/main/java/org/apache/iceberg/FindFiles.java
@@ -197,7 +197,7 @@ public CloseableIterable<DataFile> collect() {
       }
 
       // when snapshot is not null
-      CloseableIterable<ManifestEntry> entries = new ManifestGroup(ops.io(), snapshot.manifests())
+      CloseableIterable<ManifestEntry<DataFile>> entries = new ManifestGroup(ops.io(), snapshot.manifests())
           .specsById(ops.current().specsById())
           .filterData(rowFilter)
           .filterFiles(fileFilter)
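// Reviewer sketch (illustrative, not part of this patch): FindFiles remains data-file
// only; its collect() now just threads the ManifestEntry<DataFile> type through:
CloseableIterable<DataFile> files = FindFiles.in(table)
    .withRecordsMatching(Expressions.equal("id", 5))
    .collect();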
diff --git a/core/src/main/java/org/apache/iceberg/GenericDataFile.java b/core/src/main/java/org/apache/iceberg/GenericDataFile.java
index 6c6b1baab3dc..4e8658745460 100644
--- a/core/src/main/java/org/apache/iceberg/GenericDataFile.java
+++ b/core/src/main/java/org/apache/iceberg/GenericDataFile.java
@@ -19,142 +19,43 @@
 
 package org.apache.iceberg;
 
-import java.io.Serializable;
+import com.google.common.collect.ImmutableMap;
 import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
-import java.util.Map;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.avro.specific.SpecificData;
+import org.apache.avro.Schema;
 import org.apache.iceberg.avro.AvroSchemaUtil;
-import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
-import org.apache.iceberg.util.ByteBuffers;
-
-class GenericDataFile
-    implements DataFile, IndexedRecord, StructLike, SpecificData.SchemaConstructable, Serializable {
-  private static final Types.StructType EMPTY_STRUCT_TYPE = Types.StructType.of();
-  private static final PartitionData EMPTY_PARTITION_DATA = new PartitionData(EMPTY_STRUCT_TYPE) {
-    @Override
-    public PartitionData copy() {
-      return this; // this does not change
-    }
-  };
-
-  private int[] fromProjectionPos;
-  private Types.StructType partitionType;
-
-  private String filePath = null;
-  private FileFormat format = null;
-  private PartitionData partitionData = null;
-  private Long recordCount = null;
-  private long fileSizeInBytes = -1L;
-
-  // optional fields
-  private Map<Integer, Long> columnSizes = null;
-  private Map<Integer, Long> valueCounts = null;
-  private Map<Integer, Long> nullValueCounts = null;
-  private Map<Integer, ByteBuffer> lowerBounds = null;
-  private Map<Integer, ByteBuffer> upperBounds = null;
-  private List<Long> splitOffsets = null;
-  private byte[] keyMetadata = null;
-
-  // cached schema
-  private transient org.apache.avro.Schema avroSchema = null;
+class GenericDataFile extends BaseFile<DataFile> implements DataFile {
 
   /**
    * Used by Avro reflection to instantiate this class when reading manifest files.
    */
-  @SuppressWarnings("checkstyle:RedundantModifier") // Must be public
-  public GenericDataFile(org.apache.avro.Schema avroSchema) {
-    this.avroSchema = avroSchema;
-
-    Types.StructType schema = AvroSchemaUtil.convert(avroSchema).asNestedType().asStructType();
-
-    // partition type may be null if the field was not projected
-    Type partType = schema.fieldType("partition");
-    if (partType != null) {
-      this.partitionType = partType.asNestedType().asStructType();
-    } else {
-      this.partitionType = EMPTY_STRUCT_TYPE;
-    }
-
-    List<Types.NestedField> fields = schema.fields();
-    List<Types.NestedField> allFields = DataFile.getType(partitionType).fields();
-    this.fromProjectionPos = new int[fields.size()];
-    for (int i = 0; i < fromProjectionPos.length; i += 1) {
-      boolean found = false;
-      for (int j = 0; j < allFields.size(); j += 1) {
-        if (fields.get(i).fieldId() == allFields.get(j).fieldId()) {
-          found = true;
-          fromProjectionPos[i] = j;
-        }
-      }
-
-      if (!found) {
-        throw new IllegalArgumentException("Cannot find projected field: " + fields.get(i));
-      }
-    }
-
-    this.partitionData = new PartitionData(partitionType);
+  GenericDataFile(org.apache.avro.Schema avroSchema) {
+    super(avroSchema);
   }
 
   GenericDataFile(String filePath, FileFormat format, long recordCount,
                   long fileSizeInBytes) {
-    this.filePath = filePath;
-    this.format = format;
-    this.partitionData = EMPTY_PARTITION_DATA;
-    this.partitionType = EMPTY_PARTITION_DATA.getPartitionType();
-    this.recordCount = recordCount;
-    this.fileSizeInBytes = fileSizeInBytes;
+    this(filePath, format, null, recordCount, fileSizeInBytes);
   }
 
   GenericDataFile(String filePath, FileFormat format, PartitionData partition,
                   long recordCount, long fileSizeInBytes) {
-    this.filePath = filePath;
-    this.format = format;
-    this.partitionData = partition;
-    this.partitionType = partition.getPartitionType();
-    this.recordCount = recordCount;
-    this.fileSizeInBytes = fileSizeInBytes;
+    super(FileContent.DATA, filePath, format, partition, fileSizeInBytes, recordCount,
+        null, null, null, null, null, null, null);
   }
 
   GenericDataFile(String filePath, FileFormat format, PartitionData partition,
                   long fileSizeInBytes, Metrics metrics,
                   List<Long> splitOffsets) {
-    this.filePath = filePath;
-    this.format = format;
-
-    // this constructor is used by DataFiles.Builder, which passes null for unpartitioned data
-    if (partition == null) {
-      this.partitionData = EMPTY_PARTITION_DATA;
-      this.partitionType = EMPTY_PARTITION_DATA.getPartitionType();
-    } else {
-      this.partitionData = partition;
-      this.partitionType = partition.getPartitionType();
-    }
-
-    // this will throw NPE if metrics.recordCount is null
-    this.recordCount = metrics.recordCount();
-    this.fileSizeInBytes = fileSizeInBytes;
-    this.columnSizes = metrics.columnSizes();
-    this.valueCounts = metrics.valueCounts();
-    this.nullValueCounts = metrics.nullValueCounts();
-    this.lowerBounds = SerializableByteBufferMap.wrap(metrics.lowerBounds());
-    this.upperBounds = SerializableByteBufferMap.wrap(metrics.upperBounds());
-    this.splitOffsets = copy(splitOffsets);
+    this(filePath, format, partition, fileSizeInBytes, metrics, null, splitOffsets);
   }
 
   GenericDataFile(String filePath, FileFormat format, PartitionData partition,
                   long fileSizeInBytes, Metrics metrics, ByteBuffer keyMetadata,
                   List<Long> splitOffsets) {
-    this(filePath, format, partition, fileSizeInBytes, metrics, splitOffsets);
-    this.keyMetadata = ByteBuffers.toByteArray(keyMetadata);
+    super(FileContent.DATA, filePath, format, partition, fileSizeInBytes, metrics.recordCount(),
+        metrics.columnSizes(), metrics.valueCounts(), metrics.nullValueCounts(),
+        metrics.lowerBounds(), metrics.upperBounds(), splitOffsets, keyMetadata);
   }
 
   /**
@@ -164,29 +65,7 @@ public GenericDataFile(org.apache.avro.Schema avroSchema) {
    * @param fullCopy whether to copy all fields or to drop column-level stats
    */
   private GenericDataFile(GenericDataFile toCopy, boolean fullCopy) {
-    this.filePath = toCopy.filePath;
-    this.format = toCopy.format;
-    this.partitionData = toCopy.partitionData.copy();
-    this.partitionType = toCopy.partitionType;
-    this.recordCount = toCopy.recordCount;
-    this.fileSizeInBytes = toCopy.fileSizeInBytes;
-    if (fullCopy) {
-      // TODO: support lazy conversion to/from map
-      this.columnSizes = copy(toCopy.columnSizes);
-      this.valueCounts = copy(toCopy.valueCounts);
-      this.nullValueCounts = copy(toCopy.nullValueCounts);
-      this.lowerBounds = SerializableByteBufferMap.wrap(copy(toCopy.lowerBounds));
-      this.upperBounds = SerializableByteBufferMap.wrap(copy(toCopy.upperBounds));
-    } else {
-      this.columnSizes = null;
-      this.valueCounts = null;
-      this.nullValueCounts = null;
-      this.lowerBounds = null;
-      this.upperBounds = null;
-    }
-    this.fromProjectionPos = toCopy.fromProjectionPos;
-    this.keyMetadata = toCopy.keyMetadata == null ? null : Arrays.copyOf(toCopy.keyMetadata, toCopy.keyMetadata.length);
-    this.splitOffsets = copy(toCopy.splitOffsets);
+    super(toCopy, fullCopy);
   }
 
   /**
@@ -195,213 +74,6 @@ private GenericDataFile(GenericDataFile toCopy, boolean fullCopy) {
   GenericDataFile() {
   }
 
-  @Override
-  public FileContent content() {
-    return FileContent.DATA;
-  }
-
-  @Override
-  public CharSequence path() {
-    return filePath;
-  }
-
-  @Override
-  public FileFormat format() {
-    return format;
-  }
-
-  @Override
-  public StructLike partition() {
-    return partitionData;
-  }
-
-  @Override
-  public long recordCount() {
-    return recordCount;
-  }
-
-  @Override
-  public long fileSizeInBytes() {
-    return fileSizeInBytes;
-  }
-
-  @Override
-  public Map<Integer, Long> columnSizes() {
-    return columnSizes;
-  }
-
-  @Override
-  public Map<Integer, Long> valueCounts() {
-    return valueCounts;
-  }
-
-  @Override
-  public Map<Integer, Long> nullValueCounts() {
-    return nullValueCounts;
-  }
-
-  @Override
-  public Map<Integer, ByteBuffer> lowerBounds() {
-    return lowerBounds;
-  }
-
-  @Override
-  public Map<Integer, ByteBuffer> upperBounds() {
-    return upperBounds;
-  }
-
-  @Override
-  public ByteBuffer keyMetadata() {
-    return keyMetadata != null ? ByteBuffer.wrap(keyMetadata) : null;
-  }
-
-  @Override
-  public List<Long> splitOffsets() {
-    return splitOffsets;
-  }
-
-  @Override
-  public org.apache.avro.Schema getSchema() {
-    if (avroSchema == null) {
-      this.avroSchema = getAvroSchema(partitionType);
-    }
-    return avroSchema;
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public void put(int i, Object v) {
-    int pos = i;
-    // if the schema was projected, map the incoming ordinal to the expected one
-    if (fromProjectionPos != null) {
-      pos = fromProjectionPos[i];
-    }
-    switch (pos) {
-      case 0:
-        Preconditions.checkState(v == null || (Integer) v == FileContent.DATA.id(),
-            "Invalid content for a DataFile: %s", v);
-        return;
-      case 1:
-        // always coerce to String for Serializable
-        this.filePath = v.toString();
-        return;
-      case 2:
-        this.format = FileFormat.valueOf(v.toString());
-        return;
-      case 3:
-        this.partitionData = (PartitionData) v;
-        return;
-      case 4:
-        this.recordCount = (Long) v;
-        return;
-      case 5:
-        this.fileSizeInBytes = (Long) v;
-        return;
-      case 6:
-        this.columnSizes = (Map<Integer, Long>) v;
-        return;
-      case 7:
-        this.valueCounts = (Map<Integer, Long>) v;
-        return;
-      case 8:
-        this.nullValueCounts = (Map<Integer, Long>) v;
-        return;
-      case 9:
-        this.lowerBounds = SerializableByteBufferMap.wrap((Map<Integer, ByteBuffer>) v);
-        return;
-      case 10:
-        this.upperBounds = SerializableByteBufferMap.wrap((Map<Integer, ByteBuffer>) v);
-        return;
-      case 11:
-        this.keyMetadata = ByteBuffers.toByteArray((ByteBuffer) v);
-        return;
-      case 12:
-        this.splitOffsets = (List<Long>) v;
-        return;
-      default:
-        // ignore the object, it must be from a newer version of the format
-    }
-  }
-
-  @Override
-  public <T> void set(int pos, T value) {
-    put(pos, value);
-  }
-
-  @Override
-  public Object get(int i) {
-    int pos = i;
-    // if the schema was projected, map the incoming ordinal to the expected one
-    if (fromProjectionPos != null) {
-      pos = fromProjectionPos[i];
-    }
-    switch (pos) {
-      case 0:
-        return FileContent.DATA.id();
-      case 1:
-        return filePath;
-      case 2:
-        return format != null ? format.toString() : null;
-      case 3:
-        return partitionData;
-      case 4:
-        return recordCount;
-      case 5:
-        return fileSizeInBytes;
-      case 6:
-        return columnSizes;
-      case 7:
-        return valueCounts;
-      case 8:
-        return nullValueCounts;
-      case 9:
-        return lowerBounds;
-      case 10:
-        return upperBounds;
-      case 11:
-        return keyMetadata();
-      case 12:
-        return splitOffsets;
-      default:
-        throw new UnsupportedOperationException("Unknown field ordinal: " + pos);
-    }
-  }
-
-  @Override
-  public <T> T get(int pos, Class<T> javaClass) {
-    return javaClass.cast(get(pos));
-  }
-
-  private static org.apache.avro.Schema getAvroSchema(Types.StructType partitionType) {
-    Types.StructType type = DataFile.getType(partitionType);
-    return AvroSchemaUtil.convert(type, ImmutableMap.of(
-        type, GenericDataFile.class.getName(),
-        partitionType, PartitionData.class.getName()));
-  }
-
-  @Override
-  public int size() {
-    return DataFile.getType(EMPTY_STRUCT_TYPE).fields().size();
-  }
-
-  @Override
-  public String toString() {
-    return MoreObjects.toStringHelper(this)
-        .add("file_path", filePath)
-        .add("file_format", format)
-        .add("partition", partitionData)
-        .add("record_count", recordCount)
-        .add("file_size_in_bytes", fileSizeInBytes)
-        .add("column_sizes", columnSizes)
-        .add("value_counts", valueCounts)
-        .add("null_value_counts", nullValueCounts)
-        .add("lower_bounds", lowerBounds)
-        .add("upper_bounds", upperBounds)
-        .add("key_metadata", keyMetadata == null ? "null" : "(redacted)")
-        .add("split_offsets", splitOffsets == null ? "null" : splitOffsets)
-        .toString();
-  }
-
   @Override
   public DataFile copyWithoutStats() {
     return new GenericDataFile(this, false /* drop stats */);
@@ -412,21 +84,10 @@ public DataFile copy() {
     return new GenericDataFile(this, true /* full copy */);
   }
 
-  private static <K, V> Map<K, V> copy(Map<K, V> map) {
-    if (map != null) {
-      Map<K, V> copy = Maps.newHashMapWithExpectedSize(map.size());
-      copy.putAll(map);
-      return Collections.unmodifiableMap(copy);
-    }
-    return null;
-  }
-
-  private static <E> List<E> copy(List<E> list) {
-    if (list != null) {
-      List<E> copy = Lists.newArrayListWithExpectedSize(list.size());
-      copy.addAll(list);
-      return Collections.unmodifiableList(copy);
-    }
-    return null;
+  protected Schema getAvroSchema(Types.StructType partitionStruct) {
+    Types.StructType type = DataFile.getType(partitionStruct);
+    return AvroSchemaUtil.convert(type, ImmutableMap.of(
+        type, GenericDataFile.class.getName(),
+        partitionStruct, PartitionData.class.getName()));
   }
 }
diff --git a/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java b/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java
new file mode 100644
index 000000000000..f4a28d0fe346
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+
+import com.google.common.collect.ImmutableMap;
+import java.nio.ByteBuffer;
+import org.apache.avro.Schema;
+import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.types.Types;
+
+class GenericDeleteFile extends BaseFile<DeleteFile> implements DeleteFile {
+  /**
+   * Used by Avro reflection to instantiate this class when reading manifest files.
+   */
+  GenericDeleteFile(Schema avroSchema) {
+    super(avroSchema);
+  }
+
+  GenericDeleteFile(FileContent content, String filePath, FileFormat format, PartitionData partition,
+                    long fileSizeInBytes, Metrics metrics, ByteBuffer keyMetadata) {
+    super(content, filePath, format, partition, fileSizeInBytes, metrics.recordCount(),
+        metrics.columnSizes(), metrics.valueCounts(), metrics.nullValueCounts(),
+        metrics.lowerBounds(), metrics.upperBounds(), null, keyMetadata);
+  }
+
+  /**
+   * Copy constructor.
+   *
+   * @param toCopy a generic data file to copy.
+   * @param fullCopy whether to copy all fields or to drop column-level stats
+   */
+  private GenericDeleteFile(GenericDeleteFile toCopy, boolean fullCopy) {
+    super(toCopy, fullCopy);
+  }
+
+  /**
+   * Constructor for Java serialization.
+   */
+  GenericDeleteFile() {
+  }
+
+  @Override
+  public DeleteFile copyWithoutStats() {
+    return new GenericDeleteFile(this, false /* drop stats */);
+  }
+
+  @Override
+  public DeleteFile copy() {
+    return new GenericDeleteFile(this, true /* full copy */);
+  }
+
+  protected Schema getAvroSchema(Types.StructType partitionStruct) {
+    Types.StructType type = DataFile.getType(partitionStruct);
+    return AvroSchemaUtil.convert(type, ImmutableMap.of(
+        type, GenericDeleteFile.class.getName(),
+        partitionStruct, PartitionData.class.getName()));
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/GenericManifestEntry.java b/core/src/main/java/org/apache/iceberg/GenericManifestEntry.java
index cc2781801b49..a4bc6f8201c1 100644
--- a/core/src/main/java/org/apache/iceberg/GenericManifestEntry.java
+++ b/core/src/main/java/org/apache/iceberg/GenericManifestEntry.java
@@ -25,27 +25,24 @@
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.iceberg.types.Types;
 
-class GenericManifestEntry implements ManifestEntry, IndexedRecord, SpecificData.SchemaConstructable, StructLike {
+class GenericManifestEntry<F extends ContentFile<F>>
+    implements ManifestEntry<F>, IndexedRecord, SpecificData.SchemaConstructable, StructLike {
   private final org.apache.avro.Schema schema;
-  private final V1Metadata.IndexedDataFile fileWrapper;
   private Status status = Status.EXISTING;
   private Long snapshotId = null;
   private Long sequenceNumber = null;
-  private DataFile file = null;
+  private F file = null;
 
   GenericManifestEntry(org.apache.avro.Schema schema) {
     this.schema = schema;
-    this.fileWrapper = null; // do not use the file wrapper to read
   }
 
   GenericManifestEntry(Types.StructType partitionType) {
     this.schema = AvroSchemaUtil.convert(V1Metadata.entrySchema(partitionType), "manifest_entry");
-    this.fileWrapper = new V1Metadata.IndexedDataFile(schema.getField("data_file").schema());
   }
 
-  private GenericManifestEntry(GenericManifestEntry toCopy, boolean fullCopy) {
+  private GenericManifestEntry(GenericManifestEntry<F> toCopy, boolean fullCopy) {
     this.schema = toCopy.schema;
-    this.fileWrapper = new V1Metadata.IndexedDataFile(schema.getField("data_file").schema());
     this.status = toCopy.status;
     this.snapshotId = toCopy.snapshotId;
     if (fullCopy) {
@@ -55,7 +52,7 @@ private GenericManifestEntry(GenericManifestEntry toCopy, boolean fullCopy) {
     }
   }
 
-  ManifestEntry wrapExisting(Long newSnapshotId, Long newSequenceNumber, DataFile newFile) {
+  ManifestEntry<F> wrapExisting(Long newSnapshotId, Long newSequenceNumber, F newFile) {
     this.status = Status.EXISTING;
     this.snapshotId = newSnapshotId;
     this.sequenceNumber = newSequenceNumber;
@@ -63,7 +60,7 @@ ManifestEntry wrapExisting(Long newSnapshotId, Long newSequenceNumber, DataFile
     return this;
   }
 
-  ManifestEntry wrapAppend(Long newSnapshotId, DataFile newFile) {
+  ManifestEntry<F> wrapAppend(Long newSnapshotId, F newFile) {
     this.status = Status.ADDED;
     this.snapshotId = newSnapshotId;
     this.sequenceNumber = null;
@@ -71,7 +68,7 @@ ManifestEntry wrapAppend(Long newSnapshotId, DataFile newFile) {
     return this;
   }
 
-  ManifestEntry wrapDelete(Long newSnapshotId, DataFile newFile) {
+  ManifestEntry<F> wrapDelete(Long newSnapshotId, F newFile) {
     this.status = Status.DELETED;
     this.snapshotId = newSnapshotId;
     this.sequenceNumber = null;
@@ -104,18 +101,18 @@ public Long sequenceNumber() {
    * @return a file
    */
   @Override
-  public DataFile file() {
+  public F file() {
     return file;
   }
 
   @Override
-  public ManifestEntry copy() {
-    return new GenericManifestEntry(this, true /* full copy */);
+  public ManifestEntry<F> copy() {
+    return new GenericManifestEntry<>(this, true /* full copy */);
   }
 
   @Override
-  public ManifestEntry copyWithoutStats() {
-    return new GenericManifestEntry(this, false /* drop stats */);
+  public ManifestEntry<F> copyWithoutStats() {
+    return new GenericManifestEntry<>(this, false /* drop stats */);
   }
 
   @Override
@@ -129,6 +126,7 @@ public void setSequenceNumber(long newSequenceNumber) {
   }
 
   @Override
+  @SuppressWarnings("unchecked")
   public void put(int i, Object v) {
     switch (i) {
       case 0:
@@ -141,7 +139,7 @@ public void put(int i, Object v) {
         this.sequenceNumber = (Long) v;
         return;
       case 3:
-        this.file = (DataFile) v;
+        this.file = (F) v;
         return;
       default:
         // ignore the object, it must be from a newer version of the format
@@ -163,11 +161,7 @@ public Object get(int i) {
       case 2:
         return sequenceNumber;
       case 3:
-        if (fileWrapper == null || file instanceof GenericDataFile) {
-          return file;
-        } else {
-          return fileWrapper.wrap(file);
-        }
+        return file;
       default:
         throw new UnsupportedOperationException("Unknown field ordinal: " + i);
     }
diff --git a/core/src/main/java/org/apache/iceberg/InheritableMetadata.java b/core/src/main/java/org/apache/iceberg/InheritableMetadata.java
index 5eebe8a01861..44e05212af7b 100644
--- a/core/src/main/java/org/apache/iceberg/InheritableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/InheritableMetadata.java
@@ -22,5 +22,5 @@
 import java.io.Serializable;
 
 interface InheritableMetadata extends Serializable {
-  ManifestEntry apply(ManifestEntry manifestEntry);
+  <F extends ContentFile<F>> ManifestEntry<F> apply(ManifestEntry<F> manifestEntry);
 }
diff --git a/core/src/main/java/org/apache/iceberg/InheritableMetadataFactory.java b/core/src/main/java/org/apache/iceberg/InheritableMetadataFactory.java
index 3c271bc1b65a..d8ceb40da3a9 100644
--- a/core/src/main/java/org/apache/iceberg/InheritableMetadataFactory.java
+++ b/core/src/main/java/org/apache/iceberg/InheritableMetadataFactory.java
@@ -51,7 +51,7 @@ private BaseInheritableMetadata(long snapshotId, long sequenceNumber) {
     }
 
     @Override
-    public ManifestEntry apply(ManifestEntry manifestEntry) {
+    public <F extends ContentFile<F>> ManifestEntry<F> apply(ManifestEntry<F> manifestEntry) {
       if (manifestEntry.snapshotId() == null) {
         manifestEntry.setSnapshotId(snapshotId);
       }
@@ -70,7 +70,7 @@ private CopyMetadata(long snapshotId) {
     }
 
     @Override
-    public ManifestEntry apply(ManifestEntry manifestEntry) {
+    public <F extends ContentFile<F>> ManifestEntry<F> apply(ManifestEntry<F> manifestEntry) {
       manifestEntry.setSnapshotId(snapshotId);
       return manifestEntry;
     }
@@ -81,7 +81,7 @@ static class EmptyInheritableMetadata implements InheritableMetadata {
 
     private EmptyInheritableMetadata() {}
 
     @Override
-    public ManifestEntry apply(ManifestEntry manifestEntry) {
+    public <F extends ContentFile<F>> ManifestEntry<F> apply(ManifestEntry<F> manifestEntry) {
       if (manifestEntry.snapshotId() == null) {
         throw new IllegalArgumentException("Entries must have explicit snapshot ids if inherited metadata is empty");
       }
diff --git a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
index 9e4b896a1caa..061230f5f48f 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
@@ -129,7 +129,7 @@ static class ManifestReadTask extends BaseFileScanTask implements DataTask {
     public CloseableIterable<StructLike> rows() {
       return CloseableIterable.transform(
           ManifestFiles.read(manifest, io).project(fileSchema).entries(),
-          file -> (GenericManifestEntry) file);
+          file -> (GenericManifestEntry<DataFile>) file);
     }
 
     @Override
diff --git a/core/src/main/java/org/apache/iceberg/ManifestEntry.java b/core/src/main/java/org/apache/iceberg/ManifestEntry.java
index bc03850c3633..e07b125a8c5c 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestEntry.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestEntry.java
@@ -25,7 +25,7 @@
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
 
-interface ManifestEntry {
+interface ManifestEntry<F extends ContentFile<F>> {
   enum Status {
     EXISTING(0),
     ADDED(1),
@@ -89,9 +89,9 @@ static Schema wrapFileSchema(StructType fileType) {
   /**
    * @return a file
    */
-  DataFile file();
+  F file();
 
-  ManifestEntry copy();
+  ManifestEntry<F> copy();
 
-  ManifestEntry copyWithoutStats();
+  ManifestEntry<F> copyWithoutStats();
 }
diff --git a/core/src/main/java/org/apache/iceberg/ManifestFiles.java b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
index ea72e2f660af..730badf8d345 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestFiles.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
@@ -55,6 +55,8 @@ public static ManifestReader read(ManifestFile manifest, FileIO io) {
    * @return a {@link ManifestReader}
    */
   public static ManifestReader read(ManifestFile manifest, FileIO io, Map<Integer, PartitionSpec> specsById) {
+    Preconditions.checkArgument(manifest.content() == ManifestContent.DATA,
+        "Cannot read a delete manifest with a ManifestReader: %s", manifest);
     InputFile file = io.newInputFile(manifest.path());
     InheritableMetadata inheritableMetadata = InheritableMetadataFactory.fromManifest(manifest);
     return new ManifestReader(file, specsById, inheritableMetadata);
@@ -70,7 +72,7 @@ public static ManifestReader read(ManifestFile manifest, FileIO io, Map<Integer, PartitionSpec> specsById) {
    * @return a manifest writer
    */
-  public static ManifestWriter write(PartitionSpec spec, OutputFile outputFile) {
+  public static ManifestWriter<DataFile> write(PartitionSpec spec, OutputFile outputFile) {
     return write(1, spec, outputFile, null);
   }
 
@@ -83,7 +85,8 @@ public static ManifestWriter write(PartitionSpec spec, OutputFile outputFile) {
    * @param snapshotId a snapshot ID for the manifest entries, or null for an inherited ID
    * @return a manifest writer
    */
-  public static ManifestWriter write(int formatVersion, PartitionSpec spec, OutputFile outputFile, Long snapshotId) {
+  public static ManifestWriter<DataFile> write(int formatVersion, PartitionSpec spec, OutputFile outputFile,
+                                               Long snapshotId) {
     switch (formatVersion) {
       case 1:
         return new ManifestWriter.V1Writer(spec, outputFile, snapshotId);
@@ -93,6 +96,43 @@ public static ManifestWriter write(int formatVersion, PartitionSpec spec, Output
     throw new UnsupportedOperationException("Cannot write manifest for table version: " + formatVersion);
   }
 
+  /**
+   * Returns a new {@link DeleteManifestReader} for a {@link ManifestFile}.
+   *
+   * @param manifest a {@link ManifestFile}
+   * @param io a {@link FileIO}
+   * @param specsById a Map from spec ID to partition spec
+   * @return a {@link DeleteManifestReader}
+   */
+  public static DeleteManifestReader readDeleteManifest(ManifestFile manifest, FileIO io,
+                                                        Map<Integer, PartitionSpec> specsById) {
+    Preconditions.checkArgument(manifest.content() == ManifestContent.DELETES,
+        "Cannot read a data manifest with a DeleteManifestReader: %s", manifest);
+    InputFile file = io.newInputFile(manifest.path());
+    InheritableMetadata inheritableMetadata = InheritableMetadataFactory.fromManifest(manifest);
+    return new DeleteManifestReader(file, specsById, inheritableMetadata);
+  }
+
+  /**
+   * Create a new {@link ManifestWriter} for the given format version.
+   *
+   * @param formatVersion a target format version
+   * @param spec a {@link PartitionSpec}
+   * @param outputFile an {@link OutputFile} where the manifest will be written
+   * @param snapshotId a snapshot ID for the manifest entries, or null for an inherited ID
+   * @return a manifest writer
+   */
+  public static ManifestWriter<DeleteFile> writeDeleteManifest(int formatVersion, PartitionSpec spec,
+                                                               OutputFile outputFile, Long snapshotId) {
+    switch (formatVersion) {
+      case 1:
+        throw new IllegalArgumentException("Cannot write delete files in a v1 table");
+      case 2:
+        return new ManifestWriter.V2DeleteWriter(spec, outputFile, snapshotId);
+    }
+    throw new UnsupportedOperationException("Cannot write manifest for table version: " + formatVersion);
+  }
+
   static ManifestFile copyAppendManifest(int formatVersion,
                                          InputFile toCopy, Map<Integer, PartitionSpec> specsById,
                                          OutputFile outputFile, long snapshotId,
@@ -124,10 +164,10 @@ static ManifestFile copyRewriteManifest(int formatVersion,
   private static ManifestFile copyManifestInternal(int formatVersion, ManifestReader reader, OutputFile outputFile,
                                                    long snapshotId, SnapshotSummary.Builder summaryBuilder,
                                                    ManifestEntry.Status allowedEntryStatus) {
-    ManifestWriter writer = write(formatVersion, reader.spec(), outputFile, snapshotId);
+    ManifestWriter<DataFile> writer = write(formatVersion, reader.spec(), outputFile, snapshotId);
     boolean threw = true;
     try {
-      for (ManifestEntry entry : reader.entries()) {
+      for (ManifestEntry<DataFile> entry : reader.entries()) {
         Preconditions.checkArgument(
             allowedEntryStatus == entry.status(),
             "Invalid manifest entry status: %s (allowed status: %s)",
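// Reviewer sketch (illustrative, not part of this patch; IOException handling elided,
// spec/outputFile/snapshotId/deleteFile assumed): delete manifests are v2-only, so the
// factory fails fast for v1. A write path looks like:
ManifestWriter<DeleteFile> writer = ManifestFiles.writeDeleteManifest(2, spec, outputFile, snapshotId);
try {
  writer.add(deleteFile); // entry inherits this manifest's snapshot ID
} finally {
  writer.close();
}
ManifestFile deleteManifest = writer.toManifestFile();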
diff --git a/core/src/main/java/org/apache/iceberg/ManifestGroup.java b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
index b98176e69264..3aca5f9edd4f 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestGroup.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
@@ -47,7 +47,7 @@ class ManifestGroup {
   private final FileIO io;
   private final Set<ManifestFile> manifests;
   private Predicate<ManifestFile> manifestPredicate;
-  private Predicate<ManifestEntry> manifestEntryPredicate;
+  private Predicate<ManifestEntry<DataFile>> manifestEntryPredicate;
   private Map<Integer, PartitionSpec> specsById;
   private Expression dataFilter;
   private Expression fileFilter;
@@ -97,7 +97,7 @@ ManifestGroup filterManifests(Predicate<ManifestFile> newManifestPredicate) {
     return this;
   }
 
-  ManifestGroup filterManifestEntries(Predicate<ManifestEntry> newManifestEntryPredicate) {
+  ManifestGroup filterManifestEntries(Predicate<ManifestEntry<DataFile>> newManifestEntryPredicate) {
     this.manifestEntryPredicate = manifestEntryPredicate.and(newManifestEntryPredicate);
     return this;
   }
@@ -169,12 +169,12 @@ public CloseableIterable<FileScanTask> planFiles() {
    *
    * @return a CloseableIterable of manifest entries.
    */
-  public CloseableIterable<ManifestEntry> entries() {
+  public CloseableIterable<ManifestEntry<DataFile>> entries() {
     return CloseableIterable.concat(entries((manifest, entries) -> entries));
   }
 
   private <T> Iterable<CloseableIterable<T>> entries(
-      BiFunction<ManifestFile, CloseableIterable<ManifestEntry>, CloseableIterable<T>> entryFn) {
+      BiFunction<ManifestFile, CloseableIterable<ManifestEntry<DataFile>>, CloseableIterable<T>> entryFn) {
     LoadingCache<Integer, ManifestEvaluator> evalCache = specsById == null ?
         null : Caffeine.newBuilder().build(specId -> {
           PartitionSpec spec = specsById.get(specId);
@@ -215,7 +215,7 @@ private <T> Iterable<CloseableIterable<T>> entries(
           .caseSensitive(caseSensitive)
           .select(columns);
 
-      CloseableIterable<ManifestEntry> entries = reader.entries();
+      CloseableIterable<ManifestEntry<DataFile>> entries = reader.entries();
       if (ignoreDeleted) {
        entries = reader.liveEntries();
       }
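With the predicate and iterable types parameterized, ManifestGroup callers filter entries without raw types. A sketch in the style of the ScanSummary usage further down; since ManifestGroup is package-private this assumes code inside org.apache.iceberg, with ops, manifests, and validIds in scope:

    try (CloseableIterable<ManifestEntry<DataFile>> entries = new ManifestGroup(ops.io(), manifests)
        .specsById(ops.current().specsById())
        .filterManifestEntries(entry -> validIds.contains(entry.snapshotId()))  // typed predicate
        .ignoreDeleted()
        .entries()) {
      for (ManifestEntry<DataFile> entry : entries) {
        DataFile file = entry.file();  // F is bound to DataFile for data manifests
      }
    }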
diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
index ac10f4ca20ac..f73390bd43c1 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
@@ -28,33 +28,19 @@
 /**
  * Writer for manifest files.
+ *
+ * @param <F> Java class of files written to the manifest, either {@link DataFile} or {@link DeleteFile}.
  */
-public abstract class ManifestWriter implements FileAppender<DataFile> {
+public abstract class ManifestWriter<F extends ContentFile<F>> implements FileAppender<F> {
   // stand-in for the current sequence number that will be assigned when the commit is successful
   // this is replaced when writing a manifest list by the ManifestFile wrapper
   static final long UNASSIGNED_SEQ = -1L;
 
-  /**
-   * Create a new {@link ManifestWriter}.
-   * <p>
-   * Manifests created by this writer have all entry snapshot IDs set to null.
-   * All entries will inherit the snapshot ID that will be assigned to the manifest on commit.
-   *
-   * @param spec {@link PartitionSpec} used to produce {@link DataFile} partition tuples
-   * @param outputFile the destination file location
-   * @return a manifest writer
-   * @deprecated will be removed in 0.9.0; use {@link ManifestFiles#write(PartitionSpec, OutputFile)} instead.
-   */
-  @Deprecated
-  public static ManifestWriter write(PartitionSpec spec, OutputFile outputFile) {
-    return ManifestFiles.write(spec, outputFile);
-  }
-
   private final OutputFile file;
   private final int specId;
-  private final FileAppender<ManifestEntry> writer;
+  private final FileAppender<ManifestEntry<F>> writer;
   private final Long snapshotId;
-  private final GenericManifestEntry reused;
+  private final GenericManifestEntry<F> reused;
   private final PartitionSummary stats;
   private boolean closed = false;
@@ -71,15 +57,19 @@ private ManifestWriter(PartitionSpec spec, OutputFile file, Long snapshotId) {
     this.specId = spec.specId();
     this.writer = newAppender(spec, file);
     this.snapshotId = snapshotId;
-    this.reused = new GenericManifestEntry(spec.partitionType());
+    this.reused = new GenericManifestEntry<>(spec.partitionType());
     this.stats = new PartitionSummary(spec);
   }
 
-  protected abstract ManifestEntry prepare(ManifestEntry entry);
+  protected abstract ManifestEntry<F> prepare(ManifestEntry<F> entry);
 
-  protected abstract FileAppender<ManifestEntry> newAppender(PartitionSpec spec, OutputFile outputFile);
+  protected abstract FileAppender<ManifestEntry<F>> newAppender(PartitionSpec spec, OutputFile outputFile);
 
-  void addEntry(ManifestEntry entry) {
+  protected ManifestContent content() {
+    return ManifestContent.DATA;
+  }
+
+  void addEntry(ManifestEntry<F> entry) {
     switch (entry.status()) {
       case ADDED:
         addedFiles += 1;
@@ -102,48 +92,48 @@ void addEntry(ManifestEntry entry) {
   }
 
   /**
-   * Add an added entry for a data file.
+   * Add an added entry for a file.
    * <p>
    * The entry's snapshot ID will be this manifest's snapshot ID.
    *
    * @param addedFile a data file
    */
   @Override
-  public void add(DataFile addedFile) {
+  public void add(F addedFile) {
     addEntry(reused.wrapAppend(snapshotId, addedFile));
   }
 
-  void add(ManifestEntry entry) {
+  void add(ManifestEntry<F> entry) {
     addEntry(reused.wrapAppend(snapshotId, entry.file()));
   }
 
   /**
-   * Add an existing entry for a data file.
+   * Add an existing entry for a file.
    *
-   * @param existingFile a data file
+   * @param existingFile a file
    * @param fileSnapshotId snapshot ID when the data file was added to the table
    * @param sequenceNumber sequence number for the data file
    */
-  public void existing(DataFile existingFile, long fileSnapshotId, long sequenceNumber) {
+  public void existing(F existingFile, long fileSnapshotId, long sequenceNumber) {
     addEntry(reused.wrapExisting(fileSnapshotId, sequenceNumber, existingFile));
   }
 
-  void existing(ManifestEntry entry) {
+  void existing(ManifestEntry<F> entry) {
     addEntry(reused.wrapExisting(entry.snapshotId(), entry.sequenceNumber(), entry.file()));
   }
   /**
-   * Add a delete entry for a data file.
+   * Add a delete entry for a file.
    * <p>
    * The entry's snapshot ID will be this manifest's snapshot ID.
    *
-   * @param deletedFile a data file
+   * @param deletedFile a file
    */
-  public void delete(DataFile deletedFile) {
+  public void delete(F deletedFile) {
     addEntry(reused.wrapDelete(snapshotId, deletedFile));
   }
 
-  void delete(ManifestEntry entry) {
+  void delete(ManifestEntry<F> entry) {
     // Use the current Snapshot ID for the delete. It is safe to delete the data file from disk
     // when this Snapshot has been removed or when there are no Snapshots older than this one.
     addEntry(reused.wrapDelete(snapshotId, entry.file()));
   }
@@ -164,7 +154,7 @@ public ManifestFile toManifestFile() {
     // if the minSequenceNumber is null, then no manifests with a sequence number have been written, so the min
     // sequence number is the one that will be assigned when this is committed. pass UNASSIGNED_SEQ to inherit it.
     long minSeqNumber = minSequenceNumber != null ? minSequenceNumber : UNASSIGNED_SEQ;
-    return new GenericManifestFile(file.location(), writer.length(), specId, ManifestContent.DATA,
+    return new GenericManifestFile(file.location(), writer.length(), specId, content(),
         UNASSIGNED_SEQ, minSeqNumber, snapshotId, addedFiles, addedRows, existingFiles, existingRows,
         deletedFiles, deletedRows, stats.summaries());
   }
@@ -175,21 +165,54 @@ public void close() throws IOException {
     writer.close();
   }
 
-  static class V2Writer extends ManifestWriter {
-    private V2Metadata.IndexedManifestEntry entryWrapper;
+  static class V2Writer extends ManifestWriter<DataFile> {
+    private final V2Metadata.IndexedManifestEntry<DataFile> entryWrapper;
 
     V2Writer(PartitionSpec spec, OutputFile file, Long snapshotId) {
       super(spec, file, snapshotId);
-      this.entryWrapper = new V2Metadata.IndexedManifestEntry(snapshotId, spec.partitionType());
+      this.entryWrapper = new V2Metadata.IndexedManifestEntry<>(snapshotId, spec.partitionType());
+    }
+
+    @Override
+    protected ManifestEntry<DataFile> prepare(ManifestEntry<DataFile> entry) {
+      return entryWrapper.wrap(entry);
+    }
+
+    @Override
+    protected FileAppender<ManifestEntry<DataFile>> newAppender(PartitionSpec spec, OutputFile file) {
+      Schema manifestSchema = V2Metadata.entrySchema(spec.partitionType());
+      try {
+        return Avro.write(file)
+            .schema(manifestSchema)
+            .named("manifest_entry")
+            .meta("schema", SchemaParser.toJson(spec.schema()))
+            .meta("partition-spec", PartitionSpecParser.toJsonFields(spec))
+            .meta("partition-spec-id", String.valueOf(spec.specId()))
+            .meta("format-version", "2")
+            .meta("content", "data")
+            .overwrite()
+            .build();
+      } catch (IOException e) {
+        throw new RuntimeIOException(e, "Failed to create manifest writer for path: " + file);
+      }
+    }
+  }
+
+  static class V2DeleteWriter extends ManifestWriter<DeleteFile> {
+    private final V2Metadata.IndexedManifestEntry<DeleteFile> entryWrapper;
+
+    V2DeleteWriter(PartitionSpec spec, OutputFile file, Long snapshotId) {
+      super(spec, file, snapshotId);
+      this.entryWrapper = new V2Metadata.IndexedManifestEntry<>(snapshotId, spec.partitionType());
     }
 
     @Override
-    protected ManifestEntry prepare(ManifestEntry entry) {
+    protected ManifestEntry<DeleteFile> prepare(ManifestEntry<DeleteFile> entry) {
       return entryWrapper.wrap(entry);
     }
 
     @Override
-    protected FileAppender<ManifestEntry> newAppender(PartitionSpec spec, OutputFile file) {
+    protected FileAppender<ManifestEntry<DeleteFile>> newAppender(PartitionSpec spec, OutputFile file) {
       Schema manifestSchema = V2Metadata.entrySchema(spec.partitionType());
       try {
         return Avro.write(file)
@@ -199,16 +222,22 @@ protected FileAppender<ManifestEntry> newAppender(PartitionSpec spec, OutputFile file) {
             .meta("partition-spec", PartitionSpecParser.toJsonFields(spec))
            .meta("partition-spec-id", String.valueOf(spec.specId()))
             .meta("format-version", "2")
+            .meta("content", "deletes")
             .overwrite()
             .build();
       } catch (IOException e) {
         throw new RuntimeIOException(e, "Failed to create manifest writer for path: " + file);
       }
     }
+
+    @Override
+    protected ManifestContent content() {
+      return ManifestContent.DELETES;
+    }
   }
 
-  static class V1Writer extends ManifestWriter {
-    private V1Metadata.IndexedManifestEntry entryWrapper;
+  static class V1Writer extends ManifestWriter<DataFile> {
+    private final V1Metadata.IndexedManifestEntry entryWrapper;
 
     V1Writer(PartitionSpec spec, OutputFile file, Long snapshotId) {
       super(spec, file, snapshotId);
@@ -216,12 +245,12 @@ static class V1Writer extends ManifestWriter {
     }
 
     @Override
-    protected ManifestEntry prepare(ManifestEntry entry) {
+    protected ManifestEntry<DataFile> prepare(ManifestEntry<DataFile> entry) {
       return entryWrapper.wrap(entry);
     }
 
     @Override
-    protected FileAppender<ManifestEntry> newAppender(PartitionSpec spec, OutputFile file) {
+    protected FileAppender<ManifestEntry<DataFile>> newAppender(PartitionSpec spec, OutputFile file) {
       Schema manifestSchema = V1Metadata.entrySchema(spec.partitionType());
       try {
         return Avro.write(file)
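The only behavioral differences between V2Writer and V2DeleteWriter are the content() override and the "content" Avro metadata value, which flow into toManifestFile() and the file header respectively. A sketch of the observable effect, reusing the assumed spec, output files, and snapshotId from the earlier example:

    ManifestWriter<DataFile> dataWriter = ManifestFiles.write(2, spec, dataOut, snapshotId);
    ManifestWriter<DeleteFile> deleteWriter = ManifestFiles.writeDeleteManifest(2, spec, deleteOut, snapshotId);
    // ... add files, then close both writers ...

    // V2Writer keeps the DATA default; V2DeleteWriter overrides content():
    assert dataWriter.toManifestFile().content() == ManifestContent.DATA;
    assert deleteWriter.toManifestFile().content() == ManifestContent.DELETES;

    // And v1 rejects delete manifests outright:
    try {
      ManifestFiles.writeDeleteManifest(1, spec, deleteOut, snapshotId);
    } catch (IllegalArgumentException e) {
      // "Cannot write delete files in a v1 table"
    }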
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index 0ba09db6d857..f8fbe90184f8 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -526,7 +526,7 @@ private boolean manifestHasDeletedFiles(
     Evaluator inclusive = extractInclusiveDeleteExpression(reader);
     Evaluator strict = extractStrictDeleteExpression(reader);
     boolean hasDeletedFiles = false;
-    for (ManifestEntry entry : reader.entries()) {
+    for (ManifestEntry<DataFile> entry : reader.entries()) {
       DataFile file = entry.file();
       boolean fileDelete = deletePaths.contains(pathWrapper.set(file.path())) ||
           dropPartitions.contains(partitionWrapper.set(file.partition()));
@@ -555,7 +555,7 @@ private ManifestFile filterManifestWithDeletedFiles(
     // manifest. produce a copy of the manifest with all deleted files removed.
     List<DataFile> deletedFiles = Lists.newArrayList();
     Set<CharSequenceWrapper> deletedPaths = Sets.newHashSet();
-    ManifestWriter writer = newManifestWriter(reader.spec());
+    ManifestWriter<DataFile> writer = newManifestWriter(reader.spec());
     try {
       reader.entries().forEach(entry -> {
         DataFile file = entry.file();
@@ -667,11 +667,11 @@ private ManifestFile createManifest(int specId, List<ManifestFile> bin) throws IOException {
       return mergeManifests.get(bin);
     }
 
-    ManifestWriter writer = newManifestWriter(ops.current().spec());
+    ManifestWriter<DataFile> writer = newManifestWriter(ops.current().spec());
     try {
       for (ManifestFile manifest : bin) {
         try (ManifestReader reader = ManifestFiles.read(manifest, ops.io(), ops.current().specsById())) {
-          for (ManifestEntry entry : reader.entries()) {
+          for (ManifestEntry<DataFile> entry : reader.entries()) {
             if (entry.status() == Status.DELETED) {
               // suppress deletes from previous snapshots. only files deleted by this snapshot
               // should be added to the new manifest
diff --git a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
index 246458047209..f927728909d1 100644
--- a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
+++ b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
@@ -350,7 +350,7 @@ private Set<String> findFilesToDelete(Set<ManifestFile> manifestsToScan, Set<ManifestFile> manifestsToRevert,
         .run(manifest -> {
           // the manifest has deletes, scan it to find files to delete
           try (ManifestReader reader = ManifestFiles.read(manifest, ops.io(), ops.current().specsById())) {
-            for (ManifestEntry entry : reader.entries()) {
+            for (ManifestEntry<DataFile> entry : reader.entries()) {
               // if the snapshot ID of the DELETE entry is no longer valid, the data can be deleted
               if (entry.status() == ManifestEntry.Status.DELETED &&
                   !validIds.contains(entry.snapshotId())) {
@@ -370,7 +370,7 @@ private Set<String> findFilesToDelete(Set<ManifestFile> manifestsToScan, Set<ManifestFile> manifestsToRevert,
         .run(manifest -> {
           // the manifest has deletes, scan it to find files to delete
           try (ManifestReader reader = ManifestFiles.read(manifest, ops.io(), ops.current().specsById())) {
-            for (ManifestEntry entry : reader.entries()) {
+            for (ManifestEntry<DataFile> entry : reader.entries()) {
               // delete any ADDED file from manifests that were reverted
               if (entry.status() == ManifestEntry.Status.ADDED) {
                 // use toString to ensure the path will not change (Utf8 is reused)
diff --git a/core/src/main/java/org/apache/iceberg/ScanSummary.java b/core/src/main/java/org/apache/iceberg/ScanSummary.java
index 126ada811429..f93f785d8915 100644
--- a/core/src/main/java/org/apache/iceberg/ScanSummary.java
+++ b/core/src/main/java/org/apache/iceberg/ScanSummary.java
@@ -218,7 +218,7 @@ private Map<String, PartitionMetrics> computeTopPartitionMetrics(
     TopN<String, PartitionMetrics> topN = new TopN<>(
         limit, throwIfLimited, Comparators.charSequences());
 
-    try (CloseableIterable<ManifestEntry> entries = new ManifestGroup(ops.io(), manifests)
+    try (CloseableIterable<ManifestEntry<DataFile>> entries = new ManifestGroup(ops.io(), manifests)
         .specsById(ops.current().specsById())
         .filterData(rowFilter)
         .ignoreDeleted()
@@ -226,7 +226,7 @@ private Map<String, PartitionMetrics> computeTopPartitionMetrics(
         .entries()) {
 
       PartitionSpec spec = table.spec();
-      for (ManifestEntry entry : entries) {
+      for (ManifestEntry<DataFile> entry : entries) {
         Long timestamp = snapshotTimestamps.get(entry.snapshotId());
 
         // if filtering, skip timestamps that are outside the range
@@ -280,7 +280,7 @@ PartitionMetrics updateFromCounts(int numFiles, long filesRecordCount, long filesSize,
       return this;
     }
 
-    private PartitionMetrics updateFromFile(DataFile file, Long timestampMillis) {
+    private PartitionMetrics updateFromFile(ContentFile<?> file, Long timestampMillis) {
       this.fileCount += 1;
       this.recordCount += file.recordCount();
       this.totalSize += file.fileSizeInBytes();
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index 012f7fe22ac5..e4d8adc559ce 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -331,7 +331,7 @@ protected OutputFile newManifestOutput() {
         ops.metadataFileLocation(FileFormat.AVRO.addExtension(commitUUID + "-m" + manifestCount.getAndIncrement())));
   }
 
-  protected ManifestWriter newManifestWriter(PartitionSpec spec) {
+  protected ManifestWriter<DataFile> newManifestWriter(PartitionSpec spec) {
     return ManifestFiles.write(ops.current().formatVersion(), spec, newManifestOutput(), snapshotId());
   }
@@ -358,7 +358,7 @@ private static ManifestFile addMetadata(TableOperations ops, ManifestFile manifest) {
       Long snapshotId = null;
       long maxSnapshotId = Long.MIN_VALUE;
-      for (ManifestEntry entry : reader.entries()) {
+      for (ManifestEntry<DataFile> entry : reader.entries()) {
         if (entry.snapshotId() > maxSnapshotId) {
           maxSnapshotId = entry.snapshotId();
         }
diff --git a/core/src/main/java/org/apache/iceberg/V1Metadata.java b/core/src/main/java/org/apache/iceberg/V1Metadata.java
index 3f38cd3953a5..5615ee182fc9 100644
--- a/core/src/main/java/org/apache/iceberg/V1Metadata.java
+++ b/core/src/main/java/org/apache/iceberg/V1Metadata.java
@@ -220,17 +220,17 @@ static Types.StructType dataFileSchema(Types.StructType partitionType) {
   /**
    * Wrapper used to write a ManifestEntry to v1 metadata.
    */
-  static class IndexedManifestEntry implements ManifestEntry, IndexedRecord {
+  static class IndexedManifestEntry implements ManifestEntry<DataFile>, IndexedRecord {
     private final org.apache.avro.Schema avroSchema;
     private final IndexedDataFile fileWrapper;
-    private ManifestEntry wrapped = null;
+    private ManifestEntry<DataFile> wrapped = null;
 
     IndexedManifestEntry(Types.StructType partitionType) {
       this.avroSchema = AvroSchemaUtil.convert(entrySchema(partitionType), "manifest_entry");
       this.fileWrapper = new IndexedDataFile(avroSchema.getField("data_file").schema());
     }
 
-    public IndexedManifestEntry wrap(ManifestEntry entry) {
+    public IndexedManifestEntry wrap(ManifestEntry<DataFile> entry) {
       this.wrapped = entry;
       return this;
     }
@@ -294,12 +294,12 @@ public DataFile file() {
     }
 
     @Override
-    public ManifestEntry copy() {
+    public ManifestEntry<DataFile> copy() {
       return wrapped.copy();
     }
 
     @Override
-    public ManifestEntry copyWithoutStats() {
+    public ManifestEntry<DataFile> copyWithoutStats() {
       return wrapped.copyWithoutStats();
     }
   }
@@ -364,6 +364,11 @@ public org.apache.avro.Schema getSchema() {
     return avroSchema;
   }
 
+  @Override
+  public FileContent content() {
+    return wrapped.content();
+  }
+
   @Override
   public CharSequence path() {
     return wrapped.path();
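ScanSummary's updateFromFile above shows the payoff of the shared superinterface: metrics code touches only the common accessors, so a single path now serves both file kinds. The same pattern in isolation, as a hypothetical helper that is not part of this change:

    // Aggregates over any mix of DataFile and DeleteFile instances.
    static long totalSizeInBytes(Iterable<? extends ContentFile<?>> files) {
      long total = 0L;
      for (ContentFile<?> file : files) {
        total += file.fileSizeInBytes();  // declared on ContentFile, so no downcast is needed
      }
      return total;
    }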
diff --git a/core/src/main/java/org/apache/iceberg/V2Metadata.java b/core/src/main/java/org/apache/iceberg/V2Metadata.java
index ee57f24350ee..c3d3d5e9bf03 100644
--- a/core/src/main/java/org/apache/iceberg/V2Metadata.java
+++ b/core/src/main/java/org/apache/iceberg/V2Metadata.java
@@ -257,19 +257,19 @@ static Types.StructType fileType(Types.StructType partitionType) {
     );
   }
 
-  static class IndexedManifestEntry implements ManifestEntry, IndexedRecord {
+  static class IndexedManifestEntry<F extends ContentFile<F>> implements ManifestEntry<F>, IndexedRecord {
     private final org.apache.avro.Schema avroSchema;
     private final Long commitSnapshotId;
-    private final IndexedDataFile fileWrapper;
-    private ManifestEntry wrapped = null;
+    private final IndexedDataFile<F> fileWrapper;
+    private ManifestEntry<F> wrapped = null;
 
     IndexedManifestEntry(Long commitSnapshotId, Types.StructType partitionType) {
       this.avroSchema = AvroSchemaUtil.convert(entrySchema(partitionType), "manifest_entry");
       this.commitSnapshotId = commitSnapshotId;
-      this.fileWrapper = new IndexedDataFile(partitionType);
+      this.fileWrapper = new IndexedDataFile<>(partitionType);
     }
 
-    public IndexedManifestEntry wrap(ManifestEntry entry) {
+    public IndexedManifestEntry<F> wrap(ManifestEntry<F> entry) {
       this.wrapped = entry;
       return this;
     }
@@ -335,36 +335,37 @@ public void setSequenceNumber(long sequenceNumber) {
     }
 
     @Override
-    public DataFile file() {
+    public F file() {
       return wrapped.file();
     }
 
     @Override
-    public ManifestEntry copy() {
+    public ManifestEntry<F> copy() {
       return wrapped.copy();
     }
 
     @Override
-    public ManifestEntry copyWithoutStats() {
+    public ManifestEntry<F> copyWithoutStats() {
       return wrapped.copyWithoutStats();
     }
   }
 
   /**
-   * Wrapper used to write a DataFile to v2 metadata.
+   * Wrapper used to write DataFile or DeleteFile to v2 metadata.
    */
-  static class IndexedDataFile implements DataFile, IndexedRecord {
+  static class IndexedDataFile<F> implements ContentFile<F>, IndexedRecord {
     private final org.apache.avro.Schema avroSchema;
     private final IndexedStructLike partitionWrapper;
-    private DataFile wrapped = null;
+    private ContentFile<F> wrapped = null;
 
     IndexedDataFile(Types.StructType partitionType) {
       this.avroSchema = AvroSchemaUtil.convert(fileType(partitionType), "data_file");
       this.partitionWrapper = new IndexedStructLike(avroSchema.getField("partition").schema());
     }
 
-    IndexedDataFile wrap(DataFile file) {
-      this.wrapped = file;
+    @SuppressWarnings("unchecked")
+    IndexedDataFile<F> wrap(ContentFile<?> file) {
+      this.wrapped = (ContentFile<F>) file;
       return this;
     }
@@ -377,7 +378,7 @@ public org.apache.avro.Schema getSchema() {
     public Object get(int pos) {
       switch (pos) {
         case 0:
-          return FileContent.DATA.id();
+          return wrapped.content().id();
         case 1:
           return wrapped.path().toString();
         case 2:
@@ -411,6 +412,11 @@ public void put(int i, Object v) {
       throw new UnsupportedOperationException("Cannot read into IndexedDataFile");
     }
 
+    @Override
+    public FileContent content() {
+      return wrapped.content();
+    }
+
     @Override
     public CharSequence path() {
       return wrapped.path();
@@ -472,13 +478,13 @@ public List<Long> splitOffsets() {
     }
 
     @Override
-    public DataFile copy() {
-      return wrapped.copy();
+    public F copy() {
+      throw new UnsupportedOperationException("Cannot copy IndexedDataFile wrapper");
     }
 
     @Override
-    public DataFile copyWithoutStats() {
-      return wrapped.copyWithoutStats();
+    public F copyWithoutStats() {
+      throw new UnsupportedOperationException("Cannot copy IndexedDataFile wrapper");
     }
   }
 }
diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java
index e2c73782bf12..8de5e4b787de 100644
--- a/core/src/test/java/org/apache/iceberg/TableTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java
@@ -147,7 +147,7 @@ ManifestFile writeManifest(Long snapshotId, DataFile... files) throws IOException {
     Assert.assertTrue(manifestFile.delete());
     OutputFile outputFile = table.ops().io().newOutputFile(manifestFile.getCanonicalPath());
 
-    ManifestWriter writer = ManifestFiles.write(formatVersion, table.spec(), outputFile, snapshotId);
+    ManifestWriter<DataFile> writer = ManifestFiles.write(formatVersion, table.spec(), outputFile, snapshotId);
     try {
       for (DataFile file : files) {
         writer.add(file);
@@ -159,22 +159,22 @@ ManifestFile writeManifest(Long snapshotId, DataFile... files) throws IOException {
     return writer.toManifestFile();
   }
 
-  ManifestFile writeManifest(String fileName, ManifestEntry... entries) throws IOException {
+  ManifestFile writeManifest(String fileName, ManifestEntry<DataFile>... entries) throws IOException {
     return writeManifest(null, fileName, entries);
   }
 
-  ManifestFile writeManifest(Long snapshotId, ManifestEntry... entries) throws IOException {
+  ManifestFile writeManifest(Long snapshotId, ManifestEntry<DataFile>... entries) throws IOException {
     return writeManifest(snapshotId, "input.m0.avro", entries);
   }
 
-  ManifestFile writeManifest(Long snapshotId, String fileName, ManifestEntry... entries) throws IOException {
+  ManifestFile writeManifest(Long snapshotId, String fileName, ManifestEntry<DataFile>... entries) throws IOException {
     File manifestFile = temp.newFile(fileName);
     Assert.assertTrue(manifestFile.delete());
     OutputFile outputFile = table.ops().io().newOutputFile(manifestFile.getCanonicalPath());
 
-    ManifestWriter writer = ManifestFiles.write(formatVersion, table.spec(), outputFile, snapshotId);
+    ManifestWriter<DataFile> writer = ManifestFiles.write(formatVersion, table.spec(), outputFile, snapshotId);
     try {
-      for (ManifestEntry entry : entries) {
+      for (ManifestEntry<DataFile> entry : entries) {
         writer.addEntry(entry);
       }
     } finally {
@@ -189,7 +189,7 @@ ManifestFile writeManifestWithName(String name, DataFile... files) throws IOException {
     Assert.assertTrue(manifestFile.delete());
     OutputFile outputFile = table.ops().io().newOutputFile(manifestFile.getCanonicalPath());
 
-    ManifestWriter writer = ManifestFiles.write(formatVersion, table.spec(), outputFile, null);
+    ManifestWriter<DataFile> writer = ManifestFiles.write(formatVersion, table.spec(), outputFile, null);
     try {
       for (DataFile file : files) {
         writer.add(file);
@@ -201,8 +201,8 @@ ManifestFile writeManifestWithName(String name, DataFile... files) throws IOException {
     return writer.toManifestFile();
   }
 
-  ManifestEntry manifestEntry(ManifestEntry.Status status, Long snapshotId, DataFile file) {
-    GenericManifestEntry entry = new GenericManifestEntry(table.spec().partitionType());
+  ManifestEntry<DataFile> manifestEntry(ManifestEntry.Status status, Long snapshotId, DataFile file) {
+    GenericManifestEntry<DataFile> entry = new GenericManifestEntry<>(table.spec().partitionType());
     switch (status) {
       case ADDED:
         return entry.wrapAppend(snapshotId, file);
@@ -240,7 +240,7 @@ void validateSnapshot(Snapshot old, Snapshot snap, Long sequenceNumber, DataFile... newFiles) {
     long id = snap.snapshotId();
     Iterator<String> newPaths = paths(newFiles).iterator();
 
-    for (ManifestEntry entry : ManifestFiles.read(manifest, FILE_IO).entries()) {
+    for (ManifestEntry<DataFile> entry : ManifestFiles.read(manifest, FILE_IO).entries()) {
       DataFile file = entry.file();
       if (sequenceNumber != null) {
         V1Assert.assertEquals("Sequence number should default to 0", 0, entry.sequenceNumber().longValue());
@@ -283,7 +283,7 @@ void validateManifest(ManifestFile manifest,
                         Iterator<Long> seqs,
                         Iterator<Long> ids,
                         Iterator<DataFile> expectedFiles) {
-    for (ManifestEntry entry : ManifestFiles.read(manifest, FILE_IO).entries()) {
+    for (ManifestEntry<DataFile> entry : ManifestFiles.read(manifest, FILE_IO).entries()) {
       DataFile file = entry.file();
       DataFile expected = expectedFiles.next();
       if (seqs != null) {
@@ -303,7 +303,7 @@ static void validateManifestEntries(ManifestFile manifest,
                                       Iterator<Long> ids,
                                       Iterator<DataFile> expectedFiles,
                                       Iterator<ManifestEntry.Status> expectedStatuses) {
-    for (ManifestEntry entry : ManifestFiles.read(manifest, FILE_IO).entries()) {
+    for (ManifestEntry<DataFile> entry : ManifestFiles.read(manifest, FILE_IO).entries()) {
       DataFile file = entry.file();
       DataFile expected = expectedFiles.next();
       final ManifestEntry.Status expectedStatus = expectedStatuses.next();
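One subtlety the typed entries preserve: a reader may reuse a single entry instance while iterating, so anything retained past the loop should be copied, and copy() keeps the file type parameter. A short sketch against the public reader API, with manifest and FILE_IO assumed as in the tests:

    List<ManifestEntry<DataFile>> retained = Lists.newArrayList();
    try (CloseableIterable<ManifestEntry<DataFile>> entries =
             ManifestFiles.read(manifest, FILE_IO).entries()) {
      for (ManifestEntry<DataFile> entry : entries) {
        retained.add(entry.copy());  // copy before keeping; the entry instance may be reused
      }
    }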
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestReader.java b/core/src/test/java/org/apache/iceberg/TestManifestReader.java
index b670fd40f3b9..e899fb770267 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestReader.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestReader.java
@@ -47,7 +47,7 @@ public TestManifestReader(int formatVersion) {
   public void testManifestReaderWithEmptyInheritableMetadata() throws IOException {
     ManifestFile manifest = writeManifest(1000L, manifestEntry(Status.EXISTING, 1000L, FILE_A));
     try (ManifestReader reader = ManifestFiles.read(manifest, FILE_IO)) {
-      ManifestEntry entry = Iterables.getOnlyElement(reader.entries());
+      ManifestEntry<DataFile> entry = Iterables.getOnlyElement(reader.entries());
       Assert.assertEquals(Status.EXISTING, entry.status());
       Assert.assertEquals(FILE_A.path(), entry.file().path());
       Assert.assertEquals(1000L, (long) entry.snapshotId());
@@ -67,7 +67,7 @@ public void testInvalidUsage() throws IOException {
   public void testManifestReaderWithPartitionMetadata() throws IOException {
     ManifestFile manifest = writeManifest(1000L, manifestEntry(Status.EXISTING, 123L, FILE_A));
     try (ManifestReader reader = ManifestFiles.read(manifest, FILE_IO)) {
-      ManifestEntry entry = Iterables.getOnlyElement(reader.entries());
+      ManifestEntry<DataFile> entry = Iterables.getOnlyElement(reader.entries());
       Assert.assertEquals(123L, (long) entry.snapshotId());
 
       List<Types.NestedField> fields = ((PartitionData) entry.file().partition()).getPartitionType().fields();
@@ -88,7 +88,7 @@ public void testManifestReaderWithUpdatedPartitionMetadataForV1Table() throws IOException {
     ManifestFile manifest = writeManifest(1000L, manifestEntry(Status.EXISTING, 123L, FILE_A));
     try (ManifestReader reader = ManifestFiles.read(manifest, FILE_IO)) {
-      ManifestEntry entry = Iterables.getOnlyElement(reader.entries());
+      ManifestEntry<DataFile> entry = Iterables.getOnlyElement(reader.entries());
       Assert.assertEquals(123L, (long) entry.snapshotId());
 
       List<Types.NestedField> fields = ((PartitionData) entry.file().partition()).getPartitionType().fields();
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
index 61777971778e..32e2d9a86596 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
@@ -70,6 +70,9 @@ public class TestManifestWriterVersions {
   private static final DataFile DATA_FILE = new GenericDataFile(
       PATH, FORMAT, PARTITION, 150972L, METRICS, null, OFFSETS);
 
+  private static final DeleteFile DELETE_FILE = new GenericDeleteFile(
+      FileContent.EQUALITY_DELETES, PATH, FORMAT, PARTITION, 22905L, METRICS, null);
+
   @Rule
   public TemporaryFolder temp = new TemporaryFolder();
 
@@ -77,7 +80,14 @@ public class TestManifestWriterVersions {
   public void testV1Write() throws IOException {
     ManifestFile manifest = writeManifest(1);
     checkManifest(manifest, ManifestWriter.UNASSIGNED_SEQ);
-    checkEntry(readManifest(manifest), ManifestWriter.UNASSIGNED_SEQ);
+    checkEntry(readManifest(manifest), ManifestWriter.UNASSIGNED_SEQ, FileContent.DATA);
+  }
+
+  @Test
+  public void testV1WriteDelete() {
+    AssertHelpers.assertThrows("Should fail to write a delete manifest for v1",
+        IllegalArgumentException.class, "Cannot write delete files in a v1 table",
+        () -> writeDeleteManifest(1));
   }
 
   @Test
@@ -86,23 +96,43 @@ public void testV1WriteWithInheritance() throws IOException {
     checkManifest(manifest, 0L);
 
     // v1 should be read using sequence number 0 because it was missing from the manifest list file
-    checkEntry(readManifest(manifest), 0L);
+    checkEntry(readManifest(manifest), 0L, FileContent.DATA);
   }
 
   @Test
   public void testV2Write() throws IOException {
     ManifestFile manifest = writeManifest(2);
     checkManifest(manifest, ManifestWriter.UNASSIGNED_SEQ);
-    checkEntry(readManifest(manifest), ManifestWriter.UNASSIGNED_SEQ);
+    Assert.assertEquals("Content", ManifestContent.DATA, manifest.content());
+    checkEntry(readManifest(manifest), ManifestWriter.UNASSIGNED_SEQ, FileContent.DATA);
   }
 
   @Test
   public void testV2WriteWithInheritance() throws IOException {
     ManifestFile manifest = writeAndReadManifestList(writeManifest(2), 2);
     checkManifest(manifest, SEQUENCE_NUMBER);
+    Assert.assertEquals("Content", ManifestContent.DATA, manifest.content());
+
+    // v2 should use the correct sequence number by inheriting it
+    checkEntry(readManifest(manifest), SEQUENCE_NUMBER, FileContent.DATA);
+  }
+
+  @Test
+  public void testV2WriteDelete() throws IOException {
+    ManifestFile manifest = writeDeleteManifest(2);
+    checkManifest(manifest, ManifestWriter.UNASSIGNED_SEQ);
+    Assert.assertEquals("Content", ManifestContent.DELETES, manifest.content());
+    checkEntry(readDeleteManifest(manifest), ManifestWriter.UNASSIGNED_SEQ, FileContent.EQUALITY_DELETES);
+  }
+
+  @Test
+  public void testV2WriteDeleteWithInheritance() throws IOException {
+    ManifestFile manifest = writeAndReadManifestList(writeDeleteManifest(2), 2);
+    checkManifest(manifest, SEQUENCE_NUMBER);
+    Assert.assertEquals("Content", ManifestContent.DELETES, manifest.content());
 
     // v2 should use the correct sequence number by inheriting it
-    checkEntry(readManifest(manifest), SEQUENCE_NUMBER);
+    checkEntry(readDeleteManifest(manifest), SEQUENCE_NUMBER, FileContent.EQUALITY_DELETES);
   }
 
   @Test
@@ -117,7 +147,7 @@ public void testV2ManifestListRewriteWithInheritance() throws IOException {
     checkManifest(manifest2, 0L);
 
     // should not inherit the v2 sequence number because it was a rewrite
-    checkEntry(readManifest(manifest2), 0L);
+    checkEntry(readManifest(manifest2), 0L, FileContent.DATA);
   }
 
   @Test
@@ -136,24 +166,26 @@ public void testV2ManifestRewriteWithInheritance() throws IOException {
     checkRewrittenManifest(manifest2, SEQUENCE_NUMBER, 0L);
 
     // should not inherit the v2 sequence number because it was written into the v2 manifest
-    checkRewrittenEntry(readManifest(manifest2), 0L);
+    checkRewrittenEntry(readManifest(manifest2), 0L, FileContent.DATA);
   }
 
-  void checkEntry(ManifestEntry entry, Long expectedSequenceNumber) {
+  void checkEntry(ManifestEntry<?> entry, Long expectedSequenceNumber, FileContent content) {
     Assert.assertEquals("Status", ManifestEntry.Status.ADDED, entry.status());
     Assert.assertEquals("Snapshot ID", (Long) SNAPSHOT_ID, entry.snapshotId());
     Assert.assertEquals("Sequence number", expectedSequenceNumber, entry.sequenceNumber());
-    checkDataFile(entry.file());
+    checkDataFile(entry.file(), content);
   }
 
-  void checkRewrittenEntry(ManifestEntry entry, Long expectedSequenceNumber) {
+  void checkRewrittenEntry(ManifestEntry<DataFile> entry, Long expectedSequenceNumber, FileContent content) {
     Assert.assertEquals("Status", ManifestEntry.Status.EXISTING, entry.status());
     Assert.assertEquals("Snapshot ID", (Long) SNAPSHOT_ID, entry.snapshotId());
     Assert.assertEquals("Sequence number", expectedSequenceNumber, entry.sequenceNumber());
-    checkDataFile(entry.file());
+    checkDataFile(entry.file(), content);
   }
 
-  void checkDataFile(DataFile dataFile) {
+  void checkDataFile(ContentFile<?> dataFile, FileContent content) {
+    // ContentFile is the superinterface of DataFile and DeleteFile, so this method can check both
+    Assert.assertEquals("Content", content, dataFile.content());
     Assert.assertEquals("Path", PATH, dataFile.path());
     Assert.assertEquals("Format", FORMAT, dataFile.format());
     Assert.assertEquals("Partition", PARTITION, dataFile.partition());
@@ -206,7 +238,7 @@ private ManifestFile writeAndReadManifestList(ManifestFile manifest, int formatVersion) throws IOException {
   private ManifestFile rewriteManifest(ManifestFile manifest, int formatVersion) throws IOException {
     OutputFile manifestFile = Files.localOutput(FileFormat.AVRO.addExtension(temp.newFile().toString()));
-    ManifestWriter writer = ManifestFiles.write(formatVersion, SPEC, manifestFile, SNAPSHOT_ID);
+    ManifestWriter<DataFile> writer = ManifestFiles.write(formatVersion, SPEC, manifestFile, SNAPSHOT_ID);
     try {
       writer.existing(readManifest(manifest));
     } finally {
@@ -216,21 +248,46 @@ private ManifestFile rewriteManifest(ManifestFile manifest, int formatVersion) throws IOException {
   }
 
   private ManifestFile writeManifest(int formatVersion) throws IOException {
+    return writeManifest(DATA_FILE, formatVersion);
+  }
+
+  private ManifestFile writeManifest(DataFile file, int formatVersion) throws IOException {
     OutputFile manifestFile = Files.localOutput(FileFormat.AVRO.addExtension(temp.newFile().toString()));
-    ManifestWriter writer = ManifestFiles.write(formatVersion, SPEC, manifestFile, SNAPSHOT_ID);
+    ManifestWriter<DataFile> writer = ManifestFiles.write(formatVersion, SPEC, manifestFile, SNAPSHOT_ID);
     try {
-      writer.add(DATA_FILE);
+      writer.add(file);
     } finally {
       writer.close();
     }
     return writer.toManifestFile();
   }
 
-  private ManifestEntry readManifest(ManifestFile manifest) throws IOException {
-    try (CloseableIterable<ManifestEntry> reader = ManifestFiles.read(manifest, FILE_IO).entries()) {
-      List<ManifestEntry> files = Lists.newArrayList(reader);
+  private ManifestEntry<DataFile> readManifest(ManifestFile manifest) throws IOException {
+    try (CloseableIterable<ManifestEntry<DataFile>> reader = ManifestFiles.read(manifest, FILE_IO).entries()) {
+      List<ManifestEntry<DataFile>> files = Lists.newArrayList(reader);
       Assert.assertEquals("Should contain only one data file", 1, files.size());
       return files.get(0);
     }
   }
+
+  private ManifestFile writeDeleteManifest(int formatVersion) throws IOException {
+    OutputFile manifestFile = Files.localOutput(FileFormat.AVRO.addExtension(temp.newFile().toString()));
+    ManifestWriter<DeleteFile> writer = ManifestFiles.writeDeleteManifest(
+        formatVersion, SPEC, manifestFile, SNAPSHOT_ID);
+    try {
+      writer.add(DELETE_FILE);
+    } finally {
+      writer.close();
+    }
+    return writer.toManifestFile();
+  }
+
+  private ManifestEntry<DeleteFile> readDeleteManifest(ManifestFile manifest) throws IOException {
+    try (CloseableIterable<ManifestEntry<DeleteFile>> reader =
+             ManifestFiles.readDeleteManifest(manifest, FILE_IO, null).entries()) {
+      List<ManifestEntry<DeleteFile>> entries = Lists.newArrayList(reader);
+      Assert.assertEquals("Should contain only one delete file", 1, entries.size());
+      return entries.get(0);
+    }
+  }
 }
diff --git a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
index 47f0d90d6f6d..a393adf63854 100644
--- a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
+++ b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
@@ -745,7 +745,7 @@ public void testManifestEntryFieldIdsForChangedPartitionSpecForV1Table() {
         initialManifest, pending.manifests().get(1));
 
     // field ids of manifest entries in two manifests with different specs of the same source field should be different
-    ManifestEntry entry = ManifestFiles.read(pending.manifests().get(0), FILE_IO).entries().iterator().next();
+    ManifestEntry<DataFile> entry = ManifestFiles.read(pending.manifests().get(0), FILE_IO).entries().iterator().next();
     Types.NestedField field = ((PartitionData) entry.file().partition()).getPartitionType().fields().get(0);
     Assert.assertEquals(1000, field.fieldId());
     Assert.assertEquals("id_bucket", field.name());
diff --git a/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java b/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java
index f6d0134e8fd5..cb64793588c7 100644
--- a/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java
+++ b/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java
@@ -955,7 +955,7 @@ public void testManifestReplacementCombinedWithRewriteConcurrentDelete() throws IOException {
     Assert.assertEquals(3, Iterables.size(table.snapshots()));
 
-    ManifestEntry entry = manifestEntry(ManifestEntry.Status.EXISTING, firstSnapshot.snapshotId(), FILE_A);
+    ManifestEntry<DataFile> entry = manifestEntry(ManifestEntry.Status.EXISTING, firstSnapshot.snapshotId(), FILE_A);
     // update the entry's sequence number or else it will be rejected by the writer
     entry.setSequenceNumber(firstSnapshot.sequenceNumber());
     ManifestFile newManifest = writeManifest("manifest-file-1.avro", entry);
@@ -1005,7 +1005,7 @@ public void testInvalidUsage() throws IOException {
     Assert.assertEquals(1, manifests.size());
     ManifestFile manifest = manifests.get(0);
 
-    ManifestEntry appendEntry = manifestEntry(ManifestEntry.Status.ADDED, snapshot.snapshotId(), FILE_A);
+    ManifestEntry<DataFile> appendEntry = manifestEntry(ManifestEntry.Status.ADDED, snapshot.snapshotId(), FILE_A);
     // update the entry's sequence number or else it will be rejected by the writer
     appendEntry.setSequenceNumber(snapshot.sequenceNumber());
@@ -1018,7 +1018,7 @@ public void testInvalidUsage() throws IOException {
         .addManifest(invalidAddedFileManifest)
         .commit());
 
-    ManifestEntry deleteEntry = manifestEntry(ManifestEntry.Status.DELETED, snapshot.snapshotId(), FILE_A);
+    ManifestEntry<DataFile> deleteEntry = manifestEntry(ManifestEntry.Status.DELETED, snapshot.snapshotId(), FILE_A);
     // update the entry's sequence number or else it will be rejected by the writer
     deleteEntry.setSequenceNumber(snapshot.sequenceNumber());
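The comment repeated in these tests points at a writer-side invariant worth noting: existing and deleted entries carried into a new manifest must already have a concrete sequence number. Condensed into a sketch that assumes the test helpers (manifestEntry, writeManifest) and fixtures (snapshot, FILE_A) from the files above:

    ManifestEntry<DataFile> entry = manifestEntry(ManifestEntry.Status.EXISTING, snapshot.snapshotId(), FILE_A);
    entry.setSequenceNumber(snapshot.sequenceNumber());  // without this, the writer rejects the entry
    ManifestFile rewritten = writeManifest("manifest-file-1.avro", entry);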