Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add AuthorityAggregatorBuilder #5233

Merged
merged 3 commits into from
Oct 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 30 additions & 74 deletions crates/sui-benchmark/src/bin/stress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use prometheus::Registry;
use rand::seq::SliceRandom;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use strum_macros::EnumString;
use sui_benchmark::drivers::bench_driver::BenchDriver;
use sui_benchmark::drivers::driver::Driver;
Expand All @@ -21,17 +20,9 @@ use sui_benchmark::workloads::workload::get_latest;
use sui_benchmark::workloads::{
make_combination_workload, make_shared_counter_workload, make_transfer_object_workload,
};
use sui_config::gateway::GatewayConfig;
use sui_config::Config;
use sui_config::PersistedConfig;
use sui_core::authority_aggregator::AuthAggMetrics;
use sui_core::authority_aggregator::{reconfig_from_genesis, AuthorityAggregator};
use sui_core::authority_client::make_authority_clients;
use sui_core::authority_aggregator::{reconfig_from_genesis, AuthorityAggregatorBuilder};
use sui_core::authority_client::AuthorityAPI;
use sui_core::authority_client::NetworkAuthorityClient;
use sui_core::epoch::committee_store::CommitteeStore;
use sui_core::safe_client::SafeClientMetrics;
use sui_core::validator_info::make_committee;
use sui_node::metrics;
use sui_types::base_types::ObjectID;
use sui_types::base_types::SuiAddress;
Expand All @@ -42,7 +33,6 @@ use sui_types::messages::BatchInfoResponseItem;
use sui_types::messages::TransactionInfoRequest;
use tracing::log::info;

use sui_core::authority_client::NetworkAuthorityClientMetrics;
use test_utils::authority::spawn_test_authorities;
use test_utils::authority::test_and_configure_authority_configs;
use test_utils::objects::generate_gas_objects_with_owner;
Expand All @@ -69,15 +59,16 @@ struct Opts {
pub num_client_threads: u64,
#[clap(long, default_value = "", global = true)]
pub log_path: String,
/// Path where gateway config is stored when running remote benchmark
/// This is also the path where gateway config is stored during local
/// benchmark
#[clap(long, default_value = "/tmp/gateway.yaml", global = true)]
pub gateway_config_path: String,
/// [Required for remote benchmark]
/// Path where genesis.blob is stored when running remote benchmark
#[clap(long, default_value = "/tmp/genesis.blob", global = true)]
pub genesis_blob_path: String,
/// [Required for remote benchmark]
/// Path where keypair for primary gas account is stored. The format of
/// this file is same as what `sui keytool generate` outputs
#[clap(long, default_value = "", global = true)]
pub keystore_path: String,
/// [Required for remote benchmark]
/// Object id of the primary gas coin used for benchmark
/// NOTE: THe remote network should have this coin in its genesis config
/// with large enough gas i.e. u64::MAX
Expand All @@ -87,7 +78,7 @@ struct Opts {
pub primary_gas_objects: u64,
/// Whether to run local or remote benchmark
/// NOTE: For running remote benchmark we must have the following
/// gateway_config_path, keypair_path and primary_gas_id
/// genesis_blob_path, keypair_path and primary_gas_id
#[clap(long, parse(try_from_str), default_value = "true", global = true)]
pub local: bool,
/// Default workload is 100% transfer object
Expand Down Expand Up @@ -253,12 +244,11 @@ async fn main() -> Result<()> {
config.log_file = Some(opts.log_path);
}
let _guard = config.with_env().init();
let registry: Registry = metrics::start_prometheus_server(
let registry: Arc<Registry> = Arc::new(metrics::start_prometheus_server(
format!("{}:{}", opts.client_metric_host, opts.client_metric_port)
.parse()
.unwrap(),
);
let network_authority_client_metrics = Arc::new(NetworkAuthorityClientMetrics::new(&registry));
));
let barrier = Arc::new(Barrier::new(2));
let cloned_barrier = barrier.clone();
let (primary_gas_id, owner, keypair, aggregator) = if opts.local {
Expand All @@ -274,39 +264,19 @@ async fn main() -> Result<()> {
});
Arc::new(configs)
};
let gateway_config = GatewayConfig {
epoch: 0,
validator_set: configs.validator_set().to_vec(),
send_timeout: Duration::from_secs(4),
recv_timeout: Duration::from_secs(4),
Comment on lines -280 to -281
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

^

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is no clue for why this is causing more timeout errors, let's just try setting this to 30s.

buffer_size: 650000,
db_folder_path: PathBuf::from("/tmp/client_db"),
};
gateway_config.save(&opts.gateway_config_path)?;

// bring up servers ..
let (owner, keypair): (SuiAddress, AccountKeyPair) = test_account_keys().pop().unwrap();
let primary_gas = generate_gas_objects_with_owner(1, owner);
let primary_gas_id = primary_gas.get(0).unwrap().id();
// Make the client runtime wait until we are done creating genesis objects
let cloned_config = configs;
let cloned_config = configs.clone();
let cloned_gas = primary_gas;
let auth_clients = make_authority_clients(
&gateway_config.validator_set,
gateway_config.send_timeout,
gateway_config.recv_timeout,
Comment on lines -295 to -296
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

^

network_authority_client_metrics.clone(),
);

let committee = make_committee(gateway_config.epoch, &gateway_config.validator_set)?;
let committee_store = Arc::new(CommitteeStore::new_for_testing(&committee));
let aggregator = Arc::new(AuthorityAggregator::new(
committee,
committee_store,
auth_clients.clone(),
AuthAggMetrics::new(&registry),
Arc::new(SafeClientMetrics::new(&registry)),
network_authority_client_metrics.clone(),
));
let (aggregator, auth_clients) = AuthorityAggregatorBuilder::from_network_config(&configs)
.with_registry(registry.clone())
.build()
.unwrap();

// spawn a thread to spin up sui nodes on the multi-threaded server runtime
let _ = std::thread::spawn(move || {
Expand Down Expand Up @@ -341,7 +311,12 @@ async fn main() -> Result<()> {
join_all(follower_handles).await;
});
});
(primary_gas_id, owner, Arc::new(keypair), aggregator)
(
primary_gas_id,
owner,
Arc::new(keypair),
Arc::new(aggregator),
)
} else {
eprintln!("Configuring remote benchmark..");
std::thread::spawn(move || {
Expand All @@ -352,33 +327,13 @@ async fn main() -> Result<()> {
cloned_barrier.wait().await;
});
});
let config_path = Some(&opts.gateway_config_path)
.filter(|s| !s.is_empty())
.map(PathBuf::from)
.ok_or_else(|| {
anyhow!(format!(
"Failed to find gateway config at path: {}",
opts.gateway_config_path
))
})?;
let config: GatewayConfig = PersistedConfig::read(&config_path)?;
let committee = make_committee(config.epoch, &config.validator_set)?;
let genesis = sui_config::node::Genesis::new_from_file(&opts.genesis_blob_path);
let genesis = genesis.genesis()?;
let (aggregator, _) = AuthorityAggregatorBuilder::from_genesis(genesis)
.with_registry(registry.clone())
.build()
.unwrap();

let authority_clients = make_authority_clients(
&config.validator_set,
config.send_timeout,
config.recv_timeout,
network_authority_client_metrics.clone(),
);
let committee_store = Arc::new(CommitteeStore::new_for_testing(&committee));
let aggregator = AuthorityAggregator::new(
committee,
committee_store,
authority_clients,
AuthAggMetrics::new(&registry),
Arc::new(SafeClientMetrics::new(&registry)),
network_authority_client_metrics.clone(),
);
let aggregator = Arc::new(reconfig_from_genesis(aggregator).await?);
eprintln!(
"Reconfiguration - Reconfiguration to epoch {} is done",
Expand Down Expand Up @@ -426,6 +381,7 @@ async fn main() -> Result<()> {
let prev_benchmark_stats_path = opts.compare_with.clone();
let curr_benchmark_stats_path = opts.benchmark_stats_path.clone();
let arc_agg = aggregator.clone();
let registry_clone = registry.clone();
let handle = std::thread::spawn(move || {
client_runtime.block_on(async move {
match opts.run_spec {
Expand Down Expand Up @@ -502,7 +458,7 @@ async fn main() -> Result<()> {
let show_progress = interval.is_unbounded();
let driver = BenchDriver::new(stat_collection_interval);
driver
.run(workloads, arc_agg, &registry, show_progress, interval)
.run(workloads, arc_agg, &registry_clone, show_progress, interval)
.await
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-benchmark/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@ pub fn get_ed25519_keypair_from_keystore(
let keystore = FileBasedKeystore::new(&keystore_path)?;
match keystore.get_key(requested_address) {
Ok(SuiKeyPair::Ed25519SuiKeyPair(kp)) => Ok(kp.copy()),
_ => Err(anyhow::anyhow!("Unsupported key type")),
other => Err(anyhow::anyhow!("Invalid key type: {:?}", other)),
}
}
7 changes: 5 additions & 2 deletions crates/sui-benchmark/tests/simtest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ mod test {
use std::sync::Arc;
use std::time::Duration;
use sui_config::SUI_KEYSTORE_FILENAME;
use sui_core::test_utils::test_authority_aggregator;
use sui_core::authority_aggregator::AuthorityAggregatorBuilder;
use test_utils::{
messages::get_gas_object_with_wallet_context, network::init_cluster_builder_env_aware,
};
Expand Down Expand Up @@ -76,7 +76,10 @@ mod test {
1, // transfer_object_weight
)];

let aggregator = Arc::new(test_authority_aggregator(swarm.config()));
let (aggregator, _) = AuthorityAggregatorBuilder::from_network_config(swarm.config())
.build()
.unwrap();
let aggregator = Arc::new(aggregator);

for w in workloads.iter_mut() {
w.workload.init(aggregator.clone()).await;
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-config/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ impl Genesis {
}
}

fn genesis(&self) -> Result<&genesis::Genesis> {
pub fn genesis(&self) -> Result<&genesis::Genesis> {
match &self.location {
GenesisLocation::InPlace { genesis } => Ok(genesis),
GenesisLocation::File {
Expand Down
94 changes: 90 additions & 4 deletions crates/sui-core/src/authority_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,23 @@
// SPDX-License-Identifier: Apache-2.0

use crate::authority_client::{
make_network_authority_client_sets_from_committee, AuthorityAPI, NetworkAuthorityClient,
NetworkAuthorityClientMetrics,
make_authority_clients, make_network_authority_client_sets_from_committee, AuthorityAPI,
NetworkAuthorityClient, NetworkAuthorityClientMetrics,
};
use crate::safe_client::{SafeClient, SafeClientMetrics};
use crate::validator_info::make_committee;
use async_trait::async_trait;

use futures::{future, future::BoxFuture, stream::FuturesUnordered, StreamExt};
use itertools::Itertools;
use move_core_types::value::MoveStructLayout;
use mysten_network::config::Config;
use sui_network::default_mysten_network_config;
use sui_types::crypto::AuthoritySignature;
use sui_config::genesis::Genesis;
use sui_config::NetworkConfig;
use sui_network::{
default_mysten_network_config, DEFAULT_CONNECT_TIMEOUT_SEC, DEFAULT_REQUEST_TIMEOUT_SEC,
};
use sui_types::crypto::{AuthorityPublicKeyBytes, AuthoritySignature};
use sui_types::object::{Object, ObjectFormatOptions, ObjectRead};
use sui_types::sui_system_state::SuiSystemState;
use sui_types::{
Expand All @@ -32,6 +37,7 @@ use tracing::{debug, error, info, instrument, trace, warn, Instrument};

use prometheus::{
register_histogram_with_registry, register_int_counter_with_registry, Histogram, IntCounter,
Registry,
};
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::string::ToString;
Expand Down Expand Up @@ -2282,3 +2288,83 @@ pub async fn reconfig_from_genesis(
// Now transit from latest_epoch - 1 to latest_epoch
aggregator.recreate_with_net_addresses(latest_committee, &network_config)
}

pub struct AuthorityAggregatorBuilder<'a> {
network_config: Option<&'a NetworkConfig>,
genesis: Option<&'a Genesis>,
committee_store: Option<Arc<CommitteeStore>>,
registry: Option<Arc<Registry>>,
}

impl<'a> AuthorityAggregatorBuilder<'a> {
pub fn from_network_config(config: &'a NetworkConfig) -> Self {
Self {
network_config: Some(config),
genesis: None,
committee_store: None,
registry: None,
}
}

pub fn from_genesis(genesis: &'a Genesis) -> Self {
Self {
network_config: None,
genesis: Some(genesis),
committee_store: None,
registry: None,
}
}

pub fn with_committee_store(mut self, committee_store: Arc<CommitteeStore>) -> Self {
self.committee_store = Some(committee_store);
self
}

pub fn with_registry(mut self, registry: Arc<Registry>) -> Self {
self.registry = Some(registry);
self
}

pub fn build(
self,
) -> anyhow::Result<(
AuthorityAggregator<NetworkAuthorityClient>,
BTreeMap<AuthorityPublicKeyBytes, NetworkAuthorityClient>,
)> {
let validator_info = if let Some(network_config) = self.network_config {
network_config.validator_set()
} else if let Some(genesis) = self.genesis {
genesis.validator_set()
} else {
anyhow::bail!("need either NetworkConfig or Genesis.");
};
let committee = make_committee(0, validator_info)?;
let registry = self
.registry
.unwrap_or_else(|| Arc::new(prometheus::Registry::new()));
let network_metrics = Arc::new(NetworkAuthorityClientMetrics::new(&registry));

let auth_clients = make_authority_clients(
validator_info,
DEFAULT_CONNECT_TIMEOUT_SEC,
DEFAULT_REQUEST_TIMEOUT_SEC,
network_metrics.clone(),
);
let committee_store = if let Some(committee_store) = self.committee_store {
committee_store
} else {
Arc::new(CommitteeStore::new_for_testing(&committee))
};
Ok((
AuthorityAggregator::new(
committee,
committee_store,
auth_clients.clone(),
AuthAggMetrics::new(&registry),
Arc::new(SafeClientMetrics::new(&registry)),
network_metrics,
),
auth_clients,
))
}
}
8 changes: 4 additions & 4 deletions crates/sui-core/src/authority_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,14 +338,14 @@ pub fn make_network_authority_client_sets_from_genesis(

pub fn make_authority_clients(
validator_set: &[ValidatorInfo],
send_timeout: Duration,
recv_timeout: Duration,
connect_timeout: Duration,
request_timeout: Duration,
net_metrics: Arc<NetworkAuthorityClientMetrics>,
) -> BTreeMap<AuthorityName, NetworkAuthorityClient> {
let mut authority_clients = BTreeMap::new();
let mut network_config = mysten_network::config::Config::new();
network_config.connect_timeout = Some(send_timeout);
network_config.request_timeout = Some(recv_timeout);
network_config.connect_timeout = Some(connect_timeout);
network_config.request_timeout = Some(request_timeout);
for authority in validator_set {
let channel = network_config
.connect_lazy(authority.network_address())
Expand Down
9 changes: 6 additions & 3 deletions crates/sui-core/src/node_sync/node_follower.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,8 +324,8 @@ where
mod test {
use super::*;
use crate::{
authority_active::gossip::GossipMetrics, authority_client::NetworkAuthorityClient,
node_sync::SyncStatus, test_utils::test_authority_aggregator,
authority_active::gossip::GossipMetrics, authority_aggregator::AuthorityAggregatorBuilder,
authority_client::NetworkAuthorityClient, node_sync::SyncStatus,
};
use std::sync::{Arc, Mutex};
use sui_macros::sim_test;
Expand Down Expand Up @@ -454,7 +454,10 @@ mod test {
// Set up an authority
let config = test_and_configure_authority_configs(1);
let authorities = spawn_test_authorities(objects, &config).await;
let net = Arc::new(test_authority_aggregator(&config));
let (agg, _) = AuthorityAggregatorBuilder::from_network_config(&config)
.build()
.unwrap();
let net = Arc::new(agg);

execute_transactions(&net, &transactions).await;

Expand Down
Loading