Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 13 additions & 16 deletions cpp/src/arrow/util/future.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,8 @@ class FutureStorage : public FutureStorageBase {

Status status() const { return result_.status(); }

template <typename U>
void MarkFinished(U&& value) {
result_ = std::forward<U>(value);
void MarkFinished(Result<T> result) {
result_ = std::move(result);
if (ARROW_PREDICT_TRUE(result_.ok())) {
impl_->MarkFinished();
} else {
Expand Down Expand Up @@ -250,7 +249,7 @@ class Future {
// The default constructor creates an invalid Future. Use Future::Make()
// for a valid Future. This constructor is mostly for the convenience
// of being able to presize a vector of Futures.
Future() : impl_(NULLPTR) {}
Future() = default;

// Consumer API

Expand Down Expand Up @@ -311,6 +310,15 @@ class Future {
return impl_->Wait(seconds);
}

/// If a Result<Future> holds an error instead of a Future, construct a finished Future
/// holding that error.
static Future DeferNotOk(Result<Future> maybe_future) {
if (ARROW_PREDICT_FALSE(!maybe_future.ok())) {
return MakeFinished(std::move(maybe_future).status());
}
return std::move(maybe_future).MoveValueUnsafe();
}

// Producer API

/// \brief Producer API: execute function and mark Future finished
Expand Down Expand Up @@ -368,7 +376,7 @@ class Future {
}

std::shared_ptr<FutureStorage<T>> storage_;
FutureImpl* impl_;
FutureImpl* impl_ = NULLPTR;

friend class FutureWaiter;
};
Expand Down Expand Up @@ -419,15 +427,4 @@ inline std::vector<int> WaitForAny(const std::vector<Future<T>*>& futures,
return waiter->MoveFinishedFutures();
}

#define ARROW_ASSIGN_OR_RETURN_FUTURE_IMPL(result_name, lhs, T, rexpr) \
auto result_name = (rexpr); \
if (ARROW_PREDICT_FALSE(!(result_name).ok())) { \
return Future<T>::MakeFinished(std::move(result_name).status()); \
} \
lhs = std::move(result_name).MoveValueUnsafe();

#define ARROW_ASSIGN_OR_RETURN_FUTURE(lhs, T, rexpr) \
ARROW_ASSIGN_OR_RETURN_FUTURE_IMPL( \
ARROW_ASSIGN_OR_RAISE_NAME(_error_or_value, __COUNTER__), lhs, T, rexpr);

} // namespace arrow
4 changes: 1 addition & 3 deletions cpp/src/arrow/util/thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,8 @@ class ARROW_EXPORT Executor {
typename RT = typename detail::ExecutorResultTraits<FunctionRetType>,
typename ValueType = typename RT::ValueType>
Future<ValueType> SubmitAsFuture(Function&& func, Args&&... args) {
ARROW_ASSIGN_OR_RETURN_FUTURE(
auto future, ValueType,
return Future<ValueType>::DeferNotOk(
Submit(std::forward<Function>(func), std::forward<Args>(args)...));
return future;
}

// Return the level of parallelism (the number of tasks that may be executed
Expand Down