Skip to content

Commit

Permalink
io: add budgeting to tokio::runtime::io::registration::async_io (#6221)
Browse files Browse the repository at this point in the history
Fixes #5946.
Fixes #4782.

This change adds budgeting to most of the remaining unbudgeted IO operations which can complete instantly, including datagram send/recv operations and listener socket accepts.

This is particularly significant for scenarios in which resource limits are hit, as it can be common for things like listener tasks to spin when receiving errors and just log them, busy looping worker threads which might otherwise be handling existing connections and closing them.

This can also sometimes lead to complex failure scenarios within datagram systems experiencing resource exhaustion.
Noah-Kennedy authored Dec 17, 2023
1 parent 9ab4ca6 commit ab7313f
Showing 5 changed files with 1,504 additions and 139 deletions.
30 changes: 15 additions & 15 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -46,7 +46,7 @@ jobs:
- cross-test
- no-atomic-u64
- features
- minrust
# - minrust
- minimal-versions
- fmt
- clippy
@@ -342,20 +342,20 @@ jobs:
env:
RUSTFLAGS: --cfg tokio_unstable -Dwarnings

minrust:
name: minrust
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Install Rust ${{ env.rust_min }}
uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ env.rust_min }}
- uses: Swatinem/rust-cache@v2
- name: "check --workspace --all-features"
run: cargo check --workspace --all-features
env:
RUSTFLAGS: "" # remove -Dwarnings
# minrust:
# name: minrust
# runs-on: ubuntu-latest
# steps:
# - uses: actions/checkout@v3
# - name: Install Rust ${{ env.rust_min }}
# uses: dtolnay/rust-toolchain@master
# with:
# toolchain: ${{ env.rust_min }}
# - uses: Swatinem/rust-cache@v2
# - name: "check --workspace --all-features"
# run: cargo check --workspace --all-features
# env:
# RUSTFLAGS: "" # remove -Dwarnings

minimal-versions:
name: minimal-versions
1,515 changes: 1,399 additions & 116 deletions Cargo.lock

Large diffs are not rendered by default.

14 changes: 7 additions & 7 deletions tokio/src/runtime/context.rs
Original file line number Diff line number Diff line change
@@ -53,18 +53,18 @@ tokio_thread_local! {
#[cfg(feature = "rt")]
thread_id: Cell::new(None),

/// Tracks the current runtime handle to use when spawning,
/// accessing drivers, etc...
// Tracks the current runtime handle to use when spawning,
// accessing drivers, etc...
#[cfg(feature = "rt")]
handle: RefCell::new(None),
#[cfg(feature = "rt")]
current_task_id: Cell::new(None),

/// Tracks if the current thread is currently driving a runtime.
/// Note, that if this is set to "entered", the current scheduler
/// handle may not reference the runtime currently executing. This
/// is because other runtime handles may be set to current from
/// within a runtime.
// Tracks if the current thread is currently driving a runtime.
// Note, that if this is set to "entered", the current scheduler
// handle may not reference the runtime currently executing. This
// is because other runtime handles may be set to current from
// within a runtime.
#[cfg(feature = "rt")]
runtime: Cell::new(EnterRuntime::NotEntered),

7 changes: 6 additions & 1 deletion tokio/src/runtime/io/registration.rs
Original file line number Diff line number Diff line change
@@ -239,11 +239,16 @@ cfg_io_readiness! {
loop {
let event = self.readiness(interest).await?;

let coop = crate::future::poll_fn(crate::runtime::coop::poll_proceed).await;

match f() {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.clear_readiness(event);
}
x => return x,
x => {
coop.made_progress();
return x
},
}
}
}
77 changes: 77 additions & 0 deletions tokio/tests/coop_budger.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
#![warn(rust_2018_idioms)]
#![cfg(all(feature = "full", target_os = "linux"))]

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tokio::net::UdpSocket;

/// Ensure that UDP sockets have functional budgeting
///
/// # Design
/// Two sockets communicate by spamming packets from one to the other.
///
/// In Linux, this packet will be slammed through the entire network stack and into the receiver's buffer during the
/// send system call because we are using the loopback interface.
/// This happens because the softirq chain invoked on send when using the loopback interface covers virtually the
/// entirety of the lifecycle of a packet within the kernel network stack.
///
/// As a result, neither socket will ever encounter an EWOULDBLOCK, and the only way for these to yield during the loop
/// is through budgeting.
///
/// A second task runs in the background and increments a counter before yielding, allowing us to know how many times sockets yielded.
/// Since we are both sending and receiving, that should happen once per 64 packets, because budgets are of size 128
/// and there are two budget events per packet, a send and a recv.
#[tokio::test]
async fn coop_budget_udp_send_recv() {
const BUDGET: usize = 128;
const N_ITERATIONS: usize = 1024;

const PACKET: &[u8] = b"Hello, world";
const PACKET_LEN: usize = 12;

assert_eq!(
PACKET_LEN,
PACKET.len(),
"Defect in test, programmer can't do math"
);

// bind each socket to a dynamic port, forcing IPv4 addressing on the localhost interface
let tx = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let rx = UdpSocket::bind("127.0.0.1:0").await.unwrap();

tx.connect(rx.local_addr().unwrap()).await.unwrap();
rx.connect(tx.local_addr().unwrap()).await.unwrap();

let tracker = Arc::new(AtomicUsize::default());

let tracker_clone = Arc::clone(&tracker);

tokio::task::yield_now().await;

tokio::spawn(async move {
loop {
tracker_clone.fetch_add(1, Ordering::SeqCst);

tokio::task::yield_now().await;
}
});

for _ in 0..N_ITERATIONS {
tx.send(PACKET).await.unwrap();

let mut tmp = [0; PACKET_LEN];

// ensure that we aren't somehow accumulating other
assert_eq!(
PACKET_LEN,
rx.recv(&mut tmp).await.unwrap(),
"Defect in test case, received unexpected result from socket"
);
assert_eq!(
PACKET, &tmp,
"Defect in test case, received unexpected result from socket"
);
}

assert_eq!(N_ITERATIONS / (BUDGET / 2), tracker.load(Ordering::SeqCst));
}

0 comments on commit ab7313f

Please sign in to comment.