diff --git a/src/discv5.rs b/src/discv5.rs index e25700a1..4250d0cf 100644 --- a/src/discv5.rs +++ b/src/discv5.rs @@ -20,6 +20,7 @@ use crate::{ }, node_info::{NodeAddress, NodeContact}, service::{QueryKind, Service, ServiceRequest, TalkRequest}, + socket::UnrecognizedFrame, Config, Enr, IpMode, }; use enr::{CombinedKey, EnrKey, Error as EnrError, NodeId}; @@ -78,6 +79,8 @@ pub enum Event { SocketUpdated(SocketAddr), /// A node has initiated a talk request. TalkRequest(TalkRequest), + /// A received unrecognized frame. + UnrecognizedFrame(UnrecognizedFrame), } /// The main Discv5 Service struct. This provides the user-level API for performing queries and diff --git a/src/handler/mod.rs b/src/handler/mod.rs index f3272761..7c088df1 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -33,7 +33,7 @@ use crate::{ packet::{ChallengeData, IdNonce, MessageNonce, Packet, PacketKind}, rpc::{Message, Request, RequestBody, RequestId, Response, ResponseBody}, socket, - socket::{FilterConfig, Socket}, + socket::{FilterConfig, Socket, UnrecognizedFrame}, Enr, ProtocolIdentity, }; use delay_map::HashMapDelay; @@ -139,6 +139,8 @@ pub enum HandlerOut { socket: SocketAddr, node_id: NodeId, }, + /// A frame that could not be decoded as discv5. + UnrecognizedFrame(UnrecognizedFrame), /// These sessions have expired from the cache. ExpiredSessions(Vec), } @@ -273,6 +275,14 @@ impl Handler { listen_sockets.push((ipv4, ipv4_port).into()); listen_sockets.push((ipv6, ipv6_port).into()); } + ListenConfig::FromSockets { ref ipv4, ref ipv6 } => { + if let Some(s) = ipv4 { + listen_sockets.push(s.local_addr().expect("socket must have local addr")); + } + if let Some(s) = ipv6 { + listen_sockets.push(s.local_addr().expect("socket must have local addr")); + } + } }; let socket_config = socket::SocketConfig { @@ -341,8 +351,21 @@ impl Handler { HandlerIn::WhoAreYou(wru_ref, enr) => self.send_challenge(wru_ref, enr).await, } } - Some(inbound_packet) = self.socket.recv.recv() => { - self.process_inbound_packet(inbound_packet).await; + Some(incoming_packet) = self.socket.recv.recv() => { + match incoming_packet { + socket::RecvPacket::Inbound(inbound_packet) => { + self.process_inbound_packet(inbound_packet).await; + } + socket::RecvPacket::UnrecognizedFrame(frame) => { + if let Err(e) = self + .service_send + .send(HandlerOut::UnrecognizedFrame(frame)) + .await + { + warn!(error = %e, "Failed to inform of unrecognized frame") + } + } + } } Some(Ok((node_address, active_request))) = self.active_requests.next() => { self.handle_request_timeout(node_address, active_request).await; diff --git a/src/ipmode.rs b/src/ipmode.rs index bc292d7f..3614abc4 100644 --- a/src/ipmode.rs +++ b/src/ipmode.rs @@ -29,6 +29,12 @@ impl IpMode { ListenConfig::Ipv4 { .. } => Ip4, ListenConfig::Ipv6 { .. } => Ip6, ListenConfig::DualStack { .. } => DualStack, + ListenConfig::FromSockets { ipv4, ipv6 } => match (ipv4, ipv6) { + (Some(_), Some(_)) => DualStack, + (Some(_), None) => Ip4, + (None, Some(_)) => Ip6, + (None, None) => Ip4, + }, } } diff --git a/src/service.rs b/src/service.rs index 3b8b8d6f..94372693 100644 --- a/src/service.rs +++ b/src/service.rs @@ -416,6 +416,9 @@ impl Service { } self.send_event(Event::UnverifiableEnr{enr, socket, node_id}); } + HandlerOut::UnrecognizedFrame(frame) => { + self.send_event(Event::UnrecognizedFrame(frame)); + } HandlerOut::ExpiredSessions(expired_sessions) => { self.send_event(Event::SessionsExpired(expired_sessions)); } diff --git a/src/socket/mod.rs b/src/socket/mod.rs index ff4bbcd4..fa145617 100644 --- a/src/socket/mod.rs +++ b/src/socket/mod.rs @@ -23,7 +23,7 @@ pub use filter::{ rate_limiter::{RateLimiter, RateLimiterBuilder}, FilterConfig, }; -pub use recv::InboundPacket; +pub use recv::{InboundPacket, RecvPacket, UnrecognizedFrame}; pub use send::OutboundPacket; /// Configuration for the sockets to listen on. @@ -45,6 +45,11 @@ pub enum ListenConfig { ipv6: Ipv6Addr, ipv6_port: u16, }, + /// Use pre-created UDP sockets. This allows sharing sockets with other protocol handlers. + FromSockets { + ipv4: Option>, + ipv6: Option>, + }, } /// Convenience objects for setting up the recv handler. @@ -68,7 +73,7 @@ pub struct SocketConfig { /// Creates the UDP socket and handles the exit futures for the send/recv UDP handlers. pub struct Socket { pub send: mpsc::Sender, - pub recv: mpsc::Receiver, + pub recv: mpsc::Receiver, sender_exit: Option>, recv_exit: Option>, } @@ -133,6 +138,17 @@ impl Socket { Some(ipv6_socket), ) } + ListenConfig::FromSockets { ipv4, ipv6 } => match (ipv4, ipv6) { + (Some(v4), Some(v6)) => (v4.clone(), Some(v6.clone()), Some(v4), Some(v6)), + (Some(v4), None) => (v4.clone(), None, Some(v4), None), + (None, Some(v6)) => (v6.clone(), None, None, Some(v6)), + (None, None) => { + return Err(Error::new( + std::io::ErrorKind::InvalidInput, + "At least one socket must be provided", + )) + } + }, }; // spawn the recv handler @@ -219,6 +235,11 @@ impl ListenConfig { ipv6, ipv6_port, }, + ListenConfig::FromSockets { .. } => { + panic!( + "`with_ipv4` cannot be called on `FromSockets`; sockets are already provided" + ) + } } } @@ -245,6 +266,11 @@ impl ListenConfig { ipv6: ip, ipv6_port: port, }, + ListenConfig::FromSockets { .. } => { + panic!( + "`with_ipv6` cannot be called on `FromSockets`; sockets are already provided" + ) + } } } } diff --git a/src/socket/recv.rs b/src/socket/recv.rs index 22f83a6b..e6eb2364 100644 --- a/src/socket/recv.rs +++ b/src/socket/recv.rs @@ -14,6 +14,7 @@ use tokio::{ use tracing::{debug, trace, warn}; /// The object sent back by the Recv handler. +#[derive(Debug)] pub struct InboundPacket { /// The originating socket addr. pub src_address: SocketAddr, @@ -25,6 +26,21 @@ pub struct InboundPacket { pub authenticated_data: Vec, } +/// An unrecognized frame received by the Recv handler. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct UnrecognizedFrame { + pub src_address: SocketAddr, + pub packet: Vec, +} + +/// Packet output produced by the recv handler. +#[allow(clippy::large_enum_variant)] +#[derive(Debug)] +pub enum RecvPacket { + Inbound(InboundPacket), + UnrecognizedFrame(UnrecognizedFrame), +} + /// Convenience objects for setting up the recv handler. pub struct RecvHandlerConfig { pub filter_config: FilterConfig, @@ -55,7 +71,7 @@ pub(crate) struct RecvHandler { /// The protocol identity expected in received packets. protocol_identity: ProtocolIdentity, /// The channel to send the packet handler. - handler: mpsc::Sender, + handler: mpsc::Sender, /// Exit channel to shutdown the recv handler. exit: oneshot::Receiver<()>, } @@ -64,7 +80,7 @@ impl RecvHandler { /// Spawns the `RecvHandler` on a provided executor. pub(crate) fn spawn( config: RecvHandlerConfig, - ) -> (mpsc::Receiver, oneshot::Sender<()>) { + ) -> (mpsc::Receiver, oneshot::Sender<()>) { let (exit_sender, exit) = oneshot::channel(); let RecvHandlerConfig { filter_config, @@ -194,6 +210,16 @@ impl RecvHandler { Ok(p) => p, Err(e) => { debug!(error = ?e, "Packet decoding failed"); // could not decode the packet, drop it + let frame = UnrecognizedFrame { + src_address, + packet: recv_buffer[..length].to_vec(), + }; + self.handler + .send(RecvPacket::UnrecognizedFrame(frame)) + .await + .unwrap_or_else( + |err| warn!(error = %err, "Could not send unrecognized frame to handler"), + ); return; } }; @@ -222,7 +248,7 @@ impl RecvHandler { // send the filtered decoded packet to the handler. self.handler - .send(inbound) + .send(RecvPacket::Inbound(inbound)) .await .unwrap_or_else(|e| warn!(error = %e,"Could not send packet to handler")); }