Merged
@@ -310,10 +310,11 @@ def _listen_for_requests(self):
            elif data["cmd"] == "PUT":
                tensor_id = data["tensor_id"]
                try:
-                   tensor = torch.empty(data["shape"],
-                                        dtype=getattr(torch, data["dtype"]),
-                                        device=self.device)
+                   with torch.cuda.stream(self.recv_stream):
+                       tensor = torch.empty(data["shape"],
+                                            dtype=getattr(torch, data["dtype"]),
+                                            device=self.device)
Comment on lines +313 to +317
Contributor

critical

This change is a crucial fix for a race condition.

By placing the `torch.empty` call inside the `recv_stream` context, the tensor's memory allocation is properly ordered with respect to the subsequent `ncclRecv` call, which also runs on `self.recv_stream`.

Without this, the allocation would be issued on the default CUDA stream. If the default stream and `recv_stream` are not synchronized, `ncclRecv` could begin writing to the tensor's memory before the allocation completes, producing the data corruption and garbled output described in the pull request. Serializing both operations on the same stream is essential for correctness and stability under load.
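The stream-ordering pattern the diff applies can be sketched in isolation. This is a minimal illustration, not the PR's actual code: `make_recv_buffer` and its parameters are hypothetical names, and the stream context is only entered when a CUDA stream is supplied, so the sketch also runs on CPU-only machines.

```python
import torch

def make_recv_buffer(shape, dtype_name, device, recv_stream=None):
    """Allocate a receive buffer, on recv_stream when one is given.

    Hypothetical helper illustrating the fix: issuing torch.empty on the
    same CUDA stream that the later receive uses means the allocation is
    ordered before any write into the buffer on that stream.
    """
    dtype = getattr(torch, dtype_name)
    if recv_stream is not None and str(device).startswith("cuda"):
        # Allocation and the subsequent recv are serialized on recv_stream,
        # so the recv cannot start writing before allocation completes.
        with torch.cuda.stream(recv_stream):
            return torch.empty(shape, dtype=dtype, device=device)
    # CPU (or no-stream) fallback: no stream ordering is involved.
    return torch.empty(shape, dtype=dtype, device=device)

# Usage on CPU for illustration; on GPU you would pass a torch.cuda.Stream().
buf = make_recv_buffer([2, 3], "float32", "cpu")
```

The key point is that `torch.empty` alone gives no guarantee about ordering relative to work on a non-default stream; wrapping it in `torch.cuda.stream(recv_stream)` is what ties the allocation to the receive.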

                    self.router_socket.send_multipart(
                        [remote_address, b"0"])
                    comm, rank = self.comms[remote_address.decode()]