-
Notifications
You must be signed in to change notification settings - Fork 3k
Core, API, Spec: Metadata Row Lineage #11948
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 16 commits
715dbba
4039869
c416acf
1b4f0f6
75c1ad4
3ed2844
a58f7b4
5811521
f472497
1023d81
d7c5416
2c0546d
f048cc3
a929c32
b51efa8
c567421
d2e016f
9b574ff
8d143c9
e789fda
6fac1a4
61f18c2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -43,6 +43,8 @@ class BaseSnapshot implements Snapshot { | |
| private final Map<String, String> summary; | ||
| private final Integer schemaId; | ||
| private final String[] v1ManifestLocations; | ||
| private final Long firstRowId; | ||
| private final Long addedRows; | ||
|
|
||
| // lazily initialized | ||
| private transient List<ManifestFile> allManifests = null; | ||
|
|
@@ -61,7 +63,9 @@ class BaseSnapshot implements Snapshot { | |
| String operation, | ||
| Map<String, String> summary, | ||
| Integer schemaId, | ||
| String manifestList) { | ||
| String manifestList, | ||
| Long firstRowId, | ||
| Long addedRows) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe worth adding an overriding constructor that takes these two new arguments? And the existing constructor would then just pass nulls for these? That way you wouldn't have to update all the constructor calls
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here I think there is more of an argument to have multiple constructors since there isn't a builder, but I"m still hesitant to add another constructor here. What instances to we have where we want to make a new Snapshot and not have these fields explicitly specified?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess one alternative could be to introduce a Snapshot Builder instead of relying on the constructor. But if we decide to do that, I'd add that builder to the core module rather than the test module (as is being done by #11947).
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fair enough I think we can also punt on this for now and have it be a cleanup later since it's not really integral to this pr
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we decide to add a Builder or some other kinda refactoring, I'd also prefer to punt. The PR is fairly sizeable and I'd prefer to keep it focused on the row lineage core metadata changes (and all of this rather internal at the moment) |
||
| this.sequenceNumber = sequenceNumber; | ||
| this.snapshotId = snapshotId; | ||
| this.parentId = parentId; | ||
|
|
@@ -71,6 +75,8 @@ class BaseSnapshot implements Snapshot { | |
| this.schemaId = schemaId; | ||
| this.manifestListLocation = manifestList; | ||
| this.v1ManifestLocations = null; | ||
| this.firstRowId = firstRowId; | ||
| this.addedRows = addedRows; | ||
| } | ||
|
|
||
| BaseSnapshot( | ||
|
|
@@ -91,6 +97,8 @@ class BaseSnapshot implements Snapshot { | |
| this.schemaId = schemaId; | ||
| this.manifestListLocation = null; | ||
| this.v1ManifestLocations = v1ManifestLocations; | ||
| this.firstRowId = null; | ||
| this.addedRows = null; | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -128,6 +136,16 @@ public Integer schemaId() { | |
| return schemaId; | ||
| } | ||
|
|
||
| @Override | ||
| public Long firstRowId() { | ||
| return firstRowId; | ||
| } | ||
|
|
||
| @Override | ||
| public Long addedRows() { | ||
| return addedRows; | ||
| } | ||
|
|
||
| private void cacheManifests(FileIO fileIO) { | ||
| if (fileIO == null) { | ||
| throw new IllegalArgumentException("Cannot cache changes: FileIO is null"); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -60,6 +60,7 @@ private MetadataUpdateParser() {} | |
| static final String SET_PARTITION_STATISTICS = "set-partition-statistics"; | ||
| static final String REMOVE_PARTITION_STATISTICS = "remove-partition-statistics"; | ||
| static final String REMOVE_PARTITION_SPECS = "remove-partition-specs"; | ||
| static final String ENABLE_ROW_LINEAGE = "enable-row-lineage"; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. FYI that this also requires changes in the OpenAPI spec
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
|
|
||
| // AssignUUID | ||
| private static final String UUID = "uuid"; | ||
|
|
@@ -154,6 +155,7 @@ private MetadataUpdateParser() {} | |
| .put(MetadataUpdate.AddViewVersion.class, ADD_VIEW_VERSION) | ||
| .put(MetadataUpdate.SetCurrentViewVersion.class, SET_CURRENT_VIEW_VERSION) | ||
| .put(MetadataUpdate.RemovePartitionSpecs.class, REMOVE_PARTITION_SPECS) | ||
| .put(MetadataUpdate.EnableRowLineage.class, ENABLE_ROW_LINEAGE) | ||
| .buildOrThrow(); | ||
|
|
||
| public static String toJson(MetadataUpdate metadataUpdate) { | ||
|
|
@@ -249,6 +251,8 @@ public static void toJson(MetadataUpdate metadataUpdate, JsonGenerator generator | |
| case REMOVE_PARTITION_SPECS: | ||
| writeRemovePartitionSpecs((MetadataUpdate.RemovePartitionSpecs) metadataUpdate, generator); | ||
| break; | ||
| case ENABLE_ROW_LINEAGE: | ||
| break; | ||
| default: | ||
| throw new IllegalArgumentException( | ||
| String.format( | ||
|
|
@@ -322,6 +326,8 @@ public static MetadataUpdate fromJson(JsonNode jsonNode) { | |
| return readCurrentViewVersionId(jsonNode); | ||
| case REMOVE_PARTITION_SPECS: | ||
| return readRemovePartitionSpecs(jsonNode); | ||
| case ENABLE_ROW_LINEAGE: | ||
| return new MetadataUpdate.EnableRowLineage(); | ||
| default: | ||
| throw new UnsupportedOperationException( | ||
| String.format("Cannot convert metadata update action to json: %s", action)); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -41,6 +41,7 @@ | |
| import java.util.List; | ||
| import java.util.Locale; | ||
| import java.util.Map; | ||
| import java.util.Objects; | ||
| import java.util.Queue; | ||
| import java.util.Set; | ||
| import java.util.UUID; | ||
|
|
@@ -282,6 +283,13 @@ public Snapshot apply() { | |
| throw new RuntimeIOException(e, "Failed to write manifest list file"); | ||
| } | ||
|
|
||
| Long addedRows = null; | ||
| Long lastRowId = null; | ||
| if (base.rowLineageEnabled()) { | ||
| addedRows = calculateAddedRows(manifests); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's an optimization so we can always just do this later (and arguably makes it a bit harder to read the code) but instead of waiting until all the manifests are written what if we set the addedRows as we add manifests to the writer in the try with-resources-above Something like
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's worth thinking about, but here I'm not sure it's that much of an optimization. The amount of new manifests should be pretty small, even if it was thousands of manifests the overhead should be very low imho and we already have them in memory. |
||
| lastRowId = base.nextRowId(); | ||
| } | ||
|
|
||
| return new BaseSnapshot( | ||
| sequenceNumber, | ||
| snapshotId(), | ||
|
|
@@ -290,7 +298,27 @@ public Snapshot apply() { | |
| operation(), | ||
| summary(base), | ||
| base.currentSchemaId(), | ||
| manifestList.location()); | ||
| manifestList.location(), | ||
| lastRowId, | ||
| addedRows); | ||
| } | ||
|
|
||
| private Long calculateAddedRows(List<ManifestFile> manifests) { | ||
| return manifests.stream() | ||
| .filter( | ||
| manifest -> | ||
| manifest.snapshotId() == null | ||
| || Objects.equals(manifest.snapshotId(), this.snapshotId)) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we use there is no practical difference, since earlier call of |
||
| .mapToLong( | ||
| manifest -> { | ||
| Preconditions.checkArgument( | ||
| manifest.addedRowsCount() != null, | ||
| "Cannot determine number of added rows in snapshot because" | ||
| + " the entry for manifest %s is missing the field `added-rows-count`", | ||
| manifest.path()); | ||
amogh-jahagirdar marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| return manifest.addedRowsCount(); | ||
| }) | ||
| .sum(); | ||
| } | ||
|
|
||
| protected abstract Map<String, String> summary(); | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.