Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
Jakub Zajkowski committed Oct 5, 2023
1 parent 13b0136 commit 0b35eae
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 185 deletions.
107 changes: 63 additions & 44 deletions listener/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use serde_json::Value;
use std::{collections::HashMap, str::FromStr, sync::Arc, time::Duration};
use tokio::{
sync::{
mpsc::{self, Receiver, Sender},
mpsc::{self, Sender},
Mutex,
},
time::sleep,
Expand All @@ -43,6 +43,14 @@ pub struct EventListenerBuilder {
pub no_message_timeout: Duration,
}

/// Outcome of a single attempt to read the node's build version.
enum GetVersionResult {
    /// Fetch succeeded. `Some(v)` carries a version that *differs* from the
    /// one currently tracked; `None` means the reported version is unchanged.
    Ok(Option<ProtocolVersion>),
    /// Transient failure — the caller should retry the connection attempt.
    Retry,
    /// Fatal failure (e.g. a node build version the sidecar can't talk to) —
    /// the caller should fail fast.
    Error(Error),
}

// Sender through which connection managers report the last event id seen
// for a given filter.
type FilterWithEventId = Sender<(Filter, u32)>;
// Shared map from SSE filter to the last event id observed on that filter.
type CurrentFilterToIdHolder = Arc<Mutex<HashMap<Filter, u32>>>;
impl EventListenerBuilder {
pub fn build(&self) -> EventListener {
EventListener {
Expand Down Expand Up @@ -128,7 +136,7 @@ enum ConnectOutcome {

enum BuildVersionFetchError {
Error(anyhow::Error),
VersionNotAcceptable(String)
VersionNotAcceptable(String),
}
impl EventListener {
fn filtered_sse_url(&self, filter: &Filter) -> Result<Url, Error> {
Expand Down Expand Up @@ -178,37 +186,68 @@ impl EventListener {
}
}

/// Queries the node for its current build version and classifies the result.
///
/// Returns:
/// * `GetVersionResult::Ok(Some(v))` — the node reported version `v`, which
///   differs from the version this listener currently tracks.
/// * `GetVersionResult::Ok(None)` — the node reported the version already
///   tracked; nothing changed.
/// * `GetVersionResult::Error(..)` — the node runs a build version the
///   sidecar can't talk to; the caller should fail fast.
/// * `GetVersionResult::Retry` — any other outcome (no version reported, or
///   a transient fetch error); the caller should retry.
async fn get_version(&mut self, current_attempt: usize) -> GetVersionResult {
    match self
        .fetch_build_version(self.node_build_version, current_attempt)
        .await
    {
        // Version changed — surface the new value to the caller.
        Ok(Some(version)) if version != self.node_build_version => {
            GetVersionResult::Ok(Some(version))
        }
        // Fetched successfully but the version is the same as before.
        Ok(Some(_)) => GetVersionResult::Ok(None),
        // The node has a build version which sidecar can't talk to.
        // Failing fast in this case.
        Err(BuildVersionFetchError::VersionNotAcceptable(msg)) => {
            GetVersionResult::Error(Error::msg(msg))
        }
        // `Ok(None)` or a transient fetch error — worth another attempt.
        _ => GetVersionResult::Retry,
    }
}

/// Spawns a background task that records, per SSE filter, the id of the
/// last event observed on that filter.
///
/// * `node_address` / `sse_port` — identify the node for status logging.
///
/// Returns the shared filter→id map and the sender through which connection
/// managers report `(filter, event_id)` pairs.
fn start_last_event_id_registry(
    &self,
    node_address: String,
    sse_port: u16,
) -> (CurrentFilterToIdHolder, FilterWithEventId) {
    let (last_seen_event_id_sender, mut last_seen_event_id_receiver) = mpsc::channel(10);
    let last_event_id_for_filter: CurrentFilterToIdHolder =
        Arc::new(Mutex::new(HashMap::<Filter, u32>::new()));
    let last_event_id_for_filter_for_thread = last_event_id_for_filter.clone();
    tokio::spawn(async move {
        while let Some((filter, id)) = last_seen_event_id_receiver.recv().await {
            // Receiving a pair implies a live connection, so log `Connected`.
            EventListenerStatus::Connected.log_status(node_address.as_str(), sse_port);
            // Lock directly on the Arc captured by the task; the previous
            // version needlessly cloned the Arc on every received message.
            let mut guard = last_event_id_for_filter_for_thread.lock().await;
            guard.insert(filter, id);
            // Guard dropped here, at the end of the iteration.
        }
    });
    (last_event_id_for_filter, last_seen_event_id_sender)
}

/// Spins up the connections and starts pushing data from node
///
/// * `is_empty_database` - if set to true, sidecar will connect to the node and fetch all the events the node has in it's cache.
pub async fn stream_aggregated_events(&mut self, is_empty_database: bool) -> Result<(), Error> {
EventListenerStatus::Preparing.log_status_for_event_listener(self);
let (last_seen_event_id_sender, last_seen_event_id_receiver) = mpsc::channel(10);
let last_event_id_for_filter = start_last_event_id_registry(
last_seen_event_id_receiver,
self.node.ip_address.to_string(),
self.node.sse_port,
);
let (last_event_id_for_filter, last_seen_event_id_sender) =
self.start_last_event_id_registry(self.node.ip_address.to_string(), self.node.sse_port);
EventListenerStatus::Connecting.log_status_for_event_listener(self);
let mut current_attempt = 1;
while current_attempt <= self.max_connection_attempts {
let fetch_result = self.fetch_build_version(self.node_build_version, current_attempt).await;
match fetch_result {
Ok(Some(new_node_build_version)) => {
if self.node_build_version != new_node_build_version {
self.node_build_version = new_node_build_version;
current_attempt = 1
}
},
Err(BuildVersionFetchError::VersionNotAcceptable(msg)) => {
//The node has a build version which sidecar can't talk to. Failing fast in this case.
return Err(Error::msg(msg));
},
_ => {
match self.get_version(current_attempt).await {
GetVersionResult::Ok(Some(protocol_version)) => {
self.node_build_version = protocol_version;
current_attempt = 1
}
GetVersionResult::Retry => {
sleep(self.delay_between_attempts).await;
current_attempt += 1;
continue;
}
GetVersionResult::Error(e) => return Err(e),
_ => {}
}
match self
.do_connect(
Expand All @@ -231,7 +270,7 @@ impl EventListener {
&mut self,
last_event_id_for_filter: Arc<Mutex<HashMap<Filter, u32>>>,
is_empty_database: bool,
last_seen_event_id_sender: Sender<(Filter, u32)>,
last_seen_event_id_sender: FilterWithEventId,
) -> Result<ConnectOutcome, Error> {
let connections = self
.build_connections(
Expand Down Expand Up @@ -315,7 +354,7 @@ impl EventListener {
&mut self,
last_event_id_for_filter: Arc<Mutex<HashMap<Filter, u32>>>,
is_empty_database: bool,
last_seen_event_id_sender: Sender<(Filter, u32)>,
last_seen_event_id_sender: FilterWithEventId,
) -> Result<HashMap<Filter, ConnectionManager>, Error> {
let filters = filters_from_version(self.node_build_version);
let mut connections = HashMap::new();
Expand Down Expand Up @@ -379,7 +418,7 @@ impl EventListener {
maybe_tasks: Option<ConnectionTasks>,
start_from_event_id: Option<u32>,
filter: Filter,
last_seen_event_id_sender: Sender<(Filter, u32)>,
last_seen_event_id_sender: FilterWithEventId,
) -> Result<ConnectionManager, Error> {
let bind_address_for_filter = self.filtered_sse_url(&filter)?;
let keep_alive_monitor = KeepAliveMonitor::new(
Expand Down Expand Up @@ -426,26 +465,6 @@ fn start_connections(
.collect()
}

// NOTE(review): this is the pre-refactor free-function form (it takes the
// receiver as a parameter instead of creating the channel itself). The diff
// removes it in favour of the `EventListener` method of the same name.
/// Spawns a background task that records, per SSE filter, the id of the
/// last event received on that filter, and returns the shared filter→id map.
fn start_last_event_id_registry(
    mut last_seen_event_id_receiver: Receiver<(Filter, u32)>,
    node_address: String,
    sse_port: u16,
) -> Arc<Mutex<HashMap<Filter, u32>>> {
    let last_event_id_for_filter: Arc<Mutex<HashMap<Filter, u32>>> =
        Arc::new(Mutex::new(HashMap::<Filter, u32>::new()));
    let last_event_id_for_filter_for_thread = last_event_id_for_filter.clone();
    tokio::spawn(async move {
        // Each received pair implies a live connection, hence the status log.
        while let Some((filter, id)) = last_seen_event_id_receiver.recv().await {
            EventListenerStatus::Connected.log_status(node_address.as_str(), sse_port);
            // NOTE(review): this clone runs once per message; locking the
            // captured Arc directly would avoid it.
            let last_event_id_for_filter_clone = last_event_id_for_filter_for_thread.clone();
            let mut guard = last_event_id_for_filter_clone.lock().await;
            guard.insert(filter, id);
            drop(guard);
        }
    });
    last_event_id_for_filter
}

fn try_resolve_version(raw_response: Value) -> Result<ProtocolVersion, Error> {
match raw_response.get(BUILD_VERSION_KEY) {
Some(build_version_value) if build_version_value.is_string() => {
Expand Down
36 changes: 18 additions & 18 deletions sidecar/src/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::{
raw_sse_events_utils::tests::{
random_n_block_added, sse_server_example_1_5_2_data,
sse_server_example_1_5_2_data_second, sse_server_example_1_5_2_data_third,
EventsWithIds,
sse_server_shutdown_1_5_2_data, EventsWithIds,
},
shared::EventType,
testing_config::{prepare_config, TestingConfig},
Expand Down Expand Up @@ -290,8 +290,8 @@ async fn shutdown_should_be_passed_through() {
event_stream_server_port,
) = build_test_config();
let mut node_mock = MockNodeBuilder {
version: "1.0.0".to_string(),
data_of_node: sse_server_shutdown_1_0_0_data(),
version: "1.5.2".to_string(),
data_of_node: sse_server_shutdown_1_5_2_data(),
cache_of_node: None,
sse_port: Some(node_port_for_sse_connection),
rest_port: Some(node_port_for_rest_connection),
Expand All @@ -306,7 +306,7 @@ async fn shutdown_should_be_passed_through() {

let events_received = tokio::join!(join_handle).0.unwrap();
assert_eq!(events_received.len(), 3);
assert!(events_received.get(0).unwrap().contains("\"1.0.0\""));
assert!(events_received.get(0).unwrap().contains("\"1.5.2\""));
assert!(events_received.get(1).unwrap().contains("\"Shutdown\""));
assert!(events_received.get(2).unwrap().contains("\"BlockAdded\""));
}
Expand Down Expand Up @@ -335,8 +335,8 @@ async fn shutdown_should_be_passed_through_when_versions_change() {
let receiver = wait_for_n_messages(3, receiver, Duration::from_secs(120)).await;
stop_nodes_and_wait(vec![&mut node_mock]).await;
let mut node_mock = MockNodeBuilder::build_example_node_with_version(
node_port_for_sse_connection,
node_port_for_rest_connection,
Some(node_port_for_sse_connection),
Some(node_port_for_rest_connection),
"1.5.3",
);
start_nodes_and_wait(vec![&mut node_mock]).await;
Expand Down Expand Up @@ -392,7 +392,7 @@ async fn sidecar_should_use_start_from_if_database_is_empty() {
) = build_test_config();
let data_of_node = vec![(
Some("2".to_string()),
example_block_added_1_4_10(BLOCK_HASH_3, "3"),
example_block_added_1_5_2(BLOCK_HASH_3, "3"),
)];
let mut node_mock = MockNodeBuilder {
version: "1.5.2".to_string(),
Expand Down Expand Up @@ -435,7 +435,7 @@ async fn sidecar_should_not_use_start_from_if_database_is_not_empty() {
.unwrap();
let mut node_mock = MockNodeBuilder {
version: "1.5.2".to_string(),
data_of_node: sse_server_example_1_4_10_data_second(),
data_of_node: sse_server_example_1_5_2_data_second(),
cache_of_node: Some(sse_server_example_1_5_2_data()),
sse_port: Some(node_port_for_sse_connection),
rest_port: Some(node_port_for_rest_connection),
Expand All @@ -458,13 +458,13 @@ async fn sidecar_should_not_use_start_from_if_database_is_not_empty() {
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn sidecar_should_connect_to_multiple_nodes() {
let (sse_port_1, rest_port_1, mut mock_node_1) =
build_1_4_10(sse_server_example_1_5_2_data()).await;
build_1_5_2(sse_server_example_1_5_2_data()).await;
mock_node_1.start().await;
let (sse_port_2, rest_port_2, mut mock_node_2) =
build_1_4_10(sse_server_example_1_4_10_data_second()).await;
build_1_5_2(sse_server_example_1_5_2_data_second()).await;
mock_node_2.start().await;
let (sse_port_3, rest_port_3, mut mock_node_3) =
build_1_4_10(sse_server_example_1_4_10_data_third()).await;
build_1_5_2(sse_server_example_1_5_2_data_third()).await;
mock_node_3.start().await;
let (testing_config, event_stream_server_port, _temp_storage_dir) =
build_testing_config_based_on_ports(vec![
Expand Down Expand Up @@ -499,10 +499,10 @@ async fn sidecar_should_connect_to_multiple_nodes() {
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn sidecar_should_not_downgrade_api_version_when_new_nodes_disconnect() {
let (sse_port_1, rest_port_1, mut mock_node_1) =
build_1_4_10(sse_server_example_1_5_2_data()).await;
build_1_5_2(sse_server_example_1_5_2_data()).await;
mock_node_1.start().await;
let (sse_port_2, rest_port_2, mut mock_node_2) =
build_1_4_10(sse_server_example_1_4_9_data_second()).await;
build_1_5_2(sse_server_example_1_5_2_data_second()).await;
let (testing_config, event_stream_server_port, _temp_storage_dir) =
build_testing_config_based_on_ports(vec![
(sse_port_1, rest_port_1),
Expand Down Expand Up @@ -533,9 +533,9 @@ async fn sidecar_should_not_downgrade_api_version_when_new_nodes_disconnect() {
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn sidecar_should_report_only_one_api_version_if_there_was_no_update() {
let (sse_port_1, rest_port_1, mut mock_node_1) =
build_1_4_10(sse_server_example_1_5_2_data()).await;
build_1_5_2(sse_server_example_1_5_2_data()).await;
let (sse_port_2, rest_port_2, mut mock_node_2) =
build_1_4_10(sse_server_example_1_4_10_data_second()).await;
build_1_5_2(sse_server_example_1_5_2_data_second()).await;
start_nodes_and_wait(vec![&mut mock_node_1, &mut mock_node_2]).await;
let (testing_config, event_stream_server_port, _temp_storage_dir) =
build_testing_config_based_on_ports(vec![
Expand Down Expand Up @@ -564,9 +564,9 @@ async fn sidecar_should_report_only_one_api_version_if_there_was_no_update() {
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn sidecar_should_connect_to_multiple_nodes_even_if_some_of_them_dont_respond() {
let (sse_port_1, rest_port_1, mut mock_node_1) =
build_1_4_10(sse_server_example_1_5_2_data()).await;
build_1_5_2(sse_server_example_1_5_2_data()).await;
let (sse_port_2, rest_port_2, mut mock_node_2) =
build_1_4_10(sse_server_example_1_4_10_data_second()).await;
build_1_5_2(sse_server_example_1_5_2_data_second()).await;
start_nodes_and_wait(vec![&mut mock_node_1, &mut mock_node_2]).await;
let (testing_config, event_stream_server_port, _temp_storage_dir) =
build_testing_config_based_on_ports(vec![
Expand Down Expand Up @@ -596,7 +596,7 @@ async fn sidecar_should_connect_to_multiple_nodes_even_if_some_of_them_dont_resp

async fn partial_connection_test(allow_partial_connection: bool) -> Vec<EventType> {
// Prepare the mock node, by the "default" config it should have only the /events and /events/main endpoints
let (sse_port, rest_port, mut node_mock) = build_1_4_10(sse_server_example_1_5_2_data()).await;
let (sse_port, rest_port, mut node_mock) = build_1_5_2(sse_server_example_1_5_2_data()).await;
// Setup config for the sidecar
// - Set the sidecar to reattempt connection only once after a 2 second delay.
// - Allow partial based on the value passed to the function.
Expand Down
Loading

0 comments on commit 0b35eae

Please sign in to comment.