Skip to content

Commit

Permalink
Merge pull request #357 from teonite/validate_network_name
Browse files Browse the repository at this point in the history
Validate network name if specified
  • Loading branch information
wojcik91 authored Dec 3, 2024
2 parents e9233e5 + 6310c9e commit a1a0c11
Show file tree
Hide file tree
Showing 12 changed files with 231 additions and 46 deletions.
13 changes: 11 additions & 2 deletions event_sidecar/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,10 @@ pub async fn run(
config: SseEventServerConfig,
index_storage_folder: String,
maybe_database: Option<Database>,
maybe_network_name: Option<String>,
) -> Result<ExitCode, Error> {
validate_config(&config)?;
let (event_listeners, sse_data_receivers) = build_event_listeners(&config)?;
let (event_listeners, sse_data_receivers) = build_event_listeners(&config, maybe_network_name)?;
// This channel allows SseData to be sent from multiple connected nodes to the single EventStreamServer.
let (outbound_sse_data_sender, outbound_sse_data_receiver) =
mpsc_channel(config.outbound_channel_size.unwrap_or(DEFAULT_CHANNEL_SIZE));
Expand Down Expand Up @@ -238,14 +239,20 @@ pub async fn run_rest_server(

fn build_event_listeners(
config: &SseEventServerConfig,
maybe_network_name: Option<String>,
) -> Result<(Vec<EventListener>, Vec<Receiver<SseEvent>>), Error> {
let mut event_listeners = Vec::with_capacity(config.connections.len());
let mut sse_data_receivers = Vec::new();
for connection in &config.connections {
let (inbound_sse_data_sender, inbound_sse_data_receiver) =
mpsc_channel(config.inbound_channel_size.unwrap_or(DEFAULT_CHANNEL_SIZE));
sse_data_receivers.push(inbound_sse_data_receiver);
let event_listener = builder(connection, inbound_sse_data_sender)?.build();
let event_listener = builder(
connection,
inbound_sse_data_sender,
maybe_network_name.clone(),
)?
.build();
event_listeners.push(event_listener?);
}
Ok((event_listeners, sse_data_receivers))
Expand All @@ -254,11 +261,13 @@ fn build_event_listeners(
fn builder(
connection: &Connection,
inbound_sse_data_sender: Sender<SseEvent>,
maybe_network_name: Option<String>,
) -> Result<EventListenerBuilder, Error> {
let node_interface = NodeConnectionInterface {
ip_address: connection.ip_address,
sse_port: connection.sse_port,
rest_port: connection.rest_port,
network_name: maybe_network_name,
};
let event_listener_builder = EventListenerBuilder {
node: node_interface,
Expand Down
13 changes: 12 additions & 1 deletion event_sidecar/src/testing/testing_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub struct TestingConfig {
pub(crate) event_server_config: SseEventServerConfig,
pub(crate) storage_config: StorageConfig,
pub(crate) rest_api_server_config: RestApiServerConfig,
pub(crate) network_name: Option<String>,
}

#[cfg(test)]
Expand Down Expand Up @@ -44,14 +45,19 @@ pub fn get_port() -> u16 {
/// - The outbound server (REST & SSE) ports are set dynamically to free ports.
/// - If `enable_db_storage` is set to false, the database storage is disabled.
#[cfg(test)]
pub(crate) fn prepare_config(temp_storage: &TempDir, enable_db_storage: bool) -> TestingConfig {
pub(crate) fn prepare_config(
temp_storage: &TempDir,
enable_db_storage: bool,
maybe_network_name: Option<String>,
) -> TestingConfig {
let path_to_temp_storage = temp_storage.path().to_string_lossy().to_string();

let mut testing_config = TestingConfig::default();
if !enable_db_storage {
testing_config.storage_config.clear_db_storage();
}
testing_config.set_storage_folder(path_to_temp_storage);
testing_config.set_network_name(maybe_network_name);
testing_config.allocate_available_ports();

testing_config
Expand All @@ -67,6 +73,7 @@ impl TestingConfig {
event_server_config,
storage_config,
rest_api_server_config,
network_name: None,
}
}

Expand All @@ -84,6 +91,10 @@ impl TestingConfig {
self.storage_config = storage;
}

pub(crate) fn set_network_name(&mut self, maybe_network_name: Option<String>) {
self.network_name = maybe_network_name;
}

pub(crate) fn add_connection(
&mut self,
ip_address: Option<IpAddr>,
Expand Down
72 changes: 63 additions & 9 deletions event_sidecar/src/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,18 @@ use crate::{
sse_events::{BlockAdded, Fault},
},
utils::tests::{
any_string_contains, build_test_config, build_test_config_with_retries,
build_test_config_without_connections, build_test_config_without_db_storage,
start_nodes_and_wait, start_sidecar, start_sidecar_with_rest_api, stop_nodes_and_wait,
wait_for_n_messages,
any_string_contains, build_test_config, build_test_config_with_network_name,
build_test_config_with_retries, build_test_config_without_connections,
build_test_config_without_db_storage, start_nodes_and_wait, start_sidecar,
start_sidecar_with_rest_api, stop_nodes_and_wait, wait_for_n_messages,
},
};

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn should_not_allow_zero_max_attempts() {
let temp_storage_dir = tempdir().expect("Should have created a temporary storage directory");

let mut testing_config = prepare_config(&temp_storage_dir, true);
let mut testing_config = prepare_config(&temp_storage_dir, true, None);

let sse_port_for_node = testing_config.add_connection(None, None, None);

Expand All @@ -51,6 +51,7 @@ async fn should_not_allow_zero_max_attempts() {
testing_config.inner(),
storage_folder,
Some(Database::SqliteDatabaseWrapper(sqlite_database)),
None,
)
.await
.expect_err("Sidecar should return an Err on shutdown");
Expand Down Expand Up @@ -290,7 +291,7 @@ async fn should_fail_to_reconnect() {
node_port_for_sse_connection,
node_port_for_rest_connection,
event_stream_server_port,
) = build_test_config_with_retries(2, 2, true);
) = build_test_config_with_retries(2, 2, true, None);
let (data_of_node, test_rng) = random_n_block_added(30, 0, test_rng);
let mut node_mock = MockNodeBuilder {
version: "2.0.0".to_string(),
Expand Down Expand Up @@ -339,7 +340,7 @@ async fn should_reconnect() {
node_port_for_sse_connection,
node_port_for_rest_connection,
event_stream_server_port,
) = build_test_config_with_retries(10, 1, true);
) = build_test_config_with_retries(10, 1, true, None);
let (data_of_node, test_rng) = random_n_block_added(30, 0, test_rng);
let mut node_mock = MockNodeBuilder {
version: "2.0.0".to_string(),
Expand Down Expand Up @@ -428,7 +429,50 @@ async fn connecting_to_node_prior_to_2_0_0_should_fail() {
}
.build();
start_nodes_and_wait(vec![&mut node_mock]).await;
start_sidecar(testing_config).await;
let sidecar_join = start_sidecar(testing_config).await;
let (join_handle, _) = fetch_data_from_endpoint_with_panic_flag(
"/events?start_from=0",
event_stream_server_port,
false,
)
.await;
sleep(Duration::from_secs(10)).await; //Give some time for sidecar to read data from node (which it actually shouldn't do in this scenario)
stop_nodes_and_wait(vec![&mut node_mock]).await;

let events_received = tokio::join!(join_handle).0.unwrap();
assert_eq!(events_received.len(), 0);

let shutdown_err = sidecar_join
.await
.unwrap()
.expect_err("Sidecar should return an Err message on shutdown");

assert_eq!(
shutdown_err.to_string(),
"Connected node(s) are unavailable"
)
}

#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
async fn connecting_to_node_with_wrong_network_name_should_fail() {
let (
testing_config,
_temp_storage_dir,
node_port_for_sse_connection,
node_port_for_rest_connection,
event_stream_server_port,
) = build_test_config_with_network_name("network-1");
let mut node_mock = MockNodeBuilder {
version: "2.0.0".to_string(),
network_name: "not-network-1".to_string(),
data_of_node: sse_server_shutdown_2_0_0_data(),
cache_of_node: None,
sse_port: Some(node_port_for_sse_connection),
rest_port: Some(node_port_for_rest_connection),
}
.build();
start_nodes_and_wait(vec![&mut node_mock]).await;
let sidecar_join = start_sidecar(testing_config).await;
let (join_handle, _) = fetch_data_from_endpoint_with_panic_flag(
"/events?start_from=0",
event_stream_server_port,
Expand All @@ -440,6 +484,16 @@ async fn connecting_to_node_prior_to_2_0_0_should_fail() {

let events_received = tokio::join!(join_handle).0.unwrap();
assert_eq!(events_received.len(), 0);

let shutdown_err = sidecar_join
.await
.unwrap()
.expect_err("Sidecar should return an Err message on shutdown");

assert_eq!(
shutdown_err.to_string(),
"Connected node(s) are unavailable"
)
}

#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
Expand Down Expand Up @@ -825,7 +879,7 @@ pub fn build_testing_config_based_on_ports(
ports_of_nodes: Vec<(u16, u16)>,
) -> (TestingConfig, u16, TempDir) {
let (mut testing_config, temp_storage_dir, event_stream_server_port) =
build_test_config_without_connections(true);
build_test_config_without_connections(true, None);
for (sse_port, rest_port) in ports_of_nodes {
testing_config.add_connection(None, Some(sse_port), Some(rest_port));
testing_config.set_retries_for_node(sse_port, 5, 2);
Expand Down
4 changes: 3 additions & 1 deletion event_sidecar/src/tests/performance_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ async fn performance_check(scenario: Scenario, duration: Duration, acceptable_la
let test_rng = TestRng::new();

let temp_storage_dir = tempdir().expect("Should have created a temporary storage directory");
let mut testing_config = prepare_config(&temp_storage_dir, true);
let mut testing_config = prepare_config(&temp_storage_dir, true, None);
testing_config.add_connection(None, None, None);
let node_port_for_sse_connection = testing_config
.event_server_config
Expand Down Expand Up @@ -293,6 +293,7 @@ async fn performance_check(scenario: Scenario, duration: Duration, acceptable_la
ip_address,
sse_port: node_port_for_sse_connection,
rest_port: node_port_for_rest_connection,
network_name: None,
};
let (node_event_tx, node_event_rx) = mpsc::channel(100);
let mut node_event_listener = EventListenerBuilder {
Expand Down Expand Up @@ -321,6 +322,7 @@ async fn performance_check(scenario: Scenario, duration: Duration, acceptable_la
ip_address: IpAddr::from_str("127.0.0.1").expect("Couldn't parse IpAddr"),
sse_port: node_port_for_sse_connection,
rest_port: node_port_for_rest_connection,
network_name: None,
};
let mut sidecar_event_listener = EventListenerBuilder {
node: sidecar_node_interface,
Expand Down
27 changes: 21 additions & 6 deletions event_sidecar/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,29 +274,38 @@ pub mod tests {
}

pub fn build_test_config() -> (TestingConfig, TempDir, u16, u16, u16) {
build_test_config_with_retries(10, 1, true)
build_test_config_with_retries(10, 1, true, None)
}

pub fn build_test_config_without_db_storage() -> (TestingConfig, TempDir, u16, u16, u16) {
build_test_config_with_retries(10, 1, false)
build_test_config_with_retries(10, 1, false, None)
}

pub fn build_test_config_with_network_name(
network_name: &str,
) -> (TestingConfig, TempDir, u16, u16, u16) {
build_test_config_with_retries(10, 1, true, Some(network_name.into()))
}

pub fn build_test_config_without_connections(
enable_db_storage: bool,
maybe_network_name: Option<String>,
) -> (TestingConfig, TempDir, u16) {
let temp_storage_dir =
tempdir().expect("Should have created a temporary storage directory");
let testing_config = prepare_config(&temp_storage_dir, enable_db_storage);
let testing_config =
prepare_config(&temp_storage_dir, enable_db_storage, maybe_network_name);
let event_stream_server_port = testing_config.event_stream_server_port();
(testing_config, temp_storage_dir, event_stream_server_port)
}
pub fn build_test_config_with_retries(
max_attempts: usize,
delay_between_retries: usize,
enable_db_storage: bool,
maybe_network_name: Option<String>,
) -> (TestingConfig, TempDir, u16, u16, u16) {
let (mut testing_config, temp_storage_dir, event_stream_server_port) =
build_test_config_without_connections(enable_db_storage);
build_test_config_without_connections(enable_db_storage, maybe_network_name);
testing_config.add_connection(None, None, None);
let node_port_for_sse_connection = testing_config
.event_server_config
Expand Down Expand Up @@ -406,7 +415,7 @@ pub mod tests {
let context = build_postgres_database().await.unwrap();
let temp_storage_dir =
tempdir().expect("Should have created a temporary storage directory");
let mut testing_config = prepare_config(&temp_storage_dir, true);
let mut testing_config = prepare_config(&temp_storage_dir, true, None);
let event_stream_server_port = testing_config.event_stream_server_port();
testing_config.set_storage(StorageConfig::postgres_with_port(context.port));
testing_config.add_connection(None, None, None);
Expand Down Expand Up @@ -467,6 +476,12 @@ pub mod tests {
run_rest_server(rest_api_server_config, database_for_rest_api).await
});
}
run(sse_config, storage_folder, maybe_database).await
run(
sse_config,
storage_folder,
maybe_database,
testing_config.network_name,
)
.await
}
}
8 changes: 8 additions & 0 deletions listener/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,14 @@ impl EventListener {
let fetch_result = self.version_fetcher.fetch().await;
match fetch_result {
Ok(node_metadata) => {
// check if reveived network name matches optional configuration
if let Some(network_name) = &self.node.network_name {
if *network_name != node_metadata.network_name {
let msg = format!("Network name {network_name} does't match name {} configured for node connection", node_metadata.network_name);
error!("{msg}");
return GetNodeMetadataResult::Error(Error::msg(msg));
}
}
if self.node_metadata != node_metadata {
return GetNodeMetadataResult::Ok(Some(node_metadata));
}
Expand Down
2 changes: 2 additions & 0 deletions listener/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub struct NodeConnectionInterface {
pub ip_address: IpAddr,
pub sse_port: u16,
pub rest_port: u16,
pub network_name: Option<String>,
}

#[cfg(test)]
Expand All @@ -20,6 +21,7 @@ impl Default for NodeConnectionInterface {
ip_address: "127.0.0.1".parse().unwrap(),
sse_port: 100,
rest_port: 200,
network_name: None,
}
}
}
Expand Down
7 changes: 5 additions & 2 deletions rpc_sidecar/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,12 @@ pub const SUPPORTED_PROTOCOL_VERSION: ProtocolVersion = ProtocolVersion::from_pa
pub const CLIENT_SHUTDOWN_EXIT_CODE: u8 = 0x3;

pub type MaybeRpcServerReturn<'a> = Result<Option<BoxFuture<'a, Result<ExitCode, Error>>>, Error>;
pub async fn build_rpc_server<'a>(config: RpcServerConfig) -> MaybeRpcServerReturn<'a> {
pub async fn build_rpc_server<'a>(
config: RpcServerConfig,
maybe_network_name: Option<String>,
) -> MaybeRpcServerReturn<'a> {
let (node_client, reconnect_loop, keepalive_loop) =
FramedNodeClient::new(config.node_client.clone()).await?;
FramedNodeClient::new(config.node_client.clone(), maybe_network_name).await?;
let node_client: Arc<dyn NodeClient> = Arc::new(node_client);
let mut futures = Vec::new();
let main_server_config = config.main_server;
Expand Down
Loading

0 comments on commit a1a0c11

Please sign in to comment.