Skip to content

Commit

Permalink
Merge pull request #358 from teonite/consistent_port_config
Browse files Browse the repository at this point in the history
Make IP & port config consistent across all node connections
  • Loading branch information
wojcik91 authored Nov 19, 2024
2 parents 7bcc087 + 0aee346 commit 1c6a2db
Show file tree
Hide file tree
Showing 10 changed files with 89 additions and 50 deletions.
9 changes: 6 additions & 3 deletions resources/example_configs/EXAMPLE_NCTL_CONFIG.toml
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
[rpc_server.main_server]
enable_server = true
address = "0.0.0.0:11102"
ip_address = "0.0.0.0"
port = 11102
qps_limit = 100
max_body_bytes = 2621440
cors_origin = ""

[rpc_server.speculative_exec_server]
enable_server = true
address = "0.0.0.0:25102"
ip_address = "0.0.0.0"
port = 25102
qps_limit = 1
max_body_bytes = 2621440
cors_origin = ""

[rpc_server.node_client]
address = "0.0.0.0:28102"
ip_address = "0.0.0.0"
port = 28102
max_message_size_bytes = 4194304
request_limit = 3
request_buffer_size = 16
Expand Down
11 changes: 7 additions & 4 deletions resources/example_configs/EXAMPLE_NCTL_POSTGRES_CONFIG.toml
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
[rpc_server.main_server]
enable_server = true
address = "0.0.0.0:11102"
ip_address = "0.0.0.0"
port = 11102
qps_limit = 100
max_body_bytes = 2621440
cors_origin = ""

[rpc_server.speculative_exec_server]
enable_server = true
address = "0.0.0.0:25102"
ip_address = "0.0.0.0"
port = 25102
qps_limit = 1
max_body_bytes = 2621440
cors_origin = ""

[rpc_server.node_client]
address = "0.0.0.0:28102"
ip_address = "0.0.0.0"
port = 28102
max_message_size_bytes = 4194304
request_limit = 3
request_buffer_size = 16
Expand Down Expand Up @@ -89,4 +92,4 @@ max_requests_per_second = 50
[admin_api_server]
port = 18887
max_concurrent_requests = 1
max_requests_per_second = 1
max_requests_per_second = 1
9 changes: 6 additions & 3 deletions resources/example_configs/EXAMPLE_NODE_CONFIG.toml
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
[rpc_server.main_server]
enable_server = true
address = "0.0.0.0:7777"
ip_address = "0.0.0.0"
port = 7777
qps_limit = 100
max_body_bytes = 2621440
cors_origin = ""

[rpc_server.speculative_exec_server]
enable_server = true
address = "0.0.0.0:7778"
ip_address = "0.0.0.0"
port = 7778
qps_limit = 1
max_body_bytes = 2621440
cors_origin = ""

[rpc_server.node_client]
address = "3.20.57.210:7777"
ip_address = "3.20.57.210"
port = 7777
max_message_size_bytes = 4194304
request_limit = 10
request_buffer_size = 50
Expand Down
9 changes: 6 additions & 3 deletions resources/example_configs/default_debian_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ enable_server = true
# the JSON-RPC HTTP server will not run, but the node will be otherwise unaffected.
#
# The actual bound address will be reported via a log line if logging is enabled.
address = '0.0.0.0:7777'
ip_address = '0.0.0.0'
port= 7777

# The global max rate of requests (per second) before they are limited.
# Request will be delayed to the next 1 second bucket once limited.
Expand Down Expand Up @@ -44,7 +45,8 @@ enable_server = true
# but the node will be otherwise unaffected.
#
# The actual bound address will be reported via a log line if logging is enabled.
address = '0.0.0.0:7778'
ip_address = '0.0.0.0'
port = 7778

# The global max rate of requests (per second) before they are limited.
# Request will be delayed to the next 1 second bucket once limited.
Expand All @@ -65,7 +67,8 @@ cors_origin = ''
# =========================================
[rpc_server.node_client]
# The address of the node to connect to.
address = '127.0.0.1:7779'
ip_address = '127.0.0.1'
port = 7779
# Maximum size of a message in bytes.
max_message_size_bytes = 4_194_304
# Maximum number of in-flight node requests.
Expand Down
9 changes: 6 additions & 3 deletions resources/example_configs/default_rpc_only_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ enable_server = true
# the JSON-RPC HTTP server will not run, but the node will be otherwise unaffected.
#
# The actual bound address will be reported via a log line if logging is enabled.
address = '0.0.0.0:7777'
ip_address = '0.0.0.0'
port = 7777

# The global max rate of requests (per second) before they are limited.
# Request will be delayed to the next 1 second bucket once limited.
Expand Down Expand Up @@ -44,7 +45,8 @@ enable_server = true
# but the node will be otherwise unaffected.
#
# The actual bound address will be reported via a log line if logging is enabled.
address = '0.0.0.0:7778'
ip_address = '0.0.0.0'
port = 7778

# The global max rate of requests (per second) before they are limited.
# Request will be delayed to the next 1 second bucket once limited.
Expand All @@ -65,7 +67,8 @@ cors_origin = ''
# =========================================
[rpc_server.node_client]
# The address of the node to connect to.
address = '127.0.0.1:7779'
ip_address = '127.0.0.1'
port = 7779
# Maximum size of a message in bytes.
max_message_size_bytes = 4_194_304
# Maximum number of in-flight node requests.
Expand Down
45 changes: 29 additions & 16 deletions rpc_sidecar/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{
convert::{TryFrom, TryInto},
net::{IpAddr, Ipv4Addr, SocketAddr},
net::{IpAddr, Ipv4Addr},
};

use datasize::DataSize;
Expand All @@ -12,7 +12,8 @@ use crate::SpeculativeExecConfig;
/// Default binding address for the JSON-RPC HTTP server.
///
/// Uses a fixed port per node, but binds on any interface.
const DEFAULT_ADDRESS: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0);
const DEFAULT_IP_ADDRESS: IpAddr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0));
const DEFAULT_PORT: u16 = 0;
/// Default rate limit in qps.
const DEFAULT_QPS_LIMIT: u64 = 100;
/// Default max body bytes. This is 2.5MB which should be able to accommodate the largest valid
Expand Down Expand Up @@ -73,8 +74,10 @@ pub struct RpcServerConfig {
pub struct RpcConfig {
/// Setting to enable the HTTP server.
pub enable_server: bool,
/// Address to bind JSON-RPC HTTP server to.
pub address: SocketAddr,
/// IP address to bind JSON-RPC HTTP server to.
pub ip_address: IpAddr,
/// TCP port to bind JSON-RPC HTTP server to.
pub port: u16,
/// Maximum rate limit in queries per second.
pub qps_limit: u64,
/// Maximum number of bytes to accept in a single request body.
Expand All @@ -88,7 +91,8 @@ impl RpcConfig {
pub fn new() -> Self {
RpcConfig {
enable_server: true,
address: DEFAULT_ADDRESS,
ip_address: DEFAULT_IP_ADDRESS,
port: DEFAULT_PORT,
qps_limit: DEFAULT_QPS_LIMIT,
max_body_bytes: DEFAULT_MAX_BODY_BYTES,
cors_origin: DEFAULT_CORS_ORIGIN.to_string(),
Expand All @@ -104,7 +108,8 @@ impl Default for RpcConfig {

/// Default address to connect to the node.
// Change this to SocketAddr, once SocketAddr::new is const stable.
const DEFAULT_NODE_CONNECT_ADDRESS: (IpAddr, u16) = (IpAddr::V4(Ipv4Addr::LOCALHOST), 28104);
const DEFAULT_NODE_CONNECT_IP_ADDRESS: IpAddr = IpAddr::V4(Ipv4Addr::LOCALHOST);
const DEFAULT_NODE_CONNECT_PORT: u16 = 28104;
/// Default maximum payload size.
const DEFAULT_MAX_PAYLOAD_SIZE: u32 = 4 * 1024 * 1024;
/// Default message timeout in seconds.
Expand All @@ -127,8 +132,10 @@ const DEFAULT_EXPONENTIAL_BACKOFF_COEFFICIENT: u64 = 2;
// Disallow unknown fields to ensure config files and command-line overrides contain valid keys.
#[serde(deny_unknown_fields)]
pub struct NodeClientConfig {
/// Address of the node.
pub address: SocketAddr,
/// IP address of the node.
pub ip_address: IpAddr,
/// Port of the node.
pub port: u16,
/// Maximum size of a message in bytes.
pub max_message_size_bytes: u32,
/// Message transfer timeout in seconds.
Expand All @@ -148,7 +155,8 @@ impl NodeClientConfig {
/// Creates a default instance for `NodeClientConfig`.
pub fn new() -> Self {
NodeClientConfig {
address: DEFAULT_NODE_CONNECT_ADDRESS.into(),
ip_address: DEFAULT_NODE_CONNECT_IP_ADDRESS,
port: DEFAULT_NODE_CONNECT_PORT,
request_limit: DEFAULT_NODE_REQUEST_LIMIT,
max_message_size_bytes: DEFAULT_MAX_PAYLOAD_SIZE,
request_buffer_size: DEFAULT_REQUEST_BUFFER_SIZE,
Expand All @@ -166,9 +174,10 @@ impl NodeClientConfig {
/// Creates an instance of `NodeClientConfig` with specified listening port.
#[cfg(any(feature = "testing", test))]
pub fn new_with_port(port: u16) -> Self {
let local_socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port);
let localhost = IpAddr::V4(Ipv4Addr::LOCALHOST);
NodeClientConfig {
address: local_socket,
ip_address: localhost,
port,
request_limit: DEFAULT_NODE_REQUEST_LIMIT,
max_message_size_bytes: DEFAULT_MAX_PAYLOAD_SIZE,
request_buffer_size: DEFAULT_REQUEST_BUFFER_SIZE,
Expand All @@ -187,9 +196,10 @@ impl NodeClientConfig {
/// of reconnection retries.
#[cfg(any(feature = "testing", test))]
pub fn new_with_port_and_retries(port: u16, num_of_retries: usize) -> Self {
let local_socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port);
let localhost = IpAddr::V4(Ipv4Addr::LOCALHOST);
NodeClientConfig {
address: local_socket,
ip_address: localhost,
port,
request_limit: DEFAULT_NODE_REQUEST_LIMIT,
max_message_size_bytes: DEFAULT_MAX_PAYLOAD_SIZE,
request_buffer_size: DEFAULT_REQUEST_BUFFER_SIZE,
Expand All @@ -216,8 +226,10 @@ impl Default for NodeClientConfig {
// Disallow unknown fields to ensure config files and command-line overrides contain valid keys.
#[serde(deny_unknown_fields)]
pub struct NodeClientConfigTarget {
/// Address of the node.
pub address: SocketAddr,
/// IP address of the node.
pub ip_address: IpAddr,
/// TCP port of the node
pub port: u16,
/// Maximum size of a message in bytes.
pub max_message_size_bytes: u32,
/// Message transfer timeout in seconds.
Expand Down Expand Up @@ -245,7 +257,8 @@ impl TryFrom<NodeClientConfigTarget> for NodeClientConfig {
error: e.to_string(),
})?;
Ok(NodeClientConfig {
address: value.address,
ip_address: value.ip_address,
port: value.port,
request_limit: value.request_limit,
max_message_size_bytes: value.max_message_size_bytes,
request_buffer_size: value.request_buffer_size,
Expand Down
4 changes: 2 additions & 2 deletions rpc_sidecar/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ async fn retype_future_vec(
async fn run_rpc(config: RpcConfig, node_client: Arc<dyn NodeClient>) -> Result<(), Error> {
run_rpc_server(
node_client,
start_listening(&config.address)?,
start_listening(&SocketAddr::new(config.ip_address, config.port))?,
config.qps_limit,
config.max_body_bytes,
config.cors_origin.clone(),
Expand All @@ -85,7 +85,7 @@ async fn run_speculative_exec(
) -> anyhow::Result<()> {
run_speculative_exec_server(
node_client,
start_listening(&config.address)?,
start_listening(&SocketAddr::new(config.ip_address, config.port))?,
config.qps_limit,
config.max_body_bytes,
config.cors_origin.clone(),
Expand Down
12 changes: 7 additions & 5 deletions rpc_sidecar/src/node_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use metrics::rpc::{inc_disconnect, observe_reconnect_time};
use serde::de::DeserializeOwned;
use std::{
convert::{TryFrom, TryInto},
net::SocketAddr,
sync::{
atomic::{AtomicU16, Ordering},
Arc,
Expand Down Expand Up @@ -948,9 +949,10 @@ impl FramedNodeClient {
} else {
&config.exponential_backoff.max_attempts
};
let tcp_socket = SocketAddr::new(config.ip_address, config.port);
let mut current_attempt = 1;
loop {
match TcpStream::connect(config.address).await {
match TcpStream::connect(tcp_socket).await {
Ok(stream) => {
return Ok(Framed::new(
stream,
Expand All @@ -962,11 +964,11 @@ impl FramedNodeClient {
if !max_attempts.can_attempt(current_attempt) {
anyhow::bail!(
"Couldn't connect to node {} after {} attempts",
config.address,
tcp_socket,
current_attempt - 1
);
}
warn!(%err, "failed to connect to node {}, waiting {wait}ms before retrying", config.address);
warn!(%err, "failed to connect to node {tcp_socket}, waiting {wait}ms before retrying");
tokio::time::sleep(Duration::from_millis(wait)).await;
wait = (wait * config.exponential_backoff.coefficient)
.min(config.exponential_backoff.max_delay_ms);
Expand Down Expand Up @@ -1025,7 +1027,7 @@ impl NodeClient for FramedNodeClient {
let result = self.send_request_internal(&req, &mut client).await;
if let Err(err) = &result {
warn!(
addr = %self.config.address,
addr = %self.config.ip_address,
err = display_error(&err),
"binary port client handler error"
);
Expand All @@ -1039,7 +1041,7 @@ impl NodeClient for FramedNodeClient {
Err(err) => {
warn!(
%err,
addr = %self.config.address,
addr = %self.config.ip_address,
"binary port client failed to reconnect"
);
// schedule standard reconnect process with multiple retries
Expand Down
14 changes: 9 additions & 5 deletions rpc_sidecar/src/speculative_exec_config.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::net::{IpAddr, Ipv4Addr};

use datasize::DataSize;
use serde::Deserialize;

/// Default binding address for the speculative execution RPC HTTP server.
///
/// Uses a fixed port per node, but binds on any interface.
const DEFAULT_ADDRESS: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 1);
const DEFAULT_IP_ADDRESS: IpAddr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0));
const DEFAULT_PORT: u16 = 1;
/// Default rate limit in qps.
const DEFAULT_QPS_LIMIT: u64 = 1;
/// Default max body bytes (2.5MB).
Expand All @@ -21,8 +22,10 @@ const DEFAULT_CORS_ORIGIN: &str = "";
pub struct Config {
/// Setting to enable the HTTP server.
pub enable_server: bool,
/// Address to bind JSON-RPC speculative execution server to.
pub address: SocketAddr,
/// IP address to bind JSON-RPC speculative execution server to.
pub ip_address: IpAddr,
/// Port to bind JSON-RPC speculative execution server to.
pub port: u16,
/// Maximum rate limit in queries per second.
pub qps_limit: u64,
/// Maximum number of bytes to accept in a single request body.
Expand All @@ -36,7 +39,8 @@ impl Config {
pub fn new() -> Self {
Config {
enable_server: false,
address: DEFAULT_ADDRESS,
ip_address: DEFAULT_IP_ADDRESS,
port: DEFAULT_PORT,
qps_limit: DEFAULT_QPS_LIMIT,
max_body_bytes: DEFAULT_MAX_BODY_BYTES,
cors_origin: DEFAULT_CORS_ORIGIN.to_string(),
Expand Down
Loading

0 comments on commit 1c6a2db

Please sign in to comment.