diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index eaa6b325cc6..bf4b6d94787 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -189,7 +189,6 @@ set(ARROW_SRCS util/future.cc util/int_util.cc util/io_util.cc - util/iterator.cc util/logging.cc util/key_value_metadata.cc util/memory.cc diff --git a/cpp/src/arrow/csv/column_decoder.cc b/cpp/src/arrow/csv/column_decoder.cc index c57477ef59d..1dd13bc9086 100644 --- a/cpp/src/arrow/csv/column_decoder.cc +++ b/cpp/src/arrow/csv/column_decoder.cc @@ -84,7 +84,7 @@ class ConcreteColumnDecoder : public ColumnDecoder { auto chunk_index = next_chunk_++; WaitForChunkUnlocked(chunk_index); // Move Future to avoid keeping chunk alive - return std::move(chunks_[chunk_index]).result(); + return chunks_[chunk_index].MoveResult(); } protected: diff --git a/cpp/src/arrow/csv/reader.cc b/cpp/src/arrow/csv/reader.cc index 544098caf54..142104a0ea8 100644 --- a/cpp/src/arrow/csv/reader.cc +++ b/cpp/src/arrow/csv/reader.cc @@ -918,11 +918,11 @@ class AsyncThreadedTableReader ARROW_ASSIGN_OR_RAISE(auto istream_it, io::MakeInputStreamIterator(input_, read_options_.block_size)); - ARROW_ASSIGN_OR_RAISE(auto bg_it, - MakeBackgroundGenerator(std::move(istream_it), thread_pool_)); + ARROW_ASSIGN_OR_RAISE(auto bg_it, MakeBackgroundGenerator(std::move(istream_it))); + bg_it = TransferGenerator(std::move(bg_it), thread_pool_); int32_t block_queue_size = thread_pool_->GetCapacity(); - auto rh_it = AddReadahead(bg_it, block_queue_size); + auto rh_it = AddReadahead(std::move(bg_it), block_queue_size); buffer_generator_ = CSVBufferIterator::MakeAsync(std::move(rh_it)); return Status::OK(); } @@ -957,7 +957,7 @@ class AsyncThreadedTableReader return Status::OK(); }; - return VisitAsyncGenerator(block_generator, block_visitor) + return VisitAsyncGenerator(std::move(block_generator), block_visitor) .Then([self](...) -> Future<> { // By this point we've added all top level tasks so it is safe to call // FinishAsync diff --git a/cpp/src/arrow/csv/reader_test.cc b/cpp/src/arrow/csv/reader_test.cc index d8c210672e0..594bc5046a7 100644 --- a/cpp/src/arrow/csv/reader_test.cc +++ b/cpp/src/arrow/csv/reader_test.cc @@ -62,6 +62,27 @@ void StressTableReader(TableReaderFactory reader_factory) { } } +void StressInvalidTableReader(TableReaderFactory reader_factory) { + const int NTASKS = 100; + const int NROWS = 1000; + ASSERT_OK_AND_ASSIGN(auto table_buffer, MakeSampleCsvBuffer(NROWS, false)); + + std::vector>> task_futures(NTASKS); + for (int i = 0; i < NTASKS; i++) { + auto input = std::make_shared(table_buffer); + ASSERT_OK_AND_ASSIGN(auto reader, reader_factory(input)); + task_futures[i] = reader->ReadAsync(); + } + auto combined_future = All(task_futures); + combined_future.Wait(); + + ASSERT_OK_AND_ASSIGN(std::vector>> results, + combined_future.result()); + for (auto&& result : results) { + ASSERT_RAISES(Invalid, result); + } +} + void TestNestedParallelism(std::shared_ptr thread_pool, TableReaderFactory reader_factory) { const int NROWS = 1000; @@ -82,82 +103,77 @@ void TestNestedParallelism(std::shared_ptr thread_pool, ASSERT_EQ(table->num_rows(), NROWS); } // namespace csv -TEST(SerialReaderTests, Stress) { - auto task_factory = [](std::shared_ptr input_stream) { +TableReaderFactory MakeSerialFactory() { + return [](std::shared_ptr input_stream) { + auto read_options = ReadOptions::Defaults(); + read_options.block_size = 1 << 10; + read_options.use_threads = false; return TableReader::Make(default_memory_pool(), io::AsyncContext(), input_stream, - ReadOptions::Defaults(), ParseOptions::Defaults(), + read_options, ParseOptions::Defaults(), ConvertOptions::Defaults()); }; - StressTableReader(task_factory); } +TEST(SerialReaderTests, Stress) { StressTableReader(MakeSerialFactory()); } +TEST(SerialReaderTests, StressInvalid) { StressInvalidTableReader(MakeSerialFactory()); } TEST(SerialReaderTests, NestedParallelism) { ASSERT_OK_AND_ASSIGN(auto thread_pool, internal::ThreadPool::Make(1)); - auto task_factory = [](std::shared_ptr input_stream) { - return TableReader::Make(default_memory_pool(), io::AsyncContext(), input_stream, - ReadOptions::Defaults(), ParseOptions::Defaults(), - ConvertOptions::Defaults()); - }; - TestNestedParallelism(thread_pool, task_factory); + TestNestedParallelism(thread_pool, MakeSerialFactory()); } -TEST(ThreadedReaderTests, Stress) { - ASSERT_OK_AND_ASSIGN(auto thread_pool, internal::ThreadPool::Make(1)); - auto task_factory = [&thread_pool](std::shared_ptr input_stream) - -> Result> { +Result CreateThreadedFactory() { + ARROW_ASSIGN_OR_RAISE(auto thread_pool, internal::ThreadPool::Make(1)); + return [thread_pool](std::shared_ptr input_stream) + -> Result> { ReadOptions read_options = ReadOptions::Defaults(); read_options.use_threads = true; + read_options.block_size = 1 << 10; read_options.legacy_blocking_reads = true; auto table_reader = TableReader::Make( default_memory_pool(), io::AsyncContext(thread_pool.get()), input_stream, read_options, ParseOptions::Defaults(), ConvertOptions::Defaults()); return table_reader; }; - StressTableReader(task_factory); } -// Simulates deadlock that exists with ThreadedReaderTests -// TEST(ThreadedReaderTests, NestedParallelism) { -// ASSERT_OK_AND_ASSIGN(auto thread_pool, internal::ThreadPool::Make(1)); -// auto task_factory = [&thread_pool](std::shared_ptr input_stream) -// -> Result> { -// ReadOptions read_options = ReadOptions::Defaults(); -// read_options.use_threads = true; -// read_options.legacy_blocking_reads = true; -// auto table_reader = TableReader::Make( -// default_memory_pool(), io::AsyncContext(thread_pool.get()), input_stream, -// read_options, ParseOptions::Defaults(), ConvertOptions::Defaults()); -// return table_reader; -// }; -// TestNestedParallelism(thread_pool, task_factory); -// } +TEST(ThreadedReaderTests, Stress) { + ASSERT_OK_AND_ASSIGN(auto factory, CreateThreadedFactory()); + StressTableReader(factory); +} +TEST(ThreadedReaderTests, StressInvalid) { + ASSERT_OK_AND_ASSIGN(auto factory, CreateThreadedFactory()); + StressInvalidTableReader(factory); +} -TEST(AsyncReaderTests, Stress) { - ASSERT_OK_AND_ASSIGN(auto thread_pool, internal::ThreadPool::Make(1)); - auto task_factory = [&thread_pool](std::shared_ptr input_stream) - -> Result> { +Result MakeAsyncFactory( + std::shared_ptr thread_pool = nullptr) { + if (!thread_pool) { + ARROW_ASSIGN_OR_RAISE(thread_pool, internal::ThreadPool::Make(1)); + } + return [thread_pool](std::shared_ptr input_stream) + -> Result> { ReadOptions read_options = ReadOptions::Defaults(); read_options.use_threads = true; + read_options.block_size = 1 << 10; auto table_reader = TableReader::Make( default_memory_pool(), io::AsyncContext(thread_pool.get()), input_stream, read_options, ParseOptions::Defaults(), ConvertOptions::Defaults()); return table_reader; }; - StressTableReader(task_factory); } +TEST(AsyncReaderTests, Stress) { + ASSERT_OK_AND_ASSIGN(auto table_factory, MakeAsyncFactory()); + StressTableReader(table_factory); +} +TEST(AsyncReaderTests, StressInvalid) { + ASSERT_OK_AND_ASSIGN(auto table_factory, MakeAsyncFactory()); + StressInvalidTableReader(table_factory); +} TEST(AsyncReaderTests, NestedParallelism) { ASSERT_OK_AND_ASSIGN(auto thread_pool, internal::ThreadPool::Make(1)); - auto task_factory = [&thread_pool](std::shared_ptr input_stream) - -> Result> { - ReadOptions read_options = ReadOptions::Defaults(); - read_options.use_threads = true; - auto table_reader = TableReader::Make( - default_memory_pool(), io::AsyncContext(thread_pool.get()), input_stream, - read_options, ParseOptions::Defaults(), ConvertOptions::Defaults()); - return table_reader; - }; - TestNestedParallelism(thread_pool, task_factory); + ASSERT_OK_AND_ASSIGN(auto table_factory, MakeAsyncFactory(thread_pool)); + TestNestedParallelism(thread_pool, table_factory); } } // namespace csv diff --git a/cpp/src/arrow/csv/test_common.cc b/cpp/src/arrow/csv/test_common.cc index f60555b5a9d..c3d0241aa38 100644 --- a/cpp/src/arrow/csv/test_common.cc +++ b/cpp/src/arrow/csv/test_common.cc @@ -91,14 +91,23 @@ static void WriteRow(std::ostream& writer, size_t row_index) { writer << GetCell(strptime_rows, row_index); writer << std::endl; } + +static void WriteInvalidRow(std::ostream& writer, size_t row_index) { + writer << "\"" << std::endl << "\""; + writer << std::endl; +} } // namespace -Result> MakeSampleCsvBuffer(size_t num_rows) { +Result> MakeSampleCsvBuffer(size_t num_rows, bool valid) { std::stringstream writer; WriteHeader(writer); for (size_t i = 0; i < num_rows; ++i) { - WriteRow(writer, i); + if (i == num_rows / 2 && !valid) { + WriteInvalidRow(writer, i); + } else { + WriteRow(writer, i); + } } auto table_str = writer.str(); diff --git a/cpp/src/arrow/csv/test_common.h b/cpp/src/arrow/csv/test_common.h index b0c471d1b1b..823cf643fa0 100644 --- a/cpp/src/arrow/csv/test_common.h +++ b/cpp/src/arrow/csv/test_common.h @@ -47,7 +47,7 @@ ARROW_TESTING_EXPORT void MakeColumnParser(std::vector items, std::shared_ptr* out); ARROW_TESTING_EXPORT -Result> MakeSampleCsvBuffer(size_t num_rows); +Result> MakeSampleCsvBuffer(size_t num_rows, bool valid = true); } // namespace csv } // namespace arrow diff --git a/cpp/src/arrow/json/reader.cc b/cpp/src/arrow/json/reader.cc index dc0d6e04d11..232a64f212c 100644 --- a/cpp/src/arrow/json/reader.cc +++ b/cpp/src/arrow/json/reader.cc @@ -29,6 +29,7 @@ #include "arrow/json/parser.h" #include "arrow/record_batch.h" #include "arrow/table.h" +#include "arrow/util/async_iterator.h" #include "arrow/util/iterator.h" #include "arrow/util/logging.h" #include "arrow/util/string_view.h" diff --git a/cpp/src/arrow/testing/gtest_util.h b/cpp/src/arrow/testing/gtest_util.h index 19529152606..e8060ba7ef4 100644 --- a/cpp/src/arrow/testing/gtest_util.h +++ b/cpp/src/arrow/testing/gtest_util.h @@ -144,18 +144,26 @@ } \ } while (false) -#define ASSERT_FINISHES_OK(fut) \ +#define ASSERT_FINISHES_OK(expr) \ do { \ - ASSERT_TRUE(fut.Wait(2)); \ - if (!fut.is_finished()) { \ + auto&& _fut = (expr); \ + ASSERT_TRUE(_fut.Wait(2)); \ + if (!_fut.is_finished()) { \ FAIL() << "Future did not finish in a timely fashion"; \ } \ - auto _st = fut.status(); \ + auto _st = _fut.status(); \ if (!_st.ok()) { \ FAIL() << "'" ARROW_STRINGIFY(expr) "' failed with " << _st.ToString(); \ } \ } while (false) +#define ASSERT_FINISHES_ERR(ENUM, expr) \ + do { \ + auto&& fut = (expr); \ + ASSERT_FINISHES_IMPL(fut); \ + ASSERT_RAISES(ENUM, fut.status()); \ + } while (false) + #define ASSERT_FINISHES_OK_AND_ASSIGN_IMPL(lhs, rexpr, future_name) \ auto future_name = (rexpr); \ ASSERT_FINISHES_IMPL(future_name); \ diff --git a/cpp/src/arrow/util/async_iterator.h b/cpp/src/arrow/util/async_iterator.h index b78eaf686d2..0b93b769324 100644 --- a/cpp/src/arrow/util/async_iterator.h +++ b/cpp/src/arrow/util/async_iterator.h @@ -89,77 +89,92 @@ Future> CollectAsyncGenerator(AsyncGenerator generator) { template class TransformingGenerator { - public: - explicit TransformingGenerator(AsyncGenerator generator, - Transformer transformer) - : finished_(), - last_value_(), - generator_(std::move(generator)), - transformer_(std::move(transformer)) {} - - Future operator()() { - while (true) { - auto maybe_next_result = Pump(); - if (!maybe_next_result.ok()) { - return Future::MakeFinished(maybe_next_result.status()); - } - auto maybe_next = std::move(maybe_next_result).ValueUnsafe(); - if (maybe_next.has_value()) { - return Future::MakeFinished(*std::move(maybe_next)); - } - - auto next_fut = generator_(); - // If finished already, process results immediately inside the loop to avoid stack - // overflow - if (next_fut.is_finished()) { - auto next_result = next_fut.result(); - if (next_result.ok()) { - last_value_ = *std::move(next_result); - } else { - return Future::MakeFinished(next_result.status()); + // The transforming generator state will be referenced as an async generator but will + // also be referenced via callback to various futures. If the async generator owner + // moves it around we need the state to be consistent for future callbacks. + struct TransformingGeneratorState + : std::enable_shared_from_this { + TransformingGeneratorState(AsyncGenerator generator, Transformer transformer) + : generator_(std::move(generator)), + transformer_(std::move(transformer)), + last_value_(), + finished_() {} + + Future operator()() { + while (true) { + auto maybe_next_result = Pump(); + if (!maybe_next_result.ok()) { + return Future::MakeFinished(maybe_next_result.status()); } - // Otherwise, if not finished immediately, add callback to process results - } else { - return next_fut.Then([this](const Result& next_result) { + auto maybe_next = std::move(maybe_next_result).ValueUnsafe(); + if (maybe_next.has_value()) { + return Future::MakeFinished(*std::move(maybe_next)); + } + + auto next_fut = generator_(); + // If finished already, process results immediately inside the loop to avoid stack + // overflow + if (next_fut.is_finished()) { + auto next_result = next_fut.result(); if (next_result.ok()) { - last_value_ = *std::move(next_result); - return (*this)(); + last_value_ = *next_result; } else { return Future::MakeFinished(next_result.status()); } - }); + // Otherwise, if not finished immediately, add callback to process results + } else { + auto self = this->shared_from_this(); + return next_fut.Then([self](const Result& next_result) { + if (next_result.ok()) { + self->last_value_ = *next_result; + return (*self)(); + } else { + return Future::MakeFinished(next_result.status()); + } + }); + } } } - } - protected: - // See comment on TransformingIterator::Pump - Result> Pump() { - if (!finished_ && last_value_.has_value()) { - ARROW_ASSIGN_OR_RAISE(TransformFlow next, transformer_(*last_value_)); - if (next.ReadyForNext()) { - if (*last_value_ == IterationTraits::End()) { + // See comment on TransformingIterator::Pump + Result> Pump() { + if (!finished_ && last_value_.has_value()) { + ARROW_ASSIGN_OR_RAISE(TransformFlow next, transformer_(*last_value_)); + if (next.ReadyForNext()) { + if (*last_value_ == IterationTraits::End()) { + finished_ = true; + } + last_value_.reset(); + } + if (next.Finished()) { finished_ = true; } - last_value_.reset(); - } - if (next.Finished()) { - finished_ = true; + if (next.HasValue()) { + return next.Value(); + } } - if (next.HasValue()) { - return next.Value(); + if (finished_) { + return IterationTraits::End(); } + return util::nullopt; } - if (finished_) { - return IterationTraits::End(); - } - return util::nullopt; - } - bool finished_; - util::optional last_value_; - AsyncGenerator generator_; - Transformer transformer_; + AsyncGenerator generator_; + Transformer transformer_; + util::optional last_value_; + bool finished_; + }; + + public: + explicit TransformingGenerator(AsyncGenerator generator, + Transformer transformer) + : state_(std::make_shared(std::move(generator), + std::move(transformer))) {} + + Future operator()() { return (*state_)(); } + + protected: + std::shared_ptr state_; }; template @@ -167,13 +182,15 @@ class ReadaheadGenerator { public: ReadaheadGenerator(AsyncGenerator source_generator, int max_readahead) : source_generator_(std::move(source_generator)), max_readahead_(max_readahead) { - auto finished = std::make_shared(); + auto finished = std::make_shared>(); mark_finished_if_done_ = [finished](const Result& next_result) { if (!next_result.ok()) { - *finished = true; + finished->store(true); } else { const auto& next = *next_result; - *finished = (next == IterationTraits::End()); + if (next == IterationTraits::End()) { + *finished = true; + } } }; finished_ = std::move(finished); @@ -191,7 +208,7 @@ class ReadaheadGenerator { // Pop one and add one auto result = readahead_queue_.front(); readahead_queue_.pop(); - if (*finished_) { + if (finished_->load()) { readahead_queue_.push(Future::MakeFinished(IterationTraits::End())); } else { auto back_of_queue = source_generator_(); @@ -207,7 +224,7 @@ class ReadaheadGenerator { std::function&)> mark_finished_if_done_; // Can't use a bool here because finished may be referenced by callbacks that // outlive this class - std::shared_ptr finished_; + std::shared_ptr> finished_; std::queue> readahead_queue_; }; @@ -237,94 +254,129 @@ AsyncGenerator TransformAsyncGenerator(AsyncGenerator generator, return TransformingGenerator(generator, transformer); } -namespace detail { - +/// \brief Transfers execution of the generator onto the given executor +/// +/// This generator is async-reentrant if the source generator is async-reentrant template -struct BackgroundGeneratorPromise : ReadaheadPromise { - ~BackgroundGeneratorPromise() override {} - - explicit BackgroundGeneratorPromise(Iterator* it) : it_(it) {} - - bool Call() override { - auto next = it_->Next(); - auto finished = next == IterationTraits::End(); - out_.MarkFinished(std::move(next)); - return finished; - } +class TransferringGenerator { + public: + explicit TransferringGenerator(AsyncGenerator source, internal::Executor* executor) + : source_(std::move(source)), executor_(executor) {} - void End() override { out_.MarkFinished(IterationTraits::End()); } + Future operator()() { return executor_->Transfer(source_()); } - Iterator* it_; - Future out_ = Future::Make(); + private: + AsyncGenerator source_; + internal::Executor* executor_; }; -} // namespace detail +/// \brief Transfers a future to an underlying executor. +/// +/// Continuations run on the returned future will be run on the given executor +/// if they cannot be run synchronously. +/// +/// This is often needed to move computation off I/O threads or other external +/// completion sources and back on to the CPU executor so the I/O thread can +/// stay busy and focused on I/O +/// +/// Keep in mind that continuations called on an already completed future will +/// always be run synchronously and so no transfer will happen in that case. +template +AsyncGenerator TransferGenerator(AsyncGenerator source, + internal::Executor* executor) { + return TransferringGenerator(std::move(source), executor); +} /// \brief Async generator that iterates on an underlying iterator in a -/// separate thread. +/// separate executor. /// /// This generator is async-reentrant template class BackgroundGenerator { - using PromiseType = typename detail::BackgroundGeneratorPromise; - public: - explicit BackgroundGenerator(Iterator it, internal::Executor* executor) - : it_(new Iterator(std::move(it))), - queue_(new detail::ReadaheadQueue(0)), - executor_(executor), - done_() {} + explicit BackgroundGenerator(Iterator it, + std::shared_ptr background_executor) + : background_executor_(std::move(background_executor)) { + task_ = + Task{std::make_shared>(std::move(it)), std::make_shared(false)}; + } ~BackgroundGenerator() { - if (queue_) { - // Make sure the queue doesn't call any promises after this object - // is destroyed. - queue_->EnsureShutdownOrDie(); - } + // The thread pool will be disposed of automatically. By default it will not wait + // so the background thread may outlive this object. That should be ok. Any task + // objects in the thread pool are copies of task_ and have their own shared_ptr to + // the iterator. } ARROW_DEFAULT_MOVE_AND_ASSIGN(BackgroundGenerator); ARROW_DISALLOW_COPY_AND_ASSIGN(BackgroundGenerator); Future operator()() { - if (done_) { - return Future::MakeFinished(IterationTraits::End()); + auto submitted_future = background_executor_->Submit(task_); + if (!submitted_future.ok()) { + return Future::MakeFinished(submitted_future.status()); } - auto promise = std::unique_ptr(new PromiseType{it_.get()}); - auto future = Future(promise->out_); - // TODO: Need a futuristic version of ARROW_RETURN_NOT_OK - auto append_status = queue_->Append( - static_cast>(std::move(promise))); - if (!append_status.ok()) { - return Future::MakeFinished(append_status); - } - - future.AddCallback([this](const Result& result) { - if (!result.ok() || result.ValueUnsafe() == IterationTraits::End()) { - done_ = true; - } - }); - - return executor_->Transfer(future); + return std::move(*submitted_future); } protected: - // The underlying iterator is referenced by pointer in ReadaheadPromise, - // so make sure it doesn't move. - std::unique_ptr> it_; - std::unique_ptr queue_; - internal::Executor* executor_; - bool done_; + struct Task { + Result operator()() { + if (*done_) { + return IterationTraits::End(); + } + auto next = it_->Next(); + if (!next.ok() || *next == IterationTraits::End()) { + *done_ = true; + } + return next; + } + // This task is going to be copied so we need to convert the iterator ptr to + // a shared ptr. This should be safe however because the background executor only + // has a single thread so it can't access it_ across multiple threads. + std::shared_ptr> it_; + std::shared_ptr done_; + }; + + Task task_; + std::shared_ptr background_executor_; }; /// \brief Creates an AsyncGenerator by iterating over an Iterator on a background /// thread template -static Result> MakeBackgroundGenerator(Iterator iterator, - internal::Executor* executor) { - auto background_iterator = - std::make_shared>(std::move(iterator), executor); +static Result> MakeBackgroundGenerator(Iterator iterator) { + ARROW_ASSIGN_OR_RAISE(auto background_executor, internal::ThreadPool::Make(1)); + auto background_iterator = std::make_shared>( + std::move(iterator), std::move(background_executor)); return [background_iterator]() { return (*background_iterator)(); }; } +/// \brief Converts an AsyncGenerator to an Iterator by blocking until each future +/// is finished +template +class GeneratorIterator { + public: + explicit GeneratorIterator(AsyncGenerator source) : source_(std::move(source)) {} + + Result Next() { return source_().result(); } + + private: + AsyncGenerator source_; +}; + +template +Result> MakeGeneratorIterator(AsyncGenerator source) { + return Iterator(GeneratorIterator(std::move(source))); +} + +template +Result> MakeReadaheadIterator(Iterator it, int readahead_queue_size) { + ARROW_ASSIGN_OR_RAISE(auto background_generator, + MakeBackgroundGenerator(std::move(it))); + auto readahead_generator = + AddReadahead(std::move(background_generator), readahead_queue_size); + return MakeGeneratorIterator(std::move(readahead_generator)); +} + } // namespace arrow diff --git a/cpp/src/arrow/util/future.h b/cpp/src/arrow/util/future.h index 5fbf272531a..913e28fbbb5 100644 --- a/cpp/src/arrow/util/future.h +++ b/cpp/src/arrow/util/future.h @@ -275,7 +275,14 @@ class ARROW_MUST_USE_TYPE Future { Wait(); return *GetResult(); } - Result&& result() && { + + /// \brief Returns an rvalue to the result. This method is potentially unsafe + /// + /// The future is not the unique owner of the result, copies of a future will + /// also point to the same result. You must make sure that no other copies + /// of the future exist. Attempts to add callbacks after you move the result + /// will result in undefined behavior. + Result&& MoveResult() { Wait(); return std::move(*GetResult()); } @@ -639,26 +646,6 @@ inline std::vector WaitForAny(const std::vector*>& futures, return waiter->MoveFinishedFutures(); } -// template -// struct ControlFlow { -// using BreakValueType = T; - -// bool IsBreak() const { return break_value_.has_value(); } - -// static Result MoveBreakValue(const ControlFlow& cf) { -// return std::move(*cf.break_value_); -// } - -// mutable util::optional break_value_; -// }; - -// struct Continue { -// template -// operator ControlFlow() && { // NOLINT explicit -// return {}; -// } -// }; - struct Continue { template operator util::optional() && { // NOLINT explicit @@ -666,11 +653,6 @@ struct Continue { } }; -// template -// ControlFlow Break(T break_value = {}) { -// return ControlFlow{std::move(break_value)}; -// } - template util::optional Break(T break_value = {}) { return util::optional{std::move(break_value)}; @@ -742,53 +724,4 @@ Future Loop(Iterate iterate) { return break_fut; } -// template ::ValueType, -// typename BreakValueType = typename Control::BreakValueType> -// Future Loop(Iterate iterate) { -// auto break_fut = Future::Make(); - -// struct Callback { -// bool CheckForTermination(const Result& maybe_control) { -// if (!maybe_control.ok() || maybe_control->IsBreak()) { -// Result maybe_break = -// maybe_control.Map(Control::MoveBreakValue); -// break_fut.MarkFinished(std::move(maybe_break)); -// return true; -// } -// return false; -// } - -// void operator()(const Result& maybe_control) && { -// if (CheckForTermination(maybe_control)) return; - -// auto control_fut = iterate(); -// while (control_fut.is_finished()) { -// // There's no need to AddCallback on a finished future; we can -// CheckForTermination -// // now. This also avoids recursion and potential stack overflow. -// if (CheckForTermination(control_fut.result())) return; - -// control_fut = iterate(); -// } -// control_fut.AddCallback(std::move(*this)); -// } - -// Iterate iterate; -// // If the future returned by control_fut is never completed then we will be hanging -// on -// // to break_fut forever even if the listener has given up listening on it. Instead -// we -// // rely on the fact that a producer (the caller of Future<>::Make) is always -// // responsible for completing the futures they create. -// // TODO: Could avoid this kind of situation with "future abandonment" similar to -// mesos Future break_fut; -// }; - -// auto control_fut = iterate(); -// control_fut.AddCallback(Callback{std::move(iterate), break_fut}); - -// return break_fut; -// } - } // namespace arrow diff --git a/cpp/src/arrow/util/future_test.cc b/cpp/src/arrow/util/future_test.cc index 17eed1eed18..2a4fc6bb2fd 100644 --- a/cpp/src/arrow/util/future_test.cc +++ b/cpp/src/arrow/util/future_test.cc @@ -324,41 +324,6 @@ TEST(FutureSyncTest, Foo) { } } -TEST(FutureSyncTest, MoveOnlyDataType) { - { - // MarkFinished(MoveOnlyDataType) - auto fut = Future::Make(); - AssertNotFinished(fut); - fut.MarkFinished(MoveOnlyDataType(42)); - AssertSuccessful(fut); - const auto& res = fut.result(); - ASSERT_TRUE(res.ok()); - ASSERT_EQ(*res, 42); - ASSERT_OK_AND_ASSIGN(MoveOnlyDataType value, std::move(fut).result()); - ASSERT_EQ(value, 42); - } - { - // MarkFinished(Result) - auto fut = Future::Make(); - AssertNotFinished(fut); - fut.MarkFinished(Result(MoveOnlyDataType(43))); - AssertSuccessful(fut); - ASSERT_OK_AND_ASSIGN(MoveOnlyDataType value, std::move(fut).result()); - ASSERT_EQ(value, 43); - } - { - // MarkFinished(failed Result) - auto fut = Future::Make(); - AssertNotFinished(fut); - fut.MarkFinished(Result(Status::IOError("xxx"))); - AssertFailed(fut); - ASSERT_RAISES(IOError, fut.status()); - const auto& res = fut.result(); - ASSERT_TRUE(res.status().IsIOError()); - ASSERT_RAISES(IOError, std::move(fut).result()); - } -} - TEST(FutureSyncTest, Empty) { { // MarkFinished() @@ -545,20 +510,22 @@ TEST(FutureStessTest, Callback) { } TEST(FutureStessTest, TryAddCallback) { - for (unsigned int n = 0; n < 1000; n++) { + for (unsigned int n = 0; n < 1; n++) { auto fut = Future<>::Make(); std::atomic callbacks_added(0); - bool finished; + std::atomic finished; std::mutex mutex; std::condition_variable cv; + std::thread::id callback_adder_thread_id; std::thread callback_adder([&] { - auto test_thread = std::this_thread::get_id(); - std::function&)> callback = [&test_thread](...) { - if (std::this_thread::get_id() == test_thread) { - FAIL() << "TryAddCallback allowed a callback to be run synchronously"; - } - }; + callback_adder_thread_id = std::this_thread::get_id(); + std::function&)> callback = + [&callback_adder_thread_id](const Result&) { + if (std::this_thread::get_id() == callback_adder_thread_id) { + FAIL() << "TryAddCallback allowed a callback to be run synchronously"; + } + }; std::function&)>()> callback_factory = [&callback]() { return callback; }; while (true) { @@ -571,7 +538,7 @@ TEST(FutureStessTest, TryAddCallback) { } { std::lock_guard lg(mutex); - finished = true; + finished.store(true); } cv.notify_one(); }); @@ -583,7 +550,8 @@ TEST(FutureStessTest, TryAddCallback) { fut.MarkFinished(); std::unique_lock lk(mutex); - cv.wait_for(lk, std::chrono::duration(0.5), [&finished] { return finished; }); + cv.wait_for(lk, std::chrono::duration(0.5), + [&finished] { return finished.load(); }); lk.unlock(); ASSERT_TRUE(finished); @@ -1572,7 +1540,7 @@ TYPED_TEST(FutureWaitTest, StressWaitForAll) { this->TestStressWaitForAll(); } template class FutureIteratorTest : public FutureTestBase {}; -using FutureIteratorTestTypes = ::testing::Types; +using FutureIteratorTestTypes = ::testing::Types; TYPED_TEST_SUITE(FutureIteratorTest, FutureIteratorTestTypes); diff --git a/cpp/src/arrow/util/iterator.cc b/cpp/src/arrow/util/iterator.cc deleted file mode 100644 index 5814824300f..00000000000 --- a/cpp/src/arrow/util/iterator.cc +++ /dev/null @@ -1,195 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "arrow/util/iterator.h" - -#include -#include -#include -#include -#include - -#include "arrow/util/logging.h" - -namespace arrow { -namespace detail { - -ReadaheadPromise::~ReadaheadPromise() {} - -class ReadaheadQueue::Impl : public std::enable_shared_from_this { - public: - explicit Impl(int64_t readahead_queue_size) : max_readahead_(readahead_queue_size) {} - - ~Impl() { EnsureShutdownOrDie(false); } - - void Start() { - // Cannot do this in constructor as shared_from_this() would throw - DCHECK(!thread_.joinable()); - auto self = shared_from_this(); - thread_ = std::thread([self]() { self->DoWork(); }); - DCHECK(thread_.joinable()); - } - - void EnsureShutdownOrDie(bool wait = true) { - std::unique_lock lock(mutex_); - if (!please_shutdown_) { - ARROW_CHECK_OK(ShutdownUnlocked(std::move(lock), wait)); - } - DCHECK(!thread_.joinable()); - } - - Status Append(std::unique_ptr promise) { - std::unique_lock lock(mutex_); - if (please_shutdown_) { - return Status::Invalid("Shutdown requested"); - } - todo_.push_back(std::move(promise)); - if (static_cast(todo_.size()) == 1) { - // Signal there's more work to do - lock.unlock(); - worker_wakeup_.notify_one(); - } - return Status::OK(); - } - - Status PopDone(std::unique_ptr* out) { - DCHECK_GT(max_readahead_, 0) - << "This function should not be called if using the queue unbounded"; - std::unique_lock lock(mutex_); - if (please_shutdown_) { - return Status::Invalid("Shutdown requested"); - } - work_done_.wait(lock, [this]() { return done_.size() > 0; }); - *out = std::move(done_.front()); - done_.pop_front(); - if (static_cast(done_.size()) < max_readahead_) { - // Signal there's more work to do - lock.unlock(); - worker_wakeup_.notify_one(); - } - return Status::OK(); - } - - Status Pump(std::function()> factory) { - DCHECK_GT(max_readahead_, 0) - << "This function should not be called if using the queue unbounded"; - std::unique_lock lock(mutex_); - if (please_shutdown_) { - return Status::Invalid("Shutdown requested"); - } - while (static_cast(done_.size() + todo_.size()) < max_readahead_) { - todo_.push_back(factory()); - } - // Signal there's more work to do - lock.unlock(); - worker_wakeup_.notify_one(); - return Status::OK(); - } - - Status Shutdown(bool wait = true) { - return ShutdownUnlocked(std::unique_lock(mutex_), wait); - } - - Status ShutdownUnlocked(std::unique_lock lock, bool wait = true) { - if (please_shutdown_) { - return Status::Invalid("Shutdown already requested"); - } - DCHECK(thread_.joinable()); - please_shutdown_ = true; - lock.unlock(); - worker_wakeup_.notify_one(); - if (wait) { - thread_.join(); - } else { - thread_.detach(); - } - return Status::OK(); - } - - void DoWork() { - std::unique_lock lock(mutex_); - while (!please_shutdown_) { - while (todo_.size() > 0 && - ((max_readahead_ <= 0) || - (static_cast(done_.size()) < max_readahead_))) { - auto promise = std::move(todo_.front()); - todo_.pop_front(); - lock.unlock(); - if (promise->Call()) { - // If the call finished then we should purge the remaining TODO items, marking - // them finished - lock.lock(); - std::deque> to_clear(std::move(todo_)); - // While the async iterator doesn't use todo_ anymore after it hits a finish the - // sync iterator might still due to timing so leave it valid - todo_.clear(); - lock.unlock(); - for (auto&& promise : to_clear) { - promise->End(); - } - } - lock.lock(); - if (max_readahead_ > 0) { - done_.push_back(std::move(promise)); - work_done_.notify_one(); - } - // Exit eagerly - if (please_shutdown_) { - return; - } - } - // Wait for more work to do - worker_wakeup_.wait(lock); - } - } - - std::deque> todo_; - std::deque> done_; - int64_t max_readahead_; - bool please_shutdown_ = false; - - std::thread thread_; - std::mutex mutex_; - std::condition_variable worker_wakeup_; - std::condition_variable work_done_; -}; - -ReadaheadQueue::ReadaheadQueue(int readahead_queue_size) - : impl_(new Impl(readahead_queue_size)) { - impl_->Start(); -} - -ReadaheadQueue::~ReadaheadQueue() {} - -Status ReadaheadQueue::Append(std::unique_ptr promise) { - return impl_->Append(std::move(promise)); -} - -Status ReadaheadQueue::PopDone(std::unique_ptr* out) { - return impl_->PopDone(out); -} - -Status ReadaheadQueue::Pump(std::function()> factory) { - return impl_->Pump(std::move(factory)); -} - -Status ReadaheadQueue::Shutdown() { return impl_->Shutdown(); } - -void ReadaheadQueue::EnsureShutdownOrDie() { return impl_->EnsureShutdownOrDie(); } - -} // namespace detail -} // namespace arrow diff --git a/cpp/src/arrow/util/iterator.h b/cpp/src/arrow/util/iterator.h index d8d0954d6ff..75ccf283aa5 100644 --- a/cpp/src/arrow/util/iterator.h +++ b/cpp/src/arrow/util/iterator.h @@ -536,125 +536,4 @@ Iterator MakeFlattenIterator(Iterator> it) { return Iterator(FlattenIterator(std::move(it))); } -namespace detail { - -// A type-erased promise object for ReadaheadQueue. -struct ARROW_EXPORT ReadaheadPromise { - virtual ~ReadaheadPromise(); - virtual bool Call() = 0; - // Called on any remaining promises when the queue hits the end of the source iterator - virtual void End() = 0; -}; - -template -struct ReadaheadIteratorPromise : ReadaheadPromise { - ~ReadaheadIteratorPromise() override {} - - explicit ReadaheadIteratorPromise(Iterator* it) : it_(it) {} - - bool Call() override { - assert(!called_); - out_ = it_->Next(); - called_ = true; - return out_ == IterationTraits::End(); - } - - void End() override { - // No need to do anything for the synchronous case. No one is waiting on this - called_ = true; - } - - Iterator* it_; - Result out_ = IterationTraits::End(); - bool called_ = false; -}; - -class ARROW_EXPORT ReadaheadQueue { - public: - explicit ReadaheadQueue(int readahead_queue_size); - ~ReadaheadQueue(); - - Status Append(std::unique_ptr); - Status PopDone(std::unique_ptr*); - Status Pump(std::function()> factory); - Status Shutdown(); - void EnsureShutdownOrDie(); - - protected: - class Impl; - std::shared_ptr impl_; -}; - -} // namespace detail - -/// \brief Readahead iterator that iterates on the underlying iterator in a -/// separate thread, getting up to N values in advance. -template -class ReadaheadIterator { - using PromiseType = typename detail::ReadaheadIteratorPromise; - - public: - // Public default constructor creates an empty iterator - ReadaheadIterator() : done_(true) {} - - ~ReadaheadIterator() { - if (queue_) { - // Make sure the queue doesn't call any promises after this object - // is destroyed. - queue_->EnsureShutdownOrDie(); - } - } - - ARROW_DEFAULT_MOVE_AND_ASSIGN(ReadaheadIterator); - ARROW_DISALLOW_COPY_AND_ASSIGN(ReadaheadIterator); - - Result Next() { - if (done_) { - return IterationTraits::End(); - } - - std::unique_ptr promise; - ARROW_RETURN_NOT_OK(queue_->PopDone(&promise)); - auto it_promise = static_cast(promise.get()); - - ARROW_RETURN_NOT_OK(queue_->Append(MakePromise())); - - ARROW_ASSIGN_OR_RAISE(auto out, it_promise->out_); - if (out == IterationTraits::End()) { - done_ = true; - } - return out; - } - - static Result> Make(Iterator it, int readahead_queue_size) { - ReadaheadIterator rh(std::move(it), readahead_queue_size); - ARROW_RETURN_NOT_OK(rh.Pump()); - return Iterator(std::move(rh)); - } - - private: - explicit ReadaheadIterator(Iterator it, int readahead_queue_size) - : it_(new Iterator(std::move(it))), - queue_(new detail::ReadaheadQueue(readahead_queue_size)) {} - - Status Pump() { - return queue_->Pump([this]() { return MakePromise(); }); - } - - std::unique_ptr MakePromise() { - return std::unique_ptr(new PromiseType{it_.get()}); - } - - // The underlying iterator is referenced by pointer in ReadaheadPromise, - // so make sure it doesn't move. - std::unique_ptr> it_; - std::unique_ptr queue_; - bool done_ = false; -}; - -template -Result> MakeReadaheadIterator(Iterator it, int readahead_queue_size) { - return ReadaheadIterator::Make(std::move(it), readahead_queue_size); -} - } // namespace arrow diff --git a/cpp/src/arrow/util/iterator_test.cc b/cpp/src/arrow/util/iterator_test.cc index 320adc4cfb4..9d9d2082e31 100644 --- a/cpp/src/arrow/util/iterator_test.cc +++ b/cpp/src/arrow/util/iterator_test.cc @@ -181,8 +181,8 @@ std::function()> BackgroundAsyncVectorIt(std::vector v) return TransformYield(item); }); EXPECT_OK_AND_ASSIGN(auto background, - MakeBackgroundGenerator(std::move(slow_iterator), pool)); - return background; + MakeBackgroundGenerator(std::move(slow_iterator))); + return TransferGenerator(background, pool); } std::vector RangeVector(unsigned int max) { @@ -374,6 +374,27 @@ TEST(TestIteratorTransform, Abort) { ASSERT_EQ(IterationTraits::End(), third); } +template +Transformer MakeRepeatN(int repeat_count) { + int current_repeat = 0; + return [repeat_count, current_repeat](T next) mutable -> Result> { + current_repeat++; + bool ready_for_next = false; + if (current_repeat == repeat_count) { + current_repeat = 0; + ready_for_next = true; + } + return TransformYield(next, ready_for_next); + }; +} + +TEST(TestIteratorTransform, Repeating) { + auto original = VectorIt({1, 2, 3}); + auto repeated = MakeTransformedIterator(std::move(original), + MakeRepeatN(2)); + AssertIteratorMatch({1, 1, 2, 2, 3, 3}, std::move(repeated)); +} + TEST(TestFunctionIterator, RangeForLoop) { int i = 0; auto fails_at_3 = MakeFunctionIterator([&]() -> Result { @@ -455,13 +476,6 @@ TEST(FlattenVectorIterator, Pyramid) { AssertIteratorMatch({1, 2, 2, 3, 3, 3}, std::move(it)); } -TEST(ReadaheadIterator, DefaultConstructor) { - ReadaheadIterator it; - TestInt v{42}; - ASSERT_OK_AND_ASSIGN(v, it.Next()); - ASSERT_EQ(v, TestInt()); -} - TEST(ReadaheadIterator, Empty) { ASSERT_OK_AND_ASSIGN(auto it, MakeReadaheadIterator(VectorIt({}), 2)); AssertIteratorMatch({}, std::move(it)); @@ -489,13 +503,16 @@ TEST(ReadaheadIterator, Trace) { ASSERT_OK_AND_ASSIGN( auto it, MakeReadaheadIterator(Iterator(std::move(tracing_it)), 2)); - tracing->WaitForValues(2); - SleepABit(); // check no further value is emitted - tracing->AssertValuesEqual({1, 2}); + SleepABit(); // Background iterator won't start pumping until first request comes in + ASSERT_EQ(tracing->values().size(), 0); + + AssertIteratorNext({1}, it); // Once we ask for one value we should get that one value + // as well as 2 read ahead - AssertIteratorNext({1}, it); tracing->WaitForValues(3); - SleepABit(); + tracing->AssertValuesEqual({1, 2, 3}); + + SleepABit(); // No further values should be fetched tracing->AssertValuesEqual({1, 2, 3}); AssertIteratorNext({2}, it); @@ -543,12 +560,9 @@ TEST(ReadaheadIterator, NextError) { ASSERT_RAISES(IOError, it.Next().status()); - AssertIteratorNext({1}, it); - tracing->WaitForValues(3); + AssertIteratorExhausted(it); SleepABit(); - tracing->AssertValuesEqual({1, 2, 3}); - AssertIteratorNext({2}, it); - AssertIteratorNext({3}, it); + tracing->AssertValuesEqual({}); AssertIteratorExhausted(it); } @@ -586,6 +600,54 @@ TEST(TestAsyncUtil, SynchronousFinish) { ASSERT_EQ(std::vector(), actual); } +TEST(TestAsyncUtil, GeneratorIterator) { + auto generator = BackgroundAsyncVectorIt({1, 2, 3}); + ASSERT_OK_AND_ASSIGN(auto iterator, MakeGeneratorIterator(std::move(generator))); + ASSERT_OK_AND_EQ(TestInt(1), iterator.Next()); + ASSERT_OK_AND_EQ(TestInt(2), iterator.Next()); + ASSERT_OK_AND_EQ(TestInt(3), iterator.Next()); + ASSERT_OK_AND_EQ(IterationTraits::End(), iterator.Next()); + ASSERT_OK_AND_EQ(IterationTraits::End(), iterator.Next()); +} + +TEST(TestAsyncUtil, TransferGenerator) { + std::mutex mutex; + std::condition_variable cv; + std::atomic finished(false); + + ASSERT_OK_AND_ASSIGN(auto thread_pool, internal::ThreadPool::Make(1)); + + // Needs to be a slow source to ensure we don't call Then on a completed + AsyncGenerator slow_generator = [&]() { + return thread_pool + ->Submit([&] { + std::unique_lock lock(mutex); + cv.wait_for(lock, std::chrono::duration(30), + [&] { return finished.load(); }); + return IterationTraits::End(); + }) + .ValueOrDie(); + }; + + auto transferred = + TransferGenerator(std::move(slow_generator), thread_pool.get()); + + auto current_thread_id = std::this_thread::get_id(); + auto fut = transferred().Then([¤t_thread_id](const Result& result) { + ASSERT_NE(current_thread_id, std::this_thread::get_id()); + }); + + { + std::lock_guard lg(mutex); + finished.store(true); + } + cv.notify_one(); + ASSERT_FINISHES_OK(fut); +} + +// This test is too slow for valgrind +#if !(defined(ARROW_VALGRIND) || defined(ADDRESS_SANITIZER)) + TEST(TestAsyncUtil, StackOverflow) { int counter = 0; AsyncGenerator generator = [&counter]() { @@ -603,6 +665,8 @@ TEST(TestAsyncUtil, StackOverflow) { ASSERT_EQ(0, collected.size()); } +#endif + TEST(TestAsyncUtil, Background) { std::vector expected = {1, 2, 3}; auto background = BackgroundAsyncVectorIt(expected); @@ -627,15 +691,16 @@ struct SlowEmptyIterator { }; TEST(TestAsyncUtil, BackgroundRepeatEnd) { - // Ensure that the background iterator properly fulfills the asyncgenerator contract + // Ensure that the background generator properly fulfills the asyncgenerator contract // and can be called after it ends. auto iterator = Iterator(SlowEmptyIterator()); - ASSERT_OK_AND_ASSIGN( - auto background_iter, - MakeBackgroundGenerator(std::move(iterator), internal::GetCpuThreadPool())); + ASSERT_OK_AND_ASSIGN(auto background_gen, MakeBackgroundGenerator(std::move(iterator))); + + background_gen = + TransferGenerator(std::move(background_gen), internal::GetCpuThreadPool()); - auto one = background_iter(); - auto two = background_iter(); + auto one = background_gen(); + auto two = background_gen(); ASSERT_FINISHES_OK_AND_ASSIGN(auto one_fin, one); ASSERT_EQ(IterationTraits::End(), one_fin); @@ -692,38 +757,42 @@ TEST(TestAsyncUtil, Readahead) { } TEST(TestAsyncUtil, ReadaheadFailed) { - auto source = []() -> Future { - return Future::MakeFinished(Status::Invalid("X")); + ASSERT_OK_AND_ASSIGN(auto thread_pool, internal::ThreadPool::Make(4)); + std::atomic counter(0); + // All tasks are a little slow. The first task fails. + // The readahead will have spawned 9 more tasks and they + // should all pass + auto source = [thread_pool, &counter]() -> Future { + auto count = counter++; + return *thread_pool->Submit([count]() -> Result { + if (count == 0) { + return Status::Invalid("X"); + } + return TestInt(count); + }); }; auto readahead = AddReadahead(source, 10); - for (int i = 0; i < 10; i++) { - auto next = readahead(); - ASSERT_EQ(Status::Invalid("X"), next.status()); + ASSERT_FINISHES_ERR(Invalid, readahead()); + SleepABit(); + + for (int i = 0; i < 9; i++) { + ASSERT_FINISHES_OK_AND_ASSIGN(auto next_val, readahead()); + ASSERT_EQ(TestInt(i + 1), next_val); } - auto after_fut = readahead(); - ASSERT_FINISHES_OK_AND_ASSIGN(auto after, after_fut); - ASSERT_EQ(IterationTraits::End(), after); -} + ASSERT_FINISHES_OK_AND_ASSIGN(auto after, readahead()); -template -Transformer MakeRepeatN(int repeat_count) { - int current_repeat = 0; - return [repeat_count, current_repeat](T next) mutable -> Result> { - current_repeat++; - bool ready_for_next = false; - if (current_repeat == repeat_count) { - current_repeat = 0; - ready_for_next = true; - } - return TransformYield(next, ready_for_next); - }; -} + // It's possible that finished was set quickly and there + // are only 10 elements + if (after == IterationTraits::End()) { + return; + } -TEST(TestIteratorTransform, Repeating) { - auto original = VectorIt({1, 2, 3}); - auto repeated = MakeTransformedIterator(std::move(original), - MakeRepeatN(2)); - AssertIteratorMatch({1, 1, 2, 2, 3, 3}, std::move(repeated)); + // It's also possible that finished was too slow and there + // ended up being 11 elements + ASSERT_EQ(TestInt(10), after); + // There can't be 12 elements because SleepABit will prevent it + ASSERT_FINISHES_OK_AND_ASSIGN(auto definitely_last, readahead()); + ASSERT_EQ(IterationTraits::End(), definitely_last); } TEST(TestAsyncIteratorTransform, SkipSome) {