diff --git a/cpp/src/arrow/ipc/ipc-memory-test.cc b/cpp/src/arrow/ipc/ipc-memory-test.cc index 19339212225..a2dbd35728c 100644 --- a/cpp/src/arrow/ipc/ipc-memory-test.cc +++ b/cpp/src/arrow/ipc/ipc-memory-test.cc @@ -26,9 +26,6 @@ #include "arrow/ipc/memory.h" #include "arrow/ipc/test-common.h" -#include "arrow/test-util.h" -#include "arrow/util/buffer.h" -#include "arrow/util/status.h" namespace arrow { namespace ipc { @@ -67,6 +64,57 @@ TEST_F(TestMemoryMappedSource, WriteRead) { } } +TEST_F(TestMemoryMappedSource, ReadOnly) { + const int64_t buffer_size = 1024; + std::vector buffer(buffer_size); + + test::random_bytes(1024, 0, buffer.data()); + + const int reps = 5; + + std::string path = "ipc-read-only-test"; + CreateFile(path, reps * buffer_size); + + std::shared_ptr rwmmap; + ASSERT_OK(MemoryMappedSource::Open(path, MemorySource::READ_WRITE, &rwmmap)); + + int64_t position = 0; + for (int i = 0; i < reps; ++i) { + ASSERT_OK(rwmmap->Write(position, buffer.data(), buffer_size)); + + position += buffer_size; + } + rwmmap->Close(); + + std::shared_ptr rommap; + ASSERT_OK(MemoryMappedSource::Open(path, MemorySource::READ_ONLY, &rommap)); + + position = 0; + std::shared_ptr out_buffer; + for (int i = 0; i < reps; ++i) { + ASSERT_OK(rommap->ReadAt(position, buffer_size, &out_buffer)); + + ASSERT_EQ(0, memcmp(out_buffer->data(), buffer.data(), buffer_size)); + position += buffer_size; + } + rommap->Close(); +} + +TEST_F(TestMemoryMappedSource, InvalidMode) { + const int64_t buffer_size = 1024; + std::vector buffer(buffer_size); + + test::random_bytes(1024, 0, buffer.data()); + + std::string path = "ipc-invalid-mode-test"; + CreateFile(path, buffer_size); + + std::shared_ptr rommap; + ASSERT_OK(MemoryMappedSource::Open(path, MemorySource::READ_ONLY, &rommap)); + + ASSERT_RAISES(IOError, rommap->Write(0, buffer.data(), buffer_size)); +} + TEST_F(TestMemoryMappedSource, InvalidFile) { std::string non_existent_path = "invalid-file-name-asfd"; diff --git a/cpp/src/arrow/ipc/memory.cc b/cpp/src/arrow/ipc/memory.cc index caff2c610b9..a6c56d64f4a 100644 --- a/cpp/src/arrow/ipc/memory.cc +++ b/cpp/src/arrow/ipc/memory.cc @@ -41,7 +41,7 @@ MemorySource::~MemorySource() {} class MemoryMappedSource::Impl { public: - Impl() : file_(nullptr), is_open_(false), data_(nullptr) {} + Impl() : file_(nullptr), is_open_(false), is_writable_(false), data_(nullptr) {} ~Impl() { if (is_open_) { @@ -53,10 +53,12 @@ class MemoryMappedSource::Impl { Status Open(const std::string& path, MemorySource::AccessMode mode) { if (is_open_) { return Status::IOError("A file is already open"); } - path_ = path; + int prot_flags = PROT_READ; if (mode == MemorySource::READ_WRITE) { file_ = fopen(path.c_str(), "r+b"); + prot_flags |= PROT_WRITE; + is_writable_ = true; } else { file_ = fopen(path.c_str(), "rb"); } @@ -73,14 +75,13 @@ class MemoryMappedSource::Impl { fseek(file_, 0L, SEEK_SET); is_open_ = true; - // TODO(wesm): Add read-only version of this - data_ = reinterpret_cast( - mmap(nullptr, size_, PROT_READ | PROT_WRITE, MAP_SHARED, fileno(file_), 0)); - if (data_ == nullptr) { + void* result = mmap(nullptr, size_, prot_flags, MAP_SHARED, fileno(file_), 0); + if (result == MAP_FAILED) { std::stringstream ss; ss << "Memory mapping file failed, errno: " << errno; return Status::IOError(ss.str()); } + data_ = reinterpret_cast(result); return Status::OK(); } @@ -89,11 +90,15 @@ class MemoryMappedSource::Impl { uint8_t* data() { return data_; } + bool writable() { return is_writable_; } + + bool opened() { return is_open_; } + private: - std::string path_; FILE* file_; int64_t size_; bool is_open_; + bool is_writable_; // The memory map uint8_t* data_; @@ -134,6 +139,9 @@ Status MemoryMappedSource::ReadAt( } Status MemoryMappedSource::Write(int64_t position, const uint8_t* data, int64_t nbytes) { + if (!impl_->opened() || !impl_->writable()) { + return Status::IOError("Unable to write"); + } if (position < 0 || position >= impl_->size()) { return Status::Invalid("position is out of bounds"); }