diff --git a/api/src/main/java/org/apache/iceberg/ContentFile.java b/api/src/main/java/org/apache/iceberg/ContentFile.java index 102a8071a8b0..dca2fec57e61 100644 --- a/api/src/main/java/org/apache/iceberg/ContentFile.java +++ b/api/src/main/java/org/apache/iceberg/ContentFile.java @@ -29,6 +29,11 @@ * @param the concrete Java class of a ContentFile instance. */ public interface ContentFile { + /** + * Returns the ordinal position of the file in a manifest, or null if it was not read from a manifest. + */ + Long pos(); + /** * Returns id of the partition spec used for partition metadata. */ diff --git a/api/src/test/java/org/apache/iceberg/TestHelpers.java b/api/src/test/java/org/apache/iceberg/TestHelpers.java index aa1a8cac1ef1..efdceba0806d 100644 --- a/api/src/test/java/org/apache/iceberg/TestHelpers.java +++ b/api/src/test/java/org/apache/iceberg/TestHelpers.java @@ -332,6 +332,11 @@ public TestDataFile(String path, StructLike partition, long recordCount, this.upperBounds = upperBounds; } + @Override + public Long pos() { + return null; + } + @Override public int specId() { return 0; diff --git a/core/src/main/java/org/apache/iceberg/BaseFile.java b/core/src/main/java/org/apache/iceberg/BaseFile.java index 1d746717d3ab..9fb79c807f0a 100644 --- a/core/src/main/java/org/apache/iceberg/BaseFile.java +++ b/core/src/main/java/org/apache/iceberg/BaseFile.java @@ -31,6 +31,7 @@ import org.apache.avro.specific.SpecificData; import org.apache.iceberg.avro.AvroSchemaUtil; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; @@ -53,6 +54,7 @@ public PartitionData copy() { private int[] fromProjectionPos; private Types.StructType partitionType; + private Long fileOrdinal = null; private int partitionSpecId = -1; private FileContent content = 
FileContent.DATA; private String filePath = null; @@ -91,7 +93,10 @@ public PartitionData copy() { } List fields = schema.fields(); - List allFields = DataFile.getType(partitionType).fields(); + List allFields = Lists.newArrayList(); + allFields.addAll(DataFile.getType(partitionType).fields()); + allFields.add(MetadataColumns.ROW_POSITION); + this.fromProjectionPos = new int[fields.size()]; for (int i = 0; i < fromProjectionPos.length; i += 1) { boolean found = false; @@ -149,6 +154,7 @@ public PartitionData copy() { * @param fullCopy whether to copy all fields or to drop column-level stats */ BaseFile(BaseFile toCopy, boolean fullCopy) { + this.fileOrdinal = toCopy.fileOrdinal; this.partitionSpecId = toCopy.partitionSpecId; this.content = toCopy.content; this.filePath = toCopy.filePath; @@ -255,6 +261,9 @@ public void put(int i, Object value) { case 13: this.equalityIds = ArrayUtil.toIntArray((List) value); return; + case 14: + this.fileOrdinal = (Long) value; + return; default: // ignore the object, it must be from a newer version of the format } @@ -301,6 +310,8 @@ public Object get(int i) { return splitOffsets(); case 13: return equalityFieldIds(); + case 14: + return fileOrdinal; default: throw new UnsupportedOperationException("Unknown field ordinal: " + pos); } @@ -316,6 +327,11 @@ public int size() { return DataFile.getType(EMPTY_STRUCT_TYPE).fields().size(); } + @Override + public Long pos() { + return fileOrdinal; + } + @Override public FileContent content() { return content; diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java b/core/src/main/java/org/apache/iceberg/ManifestReader.java index 8311c1998ebd..3aada63ffa9f 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestReader.java +++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java @@ -189,10 +189,14 @@ private CloseableIterable> open(Schema projection) { FileFormat format = FileFormat.fromFileName(file.location()); Preconditions.checkArgument(format != null, "Unable to
determine format of manifest: %s", file); + List fields = Lists.newArrayList(); + fields.addAll(projection.asStruct().fields()); + fields.add(MetadataColumns.ROW_POSITION); + switch (format) { case AVRO: AvroIterable> reader = Avro.read(file) - .project(ManifestEntry.wrapFileSchema(projection.asStruct())) + .project(ManifestEntry.wrapFileSchema(Types.StructType.of(fields))) .rename("manifest_entry", GenericManifestEntry.class.getName()) .rename("partition", PartitionData.class.getName()) .rename("r102", PartitionData.class.getName()) diff --git a/core/src/main/java/org/apache/iceberg/V1Metadata.java b/core/src/main/java/org/apache/iceberg/V1Metadata.java index cb6c922349f9..4b1186074267 100644 --- a/core/src/main/java/org/apache/iceberg/V1Metadata.java +++ b/core/src/main/java/org/apache/iceberg/V1Metadata.java @@ -364,6 +364,11 @@ public org.apache.avro.Schema getSchema() { return avroSchema; } + @Override + public Long pos() { + return null; + } + @Override public int specId() { return wrapped.specId(); diff --git a/core/src/main/java/org/apache/iceberg/V2Metadata.java b/core/src/main/java/org/apache/iceberg/V2Metadata.java index 94ff1da9b745..55a91c95f182 100644 --- a/core/src/main/java/org/apache/iceberg/V2Metadata.java +++ b/core/src/main/java/org/apache/iceberg/V2Metadata.java @@ -415,6 +415,11 @@ public void put(int i, Object v) { throw new UnsupportedOperationException("Cannot read into IndexedDataFile"); } + @Override + public Long pos() { + return null; + } + @Override public int specId() { return wrapped.specId(); diff --git a/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java b/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java index c8a81ca887c5..5957cae30488 100644 --- a/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java +++ b/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.List; +import java.util.function.Supplier; import 
org.apache.avro.LogicalType; import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; @@ -30,7 +31,7 @@ import org.apache.iceberg.common.DynClasses; import org.apache.iceberg.data.avro.DecoderResolver; -class GenericAvroReader implements DatumReader { +class GenericAvroReader implements DatumReader, SupportsRowPosition { private final Schema readSchema; private ClassLoader loader = Thread.currentThread().getContextClassLoader(); @@ -56,6 +57,13 @@ public void setClassLoader(ClassLoader newClassLoader) { this.loader = newClassLoader; } + @Override + public void setRowPositionSupplier(Supplier posSupplier) { + if (reader instanceof SupportsRowPosition) { + ((SupportsRowPosition) reader).setRowPositionSupplier(posSupplier); + } + } + @Override public T read(T reuse, Decoder decoder) throws IOException { return DecoderResolver.resolveAndRead(decoder, readSchema, fileSchema, reader, reuse); diff --git a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java index e69c11151f92..5e72c9b1572b 100644 --- a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java +++ b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java @@ -580,10 +580,19 @@ public abstract static class StructReader implements ValueReader, Supports private final Object[] constants; private int posField = -1; - protected StructReader(List> readers) { + protected StructReader(List> readers, Schema schema) { this.readers = readers.toArray(new ValueReader[0]); this.positions = new int[0]; this.constants = new Object[0]; + + List fields = schema.getFields(); + for (int pos = 0; pos < fields.size(); pos += 1) { + Schema.Field field = fields.get(pos); + if (AvroSchemaUtil.getFieldId(field) == MetadataColumns.ROW_POSITION.fieldId()) { + // track where the _pos field is located for setRowPositionSupplier + this.posField = pos; + } + } } protected StructReader(List> readers, Types.StructType struct, Map idToConstant) { @@ 
-609,7 +618,7 @@ protected StructReader(List> readers, Types.StructType struct, Ma @Override public void setRowPositionSupplier(Supplier posSupplier) { - if (posField > 0) { + if (posField >= 0) { long startingPos = posSupplier.get(); this.readers[posField] = new PositionReader(startingPos); for (ValueReader reader : readers) { @@ -667,7 +676,7 @@ private static class RecordReader extends StructReader { private final Schema recordSchema; private RecordReader(List> readers, Schema recordSchema) { - super(readers); + super(readers, recordSchema); this.recordSchema = recordSchema; } @@ -697,7 +706,7 @@ static class IndexedRecordReader extends StructReader> readers, Class recordClass, Schema schema) { - super(readers); + super(readers, schema); this.recordClass = recordClass; this.ctor = DynConstructors.builder(IndexedRecord.class) .hiddenImpl(recordClass, Schema.class) diff --git a/core/src/test/java/org/apache/iceberg/TestManifestReader.java b/core/src/test/java/org/apache/iceberg/TestManifestReader.java index 1706404e9d23..54ee35eefcdf 100644 --- a/core/src/test/java/org/apache/iceberg/TestManifestReader.java +++ b/core/src/test/java/org/apache/iceberg/TestManifestReader.java @@ -100,4 +100,15 @@ public void testManifestReaderWithUpdatedPartitionMetadataForV1Table() throws IO } } + @Test + public void testDataFilePositions() throws IOException { + ManifestFile manifest = writeManifest(1000L, FILE_A, FILE_B, FILE_C); + try (ManifestReader reader = ManifestFiles.read(manifest, FILE_IO)) { + long expectedPos = 0L; + for (DataFile file : reader) { + Assert.assertEquals("Position should match", (Long) expectedPos, file.pos()); + expectedPos += 1; + } + } + } } diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java b/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java index 48dd00124273..14233c949815 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java +++ 
b/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java @@ -87,6 +87,11 @@ public SparkDataFile wrap(Row row) { return this; } + @Override + public Long pos() { + return null; + } + @Override public int specId() { return -1;