ARROW-11030: [Rust][DataFusion] Concatenate left side batches to single batch in HashJoinExec #9070
Conversation
Codecov Report
@@            Coverage Diff             @@
##           master    #9070      +/-   ##
==========================================
+ Coverage   82.59%   82.60%   +0.01%
==========================================
  Files         204      204
  Lines       50169    50197      +28
==========================================
+ Hits        41436    41465      +29
+ Misses       8733     8732       -1

Continue to review full report at Codecov.
I'm seeing a 2x improvement in performance for q12 @ SF=100 with batch size 4096 🚀 This should allow us to reduce the default batch size and reduce memory pressure.
use crate::error::{DataFusionError, Result};

use super::{ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream};
use crate::physical_plan::coalesce_batches::concat_batches;
Is there a place to put this?
It should probably go into the arrow crate since it isn't specific to DataFusion.
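For context, a minimal sketch of what such a concatenation helper can look like when built on arrow's concat kernel. The name concat_record_batches is illustrative, not DataFusion's actual implementation, and concat's exact signature has varied across arrow versions:

use arrow::array::{Array, ArrayRef};
use arrow::compute::concat;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

/// Concatenate a non-empty slice of batches sharing one schema into a single batch.
fn concat_record_batches(batches: &[RecordBatch]) -> Result<RecordBatch, ArrowError> {
    let schema = batches[0].schema();
    // For each column position, concatenate that column across all input batches.
    let columns = (0..schema.fields().len())
        .map(|i| {
            let arrays: Vec<&dyn Array> =
                batches.iter().map(|b| b.column(i).as_ref()).collect();
            concat(&arrays)
        })
        .collect::<Result<Vec<ArrayRef>, ArrowError>>()?;
    RecordBatch::try_new(schema, columns)
}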
let mut key = Vec::with_capacity(keys_values.len());

let mut left_indices = UInt64Builder::new(0);
It seems better for an inner join not to use a builder / allocate a null bitmap? What is currently the most ergonomic way to do this?
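One possible shape of the builder-free approach suggested here, as a sketch: since an inner join never produces null indices, the matched row numbers can be collected into a plain Vec and converted directly, skipping the builder and its validity bitmap (UInt64Array stands in for whichever index type is used):

use arrow::array::UInt64Array;

/// Build a non-nullable index array without going through a builder.
fn indices_to_array(matched_rows: Vec<u64>) -> UInt64Array {
    // From<Vec<u64>> creates an array with no null bitmap allocated.
    UInt64Array::from(matched_rows)
}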
for _ in 0..indices.len() {
    // on an inner join, left and right indices are present
    right_indices.append_value(row as u32)?;
This could use something like append_n.
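A sketch of that suggestion, assuming arrow's primitive builders of this era (whose append_slice returns a Result; the helper name append_n is hypothetical):

use arrow::array::UInt32Builder;
use arrow::error::ArrowError;

/// Append `value` to the builder `n` times in one bulk call instead of a loop.
fn append_n(builder: &mut UInt32Builder, value: u32, n: usize) -> Result<(), ArrowError> {
    builder.append_slice(&vec![value; n])
}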
    RecordBatch::try_new(Arc::new(schema.clone()), columns)
}

/// Create a key `Vec<u8>` that is used as key for the hashmap
Currently it looks like creating this key / hashing is the most expensive part of the queries.
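For readers following along, a simplified sketch of what this key creation amounts to, restricted to UInt64 key columns with null handling omitted (the real function dispatches on many types, and the exact names differ):

use arrow::array::UInt64Array;

/// Serialize the join-key values of one row into the byte vector used as the
/// hashmap key. This runs once per row, which is why it shows up in profiles.
fn create_key(key_columns: &[UInt64Array], row: usize, key: &mut Vec<u8>) {
    key.clear();
    for col in key_columns {
        key.extend_from_slice(&col.value(row).to_le_bytes());
    }
}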
This is now ready for review.
| // E.g. [1, 2] -> [(0, 3), (1, 6), (0, 8)] indicates that (column1, column2) = [1, 2] is true | ||
| // for rows 3 and 8 from batch 0 and row 6 from batch 1. | ||
| type JoinHashMap = HashMap<Vec<u8>, Vec<Index>, RandomState>; | ||
| type JoinLeftData = Arc<(JoinHashMap, Vec<RecordBatch>)>; |
This change does mean that we can only use this join operator when the data can fit into a single batch. For example, if we are using 32-bit offsets then this limits us to 4GB of string data in a single column.
I don't know if we have any users that care about this scenario or not.
Yes, but as far as I can tell it only limits the max number of elements to 2^32 for each batch / array on the right?
I think the string data per right record batch itself should be able to hold more than 4GB because the string offsets are stored separately, or am I wrong about that?
If this becomes a problem in the future, we can change it to use u64 for the right indices as well, I think without too much extra cost besides doubled allocation / memory usage.
The Arrow spec supports both 32-bit and 64-bit variants for the offset vector, so in the 32-bit case, each batch would be limited to 2^32 bytes of variable-width data per array.
What I meant is that the type of the index used in the hash join has no effect on the type used in the string array offsets. So the offset values inside the string array might be 64-bit, while here we use 32-bit to index into the string elements, which would allow more than 4GB. Anyway, at some point there is a limit we should keep in mind 👍
Also FYI, for the left side we are now using 64-bit indices, so the 2^32 limit currently should only be on the number of elements per right batch, which seems reasonable to me (and it would be easy to get rid of that constraint). On the other hand, a future way to save some memory could be to choose 32-bit indices for the left side whenever there are fewer than 2^32 items in the build side.
Ah I see what you mean now...
That would be a bit less trivial to get rid of than I thought, but at some point you could make a case either for converting those string arrays to 64-bit offsets or for splitting the batches at 2^32 size, or something like that.
Thanks for the explanation @andygrove 👍
FWIW I would love to see a join approach for larger datasets that doesn't require giant buffers (eg. 64 bit indexes) but a runtime switch over to using sort/merge join or its equivalent. But that is a pipe dream for me at the moment.
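To summarize the thread above: after this change the left-side types can collapse roughly as follows. This is a sketch assuming ahash's RandomState as the hasher, as DataFusion uses; exact names in the PR may differ (and, per the discussion, u32 left indices would be a further memory saving when the build side has fewer than 2^32 rows):

use std::collections::HashMap;
use std::sync::Arc;

use ahash::RandomState;
use arrow::record_batch::RecordBatch;

// Before: key bytes -> (batch, row) pairs spread over many left batches.
// After: key bytes -> 64-bit row offsets into one concatenated left batch.
type JoinHashMap = HashMap<Vec<u8>, Vec<u64>, RandomState>;
type JoinLeftData = Arc<(JoinHashMap, RecordBatch)>;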
andygrove left a comment
This LGTM and I re-tested and still see a ~2x speedup for TPC-H q12 @ SF=100 with batch size 4096.
I added a comment noting the restriction when using 32-bit offsets. I think it would be reasonable to address this in the future if it becomes an issue for anyone.
@andygrove, isn't this going in the direction that we were discussing in https://issues.apache.org/jira/browse/ARROW-11058 wrt the current lack of benefits in having multiple batches (though you just mentioned a benefit: indexing size)?
Yes @jorgecarleitao, reducing the number of batches makes sense here. I am just slightly concerned that always reducing to a single batch may introduce a constraint that could bite us in the future, although this is realistically only an issue when using 32-bit offsets for variable-width arrays.
@jorgecarleitao @andygrove
@Dandandan Yes, I personally think this is fine for now (hence the approval) and since no one has objected I think we can go ahead and merge this.
jorgecarleitao left a comment
FYI I am planning to work on the first changes (https://issues.apache.org/jira/browse/ARROW-11112) based on this PR/branch later this week, to start getting rid of the hash key creation and to start preparing the algorithm for a vectorized implementation.
alamb left a comment
Thanks @Dandandan!
This PR removes the exponential slowdown based on the number of left/right batches in the hash join by concatenating all left batches into a single batch, so we can directly index into the array (with take) instead of building a structure over / iterating all the left arrays; a sketch of this indexing appears after the benchmark summary below. This also simplifies the code, which can help with further speed improvements and with reducing memory usage.

The benchmark results look very promising, mostly removing the overhead of small batches and of big datasets with many left-side batches. For a very small batch size of 64, on TPC-H q12:
Master: [benchmark output elided]
This PR: [benchmark output elided]

The PR also includes improvements on queries 1 and 5 (batch size 64k):

Master: [benchmark output elided]
This PR: [benchmark output elided]
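As referenced in the description above, a minimal sketch of the take-based indexing: with the whole left side in one RecordBatch, matched left rows can be gathered with arrow's take kernel. The function name is illustrative, not the PR's exact code, and take's precise signature varies across arrow versions:

use arrow::array::{ArrayRef, UInt64Array};
use arrow::compute::take;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

/// Gather the matched left-side rows from the single concatenated left batch.
fn gather_left(
    left: &RecordBatch,
    left_indices: &UInt64Array,
) -> Result<Vec<ArrayRef>, ArrowError> {
    left.columns()
        .iter()
        .map(|col| take(col.as_ref(), left_indices, None))
        .collect()
}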