Skip to content

Commit c57f8ea

Browse files
committed
feat: trait for HTTP request handler
1 parent 1b802cd commit c57f8ea

File tree

6 files changed

+275
-180
lines changed

6 files changed

+275
-180
lines changed

examples/async.rs

Lines changed: 63 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,12 @@ use std::time::Instant;
66

77
use ngx::core;
88
use ngx::ffi::{
9-
ngx_array_push, ngx_command_t, ngx_conf_t, ngx_connection_t, ngx_event_t, ngx_http_handler_pt,
10-
ngx_http_module_t, ngx_http_phases_NGX_HTTP_ACCESS_PHASE, ngx_int_t, ngx_module_t,
11-
ngx_post_event, ngx_posted_events, ngx_posted_next_events, ngx_str_t, ngx_uint_t,
9+
ngx_command_t, ngx_conf_t, ngx_connection_t, ngx_event_t, ngx_http_module_t, ngx_int_t,
10+
ngx_module_t, ngx_post_event, ngx_posted_events, ngx_posted_next_events, ngx_str_t, ngx_uint_t,
1211
NGX_CONF_TAKE1, NGX_HTTP_LOC_CONF, NGX_HTTP_LOC_CONF_OFFSET, NGX_HTTP_MODULE, NGX_LOG_EMERG,
1312
};
14-
use ngx::http::{self, HttpModule, MergeConfigError};
15-
use ngx::http::{HttpModuleLocationConf, HttpModuleMainConf, NgxHttpCoreModule};
16-
use ngx::{http_request_handler, ngx_conf_log_error, ngx_log_debug_http, ngx_string};
13+
use ngx::http::{self, HttpModule, HttpModuleLocationConf, HttpRequestHandler, MergeConfigError};
14+
use ngx::{ngx_conf_log_error, ngx_log_debug_http, ngx_string};
1715
use tokio::runtime::Runtime;
1816

1917
struct Module;
@@ -25,17 +23,8 @@ impl http::HttpModule for Module {
2523

2624
unsafe extern "C" fn postconfiguration(cf: *mut ngx_conf_t) -> ngx_int_t {
2725
// SAFETY: this function is called with non-NULL cf always
28-
let cf = &mut *cf;
29-
let cmcf = NgxHttpCoreModule::main_conf_mut(cf).expect("http core main conf");
30-
31-
let h = ngx_array_push(
32-
&mut cmcf.phases[ngx_http_phases_NGX_HTTP_ACCESS_PHASE as usize].handlers,
33-
) as *mut ngx_http_handler_pt;
34-
if h.is_null() {
35-
return core::Status::NGX_ERROR.into();
36-
}
37-
// set an Access phase handler
38-
*h = Some(async_access_handler);
26+
let cf = unsafe { &mut *cf };
27+
AsyncAccessHandler::register(cf);
3928
core::Status::NGX_OK.into()
4029
}
4130
}
@@ -139,63 +128,70 @@ impl Drop for RequestCTX {
139128
}
140129
}
141130

142-
http_request_handler!(async_access_handler, |request: &mut http::Request| {
143-
let co = Module::location_conf(request).expect("module config is none");
131+
struct AsyncAccessHandler;
144132

145-
ngx_log_debug_http!(request, "async module enabled: {}", co.enable);
133+
impl HttpRequestHandler<Option<ngx_int_t>> for AsyncAccessHandler {
134+
const PHASE: nginx_sys::NgxHttpPhases = nginx_sys::NgxHttpPhases::Access;
135+
type Module = Module;
146136

147-
if !co.enable {
148-
return core::Status::NGX_DECLINED;
149-
}
137+
fn handler(request: &mut http::Request) -> Option<ngx_int_t> {
138+
let co = Module::location_conf(request).expect("module config is none");
139+
140+
ngx_log_debug_http!(request, "async module enabled: {}", co.enable);
150141

151-
if let Some(ctx) =
152-
unsafe { request.get_module_ctx::<RequestCTX>(&*addr_of!(ngx_http_async_module)) }
153-
{
154-
if !ctx.done.load(Ordering::Relaxed) {
155-
return core::Status::NGX_AGAIN;
142+
if !co.enable {
143+
return Some(core::Status::NGX_DECLINED.into());
156144
}
157145

158-
return core::Status::NGX_OK;
159-
}
146+
if let Some(ctx) =
147+
unsafe { request.get_module_ctx::<RequestCTX>(&*addr_of!(ngx_http_async_module)) }
148+
{
149+
if !ctx.done.load(Ordering::Relaxed) {
150+
return Some(core::Status::NGX_AGAIN.into());
151+
}
160152

161-
let ctx = request.pool().allocate(RequestCTX::default());
162-
if ctx.is_null() {
163-
return core::Status::NGX_ERROR;
153+
return Some(core::Status::NGX_OK.into());
154+
}
155+
156+
let ctx = request.pool().allocate(RequestCTX::default());
157+
if ctx.is_null() {
158+
return Some(core::Status::NGX_ERROR.into());
159+
}
160+
request.set_module_ctx(ctx.cast(), unsafe { &*addr_of!(ngx_http_async_module) });
161+
162+
let ctx = unsafe { &mut *ctx };
163+
ctx.event.handler = Some(check_async_work_done);
164+
ctx.event.data = request.connection().cast();
165+
ctx.event.log = unsafe { (*request.connection()).log };
166+
unsafe { ngx_post_event(&mut ctx.event, addr_of_mut!(ngx_posted_next_events)) };
167+
168+
// Request is no longer needed and can be converted to something movable to the async block
169+
let req = AtomicPtr::new(request.into());
170+
let done_flag = ctx.done.clone();
171+
172+
let rt = ngx_http_async_runtime();
173+
ctx.task = Some(rt.spawn(async move {
174+
let start = Instant::now();
175+
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
176+
let req = unsafe { http::Request::from_ngx_http_request(req.load(Ordering::Relaxed)) };
177+
// not really thread safe, we should apply all these operation in nginx thread
178+
// but this is just an example. proper way would be storing these headers in the request ctx
179+
// and apply them when we get back to the nginx thread.
180+
req.add_header_out(
181+
"X-Async-Time",
182+
start.elapsed().as_millis().to_string().as_str(),
183+
);
184+
185+
done_flag.store(true, Ordering::Release);
186+
// there is a small issue here. If traffic is low we may get stuck behind a 300ms timer
187+
// in the nginx event loop. To workaround it we can notify the event loop using
188+
// pthread_kill( nginx_thread, SIGIO ) to wake up the event loop. (or patch nginx
189+
// and use the same trick as the thread pool)
190+
}));
191+
192+
Some(core::Status::NGX_AGAIN.into())
164193
}
165-
request.set_module_ctx(ctx.cast(), unsafe { &*addr_of!(ngx_http_async_module) });
166-
167-
let ctx = unsafe { &mut *ctx };
168-
ctx.event.handler = Some(check_async_work_done);
169-
ctx.event.data = request.connection().cast();
170-
ctx.event.log = unsafe { (*request.connection()).log };
171-
unsafe { ngx_post_event(&mut ctx.event, addr_of_mut!(ngx_posted_next_events)) };
172-
173-
// Request is no longer needed and can be converted to something movable to the async block
174-
let req = AtomicPtr::new(request.into());
175-
let done_flag = ctx.done.clone();
176-
177-
let rt = ngx_http_async_runtime();
178-
ctx.task = Some(rt.spawn(async move {
179-
let start = Instant::now();
180-
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
181-
let req = unsafe { http::Request::from_ngx_http_request(req.load(Ordering::Relaxed)) };
182-
// not really thread safe, we should apply all these operation in nginx thread
183-
// but this is just an example. proper way would be storing these headers in the request ctx
184-
// and apply them when we get back to the nginx thread.
185-
req.add_header_out(
186-
"X-Async-Time",
187-
start.elapsed().as_millis().to_string().as_str(),
188-
);
189-
190-
done_flag.store(true, Ordering::Release);
191-
// there is a small issue here. If traffic is low we may get stuck behind a 300ms timer
192-
// in the nginx event loop. To workaround it we can notify the event loop using
193-
// pthread_kill( nginx_thread, SIGIO ) to wake up the event loop. (or patch nginx
194-
// and use the same trick as the thread pool)
195-
}));
196-
197-
core::Status::NGX_AGAIN
198-
});
194+
}
199195

200196
extern "C" fn ngx_http_async_commands_set_enable(
201197
cf: *mut ngx_conf_t,

examples/awssig.rs

Lines changed: 80 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,12 @@ use std::ffi::{c_char, c_void};
33
use http::HeaderMap;
44
use ngx::core;
55
use ngx::ffi::{
6-
ngx_array_push, ngx_command_t, ngx_conf_t, ngx_http_handler_pt, ngx_http_module_t,
7-
ngx_http_phases_NGX_HTTP_PRECONTENT_PHASE, ngx_int_t, ngx_module_t, ngx_str_t, ngx_uint_t,
6+
ngx_command_t, ngx_conf_t, ngx_http_module_t, ngx_int_t, ngx_module_t, ngx_str_t, ngx_uint_t,
87
NGX_CONF_TAKE1, NGX_HTTP_LOC_CONF, NGX_HTTP_LOC_CONF_OFFSET, NGX_HTTP_MODULE,
98
NGX_HTTP_SRV_CONF, NGX_LOG_EMERG,
109
};
1110
use ngx::http::*;
12-
use ngx::{http_request_handler, ngx_conf_log_error, ngx_log_debug_http, ngx_string};
11+
use ngx::{ngx_conf_log_error, ngx_log_debug_http, ngx_string};
1312

1413
struct Module;
1514

@@ -20,17 +19,8 @@ impl HttpModule for Module {
2019

2120
unsafe extern "C" fn postconfiguration(cf: *mut ngx_conf_t) -> ngx_int_t {
2221
// SAFETY: this function is called with non-NULL cf always
23-
let cf = &mut *cf;
24-
let cmcf = NgxHttpCoreModule::main_conf_mut(cf).expect("http core main conf");
25-
26-
let h = ngx_array_push(
27-
&mut cmcf.phases[ngx_http_phases_NGX_HTTP_PRECONTENT_PHASE as usize].handlers,
28-
) as *mut ngx_http_handler_pt;
29-
if h.is_null() {
30-
return core::Status::NGX_ERROR.into();
31-
}
32-
// set an phase handler
33-
*h = Some(awssigv4_header_handler);
22+
let cf = unsafe { &mut *cf };
23+
AwsSigV4HeaderHandler::register(cf);
3424
core::Status::NGX_OK.into()
3525
}
3626
}
@@ -261,82 +251,89 @@ extern "C" fn ngx_http_awssigv4_commands_set_s3_endpoint(
261251
ngx::core::NGX_CONF_OK
262252
}
263253

264-
http_request_handler!(awssigv4_header_handler, |request: &mut Request| {
265-
// get Module Config from request
266-
let conf = Module::location_conf(request).expect("module conf");
267-
ngx_log_debug_http!(request, "AWS signature V4 module {}", {
268-
if conf.enable {
269-
"enabled"
270-
} else {
271-
"disabled"
272-
}
273-
});
274-
if !conf.enable {
275-
return core::Status::NGX_DECLINED;
276-
}
254+
struct AwsSigV4HeaderHandler;
277255

278-
// TODO: build url properly from the original URL from client
279-
let method = request.method();
280-
if !matches!(method, ngx::http::Method::HEAD | ngx::http::Method::GET) {
281-
return HTTPStatus::FORBIDDEN.into();
282-
}
256+
impl HttpRequestHandler<Option<ngx_int_t>> for AwsSigV4HeaderHandler {
257+
const PHASE: nginx_sys::NgxHttpPhases = nginx_sys::NgxHttpPhases::PreContent;
258+
type Module = Module;
283259

284-
let datetime = chrono::Utc::now();
285-
let uri = match request.unparsed_uri().to_str() {
286-
Ok(v) => format!("https://{}.{}{}", conf.s3_bucket, conf.s3_endpoint, v),
287-
Err(_) => return core::Status::NGX_DECLINED,
288-
};
260+
fn handler(request: &mut Request) -> Option<ngx_int_t> {
261+
// get Module Config from request
262+
let conf = Module::location_conf(request).expect("module conf");
263+
ngx_log_debug_http!(request, "AWS signature V4 module {}", {
264+
if conf.enable {
265+
"enabled"
266+
} else {
267+
"disabled"
268+
}
269+
});
270+
if !conf.enable {
271+
return Some(core::Status::NGX_DECLINED.into());
272+
}
273+
274+
// TODO: build url properly from the original URL from client
275+
let method = request.method();
276+
if !matches!(method, ngx::http::Method::HEAD | ngx::http::Method::GET) {
277+
return Some(HTTPStatus::FORBIDDEN.into());
278+
}
289279

290-
let datetime_now = datetime.format("%Y%m%dT%H%M%SZ");
291-
let datetime_now = datetime_now.to_string();
280+
let datetime = chrono::Utc::now();
281+
let uri = match request.unparsed_uri().to_str() {
282+
Ok(v) => format!("https://{}.{}{}", conf.s3_bucket, conf.s3_endpoint, v),
283+
Err(_) => return Some(core::Status::NGX_DECLINED.into()),
284+
};
292285

293-
let signature = {
294-
// NOTE: aws_sign_v4::AwsSign::new() implementation requires a HeaderMap.
295-
// Iterate over requests headers_in and copy into HeaderMap
296-
// Copy only headers that will be used to sign the request
297-
let mut headers = HeaderMap::new();
298-
for (name, value) in request.headers_in_iterator() {
299-
if let Ok(name) = name.to_str() {
300-
if name.to_lowercase() == "host" {
301-
if let Ok(value) = http::HeaderValue::from_bytes(value.as_bytes()) {
302-
headers.insert(http::header::HOST, value);
303-
} else {
304-
return core::Status::NGX_DECLINED;
286+
let datetime_now = datetime.format("%Y%m%dT%H%M%SZ");
287+
let datetime_now = datetime_now.to_string();
288+
289+
let signature = {
290+
// NOTE: aws_sign_v4::AwsSign::new() implementation requires a HeaderMap.
291+
// Iterate over requests headers_in and copy into HeaderMap
292+
// Copy only headers that will be used to sign the request
293+
let mut headers = HeaderMap::new();
294+
for (name, value) in request.headers_in_iterator() {
295+
if let Ok(name) = name.to_str() {
296+
if name.to_lowercase() == "host" {
297+
if let Ok(value) = http::HeaderValue::from_bytes(value.as_bytes()) {
298+
headers.insert(http::header::HOST, value);
299+
} else {
300+
return Some(core::Status::NGX_DECLINED.into());
301+
}
305302
}
303+
} else {
304+
return Some(core::Status::NGX_DECLINED.into());
306305
}
307-
} else {
308-
return core::Status::NGX_DECLINED;
309306
}
310-
}
311-
headers.insert("X-Amz-Date", datetime_now.parse().unwrap());
312-
ngx_log_debug_http!(request, "headers {:?}", headers);
313-
ngx_log_debug_http!(request, "method {:?}", method);
314-
ngx_log_debug_http!(request, "uri {:?}", uri);
315-
ngx_log_debug_http!(request, "datetime_now {:?}", datetime_now);
316-
317-
let s = aws_sign_v4::AwsSign::new(
318-
method.as_str(),
319-
&uri,
320-
&datetime,
321-
&headers,
322-
"us-east-1",
323-
conf.access_key.as_str(),
324-
conf.secret_key.as_str(),
325-
"s3",
326-
"",
327-
);
328-
s.sign()
329-
};
307+
headers.insert("X-Amz-Date", datetime_now.parse().unwrap());
308+
ngx_log_debug_http!(request, "headers {:?}", headers);
309+
ngx_log_debug_http!(request, "method {:?}", method);
310+
ngx_log_debug_http!(request, "uri {:?}", uri);
311+
ngx_log_debug_http!(request, "datetime_now {:?}", datetime_now);
312+
313+
let s = aws_sign_v4::AwsSign::new(
314+
method.as_str(),
315+
&uri,
316+
&datetime,
317+
&headers,
318+
"us-east-1",
319+
conf.access_key.as_str(),
320+
conf.secret_key.as_str(),
321+
"s3",
322+
"",
323+
);
324+
s.sign()
325+
};
330326

331-
request.add_header_in("authorization", signature.as_str());
332-
request.add_header_in("X-Amz-Date", datetime_now.as_str());
327+
request.add_header_in("authorization", signature.as_str());
328+
request.add_header_in("X-Amz-Date", datetime_now.as_str());
333329

334-
for (name, value) in request.headers_out_iterator() {
335-
ngx_log_debug_http!(request, "headers_out {name}: {value}",);
336-
}
337-
for (name, value) in request.headers_in_iterator() {
338-
ngx_log_debug_http!(request, "headers_in {name}: {value}",);
339-
}
330+
for (name, value) in request.headers_out_iterator() {
331+
ngx_log_debug_http!(request, "headers_out {name}: {value}",);
332+
}
333+
for (name, value) in request.headers_in_iterator() {
334+
ngx_log_debug_http!(request, "headers_in {name}: {value}",);
335+
}
340336

341-
core::Status::NGX_OK
342-
});
337+
Some(core::Status::NGX_OK.into())
338+
}
339+
}

0 commit comments

Comments
 (0)