From 33364750deddae7f267896836b2a4182640a11f4 Mon Sep 17 00:00:00 2001 From: Orson Peters Date: Tue, 4 Feb 2025 13:31:27 +0100 Subject: [PATCH] perf: Rechunk only once during join probe gather (#21072) --- .../src/nodes/joins/equi_join.rs | 57 +++++++++++++------ 1 file changed, 40 insertions(+), 17 deletions(-) diff --git a/crates/polars-stream/src/nodes/joins/equi_join.rs b/crates/polars-stream/src/nodes/joins/equi_join.rs index ae91d7087c9f..217e09b4a8a8 100644 --- a/crates/polars-stream/src/nodes/joins/equi_join.rs +++ b/crates/polars-stream/src/nodes/joins/equi_join.rs @@ -344,7 +344,9 @@ impl ProbeState { // Compute hashed keys and payload. let (df, seq, src_token, wait_token) = morsel.into_inner(); let hash_keys = select_keys(&df, key_selectors, params, state).await?; - let payload = select_payload(df, payload_selector); + let mut payload = select_payload(df, payload_selector); + let mut payload_rechunked = false; // We don't eagerly rechunk because there might be no matches. + max_seq = seq; unsafe { @@ -371,11 +373,23 @@ impl ProbeState { IdxSize::MAX, ); + if table_match.is_empty() { + continue; + } + + // Gather output and add to buffer. let mut build_df = if emit_unmatched { p.df.take_opt_chunked_unchecked(&table_match) } else { p.df.take_chunked_unchecked(&table_match, IsSorted::Not) }; + + if !payload_rechunked { + // TODO: can avoid rechunk? We have to rechunk here or else we do it + // multiple times during the gather. + payload.rechunk_mut(); + payload_rechunked = true; + } let mut probe_df = payload.take_slice_unchecked_impl(&probe_match, false); let mut out_df = if params.left_is_build { @@ -392,22 +406,25 @@ impl ProbeState { out_per_partition.push(out_df); } - let sort_options = SortMultipleOptions { - descending: vec![false], - nulls_last: vec![false], - multithreaded: false, - maintain_order: true, - limit: None, - }; - let mut out_df = accumulate_dataframes_vertical_unchecked(out_per_partition); - out_df.sort_in_place([name.clone()], sort_options).unwrap(); - out_df.drop_in_place(&name).unwrap(); - out_df = postprocess_join(out_df, params); - - // TODO: break in smaller morsels. - let out_morsel = Morsel::new(out_df, seq, src_token.clone()); - if send.send(out_morsel).await.is_err() { - break; + if !out_per_partition.is_empty() { + let sort_options = SortMultipleOptions { + descending: vec![false], + nulls_last: vec![false], + multithreaded: false, + maintain_order: true, + limit: None, + }; + let mut out_df = + accumulate_dataframes_vertical_unchecked(out_per_partition); + out_df.sort_in_place([name.clone()], sort_options).unwrap(); + out_df.drop_in_place(&name).unwrap(); + out_df = postprocess_join(out_df, params); + + // TODO: break in smaller morsels. + let out_morsel = Morsel::new(out_df, seq, src_token.clone()); + if send.send(out_morsel).await.is_err() { + break; + } } } else { let mut out_frames = Vec::new(); @@ -435,6 +452,12 @@ impl ProbeState { } else { p.df.take_chunked_unchecked(&table_match, IsSorted::Not) }; + if !payload_rechunked { + // TODO: can avoid rechunk? We have to rechunk here or else we do it + // multiple times during the gather. + payload.rechunk_mut(); + payload_rechunked = true; + } let mut probe_df = payload.take_slice_unchecked_impl(&probe_match, false);