From f384e090e4e6b1385971e84bb506df2f168fd8b4 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Tue, 10 Sep 2024 16:32:31 -0700 Subject: [PATCH] Move internal struct projection to SupportsIndexProjection. --- .../java/org/apache/iceberg/BaseFile.java | 103 ++++++++++-------- .../org/apache/iceberg/GenericDataFile.java | 5 + .../org/apache/iceberg/GenericDeleteFile.java | 5 + .../apache/iceberg/GenericManifestEntry.java | 5 +- .../apache/iceberg/GenericManifestFile.java | 65 ++++------- .../org/apache/iceberg/ManifestWriter.java | 2 +- .../iceberg/avro/SupportsIndexProjection.java | 88 +++++++++++++++ .../java/org/apache/iceberg/TestBase.java | 20 +++- 8 files changed, 199 insertions(+), 94 deletions(-) create mode 100644 core/src/main/java/org/apache/iceberg/avro/SupportsIndexProjection.java diff --git a/core/src/main/java/org/apache/iceberg/BaseFile.java b/core/src/main/java/org/apache/iceberg/BaseFile.java index d4583b31c7a1..674f5db09303 100644 --- a/core/src/main/java/org/apache/iceberg/BaseFile.java +++ b/core/src/main/java/org/apache/iceberg/BaseFile.java @@ -18,6 +18,7 @@ */ package org.apache.iceberg; + import java.io.Serializable; import java.nio.ByteBuffer; import java.util.Arrays; @@ -26,12 +27,15 @@ import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.avro.Schema; import org.apache.avro.generic.IndexedRecord; import org.apache.avro.specific.SpecificData; import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.avro.SupportsIndexProjection; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Streams; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.ArrayUtil; @@ -39,7 +43,7 @@ import org.apache.iceberg.util.SerializableMap; /** Base class for both {@link DataFile} and {@link DeleteFile}. */ -abstract class BaseFile +abstract class BaseFile extends SupportsIndexProjection implements ContentFile, IndexedRecord, StructLike, @@ -55,7 +59,6 @@ public PartitionData copy() { } }; - private int[] fromProjectionPos; private Types.StructType partitionType; private Long fileOrdinal = null; @@ -84,40 +87,52 @@ public PartitionData copy() { // cached schema private transient Schema avroSchema = null; + // struct type that corresponds to the positions used for internalGet and internalSet + private static final Types.StructType BASE_TYPE = + Types.StructType.of(Streams.concat(DataFile.getType(EMPTY_STRUCT_TYPE).fields().stream(), Stream.of(MetadataColumns.ROW_POSITION)).collect(Collectors.toList())); +// Types.StructType.of( +// DataFile.CONTENT, +// DataFile.FILE_PATH, +// DataFile.FILE_FORMAT, +// DataFile.SPEC_ID, +// required( +// DataFile.PARTITION_ID, +// DataFile.PARTITION_NAME, +// EMPTY_STRUCT_TYPE, +// DataFile.PARTITION_DOC), +// DataFile.RECORD_COUNT, +// DataFile.FILE_SIZE, +// DataFile.COLUMN_SIZES, +// DataFile.VALUE_COUNTS, +// DataFile.NULL_VALUE_COUNTS, +// DataFile.NAN_VALUE_COUNTS, +// DataFile.LOWER_BOUNDS, +// DataFile.UPPER_BOUNDS, +// DataFile.KEY_METADATA, +// DataFile.SPLIT_OFFSETS, +// DataFile.EQUALITY_IDS, +// DataFile.SORT_ORDER_ID, +// MetadataColumns.ROW_POSITION); + /** Used by Avro reflection to instantiate this class when reading manifest files. */ BaseFile(Schema avroSchema) { + this(AvroSchemaUtil.convert(avroSchema).asStructType()); this.avroSchema = avroSchema; + } - Types.StructType schema = AvroSchemaUtil.convert(avroSchema).asNestedType().asStructType(); + /** Used by internal readers to instantiate this class with a projection schema. */ + BaseFile(Types.StructType projection) { + super(BASE_TYPE, projection); + this.avroSchema = AvroSchemaUtil.convert(projection, "data_file"); // partition type may be null if the field was not projected - Type partType = schema.fieldType("partition"); + Type partType = projection.fieldType("partition"); if (partType != null) { this.partitionType = partType.asNestedType().asStructType(); } else { this.partitionType = EMPTY_STRUCT_TYPE; } - List fields = schema.fields(); - List allFields = Lists.newArrayList(); - allFields.addAll(DataFile.getType(partitionType).fields()); - allFields.add(MetadataColumns.ROW_POSITION); - - this.fromProjectionPos = new int[fields.size()]; - for (int i = 0; i < fromProjectionPos.length; i += 1) { - boolean found = false; - for (int j = 0; j < allFields.size(); j += 1) { - if (fields.get(i).fieldId() == allFields.get(j).fieldId()) { - found = true; - fromProjectionPos[i] = j; - } - } - - if (!found) { - throw new IllegalArgumentException("Cannot find projected field: " + fields.get(i)); - } - } - this.partitionData = new PartitionData(partitionType); } @@ -139,6 +154,7 @@ public PartitionData copy() { int[] equalityFieldIds, Integer sortOrderId, ByteBuffer keyMetadata) { + super(BASE_TYPE.fields().size()); this.partitionSpecId = specId; this.content = content; this.filePath = filePath; @@ -177,6 +193,7 @@ public PartitionData copy() { * column stat is kept. */ BaseFile(BaseFile toCopy, boolean copyStats, Set requestedColumnIds) { + super(toCopy); this.fileOrdinal = toCopy.fileOrdinal; this.partitionSpecId = toCopy.partitionSpecId; this.content = toCopy.content; @@ -201,7 +218,6 @@ public PartitionData copy() { this.lowerBounds = null; this.upperBounds = null; } - this.fromProjectionPos = toCopy.fromProjectionPos; this.keyMetadata = toCopy.keyMetadata == null ? null @@ -220,7 +236,9 @@ public PartitionData copy() { } /** Constructor for Java serialization. */ - BaseFile() {} + BaseFile() { + super(BASE_TYPE.fields().size()); + } @Override public int specId() { @@ -260,13 +278,12 @@ public Schema getSchema() { } @Override - @SuppressWarnings("unchecked") public void put(int i, Object value) { - int pos = i; - // if the schema was projected, map the incoming ordinal to the expected one - if (fromProjectionPos != null) { - pos = fromProjectionPos[i]; - } + set(i, value); + } + + @Override + protected void internalSet(int pos, T value) { switch (pos) { case 0: this.content = value != null ? FILE_CONTENT_VALUES[(Integer) value] : FileContent.DATA; @@ -329,18 +346,12 @@ public void put(int i, Object value) { } @Override - public void set(int pos, T value) { - put(pos, value); + protected T internalGet(int pos, Class javaClass) { + return javaClass.cast(getByPos(pos)); } - @Override - public Object get(int i) { - int pos = i; - // if the schema was projected, map the incoming ordinal to the expected one - if (fromProjectionPos != null) { - pos = fromProjectionPos[i]; - } - switch (pos) { + private Object getByPos(int basePos) { + switch (basePos) { case 0: return content.id(); case 1: @@ -378,13 +389,13 @@ public Object get(int i) { case 17: return fileOrdinal; default: - throw new UnsupportedOperationException("Unknown field ordinal: " + pos); + throw new UnsupportedOperationException("Unknown field ordinal: " + basePos); } } @Override - public T get(int pos, Class javaClass) { - return javaClass.cast(get(pos)); + public Object get(int pos) { + return get(pos, Object.class); } @Override diff --git a/core/src/main/java/org/apache/iceberg/GenericDataFile.java b/core/src/main/java/org/apache/iceberg/GenericDataFile.java index 8fe7ec756abf..7b99e7b60ab8 100644 --- a/core/src/main/java/org/apache/iceberg/GenericDataFile.java +++ b/core/src/main/java/org/apache/iceberg/GenericDataFile.java @@ -32,6 +32,11 @@ class GenericDataFile extends BaseFile implements DataFile { super(avroSchema); } + /** Used by internal readers to instantiate this class with a projection schema. */ + GenericDataFile(Types.StructType projection) { + super(projection); + } + GenericDataFile( int specId, String filePath, diff --git a/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java b/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java index 509bf4c16b03..77e0d8505af6 100644 --- a/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java +++ b/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java @@ -32,6 +32,11 @@ class GenericDeleteFile extends BaseFile implements DeleteFile { super(avroSchema); } + /** Used by internal readers to instantiate this class with a projection schema. */ + GenericDeleteFile(Types.StructType projection) { + super(projection); + } + GenericDeleteFile( int specId, FileContent content, diff --git a/core/src/main/java/org/apache/iceberg/GenericManifestEntry.java b/core/src/main/java/org/apache/iceberg/GenericManifestEntry.java index 959e2446c710..0e773c5f4399 100644 --- a/core/src/main/java/org/apache/iceberg/GenericManifestEntry.java +++ b/core/src/main/java/org/apache/iceberg/GenericManifestEntry.java @@ -38,8 +38,9 @@ class GenericManifestEntry> this.schema = schema; } - GenericManifestEntry(Types.StructType partitionType) { - this.schema = AvroSchemaUtil.convert(V1Metadata.entrySchema(partitionType), "manifest_entry"); + /** Used by internal readers to instantiate this class with a projection schema. */ + GenericManifestEntry(Types.StructType schema) { + this.schema = AvroSchemaUtil.convert(schema, "manifest_entry"); } private GenericManifestEntry(GenericManifestEntry toCopy, boolean fullCopy) { diff --git a/core/src/main/java/org/apache/iceberg/GenericManifestFile.java b/core/src/main/java/org/apache/iceberg/GenericManifestFile.java index d081e0bdd568..7707c57a6905 100644 --- a/core/src/main/java/org/apache/iceberg/GenericManifestFile.java +++ b/core/src/main/java/org/apache/iceberg/GenericManifestFile.java @@ -28,21 +28,20 @@ import org.apache.avro.generic.IndexedRecord; import org.apache.avro.specific.SpecificData.SchemaConstructable; import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.avro.SupportsIndexProjection; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.iceberg.relocated.com.google.common.base.Objects; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.types.Types; import org.apache.iceberg.util.ByteBuffers; -public class GenericManifestFile +public class GenericManifestFile extends SupportsIndexProjection implements ManifestFile, StructLike, IndexedRecord, SchemaConstructable, Serializable { private static final Schema AVRO_SCHEMA = AvroSchemaUtil.convert(ManifestFile.schema(), "manifest_file"); private static final ManifestContent[] MANIFEST_CONTENT_VALUES = ManifestContent.values(); private transient Schema avroSchema; // not final for Java serialization - private int[] fromProjectionPos; // data fields private InputFile file = null; @@ -64,28 +63,12 @@ public class GenericManifestFile /** Used by Avro reflection to instantiate this class when reading manifest files. */ public GenericManifestFile(Schema avroSchema) { + super(ManifestFile.schema().asStruct(), AvroSchemaUtil.convert(avroSchema).asStructType()); this.avroSchema = avroSchema; - - List fields = AvroSchemaUtil.convert(avroSchema).asStructType().fields(); - List allFields = ManifestFile.schema().asStruct().fields(); - - this.fromProjectionPos = new int[fields.size()]; - for (int i = 0; i < fromProjectionPos.length; i += 1) { - boolean found = false; - for (int j = 0; j < allFields.size(); j += 1) { - if (fields.get(i).fieldId() == allFields.get(j).fieldId()) { - found = true; - fromProjectionPos[i] = j; - } - } - - if (!found) { - throw new IllegalArgumentException("Cannot find projected field: " + fields.get(i)); - } - } } GenericManifestFile(InputFile file, int specId) { + super(ManifestFile.schema().columns().size()); this.avroSchema = AVRO_SCHEMA; this.file = file; this.manifestPath = file.location(); @@ -101,7 +84,6 @@ public GenericManifestFile(Schema avroSchema) { this.deletedFilesCount = null; this.deletedRowsCount = null; this.partitions = null; - this.fromProjectionPos = null; this.keyMetadata = null; } @@ -122,6 +104,7 @@ public GenericManifestFile(Schema avroSchema) { Long existingRowsCount, Integer deletedFilesCount, Long deletedRowsCount) { + super(ManifestFile.schema().columns().size()); this.avroSchema = AVRO_SCHEMA; this.manifestPath = path; this.length = length; @@ -137,7 +120,6 @@ public GenericManifestFile(Schema avroSchema) { this.deletedFilesCount = deletedFilesCount; this.deletedRowsCount = deletedRowsCount; this.partitions = partitions == null ? null : partitions.toArray(new PartitionFieldSummary[0]); - this.fromProjectionPos = null; this.keyMetadata = ByteBuffers.toByteArray(keyMetadata); } @@ -157,6 +139,7 @@ public GenericManifestFile( long deletedRowsCount, List partitions, ByteBuffer keyMetadata) { + super(ManifestFile.schema().columns().size()); this.avroSchema = AVRO_SCHEMA; this.manifestPath = path; this.length = length; @@ -172,7 +155,6 @@ public GenericManifestFile( this.deletedFilesCount = deletedFilesCount; this.deletedRowsCount = deletedRowsCount; this.partitions = partitions == null ? null : partitions.toArray(new PartitionFieldSummary[0]); - this.fromProjectionPos = null; this.keyMetadata = ByteBuffers.toByteArray(keyMetadata); } @@ -182,6 +164,7 @@ public GenericManifestFile( * @param toCopy a generic manifest file to copy. */ private GenericManifestFile(GenericManifestFile toCopy) { + super(toCopy); this.avroSchema = toCopy.avroSchema; this.manifestPath = toCopy.manifestPath; this.length = toCopy.length; @@ -204,7 +187,6 @@ private GenericManifestFile(GenericManifestFile toCopy) { } else { this.partitions = null; } - this.fromProjectionPos = toCopy.fromProjectionPos; this.keyMetadata = toCopy.keyMetadata == null ? null @@ -212,7 +194,9 @@ private GenericManifestFile(GenericManifestFile toCopy) { } /** Constructor for Java serialization. */ - GenericManifestFile() {} + GenericManifestFile() { + super(ManifestFile.schema().columns().size()); + } @Override public String path() { @@ -308,18 +292,17 @@ public int size() { } @Override - public T get(int pos, Class javaClass) { - return javaClass.cast(get(pos)); + public Object get(int pos) { + return internalGet(pos, Object.class); } @Override - public Object get(int i) { - int pos = i; - // if the schema was projected, map the incoming ordinal to the expected one - if (fromProjectionPos != null) { - pos = fromProjectionPos[i]; - } - switch (pos) { + protected T internalGet(int pos, Class javaClass) { + return javaClass.cast(getByPos(pos)); + } + + private Object getByPos(int basePos) { + switch (basePos) { case 0: return manifestPath; case 1: @@ -351,19 +334,13 @@ public Object get(int i) { case 14: return keyMetadata(); default: - throw new UnsupportedOperationException("Unknown field ordinal: " + pos); + throw new UnsupportedOperationException("Unknown field ordinal: " + basePos); } } @Override - @SuppressWarnings("unchecked") - public void set(int i, T value) { - int pos = i; - // if the schema was projected, map the incoming ordinal to the expected one - if (fromProjectionPos != null) { - pos = fromProjectionPos[i]; - } - switch (pos) { + protected void internalSet(int basePos, T value) { + switch (basePos) { case 0: // always coerce to String for Serializable this.manifestPath = value.toString(); diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java index 88587a1ebc89..ae034d592e28 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java +++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java @@ -60,7 +60,7 @@ private ManifestWriter(PartitionSpec spec, EncryptedOutputFile file, Long snapsh this.specId = spec.specId(); this.writer = newAppender(spec, this.file); this.snapshotId = snapshotId; - this.reused = new GenericManifestEntry<>(spec.partitionType()); + this.reused = new GenericManifestEntry<>(V1Metadata.entrySchema(spec.partitionType()).asStruct()); this.stats = new PartitionSummary(spec); this.keyMetadataBuffer = (file.keyMetadata() == null) ? null : file.keyMetadata().buffer(); } diff --git a/core/src/main/java/org/apache/iceberg/avro/SupportsIndexProjection.java b/core/src/main/java/org/apache/iceberg/avro/SupportsIndexProjection.java new file mode 100644 index 000000000000..d67085c916f2 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/avro/SupportsIndexProjection.java @@ -0,0 +1,88 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, + * * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * * KIND, either express or implied. See the License for the + * * specific language governing permissions and limitations + * * under the License. + * + */ + +package org.apache.iceberg.avro; + +import java.io.Serializable; +import java.util.List; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.types.Types; + +public abstract class SupportsIndexProjection implements StructLike, Serializable { + private final int[] fromProjectionPos; + + /** Noop constructor that does not project fields */ + protected SupportsIndexProjection(int size) { + this.fromProjectionPos = new int[size]; + for (int i = 0; i < fromProjectionPos.length; i++) { + fromProjectionPos[i] = i; + } + } + + /** Base constructor for building the type mapping */ + protected SupportsIndexProjection(Types.StructType baseType, Types.StructType projectionType) { + List allFields = baseType.fields(); + List fields = projectionType.fields(); + + this.fromProjectionPos = new int[fields.size()]; + for (int i = 0; i < fromProjectionPos.length; i += 1) { + boolean found = false; + for (int j = 0; j < allFields.size(); j += 1) { + if (fields.get(i).fieldId() == allFields.get(j).fieldId()) { + found = true; + fromProjectionPos[i] = j; + } + } + + if (!found) { + throw new IllegalArgumentException("Cannot find projected field: " + fields.get(i)); + } + } + } + + /** Copy constructor */ + protected SupportsIndexProjection(SupportsIndexProjection toCopy) { + this.fromProjectionPos = toCopy.fromProjectionPos; + } + + protected abstract T internalGet(int pos, Class javaClass); + + protected abstract void internalSet(int pos, T value); + + private int pos(int basePos) { + return fromProjectionPos[basePos]; + } + + @Override + public int size() { + return fromProjectionPos.length; + } + + @Override + public T get(int basePos, Class javaClass) { + return internalGet(pos(basePos), javaClass); + } + + @Override + public void set(int basePos, T value) { + internalSet(pos(basePos), value); + } +} diff --git a/core/src/test/java/org/apache/iceberg/TestBase.java b/core/src/test/java/org/apache/iceberg/TestBase.java index e03a1efd5156..46adc5d34938 100644 --- a/core/src/test/java/org/apache/iceberg/TestBase.java +++ b/core/src/test/java/org/apache/iceberg/TestBase.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.Set; import java.util.UUID; +import org.apache.iceberg.avro.AvroSchemaUtil; import org.apache.iceberg.deletes.PositionDelete; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.OutputFile; @@ -349,7 +350,24 @@ > ManifestEntry manifestEntry( Long fileSequenceNumber, F file) { - GenericManifestEntry entry = new GenericManifestEntry<>(table.spec().partitionType()); + Schema manifestEntrySchema; + switch (table.ops().current().formatVersion()) { + case 1: + manifestEntrySchema = V1Metadata.entrySchema(table.spec().partitionType()); + break; + case 2: + manifestEntrySchema = V2Metadata.entrySchema(table.spec().partitionType()); + break; + case 3: + manifestEntrySchema = V3Metadata.entrySchema(table.spec().partitionType()); + break; + default: + throw new IllegalArgumentException( + "Unsupported format version: " + table.ops().current().formatVersion()); + } + + GenericManifestEntry entry = + new GenericManifestEntry<>(AvroSchemaUtil.convert(manifestEntrySchema, "manifest_entry")); switch (status) { case ADDED: if (dataSequenceNumber != null && dataSequenceNumber != 0) {