From a66942b361dca6305d4f60c2c002be2d94c5b127 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Thu, 20 Mar 2025 15:58:16 -0700 Subject: [PATCH 1/8] Core: Enable row lineage for all v3 tables. --- .../org/apache/iceberg/MetadataUpdate.java | 7 - .../apache/iceberg/MetadataUpdateParser.java | 6 - .../apache/iceberg/RewriteTablePathUtil.java | 5 +- .../org/apache/iceberg/SnapshotProducer.java | 2 +- .../org/apache/iceberg/TableMetadata.java | 68 +--- .../apache/iceberg/TableMetadataParser.java | 13 +- .../org/apache/iceberg/TableProperties.java | 2 - .../org/apache/iceberg/util/JsonUtil.java | 7 - .../iceberg/TestMetadataUpdateParser.java | 14 - .../iceberg/TestRowLineageMetadata.java | 334 ------------------ .../org/apache/iceberg/TestTableMetadata.java | 70 ++-- .../org/apache/iceberg/util/TestJsonUtil.java | 20 -- 12 files changed, 40 insertions(+), 508 deletions(-) delete mode 100644 core/src/test/java/org/apache/iceberg/TestRowLineageMetadata.java diff --git a/core/src/main/java/org/apache/iceberg/MetadataUpdate.java b/core/src/main/java/org/apache/iceberg/MetadataUpdate.java index 8f90b5691a1a..da8c627ce897 100644 --- a/core/src/main/java/org/apache/iceberg/MetadataUpdate.java +++ b/core/src/main/java/org/apache/iceberg/MetadataUpdate.java @@ -520,11 +520,4 @@ public void applyTo(ViewMetadata.Builder viewMetadataBuilder) { viewMetadataBuilder.setCurrentVersionId(versionId); } } - - class EnableRowLineage implements MetadataUpdate { - @Override - public void applyTo(TableMetadata.Builder metadataBuilder) { - metadataBuilder.enableRowLineage(); - } - } } diff --git a/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java b/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java index 19c48de958bb..56e2b9781e79 100644 --- a/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java +++ b/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java @@ -61,7 +61,6 @@ private MetadataUpdateParser() {} static final String REMOVE_PARTITION_STATISTICS = "remove-partition-statistics"; static final String REMOVE_PARTITION_SPECS = "remove-partition-specs"; static final String REMOVE_SCHEMAS = "remove-schemas"; - static final String ENABLE_ROW_LINEAGE = "enable-row-lineage"; // AssignUUID private static final String UUID = "uuid"; @@ -160,7 +159,6 @@ private MetadataUpdateParser() {} .put(MetadataUpdate.SetCurrentViewVersion.class, SET_CURRENT_VIEW_VERSION) .put(MetadataUpdate.RemovePartitionSpecs.class, REMOVE_PARTITION_SPECS) .put(MetadataUpdate.RemoveSchemas.class, REMOVE_SCHEMAS) - .put(MetadataUpdate.EnableRowLineage.class, ENABLE_ROW_LINEAGE) .buildOrThrow(); public static String toJson(MetadataUpdate metadataUpdate) { @@ -259,8 +257,6 @@ public static void toJson(MetadataUpdate metadataUpdate, JsonGenerator generator case REMOVE_SCHEMAS: writeRemoveSchemas((MetadataUpdate.RemoveSchemas) metadataUpdate, generator); break; - case ENABLE_ROW_LINEAGE: - break; default: throw new IllegalArgumentException( String.format( @@ -336,8 +332,6 @@ public static MetadataUpdate fromJson(JsonNode jsonNode) { return readRemovePartitionSpecs(jsonNode); case REMOVE_SCHEMAS: return readRemoveSchemas(jsonNode); - case ENABLE_ROW_LINEAGE: - return new MetadataUpdate.EnableRowLineage(); default: throw new UnsupportedOperationException( String.format("Cannot convert metadata update action to json: %s", action)); diff --git a/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java b/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java index 714c0b3bfe67..e38683161d20 100644 --- a/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java +++ b/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java @@ -131,9 +131,8 @@ public static TableMetadata replacePaths( updatePathInStatisticsFiles(metadata.statisticsFiles(), sourcePrefix, targetPrefix), // TODO: update partition statistics file paths metadata.partitionStatisticsFiles(), - metadata.changes(), - metadata.rowLineageEnabled(), - metadata.nextRowId()); + metadata.nextRowId(), + metadata.changes()); } private static Map updateProperties( diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index 6703a04dc69f..928405024aa3 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -285,7 +285,7 @@ public Snapshot apply() { Long addedRows = null; Long lastRowId = null; - if (base.rowLineageEnabled()) { + if (base.formatVersion() >= 3) { addedRows = calculateAddedRows(manifests); lastRowId = base.nextRowId(); } diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java index 61a127fed697..3f9db3438c87 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadata.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java @@ -57,8 +57,6 @@ public class TableMetadata implements Serializable { static final int INITIAL_SORT_ORDER_ID = 1; static final int INITIAL_SCHEMA_ID = 0; static final int INITIAL_ROW_ID = 0; - static final boolean DEFAULT_ROW_LINEAGE = false; - static final int MIN_FORMAT_VERSION_ROW_LINEAGE = 3; private static final long ONE_MINUTE = TimeUnit.MINUTES.toMillis(1); @@ -133,11 +131,6 @@ static TableMetadata newTableMetadata( int freshSortOrderId = sortOrder.isUnsorted() ? sortOrder.orderId() : INITIAL_SORT_ORDER_ID; SortOrder freshSortOrder = freshSortOrder(freshSortOrderId, freshSchema, sortOrder); - // configure row lineage using table properties - Boolean rowLineage = - PropertyUtil.propertyAsBoolean( - properties, TableProperties.ROW_LINEAGE, DEFAULT_ROW_LINEAGE); - // Validate the metrics configuration. Note: we only do this on new tables to we don't // break existing tables. MetricsConfig.fromProperties(properties).validateReferencedColumns(schema); @@ -151,7 +144,6 @@ static TableMetadata newTableMetadata( .setDefaultSortOrder(freshSortOrder) .setLocation(location) .setProperties(properties) - .setRowLineage(rowLineage) .build(); } @@ -266,13 +258,12 @@ public String toString() { private final List statisticsFiles; private final List partitionStatisticsFiles; private final List changes; + private final long nextRowId; private SerializableSupplier> snapshotsSupplier; private volatile List snapshots; private volatile Map snapshotsById; private volatile Map refs; private volatile boolean snapshotsLoaded; - private final Boolean rowLineageEnabled; - private final long nextRowId; @SuppressWarnings("checkstyle:CyclomaticComplexity") TableMetadata( @@ -299,9 +290,8 @@ public String toString() { Map refs, List statisticsFiles, List partitionStatisticsFiles, - List changes, - boolean rowLineageEnabled, - long nextRowId) { + long nextRowId, + List changes) { Preconditions.checkArgument( specs != null && !specs.isEmpty(), "Partition specs cannot be null or empty"); Preconditions.checkArgument( @@ -320,10 +310,6 @@ public String toString() { Preconditions.checkArgument( metadataFileLocation == null || changes.isEmpty(), "Cannot create TableMetadata with a metadata location and changes"); - Preconditions.checkArgument( - formatVersion >= MIN_FORMAT_VERSION_ROW_LINEAGE || !rowLineageEnabled, - "Cannot enable row lineage when Table Version is less than V3. Table Version is %s", - formatVersion); this.metadataFileLocation = metadataFileLocation; this.formatVersion = formatVersion; @@ -359,7 +345,6 @@ public String toString() { this.partitionStatisticsFiles = ImmutableList.copyOf(partitionStatisticsFiles); // row lineage - this.rowLineageEnabled = rowLineageEnabled; this.nextRowId = nextRowId; HistoryEntry last = null; @@ -584,10 +569,6 @@ public TableMetadata withUUID() { return new Builder(this).assignUUID().build(); } - public boolean rowLineageEnabled() { - return rowLineageEnabled; - } - public long nextRowId() { return nextRowId; } @@ -634,15 +615,10 @@ public TableMetadata replaceProperties(Map rawProperties) { int newFormatVersion = PropertyUtil.propertyAsInt(rawProperties, TableProperties.FORMAT_VERSION, formatVersion); - Boolean newRowLineage = - PropertyUtil.propertyAsBoolean( - rawProperties, TableProperties.ROW_LINEAGE, rowLineageEnabled); - return new Builder(this) .setProperties(updated) .removeProperties(removed) .upgradeFormatVersion(newFormatVersion) - .setRowLineage(newRowLineage) .build(); } @@ -927,7 +903,6 @@ public static class Builder { private final Map> statisticsFiles; private final Map> partitionStatisticsFiles; private boolean suppressHistoricalSnapshots = false; - private boolean rowLineage; private long nextRowId; // change tracking @@ -975,7 +950,6 @@ private Builder(int formatVersion) { this.schemasById = Maps.newHashMap(); this.specsById = Maps.newHashMap(); this.sortOrdersById = Maps.newHashMap(); - this.rowLineage = DEFAULT_ROW_LINEAGE; this.nextRowId = INITIAL_ROW_ID; } @@ -1011,7 +985,6 @@ private Builder(TableMetadata base) { this.specsById = Maps.newHashMap(base.specsById); this.sortOrdersById = Maps.newHashMap(base.sortOrdersById); - this.rowLineage = base.rowLineageEnabled; this.nextRowId = base.nextRowId; } @@ -1269,7 +1242,7 @@ public Builder addSnapshot(Snapshot snapshot) { snapshotsById.put(snapshot.snapshotId(), snapshot); changes.add(new MetadataUpdate.AddSnapshot(snapshot)); - if (rowLineage) { + if (formatVersion >= 3) { ValidationException.check( snapshot.firstRowId() >= nextRowId, "Cannot add a snapshot whose 'first-row-id' (%s) is less than the metadata 'next-row-id' (%s) because this will end up generating duplicate row_ids.", @@ -1508,34 +1481,6 @@ public Builder setPreviousFileLocation(String previousFileLocation) { return this; } - private Builder setRowLineage(Boolean newRowLineage) { - if (newRowLineage == null) { - return this; - } - - boolean disablingRowLineage = rowLineage && !newRowLineage; - - Preconditions.checkArgument( - !disablingRowLineage, "Cannot disable row lineage once it has been enabled"); - - if (!rowLineage && newRowLineage) { - return enableRowLineage(); - } else { - return this; - } - } - - public Builder enableRowLineage() { - Preconditions.checkArgument( - formatVersion >= MIN_FORMAT_VERSION_ROW_LINEAGE, - "Cannot use row lineage with format version %s. Only format version %s or higher support row lineage", - formatVersion, - MIN_FORMAT_VERSION_ROW_LINEAGE); - this.rowLineage = true; - changes.add(new MetadataUpdate.EnableRowLineage()); - return this; - } - private boolean hasChanges() { return changes.size() != startingChangeCount || (discardChanges && !changes.isEmpty()) @@ -1603,9 +1548,8 @@ public TableMetadata build() { partitionStatisticsFiles.values().stream() .flatMap(List::stream) .collect(Collectors.toList()), - discardChanges ? ImmutableList.of() : ImmutableList.copyOf(changes), - rowLineage, - nextRowId); + nextRowId, + discardChanges ? ImmutableList.of() : ImmutableList.copyOf(changes)); } private int addSchemaInternal(Schema schema, int newLastColumnId) { diff --git a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java index adaa9442f4fc..6a3db715aa55 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java @@ -110,7 +110,6 @@ private TableMetadataParser() {} static final String METADATA_LOG = "metadata-log"; static final String STATISTICS = "statistics"; static final String PARTITION_STATISTICS = "partition-statistics"; - static final String ROW_LINEAGE = "row-lineage"; static final String NEXT_ROW_ID = "next-row-id"; static final int MIN_NULL_CURRENT_SNAPSHOT_VERSION = 3; @@ -226,8 +225,7 @@ public static void toJson(TableMetadata metadata, JsonGenerator generator) throw } } - if (metadata.rowLineageEnabled()) { - generator.writeBooleanField(ROW_LINEAGE, metadata.rowLineageEnabled()); + if (metadata.formatVersion() >= 3) { generator.writeNumberField(NEXT_ROW_ID, metadata.nextRowId()); } @@ -465,12 +463,10 @@ public static TableMetadata fromJson(String metadataLocation, JsonNode node) { currentSnapshotId = -1L; } - Boolean rowLineage = JsonUtil.getBoolOrNull(ROW_LINEAGE, node); long lastRowId; - if (rowLineage != null && rowLineage) { + if (formatVersion >= 3) { lastRowId = JsonUtil.getLong(NEXT_ROW_ID, node); } else { - rowLineage = TableMetadata.DEFAULT_ROW_LINEAGE; lastRowId = TableMetadata.INITIAL_ROW_ID; } @@ -565,9 +561,8 @@ public static TableMetadata fromJson(String metadataLocation, JsonNode node) { refs, statisticsFiles, partitionStatisticsFiles, - ImmutableList.of() /* no changes from the file */, - rowLineage, - lastRowId); + lastRowId, + ImmutableList.of() /* no changes from the file */); } private static Map refsFromJson(JsonNode refMap) { diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java index cd7cda23c2d3..c137bcd3a2c3 100644 --- a/core/src/main/java/org/apache/iceberg/TableProperties.java +++ b/core/src/main/java/org/apache/iceberg/TableProperties.java @@ -388,6 +388,4 @@ private TableProperties() {} public static final int ENCRYPTION_DEK_LENGTH_DEFAULT = 16; public static final int ENCRYPTION_AAD_LENGTH_DEFAULT = 16; - - public static final String ROW_LINEAGE = "row-lineage"; } diff --git a/core/src/main/java/org/apache/iceberg/util/JsonUtil.java b/core/src/main/java/org/apache/iceberg/util/JsonUtil.java index 37d48f5a02c5..da2564609c54 100644 --- a/core/src/main/java/org/apache/iceberg/util/JsonUtil.java +++ b/core/src/main/java/org/apache/iceberg/util/JsonUtil.java @@ -149,13 +149,6 @@ public static long getLong(String property, JsonNode node) { return pNode.asLong(); } - public static Boolean getBoolOrNull(String property, JsonNode node) { - if (!node.hasNonNull(property)) { - return null; - } - return getBool(property, node); - } - public static boolean getBool(String property, JsonNode node) { Preconditions.checkArgument(node.has(property), "Cannot parse missing boolean: %s", property); JsonNode pNode = node.get(property); diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java b/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java index 4d66163d6c79..c62309ef98e5 100644 --- a/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java +++ b/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java @@ -941,17 +941,6 @@ public void testRemoveSchemas() { .isEqualTo(json); } - @Test - public void testEnableRowLineage() { - String action = MetadataUpdateParser.ENABLE_ROW_LINEAGE; - String json = "{\"action\":\"enable-row-lineage\"}"; - MetadataUpdate expected = new MetadataUpdate.EnableRowLineage(); - assertEquals(action, expected, MetadataUpdateParser.fromJson(json)); - assertThat(MetadataUpdateParser.toJson(expected)) - .as("Enable row lineage should convert to the correct JSON value") - .isEqualTo(json); - } - public void assertEquals( String action, MetadataUpdate expectedUpdate, MetadataUpdate actualUpdate) { switch (action) { @@ -1066,9 +1055,6 @@ public void assertEquals( (MetadataUpdate.RemoveSchemas) expectedUpdate, (MetadataUpdate.RemoveSchemas) actualUpdate); break; - case MetadataUpdateParser.ENABLE_ROW_LINEAGE: - assertThat(actualUpdate).isInstanceOf(MetadataUpdate.EnableRowLineage.class); - break; default: fail("Unrecognized metadata update action: " + action); } diff --git a/core/src/test/java/org/apache/iceberg/TestRowLineageMetadata.java b/core/src/test/java/org/apache/iceberg/TestRowLineageMetadata.java deleted file mode 100644 index fd8e00078993..000000000000 --- a/core/src/test/java/org/apache/iceberg/TestRowLineageMetadata.java +++ /dev/null @@ -1,334 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.assertj.core.api.Assumptions.assumeThat; - -import java.io.File; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.iceberg.exceptions.ValidationException; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.apache.iceberg.relocated.com.google.common.primitives.Ints; -import org.apache.iceberg.types.Types; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.TestTemplate; -import org.junit.jupiter.api.extension.ExtendWith; -import org.junit.jupiter.api.io.TempDir; - -@ExtendWith(ParameterizedTestExtension.class) -public class TestRowLineageMetadata { - - @Parameters(name = "formatVersion = {0}") - private static List formatVersion() { - return Ints.asList(TestHelpers.ALL_VERSIONS); - } - - @Parameter private int formatVersion; - - private static final String TEST_LOCATION = "s3://bucket/test/location"; - - private static final Schema TEST_SCHEMA = - new Schema( - 7, - Types.NestedField.required(1, "x", Types.LongType.get()), - Types.NestedField.required(2, "y", Types.LongType.get(), "comment"), - Types.NestedField.required(3, "z", Types.LongType.get())); - - private TableMetadata baseMetadata() { - return TableMetadata.buildFromEmpty(formatVersion) - .enableRowLineage() - .addSchema(TEST_SCHEMA) - .setLocation(TEST_LOCATION) - .addPartitionSpec(PartitionSpec.unpartitioned()) - .addSortOrder(SortOrder.unsorted()) - .build(); - } - - @TempDir private File tableDir = null; - - @AfterEach - public void cleanup() { - TestTables.clearTables(); - } - - @TestTemplate - public void testRowLineageSupported() { - if (formatVersion >= TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE) { - assertThat(TableMetadata.buildFromEmpty(formatVersion).enableRowLineage()).isNotNull(); - } else { - assertThatThrownBy(() -> TableMetadata.buildFromEmpty(formatVersion).enableRowLineage()) - .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("Cannot use row lineage"); - } - } - - @TestTemplate - public void testSnapshotAddition() { - assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE); - - long newRows = 30L; - - TableMetadata base = baseMetadata(); - - Snapshot addRows = - new BaseSnapshot( - 0, 1, null, 0, DataOperations.APPEND, null, 1, "foo", base.nextRowId(), newRows); - - TableMetadata firstAddition = TableMetadata.buildFrom(base).addSnapshot(addRows).build(); - - assertThat(firstAddition.nextRowId()).isEqualTo(newRows); - - Snapshot addMoreRows = - new BaseSnapshot( - 1, 2, 1L, 0, DataOperations.APPEND, null, 1, "foo", firstAddition.nextRowId(), newRows); - - TableMetadata secondAddition = - TableMetadata.buildFrom(firstAddition).addSnapshot(addMoreRows).build(); - - assertThat(secondAddition.nextRowId()).isEqualTo(newRows * 2); - } - - @TestTemplate - public void testInvalidSnapshotAddition() { - assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE); - - Long newRows = 30L; - - TableMetadata base = baseMetadata(); - - Snapshot invalidLastRow = - new BaseSnapshot( - 0, 1, null, 0, DataOperations.APPEND, null, 1, "foo", base.nextRowId() - 3, newRows); - - assertThatThrownBy(() -> TableMetadata.buildFrom(base).addSnapshot(invalidLastRow)) - .isInstanceOf(ValidationException.class) - .hasMessageContaining("Cannot add a snapshot whose 'first-row-id'"); - - Snapshot invalidNewRows = - new BaseSnapshot( - 0, 1, null, 0, DataOperations.APPEND, null, 1, "foo", base.nextRowId(), null); - - assertThatThrownBy(() -> TableMetadata.buildFrom(base).addSnapshot(invalidNewRows)) - .isInstanceOf(ValidationException.class) - .hasMessageContaining( - "Cannot add a snapshot with a null 'added-rows' field when row lineage is enabled"); - } - - @TestTemplate - public void testFastAppend() { - assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE); - - TestTables.TestTable table = - TestTables.create( - tableDir, "test", TEST_SCHEMA, PartitionSpec.unpartitioned(), formatVersion); - TableMetadata base = table.ops().current(); - table.ops().commit(base, TableMetadata.buildFrom(base).enableRowLineage().build()); - - assertThat(table.ops().current().rowLineageEnabled()).isTrue(); - assertThat(table.ops().current().nextRowId()).isEqualTo(0L); - - table.newFastAppend().appendFile(fileWithRows(30)).commit(); - - assertThat(table.currentSnapshot().firstRowId()).isEqualTo(0); - assertThat(table.ops().current().nextRowId()).isEqualTo(30); - - table.newFastAppend().appendFile(fileWithRows(17)).appendFile(fileWithRows(11)).commit(); - - assertThat(table.currentSnapshot().firstRowId()).isEqualTo(30); - assertThat(table.ops().current().nextRowId()).isEqualTo(30 + 17 + 11); - } - - @TestTemplate - public void testAppend() { - assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE); - - TestTables.TestTable table = - TestTables.create( - tableDir, "test", TEST_SCHEMA, PartitionSpec.unpartitioned(), formatVersion); - TableMetadata base = table.ops().current(); - table.ops().commit(base, TableMetadata.buildFrom(base).enableRowLineage().build()); - - assertThat(table.ops().current().rowLineageEnabled()).isTrue(); - assertThat(table.ops().current().nextRowId()).isEqualTo(0L); - - table.newAppend().appendFile(fileWithRows(30)).commit(); - - assertThat(table.currentSnapshot().firstRowId()).isEqualTo(0); - assertThat(table.ops().current().nextRowId()).isEqualTo(30); - - table.newAppend().appendFile(fileWithRows(17)).appendFile(fileWithRows(11)).commit(); - - assertThat(table.currentSnapshot().firstRowId()).isEqualTo(30); - assertThat(table.ops().current().nextRowId()).isEqualTo(30 + 17 + 11); - } - - @TestTemplate - public void testAppendBranch() { - assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE); - // Appends to a branch should still change last-row-id even if not on main, these changes - // should also affect commits to main - - String branch = "some_branch"; - - TestTables.TestTable table = - TestTables.create( - tableDir, "test", TEST_SCHEMA, PartitionSpec.unpartitioned(), formatVersion); - - TableMetadata base = table.ops().current(); - table.ops().commit(base, TableMetadata.buildFrom(base).enableRowLineage().build()); - - assertThat(table.ops().current().rowLineageEnabled()).isTrue(); - assertThat(table.ops().current().nextRowId()).isEqualTo(0L); - - // Write to Branch - table.newAppend().appendFile(fileWithRows(30)).toBranch(branch).commit(); - - assertThat(table.currentSnapshot()).isNull(); - assertThat(table.snapshot(branch).firstRowId()).isEqualTo(0L); - assertThat(table.ops().current().nextRowId()).isEqualTo(30); - - // Write to Main - table.newAppend().appendFile(fileWithRows(17)).appendFile(fileWithRows(11)).commit(); - - assertThat(table.currentSnapshot().firstRowId()).isEqualTo(30); - assertThat(table.ops().current().nextRowId()).isEqualTo(30 + 17 + 11); - - // Write again to branch - table.newAppend().appendFile(fileWithRows(21)).toBranch(branch).commit(); - assertThat(table.snapshot(branch).firstRowId()).isEqualTo(30 + 17 + 11); - assertThat(table.ops().current().nextRowId()).isEqualTo(30 + 17 + 11 + 21); - } - - @TestTemplate - public void testDeletes() { - assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE); - - TestTables.TestTable table = - TestTables.create( - tableDir, "test", TEST_SCHEMA, PartitionSpec.unpartitioned(), formatVersion); - TableMetadata base = table.ops().current(); - table.ops().commit(base, TableMetadata.buildFrom(base).enableRowLineage().build()); - - assertThat(table.ops().current().rowLineageEnabled()).isTrue(); - assertThat(table.ops().current().nextRowId()).isEqualTo(0L); - - DataFile file = fileWithRows(30); - - table.newAppend().appendFile(file).commit(); - - assertThat(table.currentSnapshot().firstRowId()).isEqualTo(0); - assertThat(table.ops().current().nextRowId()).isEqualTo(30); - - table.newDelete().deleteFile(file).commit(); - - // Deleting a file should create a new snapshot which should inherit last-row-id from the - // previous metadata and not - // change last-row-id for this metadata. - assertThat(table.currentSnapshot().firstRowId()).isEqualTo(30); - assertThat(table.currentSnapshot().addedRows()).isEqualTo(0); - assertThat(table.ops().current().nextRowId()).isEqualTo(30); - } - - @TestTemplate - public void testReplace() { - assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE); - - TestTables.TestTable table = - TestTables.create( - tableDir, "test", TEST_SCHEMA, PartitionSpec.unpartitioned(), formatVersion); - TableMetadata base = table.ops().current(); - - table.ops().commit(base, TableMetadata.buildFrom(base).enableRowLineage().build()); - - assertThat(table.ops().current().rowLineageEnabled()).isTrue(); - assertThat(table.ops().current().nextRowId()).isEqualTo(0L); - - DataFile filePart1 = fileWithRows(30); - DataFile filePart2 = fileWithRows(30); - DataFile fileCompacted = fileWithRows(60); - - table.newAppend().appendFile(filePart1).appendFile(filePart2).commit(); - - assertThat(table.currentSnapshot().firstRowId()).isEqualTo(0); - assertThat(table.ops().current().nextRowId()).isEqualTo(60); - - table.newRewrite().deleteFile(filePart1).deleteFile(filePart2).addFile(fileCompacted).commit(); - - // Rewrites are currently just treated as appends. In the future we could treat these as no-ops - assertThat(table.currentSnapshot().firstRowId()).isEqualTo(60); - assertThat(table.ops().current().nextRowId()).isEqualTo(120); - } - - @TestTemplate - public void testEnableRowLineageViaProperty() { - assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE); - - TestTables.TestTable table = - TestTables.create( - tableDir, "test", TEST_SCHEMA, PartitionSpec.unpartitioned(), formatVersion); - - assertThat(table.ops().current().rowLineageEnabled()).isFalse(); - - // No-op - table.updateProperties().set(TableProperties.ROW_LINEAGE, "false").commit(); - assertThat(table.ops().current().rowLineageEnabled()).isFalse(); - - // Enable row lineage - table.updateProperties().set(TableProperties.ROW_LINEAGE, "true").commit(); - assertThat(table.ops().current().rowLineageEnabled()).isTrue(); - - // Disabling row lineage is not allowed - assertThatThrownBy( - () -> table.updateProperties().set(TableProperties.ROW_LINEAGE, "false").commit()) - .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("Cannot disable row lineage once it has been enabled"); - - // No-op - table.updateProperties().set(TableProperties.ROW_LINEAGE, "true").commit(); - assertThat(table.ops().current().rowLineageEnabled()).isTrue(); - } - - @TestTemplate - public void testEnableRowLineageViaPropertyAtTableCreation() { - assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE); - - TestTables.TestTable table = - TestTables.create( - tableDir, - "test", - TEST_SCHEMA, - PartitionSpec.unpartitioned(), - formatVersion, - ImmutableMap.of(TableProperties.ROW_LINEAGE, "true")); - assertThat(table.ops().current().rowLineageEnabled()).isTrue(); - } - - private final AtomicInteger fileNum = new AtomicInteger(0); - - private DataFile fileWithRows(long numRows) { - return DataFiles.builder(PartitionSpec.unpartitioned()) - .withRecordCount(numRows) - .withFileSizeInBytes(numRows * 100) - .withPath("file://file_" + fileNum.incrementAndGet() + ".parquet") - .build(); - } -} diff --git a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java index cb1ac9b953a2..94df6b0ce91b 100644 --- a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java +++ b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java @@ -197,9 +197,8 @@ public void testJsonConversion() throws Exception { refs, statisticsFiles, partitionStatisticsFiles, - ImmutableList.of(), - true, - 40); + 40, + ImmutableList.of()); String asJson = TableMetadataParser.toJson(expected); TableMetadata metadata = TableMetadataParser.fromJson(asJson); @@ -232,8 +231,6 @@ public void testJsonConversion() throws Exception { assertThat(metadata.statisticsFiles()).isEqualTo(statisticsFiles); assertThat(metadata.partitionStatisticsFiles()).isEqualTo(partitionStatisticsFiles); assertThat(metadata.refs()).isEqualTo(refs); - assertThat(metadata.rowLineageEnabled()).isEqualTo(expected.rowLineageEnabled()); - assertThat(metadata.nextRowId()).isEqualTo(expected.nextRowId()); } @Test @@ -301,9 +298,8 @@ public void testBackwardCompat() throws Exception { ImmutableMap.of(), ImmutableList.of(), ImmutableList.of(), - ImmutableList.of(), - false, - 0); + 0, + ImmutableList.of()); String asJson = toJsonWithoutSpecAndSchemaList(expected); TableMetadata metadata = TableMetadataParser.fromJson(asJson); @@ -342,8 +338,6 @@ public void testBackwardCompat() throws Exception { .isEqualTo(previousSnapshot.allManifests(ops.io())); assertThat(metadata.previousFiles()).isEqualTo(expected.previousFiles()); assertThat(metadata.snapshot(previousSnapshotId).schemaId()).isNull(); - assertThat(metadata.rowLineageEnabled()).isEqualTo(expected.rowLineageEnabled()); - assertThat(metadata.nextRowId()).isEqualTo(expected.nextRowId()); } @Test @@ -402,7 +396,7 @@ public void testInvalidMainBranch() throws IOException { () -> new TableMetadata( null, - MAX_FORMAT_VERSION, + 2, UUID.randomUUID().toString(), TEST_LOCATION, SEQ_NO, @@ -424,9 +418,8 @@ public void testInvalidMainBranch() throws IOException { refs, ImmutableList.of(), ImmutableList.of(), - ImmutableList.of(), - false, - 0L)) + 0L, + ImmutableList.of())) .isInstanceOf(IllegalArgumentException.class) .hasMessageStartingWith("Current snapshot ID does not match main branch"); } @@ -450,7 +443,7 @@ public void testMainWithoutCurrent() throws IOException { () -> new TableMetadata( null, - MAX_FORMAT_VERSION, + 2, UUID.randomUUID().toString(), TEST_LOCATION, SEQ_NO, @@ -472,9 +465,8 @@ public void testMainWithoutCurrent() throws IOException { refs, ImmutableList.of(), ImmutableList.of(), - ImmutableList.of(), - false, - 0L)) + 0L, + ImmutableList.of())) .isInstanceOf(IllegalArgumentException.class) .hasMessageStartingWith("Current snapshot is not set, but main branch exists"); } @@ -492,7 +484,7 @@ public void testBranchSnapshotMissing() { () -> new TableMetadata( null, - MAX_FORMAT_VERSION, + 2, UUID.randomUUID().toString(), TEST_LOCATION, SEQ_NO, @@ -514,9 +506,8 @@ public void testBranchSnapshotMissing() { refs, ImmutableList.of(), ImmutableList.of(), - ImmutableList.of(), - false, - 0L)) + 0L, + ImmutableList.of())) .isInstanceOf(IllegalArgumentException.class) .hasMessageEndingWith("does not exist in the existing snapshots list"); } @@ -631,9 +622,8 @@ public void testJsonWithPreviousMetadataLog() throws Exception { ImmutableMap.of(), ImmutableList.of(), ImmutableList.of(), - ImmutableList.of(), - false, - 0L); + 0L, + ImmutableList.of()); String asJson = TableMetadataParser.toJson(base); TableMetadata metadataFromJson = TableMetadataParser.fromJson(asJson); @@ -720,9 +710,8 @@ public void testAddPreviousMetadataRemoveNone() throws IOException { ImmutableMap.of(), ImmutableList.of(), ImmutableList.of(), - ImmutableList.of(), - false, - 0L); + 0L, + ImmutableList.of()); previousMetadataLog.add(latestPreviousMetadata); @@ -824,9 +813,8 @@ public void testAddPreviousMetadataRemoveOne() throws IOException { ImmutableMap.of(), ImmutableList.of(), ImmutableList.of(), - ImmutableList.of(), - false, - 0L); + 0L, + ImmutableList.of()); previousMetadataLog.add(latestPreviousMetadata); @@ -932,9 +920,8 @@ public void testAddPreviousMetadataRemoveMultiple() throws IOException { ImmutableMap.of(), ImmutableList.of(), ImmutableList.of(), - ImmutableList.of(), - false, - 0L); + 0L, + ImmutableList.of()); previousMetadataLog.add(latestPreviousMetadata); @@ -980,9 +967,8 @@ public void testV2UUIDValidation() { ImmutableMap.of(), ImmutableList.of(), ImmutableList.of(), - ImmutableList.of(), - false, - 0L)) + 0L, + ImmutableList.of())) .isInstanceOf(IllegalArgumentException.class) .hasMessage("UUID is required in format v2"); } @@ -1017,9 +1003,8 @@ public void testVersionValidation() { ImmutableMap.of(), ImmutableList.of(), ImmutableList.of(), - ImmutableList.of(), - false, - 0L)) + 0L, + ImmutableList.of())) .isInstanceOf(IllegalArgumentException.class) .hasMessage( "Unsupported format version: v%s (supported: v%s)", @@ -1065,9 +1050,8 @@ public void testVersionValidation() { ImmutableMap.of(), ImmutableList.of(), ImmutableList.of(), - ImmutableList.of(), - false, - 0L)) + 0L, + ImmutableList.of())) .isNotNull(); assertThat( diff --git a/core/src/test/java/org/apache/iceberg/util/TestJsonUtil.java b/core/src/test/java/org/apache/iceberg/util/TestJsonUtil.java index 1c76865e5d4a..7702d691afd0 100644 --- a/core/src/test/java/org/apache/iceberg/util/TestJsonUtil.java +++ b/core/src/test/java/org/apache/iceberg/util/TestJsonUtil.java @@ -195,26 +195,6 @@ public void getBool() throws JsonProcessingException { assertThat(JsonUtil.getBool("x", JsonUtil.mapper().readTree("{\"x\": false}"))).isFalse(); } - @Test - public void getBoolOrNull() throws JsonProcessingException { - assertThatThrownBy( - () -> JsonUtil.getBoolOrNull("x", JsonUtil.mapper().readTree("{\"x\": \"23\"}"))) - .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Cannot parse to a boolean value: x: \"23\""); - - assertThatThrownBy( - () -> JsonUtil.getBoolOrNull("x", JsonUtil.mapper().readTree("{\"x\": \"true\"}"))) - .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Cannot parse to a boolean value: x: \"true\""); - - assertThat(JsonUtil.getBoolOrNull("x", JsonUtil.mapper().readTree("{}"))).isNull(); - - assertThat(JsonUtil.getBoolOrNull("x", JsonUtil.mapper().readTree("{\"x\": null}"))).isNull(); - - assertThat(JsonUtil.getBoolOrNull("x", JsonUtil.mapper().readTree("{\"x\": true}"))).isTrue(); - assertThat(JsonUtil.getBoolOrNull("x", JsonUtil.mapper().readTree("{\"x\": false}"))).isFalse(); - } - @Test public void getIntArrayOrNull() throws JsonProcessingException { assertThat(JsonUtil.getIntArrayOrNull("items", JsonUtil.mapper().readTree("{}"))).isNull(); From aff717b9bf41228dc66ece7c2f3224a9e977aaf2 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Mon, 24 Mar 2025 13:33:28 -0700 Subject: [PATCH 2/8] Core: Add back getBoolOrNull to fix revapi. --- .../org/apache/iceberg/util/JsonUtil.java | 7 +++++++ .../org/apache/iceberg/util/TestJsonUtil.java | 20 +++++++++++++++++++ 2 files changed, 27 insertions(+) diff --git a/core/src/main/java/org/apache/iceberg/util/JsonUtil.java b/core/src/main/java/org/apache/iceberg/util/JsonUtil.java index da2564609c54..37d48f5a02c5 100644 --- a/core/src/main/java/org/apache/iceberg/util/JsonUtil.java +++ b/core/src/main/java/org/apache/iceberg/util/JsonUtil.java @@ -149,6 +149,13 @@ public static long getLong(String property, JsonNode node) { return pNode.asLong(); } + public static Boolean getBoolOrNull(String property, JsonNode node) { + if (!node.hasNonNull(property)) { + return null; + } + return getBool(property, node); + } + public static boolean getBool(String property, JsonNode node) { Preconditions.checkArgument(node.has(property), "Cannot parse missing boolean: %s", property); JsonNode pNode = node.get(property); diff --git a/core/src/test/java/org/apache/iceberg/util/TestJsonUtil.java b/core/src/test/java/org/apache/iceberg/util/TestJsonUtil.java index 7702d691afd0..1c76865e5d4a 100644 --- a/core/src/test/java/org/apache/iceberg/util/TestJsonUtil.java +++ b/core/src/test/java/org/apache/iceberg/util/TestJsonUtil.java @@ -195,6 +195,26 @@ public void getBool() throws JsonProcessingException { assertThat(JsonUtil.getBool("x", JsonUtil.mapper().readTree("{\"x\": false}"))).isFalse(); } + @Test + public void getBoolOrNull() throws JsonProcessingException { + assertThatThrownBy( + () -> JsonUtil.getBoolOrNull("x", JsonUtil.mapper().readTree("{\"x\": \"23\"}"))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Cannot parse to a boolean value: x: \"23\""); + + assertThatThrownBy( + () -> JsonUtil.getBoolOrNull("x", JsonUtil.mapper().readTree("{\"x\": \"true\"}"))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Cannot parse to a boolean value: x: \"true\""); + + assertThat(JsonUtil.getBoolOrNull("x", JsonUtil.mapper().readTree("{}"))).isNull(); + + assertThat(JsonUtil.getBoolOrNull("x", JsonUtil.mapper().readTree("{\"x\": null}"))).isNull(); + + assertThat(JsonUtil.getBoolOrNull("x", JsonUtil.mapper().readTree("{\"x\": true}"))).isTrue(); + assertThat(JsonUtil.getBoolOrNull("x", JsonUtil.mapper().readTree("{\"x\": false}"))).isFalse(); + } + @Test public void getIntArrayOrNull() throws JsonProcessingException { assertThat(JsonUtil.getIntArrayOrNull("items", JsonUtil.mapper().readTree("{}"))).isNull(); From c1954bb94cbb5ee58f3cf5dcab06d89c42902b8d Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Mon, 24 Mar 2025 16:05:40 -0700 Subject: [PATCH 3/8] Core: Add back row lineage metadata tests. --- .../java/org/apache/iceberg/Snapshot.java | 5 +- .../java/org/apache/iceberg/BaseSnapshot.java | 13 +- .../org/apache/iceberg/SnapshotProducer.java | 43 +- .../org/apache/iceberg/TableMetadata.java | 12 +- .../iceberg/TestRowLineageMetadata.java | 375 ++++++++++++++++++ 5 files changed, 426 insertions(+), 22 deletions(-) create mode 100644 core/src/test/java/org/apache/iceberg/TestRowLineageMetadata.java diff --git a/api/src/main/java/org/apache/iceberg/Snapshot.java b/api/src/main/java/org/apache/iceberg/Snapshot.java index 52280a41620f..8cdc63422be7 100644 --- a/api/src/main/java/org/apache/iceberg/Snapshot.java +++ b/api/src/main/java/org/apache/iceberg/Snapshot.java @@ -178,8 +178,7 @@ default Integer schemaId() { * value were created in a snapshot that was added to the table (but not necessarily commited to * this branch) in the past. * - * @return the first row-id to be used in this snapshot or null if row lineage was not enabled - * when the table was created. + * @return the first row-id to be used in this snapshot or null when row lineage is not supported */ default Long firstRowId() { return null; @@ -189,7 +188,7 @@ default Long firstRowId() { * The total number of newly added rows in this snapshot. It should be the summation of {@link * ManifestFile#ADDED_ROWS_COUNT} for every manifest added in this snapshot. * - *

This field is optional but is required when row lineage is enabled. + *

This field is optional but is required when the table version supports row lineage. * * @return the total number of new rows in this snapshot or null if the value was not stored. */ diff --git a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java index c3c1159ef8df..f29ecfa58271 100644 --- a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java +++ b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java @@ -66,6 +66,17 @@ class BaseSnapshot implements Snapshot { String manifestList, Long firstRowId, Long addedRows) { + Preconditions.checkArgument( + firstRowId == null || firstRowId >= 0, + "Invalid first-row-id (cannot be negative): %s", + firstRowId); + Preconditions.checkArgument( + addedRows == null || addedRows >= 0, + "Invalid added-rows (cannot be negative): %s", + addedRows); + Preconditions.checkArgument( + firstRowId == null || addedRows != null, + "Invalid added-rows (required when first-row-id is set): null"); this.sequenceNumber = sequenceNumber; this.snapshotId = snapshotId; this.parentId = parentId; @@ -76,7 +87,7 @@ class BaseSnapshot implements Snapshot { this.manifestListLocation = manifestList; this.v1ManifestLocations = null; this.firstRowId = firstRowId; - this.addedRows = addedRows; + this.addedRows = firstRowId != null ? addedRows : null; } BaseSnapshot( diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index 928405024aa3..7b7587b2ae93 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -75,6 +75,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.relocated.com.google.common.math.IntMath; import org.apache.iceberg.util.Exceptions; +import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.SnapshotUtil; import org.apache.iceberg.util.Tasks; import org.apache.iceberg.util.ThreadPools; @@ -283,11 +284,14 @@ public Snapshot apply() { throw new RuntimeIOException(e, "Failed to write manifest list file"); } + Map summary = summary(); + String operation = operation(); + Long addedRows = null; - Long lastRowId = null; + Long firstRowId = null; if (base.formatVersion() >= 3) { - addedRows = calculateAddedRows(manifests); - lastRowId = base.nextRowId(); + addedRows = calculateAddedRows(operation, summary, manifests); + firstRowId = base.nextRowId(); } return new BaseSnapshot( @@ -295,20 +299,40 @@ public Snapshot apply() { snapshotId(), parentSnapshotId, System.currentTimeMillis(), - operation(), - summary(base), + operation, + summaryWithTotals(base, summary), base.currentSchemaId(), manifestList.location(), - lastRowId, + firstRowId, addedRows); } - private Long calculateAddedRows(List manifests) { + private Long calculateAddedRows( + String operation, Map summary, List manifests) { + if (summary != null) { + long addedRecords = + PropertyUtil.propertyAsLong(summary, SnapshotSummary.ADDED_RECORDS_PROP, 0L); + if (DataOperations.REPLACE.equals(operation)) { + long replacedRecords = + PropertyUtil.propertyAsLong(summary, SnapshotSummary.DELETED_RECORDS_PROP, 0L); + // added may be less than replaced when records are already deleted by delete files + Preconditions.checkArgument( + addedRecords <= replacedRecords, + "Invalid REPLACE operation: %s added records > %s replaced records", + addedRecords, + replacedRecords); + return 0L; + } + + return addedRecords; + } + return manifests.stream() .filter( manifest -> manifest.snapshotId() == null || Objects.equals(manifest.snapshotId(), this.snapshotId)) + .filter(manifest -> manifest.content() == ManifestContent.DATA) .mapToLong( manifest -> { Preconditions.checkArgument( @@ -324,9 +348,8 @@ private Long calculateAddedRows(List manifests) { protected abstract Map summary(); /** Returns the snapshot summary from the implementation and updates totals. */ - private Map summary(TableMetadata previous) { - Map summary = summary(); - + private Map summaryWithTotals( + TableMetadata previous, Map summary) { if (summary == null) { return ImmutableMap.of(); } diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java index 3f9db3438c87..fdffba26578c 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadata.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java @@ -1244,16 +1244,12 @@ public Builder addSnapshot(Snapshot snapshot) { if (formatVersion >= 3) { ValidationException.check( - snapshot.firstRowId() >= nextRowId, - "Cannot add a snapshot whose 'first-row-id' (%s) is less than the metadata 'next-row-id' (%s) because this will end up generating duplicate row_ids.", + snapshot.firstRowId() != null, "Cannot add a snapshot: first-row-id is null"); + ValidationException.check( + snapshot.firstRowId() != null && snapshot.firstRowId() >= nextRowId, + "Cannot add a snapshot, first-row-id is behind table next-row-id: %s < %s", snapshot.firstRowId(), nextRowId); - ValidationException.check( - snapshot.addedRows() != null, - "Cannot add a snapshot with a null 'added-rows' field when row lineage is enabled"); - Preconditions.checkArgument( - snapshot.addedRows() >= 0, - "Cannot decrease 'last-row-id'. 'last-row-id' must increase monotonically. Snapshot reports %s added rows"); this.nextRowId += snapshot.addedRows(); } diff --git a/core/src/test/java/org/apache/iceberg/TestRowLineageMetadata.java b/core/src/test/java/org/apache/iceberg/TestRowLineageMetadata.java new file mode 100644 index 000000000000..68f8a441f2e5 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/TestRowLineageMetadata.java @@ -0,0 +1,375 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assumptions.assumeThat; + +import java.io.File; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.relocated.com.google.common.primitives.Ints; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; + +@ExtendWith(ParameterizedTestExtension.class) +public class TestRowLineageMetadata { + private static final int ROW_LINEAGE_MIN_FORMAT_VERSION = 3; + + @Parameters(name = "formatVersion = {0}") + private static List formatVersion() { + return Ints.asList(TestHelpers.ALL_VERSIONS); + } + + @Parameter private int formatVersion; + + private static final String TEST_LOCATION = "s3://bucket/test/location"; + + private static final Schema TEST_SCHEMA = + new Schema( + 7, + Types.NestedField.required(1, "x", Types.LongType.get()), + Types.NestedField.required(2, "y", Types.LongType.get(), "comment"), + Types.NestedField.required(3, "z", Types.LongType.get())); + + private TableMetadata baseMetadata() { + return TableMetadata.buildFromEmpty(formatVersion) + .addSchema(TEST_SCHEMA) + .setLocation(TEST_LOCATION) + .addPartitionSpec(PartitionSpec.unpartitioned()) + .addSortOrder(SortOrder.unsorted()) + .build(); + } + + @TempDir private File tableDir = null; + + @AfterEach + public void cleanup() { + TestTables.clearTables(); + } + + @Test + public void testSnapshotRowIDValidation() { + Snapshot snapshot = + new BaseSnapshot(0, 1, null, 0, DataOperations.APPEND, null, 1, "ml.avro", null, null); + assertThat(snapshot.firstRowId()).isNull(); + assertThat(snapshot.addedRows()).isNull(); + + // added-rows will be set to null if first-row-id is null + snapshot = + new BaseSnapshot(0, 1, null, 0, DataOperations.APPEND, null, 1, "ml.avro", null, 10L); + assertThat(snapshot.firstRowId()).isNull(); + assertThat(snapshot.addedRows()).isNull(); + + // added-rows and first-row-id can be 0 + snapshot = new BaseSnapshot(0, 1, null, 0, DataOperations.APPEND, null, 1, "ml.avro", 0L, 0L); + assertThat(snapshot.firstRowId()).isEqualTo(0); + assertThat(snapshot.addedRows()).isEqualTo(0); + + assertThatThrownBy( + () -> + new BaseSnapshot( + 0, 1, null, 0, DataOperations.APPEND, null, 1, "ml.avro", 10L, null)) + .hasMessage("Invalid added-rows (required when first-row-id is set): null"); + + assertThatThrownBy( + () -> + new BaseSnapshot(0, 1, null, 0, DataOperations.APPEND, null, 1, "ml.avro", 0L, -1L)) + .hasMessage("Invalid added-rows (cannot be negative): -1"); + + assertThatThrownBy( + () -> + new BaseSnapshot(0, 1, null, 0, DataOperations.APPEND, null, 1, "ml.avro", -1L, 1L)) + .hasMessage("Invalid first-row-id (cannot be negative): -1"); + } + + @TestTemplate + public void testSnapshotAddition() { + assumeThat(formatVersion).isGreaterThanOrEqualTo(ROW_LINEAGE_MIN_FORMAT_VERSION); + + long newRows = 30L; + + TableMetadata base = baseMetadata(); + + Snapshot addRows = + new BaseSnapshot( + 0, 1, null, 0, DataOperations.APPEND, null, 1, "foo", base.nextRowId(), newRows); + + TableMetadata firstAddition = TableMetadata.buildFrom(base).addSnapshot(addRows).build(); + + assertThat(firstAddition.nextRowId()).isEqualTo(newRows); + + Snapshot addMoreRows = + new BaseSnapshot( + 1, 2, 1L, 0, DataOperations.APPEND, null, 1, "foo", firstAddition.nextRowId(), newRows); + + TableMetadata secondAddition = + TableMetadata.buildFrom(firstAddition).addSnapshot(addMoreRows).build(); + + assertThat(secondAddition.nextRowId()).isEqualTo(newRows * 2); + } + + @TestTemplate + public void testInvalidSnapshotAddition() { + assumeThat(formatVersion).isGreaterThanOrEqualTo(ROW_LINEAGE_MIN_FORMAT_VERSION); + + Long newRows = 30L; + + TableMetadata base = baseMetadata(); + + Snapshot invalidLastRow = + new BaseSnapshot(0, 1, null, 0, DataOperations.APPEND, null, 1, "foo", null, newRows); + + assertThatThrownBy(() -> TableMetadata.buildFrom(base).addSnapshot(invalidLastRow)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("Cannot add a snapshot: first-row-id is null"); + + // add rows to check TableMetadata validation; Snapshot rejects negative next-row-id + Snapshot addRows = + new BaseSnapshot( + 0, 1, null, 0, DataOperations.APPEND, null, 1, "foo", base.nextRowId(), newRows); + TableMetadata added = TableMetadata.buildFrom(base).addSnapshot(addRows).build(); + + Snapshot invalidNewRows = + new BaseSnapshot( + 1, 2, 1L, 0, DataOperations.APPEND, null, 1, "foo", added.nextRowId() - 1, 10L); + + assertThatThrownBy(() -> TableMetadata.buildFrom(added).addSnapshot(invalidNewRows)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + "Cannot add a snapshot, first-row-id is behind table next-row-id: 29 < 30"); + } + + @TestTemplate + public void testFastAppend() { + assumeThat(formatVersion).isGreaterThanOrEqualTo(ROW_LINEAGE_MIN_FORMAT_VERSION); + + TestTables.TestTable table = + TestTables.create( + tableDir, "test", TEST_SCHEMA, PartitionSpec.unpartitioned(), formatVersion); + + assertThat(table.ops().current().nextRowId()).isEqualTo(0L); + + table.newFastAppend().appendFile(fileWithRows(30)).commit(); + + assertThat(table.currentSnapshot().firstRowId()).isEqualTo(0); + assertThat(table.ops().current().nextRowId()).isEqualTo(30); + + table.newFastAppend().appendFile(fileWithRows(17)).appendFile(fileWithRows(11)).commit(); + + assertThat(table.currentSnapshot().firstRowId()).isEqualTo(30); + assertThat(table.ops().current().nextRowId()).isEqualTo(30 + 17 + 11); + } + + @TestTemplate + public void testAppend() { + assumeThat(formatVersion).isGreaterThanOrEqualTo(ROW_LINEAGE_MIN_FORMAT_VERSION); + + TestTables.TestTable table = + TestTables.create( + tableDir, "test", TEST_SCHEMA, PartitionSpec.unpartitioned(), formatVersion); + + assertThat(table.ops().current().nextRowId()).isEqualTo(0L); + + table.newAppend().appendFile(fileWithRows(30)).commit(); + + assertThat(table.currentSnapshot().firstRowId()).isEqualTo(0); + assertThat(table.ops().current().nextRowId()).isEqualTo(30); + + table.newAppend().appendFile(fileWithRows(17)).appendFile(fileWithRows(11)).commit(); + + assertThat(table.currentSnapshot().firstRowId()).isEqualTo(30); + assertThat(table.ops().current().nextRowId()).isEqualTo(30 + 17 + 11); + } + + @TestTemplate + public void testAppendBranch() { + assumeThat(formatVersion).isGreaterThanOrEqualTo(ROW_LINEAGE_MIN_FORMAT_VERSION); + // Appends to a branch should still change last-row-id even if not on main, these changes + // should also affect commits to main + + String branch = "some_branch"; + + TestTables.TestTable table = + TestTables.create( + tableDir, "test", TEST_SCHEMA, PartitionSpec.unpartitioned(), formatVersion); + + assertThat(table.ops().current().nextRowId()).isEqualTo(0L); + + // Write to Branch + table.newAppend().appendFile(fileWithRows(30)).toBranch(branch).commit(); + + assertThat(table.currentSnapshot()).isNull(); + assertThat(table.snapshot(branch).firstRowId()).isEqualTo(0L); + assertThat(table.ops().current().nextRowId()).isEqualTo(30); + + // Write to Main + table.newAppend().appendFile(fileWithRows(17)).appendFile(fileWithRows(11)).commit(); + + assertThat(table.currentSnapshot().firstRowId()).isEqualTo(30); + assertThat(table.ops().current().nextRowId()).isEqualTo(30 + 17 + 11); + + // Write again to branch + table.newAppend().appendFile(fileWithRows(21)).toBranch(branch).commit(); + assertThat(table.snapshot(branch).firstRowId()).isEqualTo(30 + 17 + 11); + assertThat(table.ops().current().nextRowId()).isEqualTo(30 + 17 + 11 + 21); + } + + @TestTemplate + public void testDeletes() { + assumeThat(formatVersion).isGreaterThanOrEqualTo(ROW_LINEAGE_MIN_FORMAT_VERSION); + + TestTables.TestTable table = + TestTables.create( + tableDir, "test", TEST_SCHEMA, PartitionSpec.unpartitioned(), formatVersion); + + assertThat(table.ops().current().nextRowId()).isEqualTo(0L); + + DataFile file = fileWithRows(30); + + table.newAppend().appendFile(file).commit(); + + assertThat(table.currentSnapshot().firstRowId()).isEqualTo(0); + assertThat(table.ops().current().nextRowId()).isEqualTo(30); + + table.newDelete().deleteFile(file).commit(); + + // Deleting a file should create a new snapshot which should inherit last-row-id from the + // previous metadata and not change last-row-id for this metadata. + assertThat(table.currentSnapshot().firstRowId()).isEqualTo(30); + assertThat(table.currentSnapshot().addedRows()).isEqualTo(0); + assertThat(table.ops().current().nextRowId()).isEqualTo(30); + } + + @TestTemplate + public void testPositionDeletes() { + assumeThat(formatVersion).isGreaterThanOrEqualTo(ROW_LINEAGE_MIN_FORMAT_VERSION); + + TestTables.TestTable table = + TestTables.create( + tableDir, "test", TEST_SCHEMA, PartitionSpec.unpartitioned(), formatVersion); + + assertThat(table.ops().current().nextRowId()).isEqualTo(0L); + + DataFile file = fileWithRows(30); + + table.newAppend().appendFile(file).commit(); + + assertThat(table.currentSnapshot().firstRowId()).isEqualTo(0); + assertThat(table.ops().current().nextRowId()).isEqualTo(30); + + // v3 only allows Puffin-based DVs for position deletes + DeleteFile deletes = + FileMetadata.deleteFileBuilder(PartitionSpec.unpartitioned()) + .ofPositionDeletes() + .withFormat(FileFormat.PUFFIN) + .withFileSizeInBytes(100) + .withRecordCount(10) + .withContentOffset(0) + .withContentSizeInBytes(50) + .withPath("deletes.puffin") + .withReferencedDataFile(file.location()) + .build(); + + table.newRowDelta().addDeletes(deletes).commit(); + + // Delete file records do not count toward added rows + assertThat(table.currentSnapshot().firstRowId()).isEqualTo(30); + assertThat(table.currentSnapshot().addedRows()).isEqualTo(0); + assertThat(table.ops().current().nextRowId()).isEqualTo(30); + } + + @TestTemplate + public void testEqualityDeletes() { + assumeThat(formatVersion).isGreaterThanOrEqualTo(ROW_LINEAGE_MIN_FORMAT_VERSION); + + TestTables.TestTable table = + TestTables.create( + tableDir, "test", TEST_SCHEMA, PartitionSpec.unpartitioned(), formatVersion); + + assertThat(table.ops().current().nextRowId()).isEqualTo(0L); + + DataFile file = fileWithRows(30); + + table.newAppend().appendFile(file).commit(); + + assertThat(table.currentSnapshot().firstRowId()).isEqualTo(0); + assertThat(table.ops().current().nextRowId()).isEqualTo(30); + + DeleteFile deletes = + FileMetadata.deleteFileBuilder(PartitionSpec.unpartitioned()) + .ofEqualityDeletes(table.schema().findField("x").fieldId()) + .withFormat(FileFormat.PARQUET) + .withFileSizeInBytes(100) + .withRecordCount(10) + .withPath("deletes.parquet") + .withReferencedDataFile(file.location()) + .build(); + + table.newRowDelta().addDeletes(deletes).commit(); + + // Delete file records do not count toward added rows + assertThat(table.currentSnapshot().firstRowId()).isEqualTo(30); + assertThat(table.currentSnapshot().addedRows()).isEqualTo(0); + assertThat(table.ops().current().nextRowId()).isEqualTo(30); + } + + @TestTemplate + public void testReplace() { + assumeThat(formatVersion).isGreaterThanOrEqualTo(ROW_LINEAGE_MIN_FORMAT_VERSION); + + TestTables.TestTable table = + TestTables.create( + tableDir, "test", TEST_SCHEMA, PartitionSpec.unpartitioned(), formatVersion); + + assertThat(table.ops().current().nextRowId()).isEqualTo(0L); + + DataFile filePart1 = fileWithRows(30); + DataFile filePart2 = fileWithRows(30); + DataFile fileCompacted = fileWithRows(60); + + table.newAppend().appendFile(filePart1).appendFile(filePart2).commit(); + + assertThat(table.currentSnapshot().firstRowId()).isEqualTo(0); + assertThat(table.currentSnapshot().addedRows()).isEqualTo(60); + assertThat(table.ops().current().nextRowId()).isEqualTo(60); + + table.newRewrite().deleteFile(filePart1).deleteFile(filePart2).addFile(fileCompacted).commit(); + + assertThat(table.currentSnapshot().firstRowId()).isEqualTo(60); + assertThat(table.currentSnapshot().addedRows()).isEqualTo(0); + assertThat(table.ops().current().nextRowId()).isEqualTo(60); + } + + private final AtomicInteger fileNum = new AtomicInteger(0); + + private DataFile fileWithRows(long numRows) { + return DataFiles.builder(PartitionSpec.unpartitioned()) + .withRecordCount(numRows) + .withFileSizeInBytes(numRows * 100) + .withPath("file://file_" + fileNum.incrementAndGet() + ".parquet") + .build(); + } +} From 000941ab182c1b345b8ecdb70b35236ee1a5052a Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Tue, 25 Mar 2025 12:24:46 -0700 Subject: [PATCH 4/8] Core: Fix tests and roll back SnapshotProducer changes. --- .../org/apache/iceberg/SnapshotProducer.java | 36 ++++--------------- .../org/apache/iceberg/TableMetadata.java | 13 ++++++- .../org/apache/iceberg/TableProperties.java | 7 ++++ .../iceberg/TestRowLineageMetadata.java | 28 ++++++++------- .../org/apache/iceberg/TestTableMetadata.java | 8 +++-- 5 files changed, 46 insertions(+), 46 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index 7b7587b2ae93..9f409457364f 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -75,7 +75,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.relocated.com.google.common.math.IntMath; import org.apache.iceberg.util.Exceptions; -import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.SnapshotUtil; import org.apache.iceberg.util.Tasks; import org.apache.iceberg.util.ThreadPools; @@ -284,13 +283,10 @@ public Snapshot apply() { throw new RuntimeIOException(e, "Failed to write manifest list file"); } - Map summary = summary(); - String operation = operation(); - Long addedRows = null; Long firstRowId = null; if (base.formatVersion() >= 3) { - addedRows = calculateAddedRows(operation, summary, manifests); + addedRows = calculateAddedRows(manifests); firstRowId = base.nextRowId(); } @@ -299,34 +295,15 @@ public Snapshot apply() { snapshotId(), parentSnapshotId, System.currentTimeMillis(), - operation, - summaryWithTotals(base, summary), + operation(), + summary(base), base.currentSchemaId(), manifestList.location(), firstRowId, addedRows); } - private Long calculateAddedRows( - String operation, Map summary, List manifests) { - if (summary != null) { - long addedRecords = - PropertyUtil.propertyAsLong(summary, SnapshotSummary.ADDED_RECORDS_PROP, 0L); - if (DataOperations.REPLACE.equals(operation)) { - long replacedRecords = - PropertyUtil.propertyAsLong(summary, SnapshotSummary.DELETED_RECORDS_PROP, 0L); - // added may be less than replaced when records are already deleted by delete files - Preconditions.checkArgument( - addedRecords <= replacedRecords, - "Invalid REPLACE operation: %s added records > %s replaced records", - addedRecords, - replacedRecords); - return 0L; - } - - return addedRecords; - } - + private Long calculateAddedRows(List manifests) { return manifests.stream() .filter( manifest -> @@ -348,8 +325,9 @@ private Long calculateAddedRows( protected abstract Map summary(); /** Returns the snapshot summary from the implementation and updates totals. */ - private Map summaryWithTotals( - TableMetadata previous, Map summary) { + private Map summary(TableMetadata previous) { + Map summary = summary(); + if (summary == null) { return ImmutableMap.of(); } diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java index fdffba26578c..fe1059c55dcc 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadata.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java @@ -53,6 +53,7 @@ public class TableMetadata implements Serializable { static final long INVALID_SEQUENCE_NUMBER = -1; static final int DEFAULT_TABLE_FORMAT_VERSION = 2; static final int SUPPORTED_TABLE_FORMAT_VERSION = 3; + static final int MIN_FORMAT_VERSION_ROW_LINEAGE = 3; static final int INITIAL_SPEC_ID = 0; static final int INITIAL_SORT_ORDER_ID = 1; static final int INITIAL_SCHEMA_ID = 0; @@ -569,6 +570,16 @@ public TableMetadata withUUID() { return new Builder(this).assignUUID().build(); } + /** + * Whether row lineage is enabled. + * + * @deprecated will be removed in 1.10.0; row lineage is required for all v3+ tables. + */ + @Deprecated + public boolean rowLineageEnabled() { + return formatVersion >= MIN_FORMAT_VERSION_ROW_LINEAGE; + } + public long nextRowId() { return nextRowId; } @@ -1242,7 +1253,7 @@ public Builder addSnapshot(Snapshot snapshot) { snapshotsById.put(snapshot.snapshotId(), snapshot); changes.add(new MetadataUpdate.AddSnapshot(snapshot)); - if (formatVersion >= 3) { + if (formatVersion >= MIN_FORMAT_VERSION_ROW_LINEAGE) { ValidationException.check( snapshot.firstRowId() != null, "Cannot add a snapshot: first-row-id is null"); ValidationException.check( diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java index c137bcd3a2c3..61b4b5a489b5 100644 --- a/core/src/main/java/org/apache/iceberg/TableProperties.java +++ b/core/src/main/java/org/apache/iceberg/TableProperties.java @@ -388,4 +388,11 @@ private TableProperties() {} public static final int ENCRYPTION_DEK_LENGTH_DEFAULT = 16; public static final int ENCRYPTION_AAD_LENGTH_DEFAULT = 16; + + /** + * Property to enable row lineage. + * + * @deprecated will be removed in 1.10.0; row lineage is required for all v3+ tables. + */ + @Deprecated public static final String ROW_LINEAGE = "row-lineage"; } diff --git a/core/src/test/java/org/apache/iceberg/TestRowLineageMetadata.java b/core/src/test/java/org/apache/iceberg/TestRowLineageMetadata.java index 68f8a441f2e5..e11d987581d7 100644 --- a/core/src/test/java/org/apache/iceberg/TestRowLineageMetadata.java +++ b/core/src/test/java/org/apache/iceberg/TestRowLineageMetadata.java @@ -36,8 +36,6 @@ @ExtendWith(ParameterizedTestExtension.class) public class TestRowLineageMetadata { - private static final int ROW_LINEAGE_MIN_FORMAT_VERSION = 3; - @Parameters(name = "formatVersion = {0}") private static List formatVersion() { return Ints.asList(TestHelpers.ALL_VERSIONS); @@ -92,22 +90,25 @@ public void testSnapshotRowIDValidation() { () -> new BaseSnapshot( 0, 1, null, 0, DataOperations.APPEND, null, 1, "ml.avro", 10L, null)) + .isInstanceOf(IllegalArgumentException.class) .hasMessage("Invalid added-rows (required when first-row-id is set): null"); assertThatThrownBy( () -> new BaseSnapshot(0, 1, null, 0, DataOperations.APPEND, null, 1, "ml.avro", 0L, -1L)) + .isInstanceOf(IllegalArgumentException.class) .hasMessage("Invalid added-rows (cannot be negative): -1"); assertThatThrownBy( () -> new BaseSnapshot(0, 1, null, 0, DataOperations.APPEND, null, 1, "ml.avro", -1L, 1L)) + .isInstanceOf(IllegalArgumentException.class) .hasMessage("Invalid first-row-id (cannot be negative): -1"); } @TestTemplate public void testSnapshotAddition() { - assumeThat(formatVersion).isGreaterThanOrEqualTo(ROW_LINEAGE_MIN_FORMAT_VERSION); + assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE); long newRows = 30L; @@ -133,7 +134,7 @@ public void testSnapshotAddition() { @TestTemplate public void testInvalidSnapshotAddition() { - assumeThat(formatVersion).isGreaterThanOrEqualTo(ROW_LINEAGE_MIN_FORMAT_VERSION); + assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE); Long newRows = 30L; @@ -164,7 +165,7 @@ public void testInvalidSnapshotAddition() { @TestTemplate public void testFastAppend() { - assumeThat(formatVersion).isGreaterThanOrEqualTo(ROW_LINEAGE_MIN_FORMAT_VERSION); + assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE); TestTables.TestTable table = TestTables.create( @@ -185,7 +186,7 @@ public void testFastAppend() { @TestTemplate public void testAppend() { - assumeThat(formatVersion).isGreaterThanOrEqualTo(ROW_LINEAGE_MIN_FORMAT_VERSION); + assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE); TestTables.TestTable table = TestTables.create( @@ -206,7 +207,7 @@ public void testAppend() { @TestTemplate public void testAppendBranch() { - assumeThat(formatVersion).isGreaterThanOrEqualTo(ROW_LINEAGE_MIN_FORMAT_VERSION); + assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE); // Appends to a branch should still change last-row-id even if not on main, these changes // should also affect commits to main @@ -239,7 +240,7 @@ public void testAppendBranch() { @TestTemplate public void testDeletes() { - assumeThat(formatVersion).isGreaterThanOrEqualTo(ROW_LINEAGE_MIN_FORMAT_VERSION); + assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE); TestTables.TestTable table = TestTables.create( @@ -265,7 +266,7 @@ public void testDeletes() { @TestTemplate public void testPositionDeletes() { - assumeThat(formatVersion).isGreaterThanOrEqualTo(ROW_LINEAGE_MIN_FORMAT_VERSION); + assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE); TestTables.TestTable table = TestTables.create( @@ -303,7 +304,7 @@ public void testPositionDeletes() { @TestTemplate public void testEqualityDeletes() { - assumeThat(formatVersion).isGreaterThanOrEqualTo(ROW_LINEAGE_MIN_FORMAT_VERSION); + assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE); TestTables.TestTable table = TestTables.create( @@ -338,7 +339,7 @@ public void testEqualityDeletes() { @TestTemplate public void testReplace() { - assumeThat(formatVersion).isGreaterThanOrEqualTo(ROW_LINEAGE_MIN_FORMAT_VERSION); + assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE); TestTables.TestTable table = TestTables.create( @@ -358,9 +359,10 @@ public void testReplace() { table.newRewrite().deleteFile(filePart1).deleteFile(filePart2).addFile(fileCompacted).commit(); + // Rewrites are currently just treated as appends. In the future we could treat these as no-ops assertThat(table.currentSnapshot().firstRowId()).isEqualTo(60); - assertThat(table.currentSnapshot().addedRows()).isEqualTo(0); - assertThat(table.ops().current().nextRowId()).isEqualTo(60); + assertThat(table.currentSnapshot().addedRows()).isEqualTo(60); + assertThat(table.ops().current().nextRowId()).isEqualTo(120); } private final AtomicInteger fileNum = new AtomicInteger(0); diff --git a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java index 94df6b0ce91b..56c4676dfb1c 100644 --- a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java +++ b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java @@ -231,6 +231,7 @@ public void testJsonConversion() throws Exception { assertThat(metadata.statisticsFiles()).isEqualTo(statisticsFiles); assertThat(metadata.partitionStatisticsFiles()).isEqualTo(partitionStatisticsFiles); assertThat(metadata.refs()).isEqualTo(refs); + assertThat(metadata.nextRowId()).isEqualTo(expected.nextRowId()); } @Test @@ -338,6 +339,7 @@ public void testBackwardCompat() throws Exception { .isEqualTo(previousSnapshot.allManifests(ops.io())); assertThat(metadata.previousFiles()).isEqualTo(expected.previousFiles()); assertThat(metadata.snapshot(previousSnapshotId).schemaId()).isNull(); + assertThat(metadata.nextRowId()).isEqualTo(expected.nextRowId()); } @Test @@ -396,7 +398,7 @@ public void testInvalidMainBranch() throws IOException { () -> new TableMetadata( null, - 2, + MAX_FORMAT_VERSION, UUID.randomUUID().toString(), TEST_LOCATION, SEQ_NO, @@ -443,7 +445,7 @@ public void testMainWithoutCurrent() throws IOException { () -> new TableMetadata( null, - 2, + MAX_FORMAT_VERSION, UUID.randomUUID().toString(), TEST_LOCATION, SEQ_NO, @@ -484,7 +486,7 @@ public void testBranchSnapshotMissing() { () -> new TableMetadata( null, - 2, + MAX_FORMAT_VERSION, UUID.randomUUID().toString(), TEST_LOCATION, SEQ_NO, From 6e047e8310e5e20ef41b7215cae1640549f1896d Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Tue, 25 Mar 2025 12:32:07 -0700 Subject: [PATCH 5/8] Fix revapi. --- .../java/org/apache/iceberg/TableMetadata.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java index fe1059c55dcc..0951f4cd1974 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadata.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java @@ -999,6 +999,22 @@ private Builder(TableMetadata base) { this.nextRowId = base.nextRowId; } + /** + * Enables row lineage in v3 tables. + * + * @deprecated will be removed in 1.10.0; row lineage is required for all v3+ tables. + */ + @Deprecated + public Builder enableRowLineage() { + if (formatVersion < MIN_FORMAT_VERSION_ROW_LINEAGE) { + throw new UnsupportedOperationException( + "Cannot enable row lineage for format-version=" + formatVersion); + } + + // otherwise this is a no-op + return null; + } + public Builder withMetadataLocation(String newMetadataLocation) { this.metadataLocation = newMetadataLocation; if (null != base) { From 16f6b2859f9e40d4d053bbbf23206c91a7d62132 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Tue, 25 Mar 2025 12:33:41 -0700 Subject: [PATCH 6/8] Fix revapi. --- .../java/org/apache/iceberg/MetadataUpdate.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/core/src/main/java/org/apache/iceberg/MetadataUpdate.java b/core/src/main/java/org/apache/iceberg/MetadataUpdate.java index da8c627ce897..c1b910312495 100644 --- a/core/src/main/java/org/apache/iceberg/MetadataUpdate.java +++ b/core/src/main/java/org/apache/iceberg/MetadataUpdate.java @@ -520,4 +520,17 @@ public void applyTo(ViewMetadata.Builder viewMetadataBuilder) { viewMetadataBuilder.setCurrentVersionId(versionId); } } + + /** + * Update to enable row lineage. + * + * @deprecated will be removed in 1.10.0; row lineage is required for all v3+ tables. + */ + @Deprecated + class EnableRowLineage implements MetadataUpdate { + @Override + public void applyTo(TableMetadata.Builder metadataBuilder) { + metadataBuilder.enableRowLineage(); + } + } } From 2ea119cbcb41b731125c6feac685cb0cdb45a004 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Tue, 25 Mar 2025 13:28:26 -0700 Subject: [PATCH 7/8] Fix failing test. --- .../TestLoadTableResponseParser.java | 78 ++++++++++++++++++- 1 file changed, 74 insertions(+), 4 deletions(-) diff --git a/core/src/test/java/org/apache/iceberg/rest/responses/TestLoadTableResponseParser.java b/core/src/test/java/org/apache/iceberg/rest/responses/TestLoadTableResponseParser.java index 2f3c4380f8ac..6fe9f9837e53 100644 --- a/core/src/test/java/org/apache/iceberg/rest/responses/TestLoadTableResponseParser.java +++ b/core/src/test/java/org/apache/iceberg/rest/responses/TestLoadTableResponseParser.java @@ -138,8 +138,8 @@ public void roundTripSerdeV1() { } @ParameterizedTest - @ValueSource(ints = {2, MAX_FORMAT_VERSION}) - public void roundTripSerdeV2andHigher(int formatVersion) { + @ValueSource(ints = {MAX_FORMAT_VERSION}) + public void roundTripSerdeV3andHigher(int formatVersion) { String uuid = "386b9f01-002b-4d8c-b77f-42c3fd3b7c9b"; TableMetadata metadata = TableMetadata.buildFromEmpty(formatVersion) @@ -189,7 +189,8 @@ public void roundTripSerdeV2andHigher(int formatVersion) { + " \"fields\" : [ ]\n" + " } ],\n" + " \"properties\" : { },\n" - + " \"current-snapshot-id\" : %s,\n" + + " \"current-snapshot-id\" : null,\n" + + " \"next-row-id\" : 0,\n" + " \"refs\" : { },\n" + " \"snapshots\" : [ ],\n" + " \"statistics\" : [ ],\n" @@ -198,7 +199,76 @@ public void roundTripSerdeV2andHigher(int formatVersion) { + " \"metadata-log\" : [ ]\n" + " }\n" + "}", - formatVersion, metadata.lastUpdatedMillis(), formatVersion >= 3 ? "null" : "-1"); + formatVersion, metadata.lastUpdatedMillis()); + + String json = LoadTableResponseParser.toJson(response, true); + assertThat(json).isEqualTo(expectedJson); + // can't do an equality comparison because Schema doesn't implement equals/hashCode + assertThat(LoadTableResponseParser.toJson(LoadTableResponseParser.fromJson(json), true)) + .isEqualTo(expectedJson); + } + + @Test + public void roundTripSerdeV2() { + String uuid = "386b9f01-002b-4d8c-b77f-42c3fd3b7c9b"; + TableMetadata metadata = + TableMetadata.buildFromEmpty(2) + .assignUUID(uuid) + .setLocation("location") + .setCurrentSchema( + new Schema(Types.NestedField.required(1, "x", Types.LongType.get())), 1) + .addPartitionSpec(PartitionSpec.unpartitioned()) + .addSortOrder(SortOrder.unsorted()) + .discardChanges() + .withMetadataLocation("metadata-location") + .build(); + + LoadTableResponse response = LoadTableResponse.builder().withTableMetadata(metadata).build(); + + String expectedJson = + String.format( + "{\n" + + " \"metadata-location\" : \"metadata-location\",\n" + + " \"metadata\" : {\n" + + " \"format-version\" : 2,\n" + + " \"table-uuid\" : \"386b9f01-002b-4d8c-b77f-42c3fd3b7c9b\",\n" + + " \"location\" : \"location\",\n" + + " \"last-sequence-number\" : 0,\n" + + " \"last-updated-ms\" : %d,\n" + + " \"last-column-id\" : 1,\n" + + " \"current-schema-id\" : 0,\n" + + " \"schemas\" : [ {\n" + + " \"type\" : \"struct\",\n" + + " \"schema-id\" : 0,\n" + + " \"fields\" : [ {\n" + + " \"id\" : 1,\n" + + " \"name\" : \"x\",\n" + + " \"required\" : true,\n" + + " \"type\" : \"long\"\n" + + " } ]\n" + + " } ],\n" + + " \"default-spec-id\" : 0,\n" + + " \"partition-specs\" : [ {\n" + + " \"spec-id\" : 0,\n" + + " \"fields\" : [ ]\n" + + " } ],\n" + + " \"last-partition-id\" : 999,\n" + + " \"default-sort-order-id\" : 0,\n" + + " \"sort-orders\" : [ {\n" + + " \"order-id\" : 0,\n" + + " \"fields\" : [ ]\n" + + " } ],\n" + + " \"properties\" : { },\n" + + " \"current-snapshot-id\" : -1,\n" + + " \"refs\" : { },\n" + + " \"snapshots\" : [ ],\n" + + " \"statistics\" : [ ],\n" + + " \"partition-statistics\" : [ ],\n" + + " \"snapshot-log\" : [ ],\n" + + " \"metadata-log\" : [ ]\n" + + " }\n" + + "}", + metadata.lastUpdatedMillis()); String json = LoadTableResponseParser.toJson(response, true); assertThat(json).isEqualTo(expectedJson); From 973a85cdf17cbccbb09185696c7767ba9bbab7bc Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Tue, 25 Mar 2025 13:37:22 -0700 Subject: [PATCH 8/8] Fix test. --- .../iceberg/rest/responses/TestLoadTableResponseParser.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/test/java/org/apache/iceberg/rest/responses/TestLoadTableResponseParser.java b/core/src/test/java/org/apache/iceberg/rest/responses/TestLoadTableResponseParser.java index 6fe9f9837e53..89982c4e9b20 100644 --- a/core/src/test/java/org/apache/iceberg/rest/responses/TestLoadTableResponseParser.java +++ b/core/src/test/java/org/apache/iceberg/rest/responses/TestLoadTableResponseParser.java @@ -18,7 +18,6 @@ */ package org.apache.iceberg.rest.responses; -import static org.apache.iceberg.TestHelpers.MAX_FORMAT_VERSION; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -138,7 +137,7 @@ public void roundTripSerdeV1() { } @ParameterizedTest - @ValueSource(ints = {MAX_FORMAT_VERSION}) + @ValueSource(ints = 3) public void roundTripSerdeV3andHigher(int formatVersion) { String uuid = "386b9f01-002b-4d8c-b77f-42c3fd3b7c9b"; TableMetadata metadata =