Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
d406591
Support WebHDFS: C++ implementation
kingcrimsontianyu Aug 5, 2025
2e8328f
Update
kingcrimsontianyu Aug 5, 2025
46c1cbb
Move advanced URL handling to a separate PR to reduce the scope of cu…
kingcrimsontianyu Aug 5, 2025
8fd6a70
Update
kingcrimsontianyu Aug 5, 2025
cecd49c
Add clarifying comments
kingcrimsontianyu Aug 5, 2025
e6e4d56
Add more comments
kingcrimsontianyu Aug 5, 2025
97da554
Update
kingcrimsontianyu Aug 5, 2025
6b48777
Update
kingcrimsontianyu Aug 5, 2025
e4fbd35
Add default arg
kingcrimsontianyu Aug 5, 2025
87603f0
Fix a bug where too large file causes string-to-size conversion to fail
kingcrimsontianyu Aug 5, 2025
c3881e0
Merge branch 'branch-25.10' into web-hdfs
kingcrimsontianyu Aug 5, 2025
bb256f2
For WebHDFS unit test, fix segfault when the test is skipped; create/…
kingcrimsontianyu Aug 7, 2025
969f6d0
Remove unnecessary ntvx annotation
kingcrimsontianyu Aug 7, 2025
377d88e
Update cpp/tests/CMakeLists.txt
kingcrimsontianyu Aug 7, 2025
9c6477c
Attempt to fix CI overlinking error
kingcrimsontianyu Aug 7, 2025
0507d78
Remove libcurl from libkvikio-tests' run section
kingcrimsontianyu Aug 7, 2025
cb24034
Add relaxed libcurl run dependencies with explanatory comments.
bdice Aug 7, 2025
8a7ff55
Merge branch 'branch-25.10' into web-hdfs
bdice Aug 7, 2025
70be8f9
Simplify libcurl pinnings after re-checking run-export behavior.
bdice Aug 7, 2025
4daf55e
Merge branch 'web-hdfs' of github.com:kingcrimsontianyu/kvikio into w…
bdice Aug 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conda/recipes/libkvikio/conda_build_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@ c_stdlib_version:
- "2.28"

libcurl_version:
- "==8.5.0"
- "8.5.0"
7 changes: 3 additions & 4 deletions conda/recipes/libkvikio/recipe.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ cache:
- ${{ stdlib("c") }}
host:
- cuda-version =${{ cuda_version }}
- libcurl ${{ libcurl_version }}
- libcurl ==${{ libcurl_version }}
- if: should_use_cufile
then:
- libcufile-dev
Expand All @@ -91,7 +91,7 @@ outputs:
- ${{ compiler("c") }}
host:
- cuda-version =${{ cuda_version }}
- libcurl ${{ libcurl_version }}
- libcurl ==${{ libcurl_version }}
run:
- if: x86_64
then:
Expand All @@ -108,7 +108,6 @@ outputs:
ignore_run_exports:
by_name:
- cuda-version
- libcurl
- if: should_use_cufile
then:
- libcufile
Expand Down Expand Up @@ -138,6 +137,7 @@ outputs:
- ${{ pin_subpackage("libkvikio", exact=True) }}
- cuda-version =${{ cuda_version }}
- cuda-cudart-dev
- libcurl ==${{ libcurl_version }}
- if: should_use_cufile
then:
- libcufile-dev
Expand All @@ -156,7 +156,6 @@ outputs:
by_name:
- cuda-cudart
- cuda-version
- libcurl
- libnuma
- if: should_use_cufile
then:
Expand Down
4 changes: 3 additions & 1 deletion cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,9 @@ set(SOURCES
)

if(KvikIO_REMOTE_SUPPORT)
list(APPEND SOURCES "src/remote_handle.cpp" "src/shim/libcurl.cpp")
list(APPEND SOURCES "src/hdfs.cpp" "src/remote_handle.cpp" "src/detail/remote_handle.cpp"
"src/shim/libcurl.cpp"
)
endif()

add_library(kvikio ${SOURCES})
Expand Down
34 changes: 34 additions & 0 deletions cpp/include/kvikio/detail/remote_handle.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright (c) 2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <cstring>

namespace kvikio::detail {
/**
 * @brief Callback for `CURLOPT_WRITEFUNCTION` that appends received data to a `std::string`.
 *
 * Every invocation appends `size * num_bytes` bytes from `data` to the string that `userdata`
 * points to, so a response delivered in several pieces accumulates in that string.
 *
 * @param data Pointer to the received data
 * @param size Size of one data item. Curl internal implementation always sets this parameter to 1
 * @param num_bytes Number of bytes received
 * @param userdata Pointer to the destination string. Must be cast from `std::string*`
 * @return The number of bytes consumed by the callback, i.e. `size * num_bytes`
 */
std::size_t callback_get_string_response(char* data,
std::size_t size,
std::size_t num_bytes,
void* userdata);
} // namespace kvikio::detail
62 changes: 62 additions & 0 deletions cpp/include/kvikio/hdfs.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright (c) 2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <optional>

#include <kvikio/remote_handle.hpp>

namespace kvikio {

/**
 * @brief A remote endpoint for Apache Hadoop WebHDFS.
 *
 * The endpoint stores the URL of the remote file without its query component, plus an optional
 * user name. The WebHDFS operation (`op=...`) and, when present, the `user.name` parameter are
 * appended to that URL as a query string when a request is issued.
 *
 * If KvikIO is run inside a Docker container, the argument `--network host` needs to be passed to
 * the `docker run` command.
 */
class WebHdfsEndpoint : public RemoteEndpoint {
private:
// URL of the remote file, excluding the query component.
std::string _url;
// Optional user name. When set, it is sent as the `user.name` query parameter.
std::optional<std::string> _username;

public:
/**
 * @brief Create a WebHDFS endpoint from a url.
 *
 * If the url has a query component, the query is split off and not stored. A
 * `user.name=<value>` parameter found in the query is kept as the user name.
 *
 * @param url The WebHDFS HTTP/HTTPS url to the remote file.
 */
explicit WebHdfsEndpoint(std::string url);

/**
 * @brief Create a WebHDFS endpoint from the host, port, file path and optionally username.
 *
 * The url is assembled as `http://<host>:<port>/webhdfs/v1<remote_file_path>`.
 *
 * @param host Host
 * @param port Port
 * @param remote_file_path Remote file path. It is appended verbatim directly after
 * `/webhdfs/v1`, so it is expected to start with `/` (e.g. `/data/file.bin`)
 * @param username User name. If omitted, no `user.name` parameter is sent
 */
explicit WebHdfsEndpoint(std::string host,
std::string port,
std::string remote_file_path,
std::optional<std::string> username = std::nullopt);

~WebHdfsEndpoint() override = default;
// Sets the stored url on the curl handle and enables following of HTTP redirects.
void setopt(CurlHandle& curl) override;
// Returns the stored url (without the query component).
std::string str() const override;
// Issues an `op=GETFILESTATUS` request and returns the `length` value of the JSON response.
std::size_t get_file_size() override;
// Replaces the url on the curl handle with an `op=OPEN` request carrying `offset` and `length`.
void setup_range_request(CurlHandle& curl, std::size_t file_offset, std::size_t size) override;
};
} // namespace kvikio
9 changes: 9 additions & 0 deletions cpp/include/kvikio/remote_handle.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ class RemoteEndpoint {
* @return The file size
*/
virtual std::size_t get_file_size() = 0;

/**
* @brief Set up the range request in order to read part of a file given the file offset and read
* size.
*/
virtual void setup_range_request(CurlHandle& curl, std::size_t file_offset, std::size_t size) = 0;
};

/**
Expand All @@ -89,6 +95,7 @@ class HttpEndpoint : public RemoteEndpoint {
void setopt(CurlHandle& curl) override;
std::string str() const override;
std::size_t get_file_size() override;
void setup_range_request(CurlHandle& curl, std::size_t file_offset, std::size_t size) override;
};

/**
Expand Down Expand Up @@ -198,6 +205,7 @@ class S3Endpoint : public RemoteEndpoint {
void setopt(CurlHandle& curl) override;
std::string str() const override;
std::size_t get_file_size() override;
void setup_range_request(CurlHandle& curl, std::size_t file_offset, std::size_t size) override;
};

/**
Expand All @@ -215,6 +223,7 @@ class S3EndpointWithPresignedUrl : public RemoteEndpoint {
void setopt(CurlHandle& curl) override;
std::string str() const override;
std::size_t get_file_size() override;
void setup_range_request(CurlHandle& curl, std::size_t file_offset, std::size_t size) override;
};

/**
Expand Down
32 changes: 32 additions & 0 deletions cpp/src/detail/remote_handle.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright (c) 2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <string>

#include <kvikio/detail/remote_handle.hpp>

namespace kvikio::detail {
/**
 * @brief Callback for `CURLOPT_WRITEFUNCTION` that appends received data to a `std::string`.
 *
 * @param data Pointer to the received data
 * @param size Size of one data item (curl always passes 1)
 * @param num_bytes Number of bytes received
 * @param userdata Pointer to the destination string. Must have been cast from `std::string*`
 * @return The number of bytes consumed, i.e. `size * num_bytes`
 */
std::size_t callback_get_string_response(char* data,
                                         std::size_t size,
                                         std::size_t num_bytes,
                                         void* userdata)
{
  // `userdata` started out as a `std::string*` that was converted to `void*`. `static_cast` is the
  // named cast intended for converting `void*` back to the original object pointer type.
  auto* response = static_cast<std::string*>(userdata);

  auto const new_data_size = size * num_bytes;
  // Append by explicit length: the received bytes may contain embedded '\0' characters.
  response->append(data, new_data_size);
  return new_data_size;
}
} // namespace kvikio::detail
131 changes: 131 additions & 0 deletions cpp/src/hdfs.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Copyright (c) 2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <regex>

#include <kvikio/detail/remote_handle.hpp>
#include <kvikio/error.hpp>
#include <kvikio/hdfs.hpp>
#include <kvikio/nvtx.hpp>
#include <kvikio/remote_handle.hpp>
#include <kvikio/shim/libcurl.hpp>

namespace kvikio {

/**
 * @brief Create a WebHDFS endpoint from a url.
 *
 * The url is split at the first question mark. The part before it is stored as the endpoint url.
 * The query is discarded, except for a `user.name=<value>` parameter, whose value is stored as the
 * user name.
 *
 * @param url The WebHDFS HTTP/HTTPS url to the remote file, with or without a query component.
 */
WebHdfsEndpoint::WebHdfsEndpoint(std::string url)
{
  // todo: Use libcurl URL API for more secure and idiomatic parsing.
  // Split the URL into two parts: one without query and one with.
  // Regex meaning:
  // ^: From the start of the line
  // [^?]+: Matches non-question-mark characters one or more times. The question mark ushers in the
  // URL query component.
  // \?: Matches the question mark, which needs to be escaped.
  // [^#]*: Matches the non-pound characters zero or more times. The pound sign ushers in the URL
  // fragment component. It is very likely that this part does not exist.
  std::regex const url_pattern{R"(^([^?]+)\?([^#]*))"};
  std::smatch url_match;
  if (!std::regex_search(url, url_match, url_pattern)) {
    // If the match is not found, the URL contains no query. `url_match` is not used on this path,
    // so it is safe to move from `url`.
    _url = std::move(url);
    return;
  }

  _url             = url_match[1].str();
  auto const query = url_match[2].str();

  // Extract user name if provided. In WebHDFS, user name is specified as the key=value pair
  // `user.name=<value>` in the query.
  // Regex meaning:
  // (?:^|&): The key must start at the beginning of the query or right after an ampersand, so that
  // keys merely ending in "user.name" (e.g. "superuser.name") are not mistaken for it.
  // user\.name=: The literal key. The dot is escaped; unescaped it would match any character.
  // [^&]+: Matches the non-ampersand character one or more times. The ampersand delimits
  // different parameters.
  std::regex const username_pattern{R"((?:^|&)user\.name=([^&]+))"};
  std::smatch username_match;
  if (std::regex_search(query, username_match, username_pattern)) {
    _username = username_match[1].str();
  }
}

/**
 * @brief Create a WebHDFS endpoint from the host, port, file path and optionally username.
 *
 * The stored url is `http://<host>:<port>/webhdfs/v1<file_path>`.
 *
 * @param host Host
 * @param port Port
 * @param file_path Remote file path, appended verbatim after `/webhdfs/v1`
 * @param username User name
 */
WebHdfsEndpoint::WebHdfsEndpoint(std::string host,
                                 std::string port,
                                 std::string file_path,
                                 std::optional<std::string> username)
  : _username{std::move(username)}
{
  // Assemble the url piece by piece: scheme, authority, WebHDFS REST prefix, then the file path.
  _url = "http://";
  _url += host;
  _url += ":";
  _url += port;
  _url += "/webhdfs/v1";
  _url += file_path;
}

/**
 * @brief Get a description of this endpoint.
 *
 * @return A copy of the stored url, which does not include the query component.
 */
std::string WebHdfsEndpoint::str() const
{
  return _url;
}

/**
 * @brief Configure a curl handle for this endpoint.
 *
 * @param curl The curl handle that receives the endpoint url and the redirect setting.
 */
void WebHdfsEndpoint::setopt(CurlHandle& curl)
{
  KVIKIO_NVTX_FUNC_RANGE();

  // Let curl follow HTTP redirects issued by the server.
  curl.setopt(CURLOPT_FOLLOWLOCATION, 1L);

  // Target the stored url (no query component at this point).
  curl.setopt(CURLOPT_URL, _url.c_str());
}

std::size_t WebHdfsEndpoint::get_file_size()
{
KVIKIO_NVTX_FUNC_RANGE();

std::stringstream ss;
ss << _url << "?";
if (_username.has_value()) { ss << "user.name=" << _username.value() << "&"; }
ss << "op=GETFILESTATUS";

auto curl = create_curl_handle();
curl.setopt(CURLOPT_URL, ss.str().c_str());
curl.setopt(CURLOPT_FOLLOWLOCATION, 1L);

std::string response;
curl.setopt(CURLOPT_WRITEDATA, static_cast<void*>(&response));
curl.setopt(CURLOPT_WRITEFUNCTION, detail::callback_get_string_response);

curl.perform();

long http_status_code{};
curl.getinfo(CURLINFO_RESPONSE_CODE, &http_status_code);
KVIKIO_EXPECT(http_status_code == 200, "HTTP response is not successful.");

// The response is in JSON format. The file size is given by `"length":<file_size>`.
std::regex const pattern{R"("length"\s*:\s*(\d+)[^\d])"};
// Regex meaning:
// \s*: Matches the space character zero or more times.
// \d+: Matches the digit one or more times.
// [^\d]: Matches a non-digit character.
std::smatch match_results;
bool found = std::regex_search(response, match_results, pattern);
KVIKIO_EXPECT(
found, "Regular expression search failed. Cannot extract file length from the JSON response.");
return std::stoull(match_results[1].str());
}

/**
 * @brief Set up the range request in order to read part of a file given the file offset and read
 * size.
 *
 * WebHDFS does not support CURLOPT_RANGE. The range is specified as query parameters in the URL:
 * `<url>?[user.name=<username>&]op=OPEN&offset=<file_offset>&length=<size>`.
 *
 * @param curl The curl handle whose URL is replaced by the range-request URL.
 * @param file_offset Offset in bytes from the start of the file.
 * @param size Number of bytes to read.
 */
void WebHdfsEndpoint::setup_range_request(CurlHandle& curl,
                                          std::size_t file_offset,
                                          std::size_t size)
{
  KVIKIO_NVTX_FUNC_RANGE();

  std::string request_url{_url};
  request_url += "?";
  if (_username.has_value()) {
    request_url += "user.name=";
    request_url += _username.value();
    request_url += "&";
  }

  // Format the integers with `std::to_string` rather than stream insertion. A string stream picks
  // up the global C++ locale, and a locale with digit grouping would turn e.g. 1048576 into
  // "1,048,576" and corrupt the query. `std::to_string` always yields plain decimal digits.
  request_url += "op=OPEN&offset=";
  request_url += std::to_string(file_offset);
  request_url += "&length=";
  request_url += std::to_string(size);

  curl.setopt(CURLOPT_URL, request_url.c_str());
}
} // namespace kvikio
Loading