-
Notifications
You must be signed in to change notification settings - Fork 4k
ARROW-11883: [C++] Add ConcatMap, MergeMap, and an async-reentrant version of Map #9643
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
a2d49e3 to
020658b
Compare
cpp/src/arrow/util/async_generator.h
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this max readahead or max subscriptions (as the implementation itself calls it?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Max subscriptions. I fixed up the naming. Maybe it's not the best name as subscriptions comes from Rx where there are actual "subscriptions" but the concept is pretty much the same.
b51ac8c to
07d6455
Compare
lidavidm
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. One comment about the SerialReadaheadGenerator, which is probably a misunderstanding on my part.
cpp/src/arrow/util/async_generator.h
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm. In the case that max_readahead == 1, the first Callback will run and won't enqueue anything. Then the next time the caller gets a future from the generator, the readahead queue will be empty, even though the generator isn't finished.
But if we instead had last_available > 0, then we'd accidentally make spaces_available negative when the callback runs. I think we have to check last_available before decrementing it, which should be okay since it's not accessed concurrently. Not only that, but the check and decrement should be done in Pump(), which is what actually adds to the queue - otherwise you'd have the same problem again when operator() calls Pump().
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And as written, max_readahead > 1 is safe, but will enqueue one fewer item than declared - maybe this is all intentional and I'm misunderstanding the intent here, given the SerialReadaheadSlowConsumer test below which explicitly checks the current behavior (num_delivered == 3 whereas I would expect 4). But if that's the case maybe we should enforce that max_readahead > 1, or add 1 to the given value.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There might be an off-by-1 here that I'll test out tomorrow. I agree that max_readahead == 1 should mean the first callback pumps. It's maybe as simple as starting spaces_available with max_readahead + 1 (in which case spaces_available would be interpreted as the slots in the queue as well as the last item we returned to the caller, in other words, all requests currently in "flight"). This means max_readahead = 1 would allow you to have 2 requests in flight at once but I think that is what I would expect.
However, for your second paragraph, spaces_available IS accessed concurrently. It can be accessed by the caller (the end consuming the generator) as well as the producer (the end marking the futures complete and running the callbacks/continuations). I'm pretty sure it is correct that the caller end operator()() does not decrement spaces_available.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch. It was indeed an off-by-one. There were two issues, spaces_available was not accounting for the "in-flight" request and ProducerConsumerQueue actually has size-1 "usable" slots (which is a little misleading, size refers to the size in memory and not the functional size).
bce1671 to
f310733
Compare
|
This seems to be based on another PR? |
cpp/src/arrow/util/type_fwd.h
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it worth maintaining arrow/util/type_fwd.h? Maybe it can be inlined into arrow/type_fwd.h. In any case, it can wait for follow up
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just my 2 cents, but for readability it seems better not to dump everything in a single header.
cpp/src/arrow/util/async_generator.h
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| Future<> AwaitAsyncGenerator(AsyncGenerator<T> generator) { | |
| Future<> DiscardAllFromAsyncGenerator(AsyncGenerator<T> generator) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not the biggest fan of discard either as it implies to me the generator is doing nothing but I agree that "await" isn't very clear. I've switched to discard. This is used in the Scanner::ToTable method because the callbacks are side-effecting a table builder and after they've all run we can get a table out of the builder.
13cb4fe to
e320b7a
Compare
bkietz
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
|
@westonpace could you resolve merge conflicts? |
c33fb15 to
1b61012
Compare
|
@bkietz Rebased |
|
These failures are from the Github Actions blackout. The rest of the jobs seem to be stuck. Feel free to restart them. I fear adding more commits would just add fuel to the issue. |
| TEST(StlUtilTest, VectorUnwrapOrRaise) { | ||
| // FIXME There should be an easier way to construct these vectors | ||
| std::vector<Result<MoveOnlyDataType>> all_good; | ||
| all_good.push_back(Result<MoveOnlyDataType>(MoveOnlyDataType(1))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just FTR, perhaps all_good.emplace_back(MoveOnlyDataType(1)) works?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've opened westonpace#6 for this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! I've moved this into ARROW-11998 and #9734 and converted this from a FIXME to a TODO.
20550db to
1906369
Compare
b38825e to
f3368ec
Compare
bkietz
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks!
pitrou
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, just one comment
…async datasets Left some debug code in Lint Another compiler warning caught by CI Fixing a few more compiler warnings from CI Fixed a bug in map-reentrant test case Removing ... because I seem to remember them causing trouble on Windows in the past Cleaned up a few test utils and made one small change to the mapping generator that seems to avoid Windows SEH Lint Minor comment miss First pass of addressing PR comments Renamed MergeMap to Merged and ConcatMap to Concatenated per PR. Split iterator_test.cc into iterator_test and async_generator_test Removed unneeded template arg from IsIterationEnd Lint Missed one spot for removing template arg The readahead generator was reading 1 fewer ahead than it should Addressing PR comments Removing forward declaration of AsyncGenerator Create note for TODO follow-up Merged generator was not properly passing through failure Removing unused header that was introducing DCHECK and preventing async_generator from being public Removing type_fwd declaration of AsyncGenerator. It shouldn't have been part of this PR anyways and we removed it later Changed MapVector so it can accept function pointers Refined SleepABit to take less time on Windows systems by doing a pseudo-busy-wait Accidentally added windows_compatibility to header and not cc file Added the new SleepABit to the map stress test as well Try a rediculous timeout to rule out deadlock on appveyor builds
57dbc5e to
8415634
Compare
… behavior. These changes should be removed before merging
…appveyor behavior. These changes should be removed before merging" This reverts commit c65e669.
|
Assuming remaining jobs pass (Travis looks like it is backlogged ~12 hours) or are unrelated, please merge. |
These items can all stand on their own and they are used by the async datasets conversion.
MergeMap - Given AsyncGenerator<AsyncGenerator> return AsyncGenerator. This method flattens a generator of generators into a generator of items. It may reorder the items.
ConcatMap - Same as MergeMap but it will only pull items from one inner subscription at a time. This reduced parallelism allows items to be returned in-order.
Async-reentrant Map - In some cases the map function is slow. Even if the source is not async-reentrant this map can still be async-reentrant by allowing multiple instances of the map function to run at once. The resulting mapped generator is async reentrant but it will not pull reentrantly from the source.
Vector utilities - In order to make migrating from Iterator code to vector code easier I added some map style utilities. These copy the vectors (where an iterator wouldn't) so some care should be taken but they can still be useful.
Moved Future/AsyncGenerator into top level type_fwd. This is needed for the RecordBatchGenerator alias in the same way Iterator is needed at the top level.
Added
IsEndtoIterationTraits. This allows non-comparable types to be iterated on. It allows us to create an AsyncGenerator<AsyncGenerator> since AsyncGenerator is std::function and we can use an empty instance as an end token even though std::function is not comaprable.