Reading full client body in NGX_HTTP_CONTENT_PHASE #222
-
|
We need to read the full client bodies to implement a transparent tunneling protocol on top of TLS and HTTP. I could not find any example in rust on how to do that. pub fn handler(request: &mut Request) -> Status {
// Content-phase handler: when the `asl` location flag is set, ask nginx to
// read the full client body and report that processing continues
// asynchronously (the body-read callback takes over from here).
let config = Module::location_conf(request).expect("location config is none");
match config.asl {
true => {
// Kick off the (possibly asynchronous) body read; `client_request_body_cb`
// fires once the whole body has been buffered.
let rc = unsafe {
ngx_http_read_client_request_body(request.as_mut(), Some(client_request_body_cb))
};
// Codes >= NGX_HTTP_SPECIAL_RESPONSE are error/special responses and
// must be returned to nginx unchanged.
if rc >= NGX_HTTP_SPECIAL_RESPONSE.try_into().unwrap() {
Status(rc)
} else {
// NGX_DONE keeps the request alive until the callback finalizes it.
Status::NGX_DONE
}
}
false => Status::NGX_DECLINED,
}
}
extern "C" fn client_request_body_cb(request: *mut ngx_http_request_t) {
// Post-read callback: collect the body buffers from r->request_body.
// NOTE(review): `request_body` and `request_body.bufs` are dereferenced
// without null checks — assumes nginx always populates both here; TODO
// confirm for zero-length bodies.
let body = unsafe { *(*request).request_body };
let mut chain = unsafe { *body.bufs };
let mut bufs: Vec<MemoryBuffer> = vec![];
// NOTE(review): `body.buf` is nginx's currently-active buffer and may
// alias a buffer already linked into the `bufs` chain, so pushing it
// first could duplicate data — verify against ngx_http_request_body.c.
let first = MemoryBuffer::from_ngx_buf(body.buf);
if first.len() > 0 {
bufs.push(first);
}
// Walk the singly-linked ngx_chain_t until `next` is null.
loop {
if !chain.buf.is_null() {
let t = MemoryBuffer::from_ngx_buf(chain.buf);
if t.len() > 0 { // NOTE: There might be a slight bug/confusing behaviour when creating a MemoryBuffer from a len 0 ngx_buf, it panics at runtime
bufs.push(t);
}
}
if chain.next.is_null() {
break;
}
chain = unsafe { *chain.next };
}
}I'm not 100% sure if that is the right way to handle a ngx_chain. Do I also need for account for last_buf and last_in_chain on the buffers themselves? Also: do I need to evaluate buf.memory and decide if I need a MemoryBuffer vs. TemporaryBuffer? However, when nginx decides to spill the client body to temp, how do I handle this? My callback seems to be called with request.request_body having:
The issue is that I've never seen that file in my file system. I also can't make sense of the temp_file.file.info.st_size, which is 0, and temp_file.offset, which is ~1MiB, which seems to correspond to the body I sent ( let mut buffer: *mut u8 = unsafe {
*Request::from_ngx_http_request(request)
.pool()
.alloc(size)
.cast()
};
let x = unsafe {
ngx_read_file(
&mut temp_file.file,
buffer,
size,
size.try_into().unwrap(),
)
};
let v = unsafe {
Vec::from_raw_parts(buffer, size.try_into().unwrap(), size.try_into().unwrap())
};… but it always reads 0 bytes. Has anyone done this in Rust already and can give me some pointers? Thanks! |
Beta Was this translation helpful? Give feedback.
Replies: 2 comments
-
|
You don't need to copy from the chain, you can use the data in-place or convert it to a
Not knowing what you want to do with the data, I can only suggest to check the corresponding section of nginx development guide, and examples of chain manipulation in Also in your code snippet, let v = unsafe {
Vec::from_raw_parts(buffer, size.try_into().unwrap(), size.try_into().unwrap())
};seems to be a guaranteed double free, because both I'd try to write this and similar fragments as let v: Vec<u8, _> = Vec::new_in(request.pool());
v.try_reserve_exact(size)?;
unsafe {
let n = ngx_read_file(&mut temp_file.file, v.as_mut_ptr().cast(), core::cmp::min(v.capacity(), size), 0);
if n < 0 {
return Err(...);
}
v.set_len(n as usize);
} |
Beta Was this translation helpful? Give feedback.
-
|
@bavshin-f5 I think I figured it out now, would you take a look if it look sane? The idea is that This should handle all cases, e.g. a couple of meme bufs followed by a file buf (even if nginx never does this for the client body currently). fn read_request_body<'a>(
request: &'a mut Request,
) -> std::result::Result<RequestBodyRead<'a>, HTTPStatus> {
// Start nginx's client-body read and return a future that resolves once
// `client_request_body_handler` has stored the body buffers in the
// module context.
let rc = unsafe {
ngx_http_read_client_request_body(request.into(), Some(client_request_body_handler))
};
// Special/error response codes are surfaced to the caller as an HTTP status.
if rc >= NGX_HTTP_SPECIAL_RESPONSE.try_into().unwrap() {
Err(HTTPStatus(rc.try_into().unwrap()))
} else {
Ok(RequestBodyRead { request })
}
}
#[derive(Debug)]
// Copy of one `ngx_buf_t` from the request body chain; describes either an
// in-memory range or a range of the body temp file. The memory/file it
// points at remains owned by nginx.
struct Buffer(ngx_buf_t);
impl Buffer {
    /// Materialize this buffer's bytes.
    ///
    /// In-memory buffers are returned as a borrowed slice (zero-copy);
    /// file-backed buffers are read from the body temp file into an owned
    /// `Vec`.
    ///
    /// # Errors
    /// Fails if `dup(2)` fails, seeking/reading the temp file fails, or
    /// fewer bytes than expected could be read.
    async fn read<'a>(&'a self) -> Result<Cow<'a, [u8]>> {
        let cow = if self.0.in_file() != 0 {
            let size: usize = (self.0.file_last - self.0.file_pos).try_into().unwrap();
            // Duplicate the descriptor so we can seek/close it freely
            // without disturbing nginx's own ngx_file_t.
            let fd: RawFd = unsafe {
                let d = libc::dup((*self.0.file).fd.as_raw_fd());
                if d < 0 {
                    return Err(std::io::Error::last_os_error().into());
                }
                d
            };
            // SAFETY: `fd` is a freshly dup'ed descriptor we now own;
            // tokio's File will close it on drop.
            let mut file = unsafe { tokio::fs::File::from_raw_fd(fd) };
            file.seek(std::io::SeekFrom::Start(
                self.0.file_pos.try_into().unwrap(),
            ))
            .await?;
            let mut buf = Vec::with_capacity(size);
            // Bound the read to [file_pos, file_last). The previous
            // `read_to_end` on the bare file read until EOF, which only
            // works when the buffer happens to span to the end of the
            // temp file; `take(size)` handles any sub-range as well.
            let read = (&mut file).take(size as u64).read_to_end(&mut buf).await?;
            if read != size {
                bail!("Expected to read {size}; got {read}");
            }
            Cow::from(buf)
        } else {
            // SAFETY: pos..last points into nginx pool memory that outlives
            // `self` (it lives until the request is finalized).
            let buf = unsafe {
                std::slice::from_raw_parts(
                    self.0.pos,
                    self.0.last.offset_from(self.0.pos).try_into().expect("len"),
                )
            };
            Cow::from(buf)
        };
        Ok(cow)
    }
}
#[derive(Default, Debug)]
// Shared state between the nginx body-read callback (producer) and the
// `RequestBodyRead` future (consumer).
// NOTE(review): `RefCell` assumes all access happens on one thread —
// confirm the executor polls futures on the nginx worker thread.
struct RequestBody {
// Set once by `client_request_body_handler` when the body chain is complete.
bufs: RefCell<Option<Vec<Buffer>>>,
// Waker of the future awaiting the body, if it polled before the data arrived.
waker: RefCell<Option<Waker>>,
}
#[derive(Debug)]
// Future returned by `read_request_body`; resolves to the request's body
// buffers once nginx has finished reading the client body.
struct RequestBodyRead<'a> {
request: &'a Request,
}
impl Future for RequestBodyRead<'_> {
    type Output = Vec<Buffer>;

    /// Resolves once `client_request_body_handler` has published the body
    /// buffers into the module context; otherwise registers the current
    /// waker and stays pending.
    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        let ctx = this
            .request
            .get_module_ctx::<RequestCtx>(unsafe { &*addr_of!(ngx_http_pep_module) })
            .expect("ctx");
        match ctx.body.bufs.borrow_mut().take() {
            Some(bufs) => Poll::Ready(bufs),
            None => {
                // Always (re)store the most recent waker.
                // BUG FIX: the previous version `take()`d the stored waker
                // out of the RefCell and called `clone_from` on the
                // moved-out local, which was then dropped — the RefCell
                // stayed `None`, the updated waker was lost, and the wake
                // from the body callback could be missed entirely.
                *ctx.body.waker.borrow_mut() = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}
extern "C" fn client_request_body_handler(request: *mut ngx_http_request_t) {
// Post-read callback: snapshot the body chain into the module context and
// wake the future awaiting it.
// NOTE(review): `request_body` and `request_body.bufs` are dereferenced
// without null checks — confirm nginx guarantees both are non-null here
// (e.g. for zero-length bodies).
let request_body = unsafe { *(*request).request_body };
let request = unsafe { Request::from_ngx_http_request(request) };
let ctx = request
.get_module_ctx::<RequestCtx>(unsafe { &*addr_of!(ngx_http_pep_module) })
.expect("ctx");
// Copy each ngx_buf_t out of the chain; the memory/file ranges they
// describe remain owned by nginx.
let mut bufs: Vec<Buffer> = Vec::new();
let mut chain = unsafe { *request_body.bufs };
loop {
let buf = unsafe { *chain.buf };
bufs.push(Buffer(buf));
if chain.next.is_null() {
break;
}
chain = unsafe { *chain.next };
}
// Publish the buffers, then wake the consumer if it was already polled.
*ctx.body.bufs.borrow_mut() = Some(bufs);
if let Some(waker) = ctx.body.waker.borrow_mut().take() {
waker.wake();
}
}The async fn read_m1(request: &mut Request) -> Result<Vec<u8>> {
// Await the full client body (M1), run the handshake, and return the
// serialized M2 response bytes.
let bufs = match read_request_body(request) {
Ok(bufs) => bufs,
Err(status) => {
ngx_log_debug_http!(
request,
"asl: passing ngx_http_read_client_request_body status: {status:?}"
);
// Propagate nginx's special response via the status; returning an
// empty body means the caller sends no payload.
request.set_status(status);
return Ok(vec![]);
}
}
.await;
// Read every buffer (in-memory or file-backed) and concatenate into one
// contiguous body.
let body = try_join_all(bufs.iter().map(|buf| buf.read()))
.await
.map(|chunks| {
let total: usize = chunks.iter().map(|c| c.len()).sum();
let mut out = Vec::with_capacity(total);
for c in chunks {
out.extend_from_slice(&c);
}
out
})?;
ngx_log_debug_http!(request, "asl: client body read, {} bytes", body.len());
let (handshake_state, m2) =
initiate_handshake(&SERVER_CONFIG, &body, &[]).map_err(|e| anyhow!("{e:?}"))?;
// TODO: make session cache non-blocking
let cid = tokio::spawn(async move { SESSION_CACHE.init_handshake(handshake_state) }).await?;
ngx_log_debug_http!(request, "asl: new cid {}, m2 {}b", cid, m2.len());
Ok(m2)
}
// infrastructure/handler, for reference
#[derive(Default, Debug)]
// Per-request module context, allocated from the request pool.
struct RequestCtx {
// Producer/consumer state for the client-body read.
body: RequestBody,
// Handle of the spawned response task; dropping it (via pool cleanup)
// cancels the task if the request ends early.
task: Option<Task<()>>,
}
pub fn handler(request: &mut Request) -> Status {
// Content-phase handler (async variant): set up the per-request context,
// spawn the task that reads the body and writes the response, and yield
// control back to nginx with NGX_DONE.
let config = Module::location_conf(request).expect("location config is none");
match config.asl {
Some(true) => {
// NOTE(review): `allocate` result is not checked for null before use
// below — confirm it aborts/panics on allocation failure.
let ctx = request.pool().allocate(RequestCtx::default());
unsafe {
request.set_module_ctx(ctx.cast(), &*addr_of!(ngx_http_pep_module));
}
ngx_log_debug_http!(request, "asl: enter");
// Smuggle the raw request pointer into the 'static task.
// NOTE(review): soundness relies on the task being cancelled before
// nginx frees the request (see the ctx.task assignment below) —
// TODO confirm the cleanup ordering.
let r_ptr = AtomicPtr::new(request.into());
let task = spawn(Compat::new(async move {
let r_ptr = r_ptr.load(Ordering::Relaxed);
let request = unsafe { ngx::http::Request::from_ngx_http_request(r_ptr) };
// Success: CBOR-encoded M2. Failure: plain-text 500 body.
let mut response = match read_m1(request).await {
Ok(response) => {
request.set_status(HTTPStatus::OK);
request
.ensure_header_out("content-type", "application/cbor")
.unwrap();
response
}
Err(e) => {
ngx_log_debug_http!(request, "asl: error - {e:?}");
request.set_status(HTTPStatus::INTERNAL_SERVER_ERROR);
request
.ensure_header_out("content-type", "text/plain")
.unwrap();
"Internal Server Error\n".to_string().into_bytes()
}
};
// Build a single memory buf pointing into `response`; the Vec is
// owned by this task and must stay alive until the output filter
// has consumed it.
let buf: *mut ngx_buf_t = request.pool().calloc_type();
unsafe {
if buf.is_null() {
ngx_http_finalize_request(request.into(), Status::NGX_ERROR.into());
return;
}
(*buf).set_memory(1);
// last_buf only on the main request, per the nginx dev guide.
(*buf).set_last_buf(if request.is_main() { 1 } else { 0 });
(*buf).set_last_in_chain(1);
(*buf).start = response.as_mut_ptr();
(*buf).end = (*buf).start.add(response.len());
(*buf).pos = (*buf).start;
(*buf).last = (*buf).end;
}
let mut chain = ngx_chain_t {
buf,
next: ptr::null_mut(),
};
request.set_content_length_n(response.len());
// NOTE(review): send_header()'s return value is ignored — errors
// here should probably short-circuit before the output filter.
request.send_header();
let request: *mut ngx_http_request_t = request.into();
unsafe {
ngx_http_finalize_request(request, ngx_http_output_filter(request, &mut chain))
};
}));
unsafe {
// store on ctx, we want it to be canceled on cleanup by nginx (via RequestCtx's Drop)
(*ctx).task = Some(task);
}
Status::NGX_DONE
}
_ => Status::NGX_DECLINED,
}
}Do you think this would be something for ngx-rust? I'd be happy to create a PR for this. |
Beta Was this translation helpful? Give feedback.
You don't need to copy from the chain, you can use the data in-place or convert it to a
Vecof&[u8]slice references (seengx_output_chain_to_iovecfor a rough example). The chain's data is guaranteed to live until you free it, pass to a chain writer (important for phase handlers other than content handler), or terminate the request.The reallocation and copying is only necessary if you want a single contiguous buffer, and even then you'd want to avoid
TemporaryBuffer/MemoryBufferand use something likeVecbacked byngx_pool_tinstead.temp_file.file.infois a zero-initialized reserved storage forngx_fd_info(fstat(2)) results.ngx_http_read_client_request_bodydoes not usengx_fd_info,…