Skip to content

Conversation

@sundy-li
Copy link
Contributor

@sundy-li sundy-li commented Feb 28, 2021

  • Introduce limit: Option<usize> for sort functions, then we can use partial_sort to achieve better performance in queries with limit & sort. Datafusion can use it later if it can push up the limit to the sort plan (currently use None as default).

@github-actions
Copy link

@codecov-io
Copy link

codecov-io commented Feb 28, 2021

Codecov Report

Merging #9602 (06c503c) into master (5ae63f8) will increase coverage by 0.03%.
The diff coverage is 82.68%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master    #9602      +/-   ##
==========================================
+ Coverage   82.32%   82.36%   +0.03%     
==========================================
  Files         245      245              
  Lines       56383    56661     +278     
==========================================
+ Hits        46419    46668     +249     
- Misses       9964     9993      +29     
Impacted Files Coverage Δ
rust/arrow/src/compute/kernels/sort.rs 93.60% <82.62%> (+0.04%) ⬆️
rust/datafusion/src/physical_plan/sort.rs 90.22% <100.00%> (+0.05%) ⬆️
rust/arrow/src/datatypes/field.rs 55.47% <0.00%> (-0.66%) ⬇️
...integration-testing/src/flight_server_scenarios.rs 0.00% <0.00%> (ø)
...-testing/src/flight_server_scenarios/middleware.rs 0.00% <0.00%> (ø)
...ng/src/flight_server_scenarios/auth_basic_proto.rs 0.00% <0.00%> (ø)
...ng/src/flight_server_scenarios/integration_test.rs 0.00% <0.00%> (ø)
rust/arrow/src/csv/reader.rs 93.17% <0.00%> (+0.02%) ⬆️
rust/arrow/src/compute/kernels/cast.rs 97.33% <0.00%> (+0.03%) ⬆️
rust/arrow/src/datatypes/schema.rs 73.39% <0.00%> (+0.76%) ⬆️
... and 2 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 5ae63f8...06c503c. Read the comment docs.

@andygrove andygrove changed the title ARROW-11630: [Rust] Introduce limit option for sort kenerl ARROW-11630: [Rust] Introduce limit option for sort kernel Mar 1, 2021
Copy link
Member

@andygrove andygrove left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I took a quick look through and LGTM. I had a question about introducing the new dependency for sorting. We already have sorting in place so I'm curious why the new dependency is needed and whether we should look at updating other sorting code to use the same dependency?

Copy link
Member

@jorgecarleitao jorgecarleitao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also have the same questions as @andygrove . Also, I would be more favourable having a separate function for this: sorting is already quite a complex logic already.

Comment on lines 54 to 55
partial_sort = "0.1.1"
pdqsort = "1.0.3"
Copy link
Member

@jorgecarleitao jorgecarleitao Mar 1, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am -0 on this without any benchmarks justifying this. Can't we at least feature-gate this?

pub fn sort(
values: &ArrayRef,
options: Option<SortOptions>,
limit: Option<usize>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that this should be done on a separate function instead of adding another parameter to this one. It would also allow to feature-gate said function with the two new dependencies.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I will introduce partial_sort with limit parameter.

@sundy-li
Copy link
Contributor Author

sundy-li commented Mar 2, 2021

I removed pdqsort, because the performance didn't show any improvement during the benches in my pc.
Added a separate function partial_sort as @jorgecarleitao suggested.

I did not find any other partial_sort function in Rust like std::partial_sort in c++. So I created one.

And the benche results about partial_sort in arrow are:

sort 2^12                     time:   [753.58 us 755.43 us 758.14 us]
sort nulls 2^12               time:   [633.41 us 635.28 us 637.51 us]

sort 2^12 limit 10            time:   [49.246 us 49.820 us 50.667 us]
sort 2^12 limit 100           time:   [115.11 us 116.26 us 117.76 us]
sort 2^12 limit 1000          time:   [645.91 us 654.36 us 663.78 us]
sort nulls 2^12 limit 10      time:   [66.283 us 66.725 us 67.347 us]
sort nulls 2^12 limit 100     time:   [76.281 us 77.907 us 79.783 us]
sort nulls 2^12 limit 1000    time:   [258.98 us 260.32 us 262.24 us]


len = size.min(len);
}

// we are not using partial_sort here, because array is ArrayRef. Something is not working good in that.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does "not working good" mean here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, there is a bug in the previous version. It's fixed in the newest commit

Some(limit) => {
len = limit.min(len);
if !descending {
valids.partial_sort(len, |a, b| cmp(a.1, b.1));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does partial_sort have a fast path when len is the length of the array? The pattern in this block is repeated a few times, if we could just use partial_sort it could be simplified to a simple if let that updates len?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry not "fast path" but would the performance be the same as sort_by?

Copy link
Contributor Author

@sundy-li sundy-li Mar 4, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry not "fast path" but would the performance be the same as sort_by?

I'm afraid not. The sort function in rust is merge sort, it is slightly faster than the heap sort for larger sets to sort all elements.
But it requires twice the memory of the heap sort because of the second array.

sort 2^12               time:   [799.70 us 806.41 us 814.15 us]
sort 2^12 limit 2^12     time:   [1.2848 ms 1.3012 ms 1.3229 ms]

sort nulls 2^12         time:   [647.20 us 649.27 us 651.61 us]
sort nulls 2^12 limit 2^12   time:   [780.17 us 788.48 us 798.04 us]

We can make a new function to reduce the repeated patterns or make partial_sort fallback to default sort when limit equals to len.

   equals to len size, it will use merge sort by default.

2. update partial_sort version to 0.1.2
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @sundy-li

I found the idea of implementing "Top K" using a sort with limit option interesting -- thank you for pointing at the clickhouse implementation.

There are other, seemingly more memory efficient (and likely performant) ways, to compute the TopK than buffering the entire input and then doing a partial sort. For example, using a priority queue to keep only the top k values in memory.

That being said I am not opposed to adding a partial_sort to arrow if others would find it useful. One datapoint is that I took a look through the C++ arrow kernels and didn't see a limit (which is fine):
https://github.com/apache/arrow/blob/dfd232313e1538b81a38db1e59cf4a109b61a467/cpp/src/arrow/compute/kernels/vector_sort.cc

If we choose to add this feature, another suggest I have is to inline the partial_sort implementation into the arrow crate so that:

  • We can evaluate its test coverage / stability (given it has no other users yet I am not sure we can rely on the rest of the rust community to crowdsource this)
  • Avoid additional crate dependencies (mostly in the future if additional code is added to partial_sort).

I realize you have already gone through the trouble if creating a partial_crate but I think this is worth considering

Some(0),
Some(2),
])) as ArrayRef];
test_lex_sort_arrays(input, expected, Some(3));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here too I recommend using a limit that is less than the length of the array

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here input size is 4, expected size is 3.

)?;

// sort combined record batch
// TODO: pushup the limit expression to sort
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The way I have seen this done in other systems is a rewrite pass a (Sort) -> (Limit) combination into a new operator TopK

Copy link
Member

@jorgecarleitao jorgecarleitao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went through this PR, and also went skimmed through the code of partial_sort, including tests, benchmarks and the CI pipeline.

First of all, @sundy-li , thank you for your interest in this project, for this PR, and for the cool work on partial_sort 💯

With that said, and in summary, I am -0 on this atm.

On the one hand, there is a very good use case here as partial sort is a common operation. I think it is fine that we have different kernels as C++: there is no spec about that. I also think that the API makes sense, the code is simple and easy to understand, and the tests are sufficient.

On the other hand, we are adding a 10-day old crate as a mandatory dependency of the arrow crate. This dependency is also essentially a large unsafe block and has no miri on CI.

This is not to badmouth the implementation, @sundy-li, which I am fine, even though I did not go through all the details. I just feel that the sheer amount of uncommented unsafe on it is an indication that there is some opportunity to revisit and further improve the crate, thereby reducing the risks of undefined behavior (UB).

In summary, I wish to have this, but I sense a large risk wrt to UB.

imo, a first step here would be for partial_sort to mature and stabilize before we take it as a dependency. However, if others feel that the benefits already outweigh the risks, let's go for it.

#[inline]
fn sort_by<T, F>(array: &mut [T], limit: usize, cmp: F)
where
F: FnMut(&T, &T) -> Ordering,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does it need to be FnMut?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to be compatible with std::sort_by.

}

/// Sort elements from `ArrayRef` into an unsigned integer (`UInt32Array`) of indices.
/// For floating point arrays any NaN values are considered to be greater than any other non-null value
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The doc could be updated :)

@sundy-li
Copy link
Contributor Author

sundy-li commented Mar 7, 2021

For example, using a priority queue to keep only the top k values in memory.

Yes, but lots of codes may duplicate with sort kernel. partial_sort used priority queue inside. It maybe good to do sorting in pipeline OLAP systems

In ClickHouse, PartialSortingTransform(Each block in each thread) --> MergeSortingTransform(Blocks to one block in each thread) --> MergingSortedTransform(N Block in N Thread to one block) .

┌─explain────────────────────────────────┐
│ (Expression)                           │
│ ExpressionTransform                    │
│   (Limit)                              │
│   Limit                                │
│     (MergingSorted)                    │
│     MergingSortedTransform 16 → 1      │
│       (MergeSorting)                   │
│       MergeSortingTransform × 16       │
│         (PartialSorting)               │
│         LimitsCheckingTransform × 16   │
│           PartialSortingTransform × 16 │
│             (Expression)               │
│             ExpressionTransform × 16   │
│               (SettingQuotaAndLimits)  │
│                 (ReadFromStorage)      │
│                 NumbersMt × 16 0 → 1   │
└────────────────────────────────────────┘

I just feel that the sheer amount of uncommented unsafe on it is an indication that there is some opportunity to revisit and further improve the crate, thereby reducing the risks of undefined behavior (UB).

@alamb @jorgecarleitao Thanks for all your reviews. I also have the consideration about unsafe codes in partial_sort may break arrow, because it was just created, without any used in production(BTW I am new to rust).

We can keep this MR open currently until you think it's safe enough or must have it.

@Dandandan
Copy link
Contributor

I think there are some very nice use-cases of this optimization (top-k kind of queries), I think it would be very cool to have more advanced features like this in Arrow/DataFusion/...

The partial_sort crate seems also quite small (about 60 lines without tests/imports). What about having the code in Arrow for now, and (try to) limit the amount of unsafety @sundy-li and we can have a next review iteration?

@alamb
Copy link
Contributor

alamb commented Mar 7, 2021

In ClickHouse, PartialSortingTransform(Each block in each thread)

This is a very cool idea (effectively to compute top-k for each block in parallel and then merge them together). Thank you for the explanation @sundy-li

@jorgecarleitao
Copy link
Member

Thanks @sundy-li for the explanation.

The partial_sort crate seems also quite small (about 60 lines without tests/imports). What about having the code in Arrow for now, and (try to) limit the amount of unsafety @sundy-li and we can have a next review iteration?

I do not follow the argument. Isn't the whole purpose of crates and dependencies to separate concerns and allow code reuse across the ecosystem without dragging dependencies?

From what I've seen, the crate:

  • addresses one problem and one alone (unlike our repo or our workspace)
  • has CI and CD in place
  • it is easy to publish in crates.io (via a github action) which makes CD also easy
  • it is duo license, as it is the standard in Rust
  • it uses SemVer

If I were @sundy-li, I would keep the setup exactly like it is: it enables the implementation to be used by more projects without having to drag arrow and its dependencies with it, while at the same time maintains the generality that the algorithm itself has. I would be behaving exactly how @sundy-li is behaving: engage with potential "customers" like Arrow to identify value, how it can be adopted, and what are the points to improve.

From our side, I am not sure I agree with copy-pasting code from a crate to our own crate. IMO we should instead actively engage with our "providers" by contributing upstream. In this particular case, one step is to have MIRI run on CI. The other is to either limit or document the usage of unsafe. This way, the Rust community as a whole benefits from these, instead of the benefit be limited to users of the Arrow crate. I created a PR upstream to address one of the steps: sundy-li/partial_sort#1 .

Side note: @alamb , what just happened (on which I could trivially PR upstream without any complications, issues, etc.) is a concrete example of what I have been trying to express on the mailing list wrt to the UX that IMO our repo lacks: I can't do that on this repo.

@Dandandan
Copy link
Contributor

@jorgecarleitao

I'm perfectly fine with that approach too.

I do think that depending on crates that are useful instead of building our own or even duplicating code is what we should do in general, and indeed will benefit others in the Rust community.

My reasoning behind my suggestion above was

  • The crate has no currently no users and (I think) only 1 maintainer.
  • The code size is small
  • The PR author is the same as the maintainer of the crate (and I guess it was created for this use case for now?)
  • It would be easier to review the code / run Miri checks, etc. when it was included in the PR

Of course, there might be some other parties that would be interested in partial_sort or maybe it will at some point make it to the standard library.

Also dependencies have some costs, some of them being:

  • Has some risk that the crated will become unmaintained at some point, but causes a bug / security problem, etc. in Arrow. Or we would like to improve it, but a crate maintainer might be unresponsive. A recent example of this I saw in the memmap crate which now has a fork memmap2 which is quite hard to find as the docs / repo of memmap are still online.
  • Non-semver breaking changes can not be caught in PRs
  • Build times become larger compared to having the same code in the project (as the crate will be downloaded / compiled separately). Of course one extra small dependency won't matter that much, but it accumulates with lots of dependencies.

@jorgecarleitao
Copy link
Member

Thanks for that, @Dandandan, all great points 👍, to which I generally agree with.

It would be easier to review the code / run Miri checks, etc. when it was included in the PR

I agree. My concern atm is that our own crate does not pass miri checks (our miri checks are cargo miri test || true), thus, any potential soundness issues new code may have cannot be identified by our own CI. I only recently was able to have a version of a arrow crate to pass miri checks, and I am still deactivating some checks and investigating. My hypothesis is that it is very difficult to write safe code in this crate's design and thus my state of mind is that our best bet is to push the soundness checks about specific parts of the code to places where those can be done in a more contained environment (i.e. without all the transmutes and the like that we do here). We should still of course try to fix our own miri checks. xD

Build times become larger compared to having the same code in the project (as the crate will be downloaded / compiled separately). Of course one extra small dependency won't matter that much, but it accumulates with lots of dependencies.

I agree. And I think that this is already a problem for DataFusion and parquet. I do get the feeling that the culprits there are crossbeam, tokio, tower and the like. These small crates seem to be downloaded and compiled in parallel and thus have a small impact when compared to the big beasts that block compilation (i.e. we have unbalanced-sized tasks). In arrows' case, the largest is flatbuffer I think, which we could feature-gate under e.g. io_ipc in case people do not need that. We can't get away with these during testing, but I think it is by design; if we want, IMO we need a new compilation unit (e.g. a crate).

Has some risk that the crated will become unmaintained at some point, but causes a bug / security problem, etc. in Arrow. Or we would like to improve it, but a crate maintainer might be unresponsive. A recent example of this I saw in the memmap crate which now has a fork memmap2 which is quite hard to find as the docs / repo of memmap are still online.

I agree. That IMO should be sufficient for us to bring the crate to arrow.

@alamb
Copy link
Contributor

alamb commented Mar 7, 2021

From our side, I am not sure I agree with copy-pasting code from a crate to our own crate. IMO we should instead actively engage with our "providers" by contributing upstream.

@jorgecarleitao as you and @Dandandan have elucidated, I think there are tradeoffs to the two approaches. I am happy with either approach in general -- I suggested the "include it in arrow approach" in this case as I felt it had less overhead for @sundy-li who has already worked on this PR a great deal, but that may not be the best approach either short or long term.

@sundy-li sundy-li closed this Mar 8, 2021
@sundy-li sundy-li reopened this Mar 8, 2021
@sundy-li
Copy link
Contributor Author

sundy-li commented Mar 12, 2021

Hi, all. From other user's advice, partial_sort could be

pub fn partial_sort<T, F>(v: &mut [T], limit: usize, mut is_less: F)
where
    F: FnMut(&T, &T) -> Ordering,
{
    let (before, _mid, _after) = v.select_nth_unstable_by(limit, &mut is_less);
    before.sort_unstable_by(is_less);
}

So I abandon to use of custom partial_sort. This could fix the issue and risk we are arguing about.

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this PR is looking good now. Given the lengthy discussion I would like at least one other person (e.g @jorgecarleitao @paddyhoran @andygrove ) to give their signoff on this PR as well

Thank you for sticking with it @sundy-li and I really like how this PR has shaped up.

F: FnMut(&T, &T) -> Ordering,
{
let (before, _mid, _after) = v.select_nth_unstable_by(limit, &mut is_less);
before.sort_unstable_by(is_less);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am wondering, why can we use unstable sort here, while we use stable sort for the normal sort kernel?
Shouldn't that be an option of the kernel?

Copy link
Contributor Author

@sundy-li sundy-li Mar 14, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are some reasons for using unstable_sort.

  1. select_nth_unstable_by is unstable, so using before.sort_stable_by don't ensure it's stable. Currently we do not have select_nth_stable_by
  2. sort_unstable is faster than sort_stable, refer to doc

Copy link
Contributor

@Dandandan Dandandan Mar 14, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clear, thanks @sundy-li.

Maybe some docs can be added about this?

I think it probably is not really a problem, but might be surprising by someone using the kernel. And we might want to see whether we could use a (faster) unstable version of the normal sort kernel as well (that could be used by DataFusion as well I think).

What do you think @alamb

@alamb
Copy link
Contributor

alamb commented Mar 15, 2021

Any last thoughts prior to merge? I'll plan to merge this later today unless I hear otherwise

@alamb alamb closed this in 2a30cd0 Mar 16, 2021
@alamb
Copy link
Contributor

alamb commented Mar 16, 2021

Merged. Thanks again @sundy-li for getting this done. It is a cool piece of functionality

@alamb
Copy link
Contributor

alamb commented Mar 16, 2021

BTW when I was writing some doc examples for this functionality, it occured to me that there is another potential way to expose this functionality -- #9722

alamb added a commit that referenced this pull request Mar 18, 2021
# Rationale
Examples in the docs serve as both tests and makes the library easier to use

I figured this would be helpful the wake of the very helpful #9602 by @sundy-li

I think I am in a documenting kind of mood this week

Closes #9721 from alamb/alamb/sort_doc_examples

Authored-by: Andrew Lamb <[email protected]>
Signed-off-by: Andrew Lamb <[email protected]>
alamb added a commit to apache/arrow-rs that referenced this pull request Apr 20, 2021
# Rationale
Examples in the docs serve as both tests and makes the library easier to use

I figured this would be helpful the wake of the very helpful apache/arrow#9602 by @sundy-li

I think I am in a documenting kind of mood this week

Closes #9721 from alamb/alamb/sort_doc_examples

Authored-by: Andrew Lamb <[email protected]>
Signed-off-by: Andrew Lamb <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

7 participants