Skip to content

Commit f145cbf

Browse files
authored
refactor(ext/fetch): simplify fetch ops (#19494)
Addresses feedback from #19412 (comment)
1 parent 3d71c36 commit f145cbf

File tree

5 files changed

+61
-79
lines changed

5 files changed

+61
-79
lines changed

cli/js/40_testing.js

+2-2
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ function prettyResourceNames(name) {
228228
return ["A fetch request", "started", "finished"];
229229
case "fetchRequestBody":
230230
return ["A fetch request body", "created", "closed"];
231-
case "fetchResponseBody":
231+
case "fetchResponse":
232232
return ["A fetch response body", "created", "consumed"];
233233
case "httpClient":
234234
return ["An HTTP client", "created", "closed"];
@@ -295,7 +295,7 @@ function resourceCloseHint(name) {
295295
return "Await the promise returned from `fetch()` or abort the fetch with an abort signal.";
296296
case "fetchRequestBody":
297297
return "Terminate the request body `ReadableStream` by closing or erroring it.";
298-
case "fetchResponseBody":
298+
case "fetchResponse":
299299
return "Consume or close the response body `ReadableStream`, e.g `await resp.text()` or `await resp.body.cancel()`.";
300300
case "httpClient":
301301
return "Close the HTTP client by calling `httpClient.close()`.";
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,2 @@
1-
{
2-
"0": "stdin",
3-
"1": "stdout",
4-
"2": "stderr",
5-
"5": "fetchResponseBody"
6-
}
1+
{ "0": "stdin", "1": "stdout", "2": "stderr", "5": "fetchResponse" }
72
{ "0": "stdin", "1": "stdout", "2": "stderr" }

ext/fetch/26_fetch.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ function opFetch(method, url, headers, clientRid, hasBody, bodyLength, body) {
8686
* @returns {Promise<{ status: number, statusText: string, headers: [string, string][], url: string, responseRid: number }>}
8787
*/
8888
function opFetchSend(rid) {
89-
return core.opAsync("op_fetch_send", rid, true);
89+
return core.opAsync("op_fetch_send", rid);
9090
}
9191

9292
/**

ext/fetch/lib.rs

+55-59
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,6 @@ deno_core::extension!(deno_fetch,
112112
ops = [
113113
op_fetch<FP>,
114114
op_fetch_send,
115-
op_fetch_response_into_byte_stream,
116115
op_fetch_response_upgrade,
117116
op_fetch_custom_client<FP>,
118117
],
@@ -427,7 +426,6 @@ pub struct FetchResponse {
427426
pub async fn op_fetch_send(
428427
state: Rc<RefCell<OpState>>,
429428
rid: ResourceId,
430-
into_byte_stream: bool,
431429
) -> Result<FetchResponse, AnyError> {
432430
let request = state
433431
.borrow_mut()
@@ -459,27 +457,10 @@ pub async fn op_fetch_send(
459457
(None, None)
460458
};
461459

462-
let response_rid = if !into_byte_stream {
463-
state
464-
.borrow_mut()
465-
.resource_table
466-
.add(FetchResponseResource {
467-
response: res,
468-
size: content_length,
469-
})
470-
} else {
471-
let stream: BytesStream = Box::pin(res.bytes_stream().map(|r| {
472-
r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))
473-
}));
474-
state
475-
.borrow_mut()
476-
.resource_table
477-
.add(FetchResponseBodyResource {
478-
reader: AsyncRefCell::new(stream.peekable()),
479-
cancel: CancelHandle::default(),
480-
size: content_length,
481-
})
482-
};
460+
let response_rid = state
461+
.borrow_mut()
462+
.resource_table
463+
.add(FetchResponseResource::new(res, content_length));
483464

484465
Ok(FetchResponse {
485466
status: status.as_u16(),
@@ -493,28 +474,6 @@ pub async fn op_fetch_send(
493474
})
494475
}
495476

496-
#[op]
497-
pub fn op_fetch_response_into_byte_stream(
498-
state: &mut OpState,
499-
rid: ResourceId,
500-
) -> Result<ResourceId, AnyError> {
501-
let raw_response = state.resource_table.take::<FetchResponseResource>(rid)?;
502-
let raw_response = Rc::try_unwrap(raw_response)
503-
.expect("Someone is holding onto FetchResponseResource");
504-
let stream: BytesStream =
505-
Box::pin(raw_response.response.bytes_stream().map(|r| {
506-
r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))
507-
}));
508-
509-
let rid = state.resource_table.add(FetchResponseBodyResource {
510-
reader: AsyncRefCell::new(stream.peekable()),
511-
cancel: CancelHandle::default(),
512-
size: raw_response.size,
513-
});
514-
515-
Ok(rid)
516-
}
517-
518477
#[op]
519478
pub async fn op_fetch_response_upgrade(
520479
state: Rc<RefCell<OpState>>,
@@ -530,7 +489,7 @@ pub async fn op_fetch_response_upgrade(
530489
let (read, write) = tokio::io::duplex(1024);
531490
let (read_rx, write_tx) = tokio::io::split(read);
532491
let (mut write_rx, mut read_tx) = tokio::io::split(write);
533-
let upgraded = raw_response.response.upgrade().await?;
492+
let upgraded = raw_response.upgrade().await?;
534493
{
535494
// Stage 3: Pump the data
536495
let (mut upgraded_rx, mut upgraded_tx) = tokio::io::split(upgraded);
@@ -698,35 +657,72 @@ impl Resource for FetchRequestBodyResource {
698657
type BytesStream =
699658
Pin<Box<dyn Stream<Item = Result<bytes::Bytes, std::io::Error>> + Unpin>>;
700659

660+
pub enum FetchResponseReader {
661+
Start(Response),
662+
BodyReader(Peekable<BytesStream>),
663+
}
664+
665+
impl Default for FetchResponseReader {
666+
fn default() -> Self {
667+
let stream: BytesStream = Box::pin(deno_core::futures::stream::empty());
668+
Self::BodyReader(stream.peekable())
669+
}
670+
}
701671
#[derive(Debug)]
702672
pub struct FetchResponseResource {
703-
pub response: Response,
673+
pub response_reader: AsyncRefCell<FetchResponseReader>,
674+
pub cancel: CancelHandle,
704675
pub size: Option<u64>,
705676
}
706677

707-
impl Resource for FetchResponseResource {
708-
fn name(&self) -> Cow<str> {
709-
"fetchResponse".into()
678+
impl FetchResponseResource {
679+
pub fn new(response: Response, size: Option<u64>) -> Self {
680+
Self {
681+
response_reader: AsyncRefCell::new(FetchResponseReader::Start(response)),
682+
cancel: CancelHandle::default(),
683+
size,
684+
}
710685
}
711-
}
712686

713-
pub struct FetchResponseBodyResource {
714-
pub reader: AsyncRefCell<Peekable<BytesStream>>,
715-
pub cancel: CancelHandle,
716-
pub size: Option<u64>,
687+
pub async fn upgrade(self) -> Result<reqwest::Upgraded, AnyError> {
688+
let reader = self.response_reader.into_inner();
689+
match reader {
690+
FetchResponseReader::Start(resp) => Ok(resp.upgrade().await?),
691+
_ => unreachable!(),
692+
}
693+
}
717694
}
718695

719-
impl Resource for FetchResponseBodyResource {
696+
impl Resource for FetchResponseResource {
720697
fn name(&self) -> Cow<str> {
721-
"fetchResponseBody".into()
698+
"fetchResponse".into()
722699
}
723700

724701
fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> {
725702
Box::pin(async move {
726-
let reader = RcRef::map(&self, |r| &r.reader).borrow_mut().await;
703+
let mut reader =
704+
RcRef::map(&self, |r| &r.response_reader).borrow_mut().await;
727705

706+
let body = loop {
707+
match &mut *reader {
708+
FetchResponseReader::BodyReader(reader) => break reader,
709+
FetchResponseReader::Start(_) => {}
710+
}
711+
712+
match std::mem::take(&mut *reader) {
713+
FetchResponseReader::Start(resp) => {
714+
let stream: BytesStream = Box::pin(resp.bytes_stream().map(|r| {
715+
r.map_err(|err| {
716+
std::io::Error::new(std::io::ErrorKind::Other, err)
717+
})
718+
}));
719+
*reader = FetchResponseReader::BodyReader(stream.peekable());
720+
}
721+
FetchResponseReader::BodyReader(_) => unreachable!(),
722+
}
723+
};
728724
let fut = async move {
729-
let mut reader = Pin::new(reader);
725+
let mut reader = Pin::new(body);
730726
loop {
731727
match reader.as_mut().peek_mut().await {
732728
Some(Ok(chunk)) if !chunk.is_empty() => {

ext/node/polyfills/http.ts

+2-11
Original file line numberDiff line numberDiff line change
@@ -595,13 +595,7 @@ class ClientRequest extends OutgoingMessage {
595595
(async () => {
596596
try {
597597
const [res, _] = await Promise.all([
598-
core.opAsync(
599-
"op_fetch_send",
600-
this._req.requestRid,
601-
/* false because we want to have access to actual Response,
602-
not the bytes stream of response (because we need to handle upgrades) */
603-
false,
604-
),
598+
core.opAsync("op_fetch_send", this._req.requestRid),
605599
(async () => {
606600
if (this._bodyWriteRid) {
607601
try {
@@ -700,10 +694,7 @@ class ClientRequest extends OutgoingMessage {
700694
this.emit("close");
701695
} else {
702696
{
703-
const responseRid = core.ops.op_fetch_response_into_byte_stream(
704-
res.responseRid,
705-
);
706-
incoming._bodyRid = responseRid;
697+
incoming._bodyRid = res.responseRid;
707698
}
708699
this.emit("response", incoming);
709700
}

0 commit comments

Comments
 (0)