Skip to content

Commit

Permalink
feat(congestion): reject new transactions on RPC level
Browse files Browse the repository at this point in the history
Summary: In this PR, we introduce a new failure mode on the RPC level
when a transaction is submitted under congestion. The error is of type
`InvalidTxError` and called `ShardCongested` with a single field
`shard_id` referencing the congested shard.

## Details

With [cross-shard congestion
control](near/NEPs#539) being stabilized soon,
we want to reject new transactions as early as possible when the
receiver shard is already overloaded with traffic.

On the chunk producer level, all transactions going to a congested shard
will be dropped. This keeps the memory requirements of chunk producers
bounded. Further, we decided to go for a relatively low threshold in
order to keep the latency of accepted transactions low, preventing new
transactions as soon as we hit 25% congestion on a specific shard.
Consequently, when shards are congested, it will not be long before
transactions are rejected.

This has consequences for the users. On the positive side, they will no
longer have to wait for a long time not knowing if their transaction
will be accepted or not. Either, it is executed within a bounded time
(at most 20 blocks after inclusion) or it will be rejected immediately.

But on the negative side, when a shard is congested, they will have to
actively retry sending the transaction until it gets accepted.

We hope that this can be automated by wallets, which can also provide
useful live updates to the user about what is happening. But for this,
they will need to understand and handle the new error `ShardCongested`
different from existing errors.
  • Loading branch information
jakmeier committed May 31, 2024
1 parent 0a9195e commit d1ea36f
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 6 deletions.
16 changes: 16 additions & 0 deletions chain/chain/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -643,9 +643,25 @@ impl RuntimeAdapter for NightshadeRuntime {
verify_signature: bool,
epoch_id: &EpochId,
current_protocol_version: ProtocolVersion,
receiver_congestion_info: Option<ExtendedCongestionInfo>,
) -> Result<Option<InvalidTxError>, Error> {
let runtime_config = self.runtime_config_store.get_config(current_protocol_version);

if let Some(congestion_info) = receiver_congestion_info {
let congestion_control = CongestionControl::new(
runtime_config.congestion_control_config,
congestion_info.congestion_info,
congestion_info.missed_chunks_count,
);
if !congestion_control.shard_accepts_transactions() {
let receiver_shard =
self.account_id_to_shard_uid(transaction.transaction.receiver_id(), epoch_id)?;
return Ok(Some(InvalidTxError::ShardCongested {
shard_id: receiver_shard.shard_id,
}));
}
}

if let Some(state_root) = state_root {
let shard_uid =
self.account_id_to_shard_uid(transaction.transaction.signer_id(), epoch_id)?;
Expand Down
3 changes: 2 additions & 1 deletion chain/chain/src/test_utils/kv_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use near_primitives::account::{AccessKey, Account};
use near_primitives::apply::ApplyChunkReason;
use near_primitives::block::Tip;
use near_primitives::block_header::{Approval, ApprovalInner};
use near_primitives::congestion_info::CongestionInfo;
use near_primitives::congestion_info::{CongestionInfo, ExtendedCongestionInfo};
use near_primitives::epoch_manager::block_info::BlockInfo;
use near_primitives::epoch_manager::epoch_info::EpochInfo;
use near_primitives::epoch_manager::EpochConfig;
Expand Down Expand Up @@ -1089,6 +1089,7 @@ impl RuntimeAdapter for KeyValueRuntime {
_verify_signature: bool,
_epoch_id: &EpochId,
_current_protocol_version: ProtocolVersion,
_receiver_congestion_info: Option<ExtendedCongestionInfo>,
) -> Result<Option<InvalidTxError>, Error> {
Ok(None)
}
Expand Down
1 change: 1 addition & 0 deletions chain/chain/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ pub trait RuntimeAdapter: Send + Sync {
verify_signature: bool,
epoch_id: &EpochId,
current_protocol_version: ProtocolVersion,
receiver_congestion_info: Option<ExtendedCongestionInfo>,
) -> Result<Option<InvalidTxError>, Error>;

/// Returns an ordered list of valid transactions from the pool up the given limits.
Expand Down
28 changes: 24 additions & 4 deletions chain/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2259,7 +2259,8 @@ impl Client {
) -> Result<ProcessTxResponse, Error> {
let head = self.chain.head()?;
let me = self.validator_signer.as_ref().map(|vs| vs.validator_id());
let cur_block_header = self.chain.head_header()?;
let cur_block = self.chain.get_head_block()?;
let cur_block_header = cur_block.header();
let transaction_validity_period = self.chain.transaction_validity_period;
// here it is fine to use `cur_block_header` as it is a best effort estimate. If the transaction
// were to be included, the block that the chunk points to will have height >= height of
Expand All @@ -2274,12 +2275,23 @@ impl Client {
}
let gas_price = cur_block_header.next_gas_price();
let epoch_id = self.epoch_manager.get_epoch_id_from_prev_block(&head.last_block_hash)?;

let receiver_shard =
self.epoch_manager.account_id_to_shard_id(tx.transaction.receiver_id(), &epoch_id)?;
let receiver_congestion_info =
cur_block.shards_congestion_info().get(&receiver_shard).copied();
let protocol_version = self.epoch_manager.get_epoch_protocol_version(&epoch_id)?;

if let Some(err) = self
.runtime_adapter
.validate_tx(gas_price, None, tx, true, &epoch_id, protocol_version)
.validate_tx(
gas_price,
None,
tx,
true,
&epoch_id,
protocol_version,
receiver_congestion_info,
)
.expect("no storage errors")
{
debug!(target: "client", tx_hash = ?tx.get_hash(), ?err, "Invalid tx during basic validation");
Expand Down Expand Up @@ -2311,7 +2323,15 @@ impl Client {
};
if let Some(err) = self
.runtime_adapter
.validate_tx(gas_price, Some(state_root), tx, false, &epoch_id, protocol_version)
.validate_tx(
gas_price,
Some(state_root),
tx,
false,
&epoch_id,
protocol_version,
receiver_congestion_info,
)
.expect("no storage errors")
{
debug!(target: "client", ?err, "Invalid tx");
Expand Down
10 changes: 9 additions & 1 deletion chain/jsonrpc/res/rpc_errors_schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,8 @@
"Expired",
"ActionsValidation",
"TransactionSizeExceeded",
"InvalidTransactionVersion"
"InvalidTransactionVersion",
"ShardCongested"
],
"props": {}
},
Expand Down Expand Up @@ -772,6 +773,13 @@
"subtypes": [],
"props": {}
},
"ShardCongested": {
"name": "ShardCongested",
"subtypes": [],
"props": {
"shard_id": ""
}
},
"SignerDoesNotExist": {
"name": "SignerDoesNotExist",
"subtypes": [],
Expand Down
6 changes: 6 additions & 0 deletions core/primitives/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,9 @@ pub enum InvalidTxError {
TransactionSizeExceeded { size: u64, limit: u64 },
/// Transaction version is invalid.
InvalidTransactionVersion,
/// The receiver shard of the transaction is too congestion to accept new
/// transactions at the moment.
ShardCongested { shard_id: u32 },
}

impl std::error::Error for InvalidTxError {}
Expand Down Expand Up @@ -576,6 +579,9 @@ impl Display for InvalidTxError {
InvalidTxError::InvalidTransactionVersion => {
write!(f, "Transaction version is invalid")
}
InvalidTxError::ShardCongested { shard_id } => {
write!(f, "Shard {shard_id} is currently congested and rejects new transactions.")
}
}
}
}
Expand Down

0 comments on commit d1ea36f

Please sign in to comment.