-
Notifications
You must be signed in to change notification settings - Fork 2.9k
Core, API: BaseRowDelta, BaseOverwrite, BaseReplacePartitions, BaseRewrite to branch Impl #5234
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 35 commits
9e43889
e6ea5f6
b43b80b
9cb39c2
ba341ec
d1ca432
0d79ef0
495c0f3
f7464eb
39de1e4
7e75c68
080d76e
1dc4f89
41b2f62
0c18302
0acca3b
49b6667
4ab3414
fb28c02
cfc5e67
70a3cf3
a64b837
f9b3d67
cd5569c
3d5659b
0028bcd
7173dcb
caf9d59
4f64b09
90f23ab
4b8ac6d
c6df1e7
defff1d
66e1850
fc8780c
6b6aefc
1a98e54
74bab58
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -79,23 +79,32 @@ public ReplacePartitions validateNoConflictingData() { | |
| return this; | ||
| } | ||
|
|
||
| @Override | ||
| public BaseReplacePartitions toBranch(String branch) { | ||
| targetBranch(branch); | ||
| return this; | ||
| } | ||
|
|
||
| @Override | ||
| public void validate(TableMetadata currentMetadata, Snapshot snapshot) { | ||
| if (validateConflictingData) { | ||
| if (dataSpec().isUnpartitioned()) { | ||
| validateAddedDataFiles(currentMetadata, startingSnapshotId, Expressions.alwaysTrue()); | ||
| validateAddedDataFiles( | ||
| currentMetadata, startingSnapshotId, Expressions.alwaysTrue(), snapshot); | ||
| } else { | ||
| validateAddedDataFiles(currentMetadata, startingSnapshotId, replacedPartitions); | ||
| validateAddedDataFiles(currentMetadata, startingSnapshotId, replacedPartitions, snapshot); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Minor: I think it makes more sense to use |
||
| } | ||
| } | ||
|
|
||
| if (validateConflictingDeletes) { | ||
| if (dataSpec().isUnpartitioned()) { | ||
| validateDeletedDataFiles(currentMetadata, startingSnapshotId, Expressions.alwaysTrue()); | ||
| validateNoNewDeleteFiles(currentMetadata, startingSnapshotId, Expressions.alwaysTrue()); | ||
| validateDeletedDataFiles( | ||
| currentMetadata, startingSnapshotId, Expressions.alwaysTrue(), snapshot); | ||
| validateNoNewDeleteFiles( | ||
| currentMetadata, startingSnapshotId, Expressions.alwaysTrue(), snapshot); | ||
| } else { | ||
| validateDeletedDataFiles(currentMetadata, startingSnapshotId, replacedPartitions); | ||
| validateNoNewDeleteFiles(currentMetadata, startingSnapshotId, replacedPartitions); | ||
| validateDeletedDataFiles(currentMetadata, startingSnapshotId, replacedPartitions, snapshot); | ||
| validateNoNewDeleteFiles(currentMetadata, startingSnapshotId, replacedPartitions, snapshot); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||
|---|---|---|---|---|
|
|
@@ -22,6 +22,7 @@ | |||
| import org.apache.iceberg.expressions.Expressions; | ||||
| import org.apache.iceberg.relocated.com.google.common.base.Preconditions; | ||||
| import org.apache.iceberg.util.CharSequenceSet; | ||||
| import org.apache.iceberg.util.SnapshotUtil; | ||||
|
|
||||
| class BaseRowDelta extends MergingSnapshotProducer<RowDelta> implements RowDelta { | ||||
| private Long startingSnapshotId = null; // check all versions by default | ||||
|
|
@@ -96,23 +97,37 @@ public RowDelta validateNoConflictingDeleteFiles() { | |||
| } | ||||
|
|
||||
| @Override | ||||
| protected void validate(TableMetadata base, Snapshot snapshot) { | ||||
| if (base.currentSnapshot() != null) { | ||||
| public RowDelta toBranch(String branch) { | ||||
| targetBranch(branch); | ||||
| return this; | ||||
| } | ||||
|
|
||||
| @Override | ||||
| protected void validate(TableMetadata base, Snapshot parent) { | ||||
| if (parent != null) { | ||||
| if (startingSnapshotId != null) { | ||||
| Preconditions.checkArgument( | ||||
| SnapshotUtil.isAncestorOf(parent.snapshotId(), startingSnapshotId, base::snapshot), | ||||
| "Snapshot %s is not an ancestor of %s", | ||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think a bit more context would be helpful if you ever encounter this error.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @amogh-jahagirdar Is this ancestor check even required anymore? We are anyway doing an ancestor check in
|
||||
| startingSnapshotId, | ||||
| parent.snapshotId()); | ||||
| } | ||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Nit: whitespace after this block. |
||||
| if (!referencedDataFiles.isEmpty()) { | ||||
| validateDataFilesExist( | ||||
| base, | ||||
| startingSnapshotId, | ||||
| referencedDataFiles, | ||||
| !validateDeletes, | ||||
| conflictDetectionFilter); | ||||
| conflictDetectionFilter, | ||||
| parent); | ||||
| } | ||||
|
|
||||
| if (validateNewDataFiles) { | ||||
| validateAddedDataFiles(base, startingSnapshotId, conflictDetectionFilter); | ||||
| validateAddedDataFiles(base, startingSnapshotId, conflictDetectionFilter, parent); | ||||
| } | ||||
|
|
||||
| if (validateNewDeleteFiles) { | ||||
| validateNoNewDeleteFiles(base, startingSnapshotId, conflictDetectionFilter); | ||||
| validateNoNewDeleteFiles(base, startingSnapshotId, conflictDetectionFilter, parent); | ||||
| } | ||||
| } | ||||
| } | ||||
|
|
||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -277,9 +277,9 @@ private ManifestFile copyManifest(ManifestFile manifest) { | |
| * @param partitionSet a set of partitions to filter new conflicting data files | ||
| */ | ||
| protected void validateAddedDataFiles( | ||
| TableMetadata base, Long startingSnapshotId, PartitionSet partitionSet) { | ||
| TableMetadata base, Long startingSnapshotId, PartitionSet partitionSet, Snapshot parent) { | ||
| CloseableIterable<ManifestEntry<DataFile>> conflictEntries = | ||
| addedDataFiles(base, startingSnapshotId, null, partitionSet); | ||
| addedDataFiles(base, startingSnapshotId, null, partitionSet, parent); | ||
|
|
||
| try (CloseableIterator<ManifestEntry<DataFile>> conflicts = conflictEntries.iterator()) { | ||
| if (conflicts.hasNext()) { | ||
|
|
@@ -305,9 +305,12 @@ protected void validateAddedDataFiles( | |
| * @param conflictDetectionFilter an expression used to find new conflicting data files | ||
| */ | ||
| protected void validateAddedDataFiles( | ||
| TableMetadata base, Long startingSnapshotId, Expression conflictDetectionFilter) { | ||
| TableMetadata base, | ||
| Long startingSnapshotId, | ||
| Expression conflictDetectionFilter, | ||
| Snapshot parent) { | ||
| CloseableIterable<ManifestEntry<DataFile>> conflictEntries = | ||
| addedDataFiles(base, startingSnapshotId, conflictDetectionFilter, null); | ||
| addedDataFiles(base, startingSnapshotId, conflictDetectionFilter, null, parent); | ||
|
|
||
| try (CloseableIterator<ManifestEntry<DataFile>> conflicts = conflictEntries.iterator()) { | ||
| if (conflicts.hasNext()) { | ||
|
|
@@ -337,15 +340,20 @@ private CloseableIterable<ManifestEntry<DataFile>> addedDataFiles( | |
| TableMetadata base, | ||
| Long startingSnapshotId, | ||
| Expression dataFilter, | ||
| PartitionSet partitionSet) { | ||
| PartitionSet partitionSet, | ||
| Snapshot parent) { | ||
| // if there is no current table state, no files have been added | ||
| if (base.currentSnapshot() == null) { | ||
| if (parent == null) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. My bad, missed this when I raised the change to your branch. I think we should update this comment to say "If there is no parent, no files have been added" |
||
| return CloseableIterable.empty(); | ||
| } | ||
|
|
||
| Pair<List<ManifestFile>, Set<Long>> history = | ||
| validationHistory( | ||
| base, startingSnapshotId, VALIDATE_ADDED_FILES_OPERATIONS, ManifestContent.DATA); | ||
| base, | ||
| startingSnapshotId, | ||
| VALIDATE_ADDED_FILES_OPERATIONS, | ||
| ManifestContent.DATA, | ||
| parent); | ||
| List<ManifestFile> manifests = history.first(); | ||
| Set<Long> newSnapshots = history.second(); | ||
|
|
||
|
|
@@ -379,9 +387,9 @@ private CloseableIterable<ManifestEntry<DataFile>> addedDataFiles( | |
| * @param dataFiles data files to validate have no new row deletes | ||
| */ | ||
| protected void validateNoNewDeletesForDataFiles( | ||
| TableMetadata base, Long startingSnapshotId, Iterable<DataFile> dataFiles) { | ||
| TableMetadata base, Long startingSnapshotId, Iterable<DataFile> dataFiles, Snapshot parent) { | ||
| validateNoNewDeletesForDataFiles( | ||
| base, startingSnapshotId, null, dataFiles, newFilesSequenceNumber != null); | ||
| base, startingSnapshotId, null, dataFiles, newFilesSequenceNumber != null, parent); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -397,8 +405,10 @@ protected void validateNoNewDeletesForDataFiles( | |
| TableMetadata base, | ||
| Long startingSnapshotId, | ||
| Expression dataFilter, | ||
| Iterable<DataFile> dataFiles) { | ||
| validateNoNewDeletesForDataFiles(base, startingSnapshotId, dataFilter, dataFiles, false); | ||
| Iterable<DataFile> dataFiles, | ||
| Snapshot parent) { | ||
namrathamyske marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| validateNoNewDeletesForDataFiles( | ||
| base, startingSnapshotId, dataFilter, dataFiles, false, parent); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -423,13 +433,14 @@ private void validateNoNewDeletesForDataFiles( | |
| Long startingSnapshotId, | ||
| Expression dataFilter, | ||
| Iterable<DataFile> dataFiles, | ||
| boolean ignoreEqualityDeletes) { | ||
| boolean ignoreEqualityDeletes, | ||
| Snapshot parent) { | ||
| // if there is no current table state, no files have been added | ||
| if (base.currentSnapshot() == null || base.formatVersion() < 2) { | ||
| if (parent == null || base.formatVersion() < 2) { | ||
| return; | ||
| } | ||
|
|
||
| DeleteFileIndex deletes = addedDeleteFiles(base, startingSnapshotId, dataFilter, null); | ||
| DeleteFileIndex deletes = addedDeleteFiles(base, startingSnapshotId, dataFilter, null, parent); | ||
|
|
||
| long startingSequenceNumber = startingSequenceNumber(base, startingSnapshotId); | ||
| for (DataFile dataFile : dataFiles) { | ||
|
|
@@ -460,8 +471,8 @@ private void validateNoNewDeletesForDataFiles( | |
| * @param dataFilter an expression used to find new conflicting delete files | ||
| */ | ||
| protected void validateNoNewDeleteFiles( | ||
| TableMetadata base, Long startingSnapshotId, Expression dataFilter) { | ||
| DeleteFileIndex deletes = addedDeleteFiles(base, startingSnapshotId, dataFilter, null); | ||
| TableMetadata base, Long startingSnapshotId, Expression dataFilter, Snapshot parent) { | ||
namrathamyske marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| DeleteFileIndex deletes = addedDeleteFiles(base, startingSnapshotId, dataFilter, null, parent); | ||
| ValidationException.check( | ||
| deletes.isEmpty(), | ||
| "Found new conflicting delete files that can apply to records matching %s: %s", | ||
|
|
@@ -478,8 +489,9 @@ protected void validateNoNewDeleteFiles( | |
| * @param partitionSet a partition set used to find new conflicting delete files | ||
| */ | ||
| protected void validateNoNewDeleteFiles( | ||
| TableMetadata base, Long startingSnapshotId, PartitionSet partitionSet) { | ||
| DeleteFileIndex deletes = addedDeleteFiles(base, startingSnapshotId, null, partitionSet); | ||
| TableMetadata base, Long startingSnapshotId, PartitionSet partitionSet, Snapshot parent) { | ||
| DeleteFileIndex deletes = | ||
| addedDeleteFiles(base, startingSnapshotId, null, partitionSet, parent); | ||
| ValidationException.check( | ||
| deletes.isEmpty(), | ||
| "Found new conflicting delete files that can apply to records matching %s: %s", | ||
|
|
@@ -499,9 +511,10 @@ protected DeleteFileIndex addedDeleteFiles( | |
| TableMetadata base, | ||
| Long startingSnapshotId, | ||
| Expression dataFilter, | ||
| PartitionSet partitionSet) { | ||
| PartitionSet partitionSet, | ||
| Snapshot parent) { | ||
| // if there is no current table state, return empty delete file index | ||
| if (base.currentSnapshot() == null || base.formatVersion() < 2) { | ||
| if (parent == null || base.formatVersion() < 2) { | ||
| return DeleteFileIndex.builderFor(ops.io(), ImmutableList.of()) | ||
| .specsById(base.specsById()) | ||
| .build(); | ||
|
|
@@ -512,7 +525,8 @@ protected DeleteFileIndex addedDeleteFiles( | |
| base, | ||
| startingSnapshotId, | ||
| VALIDATE_ADDED_DELETE_FILES_OPERATIONS, | ||
| ManifestContent.DELETES); | ||
| ManifestContent.DELETES, | ||
| parent); | ||
| List<ManifestFile> deleteManifests = history.first(); | ||
|
|
||
| long startingSequenceNumber = startingSequenceNumber(base, startingSnapshotId); | ||
|
|
@@ -528,9 +542,9 @@ protected DeleteFileIndex addedDeleteFiles( | |
| * @param dataFilter an expression used to find deleted data files | ||
| */ | ||
| protected void validateDeletedDataFiles( | ||
| TableMetadata base, Long startingSnapshotId, Expression dataFilter) { | ||
| TableMetadata base, Long startingSnapshotId, Expression dataFilter, Snapshot parent) { | ||
| CloseableIterable<ManifestEntry<DataFile>> conflictEntries = | ||
| deletedDataFiles(base, startingSnapshotId, dataFilter, null); | ||
| deletedDataFiles(base, startingSnapshotId, dataFilter, null, parent); | ||
|
|
||
| try (CloseableIterator<ManifestEntry<DataFile>> conflicts = conflictEntries.iterator()) { | ||
| if (conflicts.hasNext()) { | ||
|
|
@@ -556,9 +570,9 @@ protected void validateDeletedDataFiles( | |
| * @param partitionSet a partition set used to find deleted data files | ||
| */ | ||
| protected void validateDeletedDataFiles( | ||
| TableMetadata base, Long startingSnapshotId, PartitionSet partitionSet) { | ||
| TableMetadata base, Long startingSnapshotId, PartitionSet partitionSet, Snapshot parent) { | ||
| CloseableIterable<ManifestEntry<DataFile>> conflictEntries = | ||
| deletedDataFiles(base, startingSnapshotId, null, partitionSet); | ||
| deletedDataFiles(base, startingSnapshotId, null, partitionSet, parent); | ||
|
|
||
| try (CloseableIterator<ManifestEntry<DataFile>> conflicts = conflictEntries.iterator()) { | ||
| if (conflicts.hasNext()) { | ||
|
|
@@ -588,15 +602,20 @@ private CloseableIterable<ManifestEntry<DataFile>> deletedDataFiles( | |
| TableMetadata base, | ||
| Long startingSnapshotId, | ||
| Expression dataFilter, | ||
| PartitionSet partitionSet) { | ||
| PartitionSet partitionSet, | ||
| Snapshot parent) { | ||
| // if there is no current table state, no files have been deleted | ||
| if (base.currentSnapshot() == null) { | ||
| if (parent == null) { | ||
| return CloseableIterable.empty(); | ||
| } | ||
|
|
||
| Pair<List<ManifestFile>, Set<Long>> history = | ||
| validationHistory( | ||
| base, startingSnapshotId, VALIDATE_DATA_FILES_EXIST_OPERATIONS, ManifestContent.DATA); | ||
| base, | ||
| startingSnapshotId, | ||
| VALIDATE_DATA_FILES_EXIST_OPERATIONS, | ||
| ManifestContent.DATA, | ||
| parent); | ||
| List<ManifestFile> manifests = history.first(); | ||
| Set<Long> newSnapshots = history.second(); | ||
|
|
||
|
|
@@ -662,9 +681,10 @@ protected void validateDataFilesExist( | |
| Long startingSnapshotId, | ||
| CharSequenceSet requiredDataFiles, | ||
| boolean skipDeletes, | ||
| Expression conflictDetectionFilter) { | ||
| Expression conflictDetectionFilter, | ||
| Snapshot parent) { | ||
| // if there is no current table state, no files have been removed | ||
| if (base.currentSnapshot() == null) { | ||
| if (parent == null) { | ||
| return; | ||
| } | ||
|
|
||
|
|
@@ -674,7 +694,8 @@ protected void validateDataFilesExist( | |
| : VALIDATE_DATA_FILES_EXIST_OPERATIONS; | ||
|
|
||
| Pair<List<ManifestFile>, Set<Long>> history = | ||
| validationHistory(base, startingSnapshotId, matchingOperations, ManifestContent.DATA); | ||
| validationHistory( | ||
| base, startingSnapshotId, matchingOperations, ManifestContent.DATA, parent); | ||
| List<ManifestFile> manifests = history.first(); | ||
| Set<Long> newSnapshots = history.second(); | ||
|
|
||
|
|
@@ -710,14 +731,14 @@ private Pair<List<ManifestFile>, Set<Long>> validationHistory( | |
| TableMetadata base, | ||
| Long startingSnapshotId, | ||
| Set<String> matchingOperations, | ||
| ManifestContent content) { | ||
| ManifestContent content, | ||
| Snapshot parent) { | ||
| List<ManifestFile> manifests = Lists.newArrayList(); | ||
| Set<Long> newSnapshots = Sets.newHashSet(); | ||
|
|
||
| Snapshot lastSnapshot = null; | ||
| Iterable<Snapshot> snapshots = | ||
| SnapshotUtil.ancestorsBetween( | ||
| base.currentSnapshot().snapshotId(), startingSnapshotId, base::snapshot); | ||
| SnapshotUtil.ancestorsBetween(parent.snapshotId(), startingSnapshotId, base::snapshot); | ||
| for (Snapshot currentSnapshot : snapshots) { | ||
| lastSnapshot = currentSnapshot; | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.