Conversation

@andygrove (Member) commented Dec 21, 2020

This PR adds support for the repartition operator, plumbed through from the DataFrame API all the way to execution.

The TPC-H file conversion utility in the benchmark crate has been updated to take advantage of this new operator.

I can break this down into smaller PRs if that helps.


@codecov-io commented Dec 21, 2020

Codecov Report

Merging #8982 (aabdf94) into master (0519c4c) will decrease coverage by 0.10%.
The diff coverage is 50.78%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master    #8982      +/-   ##
==========================================
- Coverage   82.64%   82.54%   -0.11%     
==========================================
  Files         200      201       +1     
  Lines       49730    49983     +253     
==========================================
+ Hits        41098    41256     +158     
- Misses       8632     8727      +95     
Impacted Files Coverage Δ
rust/benchmarks/src/bin/tpch.rs 0.00% <0.00%> (ø)
rust/datafusion/src/execution/dataframe_impl.rs 93.47% <0.00%> (-2.80%) ⬇️
rust/datafusion/src/logical_plan/builder.rs 88.26% <0.00%> (-1.84%) ⬇️
...datafusion/src/optimizer/hash_build_probe_order.rs 58.42% <0.00%> (-0.67%) ⬇️
...t/datafusion/src/optimizer/projection_push_down.rs 97.70% <ø> (ø)
rust/datafusion/src/optimizer/utils.rs 58.71% <0.00%> (-3.05%) ⬇️
rust/datafusion/src/physical_plan/planner.rs 77.46% <0.00%> (-3.00%) ⬇️
rust/datafusion/src/logical_plan/plan.rs 82.73% <5.55%> (-5.39%) ⬇️
rust/datafusion/src/physical_plan/mod.rs 87.80% <50.00%> (+0.62%) ⬆️
rust/datafusion/src/physical_plan/repartition.rs 76.00% <76.00%> (ø)
... and 12 more

Continue to review full report at Codecov.

Legend
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 0519c4c...aabdf94. Read the comment docs.

@alamb (Contributor) left a comment

I like where this is headed @andygrove -- 👍

Contributor:

Not that you asked, but if I had to pick two of these three schemes to implement, I would pick RoundRobinBatch and Hash and leave RoundRobinRow until later.

The rationale is that I theorize the RoundRobinRow use case is much less common (e.g. maybe re-evening the output of joins or filters), and I would expect most operators to respect the requested batch size, if possible, when creating their output.

Member (author):

Thanks. That makes sense and I have removed RoundRobinRow now.

Contributor:

Agree. I was trying to implement the RoundRobinRow functionality independently and was going down a route similar to the StructBuilder vector-of-builders approach: https://github.com/apache/arrow/blob/master/rust/arrow/src/array/builder.rs#L1600. Staying at the RecordBatch level is much more sensible.

Contributor:

It seems to me that the biggest buffer we would want is the total number of cores available for processing. Any larger and we are just wasting memory and cache if the producer can create batches faster than the consumer can consume them.
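A minimal sketch of the sizing suggested here, assuming the async-channel and num_cpus crates (both assumptions, not taken from this PR):

```rust
use arrow::record_batch::RecordBatch;

fn bounded_output_channel() -> (
    async_channel::Sender<RecordBatch>,
    async_channel::Receiver<RecordBatch>,
) {
    // Cap in-flight batches at the number of cores: a producer that outruns
    // its consumer blocks instead of buffering batches without limit.
    async_channel::bounded(num_cpus::get())
}
```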

Member (author):

So this turned out to be the really challenging part. Input partitions send to multiple output partitions, but those output partitions may be read in order, and this results in deadlocks if the buffer is too small. I switched to unbounded channels for now to make this functional, but I know this isn't a great solution. I need to sleep on this and have another look tomorrow.
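A rough sketch of the interim workaround described here (the crate choice and names are assumptions): one unbounded channel per output partition, so an input partition can always send regardless of the order in which outputs are drained.

```rust
use arrow::record_batch::RecordBatch;

type Channel = (
    async_channel::Sender<RecordBatch>,
    async_channel::Receiver<RecordBatch>,
);

fn output_channels(num_output_partitions: usize) -> Vec<Channel> {
    // Unbounded channels never apply backpressure, so a sender can make
    // progress even while some outputs are not yet being read. The cost is
    // potentially unbounded memory use, which is the trade-off noted above.
    (0..num_output_partitions)
        .map(|_| async_channel::unbounded())
        .collect()
}
```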

Member (author):

I am adding unit tests now that will be easily modifiable to demonstrate this issue.

Contributor:

I would expect the deadlock problem to be most acute when trying to keep the data sorted (e.g. a traditional merge). I didn't think we had any operators like that (yet) in DataFusion.

Maybe we need to use try_recv when reading from channels rather than recv, so as not to block on empty channels.

When we do actually have something that is trying to keep the data sorted, the behavior you want is "keep producing until every output channel has at least one record batch".

Using round-robin repartitioning, you can probably avoid unbounded channels. Using hash repartitioning, however, I don't think there is any way in general to ensure you have evenly distributed rows.
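A sketch of the non-blocking read pattern suggested above, using crossbeam's try_recv (the channel crate is an assumption here, and the helper name is hypothetical):

```rust
use crossbeam::channel::{Receiver, TryRecvError};

// Drain whatever is currently buffered on one partition's channel without
// blocking, so the caller can move on to other partitions that can produce.
fn drain_ready<T>(rx: &Receiver<T>) -> Vec<T> {
    let mut ready = Vec::new();
    loop {
        match rx.try_recv() {
            Ok(item) => ready.push(item),
            // Nothing buffered right now; come back later.
            Err(TryRecvError::Empty) => break,
            // The producer side has finished and the channel is drained.
            Err(TryRecvError::Disconnected) => break,
        }
    }
    ready
}
```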

@github-actions github-actions bot added the needs-rebase A PR that needs to be rebased by the author label Dec 22, 2020
@andygrove (Member, author):

@alamb @jorgecarleitao I have not implemented a poll_next method before and am struggling a bit with this. I have not given up yet, but if you happen to have time today, maybe you could give me some advice.

I already have a Receiver (from async-channel), which is also supposed to implement Stream, so I thought it would be trivial to connect these together, but it is not. I am very likely missing something obvious here, though.
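For reference, a minimal sketch of what delegating poll_next to such a receiver might look like, assuming async-channel 1.x (whose Receiver implements Stream) and futures 0.3; all names here are illustrative:

```rust
use std::pin::Pin;
use std::task::{Context, Poll};

use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
use futures::Stream;

struct RepartitionStream {
    input: async_channel::Receiver<ArrowResult<RecordBatch>>,
}

impl Stream for RepartitionStream {
    type Item = ArrowResult<RecordBatch>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // async_channel::Receiver is Unpin and implements Stream, so we can
        // simply forward the poll to it.
        Pin::new(&mut self.get_mut().input).poll_next(cx)
    }
}
```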

@andygrove (Member, author):

@alamb @jorgecarleitao Never mind... switching to crossbeam did the trick.

@github-actions github-actions bot removed the needs-rebase A PR that needs to be rebased by the author label Dec 22, 2020
@andygrove andygrove changed the title ARROW-10582: [Rust] [DataFusion] Implement "repartition" operator [WIP] ARROW-10582: [Rust] [DataFusion] Implement "repartition" operator Dec 22, 2020
@andygrove (Member, author):

@alamb @jorgecarleitao @seddonm1 @Dandandan This is ready for review now

```rust
let mut rx = self.rx.lock().await;

let num_input_partitions = self.input.output_partitioning().partition_count();
let num_output_partition = self.partitioning.partition_count();
```
Contributor:

Bikeshedding, but renaming num_output_partition to num_output_partitions would help readability.

```rust
let mut counter = 0;
while let Some(result) = stream.next().await {
    match partitioning {
        Partitioning::RoundRobinBatch(_) => {
```
Contributor:

Hash partitioning is not yet implemented here?

@andygrove (Member, author) commented Dec 22, 2020:

No. I filed https://issues.apache.org/jira/browse/ARROW-11011 to implement hash partitioning as a separate PR since it will be quite a lot of work.

RepartitionExec::try_new returns a DataFusionError::NotImplemented error if you try to create it with the hash partitioning scheme.
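A sketch of roughly what such a guard could look like (the helper name and exact signature are assumptions; only the error type and variant are named in this comment):

```rust
use datafusion::error::{DataFusionError, Result};
use datafusion::physical_plan::Partitioning;

// Hypothetical helper: reject schemes the operator cannot execute yet.
fn check_supported(partitioning: &Partitioning) -> Result<()> {
    match partitioning {
        Partitioning::RoundRobinBatch(_) => Ok(()),
        other => Err(DataFusionError::NotImplemented(format!(
            "Repartitioning scheme not yet supported: {:?}",
            other
        ))),
    }
}
```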

Contributor:

Ok 👍 makes sense!

Member:

I have some old hash repartitioning code in a branch from a previous try. It is quite old by now, but I can definitely put it together for this (like I did for the join). I think we now have the framework in place to use it.

@andygrove andygrove marked this pull request as ready for review December 22, 2020 22:49
@jorgecarleitao (Member) left a comment

Great work, @andygrove. Good as is. Left minor comments.


```rust
fn repartition(
    &self,
    partitioning_scheme: Partitioning,
```
Member:

Nit: this introduces a new name, partitioning_scheme.

We have:

  • partition
  • partitioning
  • partitioning_scheme
  • repartition
  • part

I do not know the common notation, but we could try to reduce the number of different names we use.

In my (little) understanding:

  • data is partitioned according to a partition
  • partitioned data is divided in parts
  • we can repartition it according to a new partition.

In this understanding, I would replace partitioning and partitioning_scheme by partition.

Even if this understanding is not correct, maybe we could reduce the number of different names?

Contributor:

I agree keeping the number of different names low is important

I suggest using

  • partition to refer to an actual portion of the data (in a bunch of RecordBatches)
  • partitioning to refer to the "schema" of how the data is divided into partitions (the use of the Partitioning scheme now)

Thus, we would repartition the data into a new partitioning.


```rust
/// let mut ctx = ExecutionContext::new();
/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
/// let df1 = df.repartition(Partitioning::RoundRobinBatch(4))?;
/// let df2 = df.repartition(Partitioning::Hash(vec![col("a")], 4))?;
```
Member:

I would not place Hash in an example since we do not support it yet.

```rust
pub enum Partitioning {
    /// Allocate batches using a round-robin algorithm
    RoundRobinBatch(usize),
    /// Allocate rows based on a hash of one or more expressions
```
Member:

Document usize? (Number of parts?)

Contributor:

Maybe also add a comment here that Hash partitioning is not yet completely implemented, so as to avoid runtime disappointment for someone who sees this enum in the code.

@alamb (Contributor) left a comment

This is really nicely done @andygrove 👍



```rust
// partitions to be blocked when sending data to output receivers that are not
// being read yet. This may cause high memory usage if the next operator is
// reading output partitions in order rather than concurrently. One workaround
// for this would be to add spill-to-disk capabilities.
```
Contributor:

I think the other workaround is to ensure that any operator that reads from multiple partitions doesn't block waiting for data from one partition's channel if other partitions can produce data.

With that invariant, the only operators that would need spill-to-disk would be ones that maintain sortedness (e.g. a classic merge).

@andygrove (Member, author):

Thanks for the reviews. I have pushed changes to address feedback:

  • Made variable names more consistent
  • Documented that Partitioning::Hash is not yet supported (with link to JIRA issue)
  • Updated the enum documentation to mention that the usize represents the number of partitions
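For illustration, a sketch of what the documented enum might look like after these changes (the exact doc wording is a guess, not copied from the PR; Expr is assumed to come from datafusion::logical_plan):

```rust
use datafusion::logical_plan::Expr;

pub enum Partitioning {
    /// Allocate batches using a round-robin algorithm; the `usize` is the
    /// number of output partitions
    RoundRobinBatch(usize),
    /// Allocate rows based on a hash of one or more expressions, with the
    /// `usize` giving the number of output partitions. Not yet supported;
    /// see ARROW-11011
    Hash(Vec<Expr>, usize),
}
```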

@andygrove andygrove closed this in 2f5d592 Dec 24, 2020
alamb added a commit that referenced this pull request Mar 3, 2021
…void potential deadlocks

# Rationale

As spotted / articulated by @edrevo in #9523 (comment), the intermixing of `crossbeam` channels (which are not designed for `async` and can block task threads) and `async` code such as DataFusion can lead to deadlock.

At least one of the crossbeam uses predates DataFusion being async (e.g. the one in the parquet reader). The use of crossbeam in the repartition operator in #8982 may have resulted from re-use of the same pattern.

# Changes

1. Removes the use of crossbeam channels from DataFusion (in `RepartitionExec` and `ParquetExec`) and replaces them with tokio channels, which are designed for use in async code (sketched below).
2. Removes the `crossbeam` dependency entirely.
3. Removes the use of the `multi_thread` executor in tests (e.g. `#[tokio::test(flavor = "multi_thread")]`), which can mask hangs.
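A minimal sketch of the async-friendly channel pattern change (1) describes, assuming tokio 1.x; the payload and names are illustrative:

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // An unbounded tokio channel: send() never blocks the thread, and
    // recv().await yields to the executor instead of parking it, which is
    // what avoids the kind of deadlock described above.
    let (tx, mut rx) = mpsc::unbounded_channel::<String>();

    tokio::spawn(async move {
        tx.send("batch".to_string()).unwrap();
        // Dropping tx here closes the channel, ending the recv loop below.
    });

    while let Some(msg) = rx.recv().await {
        println!("received: {}", msg);
    }
}
```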

# Kudos / Thanks

This PR incorporates the work of @seddonm1 from #9603 and @edrevo in https://github.com/edrevo/arrow/tree/remove-crossbeam (namely 97c256c4f76b8185311f36a7b27e317588904a3a). A big thanks to both of them for their help in this endeavor.

Closes #9605 from alamb/alamb/remove_hang

Lead-authored-by: Ximo Guanter <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
Co-authored-by: Mike Seddon <[email protected]>
Signed-off-by: Andrew Lamb <[email protected]>