diff --git a/api/src/main/java/org/apache/iceberg/DataFile.java b/api/src/main/java/org/apache/iceberg/DataFile.java
index 90758a3d73d0..8699a635cee1 100644
--- a/api/src/main/java/org/apache/iceberg/DataFile.java
+++ b/api/src/main/java/org/apache/iceberg/DataFile.java
@@ -22,6 +22,7 @@
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
+import org.apache.iceberg.types.Types;
 import org.apache.iceberg.types.Types.BinaryType;
 import org.apache.iceberg.types.Types.IntegerType;
 import org.apache.iceberg.types.Types.ListType;
@@ -37,31 +38,68 @@
  * Interface for files listed in a table manifest.
  */
 public interface DataFile {
+  // DataFile fields from ManifestEntry with new ids
+  Types.NestedField STATUS = optional(50, "status", Types.IntegerType.get());
+  Types.NestedField SNAPSHOT_ID = optional(51, "snapshot_id", Types.LongType.get());
+  Types.NestedField SEQUENCE_NUMBER = optional(52, "sequence_number", Types.LongType.get());
+
+  // original DataFile fields
+  Types.NestedField FILE_PATH = required(100, "file_path", StringType.get());
+  Types.NestedField FILE_FORMAT = required(101, "file_format", StringType.get());
+  Types.NestedField RECORD_COUNT = required(103, "record_count", LongType.get());
+  Types.NestedField FILE_SIZE = required(104, "file_size_in_bytes", LongType.get());
+  Types.NestedField COLUMN_SIZES = optional(108, "column_sizes", MapType.ofRequired(117, 118,
+      IntegerType.get(), LongType.get()));
+  Types.NestedField VALUE_COUNTS = optional(109, "value_counts", MapType.ofRequired(119, 120,
+      IntegerType.get(), LongType.get()));
+  Types.NestedField NULL_VALUE_COUNTS = optional(110, "null_value_counts", MapType.ofRequired(121, 122,
+      IntegerType.get(), LongType.get()));
+  Types.NestedField LOWER_BOUNDS = optional(125, "lower_bounds", MapType.ofRequired(126, 127,
+      IntegerType.get(), BinaryType.get()));
+  Types.NestedField UPPER_BOUNDS = optional(128, "upper_bounds", MapType.ofRequired(129, 130,
+      IntegerType.get(), BinaryType.get()));
+  Types.NestedField KEY_METADATA = optional(131, "key_metadata", BinaryType.get());
+  Types.NestedField SPLIT_OFFSETS = optional(132, "split_offsets", ListType.ofRequired(133, LongType.get()));
+  int PARTITION_ID = 102;
+  String PARTITION_NAME = "partition";
+  // NEXT ID TO ASSIGN: 134
+
   static StructType getType(StructType partitionType) {
     // IDs start at 100 to leave room for changes to ManifestEntry
     return StructType.of(
-        required(100, "file_path", StringType.get()),
-        required(101, "file_format", StringType.get()),
-        required(102, "partition", partitionType),
-        required(103, "record_count", LongType.get()),
-        required(104, "file_size_in_bytes", LongType.get()),
-        required(105, "block_size_in_bytes", LongType.get()),
-        optional(108, "column_sizes", MapType.ofRequired(117, 118,
-            IntegerType.get(), LongType.get())),
-        optional(109, "value_counts", MapType.ofRequired(119, 120,
-            IntegerType.get(), LongType.get())),
-        optional(110, "null_value_counts", MapType.ofRequired(121, 122,
-            IntegerType.get(), LongType.get())),
-        optional(125, "lower_bounds", MapType.ofRequired(126, 127,
-            IntegerType.get(), BinaryType.get())),
-        optional(128, "upper_bounds", MapType.ofRequired(129, 130,
-            IntegerType.get(), BinaryType.get())),
-        optional(131, "key_metadata", BinaryType.get()),
-        optional(132, "split_offsets", ListType.ofRequired(133, LongType.get()))
-        // NEXT ID TO ASSIGN: 134
+        STATUS,
+        SNAPSHOT_ID,
+        SEQUENCE_NUMBER,
+        FILE_PATH,
+        FILE_FORMAT,
+        required(PARTITION_ID, PARTITION_NAME, partitionType),
+        RECORD_COUNT,
+        FILE_SIZE,
+        COLUMN_SIZES,
+        VALUE_COUNTS,
+        NULL_VALUE_COUNTS,
+        LOWER_BOUNDS,
+        UPPER_BOUNDS,
+        KEY_METADATA,
+        SPLIT_OFFSETS
     );
   }
 
+  /**
+   * @return the status of the file, whether EXISTING, ADDED, or DELETED
+   */
+  FileStatus status();
+
+  /**
+   * @return id of the snapshot in which the file was added to the table
+   */
+  Long snapshotId();
+
+  /**
+   * @return the sequence number of the snapshot in which the file was added to the table
+   */
+  Long sequenceNumber();
+
   /**
    * @return fully qualified path to the file, suitable for constructing a Hadoop Path
    */
diff --git a/api/src/main/java/org/apache/iceberg/FileStatus.java b/api/src/main/java/org/apache/iceberg/FileStatus.java
new file mode 100644
index 000000000000..8378e3d962bd
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/FileStatus.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+/**
+ * The status of a data or delete file in a manifest:
+ *   0 - existing (added in a different manifest)
+ *   1 - added to the table in the manifest
+ *   2 - deleted from the table in the manifest
+ */
+public enum FileStatus {
+  EXISTING(0),
+  ADDED(1),
+  DELETED(2);
+
+  private final int id;
+
+  FileStatus(int id) {
+    this.id = id;
+  }
+
+  public int id() {
+    return id;
+  }
+}
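Reviewer note on FileStatus: the readers in this change decode the on-disk integer with FileStatus.values()[id], which is only safe while the declaration order stays aligned with the explicit ids. A minimal standalone sketch of that round-trip (illustrative only, not part of the change):

public class FileStatusRoundTrip {
  enum FileStatus {
    EXISTING(0),
    ADDED(1),
    DELETED(2);

    private final int id;

    FileStatus(int id) {
      this.id = id;
    }

    public int id() {
      return id;
    }
  }

  public static void main(String[] args) {
    for (FileStatus status : FileStatus.values()) {
      // decode the way GenericDataFile.setInternal does: values()[(Integer) value]
      FileStatus decoded = FileStatus.values()[status.id()];
      System.out.println(status + " -> " + status.id() + " -> " + decoded);
    }
  }
}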
diff --git a/api/src/test/java/org/apache/iceberg/TestHelpers.java b/api/src/test/java/org/apache/iceberg/TestHelpers.java
index 4989281ffe96..b2ad2215076c 100644
--- a/api/src/test/java/org/apache/iceberg/TestHelpers.java
+++ b/api/src/test/java/org/apache/iceberg/TestHelpers.java
@@ -303,6 +303,21 @@ public TestDataFile(String path, StructLike partition, long recordCount,
       this.upperBounds = upperBounds;
     }
 
+    @Override
+    public FileStatus status() {
+      return FileStatus.ADDED;
+    }
+
+    @Override
+    public Long snapshotId() {
+      return 0L;
+    }
+
+    @Override
+    public Long sequenceNumber() {
+      return 0L;
+    }
+
     @Override
     public CharSequence path() {
       return path;
diff --git a/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java b/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java
index 2ecf3b6ee2b2..17e0ff9540a9 100644
--- a/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java
+++ b/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java
@@ -29,7 +29,6 @@
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.ResidualEvaluator;
 import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.ParallelIterable;
 import org.apache.iceberg.util.ThreadPools;
 
@@ -66,13 +65,7 @@ public TableScan newScan() {
 
   @Override
   public Schema schema() {
-    Schema schema = new Schema(DataFile.getType(table.spec().partitionType()).fields());
-    if (table.spec().fields().size() < 1) {
-      // avoid returning an empty struct, which is not always supported. instead, drop the partition field (id 102)
-      return TypeUtil.selectNot(schema, Sets.newHashSet(102));
-    } else {
-      return schema;
-    }
+    return MetadataTables.filesTableSchema(table.spec().partitionType());
   }
 
   public static class AllDataFilesTableScan extends BaseAllMetadataTableScan {
diff --git a/core/src/main/java/org/apache/iceberg/AllEntriesTable.java b/core/src/main/java/org/apache/iceberg/AllEntriesTable.java
index 9c6aa8561178..122d2a7c0767 100644
--- a/core/src/main/java/org/apache/iceberg/AllEntriesTable.java
+++ b/core/src/main/java/org/apache/iceberg/AllEntriesTable.java
@@ -20,12 +20,10 @@
 package org.apache.iceberg;
 
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Sets;
 import java.util.Collection;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.ResidualEvaluator;
 import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.types.TypeUtil;
 
 /**
  * A {@link Table} implementation that exposes a table's manifest entries as rows.
@@ -59,13 +57,7 @@ public TableScan newScan() {
 
   @Override
   public Schema schema() {
-    Schema schema = ManifestEntry.getSchema(table.spec().partitionType());
-    if (table.spec().fields().size() < 1) {
-      // avoid returning an empty struct, which is not always supported. instead, drop the partition field (id 102)
-      return TypeUtil.selectNot(schema, Sets.newHashSet(102));
-    } else {
-      return schema;
-    }
+    return MetadataTables.entriesTableSchema(table.spec().partitionType());
   }
 
   private static class Scan extends BaseAllMetadataTableScan {
diff --git a/core/src/main/java/org/apache/iceberg/DataFilesTable.java b/core/src/main/java/org/apache/iceberg/DataFilesTable.java
index 53b6a97dd1c2..7ab65a035b12 100644
--- a/core/src/main/java/org/apache/iceberg/DataFilesTable.java
+++ b/core/src/main/java/org/apache/iceberg/DataFilesTable.java
@@ -21,13 +21,11 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Sets;
 import java.util.Collection;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.ResidualEvaluator;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.types.TypeUtil;
 
 /**
  * A {@link Table} implementation that exposes a table's data files as rows.
@@ -58,13 +56,7 @@ public TableScan newScan() {
 
   @Override
   public Schema schema() {
-    Schema schema = new Schema(DataFile.getType(table.spec().partitionType()).fields());
-    if (table.spec().fields().size() < 1) {
-      // avoid returning an empty struct, which is not always supported. instead, drop the partition field (id 102)
-      return TypeUtil.selectNot(schema, Sets.newHashSet(102));
-    } else {
-      return schema;
-    }
+    return MetadataTables.filesTableSchema(table.spec().partitionType());
   }
 
   public static class FilesTableScan extends BaseTableScan {
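The three metadata tables above now delegate to MetadataTables (added later in this patch), which centralizes the empty-struct workaround: when the spec is unpartitioned, the partition field (id 102) is omitted entirely rather than exposed as an empty struct. A hedged sketch of that rule, with fields modeled as plain strings (the real helper builds a Schema of Types.NestedField):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FilesTableSchemaSketch {
  static List<String> filesTableColumns(boolean partitioned) {
    List<String> columns = new ArrayList<>(Arrays.asList("file_path", "file_format"));
    if (partitioned) {
      columns.add("partition"); // field id 102, present only for partitioned specs
    }
    columns.addAll(Arrays.asList(
        "record_count", "file_size_in_bytes", "column_sizes", "value_counts",
        "null_value_counts", "lower_bounds", "upper_bounds", "key_metadata", "split_offsets"));
    return columns;
  }

  public static void main(String[] args) {
    System.out.println(filesTableColumns(true));
    System.out.println(filesTableColumns(false)); // no empty partition struct
  }
}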
diff --git a/core/src/main/java/org/apache/iceberg/GenericDataFile.java b/core/src/main/java/org/apache/iceberg/GenericDataFile.java
index aa427cbc5e16..bb28c5968391 100644
--- a/core/src/main/java/org/apache/iceberg/GenericDataFile.java
+++ b/core/src/main/java/org/apache/iceberg/GenericDataFile.java
@@ -29,6 +29,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.specific.SpecificData;
 import org.apache.iceberg.avro.AvroSchemaUtil;
@@ -46,9 +47,16 @@ public PartitionData copy() {
     }
   };
 
+  private boolean skipEntryFields;
   private int[] fromProjectionPos;
   private Types.StructType partitionType;
 
+  // ManifestEntry fields
+  private ManifestEntry asEntry = null;
+  private FileStatus status = FileStatus.ADDED;
+  private Long snapshotId = null;
+  private Long sequenceNumber = null;
+
   private String filePath = null;
   private FileFormat format = null;
   private PartitionData partitionData = null;
@@ -67,14 +75,19 @@ public PartitionData copy() {
   // cached schema
   private transient org.apache.avro.Schema avroSchema = null;
 
-  private static final long DEFAULT_BLOCK_SIZE = 64 * 1024 * 1024;
+  /**
+   * Used by Avro to instantiate this class when reading manifest files.
+   */
+  private GenericDataFile(org.apache.avro.Schema avroSchema) {
+    this(avroSchema, null);
+  }
 
   /**
-   * Used by Avro reflection to instantiate this class when reading manifest files.
+   * Used by AsManifestEntry to instantiate this class when reading manifest files.
    */
-  @SuppressWarnings("checkstyle:RedundantModifier") // Must be public
-  public GenericDataFile(org.apache.avro.Schema avroSchema) {
+  private GenericDataFile(org.apache.avro.Schema avroSchema, AsManifestEntry asEntry) {
     this.avroSchema = avroSchema;
+    this.skipEntryFields = asEntry != null;
 
     Types.StructType schema = AvroSchemaUtil.convert(avroSchema).asNestedType().asStructType();
@@ -86,6 +99,12 @@ public GenericDataFile(org.apache.avro.Schema avroSchema) {
       this.partitionType = EMPTY_STRUCT_TYPE;
     }
 
+    if (asEntry != null) {
+      this.asEntry = asEntry;
+    } else {
+      this.asEntry = new AsManifestEntry(this, partitionType);
+    }
+
     List<Types.NestedField> fields = schema.fields();
     List<Types.NestedField> allFields = DataFile.getType(partitionType).fields();
     this.fromProjectionPos = new int[fields.size()];
@@ -112,6 +131,7 @@ public GenericDataFile(org.apache.avro.Schema avroSchema) {
     this.format = format;
     this.partitionData = EMPTY_PARTITION_DATA;
     this.partitionType = EMPTY_PARTITION_DATA.getPartitionType();
+    this.asEntry = new AsManifestEntry(this, partitionType);
     this.recordCount = recordCount;
     this.fileSizeInBytes = fileSizeInBytes;
   }
@@ -122,6 +142,7 @@ public GenericDataFile(org.apache.avro.Schema avroSchema) {
     this.format = format;
     this.partitionData = partition;
     this.partitionType = partition.getPartitionType();
+    this.asEntry = new AsManifestEntry(this, partitionType);
     this.recordCount = recordCount;
     this.fileSizeInBytes = fileSizeInBytes;
   }
@@ -139,6 +160,7 @@ public GenericDataFile(org.apache.avro.Schema avroSchema) {
       this.partitionData = partition;
       this.partitionType = partition.getPartitionType();
     }
+    this.asEntry = new AsManifestEntry(this, partitionType);
 
     // this will throw NPE if metrics.recordCount is null
     this.recordCount = metrics.recordCount();
@@ -165,10 +187,14 @@ public GenericDataFile(org.apache.avro.Schema avroSchema) {
    * @param fullCopy whether to copy all fields or to drop column-level stats
    */
   private GenericDataFile(GenericDataFile toCopy, boolean fullCopy) {
+    this.status = toCopy.status;
+    this.snapshotId = toCopy.snapshotId;
+    this.sequenceNumber = toCopy.sequenceNumber;
     this.filePath = toCopy.filePath;
     this.format = toCopy.format;
     this.partitionData = toCopy.partitionData.copy();
     this.partitionType = toCopy.partitionType;
+    this.asEntry = new AsManifestEntry(this, partitionType);
     this.recordCount = toCopy.recordCount;
     this.fileSizeInBytes = toCopy.fileSizeInBytes;
     if (fullCopy) {
@@ -196,6 +222,25 @@ private GenericDataFile(GenericDataFile toCopy, boolean fullCopy) {
   GenericDataFile() {
   }
 
+  @Override
+  public FileStatus status() {
+    return status;
+  }
+
+  @Override
+  public Long snapshotId() {
+    return snapshotId;
+  }
+
+  @Override
+  public Long sequenceNumber() {
+    return sequenceNumber;
+  }
+
+  public ManifestEntry asEntry() {
+    return asEntry;
+  }
+
   @Override
   public CharSequence path() {
     return filePath;
@@ -265,52 +310,66 @@ public org.apache.avro.Schema getSchema() {
   }
 
   @Override
-  @SuppressWarnings("unchecked")
   public void put(int i, Object v) {
     int pos = i;
     // if the schema was projected, map the incoming ordinal to the expected one
     if (fromProjectionPos != null) {
       pos = fromProjectionPos[i];
     }
+
+    if (!(skipEntryFields && pos < 3)) {
+      setInternal(pos, v);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private <T> void setInternal(int pos, T value) {
     switch (pos) {
       case 0:
-        // always coerce to String for Serializable
-        this.filePath = v.toString();
+        this.status = FileStatus.values()[(Integer) value];
         return;
       case 1:
-        this.format = FileFormat.valueOf(v.toString());
+        this.snapshotId = (Long) value;
         return;
       case 2:
-        this.partitionData = (PartitionData) v;
+        this.sequenceNumber = (Long) value;
         return;
       case 3:
-        this.recordCount = (Long) v;
+        // always coerce to String for Serializable
+        this.filePath = value.toString();
         return;
       case 4:
-        this.fileSizeInBytes = (Long) v;
+        this.format = FileFormat.valueOf(value.toString());
         return;
       case 5:
+        this.partitionData = (PartitionData) value;
         return;
       case 6:
-        this.columnSizes = (Map<Integer, Long>) v;
+        this.recordCount = (Long) value;
         return;
       case 7:
-        this.valueCounts = (Map<Integer, Long>) v;
+        this.fileSizeInBytes = (Long) value;
         return;
       case 8:
-        this.nullValueCounts = (Map<Integer, Long>) v;
+        this.columnSizes = (Map<Integer, Long>) value;
         return;
       case 9:
-        this.lowerBounds = SerializableByteBufferMap.wrap((Map<Integer, ByteBuffer>) v);
+        this.valueCounts = (Map<Integer, Long>) value;
         return;
       case 10:
-        this.upperBounds = SerializableByteBufferMap.wrap((Map<Integer, ByteBuffer>) v);
+        this.nullValueCounts = (Map<Integer, Long>) value;
         return;
       case 11:
-        this.keyMetadata = ByteBuffers.toByteArray((ByteBuffer) v);
+        this.lowerBounds = SerializableByteBufferMap.wrap((Map<Integer, ByteBuffer>) value);
         return;
       case 12:
-        this.splitOffsets = (List<Long>) v;
+        this.upperBounds = SerializableByteBufferMap.wrap((Map<Integer, ByteBuffer>) value);
+        return;
+      case 13:
+        this.keyMetadata = ByteBuffers.toByteArray((ByteBuffer) value);
+        return;
+      case 14:
+        this.splitOffsets = (List<Long>) value;
         return;
       default:
         // ignore the object, it must be from a newer version of the format
@@ -329,34 +388,37 @@ public Object get(int i) {
     if (fromProjectionPos != null) {
       pos = fromProjectionPos[i];
     }
+
     switch (pos) {
       case 0:
-        return filePath;
+        return status.id();
       case 1:
-        return format != null ? format.toString() : null;
+        return snapshotId;
       case 2:
-        return partitionData;
+        return sequenceNumber;
       case 3:
-        return recordCount;
+        return filePath;
      case 4:
-        return fileSizeInBytes;
+        return format != null ? format.toString() : null;
       case 5:
-        // block_size_in_bytes is not used. However, it is a required avro field in DataFile. So
-        // to maintain compatibility, we need to return something.
-        return DEFAULT_BLOCK_SIZE;
+        return partitionData;
       case 6:
-        return columnSizes;
+        return recordCount;
       case 7:
-        return valueCounts;
+        return fileSizeInBytes;
       case 8:
-        return nullValueCounts;
+        return columnSizes;
       case 9:
-        return lowerBounds;
+        return valueCounts;
       case 10:
-        return upperBounds;
+        return nullValueCounts;
       case 11:
-        return keyMetadata();
+        return lowerBounds;
       case 12:
+        return upperBounds;
+      case 13:
+        return keyMetadata();
+      case 14:
         return splitOffsets;
       default:
         throw new UnsupportedOperationException("Unknown field ordinal: " + pos);
@@ -377,12 +439,15 @@ private static org.apache.avro.Schema getAvroSchema(Types.StructType partitionTy
 
   @Override
   public int size() {
-    return 13;
+    return 15;
   }
 
   @Override
   public String toString() {
     return MoreObjects.toStringHelper(this)
+        .add("status", status)
+        .add("snapshot_id", snapshotId)
+        .add("sequence_number", sequenceNumber)
         .add("file_path", filePath)
         .add("file_format", format)
         .add("partition", partitionData)
@@ -399,12 +464,12 @@ public String toString() {
   }
 
   @Override
-  public DataFile copyWithoutStats() {
+  public GenericDataFile copyWithoutStats() {
     return new GenericDataFile(this, false /* drop stats */);
   }
 
   @Override
-  public DataFile copy() {
+  public GenericDataFile copy() {
     return new GenericDataFile(this, true /* full copy */);
   }
 
@@ -425,4 +490,147 @@ private static <E> List<E> copy(List<E> list) {
     }
     return null;
   }
+
+  /**
+   * An adapter that makes a DataFile appear like a ManifestEntry for v1 metadata files.
+   */
+  static class AsManifestEntry
+      implements ManifestEntry, IndexedRecord, SpecificData.SchemaConstructable, StructLike, Serializable {
+    private transient Schema avroSchema = null;
+    private GenericDataFile file = null;
+
+    /**
+     * Used by Avro reflection to instantiate this class when reading manifest files.
+     *
+     * @param avroSchema an Avro read schema
+     */
+    protected AsManifestEntry(Schema avroSchema) {
+      this.avroSchema = avroSchema;
+      this.file = new GenericDataFile(avroSchema.getField("data_file").schema(), this);
+    }
+
+    /**
+     * Used by DataFile to create a ManifestEntry adapter.
+     *
+     * @param file a GenericDataFile that contains manifest entry data
+     */
+    protected AsManifestEntry(GenericDataFile file, Types.StructType partitionType) {
+      this.avroSchema = AvroSchemaUtil.convert(ManifestEntry.getSchema(partitionType), "manifest_entry");
+      this.file = file;
+    }
+
+    /**
+     * Constructor for Java serialization.
+     */
+    AsManifestEntry() {
+    }
+
+    @Override
+    public Schema getSchema() {
+      return avroSchema;
+    }
+
+    @Override
+    public Status status() {
+      return ManifestEntry.Status.values()[file.status.id()];
+    }
+
+    @Override
+    public Long snapshotId() {
+      return file.snapshotId;
+    }
+
+    @Override
+    public void setSnapshotId(long snapshotId) {
+      file.snapshotId = snapshotId;
+    }
+
+    @Override
+    public Long sequenceNumber() {
+      return file.sequenceNumber;
+    }
+
+    @Override
+    public void setSequenceNumber(long sequenceNumber) {
+      file.sequenceNumber = sequenceNumber;
+    }
+
+    @Override
+    public DataFile file() {
+      return file;
+    }
+
+    @Override
+    public ManifestEntry copy() {
+      return file.copy().asEntry;
+    }
+
+    @Override
+    public ManifestEntry copyWithoutStats() {
+      return file.copyWithoutStats().asEntry;
+    }
+
+    @Override
+    public void put(int pos, Object value) {
+      switch (pos) {
+        case 0:
+          file.status = FileStatus.values()[(Integer) value];
+          return;
+        case 1:
+          file.snapshotId = (Long) value;
+          return;
+        case 2:
+          file.sequenceNumber = (Long) value;
+          return;
+        case 3:
+          if (file != value) {
+            throw new IllegalArgumentException("Cannot replace data file");
+          }
+          return;
+        default:
+          // ignore the object, it must be from a newer version of the format
+      }
+    }
+
+    @Override
+    public <T> void set(int pos, T value) {
+      put(pos, value);
+    }
+
+    @Override
+    public Object get(int i) {
+      switch (i) {
+        case 0:
+          return file.status.id();
+        case 1:
+          return file.snapshotId;
+        case 2:
+          return file.sequenceNumber;
+        case 3:
+          return file;
+        default:
+          throw new UnsupportedOperationException("Unknown field ordinal: " + i);
+      }
+    }
+
+    @Override
+    public <T> T get(int pos, Class<T> javaClass) {
+      return javaClass.cast(get(pos));
+    }
+
+    @Override
+    public int size() {
+      return avroSchema.getFields().size();
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(this)
+          .add("status", file.status)
+          .add("snapshot_id", file.snapshotId)
+          .add("sequence_number", file.sequenceNumber)
+          .add("file", file)
+          .toString();
+    }
+  }
 }
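The AsManifestEntry adapter above gives one GenericDataFile two interfaces over the same state: v2 manifests are read as data-file rows and exposed as entries via asEntry(), and a write through either view lands on the same fields. A reduced, standalone illustration of that pattern (names simplified, not the real classes):

public class EntryViewSketch {
  static class DataFileRecord {
    int status = 1; // ADDED
    Long snapshotId = null;
    final EntryView asEntry = new EntryView(this);
  }

  // the view holds no state of its own; every accessor delegates to the backing file
  static class EntryView {
    private final DataFileRecord file;

    EntryView(DataFileRecord file) {
      this.file = file;
    }

    Long snapshotId() {
      return file.snapshotId;
    }

    void setSnapshotId(long snapshotId) {
      file.snapshotId = snapshotId; // writes through to the shared record
    }
  }

  public static void main(String[] args) {
    DataFileRecord file = new DataFileRecord();
    file.asEntry.setSnapshotId(42L);
    System.out.println(file.snapshotId); // 42: one object, two interfaces
  }
}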
diff --git a/core/src/main/java/org/apache/iceberg/GenericManifestEntry.java b/core/src/main/java/org/apache/iceberg/GenericManifestEntry.java
index 9fc29a62bbff..49a812cc8239 100644
--- a/core/src/main/java/org/apache/iceberg/GenericManifestEntry.java
+++ b/core/src/main/java/org/apache/iceberg/GenericManifestEntry.java
@@ -19,182 +19,21 @@
 
 package org.apache.iceberg;
 
-import com.google.common.base.MoreObjects;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.avro.specific.SpecificData;
-import org.apache.iceberg.avro.AvroSchemaUtil;
-import org.apache.iceberg.types.Types;
-
-class GenericManifestEntry implements ManifestEntry, IndexedRecord, SpecificData.SchemaConstructable, StructLike {
-  private final org.apache.avro.Schema schema;
-  private final V1Metadata.IndexedDataFile fileWrapper;
-  private Status status = Status.EXISTING;
-  private Long snapshotId = null;
-  private Long sequenceNumber = null;
-  private DataFile file = null;
-
-  GenericManifestEntry(org.apache.avro.Schema schema) {
-    this.schema = schema;
-    this.fileWrapper = null; // do not use the file wrapper to read
-  }
-
-  GenericManifestEntry(Types.StructType partitionType) {
-    this.schema = AvroSchemaUtil.convert(V1Metadata.entrySchema(partitionType), "manifest_entry");
-    this.fileWrapper = new V1Metadata.IndexedDataFile(schema.getField("data_file").schema());
-  }
-
-  private GenericManifestEntry(GenericManifestEntry toCopy, boolean fullCopy) {
-    this.schema = toCopy.schema;
-    this.fileWrapper = new V1Metadata.IndexedDataFile(schema.getField("data_file").schema());
-    this.status = toCopy.status;
-    this.snapshotId = toCopy.snapshotId;
-    if (fullCopy) {
-      this.file = toCopy.file().copy();
-    } else {
-      this.file = toCopy.file().copyWithoutStats();
-    }
-  }
-
-  ManifestEntry wrapExisting(Long newSnapshotId, Long newSequenceNumber, DataFile newFile) {
-    this.status = Status.EXISTING;
-    this.snapshotId = newSnapshotId;
-    this.sequenceNumber = newSequenceNumber;
-    this.file = newFile;
-    return this;
-  }
-
-  ManifestEntry wrapAppend(Long newSnapshotId, DataFile newFile) {
-    this.status = Status.ADDED;
-    this.snapshotId = newSnapshotId;
-    this.sequenceNumber = null;
-    this.file = newFile;
-    return this;
-  }
-
-  ManifestEntry wrapDelete(Long newSnapshotId, DataFile newFile) {
-    this.status = Status.DELETED;
-    this.snapshotId = newSnapshotId;
-    this.sequenceNumber = null;
-    this.file = newFile;
-    return this;
-  }
-
-  /**
-   * @return the status of the file, whether EXISTING, ADDED, or DELETED
-   */
-  @Override
-  public Status status() {
-    return status;
-  }
+import org.apache.avro.Schema;
 
+class GenericManifestEntry extends GenericDataFile.AsManifestEntry {
   /**
-   * @return id of the snapshot in which the file was added to the table
+   * Used by Avro reflection to instantiate this class when reading manifest files.
+   *
+   * @param avroSchema an Avro read schema
    */
-  @Override
-  public Long snapshotId() {
-    return snapshotId;
-  }
-
-  @Override
-  public Long sequenceNumber() {
-    return sequenceNumber;
+  GenericManifestEntry(Schema avroSchema) {
+    super(avroSchema);
   }
 
   /**
-   * @return a file
+   * Constructor for Java serialization.
   */
-  @Override
-  public DataFile file() {
-    return file;
-  }
-
-  @Override
-  public ManifestEntry copy() {
-    return new GenericManifestEntry(this, true /* full copy */);
-  }
-
-  @Override
-  public ManifestEntry copyWithoutStats() {
-    return new GenericManifestEntry(this, false /* drop stats */);
-  }
-
-  @Override
-  public void setSnapshotId(long newSnapshotId) {
-    this.snapshotId = newSnapshotId;
-  }
-
-  @Override
-  public void setSequenceNumber(long newSequenceNumber) {
-    this.sequenceNumber = newSequenceNumber;
-  }
-
-  @Override
-  public void put(int i, Object v) {
-    switch (i) {
-      case 0:
-        this.status = Status.values()[(Integer) v];
-        return;
-      case 1:
-        this.snapshotId = (Long) v;
-        return;
-      case 2:
-        this.sequenceNumber = (Long) v;
-        return;
-      case 3:
-        this.file = (DataFile) v;
-        return;
-      default:
-        // ignore the object, it must be from a newer version of the format
-    }
-  }
-
-  @Override
-  public <T> void set(int pos, T value) {
-    put(pos, value);
-  }
-
-  @Override
-  public Object get(int i) {
-    switch (i) {
-      case 0:
-        return status.id();
-      case 1:
-        return snapshotId;
-      case 2:
-        return sequenceNumber;
-      case 3:
-        if (fileWrapper == null || file instanceof GenericDataFile) {
-          return file;
-        } else {
-          return fileWrapper.wrap(file);
-        }
-      default:
-        throw new UnsupportedOperationException("Unknown field ordinal: " + i);
-    }
-  }
-
-  @Override
-  public <T> T get(int pos, Class<T> javaClass) {
-    return javaClass.cast(get(pos));
-  }
-
-  @Override
-  public org.apache.avro.Schema getSchema() {
-    return schema;
-  }
-
-  @Override
-  public int size() {
-    return 4;
-  }
-
-  @Override
-  public String toString() {
-    return MoreObjects.toStringHelper(this)
-        .add("status", status)
-        .add("snapshot_id", snapshotId)
-        .add("sequence_number", sequenceNumber)
-        .add("file", file)
-        .toString();
+  GenericManifestEntry() {
   }
 }
diff --git a/core/src/main/java/org/apache/iceberg/InheritableMetadata.java b/core/src/main/java/org/apache/iceberg/InheritableMetadata.java
index 5eebe8a01861..7cc09ddf01f3 100644
--- a/core/src/main/java/org/apache/iceberg/InheritableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/InheritableMetadata.java
@@ -23,4 +23,9 @@
 
 interface InheritableMetadata extends Serializable {
   ManifestEntry apply(ManifestEntry manifestEntry);
+
+  default DataFile apply(GenericDataFile file) {
+    apply(file.asEntry());
+    return file;
+  }
 }
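Note on the InheritableMetadata change above: adding the DataFile overload as a default method keeps every existing implementation source-compatible, since they only ever implemented apply(ManifestEntry); the new overload routes the file through its entry view. A simplified, standalone illustration (types swapped for plain Java ones):

public class InheritableSketch {
  interface InheritableMetadata {
    void apply(StringBuilder entry); // stands in for apply(ManifestEntry)

    // new overload expressed as a default, so existing implementations inherit it for free
    default StringBuilder applyToFile(StringBuilder file) {
      apply(file); // the real code calls apply(file.asEntry())
      return file;
    }
  }

  public static void main(String[] args) {
    InheritableMetadata assignSnapshot = entry -> entry.append(" snapshot=42");
    System.out.println(assignSnapshot.applyToFile(new StringBuilder("data-file.avro")));
  }
}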
diff --git a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
index d2458f80a93c..2e2954484061 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
@@ -21,13 +21,11 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Sets;
 import java.util.Collection;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.ResidualEvaluator;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.types.TypeUtil;
 
 /**
  * A {@link Table} implementation that exposes a table's manifest entries as rows.
@@ -61,13 +59,7 @@ public TableScan newScan() {
 
   @Override
   public Schema schema() {
-    Schema schema = ManifestEntry.getSchema(table.spec().partitionType());
-    if (table.spec().fields().size() < 1) {
-      // avoid returning an empty struct, which is not always supported. instead, drop the partition field (id 102)
-      return TypeUtil.selectNot(schema, Sets.newHashSet(102));
-    } else {
-      return schema;
-    }
+    return MetadataTables.entriesTableSchema(table.spec().partitionType());
   }
 
   private static class EntriesTableScan extends BaseTableScan {
@@ -129,7 +121,7 @@ static class ManifestReadTask extends BaseFileScanTask implements DataTask {
     public CloseableIterable<StructLike> rows() {
       return CloseableIterable.transform(
           ManifestFiles.read(manifest, io).project(fileSchema).allEntries(),
-          file -> (GenericManifestEntry) file);
+          GenericDataFile.AsManifestEntry.class::cast);
     }
 
     @Override
diff --git a/core/src/main/java/org/apache/iceberg/ManifestGroup.java b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
index 767fae734766..7f3823d08309 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestGroup.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
@@ -183,7 +183,8 @@ private Iterable<CloseableIterable<ManifestEntry>> entries(
           spec, caseSensitive);
     });
 
-    Evaluator evaluator = new Evaluator(DataFile.getType(EMPTY_STRUCT), fileFilter, caseSensitive);
+    Schema fileProjection = new Schema(DataFile.getType(EMPTY_STRUCT).fields()).select(columns);
+    Evaluator evaluator = new Evaluator(fileProjection.asStruct(), fileFilter, caseSensitive);
     Iterable<ManifestFile> matchingManifests = evalCache == null ? manifests :
         Iterables.filter(manifests, manifest -> evalCache.get(manifest.partitionSpecId()).eval(manifest));
diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java b/core/src/main/java/org/apache/iceberg/ManifestReader.java
index 045ebe17da50..c89144c3c715 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestReader.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java
@@ -38,6 +38,7 @@
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PropertyUtil;
 
 import static org.apache.iceberg.expressions.Expressions.alwaysTrue;
 
@@ -86,6 +87,7 @@ public static ManifestReader read(InputFile file, Function<Integer, PartitionSp
 
   private final Map<String, String> metadata;
+  private final int formatVersion;
   private final PartitionSpec spec;
   private final Schema fileSchema;
@@ -104,15 +106,15 @@ private ManifestReader(InputFile file, Function<Integer, PartitionSpec> specLook
     this.inheritableMetadata = inheritableMetadata;
 
     try {
-      try (AvroIterable<ManifestEntry> headerReader = Avro.read(file)
-          .project(ManifestEntry.getSchema(Types.StructType.of()).select("status"))
-          .build()) {
+      try (AvroIterable<ManifestEntry> headerReader = Avro.read(file).project(new Schema()).build()) {
         this.metadata = headerReader.getMetadata();
       }
     } catch (IOException e) {
      throw new RuntimeIOException(e);
     }
 
+    // TODO: is there a better way to get the format version? maybe it should be tracked in ManifestFile
+    this.formatVersion = PropertyUtil.propertyAsInt(metadata, "format-version", 1);
     int specId = TableMetadata.INITIAL_SPEC_ID;
     String specProperty = metadata.get("partition-spec-id");
     if (specProperty != null) {
@@ -214,16 +216,31 @@ CloseableIterable<ManifestEntry> entries(Schema fileProjection) {
     switch (format) {
       case AVRO:
-        AvroIterable<ManifestEntry> reader = Avro.read(file)
-            .project(ManifestEntry.wrapFileSchema(fileProjection.asStruct()))
-            .rename("manifest_entry", GenericManifestEntry.class.getName())
-            .rename("partition", PartitionData.class.getName())
-            .rename("r102", PartitionData.class.getName())
-            .rename("data_file", GenericDataFile.class.getName())
-            .rename("r2", GenericDataFile.class.getName())
-            .classLoader(GenericManifestFile.class.getClassLoader())
-            .reuseContainers()
-            .build();
+        CloseableIterable<ManifestEntry> reader;
+        if (formatVersion < 2) {
+          reader = Avro.read(file)
+              .project(ManifestEntry.wrapFileSchema(fileProjection.asStruct()))
+              .rename("manifest_entry", GenericManifestEntry.class.getName())
+              .rename("partition", PartitionData.class.getName())
+              .rename("r102", PartitionData.class.getName())
+              .rename("data_file", GenericDataFile.class.getName())
+              .rename("r2", GenericDataFile.class.getName())
+              .classLoader(GenericManifestFile.class.getClassLoader())
+              .reuseContainers()
+              .build();
+        } else {
+          AvroIterable<GenericDataFile> files = Avro.read(file)
+              .project(addRequiredColumns(fileProjection))
+              .rename("partition", PartitionData.class.getName())
+              .rename("r102", PartitionData.class.getName())
+              .rename("data_file", GenericDataFile.class.getName())
+              .rename("r2", GenericDataFile.class.getName())
+              .classLoader(GenericManifestFile.class.getClassLoader())
+              .reuseContainers()
+              .build();
+
+          reader = CloseableIterable.transform(files, GenericDataFile::asEntry);
+        }
 
         addCloseable(reader);
@@ -247,4 +264,27 @@ Iterator<DataFile> iterator(Schema fileProjection) {
         ManifestEntry::file).iterator();
   }
 
+  private static Schema addRequiredColumns(Schema fileProjection) {
+    List<Types.NestedField> columns = Lists.newArrayList();
+
+    if (fileProjection.findField(DataFile.STATUS.fieldId()) == null) {
+      columns.add(DataFile.STATUS);
+    }
+
+    if (fileProjection.findField(DataFile.SNAPSHOT_ID.fieldId()) == null) {
+      columns.add(DataFile.SNAPSHOT_ID);
+    }
+
+    if (fileProjection.findField(DataFile.SEQUENCE_NUMBER.fieldId()) == null) {
+      columns.add(DataFile.SEQUENCE_NUMBER);
+    }
+
+    if (columns.isEmpty()) {
+      return fileProjection;
+    }
+
+    columns.addAll(fileProjection.columns());
+
+    return new Schema(columns);
+  }
 }
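The v2 read path above detects the manifest's version from the Avro file metadata ("format-version", defaulting to 1) and then widens the caller's projection so that the entry fields can always be materialized. A standalone sketch of the addRequiredColumns fix-up, modeling columns as plain strings (the real code works on Schema and field ids):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RequiredColumnsSketch {
  private static final List<String> ENTRY_COLUMNS = Arrays.asList("status", "snapshot_id", "sequence_number");

  static List<String> addRequiredColumns(List<String> projection) {
    List<String> columns = new ArrayList<>();
    for (String required : ENTRY_COLUMNS) {
      if (!projection.contains(required)) {
        columns.add(required); // prepend any entry field the projection is missing
      }
    }
    if (columns.isEmpty()) {
      return projection; // nothing to add; keep the caller's schema object
    }
    columns.addAll(projection);
    return columns;
  }

  public static void main(String[] args) {
    System.out.println(addRequiredColumns(Arrays.asList("file_path", "record_count")));
    System.out.println(addRequiredColumns(Arrays.asList("status", "snapshot_id", "sequence_number")));
  }
}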
diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
index ddd74c8a3e1d..14847f30ba0f 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
@@ -52,11 +52,10 @@ public static ManifestWriter write(PartitionSpec spec, OutputFile outputFile) {
 
   private final OutputFile file;
   private final int specId;
-  private final FileAppender<ManifestEntry> writer;
   private final Long snapshotId;
-  private final GenericManifestEntry reused;
   private final PartitionSummary stats;
 
+  private FileAppender<?> writer = null;
   private boolean closed = false;
   private int addedFiles = 0;
   private long addedRows = 0L;
@@ -69,36 +68,48 @@ public static ManifestWriter write(PartitionSpec spec, OutputFile outputFile) {
   private ManifestWriter(PartitionSpec spec, OutputFile file, Long snapshotId) {
     this.file = file;
     this.specId = spec.specId();
-    this.writer = newAppender(spec, file);
     this.snapshotId = snapshotId;
-    this.reused = new GenericManifestEntry(spec.partitionType());
     this.stats = new PartitionSummary(spec);
   }
 
-  protected abstract ManifestEntry prepare(ManifestEntry entry);
-
-  protected abstract FileAppender<ManifestEntry> newAppender(PartitionSpec spec, OutputFile outputFile);
-
   void addEntry(ManifestEntry entry) {
     switch (entry.status()) {
+      case ADDED:
+        add(entry);
+        break;
+      case EXISTING:
+        existing(entry);
+        break;
+      case DELETED:
+        delete(entry);
+        break;
+    }
+  }
+
+  protected void updateStats(FileStatus status, Long sequenceNumber, DataFile dataFile) {
+    switch (status) {
       case ADDED:
         addedFiles += 1;
-        addedRows += entry.file().recordCount();
+        addedRows += dataFile.recordCount();
         break;
       case EXISTING:
         existingFiles += 1;
-        existingRows += entry.file().recordCount();
+        existingRows += dataFile.recordCount();
         break;
      case DELETED:
        deletedFiles += 1;
-        deletedRows += entry.file().recordCount();
+        deletedRows += dataFile.recordCount();
        break;
    }
-    stats.update(entry.file().partition());
-    if (entry.sequenceNumber() != null && (minSequenceNumber == null || entry.sequenceNumber() < minSequenceNumber)) {
-      this.minSequenceNumber = entry.sequenceNumber();
+    stats.update(dataFile.partition());
+    if (sequenceNumber != null && (minSequenceNumber == null || sequenceNumber < minSequenceNumber)) {
+      this.minSequenceNumber = sequenceNumber;
     }
-    writer.add(prepare(entry));
+  }
+
+  protected <F> FileAppender<F> setWriter(FileAppender<F> appender) {
+    this.writer = appender;
+    return appender;
   }
 
   /**
@@ -109,12 +120,17 @@ void addEntry(ManifestEntry entry) {
    * @param addedFile a data file
    */
   @Override
-  public void add(DataFile addedFile) {
-    addEntry(reused.wrapAppend(snapshotId, addedFile));
-  }
+  public abstract void add(DataFile addedFile);
 
-  void add(ManifestEntry entry) {
-    addEntry(reused.wrapAppend(snapshotId, entry.file()));
+  abstract void add(ManifestEntry entry);
+
+  /**
+   * Add an existing entry for a data file.
+   *
+   * @param existingFile a data file
+   */
+  public void existing(DataFile existingFile) {
+    existing(existingFile, existingFile.snapshotId(), existingFile.sequenceNumber());
   }
 
   /**
@@ -124,13 +140,9 @@ void add(ManifestEntry entry) {
    * @param fileSnapshotId snapshot ID when the data file was added to the table
    * @param sequenceNumber sequence number for the data file
    */
-  public void existing(DataFile existingFile, long fileSnapshotId, long sequenceNumber) {
-    addEntry(reused.wrapExisting(fileSnapshotId, sequenceNumber, existingFile));
-  }
+  public abstract void existing(DataFile existingFile, long fileSnapshotId, long sequenceNumber);
 
-  void existing(ManifestEntry entry) {
-    addEntry(reused.wrapExisting(entry.snapshotId(), entry.sequenceNumber(), entry.file()));
-  }
+  abstract void existing(ManifestEntry entry);
 
   /**
    * Add a delete entry for a data file.
@@ -139,15 +151,9 @@ void existing(ManifestEntry entry) {
    *
    * @param deletedFile a data file
    */
-  public void delete(DataFile deletedFile) {
-    addEntry(reused.wrapDelete(snapshotId, deletedFile));
-  }
+  public abstract void delete(DataFile deletedFile);
 
-  void delete(ManifestEntry entry) {
-    // Use the current Snapshot ID for the delete. It is safe to delete the data file from disk
-    // when this Snapshot has been removed or when there are no Snapshots older than this one.
-    addEntry(reused.wrapDelete(snapshotId, entry.file()));
-  }
+  abstract void delete(ManifestEntry entry);
 
   @Override
   public Metrics metrics() {
@@ -175,25 +181,65 @@ public void close() throws IOException {
   }
 
   static class V2Writer extends ManifestWriter {
-    private V2Metadata.IndexedManifestEntry entryWrapper;
+    private final FileAppender<DataFile> writer;
+    private V2Metadata.IndexedDataFile fileWrapper;
+    private final Long snapshotId;
 
     V2Writer(PartitionSpec spec, OutputFile file, Long snapshotId) {
       super(spec, file, snapshotId);
-      this.entryWrapper = new V2Metadata.IndexedManifestEntry(snapshotId, spec.partitionType());
+      this.writer = setWriter(newAppender(spec, file));
+      this.fileWrapper = new V2Metadata.IndexedDataFile(snapshotId, spec.partitionType());
+      this.snapshotId = snapshotId;
     }
 
     @Override
-    protected ManifestEntry prepare(ManifestEntry entry) {
-      return entryWrapper.wrap(entry);
+    public void add(DataFile addedFile) {
+      updateStats(FileStatus.ADDED, addedFile.sequenceNumber(), addedFile);
+      fileWrapper.wrapAppend(snapshotId, addedFile);
+      writer.add(fileWrapper);
     }
 
     @Override
-    protected FileAppender<ManifestEntry> newAppender(PartitionSpec spec, OutputFile file) {
-      Schema manifestSchema = V2Metadata.entrySchema(spec.partitionType());
+    void add(ManifestEntry entry) {
+      updateStats(FileStatus.ADDED, entry.sequenceNumber(), entry.file());
+      fileWrapper.wrapAppend(snapshotId, entry.file());
+      writer.add(fileWrapper);
+    }
+
+    @Override
+    public void existing(DataFile existingFile, long existingSnapshotId, long existingSequenceNumber) {
+      updateStats(FileStatus.EXISTING, existingSequenceNumber, existingFile);
+      fileWrapper.wrapExisting(existingSnapshotId, existingSequenceNumber, existingFile);
+      writer.add(fileWrapper);
+    }
+
+    @Override
+    void existing(ManifestEntry entry) {
+      updateStats(FileStatus.EXISTING, entry.sequenceNumber(), entry.file());
+      fileWrapper.wrapExisting(entry.snapshotId(), entry.sequenceNumber(), entry.file());
+      writer.add(fileWrapper);
+    }
+
+    @Override
+    public void delete(DataFile deletedFile) {
+      updateStats(FileStatus.DELETED, deletedFile.sequenceNumber(), deletedFile);
+      fileWrapper.wrapDelete(snapshotId, deletedFile.sequenceNumber(), deletedFile);
+      writer.add(fileWrapper);
+    }
+
+    @Override
+    void delete(ManifestEntry entry) {
+      updateStats(FileStatus.DELETED, entry.sequenceNumber(), entry.file());
+      fileWrapper.wrapDelete(snapshotId, entry.sequenceNumber(), entry.file());
+      writer.add(fileWrapper);
+    }
+
+    protected FileAppender<DataFile> newAppender(PartitionSpec spec, OutputFile file) {
+      Schema manifestSchema = V2Metadata.manifestSchema(spec.partitionType());
       try {
         return Avro.write(file)
             .schema(manifestSchema)
-            .named("manifest_entry")
+            .named("data_file")
             .meta("schema", SchemaParser.toJson(spec.schema()))
             .meta("partition-spec", PartitionSpecParser.toJsonFields(spec))
             .meta("partition-spec-id", String.valueOf(spec.specId()))
@@ -207,20 +253,60 @@ protected FileAppender<ManifestEntry> newAppender(PartitionSpec spec, OutputFile
   }
 
   static class V1Writer extends ManifestWriter {
-    private V1Metadata.IndexedManifestEntry entryWrapper;
+    private final FileAppender<ManifestEntry> writer;
+    private final V1Metadata.IndexedManifestEntry entryWrapper;
+    private final Long snapshotId;
 
     V1Writer(PartitionSpec spec, OutputFile file, Long snapshotId) {
       super(spec, file, snapshotId);
+      this.writer = setWriter(newAppender(spec, file));
       this.entryWrapper = new V1Metadata.IndexedManifestEntry(spec.partitionType());
+      this.snapshotId = snapshotId;
+    }
+
+    private void updateStats(ManifestEntry entry) {
+      updateStats(FileStatus.values()[entry.status().id()], entry.sequenceNumber(), entry.file());
     }
 
     @Override
-    protected ManifestEntry prepare(ManifestEntry entry) {
-      return entryWrapper.wrap(entry);
+    public void add(DataFile addedFile) {
+      updateStats(entryWrapper.wrapAppend(snapshotId, addedFile));
+      writer.add(entryWrapper);
     }
 
     @Override
-    protected FileAppender<ManifestEntry> newAppender(PartitionSpec spec, OutputFile file) {
+    void add(ManifestEntry entry) {
+      updateStats(entryWrapper.wrapAppend(snapshotId, entry.file()));
+      writer.add(entryWrapper);
+    }
+
+    @Override
+    public void existing(DataFile existingFile, long existingSnapshotId, long existingSequenceNumber) {
+      updateStats(entryWrapper.wrapExisting(existingSnapshotId, existingSequenceNumber, existingFile));
+      writer.add(entryWrapper);
+    }
+
+    @Override
+    void existing(ManifestEntry entry) {
+      updateStats(entryWrapper.wrapExisting(entry.snapshotId(), entry.sequenceNumber(), entry.file()));
+      writer.add(entryWrapper);
+    }
+
+    @Override
+    public void delete(DataFile deletedFile) {
+      updateStats(entryWrapper.wrapDelete(snapshotId, deletedFile.sequenceNumber(), deletedFile));
+      writer.add(entryWrapper);
+    }
+
+    @Override
+    void delete(ManifestEntry entry) {
+      // Use the current Snapshot ID for the delete. It is safe to delete the data file from disk
+      // when this Snapshot has been removed or when there are no Snapshots older than this one.
+      updateStats(entryWrapper.wrapDelete(snapshotId, entry.sequenceNumber(), entry.file()));
+      writer.add(entryWrapper);
+    }
+
+    private FileAppender<ManifestEntry> newAppender(PartitionSpec spec, OutputFile file) {
       Schema manifestSchema = V1Metadata.entrySchema(spec.partitionType());
       try {
         return Avro.write(file)
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index 9c83ebfbcf10..1c0e4cc27b7e 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -652,7 +652,7 @@ private ManifestFile createManifest(int specId, List<ManifestFile> bin) throws I
       return mergeManifests.get(bin);
     }
 
-    ManifestWriter writer = newManifestWriter(ops.current().spec());
+    ManifestWriter writer = newManifestWriter(ops.current().specsById().get(specId));
     try {
       for (ManifestFile manifest : bin) {
         try (ManifestReader reader = ManifestFiles.read(manifest, ops.io(), ops.current().specsById())) {
@@ -661,11 +661,11 @@ private ManifestFile createManifest(int specId, List<ManifestFile> bin) throws I
             // suppress deletes from previous snapshots. only files deleted by this snapshot
             // should be added to the new manifest
             if (entry.snapshotId() == snapshotId()) {
-              writer.addEntry(entry);
+              writer.delete(entry);
             }
           } else if (entry.status() == Status.ADDED && entry.snapshotId() == snapshotId()) {
             // adds from this snapshot are still adds, otherwise they should be existing
-            writer.addEntry(entry);
+            writer.add(entry);
           } else {
             // add all files from the old manifest as existing files
             writer.existing(entry);
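The one-line MergingSnapshotProducer change is a correctness fix worth calling out: a merge bin is grouped by partition spec id, so the rewriting writer must be built from that bin's spec, not the table's current spec. A toy illustration (the Map stands in for TableMetadata's spec lookup; values are illustrative):

import java.util.HashMap;
import java.util.Map;

public class SpecLookupSketch {
  public static void main(String[] args) {
    Map<Integer, String> specsById = new HashMap<>();
    specsById.put(0, "spec-0: unpartitioned");
    specsById.put(1, "spec-1: bucket(id, 16)");
    int currentSpecId = 1;

    int manifestSpecId = 0; // a bin of manifests written under the old spec
    String wrong = specsById.get(currentSpecId);  // old behavior: always the current spec
    String right = specsById.get(manifestSpecId); // fixed behavior: the bin's own spec
    System.out.println("wrong: " + wrong + ", right: " + right);
  }
}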
diff --git a/core/src/main/java/org/apache/iceberg/MetadataTables.java b/core/src/main/java/org/apache/iceberg/MetadataTables.java
new file mode 100644
index 000000000000..5d2b0a5b07ae
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/MetadataTables.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.types.Types;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+class MetadataTables {
+  private MetadataTables() {
+  }
+
+  static Schema entriesTableSchema(Types.StructType partitionType) {
+    return ManifestEntry.wrapFileSchema(filesTableType(partitionType));
+  }
+
+  static Schema filesTableSchema(Types.StructType partitionType) {
+    return new Schema(filesTableType(partitionType).fields());
+  }
+
+  private static Types.StructType filesTableType(Types.StructType partitionType) {
+    if (partitionType.fields().size() > 0) {
+      return Types.StructType.of(
+          DataFile.FILE_PATH,
+          DataFile.FILE_FORMAT,
+          required(DataFile.PARTITION_ID, DataFile.PARTITION_NAME, partitionType),
+          DataFile.RECORD_COUNT,
+          DataFile.FILE_SIZE,
+          DataFile.COLUMN_SIZES,
+          DataFile.VALUE_COUNTS,
+          DataFile.NULL_VALUE_COUNTS,
+          DataFile.LOWER_BOUNDS,
+          DataFile.UPPER_BOUNDS,
+          DataFile.KEY_METADATA,
+          DataFile.SPLIT_OFFSETS
+      );
+    } else {
+      return Types.StructType.of(
+          DataFile.FILE_PATH,
+          DataFile.FILE_FORMAT,
+          DataFile.RECORD_COUNT,
+          DataFile.FILE_SIZE,
+          DataFile.COLUMN_SIZES,
+          DataFile.VALUE_COUNTS,
+          DataFile.NULL_VALUE_COUNTS,
+          DataFile.LOWER_BOUNDS,
+          DataFile.UPPER_BOUNDS,
+          DataFile.KEY_METADATA,
+          DataFile.SPLIT_OFFSETS
+      );
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/V1Metadata.java b/core/src/main/java/org/apache/iceberg/V1Metadata.java
index a905b4d6ec55..9f3b7c07018e 100644
--- a/core/src/main/java/org/apache/iceberg/V1Metadata.java
+++ b/core/src/main/java/org/apache/iceberg/V1Metadata.java
@@ -25,6 +25,7 @@
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.iceberg.avro.AvroSchemaUtil;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.StructType;
 
 import static org.apache.iceberg.types.Types.NestedField.required;
 
@@ -181,29 +182,78 @@ public ManifestFile copy() {
     }
   }
 
-  static Schema entrySchema(Types.StructType partitionType) {
-    return wrapFileSchema(DataFile.getType(partitionType));
+  static Schema entrySchema(StructType partitionType) {
+    return wrapFileSchema(dataFileSchema(partitionType));
   }
 
-  static Schema wrapFileSchema(Types.StructType fileSchema) {
+  static Schema wrapFileSchema(StructType fileSchema) {
     // this is used to build projection schemas
     return new Schema(
         ManifestEntry.STATUS, ManifestEntry.SNAPSHOT_ID,
         required(ManifestEntry.DATA_FILE_ID, "data_file", fileSchema));
   }
+
+  private static final Types.NestedField BLOCK_SIZE = required(105, "block_size_in_bytes", Types.LongType.get());
+
+  static StructType dataFileSchema(StructType partitionType) {
+    // IDs start at 100 to leave room for changes to ManifestEntry
+    return StructType.of(
+        DataFile.FILE_PATH,
+        DataFile.FILE_FORMAT,
+        required(DataFile.PARTITION_ID, DataFile.PARTITION_NAME, partitionType),
+        DataFile.RECORD_COUNT,
+        DataFile.FILE_SIZE,
+        BLOCK_SIZE,
+        DataFile.COLUMN_SIZES,
+        DataFile.VALUE_COUNTS,
+        DataFile.NULL_VALUE_COUNTS,
+        DataFile.LOWER_BOUNDS,
+        DataFile.UPPER_BOUNDS,
+        DataFile.KEY_METADATA,
+        DataFile.SPLIT_OFFSETS
+    );
+  }
+
+  /**
+   * Wrapper used to write a ManifestEntry to v1 metadata. This will override the entry's status, snapshot id, and
+   * sequence number with correct values for the commit that are passed when wrapping an entry. This also implements
+   * Avro's IndexedRecord for writing to Avro files.
+   */
   static class IndexedManifestEntry implements ManifestEntry, IndexedRecord {
     private final org.apache.avro.Schema avroSchema;
     private final IndexedDataFile fileWrapper;
-    private ManifestEntry wrapped = null;
 
-    IndexedManifestEntry(Types.StructType partitionType) {
+    private DataFile file = null;
+    private Status status = null;
+    private Long snapshotId = null;
+    private Long sequenceNumber = null;
+
+    IndexedManifestEntry(StructType partitionType) {
       this.avroSchema = AvroSchemaUtil.convert(entrySchema(partitionType), "manifest_entry");
       this.fileWrapper = new IndexedDataFile(avroSchema.getField("data_file").schema());
     }
 
-    public IndexedManifestEntry wrap(ManifestEntry entry) {
-      this.wrapped = entry;
+    ManifestEntry wrapExisting(Long newSnapshotId, Long newSequenceNumber, DataFile newFile) {
+      this.status = Status.EXISTING;
+      this.snapshotId = newSnapshotId;
+      this.sequenceNumber = newSequenceNumber;
+      this.file = newFile;
+      return this;
+    }
+
+    ManifestEntry wrapAppend(Long newSnapshotId, DataFile newFile) {
+      this.status = Status.ADDED;
+      this.snapshotId = newSnapshotId;
+      this.sequenceNumber = null;
+      this.file = newFile;
+      return this;
+    }
+
+    ManifestEntry wrapDelete(Long newSnapshotId, Long newSequenceNumber, DataFile newFile) {
+      this.status = Status.DELETED;
+      this.snapshotId = newSnapshotId;
+      this.sequenceNumber = newSequenceNumber;
+      this.file = newFile;
       return this;
     }
@@ -221,16 +271,14 @@ public void put(int i, Object v) {
     public Object get(int i) {
       switch (i) {
         case 0:
-          return wrapped.status().id();
+          return status.id();
         case 1:
-          return wrapped.snapshotId();
+          return snapshotId;
         case 2:
-          DataFile file = wrapped.file();
-          if (file == null || file instanceof GenericDataFile) {
-            return file;
-          } else {
+          if (file != null) {
             return fileWrapper.wrap(file);
           }
+          return null;
         default:
           throw new UnsupportedOperationException("Unknown field ordinal: " + i);
       }
     }
@@ -238,42 +286,42 @@ public Object get(int i) {
 
     @Override
     public Status status() {
-      return wrapped.status();
+      return status;
     }
 
     @Override
     public Long snapshotId() {
-      return wrapped.snapshotId();
+      return snapshotId;
     }
 
     @Override
     public void setSnapshotId(long snapshotId) {
-      wrapped.setSnapshotId(snapshotId);
+      this.snapshotId = snapshotId;
     }
 
     @Override
     public Long sequenceNumber() {
-      return wrapped.sequenceNumber();
+      return sequenceNumber;
     }
 
     @Override
     public void setSequenceNumber(long sequenceNumber) {
-      wrapped.setSequenceNumber(sequenceNumber);
+      this.sequenceNumber = sequenceNumber;
     }
 
     @Override
     public DataFile file() {
-      return wrapped.file();
+      return file;
     }
 
     @Override
     public ManifestEntry copy() {
-      return wrapped.copy();
+      throw new UnsupportedOperationException("Cannot copy a ManifestEntryWrapper");
     }
 
     @Override
     public ManifestEntry copyWithoutStats() {
-      return wrapped.copyWithoutStats();
+      throw new UnsupportedOperationException("Cannot copy a ManifestEntryWrapper");
     }
   }
@@ -337,6 +385,21 @@ public org.apache.avro.Schema getSchema() {
       return avroSchema;
     }
 
+    @Override
+    public FileStatus status() {
+      return wrapped.status();
+    }
+
+    @Override
+    public Long snapshotId() {
+      return wrapped.snapshotId();
+    }
+
+    @Override
+    public Long sequenceNumber() {
+      return wrapped.sequenceNumber();
+    }
+
     @Override
     public CharSequence path() {
       return wrapped.path();
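Both writer paths above funnel every add/existing/delete through a single mutable wrapper that is re-pointed at each file before being handed to the Avro appender, avoiding a per-row entry allocation. A reduced sketch of that reuse pattern (names simplified; a real appender must serialize the row before the next wrap call):

import java.util.Arrays;
import java.util.List;

public class ReusedWrapperSketch {
  enum Status { EXISTING, ADDED, DELETED }

  static class EntryWrapper {
    Status status;
    Long snapshotId;
    String file;

    EntryWrapper wrapAppend(Long snapshotId, String file) {
      this.status = Status.ADDED;
      this.snapshotId = snapshotId;
      this.file = file;
      return this; // same instance every time, like wrapAppend/wrapExisting/wrapDelete
    }
  }

  public static void main(String[] args) {
    EntryWrapper reused = new EntryWrapper();
    List<String> files = Arrays.asList("a.parquet", "b.parquet");
    for (String file : files) {
      EntryWrapper row = reused.wrapAppend(7L, file);
      // a real appender would serialize the row here, before the next wrap()
      System.out.println(row.status + " " + row.snapshotId + " " + row.file);
    }
  }
}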
diff --git a/core/src/main/java/org/apache/iceberg/V2Metadata.java b/core/src/main/java/org/apache/iceberg/V2Metadata.java
index 6ee9d3f20dbf..8d354e2b39b0 100644
--- a/core/src/main/java/org/apache/iceberg/V2Metadata.java
+++ b/core/src/main/java/org/apache/iceberg/V2Metadata.java
@@ -20,7 +20,9 @@
 package org.apache.iceberg;
 
 import com.google.common.base.Preconditions;
+import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.Map;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.iceberg.avro.AvroSchemaUtil;
 import org.apache.iceberg.types.Types;
@@ -230,107 +232,193 @@ public ManifestFile copy() {
     }
   }
 
-  static Schema entrySchema(Types.StructType partitionType) {
-    return wrapFileSchema(DataFile.getType(partitionType));
+  static Schema manifestSchema(Types.StructType partitionType) {
+    // if v2 diverges from the read schema, then copy the schema here
+    return new Schema(DataFile.getType(partitionType).fields());
   }
 
-  static Schema wrapFileSchema(Types.StructType fileSchema) {
-    // this is used to build projection schemas
-    return new Schema(
-        ManifestEntry.STATUS, ManifestEntry.SNAPSHOT_ID, ManifestEntry.SEQUENCE_NUMBER,
-        required(ManifestEntry.DATA_FILE_ID, "data_file", fileSchema));
-  }
-
-  static class IndexedManifestEntry implements ManifestEntry, IndexedRecord {
-    private final org.apache.avro.Schema avroSchema;
+  /**
+   * Wrapper used to write a DataFile to v2 metadata. This will override the file's status, snapshot id, and sequence
+   * number with correct values for the commit that are passed when wrapping a file. This also implements Avro's
+   * IndexedRecord for writing to Avro files.
+   */
+  static class IndexedDataFile implements DataFile, IndexedRecord {
     private final Long commitSnapshotId;
-    private final V1Metadata.IndexedDataFile fileWrapper;
-    private ManifestEntry wrapped = null;
+    private final org.apache.avro.Schema avroSchema;
+    private final IndexedStructLike partitionWrapper;
+
+    private FileStatus status = null;
+    private Long snapshotId = null;
+    private Long sequenceNumber = null;
+    private DataFile wrapped = null;
 
-    IndexedManifestEntry(Long commitSnapshotId, Types.StructType partitionType) {
-      this.avroSchema = AvroSchemaUtil.convert(entrySchema(partitionType), "manifest_entry");
+    IndexedDataFile(Long commitSnapshotId, Types.StructType partitionType) {
       this.commitSnapshotId = commitSnapshotId;
-      // TODO: when v2 data files differ from v1, this should use a v2 wrapper
-      this.fileWrapper = new V1Metadata.IndexedDataFile(avroSchema.getField("data_file").schema());
+      this.avroSchema = AvroSchemaUtil.convert(manifestSchema(partitionType), "data_file");
+      this.partitionWrapper = new IndexedStructLike(avroSchema.getField("partition").schema());
     }
 
-    public IndexedManifestEntry wrap(ManifestEntry entry) {
-      this.wrapped = entry;
+    DataFile wrapExisting(Long newSnapshotId, Long newSequenceNumber, DataFile file) {
+      this.status = FileStatus.EXISTING;
+      this.snapshotId = newSnapshotId;
+      this.sequenceNumber = newSequenceNumber;
+      this.wrapped = file;
       return this;
     }
 
-    @Override
-    public org.apache.avro.Schema getSchema() {
-      return avroSchema;
+    DataFile wrapAppend(Long newSnapshotId, DataFile file) {
+      this.status = FileStatus.ADDED;
+      this.snapshotId = newSnapshotId;
+      this.sequenceNumber = null;
+      this.wrapped = file;
+      return this;
     }
 
-    @Override
-    public void put(int i, Object v) {
-      throw new UnsupportedOperationException("Cannot read using IndexedManifestEntry");
+    DataFile wrapDelete(Long newSnapshotId, Long newSequenceNumber, DataFile file) {
+      this.status = FileStatus.DELETED;
+      this.snapshotId = newSnapshotId;
+      this.sequenceNumber = newSequenceNumber;
+      this.wrapped = file;
+      return this;
     }
 
     @Override
-    public Object get(int i) {
-      switch (i) {
+    public Object get(int pos) {
+      switch (pos) {
         case 0:
-          return wrapped.status().id();
+          return status.id();
         case 1:
-          return wrapped.snapshotId();
+          return snapshotId;
         case 2:
-          if (wrapped.sequenceNumber() == null) {
+          if (sequenceNumber == null) {
             // if the entry's sequence number is null, then it will inherit the sequence number of the current commit.
             // to validate that this is correct, check that the snapshot id is either null (will also be inherited) or
             // that it matches the id of the current commit.
-            Preconditions.checkState(
-                wrapped.snapshotId() == null || wrapped.snapshotId().equals(commitSnapshotId),
-                "Found unassigned sequence number for an entry from snapshot: %s", wrapped.snapshotId());
+            Preconditions.checkState(snapshotId == null || snapshotId.equals(commitSnapshotId),
+                "Found unassigned sequence number for an entry from snapshot: %s", snapshotId);
             return null;
           }
-          return wrapped.sequenceNumber();
+          return sequenceNumber;
         case 3:
-          return fileWrapper.wrap(wrapped.file());
-        default:
-          throw new UnsupportedOperationException("Unknown field ordinal: " + i);
+          return wrapped.path().toString();
+        case 4:
wrapped.format().toString() : null; + case 5: + return partitionWrapper.wrap(wrapped.partition()); + case 6: + return wrapped.recordCount(); + case 7: + return wrapped.fileSizeInBytes(); + case 8: + return wrapped.columnSizes(); + case 9: + return wrapped.valueCounts(); + case 10: + return wrapped.nullValueCounts(); + case 11: + return wrapped.lowerBounds(); + case 12: + return wrapped.upperBounds(); + case 13: + return wrapped.keyMetadata(); + case 14: + return wrapped.splitOffsets(); } + throw new IllegalArgumentException("Unknown field ordinal: " + pos); } @Override - public Status status() { - return wrapped.status(); + public void put(int i, Object v) { + throw new UnsupportedOperationException("Cannot read into IndexedDataFile"); } @Override - public Long snapshotId() { - return wrapped.snapshotId(); + public org.apache.avro.Schema getSchema() { + return avroSchema; } @Override - public void setSnapshotId(long snapshotId) { - wrapped.setSnapshotId(snapshotId); + public FileStatus status() { + return status; + } + + @Override + public Long snapshotId() { + return snapshotId; } @Override public Long sequenceNumber() { - return wrapped.sequenceNumber(); + return sequenceNumber; + } + + @Override + public CharSequence path() { + return wrapped.path(); + } + + @Override + public FileFormat format() { + return wrapped.format(); + } + + @Override + public StructLike partition() { + return wrapped.partition(); + } + + @Override + public long recordCount() { + return wrapped.recordCount(); + } + + @Override + public long fileSizeInBytes() { + return wrapped.fileSizeInBytes(); + } + + @Override + public Map columnSizes() { + return wrapped.columnSizes(); + } + + @Override + public Map valueCounts() { + return wrapped.valueCounts(); + } + + @Override + public Map nullValueCounts() { + return wrapped.nullValueCounts(); + } + + @Override + public Map lowerBounds() { + return wrapped.lowerBounds(); + } + + @Override + public Map upperBounds() { + return wrapped.upperBounds(); } @Override - public void setSequenceNumber(long sequenceNumber) { - wrapped.setSequenceNumber(sequenceNumber); + public ByteBuffer keyMetadata() { + return wrapped.keyMetadata(); } @Override - public DataFile file() { - return wrapped.file(); + public List splitOffsets() { + return wrapped.splitOffsets(); } @Override - public ManifestEntry copy() { + public DataFile copy() { return wrapped.copy(); } @Override - public ManifestEntry copyWithoutStats() { + public DataFile copyWithoutStats() { return wrapped.copyWithoutStats(); } } diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java index ef4466dec2c9..c43ade67af99 100644 --- a/core/src/test/java/org/apache/iceberg/TableTestBase.java +++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java @@ -183,17 +183,47 @@ ManifestFile writeManifestWithName(String name, DataFile... 
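Note on the class above: v2 writes one flattened record per data file instead of nesting a data_file struct inside a manifest_entry record, and a single IndexedDataFile instance is meant to be reused for every entry in a manifest. The following is a minimal sketch of how a v2 writer could drive it; the method, the FileAppender, and all variable names are assumptions for illustration, not code from this patch:

    // sketch only: assumes a caller in org.apache.iceberg, since IndexedDataFile
    // and its constructor are package-private
    static void writeEntries(FileAppender<DataFile> appender, Long commitSnapshotId,
                             PartitionSpec spec, DataFile added, DataFile existing,
                             Long existingSnapshotId, Long existingSequenceNumber) {
      V2Metadata.IndexedDataFile wrapper =
          new V2Metadata.IndexedDataFile(commitSnapshotId, spec.partitionType());

      // ADDED entries leave the sequence number null; get(2) above then emits null
      // (after checking the snapshot id matches the commit) so readers can inherit
      // the sequence number of the commit that adds the manifest
      appender.add(wrapper.wrapAppend(commitSnapshotId, added));

      // EXISTING entries keep the snapshot id and sequence number of the commit
      // that originally added the file
      appender.add(wrapper.wrapExisting(existingSnapshotId, existingSequenceNumber, existing));
    }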
diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java
index ef4466dec2c9..c43ade67af99 100644
--- a/core/src/test/java/org/apache/iceberg/TableTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java
@@ -183,17 +183,47 @@ ManifestFile writeManifestWithName(String name, DataFile... files) throws IOExce
   }

   ManifestEntry manifestEntry(ManifestEntry.Status status, Long snapshotId, DataFile file) {
-    GenericManifestEntry entry = new GenericManifestEntry(table.spec().partitionType());
-    switch (status) {
-      case ADDED:
-        return entry.wrapAppend(snapshotId, file);
-      case EXISTING:
-        return entry.wrapExisting(snapshotId, 0L, file);
-      case DELETED:
-        return entry.wrapDelete(snapshotId, file);
-      default:
-        throw new IllegalArgumentException("Unexpected entry status: " + status);
-    }
+    return new ManifestEntry() {
+      @Override
+      public Status status() {
+        return status;
+      }
+
+      @Override
+      public Long snapshotId() {
+        return snapshotId;
+      }
+
+      @Override
+      public void setSnapshotId(long snapshotId) {
+
+      }
+
+      @Override
+      public Long sequenceNumber() {
+        return 0L;
+      }
+
+      @Override
+      public void setSequenceNumber(long sequenceNumber) {
+
+      }
+
+      @Override
+      public DataFile file() {
+        return file;
+      }
+
+      @Override
+      public ManifestEntry copy() {
+        return null;
+      }
+
+      @Override
+      public ManifestEntry copyWithoutStats() {
+        return null;
+      }
+    };
   }

   void validateSnapshot(Snapshot old, Snapshot snap, DataFile... newFiles) {
diff --git a/core/src/test/java/org/apache/iceberg/TestEntriesMetadataTable.java b/core/src/test/java/org/apache/iceberg/TestEntriesMetadataTable.java
index 2255b6e3fb31..b0487aed553a 100644
--- a/core/src/test/java/org/apache/iceberg/TestEntriesMetadataTable.java
+++ b/core/src/test/java/org/apache/iceberg/TestEntriesMetadataTable.java
@@ -50,7 +50,7 @@ public void testEntriesTable() {

     Table entriesTable = new ManifestEntriesTable(table.ops(), table);

-    Schema expectedSchema = ManifestEntry.getSchema(table.spec().partitionType());
+    Schema expectedSchema = MetadataTables.entriesTableSchema(table.spec().partitionType());

     assertEquals("A tableScan.select() should prune the schema",
         expectedSchema.asStruct(),
@@ -67,7 +67,7 @@ public void testEntriesTableScan() {
     Table entriesTable = new ManifestEntriesTable(table.ops(), table);
     TableScan scan = entriesTable.newScan();

-    Schema expectedSchema = ManifestEntry.getSchema(table.spec().partitionType());
+    Schema expectedSchema = MetadataTables.entriesTableSchema(table.spec().partitionType());

     assertEquals("A tableScan.select() should prune the schema",
         expectedSchema.asStruct(),
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
index 423cd51074dd..f2dea9bde298 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
@@ -77,7 +77,8 @@ public class TestManifestWriterVersions {
   public void testV1Write() throws IOException {
     ManifestFile manifest = writeManifest(1);
     checkManifest(manifest, ManifestWriter.UNASSIGNED_SEQ);
-    checkEntry(readManifest(manifest), ManifestWriter.UNASSIGNED_SEQ);
+    checkEntry(readManifest(manifest).asEntry(), ManifestEntry.Status.ADDED, ManifestWriter.UNASSIGNED_SEQ);
+    checkDataFile(readManifest(manifest), FileStatus.ADDED, ManifestWriter.UNASSIGNED_SEQ);
   }

   @Test
@@ -86,14 +87,16 @@ public void testV1WriteWithInheritance() throws IOException {
     checkManifest(manifest, 0L);

     // v1 should be read using sequence number 0 because it was missing from the manifest list file
-    checkEntry(readManifest(manifest), 0L);
+    checkEntry(readManifest(manifest).asEntry(), ManifestEntry.Status.ADDED, 0L);
+    checkDataFile(readManifest(manifest), FileStatus.ADDED, 0L);
   }

   @Test
   public void testV2Write() throws IOException {
     ManifestFile manifest = writeManifest(1);
     checkManifest(manifest, ManifestWriter.UNASSIGNED_SEQ);
-    checkEntry(readManifest(manifest), ManifestWriter.UNASSIGNED_SEQ);
+    checkEntry(readManifest(manifest).asEntry(), ManifestEntry.Status.ADDED, ManifestWriter.UNASSIGNED_SEQ);
+    checkDataFile(readManifest(manifest), FileStatus.ADDED, ManifestWriter.UNASSIGNED_SEQ);
   }

   @Test
@@ -102,7 +105,8 @@ public void testV2WriteWithInheritance() throws IOException {
     checkManifest(manifest, SEQUENCE_NUMBER);

     // v2 should use the correct sequence number by inheriting it
-    checkEntry(readManifest(manifest), SEQUENCE_NUMBER);
+    checkEntry(readManifest(manifest).asEntry(), ManifestEntry.Status.ADDED, SEQUENCE_NUMBER);
+    checkDataFile(readManifest(manifest), FileStatus.ADDED, SEQUENCE_NUMBER);
   }

   @Test
@@ -117,7 +121,8 @@ public void testV2ManifestListRewriteWithInheritance() throws IOException {
     checkManifest(manifest2, 0L);

     // should not inherit the v2 sequence number because it was a rewrite
-    checkEntry(readManifest(manifest2), 0L);
+    checkEntry(readManifest(manifest2).asEntry(), ManifestEntry.Status.ADDED, 0L);
+    checkDataFile(readManifest(manifest2), FileStatus.ADDED, 0L);
   }

   @Test
@@ -136,24 +141,31 @@ public void testV2ManifestRewriteWithInheritance() throws IOException {
     checkRewrittenManifest(manifest2, SEQUENCE_NUMBER, 0L);

     // should not inherit the v2 sequence number because it was written into the v2 manifest
-    checkRewrittenEntry(readManifest(manifest2), 0L);
+    checkEntry(readManifest(manifest2).asEntry(), ManifestEntry.Status.EXISTING, 0L);
+    checkDataFile(readManifest(manifest2), FileStatus.EXISTING, 0L);
   }

-  void checkEntry(ManifestEntry entry, Long expectedSequenceNumber) {
-    Assert.assertEquals("Status", ManifestEntry.Status.ADDED, entry.status());
+  void checkEntry(ManifestEntry entry, ManifestEntry.Status status, Long expectedSequenceNumber) {
+    Assert.assertEquals("Status", status, entry.status());
     Assert.assertEquals("Snapshot ID", (Long) SNAPSHOT_ID, entry.snapshotId());
     Assert.assertEquals("Sequence number", expectedSequenceNumber, entry.sequenceNumber());
-    checkDataFile(entry.file());
-  }

-  void checkRewrittenEntry(ManifestEntry entry, Long expectedSequenceNumber) {
-    Assert.assertEquals("Status", ManifestEntry.Status.EXISTING, entry.status());
-    Assert.assertEquals("Snapshot ID", (Long) SNAPSHOT_ID, entry.snapshotId());
-    Assert.assertEquals("Sequence number", expectedSequenceNumber, entry.sequenceNumber());
-    checkDataFile(entry.file());
+    DataFile dataFile = entry.file();
+    Assert.assertEquals("Path", PATH, dataFile.path());
+    Assert.assertEquals("Format", FORMAT, dataFile.format());
+    Assert.assertEquals("Partition", PARTITION, dataFile.partition());
+    Assert.assertEquals("Record count", METRICS.recordCount(), (Long) dataFile.recordCount());
+    Assert.assertEquals("Column sizes", METRICS.columnSizes(), dataFile.columnSizes());
+    Assert.assertEquals("Value counts", METRICS.valueCounts(), dataFile.valueCounts());
+    Assert.assertEquals("Null value counts", METRICS.nullValueCounts(), dataFile.nullValueCounts());
+    Assert.assertEquals("Lower bounds", METRICS.lowerBounds(), dataFile.lowerBounds());
+    Assert.assertEquals("Upper bounds", METRICS.upperBounds(), dataFile.upperBounds());
   }

-  void checkDataFile(DataFile dataFile) {
+  void checkDataFile(DataFile dataFile, FileStatus expectedStatus, Long expectedSequenceNumber) {
+    Assert.assertEquals("Status", expectedStatus, dataFile.status());
+    Assert.assertEquals("Snapshot ID", (Long) SNAPSHOT_ID, dataFile.snapshotId());
+    Assert.assertEquals("Sequence number", expectedSequenceNumber, dataFile.sequenceNumber());
     Assert.assertEquals("Path", PATH, dataFile.path());
     Assert.assertEquals("Format", FORMAT, dataFile.format());
     Assert.assertEquals("Partition", PARTITION, dataFile.partition());
@@ -226,11 +238,11 @@ private ManifestFile writeManifest(int formatVersion) throws IOException {
     return writer.toManifestFile();
   }

-  private ManifestEntry readManifest(ManifestFile manifest) throws IOException {
-    try (CloseableIterable<ManifestEntry> reader = ManifestFiles.read(manifest, FILE_IO).entries()) {
-      List<ManifestEntry> files = Lists.newArrayList(reader);
+  private GenericDataFile readManifest(ManifestFile manifest) throws IOException {
+    try (CloseableIterable<DataFile> reader = ManifestFiles.read(manifest, FILE_IO)) {
+      List<DataFile> files = Lists.newArrayList(reader);
       Assert.assertEquals("Should contain only one data file", 1, files.size());
-      return files.get(0);
+      return (GenericDataFile) files.get(0);
     }
   }
 }
diff --git a/spark/src/main/java/org/apache/iceberg/actions/RewriteManifestsAction.java b/spark/src/main/java/org/apache/iceberg/actions/RewriteManifestsAction.java
index b8f88ea54e6b..14eb341de9c4 100644
--- a/spark/src/main/java/org/apache/iceberg/actions/RewriteManifestsAction.java
+++ b/spark/src/main/java/org/apache/iceberg/actions/RewriteManifestsAction.java
@@ -31,7 +31,6 @@
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import org.apache.hadoop.fs.Path;
-import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.HasTableOperations;
 import org.apache.iceberg.ManifestFile;
@@ -49,7 +48,6 @@
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.spark.SparkDataFile;
-import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.Tasks;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -344,8 +342,7 @@ private static ManifestFile writeManifest(
     Path manifestPath = new Path(location, manifestName);
     OutputFile outputFile = io.value().newOutputFile(FileFormat.AVRO.addExtension(manifestPath.toString()));

-    Types.StructType dataFileType = DataFile.getType(spec.partitionType());
-    SparkDataFile wrapper = new SparkDataFile(dataFileType, sparkType);
+    SparkDataFile wrapper = new SparkDataFile(spec.partitionType(), sparkType);

     ManifestWriter writer = ManifestFiles.write(format, spec, outputFile, null);
diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java b/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
index 4991980d83d4..37d5d9a6031a 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
@@ -26,6 +26,7 @@
 import java.util.Map;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileStatus;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
@@ -46,28 +47,21 @@ public class SparkDataFile implements DataFile {
   private final int upperBoundsPosition;
   private final int keyMetadataPosition;
   private final int splitOffsetsPosition;
-  private final Type lowerBoundsType;
-  private final Type upperBoundsType;
-  private final Type keyMetadataType;

   private final SparkStructLike wrappedPartition;
   private Row wrapped;

-  public SparkDataFile(Types.StructType type, StructType sparkType) {
-    this.lowerBoundsType = type.fieldType("lower_bounds");
-    this.upperBoundsType = type.fieldType("upper_bounds");
-    this.keyMetadataType = type.fieldType("key_metadata");
-    this.wrappedPartition = new SparkStructLike(type.fieldType("partition").asStructType());
+  public SparkDataFile(Types.StructType partitionType, StructType sparkType) {
+    this.wrappedPartition = new SparkStructLike(partitionType);

     Map<String, Integer> positions = Maps.newHashMap();
-    type.fields().forEach(field -> {
-      String fieldName = field.name();
-      positions.put(fieldName, fieldPosition(fieldName, sparkType));
-    });
+    for (int pos = 0; pos < sparkType.size(); pos += 1) {
+      positions.put(sparkType.fields()[pos].name(), pos);
+    }

     filePathPosition = positions.get("file_path");
     fileFormatPosition = positions.get("file_format");
-    partitionPosition = positions.get("partition");
+    partitionPosition = positions.getOrDefault("partition", -1);
     recordCountPosition = positions.get("record_count");
     fileSizeInBytesPosition = positions.get("file_size_in_bytes");
     columnSizesPosition = positions.get("column_sizes");
@@ -87,6 +81,21 @@ public SparkDataFile wrap(Row row) {
     return this;
   }

+  @Override
+  public FileStatus status() {
+    return null;
+  }
+
+  @Override
+  public Long snapshotId() {
+    return null;
+  }
+
+  @Override
+  public Long sequenceNumber() {
+    return null;
+  }
+
   @Override
   public CharSequence path() {
     return wrapped.getAs(filePathPosition);
@@ -131,18 +140,18 @@ public Map<Integer, Long> nullValueCounts() {
   @Override
   public Map<Integer, ByteBuffer> lowerBounds() {
     Map lowerBounds = wrapped.isNullAt(lowerBoundsPosition) ? null : wrapped.getJavaMap(lowerBoundsPosition);
-    return convert(lowerBoundsType, lowerBounds);
+    return convert(DataFile.LOWER_BOUNDS.type(), lowerBounds);
   }

   @Override
   public Map<Integer, ByteBuffer> upperBounds() {
     Map upperBounds = wrapped.isNullAt(upperBoundsPosition) ? null : wrapped.getJavaMap(upperBoundsPosition);
-    return convert(upperBoundsType, upperBounds);
+    return convert(DataFile.UPPER_BOUNDS.type(), upperBounds);
   }

   @Override
   public ByteBuffer keyMetadata() {
-    return convert(keyMetadataType, wrapped.get(keyMetadataPosition));
+    return convert(DataFile.KEY_METADATA.type(), wrapped.get(keyMetadataPosition));
   }

   @Override
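With the constructor change above, SparkDataFile no longer needs the Iceberg file schema: column positions are resolved by name from the Spark row schema itself, and a missing partition column simply maps to -1 for unpartitioned layouts. The new entry-level accessors return null because Spark rows do not carry manifest-entry metadata. A small usage sketch under assumed names (rows would come from something like the table's files metadata table, as in RewriteManifestsAction above):

    // sketch only: one reused wrapper over many Rows, matching the writeManifest usage
    static void printDataFiles(PartitionSpec spec, List<Row> rows) {
      StructType sparkType = rows.get(0).schema();
      SparkDataFile wrapper = new SparkDataFile(spec.partitionType(), sparkType);
      for (Row row : rows) {
        DataFile file = wrapper.wrap(row);  // a view over the current row only
        // status()/snapshotId()/sequenceNumber() are null here, by design
        System.out.println(file.path() + ": " + file.recordCount() + " records");
      }
    }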
diff --git a/spark/src/test/java/org/apache/iceberg/TestDataFileSerialization.java b/spark/src/test/java/org/apache/iceberg/TestDataFileSerialization.java
index 64c9d519f16b..3bc75f01e0f5 100644
--- a/spark/src/test/java/org/apache/iceberg/TestDataFileSerialization.java
+++ b/spark/src/test/java/org/apache/iceberg/TestDataFileSerialization.java
@@ -129,6 +129,12 @@ public void testDataFileJavaSerialization() throws Exception {
   }

   private void checkDataFile(DataFile expected, DataFile actual) {
+    Assert.assertEquals("Should match the serialized status",
+        expected.status(), actual.status());
+    Assert.assertEquals("Should match the serialized snapshot id",
+        expected.snapshotId(), actual.snapshotId());
+    Assert.assertEquals("Should match the serialized sequence number",
+        expected.sequenceNumber(), actual.sequenceNumber());
     Assert.assertEquals("Should match the serialized record path",
         expected.path(), actual.path());
     Assert.assertEquals("Should match the serialized record format",
@@ -151,7 +157,7 @@ private void checkDataFile(DataFile expected, DataFile actual) {
         expected.keyMetadata(), actual.keyMetadata());
     Assert.assertEquals("Should match the serialized record offsets",
         expected.splitOffsets(), actual.splitOffsets());
-    Assert.assertEquals("Should match the serialized record offsets",
+    Assert.assertEquals("Should match the serialized key metadata",
         expected.keyMetadata(), actual.keyMetadata());
   }

diff --git a/spark/src/test/java/org/apache/iceberg/spark/TestSparkDataFile.java b/spark/src/test/java/org/apache/iceberg/spark/TestSparkDataFile.java
index 2ebb3dd473f7..8cc7176bc28a 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/TestSparkDataFile.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/TestSparkDataFile.java
@@ -175,9 +175,8 @@ private void checkSparkDataFile(Table table) throws IOException {
     Assert.assertEquals("The number of files should match", dataFiles.size(), sparkDataFiles.size());

-    Types.StructType dataFileType = DataFile.getType(table.spec().partitionType());
     StructType sparkDataFileType = sparkDataFiles.get(0).schema();
-    SparkDataFile wrapper = new SparkDataFile(dataFileType, sparkDataFileType);
+    SparkDataFile wrapper = new SparkDataFile(table.spec().partitionType(), sparkDataFileType);

     for (int i = 0; i < dataFiles.size(); i++) {
       checkDataFile(dataFiles.get(i), wrapper.wrap(sparkDataFiles.get(i)));