Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/optimism/flashblocks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,4 @@ tracing.workspace = true
eyre.workspace = true

[dev-dependencies]
test-case.workspace = true
117 changes: 114 additions & 3 deletions crates/optimism/flashblocks/src/ws/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ where
{
fn connect(&mut self) {
let ws_url = self.ws_url.clone();
let connector = self.connector.clone();
let mut connector = self.connector.clone();

Pin::new(&mut self.connect).set(Box::pin(async move { connector.connect(ws_url).await }));

Expand Down Expand Up @@ -154,7 +154,7 @@ pub trait WsConnect {
///
/// See the [`WsConnect`] documentation for details.
fn connect(
&self,
&mut self,
ws_url: Url,
) -> impl Future<Output = eyre::Result<Self::Stream>> + Send + Sync;
}
Expand All @@ -168,9 +168,120 @@ pub struct WsConnector;
impl WsConnect for WsConnector {
type Stream = WssStream;

async fn connect(&self, ws_url: Url) -> eyre::Result<WssStream> {
async fn connect(&mut self, ws_url: Url) -> eyre::Result<WssStream> {
let (stream, _response) = connect_async(ws_url.as_str()).await?;

Ok(stream.split().1)
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::ExecutionPayloadBaseV1;
use alloy_primitives::bytes::Bytes;
use brotli::enc::BrotliEncoderParams;
use std::future;
use tokio_tungstenite::tungstenite::Error;

/// A `FakeConnector` creates [`FakeStream`].
///
/// It simulates the websocket stream instead of connecting to a real websocket.
#[derive(Clone)]
struct FakeConnector(FakeStream);

/// Simulates a websocket stream while using a preprogrammed set of messages instead.
#[derive(Default)]
struct FakeStream(Vec<Result<Message, Error>>);

impl Clone for FakeStream {
fn clone(&self) -> Self {
Self(
self.0
.iter()
.map(|v| match v {
Ok(msg) => Ok(msg.clone()),
Err(err) => unimplemented!("Cannot clone this error: {err}"),
})
.collect(),
)
}
}

impl Stream for FakeStream {
type Item = Result<Message, Error>;

fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();

Poll::Ready(this.0.pop())
}
}

impl WsConnect for FakeConnector {
type Stream = FakeStream;

fn connect(
&mut self,
_ws_url: Url,
) -> impl Future<Output = eyre::Result<Self::Stream>> + Send + Sync {
future::ready(Ok(self.0.clone()))
}
}

impl<T: IntoIterator<Item = Result<Message, Error>>> From<T> for FakeConnector {
fn from(value: T) -> Self {
Self(FakeStream(value.into_iter().collect()))
}
}

fn to_json_message(block: &FlashBlock) -> Result<Message, Error> {
Ok(Message::Binary(Bytes::from(serde_json::to_vec(block).unwrap())))
}

fn to_brotli_message(block: &FlashBlock) -> Result<Message, Error> {
let json = serde_json::to_vec(block).unwrap();
let mut compressed = Vec::new();
brotli::BrotliCompress(
&mut json.as_slice(),
&mut compressed,
&BrotliEncoderParams::default(),
)?;

Ok(Message::Binary(Bytes::from(compressed)))
}

#[test_case::test_case(to_json_message; "json")]
#[test_case::test_case(to_brotli_message; "brotli")]
#[tokio::test]
async fn test_stream_decodes_messages_successfully(
to_message: impl Fn(&FlashBlock) -> Result<Message, Error>,
) {
let flashblocks = [FlashBlock {
payload_id: Default::default(),
index: 0,
base: Some(ExecutionPayloadBaseV1 {
parent_beacon_block_root: Default::default(),
parent_hash: Default::default(),
fee_recipient: Default::default(),
prev_randao: Default::default(),
block_number: 0,
gas_limit: 0,
timestamp: 0,
extra_data: Default::default(),
base_fee_per_gas: Default::default(),
}),
diff: Default::default(),
metadata: Default::default(),
}];

let messages = FakeConnector::from(flashblocks.iter().map(to_message));
let ws_url = "http://localhost".parse().unwrap();
let stream = WsFlashBlockStream::with_connector(ws_url, messages);

let actual_messages: Vec<_> = stream.map(Result::unwrap).collect().await;
let expected_messages = flashblocks.to_vec();

assert_eq!(actual_messages, expected_messages);
}
}
Loading