diff --git a/api/src/main/java/org/apache/iceberg/ContentFile.java b/api/src/main/java/org/apache/iceberg/ContentFile.java index a4d97fa93fa1..c4cd3c06adfd 100644 --- a/api/src/main/java/org/apache/iceberg/ContentFile.java +++ b/api/src/main/java/org/apache/iceberg/ContentFile.java @@ -167,6 +167,13 @@ default Long fileSequenceNumber() { return null; } + /** + * Returns the starting row ID to assign to new rows in the data file (with _row_id set to null). + */ + default Long firstRowId() { + return null; + } + /** * Copies this file. Manifest readers can reuse file instances; use this method to copy data when * collecting files from tasks. diff --git a/api/src/main/java/org/apache/iceberg/DataFile.java b/api/src/main/java/org/apache/iceberg/DataFile.java index ea6262afac85..58742e18d77c 100644 --- a/api/src/main/java/org/apache/iceberg/DataFile.java +++ b/api/src/main/java/org/apache/iceberg/DataFile.java @@ -98,6 +98,8 @@ public interface DataFile extends ContentFile { Types.NestedField SORT_ORDER_ID = optional(140, "sort_order_id", IntegerType.get(), "Sort order ID"); Types.NestedField SPEC_ID = optional(141, "spec_id", IntegerType.get(), "Partition spec ID"); + Types.NestedField FIRST_ROW_ID = + optional(142, "first_row_id", LongType.get(), "Starting row ID to assign to new rows"); Types.NestedField REFERENCED_DATA_FILE = optional( 143, @@ -140,6 +142,7 @@ static StructType getType(StructType partitionType) { SPLIT_OFFSETS, EQUALITY_IDS, SORT_ORDER_ID, + FIRST_ROW_ID, REFERENCED_DATA_FILE, CONTENT_OFFSET, CONTENT_SIZE); diff --git a/api/src/main/java/org/apache/iceberg/ManifestFile.java b/api/src/main/java/org/apache/iceberg/ManifestFile.java index 3cdf081543e4..2f732aef427f 100644 --- a/api/src/main/java/org/apache/iceberg/ManifestFile.java +++ b/api/src/main/java/org/apache/iceberg/ManifestFile.java @@ -89,7 +89,13 @@ public interface ManifestFile { "Summary for each partition"); Types.NestedField KEY_METADATA = optional(519, "key_metadata", Types.BinaryType.get(), "Encryption key metadata blob"); - // next ID to assign: 520 + Types.NestedField FIRST_ROW_ID = + optional( + 520, + "first_row_id", + Types.LongType.get(), + "Starting row ID to assign to new rows in ADDED data files"); + // next ID to assign: 521 Schema SCHEMA = new Schema( @@ -107,7 +113,8 @@ public interface ManifestFile { EXISTING_ROWS_COUNT, DELETED_ROWS_COUNT, PARTITION_SUMMARIES, - KEY_METADATA); + KEY_METADATA, + FIRST_ROW_ID); static Schema schema() { return SCHEMA; @@ -198,6 +205,11 @@ default ByteBuffer keyMetadata() { return null; } + /** Returns the starting row ID to assign to new rows in ADDED data files. */ + default Long firstRowId() { + return null; + } + /** * Copies this {@link ManifestFile manifest file}. Readers can reuse manifest file instances; use * this method to make defensive copies. 
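The two firstRowId() defaults above are the read-side hook for v3 row lineage: a row whose _row_id is null inherits an ID derived from the containing file's assigned first row ID and the row's position in that file. A minimal sketch of that derivation — the resolveRowId helper is hypothetical and not part of this patch:

// Hypothetical helper (not in this patch): how a reader resolves _row_id
// for a row, given ContentFile#firstRowId() and the row's position in the file.
static Long resolveRowId(Long materializedRowId, Long fileFirstRowId, long posInFile) {
  if (materializedRowId != null) {
    return materializedRowId;          // _row_id was written into the data file
  } else if (fileFirstRowId != null) {
    return fileFirstRowId + posInFile; // inherited lazily from file metadata
  } else {
    return null;                       // pre-v3 file with no assignment yet
  }
}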
diff --git a/core/src/jmh/java/org/apache/iceberg/ManifestReadBenchmark.java b/core/src/jmh/java/org/apache/iceberg/ManifestReadBenchmark.java index 7b4e2b9ec523..c7ab2f44aa2c 100644 --- a/core/src/jmh/java/org/apache/iceberg/ManifestReadBenchmark.java +++ b/core/src/jmh/java/org/apache/iceberg/ManifestReadBenchmark.java @@ -71,7 +71,8 @@ public void before() { Random random = new Random(System.currentTimeMillis()); try (ManifestListWriter listWriter = - ManifestLists.write(1, org.apache.iceberg.Files.localOutput(manifestListFile), 0, 1L, 0)) { + ManifestLists.write( + 1, org.apache.iceberg.Files.localOutput(manifestListFile), 0, 1L, 0, 0L)) { for (int i = 0; i < NUM_FILES; i++) { OutputFile manifestFile = org.apache.iceberg.Files.localOutput( diff --git a/core/src/jmh/java/org/apache/iceberg/ManifestWriteBenchmark.java b/core/src/jmh/java/org/apache/iceberg/ManifestWriteBenchmark.java index 349c8e5d4fa2..28b10bbd6950 100644 --- a/core/src/jmh/java/org/apache/iceberg/ManifestWriteBenchmark.java +++ b/core/src/jmh/java/org/apache/iceberg/ManifestWriteBenchmark.java @@ -105,7 +105,8 @@ public void writeManifestFile(BenchmarkState state) throws IOException { org.apache.iceberg.Files.localOutput(manifestListFile), 0, 1L, - 0)) { + 0, + 0L)) { for (int i = 0; i < NUM_FILES; i++) { OutputFile manifestFile = org.apache.iceberg.Files.localOutput( diff --git a/core/src/main/java/org/apache/iceberg/BaseFile.java b/core/src/main/java/org/apache/iceberg/BaseFile.java index e9724637dfa3..bd7145179f6f 100644 --- a/core/src/main/java/org/apache/iceberg/BaseFile.java +++ b/core/src/main/java/org/apache/iceberg/BaseFile.java @@ -80,6 +80,7 @@ public PartitionData copy() { private int[] equalityIds = null; private byte[] keyMetadata = null; private Integer sortOrderId; + private Long firstRowId = null; private String referencedDataFile = null; private Long contentOffset = null; private Long contentSizeInBytes = null; @@ -111,6 +112,7 @@ public PartitionData copy() { DataFile.SPLIT_OFFSETS, DataFile.EQUALITY_IDS, DataFile.SORT_ORDER_ID, + DataFile.FIRST_ROW_ID, DataFile.REFERENCED_DATA_FILE, DataFile.CONTENT_OFFSET, DataFile.CONTENT_SIZE, @@ -156,6 +158,7 @@ public PartitionData copy() { int[] equalityFieldIds, Integer sortOrderId, ByteBuffer keyMetadata, + Long firstRowId, String referencedDataFile, Long contentOffset, Long contentSizeInBytes) { @@ -187,6 +190,7 @@ public PartitionData copy() { this.equalityIds = equalityFieldIds; this.sortOrderId = sortOrderId; this.keyMetadata = ByteBuffers.toByteArray(keyMetadata); + this.firstRowId = firstRowId; this.referencedDataFile = referencedDataFile; this.contentOffset = contentOffset; this.contentSizeInBytes = contentSizeInBytes; @@ -242,6 +246,7 @@ public PartitionData copy() { this.sortOrderId = toCopy.sortOrderId; this.dataSequenceNumber = toCopy.dataSequenceNumber; this.fileSequenceNumber = toCopy.fileSequenceNumber; + this.firstRowId = toCopy.firstRowId; this.referencedDataFile = toCopy.referencedDataFile; this.contentOffset = toCopy.contentOffset; this.contentSizeInBytes = toCopy.contentSizeInBytes; @@ -283,6 +288,15 @@ public void setFileSequenceNumber(Long fileSequenceNumber) { this.fileSequenceNumber = fileSequenceNumber; } + @Override + public Long firstRowId() { + return firstRowId; + } + + public void setFirstRowId(long firstRowId) { + this.firstRowId = firstRowId; + } + protected abstract Schema getAvroSchema(Types.StructType partitionStruct); @Override @@ -354,15 +368,18 @@ protected void internalSet(int pos, T value) { this.sortOrderId = (Integer) 
value; return; case 17: - this.referencedDataFile = value != null ? value.toString() : null; + this.firstRowId = (Long) value; return; case 18: - this.contentOffset = (Long) value; + this.referencedDataFile = value != null ? value.toString() : null; return; case 19: - this.contentSizeInBytes = (Long) value; + this.contentOffset = (Long) value; return; case 20: + this.contentSizeInBytes = (Long) value; + return; + case 21: this.fileOrdinal = (long) value; return; default: @@ -412,12 +429,14 @@ private Object getByPos(int basePos) { case 16: return sortOrderId; case 17: - return referencedDataFile; + return firstRowId; case 18: - return contentOffset; + return referencedDataFile; case 19: - return contentSizeInBytes; + return contentOffset; case 20: + return contentSizeInBytes; + case 21: return fileOrdinal; default: throw new UnsupportedOperationException("Unknown field ordinal: " + basePos); @@ -607,6 +626,7 @@ public String toString() { .add("sort_order_id", sortOrderId) .add("data_sequence_number", dataSequenceNumber == null ? "null" : dataSequenceNumber) .add("file_sequence_number", fileSequenceNumber == null ? "null" : fileSequenceNumber) + .add("first_row_id", firstRowId == null ? "null" : firstRowId) .add("referenced_data_file", referencedDataFile == null ? "null" : referencedDataFile) .add("content_offset", contentOffset == null ? "null" : contentOffset) .add("content_size_in_bytes", contentSizeInBytes == null ? "null" : contentSizeInBytes) diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java index a86013fefc0c..28de63f7f6b8 100644 --- a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java +++ b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java @@ -158,6 +158,7 @@ private ManifestFile copyManifest(ManifestFile manifest) { return ManifestFiles.copyRewriteManifest( current.formatVersion(), manifest.partitionSpecId(), + manifest.firstRowId(), toCopy, specsById, newFile, diff --git a/core/src/main/java/org/apache/iceberg/ContentFileParser.java b/core/src/main/java/org/apache/iceberg/ContentFileParser.java index 1be06cb42602..63cd606356db 100644 --- a/core/src/main/java/org/apache/iceberg/ContentFileParser.java +++ b/core/src/main/java/org/apache/iceberg/ContentFileParser.java @@ -45,6 +45,7 @@ class ContentFileParser { private static final String SPLIT_OFFSETS = "split-offsets"; private static final String EQUALITY_IDS = "equality-ids"; private static final String SORT_ORDER_ID = "sort-order-id"; + private static final String FIRST_ROW_ID = "first-row-id"; private static final String REFERENCED_DATA_FILE = "referenced-data-file"; private static final String CONTENT_OFFSET = "content-offset"; private static final String CONTENT_SIZE = "content-size-in-bytes"; @@ -112,6 +113,8 @@ static void toJson(ContentFile contentFile, PartitionSpec spec, JsonGenerator generator.writeNumberField(SORT_ORDER_ID, contentFile.sortOrderId()); } + JsonUtil.writeLongFieldIfPresent(FIRST_ROW_ID, contentFile.firstRowId(), generator); + if (contentFile instanceof DeleteFile) { DeleteFile deleteFile = (DeleteFile) contentFile; @@ -164,6 +167,7 @@ static ContentFile fromJson(JsonNode jsonNode, PartitionSpec spec) { List splitOffsets = JsonUtil.getLongListOrNull(SPLIT_OFFSETS, jsonNode); int[] equalityFieldIds = JsonUtil.getIntArrayOrNull(EQUALITY_IDS, jsonNode); Integer sortOrderId = JsonUtil.getIntOrNull(SORT_ORDER_ID, jsonNode); + Long firstRowId = JsonUtil.getLongOrNull(FIRST_ROW_ID, jsonNode); 
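ContentFileParser now round-trips the new field: toJson emits first-row-id through JsonUtil.writeLongFieldIfPresent, and fromJson reads it back with getLongOrNull. A sketch of the producing side, assuming an unpartitioned spec and illustrative values — withFirstRowId is the builder method this patch introduces in DataFiles.java below:

// Assumed example values; builds a data file carrying a pre-assigned first row ID.
PartitionSpec spec = PartitionSpec.unpartitioned();
DataFile file =
    DataFiles.builder(spec)
        .withPath("/warehouse/db/t/data/file-a.parquet")
        .withFormat(FileFormat.PARQUET)
        .withFileSizeInBytes(1024L)
        .withRecordCount(100L)
        .withFirstRowId(500L) // this file's 100 rows take IDs 500..599
        .build();
// Serializing this file now includes ... "sort-order-id":0,"first-row-id":500 ...
// and parsing restores the value via JsonUtil.getLongOrNull(FIRST_ROW_ID, jsonNode).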
String referencedDataFile = JsonUtil.getStringOrNull(REFERENCED_DATA_FILE, jsonNode); Long contentOffset = JsonUtil.getLongOrNull(CONTENT_OFFSET, jsonNode); Long contentSizeInBytes = JsonUtil.getLongOrNull(CONTENT_SIZE, jsonNode); @@ -178,7 +182,8 @@ static ContentFile fromJson(JsonNode jsonNode, PartitionSpec spec) { metrics, keyMetadata, splitOffsets, - sortOrderId); + sortOrderId, + firstRowId); } else { return new GenericDeleteFile( specId, diff --git a/core/src/main/java/org/apache/iceberg/DataFiles.java b/core/src/main/java/org/apache/iceberg/DataFiles.java index 0404f2da52b4..10d919ed2779 100644 --- a/core/src/main/java/org/apache/iceberg/DataFiles.java +++ b/core/src/main/java/org/apache/iceberg/DataFiles.java @@ -154,6 +154,7 @@ public static class Builder { private ByteBuffer keyMetadata = null; private List splitOffsets = null; private Integer sortOrderId = SortOrder.unsorted().orderId(); + private Long firstRowId = null; public Builder(PartitionSpec spec) { this.spec = spec; @@ -178,6 +179,7 @@ public void clear() { this.upperBounds = null; this.splitOffsets = null; this.sortOrderId = SortOrder.unsorted().orderId(); + this.firstRowId = null; } public Builder copy(DataFile toCopy) { @@ -201,6 +203,7 @@ public Builder copy(DataFile toCopy) { this.splitOffsets = toCopy.splitOffsets() == null ? null : ImmutableList.copyOf(toCopy.splitOffsets()); this.sortOrderId = toCopy.sortOrderId(); + this.firstRowId = toCopy.firstRowId(); return this; } @@ -315,6 +318,11 @@ public Builder withSortOrder(SortOrder newSortOrder) { return this; } + public Builder withFirstRowId(Long nextRowId) { + this.firstRowId = nextRowId; + return this; + } + public DataFile build() { Preconditions.checkArgument(filePath != null, "File path is required"); if (format == null) { @@ -340,7 +348,8 @@ public DataFile build() { upperBounds), keyMetadata, splitOffsets, - sortOrderId); + sortOrderId, + firstRowId); } } } diff --git a/core/src/main/java/org/apache/iceberg/FastAppend.java b/core/src/main/java/org/apache/iceberg/FastAppend.java index 3d58e192a061..ccb2302506b7 100644 --- a/core/src/main/java/org/apache/iceberg/FastAppend.java +++ b/core/src/main/java/org/apache/iceberg/FastAppend.java @@ -136,6 +136,7 @@ private ManifestFile copyManifest(ManifestFile manifest) { return ManifestFiles.copyAppendManifest( current.formatVersion(), manifest.partitionSpecId(), + manifest.firstRowId(), toCopy, current.specsById(), newManifestFile, diff --git a/core/src/main/java/org/apache/iceberg/GenericDataFile.java b/core/src/main/java/org/apache/iceberg/GenericDataFile.java index a61cc1e0fb72..9033d0718bb1 100644 --- a/core/src/main/java/org/apache/iceberg/GenericDataFile.java +++ b/core/src/main/java/org/apache/iceberg/GenericDataFile.java @@ -46,7 +46,8 @@ class GenericDataFile extends BaseFile implements DataFile { Metrics metrics, ByteBuffer keyMetadata, List splitOffsets, - Integer sortOrderId) { + Integer sortOrderId, + Long firstRowId) { super( specId, FileContent.DATA, @@ -65,6 +66,7 @@ class GenericDataFile extends BaseFile implements DataFile { null /* no equality field IDs */, sortOrderId, keyMetadata, + firstRowId, null /* no referenced data file */, null /* no content offset */, null /* no content size */); diff --git a/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java b/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java index 9205551f24b3..897d77c2fb5a 100644 --- a/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java +++ b/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java 
@@ -70,6 +70,7 @@ class GenericDeleteFile extends BaseFile implements DeleteFile { equalityFieldIds, sortOrderId, keyMetadata, + null /* delete files do not use first-row-id */, referencedDataFile, contentOffset, contentSizeInBytes); diff --git a/core/src/main/java/org/apache/iceberg/GenericManifestFile.java b/core/src/main/java/org/apache/iceberg/GenericManifestFile.java index 9ccd59893d0e..0ac0fddf5516 100644 --- a/core/src/main/java/org/apache/iceberg/GenericManifestFile.java +++ b/core/src/main/java/org/apache/iceberg/GenericManifestFile.java @@ -61,6 +61,7 @@ public class GenericManifestFile extends SupportsIndexProjection private Long deletedRowsCount = null; private PartitionFieldSummary[] partitions = null; private byte[] keyMetadata = null; + private Long firstRowId = null; /** Used by Avro reflection to instantiate this class when reading manifest files. */ public GenericManifestFile(Schema avroSchema) { @@ -92,6 +93,7 @@ public GenericManifestFile(Schema avroSchema) { this.deletedRowsCount = null; this.partitions = null; this.keyMetadata = null; + this.firstRowId = null; } /** Adjust the arg order to avoid conflict with the public constructor below */ @@ -110,7 +112,8 @@ public GenericManifestFile(Schema avroSchema) { Integer existingFilesCount, Long existingRowsCount, Integer deletedFilesCount, - Long deletedRowsCount) { + Long deletedRowsCount, + Long firstRowId) { super(ManifestFile.schema().columns().size()); this.avroSchema = AVRO_SCHEMA; this.manifestPath = path; @@ -128,8 +131,15 @@ public GenericManifestFile(Schema avroSchema) { this.deletedRowsCount = deletedRowsCount; this.partitions = partitions == null ? null : partitions.toArray(new PartitionFieldSummary[0]); this.keyMetadata = ByteBuffers.toByteArray(keyMetadata); + this.firstRowId = firstRowId; } + /** + * GenericManifestFile constructor. + * + * @deprecated will be removed in 1.10.0; use {@link ManifestWriter#toManifestFile()} instead. + */ + @Deprecated public GenericManifestFile( String path, long length, @@ -204,6 +214,7 @@ private GenericManifestFile(GenericManifestFile toCopy) { toCopy.keyMetadata == null ? null : Arrays.copyOf(toCopy.keyMetadata, toCopy.keyMetadata.length); + this.firstRowId = toCopy.firstRowId; } /** Constructor for Java serialization. */ @@ -299,6 +310,11 @@ public ByteBuffer keyMetadata() { return keyMetadata == null ? null : ByteBuffer.wrap(keyMetadata); } + @Override + public Long firstRowId() { + return firstRowId; + } + @Override public int size() { return ManifestFile.schema().columns().size(); @@ -346,6 +362,8 @@ private Object getByPos(int basePos) { return partitions(); case 14: return keyMetadata(); + case 15: + return firstRowId(); default: throw new UnsupportedOperationException("Unknown field ordinal: " + basePos); } @@ -404,6 +422,9 @@ protected void internalSet(int basePos, T value) { case 14: this.keyMetadata = ByteBuffers.toByteArray((ByteBuffer) value); return; + case 15: + this.firstRowId = (Long) value; + return; default: // ignore the object, it must be from a newer version of the format } @@ -458,6 +479,7 @@ public String toString() { .add("key_metadata", keyMetadata == null ? 
"null" : "(redacted)") .add("sequence_number", sequenceNumber) .add("min_sequence_number", minSequenceNumber) + .add("first_row_id", firstRowId) .toString(); } @@ -481,14 +503,15 @@ private CopyBuilder(ManifestFile toCopy) { toCopy.sequenceNumber(), toCopy.minSequenceNumber(), toCopy.snapshotId(), + copyList(toCopy.partitions(), PartitionFieldSummary::copy), + toCopy.keyMetadata(), toCopy.addedFilesCount(), toCopy.addedRowsCount(), toCopy.existingFilesCount(), toCopy.existingRowsCount(), toCopy.deletedFilesCount(), toCopy.deletedRowsCount(), - copyList(toCopy.partitions(), PartitionFieldSummary::copy), - toCopy.keyMetadata()); + toCopy.firstRowId()); } } diff --git a/core/src/main/java/org/apache/iceberg/InheritableMetadataFactory.java b/core/src/main/java/org/apache/iceberg/InheritableMetadataFactory.java index e01346a76f3b..37e17e94ca1f 100644 --- a/core/src/main/java/org/apache/iceberg/InheritableMetadataFactory.java +++ b/core/src/main/java/org/apache/iceberg/InheritableMetadataFactory.java @@ -41,6 +41,7 @@ static InheritableMetadata fromManifest(ManifestFile manifest) { manifest.path()); } + /** Returns {@link InheritableMetadata} for rewriting a manifest before it is committed. */ static InheritableMetadata forCopy(long snapshotId) { return new CopyMetadata(snapshotId); } diff --git a/core/src/main/java/org/apache/iceberg/ManifestFileParser.java b/core/src/main/java/org/apache/iceberg/ManifestFileParser.java index 80f1a0fcf7fc..7bd265777db8 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestFileParser.java +++ b/core/src/main/java/org/apache/iceberg/ManifestFileParser.java @@ -44,6 +44,7 @@ class ManifestFileParser { private static final String DELETED_ROWS_COUNT = "deleted-rows-count"; private static final String PARTITION_FIELD_SUMMARY = "partition-field-summary"; private static final String KEY_METADATA = "key-metadata"; + private static final String FIRST_ROW_ID = "first-row-id"; private ManifestFileParser() {} @@ -64,33 +65,18 @@ static void toJson(ManifestFile manifestFile, JsonGenerator generator) throws IO generator.writeNumberField(SEQUENCE_NUMBER, manifestFile.sequenceNumber()); generator.writeNumberField(MIN_SEQUENCE_NUMBER, manifestFile.minSequenceNumber()); - if (manifestFile.snapshotId() != null) { - generator.writeNumberField(ADDED_SNAPSHOT_ID, manifestFile.snapshotId()); - } - - if (manifestFile.addedFilesCount() != null) { - generator.writeNumberField(ADDED_FILES_COUNT, manifestFile.addedFilesCount()); - } - - if (manifestFile.existingFilesCount() != null) { - generator.writeNumberField(EXISTING_FILES_COUNT, manifestFile.existingFilesCount()); - } - - if (manifestFile.deletedFilesCount() != null) { - generator.writeNumberField(DELETED_FILES_COUNT, manifestFile.deletedFilesCount()); - } - - if (manifestFile.addedRowsCount() != null) { - generator.writeNumberField(ADDED_ROWS_COUNT, manifestFile.addedRowsCount()); - } - - if (manifestFile.existingRowsCount() != null) { - generator.writeNumberField(EXISTING_ROWS_COUNT, manifestFile.existingRowsCount()); - } - - if (manifestFile.deletedRowsCount() != null) { - generator.writeNumberField(DELETED_ROWS_COUNT, manifestFile.deletedRowsCount()); - } + JsonUtil.writeLongFieldIfPresent(ADDED_SNAPSHOT_ID, manifestFile.snapshotId(), generator); + JsonUtil.writeIntegerFieldIfPresent( + ADDED_FILES_COUNT, manifestFile.addedFilesCount(), generator); + JsonUtil.writeIntegerFieldIfPresent( + EXISTING_FILES_COUNT, manifestFile.existingFilesCount(), generator); + JsonUtil.writeIntegerFieldIfPresent( + DELETED_FILES_COUNT, 
manifestFile.deletedFilesCount(), generator); + JsonUtil.writeLongFieldIfPresent(ADDED_ROWS_COUNT, manifestFile.addedRowsCount(), generator); + JsonUtil.writeLongFieldIfPresent( + EXISTING_ROWS_COUNT, manifestFile.existingRowsCount(), generator); + JsonUtil.writeLongFieldIfPresent( + DELETED_ROWS_COUNT, manifestFile.deletedRowsCount(), generator); if (manifestFile.partitions() != null) { generator.writeArrayFieldStart(PARTITION_FIELD_SUMMARY); @@ -106,6 +92,8 @@ static void toJson(ManifestFile manifestFile, JsonGenerator generator) throws IO SingleValueParser.toJson(DataFile.KEY_METADATA.type(), manifestFile.keyMetadata(), generator); } + JsonUtil.writeLongFieldIfPresent(FIRST_ROW_ID, manifestFile.firstRowId(), generator); + generator.writeEndObject(); } @@ -125,41 +113,13 @@ static ManifestFile fromJson(JsonNode jsonNode) { long sequenceNumber = JsonUtil.getLong(SEQUENCE_NUMBER, jsonNode); long minSequenceNumber = JsonUtil.getLong(MIN_SEQUENCE_NUMBER, jsonNode); - - Long addedSnapshotId = null; - if (jsonNode.has(ADDED_SNAPSHOT_ID)) { - addedSnapshotId = JsonUtil.getLong(ADDED_SNAPSHOT_ID, jsonNode); - } - - Integer addedFilesCount = null; - if (jsonNode.has(ADDED_FILES_COUNT)) { - addedFilesCount = JsonUtil.getInt(ADDED_FILES_COUNT, jsonNode); - } - - Integer existingFilesCount = null; - if (jsonNode.has(EXISTING_FILES_COUNT)) { - existingFilesCount = JsonUtil.getInt(EXISTING_FILES_COUNT, jsonNode); - } - - Integer deletedFilesCount = null; - if (jsonNode.has(DELETED_FILES_COUNT)) { - deletedFilesCount = JsonUtil.getInt(DELETED_FILES_COUNT, jsonNode); - } - - Long addedRowsCount = null; - if (jsonNode.has(ADDED_ROWS_COUNT)) { - addedRowsCount = JsonUtil.getLong(ADDED_ROWS_COUNT, jsonNode); - } - - Long existingRowsCount = null; - if (jsonNode.has(EXISTING_ROWS_COUNT)) { - existingRowsCount = JsonUtil.getLong(EXISTING_ROWS_COUNT, jsonNode); - } - - Long deletedRowsCount = null; - if (jsonNode.has(DELETED_ROWS_COUNT)) { - deletedRowsCount = JsonUtil.getLong(DELETED_ROWS_COUNT, jsonNode); - } + Long addedSnapshotId = JsonUtil.getLongOrNull(ADDED_SNAPSHOT_ID, jsonNode); + Integer addedFilesCount = JsonUtil.getIntOrNull(ADDED_FILES_COUNT, jsonNode); + Integer existingFilesCount = JsonUtil.getIntOrNull(EXISTING_FILES_COUNT, jsonNode); + Integer deletedFilesCount = JsonUtil.getIntOrNull(DELETED_FILES_COUNT, jsonNode); + Long addedRowsCount = JsonUtil.getLongOrNull(ADDED_ROWS_COUNT, jsonNode); + Long existingRowsCount = JsonUtil.getLongOrNull(EXISTING_ROWS_COUNT, jsonNode); + Long deletedRowsCount = JsonUtil.getLongOrNull(DELETED_ROWS_COUNT, jsonNode); List partitionFieldSummaries = null; if (jsonNode.has(PARTITION_FIELD_SUMMARY)) { @@ -181,6 +141,8 @@ static ManifestFile fromJson(JsonNode jsonNode) { ByteBuffer keyMetadata = JsonUtil.getByteBufferOrNull(KEY_METADATA, jsonNode); + Long firstRowId = JsonUtil.getLongOrNull(FIRST_ROW_ID, jsonNode); + return new GenericManifestFile( path, length, @@ -196,7 +158,8 @@ static ManifestFile fromJson(JsonNode jsonNode) { existingFilesCount, existingRowsCount, deletedFilesCount, - deletedRowsCount); + deletedRowsCount, + firstRowId); } private static class PartitionFieldSummaryParser { diff --git a/core/src/main/java/org/apache/iceberg/ManifestFiles.java b/core/src/main/java/org/apache/iceberg/ManifestFiles.java index c9f6b783b93f..a1dfb66f32d2 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestFiles.java +++ b/core/src/main/java/org/apache/iceberg/ManifestFiles.java @@ -131,7 +131,12 @@ public static ManifestReader read( InputFile file = 
newInputFile(io, manifest); InheritableMetadata inheritableMetadata = InheritableMetadataFactory.fromManifest(manifest); return new ManifestReader<>( - file, manifest.partitionSpecId(), specsById, inheritableMetadata, FileType.DATA_FILES); + file, + manifest.partitionSpecId(), + specsById, + inheritableMetadata, + manifest.firstRowId(), + FileType.DATA_FILES); } /** @@ -177,13 +182,31 @@ public static ManifestWriter write( PartitionSpec spec, EncryptedOutputFile encryptedOutputFile, Long snapshotId) { + return newWriter(formatVersion, spec, encryptedOutputFile, snapshotId, null); + } + + /** + * Create a new {@link ManifestWriter} for the given format version. + * + * @param formatVersion a target format version + * @param spec a {@link PartitionSpec} + * @param encryptedOutputFile an {@link EncryptedOutputFile} where the manifest will be written + * @param snapshotId a snapshot ID for the manifest entries, or null for an inherited ID + * @return a manifest writer + */ + private static ManifestWriter newWriter( + int formatVersion, + PartitionSpec spec, + EncryptedOutputFile encryptedOutputFile, + Long snapshotId, + Long firstRowId) { switch (formatVersion) { case 1: return new ManifestWriter.V1Writer(spec, encryptedOutputFile, snapshotId); case 2: return new ManifestWriter.V2Writer(spec, encryptedOutputFile, snapshotId); case 3: - return new ManifestWriter.V3Writer(spec, encryptedOutputFile, snapshotId); + return new ManifestWriter.V3Writer(spec, encryptedOutputFile, snapshotId, firstRowId); } throw new UnsupportedOperationException( "Cannot write manifest for table version: " + formatVersion); @@ -291,17 +314,21 @@ static ManifestReader open( static ManifestFile copyAppendManifest( int formatVersion, int specId, + Long firstRowId, InputFile toCopy, Map specsById, EncryptedOutputFile outputFile, long snapshotId, SnapshotSummary.Builder summaryBuilder) { // use metadata that will add the current snapshot's ID for the rewrite + // read first_row_id as null because this copies the incoming manifest before commit InheritableMetadata inheritableMetadata = InheritableMetadataFactory.forCopy(snapshotId); try (ManifestReader reader = - new ManifestReader<>(toCopy, specId, specsById, inheritableMetadata, FileType.DATA_FILES)) { + new ManifestReader<>( + toCopy, specId, specsById, inheritableMetadata, null, FileType.DATA_FILES)) { return copyManifestInternal( formatVersion, + firstRowId, reader, outputFile, snapshotId, @@ -315,6 +342,7 @@ static ManifestFile copyAppendManifest( static ManifestFile copyRewriteManifest( int formatVersion, int specId, + Long firstRowId, InputFile toCopy, Map specsById, EncryptedOutputFile outputFile, @@ -324,9 +352,11 @@ static ManifestFile copyRewriteManifest( // exception if it is not InheritableMetadata inheritableMetadata = InheritableMetadataFactory.empty(); try (ManifestReader reader = - new ManifestReader<>(toCopy, specId, specsById, inheritableMetadata, FileType.DATA_FILES)) { + new ManifestReader<>( + toCopy, specId, specsById, inheritableMetadata, firstRowId, FileType.DATA_FILES)) { return copyManifestInternal( formatVersion, + firstRowId, reader, outputFile, snapshotId, @@ -340,12 +370,14 @@ static ManifestFile copyRewriteManifest( @SuppressWarnings("Finally") private static ManifestFile copyManifestInternal( int formatVersion, + Long firstRowId, ManifestReader reader, EncryptedOutputFile outputFile, long snapshotId, SnapshotSummary.Builder summaryBuilder, ManifestEntry.Status allowedEntryStatus) { - ManifestWriter writer = write(formatVersion, 
reader.spec(), outputFile, snapshotId); + ManifestWriter writer = + newWriter(formatVersion, reader.spec(), outputFile, snapshotId, firstRowId); boolean threw = true; try { for (ManifestEntry entry : reader.entries()) { diff --git a/core/src/main/java/org/apache/iceberg/ManifestListWriter.java b/core/src/main/java/org/apache/iceberg/ManifestListWriter.java index be5afd190dc5..c2cb1bf8c85d 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestListWriter.java +++ b/core/src/main/java/org/apache/iceberg/ManifestListWriter.java @@ -69,23 +69,44 @@ public long length() { return writer.length(); } + public Long nextRowId() { + return null; + } + static class V3Writer extends ManifestListWriter { private final V3Metadata.ManifestFileWrapper wrapper; - - V3Writer(OutputFile snapshotFile, long snapshotId, Long parentSnapshotId, long sequenceNumber) { + private Long nextRowId; + + V3Writer( + OutputFile snapshotFile, + long snapshotId, + Long parentSnapshotId, + long sequenceNumber, + long firstRowId) { super( snapshotFile, ImmutableMap.of( "snapshot-id", String.valueOf(snapshotId), "parent-snapshot-id", String.valueOf(parentSnapshotId), "sequence-number", String.valueOf(sequenceNumber), + "first-row-id", String.valueOf(firstRowId), "format-version", "3")); this.wrapper = new V3Metadata.ManifestFileWrapper(snapshotId, sequenceNumber); + this.nextRowId = firstRowId; } @Override protected ManifestFile prepare(ManifestFile manifest) { - return wrapper.wrap(manifest); + if (manifest.content() != ManifestContent.DATA || manifest.firstRowId() != null) { + return wrapper.wrap(manifest, null); + } else { + // assign first-row-id and update the next to assign + wrapper.wrap(manifest, nextRowId); + // leave space for existing and added rows, in case any of the existing data files do not + // have an assigned first-row-id (this is the case with manifests from pre-v3 snapshots) + this.nextRowId += manifest.existingRowsCount() + manifest.addedRowsCount(); + return wrapper; + } } @Override @@ -103,6 +124,11 @@ protected FileAppender newAppender(OutputFile file, Map fileClass() { private final InputFile file; private final InheritableMetadata inheritableMetadata; + private final Long firstRowId; private final FileType content; private final PartitionSpec spec; private final Schema fileSchema; @@ -103,8 +105,22 @@ protected ManifestReader( Map specsById, InheritableMetadata inheritableMetadata, FileType content) { + this(file, specId, specsById, inheritableMetadata, null, content); + } + + protected ManifestReader( + InputFile file, + int specId, + Map specsById, + InheritableMetadata inheritableMetadata, + Long firstRowId, + FileType content) { + Preconditions.checkArgument( + firstRowId == null || content == FileType.DATA_FILES, + "First row ID is not valid for delete manifests"); this.file = file; this.inheritableMetadata = inheritableMetadata; + this.firstRowId = firstRowId; this.content = content; if (specsById != null) { @@ -259,6 +275,9 @@ private CloseableIterable> open(Schema projection) { List fields = Lists.newArrayList(); fields.addAll(projection.asStruct().fields()); + if (projection.findField(DataFile.RECORD_COUNT.fieldId()) == null) { + fields.add(DataFile.RECORD_COUNT); + } fields.add(MetadataColumns.ROW_POSITION); CloseableIterable> reader = @@ -272,7 +291,9 @@ private CloseableIterable> open(Schema projection) { addCloseable(reader); - return CloseableIterable.transform(reader, inheritableMetadata::apply); + CloseableIterable> withMetadata = + CloseableIterable.transform(reader, 
inheritableMetadata::apply); + return CloseableIterable.transform(withMetadata, idAssigner(firstRowId)); } CloseableIterable> liveEntries() { @@ -370,4 +391,28 @@ static List withStatsColumns(Collection columns) { return projectColumns; } } + + private static > Function, ManifestEntry> idAssigner( + Long firstRowId) { + if (firstRowId != null) { + return new Function<>() { + private long nextRowId = firstRowId; + + @Override + public ManifestEntry apply(ManifestEntry entry) { + if (entry.file() instanceof BaseFile && entry.status() != ManifestEntry.Status.DELETED) { + BaseFile file = (BaseFile) entry.file(); + if (null == file.firstRowId()) { + file.setFirstRowId(nextRowId); + nextRowId += file.recordCount(); + } + } + + return entry; + } + }; + } else { + return Function.identity(); + } + } } diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java index 4f6f5c67d620..95064759ebe9 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java +++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java @@ -44,6 +44,7 @@ public abstract class ManifestWriter> implements FileAp private final Long snapshotId; private final GenericManifestEntry reused; private final PartitionSummary stats; + private final Long firstRowId; private boolean closed = false; private int addedFiles = 0; @@ -54,7 +55,8 @@ public abstract class ManifestWriter> implements FileAp private long deletedRows = 0L; private Long minDataSequenceNumber = null; - private ManifestWriter(PartitionSpec spec, EncryptedOutputFile file, Long snapshotId) { + private ManifestWriter( + PartitionSpec spec, EncryptedOutputFile file, Long snapshotId, Long firstRowId) { this.file = file.encryptingOutputFile(); this.specId = spec.specId(); this.writer = newAppender(spec, this.file); @@ -62,6 +64,7 @@ private ManifestWriter(PartitionSpec spec, EncryptedOutputFile file, Long snapsh this.reused = new GenericManifestEntry<>(V1Metadata.entrySchema(spec.partitionType()).asStruct()); this.stats = new PartitionSummary(spec); + this.firstRowId = firstRowId; this.keyMetadataBuffer = (file.keyMetadata() == null) ? 
null : file.keyMetadata().buffer(); } @@ -201,14 +204,15 @@ public ManifestFile toManifestFile() { UNASSIGNED_SEQ, minSeqNumber, snapshotId, + stats.summaries(), + keyMetadataBuffer, addedFiles, addedRows, existingFiles, existingRows, deletedFiles, deletedRows, - stats.summaries(), - keyMetadataBuffer); + firstRowId); } @Override @@ -220,8 +224,8 @@ public void close() throws IOException { static class V3Writer extends ManifestWriter { private final V3Metadata.ManifestEntryWrapper entryWrapper; - V3Writer(PartitionSpec spec, EncryptedOutputFile file, Long snapshotId) { - super(spec, file, snapshotId); + V3Writer(PartitionSpec spec, EncryptedOutputFile file, Long snapshotId, Long firstRowId) { + super(spec, file, snapshotId, firstRowId); this.entryWrapper = new V3Metadata.ManifestEntryWrapper<>(snapshotId); } @@ -256,7 +260,7 @@ static class V3DeleteWriter extends ManifestWriter { private final V3Metadata.ManifestEntryWrapper entryWrapper; V3DeleteWriter(PartitionSpec spec, EncryptedOutputFile file, Long snapshotId) { - super(spec, file, snapshotId); + super(spec, file, snapshotId, null); this.entryWrapper = new V3Metadata.ManifestEntryWrapper<>(snapshotId); } @@ -296,7 +300,7 @@ static class V2Writer extends ManifestWriter { private final V2Metadata.ManifestEntryWrapper entryWrapper; V2Writer(PartitionSpec spec, EncryptedOutputFile file, Long snapshotId) { - super(spec, file, snapshotId); + super(spec, file, snapshotId, null); this.entryWrapper = new V2Metadata.ManifestEntryWrapper<>(snapshotId); } @@ -331,7 +335,7 @@ static class V2DeleteWriter extends ManifestWriter { private final V2Metadata.ManifestEntryWrapper entryWrapper; V2DeleteWriter(PartitionSpec spec, EncryptedOutputFile file, Long snapshotId) { - super(spec, file, snapshotId); + super(spec, file, snapshotId, null); this.entryWrapper = new V2Metadata.ManifestEntryWrapper<>(snapshotId); } @@ -371,7 +375,7 @@ static class V1Writer extends ManifestWriter { private final V1Metadata.ManifestEntryWrapper entryWrapper; V1Writer(PartitionSpec spec, EncryptedOutputFile file, Long snapshotId) { - super(spec, file, snapshotId); + super(spec, file, snapshotId, null); this.entryWrapper = new V1Metadata.ManifestEntryWrapper(); } diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index 75dd7410115e..6713912151aa 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -326,6 +326,7 @@ private ManifestFile copyManifest(ManifestFile manifest) { return ManifestFiles.copyAppendManifest( current.formatVersion(), manifest.partitionSpecId(), + manifest.firstRowId(), toCopy, current.specsById(), newManifestFile, diff --git a/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java b/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java index e38683161d20..c585d5e995d6 100644 --- a/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java +++ b/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java @@ -252,7 +252,8 @@ public static RewriteResult rewriteManifestList( outputFile, snapshot.snapshotId(), snapshot.parentId(), - snapshot.sequenceNumber())) { + snapshot.sequenceNumber(), + snapshot.firstRowId())) { for (ManifestFile file : manifestFiles) { ManifestFile newFile = file.copy(); diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index 
ced5fddc5f04..c4225e191b72 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -41,7 +41,6 @@ import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.Objects; import java.util.Queue; import java.util.Set; import java.util.UUID; @@ -75,6 +74,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.relocated.com.google.common.math.IntMath; import org.apache.iceberg.util.Exceptions; +import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.SnapshotUtil; import org.apache.iceberg.util.Tasks; import org.apache.iceberg.util.ThreadPools; @@ -263,14 +263,16 @@ public Snapshot apply() { OutputFile manifestList = manifestListPath(); - try (ManifestListWriter writer = + ManifestListWriter writer = ManifestLists.write( ops.current().formatVersion(), manifestList, snapshotId(), parentSnapshotId, - sequenceNumber)) { + sequenceNumber, + base.nextRowId()); + try (writer) { // keep track of the manifest lists created manifestLists.add(manifestList.location()); @@ -287,11 +289,28 @@ public Snapshot apply() { throw new RuntimeIOException(e, "Failed to write manifest list file"); } - Long addedRows = null; - Long firstRowId = null; + Long nextRowId = null; + Long assignedRows = null; if (base.formatVersion() >= 3) { - addedRows = calculateAddedRows(manifests); - firstRowId = base.nextRowId(); + nextRowId = base.nextRowId(); + assignedRows = writer.nextRowId() - base.nextRowId(); + } + + Map summary = summary(); + String operation = operation(); + + if (summary != null && DataOperations.REPLACE.equals(operation)) { + long addedRecords = + PropertyUtil.propertyAsLong(summary, SnapshotSummary.ADDED_RECORDS_PROP, 0L); + long replacedRecords = + PropertyUtil.propertyAsLong(summary, SnapshotSummary.DELETED_RECORDS_PROP, 0L); + + // added may be less than replaced when records are already deleted by delete files + Preconditions.checkArgument( + addedRecords <= replacedRecords, + "Invalid REPLACE operation: %s added records > %s replaced records", + addedRecords, + replacedRecords); } return new BaseSnapshot( @@ -303,27 +322,8 @@ public Snapshot apply() { summary(base), base.currentSchemaId(), manifestList.location(), - firstRowId, - addedRows); - } - - private Long calculateAddedRows(List manifests) { - return manifests.stream() - .filter( - manifest -> - manifest.snapshotId() == null - || Objects.equals(manifest.snapshotId(), this.snapshotId)) - .filter(manifest -> manifest.content() == ManifestContent.DATA) - .mapToLong( - manifest -> { - Preconditions.checkArgument( - manifest.addedRowsCount() != null, - "Cannot determine number of added rows in snapshot because" - + " the entry for manifest %s is missing the field `added-rows-count`", - manifest.path()); - return manifest.addedRowsCount(); - }) - .sum(); + nextRowId, + assignedRows); } protected abstract Map summary(); @@ -752,13 +752,14 @@ private static ManifestFile addMetadata(TableOperations ops, ManifestFile manife manifest.sequenceNumber(), manifest.minSequenceNumber(), snapshotId, + stats.summaries(), + null, addedFiles, addedRows, existingFiles, existingRows, deletedFiles, deletedRows, - stats.summaries(), null); } catch (IOException e) { diff --git a/core/src/main/java/org/apache/iceberg/V3Metadata.java b/core/src/main/java/org/apache/iceberg/V3Metadata.java index d9134951dabf..266baefaf2d0 100644 --- a/core/src/main/java/org/apache/iceberg/V3Metadata.java +++ 
b/core/src/main/java/org/apache/iceberg/V3Metadata.java @@ -46,7 +46,8 @@ private V3Metadata() {} ManifestFile.EXISTING_ROWS_COUNT.asRequired(), ManifestFile.DELETED_ROWS_COUNT.asRequired(), ManifestFile.PARTITION_SUMMARIES, - ManifestFile.KEY_METADATA); + ManifestFile.KEY_METADATA, + ManifestFile.FIRST_ROW_ID); /** * A wrapper class to write any ManifestFile implementation to Avro using the v3 write schema. @@ -58,14 +59,16 @@ static class ManifestFileWrapper implements ManifestFile, StructLike { private final long commitSnapshotId; private final long sequenceNumber; private ManifestFile wrapped = null; + private Long wrappedFirstRowId = null; ManifestFileWrapper(long commitSnapshotId, long sequenceNumber) { this.commitSnapshotId = commitSnapshotId; this.sequenceNumber = sequenceNumber; } - public ManifestFile wrap(ManifestFile file) { + public ManifestFile wrap(ManifestFile file, Long firstRowId) { this.wrapped = file; + this.wrappedFirstRowId = firstRowId; return this; } @@ -140,6 +143,22 @@ private Object get(int pos) { return wrapped.partitions(); case 14: return wrapped.keyMetadata(); + case 15: + if (wrappedFirstRowId != null) { + // if first-row-id is assigned, ensure that it is valid + Preconditions.checkState( + wrapped.content() == ManifestContent.DATA && wrapped.firstRowId() == null, + "Found invalid first-row-id assignment: %s", + wrapped); + return wrappedFirstRowId; + } else if (wrapped.content() != ManifestContent.DATA) { + return null; + } else { + Preconditions.checkState( + wrapped.firstRowId() != null, + "Found unassigned first-row-id for file: " + wrapped.path()); + return wrapped.firstRowId(); + } default: throw new UnsupportedOperationException("Unknown field ordinal: " + pos); } @@ -235,6 +254,11 @@ public ByteBuffer keyMetadata() { return wrapped.keyMetadata(); } + @Override + public Long firstRowId() { + return wrapped.firstRowId(); + } + @Override public ManifestFile copy() { return wrapped.copy(); @@ -274,6 +298,7 @@ static Types.StructType fileType(Types.StructType partitionType) { DataFile.SPLIT_OFFSETS, DataFile.EQUALITY_IDS, DataFile.SORT_ORDER_ID, + DataFile.FIRST_ROW_ID, DataFile.REFERENCED_DATA_FILE, DataFile.CONTENT_OFFSET, DataFile.CONTENT_SIZE); @@ -462,18 +487,24 @@ private Object get(int pos) { case 15: return wrapped.sortOrderId(); case 16: + if (wrapped.content() == FileContent.DATA) { + return wrapped.firstRowId(); + } else { + return null; + } + case 17: if (wrapped.content() == FileContent.POSITION_DELETES) { return ((DeleteFile) wrapped).referencedDataFile(); } else { return null; } - case 17: + case 18: if (wrapped.content() == FileContent.POSITION_DELETES) { return ((DeleteFile) wrapped).contentOffset(); } else { return null; } - case 18: + case 19: if (wrapped.content() == FileContent.POSITION_DELETES) { return ((DeleteFile) wrapped).contentSizeInBytes(); } else { @@ -588,6 +619,11 @@ public Long fileSequenceNumber() { return wrapped.fileSequenceNumber(); } + @Override + public Long firstRowId() { + return wrapped.firstRowId(); + } + @Override public F copy() { throw new UnsupportedOperationException("Cannot copy IndexedDataFile wrapper"); diff --git a/core/src/main/java/org/apache/iceberg/util/JsonUtil.java b/core/src/main/java/org/apache/iceberg/util/JsonUtil.java index 37d48f5a02c5..de72d3207a20 100644 --- a/core/src/main/java/org/apache/iceberg/util/JsonUtil.java +++ b/core/src/main/java/org/apache/iceberg/util/JsonUtil.java @@ -339,6 +339,11 @@ public static void writeIntegerFieldIf( } } + public static void 
writeIntegerFieldIfPresent(String key, Integer value, JsonGenerator generator) + throws IOException { + writeIntegerFieldIf(value != null, key, value, generator); + } + public static void writeLongFieldIf( boolean condition, String key, Long value, JsonGenerator generator) throws IOException { if (condition) { @@ -346,6 +351,11 @@ public static void writeLongFieldIf( } } + public static void writeLongFieldIfPresent(String key, Long value, JsonGenerator generator) + throws IOException { + writeLongFieldIf(value != null, key, value, generator); + } + abstract static class JsonArrayIterator implements Iterator { private final Iterator elements; diff --git a/core/src/test/java/org/apache/iceberg/TestFilesTableTaskParser.java b/core/src/test/java/org/apache/iceberg/TestFilesTableTaskParser.java index bea60601377e..aaeeb8d9f677 100644 --- a/core/src/test/java/org/apache/iceberg/TestFilesTableTaskParser.java +++ b/core/src/test/java/org/apache/iceberg/TestFilesTableTaskParser.java @@ -109,7 +109,8 @@ private String taskJson() { + "\"added-files-count\":1,\"existing-files-count\":3,\"deleted-files-count\":0," + "\"added-rows-count\":10,\"existing-rows-count\":30,\"deleted-rows-count\":0," + "\"partition-field-summary\":[{\"contains-null\":true,\"contains-nan\":false," - + "\"lower-bound\":\"0A000000\",\"upper-bound\":\"64000000\"}],\"key-metadata\":\"DB030000\"}}"; + + "\"lower-bound\":\"0A000000\",\"upper-bound\":\"64000000\"}],\"key-metadata\":\"DB030000\"," + + "\"first-row-id\":10}}"; } private void assertTaskEquals( diff --git a/core/src/test/java/org/apache/iceberg/TestManifestEncryption.java b/core/src/test/java/org/apache/iceberg/TestManifestEncryption.java index 6438b794cf53..e4693f68d350 100644 --- a/core/src/test/java/org/apache/iceberg/TestManifestEncryption.java +++ b/core/src/test/java/org/apache/iceberg/TestManifestEncryption.java @@ -18,236 +18,12 @@ */ package org.apache.iceberg; -import static org.apache.iceberg.types.Types.NestedField.required; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; - -import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.file.Path; -import java.util.List; -import org.apache.avro.InvalidAvroMagicException; -import org.apache.iceberg.encryption.EncryptedOutputFile; -import org.apache.iceberg.encryption.EncryptingFileIO; import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.encryption.EncryptionTestHelpers; -import org.apache.iceberg.encryption.PlaintextEncryptionManager; -import org.apache.iceberg.exceptions.RuntimeIOException; -import org.apache.iceberg.io.CloseableIterable; -import org.apache.iceberg.io.FileIO; -import org.apache.iceberg.io.OutputFile; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.types.Conversions; -import org.apache.iceberg.types.Types; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; - -public class TestManifestEncryption { - private static final FileIO FILE_IO = new TestTables.LocalFileIO(); - - private static final Schema SCHEMA = - new Schema( - required(1, "id", Types.LongType.get()), - required(2, "timestamp", Types.TimestampType.withZone()), - required(3, "category", Types.StringType.get()), - required(4, "data", Types.StringType.get()), - required(5, 
"double", Types.DoubleType.get())); - - private static final PartitionSpec SPEC = - PartitionSpec.builderFor(SCHEMA) - .identity("category") - .hour("timestamp") - .bucket("id", 16) - .build(); - - private static final long SNAPSHOT_ID = 987134631982734L; - private static final String PATH = - "s3://bucket/table/category=cheesy/timestamp_hour=10/id_bucket=3/file.avro"; - private static final FileFormat FORMAT = FileFormat.AVRO; - private static final PartitionData PARTITION = - DataFiles.data(SPEC, "category=cheesy/timestamp_hour=10/id_bucket=3"); - private static final Metrics METRICS = - new Metrics( - 1587L, - ImmutableMap.of(1, 15L, 2, 122L, 3, 4021L, 4, 9411L, 5, 15L), // sizes - ImmutableMap.of(1, 100L, 2, 100L, 3, 100L, 4, 100L, 5, 100L), // value counts - ImmutableMap.of(1, 0L, 2, 0L, 3, 0L, 4, 0L, 5, 0L), // null value counts - ImmutableMap.of(5, 10L), // nan value counts - ImmutableMap.of(1, Conversions.toByteBuffer(Types.IntegerType.get(), 1)), // lower bounds - ImmutableMap.of(1, Conversions.toByteBuffer(Types.IntegerType.get(), 1))); // upper bounds - private static final List OFFSETS = ImmutableList.of(4L); - private static final Integer SORT_ORDER_ID = 2; - - private static final ByteBuffer CONTENT_KEY_METADATA = ByteBuffer.allocate(100); - - private static final DataFile DATA_FILE = - new GenericDataFile( - 0, - PATH, - FORMAT, - PARTITION, - 150972L, - METRICS, - CONTENT_KEY_METADATA, - OFFSETS, - SORT_ORDER_ID); - - private static final List EQUALITY_IDS = ImmutableList.of(1); - private static final int[] EQUALITY_ID_ARR = new int[] {1}; - - private static final DeleteFile DELETE_FILE = - new GenericDeleteFile( - SPEC.specId(), - FileContent.EQUALITY_DELETES, - PATH, - FORMAT, - PARTITION, - 22905L, - METRICS, - EQUALITY_ID_ARR, - SORT_ORDER_ID, - null, - CONTENT_KEY_METADATA, - null, - null, - null); - - private static final EncryptionManager ENCRYPTION_MANAGER = - EncryptionTestHelpers.createEncryptionManager(); - - @TempDir private Path temp; - - @Test - public void testV1Write() throws IOException { - ManifestFile manifest = writeManifest(1); - checkEntry( - readManifest(manifest), - ManifestWriter.UNASSIGNED_SEQ, - ManifestWriter.UNASSIGNED_SEQ, - FileContent.DATA); - } - - @Test - public void testV2Write() throws IOException { - ManifestFile manifest = writeManifest(2); - checkEntry( - readManifest(manifest), - ManifestWriter.UNASSIGNED_SEQ, - ManifestWriter.UNASSIGNED_SEQ, - FileContent.DATA); - } - - @Test - public void testV2WriteDelete() throws IOException { - ManifestFile manifest = writeDeleteManifest(2); - checkEntry( - readDeleteManifest(manifest), - ManifestWriter.UNASSIGNED_SEQ, - ManifestWriter.UNASSIGNED_SEQ, - FileContent.EQUALITY_DELETES); - } - - void checkEntry( - ManifestEntry entry, - Long expectedDataSequenceNumber, - Long expectedFileSequenceNumber, - FileContent content) { - assertThat(entry.status()).isEqualTo(ManifestEntry.Status.ADDED); - assertThat(entry.snapshotId()).isEqualTo(SNAPSHOT_ID); - assertThat(entry.dataSequenceNumber()).isEqualTo(expectedDataSequenceNumber); - assertThat(entry.fileSequenceNumber()).isEqualTo(expectedFileSequenceNumber); - checkDataFile(entry.file(), content); - } - - void checkDataFile(ContentFile dataFile, FileContent content) { - // DataFile is the superclass of DeleteFile, so this method can check both - assertThat(dataFile.content()).isEqualTo(content); - assertThat(dataFile.location()).isEqualTo(PATH); - assertThat(dataFile.format()).isEqualTo(FORMAT); - assertThat(dataFile.partition()).isEqualTo(PARTITION); 
-    assertThat(dataFile.recordCount()).isEqualTo(METRICS.recordCount());
-    assertThat(dataFile.columnSizes()).isEqualTo(METRICS.columnSizes());
-    assertThat(dataFile.valueCounts()).isEqualTo(METRICS.valueCounts());
-    assertThat(dataFile.nullValueCounts()).isEqualTo(METRICS.nullValueCounts());
-    assertThat(dataFile.nanValueCounts()).isEqualTo(METRICS.nanValueCounts());
-    assertThat(dataFile.lowerBounds()).isEqualTo(METRICS.lowerBounds());
-    assertThat(dataFile.upperBounds()).isEqualTo(METRICS.upperBounds());
-    assertThat(dataFile.sortOrderId()).isEqualTo(SORT_ORDER_ID);
-    if (dataFile.content() == FileContent.EQUALITY_DELETES) {
-      assertThat(dataFile.equalityFieldIds()).isEqualTo(EQUALITY_IDS);
-    } else {
-      assertThat(dataFile.equalityFieldIds()).isNull();
-    }
-  }
-
-  private ManifestFile writeManifest(int formatVersion) throws IOException {
-    return writeManifest(DATA_FILE, formatVersion);
-  }
-
-  private ManifestFile writeManifest(DataFile file, int formatVersion) throws IOException {
-    OutputFile manifestFile =
-        Files.localOutput(
-            FileFormat.AVRO.addExtension(
-                File.createTempFile("manifest", null, temp.toFile()).toString()));
-    EncryptedOutputFile encryptedManifest = ENCRYPTION_MANAGER.encrypt(manifestFile);
-    ManifestWriter<DataFile> writer =
-        ManifestFiles.write(formatVersion, SPEC, encryptedManifest, SNAPSHOT_ID);
-    try {
-      writer.add(file);
-    } finally {
-      writer.close();
-    }
-    return writer.toManifestFile();
-  }
-
-  private ManifestEntry<DataFile> readManifest(ManifestFile manifest) throws IOException {
-    // First try to read without decryption
-    assertThatThrownBy(
-            () ->
-                ManifestFiles.read(
-                    manifest,
-                    EncryptingFileIO.combine(FILE_IO, PlaintextEncryptionManager.instance()),
-                    null))
-        .isInstanceOf(RuntimeIOException.class)
-        .hasMessageContaining("Failed to open file")
-        .hasCauseInstanceOf(InvalidAvroMagicException.class);
-
-    try (CloseableIterable<ManifestEntry<DataFile>> reader =
-        ManifestFiles.read(manifest, EncryptingFileIO.combine(FILE_IO, ENCRYPTION_MANAGER), null)
-            .entries()) {
-      List<ManifestEntry<DataFile>> files = Lists.newArrayList(reader);
-      assertThat(files).hasSize(1);
-      return files.get(0);
-    }
-  }
-
-  private ManifestFile writeDeleteManifest(int formatVersion) throws IOException {
-    OutputFile manifestFile =
-        Files.localOutput(
-            FileFormat.AVRO.addExtension(
-                File.createTempFile("manifest", null, temp.toFile()).toString()));
-    EncryptedOutputFile encryptedManifest = ENCRYPTION_MANAGER.encrypt(manifestFile);
-    ManifestWriter<DeleteFile> writer =
-        ManifestFiles.writeDeleteManifest(formatVersion, SPEC, encryptedManifest, SNAPSHOT_ID);
-    try {
-      writer.add(DELETE_FILE);
-    } finally {
-      writer.close();
-    }
-    return writer.toManifestFile();
-  }
-
-  private ManifestEntry<DeleteFile> readDeleteManifest(ManifestFile manifest) throws IOException {
-    try (CloseableIterable<ManifestEntry<DeleteFile>> reader =
-        ManifestFiles.readDeleteManifest(
-                manifest, EncryptingFileIO.combine(FILE_IO, ENCRYPTION_MANAGER), null)
-            .entries()) {
-      List<ManifestEntry<DeleteFile>> entries = Lists.newArrayList(reader);
-      assertThat(entries).hasSize(1);
-      return entries.get(0);
-    }
+public class TestManifestEncryption extends TestManifestWriterVersions {
+  @Override
+  protected EncryptionManager encryptionManager() {
+    return EncryptionTestHelpers.createEncryptionManager();
   }
 }
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestFileParser.java b/core/src/test/java/org/apache/iceberg/TestManifestFileParser.java
index 5a6e99c984f0..cfc42fe96427 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestFileParser.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestFileParser.java
@@ -89,14 +89,15 @@ static ManifestFile createManifestFile() {
         1L,
         2L,
         12345678901234567L,
+        partitionFieldSummaries,
+        keyMetadata,
         1,
         10L,
         3,
         30L,
         0,
         0L,
-        partitionFieldSummaries,
-        keyMetadata);
+        10L);
   }
 
   private String manifestFileJson() {
@@ -106,6 +107,6 @@ private String manifestFileJson() {
         + "\"added-rows-count\":10,\"existing-rows-count\":30,\"deleted-rows-count\":0,"
         + "\"partition-field-summary\":[{\"contains-null\":true,\"contains-nan\":false,"
         + "\"lower-bound\":\"0A000000\",\"upper-bound\":\"64000000\"}],"
-        + "\"key-metadata\":\"DB030000\"}";
+        + "\"key-metadata\":\"DB030000\",\"first-row-id\":10}";
   }
 }
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestListVersions.java b/core/src/test/java/org/apache/iceberg/TestManifestListVersions.java
index afbee9be1375..70590b1bb385 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestListVersions.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestListVersions.java
@@ -45,8 +45,12 @@
 import org.apache.iceberg.types.Types;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.FieldSource;
 
 public class TestManifestListVersions {
+  private static final int ROW_LINEAGE_FORMAT_VERSION = 3;
+
   private static final String PATH = "s3://bucket/table/m1.avro";
   private static final long LENGTH = 1024L;
   private static final int SPEC_ID = 1;
@@ -62,6 +66,8 @@ public class TestManifestListVersions {
   private static final List<ManifestFile.PartitionFieldSummary> PARTITION_SUMMARIES =
       ImmutableList.of();
   private static final ByteBuffer KEY_METADATA = null;
+  private static final long FIRST_ROW_ID = 100L;
+  private static final long SNAPSHOT_FIRST_ROW_ID = 130L;
 
   private static final ManifestFile TEST_MANIFEST =
       new GenericManifestFile(
@@ -72,14 +78,15 @@ public class TestManifestListVersions {
           SEQ_NUM,
           MIN_SEQ_NUM,
           SNAPSHOT_ID,
+          PARTITION_SUMMARIES,
+          KEY_METADATA,
           ADDED_FILES,
           ADDED_ROWS,
           EXISTING_FILES,
           EXISTING_ROWS,
           DELETED_FILES,
           DELETED_ROWS,
-          PARTITION_SUMMARIES,
-          KEY_METADATA);
+          FIRST_ROW_ID);
 
   private static final ManifestFile TEST_DELETE_MANIFEST =
       new GenericManifestFile(
@@ -90,14 +97,15 @@ public class TestManifestListVersions {
           SEQ_NUM,
           MIN_SEQ_NUM,
           SNAPSHOT_ID,
+          PARTITION_SUMMARIES,
+          KEY_METADATA,
           ADDED_FILES,
           ADDED_ROWS,
           EXISTING_FILES,
           EXISTING_ROWS,
           DELETED_FILES,
           DELETED_ROWS,
-          PARTITION_SUMMARIES,
-          KEY_METADATA);
+          null);
 
   @TempDir private Path temp;
 
@@ -112,6 +120,9 @@ public void testV1WriteDeleteManifest() {
   public void testV1Write() throws IOException {
     ManifestFile manifest = writeAndReadManifestList(1);
 
+    // v3 fields are not written and are defaulted
+    assertThat(manifest.firstRowId()).isNull();
+
     // v2 fields are not written and are defaulted
     assertThat(manifest.sequenceNumber()).isEqualTo(0);
     assertThat(manifest.minSequenceNumber()).isEqualTo(0);
@@ -134,6 +145,9 @@ public void testV1Write() throws IOException {
   public void testV2Write() throws IOException {
     ManifestFile manifest = writeAndReadManifestList(2);
 
+    // v3 fields are not written and are defaulted
+    assertThat(manifest.firstRowId()).isNull();
+
     // all v2 fields should be read correctly
     assertThat(manifest.path()).isEqualTo(PATH);
     assertThat(manifest.length()).isEqualTo(LENGTH);
@@ -150,25 +164,149 @@ public void testV2Write() throws IOException {
     assertThat(manifest.deletedRowsCount()).isEqualTo(DELETED_ROWS);
   }
 
+  @Test
+  public void testV3Write() throws IOException {
+    ManifestFile manifest = writeAndReadManifestList(3);
+
+    // all v3 fields should be read correctly
+    assertThat(manifest.path()).isEqualTo(PATH);
+    assertThat(manifest.length()).isEqualTo(LENGTH);
+    assertThat(manifest.partitionSpecId()).isEqualTo(SPEC_ID);
+    assertThat(manifest.content()).isEqualTo(ManifestContent.DATA);
+    assertThat(manifest.sequenceNumber()).isEqualTo(SEQ_NUM);
+    assertThat(manifest.minSequenceNumber()).isEqualTo(MIN_SEQ_NUM);
+    assertThat(manifest.snapshotId()).isEqualTo(SNAPSHOT_ID);
+    assertThat(manifest.addedFilesCount()).isEqualTo(ADDED_FILES);
+    assertThat(manifest.addedRowsCount()).isEqualTo(ADDED_ROWS);
+    assertThat(manifest.existingFilesCount()).isEqualTo(EXISTING_FILES);
+    assertThat(manifest.existingRowsCount()).isEqualTo(EXISTING_ROWS);
+    assertThat(manifest.deletedFilesCount()).isEqualTo(DELETED_FILES);
+    assertThat(manifest.deletedRowsCount()).isEqualTo(DELETED_ROWS);
+    assertThat(manifest.firstRowId()).isEqualTo(FIRST_ROW_ID);
+  }
+
+  @Test
+  public void testV3WriteFirstRowIdAssignment() throws IOException {
+    ManifestFile missingFirstRowId =
+        new GenericManifestFile(
+            PATH,
+            LENGTH,
+            SPEC_ID,
+            ManifestContent.DATA,
+            SEQ_NUM,
+            MIN_SEQ_NUM,
+            SNAPSHOT_ID,
+            PARTITION_SUMMARIES,
+            KEY_METADATA,
+            ADDED_FILES,
+            ADDED_ROWS,
+            EXISTING_FILES,
+            EXISTING_ROWS,
+            DELETED_FILES,
+            DELETED_ROWS,
+            null);
+
+    // write uses firstRowId=SNAPSHOT_FIRST_ROW_ID and ADDED_ROWS are assigned
+    long nextRowId =
+        SNAPSHOT_FIRST_ROW_ID
+            + missingFirstRowId.addedRowsCount()
+            + missingFirstRowId.existingRowsCount();
+    ManifestFile manifest =
+        Iterables.getOnlyElement(
+            ManifestLists.read(writeManifestList(3, nextRowId, missingFirstRowId)));
+
+    // all v3 fields should be read correctly
+    assertThat(manifest.path()).isEqualTo(PATH);
+    assertThat(manifest.length()).isEqualTo(LENGTH);
+    assertThat(manifest.partitionSpecId()).isEqualTo(SPEC_ID);
+    assertThat(manifest.content()).isEqualTo(ManifestContent.DATA);
+    assertThat(manifest.sequenceNumber()).isEqualTo(SEQ_NUM);
+    assertThat(manifest.minSequenceNumber()).isEqualTo(MIN_SEQ_NUM);
+    assertThat(manifest.snapshotId()).isEqualTo(SNAPSHOT_ID);
+    assertThat(manifest.addedFilesCount()).isEqualTo(ADDED_FILES);
+    assertThat(manifest.addedRowsCount()).isEqualTo(ADDED_ROWS);
+    assertThat(manifest.existingFilesCount()).isEqualTo(EXISTING_FILES);
+    assertThat(manifest.existingRowsCount()).isEqualTo(EXISTING_ROWS);
+    assertThat(manifest.deletedFilesCount()).isEqualTo(DELETED_FILES);
+    assertThat(manifest.deletedRowsCount()).isEqualTo(DELETED_ROWS);
+    assertThat(manifest.firstRowId()).isEqualTo(SNAPSHOT_FIRST_ROW_ID);
+  }
+
+  @Test
+  public void testV3WriteMixedRowIdAssignment() throws IOException {
+    ManifestFile missingFirstRowId =
+        new GenericManifestFile(
+            PATH,
+            LENGTH,
+            SPEC_ID,
+            ManifestContent.DATA,
+            SEQ_NUM,
+            MIN_SEQ_NUM,
+            SNAPSHOT_ID,
+            PARTITION_SUMMARIES,
+            KEY_METADATA,
+            ADDED_FILES,
+            ADDED_ROWS,
+            EXISTING_FILES,
+            EXISTING_ROWS,
+            DELETED_FILES,
+            DELETED_ROWS,
+            null);
+
+    // write uses firstRowId=SNAPSHOT_FIRST_ROW_ID and ADDED_ROWS are assigned twice
+    long nextRowId =
+        SNAPSHOT_FIRST_ROW_ID
+            + 2 * (missingFirstRowId.addedRowsCount() + missingFirstRowId.existingRowsCount());
+    List<ManifestFile> manifests =
+        ManifestLists.read(
+            writeManifestList(3, nextRowId, missingFirstRowId, TEST_MANIFEST, missingFirstRowId));
+
+    // all v2 fields should be read correctly
+    for (ManifestFile manifest : manifests) {
+      assertThat(manifest.path()).isEqualTo(PATH);
+      assertThat(manifest.length()).isEqualTo(LENGTH);
+      assertThat(manifest.partitionSpecId()).isEqualTo(SPEC_ID);
+      assertThat(manifest.content()).isEqualTo(ManifestContent.DATA);
+      assertThat(manifest.sequenceNumber()).isEqualTo(SEQ_NUM);
+      assertThat(manifest.minSequenceNumber()).isEqualTo(MIN_SEQ_NUM);
+      assertThat(manifest.snapshotId()).isEqualTo(SNAPSHOT_ID);
+      assertThat(manifest.addedFilesCount()).isEqualTo(ADDED_FILES);
+      assertThat(manifest.addedRowsCount()).isEqualTo(ADDED_ROWS);
+      assertThat(manifest.existingFilesCount()).isEqualTo(EXISTING_FILES);
+      assertThat(manifest.existingRowsCount()).isEqualTo(EXISTING_ROWS);
+      assertThat(manifest.deletedFilesCount()).isEqualTo(DELETED_FILES);
+      assertThat(manifest.deletedRowsCount()).isEqualTo(DELETED_ROWS);
+    }
+
+    assertThat(manifests.get(0).firstRowId()).isEqualTo(SNAPSHOT_FIRST_ROW_ID);
+    assertThat(manifests.get(1).firstRowId()).isEqualTo(TEST_MANIFEST.firstRowId());
+    assertThat(manifests.get(2).firstRowId())
+        .isEqualTo(
+            SNAPSHOT_FIRST_ROW_ID
+                + missingFirstRowId.existingRowsCount()
+                + missingFirstRowId.addedRowsCount());
+  }
+
   @Test
   public void testV1ForwardCompatibility() throws IOException {
     InputFile manifestList = writeManifestList(TEST_MANIFEST, 1);
     GenericData.Record generic = readGeneric(manifestList, V1Metadata.MANIFEST_LIST_SCHEMA);
 
     // v1 metadata should match even though order changed
-    assertThat(generic.get("manifest_path")).asString().isEqualTo(PATH);
-    assertThat(generic.get("manifest_length")).isEqualTo(LENGTH);
-    assertThat(generic.get("partition_spec_id")).isEqualTo(SPEC_ID);
-    assertThat(generic.get("added_snapshot_id")).isEqualTo(SNAPSHOT_ID);
-    assertThat(generic.get("added_files_count")).isEqualTo(ADDED_FILES);
-    assertThat(generic.get("existing_files_count")).isEqualTo(EXISTING_FILES);
-    assertThat(generic.get("deleted_files_count")).isEqualTo(DELETED_FILES);
-    assertThat(generic.get("added_rows_count")).isEqualTo(ADDED_ROWS);
-    assertThat(generic.get("existing_rows_count")).isEqualTo(EXISTING_ROWS);
-    assertThat(generic.get("deleted_rows_count")).isEqualTo(DELETED_ROWS);
+    assertThat(generic.get(ManifestFile.PATH.name())).asString().isEqualTo(PATH);
+    assertThat(generic.get(ManifestFile.LENGTH.name())).isEqualTo(LENGTH);
+    assertThat(generic.get(ManifestFile.SPEC_ID.name())).isEqualTo(SPEC_ID);
+    assertThat(generic.get(ManifestFile.SNAPSHOT_ID.name())).isEqualTo(SNAPSHOT_ID);
+    assertThat(generic.get(ManifestFile.ADDED_FILES_COUNT.name())).isEqualTo(ADDED_FILES);
+    assertThat(generic.get(ManifestFile.EXISTING_FILES_COUNT.name())).isEqualTo(EXISTING_FILES);
+    assertThat(generic.get(ManifestFile.DELETED_FILES_COUNT.name())).isEqualTo(DELETED_FILES);
+    assertThat(generic.get(ManifestFile.ADDED_ROWS_COUNT.name())).isEqualTo(ADDED_ROWS);
+    assertThat(generic.get(ManifestFile.EXISTING_ROWS_COUNT.name())).isEqualTo(EXISTING_ROWS);
+    assertThat(generic.get(ManifestFile.DELETED_ROWS_COUNT.name())).isEqualTo(DELETED_ROWS);
     assertEmptyAvroField(generic, ManifestFile.MANIFEST_CONTENT.name());
     assertEmptyAvroField(generic, ManifestFile.SEQUENCE_NUMBER.name());
     assertEmptyAvroField(generic, ManifestFile.MIN_SEQUENCE_NUMBER.name());
+    assertEmptyAvroField(generic, ManifestFile.FIRST_ROW_ID.name());
   }
 
   @Test
@@ -179,19 +317,20 @@ public void testV2ForwardCompatibility() throws IOException {
     GenericData.Record generic = readGeneric(manifestList, V1Metadata.MANIFEST_LIST_SCHEMA);
 
     // v1 metadata should match even though order changed
-    assertThat(generic.get("manifest_path")).asString().isEqualTo(PATH);
-    assertThat(generic.get("manifest_length")).isEqualTo(LENGTH);
-    assertThat(generic.get("partition_spec_id")).isEqualTo(SPEC_ID);
-    assertThat(generic.get("added_snapshot_id")).isEqualTo(SNAPSHOT_ID);
-    assertThat(generic.get("added_files_count")).isEqualTo(ADDED_FILES);
-    assertThat(generic.get("existing_files_count")).isEqualTo(EXISTING_FILES);
-    assertThat(generic.get("deleted_files_count")).isEqualTo(DELETED_FILES);
-    assertThat(generic.get("added_rows_count")).isEqualTo(ADDED_ROWS);
-    assertThat(generic.get("existing_rows_count")).isEqualTo(EXISTING_ROWS);
-    assertThat(generic.get("deleted_rows_count")).isEqualTo(DELETED_ROWS);
+    assertThat(generic.get(ManifestFile.PATH.name())).asString().isEqualTo(PATH);
+    assertThat(generic.get(ManifestFile.LENGTH.name())).isEqualTo(LENGTH);
+    assertThat(generic.get(ManifestFile.SPEC_ID.name())).isEqualTo(SPEC_ID);
+    assertThat(generic.get(ManifestFile.SNAPSHOT_ID.name())).isEqualTo(SNAPSHOT_ID);
+    assertThat(generic.get(ManifestFile.ADDED_FILES_COUNT.name())).isEqualTo(ADDED_FILES);
+    assertThat(generic.get(ManifestFile.EXISTING_FILES_COUNT.name())).isEqualTo(EXISTING_FILES);
+    assertThat(generic.get(ManifestFile.DELETED_FILES_COUNT.name())).isEqualTo(DELETED_FILES);
+    assertThat(generic.get(ManifestFile.ADDED_ROWS_COUNT.name())).isEqualTo(ADDED_ROWS);
+    assertThat(generic.get(ManifestFile.EXISTING_ROWS_COUNT.name())).isEqualTo(EXISTING_ROWS);
+    assertThat(generic.get(ManifestFile.DELETED_ROWS_COUNT.name())).isEqualTo(DELETED_ROWS);
     assertEmptyAvroField(generic, ManifestFile.MANIFEST_CONTENT.name());
     assertEmptyAvroField(generic, ManifestFile.SEQUENCE_NUMBER.name());
     assertEmptyAvroField(generic, ManifestFile.MIN_SEQUENCE_NUMBER.name());
+    assertEmptyAvroField(generic, ManifestFile.FIRST_ROW_ID.name());
   }
 
   @Test
@@ -201,14 +340,14 @@ public void testManifestsWithoutRowStats() throws IOException {
 
     Collection<String> columnNamesWithoutRowStats =
         ImmutableList.of(
-            "manifest_path",
-            "manifest_length",
-            "partition_spec_id",
-            "added_snapshot_id",
-            "added_files_count",
-            "existing_files_count",
-            "deleted_files_count",
-            "partitions");
+            ManifestFile.PATH.name(),
+            ManifestFile.LENGTH.name(),
+            ManifestFile.SPEC_ID.name(),
+            ManifestFile.SNAPSHOT_ID.name(),
+            ManifestFile.ADDED_FILES_COUNT.name(),
+            ManifestFile.EXISTING_FILES_COUNT.name(),
+            ManifestFile.DELETED_FILES_COUNT.name(),
+            ManifestFile.PARTITION_SUMMARIES.name());
     Schema schemaWithoutRowStats =
         V1Metadata.MANIFEST_LIST_SCHEMA.select(columnNamesWithoutRowStats);
@@ -224,14 +363,14 @@ public void testManifestsWithoutRowStats() throws IOException {
           AvroSchemaUtil.convert(schemaWithoutRowStats, "manifest_file");
       GenericData.Record withoutRowStats =
           new GenericRecordBuilder(avroSchema)
-              .set("manifest_path", "path/to/manifest.avro")
-              .set("manifest_length", 1024L)
-              .set("partition_spec_id", 1)
-              .set("added_snapshot_id", 100L)
-              .set("added_files_count", 2)
-              .set("existing_files_count", 3)
-              .set("deleted_files_count", 4)
-              .set("partitions", null)
+              .set(ManifestFile.PATH.name(), "path/to/manifest.avro")
+              .set(ManifestFile.LENGTH.name(), 1024L)
+              .set(ManifestFile.SPEC_ID.name(), 1)
+              .set(ManifestFile.SNAPSHOT_ID.name(), 100L)
+              .set(ManifestFile.ADDED_FILES_COUNT.name(), 2)
+              .set(ManifestFile.EXISTING_FILES_COUNT.name(), 3)
+              .set(ManifestFile.DELETED_FILES_COUNT.name(), 4)
+              .set(ManifestFile.PARTITION_SUMMARIES.name(), null)
               .build();
       appender.add(withoutRowStats);
     }
@@ -248,10 +387,12 @@ public void testManifestsWithoutRowStats() throws IOException {
     assertThat(manifest.hasDeletedFiles()).isTrue();
     assertThat(manifest.deletedFilesCount()).isEqualTo(4);
     assertThat(manifest.deletedRowsCount()).isNull();
+    assertThat(manifest.firstRowId()).isNull();
   }
 
-  @Test
-  public void testManifestsPartitionSummary() throws IOException {
+  @ParameterizedTest
+  @FieldSource("org.apache.iceberg.TestHelpers#ALL_VERSIONS")
+  public void testManifestsPartitionSummary(int formatVersion) throws IOException {
     ByteBuffer firstSummaryLowerBound = Conversions.toByteBuffer(Types.IntegerType.get(), 10);
     ByteBuffer firstSummaryUpperBound = Conversions.toByteBuffer(Types.IntegerType.get(), 100);
     ByteBuffer secondSummaryLowerBound = Conversions.toByteBuffer(Types.IntegerType.get(), 20);
@@ -271,16 +412,21 @@ public void testManifestsPartitionSummary() throws IOException {
           SEQ_NUM,
           MIN_SEQ_NUM,
           SNAPSHOT_ID,
+          partitionFieldSummaries,
+          KEY_METADATA,
           ADDED_FILES,
           ADDED_ROWS,
           EXISTING_FILES,
           EXISTING_ROWS,
           DELETED_FILES,
           DELETED_ROWS,
-          partitionFieldSummaries,
-          KEY_METADATA);
+          null);
 
-    InputFile manifestList = writeManifestList(manifest, 2);
+    InputFile manifestList =
+        writeManifestList(
+            formatVersion,
+            SNAPSHOT_FIRST_ROW_ID + manifest.addedRowsCount() + manifest.existingRowsCount(),
+            manifest);
 
     List<ManifestFile> files = ManifestLists.read(manifestList);
     ManifestFile returnedManifest = Iterables.getOnlyElement(files);
@@ -300,17 +446,34 @@ public void testManifestsPartitionSummary() throws IOException {
   }
 
   private InputFile writeManifestList(ManifestFile manifest, int formatVersion) throws IOException {
-    OutputFile manifestList = new InMemoryOutputFile();
-    try (FileAppender<ManifestFile> writer =
+    return writeManifestList(formatVersion, SNAPSHOT_FIRST_ROW_ID, manifest);
+  }
+
+  private InputFile writeManifestList(
+      int formatVersion, long expectedNextRowId, ManifestFile... manifests) throws IOException {
+    OutputFile outputFile = new InMemoryOutputFile();
+    ManifestListWriter writer =
         ManifestLists.write(
             formatVersion,
-            manifestList,
+            outputFile,
             SNAPSHOT_ID,
             SNAPSHOT_ID - 1,
-            formatVersion > 1 ? SEQ_NUM : 0)) {
-      writer.add(manifest);
+            formatVersion > 1 ? SEQ_NUM : 0,
+            SNAPSHOT_FIRST_ROW_ID);
+
+    try (writer) {
+      for (ManifestFile manifest : manifests) {
+        writer.add(manifest);
+      }
     }
-    return manifestList.toInputFile();
+
+    if (formatVersion >= ROW_LINEAGE_FORMAT_VERSION) {
+      assertThat(writer.nextRowId()).isEqualTo(expectedNextRowId);
+    } else {
+      assertThat(writer.nextRowId()).isNull();
+    }
+
+    return outputFile.toInputFile();
   }
 
   private GenericData.Record readGeneric(InputFile manifestList, Schema schema) throws IOException {
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestReader.java b/core/src/test/java/org/apache/iceberg/TestManifestReader.java
index 4c74d3f6308d..fe4e4a74d1c4 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestReader.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestReader.java
@@ -130,9 +130,6 @@ public void testDataFilePositions() throws IOException {
       long expectedPos = 0L;
       for (DataFile file : reader) {
         assertThat(file.pos()).as("Position should match").isEqualTo(expectedPos);
-        assertThat(((BaseFile<?>) file).get(20))
-            .as("Position from field index should match")
-            .isEqualTo(expectedPos);
         expectedPos += 1;
       }
     }
@@ -158,9 +155,6 @@ public void testDeleteFilePositions() throws IOException {
       long expectedPos = 0L;
       for (DeleteFile file : reader) {
         assertThat(file.pos()).as("Position should match").isEqualTo(expectedPos);
-        assertThat(((BaseFile<?>) file).get(20))
-            .as("Position from field index should match")
-            .isEqualTo(expectedPos);
         expectedPos += 1;
       }
     }
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestReaderStats.java b/core/src/test/java/org/apache/iceberg/TestManifestReaderStats.java
index cd223212a4dd..0e2f4c0ebec3 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestReaderStats.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestReaderStats.java
@@ -154,7 +154,7 @@ public void testReadIteratorWithProjectStats() throws IOException {
       assertThat(entry.nanValueCounts()).isNull();
       assertThat(entry.lowerBounds()).isNull();
       assertThat(entry.upperBounds()).isNull();
-      assertNullRecordCount(entry);
+      assertThat(entry.recordCount()).isEqualTo(FILE.recordCount());
     }
   }
 
@@ -177,7 +177,7 @@ public void testReadEntriesWithSelectNotProjectStats() throws IOException {
       assertThat(dataFile.nanValueCounts()).isNull();
       assertThat(dataFile.lowerBounds()).isNull();
       assertThat(dataFile.upperBounds()).isNull();
-      assertNullRecordCount(dataFile);
+      assertThat(dataFile.recordCount()).isEqualTo(FILE.recordCount());
     }
   }
 
@@ -199,7 +199,7 @@ public void testReadEntriesWithSelectCertainStatNotProjectStats() throws IOExcep
       assertThat(dataFile.nanValueCounts()).isNull();
       assertThat(dataFile.lowerBounds()).isNull();
       assertThat(dataFile.upperBounds()).isNull();
-      assertNullRecordCount(dataFile);
+      assertThat(dataFile.recordCount()).isEqualTo(FILE.recordCount());
     }
   }
 
@@ -265,11 +265,4 @@ private void assertStatsDropped(DataFile dataFile) {
     assertThat(dataFile.location())
         .isEqualTo(FILE_PATH); // always select file path in all test cases
   }
-
-  @SuppressWarnings("checkstyle:AssertThatThrownByWithMessageCheck")
-  private void assertNullRecordCount(DataFile dataFile) {
-    // record count is a primitive type, accessing null record count will throw NPE
-    // no check on the underlying error msg as it might be missing based on the JDK version
-    assertThatThrownBy(dataFile::recordCount).isInstanceOf(NullPointerException.class);
-  }
 }
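A note on the TestManifestReaderStats change above: these tests previously expected an NPE when stats columns were projected away, and now expect the real record count. The patch does not state the rationale, but the updated assertions imply that record_count is always loaded regardless of the projection, which is consistent with row-id assignment needing every data file's record count. A minimal sketch of what the new assertions check, reusing the reader calls and fixtures (manifest, FILE_IO, FILE) from the surrounding tests:

    // stats columns are pruned by the projection, but record_count is still populated
    try (CloseableIterable<ManifestEntry<DataFile>> entries =
        ManifestFiles.read(manifest, FILE_IO).select(ImmutableList.of("file_path")).entries()) {
      for (ManifestEntry<DataFile> entry : entries) {
        assertThat(entry.file().valueCounts()).isNull(); // stats remain dropped
        assertThat(entry.file().recordCount()).isEqualTo(FILE.recordCount()); // no longer throws
      }
    }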
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
index 0d3cec7d6d55..ca34deec9211 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
@@ -21,12 +21,16 @@
 import static org.apache.iceberg.types.Types.NestedField.required;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assumptions.assumeThat;
 
-import java.io.File;
 import java.io.IOException;
 import java.nio.file.Path;
 import java.util.List;
-import org.apache.iceberg.inmemory.InMemoryOutputFile;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.encryption.EncryptingFileIO;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.encryption.PlaintextEncryptionManager;
+import org.apache.iceberg.inmemory.InMemoryFileIO;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.io.FileIO;
@@ -34,14 +38,17 @@
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Conversions;
 import org.apache.iceberg.types.Types;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.FieldSource;
 
 public class TestManifestWriterVersions {
-  private static final FileIO FILE_IO = new TestTables.LocalFileIO();
+  private final FileIO io = EncryptingFileIO.combine(new InMemoryFileIO(), encryptionManager());
 
   private static final Schema SCHEMA =
       new Schema(
@@ -76,10 +83,11 @@ public class TestManifestWriterVersions {
           ImmutableMap.of(1, Conversions.toByteBuffer(Types.IntegerType.get(), 1))); // upper bounds
   private static final List<Long> OFFSETS = ImmutableList.of(4L);
   private static final Integer SORT_ORDER_ID = 2;
+  private static final long FIRST_ROW_ID = 100L;
 
   private static final DataFile DATA_FILE =
       new GenericDataFile(
-          0, PATH, FORMAT, PARTITION, 150972L, METRICS, null, OFFSETS, SORT_ORDER_ID);
+          0, PATH, FORMAT, PARTITION, 150972L, METRICS, null, OFFSETS, SORT_ORDER_ID, FIRST_ROW_ID);
 
   private static final List<Integer> EQUALITY_IDS = ImmutableList.of(1);
   private static final int[] EQUALITY_ID_ARR = new int[] {1};
@@ -152,9 +160,12 @@ public void testV2WriteWithInheritance() throws IOException {
     checkEntry(readManifest(manifest), SEQUENCE_NUMBER, SEQUENCE_NUMBER, FileContent.DATA);
   }
 
-  @Test
-  public void testV2WriteDelete() throws IOException {
-    ManifestFile manifest = writeDeleteManifest(2);
+  @ParameterizedTest
+  @FieldSource("org.apache.iceberg.TestHelpers#ALL_VERSIONS")
+  public void testV2PlusWriteDelete(int formatVersion) throws IOException {
+    assumeThat(formatVersion).isNotEqualTo(1);
+
+    ManifestFile manifest = writeDeleteManifest(formatVersion);
     checkManifest(manifest, ManifestWriter.UNASSIGNED_SEQ);
     assertThat(manifest.content()).isEqualTo(ManifestContent.DELETES);
     checkEntry(
@@ -164,9 +175,13 @@ public void testV2WriteDelete() throws IOException {
         FileContent.EQUALITY_DELETES);
   }
 
-  @Test
-  public void testV2WriteDeleteWithInheritance() throws IOException {
-    ManifestFile manifest = writeAndReadManifestList(writeDeleteManifest(2), 2);
+  @ParameterizedTest
+  @FieldSource("org.apache.iceberg.TestHelpers#ALL_VERSIONS")
+  public void testV2WriteDeleteWithInheritance(int formatVersion) throws IOException {
+    assumeThat(formatVersion).isNotEqualTo(1);
+
+    ManifestFile manifest =
+        writeAndReadManifestList(writeDeleteManifest(formatVersion), formatVersion);
     checkManifest(manifest, SEQUENCE_NUMBER);
 
     assertThat(manifest.content()).isEqualTo(ManifestContent.DELETES);
@@ -213,27 +228,125 @@ public void testV2ManifestRewriteWithInheritance() throws IOException {
     checkRewrittenEntry(readManifest(manifest2), 0L, FileContent.DATA);
   }
 
+  @Test
+  public void testV3Write() throws IOException {
+    ManifestFile manifest = writeManifest(3);
+    checkEntry(
+        readManifest(manifest),
+        ManifestWriter.UNASSIGNED_SEQ,
+        ManifestWriter.UNASSIGNED_SEQ,
+        FileContent.DATA,
+        FIRST_ROW_ID);
+  }
+
+  @Test
+  public void testV3WriteWithInheritance() throws IOException {
+    DataFile withoutFirstRowId =
+        DataFiles.builder(SPEC).copy(DATA_FILE).withFirstRowId(null).build();
+
+    ManifestFile manifest = writeAndReadManifestList(writeManifest(3, withoutFirstRowId), 3);
+    checkManifest(manifest, SEQUENCE_NUMBER);
+    assertThat(manifest.content()).isEqualTo(ManifestContent.DATA);
+
+    // v2+ should use the correct sequence number by inheriting it
+    // v3 should use the correct first-row-id by inheriting it
+    checkEntry(
+        readManifest(manifest), SEQUENCE_NUMBER, SEQUENCE_NUMBER, FileContent.DATA, FIRST_ROW_ID);
+  }
+
+  @Test
+  public void testV3WriteFirstRowIdAssignment() throws IOException {
+    long rowsPerFile = METRICS.recordCount();
+    DataFile withoutFirstRowId =
+        DataFiles.builder(SPEC).copy(DATA_FILE).withFirstRowId(null).build();
+
+    ManifestFile manifest =
+        writeAndReadManifestList(writeManifest(3, withoutFirstRowId, withoutFirstRowId), 3);
+    assertThat(manifest.content()).isEqualTo(ManifestContent.DATA);
+
+    // v2+ should use the correct sequence number by inheriting it
+    // v3 should use the correct first-row-id by inheriting it
+    List<ManifestEntry<DataFile>> entries = readManifestAsList(manifest);
+    long expectedFirstRowId = FIRST_ROW_ID;
+    for (ManifestEntry<DataFile> entry : entries) {
+      checkEntry(entry, SEQUENCE_NUMBER, SEQUENCE_NUMBER, FileContent.DATA, expectedFirstRowId);
+      expectedFirstRowId += rowsPerFile;
+    }
+  }
+
+  @Test
+  public void testV3ManifestListRewriteWithInheritance() throws IOException {
+    // write with v1
+    ManifestFile manifest = writeAndReadManifestList(writeManifest(1), 1);
+    checkManifest(manifest, 0L);
+
+    // rewrite existing metadata with a manifest list
+    ManifestFile manifest3 = writeAndReadManifestList(manifest, 3);
+    // the ManifestFile did not change and should still have its original sequence number, 0
+    checkManifest(manifest3, 0L);
+
+    // should not inherit the sequence number because it was a rewrite
+    checkEntry(readManifest(manifest3), 0L, 0L, FileContent.DATA, FIRST_ROW_ID);
+  }
+
+  @Test
+  public void testV3ManifestRewriteWithInheritance() throws IOException {
+    // write with v1
+    ManifestFile manifest = writeAndReadManifestList(writeManifest(1), 1);
+    checkManifest(manifest, 0L);
+
+    // rewrite the manifest file using a v3 manifest
+    ManifestFile rewritten = rewriteManifest(manifest, 3);
+    checkRewrittenManifest(rewritten, ManifestWriter.UNASSIGNED_SEQ, 0L);
+
+    // add the v3 manifest to a v3 manifest list, with a sequence number
+    ManifestFile manifest3 = writeAndReadManifestList(rewritten, 3);
+    // the ManifestFile is new so it has a sequence number, but the min sequence number 0 is from
+    // the entry
+    checkRewrittenManifest(manifest3, SEQUENCE_NUMBER, 0L);
+
+    // should not inherit the v3 sequence number because it was written into the v3 manifest
+    checkRewrittenEntry(readManifest(manifest3), 0L, FileContent.DATA, FIRST_ROW_ID);
+  }
+
   void checkEntry(
       ManifestEntry<?> entry,
       Long expectedDataSequenceNumber,
       Long expectedFileSequenceNumber,
       FileContent content) {
+    checkEntry(entry, expectedDataSequenceNumber, expectedFileSequenceNumber, content, null);
+  }
+
+  void checkEntry(
+      ManifestEntry<?> entry,
+      Long expectedDataSequenceNumber,
+      Long expectedFileSequenceNumber,
+      FileContent content,
+      Long expectedRowId) {
     assertThat(entry.status()).isEqualTo(ManifestEntry.Status.ADDED);
     assertThat(entry.snapshotId()).isEqualTo(SNAPSHOT_ID);
     assertThat(entry.dataSequenceNumber()).isEqualTo(expectedDataSequenceNumber);
     assertThat(entry.fileSequenceNumber()).isEqualTo(expectedFileSequenceNumber);
-    checkDataFile(entry.file(), content);
+    checkDataFile(entry.file(), content, expectedRowId);
   }
 
   void checkRewrittenEntry(
       ManifestEntry<?> entry, Long expectedSequenceNumber, FileContent content) {
+    checkRewrittenEntry(entry, expectedSequenceNumber, content, null);
+  }
+
+  void checkRewrittenEntry(
+      ManifestEntry<?> entry,
+      Long expectedSequenceNumber,
+      FileContent content,
+      Long expectedRowId) {
     assertThat(entry.status()).isEqualTo(ManifestEntry.Status.EXISTING);
     assertThat(entry.snapshotId()).isEqualTo(SNAPSHOT_ID);
     assertThat(entry.dataSequenceNumber()).isEqualTo(expectedSequenceNumber);
-    checkDataFile(entry.file(), content);
+    checkDataFile(entry.file(), content, expectedRowId);
   }
 
-  void checkDataFile(ContentFile<?> dataFile, FileContent content) {
+  void checkDataFile(ContentFile<?> dataFile, FileContent content, Long expectedRowId) {
     // DataFile is the superclass of DeleteFile, so this method can check both
     assertThat(dataFile.content()).isEqualTo(content);
     assertThat(dataFile.location()).isEqualTo(PATH);
@@ -247,10 +360,19 @@ void checkDataFile(ContentFile<?> dataFile, FileContent content) {
     assertThat(dataFile.lowerBounds()).isEqualTo(METRICS.lowerBounds());
     assertThat(dataFile.upperBounds()).isEqualTo(METRICS.upperBounds());
     assertThat(dataFile.sortOrderId()).isEqualTo(SORT_ORDER_ID);
-    if (dataFile.content() == FileContent.EQUALITY_DELETES) {
-      assertThat(dataFile.equalityFieldIds()).isEqualTo(EQUALITY_IDS);
-    } else {
-      assertThat(dataFile.equalityFieldIds()).isNull();
+    switch (dataFile.content()) {
+      case DATA:
+        assertThat(dataFile.firstRowId()).isEqualTo(expectedRowId);
+        assertThat(dataFile.equalityFieldIds()).isNull();
+        break;
+      case EQUALITY_DELETES:
+        assertThat(dataFile.firstRowId()).isNull();
+        assertThat(dataFile.equalityFieldIds()).isEqualTo(EQUALITY_IDS);
+        break;
+      case POSITION_DELETES:
+        assertThat(dataFile.firstRowId()).isNull();
+        assertThat(dataFile.equalityFieldIds()).isNull();
+        break;
     }
   }
 
@@ -279,17 +401,23 @@ void checkRewrittenManifest(
     assertThat(manifest.deletedRowsCount()).isEqualTo(0);
   }
 
+  protected EncryptionManager encryptionManager() {
+    return PlaintextEncryptionManager.instance();
+  }
+
   private InputFile writeManifestList(ManifestFile manifest, int formatVersion) throws IOException {
-    OutputFile manifestList = new InMemoryOutputFile();
+    OutputFile manifestList = io.newOutputFile("manifest-list");
     try (FileAppender<ManifestFile> writer =
         ManifestLists.write(
             formatVersion,
             manifestList,
             SNAPSHOT_ID,
             SNAPSHOT_ID - 1,
-            formatVersion > 1 ? SEQUENCE_NUMBER : 0)) {
+            formatVersion > 1 ? SEQUENCE_NUMBER : 0,
+            FIRST_ROW_ID)) {
       writer.add(manifest);
     }
+
     return manifestList.toInputFile();
   }
 
@@ -302,10 +430,8 @@ private ManifestFile writeAndReadManifestList(ManifestFile manifest, int formatV
 
   private ManifestFile rewriteManifest(ManifestFile manifest, int formatVersion)
       throws IOException {
-    OutputFile manifestFile =
-        Files.localOutput(
-            FileFormat.AVRO.addExtension(
-                File.createTempFile("manifest", null, temp.toFile()).toString()));
+    String filename = FileFormat.AVRO.addExtension("rewrite-manifest");
+    EncryptedOutputFile manifestFile = encryptionManager().encrypt(io.newOutputFile(filename));
     ManifestWriter<DataFile> writer =
         ManifestFiles.write(formatVersion, SPEC, manifestFile, SNAPSHOT_ID);
     try {
@@ -317,38 +443,41 @@ private ManifestFile rewriteManifest(ManifestFile manifest, int formatVersion)
   }
 
   private ManifestFile writeManifest(int formatVersion) throws IOException {
-    return writeManifest(DATA_FILE, formatVersion);
+    return writeManifest(formatVersion, DATA_FILE);
   }
 
-  private ManifestFile writeManifest(DataFile file, int formatVersion) throws IOException {
-    OutputFile manifestFile =
-        Files.localOutput(
-            FileFormat.AVRO.addExtension(
-                File.createTempFile("manifest", null, temp.toFile()).toString()));
+  private ManifestFile writeManifest(int formatVersion, DataFile... files) throws IOException {
+    String filename = FileFormat.AVRO.addExtension("manifest");
+    EncryptedOutputFile manifestFile = encryptionManager().encrypt(io.newOutputFile(filename));
     ManifestWriter<DataFile> writer =
         ManifestFiles.write(formatVersion, SPEC, manifestFile, SNAPSHOT_ID);
     try {
-      writer.add(file);
+      for (DataFile file : files) {
+        writer.add(file);
+      }
     } finally {
       writer.close();
     }
     return writer.toManifestFile();
   }
 
-  private ManifestEntry<DataFile> readManifest(ManifestFile manifest) throws IOException {
+  private List<ManifestEntry<DataFile>> readManifestAsList(ManifestFile manifest)
+      throws IOException {
     try (CloseableIterable<ManifestEntry<DataFile>> reader =
-        ManifestFiles.read(manifest, FILE_IO).entries()) {
-      List<ManifestEntry<DataFile>> files = Lists.newArrayList(reader);
-      assertThat(files).hasSize(1);
-      return files.get(0);
+        ManifestFiles.read(manifest, io).entries()) {
+      return Lists.newArrayList(Iterables.transform(reader, ManifestEntry::copy));
     }
   }
 
+  private ManifestEntry<DataFile> readManifest(ManifestFile manifest) throws IOException {
+    List<ManifestEntry<DataFile>> files = readManifestAsList(manifest);
+    assertThat(files).hasSize(1);
+    return files.get(0);
+  }
+
   private ManifestFile writeDeleteManifest(int formatVersion) throws IOException {
-    OutputFile manifestFile =
-        Files.localOutput(
-            FileFormat.AVRO.addExtension(
-                File.createTempFile("manifest", null, temp.toFile()).toString()));
+    String filename = FileFormat.AVRO.addExtension("manifest");
+    EncryptedOutputFile manifestFile = encryptionManager().encrypt(io.newOutputFile(filename));
     ManifestWriter<DeleteFile> writer =
         ManifestFiles.writeDeleteManifest(formatVersion, SPEC, manifestFile, SNAPSHOT_ID);
     try {
@@ -361,7 +490,7 @@ private ManifestFile writeDeleteManifest(int formatVersion) throws IOException {
 
   private ManifestEntry<DeleteFile> readDeleteManifest(ManifestFile manifest) throws IOException {
     try (CloseableIterable<ManifestEntry<DeleteFile>> reader =
-        ManifestFiles.readDeleteManifest(manifest, FILE_IO, null).entries()) {
+        ManifestFiles.readDeleteManifest(manifest, io, null).entries()) {
       List<ManifestEntry<DeleteFile>> entries = Lists.newArrayList(reader);
       assertThat(entries).hasSize(1);
       return entries.get(0);
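testV3WriteFirstRowIdAssignment above pins down the per-file rule: data files written without an explicit first_row_id receive consecutive ID ranges that start at the inherited row ID and advance by each file's record count. A compact sketch of that bookkeeping, with the assignment step left abstract (the setter shown is a hypothetical helper for illustration, not the production API):

    // files without an explicit first_row_id inherit consecutive ranges
    long nextRowId = firstRowId; // e.g. FIRST_ROW_ID for the first ADDED file
    for (DataFile file : dataFiles) {
      if (file.firstRowId() == null) {
        setFirstRowId(file, nextRowId); // hypothetical helper, shown for illustration
      }
      nextRowId += file.recordCount(); // the next file starts after this file's rows
    }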
a/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java +++ b/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java @@ -1333,7 +1333,8 @@ private String createManifestListWithManifestFiles(long snapshotId, Long parentS new GenericManifestFile(localInput("file:/tmp/manifest2.avro"), 0, snapshotId)); try (ManifestListWriter writer = - ManifestLists.write(1, Files.localOutput(manifestList), snapshotId, parentSnapshotId, 0)) { + ManifestLists.write( + 1, Files.localOutput(manifestList), snapshotId, parentSnapshotId, 0, 0L)) { writer.addAll(manifests); } diff --git a/core/src/test/java/org/apache/iceberg/TestRowDelta.java b/core/src/test/java/org/apache/iceberg/TestRowDelta.java index f99c180c21c9..cbb4db476678 100644 --- a/core/src/test/java/org/apache/iceberg/TestRowDelta.java +++ b/core/src/test/java/org/apache/iceberg/TestRowDelta.java @@ -50,7 +50,7 @@ import org.junit.jupiter.api.extension.ExtendWith; @ExtendWith(ParameterizedTestExtension.class) -public class TestRowDelta extends V2TableTestBase { +public class TestRowDelta extends TestBase { @Parameter(index = 1) private String branch; diff --git a/core/src/test/java/org/apache/iceberg/TestRowLineageAssignment.java b/core/src/test/java/org/apache/iceberg/TestRowLineageAssignment.java new file mode 100644 index 000000000000..404e083f48d3 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/TestRowLineageAssignment.java @@ -0,0 +1,705 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; + +import java.io.File; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.types.Types.NestedField; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +public class TestRowLineageAssignment { + public static final Schema SCHEMA = + new Schema( + NestedField.required(3, "id", Types.IntegerType.get()), + NestedField.required(4, "data", Types.StringType.get())); + + static final DataFile FILE_A = + DataFiles.builder(PartitionSpec.unpartitioned()) + .withPath("/path/to/data-a.parquet") + .withFileSizeInBytes(100) + .withRecordCount(125) + .build(); + + static final DeleteFile FILE_A_DV = + FileMetadata.deleteFileBuilder(PartitionSpec.unpartitioned()) + .ofPositionDeletes() + .withPath("/path/to/data-a-deletes.puffin") + .withFileSizeInBytes(10) + .withRecordCount(15) + .withReferencedDataFile(FILE_A.location()) + .withContentOffset(4) + .withContentSizeInBytes(35) + .build(); + + static final DeleteFile FILE_A_DELETES = + FileMetadata.deleteFileBuilder(PartitionSpec.unpartitioned()) + .ofPositionDeletes() + .withPath("/path/to/data-a-deletes.parquet") + .withFileSizeInBytes(10) + .withRecordCount(15) + .withReferencedDataFile(FILE_A.location()) + .build(); + + static final DataFile FILE_B = + DataFiles.builder(PartitionSpec.unpartitioned()) + .withPath("/path/to/data-b.parquet") + .withFileSizeInBytes(100) + .withRecordCount(100) + .build(); + + static final DataFile FILE_C = + DataFiles.builder(PartitionSpec.unpartitioned()) + .withPath("/path/to/data-c.parquet") + .withFileSizeInBytes(100) + .withRecordCount(90) + .build(); + + @TempDir private File location; + + private BaseTable table; + + @BeforeEach + public void createTable() { + // create a table that uses random snapshot IDs so that conflicts can be tested. otherwise, + // conflict cases use the same snapshot ID that is suppressed by the TableMetadata builder. 
+ this.table = + TestTables.create( + location, + "test", + SCHEMA, + PartitionSpec.unpartitioned(), + 3, + Map.of("random-snapshot-ids", "true")); + } + + @AfterEach + public void cleanup() { + TestTables.clearTables(); + } + + @Test + public void testSingleFileAppend() { + assertThat(table.operations().current().nextRowId()).isEqualTo(0L); + + table.newAppend().appendFile(FILE_A).commit(); + Snapshot current = table.currentSnapshot(); + assertThat(current.firstRowId()).isEqualTo(0L); + assertThat(table.operations().current().nextRowId()).isEqualTo(FILE_A.recordCount()); + checkManifestListAssignment(table.io().newInputFile(current.manifestListLocation()), 0L); + + ManifestFile manifest = Iterables.getOnlyElement(current.dataManifests(table.io())); + checkDataFileAssignment(table, manifest, 0L); + } + + @Test + public void testMultiFileAppend() { + assertThat(table.operations().current().nextRowId()).isEqualTo(0L); + + table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit(); + Snapshot current = table.currentSnapshot(); + assertThat(current.firstRowId()).isEqualTo(0L); + assertThat(table.operations().current().nextRowId()) + .isEqualTo(FILE_A.recordCount() + FILE_B.recordCount()); + checkManifestListAssignment(table.io().newInputFile(current.manifestListLocation()), 0L); + + ManifestFile manifest = Iterables.getOnlyElement(current.dataManifests(table.io())); + checkDataFileAssignment(table, manifest, 0L, FILE_A.recordCount()); + } + + @Test + public void testMultipleFileAppends() { + // write and validate a multi-file commit + testMultiFileAppend(); + + long startingNextRowId = table.operations().current().nextRowId(); + + // add another append commit + table.newAppend().appendFile(FILE_C).commit(); + Snapshot current = table.currentSnapshot(); + assertThat(current.firstRowId()).isEqualTo(startingNextRowId); + assertThat(table.operations().current().nextRowId()) + .isEqualTo(startingNextRowId + FILE_C.recordCount()); + checkManifestListAssignment( + table.io().newInputFile(current.manifestListLocation()), startingNextRowId, 0L); + + List manifests = current.dataManifests(table.io()); + assertThat(manifests.size()).isEqualTo(2); + checkDataFileAssignment(table, manifests.get(0), startingNextRowId); + } + + @Test + public void testCommitConflict() { + // start with a non-empty table + testSingleFileAppend(); + + String startingManifest = + Iterables.getOnlyElement(table.currentSnapshot().allManifests(table.io())).path(); + long startingNextRowId = table.operations().current().nextRowId(); + + // stage a new snapshot that is not committed + AppendFiles stagedAppend = table.newAppend().appendFile(FILE_B); + Snapshot staged = stagedAppend.apply(); + assertThat(table.operations().current().nextRowId()).isEqualTo(startingNextRowId); + assertThat(staged.firstRowId()).isEqualTo(startingNextRowId); + checkManifestListAssignment( + table.io().newInputFile(staged.manifestListLocation()), startingNextRowId, 0L); + + List stagedManifests = staged.dataManifests(table.io()); + assertThat(stagedManifests.size()).isEqualTo(2); + + ManifestFile stagedManifest = stagedManifests.get(0); + checkDataFileAssignment(table, stagedManifest, startingNextRowId); + + // commit a concurrent operation with a second table reference + BaseTable sameTable = TestTables.load(location, table.name()); + sameTable.newAppend().appendFile(FILE_C).commit(); + + long secondNextRowId = sameTable.operations().current().nextRowId(); + assertThat(secondNextRowId).isEqualTo(startingNextRowId + FILE_C.recordCount()); + + // 
committed snapshot should have the same first row ID values as the staged snapshot + Snapshot committedFirst = sameTable.currentSnapshot(); + assertThat(committedFirst.firstRowId()).isEqualTo(startingNextRowId); + + checkManifestListAssignment( + table.io().newInputFile(committedFirst.manifestListLocation()), startingNextRowId, 0L); + + List committedManifests = committedFirst.dataManifests(table.io()); + assertThat(committedManifests.size()).isEqualTo(2); + + ManifestFile committedManifest = committedManifests.get(0); + checkDataFileAssignment(table, committedManifest, startingNextRowId); + assertThat(committedManifests.get(1).path()).isEqualTo(startingManifest); + + // committing the staged snapshot should reassign all first row ID values + stagedAppend.commit(); + assertThat(table.operations().refresh().nextRowId()) + .isEqualTo(secondNextRowId + FILE_B.recordCount()); + + sameTable.refresh(); + assertThat(table.currentSnapshot().snapshotId()) + .as("Both references should have the same current snapshot") + .isEqualTo(sameTable.currentSnapshot().snapshotId()); + + Snapshot committedSecond = table.currentSnapshot(); + assertThat(committedSecond.firstRowId()).isEqualTo(secondNextRowId); + + InputFile newManifestList = table.io().newInputFile(committedSecond.manifestListLocation()); + checkManifestListAssignment(newManifestList, secondNextRowId, startingNextRowId, 0L); + + List newManifests = committedSecond.dataManifests(table.io()); + assertThat(newManifests.size()).isEqualTo(3); + + ManifestFile newManifest = newManifests.get(0); + checkDataFileAssignment(table, newManifest, secondNextRowId); + assertThat(newManifests.get(1)).isEqualTo(committedManifest); + assertThat(newManifests.get(2).path()).isEqualTo(startingManifest); + } + + @Test + public void testOverwrite() { + // start with a non-empty table + testSingleFileAppend(); + + long startingNextRowId = table.operations().current().nextRowId(); + long nextRowId = startingNextRowId + FILE_B.recordCount(); + + table.newOverwrite().deleteFile(FILE_A).addFile(FILE_B).commit(); + assertThat(table.operations().current().nextRowId()).isEqualTo(nextRowId); + + Snapshot current = table.currentSnapshot(); + InputFile manifestList = table.io().newInputFile(current.manifestListLocation()); + // manifest removing FILE_A is written with first_row_id=startingNextRowId + FILE_B.recordCount + // and the table's nextRowId is the same as the deleted manifest's firstRowId because the + // manifest has 0 added or existing records + checkManifestListAssignment(manifestList, startingNextRowId, nextRowId); + + List manifests = current.dataManifests(table.io()); + assertThat(manifests.size()).isEqualTo(2); + checkDataFileAssignment(table, manifests.get(0), startingNextRowId); + checkDataFileAssignment(table, manifests.get(1), 0L); + } + + @Test + public void testOverwriteWithFilteredManifest() { + // start with multiple data files + testMultiFileAppend(); + + long startingNextRowId = table.operations().current().nextRowId(); + + assertThat(table.currentSnapshot().allManifests(table.io()).size()).isEqualTo(1); + + table.newOverwrite().deleteFile(FILE_A).addFile(FILE_C).commit(); + // the table's nextRowId is incremented by FILE_B.recordCount() because it is in a new manifest + long nextRowId = startingNextRowId + FILE_B.recordCount() + FILE_C.recordCount(); + assertThat(table.operations().current().nextRowId()).isEqualTo(nextRowId); + + Snapshot current = table.currentSnapshot(); + InputFile manifestList = 
table.io().newInputFile(current.manifestListLocation()); + // manifest removing FILE_A is written with first_row_id=startingNextRowId + FILE_C.recordCount + checkManifestListAssignment( + manifestList, startingNextRowId, startingNextRowId + FILE_C.recordCount()); + + List manifests = current.dataManifests(table.io()); + assertThat(manifests.size()).isEqualTo(2); + checkDataFileAssignment(table, manifests.get(0), startingNextRowId); + // the starting row ID for FILE_B does not change + checkDataFileAssignment(table, manifests.get(1), FILE_A.recordCount()); + } + + @Test + public void testRowDelta() { + // start with a non-empty table + testSingleFileAppend(); + + long startingNextRowId = table.operations().current().nextRowId(); + long nextRowId = startingNextRowId + FILE_B.recordCount(); + + table.newRowDelta().addDeletes(FILE_A_DV).addRows(FILE_B).commit(); + assertThat(table.operations().current().nextRowId()).isEqualTo(nextRowId); + + Snapshot current = table.currentSnapshot(); + InputFile manifestList = table.io().newInputFile(current.manifestListLocation()); + // only one new data manifest is written + checkManifestListAssignment(manifestList, startingNextRowId, 0L); + + List manifests = current.dataManifests(table.io()); + assertThat(manifests.size()).isEqualTo(2); + checkDataFileAssignment(table, manifests.get(0), startingNextRowId); + checkDataFileAssignment(table, manifests.get(1), 0L); + } + + @Test + public void testAssignmentWithManifestCompaction() { + // start with a non-empty table + // data manifests: [added(FILE_A)] + testSingleFileAppend(); + + long startingFirstRowId = table.operations().current().nextRowId(); + + // add FILE_B and set the min so metadata is merged on the next commit + table.newAppend().appendFile(FILE_B).commit(); + table.updateProperties().set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "1").commit(); + // data manifests: [added(FILE_B)], [added(FILE_A)] + + long preMergeNextRowId = startingFirstRowId + FILE_B.recordCount(); + assertThat(table.operations().current().nextRowId()).isEqualTo(preMergeNextRowId); + assertThat(table.currentSnapshot().allManifests(table.io()).size()).isEqualTo(2); + InputFile preMergeManifestList = + table.io().newInputFile(table.currentSnapshot().manifestListLocation()); + checkManifestListAssignment(preMergeManifestList, startingFirstRowId, 0L); + List preMergeManifests = table.currentSnapshot().dataManifests(table.io()); + assertThat(preMergeManifests.size()).isEqualTo(2); + checkDataFileAssignment(table, preMergeManifests.get(0), startingFirstRowId); + checkDataFileAssignment(table, preMergeManifests.get(1), 0L); + + table.newAppend().appendFile(FILE_C).commit(); + // data manifests: [add(FILE_C), exist(FILE_B), exist(FILE_A)] + + long mergedNextRowId = + preMergeNextRowId + FILE_C.recordCount() + FILE_B.recordCount() + FILE_A.recordCount(); + + assertThat(table.operations().current().nextRowId()).isEqualTo(mergedNextRowId); + assertThat(table.currentSnapshot().allManifests(table.io()).size()).isEqualTo(1); + InputFile mergedManifestList = + table.io().newInputFile(table.currentSnapshot().manifestListLocation()); + checkManifestListAssignment(mergedManifestList, preMergeNextRowId); + List mergedManifests = table.currentSnapshot().dataManifests(table.io()); + checkDataFileAssignment( + table, mergedManifests.get(0), preMergeNextRowId, startingFirstRowId, 0L); + } + + @Test + public void testTableUpgrade(@TempDir File altLocation) { + BaseTable upgradeTable = + TestTables.create(altLocation, "test_upgrade", SCHEMA, 
PartitionSpec.unpartitioned(), 2); + + // create data manifests: [added(FILE_C)], [existing(FILE_A), deleted(FILE_B)] + // and delete manifests: [added(FILE_A_DELETES)] + upgradeTable.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit(); + upgradeTable.newRowDelta().addDeletes(FILE_A_DELETES).commit(); // does not affect assignment + upgradeTable.newOverwrite().deleteFile(FILE_B).addFile(FILE_C).commit(); + + assertThat(upgradeTable.operations().current().nextRowId()) + .as("v2 tables should always have next-row-id=0") + .isEqualTo(0L); + + TestTables.upgrade(altLocation, "test_upgrade", 3); + upgradeTable.refresh(); + + assertThat(upgradeTable.operations().current().nextRowId()) + .as("next-row-id should start at 0") + .isEqualTo(0L); + + for (Snapshot snapshot : upgradeTable.snapshots()) { + assertThat(snapshot.firstRowId()) + .as("Existing snapshots should not have first-row-id") + .isNull(); + } + + Snapshot current = upgradeTable.currentSnapshot(); + InputFile manifestList = upgradeTable.io().newInputFile(current.manifestListLocation()); + // existing manifests should not have first_row_id assigned + checkManifestListAssignment(manifestList, null, null); + + List manifests = current.dataManifests(upgradeTable.io()); + assertThat(manifests.size()).isEqualTo(2); + // manifests without first_row_id will not assign first_row_id + checkDataFileAssignment(upgradeTable, manifests.get(0), (Long) null); + checkDataFileAssignment(upgradeTable, manifests.get(1), (Long) null); + } + + @Test + public void testAssignmentAfterUpgrade(@TempDir File altLocation) { + // data manifests: [added(FILE_C)], [existing(FILE_A), deleted(FILE_B)] + testTableUpgrade(altLocation); + + BaseTable upgradeTable = TestTables.load(altLocation, "test_upgrade"); + long startingFirstRowId = upgradeTable.operations().current().nextRowId(); + + List existingManifests = + upgradeTable.currentSnapshot().dataManifests(upgradeTable.io()); + assertThat(existingManifests.size()).isEqualTo(2); + + // any commit (even empty) should assign first_row_id to the entire metadata tree + upgradeTable.newFastAppend().commit(); + // data manifests: [added(FILE_C)], [existing(FILE_A), deleted(FILE_B)] + + assertThat(upgradeTable.operations().current().nextRowId()) + .as("next-row-id should be updated to include the assigned data") + .isEqualTo(startingFirstRowId + FILE_C.recordCount() + FILE_A.recordCount()); + + Snapshot assigned = upgradeTable.currentSnapshot(); + + assertThat(assigned.firstRowId()).isEqualTo(startingFirstRowId); + InputFile manifestList = table.io().newInputFile(assigned.manifestListLocation()); + // the first manifest has added FILE_C, the second has deleted FILE_A and existing FILE_B + checkManifestListAssignment(manifestList, 0L, FILE_C.recordCount()); + + List manifests = assigned.dataManifests(upgradeTable.io()); + assertThat(manifests.size()).isEqualTo(2); + checkDataFileAssignment(upgradeTable, manifests.get(0), 0L); + checkDataFileAssignment(upgradeTable, manifests.get(1), FILE_C.recordCount()); + // the existing manifests were reused without modification + assertThat(manifests.get(0).path()).isEqualTo(existingManifests.get(0).path()); + assertThat(manifests.get(1).path()).isEqualTo(existingManifests.get(1).path()); + } + + @Test + public void testDeleteAssignmentAfterUpgrade(@TempDir File altLocation) { + // data manifests: [added(FILE_C)], [existing(FILE_A), deleted(FILE_B)] + testTableUpgrade(altLocation); + + BaseTable upgradeTable = TestTables.load(altLocation, "test_upgrade"); + long startingFirstRowId 
= upgradeTable.operations().current().nextRowId(); + + List existingManifests = + upgradeTable.currentSnapshot().dataManifests(upgradeTable.io()); + assertThat(existingManifests.size()).isEqualTo(2); + + // any commit (even empty) should assign first_row_id to the entire metadata tree + upgradeTable.newDelete().deleteFile(FILE_C).commit(); + // data manifests: [deleted(FILE_C)], [existing(FILE_A), deleted(FILE_B)] + + assertThat(upgradeTable.operations().current().nextRowId()) + .as("next-row-id should be updated to include the assigned data") + .isEqualTo(startingFirstRowId + FILE_A.recordCount()); + + Snapshot assigned = upgradeTable.currentSnapshot(); + + assertThat(assigned.firstRowId()).isEqualTo(startingFirstRowId); + InputFile manifestList = table.io().newInputFile(assigned.manifestListLocation()); + // the first manifest has added FILE_C, the second has deleted FILE_A and existing FILE_B + checkManifestListAssignment(manifestList, 0L, 0L); + + List manifests = assigned.dataManifests(upgradeTable.io()); + assertThat(manifests.size()).isEqualTo(2); + checkDataFileAssignment(upgradeTable, manifests.get(0), 0L); + checkDataFileAssignment(upgradeTable, manifests.get(1), 0L); + // the existing manifests were reused without modification + assertThat(manifests.get(1).path()).isEqualTo(existingManifests.get(1).path()); + } + + @Test + public void testBranchAssignmentAfterUpgrade(@TempDir File altLocation) { + // data manifests: [added(FILE_C)], [existing(FILE_A), deleted(FILE_B)] + testTableUpgrade(altLocation); + + BaseTable upgradeTable = TestTables.load(altLocation, "test_upgrade"); + long startingFirstRowId = upgradeTable.operations().current().nextRowId(); + + List existingManifests = + upgradeTable.currentSnapshot().dataManifests(upgradeTable.io()); + assertThat(existingManifests.size()).isEqualTo(2); + + // any commit (even empty) should assign first_row_id to the branch's tree + upgradeTable.manageSnapshots().createBranch("branch").commit(); + upgradeTable.newAppend().toBranch("branch").commit(); + // data manifests: [added(FILE_C)], [existing(FILE_A), deleted(FILE_B)] + + assertThat(upgradeTable.operations().current().nextRowId()) + .as("next-row-id should be updated to include the assigned data in branch") + .isEqualTo(startingFirstRowId + FILE_C.recordCount() + FILE_A.recordCount()); + + // the main branch is unmodified and has no row IDs + Snapshot current = upgradeTable.currentSnapshot(); + InputFile mainManifestList = upgradeTable.io().newInputFile(current.manifestListLocation()); + checkManifestListAssignment(mainManifestList, null, null); + + List mainManifests = current.dataManifests(upgradeTable.io()); + assertThat(mainManifests.size()).isEqualTo(2); + checkDataFileAssignment(upgradeTable, mainManifests.get(0), (Long) null); + checkDataFileAssignment(upgradeTable, mainManifests.get(1), (Long) null); + assertThat(mainManifests.get(0).path()).isEqualTo(existingManifests.get(0).path()); + assertThat(mainManifests.get(1).path()).isEqualTo(existingManifests.get(1).path()); + + // the branch should have row IDs assigned + Snapshot assigned = upgradeTable.snapshot("branch"); + + assertThat(assigned.firstRowId()).isEqualTo(startingFirstRowId); + InputFile branchManifestList = table.io().newInputFile(assigned.manifestListLocation()); + // the first manifest has added FILE_C, the second has deleted FILE_A and existing FILE_B + checkManifestListAssignment(branchManifestList, 0L, FILE_C.recordCount()); + + List branchManifests = assigned.dataManifests(upgradeTable.io()); + 
assertThat(branchManifests.size()).isEqualTo(2); + checkDataFileAssignment(upgradeTable, branchManifests.get(0), 0L); + checkDataFileAssignment(upgradeTable, branchManifests.get(1), FILE_C.recordCount()); + // the existing manifests were reused without modification + assertThat(branchManifests.get(0).path()).isEqualTo(existingManifests.get(0).path()); + assertThat(branchManifests.get(1).path()).isEqualTo(existingManifests.get(1).path()); + } + + @Test + public void testOverwriteAssignmentAfterUpgrade(@TempDir File altLocation) { + // data manifests: [added(FILE_C)], [existing(FILE_A), deleted(FILE_B)] + testTableUpgrade(altLocation); + + BaseTable upgradeTable = TestTables.load(altLocation, "test_upgrade"); + long startingFirstRowId = upgradeTable.operations().current().nextRowId(); + + List existingManifests = + upgradeTable.currentSnapshot().dataManifests(upgradeTable.io()); + assertThat(existingManifests.size()).isEqualTo(2); + + // any commit should assign first_row_id to the entire metadata tree + upgradeTable.newOverwrite().deleteFile(FILE_C).addFile(FILE_B).commit(); + // data manifests: [added(FILE_B)], [deleted(FILE_C)], [existing(FILE_A), deleted(FILE_B)] + + assertThat(upgradeTable.operations().current().nextRowId()) + .as("next-row-id should be updated to account for existing data and new changes") + .isEqualTo(startingFirstRowId + FILE_B.recordCount() + FILE_A.recordCount()); + + Snapshot assigned = upgradeTable.currentSnapshot(); + + assertThat(assigned.firstRowId()).isEqualTo(startingFirstRowId); + InputFile manifestList = table.io().newInputFile(assigned.manifestListLocation()); + // the second manifest only has deleted files and does not use ID space + checkManifestListAssignment(manifestList, 0L, FILE_B.recordCount(), FILE_B.recordCount()); + + List manifests = assigned.dataManifests(upgradeTable.io()); + assertThat(manifests.size()).isEqualTo(3); + checkDataFileAssignment(upgradeTable, manifests.get(0), 0L); + checkDataFileAssignment(upgradeTable, manifests.get(1)); // no live files + checkDataFileAssignment(upgradeTable, manifests.get(2), FILE_B.recordCount()); + // the last manifest is reused without modification + assertThat(manifests.get(2).path()).isEqualTo(existingManifests.get(1).path()); + } + + @Test + public void testRowDeltaAssignmentAfterUpgrade(@TempDir File altLocation) { + // data manifests: [added(FILE_C)], [existing(FILE_A), deleted(FILE_B)] + testTableUpgrade(altLocation); + + BaseTable upgradeTable = TestTables.load(altLocation, "test_upgrade"); + long startingFirstRowId = upgradeTable.operations().current().nextRowId(); + + List existingManifests = + upgradeTable.currentSnapshot().dataManifests(upgradeTable.io()); + assertThat(existingManifests.size()).isEqualTo(2); + + // any commit (even empty) should assign first_row_id to the entire metadata tree + upgradeTable.newRowDelta().addDeletes(FILE_A_DV).commit(); + // data manifests: [added(FILE_C)], [existing(FILE_A), deleted(FILE_B)] + + assertThat(upgradeTable.operations().current().nextRowId()) + .as("next-row-id should be updated to include the assigned data") + .isEqualTo(startingFirstRowId + FILE_C.recordCount() + FILE_A.recordCount()); + + Snapshot assigned = upgradeTable.currentSnapshot(); + + assertThat(assigned.firstRowId()).isEqualTo(startingFirstRowId); + InputFile manifestList = table.io().newInputFile(assigned.manifestListLocation()); + // the first manifest has added FILE_C, the second has deleted FILE_A and existing FILE_B + checkManifestListAssignment(manifestList, 0L, 
FILE_C.recordCount()); + + List manifests = assigned.dataManifests(upgradeTable.io()); + assertThat(manifests.size()).isEqualTo(2); + checkDataFileAssignment(upgradeTable, manifests.get(0), 0L); + checkDataFileAssignment(upgradeTable, manifests.get(1), FILE_C.recordCount()); + // the existing manifests were reused without modification + assertThat(manifests.get(0).path()).isEqualTo(existingManifests.get(0).path()); + assertThat(manifests.get(1).path()).isEqualTo(existingManifests.get(1).path()); + } + + @Test + public void testUpgradeAssignmentWithManifestCompaction(@TempDir File altLocation) { + // create a non-empty upgrade table with FILE_A + BaseTable upgradeTable = + TestTables.create(altLocation, "test_upgrade", SCHEMA, PartitionSpec.unpartitioned(), 2); + + upgradeTable.newAppend().appendFile(FILE_A).commit(); + upgradeTable.newAppend().appendFile(FILE_B).commit(); + upgradeTable.updateProperties().set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "1").commit(); + // data manifests: [added(FILE_B)], [added(FILE_A)] + + assertThat(upgradeTable.operations().current().nextRowId()) + .as("v2 tables should always have next-row-id=0") + .isEqualTo(0L); + + TestTables.upgrade(altLocation, "test_upgrade", 3); + upgradeTable.refresh(); + + assertThat(upgradeTable.operations().current().nextRowId()) + .as("next-row-id should start at 0") + .isEqualTo(0L); + + for (Snapshot snapshot : upgradeTable.snapshots()) { + assertThat(snapshot.firstRowId()) + .as("Existing snapshots should not have first-row-id") + .isNull(); + } + + assertThat(upgradeTable.currentSnapshot().allManifests(upgradeTable.io()).size()).isEqualTo(2); + InputFile preMergeManifestList = + upgradeTable.io().newInputFile(upgradeTable.currentSnapshot().manifestListLocation()); + checkManifestListAssignment(preMergeManifestList, null, null); + List preMergeManifests = + upgradeTable.currentSnapshot().dataManifests(upgradeTable.io()); + assertThat(preMergeManifests.size()).isEqualTo(2); + checkDataFileAssignment(upgradeTable, preMergeManifests.get(0), (Long) null); + checkDataFileAssignment(upgradeTable, preMergeManifests.get(1), (Long) null); + + // add FILE_C and trigger metadata compaction + upgradeTable.newAppend().appendFile(FILE_C).commit(); + // data manifests: [add(FILE_C), exist(FILE_B), exist(FILE_A)] + + long mergedNextRowId = FILE_C.recordCount() + FILE_B.recordCount() + FILE_A.recordCount(); + + assertThat(upgradeTable.operations().current().nextRowId()).isEqualTo(mergedNextRowId); + assertThat(upgradeTable.currentSnapshot().allManifests(upgradeTable.io()).size()).isEqualTo(1); + InputFile mergedManifestList = + upgradeTable.io().newInputFile(upgradeTable.currentSnapshot().manifestListLocation()); + checkManifestListAssignment(mergedManifestList, 0L); + List mergedManifests = + upgradeTable.currentSnapshot().dataManifests(upgradeTable.io()); + checkDataFileAssignment( + upgradeTable, + mergedManifests.get(0), + 0L, + FILE_C.recordCount(), + FILE_C.recordCount() + FILE_B.recordCount()); + } + + private static ManifestContent content(int ordinal) { + return ManifestContent.values()[ordinal]; + } + + private static void checkManifestListAssignment(InputFile in, Long... 
+    try (CloseableIterable<Record> reader =
+        InternalData.read(FileFormat.AVRO, in)
+            .project(ManifestFile.schema().select("first_row_id", "content"))
+            .build()) {
+
+      // all row IDs must be assigned at write time
+      int index = 0;
+      for (Record manifest : reader) {
+        if (content((Integer) manifest.getField("content")) != ManifestContent.DATA) {
+          assertThat(manifest.getField("first_row_id"))
+              .as("Row ID for delete manifest (%s) should be null", index)
+              .isNull();
+        } else if (index < firstRowIds.length) {
+          assertThat(manifest.getField("first_row_id"))
+              .as("Row ID for data manifest (%s) should match", index)
+              .isEqualTo(firstRowIds[index]);
+        } else {
+          fail("No expected first row ID for manifest: %s=%s", index, manifest);
+        }
+
+        index += 1;
+      }
+
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+
+    // also check that the values are read correctly
+    int index = 0;
+    for (ManifestFile manifest : ManifestLists.read(in)) {
+      if (manifest.content() != ManifestContent.DATA) {
+        assertThat(manifest.firstRowId()).isNull();
+      } else if (index < firstRowIds.length) {
+        assertThat(manifest.firstRowId()).isEqualTo(firstRowIds[index]);
+      } else {
+        fail("No expected first row ID for manifest: " + manifest);
+      }
+
+      index += 1;
+    }
+  }
+
+  private static void checkDataFileAssignment(
+      Table table, ManifestFile manifest, Long... firstRowIds) {
+    // all row IDs must be assigned at write time
+    int index = 0;
+    try (ManifestReader<DataFile> reader =
+        ManifestFiles.read(manifest, table.io(), table.specs())) {
+
+      for (DataFile file : reader) {
+        assertThat(file.content()).isEqualTo(FileContent.DATA);
+        if (index < firstRowIds.length) {
+          assertThat(file.firstRowId())
+              .as("Row ID for data file (%s) should match", index)
+              .isEqualTo(firstRowIds[index]);
+        } else {
+          fail("No expected first row ID for file: " + manifest);
+        }
+
+        index += 1;
+      }
+
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+}
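These helpers pin down the assignment rule the tests rely on: delete manifests never receive a `first_row_id`, while data manifests and their data files consume ID space in order, advancing a single counter by each file's record count. Below is a minimal, self-contained sketch of that bookkeeping; `FirstRowIdSketch` and `DataFileInfo` are invented stand-ins for illustration, not Iceberg classes.

```java
import java.util.List;

public class FirstRowIdSketch {
  // invented stand-in for a data file entry; not an Iceberg class
  record DataFileInfo(Long firstRowId, long recordCount) {}

  // walks data manifests in order, assigning IDs to unassigned files;
  // returns the table's new next-row-id
  static long assign(long snapshotFirstRowId, List<List<DataFileInfo>> dataManifests) {
    long nextRowId = snapshotFirstRowId;
    for (List<DataFileInfo> manifest : dataManifests) {
      // recorded in the manifest list as the manifest's first_row_id
      long manifestFirstRowId = nextRowId;
      System.out.println("manifest first_row_id=" + manifestFirstRowId);
      for (DataFileInfo file : manifest) {
        if (file.firstRowId() == null) {
          // the file's rows get IDs [nextRowId, nextRowId + recordCount)
          nextRowId += file.recordCount();
        }
      }
    }
    return nextRowId;
  }

  public static void main(String[] args) {
    // two single-file manifests of 10 rows each, like FILE_B and FILE_A above
    long next =
        assign(
            0L, List.of(List.of(new DataFileInfo(null, 10)), List.of(new DataFileInfo(null, 10))));
    System.out.println("next-row-id=" + next); // 20
  }
}
```

Files that already carry a `first_row_id` keep it and do not advance the counter, which is why the tests above can reuse existing manifests without modification.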
diff --git a/core/src/test/java/org/apache/iceberg/TestRowLineageMetadata.java b/core/src/test/java/org/apache/iceberg/TestRowLineageMetadata.java
index e11d987581d7..47418fa201f9 100644
--- a/core/src/test/java/org/apache/iceberg/TestRowLineageMetadata.java
+++ b/core/src/test/java/org/apache/iceberg/TestRowLineageMetadata.java
@@ -359,12 +359,40 @@ public void testReplace() {
 
     table.newRewrite().deleteFile(filePart1).deleteFile(filePart2).addFile(fileCompacted).commit();
 
-    // Rewrites are currently just treated as appends. In the future we could treat these as no-ops
+    // Rewrites produce new manifests without first-row-id or any information about how many rows
+    // are new. Without tracking a new metric for a manifest (e.g., assigned-rows) or assuming that
+    // rewrites do not assign any new IDs, replace will allocate ranges like normal writes.
     assertThat(table.currentSnapshot().firstRowId()).isEqualTo(60);
     assertThat(table.currentSnapshot().addedRows()).isEqualTo(60);
     assertThat(table.ops().current().nextRowId()).isEqualTo(120);
   }
 
+  @TestTemplate
+  public void testMetadataRewrite() {
+    assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE);
+
+    TestTables.TestTable table =
+        TestTables.create(
+            tableDir, "test", TEST_SCHEMA, PartitionSpec.unpartitioned(), formatVersion);
+
+    assertThat(table.ops().current().nextRowId()).isEqualTo(0L);
+
+    DataFile file1 = fileWithRows(30);
+    DataFile file2 = fileWithRows(30);
+
+    table.newAppend().appendFile(file1).appendFile(file2).commit();
+
+    assertThat(table.currentSnapshot().firstRowId()).isEqualTo(0);
+    assertThat(table.currentSnapshot().addedRows()).isEqualTo(60);
+    assertThat(table.ops().current().nextRowId()).isEqualTo(60);
+
+    table.rewriteManifests().commit();
+
+    assertThat(table.currentSnapshot().firstRowId()).isEqualTo(60);
+    assertThat(table.currentSnapshot().addedRows()).isEqualTo(0);
+    assertThat(table.ops().current().nextRowId()).isEqualTo(60);
+  }
+
   private final AtomicInteger fileNum = new AtomicInteger(0);
 
   private DataFile fileWithRows(long numRows) {
diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java b/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java
index 3fda738e4435..d68f99d99d1d 100644
--- a/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java
+++ b/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java
@@ -229,7 +229,8 @@ private String createManifestListWithManifestFiles(long snapshotId, Long parentS
         new GenericManifestFile(localInput("file:/tmp/manifest2.avro"), 0, snapshotId));
 
     try (ManifestListWriter writer =
-        ManifestLists.write(1, Files.localOutput(manifestList), snapshotId, parentSnapshotId, 0)) {
+        ManifestLists.write(
+            1, Files.localOutput(manifestList), snapshotId, parentSnapshotId, 0, 0L)) {
       writer.addAll(manifests);
     }
diff --git a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
index 56c4676dfb1c..a9e6f6ed52f3 100644
--- a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
+++ b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
@@ -1808,7 +1808,8 @@ private String createManifestListWithManifestFile(
     manifestList.deleteOnExit();
 
     try (ManifestListWriter writer =
-        ManifestLists.write(1, Files.localOutput(manifestList), snapshotId, parentSnapshotId, 0)) {
+        ManifestLists.write(
+            1, Files.localOutput(manifestList), snapshotId, parentSnapshotId, 0, 0L)) {
       writer.addAll(
           ImmutableList.of(
               new GenericManifestFile(localInput(manifestFile), SPEC_5.specId(), snapshotId)));
diff --git a/core/src/test/java/org/apache/iceberg/TestTables.java b/core/src/test/java/org/apache/iceberg/TestTables.java
index fd558a490ee7..ad5369ea5e1a 100644
--- a/core/src/test/java/org/apache/iceberg/TestTables.java
+++ b/core/src/test/java/org/apache/iceberg/TestTables.java
@@ -39,6 +39,14 @@ public class TestTables {
 
   private TestTables() {}
 
+  public static TestTable upgrade(File temp, String name, int newFormatVersion) {
+    TestTable table = load(temp, name);
+    TableOperations ops = table.ops();
+    TableMetadata base = ops.current();
+    ops.commit(base, ops.current().upgradeToFormatVersion(newFormatVersion));
+    return table;
+  }
+
   public static TestTable create(
       File temp, String name, Schema schema, PartitionSpec spec, int formatVersion) {
     return create(temp, name, schema, spec, SortOrder.unsorted(), formatVersion);
@@ -328,9 +336,15 @@ public String metadataFileLocation(String fileName) {
 
     @Override
     public long newSnapshotId() {
-      long nextSnapshotId = lastSnapshotId + 1;
-      this.lastSnapshotId = nextSnapshotId;
-      return nextSnapshotId;
+      TableMetadata currentMetadata = current();
+      if (currentMetadata != null
+          && currentMetadata.propertyAsBoolean("random-snapshot-ids", false)) {
+        return SnapshotIdGeneratorUtil.generateSnapshotID();
+      } else {
+        long nextSnapshotId = lastSnapshotId + 1;
+        this.lastSnapshotId = nextSnapshotId;
+        return nextSnapshotId;
+      }
     }
   }
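The `newSnapshotId()` change lets tests opt into random snapshot IDs via a `random-snapshot-ids` table property while keeping the deterministic counter as the default. A self-contained model of that dispatch follows; the class name is invented, and the random branch merely stands in for `SnapshotIdGeneratorUtil.generateSnapshotID()`.

```java
import java.util.concurrent.ThreadLocalRandom;

public class SnapshotIdSketch {
  private long lastSnapshotId = 0;
  private final boolean randomIds; // models the "random-snapshot-ids" property

  SnapshotIdSketch(boolean randomIds) {
    this.randomIds = randomIds;
  }

  long newSnapshotId() {
    if (randomIds) {
      // stand-in for SnapshotIdGeneratorUtil.generateSnapshotID()
      return ThreadLocalRandom.current().nextLong(1, Long.MAX_VALUE);
    } else {
      // the original behavior: a deterministic, monotonically increasing counter
      this.lastSnapshotId += 1;
      return lastSnapshotId;
    }
  }

  public static void main(String[] args) {
    SnapshotIdSketch sequential = new SnapshotIdSketch(false);
    System.out.println(sequential.newSnapshotId()); // 1
    System.out.println(sequential.newSnapshotId()); // 2
    System.out.println(new SnapshotIdSketch(true).newSnapshotId()); // some random positive ID
  }
}
```

Random IDs exercise assignment logic that must not depend on snapshot-ID ordering, which sequential test IDs would otherwise mask.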
diff --git a/core/src/test/java/org/apache/iceberg/rest/responses/TestLoadTableResponseParser.java b/core/src/test/java/org/apache/iceberg/rest/responses/TestLoadTableResponseParser.java
index 89982c4e9b20..80eae1fe5cc5 100644
--- a/core/src/test/java/org/apache/iceberg/rest/responses/TestLoadTableResponseParser.java
+++ b/core/src/test/java/org/apache/iceberg/rest/responses/TestLoadTableResponseParser.java
@@ -20,6 +20,7 @@
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assumptions.assumeThat;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import org.apache.iceberg.PartitionSpec;
@@ -31,7 +32,7 @@
 import org.apache.iceberg.types.Types;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.FieldSource;
 
 public class TestLoadTableResponseParser {
 
@@ -137,8 +138,10 @@ public void roundTripSerdeV1() {
   }
 
   @ParameterizedTest
-  @ValueSource(ints = 3)
+  @FieldSource("org.apache.iceberg.TestHelpers#ALL_VERSIONS")
   public void roundTripSerdeV3andHigher(int formatVersion) {
+    assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+
     String uuid = "386b9f01-002b-4d8c-b77f-42c3fd3b7c9b";
     TableMetadata metadata =
         TableMetadata.buildFromEmpty(formatVersion)
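The parser test now iterates over every format version from a shared field source and skips the versions it does not cover, rather than hard-coding a single version. A self-contained sketch of the pattern; `ALL_VERSIONS` here is a local stand-in for `org.apache.iceberg.TestHelpers#ALL_VERSIONS`, and `@FieldSource` requires JUnit 5.11 or later.

```java
import static org.assertj.core.api.Assumptions.assumeThat;

import java.util.List;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.FieldSource;

class FieldSourceSketch {
  // stand-in for org.apache.iceberg.TestHelpers#ALL_VERSIONS
  static final List<Integer> ALL_VERSIONS = List.of(1, 2, 3);

  @ParameterizedTest
  @FieldSource("ALL_VERSIONS")
  void v3OnlyBehavior(int formatVersion) {
    // runs once per version, but only asserts on v3+ tables; older versions
    // are reported as skipped instead of silently untested
    assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
  }
}
```

Compared with `@ValueSource(ints = 3)`, new format versions added to the shared field are picked up automatically.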
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
index 3d2bf75e700e..82a065073d94 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
@@ -27,6 +27,7 @@
 import java.io.IOException;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.avro.generic.GenericData.Record;
@@ -57,6 +58,7 @@
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.data.TestHelpers;
 import org.apache.iceberg.spark.source.SimpleRecord;
+import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
@@ -153,7 +155,10 @@ public void testUnpartitionedTable() throws Exception {
     assertThat(expectedDataManifests).as("Should have 1 data manifest").hasSize(1);
     assertThat(expectedDeleteManifests).as("Should have 1 delete manifest").hasSize(1);
 
-    Schema entriesTableSchema = Spark3Util.loadIcebergTable(spark, tableName + ".entries").schema();
+    Schema entriesTableSchema =
+        TypeUtil.selectNot(
+            Spark3Util.loadIcebergTable(spark, tableName + ".entries").schema(),
+            Set.of(DataFile.FIRST_ROW_ID.fieldId()));
     Schema filesTableSchema = Spark3Util.loadIcebergTable(spark, tableName + ".files").schema();
 
     // check delete files table
@@ -290,7 +295,10 @@ public void testPartitionedTable() throws Exception {
     sql("DELETE FROM %s WHERE id=1 AND data='b'", tableName);
 
     Table table = Spark3Util.loadIcebergTable(spark, tableName);
-    Schema entriesTableSchema = Spark3Util.loadIcebergTable(spark, tableName + ".entries").schema();
+    Schema entriesTableSchema =
+        TypeUtil.selectNot(
+            Spark3Util.loadIcebergTable(spark, tableName + ".entries").schema(),
+            Set.of(DataFile.FIRST_ROW_ID.fieldId()));
 
     List<ManifestFile> expectedDataManifests = TestHelpers.dataManifests(table);
     List<ManifestFile> expectedDeleteManifests = TestHelpers.deleteManifests(table);
@@ -308,7 +316,7 @@ public void testPartitionedTable() throws Exception {
     Dataset<Row> actualDeleteFilesDs =
         spark.sql("SELECT * FROM " + tableName + ".delete_files " + "WHERE partition.data='a'");
-    List<Row> actualDeleteFiles = actualDeleteFilesDs.collectAsList();
+    List<Row> actualDeleteFiles = TestHelpers.selectNonDerived(actualDeleteFilesDs).collectAsList();
 
     assertThat(actualDeleteFiles).as("Metadata table should return one delete file").hasSize(1);
     TestHelpers.assertEqualsSafe(
@@ -387,7 +395,10 @@ public void testAllFilesUnpartitioned() throws Exception {
     List<Object[]> results = sql("DELETE FROM %s", tableName);
     assertThat(results).as("Table should be cleared").isEmpty();
 
-    Schema entriesTableSchema = Spark3Util.loadIcebergTable(spark, tableName + ".entries").schema();
+    Schema entriesTableSchema =
+        TypeUtil.selectNot(
+            Spark3Util.loadIcebergTable(spark, tableName + ".entries").schema(),
+            Set.of(DataFile.FIRST_ROW_ID.fieldId()));
     Schema filesTableSchema =
         Spark3Util.loadIcebergTable(spark, tableName + ".all_data_files").schema();
 
@@ -421,7 +432,7 @@ public void testAllFilesUnpartitioned() throws Exception {
     // Check all files table
     Dataset<Row> actualFilesDs =
         spark.sql("SELECT * FROM " + tableName + ".all_files ORDER BY content");
-    List<Row> actualFiles = actualFilesDs.collectAsList();
+    List<Row> actualFiles = TestHelpers.selectNonDerived(actualFilesDs).collectAsList();
     List<Record> expectedFiles = ListUtils.union(expectedDataFiles, expectedDeleteFiles);
     expectedFiles.sort(Comparator.comparing(r -> ((Integer) r.get("content"))));
     assertThat(actualFiles).as("Metadata table should return two files").hasSize(2);
@@ -469,7 +480,10 @@ public void testAllFilesPartitioned() throws Exception {
     List<Object[]> results = sql("DELETE FROM %s", tableName);
     assertThat(results).as("Table should be cleared").isEmpty();
 
-    Schema entriesTableSchema = Spark3Util.loadIcebergTable(spark, tableName + ".entries").schema();
+    Schema entriesTableSchema =
+        TypeUtil.selectNot(
+            Spark3Util.loadIcebergTable(spark, tableName + ".entries").schema(),
+            Set.of(DataFile.FIRST_ROW_ID.fieldId()));
     Schema filesTableSchema =
         Spark3Util.loadIcebergTable(spark, tableName + ".all_data_files").schema();
 
@@ -653,7 +667,10 @@ public void testFilesTableTimeTravelWithSchemaEvolution() throws Exception {
             + currentSnapshotId
             + " ORDER BY content");
     List<Row> actualFiles = TestHelpers.selectNonDerived(actualFilesDs).collectAsList();
-    Schema entriesTableSchema = Spark3Util.loadIcebergTable(spark, tableName + ".entries").schema();
+    Schema entriesTableSchema =
+        TypeUtil.selectNot(
+            Spark3Util.loadIcebergTable(spark, tableName + ".entries").schema(),
+            Set.of(DataFile.FIRST_ROW_ID.fieldId()));
     List<ManifestFile> expectedDataManifests = TestHelpers.dataManifests(table);
     List<Record> expectedFiles =
         expectedEntries(table, FileContent.DATA, entriesTableSchema, expectedDataManifests, null);
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
index 1f910cbb576e..d8c0de32bf00 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
@@ -887,11 +887,14 @@ public static void asMetadataRecord(GenericData.Record file) {
     file.put(3, 0); // specId
   }
 
+  // suppress the readable metrics and first-row-id that are not in manifest files
+  private static final Set<String> DERIVED_FIELDS = Set.of("readable_metrics", "first_row_id");
+
   public static Dataset<Row> selectNonDerived(Dataset<Row> metadataTable) {
     StructField[] fields = metadataTable.schema().fields();
     return metadataTable.select(
         Stream.of(fields)
-            .filter(f -> !f.name().equals("readable_metrics")) // derived field
+            .filter(f -> !DERIVED_FIELDS.contains(f.name()))
            .map(f -> new Column(f.name()))
            .toArray(Column[]::new));
   }
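Both Spark versions receive the same `TestHelpers` change: the single hard-coded `readable_metrics` exclusion becomes a set of derived columns, since `first_row_id` likewise exists only in the metadata-table projection, not in the underlying manifest files. A self-contained model of the filtering, using plain strings in place of Spark `StructField`s:

```java
import java.util.List;
import java.util.Set;

public class DerivedFieldFilterSketch {
  private static final Set<String> DERIVED_FIELDS = Set.of("readable_metrics", "first_row_id");

  // keeps only the columns that also exist in manifest files
  static List<String> selectNonDerived(List<String> columns) {
    return columns.stream().filter(c -> !DERIVED_FIELDS.contains(c)).toList();
  }

  public static void main(String[] args) {
    System.out.println(selectNonDerived(List.of("file_path", "first_row_id", "readable_metrics")));
    // prints: [file_path]
  }
}
```

Centralizing the exclusions in one set means future derived columns only need a one-line change rather than another chained `filter` predicate.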
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
index 3d2bf75e700e..82a065073d94 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
@@ -27,6 +27,7 @@
 import java.io.IOException;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.avro.generic.GenericData.Record;
@@ -57,6 +58,7 @@
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.data.TestHelpers;
 import org.apache.iceberg.spark.source.SimpleRecord;
+import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
@@ -153,7 +155,10 @@ public void testUnpartitionedTable() throws Exception {
     assertThat(expectedDataManifests).as("Should have 1 data manifest").hasSize(1);
     assertThat(expectedDeleteManifests).as("Should have 1 delete manifest").hasSize(1);
 
-    Schema entriesTableSchema = Spark3Util.loadIcebergTable(spark, tableName + ".entries").schema();
+    Schema entriesTableSchema =
+        TypeUtil.selectNot(
+            Spark3Util.loadIcebergTable(spark, tableName + ".entries").schema(),
+            Set.of(DataFile.FIRST_ROW_ID.fieldId()));
     Schema filesTableSchema = Spark3Util.loadIcebergTable(spark, tableName + ".files").schema();
 
     // check delete files table
@@ -290,7 +295,10 @@ public void testPartitionedTable() throws Exception {
     sql("DELETE FROM %s WHERE id=1 AND data='b'", tableName);
 
     Table table = Spark3Util.loadIcebergTable(spark, tableName);
-    Schema entriesTableSchema = Spark3Util.loadIcebergTable(spark, tableName + ".entries").schema();
+    Schema entriesTableSchema =
+        TypeUtil.selectNot(
+            Spark3Util.loadIcebergTable(spark, tableName + ".entries").schema(),
+            Set.of(DataFile.FIRST_ROW_ID.fieldId()));
 
     List<ManifestFile> expectedDataManifests = TestHelpers.dataManifests(table);
     List<ManifestFile> expectedDeleteManifests = TestHelpers.deleteManifests(table);
@@ -308,7 +316,7 @@ public void testPartitionedTable() throws Exception {
     Dataset<Row> actualDeleteFilesDs =
         spark.sql("SELECT * FROM " + tableName + ".delete_files " + "WHERE partition.data='a'");
-    List<Row> actualDeleteFiles = actualDeleteFilesDs.collectAsList();
+    List<Row> actualDeleteFiles = TestHelpers.selectNonDerived(actualDeleteFilesDs).collectAsList();
 
     assertThat(actualDeleteFiles).as("Metadata table should return one delete file").hasSize(1);
     TestHelpers.assertEqualsSafe(
@@ -387,7 +395,10 @@ public void testAllFilesUnpartitioned() throws Exception {
     List<Object[]> results = sql("DELETE FROM %s", tableName);
     assertThat(results).as("Table should be cleared").isEmpty();
 
-    Schema entriesTableSchema = Spark3Util.loadIcebergTable(spark, tableName + ".entries").schema();
+    Schema entriesTableSchema =
+        TypeUtil.selectNot(
+            Spark3Util.loadIcebergTable(spark, tableName + ".entries").schema(),
+            Set.of(DataFile.FIRST_ROW_ID.fieldId()));
     Schema filesTableSchema =
         Spark3Util.loadIcebergTable(spark, tableName + ".all_data_files").schema();
 
@@ -421,7 +432,7 @@ public void testAllFilesUnpartitioned() throws Exception {
     // Check all files table
     Dataset<Row> actualFilesDs =
         spark.sql("SELECT * FROM " + tableName + ".all_files ORDER BY content");
-    List<Row> actualFiles = actualFilesDs.collectAsList();
+    List<Row> actualFiles = TestHelpers.selectNonDerived(actualFilesDs).collectAsList();
     List<Record> expectedFiles = ListUtils.union(expectedDataFiles, expectedDeleteFiles);
     expectedFiles.sort(Comparator.comparing(r -> ((Integer) r.get("content"))));
     assertThat(actualFiles).as("Metadata table should return two files").hasSize(2);
@@ -469,7 +480,10 @@ public void testAllFilesPartitioned() throws Exception {
     List<Object[]> results = sql("DELETE FROM %s", tableName);
     assertThat(results).as("Table should be cleared").isEmpty();
 
-    Schema entriesTableSchema = Spark3Util.loadIcebergTable(spark, tableName + ".entries").schema();
+    Schema entriesTableSchema =
+        TypeUtil.selectNot(
+            Spark3Util.loadIcebergTable(spark, tableName + ".entries").schema(),
+            Set.of(DataFile.FIRST_ROW_ID.fieldId()));
     Schema filesTableSchema =
         Spark3Util.loadIcebergTable(spark, tableName + ".all_data_files").schema();
 
@@ -653,7 +667,10 @@ public void testFilesTableTimeTravelWithSchemaEvolution() throws Exception {
             + currentSnapshotId
             + " ORDER BY content");
     List<Row> actualFiles = TestHelpers.selectNonDerived(actualFilesDs).collectAsList();
-    Schema entriesTableSchema = Spark3Util.loadIcebergTable(spark, tableName + ".entries").schema();
+    Schema entriesTableSchema =
+        TypeUtil.selectNot(
+            Spark3Util.loadIcebergTable(spark, tableName + ".entries").schema(),
+            Set.of(DataFile.FIRST_ROW_ID.fieldId()));
     List<ManifestFile> expectedDataManifests = TestHelpers.dataManifests(table);
     List<Record> expectedFiles =
         expectedEntries(table, FileContent.DATA, entriesTableSchema, expectedDataManifests, null);
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
index 9b496ea52c15..6111e1b0c38b 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
@@ -892,11 +892,14 @@ public static void asMetadataRecord(GenericData.Record file) {
     file.put(3, 0); // specId
   }
 
+  // suppress the readable metrics and first-row-id that are not in manifest files
+  private static final Set<String> DERIVED_FIELDS = Set.of("readable_metrics", "first_row_id");
+
   public static Dataset<Row> selectNonDerived(Dataset<Row> metadataTable) {
     StructField[] fields = metadataTable.schema().fields();
     return metadataTable.select(
         Stream.of(fields)
-            .filter(f -> !f.name().equals("readable_metrics")) // derived field
+            .filter(f -> !DERIVED_FIELDS.contains(f.name()))
             .map(f -> new Column(f.name()))
             .toArray(Column[]::new));
   }
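For reference, the schema pruning used throughout the metadata-table tests relies on `TypeUtil.selectNot`, which returns a schema without the given field IDs. A small sketch, assuming field ID 142 for `first_row_id` as defined on `DataFile` in this change:

```java
import java.util.Set;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;

public class SelectNotSketch {
  public static void main(String[] args) {
    // a tiny entries-like schema; 142 matches DataFile.FIRST_ROW_ID in this change
    Schema entriesLike =
        new Schema(
            Types.NestedField.required(1, "file_path", Types.StringType.get()),
            Types.NestedField.optional(142, "first_row_id", Types.LongType.get()));

    // drop the derived column before comparing to raw manifest entries
    Schema pruned = TypeUtil.selectNot(entriesLike, Set.of(142));
    System.out.println(pruned.findField("first_row_id")); // null: the field was removed
  }
}
```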