diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs b/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs index 1cc20bb8e..b56a49596 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs @@ -82,13 +82,13 @@ where cancel, } = self; - tracing::debug!("flashblocks payload handler started"); + tracing::info!(target: "payload_builder", "flashblocks payload handler started"); loop { tokio::select! { Some(payload) = built_rx.recv() => { if let Err(e) = payload_events_handle.send(Events::BuiltPayload(payload.clone())) { - warn!(e = ?e, "failed to send BuiltPayload event"); + warn!(target: "payload_builder", e = ?e, "failed to send BuiltPayload event"); } // ignore error here; if p2p was disabled, the channel will be closed. let _ = p2p_tx.send(payload.into()).await; @@ -113,13 +113,13 @@ where ); match res { Ok((payload, _)) => { - tracing::info!(hash = payload.block().hash().to_string(), block_number = payload.block().header().number, "successfully executed received flashblock"); + tracing::info!(target: "payload_builder", hash = payload.block().hash().to_string(), block_number = payload.block().header().number, "successfully executed external received flashblock"); if let Err(e) = payload_events_handle.send(Events::BuiltPayload(payload)) { - warn!(e = ?e, "failed to send BuiltPayload event on synced block"); + warn!(target: "payload_builder", e = ?e, "failed to send BuiltPayload event on synced block"); } } Err(e) => { - tracing::error!(error = ?e, "failed to execute received flashblock"); + tracing::error!(target: "payload_builder", error = ?e, "failed to execute external received flashblock"); } } }); @@ -147,7 +147,7 @@ where let start = tokio::time::Instant::now(); - tracing::info!(header = ?payload.block().header(), "executing flashblock"); + tracing::info!(target: "payload_builder", header = ?payload.block().header(), "executing external flashblock"); let mut cached_reads = reth::revm::cached::CachedReads::default(); let parent_hash = payload.block().sealed_header().parent_hash; @@ -202,7 +202,7 @@ where .is_jovian_active_at_timestamp(timestamp) { if extra_data.len() != 17 { - tracing::debug!(len = extra_data.len(), data = ?extra_data, "invalid extra data length in flashblock for jovian fork"); + tracing::trace!(target: "payload_builder", len = extra_data.len(), data = ?extra_data, "invalid extra data length in flashblock for jovian fork"); bail!("extra data length should be 17 bytes"); } let eip_1559_params = extra_data[1..9].try_into().ok(); @@ -213,13 +213,13 @@ where (eip_1559_params, Some(min_base_fee)) } else if chain_spec.is_holocene_active_at_timestamp(timestamp) { if extra_data.len() != 9 { - tracing::debug!(len = extra_data.len(), data = ?extra_data, "invalid extra data length in flashblock for holocene fork"); + tracing::trace!(target: "payload_builder", len = extra_data.len(), data = ?extra_data, "invalid extra data length in flashblock for holocene fork"); bail!("extra data length should be 9 bytes"); } (extra_data[1..9].try_into().ok(), None) } else { if !extra_data.is_empty() { - tracing::debug!(len = extra_data.len(), data = ?extra_data, "invalid extra data length in flashblock for pre holocene fork"); + tracing::trace!(target: "payload_builder", len = extra_data.len(), data = ?extra_data, "invalid extra data length in flashblock for pre holocene fork"); bail!("extra data length should be 0 bytes"); } (None, None) @@ -295,7 +295,7 @@ where builder_ctx.metrics.block_synced_success.increment(1); - tracing::info!(header = ?built_payload.block().header(), "successfully executed flashblock"); + tracing::info!(target: "payload_builder", header = ?built_payload.block().header(), "successfully executed external flashblock"); Ok((built_payload, fb_payload)) } diff --git a/crates/op-rbuilder/src/builders/flashblocks/service.rs b/crates/op-rbuilder/src/builders/flashblocks/service.rs index 32a366596..256d2de59 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/service.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/service.rs @@ -94,7 +94,7 @@ impl FlashblocksServiceBuilder { tracing::error!(error = %e, "p2p node exited"); } }); - tracing::info!(multiaddrs = ?multiaddrs, "flashblocks p2p node started"); + tracing::info!(target: "payload_builder", multiaddrs = ?multiaddrs, "flashblocks p2p node started"); let incoming_message_rx = incoming_message_rxs .remove(&FLASHBLOCKS_STREAM_PROTOCOL) @@ -182,7 +182,7 @@ impl FlashblocksServiceBuilder { .clone() .spawn_metrics_collector(Duration::from_secs(1)); - tracing::info!("Flashblocks payload builder service started"); + tracing::info!(target: "payload_builder", "Flashblocks payload builder service started"); Ok(payload_builder_handle) } } @@ -207,7 +207,7 @@ where { Ok(builder_tx) => Some(builder_tx), Err(e) => { - tracing::warn!(error = %e, "Failed to bootstrap flashtestations, builder will not include flashtestations txs"); + tracing::warn!(target: "payload_builder", error = %e, "Failed to bootstrap flashtestations, builder will not include flashtestations txs"); None } } diff --git a/crates/op-rbuilder/src/builders/flashblocks/wspub.rs b/crates/op-rbuilder/src/builders/flashblocks/wspub.rs index c733aa58d..3f86c9079 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/wspub.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/wspub.rs @@ -18,7 +18,7 @@ use tokio_tungstenite::{ WebSocketStream, accept_async, tungstenite::{Message, Utf8Bytes}, }; -use tracing::{debug, warn}; +use tracing::{debug, info, trace, warn}; use crate::{metrics::OpRBuilderMetrics, tokio_metrics::MonitoredTask}; @@ -90,7 +90,7 @@ impl Drop for WebSocketPublisher { fn drop(&mut self) { // Notify the listener loop to terminate let _ = self.term.send(true); - tracing::info!("WebSocketPublisher dropped, terminating listener loop"); + info!(target: "payload_builder", "WebSocketPublisher dropped, terminating listener loop"); } } @@ -112,7 +112,7 @@ async fn listener_loop( let listen_addr = listener .local_addr() .expect("Failed to get local address of listener"); - tracing::info!("Flashblocks WebSocketPublisher listening on {listen_addr}"); + info!(target: "payload_builder", "Flashblocks WebSocketPublisher listening on {listen_addr}"); let mut term = term; @@ -140,17 +140,17 @@ async fn listener_loop( Ok(stream) => { tokio::spawn(async move { subs.fetch_add(1, Ordering::Relaxed); - tracing::debug!("WebSocket connection established with {}", peer_addr); + debug!(target: "payload_builder", "WebSocket connection established with {}", peer_addr); // Handle the WebSocket connection in a dedicated task broadcast_loop(stream, metrics, term, receiver_clone, sent).await; subs.fetch_sub(1, Ordering::Relaxed); - tracing::debug!("WebSocket connection closed for {}", peer_addr); + debug!(target: "payload_builder", "WebSocket connection closed for {}", peer_addr); }); } Err(e) => { - warn!("Failed to accept WebSocket connection from {peer_addr}: {e}"); + warn!(target: "payload_builder", "Failed to accept WebSocket connection from {peer_addr}: {e}"); } } } @@ -184,7 +184,7 @@ async fn broadcast_loop( // Check if the publisher is terminated _ = term.changed() => { if *term.borrow() { - tracing::info!("WebSocketPublisher is terminating, closing broadcast loop"); + info!(target: "payload_builder", "WebSocketPublisher is terminating, closing broadcast loop"); return; } } @@ -197,18 +197,18 @@ async fn broadcast_loop( sent.fetch_add(1, Ordering::Relaxed); metrics.messages_sent_count.increment(1); - tracing::debug!("Broadcasted payload: {:?}", payload); + trace!(target: "payload_builder", "Broadcasted payload: {:?}", payload); if let Err(e) = stream.send(Message::Text(payload)).await { - tracing::debug!("Closing flashblocks subscription for {peer_addr}: {e}"); + debug!(target: "payload_builder", "Send payload error for flashblocks subscription {peer_addr}: {e}"); break; // Exit the loop if sending fails } } Err(RecvError::Closed) => { - tracing::debug!("Broadcast channel closed, exiting broadcast loop"); + debug!(target: "payload_builder", "Broadcast channel closed, exiting broadcast loop"); return; } Err(RecvError::Lagged(_)) => { - tracing::warn!("Broadcast channel lagged, some messages were dropped"); + warn!(target: "payload_builder", "Broadcast channel lagged, some messages were dropped"); } }, @@ -216,11 +216,11 @@ async fn broadcast_loop( message = stream.next() => if let Some(message) = message { match message { // We handle only close frame to highlight conn closing Ok(Message::Close(_)) => { - tracing::info!("Closing frame received, stopping connection for {peer_addr}"); + info!(target: "payload_builder", "Closing frame received, stopping connection for {peer_addr}"); break; } Err(e) => { - tracing::warn!("Received error. Closing flashblocks subscription for {peer_addr}: {e}"); + warn!(target: "payload_builder", "Received error. Closing flashblocks subscription for {peer_addr}: {e}"); break; } _ => (), diff --git a/crates/p2p/src/behaviour.rs b/crates/p2p/src/behaviour.rs index bedfb78ec..a1d727b81 100644 --- a/crates/p2p/src/behaviour.rs +++ b/crates/p2p/src/behaviour.rs @@ -97,7 +97,7 @@ impl BehaviourEvent { continue; } - tracing::debug!("mDNS discovered peer {peer_id} at {multiaddr}"); + tracing::debug!(target: "flashblocks-p2p", "mDNS discovered peer {peer_id} at {multiaddr}"); swarm.add_peer_address(peer_id, multiaddr); swarm.dial(peer_id).unwrap_or_else(|e| { tracing::error!("failed to dial mDNS discovered peer {peer_id}: {e}") @@ -106,7 +106,7 @@ impl BehaviourEvent { } mdns::Event::Expired(list) => { for (peer_id, multiaddr) in list { - tracing::debug!("mDNS expired peer {peer_id} at {multiaddr}"); + tracing::debug!(target: "flashblocks-p2p", "mDNS expired peer {peer_id} at {multiaddr}"); } } }, diff --git a/crates/p2p/src/lib.rs b/crates/p2p/src/lib.rs index 757be3b25..898930505 100644 --- a/crates/p2p/src/lib.rs +++ b/crates/p2p/src/lib.rs @@ -141,15 +141,15 @@ impl Node { tokio::select! { biased; _ = cancellation_token.cancelled() => { - debug!("cancellation token triggered, shutting down node"); + debug!(target: "flashblocks-p2p", "cancellation token triggered, shutting down node"); handles.into_iter().for_each(|h| h.abort()); break Ok(()); } Some(message) = outgoing_message_rx.recv() => { let protocol = message.protocol(); - debug!("received message to broadcast on protocol {protocol}"); + debug!(target: "flashblocks-p2p", "received message to broadcast on protocol {protocol}"); if let Err(e) = outgoing_streams_handler.broadcast_message(message).await { - warn!("failed to broadcast message on protocol {protocol}: {e:?}"); + warn!(target: "flashblocks-p2p", "failed to broadcast message on protocol {protocol}: {e:?}"); } } event = swarm.select_next_some() => { @@ -158,10 +158,10 @@ impl Node { address, .. } => { - debug!("new listen address: {address}"); + debug!(target: "flashblocks-p2p", "new listen address: {address}"); } SwarmEvent::ExternalAddrConfirmed { address } => { - debug!("external address confirmed: {address}"); + debug!(target: "flashblocks-p2p", "external address confirmed: {address}"); } SwarmEvent::ConnectionEstablished { peer_id, @@ -170,7 +170,7 @@ impl Node { } => { // when a new connection is established, open outbound streams for each protocol // and add them to the outgoing streams handler. - debug!("connection established with peer {peer_id}"); + debug!(target: "flashblocks-p2p", "fb p2p connection established with peer {peer_id}"); if !outgoing_streams_handler.has_peer(&peer_id) { for protocol in &protocols { match swarm @@ -180,10 +180,10 @@ impl Node { .await { Ok(stream) => { outgoing_streams_handler.insert_peer_and_stream(peer_id, protocol.clone(), stream); - debug!("opened outbound stream with peer {peer_id} with protocol {protocol} on connection {connection_id}"); + debug!(target: "flashblocks-p2p", "opened outbound stream with peer {peer_id} with protocol {protocol} on connection {connection_id}"); } Err(e) => { - warn!("failed to open stream with peer {peer_id} on connection {connection_id}: {e:?}"); + warn!(target: "flashblocks-p2p", "failed to open stream with peer {peer_id} on connection {connection_id}: {e:?}"); } } } @@ -194,7 +194,7 @@ impl Node { cause, .. } => { - debug!("connection closed with peer {peer_id}: {cause:?}"); + debug!(target: "flashblocks-p2p", "connection closed with peer {peer_id}: {cause:?}"); outgoing_streams_handler.remove_peer(&peer_id); } SwarmEvent::Behaviour(event) => event.handle(&mut swarm), @@ -434,21 +434,21 @@ impl IncomingStreamsHandler { loop { tokio::select! { _ = cancellation_token.cancelled() => { - debug!("cancellation token triggered, shutting down incoming streams handler for protocol {protocol}"); + debug!(target: "flashblocks-p2p", "cancellation token triggered, shutting down incoming streams handler for protocol {protocol}"); return; } Some((from, stream)) = incoming.next() => { - debug!("new incoming stream on protocol {protocol} from peer {from}"); + debug!(target: "flashblocks-p2p", "new incoming stream on protocol {protocol} from peer {from}"); handle_stream_futures.push(tokio::spawn(handle_incoming_stream(from, stream, tx.clone()))); } Some(res) = handle_stream_futures.next() => { match res { Ok(Ok(())) => {} Ok(Err(e)) => { - warn!("error handling incoming stream: {e:?}"); + warn!(target: "flashblocks-p2p", "error handling incoming stream: {e:?}"); } Err(e) => { - warn!("task handling incoming stream panicked: {e:?}"); + warn!(target: "flashblocks-p2p", "task handling incoming stream panicked: {e:?}"); } } } @@ -475,7 +475,7 @@ async fn handle_incoming_stream( match res { Ok(str) => { let payload = M::from_str(&str).wrap_err("failed to decode stream message")?; - debug!("got message from peer {peer_id}: {payload:?}"); + debug!(target: "flashblocks-p2p", "got message from peer {peer_id}: {payload:?}"); let _ = payload_tx.send(payload).await; } Err(e) => { diff --git a/crates/p2p/src/outgoing.rs b/crates/p2p/src/outgoing.rs index 2440e0f72..40abac1cb 100644 --- a/crates/p2p/src/outgoing.rs +++ b/crates/p2p/src/outgoing.rs @@ -56,7 +56,7 @@ impl StreamsHandler { .get_mut(&peer) .expect("stream map must exist for peer"); let Some(stream) = protocol_to_stream.remove(&protocol) else { - warn!("no stream for protocol {protocol:?} to peer {peer}"); + warn!(target: "flashblocks-p2p", "no stream for protocol {protocol:?} to peer {peer}"); continue; }; let stream = stream.compat(); @@ -85,12 +85,13 @@ impl StreamsHandler { protocol_to_stream.insert(protocol.clone(), stream); } Err(e) => { - warn!("failed to send payload to peer: {e:?}"); + warn!(target: "flashblocks-p2p", "failed to send payload to peer: {e:?}"); } } } debug!( + target: "flashblocks-p2p", "broadcasted message to {} peers", self.peers_to_stream.len() );