Add a Parallelized Spark Job Planning Path #1421
Conversation
Force-pushed from 569572d to 13296e4
  found = true;
  fromProjectionPos[i] = j;
}
if (fields.get(i).fieldId() == ManifestFile.SPEC_ID.fieldId()) {
These modifications allow a BaseFile to be translated into a Spark Row with the spec ID as a column.
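A minimal sketch of what that enables on the Spark side, with illustrative names rather than the PR's actual members: a wrapper can read the spec ID back out of the row by position.

// Illustrative only: reading the spec ID back from a Spark Row once it is
// carried through as a column. Class and field names are assumptions.
import org.apache.spark.sql.Row;

class SparkDataFileView {
  private final Row wrapped;
  private final int specIdPosition;

  SparkDataFileView(Row wrapped, int specIdPosition) {
    this.wrapped = wrapped;
    this.specIdPosition = specIdPosition;
  }

  // The partition spec ID that BaseFile now exposes as a column
  int specId() {
    return wrapped.getInt(specIdPosition);
  }
}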
private final SparkSession spark;
private final Snapshot snapshot;

private PlanScanAction(SparkSession spark, Table table, TableScan scan) {
For type safety, the public entry points will only accept valid scan types.
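A minimal sketch of that pattern, assuming hypothetical factory names: the constructor stays private and the public overloads only accept scan types the action knows how to plan.

// Sketch only; class and factory names are illustrative, not the PR's exact API.
import org.apache.iceberg.DataTableScan;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.spark.sql.SparkSession;

class PlanScanActionSketch {
  private final SparkSession spark;
  private final Table table;
  private final TableScan scan;

  // Private: callers cannot pass an arbitrary TableScan implementation directly
  private PlanScanActionSketch(SparkSession spark, Table table, TableScan scan) {
    this.spark = spark;
    this.table = table;
    this.scan = scan;
  }

  // Public entry point restricted to a supported scan type
  static PlanScanActionSketch forDataTableScan(SparkSession spark, Table table, DataTableScan scan) {
    return new PlanScanActionSketch(spark, table, scan);
  }
}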
public CloseableIterable<CombinedScanTask> execute() {
  Map<String, String> options = ((BaseTableScan) scan).options();
  TableMetadata meta = ((BaseTableScan) scan).tableOps().current(); // TODO maybe pass through metadata instead
  long splitSize;
This block is mostly copied directly from the existing configuration code for setting up the task combiner.
Would it make sense to add a comment there indicating that if changes are made, the author should consider whether this code needs to be updated too?
I'm a little afraid of comment rot there. I'm hoping that because we run the tests for this code block with distributed planning both on and off, any change that comes with a proper test will fail when distributed planning is on. I think that is probably the best defense we have, since the comment would have to live on a parent of the Scan class, and we also wouldn't be able to warn folks overriding the method in other Scan implementations.
I think the tests are our best chance here.
That makes sense 👍
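For context, the fallback being duplicated is roughly the following: take the split size from the scan options if present, otherwise fall back to the table property and its default. This is a paraphrase, not the exact code.

// Rough sketch of the split-size fallback; the option key matches the
// TableProperties constant, but the surrounding method is illustrative.
import java.util.Map;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.util.PropertyUtil;

class SplitSizeSketch {
  static long splitSize(Table table, Map<String, String> scanOptions) {
    String fromOptions = scanOptions.get(TableProperties.SPLIT_SIZE);
    if (fromOptions != null) {
      return Long.parseLong(fromOptions); // scan option wins over the table property
    }
    return PropertyUtil.propertyAsLong(
        table.properties(), TableProperties.SPLIT_SIZE, TableProperties.SPLIT_SIZE_DEFAULT);
  }
}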
}

private CloseableIterable<FileScanTask> planDataTableScanFiles() {
  // TODO Currently this approach reads all manifests, no manifest filtering - Maybe through pushdowns or table read
Currently the scan on ManifestEntries does no pushdown, which means every record must be serialized and every manifest must be read when doing distributed planning.
// Read entries which are not deleted and are datafiles and not delete files
Dataset<Row> dataFileEntries =
    manifestEntries.filter(manifestEntries.col("data_file").getField(DataFile.CONTENT.name()).equalTo(0))
        .filter(manifestEntries.col("status").notEqual(2)); // not deleted
These numbers are both magic because ManifestEntry is protected and we can't access its constants from here; we may want to change this as well.
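One low-cost improvement is to give those values local names so the intent stays readable even without access to ManifestEntry. The sketch below just restates the filter block above with named constants.

// Local stand-ins for ManifestEntry's constants; values restate the filters above.
private static final int DATA_FILE_CONTENT = 0; // data_file.content == 0 means a data file
private static final int STATUS_DELETED = 2;    // status == 2 means the entry was deleted

Dataset<Row> dataFileEntries = manifestEntries
    .filter(manifestEntries.col("data_file")
        .getField(DataFile.CONTENT.name()).equalTo(DATA_FILE_CONTENT))
    .filter(manifestEntries.col("status").notEqual(STATUS_DELETED));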
  specIdPosition = positions.get("partition_spec_id");
}

private SparkDataFile(SparkDataFile other) {
Copy constructor, since we need to serialize this representation back to Spark when we distribute CombinedScanTasks. This means we actually need a SparkDataFile object for every BaseFileScanTask.
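A hedged sketch of the shape of such a copy constructor; the member names are assumptions based on the surrounding diff, not the PR's exact fields.

// Copies the underlying Spark row and the precomputed column positions so this
// SparkDataFile can be serialized inside a CombinedScanTask on its own.
private SparkDataFile(SparkDataFile other) {
  this.wrapped = other.wrapped.copy();        // Row.copy() yields an independent row
  this.specIdPosition = other.specIdPosition; // positions were resolved once in the original
}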
if (fields.get(i).fieldId() == ManifestFile.SPEC_ID.fieldId()) {
  found = true;
  fromProjectionPos[i] = 14;
}
This is not related to your PR but while we're here: once we find the projected value and found is true, do we need to iterate over the rest of the entries?
We don't, but I don't think it's that much of a time sink
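For reference, the early exit being discussed would look roughly like this. It is a sketch of the loop shape, not the exact code; `projectedField` stands in for the field being matched.

// Once the projected field has been matched there is nothing left to find,
// so the inner loop can stop early. Purely a micro-optimization.
for (int j = 0; j < fields.size(); j += 1) {
  if (fields.get(j).fieldId() == projectedField.fieldId()) {
    found = true;
    fromProjectionPos[i] = j;
    break;
  }
}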
public Long toSnapshotId() {
  return context.toSnapshotId();
}
👍
      .filter(dataFileEntries.col("status").equalTo(1)); // Added files only
} else if (context.snapshotId() != null) {
  LOG.debug("Planning scan at snapshot {}", context.snapshotId());
  return dataFileEntries
I don't think this is correct. It is not really an incremental scan; it is time travel. We have to read all files that were valid at that snapshot, not just those that were created in that snapshot.
If scanContext has set a specific snapshot id, we have to set it there.
Let me add another test too then, because our current test coverage is not catching that behavior
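To make the distinction concrete, with the core TableScan API the two cases are planned differently, and the distributed path has to mirror the same semantics (a sketch):

// Sketch of the two scan semantics being discussed.
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;

class ScanSemanticsSketch {
  // Time travel: every file that was live in the table at the given snapshot.
  static TableScan timeTravel(Table table, long snapshotId) {
    return table.newScan().useSnapshot(snapshotId);
  }

  // Incremental: only the files appended between the two snapshots.
  static TableScan incremental(Table table, long fromSnapshotId, long toSnapshotId) {
    return table.newScan().appendsBetween(fromSnapshotId, toSnapshotId);
  }
}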
Planning for very large scans can take a considerable amount of time, even for queries that will not return much data. Part of the reason is that all planning for Spark reads happens locally before any data is actually fetched by Spark. We can speed this up by providing a distributed planning Action which distributes the generation of the actual scan plan. This should allow us to scale planning with compute and IO resources.
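Based on the call site quoted further down in the conversation, invoking the action looks roughly like the snippet below; planScan() and withContext() are entry points this PR adds, so treat this as a sketch rather than a settled API.

// Sketch of the call site; `scan` is the TableScan whose planning is distributed.
CloseableIterable<CombinedScanTask> tasks = Actions.forTable(table)
    .planScan()
    .withContext(scan.tableScanContext())
    .execute();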
Force-pushed from ea98e74 to 14943e4
  result = Lists.newArrayList(Actions.forTable(table).planScan().withContext(scan.tableScanContext()).execute());
} catch (Exception e) {
  if (SparkSession.active().conf().get(PlanScanAction.ICEBERG_TEST_PLAN_MODE).equals("true")) {
    throw e;
When we run tests, we just want to fail immediately if distributed planning fails.
I see this is out of date with the dev branch, are you still working on this?
Hey @RussellSpitzer and @aokolnychyi what was the status of this PR? We could use some of the changes here to core/api for our planning. Is this close to merging once conflicts are fixed?
I have to do some major updates so that it works with deletes. I plan on getting back to this soon, but if you let me know what you want extracted, maybe we can do some smaller PRs real fast?
I really only need the visibility modifications and the util classes. I suspect our first iteration isn't focused on V2 delete files.
Forgot to add the most important part: Shout if I can help! cc @vvellanki @pravindra @snazy and @nastra
This is the second of two WIPs for parallelizing Spark read job planning; the other is located at #1420.
To parallelize the creation of TableScanTasks, we use the metadata tables to get a listing of DataFiles and do the filtering in Spark before starting the scan job. Once the correct data files are identified, scan tasks are created and returned.
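As a rough illustration of that approach (a sketch: the catalog identifier db.table is assumed, and the PR builds the entries Dataset internally rather than through the public reader):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

class MetadataTablePlanningSketch {
  // Lists live manifest entries by reading the table's `entries` metadata table
  // in Spark, so the filtering runs on executors instead of the driver.
  static Dataset<Row> liveDataFileEntries(SparkSession spark) {
    Dataset<Row> entries = spark.read()
        .format("iceberg")
        .load("db.table.entries"); // assumed identifier for the entries metadata table

    // status == 2 marks deleted entries in the manifest entry schema
    return entries.filter(entries.col("status").notEqual(2));
  }
}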