expose TransferQueueClient#3
Conversation
There was a problem hiding this comment.
Pull Request Overview
This PR exposes TransferQueueClient classes by adding a complete client implementation for the transfer queue system. The primary purpose is to provide both synchronous and asynchronous client interfaces for interacting with transfer queue controllers and storage units via ZMQ messaging.
Key changes:
- Adds comprehensive client implementation with async operations (get_meta, put, get_data, clear)
- Provides synchronous wrapper for easier integration
- Implements dynamic socket management with automatic connection handling
Reviewed Changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| verl/experimental/transfer_queue/client.py | New file containing AsyncTransferQueueClient and TransferQueueClient classes with ZMQ-based communication |
| verl/experimental/transfer_queue/storage.py | Copyright year update from 2024 to 2025 |
| verl/experimental/transfer_queue/init.py | Copyright year update from 2024 to 2025 |
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
| logger.info(f"[{self.client_id}]: get data response from storage unit {target_storage}: {response_msg}") | ||
|
|
||
| if response_msg.request_type == ZMQRequestType.GET_DATA_RESPONSE: | ||
| # 返回该存储单元的数据和索引信息 |
There was a problem hiding this comment.
Comment contains Chinese text. Should be translated to English: '# Return data and index information from this storage unit'
| # 返回该存储单元的数据和索引信息 | |
| # Return data and index information from this storage unit |
|
|
||
| for info in server_infos.values(): | ||
| if not isinstance(info, ZMQServerInfo): | ||
| raise ValueError(f"Invalid server info for {role} {id}") |
There was a problem hiding this comment.
Variable 'id' is undefined in this scope. Should use 'info.id' instead.
| raise ValueError(f"Invalid server info for {role} {id}") | |
| raise ValueError(f"Invalid server info for {role} {info.id}") |
| return result | ||
|
|
||
|
|
||
| def process_zmq_server_info(handlers: dict[Any, Union[TransferQueueController, TransferQueueStorageSimpleUnit]]): # noqa: UP007 |
There was a problem hiding this comment.
[nitpick] The noqa comment 'UP007' suggests this is suppressing a ruff rule about Union syntax. Consider using the modern pipe syntax 'TransferQueueController | TransferQueueStorageSimpleUnit' instead of Union, or remove the noqa if the suppression is no longer needed.
| def process_zmq_server_info(handlers: dict[Any, Union[TransferQueueController, TransferQueueStorageSimpleUnit]]): # noqa: UP007 | |
| def process_zmq_server_info(handlers: dict[Any, TransferQueueController | TransferQueueStorageSimpleUnit]): |
…oller * Support storage unit in TransferQueue * Fix importance error * Support controller in TransferQueue (#2) * Support controller in TransferQueue * Fix import * Fix comments --------- Co-authored-by: liuximeng <13073314+liuximeng18772102439@user.noreply.gitee.com> * expose TransferQueueClient (#3) * Add copyright and license information Added copyright and licensing information to the controller.py file. * update client docstring (#5) Signed-off-by: 0oshowero0 <o0shower0o@outlook.com> * merge TransferQueue utils (#4) * [fix] Fix n_sample related problems (#8) * update client docstring Signed-off-by: 0oshowero0 <o0shower0o@outlook.com> * fix n_sample related problems Signed-off-by: 0oshowero0 <o0shower0o@outlook.com> --------- Signed-off-by: 0oshowero0 <o0shower0o@outlook.com> * expose TransferQueue client/controller UT (#6) * Add metadata.py and test_simple_storage_unit.py (#9) * Add metadata.py and test_simple_storage_unit.py * Add copyright and license information to test_simple_storage_unit.py * Apply suggestion from @Copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --------- Co-authored-by: Han Zhenyu 韩振宇 <o0shower0o@outlook.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Add reorder function to BatchMeta (#13) Co-authored-by: liuximeng <13073314+liuximeng18772102439@user.noreply.gitee.com> * [recipe, data] feat: TransferQueue - Support managing multiple data partitions for Train/Val/Test in controller (#45) Co-authored-by: liuximeng <13073314+liuximeng18772102439@user.noreply.gitee.com> * delete TQ source codes Signed-off-by: 0oshowero0 <o0shower0o@outlook.com> * update docs Signed-off-by: 0oshowero0 <o0shower0o@outlook.com> * update performance Signed-off-by: 0oshowero0 <o0shower0o@outlook.com> * fix Signed-off-by: 0oshowero0 <o0shower0o@outlook.com> --------- Signed-off-by: 0oshowero0 <o0shower0o@outlook.com> Co-authored-by: FightingZhen <295632982@qq.com> Co-authored-by: Han Zhenyu 韩振宇 <o0shower0o@outlook.com> Co-authored-by: LLLLxmmm <130739718+LLLLxmmm@users.noreply.github.com> Co-authored-by: liuximeng <13073314+liuximeng18772102439@user.noreply.gitee.com> Co-authored-by: Han Zhenyu 韩振宇 <hanzy19@tsinghua.org.cn> Co-authored-by: zhabuye <74179177+zhabuye@users.noreply.github.com> Co-authored-by: Jianjun Zhong <87791082+jianjunzhong@users.noreply.github.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
What does this PR do?
As per title.
Checklist Before Starting
[{modules}] {type}: {description}(This will be checked by the CI){modules}includefsdp,megatron,sglang,vllm,rollout,trainer,ci,training_utils,recipe,hardware,deployment,ray,worker,single_controller,misc,perf,model,algo,env,tool,ckpt,doc,data,like[megatron, fsdp, doc]{type}is infeat,fix,refactor,chore,test[BREAKING]to the beginning of the title.[BREAKING][fsdp, megatron] feat: dynamic batchingTest
API and Usage Example
# Add code snippet or script demonstrating how to use thisDesign & Code Changes
Checklist Before Submitting
Important
Please check all the following items before requesting a review, otherwise the reviewer might deprioritize this PR for review.
pre-commit install && pre-commit run --all-files --show-diff-on-failure --color=alwaysci-requestchannel in theverlSlack workspace. (If not accessible, please try the Feishu group (飞书群).)