Conversation
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSortedMergeScanRDD.scala
Ok. I suppose this is better. I'll take a closer look at the iterators tomorrow. I think we'll want a second pair of eyes from @lwwmanning or @robert3005 as well.
The methods in …
// The readFunction may read some bytes before consuming the iterator, e.g.,
// vectorized Parquet reader. Here we use lazy val to delay the creation of
// iterator so that we will throw exception in `getNext`.
private lazy val internalIter = readFile(file)
I think you can just pass `readFile(currentFile)`. Why do you need `private val file = currentFile`?
I have not modified any code copied from `FileScanRDD`.
oh nvm this is new, I'll change - this was how it was in the upstream PR
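For context on why snapshotting into a val can matter here, a minimal sketch with illustrative names (not the PR's actual code): a Scala `lazy val` that captures a mutable var reads the var when it is first forced, not when the enclosing object is constructed, so whether the snapshot is needed depends on whether `currentFile` can advance before the lazy val is forced.

```scala
// Sketch with illustrative names (not the PR's code): a `lazy val` that
// captures a mutable var reads the var when first forced, not at
// construction time. Snapshotting into a val pins the file that was
// current at construction.
object LazyCaptureDemo {
  var currentFile: String = "part-00000"

  // Snapshot taken when this object initializes.
  val file: String = currentFile
  lazy val snapshotIter: String = s"reading $file"

  // No snapshot: evaluated against whatever `currentFile` holds when forced.
  lazy val liveIter: String = s"reading $currentFile"

  def main(args: Array[String]): Unit = {
    currentFile = "part-00001" // advance to the next file before forcing
    println(snapshotIter) // reading part-00000
    println(liveIter)     // reading part-00001
  }
}
```

If the lazy val is always forced before `currentFile` moves on, the two forms behave the same, which is presumably why passing `readFile(currentFile)` directly was suggested.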
// Set InputFileBlockHolder for the file block's information
currentFile = currentIteratorWithRow.getFile
InputFileBlockHolder.set(currentFile.filePath, currentFile.start, currentFile.length)
I understand that's to not break `input_file_name()` (which holds a thread-local of the current file). But I'm not sure this will work if you read a batch of rows 1..N from different files and only evaluate `input_file_name()` afterwards: for row 1 you might get the file name for row N.
Not sure. If the tests don't cover it I guess we'll just have to trust.
Also I wonder why we set `currentFile` (which could have side-effects in theory, but not in this PR) instead of assigning to a local variable. I guess in case it's read, but I only see the value being read when reading a file. (Which doesn't happen after the first hasNext.)
It's also used in `nextIterator` in the `BaseScanIterator`s? I'm wondering if I should just collapse both of them into a single iterator. The superclass was there from the upstream PR since it had 3 subclasses, but we only have one here. That might be easier to read.
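A minimal sketch of the batching concern raised above, with illustrative names (not Spark's actual `InputFileBlockHolder` or the PR's code): because the holder is a thread-local updated per file, rows buffered across files all observe the last file's path if `input_file_name()` is only evaluated after the batch is built.

```scala
// Sketch of the concern, with illustrative names: a thread-local "current
// file" holder updated per file, plus rows that are buffered into one batch
// without being tagged with their source file.
object FileHolderDemo {
  private val holder = new ThreadLocal[String] {
    override def initialValue(): String = ""
  }
  def set(path: String): Unit = holder.set(path)
  def inputFileName(): String = holder.get()

  // Reads all rows of each (path, rows) pair into one flat batch.
  def readBatch(files: Seq[(String, Seq[Int])]): Seq[Int] =
    files.flatMap { case (path, rows) =>
      set(path) // updated as each file is opened
      rows      // rows are buffered, not tagged with their file
    }

  def main(args: Array[String]): Unit = {
    val batch = readBatch(Seq("f1" -> Seq(1, 2), "f2" -> Seq(3)))
    // Evaluated only now: rows 1 and 2 came from f1 but report f2.
    batch.foreach(row => println(s"row $row -> ${inputFileName()}"))
  }
}
```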
// but those files combined together are not globally sorted. With configuration
// "spark.sql.sources.bucketing.sortedScan.enabled" being enabled, sort ordering
// is preserved by reading those sorted files in sort-merge way.
Can you explain how we assert that?
Maybe some fundamental misunderstanding, but I'd expect the query plans to change if you enable `bucketing.sortedScan.enabled` and the inputs are sorted (compared to the inputs being sorted but the flag disabled). Yet you didn't need to update the `testBucketing` method.
The `BucketedTableTestSpec` has a field (`expectedSort`) to indicate whether there should be a sort in the query plan; we pass that as the negation of whether the flag is enabled. So the `testBucketing` method already has this code to check the query plan:
assert(
  joinOperator.left.find(_.isInstanceOf[SortExec]).isDefined == sortLeft,
  s"expected sort in the left child to be $sortLeft but found\n${joinOperator.left}")
assert(
  joinOperator.right.find(_.isInstanceOf[SortExec]).isDefined == sortRight,
  s"expected sort in the right child to be $sortRight but found\n${joinOperator.right}")
Read through the iterator logic and it looks fine to me. The mechanism seems simple enough. (Note to self: In the first …
lgtm 👍🏼
PR optimized for a smaller diff from master, as opposed to a smaller diff from the upstream PR. Single-file partitions are now an actual no-op as well.
More details in #730