From 134b1bc363461598eeb8d9de74c492efa90347d3 Mon Sep 17 00:00:00 2001 From: Soares Chen Date: Tue, 23 Aug 2022 16:47:27 +0200 Subject: [PATCH] Add hidden configuration for sending batched messages sequentially (#2543) Partial fix for #2350 This PR adds a hidden chain configuration `sequential_batch_tx`, which will cause the relayer to submit batched messages in serial after the previous transaction is committed to the chain. The config has default value `false`, which disables sequential sending by default. However we can instruct relayer operators to set this to `true`, in the rare case that the batched transactions are failing due to priority mempool being turned on by validators of a chain. Note that enabling sequential sending may cause degradation in performance. Hence we do not expose this config in the public documentation, to prevent relayer operators from shooting themselves in the foot. --- * Add sequential version of send_batched_messages * Add sequential_batch_tx chain config * Manual test behavior of integration tests with sequential_batch_tx * Add draft test for sequential batching * Add set_mempool_version to enable priority mempool * Add integration test for sequential batching * Relax time assertion for parallel batch * Add some documentation to send_messages functions Co-authored-by: Romain Ruetschi --- Cargo.lock | 1 + relayer/src/chain/cosmos.rs | 34 ++-- relayer/src/chain/cosmos/batch.rs | 162 +++++++++++++++--- relayer/src/chain/cosmos/wait.rs | 8 +- relayer/src/chain/mock.rs | 1 + relayer/src/config.rs | 8 + tools/integration-test/Cargo.toml | 1 + tools/integration-test/src/tests/mod.rs | 1 + .../src/tests/tendermint/mod.rs | 1 + .../src/tests/tendermint/sequential.rs | 141 +++++++++++++++ tools/test-framework/src/chain/config.rs | 11 ++ tools/test-framework/src/types/single/node.rs | 1 + 12 files changed, 332 insertions(+), 38 deletions(-) create mode 100644 tools/integration-test/src/tests/tendermint/mod.rs create mode 100644 tools/integration-test/src/tests/tendermint/sequential.rs diff --git a/Cargo.lock b/Cargo.lock index 2e9632d043..96da3c038f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1457,6 +1457,7 @@ dependencies = [ "tendermint", "tendermint-rpc", "time", + "toml", ] [[package]] diff --git a/relayer/src/chain/cosmos.rs b/relayer/src/chain/cosmos.rs index e50c21e7e2..f0a548c167 100644 --- a/relayer/src/chain/cosmos.rs +++ b/relayer/src/chain/cosmos.rs @@ -54,6 +54,7 @@ use ibc_proto::cosmos::staking::v1beta1::Params as StakingParams; use crate::chain::cosmos::batch::{ send_batched_messages_and_wait_check_tx, send_batched_messages_and_wait_commit, + sequential_send_batched_messages_and_wait_commit, }; use crate::chain::cosmos::encode::encode_to_bech32; use crate::chain::cosmos::gas::{calculate_fee, mul_ceil}; @@ -420,16 +421,29 @@ impl CosmosSdkChain { let account = get_or_fetch_account(&self.grpc_addr, &key_entry.account, &mut self.account).await?; - send_batched_messages_and_wait_commit( - &self.tx_config, - self.config.max_msg_num, - self.config.max_tx_size, - &key_entry, - account, - &self.config.memo_prefix, - proto_msgs, - ) - .await + if self.config.sequential_batch_tx { + sequential_send_batched_messages_and_wait_commit( + &self.tx_config, + self.config.max_msg_num, + self.config.max_tx_size, + &key_entry, + account, + &self.config.memo_prefix, + proto_msgs, + ) + .await + } else { + send_batched_messages_and_wait_commit( + &self.tx_config, + self.config.max_msg_num, + self.config.max_tx_size, + &key_entry, + account, + &self.config.memo_prefix, + proto_msgs, + ) + .await + } } async fn do_send_messages_and_wait_check_tx( diff --git a/relayer/src/chain/cosmos/batch.rs b/relayer/src/chain/cosmos/batch.rs index 9118d27152..f519a2c296 100644 --- a/relayer/src/chain/cosmos/batch.rs +++ b/relayer/src/chain/cosmos/batch.rs @@ -1,10 +1,12 @@ use core::mem; +use ibc::core::ics24_host::identifier::ChainId; use ibc::events::IbcEvent; use ibc::Height; use ibc_proto::google::protobuf::Any; use prost::Message; use tendermint_rpc::endpoint::broadcast::tx_sync::Response; +use tracing::debug; use crate::chain::cosmos::retry::send_tx_with_account_sequence_retry; use crate::chain::cosmos::types::account::Account; @@ -16,6 +18,13 @@ use crate::error::Error; use crate::event::IbcEventWithHeight; use crate::keyring::KeyEntry; +/** + Broadcast messages as multiple batched transactions to the chain all at once, + and then wait for all transactions to be committed. + This may improve performance in case when multiple transactions are + committed into the same block. However this approach may not work if + priority mempool is enabled. +*/ pub async fn send_batched_messages_and_wait_commit( config: &TxConfig, max_msg_num: MaxMsgNum, @@ -57,6 +66,43 @@ pub async fn send_batched_messages_and_wait_commit( Ok(events) } +/** + Send batched messages one after another, only after the previous one + has been committed. This is only used in case if parallel transactions + are committed in the wrong order due to interference from priority mempool. +*/ +pub async fn sequential_send_batched_messages_and_wait_commit( + config: &TxConfig, + max_msg_num: MaxMsgNum, + max_tx_size: MaxTxSize, + key_entry: &KeyEntry, + account: &mut Account, + tx_memo: &Memo, + messages: Vec, +) -> Result, Error> { + if messages.is_empty() { + return Ok(Vec::new()); + } + + let tx_sync_results = sequential_send_messages_as_batches( + config, + max_msg_num, + max_tx_size, + key_entry, + account, + tx_memo, + messages, + ) + .await?; + + let events = tx_sync_results + .into_iter() + .flat_map(|el| el.events) + .collect(); + + Ok(events) +} + pub async fn send_batched_messages_and_wait_check_tx( config: &TxConfig, max_msg_num: MaxMsgNum, @@ -97,8 +143,17 @@ async fn send_messages_as_batches( return Ok(Vec::new()); } + let message_count = messages.len(); + let batches = batch_messages(max_msg_num, max_tx_size, messages)?; + debug!( + "sending {} messages as {} batches to chain {} in parallel", + message_count, + batches.len(), + config.chain_id + ); + let mut tx_sync_results = Vec::new(); for batch in batches { @@ -107,37 +162,92 @@ async fn send_messages_as_batches( let response = send_tx_with_account_sequence_retry(config, key_entry, account, tx_memo, batch).await?; - if response.code.is_err() { - // Note: we don't have any height information in this case. This hack will fix itself - // once we remove the `ChainError` event (which is not actually an event) - let height = Height::new(config.chain_id.version(), 1).unwrap(); - - let events_per_tx = vec![IbcEventWithHeight::new(IbcEvent::ChainError(format!( - "check_tx (broadcast_tx_sync) on chain {} for Tx hash {} reports error: code={:?}, log={:?}", - config.chain_id, response.hash, response.code, response.log - )), height); message_count]; - - let tx_sync_result = TxSyncResult { - response, - events: events_per_tx, - status: TxStatus::ReceivedResponse, - }; - - tx_sync_results.push(tx_sync_result); - } else { - let tx_sync_result = TxSyncResult { - response, - events: Vec::new(), - status: TxStatus::Pending { message_count }, - }; - - tx_sync_results.push(tx_sync_result); - } + let tx_sync_result = response_to_tx_sync_result(&config.chain_id, message_count, response); + + tx_sync_results.push(tx_sync_result); } Ok(tx_sync_results) } +async fn sequential_send_messages_as_batches( + config: &TxConfig, + max_msg_num: MaxMsgNum, + max_tx_size: MaxTxSize, + key_entry: &KeyEntry, + account: &mut Account, + tx_memo: &Memo, + messages: Vec, +) -> Result, Error> { + if messages.is_empty() { + return Ok(Vec::new()); + } + + let message_count = messages.len(); + + let batches = batch_messages(max_msg_num, max_tx_size, messages)?; + + debug!( + "sending {} messages as {} batches to chain {} in serial", + message_count, + batches.len(), + config.chain_id + ); + + let mut tx_sync_results = Vec::new(); + + for batch in batches { + let message_count = batch.len(); + + let response = + send_tx_with_account_sequence_retry(config, key_entry, account, tx_memo, batch).await?; + + let tx_sync_result = response_to_tx_sync_result(&config.chain_id, message_count, response); + + tx_sync_results.push(tx_sync_result); + + wait_for_block_commits( + &config.chain_id, + &config.rpc_client, + &config.rpc_address, + &config.rpc_timeout, + &mut tx_sync_results, + ) + .await?; + } + + Ok(tx_sync_results) +} + +fn response_to_tx_sync_result( + chain_id: &ChainId, + message_count: usize, + response: Response, +) -> TxSyncResult { + if response.code.is_err() { + // Note: we don't have any height information in this case. This hack will fix itself + // once we remove the `ChainError` event (which is not actually an event) + let height = Height::new(chain_id.version(), 1).unwrap(); + + let events_per_tx = vec![IbcEventWithHeight::new(IbcEvent::ChainError(format!( + "check_tx (broadcast_tx_sync) on chain {} for Tx hash {} reports error: code={:?}, log={:?}", + chain_id, response.hash, response.code, response.log + )), height); message_count]; + + TxSyncResult { + response, + events: events_per_tx, + status: TxStatus::ReceivedResponse, + } + } else { + TxSyncResult { + response, + events: Vec::new(), + status: TxStatus::Pending { message_count }, + } + } +} + fn batch_messages( max_msg_num: MaxMsgNum, max_tx_size: MaxTxSize, diff --git a/relayer/src/chain/cosmos/wait.rs b/relayer/src/chain/cosmos/wait.rs index 917af94fd4..3ff813a053 100644 --- a/relayer/src/chain/cosmos/wait.rs +++ b/relayer/src/chain/cosmos/wait.rs @@ -6,7 +6,7 @@ use itertools::Itertools; use std::thread; use std::time::Instant; use tendermint_rpc::{HttpClient, Url}; -use tracing::{info, trace}; +use tracing::{debug, trace}; use crate::chain::cosmos::query::tx::query_tx_response; use crate::chain::cosmos::types::events::from_tx_response_event; @@ -26,6 +26,10 @@ pub async fn wait_for_block_commits( rpc_timeout: &Duration, tx_sync_results: &mut [TxSyncResult], ) -> Result<(), Error> { + if all_tx_results_found(tx_sync_results) { + return Ok(()); + } + let start_time = Instant::now(); let hashes = tx_sync_results @@ -33,7 +37,7 @@ pub async fn wait_for_block_commits( .map(|res| res.response.hash.to_string()) .join(", "); - info!( + debug!( id = %chain_id, "wait_for_block_commits: waiting for commit of tx hashes(s) {}", hashes diff --git a/relayer/src/chain/mock.rs b/relayer/src/chain/mock.rs index 6eec7cfe06..9b7eee2775 100644 --- a/relayer/src/chain/mock.rs +++ b/relayer/src/chain/mock.rs @@ -515,6 +515,7 @@ pub mod test_utils { address_type: AddressType::default(), memo_prefix: Default::default(), proof_specs: Default::default(), + sequential_batch_tx: false, } } } diff --git a/relayer/src/config.rs b/relayer/src/config.rs index 3a48bd1d61..022656e33a 100644 --- a/relayer/src/config.rs +++ b/relayer/src/config.rs @@ -365,6 +365,14 @@ pub struct ChainConfig { #[serde(default, with = "self::proof_specs")] pub proof_specs: ProofSpecs, + // This is an undocumented and hidden config to make the relayer wait for + // DeliverTX before sending the next transaction when sending messages in + // multiple batches. We will instruct relayer operators to turn this on + // in case relaying failed in a chain with priority mempool enabled. + // Warning: turning this on may cause degradation in performance. + #[serde(default)] + pub sequential_batch_tx: bool, + // these two need to be last otherwise we run into `ValueAfterTable` error when serializing to TOML /// The trust threshold defines what fraction of the total voting power of a known /// and trusted validator set is sufficient for a commit to be accepted going forward. diff --git a/tools/integration-test/Cargo.toml b/tools/integration-test/Cargo.toml index 3102151373..963347e42e 100644 --- a/tools/integration-test/Cargo.toml +++ b/tools/integration-test/Cargo.toml @@ -24,6 +24,7 @@ tendermint-rpc = { version = "=0.23.9", features = ["http-client", "websocket-c serde_json = "1" time = "0.3" +toml = "0.5" serde = "1.0.143" [features] diff --git a/tools/integration-test/src/tests/mod.rs b/tools/integration-test/src/tests/mod.rs index 3d11025cd8..b6c7f65c7f 100644 --- a/tools/integration-test/src/tests/mod.rs +++ b/tools/integration-test/src/tests/mod.rs @@ -16,6 +16,7 @@ pub mod memo; pub mod python; mod query_packet; pub mod supervisor; +pub mod tendermint; pub mod ternary_transfer; pub mod transfer; diff --git a/tools/integration-test/src/tests/tendermint/mod.rs b/tools/integration-test/src/tests/tendermint/mod.rs new file mode 100644 index 0000000000..7b0ee374b1 --- /dev/null +++ b/tools/integration-test/src/tests/tendermint/mod.rs @@ -0,0 +1 @@ +pub mod sequential; diff --git a/tools/integration-test/src/tests/tendermint/sequential.rs b/tools/integration-test/src/tests/tendermint/sequential.rs new file mode 100644 index 0000000000..d25910f156 --- /dev/null +++ b/tools/integration-test/src/tests/tendermint/sequential.rs @@ -0,0 +1,141 @@ +use std::time::Instant; + +use ibc_relayer::chain::tracking::TrackedMsgs; +use ibc_relayer::config::types::max_msg_num::MaxMsgNum; +use ibc_test_framework::chain::config; +use ibc_test_framework::prelude::*; +use ibc_test_framework::relayer::transfer::build_transfer_message; + +const MESSAGES_PER_BATCH: usize = 5; +const TOTAL_TRANSACTIONS: usize = 5; +const TOTAL_MESSAGES: usize = MESSAGES_PER_BATCH * TOTAL_TRANSACTIONS; +const BLOCK_TIME_MILLIS: u64 = 1000; +const BLOCK_TIME: Duration = Duration::from_millis(BLOCK_TIME_MILLIS); + +#[test] +fn test_sequential_commit() -> Result<(), Error> { + run_binary_channel_test(&SequentialCommitTest) +} + +pub struct SequentialCommitTest; + +impl TestOverrides for SequentialCommitTest { + fn modify_node_config(&self, config: &mut toml::Value) -> Result<(), Error> { + config::set_timeout_commit(config, BLOCK_TIME)?; + config::set_timeout_propose(config, BLOCK_TIME)?; + + // Enable priority mempool. Note that this is not working currently + config::set_mempool_version(config, "v1")?; + + Ok(()) + } + + fn modify_relayer_config(&self, config: &mut Config) { + // Use sequential batching for chain A, and default parallel batching for chain B + + let chain_config_a = &mut config.chains[0]; + chain_config_a.max_msg_num = MaxMsgNum::new(MESSAGES_PER_BATCH).unwrap(); + chain_config_a.sequential_batch_tx = true; + + let chain_config_b = &mut config.chains[1]; + chain_config_b.max_msg_num = MaxMsgNum::new(MESSAGES_PER_BATCH).unwrap(); + chain_config_b.sequential_batch_tx = false; + } + + fn should_spawn_supervisor(&self) -> bool { + false + } +} + +impl BinaryChannelTest for SequentialCommitTest { + fn run( + &self, + _config: &TestConfig, + _relayer: RelayerDriver, + chains: ConnectedChains, + channel: ConnectedChannel, + ) -> Result<(), Error> { + let wallet_a = chains.node_a.wallets().relayer().cloned(); + let wallet_b = chains.node_b.wallets().relayer().cloned(); + + { + let denom_a = chains.node_a.denom(); + + let transfer_message = build_transfer_message( + &channel.port_a.as_ref(), + &channel.channel_id_a.as_ref(), + &wallet_a.as_ref(), + &wallet_b.address(), + &denom_a, + 100, + )?; + + let messages = TrackedMsgs::new_static( + vec![transfer_message; TOTAL_MESSAGES], + "test_error_events", + ); + + let start = Instant::now(); + + chains.handle_a().send_messages_and_wait_commit(messages)?; + + let end = Instant::now(); + + let duration = end.duration_since(start); + + info!( + "time taken to send {} messages on chain A: {:?}", + TOTAL_MESSAGES, duration + ); + + // Time taken for submitting sequential batches should be around number of transactions * block time + + assert!( + duration + > Duration::from_millis((BLOCK_TIME_MILLIS * TOTAL_TRANSACTIONS as u64) - 1000) + ); + assert!( + duration + < Duration::from_millis((BLOCK_TIME_MILLIS * TOTAL_TRANSACTIONS as u64) + 1000) + ); + } + + { + let denom_b = chains.node_b.denom(); + + let transfer_message = build_transfer_message( + &channel.port_b.as_ref(), + &channel.channel_id_b.as_ref(), + &wallet_b.as_ref(), + &wallet_a.address(), + &denom_b, + 100, + )?; + + let messages = TrackedMsgs::new_static( + vec![transfer_message; TOTAL_MESSAGES], + "test_error_events", + ); + + let start = Instant::now(); + + chains.handle_b().send_messages_and_wait_commit(messages)?; + + let end = Instant::now(); + + let duration = end.duration_since(start); + + // Time taken for submitting sequential batches should be around a single block time, + // since the number of transactions are small enough to fit in a single block. + + info!( + "time taken to send {} messages on chain B: {:?}", + TOTAL_MESSAGES, duration + ); + + assert!(duration < Duration::from_millis(BLOCK_TIME_MILLIS * 2)); + } + + Ok(()) + } +} diff --git a/tools/test-framework/src/chain/config.rs b/tools/test-framework/src/chain/config.rs index afa65fb630..e66c746d26 100644 --- a/tools/test-framework/src/chain/config.rs +++ b/tools/test-framework/src/chain/config.rs @@ -73,6 +73,17 @@ pub fn set_p2p_port(config: &mut Value, port: u16) -> Result<(), Error> { Ok(()) } +pub fn set_mempool_version(config: &mut Value, version: &str) -> Result<(), Error> { + config + .get_mut("mempool") + .ok_or_else(|| eyre!("expect mempool section"))? + .as_table_mut() + .ok_or_else(|| eyre!("expect object"))? + .insert("mempool_version".to_string(), version.into()); + + Ok(()) +} + /// Set the `consensus.timeout_commit` field in the full node config. pub fn set_timeout_commit(config: &mut Value, duration: Duration) -> Result<(), Error> { config diff --git a/tools/test-framework/src/types/single/node.rs b/tools/test-framework/src/types/single/node.rs index 1d2a1a27d7..bc76275df0 100644 --- a/tools/test-framework/src/types/single/node.rs +++ b/tools/test-framework/src/types/single/node.rs @@ -151,6 +151,7 @@ impl FullNode { address_type: Default::default(), memo_prefix: Default::default(), proof_specs: Default::default(), + sequential_batch_tx: false, }) }