Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions send-transaction-service/src/send_transaction_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ pub struct Config {
pub batch_send_rate_ms: u64,
/// When the retry pool exceeds this max size, new transactions are dropped after their first broadcast attempt
pub retry_pool_max_size: usize,
pub tpu_peers: Option<Vec<SocketAddr>>,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the value of an Option here, instead of just a Vec that may be empty (when flag isn't set)?

Copy link
Copy Markdown
Author

@t-nelson t-nelson Mar 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pedantry mostly. preference for type safety over implicit sentinels

}

impl Default for Config {
Expand All @@ -127,6 +128,7 @@ impl Default for Config {
batch_size: DEFAULT_TRANSACTION_BATCH_SIZE,
batch_send_rate_ms: DEFAULT_BATCH_SEND_RATE_MS,
retry_pool_max_size: MAX_TRANSACTION_RETRY_POOL_SIZE,
tpu_peers: None,
}
}
}
Expand Down Expand Up @@ -566,12 +568,18 @@ impl SendTransactionService {
stats: &SendTransactionServiceStats,
) {
// Processing the transactions in batch
let addresses = Self::get_tpu_addresses_with_slots(
let mut addresses = config
.tpu_peers
.as_ref()
.map(|addrs| addrs.iter().map(|a| (a, 0)).collect::<Vec<_>>())
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line would be so much simpler and easier to read if tpu_peers was just a Vec.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would also be easier to read if we didn't collect the leader id with the leader slot just for the sake of a debug-level log message :upside:

i started to remove it but refrained for the sake of staying on topic

.unwrap_or_default();
let leader_addresses = Self::get_tpu_addresses_with_slots(
tpu_address,
leader_info,
config,
connection_cache.protocol(),
);
addresses.extend(leader_addresses);

let wire_transactions = transactions
.iter()
Expand All @@ -584,8 +592,8 @@ impl SendTransactionService {
})
.collect::<Vec<&[u8]>>();

for address in &addresses {
Self::send_transactions(address.0, &wire_transactions, connection_cache, stats);
for (address, _) in &addresses {
Self::send_transactions(address, &wire_transactions, connection_cache, stats);
}
}

Expand Down Expand Up @@ -702,14 +710,20 @@ impl SendTransactionService {

let iter = wire_transactions.chunks(config.batch_size);
for chunk in iter {
let mut addresses = config
Comment thread
t-nelson marked this conversation as resolved.
.tpu_peers
.as_ref()
.map(|addrs| addrs.iter().collect::<Vec<_>>())
.unwrap_or_default();
let mut leader_info_provider = leader_info_provider.lock().unwrap();
let leader_info = leader_info_provider.get_leader_info();
let addresses = Self::get_tpu_addresses(
let leader_addresses = Self::get_tpu_addresses(
tpu_address,
leader_info,
config,
connection_cache.protocol(),
);
addresses.extend(leader_addresses);

for address in &addresses {
Self::send_transactions(address, chunk, connection_cache, stats);
Expand Down
16 changes: 16 additions & 0 deletions validator/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1163,6 +1163,22 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> {
.default_value(&default_args.rpc_send_transaction_retry_pool_max_size)
.help("The maximum size of transactions retry pool."),
)
.arg(
Arg::with_name("rpc_send_transaction_tpu_peer")
.long("rpc-send-transaction-tpu-peer")
.takes_value(true)
.number_of_values(1)
.multiple(true)
.value_name("HOST:PORT")
.validator(solana_net_utils::is_host_port)
.help("Peer(s) to broadcast transactions to instead of the current leader")
)
.arg(
Arg::with_name("rpc_send_transaction_also_leader")
.long("rpc-send-transaction-also-leader")
.requires("rpc_send_transaction_tpu_peer")
.help("With `--rpc-send-transaction-tpu-peer HOST:PORT`, also send to the current leader")
)
.arg(
Arg::with_name("rpc_scan_and_fix_roots")
.long("rpc-scan-and-fix-roots")
Expand Down
28 changes: 23 additions & 5 deletions validator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1308,6 +1308,27 @@ pub fn main() {
);
exit(1);
}
let rpc_send_transaction_tpu_peers = matches
.values_of("rpc_send_transaction_tpu_peer")
.map(|values| {
values
.map(solana_net_utils::parse_host_port)
.collect::<Result<Vec<SocketAddr>, String>>()
})
.transpose()
.unwrap_or_else(|e| {
eprintln!("failed to parse rpc send-transaction-service tpu peer address: {e}");
exit(1);
});
let rpc_send_transaction_also_leader = matches.is_present("rpc_send_transaction_also_leader");
let leader_forward_count =
if rpc_send_transaction_tpu_peers.is_some() && !rpc_send_transaction_also_leader {
// rpc-sts is configured to send only to specific tpu peers. disable leader forwards
0
} else {
value_t_or_exit!(matches, "rpc_send_transaction_leader_forward_count", u64)
};

let full_api = matches.is_present("full_rpc_api");

let mut validator_config = ValidatorConfig {
Expand Down Expand Up @@ -1399,11 +1420,7 @@ pub fn main() {
contact_debug_interval,
send_transaction_service_config: send_transaction_service::Config {
retry_rate_ms: rpc_send_retry_rate_ms,
leader_forward_count: value_t_or_exit!(
matches,
"rpc_send_transaction_leader_forward_count",
u64
),
leader_forward_count,
default_max_retries: value_t!(
matches,
"rpc_send_transaction_default_max_retries",
Expand All @@ -1422,6 +1439,7 @@ pub fn main() {
"rpc_send_transaction_retry_pool_max_size",
usize
),
tpu_peers: rpc_send_transaction_tpu_peers,
},
no_poh_speed_test: matches.is_present("no_poh_speed_test"),
no_os_memory_stats_reporting: matches.is_present("no_os_memory_stats_reporting"),
Expand Down