Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
388 changes: 126 additions & 262 deletions bolt-sidecar/Cargo.lock

Large diffs are not rendered by default.

6 changes: 2 additions & 4 deletions bolt-sidecar/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,7 @@ tracing-subscriber = { version = "0.3.18", features = ["env-filter", "fmt"] }
metrics = "0.23"
metrics-exporter-prometheus = { version = "0.15.3", features = ["http-listener"] }

# commit-boost
commit-boost = { git = "https://github.com/Commit-Boost/commit-boost-client", rev = "0f8f69b" }
cb-common = { git = "https://github.com/Commit-Boost/commit-boost-client", rev = "0f8f69b" }
cb-common = { git = "https://github.com/Commit-Boost/commit-boost-client", tag = "v0.5.0" }

[dev-dependencies]
alloy-node-bindings = "0.8.3" # must match alloy version
Expand Down Expand Up @@ -116,4 +114,4 @@ unnecessary_self_imports = "warn"
use_self = "warn"

[features]
keystore-tests = []
keystore-tests = []
159 changes: 80 additions & 79 deletions bolt-sidecar/src/api/commitments/firewall/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use futures::{
stream::{FuturesUnordered, SplitSink, SplitStream},
FutureExt, SinkExt, StreamExt,
};
use serde::Serialize;
use serde_json::{json, Value};
use std::{collections::VecDeque, future::Future, pin::Pin, task::Poll};
use tokio::{
Expand All @@ -23,15 +24,16 @@ use crate::{
api::commitments::{
server::CommitmentEvent,
spec::{
CommitmentError, RejectionError, GET_METADATA_METHOD, GET_VERSION_METHOD,
REQUEST_INCLUSION_METHOD,
CommitmentError, GET_METADATA_METHOD, GET_VERSION_METHOD, REQUEST_INCLUSION_METHOD,
},
},
common::BOLT_SIDECAR_VERSION,
config::limits::LimitsOpts,
primitives::{
commitment::SignedCommitment,
jsonrpc::{JsonResponse, JsonRpcRequestUuid},
jsonrpc::{
JsonRpcErrorResponse, JsonRpcRequestUuid, JsonRpcResponse, JsonRpcSuccessResponse,
},
misc::{Identified, IntoIdentified},
CommitmentRequest, InclusionRequest,
},
Expand Down Expand Up @@ -258,15 +260,14 @@ impl CommitmentRequestProcessor {
return;
};

let mut response =
JsonResponse { id: Some(Value::String(id.to_string())), ..Default::default() };

match result_commitment {
Ok(commitment) => response.result = json!(commitment),
let response: JsonRpcResponse = match result_commitment {
Ok(commitment) => JsonRpcSuccessResponse::new(json!(commitment))
.with_id(Value::String(id.to_string()))
.into(),
Err(e) => {
response.error = Some(e.into());
JsonRpcErrorResponse::new(e.into()).with_id(Value::String(id.to_string())).into()
}
}
};

let message =
Message::Text(serde_json::to_string(&response).expect("to stringify response"));
Expand All @@ -279,82 +280,82 @@ impl CommitmentRequestProcessor {
let rpc_url = self.url.clone();

trace!(?rpc_url, text, "received text message from websocket connection");
// Create the channel to send and receive the commitment response
let (tx, rx) = oneshot::channel();

let request = serde_json::from_str::<JsonRpcRequestUuid>(&text).map_err(|e| e.to_string());

// FIXME: still too nested, needs to be refactored.
let response = match request {
Err(e) => Err(e),
Ok(request) => {
let id = request.id;

match request.method.as_str() {
GET_VERSION_METHOD => Ok(JsonResponse {
id: Some(Value::String(id.to_string())),
result: Value::String(BOLT_SIDECAR_VERSION.clone()),
..Default::default()
}),
GET_METADATA_METHOD => Ok(JsonResponse {
id: Some(Value::String(id.to_string())),
result: serde_json::to_value(self.state.limits).expect("infallible"),
..Default::default()
}),
REQUEST_INCLUSION_METHOD => {
// Parse the inclusion request from the parameters
let inclusion_request = serde_json::from_value::<InclusionRequest>(
request.params.first().cloned().unwrap_or_default(),
)
.map_err(RejectionError::Json)
.inspect_err(|err| error!(?err, "Failed to parse inclusion request"));

match inclusion_request {
Err(e) => Ok(JsonResponse {
id: Some(Value::String(id.to_string())),
error: Some(e.into()),
..Default::default()
}),
Ok(inclusion_request) => {
let commitment_request =
CommitmentRequest::Inclusion(inclusion_request);

let commitment_event =
CommitmentEvent { request: commitment_request, response: tx };

if let Err(e) = self.api_events_tx.try_send(commitment_event) {
error!(?e, "failed to send commitment event through channel");
// NOTE: should we return an internal error to the RPC
// here?
return;
}

// add the pending response to self buffer for later processing
self.pending_commitment_responses
.push(PendingCommitmentResponse::new(rx, id));

return;
}
}
}
other => Err(format!("unsupported method: {}", other)),
}
let request = match serde_json::from_str::<JsonRpcRequestUuid>(&text) {
Ok(req) => req,
Err(e) => {
warn!(?e, ?rpc_url, "failed to parse JSON-RPC request");
return;
}
};

match response {
Ok(json_response) => {
let message = Message::text(
serde_json::to_string(&json_response).expect("to stringify version response"),
);
let id = request.id;

// Push the message to the outgoing messages queue for later
// processing
self.outgoing_messages.push_back(message);
match request.method.as_str() {
GET_VERSION_METHOD => {
let response =
JsonRpcSuccessResponse::new(Value::String(BOLT_SIDECAR_VERSION.clone()))
.with_uuid(id)
.into();
self.send_response(response);
}
Err(err) => {
warn!(?err, ?rpc_url, "failed to parse JSON-RPC request");
GET_METADATA_METHOD => {
let response =
JsonRpcSuccessResponse::new(json!(self.state.limits)).with_uuid(id).into();
self.send_response(response);
}
}
REQUEST_INCLUSION_METHOD => {
let Some(param) = request.params.first().cloned() else {
let response: JsonRpcResponse = JsonRpcErrorResponse::new(
CommitmentError::InvalidParams("missing inclusion request".into()).into(),
)
.with_uuid(id)
.into();
self.send_response(response);
return;
};

let inclusion_request = match serde_json::from_value::<InclusionRequest>(param) {
Ok(req) => req,
Err(e) => {
let msg = format!("failed to parse inclusion request: {}", e);
error!(?e, "failed to parse inclusion request");
let response: JsonRpcResponse =
JsonRpcErrorResponse::new(CommitmentError::InvalidParams(msg).into())
.with_uuid(id)
.into();
self.send_response(response);
return;
}
};

let commitment_request = CommitmentRequest::Inclusion(inclusion_request);
let commitment_event =
CommitmentEvent { request: commitment_request, response: tx };

if let Err(e) = self.api_events_tx.try_send(commitment_event) {
error!(?e, "failed to send commitment event through channel");
let response: JsonRpcResponse =
JsonRpcErrorResponse::new(CommitmentError::Internal.into())
.with_uuid(id)
.into();
self.send_response(response);
return;
}

// Push the pending commitment response to the queue
self.pending_commitment_responses.push(PendingCommitmentResponse::new(rx, id));
}
other => {
warn!(?rpc_url, "unsupported method: {}", other);
}
};
}

fn send_response<T: Serialize>(&mut self, response: JsonRpcResponse<T>) {
let message =
Message::text(serde_json::to_string(&response).expect("to stringify response"));
self.outgoing_messages.push_back(message);
}
}
42 changes: 23 additions & 19 deletions bolt-sidecar/src/api/commitments/server/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,21 @@ use axum::{
};
use axum_extra::extract::WithRejection;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use serde_json::json;
use tracing::{debug, error, info, instrument};

use crate::{
api::commitments::{
server::headers::auth_from_headers,
spec::{
CommitmentError, CommitmentsApi, RejectionError, GET_METADATA_METHOD,
GET_VERSION_METHOD, REQUEST_INCLUSION_METHOD,
CommitmentError, CommitmentsApi, GET_METADATA_METHOD, GET_VERSION_METHOD,
REQUEST_INCLUSION_METHOD,
},
},
common::BOLT_SIDECAR_VERSION,
config::limits::LimitsOpts,
primitives::{
jsonrpc::{JsonResponse, JsonRpcRequest},
jsonrpc::{JsonRpcRequest, JsonRpcResponse, JsonRpcSuccessResponse},
signature::SignatureError,
InclusionRequest,
},
Expand All @@ -48,28 +48,31 @@ pub async fn rpc_entrypoint(
headers: HeaderMap,
State(api): State<Arc<CommitmentsApiInner>>,
WithRejection(Json(payload), _): WithRejection<Json<JsonRpcRequest>, CommitmentError>,
) -> Result<Json<JsonResponse>, CommitmentError> {
) -> Result<Json<JsonRpcResponse>, CommitmentError> {
debug!("Received new request");

match payload.method.as_str() {
GET_VERSION_METHOD => Ok(Json(JsonResponse {
id: payload.id,
result: Value::String(BOLT_SIDECAR_VERSION.clone()),
..Default::default()
})),
GET_VERSION_METHOD => Ok(Json(
JsonRpcSuccessResponse {
id: payload.id,
result: json!(BOLT_SIDECAR_VERSION.to_string()),
..Default::default()
}
.into(),
)),

GET_METADATA_METHOD => {
let metadata = MetadataResponse {
limits: api.limits(),
version: BOLT_SIDECAR_VERSION.to_string(),
};

let response = JsonResponse {
let response = JsonRpcSuccessResponse {
id: payload.id,
result: serde_json::to_value(metadata)
.expect("infallible - metadata only contains primitive types"),
result: json!(metadata),
..Default::default()
};
}
.into();
Ok(Json(response))
}

Expand All @@ -80,12 +83,12 @@ pub async fn rpc_entrypoint(
})?;

let Some(request_json) = payload.params.first().cloned() else {
return Err(RejectionError::ValidationFailed("Bad params".to_string()).into());
return Err(CommitmentError::InvalidParams("missing param".to_string()));
};

// Parse the inclusion request from the parameters
let mut inclusion_request = serde_json::from_value::<InclusionRequest>(request_json)
.map_err(RejectionError::Json)
.map_err(CommitmentError::InvalidJson)
.inspect_err(|err| error!(?err, "Failed to parse inclusion request"))?;

debug!(?inclusion_request, "New inclusion request");
Expand Down Expand Up @@ -113,11 +116,12 @@ pub async fn rpc_entrypoint(
let inclusion_commitment = api.request_inclusion(inclusion_request).await?;

// Create the JSON-RPC response
let response = JsonResponse {
let response = JsonRpcSuccessResponse {
id: payload.id,
result: serde_json::to_value(inclusion_commitment).expect("infallible"),
result: json!(inclusion_commitment),
..Default::default()
};
}
.into();

Ok(Json(response))
}
Expand Down
21 changes: 13 additions & 8 deletions bolt-sidecar/src/api/commitments/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,13 +177,16 @@ fn make_router(state: Arc<CommitmentsApiInner>) -> Router {

#[cfg(test)]
mod test {
use crate::{api::commitments::spec::SIGNATURE_HEADER, common::BOLT_SIDECAR_VERSION};
use crate::{
api::commitments::spec::SIGNATURE_HEADER, common::BOLT_SIDECAR_VERSION,
primitives::jsonrpc::JsonRpcError,
};
use alloy::signers::{k256::SecretKey, local::PrivateKeySigner};
use handlers::MetadataResponse;
use serde_json::json;

use crate::{
primitives::{jsonrpc::JsonResponse, signature::ECDSASignatureExt},
primitives::{jsonrpc::JsonRpcResponse, signature::ECDSASignatureExt},
test_util::{create_signed_inclusion_request, default_test_transaction},
};

Expand Down Expand Up @@ -223,12 +226,13 @@ mod test {
.send()
.await
.unwrap()
.json::<JsonResponse>()
.json::<JsonRpcResponse>()
.await
.unwrap();

// Assert unauthorized because of missing signature
assert_eq!(response.error.unwrap().code, -32003);
let expected_error: JsonRpcError = CommitmentError::NoSignature.into();
assert_eq!(response.into_error().unwrap().code(), expected_error.code);
}

#[tokio::test]
Expand Down Expand Up @@ -272,10 +276,10 @@ mod test {
.await
.unwrap();

let json = response.json::<JsonResponse>().await.unwrap();
let json = response.json::<JsonRpcResponse>().await.unwrap();

// Assert unauthorized because of missing signature
assert!(json.error.is_none());
assert!(json.into_success().is_some());

let _ = tx.send(());
});
Expand Down Expand Up @@ -316,11 +320,12 @@ mod test {
.send()
.await
.unwrap()
.json::<JsonResponse>()
.json::<JsonRpcResponse>()
.await
.unwrap();

let metadata: MetadataResponse = serde_json::from_value(response.result).unwrap();
let metadata: MetadataResponse =
serde_json::from_value(response.into_success().unwrap().result).unwrap();

assert_eq!(
metadata.limits.max_committed_gas_per_slot,
Expand Down
Loading