Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions mctp-estack/src/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,57 @@ impl Fragmenter {
let used = max_total - rest.len();
SendOutput::Packet(&mut out[..used])
}

pub fn fragment_vectored<'f>(
&mut self,
payload: &[&[u8]],
out: &'f mut [u8],
) -> SendOutput<'f> {
let total_payload_len =
payload.iter().fold(0, |acc, part| acc + part.len());
if total_payload_len < self.payload_used {
// Caller is passing varying payload buffers
debug!("varying payload");
return SendOutput::failure(Error::BadArgument, self);
}

// Require at least MTU buffer size, to ensure that all non-end
// fragments are the same size per the spec.
if out.len() < self.mtu {
debug!("small out buffer");
return SendOutput::failure(Error::BadArgument, self);
}

// Reserve header space, the remaining buffer keeps being
// updated in `rest`
let max_total = out.len().min(self.mtu);
let (h, mut rest) = out[..max_total].split_at_mut(MctpHeader::LEN);

// Append type byte
if self.header.som {
rest[0] = mctp::encode_type_ic(self.typ, self.ic);
rest = &mut rest[1..];
}

let remaining_payload_len = total_payload_len - self.payload_used;
let l = remaining_payload_len.min(rest.len());
let (d, rest) = rest.split_at_mut(l);
crate::util::copy_vectored(payload, self.payload_used, d);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There seems to be a lot of conversion back and forth between total length/offset and slice indices/offsets.

It might simplify the code to remove total_payload_len and payload_used, and instead store current input slice index and offset in Fragmenter? get_sub_slice() wouldn't be needed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was thinking something similar while implementing, but just continued bending the existing code until it worked to get a poc.

A "VectorReader" that holds the state and has all of the clean methods like read(), is_at_end(), ... could be nice.
I'll try to come up with something.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worked out as I intended.
I'm happy how the Fragmenter looks now.
Maybe there are some nits how to make the reader implementation a bit cleaner, but finding the best way is hard with all the available stuff to access slices.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, that looks good.

self.payload_used += l;

// Add the header
if self.payload_used == total_payload_len {
self.header.eom = true;
}
// OK unwrap: seq and tag are valid.
h.copy_from_slice(&self.header.encode().unwrap());

self.header.som = false;
self.header.seq = (self.header.seq + 1) & mctp::MCTP_SEQ_MASK;

let used = max_total - rest.len();
SendOutput::Packet(&mut out[..used])
}
}

pub enum SendOutput<'p> {
Expand Down
28 changes: 3 additions & 25 deletions mctp-estack/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use core::task::{Poll, Waker};

use crate::{
config, AppCookie, Fragmenter, MctpHeader, MctpMessage, SendOutput, Stack,
MAX_MTU, MAX_PAYLOAD,
MAX_MTU,
};
use mctp::{Eid, Error, MsgIC, MsgType, Result, Tag, TagValue};

Expand Down Expand Up @@ -180,23 +180,8 @@ impl PortTop {
&self,
fragmenter: &mut Fragmenter,
pkt: &[&[u8]],
work_msg: &mut Vec<u8, MAX_PAYLOAD>,
) -> Result<Tag> {
trace!("send_message");
let payload = if pkt.len() == 1 {
// Avoid the copy when sending a single slice
pkt[0]
} else {
work_msg.clear();
for p in pkt {
work_msg.extend_from_slice(p).map_err(|_| {
debug!("Message too large");
Error::NoSpace
})?;
}
work_msg
};

// send_message() needs to wait for packets to get enqueued to the PortTop channel.
// It shouldn't hold the send_mutex() across an await, since that would block
// forward_packet().
Expand All @@ -215,7 +200,7 @@ impl PortTop {
};

qpkt.len = 0;
match fragmenter.fragment(payload, &mut qpkt.data) {
match fragmenter.fragment_vectored(pkt, &mut qpkt.data) {
SendOutput::Packet(p) => {
qpkt.len = p.len();
sender.send_done();
Expand Down Expand Up @@ -452,10 +437,6 @@ pub struct Router<'r> {
BlockingMutex<RefCell<Vec<(MsgType, AppCookie), MAX_LISTENERS>>>,

recv_wakers: WakerPool,

/// Temporary storage to flatten vectorised local sent messages
// prior to fragmentation and queueing.
work_msg: AsyncMutex<Vec<u8, MAX_PAYLOAD>>,
}

pub struct RouterInner<'r> {
Expand Down Expand Up @@ -497,7 +478,6 @@ impl<'r> Router<'r> {
app_listeners,
ports: Vec::new(),
recv_wakers: Default::default(),
work_msg: AsyncMutex::new(Vec::new()),
}
}

Expand Down Expand Up @@ -776,9 +756,7 @@ impl<'r> Router<'r> {
// release to allow other ports to continue work
drop(inner);

// lock the shared work buffer against other app_send_message()
let mut work_msg = self.work_msg.lock().await;
top.send_message(&mut fragmenter, buf, &mut work_msg).await
top.send_message(&mut fragmenter, buf).await
}

/// Create a `AsyncReqChannel` instance.
Expand Down
94 changes: 94 additions & 0 deletions mctp-estack/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,97 @@ macro_rules! get_build_var {
}
}};
}

/// Copy data from a vectored src to dest
///
/// Copies `dest.len()` bytes from payload to dest,
/// starting after `offset` bytes.
///
/// ## Panics
///
/// This function will panic when not enough bytes are available to fill dest.
/// Total size of `payload` has to be `atleast dest.len()` + `offset`.
pub fn copy_vectored(src: &[&[u8]], offset: usize, dest: &mut [u8]) {
let mut i = 0;

while i < dest.len() {
let payload_index = i + offset;
let next = get_sub_slice(src, payload_index);
let remaining = dest.len() - i;
if remaining > next.len() {
dest[i..(i + next.len())].copy_from_slice(next);
i += next.len();
} else {
dest[i..].copy_from_slice(&next[..remaining]);
return;
}
}
}

/// Get a slice of `vector` indexed by `offset`
///
/// The `offset` is the absolute byte index.
/// The returned slice is the remaining sub slice starting at `offset`.
///
/// ## Panics
///
/// Will panic when offset is larger than the size of `vector`.
///
/// ## Example
/// ```ignore
/// # use mctp_estack::fragment::get_slice;
/// let vector: &[&[u8]] = &[&[1, 2, 3], &[4, 5, 6]];
///
/// let slice = get_slice(vector, 4);
///
/// assert_eq!(slice, &[5, 6]);
/// ```
pub fn get_sub_slice<'a>(vector: &'a [&[u8]], offset: usize) -> &'a [u8] {
let mut i = offset;
for slice in vector {
if i >= slice.len() {
i -= slice.len();
} else {
return &slice[i..];
}
}
panic!("offset for vector out of bounds");
}

#[cfg(test)]
mod tests {
#[test]
fn test_get_slice() {
use super::get_sub_slice;
let vector: &[&[u8]] = &[&[1, 2, 3], &[4, 5, 6], &[7, 8, 9]];
let slice = get_sub_slice(vector, 4);
assert_eq!(slice, &[5, 6]);
let slice = get_sub_slice(vector, 0);
assert_eq!(slice, &[1, 2, 3]);
let slice = get_sub_slice(vector, 3);
assert_eq!(slice, &[4, 5, 6]);
}
#[test]
fn test_copy_vectored() {
use super::copy_vectored;
let vector: &[&[u8]] = &[&[1, 2, 3], &[4, 5], &[6, 7, 8, 9]];

let mut dest = [0; 6];
copy_vectored(vector, 1, &mut dest);
assert_eq!(&dest, &[2, 3, 4, 5, 6, 7]);

let mut dest = [0; 5];
copy_vectored(vector, 4, &mut dest);
assert_eq!(&dest, &[5, 6, 7, 8, 9]);

let mut dest = [0; 9];
copy_vectored(vector, 0, &mut dest);
assert_eq!(&dest, &[1, 2, 3, 4, 5, 6, 7, 8, 9]);

let vector: &[&[u8]] = &[&[1, 2, 3]];

let mut dest = [0; 1];
copy_vectored(vector, 2, &mut dest);
assert_eq!(&dest, &[3]);
}
}