Skip to content

Commit 1f53296

Browse files
committed
Add async_eventfd feature to optionally optimize notify method on epoll
1 parent 18990c4 commit 1f53296

File tree

6 files changed

+159
-52
lines changed

6 files changed

+159
-52
lines changed

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ async = [
5454
"dep:pin-project-lite",
5555
"dep:crossbeam-channel",
5656
]
57+
# Optimize async notification with eventfd, requires nginx epoll event module
58+
async_eventfd = ["async"]
5759
# Provides APIs that require allocations via the `alloc` crate.
5860
alloc = ["allocator-api2/alloc"]
5961
# Enables serialization support for some of the provided and re-exported types.

examples/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,3 +75,4 @@ default = ["export-modules", "ngx/vendored"]
7575
export-modules = []
7676
linux = []
7777
async = ["ngx/async"]
78+
async_eventfd = ["async", "ngx/async_eventfd"]

examples/async.rs

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,6 @@ use hyper_util::rt::TokioIo;
77
use nginx_sys::{ngx_http_core_loc_conf_t, NGX_LOG_ERR};
88
use ngx::async_::resolver::Resolver;
99
use ngx::async_::{spawn, Task};
10-
use std::cell::RefCell;
11-
use std::ffi::{c_char, c_void};
12-
use std::future::Future;
13-
use std::pin::Pin;
14-
use std::ptr::{addr_of, addr_of_mut, NonNull};
15-
use std::sync::atomic::{AtomicPtr, Ordering};
16-
use std::task::Poll;
17-
use std::time::Instant;
18-
use tokio::net::TcpStream;
19-
2010
use ngx::core::{self, Pool, Status};
2111
use ngx::ffi::{
2212
ngx_array_push, ngx_command_t, ngx_conf_t, ngx_connection_t, ngx_http_handler_pt,
@@ -29,6 +19,17 @@ use ngx::http::{HttpModuleLocationConf, HttpModuleMainConf, NgxHttpCoreModule};
2919
use ngx::{
3020
http_request_handler, ngx_conf_log_error, ngx_log_debug_http, ngx_log_error, ngx_string,
3121
};
22+
use std::cell::RefCell;
23+
use std::ffi::{c_char, c_void};
24+
use std::future::Future;
25+
use std::pin::Pin;
26+
use std::ptr::{addr_of, addr_of_mut, NonNull};
27+
use std::sync::atomic::{AtomicPtr, Ordering};
28+
use std::task::Poll;
29+
use std::time::Instant;
30+
use tokio::net::TcpStream;
31+
#[cfg(feature = "async_eventfd")]
32+
use {nginx_sys::ngx_cycle_t, ngx::async_::async_init};
3233

3334
struct Module;
3435

@@ -98,6 +99,10 @@ pub static mut ngx_http_async_module: ngx_module_t = ngx_module_t {
9899
ctx: std::ptr::addr_of!(NGX_HTTP_ASYNC_MODULE_CTX) as _,
99100
commands: unsafe { &NGX_HTTP_ASYNC_COMMANDS[0] as *const _ as *mut _ },
100101
type_: NGX_HTTP_MODULE as _,
102+
#[cfg(not(feature = "async_eventfd"))]
103+
init_process: None,
104+
#[cfg(feature = "async_eventfd")]
105+
init_process: Some(ngx_http_async_init),
101106
..ngx_module_t::default()
102107
};
103108

@@ -166,7 +171,7 @@ async fn resolve_something(
166171
.expect("resolution");
167172

168173
(
169-
format!("X-Resolve-Time"),
174+
"X-Resolve-Time".to_string(),
170175
start.elapsed().as_millis().to_string(),
171176
)
172177
}
@@ -188,7 +193,7 @@ async fn reqwest_something() -> (String, String) {
188193
async fn hyper_something() -> (String, String) {
189194
let start = Instant::now();
190195
// see https://hyper.rs/guides/1/client/basic/
191-
let url = "http://httpbin.org/ip".parse::<hyper::Uri>().expect("uri");
196+
let url = "https://example.com".parse::<hyper::Uri>().expect("uri");
192197
let host = url.host().expect("uri has no host");
193198
let port = url.port_u16().unwrap_or(80);
194199

@@ -331,3 +336,9 @@ extern "C" fn ngx_http_async_commands_set_enable(
331336

332337
ngx::core::NGX_CONF_OK
333338
}
339+
340+
#[cfg(feature = "async_eventfd")]
341+
extern "C" fn ngx_http_async_init(_cycle: *mut ngx_cycle_t) -> ngx_int_t {
342+
async_init();
343+
Status::NGX_OK.into()
344+
}

nginx-sys/build/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ const NGX_CONF_FEATURES: &[&str] = &[
2020
"compat",
2121
"debug",
2222
"have_epollrdhup",
23+
"have_eventfd",
2324
"have_file_aio",
2425
"have_kqueue",
2526
"have_memalign",

src/async_/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
//! Async runtime and set of utilities on top of the NGINX event loop.
22
pub use self::sleep::{sleep, Sleep};
33
pub use self::spawn::{spawn, Task};
4+
#[cfg(feature = "async_eventfd")]
5+
pub use self::spawn::async_init;
46

57
pub mod resolver;
68

src/async_/spawn.rs

Lines changed: 130 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,135 @@
11
extern crate std;
22

3-
use core::ffi::c_int;
3+
use core::mem;
44
use core::sync::atomic::{AtomicI64, Ordering};
5-
use core::{mem, ptr};
6-
use std::sync::OnceLock;
75

86
use core::future::Future;
7+
use std::sync::OnceLock;
98

10-
use alloc::boxed::Box;
119
pub use async_task::Task;
1210
use async_task::{Runnable, ScheduleInfo, WithInfo};
1311
use crossbeam_channel::{unbounded, Receiver, Sender};
14-
use nginx_sys::{kill, ngx_event_t, ngx_post_event, ngx_posted_next_events, ngx_thread_tid, SIGIO};
12+
use nginx_sys::{ngx_event_t, ngx_thread_tid};
1513

1614
use crate::log::ngx_cycle_log;
1715
use crate::ngx_log_debug;
1816

17+
static mut NOTIFY_EVENT: ngx_event_t = unsafe { mem::zeroed() };
18+
19+
#[cfg(not(feature = "async_eventfd"))]
20+
mod default {
21+
extern crate std;
22+
23+
use nginx_sys::{kill, ngx_log_t, ngx_post_event, ngx_posted_next_events, SIGIO};
24+
25+
use super::async_handler;
26+
use super::NOTIFY_EVENT;
27+
use crate::log::ngx_cycle_log;
28+
use crate::ngx_log_debug;
29+
30+
pub(crate) fn notify() {
31+
let log: *mut ngx_log_t = ngx_cycle_log().as_ptr();
32+
let rc = unsafe {
33+
NOTIFY_EVENT.log = log;
34+
NOTIFY_EVENT.handler = Some(async_handler);
35+
36+
ngx_post_event(&raw mut NOTIFY_EVENT, &raw mut ngx_posted_next_events);
37+
kill(
38+
std::process::id().try_into().unwrap(),
39+
SIGIO.try_into().unwrap(),
40+
)
41+
};
42+
if rc != 0 {
43+
panic!("kill: rc={rc}");
44+
}
45+
46+
ngx_log_debug!(log, "async: notified (SIGIO)");
47+
}
48+
}
49+
#[cfg(not(feature = "async_eventfd"))]
50+
use default::*;
51+
52+
#[cfg(feature = "async_eventfd")]
53+
mod eventfd {
54+
extern crate std;
55+
56+
use core::mem;
57+
58+
use nginx_sys::{
59+
eventfd, ngx_connection_t, ngx_cycle, ngx_event_actions, ngx_event_t, ngx_int_t, ngx_log_t,
60+
EFD_CLOEXEC, EFD_NONBLOCK, EPOLL_EVENTS_EPOLLET, EPOLL_EVENTS_EPOLLIN,
61+
EPOLL_EVENTS_EPOLLRDHUP, NGX_ERROR, NGX_OK,
62+
};
63+
use std::{fs::File, io::Write, os::fd::FromRawFd, sync::OnceLock};
64+
65+
use super::async_handler;
66+
use super::NOTIFY_EVENT;
67+
use crate::log::ngx_cycle_log;
68+
use crate::ngx_log_debug;
69+
70+
#[cfg(not(ngx_feature = "have_eventfd"))]
71+
compile_error!("feature async_eventfd requires eventfd(), NGX_HAVE_EVENTFD");
72+
73+
static mut NOTIFY_CONN: ngx_connection_t = unsafe { mem::zeroed() };
74+
static EVENT_FILE: OnceLock<File> = OnceLock::new();
75+
76+
pub(crate) fn notify() {
77+
let w = EVENT_FILE
78+
.get()
79+
.expect("EVENT_FILE")
80+
.write(&1u64.to_ne_bytes())
81+
.expect("write");
82+
if w != 8 {
83+
panic!("eventfd: wrote {w}, expected 8");
84+
}
85+
86+
ngx_log_debug!(ngx_cycle_log().as_ptr(), "async: notified (eventfd)");
87+
}
88+
89+
static mut _DUMMY_WRITE_EVENT: ngx_event_t = unsafe { mem::zeroed() };
90+
91+
extern "C" fn _dummy_write_handler(_ev: *mut ngx_event_t) {}
92+
93+
/// intialize async_eventfd, needs to be called once in init_process, before spawn() is used
94+
pub fn async_init() -> ngx_int_t {
95+
let fd = unsafe { eventfd(0, (EFD_NONBLOCK | EFD_CLOEXEC).try_into().unwrap()) };
96+
if fd == -1 {
97+
return NGX_ERROR as isize;
98+
}
99+
100+
let log: *mut ngx_log_t = unsafe { ngx_cycle.read().log };
101+
102+
ngx_log_debug!(log, "async: eventfd {fd}");
103+
104+
unsafe {
105+
EVENT_FILE.set(File::from_raw_fd(fd)).expect("EVENT_FILE");
106+
107+
NOTIFY_EVENT.handler = Some(async_handler);
108+
NOTIFY_EVENT.log = log;
109+
#[allow(static_mut_refs)]
110+
NOTIFY_EVENT.set_active(1);
111+
NOTIFY_EVENT.data = (&raw mut NOTIFY_CONN).cast();
112+
113+
_DUMMY_WRITE_EVENT.handler = Some(_dummy_write_handler);
114+
115+
NOTIFY_CONN.fd = fd;
116+
NOTIFY_CONN.read = &raw mut NOTIFY_EVENT;
117+
NOTIFY_CONN.write = &raw mut _DUMMY_WRITE_EVENT;
118+
NOTIFY_CONN.log = log;
119+
120+
ngx_event_actions.add.unwrap()(
121+
&raw mut NOTIFY_EVENT,
122+
(EPOLL_EVENTS_EPOLLIN | EPOLL_EVENTS_EPOLLRDHUP) as isize,
123+
EPOLL_EVENTS_EPOLLET as usize,
124+
);
125+
}
126+
127+
NGX_OK as ngx_int_t
128+
}
129+
}
130+
#[cfg(feature = "async_eventfd")]
131+
pub use eventfd::*;
132+
19133
static MAIN_TID: AtomicI64 = AtomicI64::new(-1);
20134

21135
#[inline]
@@ -25,34 +139,21 @@ fn on_event_thread() -> bool {
25139
main_tid == tid
26140
}
27141

28-
extern "C" fn async_handler(ev: *mut ngx_event_t) {
142+
extern "C" fn async_handler(_ev: *mut ngx_event_t) {
29143
// initialize MAIN_TID on first execution
30144
let tid = unsafe { ngx_thread_tid().into() };
31145
let _ = MAIN_TID.compare_exchange(-1, tid, Ordering::Relaxed, Ordering::Relaxed);
32146
let scheduler = scheduler();
147+
148+
if scheduler.rx.is_empty() {
149+
return;
150+
}
33151
let mut cnt = 0;
34152
while let Ok(r) = scheduler.rx.try_recv() {
35153
r.run();
36154
cnt += 1;
37155
}
38-
ngx_log_debug!(
39-
unsafe { (*ev).log },
40-
"async: processed {cnt} items"
41-
);
42-
43-
unsafe {
44-
drop(Box::from_raw(ev));
45-
}
46-
}
47-
48-
fn notify() -> c_int {
49-
ngx_log_debug!(ngx_cycle_log().as_ptr(), "async: notify via SIGIO");
50-
unsafe {
51-
kill(
52-
std::process::id().try_into().unwrap(),
53-
SIGIO.try_into().unwrap(),
54-
)
55-
}
156+
ngx_log_debug!(ngx_cycle_log().as_ptr(), "async: processed {cnt} items");
56157
}
57158

58159
struct Scheduler {
@@ -69,28 +170,17 @@ impl Scheduler {
69170
fn schedule(&self, runnable: Runnable, info: ScheduleInfo) {
70171
let oet = on_event_thread();
71172
// If we are on the event loop thread it's safe to simply run the Runnable, otherwise we
72-
// enqueue the Runnable, post our event, and SIGIO to interrupt epoll. The event handler
73-
// then runs the Runnable on the event loop thread.
173+
// enqueue the Runnable, post our event, and notify. The event handler then runs the
174+
// Runnable on the event loop thread.
74175
//
75176
// If woken_while_running, it indicates that a task has yielded itself to the Scheduler.
76-
// Force round-trip via queue to limit reentrancy (skipping SIGIO).
177+
// Force round-trip via queue to limit reentrancy.
77178
if oet && !info.woken_while_running {
78179
runnable.run();
79180
} else {
80181
self.tx.send(runnable).expect("send");
81-
unsafe {
82-
let event: *mut ngx_event_t = Box::into_raw(Box::new(mem::zeroed()));
83-
(*event).handler = Some(async_handler);
84-
(*event).log = ngx_cycle_log().as_ptr();
85-
ngx_post_event(event, ptr::addr_of_mut!(ngx_posted_next_events));
86-
}
87-
88-
if !oet {
89-
let rc = notify();
90-
if rc != 0 {
91-
panic!("kill: {rc}")
92-
}
93-
}
182+
183+
notify();
94184
}
95185
}
96186
}

0 commit comments

Comments
 (0)