diff --git a/Cargo.lock b/Cargo.lock index d95c7d3d94e8..7e454d9e7173 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6718,10 +6718,12 @@ dependencies = [ "system-configuration", "tokio", "tokio-rustls 0.24.1", + "tokio-util", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", + "wasm-streams", "web-sys", "webpki-roots 0.25.4", "winreg", diff --git a/examples/Cargo.lock b/examples/Cargo.lock index 363a08c70371..cd3d7522b647 100644 --- a/examples/Cargo.lock +++ b/examples/Cargo.lock @@ -5127,10 +5127,12 @@ dependencies = [ "system-configuration", "tokio", "tokio-rustls", + "tokio-util", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", + "wasm-streams", "web-sys", "webpki-roots", "winreg", @@ -6772,6 +6774,19 @@ dependencies = [ "wasmparser 0.202.0", ] +[[package]] +name = "wasm-streams" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e072d4e72f700fb3443d8fe94a39315df013eef1104903cdb0a2abd322bbecd" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "wasmer-derive" version = "4.4.0" diff --git a/linera-base/src/http.rs b/linera-base/src/http.rs index 1dc85696711e..ea7e684e8689 100644 --- a/linera-base/src/http.rs +++ b/linera-base/src/http.rs @@ -169,25 +169,6 @@ impl Response { } } -#[cfg(with_reqwest)] -impl Response { - /// Creates a [`Response`] from a [`reqwest::Response`], waiting for it to be fully - /// received. - pub async fn from_reqwest(response: reqwest::Response) -> reqwest::Result { - let headers = response - .headers() - .into_iter() - .map(|(name, value)| Header::new(name.to_string(), value.as_bytes())) - .collect(); - - Ok(Response { - status: response.status().as_u16(), - headers, - body: response.bytes().await?.to_vec(), - }) - } -} - /// A header for an HTTP request or response. #[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize, WitLoad, WitStore, WitType)] #[witty(name = "http-header")] diff --git a/linera-chain/src/unit_tests/chain_tests.rs b/linera-chain/src/unit_tests/chain_tests.rs index e2872c79b6f0..f62b40b87fcc 100644 --- a/linera-chain/src/unit_tests/chain_tests.rs +++ b/linera-chain/src/unit_tests/chain_tests.rs @@ -119,7 +119,7 @@ async fn test_block_size_limit() { let mut chain = ChainStateView::new(chain_id).await; // The size of the executed valid block below. - let maximum_executed_block_size = 742; + let maximum_executed_block_size = 750; // Initialize the chain. let mut config = make_open_chain_config(); diff --git a/linera-execution/Cargo.toml b/linera-execution/Cargo.toml index 8c68aef8361b..8e5f2b99163f 100644 --- a/linera-execution/Cargo.toml +++ b/linera-execution/Cargo.toml @@ -66,7 +66,7 @@ lru.workspace = true oneshot.workspace = true prometheus = { workspace = true, optional = true } proptest = { workspace = true, optional = true } -reqwest = { workspace = true, features = ["blocking", "json"] } +reqwest = { workspace = true, features = ["blocking", "json", "stream"] } revm = { workspace = true, optional = true, features = ["serde"] } revm-primitives = { workspace = true, optional = true } serde.workspace = true diff --git a/linera-execution/src/execution_state_actor.rs b/linera-execution/src/execution_state_actor.rs index 7b2a74b5a2bc..c395938fb1b3 100644 --- a/linera-execution/src/execution_state_actor.rs +++ b/linera-execution/src/execution_state_actor.rs @@ -9,7 +9,7 @@ use std::sync::LazyLock; use std::time::Duration; use custom_debug_derive::Debug; -use futures::channel::mpsc; +use futures::{channel::mpsc, StreamExt as _}; #[cfg(with_metrics)] use linera_base::prometheus_util::{ exponential_bucket_latencies, register_histogram_vec, MeasureLatency as _, @@ -406,7 +406,12 @@ where let response = request.send().await?; - callback.respond(http::Response::from_reqwest(response).await?); + let response_size_limit = committee.policy().maximum_http_response_bytes; + + callback.respond( + self.receive_http_response(response, response_size_limit) + .await?, + ); } ReadBlobContent { blob_id, callback } => { @@ -430,6 +435,71 @@ where } } +impl ExecutionStateView +where + C: Context + Clone + Send + Sync + 'static, + C::Extra: ExecutionRuntimeContext, +{ + /// Receives an HTTP response, returning the prepared [`http::Response`] instance. + /// + /// Ensures that the response does not exceed the provided `size_limit`. + async fn receive_http_response( + &mut self, + response: reqwest::Response, + size_limit: u64, + ) -> Result { + let status = response.status().as_u16(); + let maybe_content_length = response.content_length(); + + let headers = response + .headers() + .iter() + .map(|(name, value)| http::Header::new(name.to_string(), value.as_bytes())) + .collect::>(); + + let total_header_size = headers + .iter() + .map(|header| (header.name.as_bytes().len() + header.value.len()) as u64) + .sum(); + + let mut remaining_bytes = size_limit.checked_sub(total_header_size).ok_or( + ExecutionError::HttpResponseSizeLimitExceeded { + limit: size_limit, + size: total_header_size, + }, + )?; + + if let Some(content_length) = maybe_content_length { + if content_length > remaining_bytes { + return Err(ExecutionError::HttpResponseSizeLimitExceeded { + limit: size_limit, + size: content_length + total_header_size, + }); + } + } + + let mut body = Vec::with_capacity(maybe_content_length.unwrap_or(0) as usize); + let mut body_stream = response.bytes_stream(); + + while let Some(bytes) = body_stream.next().await.transpose()? { + remaining_bytes = remaining_bytes.checked_sub(bytes.len() as u64).ok_or( + ExecutionError::HttpResponseSizeLimitExceeded { + limit: size_limit, + size: bytes.len() as u64 + (size_limit - remaining_bytes), + }, + )?; + + body.extend(&bytes); + } + + Ok(http::Response { + status, + headers, + body, + }) + } +} + /// Requests to the execution state. #[derive(Debug)] pub enum ExecutionRequest { diff --git a/linera-execution/src/lib.rs b/linera-execution/src/lib.rs index 69ec54835b18..f5a7de60d856 100644 --- a/linera-execution/src/lib.rs +++ b/linera-execution/src/lib.rs @@ -236,6 +236,8 @@ pub enum ExecutionError { MaximumServiceOracleExecutionTimeExceeded, #[error("Serialized size of the executed block exceeds limit")] ExecutedBlockTooLarge, + #[error("HTTP response exceeds the size limit of {limit} bytes, having at least {size} bytes")] + HttpResponseSizeLimitExceeded { limit: u64, size: u64 }, #[error("Runtime failed to respond to application")] MissingRuntimeResponse, #[error("Module ID {0:?} is invalid")] diff --git a/linera-execution/src/policy.rs b/linera-execution/src/policy.rs index 23a95d91699c..f67fcd0d2a9e 100644 --- a/linera-execution/src/policy.rs +++ b/linera-execution/src/policy.rs @@ -63,6 +63,8 @@ pub struct ResourceControlPolicy { pub maximum_bytes_read_per_block: u64, /// The maximum data to write per block pub maximum_bytes_written_per_block: u64, + /// The maximum size in bytes of a received HTTP response. + pub maximum_http_response_bytes: u64, /// The maximum amount of time allowed to wait for an HTTP response. pub http_request_timeout_ms: u64, /// The list of hosts that contracts and services can send HTTP requests to. @@ -92,6 +94,7 @@ impl fmt::Display for ResourceControlPolicy { maximum_block_proposal_size, maximum_bytes_read_per_block, maximum_bytes_written_per_block, + maximum_http_response_bytes, http_request_allow_list, http_request_timeout_ms, } = self; @@ -119,6 +122,7 @@ impl fmt::Display for ResourceControlPolicy { {maximum_block_proposal_size} maximum size of a block proposal\n\ {maximum_bytes_read_per_block} maximum number bytes read per block\n\ {maximum_bytes_written_per_block} maximum number bytes written per block\n\ + {maximum_http_response_bytes} maximum number of bytes of an HTTP response\n\ {http_request_timeout_ms} ms timeout for HTTP requests\n\ HTTP hosts allowed for contracts and services: {http_request_allow_list:#?}\n", )?; @@ -158,6 +162,7 @@ impl ResourceControlPolicy { maximum_block_proposal_size: u64::MAX, maximum_bytes_read_per_block: u64::MAX, maximum_bytes_written_per_block: u64::MAX, + maximum_http_response_bytes: u64::MAX, http_request_timeout_ms: u64::MAX, http_request_allow_list: BTreeSet::new(), } @@ -225,6 +230,7 @@ impl ResourceControlPolicy { maximum_block_proposal_size: 13_000_000, maximum_bytes_read_per_block: 100_000_000, maximum_bytes_written_per_block: 10_000_000, + maximum_http_response_bytes: 10_000, http_request_timeout_ms: 20_000, http_request_allow_list: BTreeSet::new(), } diff --git a/linera-execution/tests/fee_consumption.rs b/linera-execution/tests/fee_consumption.rs index 604723e0fa42..396891727b38 100644 --- a/linera-execution/tests/fee_consumption.rs +++ b/linera-execution/tests/fee_consumption.rs @@ -154,7 +154,8 @@ async fn test_fee_consumption( maximum_block_proposal_size: 59, maximum_bytes_read_per_block: 61, maximum_bytes_written_per_block: 67, - http_request_timeout_ms: 71, + maximum_http_response_bytes: 71, + http_request_timeout_ms: 73, http_request_allow_list: BTreeSet::new(), }; diff --git a/linera-rpc/tests/snapshots/format__format.yaml.snap b/linera-rpc/tests/snapshots/format__format.yaml.snap index 1bd4ffee2d26..d0434928ef90 100644 --- a/linera-rpc/tests/snapshots/format__format.yaml.snap +++ b/linera-rpc/tests/snapshots/format__format.yaml.snap @@ -868,6 +868,7 @@ ResourceControlPolicy: - maximum_block_proposal_size: U64 - maximum_bytes_read_per_block: U64 - maximum_bytes_written_per_block: U64 + - maximum_http_response_bytes: U64 - http_request_timeout_ms: U64 - http_request_allow_list: SEQ: STR diff --git a/linera-service-graphql-client/gql/service_schema.graphql b/linera-service-graphql-client/gql/service_schema.graphql index 8d9395b705df..95116b76d8f7 100644 --- a/linera-service-graphql-client/gql/service_schema.graphql +++ b/linera-service-graphql-client/gql/service_schema.graphql @@ -1120,6 +1120,10 @@ input ResourceControlPolicy { """ maximumBytesWrittenPerBlock: Int! """ + The maximum size in bytes of a received HTTP response. + """ + maximumHttpResponseBytes: Int! + """ The maximum amount of time allowed to wait for an HTTP response. """ httpRequestTimeoutMs: Int!