Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/discv5.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::{
},
node_info::{NodeAddress, NodeContact},
service::{QueryKind, Service, ServiceRequest, TalkRequest},
socket::UnrecognizedFrame,
Config, Enr, IpMode,
};
use enr::{CombinedKey, EnrKey, Error as EnrError, NodeId};
Expand Down Expand Up @@ -78,6 +79,8 @@ pub enum Event {
SocketUpdated(SocketAddr),
/// A node has initiated a talk request.
TalkRequest(TalkRequest),
/// A received unrecognized frame.
UnrecognizedFrame(UnrecognizedFrame),
}

/// The main Discv5 Service struct. This provides the user-level API for performing queries and
Expand Down
29 changes: 26 additions & 3 deletions src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::{
packet::{ChallengeData, IdNonce, MessageNonce, Packet, PacketKind},
rpc::{Message, Request, RequestBody, RequestId, Response, ResponseBody},
socket,
socket::{FilterConfig, Socket},
socket::{FilterConfig, Socket, UnrecognizedFrame},
Enr, ProtocolIdentity,
};
use delay_map::HashMapDelay;
Expand Down Expand Up @@ -139,6 +139,8 @@ pub enum HandlerOut {
socket: SocketAddr,
node_id: NodeId,
},
/// A frame that could not be decoded as discv5.
UnrecognizedFrame(UnrecognizedFrame),
/// These sessions have expired from the cache.
ExpiredSessions(Vec<NodeAddress>),
}
Expand Down Expand Up @@ -273,6 +275,14 @@ impl Handler {
listen_sockets.push((ipv4, ipv4_port).into());
listen_sockets.push((ipv6, ipv6_port).into());
}
ListenConfig::FromSockets { ref ipv4, ref ipv6 } => {
if let Some(s) = ipv4 {
listen_sockets.push(s.local_addr().expect("socket must have local addr"));
}
if let Some(s) = ipv6 {
listen_sockets.push(s.local_addr().expect("socket must have local addr"));
}
}
};

let socket_config = socket::SocketConfig {
Expand Down Expand Up @@ -341,8 +351,21 @@ impl Handler {
HandlerIn::WhoAreYou(wru_ref, enr) => self.send_challenge(wru_ref, enr).await,
}
}
Some(inbound_packet) = self.socket.recv.recv() => {
self.process_inbound_packet(inbound_packet).await;
Some(incoming_packet) = self.socket.recv.recv() => {
match incoming_packet {
socket::RecvPacket::Inbound(inbound_packet) => {
self.process_inbound_packet(inbound_packet).await;
}
socket::RecvPacket::UnrecognizedFrame(frame) => {
if let Err(e) = self
.service_send
.send(HandlerOut::UnrecognizedFrame(frame))
.await
{
warn!(error = %e, "Failed to inform of unrecognized frame")
}
}
}
}
Some(Ok((node_address, active_request))) = self.active_requests.next() => {
self.handle_request_timeout(node_address, active_request).await;
Expand Down
6 changes: 6 additions & 0 deletions src/ipmode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ impl IpMode {
ListenConfig::Ipv4 { .. } => Ip4,
ListenConfig::Ipv6 { .. } => Ip6,
ListenConfig::DualStack { .. } => DualStack,
ListenConfig::FromSockets { ipv4, ipv6 } => match (ipv4, ipv6) {
(Some(_), Some(_)) => DualStack,
(Some(_), None) => Ip4,
(None, Some(_)) => Ip6,
(None, None) => Ip4,
},
}
}

Expand Down
3 changes: 3 additions & 0 deletions src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,9 @@ impl Service {
}
self.send_event(Event::UnverifiableEnr{enr, socket, node_id});
}
HandlerOut::UnrecognizedFrame(frame) => {
self.send_event(Event::UnrecognizedFrame(frame));
}
HandlerOut::ExpiredSessions(expired_sessions) => {
self.send_event(Event::SessionsExpired(expired_sessions));
}
Expand Down
30 changes: 28 additions & 2 deletions src/socket/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub use filter::{
rate_limiter::{RateLimiter, RateLimiterBuilder},
FilterConfig,
};
pub use recv::InboundPacket;
pub use recv::{InboundPacket, RecvPacket, UnrecognizedFrame};
pub use send::OutboundPacket;

/// Configuration for the sockets to listen on.
Expand All @@ -45,6 +45,11 @@ pub enum ListenConfig {
ipv6: Ipv6Addr,
ipv6_port: u16,
},
/// Use pre-created UDP sockets. This allows sharing sockets with other protocol handlers.
FromSockets {
ipv4: Option<Arc<UdpSocket>>,
ipv6: Option<Arc<UdpSocket>>,
},
}

/// Convenience objects for setting up the recv handler.
Expand All @@ -68,7 +73,7 @@ pub struct SocketConfig {
/// Creates the UDP socket and handles the exit futures for the send/recv UDP handlers.
pub struct Socket {
pub send: mpsc::Sender<OutboundPacket>,
pub recv: mpsc::Receiver<InboundPacket>,
pub recv: mpsc::Receiver<RecvPacket>,
sender_exit: Option<oneshot::Sender<()>>,
recv_exit: Option<oneshot::Sender<()>>,
}
Expand Down Expand Up @@ -133,6 +138,17 @@ impl Socket {
Some(ipv6_socket),
)
}
ListenConfig::FromSockets { ipv4, ipv6 } => match (ipv4, ipv6) {
(Some(v4), Some(v6)) => (v4.clone(), Some(v6.clone()), Some(v4), Some(v6)),
(Some(v4), None) => (v4.clone(), None, Some(v4), None),
(None, Some(v6)) => (v6.clone(), None, None, Some(v6)),
(None, None) => {
return Err(Error::new(
std::io::ErrorKind::InvalidInput,
"At least one socket must be provided",
))
}
},
};

// spawn the recv handler
Expand Down Expand Up @@ -219,6 +235,11 @@ impl ListenConfig {
ipv6,
ipv6_port,
},
ListenConfig::FromSockets { .. } => {
panic!(
"`with_ipv4` cannot be called on `FromSockets`; sockets are already provided"
)
}
}
}

Expand All @@ -245,6 +266,11 @@ impl ListenConfig {
ipv6: ip,
ipv6_port: port,
},
ListenConfig::FromSockets { .. } => {
panic!(
"`with_ipv6` cannot be called on `FromSockets`; sockets are already provided"
)
}
}
}
}
Expand Down
32 changes: 29 additions & 3 deletions src/socket/recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use tokio::{
use tracing::{debug, trace, warn};

/// The object sent back by the Recv handler.
#[derive(Debug)]
pub struct InboundPacket {
/// The originating socket addr.
pub src_address: SocketAddr,
Expand All @@ -25,6 +26,21 @@ pub struct InboundPacket {
pub authenticated_data: Vec<u8>,
}

/// An unrecognized frame received by the Recv handler.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UnrecognizedFrame {
pub src_address: SocketAddr,
pub packet: Vec<u8>,
}

/// Packet output produced by the recv handler.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum RecvPacket {
Inbound(InboundPacket),
UnrecognizedFrame(UnrecognizedFrame),
}

/// Convenience objects for setting up the recv handler.
pub struct RecvHandlerConfig {
pub filter_config: FilterConfig,
Expand Down Expand Up @@ -55,7 +71,7 @@ pub(crate) struct RecvHandler {
/// The protocol identity expected in received packets.
protocol_identity: ProtocolIdentity,
/// The channel to send the packet handler.
handler: mpsc::Sender<InboundPacket>,
handler: mpsc::Sender<RecvPacket>,
/// Exit channel to shutdown the recv handler.
exit: oneshot::Receiver<()>,
}
Expand All @@ -64,7 +80,7 @@ impl RecvHandler {
/// Spawns the `RecvHandler` on a provided executor.
pub(crate) fn spawn(
config: RecvHandlerConfig,
) -> (mpsc::Receiver<InboundPacket>, oneshot::Sender<()>) {
) -> (mpsc::Receiver<RecvPacket>, oneshot::Sender<()>) {
let (exit_sender, exit) = oneshot::channel();
let RecvHandlerConfig {
filter_config,
Expand Down Expand Up @@ -194,6 +210,16 @@ impl RecvHandler {
Ok(p) => p,
Err(e) => {
debug!(error = ?e, "Packet decoding failed"); // could not decode the packet, drop it
let frame = UnrecognizedFrame {
src_address,
packet: recv_buffer[..length].to_vec(),
};
self.handler
.send(RecvPacket::UnrecognizedFrame(frame))
.await
.unwrap_or_else(
|err| warn!(error = %err, "Could not send unrecognized frame to handler"),
);
return;
}
};
Expand Down Expand Up @@ -222,7 +248,7 @@ impl RecvHandler {

// send the filtered decoded packet to the handler.
self.handler
.send(inbound)
.send(RecvPacket::Inbound(inbound))
.await
.unwrap_or_else(|e| warn!(error = %e,"Could not send packet to handler"));
}
Expand Down
Loading