Skip to content

Commit

Permalink
Add hidden configuration for sending batched messages sequentially (#…
Browse files Browse the repository at this point in the history
…2543)

Partial fix for #2350

This PR adds a hidden chain configuration `sequential_batch_tx`, which will cause the relayer to submit batched messages in serial after the previous transaction is committed to the chain.

The config has default value `false`, which disables sequential sending by default. However we can instruct relayer operators to set this to `true`, in the rare case that the batched transactions are failing due to priority mempool being turned on by validators of a chain.

Note that enabling sequential sending may cause degradation in performance. Hence we do not expose this config in the public documentation, to prevent relayer operators from shooting themselves in the foot.

---

* Add sequential version of send_batched_messages

* Add sequential_batch_tx chain config

* Manual test behavior of integration tests with sequential_batch_tx

* Add draft test for sequential batching

* Add set_mempool_version to enable priority mempool

* Add integration test for sequential batching

* Relax time assertion for parallel batch

* Add some documentation to send_messages functions

Co-authored-by: Romain Ruetschi <[email protected]>
  • Loading branch information
soareschen and romac authored Aug 23, 2022
1 parent 7b2eb6c commit 134b1bc
Show file tree
Hide file tree
Showing 12 changed files with 332 additions and 38 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 24 additions & 10 deletions relayer/src/chain/cosmos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ use ibc_proto::cosmos::staking::v1beta1::Params as StakingParams;

use crate::chain::cosmos::batch::{
send_batched_messages_and_wait_check_tx, send_batched_messages_and_wait_commit,
sequential_send_batched_messages_and_wait_commit,
};
use crate::chain::cosmos::encode::encode_to_bech32;
use crate::chain::cosmos::gas::{calculate_fee, mul_ceil};
Expand Down Expand Up @@ -420,16 +421,29 @@ impl CosmosSdkChain {
let account =
get_or_fetch_account(&self.grpc_addr, &key_entry.account, &mut self.account).await?;

send_batched_messages_and_wait_commit(
&self.tx_config,
self.config.max_msg_num,
self.config.max_tx_size,
&key_entry,
account,
&self.config.memo_prefix,
proto_msgs,
)
.await
if self.config.sequential_batch_tx {
sequential_send_batched_messages_and_wait_commit(
&self.tx_config,
self.config.max_msg_num,
self.config.max_tx_size,
&key_entry,
account,
&self.config.memo_prefix,
proto_msgs,
)
.await
} else {
send_batched_messages_and_wait_commit(
&self.tx_config,
self.config.max_msg_num,
self.config.max_tx_size,
&key_entry,
account,
&self.config.memo_prefix,
proto_msgs,
)
.await
}
}

async fn do_send_messages_and_wait_check_tx(
Expand Down
162 changes: 136 additions & 26 deletions relayer/src/chain/cosmos/batch.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use core::mem;

use ibc::core::ics24_host::identifier::ChainId;
use ibc::events::IbcEvent;
use ibc::Height;
use ibc_proto::google::protobuf::Any;
use prost::Message;
use tendermint_rpc::endpoint::broadcast::tx_sync::Response;
use tracing::debug;

use crate::chain::cosmos::retry::send_tx_with_account_sequence_retry;
use crate::chain::cosmos::types::account::Account;
Expand All @@ -16,6 +18,13 @@ use crate::error::Error;
use crate::event::IbcEventWithHeight;
use crate::keyring::KeyEntry;

/**
Broadcast messages as multiple batched transactions to the chain all at once,
and then wait for all transactions to be committed.
This may improve performance in case when multiple transactions are
committed into the same block. However this approach may not work if
priority mempool is enabled.
*/
pub async fn send_batched_messages_and_wait_commit(
config: &TxConfig,
max_msg_num: MaxMsgNum,
Expand Down Expand Up @@ -57,6 +66,43 @@ pub async fn send_batched_messages_and_wait_commit(
Ok(events)
}

/**
Send batched messages one after another, only after the previous one
has been committed. This is only used in case if parallel transactions
are committed in the wrong order due to interference from priority mempool.
*/
pub async fn sequential_send_batched_messages_and_wait_commit(
config: &TxConfig,
max_msg_num: MaxMsgNum,
max_tx_size: MaxTxSize,
key_entry: &KeyEntry,
account: &mut Account,
tx_memo: &Memo,
messages: Vec<Any>,
) -> Result<Vec<IbcEventWithHeight>, Error> {
if messages.is_empty() {
return Ok(Vec::new());
}

let tx_sync_results = sequential_send_messages_as_batches(
config,
max_msg_num,
max_tx_size,
key_entry,
account,
tx_memo,
messages,
)
.await?;

let events = tx_sync_results
.into_iter()
.flat_map(|el| el.events)
.collect();

Ok(events)
}

pub async fn send_batched_messages_and_wait_check_tx(
config: &TxConfig,
max_msg_num: MaxMsgNum,
Expand Down Expand Up @@ -97,8 +143,17 @@ async fn send_messages_as_batches(
return Ok(Vec::new());
}

let message_count = messages.len();

let batches = batch_messages(max_msg_num, max_tx_size, messages)?;

debug!(
"sending {} messages as {} batches to chain {} in parallel",
message_count,
batches.len(),
config.chain_id
);

let mut tx_sync_results = Vec::new();

for batch in batches {
Expand All @@ -107,37 +162,92 @@ async fn send_messages_as_batches(
let response =
send_tx_with_account_sequence_retry(config, key_entry, account, tx_memo, batch).await?;

if response.code.is_err() {
// Note: we don't have any height information in this case. This hack will fix itself
// once we remove the `ChainError` event (which is not actually an event)
let height = Height::new(config.chain_id.version(), 1).unwrap();

let events_per_tx = vec![IbcEventWithHeight::new(IbcEvent::ChainError(format!(
"check_tx (broadcast_tx_sync) on chain {} for Tx hash {} reports error: code={:?}, log={:?}",
config.chain_id, response.hash, response.code, response.log
)), height); message_count];

let tx_sync_result = TxSyncResult {
response,
events: events_per_tx,
status: TxStatus::ReceivedResponse,
};

tx_sync_results.push(tx_sync_result);
} else {
let tx_sync_result = TxSyncResult {
response,
events: Vec::new(),
status: TxStatus::Pending { message_count },
};

tx_sync_results.push(tx_sync_result);
}
let tx_sync_result = response_to_tx_sync_result(&config.chain_id, message_count, response);

tx_sync_results.push(tx_sync_result);
}

Ok(tx_sync_results)
}

async fn sequential_send_messages_as_batches(
config: &TxConfig,
max_msg_num: MaxMsgNum,
max_tx_size: MaxTxSize,
key_entry: &KeyEntry,
account: &mut Account,
tx_memo: &Memo,
messages: Vec<Any>,
) -> Result<Vec<TxSyncResult>, Error> {
if messages.is_empty() {
return Ok(Vec::new());
}

let message_count = messages.len();

let batches = batch_messages(max_msg_num, max_tx_size, messages)?;

debug!(
"sending {} messages as {} batches to chain {} in serial",
message_count,
batches.len(),
config.chain_id
);

let mut tx_sync_results = Vec::new();

for batch in batches {
let message_count = batch.len();

let response =
send_tx_with_account_sequence_retry(config, key_entry, account, tx_memo, batch).await?;

let tx_sync_result = response_to_tx_sync_result(&config.chain_id, message_count, response);

tx_sync_results.push(tx_sync_result);

wait_for_block_commits(
&config.chain_id,
&config.rpc_client,
&config.rpc_address,
&config.rpc_timeout,
&mut tx_sync_results,
)
.await?;
}

Ok(tx_sync_results)
}

fn response_to_tx_sync_result(
chain_id: &ChainId,
message_count: usize,
response: Response,
) -> TxSyncResult {
if response.code.is_err() {
// Note: we don't have any height information in this case. This hack will fix itself
// once we remove the `ChainError` event (which is not actually an event)
let height = Height::new(chain_id.version(), 1).unwrap();

let events_per_tx = vec![IbcEventWithHeight::new(IbcEvent::ChainError(format!(
"check_tx (broadcast_tx_sync) on chain {} for Tx hash {} reports error: code={:?}, log={:?}",
chain_id, response.hash, response.code, response.log
)), height); message_count];

TxSyncResult {
response,
events: events_per_tx,
status: TxStatus::ReceivedResponse,
}
} else {
TxSyncResult {
response,
events: Vec::new(),
status: TxStatus::Pending { message_count },
}
}
}

fn batch_messages(
max_msg_num: MaxMsgNum,
max_tx_size: MaxTxSize,
Expand Down
8 changes: 6 additions & 2 deletions relayer/src/chain/cosmos/wait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use itertools::Itertools;
use std::thread;
use std::time::Instant;
use tendermint_rpc::{HttpClient, Url};
use tracing::{info, trace};
use tracing::{debug, trace};

use crate::chain::cosmos::query::tx::query_tx_response;
use crate::chain::cosmos::types::events::from_tx_response_event;
Expand All @@ -26,14 +26,18 @@ pub async fn wait_for_block_commits(
rpc_timeout: &Duration,
tx_sync_results: &mut [TxSyncResult],
) -> Result<(), Error> {
if all_tx_results_found(tx_sync_results) {
return Ok(());
}

let start_time = Instant::now();

let hashes = tx_sync_results
.iter()
.map(|res| res.response.hash.to_string())
.join(", ");

info!(
debug!(
id = %chain_id,
"wait_for_block_commits: waiting for commit of tx hashes(s) {}",
hashes
Expand Down
1 change: 1 addition & 0 deletions relayer/src/chain/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,7 @@ pub mod test_utils {
address_type: AddressType::default(),
memo_prefix: Default::default(),
proof_specs: Default::default(),
sequential_batch_tx: false,
}
}
}
8 changes: 8 additions & 0 deletions relayer/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,14 @@ pub struct ChainConfig {
#[serde(default, with = "self::proof_specs")]
pub proof_specs: ProofSpecs,

// This is an undocumented and hidden config to make the relayer wait for
// DeliverTX before sending the next transaction when sending messages in
// multiple batches. We will instruct relayer operators to turn this on
// in case relaying failed in a chain with priority mempool enabled.
// Warning: turning this on may cause degradation in performance.
#[serde(default)]
pub sequential_batch_tx: bool,

// these two need to be last otherwise we run into `ValueAfterTable` error when serializing to TOML
/// The trust threshold defines what fraction of the total voting power of a known
/// and trusted validator set is sufficient for a commit to be accepted going forward.
Expand Down
1 change: 1 addition & 0 deletions tools/integration-test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ tendermint-rpc = { version = "=0.23.9", features = ["http-client", "websocket-c

serde_json = "1"
time = "0.3"
toml = "0.5"
serde = "1.0.143"

[features]
Expand Down
1 change: 1 addition & 0 deletions tools/integration-test/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub mod memo;
pub mod python;
mod query_packet;
pub mod supervisor;
pub mod tendermint;
pub mod ternary_transfer;
pub mod transfer;

Expand Down
1 change: 1 addition & 0 deletions tools/integration-test/src/tests/tendermint/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod sequential;
Loading

0 comments on commit 134b1bc

Please sign in to comment.