Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@ async = ["aerospike-core"]
sync = ["aerospike-sync"]
rt-tokio = ["aerospike-core/rt-tokio", "aerospike-macro/rt-tokio"]
rt-async-std = ["aerospike-core/rt-async-std", "aerospike-macro/rt-async-std"]
tokio-rustls = ["aerospike-core/tokio-rustls"]
tokio-native-tls = ["aerospike-core/tokio-native-tls"]

[[bench]]
name = "client_server"
harness = false

[workspace]
members = ["tools/benchmark", "aerospike-core", "aerospike-rt", "aerospike-sync", "aerospike-macro"]
members = ["tools/benchmark", "aerospike-core", "aerospike-rt", "aerospike-sync", "aerospike-macro", "aerospike-tls"]

[dev-dependencies]
env_logger = "0.7"
Expand Down
4 changes: 4 additions & 0 deletions aerospike-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,15 @@ serde = { version = "1.0", features = ["derive"], optional = true }
aerospike-rt = {path = "../aerospike-rt"}
futures = {version = "0.3.16" }
async-trait = "0.1.51"
aerospike-tls = { path = "../aerospike-tls", optional = true }

[features]
serialization = ["serde"]
rt-tokio = ["aerospike-rt/rt-tokio"]
rt-async-std = ["aerospike-rt/rt-async-std"]
tls = []
tokio-rustls = ["aerospike-tls/tokio-rustls", "rt-tokio", "tls"]
tokio-native-tls = ["aerospike-tls/tokio-native-tls", "rt-tokio", "tls"]

[dev-dependencies]
env_logger = "0.7"
Expand Down
256 changes: 256 additions & 0 deletions aerospike-core/src/cluster/info_helper.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
use std::str::FromStr;

use crate::errors::{ErrorKind, Result};


#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ServicesResponse<'a> {
pub peers_generation: u32,
pub port: u16,
pub nodes: Vec<NodeResponse<'a>>,
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeResponse<'a> {
pub node_id: &'a str,
pub tls_name: Option<&'a str>,
pub endpoints: Vec<&'a str>,
}

pub fn parse_services_response<'a>(response: &'a str) -> Result<ServicesResponse<'a>> {
// peers-generation, port, [ list of [ NodeIDs/Names, TLSName(if defined), [ List of endpoints/IPaddresses ]]]

const COMMON : char = ',';
const OPEN_BRACE : char = '[';
const CLOSE_BRACE : char = ']';

fn remove_outer_bracers<'a>(input: &'a str) -> Result<&'a str> {
match (input.chars().next(), input.chars().nth(input.len() - 1)) {
(Some(OPEN_BRACE), Some(CLOSE_BRACE)) => Ok(&input[1..input.len()-1]),
_ => Result::Err(ErrorKind::BadResponse(format!("Missing outer bracers {input}")).into())
}
}

fn read_generation_and_port_and_nodes<'a>(input: &'a str) -> Result<(u32, u16, Vec<NodeResponse>)> {

fn read_nodes<'a>(nodes: &'a str) -> Result<Vec<NodeResponse>> {
// [ list of [ NodeIDs/Names, TLSName(if defined), [ List of endpoints/IPaddresses ]]]
let mut result : Vec<NodeResponse> = vec![];

fn read_node<'a>(node: &'a str) -> Result<NodeResponse> {
// [ NodeIDs/Names, TLSName(if defined), [ List of endpoints/IPaddresses ]]

fn read_endpoints<'a>(endpoints: &'a str) -> Result<Vec<&'a str>> {
// [ List of endpoints/IPaddresses ]

let endpoints = remove_outer_bracers(endpoints)?;

Ok(endpoints.split(COMMON).collect::<Vec<_>>())
}

let node = remove_outer_bracers(node)?;

let first_common = node.find(COMMON)
.ok_or(ErrorKind::BadResponse("Missing section after node id".to_string()))?;

let node_id = node.get(0..first_common)
.ok_or(ErrorKind::BadResponse("Missing node id".to_string()))?;

let second_common = node[first_common+1..]
.find(COMMON)
.map(|x| x + first_common + 1)
.ok_or(ErrorKind::BadResponse("Missing section after tls name".to_string()))?;

let tls_name = node.get(first_common+1..second_common)
.ok_or(ErrorKind::BadResponse("Missing tls name".to_string()))?;
let tls_name = if tls_name.is_empty() { None } else { Some(tls_name) };

let endpoints_slice = node.get(second_common+1..)
.ok_or(ErrorKind::BadResponse("Missing endpoints list".to_string()))?;

let endpoints = read_endpoints(endpoints_slice)?;

Ok(NodeResponse { node_id, tls_name, endpoints })
}

let nodes = remove_outer_bracers(nodes)?;

let mut opened_bracers : i32 = 0;
let mut first_opened_brace_pos : Option<usize> = None;
for (pos, ch) in nodes.char_indices() {
match ch {
OPEN_BRACE => {
if first_opened_brace_pos.is_none() {
first_opened_brace_pos = Some(pos);
}
opened_bracers += 1;

},
CLOSE_BRACE => {
opened_bracers -= 1;
if opened_bracers < 0 {
return Result::Err(ErrorKind::BadResponse("Malformed nodes list".to_string()).into());
}

if opened_bracers == 0 {
let Some(opened_brace_pos) = first_opened_brace_pos else {
return Result::Err(ErrorKind::BadResponse("Wrong node list parser state".to_string()).into());
};

let node_slice = nodes.get(opened_brace_pos..pos+1)
.ok_or(ErrorKind::BadResponse("Invalid node slice in list".to_string()))?;

let node = read_node(node_slice)?;

result.push(node);

first_opened_brace_pos = None;
}
},
_ => {},
}
}

Ok(result)
}

let first_common = input
.find(COMMON)
.ok_or(ErrorKind::BadResponse("Missing peers generation".to_string()))?;

let peers_generation_slice = input.get(0..first_common)
.ok_or(ErrorKind::BadResponse("Missing peers generation".to_string()))?;

let peers_generation = u32::from_str(peers_generation_slice)
.map_err(|_| ErrorKind::BadResponse("Peers generation should be u32".to_string()))?;

let second_common = input[first_common+1..]
.find(COMMON)
.map(|x| x + first_common + 1)
.ok_or(ErrorKind::BadResponse("Missing port".to_string()))?;

let port_slice = input.get(first_common+1..second_common)
.ok_or(ErrorKind::BadResponse("Missing port".to_string()))?;

let port = u16::from_str(port_slice)
.map_err(|_| ErrorKind::BadResponse("TCP port should be u16".to_string()))?;

let nodes = input.get(second_common+1..)
.ok_or(ErrorKind::BadResponse("Missing node list".to_string()))?;

let nodes = read_nodes(nodes)?;

Ok((peers_generation, port, nodes))
}

let (peers_generation, port, nodes) = read_generation_and_port_and_nodes(response)?;

Ok(ServicesResponse {
peers_generation,
port,
nodes,
})
}

mod tests {
use crate::cluster::info_helper::{ServicesResponse, NodeResponse, parse_services_response};

#[test]
fn positive_cases() {
let responses = [
"9,3000,[[BB9040011AC4202,,[172.17.0.4]],[BB9050011AC4202,,[172.17.0.5]]]",
"9,3000,[[BB9060011AC4202,,[74.125.239.53]],[BB9070011AC4202,,[74.125.239.54]]]",
"10,4333,[[BB9060011AC4202,clusternode,[74.125.239.53]],[BB9070011AC4202,clusternode,[74.125.239.54]]]",
"10,4333,[[BB9040011AC4202,clusternode,[172.17.0.4,74.125.239.53]],[BB9050011AC4202,clusternode,[172.17.0.5,74.125.239.54]]]",
];

let parsed_responses = [
ServicesResponse {
peers_generation: 9,
port: 3000,
nodes: vec![
NodeResponse {
node_id: "BB9040011AC4202",
tls_name: None,
endpoints: vec![
"172.17.0.4",
]
},
NodeResponse {
node_id: "BB9050011AC4202",
tls_name: None,
endpoints: vec![
"172.17.0.5",
]
},
],
},
ServicesResponse {
peers_generation: 9,
port: 3000,
nodes: vec![
NodeResponse {
node_id: "BB9060011AC4202",
tls_name: None,
endpoints: vec![
"74.125.239.53",
]
},
NodeResponse {
node_id: "BB9070011AC4202",
tls_name: None,
endpoints: vec![
"74.125.239.54",
]
},
],
},
ServicesResponse {
peers_generation: 10,
port: 4333,
nodes: vec![
NodeResponse {
node_id: "BB9060011AC4202",
tls_name: Some("clusternode"),
endpoints: vec![
"74.125.239.53",
]
},
NodeResponse {
node_id: "BB9070011AC4202",
tls_name: Some("clusternode"),
endpoints: vec![
"74.125.239.54",
]
},
],
},
ServicesResponse {
peers_generation: 10,
port: 4333,
nodes: vec![
NodeResponse {
node_id: "BB9040011AC4202",
tls_name: Some("clusternode"),
endpoints: vec![
"172.17.0.4",
"74.125.239.53",
]
},
NodeResponse {
node_id: "BB9050011AC4202",
tls_name: Some("clusternode"),
endpoints: vec![
"172.17.0.5",
"74.125.239.54",
]
},
],
},
];

for (parsed, response) in parsed_responses.iter().zip(responses.iter()) {
assert_eq!(parsed, &parse_services_response(response).unwrap());
}
}
}
1 change: 1 addition & 0 deletions aerospike-core/src/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub mod node;
pub mod node_validator;
pub mod partition;
pub mod partition_tokenizer;
mod info_helper;

use aerospike_rt::time::{Duration, Instant};
use std::collections::HashMap;
Expand Down
58 changes: 30 additions & 28 deletions aerospike-core/src/cluster/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use std::collections::HashMap;
use std::fmt;
use std::hash::{Hash, Hasher};
use std::result::Result as StdResult;
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering};
use std::sync::Arc;

Expand All @@ -28,6 +27,8 @@ use crate::net::{ConnectionPool, Host, PooledConnection};
use crate::policy::ClientPolicy;
use aerospike_rt::RwLock;

use super::info_helper::parse_services_response;

pub const PARTITIONS: usize = 4096;

/// The node instance holding connections and node settings.
Expand Down Expand Up @@ -144,10 +145,16 @@ impl Node {

// Returns the services that the client should use for the cluster tend
const fn services_name(&self) -> &'static str {
if self.client_policy.use_services_alternate {
"services-alternate"
} else {
"services"
#[cfg(not(feature = "tls"))]
let has_tls = false;
#[cfg(feature = "tls")]
let has_tls = self.client_policy.tls_policy.is_some();

match (self.client_policy.use_services_alternate, has_tls) {
(true, true) => "peers-tls-alt",
(true, false) => "peers-clear-alt",
(false, true) => "peers-tls-std",
(false, false) => "peers-clear-std",
}
}

Expand Down Expand Up @@ -204,31 +211,26 @@ impl Node {
Some(friend_string) => friend_string,
};

let friend_names = friend_string.split(';');
for friend in friend_names {
let mut friend_info = friend.split(':');
if friend_info.clone().count() != 2 {
error!(
"Node info from asinfo:services is malformed. Expected HOST:PORT, but got \
'{}'",
friend
);
continue;
}
let services_response = parse_services_response(friend_string)?;

let host = friend_info.next().unwrap();
let port = u16::from_str(friend_info.next().unwrap())?;
let alias = match self.client_policy.ip_map {
Some(ref ip_map) if ip_map.contains_key(host) => {
Host::new(ip_map.get(host).unwrap(), port)
}
_ => Host::new(host, port),
};
let empty_ip_map : HashMap<String, String> = HashMap::new();

if current_aliases.contains_key(&alias) {
self.reference_count.fetch_add(1, Ordering::Relaxed);
} else if !friends.contains(&alias) {
friends.push(alias);
let ip_map = self.client_policy.ip_map.as_ref().unwrap_or(&empty_ip_map);

for node in services_response.nodes {
for endpoint in node.endpoints {
let mapped_ip = ip_map
.get(endpoint)
.map(|x| x.as_str())
.unwrap_or(endpoint);

let alias = Host::new(mapped_ip, services_response.port, node.tls_name);

if current_aliases.contains_key(&alias) {
self.reference_count.fetch_add(1, Ordering::Relaxed);
} else if !friends.contains(&alias) {
friends.push(alias);
}
}
}

Expand Down
Loading