Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,13 @@ where
cancel,
} = self;

tracing::debug!("flashblocks payload handler started");
tracing::info!(target: "payload_builder", "flashblocks payload handler started");

loop {
tokio::select! {
Some(payload) = built_rx.recv() => {
if let Err(e) = payload_events_handle.send(Events::BuiltPayload(payload.clone())) {
warn!(e = ?e, "failed to send BuiltPayload event");
warn!(target: "payload_builder", e = ?e, "failed to send BuiltPayload event");
}
// ignore error here; if p2p was disabled, the channel will be closed.
let _ = p2p_tx.send(payload.into()).await;
Expand All @@ -113,13 +113,13 @@ where
);
match res {
Ok((payload, _)) => {
tracing::info!(hash = payload.block().hash().to_string(), block_number = payload.block().header().number, "successfully executed received flashblock");
tracing::info!(target: "payload_builder", hash = payload.block().hash().to_string(), block_number = payload.block().header().number, "successfully executed external received flashblock");
if let Err(e) = payload_events_handle.send(Events::BuiltPayload(payload)) {
warn!(e = ?e, "failed to send BuiltPayload event on synced block");
warn!(target: "payload_builder", e = ?e, "failed to send BuiltPayload event on synced block");
}
}
Err(e) => {
tracing::error!(error = ?e, "failed to execute received flashblock");
tracing::error!(target: "payload_builder", error = ?e, "failed to execute external received flashblock");
}
}
});
Expand Down Expand Up @@ -147,7 +147,7 @@ where

let start = tokio::time::Instant::now();

tracing::info!(header = ?payload.block().header(), "executing flashblock");
tracing::info!(target: "payload_builder", header = ?payload.block().header(), "executing external flashblock");

let mut cached_reads = reth::revm::cached::CachedReads::default();
let parent_hash = payload.block().sealed_header().parent_hash;
Expand Down Expand Up @@ -202,7 +202,7 @@ where
.is_jovian_active_at_timestamp(timestamp)
{
if extra_data.len() != 17 {
tracing::debug!(len = extra_data.len(), data = ?extra_data, "invalid extra data length in flashblock for jovian fork");
tracing::trace!(target: "payload_builder", len = extra_data.len(), data = ?extra_data, "invalid extra data length in flashblock for jovian fork");
bail!("extra data length should be 17 bytes");
}
let eip_1559_params = extra_data[1..9].try_into().ok();
Expand All @@ -213,13 +213,13 @@ where
(eip_1559_params, Some(min_base_fee))
} else if chain_spec.is_holocene_active_at_timestamp(timestamp) {
if extra_data.len() != 9 {
tracing::debug!(len = extra_data.len(), data = ?extra_data, "invalid extra data length in flashblock for holocene fork");
tracing::trace!(target: "payload_builder", len = extra_data.len(), data = ?extra_data, "invalid extra data length in flashblock for holocene fork");
bail!("extra data length should be 9 bytes");
}
(extra_data[1..9].try_into().ok(), None)
} else {
if !extra_data.is_empty() {
tracing::debug!(len = extra_data.len(), data = ?extra_data, "invalid extra data length in flashblock for pre holocene fork");
tracing::trace!(target: "payload_builder", len = extra_data.len(), data = ?extra_data, "invalid extra data length in flashblock for pre holocene fork");
bail!("extra data length should be 0 bytes");
}
(None, None)
Expand Down Expand Up @@ -295,7 +295,7 @@ where

builder_ctx.metrics.block_synced_success.increment(1);

tracing::info!(header = ?built_payload.block().header(), "successfully executed flashblock");
tracing::info!(target: "payload_builder", header = ?built_payload.block().header(), "successfully executed external flashblock");
Ok((built_payload, fb_payload))
}

Expand Down
6 changes: 3 additions & 3 deletions crates/op-rbuilder/src/builders/flashblocks/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl FlashblocksServiceBuilder {
tracing::error!(error = %e, "p2p node exited");
}
});
tracing::info!(multiaddrs = ?multiaddrs, "flashblocks p2p node started");
tracing::info!(target: "payload_builder", multiaddrs = ?multiaddrs, "flashblocks p2p node started");

let incoming_message_rx = incoming_message_rxs
.remove(&FLASHBLOCKS_STREAM_PROTOCOL)
Expand Down Expand Up @@ -182,7 +182,7 @@ impl FlashblocksServiceBuilder {
.clone()
.spawn_metrics_collector(Duration::from_secs(1));

tracing::info!("Flashblocks payload builder service started");
tracing::info!(target: "payload_builder", "Flashblocks payload builder service started");
Ok(payload_builder_handle)
}
}
Expand All @@ -207,7 +207,7 @@ where
{
Ok(builder_tx) => Some(builder_tx),
Err(e) => {
tracing::warn!(error = %e, "Failed to bootstrap flashtestations, builder will not include flashtestations txs");
tracing::warn!(target: "payload_builder", error = %e, "Failed to bootstrap flashtestations, builder will not include flashtestations txs");
None
}
}
Expand Down
26 changes: 13 additions & 13 deletions crates/op-rbuilder/src/builders/flashblocks/wspub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use tokio_tungstenite::{
WebSocketStream, accept_async,
tungstenite::{Message, Utf8Bytes},
};
use tracing::{debug, warn};
use tracing::{debug, info, trace, warn};

use crate::{metrics::OpRBuilderMetrics, tokio_metrics::MonitoredTask};

Expand Down Expand Up @@ -90,7 +90,7 @@ impl Drop for WebSocketPublisher {
fn drop(&mut self) {
// Notify the listener loop to terminate
let _ = self.term.send(true);
tracing::info!("WebSocketPublisher dropped, terminating listener loop");
info!(target: "payload_builder", "WebSocketPublisher dropped, terminating listener loop");
}
}

Expand All @@ -112,7 +112,7 @@ async fn listener_loop(
let listen_addr = listener
.local_addr()
.expect("Failed to get local address of listener");
tracing::info!("Flashblocks WebSocketPublisher listening on {listen_addr}");
info!(target: "payload_builder", "Flashblocks WebSocketPublisher listening on {listen_addr}");

let mut term = term;

Expand Down Expand Up @@ -140,17 +140,17 @@ async fn listener_loop(
Ok(stream) => {
tokio::spawn(async move {
subs.fetch_add(1, Ordering::Relaxed);
tracing::debug!("WebSocket connection established with {}", peer_addr);
debug!(target: "payload_builder", "WebSocket connection established with {}", peer_addr);

// Handle the WebSocket connection in a dedicated task
broadcast_loop(stream, metrics, term, receiver_clone, sent).await;

subs.fetch_sub(1, Ordering::Relaxed);
tracing::debug!("WebSocket connection closed for {}", peer_addr);
debug!(target: "payload_builder", "WebSocket connection closed for {}", peer_addr);
});
}
Err(e) => {
warn!("Failed to accept WebSocket connection from {peer_addr}: {e}");
warn!(target: "payload_builder", "Failed to accept WebSocket connection from {peer_addr}: {e}");
}
}
}
Expand Down Expand Up @@ -184,7 +184,7 @@ async fn broadcast_loop(
// Check if the publisher is terminated
_ = term.changed() => {
if *term.borrow() {
tracing::info!("WebSocketPublisher is terminating, closing broadcast loop");
info!(target: "payload_builder", "WebSocketPublisher is terminating, closing broadcast loop");
return;
}
}
Expand All @@ -197,30 +197,30 @@ async fn broadcast_loop(
sent.fetch_add(1, Ordering::Relaxed);
metrics.messages_sent_count.increment(1);

tracing::debug!("Broadcasted payload: {:?}", payload);
trace!(target: "payload_builder", "Broadcasted payload: {:?}", payload);
if let Err(e) = stream.send(Message::Text(payload)).await {
tracing::debug!("Closing flashblocks subscription for {peer_addr}: {e}");
debug!(target: "payload_builder", "Send payload error for flashblocks subscription {peer_addr}: {e}");
break; // Exit the loop if sending fails
}
}
Err(RecvError::Closed) => {
tracing::debug!("Broadcast channel closed, exiting broadcast loop");
debug!(target: "payload_builder", "Broadcast channel closed, exiting broadcast loop");
return;
}
Err(RecvError::Lagged(_)) => {
tracing::warn!("Broadcast channel lagged, some messages were dropped");
warn!(target: "payload_builder", "Broadcast channel lagged, some messages were dropped");
}
},

// Ping-pong handled by tokio_tungstenite when you perform read on the socket
message = stream.next() => if let Some(message) = message { match message {
// We handle only close frame to highlight conn closing
Ok(Message::Close(_)) => {
tracing::info!("Closing frame received, stopping connection for {peer_addr}");
info!(target: "payload_builder", "Closing frame received, stopping connection for {peer_addr}");
break;
}
Err(e) => {
tracing::warn!("Received error. Closing flashblocks subscription for {peer_addr}: {e}");
warn!(target: "payload_builder", "Received error. Closing flashblocks subscription for {peer_addr}: {e}");
break;
}
_ => (),
Expand Down
4 changes: 2 additions & 2 deletions crates/p2p/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl BehaviourEvent {
continue;
}

tracing::debug!("mDNS discovered peer {peer_id} at {multiaddr}");
tracing::debug!(target: "flashblocks-p2p", "mDNS discovered peer {peer_id} at {multiaddr}");
swarm.add_peer_address(peer_id, multiaddr);
swarm.dial(peer_id).unwrap_or_else(|e| {
tracing::error!("failed to dial mDNS discovered peer {peer_id}: {e}")
Expand All @@ -106,7 +106,7 @@ impl BehaviourEvent {
}
mdns::Event::Expired(list) => {
for (peer_id, multiaddr) in list {
tracing::debug!("mDNS expired peer {peer_id} at {multiaddr}");
tracing::debug!(target: "flashblocks-p2p", "mDNS expired peer {peer_id} at {multiaddr}");
}
}
},
Expand Down
28 changes: 14 additions & 14 deletions crates/p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,15 +141,15 @@ impl<M: Message + 'static> Node<M> {
tokio::select! {
biased;
_ = cancellation_token.cancelled() => {
debug!("cancellation token triggered, shutting down node");
debug!(target: "flashblocks-p2p", "cancellation token triggered, shutting down node");
handles.into_iter().for_each(|h| h.abort());
break Ok(());
}
Some(message) = outgoing_message_rx.recv() => {
let protocol = message.protocol();
debug!("received message to broadcast on protocol {protocol}");
debug!(target: "flashblocks-p2p", "received message to broadcast on protocol {protocol}");
if let Err(e) = outgoing_streams_handler.broadcast_message(message).await {
warn!("failed to broadcast message on protocol {protocol}: {e:?}");
warn!(target: "flashblocks-p2p", "failed to broadcast message on protocol {protocol}: {e:?}");
}
}
event = swarm.select_next_some() => {
Expand All @@ -158,10 +158,10 @@ impl<M: Message + 'static> Node<M> {
address,
..
} => {
debug!("new listen address: {address}");
debug!(target: "flashblocks-p2p", "new listen address: {address}");
}
SwarmEvent::ExternalAddrConfirmed { address } => {
debug!("external address confirmed: {address}");
debug!(target: "flashblocks-p2p", "external address confirmed: {address}");
}
SwarmEvent::ConnectionEstablished {
peer_id,
Expand All @@ -170,7 +170,7 @@ impl<M: Message + 'static> Node<M> {
} => {
// when a new connection is established, open outbound streams for each protocol
// and add them to the outgoing streams handler.
debug!("connection established with peer {peer_id}");
debug!(target: "flashblocks-p2p", "fb p2p connection established with peer {peer_id}");
if !outgoing_streams_handler.has_peer(&peer_id) {
for protocol in &protocols {
match swarm
Expand All @@ -180,10 +180,10 @@ impl<M: Message + 'static> Node<M> {
.await
{
Ok(stream) => { outgoing_streams_handler.insert_peer_and_stream(peer_id, protocol.clone(), stream);
debug!("opened outbound stream with peer {peer_id} with protocol {protocol} on connection {connection_id}");
debug!(target: "flashblocks-p2p", "opened outbound stream with peer {peer_id} with protocol {protocol} on connection {connection_id}");
}
Err(e) => {
warn!("failed to open stream with peer {peer_id} on connection {connection_id}: {e:?}");
warn!(target: "flashblocks-p2p", "failed to open stream with peer {peer_id} on connection {connection_id}: {e:?}");
}
}
}
Expand All @@ -194,7 +194,7 @@ impl<M: Message + 'static> Node<M> {
cause,
..
} => {
debug!("connection closed with peer {peer_id}: {cause:?}");
debug!(target: "flashblocks-p2p", "connection closed with peer {peer_id}: {cause:?}");
outgoing_streams_handler.remove_peer(&peer_id);
}
SwarmEvent::Behaviour(event) => event.handle(&mut swarm),
Expand Down Expand Up @@ -434,21 +434,21 @@ impl<M: Message + 'static> IncomingStreamsHandler<M> {
loop {
tokio::select! {
_ = cancellation_token.cancelled() => {
debug!("cancellation token triggered, shutting down incoming streams handler for protocol {protocol}");
debug!(target: "flashblocks-p2p", "cancellation token triggered, shutting down incoming streams handler for protocol {protocol}");
return;
}
Some((from, stream)) = incoming.next() => {
debug!("new incoming stream on protocol {protocol} from peer {from}");
debug!(target: "flashblocks-p2p", "new incoming stream on protocol {protocol} from peer {from}");
handle_stream_futures.push(tokio::spawn(handle_incoming_stream(from, stream, tx.clone())));
}
Some(res) = handle_stream_futures.next() => {
match res {
Ok(Ok(())) => {}
Ok(Err(e)) => {
warn!("error handling incoming stream: {e:?}");
warn!(target: "flashblocks-p2p", "error handling incoming stream: {e:?}");
}
Err(e) => {
warn!("task handling incoming stream panicked: {e:?}");
warn!(target: "flashblocks-p2p", "task handling incoming stream panicked: {e:?}");
}
}
}
Expand All @@ -475,7 +475,7 @@ async fn handle_incoming_stream<M: Message>(
match res {
Ok(str) => {
let payload = M::from_str(&str).wrap_err("failed to decode stream message")?;
debug!("got message from peer {peer_id}: {payload:?}");
debug!(target: "flashblocks-p2p", "got message from peer {peer_id}: {payload:?}");
let _ = payload_tx.send(payload).await;
}
Err(e) => {
Expand Down
5 changes: 3 additions & 2 deletions crates/p2p/src/outgoing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl StreamsHandler {
.get_mut(&peer)
.expect("stream map must exist for peer");
let Some(stream) = protocol_to_stream.remove(&protocol) else {
warn!("no stream for protocol {protocol:?} to peer {peer}");
warn!(target: "flashblocks-p2p", "no stream for protocol {protocol:?} to peer {peer}");
continue;
};
let stream = stream.compat();
Expand Down Expand Up @@ -85,12 +85,13 @@ impl StreamsHandler {
protocol_to_stream.insert(protocol.clone(), stream);
}
Err(e) => {
warn!("failed to send payload to peer: {e:?}");
warn!(target: "flashblocks-p2p", "failed to send payload to peer: {e:?}");
}
}
}

debug!(
target: "flashblocks-p2p",
"broadcasted message to {} peers",
self.peers_to_stream.len()
);
Expand Down
Loading