Skip to content

Commit 7194252

Browse files
committed
Add async_eventfd feature to optionally optimize notify method on epoll
1 parent 18990c4 commit 7194252

File tree

6 files changed

+147
-47
lines changed

6 files changed

+147
-47
lines changed

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ async = [
5454
"dep:pin-project-lite",
5555
"dep:crossbeam-channel",
5656
]
57+
# Optimize async notification with eventfd, requires nginx epoll event module
58+
async_eventfd = ["async"]
5759
# Provides APIs that require allocations via the `alloc` crate.
5860
alloc = ["allocator-api2/alloc"]
5961
# Enables serialization support for some of the provided and re-exported types.

examples/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,3 +75,4 @@ default = ["export-modules", "ngx/vendored"]
7575
export-modules = []
7676
linux = []
7777
async = ["ngx/async"]
78+
async_eventfd = ["async", "ngx/async_eventfd"]

examples/async.rs

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,6 @@ use hyper_util::rt::TokioIo;
77
use nginx_sys::{ngx_http_core_loc_conf_t, NGX_LOG_ERR};
88
use ngx::async_::resolver::Resolver;
99
use ngx::async_::{spawn, Task};
10-
use std::cell::RefCell;
11-
use std::ffi::{c_char, c_void};
12-
use std::future::Future;
13-
use std::pin::Pin;
14-
use std::ptr::{addr_of, addr_of_mut, NonNull};
15-
use std::sync::atomic::{AtomicPtr, Ordering};
16-
use std::task::Poll;
17-
use std::time::Instant;
18-
use tokio::net::TcpStream;
19-
2010
use ngx::core::{self, Pool, Status};
2111
use ngx::ffi::{
2212
ngx_array_push, ngx_command_t, ngx_conf_t, ngx_connection_t, ngx_http_handler_pt,
@@ -29,6 +19,17 @@ use ngx::http::{HttpModuleLocationConf, HttpModuleMainConf, NgxHttpCoreModule};
2919
use ngx::{
3020
http_request_handler, ngx_conf_log_error, ngx_log_debug_http, ngx_log_error, ngx_string,
3121
};
22+
use std::cell::RefCell;
23+
use std::ffi::{c_char, c_void};
24+
use std::future::Future;
25+
use std::pin::Pin;
26+
use std::ptr::{addr_of, addr_of_mut, NonNull};
27+
use std::sync::atomic::{AtomicPtr, Ordering};
28+
use std::task::Poll;
29+
use std::time::Instant;
30+
use tokio::net::TcpStream;
31+
#[cfg(feature = "async_eventfd")]
32+
use {nginx_sys::ngx_cycle_t, ngx::async_::async_init};
3233

3334
struct Module;
3435

@@ -98,6 +99,10 @@ pub static mut ngx_http_async_module: ngx_module_t = ngx_module_t {
9899
ctx: std::ptr::addr_of!(NGX_HTTP_ASYNC_MODULE_CTX) as _,
99100
commands: unsafe { &NGX_HTTP_ASYNC_COMMANDS[0] as *const _ as *mut _ },
100101
type_: NGX_HTTP_MODULE as _,
102+
#[cfg(not(feature = "async_eventfd"))]
103+
init_process: None,
104+
#[cfg(feature = "async_eventfd")]
105+
init_process: Some(ngx_http_async_init),
101106
..ngx_module_t::default()
102107
};
103108

@@ -166,7 +171,7 @@ async fn resolve_something(
166171
.expect("resolution");
167172

168173
(
169-
format!("X-Resolve-Time"),
174+
"X-Resolve-Time".to_string(),
170175
start.elapsed().as_millis().to_string(),
171176
)
172177
}
@@ -188,7 +193,7 @@ async fn reqwest_something() -> (String, String) {
188193
async fn hyper_something() -> (String, String) {
189194
let start = Instant::now();
190195
// see https://hyper.rs/guides/1/client/basic/
191-
let url = "http://httpbin.org/ip".parse::<hyper::Uri>().expect("uri");
196+
let url = "https://example.com".parse::<hyper::Uri>().expect("uri");
192197
let host = url.host().expect("uri has no host");
193198
let port = url.port_u16().unwrap_or(80);
194199

@@ -331,3 +336,9 @@ extern "C" fn ngx_http_async_commands_set_enable(
331336

332337
ngx::core::NGX_CONF_OK
333338
}
339+
340+
#[cfg(feature = "async_eventfd")]
341+
extern "C" fn ngx_http_async_init(_cycle: *mut ngx_cycle_t) -> ngx_int_t {
342+
async_init();
343+
Status::NGX_OK.into()
344+
}

nginx-sys/build/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ const NGX_CONF_FEATURES: &[&str] = &[
2020
"compat",
2121
"debug",
2222
"have_epollrdhup",
23+
"have_eventfd",
2324
"have_file_aio",
2425
"have_kqueue",
2526
"have_memalign",

src/async_/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
//! Async runtime and set of utilities on top of the NGINX event loop.
22
pub use self::sleep::{sleep, Sleep};
33
pub use self::spawn::{spawn, Task};
4+
#[cfg(feature = "async_eventfd")]
5+
pub use self::spawn::async_init;
46

57
pub mod resolver;
68

src/async_/spawn.rs

Lines changed: 118 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,117 @@
11
extern crate std;
22

3-
use core::ffi::c_int;
43
use core::sync::atomic::{AtomicI64, Ordering};
5-
use core::{mem, ptr};
6-
use std::sync::OnceLock;
74

85
use core::future::Future;
6+
use std::sync::OnceLock;
97

10-
use alloc::boxed::Box;
118
pub use async_task::Task;
129
use async_task::{Runnable, ScheduleInfo, WithInfo};
1310
use crossbeam_channel::{unbounded, Receiver, Sender};
14-
use nginx_sys::{kill, ngx_event_t, ngx_post_event, ngx_posted_next_events, ngx_thread_tid, SIGIO};
11+
use nginx_sys::{ngx_event_t, ngx_thread_tid};
1512

1613
use crate::log::ngx_cycle_log;
1714
use crate::ngx_log_debug;
1815

16+
#[cfg(not(feature = "async_eventfd"))]
17+
mod default {
18+
extern crate std;
19+
20+
use nginx_sys::{kill, SIGIO};
21+
22+
use crate::log::ngx_cycle_log;
23+
use crate::ngx_log_debug;
24+
25+
pub(crate) fn notify() {
26+
let rc = unsafe {
27+
kill(
28+
std::process::id().try_into().unwrap(),
29+
SIGIO.try_into().unwrap(),
30+
)
31+
};
32+
if rc != 0 {
33+
panic!("kill: rc={rc}");
34+
}
35+
36+
ngx_log_debug!(ngx_cycle_log().as_ptr(), "async: notified (SIGIO)");
37+
}
38+
}
39+
#[cfg(not(feature = "async_eventfd"))]
40+
use default::*;
41+
42+
#[cfg(feature = "async_eventfd")]
43+
mod eventfd {
44+
extern crate std;
45+
46+
use core::mem;
47+
48+
use nginx_sys::{
49+
eventfd, ngx_connection_t, ngx_cycle, ngx_event_actions, ngx_event_t, ngx_int_t, ngx_log_t,
50+
EFD_CLOEXEC, EFD_NONBLOCK, NGX_ERROR, NGX_OK,
51+
};
52+
use std::{fs::File, io::Write, os::fd::FromRawFd, sync::OnceLock};
53+
54+
use crate::log::ngx_cycle_log;
55+
use crate::ngx_log_debug;
56+
57+
#[cfg(not(ngx_feature = "have_eventfd"))]
58+
compile_error!("feature async_eventfd requires eventfd(), NGX_HAVE_EVENTFD");
59+
60+
pub(crate) fn notify() {
61+
let w = EVENT_FILE
62+
.get()
63+
.expect("EVENT_FILE")
64+
.write(&1u64.to_ne_bytes())
65+
.expect("write");
66+
if w != 8 {
67+
panic!("eventfd: wrote {w}, expected 8");
68+
}
69+
70+
ngx_log_debug!(ngx_cycle_log().as_ptr(), "async: notified (eventfd)");
71+
}
72+
73+
static mut NOTIFY_EVENT: ngx_event_t = unsafe { mem::zeroed() };
74+
static mut NOTIFY_CONN: ngx_connection_t = unsafe { mem::zeroed() };
75+
static EVENT_FILE: OnceLock<File> = OnceLock::new();
76+
static mut DUMMY_WRITE_EVENT: ngx_event_t = unsafe { mem::zeroed() };
77+
78+
extern "C" fn _dummy_write_handler(_ev: *mut ngx_event_t) {}
79+
80+
/// intialize async_eventfd, needs to be called once in init_process, before spawn() is used
81+
pub fn async_init() -> ngx_int_t {
82+
let fd = unsafe { eventfd(0, (EFD_NONBLOCK | EFD_CLOEXEC).try_into().unwrap()) };
83+
if fd == -1 {
84+
return NGX_ERROR as isize;
85+
}
86+
87+
let log: *mut ngx_log_t = unsafe { ngx_cycle.read().log };
88+
89+
ngx_log_debug!(log, "async: eventfd {fd}");
90+
91+
unsafe {
92+
NOTIFY_EVENT.handler = Some(super::async_handler);
93+
NOTIFY_EVENT.log = log;
94+
(&raw mut NOTIFY_EVENT).read().set_active(1);
95+
NOTIFY_EVENT.data = (&raw mut NOTIFY_CONN).cast();
96+
97+
DUMMY_WRITE_EVENT.handler = Some(_dummy_write_handler);
98+
99+
NOTIFY_CONN.fd = fd;
100+
NOTIFY_CONN.read = &raw mut NOTIFY_EVENT;
101+
NOTIFY_CONN.write = &raw mut DUMMY_WRITE_EVENT;
102+
NOTIFY_CONN.log = log;
103+
104+
ngx_event_actions.add_conn.unwrap()(&raw mut NOTIFY_CONN);
105+
106+
EVENT_FILE.set(File::from_raw_fd(fd)).expect("EVENT_FILE");
107+
}
108+
109+
NGX_OK as ngx_int_t
110+
}
111+
}
112+
#[cfg(feature = "async_eventfd")]
113+
pub use eventfd::*;
114+
19115
static MAIN_TID: AtomicI64 = AtomicI64::new(-1);
20116

21117
#[inline]
@@ -30,29 +126,16 @@ extern "C" fn async_handler(ev: *mut ngx_event_t) {
30126
let tid = unsafe { ngx_thread_tid().into() };
31127
let _ = MAIN_TID.compare_exchange(-1, tid, Ordering::Relaxed, Ordering::Relaxed);
32128
let scheduler = scheduler();
129+
130+
if scheduler.rx.is_empty() {
131+
return;
132+
}
33133
let mut cnt = 0;
34134
while let Ok(r) = scheduler.rx.try_recv() {
35135
r.run();
36136
cnt += 1;
37137
}
38-
ngx_log_debug!(
39-
unsafe { (*ev).log },
40-
"async: processed {cnt} items"
41-
);
42-
43-
unsafe {
44-
drop(Box::from_raw(ev));
45-
}
46-
}
47-
48-
fn notify() -> c_int {
49-
ngx_log_debug!(ngx_cycle_log().as_ptr(), "async: notify via SIGIO");
50-
unsafe {
51-
kill(
52-
std::process::id().try_into().unwrap(),
53-
SIGIO.try_into().unwrap(),
54-
)
55-
}
138+
ngx_log_debug!(unsafe { ev.read().log }, "async: processed {cnt} items");
56139
}
57140

58141
struct Scheduler {
@@ -69,28 +152,28 @@ impl Scheduler {
69152
fn schedule(&self, runnable: Runnable, info: ScheduleInfo) {
70153
let oet = on_event_thread();
71154
// If we are on the event loop thread it's safe to simply run the Runnable, otherwise we
72-
// enqueue the Runnable, post our event, and SIGIO to interrupt epoll. The event handler
73-
// then runs the Runnable on the event loop thread.
155+
// enqueue the Runnable, post our event, and notify. The event handler then runs the
156+
// Runnable on the event loop thread.
74157
//
75158
// If woken_while_running, it indicates that a task has yielded itself to the Scheduler.
76-
// Force round-trip via queue to limit reentrancy (skipping SIGIO).
159+
// Force round-trip via queue to limit reentrancy.
77160
if oet && !info.woken_while_running {
78161
runnable.run();
79162
} else {
80163
self.tx.send(runnable).expect("send");
164+
165+
#[cfg(not(feature = "async_eventfd"))]
81166
unsafe {
82-
let event: *mut ngx_event_t = Box::into_raw(Box::new(mem::zeroed()));
167+
let event: *mut ngx_event_t =
168+
std::boxed::Box::into_raw(std::boxed::Box::new(core::mem::zeroed()));
83169
(*event).handler = Some(async_handler);
84170
(*event).log = ngx_cycle_log().as_ptr();
85-
ngx_post_event(event, ptr::addr_of_mut!(ngx_posted_next_events));
86-
}
87-
88-
if !oet {
89-
let rc = notify();
90-
if rc != 0 {
91-
panic!("kill: {rc}")
92-
}
171+
nginx_sys::ngx_post_event(
172+
event,
173+
std::ptr::addr_of_mut!(nginx_sys::ngx_posted_next_events),
174+
);
93175
}
176+
notify();
94177
}
95178
}
96179
}

0 commit comments

Comments
 (0)