Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 8 additions & 9 deletions cpp/build-support/run_clang_format.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,10 @@

# examine the output of clang-format and if changes are
# present assemble a (unified)patch of the difference
def _check_one_file(completed_processes, filename):
def _check_one_file(filename, formatted):
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is unrelated to this PR, but I finally got annoyed enough by the slow format-check that I spent some time figuring out what happened. The bottom line is that each call to _check_one_file in a child process was serializing and transmitting the entire results for all files. Instead, we just transmit the result for the single file being checked, it's much faster.

@fsaintjacques @bkietz

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

with open(filename, "rb") as reader:
original = reader.read()

returncode, stdout, stderr = completed_processes[filename]
formatted = stdout
if formatted != original:
# Run the equivalent of diff -u
diff = list(difflib.unified_diff(
Expand Down Expand Up @@ -106,20 +104,21 @@ def _check_one_file(completed_processes, filename):
[arguments.clang_format_binary, filename]
for filename in formatted_filenames
], stdout=PIPE, stderr=PIPE)
for returncode, stdout, stderr in results:

checker_args = []
for filename, res in zip(formatted_filenames, results):
# if any clang-format reported a parse error, bubble it
returncode, stdout, stderr = res
if returncode != 0:
print(stderr)
sys.exit(returncode)
checker_args.append((filename, stdout))

error = False
checker = partial(_check_one_file, {
filename: result
for filename, result in zip(formatted_filenames, results)
})
pool = mp.Pool()
try:
# check the output from each invocation of clang-format in parallel
for filename, diff in pool.imap(checker, formatted_filenames):
for filename, diff in pool.starmap(_check_one_file, checker_args):
if not arguments.quiet:
print("Checking {}".format(filename))
if diff:
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ set(ARROW_SRCS
io/interfaces.cc
io/memory.cc
io/readahead.cc
io/slow.cc
testing/util.cc
util/basic_decimal.cc
util/bit_util.cc
Expand Down
92 changes: 90 additions & 2 deletions cpp/src/arrow/filesystem/filesystem.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include "arrow/filesystem/filesystem.h"
#include "arrow/filesystem/path_util.h"
#include "arrow/io/slow.h"
#include "arrow/util/logging.h"
#include "arrow/util/macros.h"

Expand Down Expand Up @@ -75,6 +76,10 @@ std::ostream& operator<<(std::ostream& os, const FileStats& stats) {
return os << "FileStats(" << stats.type() << ", " << stats.path() << ")";
}

std::string FileStats::extension() const {
return internal::GetAbstractPathExtension(path_);
}

//////////////////////////////////////////////////////////////////////////
// FileSystem default method implementations

Expand Down Expand Up @@ -232,8 +237,91 @@ Status SubTreeFileSystem::OpenAppendStream(const std::string& path,
return base_fs_->OpenAppendStream(s, out);
}

std::string FileStats::extension() const {
return internal::GetAbstractPathExtension(path_);
//////////////////////////////////////////////////////////////////////////
// SlowFileSystem implementation

SlowFileSystem::SlowFileSystem(std::shared_ptr<FileSystem> base_fs,
std::shared_ptr<io::LatencyGenerator> latencies)
: base_fs_(base_fs), latencies_(latencies) {}

SlowFileSystem::SlowFileSystem(std::shared_ptr<FileSystem> base_fs,
double average_latency)
: base_fs_(base_fs), latencies_(io::LatencyGenerator::Make(average_latency)) {}

SlowFileSystem::SlowFileSystem(std::shared_ptr<FileSystem> base_fs,
double average_latency, int32_t seed)
: base_fs_(base_fs), latencies_(io::LatencyGenerator::Make(average_latency, seed)) {}

Status SlowFileSystem::GetTargetStats(const std::string& path, FileStats* out) {
latencies_->Sleep();
return base_fs_->GetTargetStats(path, out);
}

Status SlowFileSystem::GetTargetStats(const Selector& selector,
std::vector<FileStats>* out) {
latencies_->Sleep();
return base_fs_->GetTargetStats(selector, out);
}

Status SlowFileSystem::CreateDir(const std::string& path, bool recursive) {
latencies_->Sleep();
return base_fs_->CreateDir(path, recursive);
}

Status SlowFileSystem::DeleteDir(const std::string& path) {
latencies_->Sleep();
return base_fs_->DeleteDir(path);
}

Status SlowFileSystem::DeleteDirContents(const std::string& path) {
latencies_->Sleep();
return base_fs_->DeleteDirContents(path);
}

Status SlowFileSystem::DeleteFile(const std::string& path) {
latencies_->Sleep();
return base_fs_->DeleteFile(path);
}

Status SlowFileSystem::Move(const std::string& src, const std::string& dest) {
latencies_->Sleep();
return base_fs_->Move(src, dest);
}

Status SlowFileSystem::CopyFile(const std::string& src, const std::string& dest) {
latencies_->Sleep();
return base_fs_->CopyFile(src, dest);
}

Status SlowFileSystem::OpenInputStream(const std::string& path,
std::shared_ptr<io::InputStream>* out) {
latencies_->Sleep();
std::shared_ptr<io::InputStream> stream;
RETURN_NOT_OK(base_fs_->OpenInputStream(path, &stream));
*out = std::make_shared<io::SlowInputStream>(stream, latencies_);
return Status::OK();
}

Status SlowFileSystem::OpenInputFile(const std::string& path,
std::shared_ptr<io::RandomAccessFile>* out) {
latencies_->Sleep();
std::shared_ptr<io::RandomAccessFile> file;
RETURN_NOT_OK(base_fs_->OpenInputFile(path, &file));
*out = std::make_shared<io::SlowRandomAccessFile>(file, latencies_);
return Status::OK();
}

Status SlowFileSystem::OpenOutputStream(const std::string& path,
std::shared_ptr<io::OutputStream>* out) {
latencies_->Sleep();
// XXX Should we have a SlowOutputStream that waits on Flush() and Close()?
return base_fs_->OpenOutputStream(path, out);
}

Status SlowFileSystem::OpenAppendStream(const std::string& path,
std::shared_ptr<io::OutputStream>* out) {
latencies_->Sleep();
return base_fs_->OpenAppendStream(path, out);
}

} // namespace fs
Expand Down
44 changes: 43 additions & 1 deletion cpp/src/arrow/filesystem/filesystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
#include <vector>

#include "arrow/status.h"
#include "arrow/util/compression.h"
#include "arrow/util/visibility.h"

// The Windows API defines macros from *File resolving to either
Expand All @@ -44,6 +43,7 @@ namespace arrow {
namespace io {

class InputStream;
class LatencyGenerator;
class OutputStream;
class RandomAccessFile;

Expand Down Expand Up @@ -265,5 +265,47 @@ class ARROW_EXPORT SubTreeFileSystem : public FileSystem {
Status FixStats(FileStats* st) const;
};

/// \brief EXPERIMENTAL: a FileSystem implementation that delegates to another
/// implementation but inserts latencies at various points.
class ARROW_EXPORT SlowFileSystem : public FileSystem {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've brought this topic up before, but what do you think about putting the entire implementation of SlowFS in a .cc file and returning std::shared_ptr from the function that creates it? This can always be done later so refactor need not happen now. It's probably better to use factory methods for instantiating most FS classes anyway

Unless you anticipate adding additional methods to the class

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comments hold regarding the slow stream classes

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good question. I don't think our conventions should vary too much. If some classes are exposed and other hidden it feels a bit weird. But strong opinion from me.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My preference would be to hide the implementations in all cases except where we expose additional methods. If only that it offers more freedom to refactor

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, we discuss this in a separate JIRA. Will merge.

public:
SlowFileSystem(std::shared_ptr<FileSystem> base_fs,
std::shared_ptr<io::LatencyGenerator> latencies);
SlowFileSystem(std::shared_ptr<FileSystem> base_fs, double average_latency);
SlowFileSystem(std::shared_ptr<FileSystem> base_fs, double average_latency,
int32_t seed);

using FileSystem::GetTargetStats;
Status GetTargetStats(const std::string& path, FileStats* out) override;
Status GetTargetStats(const Selector& select, std::vector<FileStats>* out) override;

Status CreateDir(const std::string& path, bool recursive = true) override;

Status DeleteDir(const std::string& path) override;
Status DeleteDirContents(const std::string& path) override;

Status DeleteFile(const std::string& path) override;

Status Move(const std::string& src, const std::string& dest) override;

Status CopyFile(const std::string& src, const std::string& dest) override;

Status OpenInputStream(const std::string& path,
std::shared_ptr<io::InputStream>* out) override;

Status OpenInputFile(const std::string& path,
std::shared_ptr<io::RandomAccessFile>* out) override;

Status OpenOutputStream(const std::string& path,
std::shared_ptr<io::OutputStream>* out) override;

Status OpenAppendStream(const std::string& path,
std::shared_ptr<io::OutputStream>* out) override;

protected:
std::shared_ptr<FileSystem> base_fs_;
std::shared_ptr<io::LatencyGenerator> latencies_;
};

} // namespace fs
} // namespace arrow
21 changes: 21 additions & 0 deletions cpp/src/arrow/filesystem/filesystem_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,27 @@ TEST_F(TestSubTreeFileSystem, GetTargetStatsSelector) {
ASSERT_EQ(stats.size(), 0);
}

////////////////////////////////////////////////////////////////////////////
// Generic SlowFileSystem tests

class TestSlowFSGeneric : public ::testing::Test, public GenericFileSystemTest {
public:
void SetUp() override {
time_ = TimePoint(TimePoint::duration(42));
fs_ = std::make_shared<MockFileSystem>(time_);
slow_fs_ = std::make_shared<SlowFileSystem>(fs_, 0.001);
}

protected:
std::shared_ptr<FileSystem> GetEmptyFileSystem() override { return slow_fs_; }

TimePoint time_;
std::shared_ptr<MockFileSystem> fs_;
std::shared_ptr<SlowFileSystem> slow_fs_;
};

GENERIC_FS_TEST_FUNCTIONS(TestSlowFSGeneric);

} // namespace internal
} // namespace fs
} // namespace arrow
36 changes: 36 additions & 0 deletions cpp/src/arrow/io/memory_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

#include <chrono>
#include <cstdint>
#include <cstdlib>
#include <cstring>
Expand All @@ -26,6 +27,7 @@
#include "arrow/buffer.h"
#include "arrow/io/interfaces.h"
#include "arrow/io/memory.h"
#include "arrow/io/slow.h"
#include "arrow/status.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/util.h"
Expand Down Expand Up @@ -313,5 +315,39 @@ TEST(TestMemcopy, ParallelMemcopy) {
}
}

template <typename SlowStreamType>
void TestSlowInputStream() {
using clock = std::chrono::high_resolution_clock;

auto stream = std::make_shared<BufferReader>(util::string_view("abcdefghijkl"));
const double latency = 0.6;
auto slow = std::make_shared<SlowStreamType>(stream, latency);

ASSERT_FALSE(slow->closed());
std::shared_ptr<Buffer> buf;
auto t1 = clock::now();
ASSERT_OK(slow->Read(6, &buf));
auto t2 = clock::now();
AssertBufferEqual(*buf, "abcdef");
auto dt = std::chrono::duration_cast<std::chrono::duration<double>>(t2 - t1).count();
ASSERT_LT(dt, latency * 3); // likely
ASSERT_GT(dt, latency / 3); // likely

util::string_view view;
ASSERT_OK(slow->Peek(4, &view));
ASSERT_EQ(view, util::string_view("ghij"));

ASSERT_OK(slow->Close());
ASSERT_TRUE(slow->closed());
ASSERT_TRUE(stream->closed());
ASSERT_OK(slow->Close());
ASSERT_TRUE(slow->closed());
ASSERT_TRUE(stream->closed());
}

TEST(TestSlowInputStream, Basics) { TestSlowInputStream<SlowInputStream>(); }

TEST(TestSlowRandomAccessFile, Basics) { TestSlowInputStream<SlowRandomAccessFile>(); }

} // namespace io
} // namespace arrow
Loading