Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 24 additions & 57 deletions monarch_rdma/src/rdma_components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,31 +130,15 @@ impl RdmaBuffer {
remote.owner.actor_id(),
remote,
);
let remote_owner = remote.owner.clone();

let local_device = self.device_name.clone();
let remote_device = remote.device_name.clone();
let mut qp = self
.owner
.request_queue_pair_deprecated(
self.owner
.read_into(
client,
remote_owner.clone(),
local_device.clone(),
remote_device.clone(),
self.clone(),
remote,
tokio::time::Duration::from_secs(timeout),
)
.await?;

qp.put(self.clone(), remote)?;
let result = self
.wait_for_completion(&mut qp, PollTarget::Send, timeout)
.await;

// Release the queue pair back to the actor
self.owner
.release_queue_pair_deprecated(client, remote_owner, local_device, remote_device, qp)
.await?;

result
Ok(true)
}

/// Write from the provided memory into the RdmaBuffer.
Expand Down Expand Up @@ -182,32 +166,14 @@ impl RdmaBuffer {
remote.owner.actor_id(),
remote,
);
let remote_owner = remote.owner.clone(); // Clone before the move!

// Extract device name from buffer, fallback to a default if not present
let local_device = self.device_name.clone();
let remote_device = remote.device_name.clone();

let mut qp = self
.owner
.request_queue_pair_deprecated(
self.owner
.write_from(
client,
remote_owner.clone(),
local_device.clone(),
remote_device.clone(),
self.clone(),
remote,
tokio::time::Duration::from_secs(timeout),
)
.await?;
qp.get(self.clone(), remote)?;
let result = self
.wait_for_completion(&mut qp, PollTarget::Send, timeout)
.await;

// Release the queue pair back to the actor
self.owner
.release_queue_pair_deprecated(client, remote_owner, local_device, remote_device, qp)
.await?;

result?;
Ok(true)
}
/// Waits for the completion of an RDMA operation.
Expand All @@ -217,21 +183,19 @@ impl RdmaBuffer {
///
/// # Arguments
/// * `qp` - The RDMA Queue Pair to poll for completion
/// * `timeout` - Timeout in seconds for the RDMA operation to complete.
/// * `timeout` - Timeout for the RDMA operation to complete.
///
/// # Returns
/// `Ok(true)` if the operation completes successfully within the timeout,
/// or an error if the timeout is reached
async fn wait_for_completion(
pub async fn wait_for_completion(
&self,
qp: &mut RdmaQueuePair,
poll_target: PollTarget,
timeout: u64,
timeout: tokio::time::Duration,
) -> Result<bool, anyhow::Error> {
let timeout = Duration::from_secs(timeout);
let start_time = std::time::Instant::now();

while start_time.elapsed() < timeout {
RealClock.timeout(timeout, async {
loop {
match qp.poll_completion_target(poll_target) {
Ok(Some(_wc)) => {
tracing::debug!("work completed");
Expand All @@ -252,11 +216,14 @@ impl RdmaBuffer {
}
}
}
tracing::error!("timed out while waiting on request completion");
Err(anyhow::anyhow!(
"[buffer({:?})] rdma operation did not complete in time",
self
))
}).await.map_err(|_| {
tracing::error!("timed out while waiting on request completion");
anyhow::anyhow!(
"[buffer({:?})] rdma operation did not complete in time (timeout={:?})",
self,
timeout
)
})?
}

/// Drop the buffer and release remote handles.
Expand Down
Loading
Loading