Skip to content

Commit

Permalink
perf: Rechunk only once during join probe gather (#21072)
Browse files Browse the repository at this point in the history
  • Loading branch information
orlp authored Feb 4, 2025
1 parent 7f4dc50 commit 3336475
Showing 1 changed file with 40 additions and 17 deletions.
57 changes: 40 additions & 17 deletions crates/polars-stream/src/nodes/joins/equi_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,9 @@ impl ProbeState {
// Compute hashed keys and payload.
let (df, seq, src_token, wait_token) = morsel.into_inner();
let hash_keys = select_keys(&df, key_selectors, params, state).await?;
let payload = select_payload(df, payload_selector);
let mut payload = select_payload(df, payload_selector);
let mut payload_rechunked = false; // We don't eagerly rechunk because there might be no matches.

max_seq = seq;

unsafe {
Expand All @@ -371,11 +373,23 @@ impl ProbeState {
IdxSize::MAX,
);

if table_match.is_empty() {
continue;
}

// Gather output and add to buffer.
let mut build_df = if emit_unmatched {
p.df.take_opt_chunked_unchecked(&table_match)
} else {
p.df.take_chunked_unchecked(&table_match, IsSorted::Not)
};

if !payload_rechunked {
// TODO: can avoid rechunk? We have to rechunk here or else we do it
// multiple times during the gather.
payload.rechunk_mut();
payload_rechunked = true;
}
let mut probe_df = payload.take_slice_unchecked_impl(&probe_match, false);

let mut out_df = if params.left_is_build {
Expand All @@ -392,22 +406,25 @@ impl ProbeState {
out_per_partition.push(out_df);
}

let sort_options = SortMultipleOptions {
descending: vec![false],
nulls_last: vec![false],
multithreaded: false,
maintain_order: true,
limit: None,
};
let mut out_df = accumulate_dataframes_vertical_unchecked(out_per_partition);
out_df.sort_in_place([name.clone()], sort_options).unwrap();
out_df.drop_in_place(&name).unwrap();
out_df = postprocess_join(out_df, params);

// TODO: break in smaller morsels.
let out_morsel = Morsel::new(out_df, seq, src_token.clone());
if send.send(out_morsel).await.is_err() {
break;
if !out_per_partition.is_empty() {
let sort_options = SortMultipleOptions {
descending: vec![false],
nulls_last: vec![false],
multithreaded: false,
maintain_order: true,
limit: None,
};
let mut out_df =
accumulate_dataframes_vertical_unchecked(out_per_partition);
out_df.sort_in_place([name.clone()], sort_options).unwrap();
out_df.drop_in_place(&name).unwrap();
out_df = postprocess_join(out_df, params);

// TODO: break in smaller morsels.
let out_morsel = Morsel::new(out_df, seq, src_token.clone());
if send.send(out_morsel).await.is_err() {
break;
}
}
} else {
let mut out_frames = Vec::new();
Expand Down Expand Up @@ -435,6 +452,12 @@ impl ProbeState {
} else {
p.df.take_chunked_unchecked(&table_match, IsSorted::Not)
};
if !payload_rechunked {
// TODO: can avoid rechunk? We have to rechunk here or else we do it
// multiple times during the gather.
payload.rechunk_mut();
payload_rechunked = true;
}
let mut probe_df =
payload.take_slice_unchecked_impl(&probe_match, false);

Expand Down

0 comments on commit 3336475

Please sign in to comment.