Skip to content

Commit afd0928

Browse files
committed
simplify rpc codec logic
1 parent bee4526 commit afd0928

File tree

5 files changed

+257
-491
lines changed

5 files changed

+257
-491
lines changed

beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs renamed to beacon_node/lighthouse_network/src/rpc/codec.rs

Lines changed: 246 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
use crate::rpc::methods::*;
2-
use crate::rpc::{
3-
codec::base::OutboundCodec,
4-
protocol::{Encoding, ProtocolId, RPCError, SupportedProtocol, ERROR_TYPE_MAX, ERROR_TYPE_MIN},
2+
use crate::rpc::protocol::{
3+
Encoding, ProtocolId, RPCError, SupportedProtocol, ERROR_TYPE_MAX, ERROR_TYPE_MIN,
54
};
65
use crate::rpc::{InboundRequest, OutboundRequest};
6+
use libp2p::bytes::BufMut;
77
use libp2p::bytes::BytesMut;
88
use snap::read::FrameDecoder;
99
use snap::write::FrameEncoder;
@@ -57,13 +57,13 @@ impl<E: EthSpec> SSZSnappyInboundCodec<E> {
5757
max_packet_size,
5858
}
5959
}
60-
}
6160

62-
// Encoder for inbound streams: Encodes RPC Responses sent to peers.
63-
impl<E: EthSpec> Encoder<RPCCodedResponse<E>> for SSZSnappyInboundCodec<E> {
64-
type Error = RPCError;
65-
66-
fn encode(&mut self, item: RPCCodedResponse<E>, dst: &mut BytesMut) -> Result<(), Self::Error> {
61+
/// Encode an Rpc response to be sent.
62+
fn encode_response(
63+
&mut self,
64+
item: RPCCodedResponse<E>,
65+
dst: &mut BytesMut,
66+
) -> Result<(), RPCError> {
6767
let bytes = match &item {
6868
RPCCodedResponse::Success(resp) => match &resp {
6969
RPCResponse::Status(res) => res.as_ssz_bytes(),
@@ -124,6 +124,21 @@ impl<E: EthSpec> Encoder<RPCCodedResponse<E>> for SSZSnappyInboundCodec<E> {
124124
}
125125
}
126126

127+
// Encoder for inbound streams: Encodes RPC Responses sent to peers.
128+
impl<E: EthSpec> Encoder<RPCCodedResponse<E>> for SSZSnappyInboundCodec<E> {
129+
type Error = RPCError;
130+
131+
fn encode(&mut self, item: RPCCodedResponse<E>, dst: &mut BytesMut) -> Result<(), Self::Error> {
132+
dst.clear();
133+
dst.reserve(1);
134+
dst.put_u8(
135+
item.as_u8()
136+
.expect("Should never encode a stream termination"),
137+
);
138+
self.encode_response(item, dst)
139+
}
140+
}
141+
127142
// Decoder for inbound streams: Decodes RPC requests from peers
128143
impl<E: EthSpec> Decoder for SSZSnappyInboundCodec<E> {
129144
type Item = InboundRequest<E>;
@@ -184,6 +199,8 @@ pub struct SSZSnappyOutboundCodec<E: EthSpec> {
184199
/// The fork name corresponding to the received context bytes.
185200
fork_name: Option<ForkName>,
186201
fork_context: Arc<ForkContext>,
202+
/// Keeps track of the current response code for a chunk.
203+
current_response_code: Option<u8>,
187204
phantom: PhantomData<E>,
188205
}
189206

@@ -205,66 +222,12 @@ impl<E: EthSpec> SSZSnappyOutboundCodec<E> {
205222
fork_name: None,
206223
fork_context,
207224
phantom: PhantomData,
225+
current_response_code: None,
208226
}
209227
}
210-
}
211-
212-
// Encoder for outbound streams: Encodes RPC Requests to peers
213-
impl<E: EthSpec> Encoder<OutboundRequest<E>> for SSZSnappyOutboundCodec<E> {
214-
type Error = RPCError;
215-
216-
fn encode(&mut self, item: OutboundRequest<E>, dst: &mut BytesMut) -> Result<(), Self::Error> {
217-
let bytes = match item {
218-
OutboundRequest::Status(req) => req.as_ssz_bytes(),
219-
OutboundRequest::Goodbye(req) => req.as_ssz_bytes(),
220-
OutboundRequest::BlocksByRange(r) => match r {
221-
OldBlocksByRangeRequest::V1(req) => req.as_ssz_bytes(),
222-
OldBlocksByRangeRequest::V2(req) => req.as_ssz_bytes(),
223-
},
224-
OutboundRequest::BlocksByRoot(r) => match r {
225-
BlocksByRootRequest::V1(req) => req.block_roots.as_ssz_bytes(),
226-
BlocksByRootRequest::V2(req) => req.block_roots.as_ssz_bytes(),
227-
},
228-
OutboundRequest::BlobsByRange(req) => req.as_ssz_bytes(),
229-
OutboundRequest::BlobsByRoot(req) => req.blob_ids.as_ssz_bytes(),
230-
OutboundRequest::DataColumnsByRange(req) => req.as_ssz_bytes(),
231-
OutboundRequest::DataColumnsByRoot(req) => req.data_column_ids.as_ssz_bytes(),
232-
OutboundRequest::Ping(req) => req.as_ssz_bytes(),
233-
OutboundRequest::MetaData(_) => return Ok(()), // no metadata to encode
234-
};
235-
// SSZ encoded bytes should be within `max_packet_size`
236-
if bytes.len() > self.max_packet_size {
237-
return Err(RPCError::InternalError(
238-
"attempting to encode data > max_packet_size",
239-
));
240-
}
241228

242-
// Inserts the length prefix of the uncompressed bytes into dst
243-
// encoded as a unsigned varint
244-
self.inner
245-
.encode(bytes.len(), dst)
246-
.map_err(RPCError::from)?;
247-
248-
let mut writer = FrameEncoder::new(Vec::new());
249-
writer.write_all(&bytes).map_err(RPCError::from)?;
250-
writer.flush().map_err(RPCError::from)?;
251-
252-
// Write compressed bytes to `dst`
253-
dst.extend_from_slice(writer.get_ref());
254-
Ok(())
255-
}
256-
}
257-
258-
// Decoder for outbound streams: Decodes RPC responses from peers.
259-
//
260-
// The majority of the decoding has now been pushed upstream due to the changing specification.
261-
// We prefer to decode blocks and attestations with extra knowledge about the chain to perform
262-
// faster verification checks before decoding entire blocks/attestations.
263-
impl<E: EthSpec> Decoder for SSZSnappyOutboundCodec<E> {
264-
type Item = RPCResponse<E>;
265-
type Error = RPCError;
266-
267-
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
229+
// Decode an Rpc response.
230+
fn decode_response(&mut self, src: &mut BytesMut) -> Result<Option<RPCResponse<E>>, RPCError> {
268231
// Read the context bytes if required
269232
if self.protocol.has_context_bytes() && self.fork_name.is_none() {
270233
if src.len() >= CONTEXT_BYTES_LEN {
@@ -314,15 +277,8 @@ impl<E: EthSpec> Decoder for SSZSnappyOutboundCodec<E> {
314277
Err(e) => handle_error(e, reader.get_ref().get_ref().position(), max_compressed_len),
315278
}
316279
}
317-
}
318-
319-
impl<E: EthSpec> OutboundCodec<OutboundRequest<E>> for SSZSnappyOutboundCodec<E> {
320-
type CodecErrorType = ErrorType;
321280

322-
fn decode_error(
323-
&mut self,
324-
src: &mut BytesMut,
325-
) -> Result<Option<Self::CodecErrorType>, RPCError> {
281+
fn decode_error(&mut self, src: &mut BytesMut) -> Result<Option<ErrorType>, RPCError> {
326282
let Some(length) = handle_length(&mut self.inner, &mut self.len, src)? else {
327283
return Ok(None);
328284
};
@@ -357,6 +313,95 @@ impl<E: EthSpec> OutboundCodec<OutboundRequest<E>> for SSZSnappyOutboundCodec<E>
357313
}
358314
}
359315

316+
// Encoder for outbound streams: Encodes RPC Requests to peers
317+
impl<E: EthSpec> Encoder<OutboundRequest<E>> for SSZSnappyOutboundCodec<E> {
318+
type Error = RPCError;
319+
320+
fn encode(&mut self, item: OutboundRequest<E>, dst: &mut BytesMut) -> Result<(), Self::Error> {
321+
let bytes = match item {
322+
OutboundRequest::Status(req) => req.as_ssz_bytes(),
323+
OutboundRequest::Goodbye(req) => req.as_ssz_bytes(),
324+
OutboundRequest::BlocksByRange(r) => match r {
325+
OldBlocksByRangeRequest::V1(req) => req.as_ssz_bytes(),
326+
OldBlocksByRangeRequest::V2(req) => req.as_ssz_bytes(),
327+
},
328+
OutboundRequest::BlocksByRoot(r) => match r {
329+
BlocksByRootRequest::V1(req) => req.block_roots.as_ssz_bytes(),
330+
BlocksByRootRequest::V2(req) => req.block_roots.as_ssz_bytes(),
331+
},
332+
OutboundRequest::BlobsByRange(req) => req.as_ssz_bytes(),
333+
OutboundRequest::BlobsByRoot(req) => req.blob_ids.as_ssz_bytes(),
334+
OutboundRequest::DataColumnsByRange(req) => req.as_ssz_bytes(),
335+
OutboundRequest::DataColumnsByRoot(req) => req.data_column_ids.as_ssz_bytes(),
336+
OutboundRequest::Ping(req) => req.as_ssz_bytes(),
337+
OutboundRequest::MetaData(_) => return Ok(()), // no metadata to encode
338+
};
339+
// SSZ encoded bytes should be within `max_packet_size`
340+
if bytes.len() > self.max_packet_size {
341+
return Err(RPCError::InternalError(
342+
"attempting to encode data > max_packet_size",
343+
));
344+
}
345+
346+
// Inserts the length prefix of the uncompressed bytes into dst
347+
// encoded as a unsigned varint
348+
self.inner
349+
.encode(bytes.len(), dst)
350+
.map_err(RPCError::from)?;
351+
352+
let mut writer = FrameEncoder::new(Vec::new());
353+
writer.write_all(&bytes).map_err(RPCError::from)?;
354+
writer.flush().map_err(RPCError::from)?;
355+
356+
// Write compressed bytes to `dst`
357+
dst.extend_from_slice(writer.get_ref());
358+
Ok(())
359+
}
360+
}
361+
362+
// Decoder for outbound streams: Decodes RPC responses from peers.
363+
//
364+
// The majority of the decoding has now been pushed upstream due to the changing specification.
365+
// We prefer to decode blocks and attestations with extra knowledge about the chain to perform
366+
// faster verification checks before decoding entire blocks/attestations.
367+
impl<E: EthSpec> Decoder for SSZSnappyOutboundCodec<E> {
368+
type Item = RPCCodedResponse<E>;
369+
type Error = RPCError;
370+
371+
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
372+
// if we have only received the response code, wait for more bytes
373+
if src.len() <= 1 {
374+
return Ok(None);
375+
}
376+
// using the response code determine which kind of payload needs to be decoded.
377+
let response_code = self.current_response_code.unwrap_or_else(|| {
378+
let resp_code = src.split_to(1)[0];
379+
self.current_response_code = Some(resp_code);
380+
resp_code
381+
});
382+
383+
let inner_result = {
384+
if RPCCodedResponse::<E>::is_response(response_code) {
385+
// decode an actual response and mutates the buffer if enough bytes have been read
386+
// returning the result.
387+
self.decode_response(src)
388+
.map(|r| r.map(RPCCodedResponse::Success))
389+
} else {
390+
// decode an error
391+
self.decode_error(src)
392+
.map(|r| r.map(|resp| RPCCodedResponse::from_error(response_code, resp)))
393+
}
394+
};
395+
// if the inner decoder was capable of decoding a chunk, we need to reset the current
396+
// response code for the next chunk
397+
if let Ok(Some(_)) = inner_result {
398+
self.current_response_code = None;
399+
}
400+
// return the result
401+
inner_result
402+
}
403+
}
404+
360405
/// Handle errors that we get from decoding an RPC message from the stream.
361406
/// `num_bytes_read` is the number of bytes the snappy decoder has read from the underlying stream.
362407
/// `max_compressed_len` is the maximum compressed size for a given uncompressed size.
@@ -999,7 +1044,7 @@ mod tests {
9991044
let mut snappy_inbound_codec =
10001045
SSZSnappyInboundCodec::<Spec>::new(snappy_protocol_id, max_packet_size, fork_context);
10011046

1002-
snappy_inbound_codec.encode(message, &mut buf)?;
1047+
snappy_inbound_codec.encode_response(message, &mut buf)?;
10031048
Ok(buf)
10041049
}
10051050

@@ -1044,7 +1089,7 @@ mod tests {
10441089
let mut snappy_outbound_codec =
10451090
SSZSnappyOutboundCodec::<Spec>::new(snappy_protocol_id, max_packet_size, fork_context);
10461091
// decode message just as snappy message
1047-
snappy_outbound_codec.decode(message)
1092+
snappy_outbound_codec.decode_response(message)
10481093
}
10491094

10501095
/// Encodes the provided protocol message as bytes and tries to decode the encoding bytes.
@@ -1805,4 +1850,129 @@ mod tests {
18051850
RPCError::InvalidData(_)
18061851
));
18071852
}
1853+
1854+
#[test]
1855+
fn test_decode_status_message() {
1856+
let message = hex::decode("0054ff060000734e615070590032000006e71e7b54989925efd6c9cbcb8ceb9b5f71216f5137282bf6a1e3b50f64e42d6c7fb347abe07eb0db8200000005029e2800").unwrap();
1857+
let mut buf = BytesMut::new();
1858+
buf.extend_from_slice(&message);
1859+
1860+
let snappy_protocol_id = ProtocolId::new(SupportedProtocol::StatusV1, Encoding::SSZSnappy);
1861+
1862+
let fork_context = Arc::new(fork_context(ForkName::Base));
1863+
1864+
let chain_spec = Spec::default_spec();
1865+
1866+
let mut snappy_outbound_codec = SSZSnappyOutboundCodec::<Spec>::new(
1867+
snappy_protocol_id,
1868+
max_rpc_size(&fork_context, chain_spec.max_chunk_size as usize),
1869+
fork_context,
1870+
);
1871+
1872+
// remove response code
1873+
let mut snappy_buf = buf.clone();
1874+
let _ = snappy_buf.split_to(1);
1875+
1876+
// decode message just as snappy message
1877+
let _snappy_decoded_message = snappy_outbound_codec
1878+
.decode_response(&mut snappy_buf)
1879+
.unwrap();
1880+
1881+
// decode message as ssz snappy chunk
1882+
let _snappy_decoded_chunk = snappy_outbound_codec.decode(&mut buf).unwrap();
1883+
}
1884+
1885+
#[test]
1886+
fn test_invalid_length_prefix() {
1887+
let mut uvi_codec: Uvi<u128> = Uvi::default();
1888+
let mut dst = BytesMut::with_capacity(1024);
1889+
1890+
// Smallest > 10 byte varint
1891+
let len: u128 = 2u128.pow(70);
1892+
1893+
// Insert length-prefix
1894+
uvi_codec.encode(len, &mut dst).unwrap();
1895+
1896+
let snappy_protocol_id = ProtocolId::new(SupportedProtocol::StatusV1, Encoding::SSZSnappy);
1897+
1898+
let fork_context = Arc::new(fork_context(ForkName::Base));
1899+
1900+
let chain_spec = Spec::default_spec();
1901+
1902+
let mut snappy_outbound_codec = SSZSnappyOutboundCodec::<Spec>::new(
1903+
snappy_protocol_id,
1904+
max_rpc_size(&fork_context, chain_spec.max_chunk_size as usize),
1905+
fork_context,
1906+
);
1907+
1908+
let snappy_decoded_message = snappy_outbound_codec.decode_response(&mut dst).unwrap_err();
1909+
1910+
assert_eq!(
1911+
snappy_decoded_message,
1912+
RPCError::IoError("input bytes exceed maximum".to_string()),
1913+
"length-prefix of > 10 bytes is invalid"
1914+
);
1915+
}
1916+
1917+
#[test]
1918+
fn test_length_limits() {
1919+
fn encode_len(len: usize) -> BytesMut {
1920+
let mut uvi_codec: Uvi<usize> = Uvi::default();
1921+
let mut dst = BytesMut::with_capacity(1024);
1922+
uvi_codec.encode(len, &mut dst).unwrap();
1923+
dst
1924+
}
1925+
1926+
let protocol_id = ProtocolId::new(SupportedProtocol::BlocksByRangeV1, Encoding::SSZSnappy);
1927+
1928+
// Response limits
1929+
let fork_context = Arc::new(fork_context(ForkName::Base));
1930+
1931+
let chain_spec = Spec::default_spec();
1932+
1933+
let max_rpc_size = max_rpc_size(&fork_context, chain_spec.max_chunk_size as usize);
1934+
let limit = protocol_id.rpc_response_limits::<Spec>(&fork_context);
1935+
let mut max = encode_len(limit.max + 1);
1936+
let mut codec = SSZSnappyOutboundCodec::<Spec>::new(
1937+
protocol_id.clone(),
1938+
max_rpc_size,
1939+
fork_context.clone(),
1940+
);
1941+
assert!(matches!(
1942+
codec.decode_response(&mut max).unwrap_err(),
1943+
RPCError::InvalidData(_)
1944+
));
1945+
1946+
let mut min = encode_len(limit.min - 1);
1947+
let mut codec = SSZSnappyOutboundCodec::<Spec>::new(
1948+
protocol_id.clone(),
1949+
max_rpc_size,
1950+
fork_context.clone(),
1951+
);
1952+
assert!(matches!(
1953+
codec.decode_response(&mut min).unwrap_err(),
1954+
RPCError::InvalidData(_)
1955+
));
1956+
1957+
// Request limits
1958+
let limit = protocol_id.rpc_request_limits(&fork_context.spec);
1959+
let mut max = encode_len(limit.max + 1);
1960+
let mut codec = SSZSnappyOutboundCodec::<Spec>::new(
1961+
protocol_id.clone(),
1962+
max_rpc_size,
1963+
fork_context.clone(),
1964+
);
1965+
assert!(matches!(
1966+
codec.decode_response(&mut max).unwrap_err(),
1967+
RPCError::InvalidData(_)
1968+
));
1969+
1970+
let mut min = encode_len(limit.min - 1);
1971+
let mut codec =
1972+
SSZSnappyOutboundCodec::<Spec>::new(protocol_id, max_rpc_size, fork_context);
1973+
assert!(matches!(
1974+
codec.decode_response(&mut min).unwrap_err(),
1975+
RPCError::InvalidData(_)
1976+
));
1977+
}
18081978
}

0 commit comments

Comments
 (0)