Skip to content

Commit c857274

Browse files
authored
removes redundant vector allocations before calling sendmmsg::batch_send (anza-xyz#4129)
streamer::sendmmsg::batch_send only requires an ExactSizeIterator: https://github.com/anza-xyz/agave/blob/566bb9565/streamer/src/sendmmsg.rs#L203-L204 https://github.com/anza-xyz/agave/blob/566bb9565/streamer/src/sendmmsg.rs#L166-L175 Collecting an iterator into a vector before calling batch_send is unnecessary and only adds overhead. In particular multi_target_send used in retransmitting shreds can be used without doing an additional vector allocation: https://github.com/anza-xyz/agave/blob/566bb9565/streamer/src/sendmmsg.rs#L219
1 parent 9ba0b93 commit c857274

File tree

9 files changed

+62
-56
lines changed

9 files changed

+62
-56
lines changed

core/src/banking_stage/forwarder.rs

+2-3
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ use {
2323
solana_sdk::{pubkey::Pubkey, transport::TransportError},
2424
solana_streamer::sendmmsg::batch_send,
2525
std::{
26-
iter::repeat,
2726
net::{SocketAddr, UdpSocket},
2827
sync::{atomic::Ordering, Arc, RwLock},
2928
},
@@ -281,8 +280,8 @@ impl<T: LikeClusterInfo> Forwarder<T> {
281280
match forward_option {
282281
ForwardOption::ForwardTpuVote => {
283282
// The vote must be forwarded using only UDP.
284-
let pkts: Vec<_> = packet_vec.into_iter().zip(repeat(*addr)).collect();
285-
batch_send(&self.socket, &pkts).map_err(|err| err.into())
283+
let pkts = packet_vec.iter().map(|pkt| (pkt, addr));
284+
batch_send(&self.socket, pkts).map_err(TransportError::from)
286285
}
287286
ForwardOption::ForwardTransaction => {
288287
let conn = self.connection_cache.get_connection(addr);

core/src/forwarding_stage.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,8 @@ impl VoteClient {
7272
}
7373

7474
fn send_batch(&self, batch: &mut Vec<(Vec<u8>, SocketAddr)>) {
75-
let _res = batch_send(&self.udp_socket, batch);
75+
let pkts = batch.iter().map(|(bytes, addr)| (bytes, addr));
76+
let _res = batch_send(&self.udp_socket, pkts);
7677
batch.clear();
7778
}
7879
}

core/src/repair/repair_service.rs

+6-8
Original file line numberDiff line numberDiff line change
@@ -646,15 +646,13 @@ impl RepairService {
646646

647647
let mut batch_send_repairs_elapsed = Measure::start("batch_send_repairs_elapsed");
648648
if !batch.is_empty() {
649-
match batch_send(repair_socket, &batch) {
649+
let num_pkts = batch.len();
650+
let batch = batch.iter().map(|(bytes, addr)| (bytes, addr));
651+
match batch_send(repair_socket, batch) {
650652
Ok(()) => (),
651653
Err(SendPktsError::IoError(err, num_failed)) => {
652654
error!(
653-
"{} batch_send failed to send {}/{} packets first error {:?}",
654-
id,
655-
num_failed,
656-
batch.len(),
657-
err
655+
"{id} batch_send failed to send {num_failed}/{num_pkts} packets first error {err:?}"
658656
);
659657
}
660658
}
@@ -954,10 +952,10 @@ impl RepairService {
954952
ServeRepair::repair_proto_to_bytes(&request_proto, &identity_keypair).unwrap();
955953

956954
// Prepare packet batch to send
957-
let reqs = [(packet_buf, address)];
955+
let reqs = [(&packet_buf, address)];
958956

959957
// Send packet batch
960-
match batch_send(repair_socket, &reqs[..]) {
958+
match batch_send(repair_socket, reqs) {
961959
Ok(()) => {
962960
debug!("successfully sent repair request to {pubkey} / {address}!");
963961
}

core/src/repair/serve_repair.rs

+4-5
Original file line numberDiff line numberDiff line change
@@ -1249,14 +1249,13 @@ impl ServeRepair {
12491249
}
12501250
}
12511251
if !pending_pongs.is_empty() {
1252-
match batch_send(repair_socket, &pending_pongs) {
1252+
let num_pkts = pending_pongs.len();
1253+
let pending_pongs = pending_pongs.iter().map(|(bytes, addr)| (bytes, addr));
1254+
match batch_send(repair_socket, pending_pongs) {
12531255
Ok(()) => (),
12541256
Err(SendPktsError::IoError(err, num_failed)) => {
12551257
warn!(
1256-
"batch_send failed to send {}/{} packets. First error: {:?}",
1257-
num_failed,
1258-
pending_pongs.len(),
1259-
err
1258+
"batch_send failed to send {num_failed}/{num_pkts} packets. First error: {err:?}"
12601259
);
12611260
}
12621261
}

streamer/src/sendmmsg.rs

+37-19
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ use {
1616
std::{
1717
borrow::Borrow,
1818
io,
19-
iter::repeat,
2019
net::{SocketAddr, UdpSocket},
2120
},
2221
thiserror::Error,
@@ -35,11 +34,15 @@ impl From<SendPktsError> for TransportError {
3534
}
3635
}
3736

37+
// The type and lifetime constraints are overspecified to match 'linux' code.
3838
#[cfg(not(target_os = "linux"))]
39-
pub fn batch_send<S, T>(sock: &UdpSocket, packets: &[(T, S)]) -> Result<(), SendPktsError>
39+
pub fn batch_send<'a, S, T: 'a + ?Sized>(
40+
sock: &UdpSocket,
41+
packets: impl IntoIterator<Item = (&'a T, S), IntoIter: ExactSizeIterator>,
42+
) -> Result<(), SendPktsError>
4043
where
4144
S: Borrow<SocketAddr>,
42-
T: AsRef<[u8]>,
45+
&'a T: AsRef<[u8]>,
4346
{
4447
let mut num_failed = 0;
4548
let mut erropt = None;
@@ -158,12 +161,17 @@ fn sendmmsg_retry(sock: &UdpSocket, hdrs: &mut [mmsghdr]) -> Result<(), SendPkts
158161
const MAX_IOV: usize = libc::UIO_MAXIOV as usize;
159162

160163
#[cfg(target_os = "linux")]
161-
pub fn batch_send_max_iov<S, T>(sock: &UdpSocket, packets: &[(T, S)]) -> Result<(), SendPktsError>
164+
fn batch_send_max_iov<'a, S, T: 'a + ?Sized>(
165+
sock: &UdpSocket,
166+
packets: impl IntoIterator<Item = (&'a T, S), IntoIter: ExactSizeIterator>,
167+
) -> Result<(), SendPktsError>
162168
where
163169
S: Borrow<SocketAddr>,
164-
T: AsRef<[u8]>,
170+
&'a T: AsRef<[u8]>,
165171
{
166-
assert!(packets.len() <= MAX_IOV);
172+
let packets = packets.into_iter();
173+
let num_packets = packets.len();
174+
debug_assert!(num_packets <= MAX_IOV);
167175

168176
let mut iovs = [MaybeUninit::uninit(); MAX_IOV];
169177
let mut addrs = [MaybeUninit::uninit(); MAX_IOV];
@@ -177,13 +185,13 @@ where
177185
// SAFETY: The first `packets.len()` elements of `hdrs`, `iovs`, and `addrs` are
178186
// guaranteed to be initialized by `mmsghdr_for_packet` before this loop.
179187
let hdrs_slice =
180-
unsafe { std::slice::from_raw_parts_mut(hdrs.as_mut_ptr() as *mut mmsghdr, packets.len()) };
188+
unsafe { std::slice::from_raw_parts_mut(hdrs.as_mut_ptr() as *mut mmsghdr, num_packets) };
181189

182190
let result = sendmmsg_retry(sock, hdrs_slice);
183191

184192
// SAFETY: The first `packets.len()` elements of `hdrs`, `iovs`, and `addrs` are
185193
// guaranteed to be initialized by `mmsghdr_for_packet` before this loop.
186-
for (hdr, iov, addr) in izip!(&mut hdrs, &mut iovs, &mut addrs).take(packets.len()) {
194+
for (hdr, iov, addr) in izip!(&mut hdrs, &mut iovs, &mut addrs).take(num_packets) {
187195
unsafe {
188196
hdr.assume_init_drop();
189197
iov.assume_init_drop();
@@ -194,13 +202,23 @@ where
194202
result
195203
}
196204

205+
// Need &'a to ensure that raw packet pointers obtained in mmsghdr_for_packet
206+
// stay valid.
197207
#[cfg(target_os = "linux")]
198-
pub fn batch_send<S, T>(sock: &UdpSocket, packets: &[(T, S)]) -> Result<(), SendPktsError>
208+
pub fn batch_send<'a, S, T: 'a + ?Sized>(
209+
sock: &UdpSocket,
210+
packets: impl IntoIterator<Item = (&'a T, S), IntoIter: ExactSizeIterator>,
211+
) -> Result<(), SendPktsError>
199212
where
200213
S: Borrow<SocketAddr>,
201-
T: AsRef<[u8]>,
214+
&'a T: AsRef<[u8]>,
202215
{
203-
for chunk in packets.chunks(MAX_IOV) {
216+
let mut packets = packets.into_iter();
217+
loop {
218+
let chunk = packets.by_ref().take(MAX_IOV);
219+
if chunk.len() == 0 {
220+
break;
221+
}
204222
batch_send_max_iov(sock, chunk)?;
205223
}
206224
Ok(())
@@ -216,8 +234,8 @@ where
216234
T: AsRef<[u8]>,
217235
{
218236
let dests = dests.iter().map(Borrow::borrow);
219-
let pkts: Vec<_> = repeat(&packet).zip(dests).collect();
220-
batch_send(sock, &pkts)
237+
let pkts = dests.map(|addr| (&packet, addr));
238+
batch_send(sock, pkts)
221239
}
222240

223241
#[cfg(test)]
@@ -246,7 +264,7 @@ mod tests {
246264
let packets: Vec<_> = (0..32).map(|_| vec![0u8; PACKET_DATA_SIZE]).collect();
247265
let packet_refs: Vec<_> = packets.iter().map(|p| (&p[..], &addr)).collect();
248266

249-
let sent = batch_send(&sender, &packet_refs[..]).ok();
267+
let sent = batch_send(&sender, packet_refs).ok();
250268
assert_eq!(sent, Some(()));
251269

252270
let mut packets = vec![Packet::default(); 32];
@@ -277,7 +295,7 @@ mod tests {
277295
})
278296
.collect();
279297

280-
let sent = batch_send(&sender, &packet_refs[..]).ok();
298+
let sent = batch_send(&sender, packet_refs).ok();
281299
assert_eq!(sent, Some(()));
282300

283301
let mut packets = vec![Packet::default(); 32];
@@ -345,7 +363,7 @@ mod tests {
345363
let dest_refs: Vec<_> = vec![&ip4, &ip6, &ip4];
346364

347365
let sender = bind_to_unspecified().expect("bind");
348-
let res = batch_send(&sender, &packet_refs[..]);
366+
let res = batch_send(&sender, packet_refs);
349367
assert_matches!(res, Err(SendPktsError::IoError(_, /*num_failed*/ 1)));
350368
let res = multi_target_send(&sender, &packets[0], &dest_refs);
351369
assert_matches!(res, Err(SendPktsError::IoError(_, /*num_failed*/ 1)));
@@ -366,7 +384,7 @@ mod tests {
366384
(&packets[3][..], &ipv4broadcast),
367385
(&packets[4][..], &ipv4local),
368386
];
369-
match batch_send(&sender, &packet_refs[..]) {
387+
match batch_send(&sender, packet_refs) {
370388
Ok(()) => panic!(),
371389
Err(SendPktsError::IoError(ioerror, num_failed)) => {
372390
assert_matches!(ioerror.kind(), ErrorKind::PermissionDenied);
@@ -382,7 +400,7 @@ mod tests {
382400
(&packets[3][..], &ipv4local),
383401
(&packets[4][..], &ipv4broadcast),
384402
];
385-
match batch_send(&sender, &packet_refs[..]) {
403+
match batch_send(&sender, packet_refs) {
386404
Ok(()) => panic!(),
387405
Err(SendPktsError::IoError(ioerror, num_failed)) => {
388406
assert_matches!(ioerror.kind(), ErrorKind::PermissionDenied);
@@ -398,7 +416,7 @@ mod tests {
398416
(&packets[3][..], &ipv4broadcast),
399417
(&packets[4][..], &ipv4local),
400418
];
401-
match batch_send(&sender, &packet_refs[..]) {
419+
match batch_send(&sender, packet_refs) {
402420
Ok(()) => panic!(),
403421
Err(SendPktsError::IoError(ioerror, num_failed)) => {
404422
assert_matches!(ioerror.kind(), ErrorKind::PermissionDenied);

streamer/src/streamer.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -362,7 +362,7 @@ fn recv_send(
362362
let data = pkt.data(..)?;
363363
socket_addr_space.check(&addr).then_some((data, addr))
364364
});
365-
batch_send(sock, &packets.collect::<Vec<_>>())?;
365+
batch_send(sock, packets.collect::<Vec<_>>())?;
366366
Ok(())
367367
}
368368

turbine/src/broadcast_stage.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -477,8 +477,9 @@ pub fn broadcast_shreds(
477477
shred_select.stop();
478478
transmit_stats.shred_select += shred_select.as_us();
479479

480+
let num_udp_packets = packets.len();
480481
let mut send_mmsg_time = Measure::start("send_mmsg");
481-
match batch_send(s, &packets[..]) {
482+
match batch_send(s, packets) {
482483
Ok(()) => (),
483484
Err(SendPktsError::IoError(ioerr, num_failed)) => {
484485
transmit_stats.dropped_packets_udp += num_failed;
@@ -487,7 +488,7 @@ pub fn broadcast_shreds(
487488
}
488489
send_mmsg_time.stop();
489490
transmit_stats.send_mmsg_elapsed += send_mmsg_time.as_us();
490-
transmit_stats.total_packets += packets.len() + quic_packets.len();
491+
transmit_stats.total_packets += num_udp_packets + quic_packets.len();
491492
for (shred, addr) in quic_packets {
492493
let shred = Bytes::from(shred::Payload::unwrap_or_clone(shred.clone()));
493494
if let Err(err) = quic_endpoint_sender.blocking_send((addr, shred)) {

turbine/src/broadcast_stage/broadcast_duplicates_run.rs

+1-7
Original file line numberDiff line numberDiff line change
@@ -392,13 +392,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {
392392
.flatten()
393393
.collect();
394394

395-
match batch_send(sock, &packets) {
396-
Ok(()) => (),
397-
Err(SendPktsError::IoError(ioerr, _)) => {
398-
return Err(Error::Io(ioerr));
399-
}
400-
}
401-
Ok(())
395+
batch_send(sock, packets).map_err(|SendPktsError::IoError(err, _)| Error::Io(err))
402396
}
403397

404398
fn record(&mut self, receiver: &RecordReceiver, blockstore: &Blockstore) -> Result<()> {

udp-client/src/udp_client.rs

+6-10
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
//! an interface for sending data
33
44
use {
5-
core::iter::repeat,
65
solana_connection_cache::client_connection::ClientConnection,
76
solana_streamer::sendmmsg::batch_send,
87
solana_transaction_error::TransportResult,
@@ -37,18 +36,15 @@ impl ClientConnection for UdpClientConnection {
3736
}
3837

3938
fn send_data_batch(&self, buffers: &[Vec<u8>]) -> TransportResult<()> {
40-
let pkts: Vec<_> = buffers.iter().zip(repeat(self.server_addr())).collect();
41-
batch_send(&self.socket, &pkts)?;
42-
Ok(())
39+
let addr = self.server_addr();
40+
let pkts = buffers.iter().map(|bytes| (bytes, addr));
41+
Ok(batch_send(&self.socket, pkts)?)
4342
}
4443

4544
fn send_data_batch_async(&self, buffers: Vec<Vec<u8>>) -> TransportResult<()> {
46-
let pkts: Vec<_> = buffers
47-
.into_iter()
48-
.zip(repeat(self.server_addr()))
49-
.collect();
50-
batch_send(&self.socket, &pkts)?;
51-
Ok(())
45+
let addr = self.server_addr();
46+
let pkts = buffers.iter().map(|bytes| (bytes, addr));
47+
Ok(batch_send(&self.socket, pkts)?)
5248
}
5349

5450
fn send_data(&self, buffer: &[u8]) -> TransportResult<()> {

0 commit comments

Comments
 (0)