Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ opentelemetry-otlp = { version = "0.26.0", features = [
] }
opentelemetry_sdk = { version = "0.26.0", features = ["rt-tokio"] }
tracing-opentelemetry = "0.27.0"
flate2 = "1.0.35"
futures = "0.3.31"
metrics-derive = "0.1"
metrics = "0.24.0"
Expand Down
111 changes: 87 additions & 24 deletions src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,21 @@ use crate::auth_layer::secret_to_bearer_header;
use crate::metrics::ServerMetrics;
use crate::server::PayloadSource;
use alloy_rpc_types_engine::JwtSecret;
use flate2::read::GzDecoder;
use http::header::AUTHORIZATION;
use http::response::Parts;
use http::{StatusCode, Uri};
use http_body_util::BodyExt;
use hyper_rustls::HttpsConnector;
use hyper_util::client::legacy::connect::HttpConnector;
use hyper_util::client::legacy::Client;
use hyper_util::rt::TokioExecutor;
use jsonrpsee::core::{http_helpers, BoxError};
use jsonrpsee::http_client::{HttpBody, HttpRequest, HttpResponse};
use std::io::Read;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Instant;
use std::time::{Duration, Instant};
use std::{future::Future, pin::Pin};
use tower::{Layer, Service};
use tracing::{debug, error, info, warn};
Expand Down Expand Up @@ -149,7 +153,7 @@ where
let _ = forward_request(
builder_client,
builder_req,
&builder_method,
builder_method,
builder_uri,
builder_secret,
builder_metrics,
Expand All @@ -163,7 +167,7 @@ where
forward_request(
client,
l2_req,
&method,
method,
l2_uri,
l2_secret,
metrics,
Expand All @@ -180,7 +184,7 @@ where
forward_request(
client,
req,
&method,
method,
l2_uri,
l2_secret,
metrics,
Expand All @@ -197,7 +201,7 @@ where
async fn forward_request(
client: Client<HttpsConnector<HttpConnector>, HttpBody>,
mut req: http::Request<HttpBody>,
method: &str,
method: String,
uri: Uri,
auth: JwtSecret,
metrics: Option<Arc<ServerMetrics>>,
Expand All @@ -218,26 +222,34 @@ async fn forward_request(
match client.request(req).await {
Ok(resp) => {
let (parts, body) = resp.into_parts();
let (body_bytes, _) = http_helpers::read_body(&parts.headers, body, u32::MAX)
let body_bytes = body
.collect()
.await
.map_err(|e| {
warn!(
error!(
target: "proxy::forward_request",
message = "error reading body",
message = "error collecting body",
error = %e,
);
e
})?;
let rpc_status_code = parse_response_code(&body_bytes);
record_metrics(
metrics,
parts.status.to_string(),
rpc_status_code,
method,
start,
source,
)
.await;
})?
.to_bytes()
.to_vec();
let parts_clone = parts.clone();
let body_bytes_clone = body_bytes.clone();

tokio::spawn(async move {
let _ = process_response(
parts_clone,
body_bytes_clone,
metrics,
method,
start.elapsed(),
source,
)
.await;
});

Ok(http::Response::from_parts(
parts,
HttpBody::from(body_bytes),
Expand All @@ -256,7 +268,7 @@ async fn forward_request(
StatusCode::INTERNAL_SERVER_ERROR.to_string(),
None,
method,
start,
start.elapsed(),
source,
)
.await;
Expand All @@ -265,26 +277,77 @@ async fn forward_request(
}
}

async fn process_response(
parts: Parts,
body_bytes: Vec<u8>,
metrics: Option<Arc<ServerMetrics>>,
method: String,
duration: Duration,
source: PayloadSource,
) -> Result<(), BoxError> {
// Check for GZIP compression
let is_gzipped = parts
.headers
.get(http::header::CONTENT_ENCODING)
.is_some_and(|val| val.as_bytes() == b"gzip");
let decoded_body = if is_gzipped {
// Decompress GZIP content
let mut decoder = GzDecoder::new(&body_bytes[..]);
let mut decoded = Vec::new();
decoder.read_to_end(&mut decoded).map_err(|e| {
warn!(
target: "proxy::process_response",
message = "error decompressing body",
error = %e,
);
e
})?;
decoded
} else {
body_bytes
};

// log the decoded body
debug!(
target: "proxy::forward_request",
message = "raw response body",
body = %String::from_utf8_lossy(&decoded_body),
);

let rpc_status_code = parse_response_code(&decoded_body);
record_metrics(
metrics,
parts.status.to_string(),
rpc_status_code,
method,
duration,
source,
)
.await;

Ok(())
}

async fn record_metrics(
metrics: Option<Arc<ServerMetrics>>,
http_status_code: String,
rpc_status_code: Option<String>,
method: &str,
start: Instant,
method: String,
duration: Duration,
source: PayloadSource,
) {
if let Some(metrics) = &metrics {
match source {
PayloadSource::Builder => {
metrics.record_builder_forwarded_call(start.elapsed(), method.to_string());
metrics.record_builder_forwarded_call(duration, method.to_string());
metrics.increment_builder_rpc_response_count(
http_status_code,
rpc_status_code,
method.to_string(),
);
}
PayloadSource::L2 => {
metrics.record_l2_forwarded_call(start.elapsed(), method.to_string());
metrics.record_l2_forwarded_call(duration, method.to_string());
metrics.increment_l2_rpc_response_count(
http_status_code,
rpc_status_code,
Expand Down
Loading