145 changes: 145 additions & 0 deletions mctp-estack/src/fragment.rs
@@ -144,6 +144,113 @@ impl Fragmenter {
let used = max_total - rest.len();
SendOutput::Packet(&mut out[..used])
}

pub fn fragment_vectored<'f>(
&mut self,
payload: &[&[u8]],
out: &'f mut [u8],
) -> SendOutput<'f> {
let total_payload_len =
payload.iter().fold(0, |acc, part| acc + part.len());
if total_payload_len < self.payload_used {
// Caller is passing varying payload buffers
debug!("varying payload");
return SendOutput::failure(Error::BadArgument, self);
}

// Require at least MTU buffer size, to ensure that all non-end
// fragments are the same size per the spec.
if out.len() < self.mtu {
debug!("small out buffer");
return SendOutput::failure(Error::BadArgument, self);
}

// Reserve header space; the remaining buffer is tracked in `rest`
let max_total = out.len().min(self.mtu);
let (h, mut rest) = out[..max_total].split_at_mut(MctpHeader::LEN);

// Append type byte
if self.header.som {
rest[0] = mctp::encode_type_ic(self.typ, self.ic);
rest = &mut rest[1..];
}

let remaining_payload_len = total_payload_len - self.payload_used;
let l = remaining_payload_len.min(rest.len());
let (d, rest) = rest.split_at_mut(l);
copy_vectored(payload, self.payload_used, d);
self.payload_used += l;

// Set EOM on the final fragment, then write the header
if self.payload_used == total_payload_len {
self.header.eom = true;
}
// OK unwrap: seq and tag are valid.
h.copy_from_slice(&self.header.encode().unwrap());

self.header.som = false;
self.header.seq = (self.header.seq + 1) & mctp::MCTP_SEQ_MASK;

let used = max_total - rest.len();
SendOutput::Packet(&mut out[..used])
}
}

/// Copy data from a vectored `src` to `dest`
///
/// Copies `dest.len()` bytes from `src` to `dest`,
/// starting after `offset` bytes.
///
/// ## Panics
///
/// Panics when not enough bytes are available to fill `dest`:
/// the total size of `src` must be at least `dest.len() + offset`.
fn copy_vectored(src: &[&[u8]], offset: usize, dest: &mut [u8]) {
let mut i = 0;

while i < dest.len() {
let payload_index = i + offset;
let next = get_sub_slice(src, payload_index);
let remaining = dest.len() - i;
if remaining > next.len() {
dest[i..(i + next.len())].copy_from_slice(next);
i += next.len();
} else {
dest[i..].copy_from_slice(&next[..remaining]);
return;
}
}
}

/// Get a slice of `vector` indexed by `offset`
///
/// The `offset` is the absolute byte index.
/// The returned slice is the remaining sub slice starting at `offset`.
///
/// ## Panics
///
/// Will panic when offset is larger than the size of `vector`.
///
/// ## Example
/// ```ignore
/// # use mctp_estack::fragment::get_sub_slice;
/// let vector: &[&[u8]] = &[&[1, 2, 3], &[4, 5, 6]];
///
/// let slice = get_sub_slice(vector, 4);
///
/// assert_eq!(slice, &[5, 6]);
/// ```
fn get_sub_slice<'a>(vector: &'a [&[u8]], offset: usize) -> &'a [u8] {
let mut i = offset;
for slice in vector {
if i >= slice.len() {
i -= slice.len();
} else {
return &slice[i..];
}
}
panic!("offset for vector out of bounds");
}

pub enum SendOutput<'p> {
@@ -194,3 +301,41 @@ impl SendOutput<'_> {
Self::Error { err, cookie: None }
}
}

#[cfg(test)]
mod tests {
#[test]
fn test_get_slice() {
use super::get_sub_slice;
let vector: &[&[u8]] = &[&[1, 2, 3], &[4, 5, 6], &[7, 8, 9]];
let slice = get_sub_slice(vector, 4);
assert_eq!(slice, &[5, 6]);
let slice = get_sub_slice(vector, 0);
assert_eq!(slice, &[1, 2, 3]);
let slice = get_sub_slice(vector, 3);
assert_eq!(slice, &[4, 5, 6]);
}
#[test]
fn test_copy_vectored() {
use super::copy_vectored;
let vector: &[&[u8]] = &[&[1, 2, 3], &[4, 5], &[6, 7, 8, 9]];

let mut dest = [0; 6];
copy_vectored(vector, 1, &mut dest);
assert_eq!(&dest, &[2, 3, 4, 5, 6, 7]);

let mut dest = [0; 5];
copy_vectored(vector, 4, &mut dest);
assert_eq!(&dest, &[5, 6, 7, 8, 9]);

let mut dest = [0; 9];
copy_vectored(vector, 0, &mut dest);
assert_eq!(&dest, &[1, 2, 3, 4, 5, 6, 7, 8, 9]);

let vector: &[&[u8]] = &[&[1, 2, 3]];

let mut dest = [0; 1];
copy_vectored(vector, 2, &mut dest);
assert_eq!(&dest, &[3]);
}
}
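Reviewer note: callers drive `fragment_vectored()` the same way as the existing `fragment()`, calling repeatedly until a non-`Packet` variant is returned. A minimal sketch under assumptions not shown in this diff — `fragmenter` is a `Fragmenter` already set up by the stack, and `transmit()` is a hypothetical link-layer send:

```rust
// Sketch only, not crate code. Assumes `fragmenter: Fragmenter` obtained
// from the stack and a hypothetical `fn transmit(pkt: &[u8])`.
let payload: &[&[u8]] = &[b"type-specific header", b"message body"];
let mut out = [0u8; 64]; // must be at least the fragmenter's MTU

loop {
    match fragmenter.fragment_vectored(payload, &mut out) {
        // One fragment (MTU-sized, or shorter for the final one) is ready.
        SendOutput::Packet(p) => transmit(p),
        // The remaining variants of the full enum signal completion or error.
        _ => break,
    }
}
```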
28 changes: 3 additions & 25 deletions mctp-estack/src/router.rs
@@ -17,7 +17,7 @@ use core::task::{Poll, Waker};

use crate::{
config, AppCookie, Fragmenter, MctpHeader, MctpMessage, SendOutput, Stack,
MAX_MTU, MAX_PAYLOAD,
MAX_MTU,
};
use mctp::{Eid, Error, MsgIC, MsgType, Result, Tag, TagValue};

@@ -180,23 +180,8 @@ impl PortTop {
&self,
fragmenter: &mut Fragmenter,
pkt: &[&[u8]],
work_msg: &mut Vec<u8, MAX_PAYLOAD>,
) -> Result<Tag> {
trace!("send_message");
let payload = if pkt.len() == 1 {
// Avoid the copy when sending a single slice
pkt[0]
} else {
work_msg.clear();
for p in pkt {
work_msg.extend_from_slice(p).map_err(|_| {
debug!("Message too large");
Error::NoSpace
})?;
}
work_msg
};

// send_message() needs to wait for packets to get enqueued to the PortTop channel.
// It shouldn't hold the send_mutex() across an await, since that would block
// forward_packet().
@@ -215,7 +200,7 @@ impl PortTop {
};

qpkt.len = 0;
match fragmenter.fragment(payload, &mut qpkt.data) {
match fragmenter.fragment_vectored(pkt, &mut qpkt.data) {
SendOutput::Packet(p) => {
qpkt.len = p.len();
sender.send_done();
@@ -452,10 +437,6 @@ pub struct Router<'r> {
BlockingMutex<RefCell<Vec<(MsgType, AppCookie), MAX_LISTENERS>>>,

recv_wakers: WakerPool,

/// Temporary storage to flatten vectorised local sent messages
// prior to fragmentation and queueing.
work_msg: AsyncMutex<Vec<u8, MAX_PAYLOAD>>,
}

pub struct RouterInner<'r> {
@@ -497,7 +478,6 @@ impl<'r> Router<'r> {
app_listeners,
ports: Vec::new(),
recv_wakers: Default::default(),
work_msg: AsyncMutex::new(Vec::new()),
}
}

@@ -776,9 +756,7 @@ impl<'r> Router<'r> {
// release to allow other ports to continue work
drop(inner);

// lock the shared work buffer against other app_send_message()
let mut work_msg = self.work_msg.lock().await;
top.send_message(&mut fragmenter, buf, &mut work_msg).await
top.send_message(&mut fragmenter, buf).await
}

/// Create a `AsyncReqChannel` instance.
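Reviewer note: the net effect on the router is that `app_send_message()` no longer flattens multi-part payloads into the shared `MAX_PAYLOAD` buffer, so both the extra per-message copy and the `AsyncMutex` that serialised concurrent senders go away. Abbreviated call shapes from this diff, for comparison:

```rust
// Before: gather all parts into a shared work buffer (one extra copy,
// with an AsyncMutex held for the duration of the send):
//     let mut work_msg = self.work_msg.lock().await;
//     work_msg.clear();
//     for p in pkt { work_msg.extend_from_slice(p)?; }
//     fragmenter.fragment(&work_msg, &mut qpkt.data);

// After: the scattered parts go straight to the fragmenter, which
// gathers each packet's bytes in place via copy_vectored():
fragmenter.fragment_vectored(pkt, &mut qpkt.data);
```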