Spark 3.3: Add SparkChangelogTable #5740
Conversation
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogTable.java
  }

-  private Pair<Table, Long> load(Identifier ident) {
+  private Table load(Identifier ident, String version) {
I am using existing options for configuring boundaries. This means we cannot use SQL right now, only the DataFrame API. Hopefully, we will have support for options in Spark 3.4.
An alternative option is to add a stored procedure that generates a changelog and registers it as a view. We will need the procedure in any case to generate pre and post images. I am reluctant to use table identifiers as it makes the logic tricky.
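For illustration, a minimal sketch of what a DataFrame read with boundary options could look like; the option keys and the ".changes" identifier are assumptions for this example, not necessarily the final API:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

class ChangelogReadSketch {
  // Sketch only: option keys and the table identifier are illustrative assumptions.
  static Dataset<Row> readChanges(SparkSession spark, long startSnapshotId, long endSnapshotId) {
    return spark.read()
        .option("start-snapshot-id", String.valueOf(startSnapshotId)) // existing boundary option
        .option("end-snapshot-id", String.valueOf(endSnapshotId))     // existing boundary option
        .table("demo.db.tbl.changes"); // changelog identifier for a hypothetical table demo.db.tbl
  }
}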
  }

  SparkChangelogBatch that = (SparkChangelogBatch) o;
  return scan.equals(that.scan);
I don't think it is very clean to implement both Scan and Batch in one class. I understand we had a performance regression, but I think it was because our Batch implementation did not implement equals and hashCode.
Here is the code in Spark's BatchScanExec:
override def equals(other: Any): Boolean = other match {
  case other: BatchScanExec =>
    this.batch == other.batch && this.runtimeFilters == other.runtimeFilters
  case _ =>
    false
}
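For context, a minimal sketch of a standalone Batch that implements equals and hashCode so the BatchScanExec comparison above can succeed for identical scans; the class and field names are assumptions, not the PR's actual code:

import java.util.Objects;
import org.apache.spark.sql.connector.read.Batch;
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.PartitionReaderFactory;
import org.apache.spark.sql.connector.read.Scan;

// Hypothetical sketch: compare batches by the scan that created them so that
// BatchScanExec.equals succeeds and runtime filters stay pushed down.
class ChangelogBatchSketch implements Batch {
  private final Scan scan;

  ChangelogBatchSketch(Scan scan) {
    this.scan = scan;
  }

  @Override
  public InputPartition[] planInputPartitions() {
    return new InputPartition[0]; // a real implementation would plan task groups here
  }

  @Override
  public PartitionReaderFactory createReaderFactory() {
    throw new UnsupportedOperationException("sketch only");
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    return Objects.equals(scan, ((ChangelogBatchSketch) o).scan);
  }

  @Override
  public int hashCode() {
    return Objects.hash(scan);
  }
}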
@bryanck, do you remember the details on that issue? Do you think my assumption is reasonable?
Yes, that was what I found: the equals call returned false and the filters weren't pushed down. I had a workaround for that, but IIRC I ran into some other issues. Unfortunately, I didn't delve deeper at that point and went with reverting the change. It could be that implementing equals resolves the issue. I could run a benchmark test to confirm if you're interested.
Great, I can submit a separate PR and it would be awesome if you could re-run the benchmark. I'll ping you.
  }

-  public static String[] blockLocations(FileIO io, CombinedScanTask task) {
+  public static String[] blockLocations(FileIO io, ScanTaskGroup<?> taskGroup) {
Probably more of a question for my understanding: Iceberg only guarantees compatibility for classes from the iceberg-api module, correct?
Correct. Only iceberg-api has the API / ABI compatibility guarantees.
Yep, but this is compatible as CombinedScanTask implements ScanTaskGroup. Existing user code should continue to work.
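A small sketch of why the change stays source-compatible; the helper method here is made up for illustration:

import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.ScanTaskGroup;

class TaskGroupCompatSketch {
  // Widened parameter type, analogous to the new blockLocations signature.
  static int taskCount(ScanTaskGroup<?> taskGroup) {
    return taskGroup.tasks().size();
  }

  static int existingCaller(CombinedScanTask task) {
    // CombinedScanTask implements ScanTaskGroup<FileScanTask>, so code that already
    // holds a CombinedScanTask still compiles against the widened parameter.
    return taskCount(task);
  }
}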
...rk-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogBatchReads.java
  }

  @Test
  public void testMetadataDeletes() {
Why is this called a metadata delete? Is it because of the assertion on DataOperations.DELETE?
I believe this is because the actual delete operation is issued against an entire partition, the partition of data = 'a'. This delete uses an optimized, "metadata only" operation; no data files need to be read or rewritten to perform the delete.
That's always been my understanding of "metadata deletes": they are deletes which only require updating metadata, without having to inspect data files.
Yep, @kbendick is spot on.
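For illustration, a sketch of the kind of statement that can be satisfied as a metadata delete; the catalog and table names are made up:

import org.apache.spark.sql.SparkSession;

class MetadataDeleteSketch {
  static void run(SparkSession spark) {
    // table partitioned by `data`, so each distinct value of `data` is its own partition
    spark.sql("CREATE TABLE demo.db.tbl (id BIGINT, data STRING) USING iceberg PARTITIONED BY (data)");
    spark.sql("INSERT INTO demo.db.tbl VALUES (1, 'a'), (2, 'a'), (3, 'b')");
    // the predicate matches the whole data = 'a' partition, so Iceberg can drop the
    // affected files in metadata without reading or rewriting any data files
    spark.sql("DELETE FROM demo.db.tbl WHERE data = 'a'");
  }
}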
  long rowsCount = taskGroups().stream().mapToLong(ScanTaskGroup::estimatedRowsCount).sum();
  long sizeInBytes = SparkSchemaUtil.estimateSize(readSchema(), rowsCount);
  return new Stats(sizeInBytes, rowsCount);
}
If I remember correctly, statistics were calculated multiple times during the same query in some other scenarios.
Would there be any benefit to caching this result? It was @bryanck I believe who found that we were spending extra time in statistics calculation before.
I double checked and we have the same logic in our regular scans. I think it will be fairly cheap to call this method multiple times because taskGroups() caches the result and we will simply iterate over it in memory.
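A minimal sketch of the caching behavior being described, with made-up names; the real scan class is more involved:

import java.util.List;

class TaskGroupCachingSketch {
  private List<Long> estimatedRowCounts = null; // stand-in for cached task groups

  // planning happens once; later calls just return the cached list
  private List<Long> taskGroups() {
    if (estimatedRowCounts == null) {
      estimatedRowCounts = planTaskGroups();
    }
    return estimatedRowCounts;
  }

  // analogous to estimateStatistics(): cheap to call repeatedly after the first call
  long estimatedRowsCount() {
    return taskGroups().stream().mapToLong(Long::longValue).sum();
  }

  private List<Long> planTaskGroups() {
    return List.of(100L, 250L); // placeholder for the expensive planning step
  }
}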
  boolean isChangelog = false;

  for (String meta : parsed.second()) {
    if (meta.equalsIgnoreCase(SparkChangelogTable.TABLE_NAME)) {
Changelog should be the last element of the list, right? This may produce a false match.
This is for path-based tables, which have somewhat odd identifiers like location#meta1,meta2,meta3, so I am not sure whether changelog must be last. Let me think.
I double checked this and I think we should follow the existing logic for path-based tables where the order of parts in a selector does not matter.
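A sketch of order-insensitive matching on such a selector; the identifier format follows the location#meta1,meta2,meta3 example above, and the "changes" marker name is an assumption:

import java.util.Arrays;

class ChangelogSelectorSketch {
  // Returns true if any comma-separated part after '#' names the changelog table,
  // regardless of its position in the list.
  static boolean isChangelog(String identifier) {
    int hash = identifier.indexOf('#');
    if (hash < 0) {
      return false;
    }
    String[] parts = identifier.substring(hash + 1).split(",");
    return Arrays.stream(parts).anyMatch(part -> part.equalsIgnoreCase("changes"));
  }

  public static void main(String[] args) {
    System.out.println(isChangelog("s3://bucket/path#changes,snapshot_id=123")); // true
    System.out.println(isChangelog("s3://bucket/path#snapshot_id=123,changes")); // true
  }
}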
  return sparkTable.copyWithSnapshotId(Long.parseLong(version));

} else if (table instanceof SparkChangelogTable) {
  throw new UnsupportedOperationException("AS OF is not supported for changelogs");
nit: maybe complete AS OF as AsOfTime
Spark supports both timestamp- and version-based syntax:
temporalClause
    : FOR? (SYSTEM_VERSION | VERSION) AS OF version=(INTEGER_VALUE | STRING)
    | FOR? (SYSTEM_TIME | TIMESTAMP) AS OF timestamp=valueExpression
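For example, both forms could be used like this against a hypothetical table (the snapshot id and timestamp values are made up):

import org.apache.spark.sql.SparkSession;

class TimeTravelSyntaxSketch {
  static void run(SparkSession spark) {
    // version-based: the snapshot id as an integer or string literal
    spark.sql("SELECT * FROM demo.db.tbl VERSION AS OF 10963874102873").show();
    // timestamp-based: any value expression that evaluates to a timestamp
    spark.sql("SELECT * FROM demo.db.tbl TIMESTAMP AS OF '2022-08-01 10:00:00'").show();
  }
}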
flyrain left a comment:
Thanks for the PR @aokolnychyi. Looks good overall. I favor this solution over a view. Is there a plan to support specifying a snapshot range in SQL, e.g. select * from table.changes where start_snapshot = xxx and end_snapshot = xxx?
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.connector.read.InputPartition;

class SparkInputPartition implements InputPartition, Serializable {
+1 for this refactor.
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogBatch.java
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
  if (isChangelog) {
    return new SparkChangelogTable(table, !cacheEnabled);
  } else if (snapshotId != null) {
    return new SparkTable(table, snapshotId, !cacheEnabled);
  } else if (asOfTimestamp != null) {
-    return Pair.of(table, SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp));
+    return new SparkTable(
+        table, SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp), !cacheEnabled);
  } else {
-    return Pair.of(table, null);
+    return new SparkTable(table, null, !cacheEnabled);
A refactor suggestion: we may use a builder here.
I'll try it out if we decide to make changes in SparkCatalog.
I tried, but it seemed like overkill as it is just a single place where it makes sense. However, I did refactor this part a bit, so it should be slightly better now.
  long snapshotId = Long.parseLong(id.group(1));
-  return Pair.of(table, snapshotId);
+  return new SparkTable(table, snapshotId, !cacheEnabled);
}
Not a blocker. It'd be more readable if we wrapped the code within the catch clause, like this:
try {
  org.apache.iceberg.Table table = icebergCatalog.loadTable(buildIdentifier(ident));
  return new SparkTable(table, !cacheEnabled);
} catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
  Table table = loadAlternativeTable(ident);
  if (table != null) {
    return table;
  } else {
    throw e;
  }
}
That's a good idea. Let me do that in a separate PR after this one.
flyrain left a comment:
+1
Thanks for reviewing, @stevenzwu @flyrain @kbendick!
This PR adds SparkChangelogTable for querying changelogs in Spark.