Skip to content

Commit

Permalink
feat(ext/http): abort event when request is cancelled (#26781)
Browse files Browse the repository at this point in the history
```js
Deno.serve(async (req) => {
  const { promise, resolve } = Promise.withResolvers<void>();

  req.signal.addEventListener("abort", () => {
    resolve();
  });

  await promise;

  return new Response("Ok");
});
```
  • Loading branch information
littledivy committed Nov 10, 2024
1 parent 1363822 commit f16980b
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 8 deletions.
8 changes: 4 additions & 4 deletions ext/fetch/23_request.js
Original file line number Diff line number Diff line change
Expand Up @@ -281,11 +281,11 @@ class Request {
if (signal === undefined) {
const signal = newSignal();
this[_signalCache] = signal;
return signal;
}
this[_request].onCancel?.(() => {
signal[signalAbort](signalAbortError);
});

if (!signal.aborted && this[_request].isCancelled) {
signal[signalAbort](signalAbortError);
return signal;
}

return signal;
Expand Down
13 changes: 9 additions & 4 deletions ext/http/00_serve.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ import {
op_http_cancel,
op_http_close,
op_http_close_after_finish,
op_http_get_request_cancelled,
op_http_get_request_headers,
op_http_get_request_method_and_url,
op_http_read_request_body,
op_http_request_on_cancel,
op_http_serve,
op_http_serve_on,
op_http_set_promise_complete,
Expand Down Expand Up @@ -375,11 +375,16 @@ class InnerRequest {
return this.#external;
}

get isCancelled() {
onCancel(callback) {
if (this.#external === null) {
return true;
callback();
return;
}
return op_http_get_request_cancelled(this.#external);

PromisePrototypeThen(
op_http_request_on_cancel(this.#external),
callback,
);
}
}

Expand Down
13 changes: 13 additions & 0 deletions ext/http/http_next.rs
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,19 @@ pub fn op_http_get_request_cancelled(external: *const c_void) -> bool {
http.cancelled()
}

#[op2(async)]
pub async fn op_http_request_on_cancel(external: *const c_void) {
let http =
// SAFETY: op is called with external.
unsafe { clone_external!(external, "op_http_request_on_cancel") };
let (tx, rx) = tokio::sync::oneshot::channel();

http.on_cancel(tx);
drop(http);

rx.await.ok();
}

/// Returned promise resolves when body streaming finishes.
/// Call [`op_http_close_after_finish`] when done with the external.
#[op2(async)]
Expand Down
1 change: 1 addition & 0 deletions ext/http/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ deno_core::extension!(
http_next::op_http_close_after_finish,
http_next::op_http_get_request_header,
http_next::op_http_get_request_headers,
http_next::op_http_request_on_cancel,
http_next::op_http_get_request_method_and_url<HTTP>,
http_next::op_http_get_request_cancelled,
http_next::op_http_read_request_body,
Expand Down
10 changes: 10 additions & 0 deletions ext/http/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use std::rc::Rc;
use std::task::Context;
use std::task::Poll;
use std::task::Waker;
use tokio::sync::oneshot;

pub type Request = hyper::Request<Incoming>;
pub type Response = hyper::Response<HttpRecordResponse>;
Expand Down Expand Up @@ -211,6 +212,7 @@ pub struct UpgradeUnavailableError;

struct HttpRecordInner {
server_state: SignallingRc<HttpServerState>,
closed_channel: Option<oneshot::Sender<()>>,
request_info: HttpConnectionProperties,
request_parts: http::request::Parts,
request_body: Option<RequestBodyState>,
Expand Down Expand Up @@ -276,6 +278,7 @@ impl HttpRecord {
response_body_finished: false,
response_body_waker: None,
trailers: None,
closed_channel: None,
been_dropped: false,
finished: false,
needs_close_after_finish: false,
Expand Down Expand Up @@ -312,6 +315,10 @@ impl HttpRecord {
RefMut::map(self.self_mut(), |inner| &mut inner.needs_close_after_finish)
}

pub fn on_cancel(&self, sender: oneshot::Sender<()>) {
self.self_mut().closed_channel = Some(sender);
}

fn recycle(self: Rc<Self>) {
assert!(
Rc::strong_count(&self) == 1,
Expand Down Expand Up @@ -390,6 +397,9 @@ impl HttpRecord {
inner.been_dropped = true;
// The request body might include actual resources.
inner.request_body.take();
if let Some(closed_channel) = inner.closed_channel.take() {
let _ = closed_channel.send(());
}
}

/// Complete this record, potentially expunging it if it is fully complete (ie: cancelled as well).
Expand Down
28 changes: 28 additions & 0 deletions tests/unit/serve_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4299,3 +4299,31 @@ Deno.test({

assert(cancelled);
});

Deno.test({
name: "AbortSignal event aborted when request is cancelled",
}, async () => {
const { promise, resolve } = Promise.withResolvers<void>();

const server = Deno.serve({
hostname: "0.0.0.0",
port: servePort,
onListen: () => resolve(),
}, async (request) => {
const { promise: promiseAbort, resolve: resolveAbort } = Promise
.withResolvers<void>();
request.signal.addEventListener("abort", () => resolveAbort());
assert(!request.signal.aborted);

await promiseAbort;

return new Response("Ok");
});

await promise;
await fetch(`http://localhost:${servePort}/`, {
signal: AbortSignal.timeout(100),
}).catch(() => {});

await server.shutdown();
});

0 comments on commit f16980b

Please sign in to comment.