-
Notifications
You must be signed in to change notification settings - Fork 3k
Core: Enable row lineage for all v3 tables #12593
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
a66942b
aff717b
c1954bb
000941a
6e047e8
16f6b28
2ea119c
973a85c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -75,6 +75,7 @@ | |
| import org.apache.iceberg.relocated.com.google.common.collect.Sets; | ||
| import org.apache.iceberg.relocated.com.google.common.math.IntMath; | ||
| import org.apache.iceberg.util.Exceptions; | ||
| import org.apache.iceberg.util.PropertyUtil; | ||
| import org.apache.iceberg.util.SnapshotUtil; | ||
| import org.apache.iceberg.util.Tasks; | ||
| import org.apache.iceberg.util.ThreadPools; | ||
|
|
@@ -283,32 +284,55 @@ public Snapshot apply() { | |
| throw new RuntimeIOException(e, "Failed to write manifest list file"); | ||
| } | ||
|
|
||
| Map<String, String> summary = summary(); | ||
| String operation = operation(); | ||
|
|
||
| Long addedRows = null; | ||
| Long lastRowId = null; | ||
| if (base.rowLineageEnabled()) { | ||
| addedRows = calculateAddedRows(manifests); | ||
| lastRowId = base.nextRowId(); | ||
| Long firstRowId = null; | ||
rdblue marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if (base.formatVersion() >= 3) { | ||
| addedRows = calculateAddedRows(operation, summary, manifests); | ||
| firstRowId = base.nextRowId(); | ||
| } | ||
|
|
||
| return new BaseSnapshot( | ||
| sequenceNumber, | ||
| snapshotId(), | ||
| parentSnapshotId, | ||
| System.currentTimeMillis(), | ||
| operation(), | ||
| summary(base), | ||
| operation, | ||
| summaryWithTotals(base, summary), | ||
| base.currentSchemaId(), | ||
| manifestList.location(), | ||
| lastRowId, | ||
| firstRowId, | ||
| addedRows); | ||
| } | ||
|
|
||
| private Long calculateAddedRows(List<ManifestFile> manifests) { | ||
| private Long calculateAddedRows( | ||
| String operation, Map<String, String> summary, List<ManifestFile> manifests) { | ||
| if (summary != null) { | ||
| long addedRecords = | ||
| PropertyUtil.propertyAsLong(summary, SnapshotSummary.ADDED_RECORDS_PROP, 0L); | ||
|
||
| if (DataOperations.REPLACE.equals(operation)) { | ||
| long replacedRecords = | ||
| PropertyUtil.propertyAsLong(summary, SnapshotSummary.DELETED_RECORDS_PROP, 0L); | ||
| // added may be less than replaced when records are already deleted by delete files | ||
| Preconditions.checkArgument( | ||
rdblue marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| addedRecords <= replacedRecords, | ||
| "Invalid REPLACE operation: %s added records > %s replaced records", | ||
| addedRecords, | ||
| replacedRecords); | ||
| return 0L; | ||
| } | ||
|
|
||
| return addedRecords; | ||
| } | ||
|
|
||
| return manifests.stream() | ||
| .filter( | ||
| manifest -> | ||
| manifest.snapshotId() == null | ||
| || Objects.equals(manifest.snapshotId(), this.snapshotId)) | ||
| .filter(manifest -> manifest.content() == ManifestContent.DATA) | ||
amogh-jahagirdar marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| .mapToLong( | ||
| manifest -> { | ||
| Preconditions.checkArgument( | ||
|
|
@@ -324,9 +348,8 @@ private Long calculateAddedRows(List<ManifestFile> manifests) { | |
| protected abstract Map<String, String> summary(); | ||
|
|
||
| /** Returns the snapshot summary from the implementation and updates totals. */ | ||
| private Map<String, String> summary(TableMetadata previous) { | ||
| Map<String, String> summary = summary(); | ||
|
|
||
| private Map<String, String> summaryWithTotals( | ||
rdblue marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| TableMetadata previous, Map<String, String> summary) { | ||
| if (summary == null) { | ||
| return ImmutableMap.of(); | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -57,8 +57,6 @@ public class TableMetadata implements Serializable { | |
| static final int INITIAL_SORT_ORDER_ID = 1; | ||
| static final int INITIAL_SCHEMA_ID = 0; | ||
| static final int INITIAL_ROW_ID = 0; | ||
| static final boolean DEFAULT_ROW_LINEAGE = false; | ||
| static final int MIN_FORMAT_VERSION_ROW_LINEAGE = 3; | ||
rdblue marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| private static final long ONE_MINUTE = TimeUnit.MINUTES.toMillis(1); | ||
|
|
||
|
|
@@ -133,11 +131,6 @@ static TableMetadata newTableMetadata( | |
| int freshSortOrderId = sortOrder.isUnsorted() ? sortOrder.orderId() : INITIAL_SORT_ORDER_ID; | ||
| SortOrder freshSortOrder = freshSortOrder(freshSortOrderId, freshSchema, sortOrder); | ||
|
|
||
| // configure row lineage using table properties | ||
| Boolean rowLineage = | ||
| PropertyUtil.propertyAsBoolean( | ||
| properties, TableProperties.ROW_LINEAGE, DEFAULT_ROW_LINEAGE); | ||
|
|
||
| // Validate the metrics configuration. Note: we only do this on new tables so we don't | ||
| // break existing tables. | ||
| MetricsConfig.fromProperties(properties).validateReferencedColumns(schema); | ||
|
|
@@ -151,7 +144,6 @@ static TableMetadata newTableMetadata( | |
| .setDefaultSortOrder(freshSortOrder) | ||
| .setLocation(location) | ||
| .setProperties(properties) | ||
| .setRowLineage(rowLineage) | ||
| .build(); | ||
| } | ||
|
|
||
|
|
@@ -266,13 +258,12 @@ public String toString() { | |
| private final List<StatisticsFile> statisticsFiles; | ||
| private final List<PartitionStatisticsFile> partitionStatisticsFiles; | ||
| private final List<MetadataUpdate> changes; | ||
| private final long nextRowId; | ||
| private SerializableSupplier<List<Snapshot>> snapshotsSupplier; | ||
| private volatile List<Snapshot> snapshots; | ||
| private volatile Map<Long, Snapshot> snapshotsById; | ||
| private volatile Map<String, SnapshotRef> refs; | ||
| private volatile boolean snapshotsLoaded; | ||
| private final Boolean rowLineageEnabled; | ||
| private final long nextRowId; | ||
|
|
||
| @SuppressWarnings("checkstyle:CyclomaticComplexity") | ||
| TableMetadata( | ||
|
|
@@ -299,9 +290,8 @@ public String toString() { | |
| Map<String, SnapshotRef> refs, | ||
| List<StatisticsFile> statisticsFiles, | ||
| List<PartitionStatisticsFile> partitionStatisticsFiles, | ||
| List<MetadataUpdate> changes, | ||
| boolean rowLineageEnabled, | ||
| long nextRowId) { | ||
| long nextRowId, | ||
| List<MetadataUpdate> changes) { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Note that this PR also restores the convention that changes are passed last to the `TableMetadata` constructor. |
||
| Preconditions.checkArgument( | ||
| specs != null && !specs.isEmpty(), "Partition specs cannot be null or empty"); | ||
| Preconditions.checkArgument( | ||
|
|
@@ -320,10 +310,6 @@ public String toString() { | |
| Preconditions.checkArgument( | ||
| metadataFileLocation == null || changes.isEmpty(), | ||
| "Cannot create TableMetadata with a metadata location and changes"); | ||
| Preconditions.checkArgument( | ||
| formatVersion >= MIN_FORMAT_VERSION_ROW_LINEAGE || !rowLineageEnabled, | ||
| "Cannot enable row lineage when Table Version is less than V3. Table Version is %s", | ||
| formatVersion); | ||
|
|
||
| this.metadataFileLocation = metadataFileLocation; | ||
| this.formatVersion = formatVersion; | ||
|
|
@@ -359,7 +345,6 @@ public String toString() { | |
| this.partitionStatisticsFiles = ImmutableList.copyOf(partitionStatisticsFiles); | ||
|
|
||
| // row lineage | ||
| this.rowLineageEnabled = rowLineageEnabled; | ||
| this.nextRowId = nextRowId; | ||
|
|
||
| HistoryEntry last = null; | ||
|
|
@@ -584,10 +569,6 @@ public TableMetadata withUUID() { | |
| return new Builder(this).assignUUID().build(); | ||
| } | ||
|
|
||
| public boolean rowLineageEnabled() { | ||
| return rowLineageEnabled; | ||
| } | ||
|
|
||
| public long nextRowId() { | ||
| return nextRowId; | ||
| } | ||
|
|
@@ -634,15 +615,10 @@ public TableMetadata replaceProperties(Map<String, String> rawProperties) { | |
| int newFormatVersion = | ||
| PropertyUtil.propertyAsInt(rawProperties, TableProperties.FORMAT_VERSION, formatVersion); | ||
|
|
||
| Boolean newRowLineage = | ||
| PropertyUtil.propertyAsBoolean( | ||
| rawProperties, TableProperties.ROW_LINEAGE, rowLineageEnabled); | ||
|
|
||
| return new Builder(this) | ||
| .setProperties(updated) | ||
| .removeProperties(removed) | ||
| .upgradeFormatVersion(newFormatVersion) | ||
| .setRowLineage(newRowLineage) | ||
| .build(); | ||
| } | ||
|
|
||
|
|
@@ -927,7 +903,6 @@ public static class Builder { | |
| private final Map<Long, List<StatisticsFile>> statisticsFiles; | ||
| private final Map<Long, List<PartitionStatisticsFile>> partitionStatisticsFiles; | ||
| private boolean suppressHistoricalSnapshots = false; | ||
| private boolean rowLineage; | ||
| private long nextRowId; | ||
|
|
||
| // change tracking | ||
|
|
@@ -975,7 +950,6 @@ private Builder(int formatVersion) { | |
| this.schemasById = Maps.newHashMap(); | ||
| this.specsById = Maps.newHashMap(); | ||
| this.sortOrdersById = Maps.newHashMap(); | ||
| this.rowLineage = DEFAULT_ROW_LINEAGE; | ||
| this.nextRowId = INITIAL_ROW_ID; | ||
| } | ||
|
|
||
|
|
@@ -1011,7 +985,6 @@ private Builder(TableMetadata base) { | |
| this.specsById = Maps.newHashMap(base.specsById); | ||
| this.sortOrdersById = Maps.newHashMap(base.sortOrdersById); | ||
|
|
||
| this.rowLineage = base.rowLineageEnabled; | ||
| this.nextRowId = base.nextRowId; | ||
| } | ||
|
|
||
|
|
@@ -1269,18 +1242,14 @@ public Builder addSnapshot(Snapshot snapshot) { | |
| snapshotsById.put(snapshot.snapshotId(), snapshot); | ||
| changes.add(new MetadataUpdate.AddSnapshot(snapshot)); | ||
|
|
||
| if (rowLineage) { | ||
| if (formatVersion >= 3) { | ||
| ValidationException.check( | ||
| snapshot.firstRowId() >= nextRowId, | ||
| "Cannot add a snapshot whose 'first-row-id' (%s) is less than the metadata 'next-row-id' (%s) because this will end up generating duplicate row_ids.", | ||
| snapshot.firstRowId() != null, "Cannot add a snapshot: first-row-id is null"); | ||
| ValidationException.check( | ||
| snapshot.firstRowId() != null && snapshot.firstRowId() >= nextRowId, | ||
| "Cannot add a snapshot, first-row-id is behind table next-row-id: %s < %s", | ||
| snapshot.firstRowId(), | ||
| nextRowId); | ||
| ValidationException.check( | ||
| snapshot.addedRows() != null, | ||
| "Cannot add a snapshot with a null 'added-rows' field when row lineage is enabled"); | ||
| Preconditions.checkArgument( | ||
| snapshot.addedRows() >= 0, | ||
| "Cannot decrease 'last-row-id'. 'last-row-id' must increase monotonically. Snapshot reports %s added rows"); | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @RussellSpitzer, I moved these validations in to Also, I thought about keeping the checks here rather than moving them to |
||
|
|
||
| this.nextRowId += snapshot.addedRows(); | ||
| } | ||
|
|
@@ -1508,34 +1477,6 @@ public Builder setPreviousFileLocation(String previousFileLocation) { | |
| return this; | ||
| } | ||
|
|
||
| private Builder setRowLineage(Boolean newRowLineage) { | ||
| if (newRowLineage == null) { | ||
| return this; | ||
| } | ||
|
|
||
| boolean disablingRowLineage = rowLineage && !newRowLineage; | ||
|
|
||
| Preconditions.checkArgument( | ||
| !disablingRowLineage, "Cannot disable row lineage once it has been enabled"); | ||
|
|
||
| if (!rowLineage && newRowLineage) { | ||
| return enableRowLineage(); | ||
| } else { | ||
| return this; | ||
| } | ||
| } | ||
|
|
||
| public Builder enableRowLineage() { | ||
| Preconditions.checkArgument( | ||
| formatVersion >= MIN_FORMAT_VERSION_ROW_LINEAGE, | ||
| "Cannot use row lineage with format version %s. Only format version %s or higher support row lineage", | ||
| formatVersion, | ||
| MIN_FORMAT_VERSION_ROW_LINEAGE); | ||
| this.rowLineage = true; | ||
| changes.add(new MetadataUpdate.EnableRowLineage()); | ||
| return this; | ||
| } | ||
|
|
||
| private boolean hasChanges() { | ||
| return changes.size() != startingChangeCount | ||
| || (discardChanges && !changes.isEmpty()) | ||
|
|
@@ -1603,9 +1544,8 @@ public TableMetadata build() { | |
| partitionStatisticsFiles.values().stream() | ||
| .flatMap(List::stream) | ||
| .collect(Collectors.toList()), | ||
| discardChanges ? ImmutableList.of() : ImmutableList.copyOf(changes), | ||
| rowLineage, | ||
| nextRowId); | ||
| nextRowId, | ||
| discardChanges ? ImmutableList.of() : ImmutableList.copyOf(changes)); | ||
| } | ||
|
|
||
| private int addSchemaInternal(Schema schema, int newLastColumnId) { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To be slightly more permissive, I'm allowing
`addedRows` to be passed, but it is only propagated if `firstRowId` is non-null.