[FEAT] Add left/right/anti/semi joins to native executor #2743

Merged
merged 22 commits into from
Sep 19, 2024

@colin-ho colin-ho commented Aug 27, 2024

Implement left/right/anti/semi joins in the native executor.

  • Left/Right: Build probe table on the opposite side, e.g. for left joins build the probe table on the right side. During the probing phase, if there is no match, add a null row.

  • Anti/Semi: Build probe table on the right side. During the probing phase, emit the row for anti joins if there is no match, and for semi joins if there is a match.
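
The two probing strategies described above can be sketched as follows. This is a minimal illustration, not the PR's implementation: it assumes join keys are plain `i64` values with no nulls, and the function names `left_join_probe` and `anti_semi_probe` are hypothetical.

```rust
use std::collections::{HashMap, HashSet};

/// Left join: build on the right side, probe with the left rows.
/// Returns (left_row_idx, Option<right_row_idx>); `None` stands for the null row.
pub fn left_join_probe(left_keys: &[i64], right_keys: &[i64]) -> Vec<(usize, Option<usize>)> {
    // Build phase: key -> every right row index holding that key.
    let mut table: HashMap<i64, Vec<usize>> = HashMap::new();
    for (r_idx, key) in right_keys.iter().enumerate() {
        table.entry(*key).or_default().push(r_idx);
    }
    // Probe phase: emit one output row per match, or a null row if none.
    let mut out = Vec::new();
    for (l_idx, key) in left_keys.iter().enumerate() {
        match table.get(key) {
            Some(matches) => out.extend(matches.iter().map(|&r_idx| (l_idx, Some(r_idx)))),
            None => out.push((l_idx, None)),
        }
    }
    out
}

/// Anti/semi join: build a key set on the right side, probe with the left rows.
/// Semi keeps left rows that have a match; anti keeps those that do not.
pub fn anti_semi_probe(left_keys: &[i64], right_keys: &[i64], is_semi: bool) -> Vec<usize> {
    // Only existence matters here, so no row indices are stored.
    let set: HashSet<i64> = right_keys.iter().copied().collect();
    left_keys
        .iter()
        .enumerate()
        .filter(|(_, key)| set.contains(*key) == is_semi)
        .map(|(l_idx, _)| l_idx)
        .collect()
}
```

Because anti and semi joins only need to know whether a key exists, the build side can drop row indices entirely, which is the distinction the review discussion below revolves around.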

Ran DAFT_ENABLE_NATIVE_EXECUTOR=1 pytest tests/dataframe/test_joins.py with SMJ/broadcast + Outer joins skipped.
[Screenshot of the test run, 2024-08-29]

@github-actions github-actions bot added the enhancement New feature or request label Aug 27, 2024

codspeed-hq bot commented Aug 27, 2024

CodSpeed Performance Report

Merging #2743 will degrade performances by 46.77%

Comparing colin/swordfish-joins (a28249e) with main (7666669)

Summary

⚡ 1 improvements
❌ 1 regressions
✅ 14 untouched benchmarks

⚠️ Please fix the performance issues or acknowledge them on CodSpeed.

Benchmarks breakdown

| Benchmark | main | colin/swordfish-joins | Change |
| --- | --- | --- | --- |
| test_count[1 Small File] | 10.2 ms | 19.2 ms | -46.77% |
| test_explain[100 Small Files] | 38.1 ms | 32.9 ms | +15.68% |


codecov bot commented Aug 30, 2024

Codecov Report

Attention: Patch coverage is 96.06481% with 17 lines in your changes missing coverage. Please review.

Project coverage is 65.98%. Comparing base (7666669) to head (a28249e).
Report is 1 commits behind head on main.

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| src/daft-table/src/probeable/probe_table.rs | 81.57% | 7 Missing ⚠️ |
| ...-execution/src/intermediate_ops/hash_join_probe.rs | 97.74% | 3 Missing ⚠️ |
| src/daft-table/src/probeable/probe_set.rs | 97.29% | 3 Missing ⚠️ |
| .../src/intermediate_ops/anti_semi_hash_join_probe.rs | 96.36% | 2 Missing ⚠️ |
| src/daft-local-execution/src/pipeline.rs | 95.74% | 2 Missing ⚠️ |
Additional details and impacted files

@@            Coverage Diff             @@
##             main    #2743      +/-   ##
==========================================
+ Coverage   65.27%   65.98%   +0.71%     
==========================================
  Files        1005     1008       +3     
  Lines      113099   113417     +318     
==========================================
+ Hits        73820    74836    +1016     
+ Misses      39279    38581     -698     
| Files with missing lines | Coverage Δ |
| --- | --- |
| .../daft-local-execution/src/sinks/hash_join_build.rs | 95.23% <100.00%> (+95.23%) ⬆️ |
| src/daft-table/src/lib.rs | 90.55% <ø> (+1.85%) ⬆️ |
| src/daft-table/src/probeable/mod.rs | 100.00% <100.00%> (ø) |
| .../src/intermediate_ops/anti_semi_hash_join_probe.rs | 96.36% <96.36%> (ø) |
| src/daft-local-execution/src/pipeline.rs | 91.24% <95.74%> (+36.38%) ⬆️ |
| ...-execution/src/intermediate_ops/hash_join_probe.rs | 97.41% <97.74%> (+97.41%) ⬆️ |
| src/daft-table/src/probeable/probe_set.rs | 97.29% <97.29%> (ø) |
| src/daft-table/src/probeable/probe_table.rs | 93.70% <81.57%> (ø) |

... and 50 files with indirect coverage changes

@colin-ho colin-ho changed the title [FEAT] Add other joins to native executor [FEAT] Add left/right/anti/semi joins to native executor Aug 30, 2024
@colin-ho colin-ho marked this pull request as ready for review August 30, 2024 05:05
Comment on lines 306 to 309
pub enum ProbeTable {
    WithIdx(ProbeTableWithIdx),
    WithoutIdx(ProbeTableWithoutIdx),
}
Contributor Author

@samster25 Gonna keep thinking on this, but wanted to ask if you have any opinions on how to allow for probe tables with different values? Right now I have this as an enum at the probe table level, but it's not the best: there's now quite a bit of duplicate code, and on the prober side it opens up more room for error, since the prober now has to get the appropriate probe table type based on the join type.

Member

Yeah, I'm not a fan of this either. Here's what I'm thinking as an alternative.
Since the API for building (add_table) will be the same, we can centralize that in a trait.

fn make_probe_table_builder(schema: SchemaRef, track_indices: bool) -> Box<dyn ProbeTableBuilder> {}

pub trait ProbeTableBuilder {
  fn add_table(&mut self, table: &Table) -> DaftResult<()>;

  fn build(&mut self) -> Box<dyn Probeable>;
}


// Both methods should work if we are tracking indices, but if we disabled it, only the latter should work.
pub trait Probeable {
  fn probe_indices(&self, table: &Table) -> Iterator<...>;

  fn probe_exists(&self, table: &Table) -> Iterator<bool>;
}
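
The proposal above is pseudocode (for example, `Iterator<...>` is not a concrete return type). A compilable sketch of the same shape might look like the following. It is only an illustration under simplifying assumptions: keys are plain `i64` slices instead of `Table`s, the `schema` argument is dropped, results are collected into `Vec`s rather than returned as iterators, errors are `String`s, and the struct names (`IndexedTable`, `KeySet`, `IndexedBuilder`, `SetBuilder`) are hypothetical.

```rust
use std::collections::{HashMap, HashSet};

/// Something that can be probed after the build phase.
pub trait Probeable {
    /// Matching build-side row indices per input key (needs tracked indices).
    fn probe_indices(&self, keys: &[i64]) -> Result<Vec<Vec<usize>>, String>;
    /// Whether each input key exists on the build side.
    fn probe_exists(&self, keys: &[i64]) -> Vec<bool>;
}

/// Shared build API, regardless of whether indices are tracked.
pub trait ProbeTableBuilder {
    fn add_table(&mut self, keys: &[i64]);
    fn build(self: Box<Self>) -> Box<dyn Probeable>;
}

struct IndexedTable(HashMap<i64, Vec<usize>>);
struct KeySet(HashSet<i64>);

impl Probeable for IndexedTable {
    fn probe_indices(&self, keys: &[i64]) -> Result<Vec<Vec<usize>>, String> {
        Ok(keys.iter().map(|k| self.0.get(k).cloned().unwrap_or_default()).collect())
    }
    fn probe_exists(&self, keys: &[i64]) -> Vec<bool> {
        keys.iter().map(|k| self.0.contains_key(k)).collect()
    }
}

impl Probeable for KeySet {
    fn probe_indices(&self, _keys: &[i64]) -> Result<Vec<Vec<usize>>, String> {
        Err("probe_indices is unavailable: built with track_indices = false".to_string())
    }
    fn probe_exists(&self, keys: &[i64]) -> Vec<bool> {
        keys.iter().map(|k| self.0.contains(k)).collect()
    }
}

#[derive(Default)]
struct IndexedBuilder { map: HashMap<i64, Vec<usize>>, next_row: usize }
#[derive(Default)]
struct SetBuilder { set: HashSet<i64> }

impl ProbeTableBuilder for IndexedBuilder {
    fn add_table(&mut self, keys: &[i64]) {
        for key in keys {
            // Row indices are global across all added tables in this sketch.
            self.map.entry(*key).or_default().push(self.next_row);
            self.next_row += 1;
        }
    }
    fn build(self: Box<Self>) -> Box<dyn Probeable> {
        let this = *self;
        Box::new(IndexedTable(this.map))
    }
}

impl ProbeTableBuilder for SetBuilder {
    fn add_table(&mut self, keys: &[i64]) {
        self.set.extend(keys.iter().copied());
    }
    fn build(self: Box<Self>) -> Box<dyn Probeable> {
        let this = *self;
        Box::new(KeySet(this.set))
    }
}

/// Pick the builder based on whether the join needs row indices.
pub fn make_probe_table_builder(track_indices: bool) -> Box<dyn ProbeTableBuilder> {
    if track_indices {
        Box::new(IndexedBuilder::default())
    } else {
        Box::new(SetBuilder::default())
    }
}
```

Note the trade-off this makes visible: the set-backed implementation can only fail at runtime when `probe_indices` is called, which is the weakness of the abstraction raised later in the review.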

Member

@samster25 samster25 left a comment

Left an initial review with some ideas for how to organize the code better. LMK what you think.

pub fn probe<'a>(
    &'a self,
    right: &'a Table,
) -> DaftResult<impl Iterator<Item = (u64, bool)> + 'a> {
Member

I don't think you need to return the index here, since it should map 1:1 with the right input.

num_rows: usize,
}

impl ProbeTableWithoutIdx {
Member

It may make more sense to call this a ProbeSet rather than ProbeTableWithoutIdx.

Member

@kevinzwang kevinzwang left a comment

Largely, it looks super clean!

I'm wondering why you don't just create a separate intermediate op and sink for the anti and semi hash joins. I think that might actually reduce the amount of code, since we would not need the boilerplate for Probeable. The Probeable trait doesn't seem to be a great abstraction to me, since the two things that implement it implement different subsets of the methods, and ProbeTable::probe_exists is unused. Moreover, having separate ops means that we can enforce via types that anti/semi joins take in a ProbeSet and the other joins take in a ProbeTable.

Another comment about that ^ is that the inner and left/right joins have some logic for creating the final table by taking the appropriate columns from the left and right sides, which can be shared better if anti/semi joins are separated out.

Comment on lines 238 to 247
let (build_on, probe_on, build_child, probe_child, build_on_left) = match join_type {
    JoinType::Inner => (left_on, right_on, left, right, true),
    JoinType::Right => (left_on, right_on, left, right, true),
    JoinType::Left | JoinType::Anti | JoinType::Semi => {
        (right_on, left_on, right, left, false)
    }
    JoinType::Outer => {
        unimplemented!("Outer join not supported yet");
    }
};
Member

Small nit, but it might be cleaner to have a match just for build_on_left and then set build_on, build_child, etc. based on the build_on_left value. Also, I would separate the left join match arm from the anti and semi ones for readability.
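
One way to read this suggestion is sketched below. It is a hypothetical restructuring, not the code that was merged: the `JoinType` enum is redeclared so the snippet is self-contained, the helper names `build_on_left` and `assign_sides` are invented for illustration, and the unsupported outer join is reported as an `Err` rather than via `unimplemented!` as in the snippet under review.

```rust
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum JoinType {
    Inner,
    Left,
    Right,
    Anti,
    Semi,
    Outer,
}

/// Step 1: decide only which side the probe table is built on.
pub fn build_on_left(join_type: JoinType) -> Result<bool, String> {
    match join_type {
        JoinType::Inner | JoinType::Right => Ok(true),
        // Left joins build on the opposite (right) side.
        JoinType::Left => Ok(false),
        // Anti/semi joins always build on the right side; kept as a separate arm for readability.
        JoinType::Anti | JoinType::Semi => Ok(false),
        JoinType::Outer => Err("Outer join not supported yet".to_string()),
    }
}

/// Step 2: derive (build, probe) from (left, right) using that single flag.
/// Works for the join keys and for the child pipelines alike.
pub fn assign_sides<T>(left: T, right: T, build_on_left: bool) -> (T, T) {
    if build_on_left {
        (left, right)
    } else {
        (right, left)
    }
}
```

With this split, the tuple of five values in the original match collapses into one decision plus two calls to `assign_sides`, so the left/right ordering cannot drift between `build_on` and `build_child`.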

) -> DaftResult<Arc<MicroPartition>> {
    if let HashJoinProbeState::ReadyToProbe(probe_table, tables) = self {
        let _growables = info_span!("HashJoinOperator::build_growables").entered();

        // Left should only be created once per probe table
        let mut left_growable =
        let mut build_side_growable =
Member

@kevinzwang kevinzwang Sep 18, 2024

I'm curious what the performance difference is between using a growable vs storing indices in a vec and then doing table.take(idx). I'm not sure why one would be faster than another, just wondering if you knew.

Member

Also why are the capacities set to 20 initially? Especially for the left and right joins, where we know at least how large the tables will be.

Contributor Author

Haven't actually tested the performance, but my theory is that the growable will have a smaller memory footprint than doing a take. The reason is that with the take method we'd have to do a concat first, which could be quite costly, whereas with the growable we can elide this concat and leave the Vec as is.

We should definitely test this, but since we already see pretty good performance with the growable method, it's probably not a huge priority.

Contributor Author

And yes, you're right, we should set the capacity for left/right joins.

})
}

fn probe<'a>(&'a self, right: &'a Table) -> DaftResult<impl Iterator<Item = bool> + 'a> {
Member

Rename right to input to match the join op naming convention

Comment on lines 72 to 81
h == other.hash && {
    let l_idx = other.idx;
    let l_table_idx = (l_idx >> Self::TABLE_IDX_SHIFT) as usize;
    let l_row_idx = (l_idx & Self::LOWER_MASK) as usize;

    let l_table = self.tables.get(l_table_idx).unwrap();

    let left_refs = l_table.0.as_slice();

    (self.compare_fn)(left_refs, &right_arrays, l_row_idx, r_idx).is_eq()
Member

There's some kind of complicated logic here that's also repeated in add_table. It would be nice to have a better abstraction over it
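
One possible abstraction is a small newtype that owns the packing and unpacking, so neither the probe path nor add_table touches shifts and masks directly. The sketch below is an assumption-laden illustration: `PackedIdx` is a hypothetical name, and the shift width of 36 bits is chosen arbitrarily since the actual values of TABLE_IDX_SHIFT and LOWER_MASK are not shown in this thread.

```rust
/// A (table index, row index) pair packed into one u64.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct PackedIdx(u64);

impl PackedIdx {
    // Assumed width: the low 36 bits hold the row index, the high 28 bits the table index.
    const TABLE_IDX_SHIFT: u32 = 36;
    const LOWER_MASK: u64 = (1u64 << Self::TABLE_IDX_SHIFT) - 1;

    /// Pack the pair, returning `None` if either part does not fit.
    pub fn new(table_idx: usize, row_idx: usize) -> Option<Self> {
        let (t, r) = (table_idx as u64, row_idx as u64);
        if r > Self::LOWER_MASK || t >= (1u64 << (64 - Self::TABLE_IDX_SHIFT)) {
            return None;
        }
        Some(Self((t << Self::TABLE_IDX_SHIFT) | r))
    }

    /// Index of the table the row lives in.
    pub fn table_idx(self) -> usize {
        (self.0 >> Self::TABLE_IDX_SHIFT) as usize
    }

    /// Row index within that table.
    pub fn row_idx(self) -> usize {
        (self.0 & Self::LOWER_MASK) as usize
    }
}
```

Callers would then write `idx.table_idx()` and `idx.row_idx()` in both places, and the overflow check lives in exactly one constructor.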

Comment on lines 300 to 323
JoinType::Inner => state.probe_inner(
    input,
    &self.probe_on,
    &self.common_join_keys,
    &self.left_non_join_columns,
    &self.right_non_join_columns,
    self.build_on_left,
),
JoinType::Left | JoinType::Right => state.probe_left_right(
    input,
    &self.probe_on,
    &self.common_join_keys,
    &self.left_non_join_columns,
    &self.right_non_join_columns,
    self.join_type == JoinType::Left,
),
JoinType::Semi | JoinType::Anti => state.probe_anti_semi(
    input,
    &self.probe_on,
    self.join_type == JoinType::Semi,
),
JoinType::Outer => {
    unimplemented!("Outer join is not yet implemented")
}
Member

Not important, but why do we have the actual probing logic inside the state instead of the operator?

Contributor Author

No compelling reason. I think it's safe to move the logic to the operator side and keep the state free of logic.

@colin-ho
Contributor Author

> Largely, it looks super clean!
>
> I'm wondering why you do not just create a separate intermediate op and sink for the anti and semi hash joins, I think that might actually reduce the amount of code since we would not need the boilerplate for Probeable. The Probeable trait doesn't seem to be a great abstraction to me, since the two things that implement it implement different subsets of the methods, and ProbeTable::probe_exists is unused. Moreover, having separate ops means that we can enforce via types that anti/semi joins take in a ProbeSet and the other joins take in a ProbeTable.
>
> Another comment about that ^ is that the inner and left/right joins actually have some logic for actually creating the final table by taking the appropriate columns from the left and right sides that can be shared better if anti/semi joins are separated out.

Makes sense, will separate the anti/semi joins out.

On Probeable: it was essentially made to allow the build operator to send 'some kind of thing that can be probed' to the probe operator via the PipelineChannel (the channel that connects operators). Given that this is kind of confusing, maybe it makes sense for the PipelineChannel to be dedicated solely to sending data, and for the probe tables / sets to be sent via some other mechanism. I'll give this a bit more thought and come up with something.

Member

@samster25 samster25 left a comment

Reviewed the probeable code and that looks good! Seems like @kevinzwang already reviewed the local-execution code, which also doesn't look that different from before.

}

fn probe_indices<'a>(&'a self, _right: &'a Table) -> DaftResult<IndicesMapper<'a>> {
    panic!("Not implemented")
Member

We should throw a more informative error message!
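
As a rough illustration of what a more informative failure could look like, the sketch below returns a descriptive error instead of panicking. Everything here is hypothetical: `ProbeError` and its variant are invented for the example, the `ProbeSet` struct is a stand-in, and the input is an `i64` slice rather than a `Table`. The real code would presumably use the project's own `DaftResult` error type, whose variants are not shown in this thread.

```rust
use std::fmt;

/// Hypothetical error type for unsupported probe operations.
#[derive(Debug, PartialEq)]
pub enum ProbeError {
    IndicesNotTracked { probeable: &'static str },
}

impl fmt::Display for ProbeError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ProbeError::IndicesNotTracked { probeable } => write!(
                f,
                "probe_indices is not supported on {probeable}: it stores keys only, \
                 without row indices; use probe_exists instead"
            ),
        }
    }
}

impl std::error::Error for ProbeError {}

/// Stand-in for a key-only probe structure.
pub struct ProbeSet;

impl ProbeSet {
    /// Says what went wrong and what to call instead, rather than panicking.
    pub fn probe_indices(&self, _input: &[i64]) -> Result<Vec<usize>, ProbeError> {
        Err(ProbeError::IndicesNotTracked { probeable: "ProbeSet" })
    }
}
```

Since the signature under review already returns a `DaftResult`, returning an `Err` fits the existing shape better than a panic and lets the caller surface the message.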

@colin-ho colin-ho enabled auto-merge (squash) September 19, 2024 17:03
@colin-ho colin-ho merged commit 7ee5fda into main Sep 19, 2024
37 checks passed
@colin-ho colin-ho deleted the colin/swordfish-joins branch September 19, 2024 17:11
kevinzwang added a commit that referenced this pull request Sep 19, 2024
Also adds some functionality to the benchmarking code for manual runs

Blocked by #2743 for compatibility with new executor