Skip to content

Commit

Permalink
Move internal struct projection to SupportsIndexProjection.
Browse files Browse the repository at this point in the history
  • Loading branch information
rdblue committed Sep 13, 2024
1 parent 0747b60 commit f384e09
Show file tree
Hide file tree
Showing 8 changed files with 199 additions and 94 deletions.
103 changes: 57 additions & 46 deletions core/src/main/java/org/apache/iceberg/BaseFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.iceberg;


import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Arrays;
Expand All @@ -26,20 +27,23 @@
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.specific.SpecificData;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.avro.SupportsIndexProjection;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ArrayUtil;
import org.apache.iceberg.util.ByteBuffers;
import org.apache.iceberg.util.SerializableMap;

/** Base class for both {@link DataFile} and {@link DeleteFile}. */
abstract class BaseFile<F>
abstract class BaseFile<F> extends SupportsIndexProjection
implements ContentFile<F>,
IndexedRecord,
StructLike,
Expand All @@ -55,7 +59,6 @@ public PartitionData copy() {
}
};

private int[] fromProjectionPos;
private Types.StructType partitionType;

private Long fileOrdinal = null;
Expand Down Expand Up @@ -84,40 +87,52 @@ public PartitionData copy() {
// cached schema
private transient Schema avroSchema = null;

// Struct type whose field order defines the positions used by internalGet and internalSet.
// It is assembled from the fields of DataFile.getType(EMPTY_STRUCT_TYPE), followed by
// MetadataColumns.ROW_POSITION as the last position.
//
// Expected position layout, taken from the explicit field listing that this expression stands in
// for (TODO: confirm it matches the field order returned by DataFile.getType):
//    0  DataFile.CONTENT
//    1  DataFile.FILE_PATH
//    2  DataFile.FILE_FORMAT
//    3  DataFile.SPEC_ID
//    4  required(DataFile.PARTITION_ID, DataFile.PARTITION_NAME, EMPTY_STRUCT_TYPE,
//           DataFile.PARTITION_DOC)
//    5  DataFile.RECORD_COUNT
//    6  DataFile.FILE_SIZE
//    7  DataFile.COLUMN_SIZES
//    8  DataFile.VALUE_COUNTS
//    9  DataFile.NULL_VALUE_COUNTS
//   10  DataFile.NAN_VALUE_COUNTS
//   11  DataFile.LOWER_BOUNDS
//   12  DataFile.UPPER_BOUNDS
//   13  DataFile.KEY_METADATA
//   14  DataFile.SPLIT_OFFSETS
//   15  DataFile.EQUALITY_IDS
//   16  DataFile.SORT_ORDER_ID
//   17  MetadataColumns.ROW_POSITION
private static final Types.StructType BASE_TYPE =
    Types.StructType.of(
        Streams.concat(
                DataFile.getType(EMPTY_STRUCT_TYPE).fields().stream(),
                Stream.of(MetadataColumns.ROW_POSITION))
            .collect(Collectors.toList()));

/**
 * Used by Avro reflection to instantiate this class when reading manifest files.
 *
 * @param avroSchema Avro schema of the records being read; it is converted to an Iceberg struct
 *     type and handed to the projection constructor
 */
BaseFile(Schema avroSchema) {
this(AvroSchemaUtil.convert(avroSchema).asStructType());
// the projection constructor stores an Avro schema converted back from the struct type;
// overwrite it with the schema that was actually passed in
this.avroSchema = avroSchema;
}

Types.StructType schema = AvroSchemaUtil.convert(avroSchema).asNestedType().asStructType();
/** Used by internal readers to instantiate this class with a projection schema. */
BaseFile(Types.StructType projection) {
super(BASE_TYPE, projection);
this.avroSchema = AvroSchemaUtil.convert(projection, "data_file");

// partition type may be null if the field was not projected
Type partType = schema.fieldType("partition");
Type partType = projection.fieldType("partition");
if (partType != null) {
this.partitionType = partType.asNestedType().asStructType();
} else {
this.partitionType = EMPTY_STRUCT_TYPE;
}

List<Types.NestedField> fields = schema.fields();
List<Types.NestedField> allFields = Lists.newArrayList();
allFields.addAll(DataFile.getType(partitionType).fields());
allFields.add(MetadataColumns.ROW_POSITION);

this.fromProjectionPos = new int[fields.size()];
for (int i = 0; i < fromProjectionPos.length; i += 1) {
boolean found = false;
for (int j = 0; j < allFields.size(); j += 1) {
if (fields.get(i).fieldId() == allFields.get(j).fieldId()) {
found = true;
fromProjectionPos[i] = j;
}
}

if (!found) {
throw new IllegalArgumentException("Cannot find projected field: " + fields.get(i));
}
}

this.partitionData = new PartitionData(partitionType);
}

Expand All @@ -139,6 +154,7 @@ public PartitionData copy() {
int[] equalityFieldIds,
Integer sortOrderId,
ByteBuffer keyMetadata) {
super(BASE_TYPE.fields().size());
this.partitionSpecId = specId;
this.content = content;
this.filePath = filePath;
Expand Down Expand Up @@ -177,6 +193,7 @@ public PartitionData copy() {
* column stat is kept.
*/
BaseFile(BaseFile<F> toCopy, boolean copyStats, Set<Integer> requestedColumnIds) {
super(toCopy);
this.fileOrdinal = toCopy.fileOrdinal;
this.partitionSpecId = toCopy.partitionSpecId;
this.content = toCopy.content;
Expand All @@ -201,7 +218,6 @@ public PartitionData copy() {
this.lowerBounds = null;
this.upperBounds = null;
}
this.fromProjectionPos = toCopy.fromProjectionPos;
this.keyMetadata =
toCopy.keyMetadata == null
? null
Expand All @@ -220,7 +236,9 @@ public PartitionData copy() {
}

/** Constructor for Java serialization. */
BaseFile() {}
BaseFile() {
// no projection struct is supplied on this path, so the parent only receives the number of
// fields in BASE_TYPE
super(BASE_TYPE.fields().size());
}

@Override
public int specId() {
Expand Down Expand Up @@ -260,13 +278,12 @@ public Schema getSchema() {
}

@Override
@SuppressWarnings("unchecked")
public void put(int i, Object value) {
int pos = i;
// if the schema was projected, map the incoming ordinal to the expected one
if (fromProjectionPos != null) {
pos = fromProjectionPos[i];
}
set(i, value);
}

@Override
protected <T> void internalSet(int pos, T value) {
switch (pos) {
case 0:
this.content = value != null ? FILE_CONTENT_VALUES[(Integer) value] : FileContent.DATA;
Expand Down Expand Up @@ -329,18 +346,12 @@ public void put(int i, Object value) {
}

@Override
public <T> void set(int pos, T value) {
put(pos, value);
protected <T> T internalGet(int pos, Class<T> javaClass) {
  // look up the raw value for this position, then cast it to the type the caller asked for
  Object rawValue = getByPos(pos);
  return javaClass.cast(rawValue);
}

@Override
public Object get(int i) {
int pos = i;
// if the schema was projected, map the incoming ordinal to the expected one
if (fromProjectionPos != null) {
pos = fromProjectionPos[i];
}
switch (pos) {
private Object getByPos(int basePos) {
switch (basePos) {
case 0:
return content.id();
case 1:
Expand Down Expand Up @@ -378,13 +389,13 @@ public Object get(int i) {
case 17:
return fileOrdinal;
default:
throw new UnsupportedOperationException("Unknown field ordinal: " + pos);
throw new UnsupportedOperationException("Unknown field ordinal: " + basePos);
}
}

@Override
public <T> T get(int pos, Class<T> javaClass) {
return javaClass.cast(get(pos));
public Object get(int pos) {
// untyped access: delegate to the typed getter, using Object as the requested class
return get(pos, Object.class);
}

@Override
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/GenericDataFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ class GenericDataFile extends BaseFile<DataFile> implements DataFile {
super(avroSchema);
}

/**
 * Used by internal readers to instantiate this class with a projection schema.
 *
 * @param projection struct type describing the projected fields; passed unchanged to the parent
 *     constructor
 */
GenericDataFile(Types.StructType projection) {
super(projection);
}

GenericDataFile(
int specId,
String filePath,
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/GenericDeleteFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ class GenericDeleteFile extends BaseFile<DeleteFile> implements DeleteFile {
super(avroSchema);
}

/**
 * Used by internal readers to instantiate this class with a projection schema.
 *
 * @param projection struct type describing the projected fields; passed unchanged to the parent
 *     constructor
 */
GenericDeleteFile(Types.StructType projection) {
super(projection);
}

GenericDeleteFile(
int specId,
FileContent content,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ class GenericManifestEntry<F extends ContentFile<F>>
this.schema = schema;
}

GenericManifestEntry(Types.StructType partitionType) {
this.schema = AvroSchemaUtil.convert(V1Metadata.entrySchema(partitionType), "manifest_entry");
/**
 * Used by internal readers to instantiate this class with a projection schema.
 *
 * @param schema Iceberg struct type for the entry; it is converted to an Avro schema named
 *     "manifest_entry" and stored in this entry's schema field
 */
GenericManifestEntry(Types.StructType schema) {
this.schema = AvroSchemaUtil.convert(schema, "manifest_entry");
}

private GenericManifestEntry(GenericManifestEntry<F> toCopy, boolean fullCopy) {
Expand Down
Loading

0 comments on commit f384e09

Please sign in to comment.