Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions api/src/main/java/org/apache/iceberg/ContentFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@
* @param <F> the concrete Java class of a ContentFile instance.
*/
public interface ContentFile<F> {
/**
* Returns the ordinal position of the file in a manifest, or null if it was not read from a manifest.
*/
Long pos();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

qq: do we want to include anything in the name to indicate that it is a position in the manifest?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd be open to that, but I couldn't come up with a very good name for this field so I just used "pos" because we use that elsewhere to indicate position in a file.


/**
* Returns id of the partition spec used for partition metadata.
*/
Expand Down
5 changes: 5 additions & 0 deletions api/src/test/java/org/apache/iceberg/TestHelpers.java
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,11 @@ public TestDataFile(String path, StructLike partition, long recordCount,
this.upperBounds = upperBounds;
}

@Override
public Long pos() {
// Test fixture: this file is constructed in memory, never read from a
// manifest, so there is no ordinal position to report (null is the
// documented "not read from a manifest" value in ContentFile#pos).
return null;
}

@Override
public int specId() {
return 0;
Expand Down
18 changes: 17 additions & 1 deletion core/src/main/java/org/apache/iceberg/BaseFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.avro.specific.SpecificData;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
Expand All @@ -53,6 +54,7 @@ public PartitionData copy() {
private int[] fromProjectionPos;
private Types.StructType partitionType;

private Long fileOrdinal = null;
private int partitionSpecId = -1;
private FileContent content = FileContent.DATA;
private String filePath = null;
Expand Down Expand Up @@ -91,7 +93,10 @@ public PartitionData copy() {
}

List<Types.NestedField> fields = schema.fields();
List<Types.NestedField> allFields = DataFile.getType(partitionType).fields();
List<Types.NestedField> allFields = Lists.newArrayList();
allFields.addAll(DataFile.getType(partitionType).fields());
allFields.add(MetadataColumns.ROW_POSITION);

this.fromProjectionPos = new int[fields.size()];
for (int i = 0; i < fromProjectionPos.length; i += 1) {
boolean found = false;
Expand Down Expand Up @@ -149,6 +154,7 @@ public PartitionData copy() {
* @param fullCopy whether to copy all fields or to drop column-level stats
*/
BaseFile(BaseFile<F> toCopy, boolean fullCopy) {
this.fileOrdinal = toCopy.fileOrdinal;
this.partitionSpecId = toCopy.partitionSpecId;
this.content = toCopy.content;
this.filePath = toCopy.filePath;
Expand Down Expand Up @@ -255,6 +261,9 @@ public void put(int i, Object value) {
case 13:
this.equalityIds = ArrayUtil.toIntArray((List<Integer>) value);
return;
case 14:
this.fileOrdinal = (long) value;
return;
default:
// ignore the object, it must be from a newer version of the format
}
Expand Down Expand Up @@ -301,6 +310,8 @@ public Object get(int i) {
return splitOffsets();
case 13:
return equalityFieldIds();
case 14:
return pos;
default:
throw new UnsupportedOperationException("Unknown field ordinal: " + pos);
}
Expand All @@ -316,6 +327,11 @@ public int size() {
return DataFile.getType(EMPTY_STRUCT_TYPE).fields().size();
}

@Override
public Long pos() {
// Ordinal position of this file within the manifest it was read from.
// Stays null for files that were built directly rather than read
// (fileOrdinal is only set via put(...) during manifest reads).
return fileOrdinal;
}

@Override
public FileContent content() {
return content;
Expand Down
6 changes: 5 additions & 1 deletion core/src/main/java/org/apache/iceberg/ManifestReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -189,10 +189,14 @@ private CloseableIterable<ManifestEntry<F>> open(Schema projection) {
FileFormat format = FileFormat.fromFileName(file.location());
Preconditions.checkArgument(format != null, "Unable to determine format of manifest: %s", file);

List<Types.NestedField> fields = Lists.newArrayList();
fields.addAll(projection.asStruct().fields());
fields.add(MetadataColumns.ROW_POSITION);

switch (format) {
case AVRO:
AvroIterable<ManifestEntry<F>> reader = Avro.read(file)
.project(ManifestEntry.wrapFileSchema(projection.asStruct()))
.project(ManifestEntry.wrapFileSchema(Types.StructType.of(fields)))
.rename("manifest_entry", GenericManifestEntry.class.getName())
.rename("partition", PartitionData.class.getName())
.rename("r102", PartitionData.class.getName())
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/V1Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,11 @@ public org.apache.avro.Schema getSchema() {
return avroSchema;
}

@Override
public Long pos() {
// NOTE(review): this V1 wrapper appears to be a write-side adapter, so no
// manifest read position exists; returning null matches the
// ContentFile#pos contract for files not read from a manifest — confirm.
return null;
}

@Override
public int specId() {
return wrapped.specId();
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/V2Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,11 @@ public void put(int i, Object v) {
throw new UnsupportedOperationException("Cannot read into IndexedDataFile");
}

@Override
public Long pos() {
// NOTE(review): IndexedDataFile is a V2 write-side adapter (its put()
// throws UnsupportedOperationException above), so there is no read
// position; null is the ContentFile#pos value for "not read from a manifest".
return null;
}

@Override
public int specId() {
return wrapped.specId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.io.IOException;
import java.util.List;
import java.util.function.Supplier;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
Expand All @@ -30,7 +31,7 @@
import org.apache.iceberg.common.DynClasses;
import org.apache.iceberg.data.avro.DecoderResolver;

class GenericAvroReader<T> implements DatumReader<T> {
class GenericAvroReader<T> implements DatumReader<T>, SupportsRowPosition {

private final Schema readSchema;
private ClassLoader loader = Thread.currentThread().getContextClassLoader();
Expand All @@ -56,6 +57,13 @@ public void setClassLoader(ClassLoader newClassLoader) {
this.loader = newClassLoader;
}

@Override
public void setRowPositionSupplier(Supplier<Long> posSupplier) {
// Best-effort delegation: forward the row-position supplier to the wrapped
// reader only when it opts in via SupportsRowPosition; otherwise the
// supplier is intentionally ignored (positions are simply not populated).
if (reader instanceof SupportsRowPosition) {
((SupportsRowPosition) reader).setRowPositionSupplier(posSupplier);
}
}

@Override
public T read(T reuse, Decoder decoder) throws IOException {
return DecoderResolver.resolveAndRead(decoder, readSchema, fileSchema, reader, reuse);
Expand Down
17 changes: 13 additions & 4 deletions core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
Original file line number Diff line number Diff line change
Expand Up @@ -580,10 +580,19 @@ public abstract static class StructReader<S> implements ValueReader<S>, Supports
private final Object[] constants;
private int posField = -1;

protected StructReader(List<ValueReader<?>> readers) {
// Constructs a struct reader with no projected constants, scanning the Avro
// schema for the row-position metadata field so setRowPositionSupplier can
// later swap in a PositionReader at that slot. posField stays -1 when the
// schema has no _pos field.
protected StructReader(List<ValueReader<?>> readers, Schema schema) {
this.readers = readers.toArray(new ValueReader[0]);
this.positions = new int[0];
this.constants = new Object[0];

List<Schema.Field> fields = schema.getFields();
for (int pos = 0; pos < fields.size(); pos += 1) {
Schema.Field field = fields.get(pos);
// match by Iceberg field id, not by name, to survive Avro field renames
if (AvroSchemaUtil.getFieldId(field) == MetadataColumns.ROW_POSITION.fieldId()) {
// track where the _pos field is located for setRowPositionSupplier
this.posField = pos;
}
}
}

protected StructReader(List<ValueReader<?>> readers, Types.StructType struct, Map<Integer, ?> idToConstant) {
Expand All @@ -609,7 +618,7 @@ protected StructReader(List<ValueReader<?>> readers, Types.StructType struct, Ma

@Override
public void setRowPositionSupplier(Supplier<Long> posSupplier) {
if (posField > 0) {
if (posField >= 0) {
long startingPos = posSupplier.get();
this.readers[posField] = new PositionReader(startingPos);
for (ValueReader<?> reader : readers) {
Expand Down Expand Up @@ -667,7 +676,7 @@ private static class RecordReader extends StructReader<GenericData.Record> {
private final Schema recordSchema;

private RecordReader(List<ValueReader<?>> readers, Schema recordSchema) {
super(readers);
super(readers, recordSchema);
this.recordSchema = recordSchema;
}

Expand Down Expand Up @@ -697,7 +706,7 @@ static class IndexedRecordReader<R extends IndexedRecord> extends StructReader<R
private final Schema schema;

IndexedRecordReader(List<ValueReader<?>> readers, Class<R> recordClass, Schema schema) {
super(readers);
super(readers, schema);
this.recordClass = recordClass;
this.ctor = DynConstructors.builder(IndexedRecord.class)
.hiddenImpl(recordClass, Schema.class)
Expand Down
11 changes: 11 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestManifestReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,15 @@ public void testManifestReaderWithUpdatedPartitionMetadataForV1Table() throws IO
}
}

@Test
public void testDataFilePositions() throws IOException {
// Write three files to one manifest, then verify that reading it back
// assigns each DataFile its 0-based ordinal position within the manifest.
ManifestFile manifest = writeManifest(1000L, FILE_A, FILE_B, FILE_C);
try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, FILE_IO)) {
long expectedPos = 0L;
for (DataFile file : reader) {
// boxed (Long) cast keeps assertEquals comparing objects, matching pos()'s Long return
Assert.assertEquals("Position should match", (Long) expectedPos, file.pos());
expectedPos += 1;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ public SparkDataFile wrap(Row row) {
return this;
}

@Override
public Long pos() {
// SparkDataFile wraps a Spark Row rather than a manifest entry, so no
// manifest ordinal is available; null follows the ContentFile#pos contract
// for files not read from a manifest.
return null;
}

@Override
public int specId() {
return -1;
Expand Down