rt: high tail latencies with threaded scheduler when under load #2702

Closed
carllerche opened this issue Jul 26, 2020 · 3 comments
Labels
A-tokio Area: The main tokio crate C-bug Category: This is a bug. T-performance Topic: performance and benchmarks

Comments

@carllerche
Member

Given a TCP echo client that:

  • Opens 200~1000 connections.
  • Sends 8 KiB of data on each connection.
  • Reads the data back.

Running this client on the threaded runtime results in significant tail latencies.

Code

use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Instant;

#[tokio::main(core_threads=2)]
async fn main() {
    const N: usize = 200;
    const STEP: usize = N/10;
    const BUFFER: usize = 8*1024;

    let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
    let data = Arc::new(vec![10_u8; BUFFER]);

    let mut handles = Vec::with_capacity(N);
    let mut elapsed = Vec::with_capacity(N);

    for i in 0..N {
        if i % 10 == 9 {
            tokio::task::yield_now().await;
        }

        let data = data.clone();
        let now = Instant::now();

        handles.push(tokio::spawn(async move {
            let tts = now.elapsed();
            let mut buf = vec![10_u8; BUFFER];
            let mut socket = TcpStream::connect(addr).await.unwrap();

            socket.write_all(&data).await.unwrap();
            socket.read_exact(&mut buf).await.unwrap();

            assert_eq!(buf[..], data[..]);

            // now.elapsed()
            tts
        }));
    }

    for handle in handles.drain(..) {
        elapsed.push(handle.await.unwrap());
    }

    elapsed.sort();

    let mut i = STEP;

    while i <= N {
        println!("{}th = {:?}; tts", i, elapsed[i-1]);
        i += STEP;
    }
}

Output

Running this against the echo example results in the following output:

20th = 241.19µs; tts
40th = 476.356µs; tts
60th = 697.187µs; tts
80th = 23.421715ms; tts
100th = 23.651342ms; tts
120th = 23.894434ms; tts
140th = 48.230702ms; tts
160th = 49.305577ms; tts
180th = 50.433882ms; tts
200th = 51.743111ms; tts

Compare this with the basic_scheduler:

20th = 112.65µs; tts
40th = 129.878µs; tts
60th = 145.256µs; tts
80th = 159.83µs; tts
100th = 176.598µs; tts
120th = 192.344µs; tts
140th = 209.232µs; tts
160th = 223.36µs; tts
180th = 243.565µs; tts
200th = 278.131µs; tts

This behavior is most likely a combination of several factors. In the above example, the primary issue is spawning many tasks from the main function. When using the threaded scheduler, the main function runs outside of the scheduler. Spawned tasks are sent to the scheduler using the injection queue. This injection queue (MPMC) is slower than the scheduler's primary queue (SPMC).

When the scheduler is under load, it heavily prioritizes its local queue. In the above example, the scheduler is under load, so it prioritizes already spawned tasks instead of checking for new tasks. This results in a very high time to first poll for newly spawned tasks.

This behavior can be verified by wrapping the contents of the main function with a spawn:

#[tokio::main(core_threads=2)]
async fn main() {
    tokio::spawn(async {
        // ... previous contents of main ...
    }).await.unwrap();
}

Doing this changes the output to:

20th = 2.735µs; tts
40th = 41.12µs; tts
60th = 59.978µs; tts
80th = 80.126µs; tts
100th = 100.416µs; tts
120th = 136.696µs; tts
140th = 191.012µs; tts
160th = 244.484µs; tts
180th = 527.24µs; tts
200th = 29.068916ms; tts

And if we increase the number of threads to 8, we get:

20th = 2.007µs; tts
40th = 5.823µs; tts
60th = 18.551µs; tts
80th = 32.356µs; tts
100th = 39.56µs; tts
120th = 51.522µs; tts
140th = 72.05µs; tts
160th = 110.676µs; tts
180th = 160.459µs; tts
200th = 26.270611ms; tts

This is better. Notice how adding threads reduces the latencies relative to basic_scheduler. However, the maximum latency is still surprisingly high (26ms) compared to basic_scheduler. I have not yet investigated why that is.

Fixing

I believe the fix for the injection queue will require:

  • Improving the injection queue
  • Tweaking heuristics

The current injection queue is fairly naive: a linked list guarded by a mutex. One option to consider is switching to an intrusive MPSC channel with a mutex guarding the head, though that alone probably won't buy much. Workers probably want to acquire tasks from the injection queue in batches: instead of popping one task at a time, when the local queue is not full, grab a batch of tasks from the injection queue, as sketched below.
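
Roughly, the batching could look like the following sketch. The names (Injection, Local, pop_batch, refill_from) and the data structures are illustrative only, not tokio's actual internals; the point is that the mutex is taken once per batch instead of once per task.

use std::collections::VecDeque;
use std::sync::Mutex;

const CAPACITY: usize = 256; // local queue capacity mentioned below

// Hypothetical injection queue shared by all workers.
struct Injection<T> {
    queue: Mutex<VecDeque<T>>,
}

// Hypothetical local queue owned by a single worker.
struct Local<T> {
    queue: VecDeque<T>,
}

impl<T> Injection<T> {
    // Pop up to `max` tasks under a single lock acquisition instead of
    // locking once per task.
    fn pop_batch(&self, max: usize) -> Vec<T> {
        let mut queue = self.queue.lock().unwrap();
        let n = max.min(queue.len());
        queue.drain(..n).collect()
    }
}

impl<T> Local<T> {
    // When the local queue has spare capacity, refill it from the
    // injection queue in one batch.
    fn refill_from(&mut self, injection: &Injection<T>) {
        let room = CAPACITY - self.queue.len();
        if room > 0 {
            self.queue.extend(injection.pop_batch(room));
        }
    }
}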

On the heuristics side, when a worker is under load, we may want to check the injection queue more often. This may be less necessary if tasks are acquired in batches. If the injection queue does need to be checked more often, one option would be to check it roughly every 5 ticks whenever the previous check found a task; a sketch of that follows.
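
This is illustrative only; the interval constants and the shape of the check are assumptions, not tokio's actual tick logic.

// Made-up constants; tokio's real worker loop is structured differently.
const NORMAL_INTERVAL: u32 = 61; // normal cadence for checking the injection queue
const LOADED_INTERVAL: u32 = 5;  // tighter cadence while the queue keeps producing work

fn should_check_injection(tick: u32, last_check_found_task: bool) -> bool {
    let interval = if last_check_found_task {
        LOADED_INTERVAL
    } else {
        NORMAL_INTERVAL
    };
    tick % interval == 0
}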

The high tail latency after adding the spawn in the main fn is most likely not due to the injection queue. Adding the spawn should prevent the injection queue from being used, since the total number of tasks is less than the local queue capacity (256). I do not yet know the cause of that behavior.

@carllerche carllerche added C-bug Category: This is a bug. A-tokio Area: The main tokio crate T-performance Topic: performance and benchmarks labels Jul 26, 2020
@carllerche
Member Author

I spent more time investigating this.

I discovered that the high latency spike is due to a call to libc::socket blocking for ~30ms. When creating sockets in a new multi-threaded process, the kernel needs to grow its internal FD table.

When running a multi-threaded process, the kernel must synchronize access to the FD table. Synchronization is performed using RCU. In practice, the read lock on the FD table is released some time after the read completes rather than immediately. When socket is called and the FD table needs to grow in order to track the new FD, the thread must wait for all readers to complete before the table can be grown. This is the source of the hang in socket.

To diagnose, I used the following:

perf trace -e probe:alloc_fdtable,probe:free_fdtable_rcu,socket target/release/tmp-simple-client
[...]
     0.965 (         ): tokio-runtime-/69619 probe:alloc_fdtable:(ffffffffa90ff090)
     0.957 (28.139 ms): tokio-runtime-/69619  ... [continued]: socket())                                           = 64
     0.978 (28.173 ms): tokio-runtime-/69618  ... [continued]: socket())                                           = 65
[...]

full output

Additionally, running the test after "warming up" the FD table results in smooth execution:

20th - 2.92µs
40th - 19.048µs
60th - 36.003µs
80th - 49.98µs
100th - 59.227µs
120th - 66.215µs
140th - 81.66µs
160th - 96.405µs
180th - 110.253µs
200th - 291.598µs

"warming up" the FD table is done by duping sockets to create FDs.

@sfc-gh-fgross

FWIW, I think I've been seeing this exact same behaviour when using recvmsg on a Unix domain socket to receive lots of file descriptors (in an unrelated non-Rust project). I was also able to sort it out by using dup2() to force the FD table to be big enough to fit all the sockets I'll receive, as in the sketch below.
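
A minimal sketch of the dup2() approach (hypothetical helper; the libc crate and the target FD number are assumptions):

// Duplicating an open FD onto a high FD number forces the kernel to grow the
// FD table to at least that size in a single call.
fn presize_fd_table(max_fd: libc::c_int) {
    // Duplicate stdin (FD 0) onto `max_fd`, then close the duplicate.
    let fd = unsafe { libc::dup2(0, max_fd) };
    if fd >= 0 {
        unsafe { libc::close(fd) };
    }
}

// e.g. presize_fd_table(1023) to make room for ~1024 descriptors.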

@hailengc

hailengc commented Jan 8, 2024

I was seeing the same issue, with alloc_fdtable slowing down the thread.

Following @carllerche's comment, I was able to use dup to force the fd_table to resize to a certain size, so as to avoid an alloc_fdtable call in the middle of the run blocking the thread.

The code I'm using:

use std::net::SocketAddr;
use std::os::fd::AsRawFd;
use std::sync::Arc;
use std::time::Instant;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpSocket;

#[tokio::main(worker_threads = 2)]
// #[tokio::main(flavor = "current_thread")]
async fn main() {
    // --- trick begins ---
    // create a socket and use it with `dup` in order to pre-size the fd_table
    let socket = TcpSocket::new_v4().unwrap();
    let socket_fd = socket.as_raw_fd();
    let mut dup_fd_list = Vec::new();
    unsafe {
        // `dup` file descriptors.
        // NOTE: I noticed the fd table doubles in size when it reaches
        // 64, 128, 256, etc. Here we call `dup` 512 times, which effectively
        // adds 512 entries to the fd_table and triggers `alloc_fdtable`
        // 4 times - sufficient for us, since the fd_table will then hold
        // 1024 entries.
        for _ in 0..512 {
            let dup_fd = libc::dup(socket_fd);
            dup_fd_list.push(dup_fd);
        }

        // close all duplicated file descriptors
        for fd in dup_fd_list {
            libc::close(fd);
        }
        drop(socket);
    }

    // --- trick ends ---

    tokio::spawn(async {
        const N: usize = 500;
        const STEP: usize = N / 10;
        const BUFFER: usize = 8 * 1024;

        let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
        let data = Arc::new(vec![10_u8; BUFFER]);

        let mut handles = Vec::with_capacity(N);
        let mut elapsed = Vec::with_capacity(N);

        for i in 0..N {
            if i % 10 == 9 {
                tokio::task::yield_now().await;
            }

            let data = data.clone();
            let now = Instant::now();

            handles.push(tokio::spawn(async move {
                let tts = now.elapsed();
                let mut buf = vec![10_u8; BUFFER];

                let soc = TcpSocket::new_v4().unwrap();
                let mut socket = soc.connect(addr).await.unwrap();

                socket.write_all(&data).await.unwrap();
                socket.read_exact(&mut buf).await.unwrap();

                assert_eq!(buf[..], data[..]);

                // now.elapsed()
                tts
            }));
        }

        for handle in handles.drain(..) {
            elapsed.push(handle.await.unwrap());
        }

        elapsed.sort();

        let mut i = STEP;

        while i <= N {
            println!("{}th = {:?}; tts", i, elapsed[i - 1]);
            i += STEP;
        }
    })
    .await
    .unwrap();
}

[update] It's better to avoid the use of unsafe; the fs::File API can be used instead:

use std::fs::File;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpSocket;

#[tokio::main(worker_threads = 2)]
async fn main() {
    let file = File::create("foo.txt").unwrap();
    let mut dup_file_list = Vec::new();

    // Clone the file; this calls `dup` underneath, which adds a file
    // descriptor to the fd_table.
    //
    // NOTE: I noticed the fd table doubles in size when it reaches
    // 64, 128, 256, etc. Here we call `dup` 512 times, which effectively
    // adds 512 entries to the fd_table and triggers `alloc_fdtable`
    // 4 times - sufficient for us, since the fd_table will then hold
    // 1024 entries.
    for _ in 0..512 {
        dup_file_list.push(file.try_clone().unwrap());
    }
    // once the fd_table has grown, drop all the files, which closes each file descriptor
    for cloned_file in dup_file_list {
        drop(cloned_file);
    }
    drop(file);

    // ... (rest of main as in the previous example)
}
