Skip to content
Merged
2 changes: 1 addition & 1 deletion src/codec/identity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ mod tests {
let bytes = [3u8; 64];
let mut bytes = BytesMut::from(&bytes[..]);

let decoded = codec.decode(&mut bytes);
assert!(codec.decode(&mut bytes).unwrap().is_none());
}

#[test]
Expand Down
97 changes: 87 additions & 10 deletions src/crypto/noise/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,8 @@ pub struct NoiseSocket<S: AsyncRead + AsyncWrite + Unpin> {
read_buffer: Vec<u8>,
canonical_max_read: usize,
decrypt_buffer: Option<Vec<u8>>,
peer: PeerId,
ty: HandshakeTransport,
}

impl<S: AsyncRead + AsyncWrite + Unpin> NoiseSocket<S> {
Expand All @@ -358,6 +360,8 @@ impl<S: AsyncRead + AsyncWrite + Unpin> NoiseSocket<S> {
noise: NoiseContext,
max_read_ahead_factor: usize,
max_write_buffer_size: usize,
peer: PeerId,
ty: HandshakeTransport,
) -> Self {
Self {
io,
Expand All @@ -380,6 +384,8 @@ impl<S: AsyncRead + AsyncWrite + Unpin> NoiseSocket<S> {
max_read: max_read_ahead_factor * MAX_NOISE_MSG_LEN,
},
canonical_max_read: max_read_ahead_factor * MAX_NOISE_MSG_LEN,
peer,
ty,
}
}

Expand Down Expand Up @@ -424,7 +430,13 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AsyncRead for NoiseSocket<S> {
},
};

tracing::trace!(target: LOG_TARGET, ?nread, "read data from socket");
tracing::trace!(
target: LOG_TARGET,
?nread,
ty = ?this.ty,
peer = ?this.peer,
"read data from socket"
);

this.nread += nread;
this.read_state = ReadState::ReadFrameLen;
Expand All @@ -433,13 +445,25 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AsyncRead for NoiseSocket<S> {
let mut remaining = match this.nread.checked_sub(this.offset) {
Some(remaining) => remaining,
None => {
tracing::error!(target: LOG_TARGET, "offset is larger than the number of bytes read");
tracing::error!(
target: LOG_TARGET,
ty = ?this.ty,
peer = ?this.peer,
nread = ?this.nread,
offset = ?this.offset,
"offset is larger than the number of bytes read"
);
return Poll::Ready(Err(io::ErrorKind::PermissionDenied.into()));
}
};

if remaining < 2 {
tracing::trace!(target: LOG_TARGET, "reset read buffer");
tracing::trace!(
target: LOG_TARGET,
ty = ?this.ty,
peer = ?this.peer,
"reset read buffer"
);
this.reset_read_state(remaining);
continue;
}
Expand All @@ -456,13 +480,20 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AsyncRead for NoiseSocket<S> {
}
};

tracing::trace!(target: LOG_TARGET, "current frame size = {frame_size}");
tracing::trace!(
target: LOG_TARGET,
ty = ?this.ty,
peer = ?this.peer,
"current frame size = {frame_size}"
);

if remaining < frame_size {
// `read_buffer` can fit the full frame size.
if this.nread + frame_size < this.canonical_max_read {
tracing::trace!(
target: LOG_TARGET,
ty = ?this.ty,
peer = ?this.peer,
max_size = ?this.canonical_max_read,
next_frame_size = ?(this.nread + frame_size),
"read buffer can fit the full frame",
Expand All @@ -475,7 +506,12 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AsyncRead for NoiseSocket<S> {
continue;
}

tracing::trace!(target: LOG_TARGET, "use auxiliary buffer extension");
tracing::trace!(
target: LOG_TARGET,
ty = ?this.ty,
peer = ?this.peer,
"use auxiliary buffer extension"
);

// use the auxiliary memory at the end of the read buffer for reading the
// frame
Expand All @@ -489,6 +525,8 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AsyncRead for NoiseSocket<S> {
if frame_size <= NOISE_EXTRA_ENCRYPT_SPACE {
tracing::error!(
target: LOG_TARGET,
ty = ?this.ty,
peer = ?this.peer,
?frame_size,
max_size = ?NOISE_EXTRA_ENCRYPT_SPACE,
"invalid frame size",
Expand Down Expand Up @@ -542,7 +580,16 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AsyncRead for NoiseSocket<S> {
buf,
) {
Err(error) => {
tracing::error!(target: LOG_TARGET, ?error, "failed to decrypt message");
tracing::error!(
target: LOG_TARGET,
ty = ?this.ty,
peer = ?this.peer,
buf_len = ?buf.len(),
frame_size = ?frame_size,
?error,
"failed to decrypt message"
);

return Poll::Ready(Err(io::ErrorKind::InvalidData.into()));
}
Ok(nread) => {
Expand All @@ -560,7 +607,16 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AsyncRead for NoiseSocket<S> {
&mut buffer,
) {
Err(error) => {
tracing::error!(target: LOG_TARGET, ?error, "failed to decrypt message");
tracing::error!(
target: LOG_TARGET,
ty = ?this.ty,
peer = ?this.peer,
buf_len = ?buf.len(),
frame_size = ?frame_size,
?error,
"failed to decrypt message for smaller buffer"
);

return Poll::Ready(Err(io::ErrorKind::InvalidData.into()));
}
Ok(nread) => {
Expand Down Expand Up @@ -605,7 +661,14 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AsyncWrite for NoiseSocket<S> {

match this.noise.write_message(chunk, &mut this.encrypt_buffer[offset + 2..]) {
Err(error) => {
tracing::error!(target: LOG_TARGET, ?error, "failed to encrypt message");
tracing::error!(
target: LOG_TARGET,
?error,
ty = ?this.ty,
peer = ?this.peer,
"failed to encrypt message"
);

return Poll::Ready(Err(io::ErrorKind::InvalidData.into()));
}
Ok(nwritten) => {
Expand Down Expand Up @@ -701,6 +764,15 @@ fn parse_and_verify_peer_id(
Ok(peer_id)
}

/// The type of the transport used for the crypto/noise protocol.
///
/// This is used for logging purposes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HandshakeTransport {
Tcp,
WebSocket,
}

/// Perform Noise handshake.
pub async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
mut io: S,
Expand All @@ -709,9 +781,10 @@ pub async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
max_read_ahead_factor: usize,
max_write_buffer_size: usize,
timeout: std::time::Duration,
ty: HandshakeTransport,
) -> Result<(NoiseSocket<S>, PeerId), NegotiationError> {
let handle_handshake = async move {
tracing::debug!(target: LOG_TARGET, ?role, "start noise handshake");
tracing::debug!(target: LOG_TARGET, ?role, ?ty, "start noise handshake");

let mut noise = NoiseContext::new(keypair, role)?;
let payload = match role {
Expand All @@ -727,7 +800,7 @@ pub async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
let payload = handshake_schema::NoiseHandshakePayload::decode(message)
.map_err(ParseError::from)
.map_err(|err| {
tracing::error!(target: LOG_TARGET, ?err, "failed to decode remote identity message");
tracing::error!(target: LOG_TARGET, ?err, ?ty, "failed to decode remote identity message");
err
})?;

Expand Down Expand Up @@ -764,6 +837,8 @@ pub async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
noise.into_transport()?,
max_read_ahead_factor,
max_write_buffer_size,
peer,
ty,
),
peer,
))
Expand Down Expand Up @@ -818,6 +893,7 @@ mod tests {
MAX_READ_AHEAD_FACTOR,
MAX_WRITE_BUFFER_SIZE,
std::time::Duration::from_secs(10),
HandshakeTransport::Tcp,
),
handshake(
io2,
Expand All @@ -826,6 +902,7 @@ mod tests {
MAX_READ_AHEAD_FACTOR,
MAX_WRITE_BUFFER_SIZE,
std::time::Duration::from_secs(10),
HandshakeTransport::Tcp,
)
);
let (mut res1, mut res2) = (res1.unwrap(), res2.unwrap());
Expand Down
4 changes: 2 additions & 2 deletions src/protocol/libp2p/kademlia/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ mod tests {
.collect::<Vec<_>>();

let target = Key::from(PeerId::random());
let mut iter = bucket.closest_iter(&target);
let iter = bucket.closest_iter(&target);
let mut prev = None;

for node in iter {
Expand Down Expand Up @@ -173,7 +173,7 @@ mod tests {
.collect::<Vec<_>>();

let target = Key::from(PeerId::random());
let mut iter = bucket.closest_iter(&target);
let iter = bucket.closest_iter(&target);
let mut prev = None;
let mut num_peers = 0usize;

Expand Down
5 changes: 5 additions & 0 deletions src/transport/tcp/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ impl TcpConnection {
max_read_ahead_factor,
max_write_buffer_size,
substream_open_timeout,
noise::HandshakeTransport::Tcp,
)
.await?;

Expand Down Expand Up @@ -1154,6 +1155,7 @@ mod tests {
5,
2,
std::time::Duration::from_secs(10),
noise::HandshakeTransport::Tcp,
)
.await
.unwrap();
Expand Down Expand Up @@ -1212,6 +1214,7 @@ mod tests {
5,
2,
std::time::Duration::from_secs(10),
noise::HandshakeTransport::Tcp,
)
.await
.unwrap();
Expand Down Expand Up @@ -1286,6 +1289,7 @@ mod tests {
5,
2,
std::time::Duration::from_secs(10),
noise::HandshakeTransport::Tcp,
)
.await
.unwrap();
Expand Down Expand Up @@ -1339,6 +1343,7 @@ mod tests {
5,
2,
std::time::Duration::from_secs(10),
noise::HandshakeTransport::Tcp,
)
.await
.unwrap();
Expand Down
6 changes: 6 additions & 0 deletions src/transport/websocket/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ impl WebSocketConnection {
max_read_ahead_factor,
max_write_buffer_size,
substream_open_timeout,
noise::HandshakeTransport::WebSocket,
)
.await?;

Expand Down Expand Up @@ -887,6 +888,7 @@ mod tests {
5,
2,
std::time::Duration::from_secs(10),
noise::HandshakeTransport::WebSocket,
)
.await
.unwrap();
Expand Down Expand Up @@ -1094,6 +1096,7 @@ mod tests {
5,
2,
std::time::Duration::from_secs(10),
noise::HandshakeTransport::WebSocket,
)
.await
.unwrap();
Expand Down Expand Up @@ -1162,6 +1165,7 @@ mod tests {
5,
2,
std::time::Duration::from_secs(10),
noise::HandshakeTransport::WebSocket,
)
.await
.unwrap();
Expand Down Expand Up @@ -1261,6 +1265,7 @@ mod tests {
5,
2,
std::time::Duration::from_secs(10),
noise::HandshakeTransport::WebSocket,
)
.await
.unwrap();
Expand Down Expand Up @@ -1320,6 +1325,7 @@ mod tests {
5,
2,
std::time::Duration::from_secs(10),
noise::HandshakeTransport::WebSocket,
)
.await
.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion tests/conformance/rust/kademlia.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use libp2p::{
identify, identity,
kad::{
self, store::RecordStore, AddProviderOk, GetProvidersOk, InboundRequest,
KademliaEvent as Libp2pKademliaEvent, QueryResult, RecordKey as Libp2pRecordKey,
KademliaEvent as Libp2pKademliaEvent, QueryResult,
},
swarm::{keep_alive, AddressScore, NetworkBehaviour, SwarmBuilder, SwarmEvent},
PeerId, Swarm,
Expand Down
Loading