diff --git a/api/src/main/java/org/apache/iceberg/DataFile.java b/api/src/main/java/org/apache/iceberg/DataFile.java
index 90758a3d73d0..3fcbb2a95342 100644
--- a/api/src/main/java/org/apache/iceberg/DataFile.java
+++ b/api/src/main/java/org/apache/iceberg/DataFile.java
@@ -57,11 +57,37 @@ static StructType getType(StructType partitionType) {
         optional(128, "upper_bounds", MapType.ofRequired(129, 130,
             IntegerType.get(), BinaryType.get())),
         optional(131, "key_metadata", BinaryType.get()),
-        optional(132, "split_offsets", ListType.ofRequired(133, LongType.get()))
-        // NEXT ID TO ASSIGN: 134
+        optional(132, "split_offsets", ListType.ofRequired(133, LongType.get())),
+        optional(134, "file_type", IntegerType.get())
+        // NEXT ID TO ASSIGN: 135
     );
   }
 
+  enum FileType {
+    DATA_FILE(0),
+    POSITION_DELETE_FILE(1),
+    EQUALITY_DELETE_FILE(2);
+
+    private final int type;
+
+    FileType(int type) {
+      this.type = type;
+    }
+
+    int type() {
+      return type;
+    }
+
+    @Override
+    public String toString() {
+      switch (type) {
+        case 0: return "data file";
+        case 1: return "position based delete file";
+        case 2: return "equality based delete file";
+        default: return "file type is not supported";
+      }
+    }
+  }
+
   /**
    * @return fully qualified path to the file, suitable for constructing a Hadoop Path
    */
@@ -140,4 +166,9 @@ static StructType getType(StructType partitionType) {
    * are determined by these offsets. The returned list must be sorted in ascending order.
    */
   List<Long> splitOffsets();
+
+  /**
+   * @return Type of data file. Either DATA_FILE, POSITION_DELETE_FILE, or EQUALITY_DELETE_FILE.
+   */
+  FileType fileType();
 }
diff --git a/api/src/test/java/org/apache/iceberg/TestHelpers.java b/api/src/test/java/org/apache/iceberg/TestHelpers.java
index 4989281ffe96..935e6adbf074 100644
--- a/api/src/test/java/org/apache/iceberg/TestHelpers.java
+++ b/api/src/test/java/org/apache/iceberg/TestHelpers.java
@@ -372,5 +372,10 @@ public DataFile copyWithoutStats() {
     public List<Long> splitOffsets() {
       return null;
     }
+
+    @Override
+    public FileType fileType() {
+      return FileType.DATA_FILE;
+    }
   }
 }
diff --git a/core/src/main/java/org/apache/iceberg/DataFiles.java b/core/src/main/java/org/apache/iceberg/DataFiles.java
index abab0f97d4fe..56c28de008d7 100644
--- a/core/src/main/java/org/apache/iceberg/DataFiles.java
+++ b/core/src/main/java/org/apache/iceberg/DataFiles.java
@@ -190,6 +190,7 @@ public static class Builder {
     private Map<Integer, ByteBuffer> upperBounds = null;
     private ByteBuffer keyMetadata = null;
     private List<Long> splitOffsets = null;
+    private DataFile.FileType fileType = null;
 
     public Builder() {
       this.spec = null;
@@ -235,6 +236,7 @@ public Builder copy(DataFile toCopy) {
       this.keyMetadata = toCopy.keyMetadata() == null ? null
           : ByteBuffers.copy(toCopy.keyMetadata());
       this.splitOffsets = toCopy.splitOffsets() == null ? null : copyList(toCopy.splitOffsets());
+      this.fileType = toCopy.fileType();
       return this;
     }
@@ -328,6 +330,11 @@ public Builder withEncryptionKeyMetadata(EncryptionKeyMetadata newKeyMetadata) {
       return withEncryptionKeyMetadata(newKeyMetadata.buffer());
     }
 
+    public Builder withFileType(DataFile.FileType newFileType) {
+      this.fileType = newFileType;
+      return this;
+    }
+
     public DataFile build() {
       Preconditions.checkArgument(filePath != null, "File path is required");
       if (format == null) {
@@ -341,7 +348,7 @@ public DataFile build() {
           filePath, format, isPartitioned ? partitionData.copy() : null, fileSizeInBytes,
           new Metrics(
               recordCount, columnSizes, valueCounts, nullValueCounts, lowerBounds, upperBounds),
-          keyMetadata, splitOffsets);
+          keyMetadata, splitOffsets, fileType);
     }
   }
diff --git a/core/src/main/java/org/apache/iceberg/GenericDataFile.java b/core/src/main/java/org/apache/iceberg/GenericDataFile.java
index aa427cbc5e16..46aee3ce812c 100644
--- a/core/src/main/java/org/apache/iceberg/GenericDataFile.java
+++ b/core/src/main/java/org/apache/iceberg/GenericDataFile.java
@@ -63,6 +63,7 @@ public PartitionData copy() {
   private Map<Integer, ByteBuffer> upperBounds = null;
   private List<Long> splitOffsets = null;
   private byte[] keyMetadata = null;
+  private FileType fileType = null;
 
   // cached schema
   private transient org.apache.avro.Schema avroSchema = null;
@@ -158,6 +159,13 @@ public GenericDataFile(org.apache.avro.Schema avroSchema) {
     this.keyMetadata = ByteBuffers.toByteArray(keyMetadata);
   }
 
+  GenericDataFile(String filePath, FileFormat format, PartitionData partition,
+                  long fileSizeInBytes, Metrics metrics,
+                  ByteBuffer keyMetadata, List<Long> splitOffsets, FileType fileType) {
+    this(filePath, format, partition, fileSizeInBytes, metrics, keyMetadata, splitOffsets);
+    this.fileType = fileType;
+  }
+
   /**
    * Copy constructor.
    *
@@ -171,6 +179,7 @@ private GenericDataFile(GenericDataFile toCopy, boolean fullCopy) {
     this.partitionType = toCopy.partitionType;
     this.recordCount = toCopy.recordCount;
     this.fileSizeInBytes = toCopy.fileSizeInBytes;
+    this.fileType = toCopy.fileType;
     if (fullCopy) {
       // TODO: support lazy conversion to/from map
       this.columnSizes = copy(toCopy.columnSizes);
@@ -256,6 +265,11 @@ public List<Long> splitOffsets() {
     return splitOffsets;
   }
 
+  @Override
+  public FileType fileType() {
+    return fileType != null ? fileType : FileType.DATA_FILE;
+  }
+
   @Override
   public org.apache.avro.Schema getSchema() {
     if (avroSchema == null) {
@@ -312,6 +326,9 @@ public void put(int i, Object v) {
       case 12:
         this.splitOffsets = (List<Long>) v;
         return;
+      case 13:
+        this.fileType = v == null ? null : FileType.values()[(Integer) v];
+        return;
       default:
         // ignore the object, it must be from a newer version of the format
     }
@@ -358,6 +375,8 @@ public Object get(int i) {
         return keyMetadata();
       case 12:
         return splitOffsets;
+      case 13:
+        return fileType().type();
       default:
         throw new UnsupportedOperationException("Unknown field ordinal: " + pos);
     }
@@ -377,7 +396,7 @@ private static org.apache.avro.Schema getAvroSchema(Types.StructType partitionTy
 
   @Override
   public int size() {
-    return 13;
+    return 14;
   }
 
   @Override
@@ -385,6 +404,7 @@ public String toString() {
     return MoreObjects.toStringHelper(this)
         .add("file_path", filePath)
         .add("file_format", format)
+        .add("file_type", fileType())
         .add("partition", partitionData)
         .add("record_count", recordCount)
         .add("file_size_in_bytes", fileSizeInBytes)
diff --git a/core/src/main/java/org/apache/iceberg/V1Metadata.java b/core/src/main/java/org/apache/iceberg/V1Metadata.java
index a905b4d6ec55..9b1b9c944224 100644
--- a/core/src/main/java/org/apache/iceberg/V1Metadata.java
+++ b/core/src/main/java/org/apache/iceberg/V1Metadata.java
@@ -323,6 +323,8 @@ public Object get(int pos) {
         return wrapped.keyMetadata();
       case 12:
         return wrapped.splitOffsets();
+      case 13:
+        return wrapped.fileType().type();
     }
     throw new IllegalArgumentException("Unknown field ordinal: " + pos);
   }
@@ -397,6 +399,11 @@ public List<Long> splitOffsets() {
     return wrapped.splitOffsets();
   }
 
+  @Override
+  public FileType fileType() {
+    return wrapped.fileType();
+  }
+
   @Override
   public DataFile copy() {
     return wrapped.copy();
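Taken together, the builder and manifest changes above mean a writer tags a file when it is built, and the tag travels through the manifest as the enum's int code (field id 134). A minimal usage sketch under those assumptions; the unpartitioned no-arg builder is used here, and the path, size, and record count are placeholder values, not part of this change:

    // Tag a position-based delete file while registering it (illustrative values only).
    DataFile posDeletes = new DataFiles.Builder()
        .withPath("/warehouse/db/tbl/data/pos-deletes-00001.parquet")  // format inferred from the suffix
        .withFileSizeInBytes(1024L)
        .withRecordCount(100L)
        .withFileType(DataFile.FileType.POSITION_DELETE_FILE)
        .build();

    // The manifest column stores the int code; readers recover the enum from it, and
    // GenericDataFile#fileType() falls back to DATA_FILE when the field was never set.
    int code = posDeletes.fileType().type();  // 1 for POSITION_DELETE_FILE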
diff --git a/core/src/test/java/org/apache/iceberg/TestDataFileType.java b/core/src/test/java/org/apache/iceberg/TestDataFileType.java
new file mode 100644
index 000000000000..d993463afbcc
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestDataFileType.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestDataFileType extends TableTestBase {
+  @Parameterized.Parameters
+  public static Object[][] parameters() {
+    return new Object[][] {
+        new Object[] { 1 },
+        new Object[] { 2 },
+    };
+  }
+
+  public TestDataFileType(int formatVersion) {
+    super(formatVersion);
+  }
+
+  @Test
+  public void testMultipleDeletes() {
+    DataFiles.Builder builder = new DataFiles.Builder(SPEC);
+    DataFile fileA = builder.copy(FILE_A).withFileType(DataFile.FileType.POSITION_DELETE_FILE).build();
+    DataFile fileB = builder.copy(FILE_B).withFileType(DataFile.FileType.EQUALITY_DELETE_FILE).build();
+    table.newAppend()
+        .appendFile(fileA)
+        .appendFile(fileB)
+        .appendFile(FILE_C)
+        .commit();
+
+    for (DataFile dataFile : table.currentSnapshot().addedFiles()) {
+      if (dataFile.path().toString().equals("/path/to/data-a.parquet")) {
+        Assert.assertEquals(DataFile.FileType.POSITION_DELETE_FILE, dataFile.fileType());
+      }
+      if (dataFile.path().toString().equals("/path/to/data-b.parquet")) {
+        Assert.assertEquals(DataFile.FileType.EQUALITY_DELETE_FILE, dataFile.fileType());
+      }
+      if (dataFile.path().toString().equals("/path/to/data-c.parquet")) {
+        Assert.assertEquals(DataFile.FileType.DATA_FILE, dataFile.fileType());
+      }
+    }
+  }
+}
diff --git a/site/docs/spec.md b/site/docs/spec.md
index 9940af84ca67..57a610f6ed7f 100644
--- a/site/docs/spec.md
+++ b/site/docs/spec.md
@@ -244,6 +244,7 @@ The schema of a manifest file is a struct called `manifest_entry` with the follo
 | **`128 upper_bounds`** | `optional map<129: int, 130: binary>` | Map from column id to upper bound in the column serialized as binary [1]. Each value must be greater than or equal to all values in the column for the file. |
 | **`131 key_metadata`** | `optional binary` | Implementation-specific key metadata for encryption |
 | **`132 split_offsets`** | `optional list` | Split offsets for the data file. For example, all row group offsets in a Parquet file. Must be sorted ascending. |
+| **`134 file_type`** | `optional int` | Type of the data file. `0`: normal data file that has the same schema as the table's schema, `1`: position based delete file, `2`: equality based delete file. |
 
 Notes:
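Because the new spec column is optional, manifests written before this change simply carry no value for it, and readers are expected to treat such files as normal data files. A small sketch of the decoding rule the table row above describes (the helper name is illustrative, not part of the patch):

    // Map the optional `file_type` int (field id 134) back to the enum.
    static DataFile.FileType decodeFileType(Integer code) {
      if (code == null) {
        return DataFile.FileType.DATA_FILE;  // column absent in older manifests
      }
      // 0 = data file, 1 = position based delete, 2 = equality based delete
      return DataFile.FileType.values()[code];
    }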
diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java b/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
index 4991980d83d4..680e49cbda58 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
@@ -46,6 +46,7 @@ public class SparkDataFile implements DataFile {
   private final int upperBoundsPosition;
   private final int keyMetadataPosition;
   private final int splitOffsetsPosition;
+  private final int fileTypePosition;
   private final Type lowerBoundsType;
   private final Type upperBoundsType;
   private final Type keyMetadataType;
@@ -77,6 +78,7 @@ public SparkDataFile(Types.StructType type, StructType sparkType) {
     upperBoundsPosition = positions.get("upper_bounds");
     keyMetadataPosition = positions.get("key_metadata");
     splitOffsetsPosition = positions.get("split_offsets");
+    fileTypePosition = positions.get("file_type");
   }
 
   public SparkDataFile wrap(Row row) {
@@ -160,6 +162,12 @@ public List<Long> splitOffsets() {
     return wrapped.isNullAt(splitOffsetsPosition) ? null : wrapped.getList(splitOffsetsPosition);
   }
 
+  @Override
+  public FileType fileType() {
+    return wrapped.isNullAt(fileTypePosition) ? FileType.DATA_FILE :
+        DataFile.FileType.values()[(Integer) wrapped.getAs(fileTypePosition)];
+  }
+
   private int fieldPosition(String name, StructType sparkType) {
     try {
       return sparkType.fieldIndex(name);
diff --git a/spark/src/test/java/org/apache/iceberg/spark/TestSparkDataFile.java b/spark/src/test/java/org/apache/iceberg/spark/TestSparkDataFile.java
index 2ebb3dd473f7..e455f97abe6e 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/TestSparkDataFile.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/TestSparkDataFile.java
@@ -195,6 +195,7 @@ private void checkDataFile(DataFile expected, DataFile actual) {
     Assert.assertEquals("Upper bounds must match", expected.upperBounds(), actual.upperBounds());
     Assert.assertEquals("Key metadata must match", expected.keyMetadata(), actual.keyMetadata());
     Assert.assertEquals("Split offsets must match", expected.splitOffsets(), actual.splitOffsets());
+    Assert.assertEquals("File type must match", expected.fileType(), actual.fileType());
 
     checkStructLike(expected.partition(), actual.partition());
   }
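End to end, the new field lets a consumer separate delete files from ordinary data files without opening them. A hedged sketch of that kind of use, assuming a loaded Iceberg `Table` named `table` and the usual `java.util` imports (none of this is part of the diff):

    // Split the files added by the latest snapshot by their declared type.
    List<DataFile> dataFiles = new ArrayList<>();
    List<DataFile> deleteFiles = new ArrayList<>();
    for (DataFile file : table.currentSnapshot().addedFiles()) {
      if (file.fileType() == DataFile.FileType.DATA_FILE) {
        dataFiles.add(file);
      } else {
        deleteFiles.add(file);
      }
    }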