From dc155733d13d2a7d5434eaf25d09e01acbbf5cb6 Mon Sep 17 00:00:00 2001 From: avalonche Date: Tue, 11 Mar 2025 19:33:30 +1100 Subject: [PATCH] Fix error reading body when forwarding calls --- Cargo.lock | 9 +++-- Cargo.toml | 1 + src/proxy.rs | 111 ++++++++++++++++++++++++++++++++++++++++----------- 3 files changed, 93 insertions(+), 28 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 62ee334f..f48b3c84 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1435,9 +1435,9 @@ dependencies = [ [[package]] name = "flate2" -version = "1.0.35" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c936bfdafb507ebbf50b8074c54fa31c5be9a1e7e5f467dd659697041407d07c" +checksum = "11faaf5a5236997af9848be0bef4db95824b1d534ebc64d0f0c6cf3e67bd38dc" dependencies = [ "crc32fast", "miniz_oxide", @@ -2536,9 +2536,9 @@ checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" [[package]] name = "miniz_oxide" -version = "0.8.0" +version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2d80299ef12ff69b16a84bb182e3b9df68b5a91574d3d4fa6e41b65deec4df1" +checksum = "8e3e04debbb59698c15bacbb6d93584a8c0ca9cc3213cb423d31f760d8843ce5" dependencies = [ "adler2", ] @@ -3508,6 +3508,7 @@ dependencies = [ "ctor", "dotenv", "eyre", + "flate2", "futures", "futures-util", "http 1.1.0", diff --git a/Cargo.toml b/Cargo.toml index dc9daaa9..7b74ea9c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,6 +40,7 @@ opentelemetry-otlp = { version = "0.26.0", features = [ ] } opentelemetry_sdk = { version = "0.26.0", features = ["rt-tokio"] } tracing-opentelemetry = "0.27.0" +flate2 = "1.0.35" futures = "0.3.31" metrics-derive = "0.1" metrics = "0.24.0" diff --git a/src/proxy.rs b/src/proxy.rs index 582aef01..786768e4 100644 --- a/src/proxy.rs +++ b/src/proxy.rs @@ -2,17 +2,21 @@ use crate::auth_layer::secret_to_bearer_header; use crate::metrics::ServerMetrics; use crate::server::PayloadSource; use alloy_rpc_types_engine::JwtSecret; +use flate2::read::GzDecoder; use http::header::AUTHORIZATION; +use http::response::Parts; use http::{StatusCode, Uri}; +use http_body_util::BodyExt; use hyper_rustls::HttpsConnector; use hyper_util::client::legacy::connect::HttpConnector; use hyper_util::client::legacy::Client; use hyper_util::rt::TokioExecutor; use jsonrpsee::core::{http_helpers, BoxError}; use jsonrpsee::http_client::{HttpBody, HttpRequest, HttpResponse}; +use std::io::Read; use std::sync::Arc; use std::task::{Context, Poll}; -use std::time::Instant; +use std::time::{Duration, Instant}; use std::{future::Future, pin::Pin}; use tower::{Layer, Service}; use tracing::{debug, error, info, warn}; @@ -149,7 +153,7 @@ where let _ = forward_request( builder_client, builder_req, - &builder_method, + builder_method, builder_uri, builder_secret, builder_metrics, @@ -163,7 +167,7 @@ where forward_request( client, l2_req, - &method, + method, l2_uri, l2_secret, metrics, @@ -180,7 +184,7 @@ where forward_request( client, req, - &method, + method, l2_uri, l2_secret, metrics, @@ -197,7 +201,7 @@ where async fn forward_request( client: Client, HttpBody>, mut req: http::Request, - method: &str, + method: String, uri: Uri, auth: JwtSecret, metrics: Option>, @@ -218,26 +222,34 @@ async fn forward_request( match client.request(req).await { Ok(resp) => { let (parts, body) = resp.into_parts(); - let (body_bytes, _) = http_helpers::read_body(&parts.headers, body, u32::MAX) + let body_bytes = body + .collect() .await .map_err(|e| { - warn!( + error!( target: "proxy::forward_request", - message = "error reading body", + message = "error collecting body", error = %e, ); e - })?; - let rpc_status_code = parse_response_code(&body_bytes); - record_metrics( - metrics, - parts.status.to_string(), - rpc_status_code, - method, - start, - source, - ) - .await; + })? + .to_bytes() + .to_vec(); + let parts_clone = parts.clone(); + let body_bytes_clone = body_bytes.clone(); + + tokio::spawn(async move { + let _ = process_response( + parts_clone, + body_bytes_clone, + metrics, + method, + start.elapsed(), + source, + ) + .await; + }); + Ok(http::Response::from_parts( parts, HttpBody::from(body_bytes), @@ -256,7 +268,7 @@ async fn forward_request( StatusCode::INTERNAL_SERVER_ERROR.to_string(), None, method, - start, + start.elapsed(), source, ) .await; @@ -265,18 +277,69 @@ async fn forward_request( } } +async fn process_response( + parts: Parts, + body_bytes: Vec, + metrics: Option>, + method: String, + duration: Duration, + source: PayloadSource, +) -> Result<(), BoxError> { + // Check for GZIP compression + let is_gzipped = parts + .headers + .get(http::header::CONTENT_ENCODING) + .is_some_and(|val| val.as_bytes() == b"gzip"); + let decoded_body = if is_gzipped { + // Decompress GZIP content + let mut decoder = GzDecoder::new(&body_bytes[..]); + let mut decoded = Vec::new(); + decoder.read_to_end(&mut decoded).map_err(|e| { + warn!( + target: "proxy::process_response", + message = "error decompressing body", + error = %e, + ); + e + })?; + decoded + } else { + body_bytes + }; + + // log the decoded body + debug!( + target: "proxy::forward_request", + message = "raw response body", + body = %String::from_utf8_lossy(&decoded_body), + ); + + let rpc_status_code = parse_response_code(&decoded_body); + record_metrics( + metrics, + parts.status.to_string(), + rpc_status_code, + method, + duration, + source, + ) + .await; + + Ok(()) +} + async fn record_metrics( metrics: Option>, http_status_code: String, rpc_status_code: Option, - method: &str, - start: Instant, + method: String, + duration: Duration, source: PayloadSource, ) { if let Some(metrics) = &metrics { match source { PayloadSource::Builder => { - metrics.record_builder_forwarded_call(start.elapsed(), method.to_string()); + metrics.record_builder_forwarded_call(duration, method.to_string()); metrics.increment_builder_rpc_response_count( http_status_code, rpc_status_code, @@ -284,7 +347,7 @@ async fn record_metrics( ); } PayloadSource::L2 => { - metrics.record_l2_forwarded_call(start.elapsed(), method.to_string()); + metrics.record_l2_forwarded_call(duration, method.to_string()); metrics.increment_l2_rpc_response_count( http_status_code, rpc_status_code,