Skip to content

Commit

Permalink
Final changes for the 1.0.0 release:
Browse files Browse the repository at this point in the history
* made sidear only connectable to nodes > 1.5.2
* removed switching deserialization code to work with the backwards incompatible change introduced in 1.2.0
  • Loading branch information
Jakub Zajkowski committed Oct 6, 2023
1 parent 38e7247 commit 36ca2dd
Show file tree
Hide file tree
Showing 17 changed files with 322 additions and 562 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,6 @@ members = [
"listener",
"types"
]

[workspace.dependencies]
once_cell = "1.18.0"
10 changes: 5 additions & 5 deletions USAGE.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ Here is an example of how the stream might look like if the node went offline fo
```
curl -sN http://127.0.0.1:19999/events/deploys

data:{"ApiVersion":"1.4.7"}
data:{"ApiVersion":"1.5.2"}

data:{"BlockAdded":{"block_hash":"b487aae22b406e303d96fc44b092f993df6f3b43ceee7b7f5b1f361f676492d6","block":{"hash":"b487aae22b406e303d96fc44b092f993df6f3b43ceee7b7f5b1f361f676492d6","header":{"parent_hash":"4a28718301a83a43563ec42a184294725b8dd188aad7a9fceb8a2fa1400c680e","state_root_hash":"63274671f2a860e39bb029d289e688526e4828b70c79c678649748e5e376cb07","body_hash":"6da90c09f3fc4559d27b9fff59ab2453be5752260b07aec65e0e3a61734f656a","random_bit":true,"accumulated_seed":"c8b4f30a3e3e082f4f206f972e423ffb23d152ca34241ff94ba76189716b61da","era_end":{"era_report":{"equivocators":[],"rewards":{"01026ca707c348ed8012ac6a1f28db031fadd6eb67203501a353b867a08c8b9a80":1559401400039,"010427c1d1227c9d2aafe8c06c6e6b276da8dcd8fd170ca848b8e3e8e1038a6dc8":25895190891},"inactive_validators":[]},"next_era_validator_weights":{"01026ca707c348ed8012ac6a1f28db031fadd6eb67203501a353b867a08c8b9a80":"50538244651768072","010427c1d1227c9d2aafe8c06c6e6b276da8dcd8fd170ca848b8e3e8e1038a6dc8":"839230678448335"}},"timestamp":"2021-04-08T05:14:14.912Z","era_id":90,"height":1679394427512,"protocol_version":"1.0.0"},"body":{"proposer":"012bac1d0ff9240ff0b7b06d555815640497861619ca12583ddef434885416e69b","deploy_hashes":[],"transfer_hashes":[]}}}}
id:1
Expand All @@ -130,7 +130,7 @@ id:2

:

data:{"ApiVersion":"1.4.8"}
data:{"ApiVersion":"1.5.2"}

data:{"BlockAdded":{"block_hash":"1c76e7abf5780b49d3a66beef7b75bbf261834f494dededb8f2e349735659c03","block":{"hash":"1c76e7abf5780b49d3a66beef7b75bbf261834f494dededb8f2e349735659c03","header":{"parent_hash":"4a28718301a83a43563ec42a184294725b8dd188aad7a9fceb8a2fa1400c680e","state_root_hash":"63274671f2a860e39bb029d289e688526e4828b70c79c678649748e5e376cb07","body_hash":"6da90c09f3fc4559d27b9fff59ab2453be5752260b07aec65e0e3a61734f656a","random_bit":true,"accumulated_seed":"c8b4f30a3e3e082f4f206f972e423ffb23d152ca34241ff94ba76189716b61da","era_end":{"era_report":{"equivocators":[],"rewards":[{"validator":"01026ca707c348ed8012ac6a1f28db031fadd6eb67203501a353b867a08c8b9a80","amount":1559401400039},{"validator":"010427c1d1227c9d2aafe8c06c6e6b276da8dcd8fd170ca848b8e3e8e1038a6dc8","amount":25895190891}],"inactive_validators":[]},"next_era_validator_weights":[{"validator":"01026ca707c348ed8012ac6a1f28db031fadd6eb67203501a353b867a08c8b9a80","weight":"50538244651768072"},{"validator":"010427c1d1227c9d2aafe8c06c6e6b276da8dcd8fd170ca848b8e3e8e1038a6dc8","weight":"839230678448335"}]},"timestamp":"2021-04-08T05:14:14.912Z","era_id":90,"height":1679394457791,"protocol_version":"1.0.0"},"body":{"proposer":"012bac1d0ff9240ff0b7b06d555815640497861619ca12583ddef434885416e69b","deploy_hashes":[],"transfer_hashes":[]},"proofs":[]}}}
id:3
Expand Down Expand Up @@ -184,7 +184,7 @@ curl -s http://127.0.0.1:18888/block
<summary><b>Sample output</b></summary>

```json
{"block_hash":"95b0d7b7e94eb79a7d2c79f66e2324474fc8f54536b9e6b447413fa6d00c2581","block":{"hash":"95b0d7b7e94eb79a7d2c79f66e2324474fc8f54536b9e6b447413fa6d00c2581","header":{"parent_hash":"48a99605ed4d1b27f9ddf8a1a0819c576bec57dd7a1b105247e48a5165b4194b","state_root_hash":"8d439b84b62e0a30f8e115047ce31c5ddeb30bd46eba3de9715412c2979be26e","body_hash":"b34c6c6ea69669597578a1912548ef823f627fe667ddcdb6bcd000acd27c7a2f","random_bit":true,"accumulated_seed":"058b14c76832b32e8cd00750e767c60f407fb13b3b0c1e63aea2d6526202924d","era_end":null,"timestamp":"2022-11-20T12:44:22.912Z","era_id":7173,"height":1277846,"protocol_version":"1.4.8"},"body":{"proposer":"0169e1552a97843ff2ef4318e8a028a9f4ed0c16b3d96f6a6eee21e6ca0d4022bc","deploy_hashes":[],"transfer_hashes":["d2193e27d6f269a6f4e0ede0cca805baa861d553df8c9f438cc7af56acf40c2b"]},"proofs":[]}}
{"block_hash":"95b0d7b7e94eb79a7d2c79f66e2324474fc8f54536b9e6b447413fa6d00c2581","block":{"hash":"95b0d7b7e94eb79a7d2c79f66e2324474fc8f54536b9e6b447413fa6d00c2581","header":{"parent_hash":"48a99605ed4d1b27f9ddf8a1a0819c576bec57dd7a1b105247e48a5165b4194b","state_root_hash":"8d439b84b62e0a30f8e115047ce31c5ddeb30bd46eba3de9715412c2979be26e","body_hash":"b34c6c6ea69669597578a1912548ef823f627fe667ddcdb6bcd000acd27c7a2f","random_bit":true,"accumulated_seed":"058b14c76832b32e8cd00750e767c60f407fb13b3b0c1e63aea2d6526202924d","era_end":null,"timestamp":"2022-11-20T12:44:22.912Z","era_id":7173,"height":1277846,"protocol_version":"1.5.2"},"body":{"proposer":"0169e1552a97843ff2ef4318e8a028a9f4ed0c16b3d96f6a6eee21e6ca0d4022bc","deploy_hashes":[],"transfer_hashes":["d2193e27d6f269a6f4e0ede0cca805baa861d553df8c9f438cc7af56acf40c2b"]},"proofs":[]}}
```
</details>
<br></br>
Expand All @@ -206,7 +206,7 @@ curl -s http://127.0.0.1:18888/block/96a989a7f4514909b442faba3acbf643378fb7f57f9
<summary><b>Sample output</b></summary>

```json
{"block_hash":"96a989a7f4514909b442faba3acbf643378fb7f57f9c9e32013fdfad64e3c8a5","block":{"hash":"96a989a7f4514909b442faba3acbf643378fb7f57f9c9e32013fdfad64e3c8a5","header":{"parent_hash":"8f29120995ae6942d1a48cc4ac8dc3be5de5886f1fb53140356c907f1a70d7ef","state_root_hash":"c8964dddfe3660f481f750c5acd776fe7e08c1e168a4184707d07da6bac5397c","body_hash":"31984faf50cfb2b96774e388a16407cbf362b66d22e1d55201cc0709fa3e1803","random_bit":false,"accumulated_seed":"5ce60583fc1a8b3da07900b7223636eadd97ea8eef6abec28cdbe4b3326c1d6c","era_end":null,"timestamp":"2022-11-20T18:36:05.504Z","era_id":7175,"height":1278485,"protocol_version":"1.4.8"},"body":{"proposer":"017de9688caedd0718baed968179ddbe0b0532a8ef0a9a1cb9dfabe9b0f6016fa8","deploy_hashes":[],"transfer_hashes":[]},"proofs":[]}}
{"block_hash":"96a989a7f4514909b442faba3acbf643378fb7f57f9c9e32013fdfad64e3c8a5","block":{"hash":"96a989a7f4514909b442faba3acbf643378fb7f57f9c9e32013fdfad64e3c8a5","header":{"parent_hash":"8f29120995ae6942d1a48cc4ac8dc3be5de5886f1fb53140356c907f1a70d7ef","state_root_hash":"c8964dddfe3660f481f750c5acd776fe7e08c1e168a4184707d07da6bac5397c","body_hash":"31984faf50cfb2b96774e388a16407cbf362b66d22e1d55201cc0709fa3e1803","random_bit":false,"accumulated_seed":"5ce60583fc1a8b3da07900b7223636eadd97ea8eef6abec28cdbe4b3326c1d6c","era_end":null,"timestamp":"2022-11-20T18:36:05.504Z","era_id":7175,"height":1278485,"protocol_version":"1.5.2"},"body":{"proposer":"017de9688caedd0718baed968179ddbe0b0532a8ef0a9a1cb9dfabe9b0f6016fa8","deploy_hashes":[],"transfer_hashes":[]},"proofs":[]}}
```
</details>
<br></br>
Expand All @@ -227,7 +227,7 @@ curl -s http://127.0.0.1:18888/block/1278485
<summary><b>Sample output</b></summary>

```json
{"block_hash":"96a989a7f4514909b442faba3acbf643378fb7f57f9c9e32013fdfad64e3c8a5","block":{"hash":"96a989a7f4514909b442faba3acbf643378fb7f57f9c9e32013fdfad64e3c8a5","header":{"parent_hash":"8f29120995ae6942d1a48cc4ac8dc3be5de5886f1fb53140356c907f1a70d7ef","state_root_hash":"c8964dddfe3660f481f750c5acd776fe7e08c1e168a4184707d07da6bac5397c","body_hash":"31984faf50cfb2b96774e388a16407cbf362b66d22e1d55201cc0709fa3e1803","random_bit":false,"accumulated_seed":"5ce60583fc1a8b3da07900b7223636eadd97ea8eef6abec28cdbe4b3326c1d6c","era_end":null,"timestamp":"2022-11-20T18:36:05.504Z","era_id":7175,"height":1278485,"protocol_version":"1.4.8"},"body":{"proposer":"017de9688caedd0718baed968179ddbe0b0532a8ef0a9a1cb9dfabe9b0f6016fa8","deploy_hashes":[],"transfer_hashes":[]},"proofs":[]}}
{"block_hash":"96a989a7f4514909b442faba3acbf643378fb7f57f9c9e32013fdfad64e3c8a5","block":{"hash":"96a989a7f4514909b442faba3acbf643378fb7f57f9c9e32013fdfad64e3c8a5","header":{"parent_hash":"8f29120995ae6942d1a48cc4ac8dc3be5de5886f1fb53140356c907f1a70d7ef","state_root_hash":"c8964dddfe3660f481f750c5acd776fe7e08c1e168a4184707d07da6bac5397c","body_hash":"31984faf50cfb2b96774e388a16407cbf362b66d22e1d55201cc0709fa3e1803","random_bit":false,"accumulated_seed":"5ce60583fc1a8b3da07900b7223636eadd97ea8eef6abec28cdbe4b3326c1d6c","era_end":null,"timestamp":"2022-11-20T18:36:05.504Z","era_id":7175,"height":1278485,"protocol_version":"1.5.2"},"body":{"proposer":"017de9688caedd0718baed968179ddbe0b0532a8ef0a9a1cb9dfabe9b0f6016fa8","deploy_hashes":[],"transfer_hashes":[]},"proofs":[]}}
```
</details>
<br></br>
Expand Down
1 change: 1 addition & 0 deletions listener/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ tokio-stream = { version = "0.1.4", features = ["sync"] }
tokio-util = "0.7.8"
tracing = "0.1"
url = "2.3.1"
once_cell = {workspace = true}

[dev-dependencies]
casper-event-types = { path = "../types", version = "1.0.0", features = ["sse-data-testing"]}
Expand Down
166 changes: 8 additions & 158 deletions listener/src/connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@ use crate::{
use anyhow::Error;
use casper_event_types::{
metrics,
sse_data::{deserialize, SseData, SseDataDeserializeError},
sse_data::{deserialize, SseData},
Filter,
};
use casper_types::ProtocolVersion;
use eventsource_stream::Event;
use reqwest::Url;
use std::{
Expand All @@ -23,7 +22,6 @@ use tokio::{
use tokio_util::sync::CancellationToken;
use tracing::{error, trace, warn};

type DeserializationFn = fn(&str) -> Result<(SseData, bool), SseDataDeserializeError>;
const FETCHING_FROM_STREAM_FAILED: &str = "fetching_from_stream_failed";
const DESERIALIZATION_ERROR: &str = "deserialization_error";
const EVENT_WITHOUT_ID: &str = "event_without_id";
Expand All @@ -40,7 +38,6 @@ pub(super) struct ConnectionManager {
sse_event_sender: Sender<SseEvent>,
maybe_tasks: Option<ConnectionTasks>,
filter: Filter,
deserialization_fn: DeserializationFn,
current_event_id_sender: Sender<(Filter, u32)>,
cancellation_token: CancellationToken,
}
Expand Down Expand Up @@ -84,9 +81,6 @@ pub struct ConnectionManagerBuilder {
pub(super) start_from_event_id: Option<u32>,
/// Nodes filter to which we are connected
pub(super) filter: Filter,
/// Build version of the node. It's necessary because after 1.2 non-backwards compatible changes to the event
/// structure were introduced and we need to apply different deserialization logic
pub(super) node_build_version: ProtocolVersion,
/// Channel via which we inform that this filter observed a specific event_id so the ConnectionListener can give
/// a correct start_from_event_id parameter in case of a connection restart
pub(super) current_event_id_sender: Sender<(Filter, u32)>,
Expand All @@ -111,7 +105,6 @@ impl ConnectionManagerBuilder {
sse_event_sender: self.sse_data_sender,
maybe_tasks: self.maybe_tasks,
filter: self.filter,
deserialization_fn: determine_deserializer(self.node_build_version),
current_event_id_sender: self.current_event_id_sender,
cancellation_token: self.cancellation_token,
}
Expand Down Expand Up @@ -214,7 +207,7 @@ impl ConnectionManager {
}

async fn handle_event(&mut self, event: Event) -> Result<(), Error> {
match (self.deserialization_fn)(&event.data) {
match deserialize(&event.data) {
Err(serde_error) => {
count_error(DESERIALIZATION_ERROR);
let error_message = format!("Serde Error: {}", serde_error);
Expand Down Expand Up @@ -358,15 +351,6 @@ fn expected_first_message_to_be_api_version(data: String) -> ConnectionManagerEr
)))
}

fn determine_deserializer(node_build_version: ProtocolVersion) -> DeserializationFn {
let one_two_zero = ProtocolVersion::from_parts(1, 2, 0);
if node_build_version.lt(&one_two_zero) {
casper_event_types::sse_data_1_0_0::deserialize
} else {
deserialize
}
}

fn count_error(reason: &str) {
metrics::ERROR_COUNTS
.with_label_values(&["connection_manager", reason])
Expand All @@ -380,7 +364,7 @@ mod tests {
sse_connector::{MockSseConnection, StreamConnector},
SseEvent,
};
use casper_event_types::{sse_data::test_support::*, sse_data_1_0_0::test_support::*, Filter};
use casper_event_types::{sse_data::test_support::*, Filter};
use tokio::sync::mpsc::{channel, Receiver};
use tokio_util::sync::CancellationToken;
use url::Url;
Expand Down Expand Up @@ -412,8 +396,8 @@ mod tests {
#[tokio::test]
async fn given_data_without_api_version_should_fail() {
let data = vec![
example_block_added_1_0_0(BLOCK_HASH_1, "1"),
example_block_added_1_0_0(BLOCK_HASH_2, "2"),
example_block_added_1_5_2(BLOCK_HASH_1, "1"),
example_block_added_1_5_2(BLOCK_HASH_2, "2"),
];
let connector = Box::new(MockSseConnection::build_with_data(data));
let (mut connection_manager, _, _) = build_manager(connector);
Expand All @@ -431,8 +415,8 @@ mod tests {
async fn given_data_should_pass_data() {
let data = vec![
example_api_version(),
example_block_added_1_0_0(BLOCK_HASH_1, "1"),
example_block_added_1_0_0(BLOCK_HASH_2, "2"),
example_block_added_1_5_2(BLOCK_HASH_1, "1"),
example_block_added_1_5_2(BLOCK_HASH_2, "2"),
];
let connector = Box::new(MockSseConnection::build_with_data(data));
let (mut connection_manager, data_tx, event_ids) = build_manager(connector);
Expand All @@ -450,7 +434,7 @@ mod tests {
let data = vec![
example_api_version(),
"XYZ".to_string(),
example_block_added_1_0_0(BLOCK_HASH_2, "2"),
example_block_added_1_5_2(BLOCK_HASH_2, "2"),
];
let connector = Box::new(MockSseConnection::build_with_data(data));
let (mut connection_manager, data_tx, _event_ids) = build_manager(connector);
Expand Down Expand Up @@ -496,143 +480,9 @@ mod tests {
sse_event_sender: data_tx,
maybe_tasks: None,
filter: Filter::Sigs,
deserialization_fn: casper_event_types::sse_data_1_0_0::deserialize,
current_event_id_sender: event_id_tx,
cancellation_token,
};
(manager, data_rx, event_id_rx)
}
}

#[cfg(test)]
mod deserialization_tests {
use casper_event_types::{
sse_data::test_support::{example_block_added_1_4_10, BLOCK_HASH_1},
sse_data_1_0_0::test_support::example_block_added_1_0_0,
};
use serde_json::Value;

use super::*;

#[tokio::test]
async fn given_determine_deserializer_and_1_0_0_should_return_1_0_0_deserializer() {
let legacy_block_added_raw = example_block_added_1_0_0(BLOCK_HASH_1, "1");
let new_format_block_added_raw = example_block_added_1_4_10(BLOCK_HASH_1, "1");
let protocol_version = ProtocolVersion::from_parts(1, 0, 0);
let deserializer = determine_deserializer(protocol_version);
let tuple = (deserializer)(&legacy_block_added_raw).unwrap();
let sse_data = tuple.0;
assert!(tuple.1);

assert!(matches!(sse_data, SseData::BlockAdded { .. }));
if let SseData::BlockAdded {
block_hash: _,
block,
} = sse_data.clone()
{
assert!(block.proofs.is_empty());
assert_eq!(
serde_json::to_value(sse_data).unwrap(),
serde_json::from_str::<Value>(&new_format_block_added_raw).unwrap()
);
}
}

#[tokio::test]
async fn given_determine_deserializer_and_1_1_0_should_return_generic_deserializer_which_fails_on_contemporary_block_added(
) {
let new_format_block_added_raw = example_block_added_1_4_10(BLOCK_HASH_1, "1");
let protocol_version = ProtocolVersion::from_parts(1, 1, 0);
let deserializer = determine_deserializer(protocol_version);
let result = (deserializer)(&new_format_block_added_raw);
assert!(result.is_err());
}

#[tokio::test]
async fn given_determine_deserializer_and_1_1_0_should_return_generic_deserializer_which_deserializes_legacy_block_added(
) {
let legacy_block_added_raw = example_block_added_1_0_0(BLOCK_HASH_1, "1");
let new_format_block_added_raw = example_block_added_1_4_10(BLOCK_HASH_1, "1");
let protocol_version = ProtocolVersion::from_parts(1, 1, 0);
let deserializer = determine_deserializer(protocol_version);
let tuple = (deserializer)(&legacy_block_added_raw).unwrap();
let sse_data = tuple.0;
assert!(tuple.1);

assert!(matches!(sse_data, SseData::BlockAdded { .. }));
if let SseData::BlockAdded {
block_hash: _,
block,
} = sse_data.clone()
{
assert!(block.proofs.is_empty());
let sse_data_1_4_10 =
serde_json::from_str::<Value>(&new_format_block_added_raw).unwrap();
assert_eq!(serde_json::to_value(sse_data).unwrap(), sse_data_1_4_10);
}
}

#[tokio::test]
async fn given_determine_deserializer_and_1_2_0_should_return_generic_deserializer_which_fails_on_legacy_block_added(
) {
let legacy_block_added_raw = example_block_added_1_0_0(BLOCK_HASH_1, "1");
let protocol_version = ProtocolVersion::from_parts(1, 2, 0);
let deserializer = determine_deserializer(protocol_version);
let result = (deserializer)(&legacy_block_added_raw);
assert!(result.is_err());
}

#[tokio::test]
async fn given_determine_deserializer_and_1_2_0_should_deserialize_contemporary_block_added_payload(
) {
let block_added_raw = example_block_added_1_4_10(BLOCK_HASH_1, "1");
let protocol_version = ProtocolVersion::from_parts(1, 2, 0);
let deserializer = determine_deserializer(protocol_version);
let tuple = (deserializer)(&block_added_raw).unwrap();
let sse_data = tuple.0;
assert!(!tuple.1);

assert!(matches!(sse_data, SseData::BlockAdded { .. }));
if let SseData::BlockAdded {
block_hash: _,
block,
} = sse_data.clone()
{
assert!(block.proofs.is_empty());
let raw = serde_json::to_string(&sse_data);
assert_eq!(raw.unwrap(), block_added_raw);
}
}

#[tokio::test]
async fn given_determine_deserializer_and_1_4_10_should_return_generic_deserializer_which_fails_on_legacy_block_added(
) {
let block_added_raw = example_block_added_1_0_0(BLOCK_HASH_1, "1");
let protocol_version = ProtocolVersion::from_parts(1, 2, 0);
let deserializer = determine_deserializer(protocol_version);
let result = (deserializer)(&block_added_raw);
assert!(result.is_err());
}

#[tokio::test]
async fn given_determine_deserializer_and_1_4_10_should_deserialize_contemporary_block_added_payload(
) {
let block_added_raw = example_block_added_1_4_10(BLOCK_HASH_1, "1");
let protocol_version = ProtocolVersion::from_parts(1, 4, 10);
let deserializer = determine_deserializer(protocol_version);
let tuple = (deserializer)(&block_added_raw).unwrap();
let sse_data = tuple.0;
assert!(!tuple.1);

assert!(matches!(sse_data, SseData::BlockAdded { .. }));
if let SseData::BlockAdded {
block_hash: _,
block,
} = sse_data.clone()
{
assert!(block.proofs.is_empty());
let raw = serde_json::to_string(&sse_data);
assert_eq!(raw.unwrap(), block_added_raw);
}
}
}
Loading

0 comments on commit 36ca2dd

Please sign in to comment.