diff --git a/api/src/main/java/org/apache/iceberg/Snapshot.java b/api/src/main/java/org/apache/iceberg/Snapshot.java index 52280a41620f..8cdc63422be7 100644 --- a/api/src/main/java/org/apache/iceberg/Snapshot.java +++ b/api/src/main/java/org/apache/iceberg/Snapshot.java @@ -178,8 +178,7 @@ default Integer schemaId() { * value were created in a snapshot that was added to the table (but not necessarily commited to * this branch) in the past. * - * @return the first row-id to be used in this snapshot or null if row lineage was not enabled - * when the table was created. + * @return the first row-id to be used in this snapshot or null when row lineage is not supported */ default Long firstRowId() { return null; @@ -189,7 +188,7 @@ default Long firstRowId() { * The total number of newly added rows in this snapshot. It should be the summation of {@link * ManifestFile#ADDED_ROWS_COUNT} for every manifest added in this snapshot. * - *

This field is optional but is required when row lineage is enabled. + *

This field is optional but is required when the table version supports row lineage. * * @return the total number of new rows in this snapshot or null if the value was not stored. */ diff --git a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java index c3c1159ef8df..f29ecfa58271 100644 --- a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java +++ b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java @@ -66,6 +66,17 @@ class BaseSnapshot implements Snapshot { String manifestList, Long firstRowId, Long addedRows) { + Preconditions.checkArgument( + firstRowId == null || firstRowId >= 0, + "Invalid first-row-id (cannot be negative): %s", + firstRowId); + Preconditions.checkArgument( + addedRows == null || addedRows >= 0, + "Invalid added-rows (cannot be negative): %s", + addedRows); + Preconditions.checkArgument( + firstRowId == null || addedRows != null, + "Invalid added-rows (required when first-row-id is set): null"); this.sequenceNumber = sequenceNumber; this.snapshotId = snapshotId; this.parentId = parentId; @@ -76,7 +87,7 @@ class BaseSnapshot implements Snapshot { this.manifestListLocation = manifestList; this.v1ManifestLocations = null; this.firstRowId = firstRowId; - this.addedRows = addedRows; + this.addedRows = firstRowId != null ? addedRows : null; } BaseSnapshot( diff --git a/core/src/main/java/org/apache/iceberg/MetadataUpdate.java b/core/src/main/java/org/apache/iceberg/MetadataUpdate.java index 8f90b5691a1a..c1b910312495 100644 --- a/core/src/main/java/org/apache/iceberg/MetadataUpdate.java +++ b/core/src/main/java/org/apache/iceberg/MetadataUpdate.java @@ -521,6 +521,12 @@ public void applyTo(ViewMetadata.Builder viewMetadataBuilder) { } } + /** + * Update to enable row lineage. + * + * @deprecated will be removed in 1.10.0; row lineage is required for all v3+ tables. 
+ */ + @Deprecated class EnableRowLineage implements MetadataUpdate { @Override public void applyTo(TableMetadata.Builder metadataBuilder) { diff --git a/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java b/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java index 19c48de958bb..56e2b9781e79 100644 --- a/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java +++ b/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java @@ -61,7 +61,6 @@ private MetadataUpdateParser() {} static final String REMOVE_PARTITION_STATISTICS = "remove-partition-statistics"; static final String REMOVE_PARTITION_SPECS = "remove-partition-specs"; static final String REMOVE_SCHEMAS = "remove-schemas"; - static final String ENABLE_ROW_LINEAGE = "enable-row-lineage"; // AssignUUID private static final String UUID = "uuid"; @@ -160,7 +159,6 @@ private MetadataUpdateParser() {} .put(MetadataUpdate.SetCurrentViewVersion.class, SET_CURRENT_VIEW_VERSION) .put(MetadataUpdate.RemovePartitionSpecs.class, REMOVE_PARTITION_SPECS) .put(MetadataUpdate.RemoveSchemas.class, REMOVE_SCHEMAS) - .put(MetadataUpdate.EnableRowLineage.class, ENABLE_ROW_LINEAGE) .buildOrThrow(); public static String toJson(MetadataUpdate metadataUpdate) { @@ -259,8 +257,6 @@ public static void toJson(MetadataUpdate metadataUpdate, JsonGenerator generator case REMOVE_SCHEMAS: writeRemoveSchemas((MetadataUpdate.RemoveSchemas) metadataUpdate, generator); break; - case ENABLE_ROW_LINEAGE: - break; default: throw new IllegalArgumentException( String.format( @@ -336,8 +332,6 @@ public static MetadataUpdate fromJson(JsonNode jsonNode) { return readRemovePartitionSpecs(jsonNode); case REMOVE_SCHEMAS: return readRemoveSchemas(jsonNode); - case ENABLE_ROW_LINEAGE: - return new MetadataUpdate.EnableRowLineage(); default: throw new UnsupportedOperationException( String.format("Cannot convert metadata update action to json: %s", action)); diff --git 
a/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java b/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java index 714c0b3bfe67..e38683161d20 100644 --- a/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java +++ b/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java @@ -131,9 +131,8 @@ public static TableMetadata replacePaths( updatePathInStatisticsFiles(metadata.statisticsFiles(), sourcePrefix, targetPrefix), // TODO: update partition statistics file paths metadata.partitionStatisticsFiles(), - metadata.changes(), - metadata.rowLineageEnabled(), - metadata.nextRowId()); + metadata.nextRowId(), + metadata.changes()); } private static Map updateProperties( diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index 6703a04dc69f..9f409457364f 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -284,10 +284,10 @@ public Snapshot apply() { } Long addedRows = null; - Long lastRowId = null; - if (base.rowLineageEnabled()) { + Long firstRowId = null; + if (base.formatVersion() >= 3) { addedRows = calculateAddedRows(manifests); - lastRowId = base.nextRowId(); + firstRowId = base.nextRowId(); } return new BaseSnapshot( @@ -299,7 +299,7 @@ public Snapshot apply() { summary(base), base.currentSchemaId(), manifestList.location(), - lastRowId, + firstRowId, addedRows); } @@ -309,6 +309,7 @@ private Long calculateAddedRows(List manifests) { manifest -> manifest.snapshotId() == null || Objects.equals(manifest.snapshotId(), this.snapshotId)) + .filter(manifest -> manifest.content() == ManifestContent.DATA) .mapToLong( manifest -> { Preconditions.checkArgument( diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java index 61a127fed697..0951f4cd1974 100644 --- 
a/core/src/main/java/org/apache/iceberg/TableMetadata.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java @@ -53,12 +53,11 @@ public class TableMetadata implements Serializable { static final long INVALID_SEQUENCE_NUMBER = -1; static final int DEFAULT_TABLE_FORMAT_VERSION = 2; static final int SUPPORTED_TABLE_FORMAT_VERSION = 3; + static final int MIN_FORMAT_VERSION_ROW_LINEAGE = 3; static final int INITIAL_SPEC_ID = 0; static final int INITIAL_SORT_ORDER_ID = 1; static final int INITIAL_SCHEMA_ID = 0; static final int INITIAL_ROW_ID = 0; - static final boolean DEFAULT_ROW_LINEAGE = false; - static final int MIN_FORMAT_VERSION_ROW_LINEAGE = 3; private static final long ONE_MINUTE = TimeUnit.MINUTES.toMillis(1); @@ -133,11 +132,6 @@ static TableMetadata newTableMetadata( int freshSortOrderId = sortOrder.isUnsorted() ? sortOrder.orderId() : INITIAL_SORT_ORDER_ID; SortOrder freshSortOrder = freshSortOrder(freshSortOrderId, freshSchema, sortOrder); - // configure row lineage using table properties - Boolean rowLineage = - PropertyUtil.propertyAsBoolean( - properties, TableProperties.ROW_LINEAGE, DEFAULT_ROW_LINEAGE); - // Validate the metrics configuration. Note: we only do this on new tables to we don't // break existing tables. 
MetricsConfig.fromProperties(properties).validateReferencedColumns(schema); @@ -151,7 +145,6 @@ static TableMetadata newTableMetadata( .setDefaultSortOrder(freshSortOrder) .setLocation(location) .setProperties(properties) - .setRowLineage(rowLineage) .build(); } @@ -266,13 +259,12 @@ public String toString() { private final List statisticsFiles; private final List partitionStatisticsFiles; private final List changes; + private final long nextRowId; private SerializableSupplier> snapshotsSupplier; private volatile List snapshots; private volatile Map snapshotsById; private volatile Map refs; private volatile boolean snapshotsLoaded; - private final Boolean rowLineageEnabled; - private final long nextRowId; @SuppressWarnings("checkstyle:CyclomaticComplexity") TableMetadata( @@ -299,9 +291,8 @@ public String toString() { Map refs, List statisticsFiles, List partitionStatisticsFiles, - List changes, - boolean rowLineageEnabled, - long nextRowId) { + long nextRowId, + List changes) { Preconditions.checkArgument( specs != null && !specs.isEmpty(), "Partition specs cannot be null or empty"); Preconditions.checkArgument( @@ -320,10 +311,6 @@ public String toString() { Preconditions.checkArgument( metadataFileLocation == null || changes.isEmpty(), "Cannot create TableMetadata with a metadata location and changes"); - Preconditions.checkArgument( - formatVersion >= MIN_FORMAT_VERSION_ROW_LINEAGE || !rowLineageEnabled, - "Cannot enable row lineage when Table Version is less than V3. 
Table Version is %s", - formatVersion); this.metadataFileLocation = metadataFileLocation; this.formatVersion = formatVersion; @@ -359,7 +346,6 @@ public String toString() { this.partitionStatisticsFiles = ImmutableList.copyOf(partitionStatisticsFiles); // row lineage - this.rowLineageEnabled = rowLineageEnabled; this.nextRowId = nextRowId; HistoryEntry last = null; @@ -584,8 +570,14 @@ public TableMetadata withUUID() { return new Builder(this).assignUUID().build(); } + /** + * Whether row lineage is enabled. + * + * @deprecated will be removed in 1.10.0; row lineage is required for all v3+ tables. + */ + @Deprecated public boolean rowLineageEnabled() { - return rowLineageEnabled; + return formatVersion >= MIN_FORMAT_VERSION_ROW_LINEAGE; } public long nextRowId() { @@ -634,15 +626,10 @@ public TableMetadata replaceProperties(Map rawProperties) { int newFormatVersion = PropertyUtil.propertyAsInt(rawProperties, TableProperties.FORMAT_VERSION, formatVersion); - Boolean newRowLineage = - PropertyUtil.propertyAsBoolean( - rawProperties, TableProperties.ROW_LINEAGE, rowLineageEnabled); - return new Builder(this) .setProperties(updated) .removeProperties(removed) .upgradeFormatVersion(newFormatVersion) - .setRowLineage(newRowLineage) .build(); } @@ -927,7 +914,6 @@ public static class Builder { private final Map> statisticsFiles; private final Map> partitionStatisticsFiles; private boolean suppressHistoricalSnapshots = false; - private boolean rowLineage; private long nextRowId; // change tracking @@ -975,7 +961,6 @@ private Builder(int formatVersion) { this.schemasById = Maps.newHashMap(); this.specsById = Maps.newHashMap(); this.sortOrdersById = Maps.newHashMap(); - this.rowLineage = DEFAULT_ROW_LINEAGE; this.nextRowId = INITIAL_ROW_ID; } @@ -1011,10 +996,25 @@ private Builder(TableMetadata base) { this.specsById = Maps.newHashMap(base.specsById); this.sortOrdersById = Maps.newHashMap(base.sortOrdersById); - this.rowLineage = base.rowLineageEnabled; this.nextRowId = 
base.nextRowId; } + /** + * Enables row lineage in v3 tables. + * + * @deprecated will be removed in 1.10.0; row lineage is required for all v3+ tables. + */ + @Deprecated + public Builder enableRowLineage() { + if (formatVersion < MIN_FORMAT_VERSION_ROW_LINEAGE) { + throw new UnsupportedOperationException( + "Cannot enable row lineage for format-version=" + formatVersion); + } + + // otherwise this is a no-op + return this; + } + public Builder withMetadataLocation(String newMetadataLocation) { this.metadataLocation = newMetadataLocation; if (null != base) { @@ -1269,18 +1269,14 @@ public Builder addSnapshot(Snapshot snapshot) { snapshotsById.put(snapshot.snapshotId(), snapshot); changes.add(new MetadataUpdate.AddSnapshot(snapshot)); - if (rowLineage) { + if (formatVersion >= MIN_FORMAT_VERSION_ROW_LINEAGE) { + ValidationException.check( + snapshot.firstRowId() != null, "Cannot add a snapshot: first-row-id is null"); ValidationException.check( - snapshot.firstRowId() >= nextRowId, - "Cannot add a snapshot whose 'first-row-id' (%s) is less than the metadata 'next-row-id' (%s) because this will end up generating duplicate row_ids.", + snapshot.firstRowId() != null && snapshot.firstRowId() >= nextRowId, + "Cannot add a snapshot, first-row-id is behind table next-row-id: %s < %s", snapshot.firstRowId(), nextRowId); - ValidationException.check( - snapshot.addedRows() != null, - "Cannot add a snapshot with a null 'added-rows' field when row lineage is enabled"); - Preconditions.checkArgument( - snapshot.addedRows() >= 0, - "Cannot decrease 'last-row-id'. 'last-row-id' must increase monotonically. 
Snapshot reports %s added rows"); this.nextRowId += snapshot.addedRows(); } @@ -1508,34 +1504,6 @@ public Builder setPreviousFileLocation(String previousFileLocation) { return this; } - private Builder setRowLineage(Boolean newRowLineage) { - if (newRowLineage == null) { - return this; - } - - boolean disablingRowLineage = rowLineage && !newRowLineage; - - Preconditions.checkArgument( - !disablingRowLineage, "Cannot disable row lineage once it has been enabled"); - - if (!rowLineage && newRowLineage) { - return enableRowLineage(); - } else { - return this; - } - } - - public Builder enableRowLineage() { - Preconditions.checkArgument( - formatVersion >= MIN_FORMAT_VERSION_ROW_LINEAGE, - "Cannot use row lineage with format version %s. Only format version %s or higher support row lineage", - formatVersion, - MIN_FORMAT_VERSION_ROW_LINEAGE); - this.rowLineage = true; - changes.add(new MetadataUpdate.EnableRowLineage()); - return this; - } - private boolean hasChanges() { return changes.size() != startingChangeCount || (discardChanges && !changes.isEmpty()) @@ -1603,9 +1571,8 @@ public TableMetadata build() { partitionStatisticsFiles.values().stream() .flatMap(List::stream) .collect(Collectors.toList()), - discardChanges ? ImmutableList.of() : ImmutableList.copyOf(changes), - rowLineage, - nextRowId); + nextRowId, + discardChanges ? 
ImmutableList.of() : ImmutableList.copyOf(changes)); } private int addSchemaInternal(Schema schema, int newLastColumnId) { diff --git a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java index adaa9442f4fc..6a3db715aa55 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java @@ -110,7 +110,6 @@ private TableMetadataParser() {} static final String METADATA_LOG = "metadata-log"; static final String STATISTICS = "statistics"; static final String PARTITION_STATISTICS = "partition-statistics"; - static final String ROW_LINEAGE = "row-lineage"; static final String NEXT_ROW_ID = "next-row-id"; static final int MIN_NULL_CURRENT_SNAPSHOT_VERSION = 3; @@ -226,8 +225,7 @@ public static void toJson(TableMetadata metadata, JsonGenerator generator) throw } } - if (metadata.rowLineageEnabled()) { - generator.writeBooleanField(ROW_LINEAGE, metadata.rowLineageEnabled()); + if (metadata.formatVersion() >= 3) { generator.writeNumberField(NEXT_ROW_ID, metadata.nextRowId()); } @@ -465,12 +463,10 @@ public static TableMetadata fromJson(String metadataLocation, JsonNode node) { currentSnapshotId = -1L; } - Boolean rowLineage = JsonUtil.getBoolOrNull(ROW_LINEAGE, node); long lastRowId; - if (rowLineage != null && rowLineage) { + if (formatVersion >= 3) { lastRowId = JsonUtil.getLong(NEXT_ROW_ID, node); } else { - rowLineage = TableMetadata.DEFAULT_ROW_LINEAGE; lastRowId = TableMetadata.INITIAL_ROW_ID; } @@ -565,9 +561,8 @@ public static TableMetadata fromJson(String metadataLocation, JsonNode node) { refs, statisticsFiles, partitionStatisticsFiles, - ImmutableList.of() /* no changes from the file */, - rowLineage, - lastRowId); + lastRowId, + ImmutableList.of() /* no changes from the file */); } private static Map refsFromJson(JsonNode refMap) { diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java 
b/core/src/main/java/org/apache/iceberg/TableProperties.java index cd7cda23c2d3..61b4b5a489b5 100644 --- a/core/src/main/java/org/apache/iceberg/TableProperties.java +++ b/core/src/main/java/org/apache/iceberg/TableProperties.java @@ -389,5 +389,10 @@ private TableProperties() {} public static final int ENCRYPTION_AAD_LENGTH_DEFAULT = 16; - public static final String ROW_LINEAGE = "row-lineage"; + /** + * Property to enable row lineage. + * + * @deprecated will be removed in 1.10.0; row lineage is required for all v3+ tables. + */ + @Deprecated public static final String ROW_LINEAGE = "row-lineage"; } diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java b/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java index 4d66163d6c79..c62309ef98e5 100644 --- a/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java +++ b/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java @@ -941,17 +941,6 @@ public void testRemoveSchemas() { .isEqualTo(json); } - @Test - public void testEnableRowLineage() { - String action = MetadataUpdateParser.ENABLE_ROW_LINEAGE; - String json = "{\"action\":\"enable-row-lineage\"}"; - MetadataUpdate expected = new MetadataUpdate.EnableRowLineage(); - assertEquals(action, expected, MetadataUpdateParser.fromJson(json)); - assertThat(MetadataUpdateParser.toJson(expected)) - .as("Enable row lineage should convert to the correct JSON value") - .isEqualTo(json); - } - public void assertEquals( String action, MetadataUpdate expectedUpdate, MetadataUpdate actualUpdate) { switch (action) { @@ -1066,9 +1055,6 @@ public void assertEquals( (MetadataUpdate.RemoveSchemas) expectedUpdate, (MetadataUpdate.RemoveSchemas) actualUpdate); break; - case MetadataUpdateParser.ENABLE_ROW_LINEAGE: - assertThat(actualUpdate).isInstanceOf(MetadataUpdate.EnableRowLineage.class); - break; default: fail("Unrecognized metadata update action: " + action); } diff --git 
a/core/src/test/java/org/apache/iceberg/TestRowLineageMetadata.java b/core/src/test/java/org/apache/iceberg/TestRowLineageMetadata.java index fd8e00078993..e11d987581d7 100644 --- a/core/src/test/java/org/apache/iceberg/TestRowLineageMetadata.java +++ b/core/src/test/java/org/apache/iceberg/TestRowLineageMetadata.java @@ -26,17 +26,16 @@ import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import org.apache.iceberg.exceptions.ValidationException; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.primitives.Ints; import org.apache.iceberg.types.Types; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.io.TempDir; @ExtendWith(ParameterizedTestExtension.class) public class TestRowLineageMetadata { - @Parameters(name = "formatVersion = {0}") private static List formatVersion() { return Ints.asList(TestHelpers.ALL_VERSIONS); @@ -55,7 +54,6 @@ private static List formatVersion() { private TableMetadata baseMetadata() { return TableMetadata.buildFromEmpty(formatVersion) - .enableRowLineage() .addSchema(TEST_SCHEMA) .setLocation(TEST_LOCATION) .addPartitionSpec(PartitionSpec.unpartitioned()) @@ -70,15 +68,42 @@ public void cleanup() { TestTables.clearTables(); } - @TestTemplate - public void testRowLineageSupported() { - if (formatVersion >= TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE) { - assertThat(TableMetadata.buildFromEmpty(formatVersion).enableRowLineage()).isNotNull(); - } else { - assertThatThrownBy(() -> TableMetadata.buildFromEmpty(formatVersion).enableRowLineage()) - .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("Cannot use row lineage"); - } + @Test + public void testSnapshotRowIDValidation() { + Snapshot snapshot = + new BaseSnapshot(0, 1, null, 0, DataOperations.APPEND, null, 1, "ml.avro", 
null, null); + assertThat(snapshot.firstRowId()).isNull(); + assertThat(snapshot.addedRows()).isNull(); + + // added-rows will be set to null if first-row-id is null + snapshot = + new BaseSnapshot(0, 1, null, 0, DataOperations.APPEND, null, 1, "ml.avro", null, 10L); + assertThat(snapshot.firstRowId()).isNull(); + assertThat(snapshot.addedRows()).isNull(); + + // added-rows and first-row-id can be 0 + snapshot = new BaseSnapshot(0, 1, null, 0, DataOperations.APPEND, null, 1, "ml.avro", 0L, 0L); + assertThat(snapshot.firstRowId()).isEqualTo(0); + assertThat(snapshot.addedRows()).isEqualTo(0); + + assertThatThrownBy( + () -> + new BaseSnapshot( + 0, 1, null, 0, DataOperations.APPEND, null, 1, "ml.avro", 10L, null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid added-rows (required when first-row-id is set): null"); + + assertThatThrownBy( + () -> + new BaseSnapshot(0, 1, null, 0, DataOperations.APPEND, null, 1, "ml.avro", 0L, -1L)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid added-rows (cannot be negative): -1"); + + assertThatThrownBy( + () -> + new BaseSnapshot(0, 1, null, 0, DataOperations.APPEND, null, 1, "ml.avro", -1L, 1L)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid first-row-id (cannot be negative): -1"); } @TestTemplate @@ -116,21 +141,26 @@ public void testInvalidSnapshotAddition() { TableMetadata base = baseMetadata(); Snapshot invalidLastRow = - new BaseSnapshot( - 0, 1, null, 0, DataOperations.APPEND, null, 1, "foo", base.nextRowId() - 3, newRows); + new BaseSnapshot(0, 1, null, 0, DataOperations.APPEND, null, 1, "foo", null, newRows); assertThatThrownBy(() -> TableMetadata.buildFrom(base).addSnapshot(invalidLastRow)) .isInstanceOf(ValidationException.class) - .hasMessageContaining("Cannot add a snapshot whose 'first-row-id'"); + .hasMessageContaining("Cannot add a snapshot: first-row-id is null"); + + // add rows to check TableMetadata validation; Snapshot rejects negative 
next-row-id + Snapshot addRows = + new BaseSnapshot( + 0, 1, null, 0, DataOperations.APPEND, null, 1, "foo", base.nextRowId(), newRows); + TableMetadata added = TableMetadata.buildFrom(base).addSnapshot(addRows).build(); Snapshot invalidNewRows = new BaseSnapshot( - 0, 1, null, 0, DataOperations.APPEND, null, 1, "foo", base.nextRowId(), null); + 1, 2, 1L, 0, DataOperations.APPEND, null, 1, "foo", added.nextRowId() - 1, 10L); - assertThatThrownBy(() -> TableMetadata.buildFrom(base).addSnapshot(invalidNewRows)) + assertThatThrownBy(() -> TableMetadata.buildFrom(added).addSnapshot(invalidNewRows)) .isInstanceOf(ValidationException.class) .hasMessageContaining( - "Cannot add a snapshot with a null 'added-rows' field when row lineage is enabled"); + "Cannot add a snapshot, first-row-id is behind table next-row-id: 29 < 30"); } @TestTemplate @@ -140,10 +170,7 @@ public void testFastAppend() { TestTables.TestTable table = TestTables.create( tableDir, "test", TEST_SCHEMA, PartitionSpec.unpartitioned(), formatVersion); - TableMetadata base = table.ops().current(); - table.ops().commit(base, TableMetadata.buildFrom(base).enableRowLineage().build()); - assertThat(table.ops().current().rowLineageEnabled()).isTrue(); assertThat(table.ops().current().nextRowId()).isEqualTo(0L); table.newFastAppend().appendFile(fileWithRows(30)).commit(); @@ -164,10 +191,7 @@ public void testAppend() { TestTables.TestTable table = TestTables.create( tableDir, "test", TEST_SCHEMA, PartitionSpec.unpartitioned(), formatVersion); - TableMetadata base = table.ops().current(); - table.ops().commit(base, TableMetadata.buildFrom(base).enableRowLineage().build()); - assertThat(table.ops().current().rowLineageEnabled()).isTrue(); assertThat(table.ops().current().nextRowId()).isEqualTo(0L); table.newAppend().appendFile(fileWithRows(30)).commit(); @@ -193,10 +217,6 @@ public void testAppendBranch() { TestTables.create( tableDir, "test", TEST_SCHEMA, PartitionSpec.unpartitioned(), formatVersion); - 
TableMetadata base = table.ops().current(); - table.ops().commit(base, TableMetadata.buildFrom(base).enableRowLineage().build()); - - assertThat(table.ops().current().rowLineageEnabled()).isTrue(); assertThat(table.ops().current().nextRowId()).isEqualTo(0L); // Write to Branch @@ -225,10 +245,7 @@ public void testDeletes() { TestTables.TestTable table = TestTables.create( tableDir, "test", TEST_SCHEMA, PartitionSpec.unpartitioned(), formatVersion); - TableMetadata base = table.ops().current(); - table.ops().commit(base, TableMetadata.buildFrom(base).enableRowLineage().build()); - assertThat(table.ops().current().rowLineageEnabled()).isTrue(); assertThat(table.ops().current().nextRowId()).isEqualTo(0L); DataFile file = fileWithRows(30); @@ -241,85 +258,111 @@ public void testDeletes() { table.newDelete().deleteFile(file).commit(); // Deleting a file should create a new snapshot which should inherit last-row-id from the - // previous metadata and not - // change last-row-id for this metadata. + // previous metadata and not change last-row-id for this metadata. 
assertThat(table.currentSnapshot().firstRowId()).isEqualTo(30); assertThat(table.currentSnapshot().addedRows()).isEqualTo(0); assertThat(table.ops().current().nextRowId()).isEqualTo(30); } @TestTemplate - public void testReplace() { + public void testPositionDeletes() { assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE); TestTables.TestTable table = TestTables.create( tableDir, "test", TEST_SCHEMA, PartitionSpec.unpartitioned(), formatVersion); - TableMetadata base = table.ops().current(); - table.ops().commit(base, TableMetadata.buildFrom(base).enableRowLineage().build()); - - assertThat(table.ops().current().rowLineageEnabled()).isTrue(); assertThat(table.ops().current().nextRowId()).isEqualTo(0L); - DataFile filePart1 = fileWithRows(30); - DataFile filePart2 = fileWithRows(30); - DataFile fileCompacted = fileWithRows(60); + DataFile file = fileWithRows(30); - table.newAppend().appendFile(filePart1).appendFile(filePart2).commit(); + table.newAppend().appendFile(file).commit(); assertThat(table.currentSnapshot().firstRowId()).isEqualTo(0); - assertThat(table.ops().current().nextRowId()).isEqualTo(60); - - table.newRewrite().deleteFile(filePart1).deleteFile(filePart2).addFile(fileCompacted).commit(); + assertThat(table.ops().current().nextRowId()).isEqualTo(30); - // Rewrites are currently just treated as appends. 
In the future we could treat these as no-ops - assertThat(table.currentSnapshot().firstRowId()).isEqualTo(60); - assertThat(table.ops().current().nextRowId()).isEqualTo(120); + // v3 only allows Puffin-based DVs for position deletes + DeleteFile deletes = + FileMetadata.deleteFileBuilder(PartitionSpec.unpartitioned()) + .ofPositionDeletes() + .withFormat(FileFormat.PUFFIN) + .withFileSizeInBytes(100) + .withRecordCount(10) + .withContentOffset(0) + .withContentSizeInBytes(50) + .withPath("deletes.puffin") + .withReferencedDataFile(file.location()) + .build(); + + table.newRowDelta().addDeletes(deletes).commit(); + + // Delete file records do not count toward added rows + assertThat(table.currentSnapshot().firstRowId()).isEqualTo(30); + assertThat(table.currentSnapshot().addedRows()).isEqualTo(0); + assertThat(table.ops().current().nextRowId()).isEqualTo(30); } @TestTemplate - public void testEnableRowLineageViaProperty() { + public void testEqualityDeletes() { assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE); TestTables.TestTable table = TestTables.create( tableDir, "test", TEST_SCHEMA, PartitionSpec.unpartitioned(), formatVersion); - assertThat(table.ops().current().rowLineageEnabled()).isFalse(); + assertThat(table.ops().current().nextRowId()).isEqualTo(0L); - // No-op - table.updateProperties().set(TableProperties.ROW_LINEAGE, "false").commit(); - assertThat(table.ops().current().rowLineageEnabled()).isFalse(); + DataFile file = fileWithRows(30); - // Enable row lineage - table.updateProperties().set(TableProperties.ROW_LINEAGE, "true").commit(); - assertThat(table.ops().current().rowLineageEnabled()).isTrue(); + table.newAppend().appendFile(file).commit(); - // Disabling row lineage is not allowed - assertThatThrownBy( - () -> table.updateProperties().set(TableProperties.ROW_LINEAGE, "false").commit()) - .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("Cannot disable row lineage once it has 
been enabled"); + assertThat(table.currentSnapshot().firstRowId()).isEqualTo(0); + assertThat(table.ops().current().nextRowId()).isEqualTo(30); + + DeleteFile deletes = + FileMetadata.deleteFileBuilder(PartitionSpec.unpartitioned()) + .ofEqualityDeletes(table.schema().findField("x").fieldId()) + .withFormat(FileFormat.PARQUET) + .withFileSizeInBytes(100) + .withRecordCount(10) + .withPath("deletes.parquet") + .withReferencedDataFile(file.location()) + .build(); - // No-op - table.updateProperties().set(TableProperties.ROW_LINEAGE, "true").commit(); - assertThat(table.ops().current().rowLineageEnabled()).isTrue(); + table.newRowDelta().addDeletes(deletes).commit(); + + // Delete file records do not count toward added rows + assertThat(table.currentSnapshot().firstRowId()).isEqualTo(30); + assertThat(table.currentSnapshot().addedRows()).isEqualTo(0); + assertThat(table.ops().current().nextRowId()).isEqualTo(30); } @TestTemplate - public void testEnableRowLineageViaPropertyAtTableCreation() { + public void testReplace() { assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE); TestTables.TestTable table = TestTables.create( - tableDir, - "test", - TEST_SCHEMA, - PartitionSpec.unpartitioned(), - formatVersion, - ImmutableMap.of(TableProperties.ROW_LINEAGE, "true")); - assertThat(table.ops().current().rowLineageEnabled()).isTrue(); + tableDir, "test", TEST_SCHEMA, PartitionSpec.unpartitioned(), formatVersion); + + assertThat(table.ops().current().nextRowId()).isEqualTo(0L); + + DataFile filePart1 = fileWithRows(30); + DataFile filePart2 = fileWithRows(30); + DataFile fileCompacted = fileWithRows(60); + + table.newAppend().appendFile(filePart1).appendFile(filePart2).commit(); + + assertThat(table.currentSnapshot().firstRowId()).isEqualTo(0); + assertThat(table.currentSnapshot().addedRows()).isEqualTo(60); + assertThat(table.ops().current().nextRowId()).isEqualTo(60); + + 
table.newRewrite().deleteFile(filePart1).deleteFile(filePart2).addFile(fileCompacted).commit(); + + // Rewrites are currently just treated as appends. In the future we could treat these as no-ops + assertThat(table.currentSnapshot().firstRowId()).isEqualTo(60); + assertThat(table.currentSnapshot().addedRows()).isEqualTo(60); + assertThat(table.ops().current().nextRowId()).isEqualTo(120); } private final AtomicInteger fileNum = new AtomicInteger(0); diff --git a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java index cb1ac9b953a2..56c4676dfb1c 100644 --- a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java +++ b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java @@ -197,9 +197,8 @@ public void testJsonConversion() throws Exception { refs, statisticsFiles, partitionStatisticsFiles, - ImmutableList.of(), - true, - 40); + 40, + ImmutableList.of()); String asJson = TableMetadataParser.toJson(expected); TableMetadata metadata = TableMetadataParser.fromJson(asJson); @@ -232,7 +231,6 @@ public void testJsonConversion() throws Exception { assertThat(metadata.statisticsFiles()).isEqualTo(statisticsFiles); assertThat(metadata.partitionStatisticsFiles()).isEqualTo(partitionStatisticsFiles); assertThat(metadata.refs()).isEqualTo(refs); - assertThat(metadata.rowLineageEnabled()).isEqualTo(expected.rowLineageEnabled()); assertThat(metadata.nextRowId()).isEqualTo(expected.nextRowId()); } @@ -301,9 +299,8 @@ public void testBackwardCompat() throws Exception { ImmutableMap.of(), ImmutableList.of(), ImmutableList.of(), - ImmutableList.of(), - false, - 0); + 0, + ImmutableList.of()); String asJson = toJsonWithoutSpecAndSchemaList(expected); TableMetadata metadata = TableMetadataParser.fromJson(asJson); @@ -342,7 +339,6 @@ public void testBackwardCompat() throws Exception { .isEqualTo(previousSnapshot.allManifests(ops.io())); 
assertThat(metadata.previousFiles()).isEqualTo(expected.previousFiles()); assertThat(metadata.snapshot(previousSnapshotId).schemaId()).isNull(); - assertThat(metadata.rowLineageEnabled()).isEqualTo(expected.rowLineageEnabled()); assertThat(metadata.nextRowId()).isEqualTo(expected.nextRowId()); } @@ -424,9 +420,8 @@ public void testInvalidMainBranch() throws IOException { refs, ImmutableList.of(), ImmutableList.of(), - ImmutableList.of(), - false, - 0L)) + 0L, + ImmutableList.of())) .isInstanceOf(IllegalArgumentException.class) .hasMessageStartingWith("Current snapshot ID does not match main branch"); } @@ -472,9 +467,8 @@ public void testMainWithoutCurrent() throws IOException { refs, ImmutableList.of(), ImmutableList.of(), - ImmutableList.of(), - false, - 0L)) + 0L, + ImmutableList.of())) .isInstanceOf(IllegalArgumentException.class) .hasMessageStartingWith("Current snapshot is not set, but main branch exists"); } @@ -514,9 +508,8 @@ public void testBranchSnapshotMissing() { refs, ImmutableList.of(), ImmutableList.of(), - ImmutableList.of(), - false, - 0L)) + 0L, + ImmutableList.of())) .isInstanceOf(IllegalArgumentException.class) .hasMessageEndingWith("does not exist in the existing snapshots list"); } @@ -631,9 +624,8 @@ public void testJsonWithPreviousMetadataLog() throws Exception { ImmutableMap.of(), ImmutableList.of(), ImmutableList.of(), - ImmutableList.of(), - false, - 0L); + 0L, + ImmutableList.of()); String asJson = TableMetadataParser.toJson(base); TableMetadata metadataFromJson = TableMetadataParser.fromJson(asJson); @@ -720,9 +712,8 @@ public void testAddPreviousMetadataRemoveNone() throws IOException { ImmutableMap.of(), ImmutableList.of(), ImmutableList.of(), - ImmutableList.of(), - false, - 0L); + 0L, + ImmutableList.of()); previousMetadataLog.add(latestPreviousMetadata); @@ -824,9 +815,8 @@ public void testAddPreviousMetadataRemoveOne() throws IOException { ImmutableMap.of(), ImmutableList.of(), ImmutableList.of(), - ImmutableList.of(), - false, - 
0L); + 0L, + ImmutableList.of()); previousMetadataLog.add(latestPreviousMetadata); @@ -932,9 +922,8 @@ public void testAddPreviousMetadataRemoveMultiple() throws IOException { ImmutableMap.of(), ImmutableList.of(), ImmutableList.of(), - ImmutableList.of(), - false, - 0L); + 0L, + ImmutableList.of()); previousMetadataLog.add(latestPreviousMetadata); @@ -980,9 +969,8 @@ public void testV2UUIDValidation() { ImmutableMap.of(), ImmutableList.of(), ImmutableList.of(), - ImmutableList.of(), - false, - 0L)) + 0L, + ImmutableList.of())) .isInstanceOf(IllegalArgumentException.class) .hasMessage("UUID is required in format v2"); } @@ -1017,9 +1005,8 @@ public void testVersionValidation() { ImmutableMap.of(), ImmutableList.of(), ImmutableList.of(), - ImmutableList.of(), - false, - 0L)) + 0L, + ImmutableList.of())) .isInstanceOf(IllegalArgumentException.class) .hasMessage( "Unsupported format version: v%s (supported: v%s)", @@ -1065,9 +1052,8 @@ public void testVersionValidation() { ImmutableMap.of(), ImmutableList.of(), ImmutableList.of(), - ImmutableList.of(), - false, - 0L)) + 0L, + ImmutableList.of())) .isNotNull(); assertThat( diff --git a/core/src/test/java/org/apache/iceberg/rest/responses/TestLoadTableResponseParser.java b/core/src/test/java/org/apache/iceberg/rest/responses/TestLoadTableResponseParser.java index 2f3c4380f8ac..89982c4e9b20 100644 --- a/core/src/test/java/org/apache/iceberg/rest/responses/TestLoadTableResponseParser.java +++ b/core/src/test/java/org/apache/iceberg/rest/responses/TestLoadTableResponseParser.java @@ -18,7 +18,6 @@ */ package org.apache.iceberg.rest.responses; -import static org.apache.iceberg.TestHelpers.MAX_FORMAT_VERSION; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -138,8 +137,8 @@ public void roundTripSerdeV1() { } @ParameterizedTest - @ValueSource(ints = {2, MAX_FORMAT_VERSION}) - public void roundTripSerdeV2andHigher(int formatVersion) { + 
@ValueSource(ints = 3) + public void roundTripSerdeV3andHigher(int formatVersion) { String uuid = "386b9f01-002b-4d8c-b77f-42c3fd3b7c9b"; TableMetadata metadata = TableMetadata.buildFromEmpty(formatVersion) @@ -189,7 +188,8 @@ public void roundTripSerdeV2andHigher(int formatVersion) { + " \"fields\" : [ ]\n" + " } ],\n" + " \"properties\" : { },\n" - + " \"current-snapshot-id\" : %s,\n" + + " \"current-snapshot-id\" : null,\n" + + " \"next-row-id\" : 0,\n" + " \"refs\" : { },\n" + " \"snapshots\" : [ ],\n" + " \"statistics\" : [ ],\n" @@ -198,7 +198,76 @@ public void roundTripSerdeV2andHigher(int formatVersion) { + " \"metadata-log\" : [ ]\n" + " }\n" + "}", - formatVersion, metadata.lastUpdatedMillis(), formatVersion >= 3 ? "null" : "-1"); + formatVersion, metadata.lastUpdatedMillis()); + + String json = LoadTableResponseParser.toJson(response, true); + assertThat(json).isEqualTo(expectedJson); + // can't do an equality comparison because Schema doesn't implement equals/hashCode + assertThat(LoadTableResponseParser.toJson(LoadTableResponseParser.fromJson(json), true)) + .isEqualTo(expectedJson); + } + + @Test + public void roundTripSerdeV2() { + String uuid = "386b9f01-002b-4d8c-b77f-42c3fd3b7c9b"; + TableMetadata metadata = + TableMetadata.buildFromEmpty(2) + .assignUUID(uuid) + .setLocation("location") + .setCurrentSchema( + new Schema(Types.NestedField.required(1, "x", Types.LongType.get())), 1) + .addPartitionSpec(PartitionSpec.unpartitioned()) + .addSortOrder(SortOrder.unsorted()) + .discardChanges() + .withMetadataLocation("metadata-location") + .build(); + + LoadTableResponse response = LoadTableResponse.builder().withTableMetadata(metadata).build(); + + String expectedJson = + String.format( + "{\n" + + " \"metadata-location\" : \"metadata-location\",\n" + + " \"metadata\" : {\n" + + " \"format-version\" : 2,\n" + + " \"table-uuid\" : \"386b9f01-002b-4d8c-b77f-42c3fd3b7c9b\",\n" + + " \"location\" : \"location\",\n" + + " \"last-sequence-number\" : 0,\n" + 
+ " \"last-updated-ms\" : %d,\n" + + " \"last-column-id\" : 1,\n" + + " \"current-schema-id\" : 0,\n" + + " \"schemas\" : [ {\n" + + " \"type\" : \"struct\",\n" + + " \"schema-id\" : 0,\n" + + " \"fields\" : [ {\n" + + " \"id\" : 1,\n" + + " \"name\" : \"x\",\n" + + " \"required\" : true,\n" + + " \"type\" : \"long\"\n" + + " } ]\n" + + " } ],\n" + + " \"default-spec-id\" : 0,\n" + + " \"partition-specs\" : [ {\n" + + " \"spec-id\" : 0,\n" + + " \"fields\" : [ ]\n" + + " } ],\n" + + " \"last-partition-id\" : 999,\n" + + " \"default-sort-order-id\" : 0,\n" + + " \"sort-orders\" : [ {\n" + + " \"order-id\" : 0,\n" + + " \"fields\" : [ ]\n" + + " } ],\n" + + " \"properties\" : { },\n" + + " \"current-snapshot-id\" : -1,\n" + + " \"refs\" : { },\n" + + " \"snapshots\" : [ ],\n" + + " \"statistics\" : [ ],\n" + + " \"partition-statistics\" : [ ],\n" + + " \"snapshot-log\" : [ ],\n" + + " \"metadata-log\" : [ ]\n" + + " }\n" + + "}", + metadata.lastUpdatedMillis()); String json = LoadTableResponseParser.toJson(response, true); assertThat(json).isEqualTo(expectedJson);