Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ def put(
payload = self.serialize_obj(data)
size = len(payload)

# Currently, we always use SHM.
if True:
if size > self.threshold:
Copy link
Copy Markdown
Contributor

@natureofnature natureofnature Mar 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On sender side, if the size <= self.threshold, we need to update metadata = {"inline_bytes": payload, "size": size}, but on the receiver side , the metadata might NOT be passes to the get function. And in this case, the receiver tries to receive data from shared memory, which does not exist.
I think you need to fix this to make sender<->receiver consistent in both with/without meta path.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably just drop the without metadata path. Other connectors may necessitate the use of metadata and leaving the choice of whether to pass the metadata to the caller is not ideal.

Copy link
Copy Markdown
Contributor

@natureofnature natureofnature Mar 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@R2-Y PTAL, is it possible that we remove non-metadata path for async chunk function and use meta in all model and modes?

# Use Shared Memory
lock_file = f"/dev/shm/shm_{put_key}_lockfile.lock"
with open(lock_file, "wb+") as lockf:
Expand Down Expand Up @@ -122,13 +121,21 @@ def get(
return self._get_data_with_lock(lock_file, shm_handle)

return None

shm = None
try:
shm = shm_pkg.SharedMemory(name=get_key)
if shm is None or shm.size == 0:
return None
lock_file = f"/dev/shm/shm_{get_key}_lockfile.lock"
except Exception as e:
# Probable cause: producer and consumer not on the same node
# and metadata is not propagated to consumer
logger.error(f"SharedMemoryConnector shm get failed for req {get_key}: {e}")
return None
Comment thread
NickCao marked this conversation as resolved.

try:
shm_handle = {"name": get_key, "size": shm.size}
lock_file = f"/dev/shm/shm_{shm_handle['name']}_lockfile.lock"
return self._get_data_with_lock(lock_file, shm_handle)
except Exception:
return None
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import pytest

from vllm_omni.distributed.omni_connectors.connectors.shm_connector import SharedMemoryConnector


@pytest.mark.parametrize(
"size, threshold",
[
(50, 100), # size < threshold, inline
(200, 100), # size > threshold, shm
],
)
def test_shared_memory_connector(size, threshold):
from_stage = "dummy_from_stage"
to_stage = "dummy_to_stage"
key = "dummy_key"
data = b" " * size

tx = SharedMemoryConnector({"shm_threshold_bytes": threshold})
rx = SharedMemoryConnector({"shm_threshold_bytes": threshold})

success, _, metadata = tx.put(
from_stage=from_stage,
to_stage=to_stage,
put_key=key,
data=data,
)
assert success

if size < threshold:
assert "inline_bytes" in metadata
else:
assert "shm" in metadata

rx_data, _ = rx.get(
from_stage=from_stage,
to_stage=to_stage,
get_key=key,
metadata=metadata,
)
assert rx_data == data