diff --git a/subxt/src/backend/chain_head/mod.rs b/subxt/src/backend/chain_head/mod.rs index 5bb12112708..4d3c2615c26 100644 --- a/subxt/src/backend/chain_head/mod.rs +++ b/subxt/src/backend/chain_head/mod.rs @@ -48,6 +48,7 @@ pub use subxt_rpcs::methods::chain_head::ChainHeadRpcMethods; pub struct ChainHeadBackendBuilder { max_block_life: usize, transaction_timeout_secs: usize, + submit_transactions_ignoring_follow_events: bool, _marker: std::marker::PhantomData, } @@ -63,6 +64,7 @@ impl ChainHeadBackendBuilder { Self { max_block_life: usize::MAX, transaction_timeout_secs: 240, + submit_transactions_ignoring_follow_events: false, _marker: std::marker::PhantomData, } } @@ -84,11 +86,35 @@ impl ChainHeadBackendBuilder { /// give up waiting. /// /// Provide a value here to denote how long, in seconds, to wait before giving up. Defaults to 240 seconds. + /// + /// If [`Self::submit_transactions_ignoring_follow_events()`] is called, this timeout is ignored. pub fn transaction_timeout(mut self, timeout_secs: usize) -> Self { self.transaction_timeout_secs = timeout_secs; self } + /// When a transaction is submitted, we normally synchronize the events that we get back with events from + /// our background `chainHead_follow` subscription, to ensure that any blocks hashes that we see can be + /// immediately queried (for example to get events or state at that block), and are kept around unless they + /// are no longer needed. + /// + /// The main downside of this synchronization is that there may be a delay in being handed back a + /// [`TransactionStatus::InFinalizedBlock`] event while we wait to see the same block hash emitted from + /// our background `chainHead_follow` subscription in order to ensure it's available for querying. + /// + /// Calling this method turns off this synchronization, speeding up the response and removing any reliance + /// on the `chainHead_follow` subscription continuing to run without stopping throughout submitting a transaction. + /// + /// # Warning + /// + /// This can lead to errors when calling APIs like `wait_for_finalized_success`, which will try to retrieve events + /// at the finalized block, because there will be a race and the finalized block may not be available for querying + /// yet. + pub fn submit_transactions_ignoring_follow_events(mut self) -> Self { + self.submit_transactions_ignoring_follow_events = true; + self + } + /// A low-level API to build the backend and driver which requires polling the driver for the backend /// to make progress. /// @@ -117,6 +143,8 @@ impl ChainHeadBackendBuilder { methods: rpc_methods, follow_handle: follow_stream_driver.handle(), transaction_timeout_secs: self.transaction_timeout_secs, + submit_transactions_ignoring_follow_events: self + .submit_transactions_ignoring_follow_events, }; let driver = ChainHeadBackendDriver { driver: follow_stream_driver, @@ -187,6 +215,8 @@ pub struct ChainHeadBackend { follow_handle: FollowStreamDriverHandle, // How long to wait until giving up on transactions: transaction_timeout_secs: usize, + // Don't synchronise blocks with chainHead_follow when submitting txs: + submit_transactions_ignoring_follow_events: bool, } impl ChainHeadBackend { @@ -558,170 +588,230 @@ impl Backend for ChainHeadBackend { &self, extrinsic: &[u8], ) -> Result>, Error> { - let transaction_timeout_secs = self.transaction_timeout_secs as u64; + // Submit a transaction. This makes no attempt to sync with follow events, + async fn submit_transaction_ignoring_follow_events( + extrinsic: &[u8], + methods: &ChainHeadRpcMethods, + ) -> Result>, Error> { + let tx_progress = methods + .transactionwatch_v1_submit_and_watch(extrinsic) + .await? + .map(|ev| { + ev.map(|tx_status| { + use subxt_rpcs::methods::chain_head::TransactionStatus as RpcTransactionStatus; + match tx_status { + RpcTransactionStatus::Validated => TransactionStatus::Validated, + RpcTransactionStatus::Broadcasted => TransactionStatus::Broadcasted, + RpcTransactionStatus::BestChainBlockIncluded { block: None } => { + TransactionStatus::NoLongerInBestBlock + }, + RpcTransactionStatus::BestChainBlockIncluded { block: Some(block) } => { + TransactionStatus::InBestBlock { hash: BlockRef::from_hash(block.hash) } + }, + RpcTransactionStatus::Finalized { block } => { + TransactionStatus::InFinalizedBlock { hash: BlockRef::from_hash(block.hash) } + }, + RpcTransactionStatus::Error { error } => { + TransactionStatus::Error { message: error } + }, + RpcTransactionStatus::Invalid { error } => { + TransactionStatus::Invalid { message: error } + }, + RpcTransactionStatus::Dropped { error } => { + TransactionStatus::Dropped { message: error } + }, + } + }).map_err(Into::into) + }); - // We care about new and finalized block hashes. - enum SeenBlockMarker { - New, - Finalized, + Ok(StreamOf(Box::pin(tx_progress))) } - // First, subscribe to new blocks. - let mut seen_blocks_sub = self.follow_handle.subscribe().events(); + // Submit a transaction. This synchronizes with chainHead_follow events to ensure + // that block hashes returned are ready to be queried. + async fn submit_transaction_tracking_follow_events( + extrinsic: &[u8], + transaction_timeout_secs: u64, + methods: &ChainHeadRpcMethods, + follow_handle: &FollowStreamDriverHandle, + ) -> Result>, Error> { + // We care about new and finalized block hashes. + enum SeenBlockMarker { + New, + Finalized, + } - // Then, submit the transaction. - let mut tx_progress = self - .methods - .transactionwatch_v1_submit_and_watch(extrinsic) - .await?; + // First, subscribe to new blocks. + let mut seen_blocks_sub = follow_handle.subscribe().events(); - let mut seen_blocks = HashMap::new(); - let mut done = false; + // Then, submit the transaction. + let mut tx_progress = methods + .transactionwatch_v1_submit_and_watch(extrinsic) + .await?; - // If we see the finalized event, we start waiting until we find a finalized block that - // matches, so we can guarantee to return a pinned block hash and be properly in sync - // with chainHead_follow. - let mut finalized_hash: Option = None; + let mut seen_blocks = HashMap::new(); + let mut done = false; - // Record the start time so that we can time out if things appear to take too long. - let start_instant = web_time::Instant::now(); + // If we see the finalized event, we start waiting until we find a finalized block that + // matches, so we can guarantee to return a pinned block hash and be properly in sync + // with chainHead_follow. + let mut finalized_hash: Option = None; - // A quick helper to return a generic error. - let err_other = |s: &str| Some(Err(Error::Other(s.into()))); + // Record the start time so that we can time out if things appear to take too long. + let start_instant = web_time::Instant::now(); - // Now we can attempt to associate tx events with pinned blocks. - let tx_stream = futures::stream::poll_fn(move |cx| { - loop { - // Bail early if we're finished; nothing else to do. - if done { - return Poll::Ready(None); - } + // A quick helper to return a generic error. + let err_other = |s: &str| Some(Err(Error::Other(s.into()))); - // Bail if we exceed 4 mins; something very likely went wrong. - if start_instant.elapsed().as_secs() > transaction_timeout_secs { - return Poll::Ready(err_other( - "Timeout waiting for the transaction to be finalized", - )); - } + // Now we can attempt to associate tx events with pinned blocks. + let tx_stream = futures::stream::poll_fn(move |cx| { + loop { + // Bail early if we're finished; nothing else to do. + if done { + return Poll::Ready(None); + } - // Poll for a follow event, and error if the stream has unexpectedly ended. - let follow_ev_poll = match seen_blocks_sub.poll_next_unpin(cx) { - Poll::Ready(None) => { - return Poll::Ready(err_other("chainHead_follow stream ended unexpectedly")) + // Bail if we exceed 4 mins; something very likely went wrong. + if start_instant.elapsed().as_secs() > transaction_timeout_secs { + return Poll::Ready(err_other( + "Timeout waiting for the transaction to be finalized", + )); } - Poll::Ready(Some(follow_ev)) => Poll::Ready(follow_ev), - Poll::Pending => Poll::Pending, - }; - let follow_ev_is_pending = follow_ev_poll.is_pending(); - - // If there was a follow event, then handle it and loop around to see if there are more. - // We want to buffer follow events until we hit Pending, so that we are as up-to-date as possible - // for when we see a BestBlockChanged event, so that we have the best change of already having - // seen the block that it mentions and returning a proper pinned block. - if let Poll::Ready(follow_ev) = follow_ev_poll { - match follow_ev { - FollowEvent::NewBlock(ev) => { - // Optimization: once we have a `finalized_hash`, we only care about finalized - // block refs now and can avoid bothering to save new blocks. - if finalized_hash.is_none() { - seen_blocks.insert( - ev.block_hash.hash(), - (SeenBlockMarker::New, ev.block_hash), - ); - } + + // Poll for a follow event, and error if the stream has unexpectedly ended. + let follow_ev_poll = match seen_blocks_sub.poll_next_unpin(cx) { + Poll::Ready(None) => { + return Poll::Ready(err_other( + "chainHead_follow stream ended unexpectedly", + )) } - FollowEvent::Finalized(ev) => { - for block_ref in ev.finalized_block_hashes { - seen_blocks.insert( - block_ref.hash(), - (SeenBlockMarker::Finalized, block_ref), - ); + Poll::Ready(Some(follow_ev)) => Poll::Ready(follow_ev), + Poll::Pending => Poll::Pending, + }; + let follow_ev_is_pending = follow_ev_poll.is_pending(); + + // If there was a follow event, then handle it and loop around to see if there are more. + // We want to buffer follow events until we hit Pending, so that we are as up-to-date as possible + // for when we see a BestBlockChanged event, so that we have the best change of already having + // seen the block that it mentions and returning a proper pinned block. + if let Poll::Ready(follow_ev) = follow_ev_poll { + match follow_ev { + FollowEvent::NewBlock(ev) => { + // Optimization: once we have a `finalized_hash`, we only care about finalized + // block refs now and can avoid bothering to save new blocks. + if finalized_hash.is_none() { + seen_blocks.insert( + ev.block_hash.hash(), + (SeenBlockMarker::New, ev.block_hash), + ); + } } + FollowEvent::Finalized(ev) => { + for block_ref in ev.finalized_block_hashes { + seen_blocks.insert( + block_ref.hash(), + (SeenBlockMarker::Finalized, block_ref), + ); + } + } + FollowEvent::Stop => { + // If we get this event, we'll lose all of our existing pinned blocks and have a gap + // in which we may lose the finalized block that the TX is in. For now, just error if + // this happens, to prevent the case in which we never see a finalized block and wait + // forever. + return Poll::Ready(err_other("chainHead_follow emitted 'stop' event during transaction submission")); + } + _ => {} } - FollowEvent::Stop => { - // If we get this event, we'll lose all of our existing pinned blocks and have a gap - // in which we may lose the finalized block that the TX is in. For now, just error if - // this happens, to prevent the case in which we never see a finalized block and wait - // forever. - return Poll::Ready(err_other("chainHead_follow emitted 'stop' event during transaction submission")); - } - _ => {} + continue; } - continue; - } - // If we have a finalized hash, we are done looking for tx events and we are just waiting - // for a pinned block with a matching hash (which must appear eventually given it's finalized). - if let Some(hash) = &finalized_hash { - if let Some((SeenBlockMarker::Finalized, block_ref)) = seen_blocks.remove(hash) - { - // Found it! Hand back the event with a pinned block. We're done. - done = true; - let ev = TransactionStatus::InFinalizedBlock { - hash: block_ref.into(), - }; - return Poll::Ready(Some(Ok(ev))); - } else { - // Not found it! If follow ev is pending, then return pending here and wait for - // a new one to come in, else loop around and see if we get another one immediately. - seen_blocks.clear(); - if follow_ev_is_pending { - return Poll::Pending; + // If we have a finalized hash, we are done looking for tx events and we are just waiting + // for a pinned block with a matching hash (which must appear eventually given it's finalized). + if let Some(hash) = &finalized_hash { + if let Some((SeenBlockMarker::Finalized, block_ref)) = + seen_blocks.remove(hash) + { + // Found it! Hand back the event with a pinned block. We're done. + done = true; + let ev = TransactionStatus::InFinalizedBlock { + hash: block_ref.into(), + }; + return Poll::Ready(Some(Ok(ev))); } else { - continue; + // Not found it! If follow ev is pending, then return pending here and wait for + // a new one to come in, else loop around and see if we get another one immediately. + seen_blocks.clear(); + if follow_ev_is_pending { + return Poll::Pending; + } else { + continue; + } } } - } - // If we don't have a finalized block yet, we keep polling for tx progress events. - let tx_progress_ev = match tx_progress.poll_next_unpin(cx) { - Poll::Pending => return Poll::Pending, - Poll::Ready(None) => return Poll::Ready(err_other("No more transaction progress events, but we haven't seen a Finalized one yet")), - Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e.into()))), - Poll::Ready(Some(Ok(ev))) => ev, - }; + // If we don't have a finalized block yet, we keep polling for tx progress events. + let tx_progress_ev = match tx_progress.poll_next_unpin(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(None) => return Poll::Ready(err_other("No more transaction progress events, but we haven't seen a Finalized one yet")), + Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e.into()))), + Poll::Ready(Some(Ok(ev))) => ev, + }; + + // When we get one, map it to the correct format (or for finalized ev, wait for the pinned block): + use subxt_rpcs::methods::chain_head::TransactionStatus as RpcTransactionStatus; + let tx_progress_ev = match tx_progress_ev { + RpcTransactionStatus::Finalized { block } => { + // We'll wait until we have seen this hash, to try to guarantee + // that when we return this event, the corresponding block is + // pinned and accessible. + finalized_hash = Some(block.hash); + continue; + } + RpcTransactionStatus::BestChainBlockIncluded { block: Some(block) } => { + // Look up a pinned block ref if we can, else return a non-pinned + // block that likely isn't accessible. We have no guarantee that a best + // block on the node a tx was sent to will ever be known about on the + // chainHead_follow subscription. + let block_ref = match seen_blocks.get(&block.hash) { + Some((_, block_ref)) => block_ref.clone().into(), + None => BlockRef::from_hash(block.hash), + }; + TransactionStatus::InBestBlock { hash: block_ref } + } + RpcTransactionStatus::BestChainBlockIncluded { block: None } => { + TransactionStatus::NoLongerInBestBlock + } + RpcTransactionStatus::Broadcasted => TransactionStatus::Broadcasted, + RpcTransactionStatus::Dropped { error, .. } => { + TransactionStatus::Dropped { message: error } + } + RpcTransactionStatus::Error { error } => { + TransactionStatus::Error { message: error } + } + RpcTransactionStatus::Invalid { error } => { + TransactionStatus::Invalid { message: error } + } + RpcTransactionStatus::Validated => TransactionStatus::Validated, + }; + return Poll::Ready(Some(Ok(tx_progress_ev))); + } + }); - // When we get one, map it to the correct format (or for finalized ev, wait for the pinned block): - use subxt_rpcs::methods::chain_head::TransactionStatus as RpcTransactionStatus; - let tx_progress_ev = match tx_progress_ev { - RpcTransactionStatus::Finalized { block } => { - // We'll wait until we have seen this hash, to try to guarantee - // that when we return this event, the corresponding block is - // pinned and accessible. - finalized_hash = Some(block.hash); - continue; - } - RpcTransactionStatus::BestChainBlockIncluded { block: Some(block) } => { - // Look up a pinned block ref if we can, else return a non-pinned - // block that likely isn't accessible. We have no guarantee that a best - // block on the node a tx was sent to will ever be known about on the - // chainHead_follow subscription. - let block_ref = match seen_blocks.get(&block.hash) { - Some((_, block_ref)) => block_ref.clone().into(), - None => BlockRef::from_hash(block.hash), - }; - TransactionStatus::InBestBlock { hash: block_ref } - } - RpcTransactionStatus::BestChainBlockIncluded { block: None } => { - TransactionStatus::NoLongerInBestBlock - } - RpcTransactionStatus::Broadcasted => TransactionStatus::Broadcasted, - RpcTransactionStatus::Dropped { error, .. } => { - TransactionStatus::Dropped { message: error } - } - RpcTransactionStatus::Error { error } => { - TransactionStatus::Error { message: error } - } - RpcTransactionStatus::Invalid { error } => { - TransactionStatus::Invalid { message: error } - } - RpcTransactionStatus::Validated => TransactionStatus::Validated, - }; - return Poll::Ready(Some(Ok(tx_progress_ev))); - } - }); + Ok(StreamOf(Box::pin(tx_stream))) + } - Ok(StreamOf(Box::pin(tx_stream))) + if self.submit_transactions_ignoring_follow_events { + submit_transaction_ignoring_follow_events(extrinsic, &self.methods).await + } else { + submit_transaction_tracking_follow_events::( + extrinsic, + self.transaction_timeout_secs as u64, + &self.methods, + &self.follow_handle, + ) + .await + } } async fn call(