Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 4 additions & 11 deletions cpp/src/arrow/csv/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -154,19 +154,12 @@ struct CSVBlock {
// End-of-iteration support for csv::CSVBlock: a block whose block_index is
// negative is treated as the end token.
template <>
struct IterationTraits<csv::CSVBlock> {
  // Builds the end token. The -1 is presumably the block_index field
  // (CSVBlock's definition is outside this view -- confirm against the struct).
  static csv::CSVBlock End() {
    csv::CSVBlock sentinel{{}, {}, {}, -1, true, {}};
    return sentinel;
  }

  // Returns true when `val` has a negative block_index.
  static bool IsEnd(const csv::CSVBlock& val) {
    const bool negative_index = val.block_index < 0;
    return negative_index;
  }
};

namespace csv {
namespace {

// Iterator<T> requires T to be equality comparable. Two blocks are considered
// equal when their block_index values match; no other field is inspected.
bool operator==(const CSVBlock& left, const CSVBlock& right) {
  const bool same_index = (left.block_index == right.block_index);
  return same_index;
}
// Inequality is the negation of the block_index comparison above.
bool operator!=(const CSVBlock& left, const CSVBlock& right) {
  return !(left == right);
}

// This is a callable that can be used to transform an iterator. The source iterator
// will contain buffers of data and the output iterator will contain delimited CSV
// blocks. util::optional is used so that there is an end token (required by the
Expand Down Expand Up @@ -731,7 +724,7 @@ class SerialStreamingReader : public BaseStreamingReader {

if (!source_eof_) {
ARROW_ASSIGN_OR_RAISE(auto maybe_block, block_iterator_.Next());
if (maybe_block != IterationTraits<CSVBlock>::End()) {
if (!IsIterationEnd(maybe_block)) {
last_block_index_ = maybe_block.block_index;
auto maybe_parsed = ParseAndInsert(maybe_block.partial, maybe_block.completion,
maybe_block.buffer, maybe_block.block_index,
Expand Down Expand Up @@ -813,7 +806,7 @@ class SerialTableReader : public BaseTableReader {
RETURN_NOT_OK(stop_token_.Poll());

ARROW_ASSIGN_OR_RAISE(auto maybe_block, block_iterator.Next());
if (maybe_block == IterationTraits<CSVBlock>::End()) {
if (IsIterationEnd(maybe_block)) {
// EOF
break;
}
Expand Down Expand Up @@ -865,7 +858,7 @@ class AsyncThreadedTableReader

auto transferred_it = MakeTransferredGenerator(bg_it, cpu_executor_);

int32_t block_queue_size = std::max(2, cpu_executor_->GetCapacity());
int32_t block_queue_size = cpu_executor_->GetCapacity();
auto rh_it =
MakeSerialReadaheadGenerator(std::move(transferred_it), block_queue_size);
buffer_generator_ = CSVBufferIterator::MakeAsync(std::move(rh_it));
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/arrow/testing/future_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
// unit test anyways.
#define ASSERT_FINISHES_IMPL(fut) \
do { \
ASSERT_TRUE(fut.Wait(10)); \
ASSERT_TRUE(fut.Wait(300)); \
if (!fut.is_finished()) { \
FAIL() << "Future did not finish in a timely fashion"; \
} \
Expand All @@ -35,11 +35,11 @@
#define ASSERT_FINISHES_OK(expr) \
do { \
auto&& _fut = (expr); \
ASSERT_TRUE(_fut.Wait(10)); \
ASSERT_TRUE(_fut.Wait(300)); \
if (!_fut.is_finished()) { \
FAIL() << "Future did not finish in a timely fashion"; \
} \
auto _st = _fut.status(); \
auto& _st = _fut.status(); \
if (!_st.ok()) { \
FAIL() << "'" ARROW_STRINGIFY(expr) "' failed with " << _st.ToString(); \
} \
Expand Down
47 changes: 47 additions & 0 deletions cpp/src/arrow/testing/gtest_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,10 @@
#include "arrow/table.h"
#include "arrow/type.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/future.h"
#include "arrow/util/io_util.h"
#include "arrow/util/logging.h"
#include "arrow/util/windows_compatibility.h"

namespace arrow {

Expand Down Expand Up @@ -596,13 +598,58 @@ void SleepFor(double seconds) {
std::chrono::nanoseconds(static_cast<int64_t>(seconds * 1e9)));
}

#ifdef _WIN32
// Windows implementation: yield repeatedly until the performance counter
// shows that more than ~1 ms worth of ticks has elapsed.
void SleepABit() {
  LARGE_INTEGER ticks_per_second;
  QueryPerformanceFrequency(&ticks_per_second);
  // Number of counter ticks in 1 ms
  const auto target_ticks = ticks_per_second.QuadPart / 1000;
  if (target_ticks <= 0) {
    // Fallback to STL sleep if high resolution clock not available, tests may fail,
    // shouldn't really happen
    SleepFor(1e-3);
    return;
  }

  LARGE_INTEGER begin, current;
  QueryPerformanceCounter(&begin);
  // Always yield at least once; keep going while the elapsed tick count has
  // not yet exceeded the target.
  do {
    std::this_thread::yield();
    QueryPerformanceCounter(&current);
  } while (current.QuadPart - begin.QuadPart <= target_ticks);
}
#else
// std::this_thread::sleep_for should be high enough resolution on non-Windows systems
void SleepABit() {
  SleepFor(1e-3);
}
#endif

// Polls `predicate` roughly once per millisecond until it returns true or
// about `seconds` worth of polling periods have been slept through.
void BusyWait(double seconds, std::function<bool()> predicate) {
  const double period = 0.001;
  int polls = 0;
  // The predicate is checked first, so a predicate that is already true
  // returns without sleeping.
  while (!predicate() && polls * period < seconds) {
    SleepFor(period);
    ++polls;
  }
}

// Returns a future that a detached background thread marks finished with
// Status::OK() after sleeping for `seconds`.
Future<> SleepAsync(double seconds) {
  auto done = Future<>::Make();
  // The lambda keeps its own copy of the future so the detached thread can
  // complete it after this function has returned.
  auto sleeper = [done, seconds]() mutable {
    SleepFor(seconds);
    done.MarkFinished(Status::OK());
  };
  std::thread(std::move(sleeper)).detach();
  return done;
}

// Returns a future that a detached background thread marks finished with
// Status::OK() after one call to SleepABit().
Future<> SleepABitAsync() {
  auto done = Future<>::Make();
  // The lambda keeps its own copy of the future so the detached thread can
  // complete it after this function has returned.
  auto sleeper = [done]() mutable {
    SleepABit();
    done.MarkFinished(Status::OK());
  };
  std::thread(std::move(sleeper)).detach();
  return done;
}

///////////////////////////////////////////////////////////////////////////
// Extension types

Expand Down
14 changes: 14 additions & 0 deletions cpp/src/arrow/testing/gtest_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -434,10 +434,24 @@ inline void BitmapFromVector(const std::vector<T>& is_valid,
ARROW_TESTING_EXPORT
void SleepFor(double seconds);

// Sleeps for a very small amount of time. The thread yields at least once,
// so that context switches can happen. It is intended for stress testing
// parallel code and should not be relied upon for precise timing.
ARROW_TESTING_EXPORT
void SleepABit();

// Wait until predicate is true or timeout in seconds expires.
ARROW_TESTING_EXPORT
void BusyWait(double seconds, std::function<bool()> predicate);

// Returns a future that is marked finished (with an OK status) by a detached
// background thread once that thread has slept for `seconds`.
ARROW_TESTING_EXPORT
Future<> SleepAsync(double seconds);

// \see SleepABit
ARROW_TESTING_EXPORT
Future<> SleepABitAsync();

template <typename T>
std::vector<T> IteratorToVector(Iterator<T> iterator) {
EXPECT_OK_AND_ASSIGN(auto out, iterator.ToVector());
Expand Down
6 changes: 6 additions & 0 deletions cpp/src/arrow/type_fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ class Result;

class Status;

namespace detail {
// Forward declaration of the type used as Future's default template argument.
struct Empty;
}
// Forward declaration of Future. Because of the default argument, `Future<>`
// names `Future<detail::Empty>`.
template <typename T = detail::Empty>
class Future;

namespace util {
class Codec;
} // namespace util
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/arrow/util/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ endif()
add_arrow_test(utility-test
SOURCES
align_util_test.cc
async_generator_test.cc
bit_block_counter_test.cc
bit_util_test.cc
cache_test.cc
Expand All @@ -60,6 +61,7 @@ add_arrow_test(utility-test
stl_util_test.cc
string_test.cc
tdigest_test.cc
test_common.cc
time_test.cc
trie_test.cc
uri_test.cc
Expand Down
33 changes: 33 additions & 0 deletions cpp/src/arrow/util/algorithm.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include "arrow/result.h"

namespace arrow {

/// \brief Apply a fallible unary operation to each element of [first, last),
/// writing each result through `out`.
///
/// Each call `unary_op(*first)` is passed through ARROW_ASSIGN_OR_RAISE, so a
/// failing call makes this function return early with that error; elements
/// already written through `out` are left as they are.
///
/// \param first start of the input range
/// \param last end of the input range (exclusive)
/// \param out output iterator, advanced once per successfully written element
/// \param unary_op callable invoked on each input element
/// \return Status::OK() once the whole range has been processed
template <typename InputIterator, typename OutputIterator, typename UnaryOperation>
Status MaybeTransform(InputIterator first, InputIterator last, OutputIterator out,
                      UnaryOperation unary_op) {
  while (first != last) {
    ARROW_ASSIGN_OR_RAISE(*out, unary_op(*first));
    // Advance both iterators as separate statements (no comma operator involved).
    ++first;
    ++out;
  }
  return Status::OK();
}

} // namespace arrow
Loading