Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 15 additions & 17 deletions cpp/src/arrow/util/future.h
Original file line number Diff line number Diff line change
Expand Up @@ -408,10 +408,11 @@ class ARROW_MUST_USE_TYPE Future {
///
/// Returns true if a callback was actually added and false if the callback failed
/// to add because the future was marked complete.
template <typename OnComplete>
bool TryAddCallback(const std::function<OnComplete()>& callback_factory) const {
template <typename CallbackFactory>
bool TryAddCallback(const CallbackFactory& callback_factory) const {
return impl_->TryAddCallback([this, &callback_factory]() {
return Callback<OnComplete>{WeakFuture<T>(*this), callback_factory()};
return Callback<detail::result_of_t<CallbackFactory()>>{WeakFuture<T>(*this),
callback_factory()};
});
}

Expand Down Expand Up @@ -691,25 +692,22 @@ Future<BreakValueType> Loop(Iterate iterate) {

auto control_fut = iterate();
while (true) {
if (control_fut.is_finished()) {
// There's no need to AddCallback on a finished future; we can
// CheckForTermination now. This also avoids recursion and potential stack
// overflow.
if (CheckForTermination(control_fut.result())) return;

control_fut = iterate();
} else {
std::function<Callback()> callback_factory = [this]() { return *this; };
if (control_fut.TryAddCallback(callback_factory)) {
break;
}
// Else we tried to add a callback but someone had stolen in and marked the
// future finished so we can just resume iteration
if (control_fut.TryAddCallback([this]() { return *this; })) {
// Adding a callback succeeded; control_fut was not finished
// and we must wait to CheckForTermination.
return;
}
// Adding a callback failed; control_fut was finished and we
// can CheckForTermination immediately. This also avoids recursion and potential
// stack overflow.
if (CheckForTermination(control_fut.result())) return;

control_fut = iterate();
}
}

Iterate iterate;

// If the future returned by control_fut is never completed then we will be hanging on
// to break_fut forever even if the listener has given up listening on it. Instead we
// rely on the fact that a producer (the caller of Future<>::Make) is always
Expand Down