From 447929b6b2b4f67515256202518de4496fa635b9 Mon Sep 17 00:00:00 2001 From: klkvr Date: Tue, 14 Apr 2026 02:29:35 +0400 Subject: [PATCH 1/7] feat: add escape hatch for fallback decoding --- src/config.rs | 25 ++++++++++++++++++++++++- src/handler/mod.rs | 19 ++++++++++++++----- src/handler/tests.rs | 1 + src/ipmode.rs | 6 ++++++ src/socket/mod.rs | 26 +++++++++++++++++++++++--- src/socket/recv.rs | 15 +++++++++++++-- 6 files changed, 81 insertions(+), 11 deletions(-) diff --git a/src/config.rs b/src/config.rs index 493ece237..5cd0eb417 100644 --- a/src/config.rs +++ b/src/config.rs @@ -3,7 +3,14 @@ use crate::{ kbucket::MAX_NODES_PER_BUCKET, socket::ListenConfig, Enr, Executor, PermitBanList, ProtocolIdentity, RateLimiter, RateLimiterBuilder, }; -use std::time::Duration; +use std::{future::Future, net::SocketAddr, pin::Pin, sync::Arc, time::Duration}; + +/// Callback type for handling packets that fail to decode. +/// The callback receives the raw packet bytes and source address, and returns a future +/// that is awaited before processing the next packet. +pub type OnDecodeFailure = Arc< + dyn Fn(&[u8], SocketAddr) -> Pin + Send + '_>> + Send + Sync, +>; /// Configuration parameters that define the performance of the discovery network. #[derive(Clone)] @@ -113,6 +120,11 @@ pub struct Config { /// The protocol identity to use in network messages. pub protocol_identity: ProtocolIdentity, + + /// Optional callback for handling packets that fail to decode. This can be used to forward + /// undecoded packets to another protocol handler. The returned future is awaited, providing + /// backpressure if the handler is slow. + pub on_decode_failure: Option, } #[derive(Debug)] @@ -160,6 +172,7 @@ impl ConfigBuilder { executor: None, listen_config, protocol_identity: ProtocolIdentity::default(), + on_decode_failure: None, }; ConfigBuilder { config } @@ -344,6 +357,16 @@ impl ConfigBuilder { self } + /// Sets a callback invoked with the raw packet bytes and source address when a packet fails + /// to decode. This can be used to forward undecoded packets to another protocol handler. + pub fn on_decode_failure( + &mut self, + callback: OnDecodeFailure, + ) -> &mut Self { + self.config.on_decode_failure = Some(callback); + self + } + pub fn build(&mut self) -> Config { // If an executor is not provided, assume a current tokio runtime is running. if self.config.executor.is_none() { diff --git a/src/handler/mod.rs b/src/handler/mod.rs index f32727618..7be17b86e 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -261,17 +261,25 @@ impl Handler { }; let mut listen_sockets = SmallVec::default(); - match config.listen_config { - ListenConfig::Ipv4 { ip, port } => listen_sockets.push((ip, port).into()), - ListenConfig::Ipv6 { ip, port } => listen_sockets.push((ip, port).into()), + match &config.listen_config { + ListenConfig::Ipv4 { ip, port } => listen_sockets.push((*ip, *port).into()), + ListenConfig::Ipv6 { ip, port } => listen_sockets.push((*ip, *port).into()), ListenConfig::DualStack { ipv4, ipv4_port, ipv6, ipv6_port, } => { - listen_sockets.push((ipv4, ipv4_port).into()); - listen_sockets.push((ipv6, ipv6_port).into()); + listen_sockets.push((*ipv4, *ipv4_port).into()); + listen_sockets.push((*ipv6, *ipv6_port).into()); + } + ListenConfig::FromSockets { ipv4, ipv6 } => { + if let Some(s) = ipv4 { + listen_sockets.push(s.local_addr().expect("socket must have local addr")); + } + if let Some(s) = ipv6 { + listen_sockets.push(s.local_addr().expect("socket must have local addr")); + } } }; @@ -283,6 +291,7 @@ impl Handler { protocol_identity: config.protocol_identity, expected_responses: filter_expected_responses.clone(), ban_duration: config.ban_duration, + on_decode_failure: config.on_decode_failure.clone(), }; // Attempt to bind to the socket before spinning up the send/recv tasks. diff --git a/src/handler/tests.rs b/src/handler/tests.rs index 79d2b2e53..5dc6439bc 100644 --- a/src/handler/tests.rs +++ b/src/handler/tests.rs @@ -57,6 +57,7 @@ async fn build_handler( expected_responses: filter_expected_responses.clone(), ban_duration: config.ban_duration, protocol_identity: Default::default(), + on_decode_failure: None, } }; diff --git a/src/ipmode.rs b/src/ipmode.rs index bc292d7f9..3614abc47 100644 --- a/src/ipmode.rs +++ b/src/ipmode.rs @@ -29,6 +29,12 @@ impl IpMode { ListenConfig::Ipv4 { .. } => Ip4, ListenConfig::Ipv6 { .. } => Ip6, ListenConfig::DualStack { .. } => DualStack, + ListenConfig::FromSockets { ipv4, ipv6 } => match (ipv4, ipv6) { + (Some(_), Some(_)) => DualStack, + (Some(_), None) => Ip4, + (None, Some(_)) => Ip6, + (None, None) => Ip4, + }, } } diff --git a/src/socket/mod.rs b/src/socket/mod.rs index ff4bbcd4e..52d48126e 100644 --- a/src/socket/mod.rs +++ b/src/socket/mod.rs @@ -1,4 +1,4 @@ -use crate::{Executor, ProtocolIdentity}; +use crate::{config::OnDecodeFailure, Executor, ProtocolIdentity}; use parking_lot::RwLock; use recv::*; use send::*; @@ -45,6 +45,11 @@ pub enum ListenConfig { ipv6: Ipv6Addr, ipv6_port: u16, }, + /// Use pre-created UDP sockets. This allows sharing sockets with other protocol handlers. + FromSockets { + ipv4: Option>, + ipv6: Option>, + }, } /// Convenience objects for setting up the recv handler. @@ -63,6 +68,8 @@ pub struct SocketConfig { pub local_node_id: enr::NodeId, /// The protocol identity used in sent and received packets. pub protocol_identity: ProtocolIdentity, + /// Optional callback for handling packets that fail to decode. + pub on_decode_failure: Option, } /// Creates the UDP socket and handles the exit futures for the send/recv UDP handlers. @@ -101,6 +108,7 @@ impl Socket { expected_responses, local_node_id, protocol_identity, + on_decode_failure, } = config; // For recv socket, intentionally forgetting which socket is the ipv4 and which is the ipv6 one. @@ -133,6 +141,17 @@ impl Socket { Some(ipv6_socket), ) } + ListenConfig::FromSockets { ipv4, ipv6 } => match (ipv4, ipv6) { + (Some(v4), Some(v6)) => (v4.clone(), Some(v6.clone()), Some(v4), Some(v6)), + (Some(v4), None) => (v4.clone(), None, Some(v4), None), + (None, Some(v6)) => (v6.clone(), None, None, Some(v6)), + (None, None) => { + return Err(Error::new( + std::io::ErrorKind::InvalidInput, + "At least one socket must be provided", + )) + } + }, }; // spawn the recv handler @@ -145,6 +164,7 @@ impl Socket { protocol_identity, expected_responses, ban_duration, + on_decode_failure, }; let (recv, recv_exit) = RecvHandler::spawn(recv_config); @@ -201,7 +221,7 @@ impl ListenConfig { /// Sets an ipv4 socket. This will override any past ipv4 configuration and will promote the configuration to dual socket if an ipv6 socket is configured. pub fn with_ipv4(self, ip: Ipv4Addr, port: u16) -> ListenConfig { match self { - ListenConfig::Ipv4 { .. } => ListenConfig::Ipv4 { ip, port }, + ListenConfig::Ipv4 { .. } | ListenConfig::FromSockets { .. } => ListenConfig::Ipv4 { ip, port }, ListenConfig::Ipv6 { ip: ipv6, port: ipv6_port, @@ -227,7 +247,7 @@ impl ListenConfig { /// Sets an ipv6 socket. This will override any past ipv6 configuration and will promote the configuration to dual socket if an ipv4 socket is configured. pub fn with_ipv6(self, ip: Ipv6Addr, port: u16) -> ListenConfig { match self { - ListenConfig::Ipv6 { .. } => ListenConfig::Ipv6 { ip, port }, + ListenConfig::Ipv6 { .. } | ListenConfig::FromSockets { .. } => ListenConfig::Ipv6 { ip, port }, ListenConfig::Ipv4 { ip: ipv4, port: ipv4_port, diff --git a/src/socket/recv.rs b/src/socket/recv.rs index 22f83a6b1..2067dc8c2 100644 --- a/src/socket/recv.rs +++ b/src/socket/recv.rs @@ -3,7 +3,7 @@ //! Every UDP packet passes a filter before being processed. use super::filter::{Filter, FilterConfig}; -use crate::{metrics::METRICS, node_info::NodeAddress, packet::*, Executor}; +use crate::{config::OnDecodeFailure, metrics::METRICS, node_info::NodeAddress, packet::*, Executor}; use parking_lot::RwLock; use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration}; use tokio::{ @@ -36,6 +36,9 @@ pub struct RecvHandlerConfig { pub local_node_id: enr::NodeId, pub protocol_identity: ProtocolIdentity, pub expected_responses: Arc>>, + /// Optional callback invoked with the raw packet bytes and source address when a packet fails + /// to decode. This can be used to forward undecoded packets to another protocol handler. + pub on_decode_failure: Option, } /// The main task that handles inbound UDP packets. @@ -56,6 +59,8 @@ pub(crate) struct RecvHandler { protocol_identity: ProtocolIdentity, /// The channel to send the packet handler. handler: mpsc::Sender, + /// Optional callback for packets that fail to decode. + on_decode_failure: Option, /// Exit channel to shutdown the recv handler. exit: oneshot::Receiver<()>, } @@ -75,6 +80,7 @@ impl RecvHandler { local_node_id, protocol_identity, expected_responses, + on_decode_failure, } = config; let filter_enabled = filter_config.enabled; @@ -90,6 +96,7 @@ impl RecvHandler { node_id: local_node_id, protocol_identity, handler, + on_decode_failure, exit, }; @@ -193,7 +200,11 @@ impl RecvHandler { ) { Ok(p) => p, Err(e) => { - debug!(error = ?e, "Packet decoding failed"); // could not decode the packet, drop it + if let Some(cb) = &self.on_decode_failure { + cb(&recv_buffer[..length], src_address).await; + } else { + debug!(error = ?e, "Packet decoding failed"); + } return; } }; From 1e5bc96d412ddb3601f50eb48c8421db1801d4c0 Mon Sep 17 00:00:00 2001 From: klkvr Date: Tue, 14 Apr 2026 22:19:05 +0400 Subject: [PATCH 2/7] use trait --- src/config.rs | 20 +++++--------------- src/lib.rs | 2 +- src/socket/mod.rs | 14 +++++++++----- src/socket/recv.rs | 25 ++++++++++++++++++++----- 4 files changed, 35 insertions(+), 26 deletions(-) diff --git a/src/config.rs b/src/config.rs index 5cd0eb417..343a43814 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,16 +1,9 @@ //! A set of configuration parameters to tune the discovery protocol. use crate::{ - kbucket::MAX_NODES_PER_BUCKET, socket::ListenConfig, Enr, Executor, PermitBanList, - ProtocolIdentity, RateLimiter, RateLimiterBuilder, + kbucket::MAX_NODES_PER_BUCKET, socket::ListenConfig, Enr, Executor, OnDecodeFailure, + PermitBanList, ProtocolIdentity, RateLimiter, RateLimiterBuilder, }; -use std::{future::Future, net::SocketAddr, pin::Pin, sync::Arc, time::Duration}; - -/// Callback type for handling packets that fail to decode. -/// The callback receives the raw packet bytes and source address, and returns a future -/// that is awaited before processing the next packet. -pub type OnDecodeFailure = Arc< - dyn Fn(&[u8], SocketAddr) -> Pin + Send + '_>> + Send + Sync, ->; +use std::{sync::Arc, time::Duration}; /// Configuration parameters that define the performance of the discovery network. #[derive(Clone)] @@ -124,7 +117,7 @@ pub struct Config { /// Optional callback for handling packets that fail to decode. This can be used to forward /// undecoded packets to another protocol handler. The returned future is awaited, providing /// backpressure if the handler is slow. - pub on_decode_failure: Option, + pub on_decode_failure: Option>, } #[derive(Debug)] @@ -359,10 +352,7 @@ impl ConfigBuilder { /// Sets a callback invoked with the raw packet bytes and source address when a packet fails /// to decode. This can be used to forward undecoded packets to another protocol handler. - pub fn on_decode_failure( - &mut self, - callback: OnDecodeFailure, - ) -> &mut Self { + pub fn on_decode_failure(&mut self, callback: Arc) -> &mut Self { self.config.on_decode_failure = Some(callback); self } diff --git a/src/lib.rs b/src/lib.rs index 2d02c2524..1e51b6dd9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -125,7 +125,7 @@ pub use kbucket::{ConnectionDirection, ConnectionState, Key}; pub use packet::ProtocolIdentity; pub use permit_ban::PermitBanList; pub use service::TalkRequest; -pub use socket::{ListenConfig, RateLimiter, RateLimiterBuilder}; +pub use socket::{ListenConfig, OnDecodeFailure, RateLimiter, RateLimiterBuilder}; // Re-export the ENR crate pub use enr; diff --git a/src/socket/mod.rs b/src/socket/mod.rs index 52d48126e..916abb238 100644 --- a/src/socket/mod.rs +++ b/src/socket/mod.rs @@ -1,4 +1,4 @@ -use crate::{config::OnDecodeFailure, Executor, ProtocolIdentity}; +use crate::{Executor, ProtocolIdentity}; use parking_lot::RwLock; use recv::*; use send::*; @@ -23,7 +23,7 @@ pub use filter::{ rate_limiter::{RateLimiter, RateLimiterBuilder}, FilterConfig, }; -pub use recv::InboundPacket; +pub use recv::{InboundPacket, OnDecodeFailure}; pub use send::OutboundPacket; /// Configuration for the sockets to listen on. @@ -69,7 +69,7 @@ pub struct SocketConfig { /// The protocol identity used in sent and received packets. pub protocol_identity: ProtocolIdentity, /// Optional callback for handling packets that fail to decode. - pub on_decode_failure: Option, + pub on_decode_failure: Option>, } /// Creates the UDP socket and handles the exit futures for the send/recv UDP handlers. @@ -221,7 +221,9 @@ impl ListenConfig { /// Sets an ipv4 socket. This will override any past ipv4 configuration and will promote the configuration to dual socket if an ipv6 socket is configured. pub fn with_ipv4(self, ip: Ipv4Addr, port: u16) -> ListenConfig { match self { - ListenConfig::Ipv4 { .. } | ListenConfig::FromSockets { .. } => ListenConfig::Ipv4 { ip, port }, + ListenConfig::Ipv4 { .. } | ListenConfig::FromSockets { .. } => { + ListenConfig::Ipv4 { ip, port } + } ListenConfig::Ipv6 { ip: ipv6, port: ipv6_port, @@ -247,7 +249,9 @@ impl ListenConfig { /// Sets an ipv6 socket. This will override any past ipv6 configuration and will promote the configuration to dual socket if an ipv4 socket is configured. pub fn with_ipv6(self, ip: Ipv6Addr, port: u16) -> ListenConfig { match self { - ListenConfig::Ipv6 { .. } | ListenConfig::FromSockets { .. } => ListenConfig::Ipv6 { ip, port }, + ListenConfig::Ipv6 { .. } | ListenConfig::FromSockets { .. } => { + ListenConfig::Ipv6 { ip, port } + } ListenConfig::Ipv4 { ip: ipv4, port: ipv4_port, diff --git a/src/socket/recv.rs b/src/socket/recv.rs index 2067dc8c2..08ee550ed 100644 --- a/src/socket/recv.rs +++ b/src/socket/recv.rs @@ -3,9 +3,11 @@ //! Every UDP packet passes a filter before being processed. use super::filter::{Filter, FilterConfig}; -use crate::{config::OnDecodeFailure, metrics::METRICS, node_info::NodeAddress, packet::*, Executor}; +use crate::{metrics::METRICS, node_info::NodeAddress, packet::*, Executor}; use parking_lot::RwLock; -use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration}; +use std::{ + collections::HashMap, future::Future, net::SocketAddr, pin::Pin, sync::Arc, time::Duration, +}; use tokio::{ net::UdpSocket, sync::{mpsc, oneshot}, @@ -38,7 +40,7 @@ pub struct RecvHandlerConfig { pub expected_responses: Arc>>, /// Optional callback invoked with the raw packet bytes and source address when a packet fails /// to decode. This can be used to forward undecoded packets to another protocol handler. - pub on_decode_failure: Option, + pub on_decode_failure: Option>, } /// The main task that handles inbound UDP packets. @@ -60,7 +62,7 @@ pub(crate) struct RecvHandler { /// The channel to send the packet handler. handler: mpsc::Sender, /// Optional callback for packets that fail to decode. - on_decode_failure: Option, + on_decode_failure: Option>, /// Exit channel to shutdown the recv handler. exit: oneshot::Receiver<()>, } @@ -201,7 +203,8 @@ impl RecvHandler { Ok(p) => p, Err(e) => { if let Some(cb) = &self.on_decode_failure { - cb(&recv_buffer[..length], src_address).await; + cb.on_decode_failure(&recv_buffer[..length], src_address) + .await; } else { debug!(error = ?e, "Packet decoding failed"); } @@ -238,3 +241,15 @@ impl RecvHandler { .unwrap_or_else(|e| warn!(error = %e,"Could not send packet to handler")); } } + +/// Trait for handling packets that fail to decode. +/// +/// The handler receives the raw packet bytes and source address. The returned future is awaited +/// before processing the next packet, providing backpressure if the handler is slow. +pub trait OnDecodeFailure: Send + Sync { + fn on_decode_failure( + &self, + data: &[u8], + src: SocketAddr, + ) -> Pin + Send + '_>>; +} From 7ace31ebf7f9fcd5ac72769bb13f7bbec7bcb068 Mon Sep 17 00:00:00 2001 From: klkvr Date: Mon, 20 Apr 2026 17:51:09 +0400 Subject: [PATCH 3/7] fix: review comment + clippy --- src/query_pool/peers/closest.rs | 4 ++-- src/query_pool/peers/predicate.rs | 4 ++-- src/service.rs | 8 +++----- src/socket/mod.rs | 18 ++++++++++++------ 4 files changed, 19 insertions(+), 15 deletions(-) diff --git a/src/query_pool/peers/closest.rs b/src/query_pool/peers/closest.rs index 334d8a39f..58373f2d1 100644 --- a/src/query_pool/peers/closest.rs +++ b/src/query_pool/peers/closest.rs @@ -324,8 +324,8 @@ where /// Consumes the query, returning the target and the closest peers. pub fn into_result(self) -> Vec { self.closest_peers - .into_iter() - .filter_map(|(_, peer)| { + .into_values() + .filter_map(|peer| { if let QueryPeerState::Succeeded = peer.state { Some(peer.key.into_preimage()) } else { diff --git a/src/query_pool/peers/predicate.rs b/src/query_pool/peers/predicate.rs index 3b4442019..c1f2aa128 100644 --- a/src/query_pool/peers/predicate.rs +++ b/src/query_pool/peers/predicate.rs @@ -314,8 +314,8 @@ where /// Consumes the query, returning the peers who match the predicate. pub fn into_result(self) -> Vec { self.closest_peers - .into_iter() - .filter_map(|(_, peer)| { + .into_values() + .filter_map(|peer| { if let QueryPeerState::Succeeded = peer.state { if peer.predicate_match { Some(peer.key.into_preimage()) diff --git a/src/service.rs b/src/service.rs index 10b26d31d..99ddb3997 100644 --- a/src/service.rs +++ b/src/service.rs @@ -616,11 +616,9 @@ impl Service { // check if we need to update the known ENR let mut to_request_enr = None; match self.kbuckets.write().entry(&node_address.node_id.into()) { - kbucket::Entry::Present(ref mut entry, _) => { - if entry.value().seq() < enr_seq { - let enr = entry.value().clone(); - to_request_enr = Some(enr); - } + kbucket::Entry::Present(ref mut entry, _) if entry.value().seq() < enr_seq => { + let enr = entry.value().clone(); + to_request_enr = Some(enr); } kbucket::Entry::Pending(ref mut entry, _) => { if entry.value().seq() < enr_seq { diff --git a/src/socket/mod.rs b/src/socket/mod.rs index 916abb238..bdcce22ee 100644 --- a/src/socket/mod.rs +++ b/src/socket/mod.rs @@ -221,9 +221,7 @@ impl ListenConfig { /// Sets an ipv4 socket. This will override any past ipv4 configuration and will promote the configuration to dual socket if an ipv6 socket is configured. pub fn with_ipv4(self, ip: Ipv4Addr, port: u16) -> ListenConfig { match self { - ListenConfig::Ipv4 { .. } | ListenConfig::FromSockets { .. } => { - ListenConfig::Ipv4 { ip, port } - } + ListenConfig::Ipv4 { .. } => ListenConfig::Ipv4 { ip, port }, ListenConfig::Ipv6 { ip: ipv6, port: ipv6_port, @@ -241,6 +239,11 @@ impl ListenConfig { ipv6, ipv6_port, }, + ListenConfig::FromSockets { .. } => { + panic!( + "`with_ipv4` cannot be called on `FromSockets`; sockets are already provided" + ) + } } } @@ -249,9 +252,7 @@ impl ListenConfig { /// Sets an ipv6 socket. This will override any past ipv6 configuration and will promote the configuration to dual socket if an ipv4 socket is configured. pub fn with_ipv6(self, ip: Ipv6Addr, port: u16) -> ListenConfig { match self { - ListenConfig::Ipv6 { .. } | ListenConfig::FromSockets { .. } => { - ListenConfig::Ipv6 { ip, port } - } + ListenConfig::Ipv6 { .. } => ListenConfig::Ipv6 { ip, port }, ListenConfig::Ipv4 { ip: ipv4, port: ipv4_port, @@ -269,6 +270,11 @@ impl ListenConfig { ipv6: ip, ipv6_port: port, }, + ListenConfig::FromSockets { .. } => { + panic!( + "`with_ipv6` cannot be called on `FromSockets`; sockets are already provided" + ) + } } } } From 4335fd7366225c0340e810093240bb4754888944 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Fri, 24 Apr 2026 22:37:14 +0400 Subject: [PATCH 4/7] return UnrecognizedFrame when not able to parse frame Amp-Thread-ID: https://ampcode.com/threads/T-019dc034-c40d-71b8-ab30-466ee6ca0ad9 Co-authored-by: Amp --- src/config.rs | 19 +++----------- src/discv5.rs | 3 +++ src/handler/mod.rs | 22 ++++++++++++++--- src/handler/tests.rs | 1 - src/lib.rs | 2 +- src/service.rs | 3 +++ src/socket/mod.rs | 8 ++---- src/socket/recv.rs | 59 +++++++++++++++++++++----------------------- 8 files changed, 58 insertions(+), 59 deletions(-) diff --git a/src/config.rs b/src/config.rs index 343a43814..493ece237 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,9 +1,9 @@ //! A set of configuration parameters to tune the discovery protocol. use crate::{ - kbucket::MAX_NODES_PER_BUCKET, socket::ListenConfig, Enr, Executor, OnDecodeFailure, - PermitBanList, ProtocolIdentity, RateLimiter, RateLimiterBuilder, + kbucket::MAX_NODES_PER_BUCKET, socket::ListenConfig, Enr, Executor, PermitBanList, + ProtocolIdentity, RateLimiter, RateLimiterBuilder, }; -use std::{sync::Arc, time::Duration}; +use std::time::Duration; /// Configuration parameters that define the performance of the discovery network. #[derive(Clone)] @@ -113,11 +113,6 @@ pub struct Config { /// The protocol identity to use in network messages. pub protocol_identity: ProtocolIdentity, - - /// Optional callback for handling packets that fail to decode. This can be used to forward - /// undecoded packets to another protocol handler. The returned future is awaited, providing - /// backpressure if the handler is slow. - pub on_decode_failure: Option>, } #[derive(Debug)] @@ -165,7 +160,6 @@ impl ConfigBuilder { executor: None, listen_config, protocol_identity: ProtocolIdentity::default(), - on_decode_failure: None, }; ConfigBuilder { config } @@ -350,13 +344,6 @@ impl ConfigBuilder { self } - /// Sets a callback invoked with the raw packet bytes and source address when a packet fails - /// to decode. This can be used to forward undecoded packets to another protocol handler. - pub fn on_decode_failure(&mut self, callback: Arc) -> &mut Self { - self.config.on_decode_failure = Some(callback); - self - } - pub fn build(&mut self) -> Config { // If an executor is not provided, assume a current tokio runtime is running. if self.config.executor.is_none() { diff --git a/src/discv5.rs b/src/discv5.rs index e25700a1b..4250d0cfb 100644 --- a/src/discv5.rs +++ b/src/discv5.rs @@ -20,6 +20,7 @@ use crate::{ }, node_info::{NodeAddress, NodeContact}, service::{QueryKind, Service, ServiceRequest, TalkRequest}, + socket::UnrecognizedFrame, Config, Enr, IpMode, }; use enr::{CombinedKey, EnrKey, Error as EnrError, NodeId}; @@ -78,6 +79,8 @@ pub enum Event { SocketUpdated(SocketAddr), /// A node has initiated a talk request. TalkRequest(TalkRequest), + /// A received unrecognized frame. + UnrecognizedFrame(UnrecognizedFrame), } /// The main Discv5 Service struct. This provides the user-level API for performing queries and diff --git a/src/handler/mod.rs b/src/handler/mod.rs index 7be17b86e..d2c206af1 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -33,7 +33,7 @@ use crate::{ packet::{ChallengeData, IdNonce, MessageNonce, Packet, PacketKind}, rpc::{Message, Request, RequestBody, RequestId, Response, ResponseBody}, socket, - socket::{FilterConfig, Socket}, + socket::{FilterConfig, Socket, UnrecognizedFrame}, Enr, ProtocolIdentity, }; use delay_map::HashMapDelay; @@ -139,6 +139,8 @@ pub enum HandlerOut { socket: SocketAddr, node_id: NodeId, }, + /// A frame that could not be decoded as discv5. + UnrecognizedFrame(UnrecognizedFrame), /// These sessions have expired from the cache. ExpiredSessions(Vec), } @@ -291,7 +293,6 @@ impl Handler { protocol_identity: config.protocol_identity, expected_responses: filter_expected_responses.clone(), ban_duration: config.ban_duration, - on_decode_failure: config.on_decode_failure.clone(), }; // Attempt to bind to the socket before spinning up the send/recv tasks. @@ -350,8 +351,21 @@ impl Handler { HandlerIn::WhoAreYou(wru_ref, enr) => self.send_challenge(wru_ref, enr).await, } } - Some(inbound_packet) = self.socket.recv.recv() => { - self.process_inbound_packet(inbound_packet).await; + Some(incoming_packet) = self.socket.recv.recv() => { + match incoming_packet { + socket::RecvPacket::Inbound(inbound_packet) => { + self.process_inbound_packet(inbound_packet).await; + } + socket::RecvPacket::UnrecognizedFrame(frame) => { + if let Err(e) = self + .service_send + .send(HandlerOut::UnrecognizedFrame(frame)) + .await + { + warn!(error = %e, "Failed to inform of unrecognized frame") + } + } + } } Some(Ok((node_address, active_request))) = self.active_requests.next() => { self.handle_request_timeout(node_address, active_request).await; diff --git a/src/handler/tests.rs b/src/handler/tests.rs index 5dc6439bc..79d2b2e53 100644 --- a/src/handler/tests.rs +++ b/src/handler/tests.rs @@ -57,7 +57,6 @@ async fn build_handler( expected_responses: filter_expected_responses.clone(), ban_duration: config.ban_duration, protocol_identity: Default::default(), - on_decode_failure: None, } }; diff --git a/src/lib.rs b/src/lib.rs index 1e51b6dd9..2d02c2524 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -125,7 +125,7 @@ pub use kbucket::{ConnectionDirection, ConnectionState, Key}; pub use packet::ProtocolIdentity; pub use permit_ban::PermitBanList; pub use service::TalkRequest; -pub use socket::{ListenConfig, OnDecodeFailure, RateLimiter, RateLimiterBuilder}; +pub use socket::{ListenConfig, RateLimiter, RateLimiterBuilder}; // Re-export the ENR crate pub use enr; diff --git a/src/service.rs b/src/service.rs index 99ddb3997..7c421c827 100644 --- a/src/service.rs +++ b/src/service.rs @@ -416,6 +416,9 @@ impl Service { } self.send_event(Event::UnverifiableEnr{enr, socket, node_id}); } + HandlerOut::UnrecognizedFrame(frame) => { + self.send_event(Event::UnrecognizedFrame(frame)); + } HandlerOut::ExpiredSessions(expired_sessions) => { self.send_event(Event::SessionsExpired(expired_sessions)); } diff --git a/src/socket/mod.rs b/src/socket/mod.rs index bdcce22ee..fa1456174 100644 --- a/src/socket/mod.rs +++ b/src/socket/mod.rs @@ -23,7 +23,7 @@ pub use filter::{ rate_limiter::{RateLimiter, RateLimiterBuilder}, FilterConfig, }; -pub use recv::{InboundPacket, OnDecodeFailure}; +pub use recv::{InboundPacket, RecvPacket, UnrecognizedFrame}; pub use send::OutboundPacket; /// Configuration for the sockets to listen on. @@ -68,14 +68,12 @@ pub struct SocketConfig { pub local_node_id: enr::NodeId, /// The protocol identity used in sent and received packets. pub protocol_identity: ProtocolIdentity, - /// Optional callback for handling packets that fail to decode. - pub on_decode_failure: Option>, } /// Creates the UDP socket and handles the exit futures for the send/recv UDP handlers. pub struct Socket { pub send: mpsc::Sender, - pub recv: mpsc::Receiver, + pub recv: mpsc::Receiver, sender_exit: Option>, recv_exit: Option>, } @@ -108,7 +106,6 @@ impl Socket { expected_responses, local_node_id, protocol_identity, - on_decode_failure, } = config; // For recv socket, intentionally forgetting which socket is the ipv4 and which is the ipv6 one. @@ -164,7 +161,6 @@ impl Socket { protocol_identity, expected_responses, ban_duration, - on_decode_failure, }; let (recv, recv_exit) = RecvHandler::spawn(recv_config); diff --git a/src/socket/recv.rs b/src/socket/recv.rs index 08ee550ed..46d14ff04 100644 --- a/src/socket/recv.rs +++ b/src/socket/recv.rs @@ -5,9 +5,7 @@ use super::filter::{Filter, FilterConfig}; use crate::{metrics::METRICS, node_info::NodeAddress, packet::*, Executor}; use parking_lot::RwLock; -use std::{ - collections::HashMap, future::Future, net::SocketAddr, pin::Pin, sync::Arc, time::Duration, -}; +use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration}; use tokio::{ net::UdpSocket, sync::{mpsc, oneshot}, @@ -16,6 +14,7 @@ use tokio::{ use tracing::{debug, trace, warn}; /// The object sent back by the Recv handler. +#[derive(Debug)] pub struct InboundPacket { /// The originating socket addr. pub src_address: SocketAddr, @@ -27,6 +26,20 @@ pub struct InboundPacket { pub authenticated_data: Vec, } +/// An unrecognized frame received by the Recv handler. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct UnrecognizedFrame { + pub src_address: SocketAddr, + pub packet: Vec, +} + +/// Packet output produced by the recv handler. +#[derive(Debug)] +pub enum RecvPacket { + Inbound(InboundPacket), + UnrecognizedFrame(UnrecognizedFrame), +} + /// Convenience objects for setting up the recv handler. pub struct RecvHandlerConfig { pub filter_config: FilterConfig, @@ -38,9 +51,6 @@ pub struct RecvHandlerConfig { pub local_node_id: enr::NodeId, pub protocol_identity: ProtocolIdentity, pub expected_responses: Arc>>, - /// Optional callback invoked with the raw packet bytes and source address when a packet fails - /// to decode. This can be used to forward undecoded packets to another protocol handler. - pub on_decode_failure: Option>, } /// The main task that handles inbound UDP packets. @@ -60,9 +70,7 @@ pub(crate) struct RecvHandler { /// The protocol identity expected in received packets. protocol_identity: ProtocolIdentity, /// The channel to send the packet handler. - handler: mpsc::Sender, - /// Optional callback for packets that fail to decode. - on_decode_failure: Option>, + handler: mpsc::Sender, /// Exit channel to shutdown the recv handler. exit: oneshot::Receiver<()>, } @@ -71,7 +79,7 @@ impl RecvHandler { /// Spawns the `RecvHandler` on a provided executor. pub(crate) fn spawn( config: RecvHandlerConfig, - ) -> (mpsc::Receiver, oneshot::Sender<()>) { + ) -> (mpsc::Receiver, oneshot::Sender<()>) { let (exit_sender, exit) = oneshot::channel(); let RecvHandlerConfig { filter_config, @@ -82,7 +90,6 @@ impl RecvHandler { local_node_id, protocol_identity, expected_responses, - on_decode_failure, } = config; let filter_enabled = filter_config.enabled; @@ -98,7 +105,6 @@ impl RecvHandler { node_id: local_node_id, protocol_identity, handler, - on_decode_failure, exit, }; @@ -202,12 +208,15 @@ impl RecvHandler { ) { Ok(p) => p, Err(e) => { - if let Some(cb) = &self.on_decode_failure { - cb.on_decode_failure(&recv_buffer[..length], src_address) - .await; - } else { - debug!(error = ?e, "Packet decoding failed"); - } + debug!(error = ?e, "Packet decoding failed"); // could not decode the packet, drop it + let frame = UnrecognizedFrame { + src_address, + packet: recv_buffer[..length].to_vec(), + }; + self.handler + .send(RecvPacket::UnrecognizedFrame(frame)) + .await + .unwrap_or_else(|err| warn!(error = %err, "Could not send unrecognized frame to handler")); return; } }; @@ -236,20 +245,8 @@ impl RecvHandler { // send the filtered decoded packet to the handler. self.handler - .send(inbound) + .send(RecvPacket::Inbound(inbound)) .await .unwrap_or_else(|e| warn!(error = %e,"Could not send packet to handler")); } } - -/// Trait for handling packets that fail to decode. -/// -/// The handler receives the raw packet bytes and source address. The returned future is awaited -/// before processing the next packet, providing backpressure if the handler is slow. -pub trait OnDecodeFailure: Send + Sync { - fn on_decode_failure( - &self, - data: &[u8], - src: SocketAddr, - ) -> Pin + Send + '_>>; -} From d7b98f37a3428fe82af2a9d5b83cbb80cd01d942 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Tue, 28 Apr 2026 11:07:28 +0100 Subject: [PATCH 5/7] nit: use ref instead --- src/handler/mod.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/handler/mod.rs b/src/handler/mod.rs index d2c206af1..7c088df17 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -263,19 +263,19 @@ impl Handler { }; let mut listen_sockets = SmallVec::default(); - match &config.listen_config { - ListenConfig::Ipv4 { ip, port } => listen_sockets.push((*ip, *port).into()), - ListenConfig::Ipv6 { ip, port } => listen_sockets.push((*ip, *port).into()), + match config.listen_config { + ListenConfig::Ipv4 { ip, port } => listen_sockets.push((ip, port).into()), + ListenConfig::Ipv6 { ip, port } => listen_sockets.push((ip, port).into()), ListenConfig::DualStack { ipv4, ipv4_port, ipv6, ipv6_port, } => { - listen_sockets.push((*ipv4, *ipv4_port).into()); - listen_sockets.push((*ipv6, *ipv6_port).into()); + listen_sockets.push((ipv4, ipv4_port).into()); + listen_sockets.push((ipv6, ipv6_port).into()); } - ListenConfig::FromSockets { ipv4, ipv6 } => { + ListenConfig::FromSockets { ref ipv4, ref ipv6 } => { if let Some(s) = ipv4 { listen_sockets.push(s.local_addr().expect("socket must have local addr")); } From 3cf9ee3bd6203fdf8df1cf711d5502ef1ef93aa6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Tue, 28 Apr 2026 11:15:43 +0100 Subject: [PATCH 6/7] cargo fmt --- src/socket/recv.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/socket/recv.rs b/src/socket/recv.rs index 46d14ff04..276b79c82 100644 --- a/src/socket/recv.rs +++ b/src/socket/recv.rs @@ -216,7 +216,9 @@ impl RecvHandler { self.handler .send(RecvPacket::UnrecognizedFrame(frame)) .await - .unwrap_or_else(|err| warn!(error = %err, "Could not send unrecognized frame to handler")); + .unwrap_or_else( + |err| warn!(error = %err, "Could not send unrecognized frame to handler"), + ); return; } }; From 8bd29a083c5aa08e7b512faf041f5f536c24e1d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Tue, 28 Apr 2026 11:34:15 +0100 Subject: [PATCH 7/7] cargo clippy --- src/socket/recv.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/socket/recv.rs b/src/socket/recv.rs index 276b79c82..e6eb23648 100644 --- a/src/socket/recv.rs +++ b/src/socket/recv.rs @@ -34,6 +34,7 @@ pub struct UnrecognizedFrame { } /// Packet output produced by the recv handler. +#[allow(clippy::large_enum_variant)] #[derive(Debug)] pub enum RecvPacket { Inbound(InboundPacket),