diff --git a/cpp/build-support/run_clang_format.py b/cpp/build-support/run_clang_format.py index 1d1592d233e..afd933140ac 100755 --- a/cpp/build-support/run_clang_format.py +++ b/cpp/build-support/run_clang_format.py @@ -28,12 +28,10 @@ # examine the output of clang-format and if changes are # present assemble a (unified)patch of the difference -def _check_one_file(completed_processes, filename): +def _check_one_file(filename, formatted): with open(filename, "rb") as reader: original = reader.read() - returncode, stdout, stderr = completed_processes[filename] - formatted = stdout if formatted != original: # Run the equivalent of diff -u diff = list(difflib.unified_diff( @@ -106,20 +104,21 @@ def _check_one_file(completed_processes, filename): [arguments.clang_format_binary, filename] for filename in formatted_filenames ], stdout=PIPE, stderr=PIPE) - for returncode, stdout, stderr in results: + + checker_args = [] + for filename, res in zip(formatted_filenames, results): # if any clang-format reported a parse error, bubble it + returncode, stdout, stderr = res if returncode != 0: + print(stderr) sys.exit(returncode) + checker_args.append((filename, stdout)) error = False - checker = partial(_check_one_file, { - filename: result - for filename, result in zip(formatted_filenames, results) - }) pool = mp.Pool() try: # check the output from each invocation of clang-format in parallel - for filename, diff in pool.imap(checker, formatted_filenames): + for filename, diff in pool.starmap(_check_one_file, checker_args): if not arguments.quiet: print("Checking {}".format(filename)) if diff: diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index bb0a365e68f..f1968af592d 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -127,6 +127,7 @@ set(ARROW_SRCS io/interfaces.cc io/memory.cc io/readahead.cc + io/slow.cc testing/util.cc util/basic_decimal.cc util/bit_util.cc diff --git a/cpp/src/arrow/filesystem/filesystem.cc b/cpp/src/arrow/filesystem/filesystem.cc index 4c3d75c067f..9ec5735a9cf 100644 --- a/cpp/src/arrow/filesystem/filesystem.cc +++ b/cpp/src/arrow/filesystem/filesystem.cc @@ -19,6 +19,7 @@ #include "arrow/filesystem/filesystem.h" #include "arrow/filesystem/path_util.h" +#include "arrow/io/slow.h" #include "arrow/util/logging.h" #include "arrow/util/macros.h" @@ -75,6 +76,10 @@ std::ostream& operator<<(std::ostream& os, const FileStats& stats) { return os << "FileStats(" << stats.type() << ", " << stats.path() << ")"; } +std::string FileStats::extension() const { + return internal::GetAbstractPathExtension(path_); +} + ////////////////////////////////////////////////////////////////////////// // FileSystem default method implementations @@ -232,8 +237,91 @@ Status SubTreeFileSystem::OpenAppendStream(const std::string& path, return base_fs_->OpenAppendStream(s, out); } -std::string FileStats::extension() const { - return internal::GetAbstractPathExtension(path_); +////////////////////////////////////////////////////////////////////////// +// SlowFileSystem implementation + +SlowFileSystem::SlowFileSystem(std::shared_ptr base_fs, + std::shared_ptr latencies) + : base_fs_(base_fs), latencies_(latencies) {} + +SlowFileSystem::SlowFileSystem(std::shared_ptr base_fs, + double average_latency) + : base_fs_(base_fs), latencies_(io::LatencyGenerator::Make(average_latency)) {} + +SlowFileSystem::SlowFileSystem(std::shared_ptr base_fs, + double average_latency, int32_t seed) + : base_fs_(base_fs), latencies_(io::LatencyGenerator::Make(average_latency, seed)) {} + +Status SlowFileSystem::GetTargetStats(const std::string& path, FileStats* out) { + latencies_->Sleep(); + return base_fs_->GetTargetStats(path, out); +} + +Status SlowFileSystem::GetTargetStats(const Selector& selector, + std::vector* out) { + latencies_->Sleep(); + return base_fs_->GetTargetStats(selector, out); +} + +Status SlowFileSystem::CreateDir(const std::string& path, bool recursive) { + latencies_->Sleep(); + return base_fs_->CreateDir(path, recursive); +} + +Status SlowFileSystem::DeleteDir(const std::string& path) { + latencies_->Sleep(); + return base_fs_->DeleteDir(path); +} + +Status SlowFileSystem::DeleteDirContents(const std::string& path) { + latencies_->Sleep(); + return base_fs_->DeleteDirContents(path); +} + +Status SlowFileSystem::DeleteFile(const std::string& path) { + latencies_->Sleep(); + return base_fs_->DeleteFile(path); +} + +Status SlowFileSystem::Move(const std::string& src, const std::string& dest) { + latencies_->Sleep(); + return base_fs_->Move(src, dest); +} + +Status SlowFileSystem::CopyFile(const std::string& src, const std::string& dest) { + latencies_->Sleep(); + return base_fs_->CopyFile(src, dest); +} + +Status SlowFileSystem::OpenInputStream(const std::string& path, + std::shared_ptr* out) { + latencies_->Sleep(); + std::shared_ptr stream; + RETURN_NOT_OK(base_fs_->OpenInputStream(path, &stream)); + *out = std::make_shared(stream, latencies_); + return Status::OK(); +} + +Status SlowFileSystem::OpenInputFile(const std::string& path, + std::shared_ptr* out) { + latencies_->Sleep(); + std::shared_ptr file; + RETURN_NOT_OK(base_fs_->OpenInputFile(path, &file)); + *out = std::make_shared(file, latencies_); + return Status::OK(); +} + +Status SlowFileSystem::OpenOutputStream(const std::string& path, + std::shared_ptr* out) { + latencies_->Sleep(); + // XXX Should we have a SlowOutputStream that waits on Flush() and Close()? + return base_fs_->OpenOutputStream(path, out); +} + +Status SlowFileSystem::OpenAppendStream(const std::string& path, + std::shared_ptr* out) { + latencies_->Sleep(); + return base_fs_->OpenAppendStream(path, out); } } // namespace fs diff --git a/cpp/src/arrow/filesystem/filesystem.h b/cpp/src/arrow/filesystem/filesystem.h index 6e24db03dfe..5073461901d 100644 --- a/cpp/src/arrow/filesystem/filesystem.h +++ b/cpp/src/arrow/filesystem/filesystem.h @@ -25,7 +25,6 @@ #include #include "arrow/status.h" -#include "arrow/util/compression.h" #include "arrow/util/visibility.h" // The Windows API defines macros from *File resolving to either @@ -44,6 +43,7 @@ namespace arrow { namespace io { class InputStream; +class LatencyGenerator; class OutputStream; class RandomAccessFile; @@ -265,5 +265,47 @@ class ARROW_EXPORT SubTreeFileSystem : public FileSystem { Status FixStats(FileStats* st) const; }; +/// \brief EXPERIMENTAL: a FileSystem implementation that delegates to another +/// implementation but inserts latencies at various points. +class ARROW_EXPORT SlowFileSystem : public FileSystem { + public: + SlowFileSystem(std::shared_ptr base_fs, + std::shared_ptr latencies); + SlowFileSystem(std::shared_ptr base_fs, double average_latency); + SlowFileSystem(std::shared_ptr base_fs, double average_latency, + int32_t seed); + + using FileSystem::GetTargetStats; + Status GetTargetStats(const std::string& path, FileStats* out) override; + Status GetTargetStats(const Selector& select, std::vector* out) override; + + Status CreateDir(const std::string& path, bool recursive = true) override; + + Status DeleteDir(const std::string& path) override; + Status DeleteDirContents(const std::string& path) override; + + Status DeleteFile(const std::string& path) override; + + Status Move(const std::string& src, const std::string& dest) override; + + Status CopyFile(const std::string& src, const std::string& dest) override; + + Status OpenInputStream(const std::string& path, + std::shared_ptr* out) override; + + Status OpenInputFile(const std::string& path, + std::shared_ptr* out) override; + + Status OpenOutputStream(const std::string& path, + std::shared_ptr* out) override; + + Status OpenAppendStream(const std::string& path, + std::shared_ptr* out) override; + + protected: + std::shared_ptr base_fs_; + std::shared_ptr latencies_; +}; + } // namespace fs } // namespace arrow diff --git a/cpp/src/arrow/filesystem/filesystem_test.cc b/cpp/src/arrow/filesystem/filesystem_test.cc index 47ddc16273f..c8eba30218d 100644 --- a/cpp/src/arrow/filesystem/filesystem_test.cc +++ b/cpp/src/arrow/filesystem/filesystem_test.cc @@ -612,6 +612,27 @@ TEST_F(TestSubTreeFileSystem, GetTargetStatsSelector) { ASSERT_EQ(stats.size(), 0); } +//////////////////////////////////////////////////////////////////////////// +// Generic SlowFileSystem tests + +class TestSlowFSGeneric : public ::testing::Test, public GenericFileSystemTest { + public: + void SetUp() override { + time_ = TimePoint(TimePoint::duration(42)); + fs_ = std::make_shared(time_); + slow_fs_ = std::make_shared(fs_, 0.001); + } + + protected: + std::shared_ptr GetEmptyFileSystem() override { return slow_fs_; } + + TimePoint time_; + std::shared_ptr fs_; + std::shared_ptr slow_fs_; +}; + +GENERIC_FS_TEST_FUNCTIONS(TestSlowFSGeneric); + } // namespace internal } // namespace fs } // namespace arrow diff --git a/cpp/src/arrow/io/memory_test.cc b/cpp/src/arrow/io/memory_test.cc index d545520ddbc..7b0621bcc92 100644 --- a/cpp/src/arrow/io/memory_test.cc +++ b/cpp/src/arrow/io/memory_test.cc @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +#include #include #include #include @@ -26,6 +27,7 @@ #include "arrow/buffer.h" #include "arrow/io/interfaces.h" #include "arrow/io/memory.h" +#include "arrow/io/slow.h" #include "arrow/status.h" #include "arrow/testing/gtest_util.h" #include "arrow/testing/util.h" @@ -313,5 +315,39 @@ TEST(TestMemcopy, ParallelMemcopy) { } } +template +void TestSlowInputStream() { + using clock = std::chrono::high_resolution_clock; + + auto stream = std::make_shared(util::string_view("abcdefghijkl")); + const double latency = 0.6; + auto slow = std::make_shared(stream, latency); + + ASSERT_FALSE(slow->closed()); + std::shared_ptr buf; + auto t1 = clock::now(); + ASSERT_OK(slow->Read(6, &buf)); + auto t2 = clock::now(); + AssertBufferEqual(*buf, "abcdef"); + auto dt = std::chrono::duration_cast>(t2 - t1).count(); + ASSERT_LT(dt, latency * 3); // likely + ASSERT_GT(dt, latency / 3); // likely + + util::string_view view; + ASSERT_OK(slow->Peek(4, &view)); + ASSERT_EQ(view, util::string_view("ghij")); + + ASSERT_OK(slow->Close()); + ASSERT_TRUE(slow->closed()); + ASSERT_TRUE(stream->closed()); + ASSERT_OK(slow->Close()); + ASSERT_TRUE(slow->closed()); + ASSERT_TRUE(stream->closed()); +} + +TEST(TestSlowInputStream, Basics) { TestSlowInputStream(); } + +TEST(TestSlowRandomAccessFile, Basics) { TestSlowInputStream(); } + } // namespace io } // namespace arrow diff --git a/cpp/src/arrow/io/slow.cc b/cpp/src/arrow/io/slow.cc new file mode 100644 index 00000000000..8422aaa6f5c --- /dev/null +++ b/cpp/src/arrow/io/slow.cc @@ -0,0 +1,148 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/io/slow.h" + +#include +#include +#include +#include +#include +#include + +#include "arrow/buffer.h" +#include "arrow/io/util_internal.h" +#include "arrow/status.h" +#include "arrow/util/logging.h" + +namespace arrow { +namespace io { + +// Multiply the average by this ratio to get the intended standard deviation +static constexpr double kStandardDeviationRatio = 0.1; + +class LatencyGeneratorImpl : public LatencyGenerator { + public: + ~LatencyGeneratorImpl() override = default; + + LatencyGeneratorImpl(double average_latency, int32_t seed) + : gen_(static_cast(seed)), + latency_dist_(average_latency, average_latency * kStandardDeviationRatio) {} + + double NextLatency() override { + // std::random distributions are unlikely to be thread-safe, and + // a RandomAccessFile may be called from multiple threads + std::lock_guard lock(mutex_); + return std::max(0.0, latency_dist_(gen_)); + } + + private: + std::default_random_engine gen_; + std::normal_distribution latency_dist_; + std::mutex mutex_; +}; + +LatencyGenerator::~LatencyGenerator() {} + +void LatencyGenerator::Sleep() { + std::this_thread::sleep_for(std::chrono::duration(NextLatency())); +} + +std::shared_ptr LatencyGenerator::Make(double average_latency) { + auto seed = static_cast(std::random_device()()); + return std::make_shared(average_latency, seed); +} + +std::shared_ptr LatencyGenerator::Make(double average_latency, + int32_t seed) { + return std::make_shared(average_latency, seed); +} + +////////////////////////////////////////////////////////////////////////// +// SlowInputStream implementation + +SlowInputStream::~SlowInputStream() { internal::CloseFromDestructor(this); } + +Status SlowInputStream::Close() { return stream_->Close(); } + +Status SlowInputStream::Abort() { return stream_->Abort(); } + +bool SlowInputStream::closed() const { return stream_->closed(); } + +Status SlowInputStream::Tell(int64_t* position) const { return stream_->Tell(position); } + +Status SlowInputStream::Read(int64_t nbytes, int64_t* bytes_read, void* out) { + latencies_->Sleep(); + return stream_->Read(nbytes, bytes_read, out); +} + +Status SlowInputStream::Read(int64_t nbytes, std::shared_ptr* out) { + latencies_->Sleep(); + return stream_->Read(nbytes, out); +} + +Status SlowInputStream::Peek(int64_t nbytes, util::string_view* out) { + return stream_->Peek(nbytes, out); +} + +////////////////////////////////////////////////////////////////////////// +// SlowRandomAccessFile implementation + +SlowRandomAccessFile::~SlowRandomAccessFile() { internal::CloseFromDestructor(this); } + +Status SlowRandomAccessFile::Close() { return stream_->Close(); } + +Status SlowRandomAccessFile::Abort() { return stream_->Abort(); } + +bool SlowRandomAccessFile::closed() const { return stream_->closed(); } + +Status SlowRandomAccessFile::GetSize(int64_t* size) { return stream_->GetSize(size); } + +Status SlowRandomAccessFile::Seek(int64_t position) { return stream_->Seek(position); } + +Status SlowRandomAccessFile::Tell(int64_t* position) const { + return stream_->Tell(position); +} + +Status SlowRandomAccessFile::Read(int64_t nbytes, int64_t* bytes_read, void* out) { + latencies_->Sleep(); + return stream_->Read(nbytes, bytes_read, out); +} + +Status SlowRandomAccessFile::Read(int64_t nbytes, std::shared_ptr* out) { + latencies_->Sleep(); + return stream_->Read(nbytes, out); +} + +Status SlowRandomAccessFile::ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read, + void* out) { + latencies_->Sleep(); + return stream_->ReadAt(position, nbytes, bytes_read, out); +} + +Status SlowRandomAccessFile::ReadAt(int64_t position, int64_t nbytes, + std::shared_ptr* out) { + latencies_->Sleep(); + return stream_->ReadAt(position, nbytes, out); +} + +Status SlowRandomAccessFile::Peek(int64_t nbytes, util::string_view* out) { + return stream_->Peek(nbytes, out); +} + +} // namespace io +} // namespace arrow diff --git a/cpp/src/arrow/io/slow.h b/cpp/src/arrow/io/slow.h new file mode 100644 index 00000000000..57bddceb0bc --- /dev/null +++ b/cpp/src/arrow/io/slow.h @@ -0,0 +1,119 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Slow stream implementations, mainly for testing and benchmarking + +#pragma once + +#include +#include +#include + +#include "arrow/io/interfaces.h" +#include "arrow/util/visibility.h" + +namespace arrow { + +class Buffer; +class Status; + +namespace io { + +class ARROW_EXPORT LatencyGenerator { + public: + virtual ~LatencyGenerator(); + + void Sleep(); + + virtual double NextLatency() = 0; + + static std::shared_ptr Make(double average_latency); + static std::shared_ptr Make(double average_latency, int32_t seed); +}; + +// XXX use ConcurrencyWrapper? It could increase chances of finding a race. + +template +class ARROW_EXPORT SlowInputStreamBase : public StreamType { + public: + SlowInputStreamBase(std::shared_ptr stream, + std::shared_ptr latencies) + : stream_(std::move(stream)), latencies_(std::move(latencies)) {} + + SlowInputStreamBase(std::shared_ptr stream, double average_latency) + : stream_(std::move(stream)), latencies_(LatencyGenerator::Make(average_latency)) {} + + SlowInputStreamBase(std::shared_ptr stream, double average_latency, + int32_t seed) + : stream_(std::move(stream)), + latencies_(LatencyGenerator::Make(average_latency, seed)) {} + + protected: + std::shared_ptr stream_; + std::shared_ptr latencies_; +}; + +/// \brief An InputStream wrapper that makes reads slower. +/// +/// Read() calls are made slower by an average latency (in seconds). +/// Actual latencies form a normal distribution closely centered +/// on the average latency. +/// Other calls are forwarded directly. +class ARROW_EXPORT SlowInputStream : public SlowInputStreamBase { + public: + ~SlowInputStream() override; + + using SlowInputStreamBase::SlowInputStreamBase; + + Status Close() override; + Status Abort() override; + bool closed() const override; + + Status Read(int64_t nbytes, int64_t* bytes_read, void* out) override; + Status Read(int64_t nbytes, std::shared_ptr* out) override; + Status Peek(int64_t nbytes, util::string_view* out) override; + + Status Tell(int64_t* position) const override; +}; + +/// \brief A RandomAccessFile wrapper that makes reads slower. +/// +/// Similar to SlowInputStream, but allows random access and seeking. +class ARROW_EXPORT SlowRandomAccessFile : public SlowInputStreamBase { + public: + ~SlowRandomAccessFile() override; + + using SlowInputStreamBase::SlowInputStreamBase; + + Status Close() override; + Status Abort() override; + bool closed() const override; + + Status Read(int64_t nbytes, int64_t* bytes_read, void* out) override; + Status Read(int64_t nbytes, std::shared_ptr* out) override; + Status ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read, + void* out) override; + Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr* out) override; + Status Peek(int64_t nbytes, util::string_view* out) override; + + Status GetSize(int64_t* size) override; + Status Seek(int64_t position) override; + Status Tell(int64_t* position) const override; +}; + +} // namespace io +} // namespace arrow