diff --git a/conda/recipes/libkvikio/conda_build_config.yaml b/conda/recipes/libkvikio/conda_build_config.yaml index 1149e0f9fa..b67ab5d118 100644 --- a/conda/recipes/libkvikio/conda_build_config.yaml +++ b/conda/recipes/libkvikio/conda_build_config.yaml @@ -11,4 +11,4 @@ c_stdlib_version: - "2.28" libcurl_version: - - "==8.5.0" + - "8.5.0" diff --git a/conda/recipes/libkvikio/recipe.yaml b/conda/recipes/libkvikio/recipe.yaml index a4e423d3ed..f4164c9611 100644 --- a/conda/recipes/libkvikio/recipe.yaml +++ b/conda/recipes/libkvikio/recipe.yaml @@ -65,7 +65,7 @@ cache: - ${{ stdlib("c") }} host: - cuda-version =${{ cuda_version }} - - libcurl ${{ libcurl_version }} + - libcurl ==${{ libcurl_version }} - if: should_use_cufile then: - libcufile-dev @@ -91,7 +91,7 @@ outputs: - ${{ compiler("c") }} host: - cuda-version =${{ cuda_version }} - - libcurl ${{ libcurl_version }} + - libcurl ==${{ libcurl_version }} run: - if: x86_64 then: @@ -108,7 +108,6 @@ outputs: ignore_run_exports: by_name: - cuda-version - - libcurl - if: should_use_cufile then: - libcufile @@ -138,6 +137,7 @@ outputs: - ${{ pin_subpackage("libkvikio", exact=True) }} - cuda-version =${{ cuda_version }} - cuda-cudart-dev + - libcurl ==${{ libcurl_version }} - if: should_use_cufile then: - libcufile-dev @@ -156,7 +156,6 @@ outputs: by_name: - cuda-cudart - cuda-version - - libcurl - libnuma - if: should_use_cufile then: diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 53351393ee..6107a0a795 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -163,7 +163,9 @@ set(SOURCES ) if(KvikIO_REMOTE_SUPPORT) - list(APPEND SOURCES "src/remote_handle.cpp" "src/shim/libcurl.cpp") + list(APPEND SOURCES "src/hdfs.cpp" "src/remote_handle.cpp" "src/detail/remote_handle.cpp" + "src/shim/libcurl.cpp" + ) endif() add_library(kvikio ${SOURCES}) diff --git a/cpp/include/kvikio/detail/remote_handle.hpp b/cpp/include/kvikio/detail/remote_handle.hpp new file mode 100644 index 0000000000..f6bd55c4c0 --- 
/dev/null +++ b/cpp/include/kvikio/detail/remote_handle.hpp @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2025, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +namespace kvikio::detail { +/** + * @brief Callback for `CURLOPT_WRITEFUNCTION` that copies received data into a `std::string`. + * + * @param data Received data + * @param size Curl internal implementation always sets this parameter to 1 + * @param num_bytes Number of bytes received + * @param userdata Must be cast from `std::string*` + * @return The number of bytes consumed by the callback + */ +std::size_t callback_get_string_response(char* data, + std::size_t size, + std::size_t num_bytes, + void* userdata); +} // namespace kvikio::detail diff --git a/cpp/include/kvikio/hdfs.hpp b/cpp/include/kvikio/hdfs.hpp new file mode 100644 index 0000000000..0b20d658bd --- /dev/null +++ b/cpp/include/kvikio/hdfs.hpp @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2025, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +#include + +namespace kvikio { + +/** + * @brief A remote endpoint for Apache Hadoop WebHDFS. + * + * If KvikIO is run within a Docker container, the argument `--network host` needs to be passed to + * the `docker run` command. + */ +class WebHdfsEndpoint : public RemoteEndpoint { + private: + std::string _url; + std::optional _username; + + public: + /** + * @brief Create a WebHDFS endpoint from a url. + * + * @param url The WebHDFS HTTP/HTTPS url to the remote file. + */ + explicit WebHdfsEndpoint(std::string url); + + /** + * @brief Create a WebHDFS endpoint from the host, port, file path and optionally username. + * + * @param host Host + * @param port Port + * @param remote_file_path Remote file path + * @param username User name + */ + explicit WebHdfsEndpoint(std::string host, + std::string port, + std::string remote_file_path, + std::optional username = std::nullopt); + + ~WebHdfsEndpoint() override = default; + void setopt(CurlHandle& curl) override; + std::string str() const override; + std::size_t get_file_size() override; + void setup_range_request(CurlHandle& curl, std::size_t file_offset, std::size_t size) override; +}; +} // namespace kvikio diff --git a/cpp/include/kvikio/remote_handle.hpp b/cpp/include/kvikio/remote_handle.hpp index 5f11d76f3d..b2e2d1d0ff 100644 --- a/cpp/include/kvikio/remote_handle.hpp +++ b/cpp/include/kvikio/remote_handle.hpp @@ -68,6 +68,12 @@ class RemoteEndpoint { * @return The file size */ virtual std::size_t get_file_size() = 0; + + /** + * @brief Set up the range request in order to read part of a file given the file offset and read + * size. 
+ */ + virtual void setup_range_request(CurlHandle& curl, std::size_t file_offset, std::size_t size) = 0; }; /** @@ -89,6 +95,7 @@ class HttpEndpoint : public RemoteEndpoint { void setopt(CurlHandle& curl) override; std::string str() const override; std::size_t get_file_size() override; + void setup_range_request(CurlHandle& curl, std::size_t file_offset, std::size_t size) override; }; /** @@ -198,6 +205,7 @@ class S3Endpoint : public RemoteEndpoint { void setopt(CurlHandle& curl) override; std::string str() const override; std::size_t get_file_size() override; + void setup_range_request(CurlHandle& curl, std::size_t file_offset, std::size_t size) override; }; /** @@ -215,6 +223,7 @@ class S3EndpointWithPresignedUrl : public RemoteEndpoint { void setopt(CurlHandle& curl) override; std::string str() const override; std::size_t get_file_size() override; + void setup_range_request(CurlHandle& curl, std::size_t file_offset, std::size_t size) override; }; /** diff --git a/cpp/src/detail/remote_handle.cpp b/cpp/src/detail/remote_handle.cpp new file mode 100644 index 0000000000..b023859ef8 --- /dev/null +++ b/cpp/src/detail/remote_handle.cpp @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2025, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include + +#include + +namespace kvikio::detail { +std::size_t callback_get_string_response(char* data, + std::size_t size, + std::size_t num_bytes, + void* userdata) +{ + auto new_data_size = size * num_bytes; + auto* response = reinterpret_cast(userdata); + response->append(data, new_data_size); + return new_data_size; +} +} // namespace kvikio::detail diff --git a/cpp/src/hdfs.cpp b/cpp/src/hdfs.cpp new file mode 100644 index 0000000000..12455b3a26 --- /dev/null +++ b/cpp/src/hdfs.cpp @@ -0,0 +1,131 @@ +/* + * Copyright (c) 2025, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include +#include +#include +#include +#include +#include + +namespace kvikio { + +WebHdfsEndpoint::WebHdfsEndpoint(std::string url) +{ + // todo: Use libcurl URL API for more secure and idiomatic parsing. + // Split the URL into two parts: one without query and one with. + std::regex const pattern{R"(^([^?]+)\?([^#]*))"}; + // Regex meaning: + // ^: From the start of the line + // [^?]+: Matches non-question-mark characters one or more times. The question mark ushers in the + // URL query component. + // \?: Matches the question mark, which needs to be escaped. + // [^#]*: Matches the non-pound characters zero or more times. The pound sign ushers in the URL + // fragment component. It is very likely that this part does not exist. 
+ std::smatch match_results; + bool found = std::regex_search(url, match_results, pattern); + // If the match is not found, the URL contains no query. + if (!found) { + _url = url; + return; + } + + _url = match_results[1].str(); + auto query = match_results[2].str(); + + { + // Extract user name if provided. In WebHDFS, user name is specified as the key=value pair in + // the query + std::regex const pattern{R"(user.name=([^&]+))"}; + // Regex meaning: + // [^&]+: Matches the non-ampersand character one or more times. The ampersand delimits + // different parameters. + std::smatch match_results; + if (std::regex_search(query, match_results, pattern)) { _username = match_results[1].str(); } + } +} + +WebHdfsEndpoint::WebHdfsEndpoint(std::string host, + std::string port, + std::string file_path, + std::optional username) + : _username{std::move(username)} +{ + std::stringstream ss; + ss << "http://" << host << ":" << port << "/webhdfs/v1" << file_path; + _url = ss.str(); +} + +std::string WebHdfsEndpoint::str() const { return _url; } + +void WebHdfsEndpoint::setopt(CurlHandle& curl) +{ + KVIKIO_NVTX_FUNC_RANGE(); + curl.setopt(CURLOPT_URL, _url.c_str()); + curl.setopt(CURLOPT_FOLLOWLOCATION, 1L); +} + +std::size_t WebHdfsEndpoint::get_file_size() +{ + KVIKIO_NVTX_FUNC_RANGE(); + + std::stringstream ss; + ss << _url << "?"; + if (_username.has_value()) { ss << "user.name=" << _username.value() << "&"; } + ss << "op=GETFILESTATUS"; + + auto curl = create_curl_handle(); + curl.setopt(CURLOPT_URL, ss.str().c_str()); + curl.setopt(CURLOPT_FOLLOWLOCATION, 1L); + + std::string response; + curl.setopt(CURLOPT_WRITEDATA, static_cast(&response)); + curl.setopt(CURLOPT_WRITEFUNCTION, detail::callback_get_string_response); + + curl.perform(); + + long http_status_code{}; + curl.getinfo(CURLINFO_RESPONSE_CODE, &http_status_code); + KVIKIO_EXPECT(http_status_code == 200, "HTTP response is not successful."); + + // The response is in JSON format. 
The file size is given by `"length":`. + std::regex const pattern{R"("length"\s*:\s*(\d+)[^\d])"}; + // Regex meaning: + // \s*: Matches the space character zero or more times. + // \d+: Matches the digit one or more times. + // [^\d]: Matches a non-digit character. + std::smatch match_results; + bool found = std::regex_search(response, match_results, pattern); + KVIKIO_EXPECT( + found, "Regular expression search failed. Cannot extract file length from the JSON response."); + return std::stoull(match_results[1].str()); +} + +void WebHdfsEndpoint::setup_range_request(CurlHandle& curl, + std::size_t file_offset, + std::size_t size) +{ + // WebHDFS does not support CURLOPT_RANGE. The range is specified as query parameters in the URL. + KVIKIO_NVTX_FUNC_RANGE(); + std::stringstream ss; + ss << _url << "?"; + if (_username.has_value()) { ss << "user.name=" << _username.value() << "&"; } + ss << "op=OPEN&offset=" << file_offset << "&length=" << size; + curl.setopt(CURLOPT_URL, ss.str().c_str()); +} +} // namespace kvikio diff --git a/cpp/src/remote_handle.cpp b/cpp/src/remote_handle.cpp index 1aba93bdc5..23cf5c6305 100644 --- a/cpp/src/remote_handle.cpp +++ b/cpp/src/remote_handle.cpp @@ -25,6 +25,7 @@ #include #include +#include #include #include #include @@ -162,6 +163,20 @@ std::size_t get_file_size_using_head_impl(RemoteEndpoint& endpoint, std::string return static_cast(cl); } +/** + * @brief Set up the range request for libcurl. Use this when HTTP range requests are supported. 
+ * + * @param curl A curl handle + * @param file_offset File offset + * @param size read size + */ +void setup_range_request_impl(CurlHandle& curl, std::size_t file_offset, std::size_t size) +{ + std::string const byte_range = + std::to_string(file_offset) + "-" + std::to_string(file_offset + size - 1); + curl.setopt(CURLOPT_RANGE, byte_range.c_str()); +} + } // namespace HttpEndpoint::HttpEndpoint(std::string url) : _url{std::move(url)} {} @@ -174,15 +189,15 @@ std::size_t HttpEndpoint::get_file_size() return get_file_size_using_head_impl(*this, _url); } -void HttpEndpoint::setopt(CurlHandle& curl) +void HttpEndpoint::setup_range_request(CurlHandle& curl, std::size_t file_offset, std::size_t size) { - KVIKIO_NVTX_FUNC_RANGE(); - curl.setopt(CURLOPT_URL, _url.c_str()); + setup_range_request_impl(curl, file_offset, size); } +void HttpEndpoint::setopt(CurlHandle& curl) { curl.setopt(CURLOPT_URL, _url.c_str()); } + void S3Endpoint::setopt(CurlHandle& curl) { - KVIKIO_NVTX_FUNC_RANGE(); curl.setopt(CURLOPT_URL, _url.c_str()); curl.setopt(CURLOPT_AWS_SIGV4, _aws_sigv4.c_str()); curl.setopt(CURLOPT_USERPWD, _aws_userpwd.c_str()); @@ -327,6 +342,12 @@ std::size_t S3Endpoint::get_file_size() return get_file_size_using_head_impl(*this, _url); } +void S3Endpoint::setup_range_request(CurlHandle& curl, std::size_t file_offset, std::size_t size) +{ + KVIKIO_NVTX_FUNC_RANGE(); + setup_range_request_impl(curl, file_offset, size); +} + S3EndpointWithPresignedUrl::S3EndpointWithPresignedUrl(std::string presigned_url) : _url{std::move(presigned_url)} { @@ -334,7 +355,6 @@ S3EndpointWithPresignedUrl::S3EndpointWithPresignedUrl(std::string presigned_url void S3EndpointWithPresignedUrl::setopt(CurlHandle& curl) { - KVIKIO_NVTX_FUNC_RANGE(); curl.setopt(CURLOPT_URL, _url.c_str()); } @@ -411,6 +431,14 @@ std::size_t S3EndpointWithPresignedUrl::get_file_size() return file_size; } +void S3EndpointWithPresignedUrl::setup_range_request(CurlHandle& curl, + std::size_t file_offset, + 
std::size_t size) +{ + KVIKIO_NVTX_FUNC_RANGE(); + setup_range_request_impl(curl, file_offset, size); +} + RemoteHandle::RemoteHandle(std::unique_ptr endpoint, std::size_t nbytes) : _endpoint{std::move(endpoint)}, _nbytes{nbytes} { @@ -510,10 +538,7 @@ std::size_t RemoteHandle::read(void* buf, std::size_t size, std::size_t file_off bool const is_host_mem = is_host_memory(buf); auto curl = create_curl_handle(); _endpoint->setopt(curl); - - std::string const byte_range = - std::to_string(file_offset) + "-" + std::to_string(file_offset + size - 1); - curl.setopt(CURLOPT_RANGE, byte_range.c_str()); + _endpoint->setup_range_request(curl, file_offset, size); if (is_host_mem) { curl.setopt(CURLOPT_WRITEFUNCTION, callback_host_memory); diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index 3d53bbd86f..41de4bb6fa 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -57,7 +57,7 @@ function(kvikio_add_test) ) target_link_libraries( ${_KVIKIO_NAME} PRIVATE kvikio::kvikio GTest::gmock GTest::gmock_main GTest::gtest - GTest::gtest_main CUDA::cudart + GTest::gtest_main CUDA::cudart $ ) rapids_test_add( @@ -76,6 +76,9 @@ kvikio_add_test(NAME ERROR_TEST SOURCES test_error.cpp) kvikio_add_test(NAME MMAP_TEST SOURCES test_mmap.cpp) -kvikio_add_test(NAME REMOTE_HANDLE_TEST SOURCES test_remote_handle.cpp utils/env.cpp) +if(KvikIO_REMOTE_SUPPORT) + kvikio_add_test(NAME REMOTE_HANDLE_TEST SOURCES test_remote_handle.cpp utils/env.cpp) + kvikio_add_test(NAME HDFS_TEST SOURCES test_hdfs.cpp utils/hdfs_helper.cpp) +endif() rapids_test_install_relocatable(INSTALL_COMPONENT_SET testing DESTINATION bin/tests/libkvikio) diff --git a/cpp/tests/test_basic_io.cpp b/cpp/tests/test_basic_io.cpp index d72ba8841f..141fe386c3 100644 --- a/cpp/tests/test_basic_io.cpp +++ b/cpp/tests/test_basic_io.cpp @@ -27,15 +27,16 @@ class BasicIOTest : public testing::Test { TempDir tmp_dir{false}; _filepath = tmp_dir.path() / "test"; - _dev_a = std::move(DevBuffer::arange(100)); - 
_dev_b = std::move(DevBuffer::zero_like(_dev_a)); + _dev_a = std::move(DevBuffer::arange(100)); + _dev_b = std::move(DevBuffer::zero_like(_dev_a)); } void TearDown() override {} std::filesystem::path _filepath; - DevBuffer _dev_a; - DevBuffer _dev_b; + using value_type = std::int64_t; + DevBuffer _dev_a; + DevBuffer _dev_b; }; TEST_F(BasicIOTest, write_read) diff --git a/cpp/tests/test_hdfs.cpp b/cpp/tests/test_hdfs.cpp new file mode 100644 index 0000000000..354c8c4aca --- /dev/null +++ b/cpp/tests/test_hdfs.cpp @@ -0,0 +1,180 @@ +/* + * Copyright (c) 2025, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include +#include +#include + +#include "utils/hdfs_helper.hpp" +#include "utils/utils.hpp" + +// This test makes the following assumptions: +// - This KvikIO unit test is run on the "name node" of a WebHDFS server. +// - Port 9870 (default for WebHDFS) is being used to listen to the requests. +// - The environment variable `KVIKIO_USER` is specified prior to the test. It contains a valid user +// name that has been granted access to the HDFS. +// - The user has the proper permission to create a file under the `/tmp` directory on the HDFS. +// - If the unit test is run within a Docker container, 
the following arguments are passed to the +// `docker run` command: +// - `--network host` +// - `--env KVIKIO_USER=` +// +// If any of these assumptions is not satisfied, this unit test is expected to be skipped +// gracefully. + +using value_type = double; + +namespace kvikio::test { +struct Config { + std::size_t num_elements{1024ull * 1024ull}; + std::vector host_buf; + kvikio::test::DevBuffer dev_buf; + std::string host; + std::string port; + std::string _username; + std::string remote_file_path; + bool file_created{false}; +}; +} // namespace kvikio::test + +class WebHdfsTest : public testing::Test { + protected: + static void SetUpTestSuite() + { + config.num_elements = 1024ull * 1024ull; + config.host_buf.resize(config.num_elements); + std::iota(config.host_buf.begin(), config.host_buf.end(), 0); + + config.dev_buf = kvikio::test::DevBuffer{config.host_buf}; + + config.host = "localhost"; + config.port = "9870"; + + config.remote_file_path = "/tmp/kvikio-test-webhdfs.bin"; + + auto res = std::getenv("KVIKIO_USER"); + if (res) { + config._username = res; + } else { + GTEST_SKIP() << "Environment variable KVIKIO_USER is not set for this test."; + } + + webhdfs_helper = + std::make_unique(config.host, config.port, config._username); + + if (!webhdfs_helper->can_connect()) { + GTEST_SKIP() << "Cannot connect to WebHDFS. Skipping all tests for this fixture."; + } + + std::span buffer{reinterpret_cast(config.host_buf.data()), + config.host_buf.size() * sizeof(value_type)}; + if (!webhdfs_helper->upload_data(buffer, config.remote_file_path)) { + GTEST_SKIP() + << "Failed to upload test data using WebHDFS. 
Skipping all tests for this fixture."; + }; + + config.file_created = true; + } + + static void TearDownTestSuite() + { + if (config.file_created) { webhdfs_helper->delete_data(config.remote_file_path); } + } + + static kvikio::test::Config config; + static std::unique_ptr webhdfs_helper; +}; + +kvikio::test::Config WebHdfsTest::config{}; +std::unique_ptr WebHdfsTest::webhdfs_helper{}; + +TEST_F(WebHdfsTest, constructor) +{ + auto do_test = [&](kvikio::RemoteHandle& remote_handle) { + kvikio::test::DevBuffer out_device_buf(config.num_elements); + auto read_size = remote_handle.read(out_device_buf.ptr, remote_handle.nbytes()); + auto out_host_buf = out_device_buf.to_vector(); + for (std::size_t i = 0; i < config.num_elements; ++i) { + EXPECT_EQ(config.host_buf[i], out_host_buf[i]); + } + EXPECT_EQ(read_size, remote_handle.nbytes()); + }; + + std::stringstream ss; + ss << "http://" << config.host << ":" << config.port << "/webhdfs/v1" << config.remote_file_path + << "?user.name=" << config._username; + std::vector remote_handles; + + remote_handles.emplace_back(std::make_unique(ss.str())); + remote_handles.emplace_back(std::make_unique( + config.host, config.port, config.remote_file_path, config._username)); + + for (auto& remote_handle : remote_handles) { + do_test(remote_handle); + } +} + +TEST_F(WebHdfsTest, read_parallel) +{ + auto do_test = [&](std::string const& url, + std::size_t num_elements_to_skip, + std::size_t num_elements_to_read, + std::size_t task_size) { + kvikio::RemoteHandle remote_handle{std::make_unique(url)}; + auto const offset = num_elements_to_skip * sizeof(value_type); + auto const expected_read_size = num_elements_to_read * sizeof(value_type); + + // host + { + std::vector out_host_buf(num_elements_to_read, {}); + auto fut = remote_handle.pread(out_host_buf.data(), expected_read_size, offset, task_size); + auto const read_size = fut.get(); + for (std::size_t i = num_elements_to_skip; i < num_elements_to_read; ++i) { + 
EXPECT_EQ(config.host_buf[i], out_host_buf[i - num_elements_to_skip]); + } + EXPECT_EQ(read_size, expected_read_size); + } + + // device + { + kvikio::test::DevBuffer out_device_buf(num_elements_to_read); + auto fut = remote_handle.pread(out_device_buf.ptr, expected_read_size, offset, task_size); + auto const read_size = fut.get(); + auto out_host_buf = out_device_buf.to_vector(); + for (std::size_t i = num_elements_to_skip; i < num_elements_to_read; ++i) { + EXPECT_EQ(config.host_buf[i], out_host_buf[i - num_elements_to_skip]); + } + EXPECT_EQ(read_size, expected_read_size); + } + }; + + std::stringstream ss; + ss << "http://" << config.host << ":" << config.port << "/webhdfs/v1" << config.remote_file_path + << "?user.name=" << config._username; + std::vector task_sizes{256, 1024, kvikio::defaults::task_size()}; + + for (const auto& task_size : task_sizes) { + for (const auto& num_elements_to_read : {10, 9999}) { + for (const auto& num_elements_to_skip : {0, 10, 100, 1000, 9999}) { + do_test(ss.str(), num_elements_to_skip, num_elements_to_read, task_size); + } + } + } +} diff --git a/cpp/tests/test_mmap.cpp b/cpp/tests/test_mmap.cpp index 7fb1e3a4fd..1c6e3a54b3 100644 --- a/cpp/tests/test_mmap.cpp +++ b/cpp/tests/test_mmap.cpp @@ -40,7 +40,7 @@ class MmapTest : public testing::Test { _filepath = tmp_dir.path() / "test.bin"; std::size_t num_elements = 1024ull * 1024ull; _host_buf = CreateTempFile(_filepath, num_elements); - _dev_buf = kvikio::test::DevBuffer{_host_buf}; + _dev_buf = kvikio::test::DevBuffer{_host_buf}; _page_size = kvikio::get_page_size(); } @@ -62,9 +62,8 @@ class MmapTest : public testing::Test { std::size_t _file_size; std::size_t _page_size; std::vector _host_buf; - kvikio::test::DevBuffer _dev_buf; - using value_type = decltype(_host_buf)::value_type; + kvikio::test::DevBuffer _dev_buf; }; TEST_F(MmapTest, invalid_file_open_flag) @@ -212,7 +211,7 @@ TEST_F(MmapTest, read_seq) // device { - kvikio::test::DevBuffer 
out_device_buf(num_elements_to_read); + kvikio::test::DevBuffer out_device_buf(num_elements_to_read); auto const read_size = mmap_handle.read(out_device_buf.ptr, expected_read_size, offset); auto out_host_buf = out_device_buf.to_vector(); for (std::size_t i = num_elements_to_skip; i < num_elements_to_read; ++i) { @@ -250,7 +249,7 @@ TEST_F(MmapTest, read_parallel) // device { - kvikio::test::DevBuffer out_device_buf(num_elements_to_read); + kvikio::test::DevBuffer out_device_buf(num_elements_to_read); auto fut = mmap_handle.pread(out_device_buf.ptr, expected_read_size, offset); auto const read_size = fut.get(); auto out_host_buf = out_device_buf.to_vector(); @@ -300,7 +299,7 @@ TEST_F(MmapTest, read_with_default_arguments) // device { - kvikio::test::DevBuffer out_device_buf(num_elements); + kvikio::test::DevBuffer out_device_buf(num_elements); { auto const read_size = mmap_handle.read(out_device_buf.ptr); diff --git a/cpp/tests/utils/hdfs_helper.cpp b/cpp/tests/utils/hdfs_helper.cpp new file mode 100644 index 0000000000..2bcbc7fed2 --- /dev/null +++ b/cpp/tests/utils/hdfs_helper.cpp @@ -0,0 +1,197 @@ +/* + * Copyright (c) 2025, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "hdfs_helper.hpp" + +#include +#include +#include +#include +#include + +#include +#include + +namespace kvikio::test { + +namespace { + +/** + * @brief Helper struct that wraps a buffer view and tracks how much data has been processed via an + * offset value. + */ +struct tracked_buffer_t { + std::span buffer; + std::size_t offset; +}; + +/** + * @brief Callback for `CURLOPT_READFUNCTION` to upload data. + * + * @param data Transfer buffer that the bytes to upload are copied into + * @param size Curl internal implementation always sets this parameter to 1 + * @param num_bytes_max The maximum number of bytes that can be uploaded + * @param userdata Must be cast from `tracked_buffer_t*` + * @return The number of bytes that have been copied to the transfer buffer. + */ +std::size_t callback_upload(char* data, std::size_t size, std::size_t num_bytes_max, void* userdata) +{ + auto new_data_size_max = size * num_bytes_max; + auto* tracked_buffer = reinterpret_cast(userdata); + + // All data have been uploaded. Nothing more to do. 
+ if (tracked_buffer->offset >= tracked_buffer->buffer.size()) { return 0; } + + auto copy_size = + std::min(new_data_size_max, tracked_buffer->buffer.size() - tracked_buffer->offset); + std::memcpy(data, tracked_buffer->buffer.data() + tracked_buffer->offset, copy_size); + tracked_buffer->offset += copy_size; + + return copy_size; +} +} // namespace + +WebHdfsTestHelper::WebHdfsTestHelper(std::string const& host, + std::string const& port, + std::string const& username) + : _host{host}, _port{port}, _username{username} +{ + std::stringstream ss; + ss << "http://" << host << ":" << port << "/webhdfs/v1"; + _url_before_path = ss.str(); +} + +bool WebHdfsTestHelper::can_connect() noexcept +{ + try { + auto curl = create_curl_handle(); + + std::stringstream ss; + ss << _url_before_path << "/?user.name=" << _username << "&op=GETHOMEDIRECTORY"; + + curl.setopt(CURLOPT_URL, ss.str().c_str()); + + std::string response{}; + curl.setopt(CURLOPT_WRITEDATA, &response); + curl.setopt(CURLOPT_WRITEFUNCTION, kvikio::detail::callback_get_string_response); + curl.setopt(CURLOPT_FOLLOWLOCATION, 1L); + curl.perform(); + return true; + } catch (std::exception const& e) { + std::cout << e.what() << "\n"; + return false; + } +} + +bool WebHdfsTestHelper::upload_data(std::span buffer, + std::string const& remote_file_path) noexcept +{ + try { + // Official reference on how to create and write to a file: + // https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Create_and_Write_to_a_File + std::string redirect_url; + + { + // Step 1: Submit a HTTP PUT request without automatically following redirects and without + // sending the file data. 
+ auto curl = create_curl_handle(); + + std::stringstream ss; + ss << _url_before_path << remote_file_path << "?user.name=" << _username << "&op=CREATE"; + std::string redirect_data_node_location{}; + + curl.setopt(CURLOPT_URL, ss.str().c_str()); + curl.setopt(CURLOPT_FOLLOWLOCATION, 0L); + curl.setopt(CURLOPT_CUSTOMREQUEST, "PUT"); + + std::string response{}; + curl.setopt(CURLOPT_HEADERDATA, &response); + curl.setopt(CURLOPT_HEADERFUNCTION, kvikio::detail::callback_get_string_response); + + curl.perform(); + + long http_status_code{}; + curl.getinfo(CURLINFO_RESPONSE_CODE, &http_status_code); + KVIKIO_EXPECT(http_status_code == 307, "Redirection from name node to data node failed."); + + std::regex const pattern{R"(Location:\s*(.*)\s*)"}; + std::smatch match_results; + bool found = std::regex_search(response, match_results, pattern); + KVIKIO_EXPECT(found, + "Regular expression search failed. Cannot extract redirect location from the " + "JSON response."); + redirect_url = match_results[1].str(); + } + + { + // Step 2: Submit another HTTP PUT request using the URL in the Location header with the file + // data to be written. 
+ auto curl = create_curl_handle(); + curl.setopt(CURLOPT_URL, redirect_url.c_str()); + curl.setopt(CURLOPT_UPLOAD, 1L); + + tracked_buffer_t tracked_buffer{.buffer = buffer, .offset = 0}; + curl.setopt(CURLOPT_READDATA, &tracked_buffer); + curl.setopt(CURLOPT_READFUNCTION, callback_upload); + curl.setopt(CURLOPT_INFILESIZE_LARGE, static_cast(buffer.size())); + + curl.perform(); + + long http_status_code{}; + curl.getinfo(CURLINFO_RESPONSE_CODE, &http_status_code); + KVIKIO_EXPECT(http_status_code == 201, "File creation failed."); + } + + return true; + } catch (std::exception const& e) { + std::cout << e.what() << "\n"; + return false; + } +} + +bool WebHdfsTestHelper::delete_data(std::string const& remote_file_path) noexcept +{ + try { + // Official reference on how to delete a file: + // https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Delete_a_File.2FDirectory + auto curl = create_curl_handle(); + + std::stringstream ss; + ss << _url_before_path << remote_file_path << "?user.name=" << _username << "&op=DELETE"; + std::string const url = ss.str(); + std::string redirect_data_node_location{}; + + curl.setopt(CURLOPT_URL, url.c_str()); + curl.setopt(CURLOPT_FOLLOWLOCATION, 1L); + curl.setopt(CURLOPT_CUSTOMREQUEST, "DELETE"); + + std::string response{}; + curl.setopt(CURLOPT_HEADERDATA, &response); + curl.setopt(CURLOPT_HEADERFUNCTION, kvikio::detail::callback_get_string_response); + + curl.perform(); + + long http_status_code{}; + curl.getinfo(CURLINFO_RESPONSE_CODE, &http_status_code); + KVIKIO_EXPECT(http_status_code == 200, "File deletion failed."); + + return true; + } catch (std::exception const& e) { + std::cout << e.what() << "\n"; + return false; + } +} +} // namespace kvikio::test diff --git a/cpp/tests/utils/hdfs_helper.hpp b/cpp/tests/utils/hdfs_helper.hpp new file mode 100644 index 0000000000..7cc5da377c --- /dev/null +++ b/cpp/tests/utils/hdfs_helper.hpp @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2025, NVIDIA 
CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include + +namespace kvikio::test { + +/** + * @brief Helper class to create and upload a file on WebHDFS so as to enable read testing. + */ +class WebHdfsTestHelper { + private: + std::string _host; + std::string _port; + std::string _username; + std::string _url_before_path; + + public: + WebHdfsTestHelper(std::string const& host, std::string const& port, std::string const& username); + + /** + * @brief Whether KvikIO can connect to the WebHDFS server. + * + * @return A boolean answer. + */ + bool can_connect() noexcept; + + /** + * @brief Copy the data from a host buffer to a remote file on the WebHDFS server. + * + * @param buffer View to the host buffer whose data will be copied to the WebHDFS server + * @param remote_file_path Remote file path + * @return True if the file has been successfully uploaded; false otherwise. + */ + bool upload_data(std::span buffer, std::string const& remote_file_path) noexcept; + + /** + * @brief Delete a remote file on the WebHDFS server. + * + * @param remote_file_path Remote file path + * @return True if the file has been successfully deleted; false otherwise. 
+ */ + bool delete_data(std::string const& remote_file_path) noexcept; +}; + +} // namespace kvikio::test diff --git a/cpp/tests/utils/utils.hpp b/cpp/tests/utils/utils.hpp index bb50d07c13..7d733621a7 100644 --- a/cpp/tests/utils/utils.hpp +++ b/cpp/tests/utils/utils.hpp @@ -108,6 +108,7 @@ class TempDir { /** * @brief Help class for creating and comparing buffers. */ +template class DevBuffer { public: std::size_t nelem; @@ -116,12 +117,12 @@ class DevBuffer { DevBuffer() : nelem{0}, nbytes{0} {}; - DevBuffer(std::size_t nelem) : nelem{nelem}, nbytes{nelem * sizeof(std::int64_t)} + DevBuffer(std::size_t nelem) : nelem{nelem}, nbytes{nelem * sizeof(T)} { KVIKIO_CHECK_CUDA(cudaMalloc(&ptr, nbytes)); KVIKIO_CHECK_CUDA(cudaMemset(ptr, 0, nbytes)); } - DevBuffer(std::vector const& host_buffer) : DevBuffer{host_buffer.size()} + DevBuffer(std::vector const& host_buffer) : DevBuffer{host_buffer.size()} { KVIKIO_CHECK_CUDA(cudaMemcpy(ptr, host_buffer.data(), nbytes, cudaMemcpyHostToDevice)); } @@ -143,9 +144,9 @@ class DevBuffer { ~DevBuffer() noexcept { cudaFree(ptr); } - [[nodiscard]] static DevBuffer arange(std::size_t nelem, std::int64_t start = 0) + [[nodiscard]] static DevBuffer arange(std::size_t nelem, T start = 0) { - std::vector host_buffer(nelem); + std::vector host_buffer(nelem); std::iota(host_buffer.begin(), host_buffer.end(), start); return DevBuffer{host_buffer}; } @@ -157,9 +158,9 @@ class DevBuffer { return ret; } - [[nodiscard]] std::vector to_vector() const + [[nodiscard]] std::vector to_vector() const { - std::vector ret(nelem); + std::vector ret(nelem); KVIKIO_CHECK_CUDA(cudaMemcpy(ret.data(), this->ptr, nbytes, cudaMemcpyDeviceToHost)); return ret; } @@ -177,7 +178,8 @@ class DevBuffer { /** * @brief Check that two buffers are equal */ -inline void expect_equal(DevBuffer const& a, DevBuffer const& b) +template +inline void expect_equal(DevBuffer const& a, DevBuffer const& b) { EXPECT_EQ(a.nbytes, b.nbytes); auto a_vec = a.to_vector();