diff --git a/cpp/src/io/utilities/datasource.cpp b/cpp/src/io/utilities/datasource.cpp index 4b2ff497f33..1eeadb68c51 100644 --- a/cpp/src/io/utilities/datasource.cpp +++ b/cpp/src/io/utilities/datasource.cpp @@ -25,6 +25,8 @@ #include #include +#include +#include #include @@ -109,20 +111,8 @@ class kvikio_source : public datasource { rmm::cuda_stream_view stream) override { CUDF_EXPECTS(supports_device_read(), "Device reads are not supported for this file."); - auto const read_size = std::min(size, this->size() - offset); - - if constexpr (std::is_same_v) { - return _kvikio_handle.pread(dst, - read_size, - offset, - kvikio::defaults::task_size(), - kvikio::defaults::gds_threshold(), - false /* not to sync_default_stream */); - } else { - // HandleT is kvikio::RemoteHandle - return _kvikio_handle.pread(dst, read_size, offset); - } + return _kvikio_handle.pread(dst, read_size, offset); } size_t device_read(size_t offset, @@ -167,6 +157,21 @@ class file_source : public kvikio_source { "Reading a file using kvikIO, with compatibility mode %s.", _kvikio_handle.get_compat_mode_manager().is_compat_mode_preferred() ? "on" : "off"); } + + std::future device_read_async(size_t offset, + size_t size, + uint8_t* dst, + rmm::cuda_stream_view stream) override + { + CUDF_EXPECTS(supports_device_read(), "Device reads are not supported for this file."); + auto const read_size = std::min(size, this->size() - offset); + return _kvikio_handle.pread(dst, + read_size, + offset, + kvikio::defaults::task_size(), + kvikio::defaults::gds_threshold(), + false /* not to sync_default_stream */); + } }; /** @@ -175,117 +180,22 @@ class file_source : public kvikio_source { * Unlike Arrow's memory mapped IO class, this implementation allows memory mapping a subset of the * file where the starting offset may not be zero. */ -class memory_mapped_source : public file_source { +class memory_mapped_source : public kvikio_source { public: - explicit memory_mapped_source(char const* filepath, size_t offset, size_t max_size_estimate) - : file_source(filepath) - { - if (this->size() != 0) { - // Memory mapping is not exclusive, so we can include the whole region we expect to read - map(_kvikio_handle.fd(), offset, max_size_estimate); - } - } - - ~memory_mapped_source() override - { - if (_map_addr != nullptr) { unmap(); } - } - - std::unique_ptr host_read(size_t offset, size_t size) override - { - // Clamp length to available data - auto const read_size = std::min(size, this->size() - offset); - - // If the requested range is outside of the mapped region, read from the file - if (offset < _map_offset or offset + read_size > (_map_offset + _map_size)) { - return file_source::host_read(offset, read_size); - } - - // If the requested range is only partially within the registered region, copy to a new - // host buffer to make the data safe to copy to the device - if (_reg_addr != nullptr and - (offset < _reg_offset or offset + read_size > (_reg_offset + _reg_size))) { - auto const src = static_cast(_map_addr) + (offset - _map_offset); - - return std::make_unique>>( - std::vector(src, src + read_size)); - } - - return std::make_unique( - static_cast(_map_addr) + offset - _map_offset, read_size); - } - - std::future> host_read_async(size_t offset, - size_t size) override - { - // Use the default implementation instead of the file_source's implementation - return datasource::host_read_async(offset, size); - } - - size_t host_read(size_t offset, size_t size, uint8_t* dst) override - { - // Clamp length to available data - auto const read_size = std::min(size, this->size() - offset); - - // If the requested range is outside of the mapped region, read from the file - if (offset < _map_offset or offset + read_size > (_map_offset + _map_size)) { - return file_source::host_read(offset, read_size, dst); + explicit memory_mapped_source(char const* filepath, + size_t offset, + [[maybe_unused]] size_t max_size_estimate) + : kvikio_source{kvikio::MmapHandle()} + { + // Since the superclass kvikio_source is initialized with an empty mmap handle, `this->size()` + // returns 0 at this point. Use `kvikio::get_file_size()` instead. + auto const file_size = kvikio::get_file_size(filepath); + if (file_size != 0) { + CUDF_EXPECTS(offset < file_size, "Offset is past end of file", std::overflow_error); + _kvikio_handle = + kvikio::MmapHandle(filepath, "r", std::nullopt, 0, kvikio::FileHandle::m644, MAP_SHARED); } - - auto const src = static_cast(_map_addr) + (offset - _map_offset); - std::memcpy(dst, src, read_size); - return read_size; } - - std::future host_read_async(size_t offset, size_t size, uint8_t* dst) override - { - // Use the default implementation instead of the file_source's implementation - return datasource::host_read_async(offset, size, dst); - } - - [[nodiscard]] bool supports_device_read() const override { return false; } - - [[nodiscard]] bool is_device_read_preferred(size_t size) const override - { - return supports_device_read(); - } - - private: - void map(int fd, size_t offset, size_t size) - { - CUDF_EXPECTS(offset < this->size(), "Offset is past end of file", std::overflow_error); - - // Offset for `mmap()` must be page aligned - _map_offset = offset & ~(sysconf(_SC_PAGESIZE) - 1); - - if (size == 0 || (offset + size) > this->size()) { size = this->size() - offset; } - - // Size for `mmap()` needs to include the page padding - _map_size = size + (offset - _map_offset); - if (_map_size == 0) { return; } - - // Check if accessing a region within already mapped area - _map_addr = mmap(nullptr, _map_size, PROT_READ, MAP_PRIVATE, fd, _map_offset); - CUDF_EXPECTS(_map_addr != MAP_FAILED, "Cannot create memory mapping"); - } - - void unmap() - { - if (_map_addr != nullptr) { - auto const result = munmap(_map_addr, _map_size); - if (result != 0) { CUDF_LOG_WARN("munmap failed with %d", result); } - _map_addr = nullptr; - } - } - - private: - size_t _map_offset = 0; - size_t _map_size = 0; - void* _map_addr = nullptr; - - size_t _reg_offset = 0; - size_t _reg_size = 0; - void* _reg_addr = nullptr; }; /**