35 changes: 33 additions & 2 deletions api/src/main/java/org/apache/iceberg/DataFile.java
@@ -57,11 +57,37 @@ static StructType getType(StructType partitionType) {
optional(128, "upper_bounds", MapType.ofRequired(129, 130,
IntegerType.get(), BinaryType.get())),
optional(131, "key_metadata", BinaryType.get()),
optional(132, "split_offsets", ListType.ofRequired(133, LongType.get()))
// NEXT ID TO ASSIGN: 134
optional(132, "split_offsets", ListType.ofRequired(133, LongType.get())),
optional(134, "file_type", IntegerType.get())
// NEXT ID TO ASSIGN: 135
);
}

enum FileType {
DATA_FILE(0),
POSITION_DELETE_FILE(1),
EQUALITY_DELETE_FILE(2);

private final int type;
FileType(int type) {
this.type = type;
}

int type() {
return type;
}

@Override
public String toString() {
switch (type) {
case 0: return "data file";
case 1: return "position-based delete file";
case 2: return "equality-based delete file";
default: return "unknown file type";
}
}
}

/**
* @return fully qualified path to the file, suitable for constructing a Hadoop Path
*/
@@ -140,4 +166,9 @@ static StructType getType(StructType partitionType) {
* are determined by these offsets. The returned list must be sorted in ascending order.
*/
List<Long> splitOffsets();

/**
* @return the type of this file: DATA_FILE, POSITION_DELETE_FILE, or EQUALITY_DELETE_FILE
*/
FileType fileType();
}
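
The integer code persisted in field 134 is decoded with `FileType.values()`, so each code must match its constant's declaration order. A minimal round-trip sketch, assuming this patch is applied; the class name is hypothetical, and it sits in `org.apache.iceberg` only because `type()` is package-private:

```java
package org.apache.iceberg;

// Hypothetical demo class, not part of the patch. Placed in org.apache.iceberg
// because FileType.type() is package-private.
public class FileTypeRoundTrip {
  public static void main(String[] args) {
    for (DataFile.FileType type : DataFile.FileType.values()) {
      int code = type.type();  // the value written to field 134
      // Readers decode with values()[code], which only works while each
      // code equals its constant's declaration position.
      DataFile.FileType decoded = DataFile.FileType.values()[code];
      System.out.printf("%d -> %s (%s)%n", code, decoded.name(), decoded);
    }
  }
}
```

Both read paths in this patch (`GenericDataFile`, `SparkDataFile`) fall back to `DATA_FILE` when the field is null, which keeps manifests written before this change readable.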
5 changes: 5 additions & 0 deletions api/src/test/java/org/apache/iceberg/TestHelpers.java
@@ -372,5 +372,10 @@ public DataFile copyWithoutStats() {
public List<Long> splitOffsets() {
return null;
}

@Override
public FileType fileType() {
return FileType.DATA_FILE;
}
}
}
9 changes: 8 additions & 1 deletion core/src/main/java/org/apache/iceberg/DataFiles.java
@@ -190,6 +190,7 @@ public static class Builder {
private Map<Integer, ByteBuffer> upperBounds = null;
private ByteBuffer keyMetadata = null;
private List<Long> splitOffsets = null;
private DataFile.FileType fileType = null;

public Builder() {
this.spec = null;
@@ -235,6 +236,7 @@ public Builder copy(DataFile toCopy) {
this.keyMetadata = toCopy.keyMetadata() == null ? null
: ByteBuffers.copy(toCopy.keyMetadata());
this.splitOffsets = toCopy.splitOffsets() == null ? null : copyList(toCopy.splitOffsets());
this.fileType = toCopy.fileType();
return this;
}

@@ -328,6 +330,11 @@ public Builder withEncryptionKeyMetadata(EncryptionKeyMetadata newKeyMetadata) {
return withEncryptionKeyMetadata(newKeyMetadata.buffer());
}

public Builder withFileType(DataFile.FileType newFileType) {
this.fileType = newFileType;
return this;
}

public DataFile build() {
Preconditions.checkArgument(filePath != null, "File path is required");
if (format == null) {
@@ -341,7 +348,7 @@ public DataFile build() {
filePath, format, isPartitioned ? partitionData.copy() : null,
fileSizeInBytes, new Metrics(
recordCount, columnSizes, valueCounts, nullValueCounts, lowerBounds, upperBounds),
- keyMetadata, splitOffsets);
+ keyMetadata, splitOffsets, fileType);
}
}

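
For context, a sketch of tagging a delete file through the new builder hook; the path, size, and record count below are placeholders, not values from this PR:

```java
package org.apache.iceberg;

// Hypothetical usage, not part of the patch; only withFileType(...) is new.
public class DeleteFileExample {
  static DataFile positionDeletes() {
    return new DataFiles.Builder()
        .withPath("/tmp/pos-deletes.parquet")  // placeholder location
        .withFormat(FileFormat.PARQUET)
        .withFileSizeInBytes(1024L)
        .withRecordCount(10L)
        .withFileType(DataFile.FileType.POSITION_DELETE_FILE)
        .build();
  }
}
```

Files built without the call keep the `DATA_FILE` default, since `fileType` stays null until it is read back.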
22 changes: 21 additions & 1 deletion core/src/main/java/org/apache/iceberg/GenericDataFile.java
@@ -63,6 +63,7 @@ public PartitionData copy() {
private Map<Integer, ByteBuffer> upperBounds = null;
private List<Long> splitOffsets = null;
private byte[] keyMetadata = null;
private FileType fileType = null;

// cached schema
private transient org.apache.avro.Schema avroSchema = null;
@@ -158,6 +159,13 @@ public GenericDataFile(org.apache.avro.Schema avroSchema) {
this.keyMetadata = ByteBuffers.toByteArray(keyMetadata);
}

GenericDataFile(String filePath, FileFormat format, PartitionData partition,
long fileSizeInBytes, Metrics metrics,
ByteBuffer keyMetadata, List<Long> splitOffsets, FileType fileType) {
this(filePath, format, partition, fileSizeInBytes, metrics, keyMetadata, splitOffsets);
this.fileType = fileType;
}

/**
* Copy constructor.
*
@@ -171,6 +179,7 @@ private GenericDataFile(GenericDataFile toCopy, boolean fullCopy) {
this.partitionType = toCopy.partitionType;
this.recordCount = toCopy.recordCount;
this.fileSizeInBytes = toCopy.fileSizeInBytes;
this.fileType = toCopy.fileType;
if (fullCopy) {
// TODO: support lazy conversion to/from map
this.columnSizes = copy(toCopy.columnSizes);
@@ -256,6 +265,11 @@ public List<Long> splitOffsets() {
return splitOffsets;
}

@Override
public FileType fileType() {
return fileType != null ? fileType : FileType.DATA_FILE;
}

@Override
public org.apache.avro.Schema getSchema() {
if (avroSchema == null) {
@@ -312,6 +326,9 @@ public void put(int i, Object v) {
case 12:
this.splitOffsets = (List<Long>) v;
return;
case 13:
// guard against null: field 134 is optional and absent in old manifests
this.fileType = v == null ? null : FileType.values()[(Integer) v];
return;
default:
// ignore the object, it must be from a newer version of the format
}
@@ -358,6 +375,8 @@ public Object get(int i) {
return keyMetadata();
case 12:
return splitOffsets;
case 13:
return fileType().type();
default:
throw new UnsupportedOperationException("Unknown field ordinal: " + pos);
}
@@ -377,14 +396,15 @@ private static org.apache.avro.Schema getAvroSchema(Types.StructType partitionTy

@Override
public int size() {
- return 13;
+ return 14;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("file_path", filePath)
.add("file_format", format)
.add("file_type", fileType())
.add("partition", partitionData)
.add("record_count", recordCount)
.add("file_size_in_bytes", fileSizeInBytes)
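
Since `size()` grows to 14, ordinal 13 now carries the file type: `put` accepts the integer code and `get` surfaces it again. A sketch of that round trip, assuming a `GenericDataFile` instance is available (its constructors are package-private, so this lives in `org.apache.iceberg`):

```java
package org.apache.iceberg;

// Hypothetical check, not part of the patch.
public class FileTypeOrdinalCheck {
  static void roundTrip(GenericDataFile file) {
    file.put(13, 2);  // the Avro read path stores the code for EQUALITY_DELETE_FILE
    assert file.fileType() == DataFile.FileType.EQUALITY_DELETE_FILE;
    assert ((Integer) file.get(13)) == 2;  // the write path returns the same code
  }
}
```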
7 changes: 7 additions & 0 deletions core/src/main/java/org/apache/iceberg/V1Metadata.java
@@ -323,6 +323,8 @@ public Object get(int pos) {
return wrapped.keyMetadata();
case 12:
return wrapped.splitOffsets();
case 13:
return wrapped.fileType().type();
}
throw new IllegalArgumentException("Unknown field ordinal: " + pos);
}
@@ -397,6 +399,11 @@ public List<Long> splitOffsets() {
return wrapped.splitOffsets();
}

@Override
public FileType fileType() {
return wrapped.fileType();
}

@Override
public DataFile copy() {
return wrapped.copy();
64 changes: 64 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestDataFileType.java
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg;

import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class TestDataFileType extends TableTestBase {
@Parameterized.Parameters
public static Object[][] parameters() {
return new Object[][] {
new Object[] { 1 },
new Object[] { 2 },
};
}

public TestDataFileType(int formatVersion) {
super(formatVersion);
}

@Test
public void testMultipleDeletes() {
DataFiles.Builder builder = new DataFiles.Builder();
DataFile fileA = builder.copy(FILE_A).withFileType(DataFile.FileType.POSITION_DELETE_FILE).build();
DataFile fileB = builder.copy(FILE_B).withFileType(DataFile.FileType.EQUALITY_DELETE_FILE).build();
table.newAppend()
.appendFile(fileA)
.appendFile(fileB)
.appendFile(FILE_C)
.commit();

for (DataFile dataFile : table.currentSnapshot().addedFiles()) {
if (dataFile.path().toString().equals("/path/to/data-a.parquet")) {
Assert.assertEquals(DataFile.FileType.POSITION_DELETE_FILE, dataFile.fileType());
}
if (dataFile.path().toString().equals("/path/to/data-b.parquet")) {
Assert.assertEquals(DataFile.FileType.EQUALITY_DELETE_FILE, dataFile.fileType());
}
if (dataFile.path().toString().equals("/path/to/data-c.parquet")) {
Assert.assertEquals(DataFile.FileType.DATA_FILE, dataFile.fileType());
}
}
}
}
1 change: 1 addition & 0 deletions site/docs/spec.md
@@ -244,6 +244,7 @@ The schema of a manifest file is a struct called `manifest_entry` with the follo
| **`128 upper_bounds`** | `optional map<129: int, 130: binary>` | Map from column id to upper bound in the column serialized as binary [1]. Each value must be greater than or equal to all values in the column for the file. |
| **`131 key_metadata`** | `optional binary` | Implementation-specific key metadata for encryption |
| **`132 split_offsets`** | `optional list` | Split offsets for the data file. For example, all row group offsets in a Parquet file. Must be sorted ascending. |
| **`134 file_type`** | `optional int` | Type of the file. `0`: data file with the same schema as the table, `1`: position-based delete file, `2`: equality-based delete file. |

Notes:

spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
@@ -46,6 +46,7 @@ public class SparkDataFile implements DataFile {
private final int upperBoundsPosition;
private final int keyMetadataPosition;
private final int splitOffsetsPosition;
private final int fileTypePosition;
private final Type lowerBoundsType;
private final Type upperBoundsType;
private final Type keyMetadataType;
@@ -77,6 +78,7 @@ public SparkDataFile(Types.StructType type, StructType sparkType) {
upperBoundsPosition = positions.get("upper_bounds");
keyMetadataPosition = positions.get("key_metadata");
splitOffsetsPosition = positions.get("split_offsets");
fileTypePosition = positions.get("file_type");
}

public SparkDataFile wrap(Row row) {
@@ -160,6 +162,12 @@ public List<Long> splitOffsets() {
return wrapped.isNullAt(splitOffsetsPosition) ? null : wrapped.getList(splitOffsetsPosition);
}

@Override
public FileType fileType() {
return wrapped.isNullAt(fileTypePosition) ? FileType.DATA_FILE :
DataFile.FileType.values()[(Integer) wrapped.getAs(fileTypePosition)];
}

private int fieldPosition(String name, StructType sparkType) {
try {
return sparkType.fieldIndex(name);
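
A sketch of the read-side fallback in the Spark wrapper; the schema and row arguments are assumed to come from a manifest-entries scan, as elsewhere in this class:

```java
import org.apache.iceberg.DataFile;
import org.apache.iceberg.spark.SparkDataFile;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructType;

// Hypothetical helper, not part of the patch: wraps one manifest-entry Row
// and reads the new column.
class FileTypeFromRow {
  static DataFile.FileType fileTypeOf(Types.StructType type, StructType sparkType, Row row) {
    SparkDataFile file = new SparkDataFile(type, sparkType).wrap(row);
    return file.fileType();  // DATA_FILE whenever file_type is null
  }
}
```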
spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java
@@ -195,6 +195,7 @@ private void checkDataFile(DataFile expected, DataFile actual) {
Assert.assertEquals("Upper bounds must match", expected.upperBounds(), actual.upperBounds());
Assert.assertEquals("Key metadata must match", expected.keyMetadata(), actual.keyMetadata());
Assert.assertEquals("Split offsets must match", expected.splitOffsets(), actual.splitOffsets());
Assert.assertEquals("File type must match", expected.fileType(), actual.fileType());

checkStructLike(expected.partition(), actual.partition());
}