Support WebHDFS (2/2): Python binding #791

Merged: rapids-bot[bot] merged 43 commits into rapidsai:branch-25.10 from kingcrimsontianyu:python-web-hdfs on Aug 15, 2025

kingcrimsontianyu (Contributor) commented Aug 5, 2025

Summary

This PR adds the Python binding for WebHDFS support.

Depends on PR #788

Closes #787

Python's built-in package http.server is well suited to server mocking. It enables high-level testing for the client. Closes #634 too.

kingcrimsontianyu added the labels "feature request", "non-breaking", "python", and "DO NOT MERGE" on Aug 5, 2025

kingcrimsontianyu (Contributor Author) commented Aug 5, 2025

Verification

The following test code produces the profile shown after it, in which the tasks for different HTTP requests run concurrently on different threads.

import cupy
import numpy as np
import kvikio
import kvikio.defaults

kvikio.defaults.set("num_threads", 8)

url = "<path-to-pds-scale-100.0-partsupp.parquet>"
remote_file = kvikio.RemoteFile.open_webhdfs(url)
print("--> file size: {:}".format(remote_file.nbytes()))

buf = cupy.zeros(remote_file.nbytes(), dtype=np.uint8)
fut = remote_file.pread(buf)
read_size = fut.get()

print("--> read_size: {:}".format(read_size))
assert read_size == remote_file.nbytes()
[Image: profile of the test run]

kingcrimsontianyu (Contributor Author) commented Aug 14, 2025

Thanks for making me aware of the pytest-httpserver plugin. It's a great tool for server mocking. With this, #634 can be closed too.

It took some debugging to get it working, as documented below.

Debugging the local hang

Initially, testing KvikIO's WebHDFS Python binding with pytest-httpserver resulted in a hang. Replacing pytest-httpserver with Python's built-in http.server, with the server launched in a daemon thread, did not help: the hang persisted. Tracing the HTTP messages showed that the hang arose from the call to curl_easy_perform() on the client side. The cause turned out to be Python's GIL: curl_easy_perform() is a blocking call, and the GIL prevented the daemon server thread from responding. The newly added unit test uses concurrent.futures.ProcessPoolExecutor to avoid this GIL issue.

kingcrimsontianyu (Contributor Author) commented

/ok to test f74a230

kingcrimsontianyu (Contributor Author) commented

/ok to test 05e31f8

kingcrimsontianyu (Contributor Author) commented

/ok to test d2ca076

kingcrimsontianyu (Contributor Author) commented

There is still a hang on CI, which I cannot reproduce locally:

python/kvikio/tests/test_hdfs_io.py::TestWebHdfsOperations::test_get_file_size[] PASSED [ 61%]
python/kvikio/tests/test_hdfs_io.py::TestWebHdfsOperations::test_get_file_size[?op=OPEN] PASSED [ 61%]
python/kvikio/tests/test_hdfs_io.py::TestWebHdfsOperations::test_parallel_read --> hang!

kingcrimsontianyu (Contributor Author) commented

/ok to test 8f43cbc

kingcrimsontianyu (Contributor Author) commented

The hang still occurs on CI. I don't think ProcessPoolExecutor is the right solution either: when fut.result() is called, the main thread blocks while waiting, and the server is again unable to respond.

kingcrimsontianyu (Contributor Author) commented

/ok to test 7c9a1ff

kingcrimsontianyu (Contributor Author) commented

/ok to test 82512bd

kingcrimsontianyu (Contributor Author) commented

/ok to test fe87dcb

kingcrimsontianyu (Contributor Author) commented

The CI hang has been fixed. Previously, pytest-httpserver ran in the main process and KvikIO in a second process. Blocking on KvikIO's result prevented the server from responding.
The fix is to swap the server and client processes: KvikIO now runs in the main process and the server in a second process, so no blocking wait is performed.
There seems to be no need for pytest-httpserver any more, so I've removed the dependency. Python's built-in http.server alone is sufficient to create a simple mock server.
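
For readers who want to see the shape of that layout, here is a minimal stdlib-only sketch. It is not the test added in this PR. The names MockWebHdfsHandler, serve, run_client_against_mock, and PAYLOAD, as well as the file path in the URL, are hypothetical; urllib stands in for the KvikIO client; the mock answers only simplified GETFILESTATUS and OPEN requests; and the "fork" start method is assumed (POSIX only).

```python
import json
import multiprocessing
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import parse_qs, urlparse

# Hypothetical file content served by the mock (1 KiB).
PAYLOAD = bytes(range(256)) * 4


class MockWebHdfsHandler(BaseHTTPRequestHandler):
    """Answers simplified WebHDFS GETFILESTATUS and OPEN requests."""

    def do_GET(self):
        query = parse_qs(urlparse(self.path).query)
        op = query.get("op", [""])[0].upper()
        if op == "GETFILESTATUS":
            status = {"FileStatus": {"length": len(PAYLOAD), "type": "FILE"}}
            body = json.dumps(status).encode()
            content_type = "application/json"
        elif op == "OPEN":
            offset = int(query.get("offset", ["0"])[0])
            length = int(query.get("length", [str(len(PAYLOAD))])[0])
            body = PAYLOAD[offset:offset + length]
            content_type = "application/octet-stream"
        else:
            self.send_error(400, "unsupported op")
            return
        self.send_response(200)
        self.send_header("Content-Type", content_type)
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        pass  # keep test output quiet


def serve(port_queue):
    # Runs in the second process: bind to a free port and report it back.
    server = HTTPServer(("127.0.0.1", 0), MockWebHdfsHandler)
    port_queue.put(server.server_address[1])
    server.serve_forever()


def run_client_against_mock():
    # Assumption: "fork" start method. With "spawn", the caller would also
    # need an `if __name__ == "__main__":` guard.
    ctx = multiprocessing.get_context("fork")
    port_queue = ctx.Queue()
    server_proc = ctx.Process(target=serve, args=(port_queue,), daemon=True)
    server_proc.start()
    try:
        port = port_queue.get(timeout=30)
        base = f"http://127.0.0.1:{port}/webhdfs/v1/data/file.bin"
        # The client runs in the main process; the server lives in its own
        # process, so a blocking client call cannot starve it.
        with urllib.request.urlopen(f"{base}?op=GETFILESTATUS", timeout=10) as r:
            size = json.load(r)["FileStatus"]["length"]
        with urllib.request.urlopen(f"{base}?op=OPEN&offset=16&length=32", timeout=10) as r:
            chunk = r.read()
        return size, chunk
    finally:
        server_proc.terminate()
        server_proc.join()
```

Calling run_client_against_mock() returns the reported file size and the requested byte range. In the real test the urllib calls would be replaced by the KvikIO remote-file reads.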

madsbk (Member) left a comment

Nice!

Comment on lines 23 to 28
def find_free_port() -> int:
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind((LOCALHOST, 0))
        s.listen(1)
        port = s.getsockname()[1]
    return port

madsbk (Member) commented

Let's unify with the fixtures in test_s3_io.py and move them to conftest.py?

@pytest.fixture(scope="session")
def endpoint_ip():
    return "127.0.0.1"


@pytest.fixture(scope="session")
def endpoint_port():
    # Return a free port per worker session.
    sock = socket.socket()
    sock.bind(("127.0.0.1", 0))
    port = sock.getsockname()[1]
    sock.close()
    return port

kingcrimsontianyu (Contributor Author) commented Aug 15, 2025

Thanks. Good idea.

I made them free functions and put them in a new file, utils.py. I didn't put them in conftest.py because I'm not aware of a flexible way to change a fixture's scope on a per-file basis. In test_s3_io.py they have session scope, while in test_hdfs_io.py I prefer function scope in case we run tests in parallel in the future, with several servers coexisting and each needing a different port on localhost.

kingcrimsontianyu (Contributor Author) commented

/ok to test 326c482

kingcrimsontianyu (Contributor Author) commented

/ok to test 51be21c

kingcrimsontianyu (Contributor Author) commented

Final improvement: used the neat xp fixture defined in conftest.py to test with both host and device buffers.

kingcrimsontianyu (Contributor Author) commented

/merge

@rapids-bot rapids-bot bot merged commit 9f8e873 into rapidsai:branch-25.10 Aug 15, 2025
46 checks passed
rapids-bot bot pushed a commit that referenced this pull request Aug 25, 2025
… URL (1/2): C++ implementation (#793)

This PR adds a new remote I/O utility function `RemoteHandle::open(url)` that infers the remote endpoint type from the URL to facilitate `RemoteHandle` creation.

- Supported endpoint types include S3, S3 with presigned URL, WebHDFS, and generic HTTP/HTTPS.
- Optionally, instead of letting `open` figure it out, users can explicitly specify the endpoint type by passing an enum argument `RemoteEndpointType`.
- Optionally, users can provide an allowlist that restricts the endpoint candidates.
- Optionally, users can specify the expected file size. This design is to fully support the existing constructor overload `RemoteHandle(endpoint, nbytes)`.

A byproduct of this PR is an internal utility class `UrlParser` that uses the idiomatic libcurl URL API to validate the URL against "[RFC 3986 plus](https://curl.se/docs/url-syntax.html)".

## This PR depends on
- [x] #791
- [x] #788

Authors:
  - Tianyu Liu (https://github.com/kingcrimsontianyu)

Approvers:
  - Mads R. B. Kristensen (https://github.com/madsbk)

URL: #793
Labels

feature request, non-breaking, python

Development

Successfully merging this pull request may close these issues.

[FEA] Support for WebHDFS file read
Mock remote I/O

3 participants