Skip to content

Commit 604117e

Browse files
withdrawal retries
1 parent c8317da commit 604117e

File tree

78 files changed

+1571
-323
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

78 files changed

+1571
-323
lines changed

packages/rs-drive-abci/src/execution/engine/run_block_proposal/v0/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,14 @@ where
269269
Error::Execution(ExecutionError::UpdateValidatorProposedAppVersionError(e))
270270
})?; // This is a system error
271271

272+
// Rebroadcast expired withdrawals if they exist
273+
self.rebroadcast_expired_withdrawal_documents(
274+
&block_info,
275+
&last_committed_platform_state,
276+
transaction,
277+
platform_version,
278+
)?;
279+
272280
// Mark all previously broadcasted and chainlocked withdrawals as complete
273281
// only when we are on a new core height
274282
if block_state_info.core_chain_locked_height() != last_block_core_height {

packages/rs-drive-abci/src/execution/platform_events/protocol_upgrade/perform_events_on_first_block_of_protocol_change/v0/mod.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@ use crate::platform_types::platform::Platform;
33
use dpp::version::PlatformVersion;
44
use dpp::version::ProtocolVersion;
55
use drive::drive::identity::withdrawals::paths::{
6-
get_withdrawal_root_path, WITHDRAWAL_TRANSACTIONS_SUM_AMOUNT_TREE_KEY,
6+
get_withdrawal_root_path, WITHDRAWAL_TRANSACTIONS_BROADCASTED_KEY,
7+
WITHDRAWAL_TRANSACTIONS_SUM_AMOUNT_TREE_KEY,
78
};
89
use drive::grovedb::{Element, Transaction};
910

@@ -68,6 +69,14 @@ impl<C> Platform<C> {
6869
None,
6970
&platform_version.drive,
7071
)?;
72+
self.drive.grove_insert_if_not_exists(
73+
(&path).into(),
74+
&WITHDRAWAL_TRANSACTIONS_BROADCASTED_KEY,
75+
Element::empty_tree(),
76+
Some(transaction),
77+
None,
78+
&platform_version.drive,
79+
)?;
7180
Ok(())
7281
}
7382
}

packages/rs-drive-abci/src/execution/platform_events/withdrawals/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,5 @@ pub(in crate::execution) mod cleanup_expired_locks_of_withdrawal_amounts;
44
pub(in crate::execution) mod dequeue_and_build_unsigned_withdrawal_transactions;
55
pub(in crate::execution) mod fetch_transactions_block_inclusion_status;
66
pub(in crate::execution) mod pool_withdrawals_into_transactions_queue;
7+
pub(in crate::execution) mod rebroadcast_expired_withdrawal_documents;
78
pub(in crate::execution) mod update_broadcasted_withdrawal_statuses;

packages/rs-drive-abci/src/execution/platform_events/withdrawals/pool_withdrawals_into_transactions_queue/v0/mod.rs

Lines changed: 2 additions & 119 deletions
Original file line numberDiff line numberDiff line change
@@ -44,125 +44,8 @@ where
4444
);
4545
return Ok(());
4646
}
47-
let documents = self.drive.fetch_oldest_withdrawal_documents_by_status(
48-
withdrawals_contract::WithdrawalStatus::QUEUED.into(),
49-
platform_version
50-
.system_limits
51-
.withdrawal_transactions_per_block_limit,
52-
transaction,
53-
platform_version,
54-
)?;
55-
56-
if documents.is_empty() {
57-
return Ok(());
58-
}
59-
60-
// Only take documents up to the withdrawal amount
61-
let current_withdrawal_limit = self
62-
.drive
63-
.calculate_current_withdrawal_limit(transaction, platform_version)?;
64-
65-
// Only process documents up to the current withdrawal limit.
66-
let mut total_withdrawal_amount = 0u64;
67-
68-
// Iterate over the documents and accumulate their withdrawal amounts.
69-
let mut documents_to_process = vec![];
70-
for document in documents {
71-
// Get the withdrawal amount from the document properties.
72-
let amount: u64 = document
73-
.properties()
74-
.get_integer(withdrawal::properties::AMOUNT)?;
75-
76-
// Check if adding this amount would exceed the current withdrawal limit.
77-
let potential_total_withdrawal_amount =
78-
total_withdrawal_amount.checked_add(amount).ok_or_else(|| {
79-
Error::Execution(ExecutionError::Overflow(
80-
"overflow in total withdrawal amount",
81-
))
82-
})?;
83-
84-
if potential_total_withdrawal_amount > current_withdrawal_limit {
85-
// If adding this withdrawal would exceed the limit, stop processing further.
86-
break;
87-
}
88-
89-
total_withdrawal_amount = potential_total_withdrawal_amount;
90-
91-
// Add this document to the list of documents to be processed.
92-
documents_to_process.push(document);
93-
}
94-
95-
if documents_to_process.is_empty() {
96-
return Ok(());
97-
}
98-
99-
let start_transaction_index = self
100-
.drive
101-
.fetch_next_withdrawal_transaction_index(transaction, platform_version)?;
102-
103-
let (withdrawal_transactions, total_amount) = self
104-
.build_untied_withdrawal_transactions_from_documents(
105-
&mut documents_to_process,
106-
start_transaction_index,
107-
block_info,
108-
platform_version,
109-
)?;
110-
111-
let withdrawal_transactions_count = withdrawal_transactions.len();
112-
113-
let mut drive_operations = vec![];
114-
115-
self.drive
116-
.add_enqueue_untied_withdrawal_transaction_operations(
117-
withdrawal_transactions,
118-
total_amount,
119-
&mut drive_operations,
120-
platform_version,
121-
)?;
122-
123-
let end_transaction_index = start_transaction_index + withdrawal_transactions_count as u64;
124-
125-
self.drive
126-
.add_update_next_withdrawal_transaction_index_operation(
127-
end_transaction_index,
128-
&mut drive_operations,
129-
platform_version,
130-
)?;
131-
132-
tracing::debug!(
133-
"Pooled {} withdrawal documents into {} transactions with indices from {} to {}",
134-
documents_to_process.len(),
135-
withdrawal_transactions_count,
136-
start_transaction_index,
137-
end_transaction_index,
138-
);
139-
140-
let withdrawals_contract = self.drive.cache.system_data_contracts.load_withdrawals();
141-
142-
self.drive.add_update_multiple_documents_operations(
143-
&documents_to_process,
144-
&withdrawals_contract,
145-
withdrawals_contract
146-
.document_type_for_name(withdrawal::NAME)
147-
.map_err(|_| {
148-
Error::Execution(ExecutionError::CorruptedCodeExecution(
149-
"Can't fetch withdrawal data contract",
150-
))
151-
})?,
152-
&mut drive_operations,
153-
&platform_version.drive,
154-
)?;
155-
156-
self.drive.apply_drive_operations(
157-
drive_operations,
158-
true,
159-
block_info,
160-
transaction,
161-
platform_version,
162-
None,
163-
)?;
164-
165-
Ok(())
47+
// Just use the v1 as to not duplicate code
48+
self.pool_withdrawals_into_transactions_queue_v1(block_info, transaction, platform_version)
16649
}
16750
}
16851

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
use crate::error::execution::ExecutionError;
2+
use crate::error::Error;
3+
use crate::platform_types::platform::Platform;
4+
5+
use crate::rpc::core::CoreRPCLike;
6+
use dpp::block::block_info::BlockInfo;
7+
8+
use crate::platform_types::platform_state::PlatformState;
9+
use dpp::version::PlatformVersion;
10+
use drive::grovedb::Transaction;
11+
12+
mod v0;
13+
mod v1;
14+
15+
impl<C> Platform<C>
16+
where
17+
C: CoreRPCLike,
18+
{
19+
/// Rebroadcasts expired withdrawal documents if any exist.
20+
///
21+
/// This function attempts to rebroadcast expired withdrawal documents by checking if there are
22+
/// any documents with the status `EXPIRED`. It updates the status of such documents to
23+
/// `BROADCASTED`, increments their revision, and reschedules them for broadcasting.
24+
///
25+
/// # Parameters
26+
/// - `block_info`: Information about the current block (e.g., timestamp).
27+
/// - `transaction`: The transaction within which the rebroadcast should be executed.
28+
/// - `platform_version`: The version of the platform, used to determine the correct method implementation.
29+
///
30+
/// # Returns
31+
/// - `Ok(())` if the rebroadcast process succeeds without issues.
32+
/// - `Err(ExecutionError::UnknownVersionMismatch)` if the platform version is unsupported.
33+
pub fn rebroadcast_expired_withdrawal_documents(
34+
&self,
35+
block_info: &BlockInfo,
36+
last_committed_platform_state: &PlatformState,
37+
transaction: &Transaction,
38+
platform_version: &PlatformVersion,
39+
) -> Result<(), Error> {
40+
match platform_version
41+
.drive_abci
42+
.methods
43+
.withdrawals
44+
.rebroadcast_expired_withdrawal_documents
45+
{
46+
0 => self.rebroadcast_expired_withdrawal_documents_v0(
47+
block_info,
48+
last_committed_platform_state,
49+
transaction,
50+
platform_version,
51+
),
52+
1 => self.rebroadcast_expired_withdrawal_documents_v1(
53+
block_info,
54+
transaction,
55+
platform_version,
56+
),
57+
version => Err(Error::Execution(ExecutionError::UnknownVersionMismatch {
58+
method: "rebroadcast_expired_withdrawal_documents".to_string(),
59+
known_versions: vec![0, 1],
60+
received: version,
61+
})),
62+
}
63+
}
64+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
use crate::error::Error;
2+
use crate::platform_types::platform_state::v0::PlatformStateV0Methods;
3+
use crate::platform_types::platform_state::PlatformState;
4+
use crate::{platform_types::platform::Platform, rpc::core::CoreRPCLike};
5+
use dpp::block::block_info::BlockInfo;
6+
use dpp::version::PlatformVersion;
7+
use drive::grovedb::Transaction;
8+
9+
impl<C> Platform<C>
10+
where
11+
C: CoreRPCLike,
12+
{
13+
pub(super) fn rebroadcast_expired_withdrawal_documents_v0(
14+
&self,
15+
block_info: &BlockInfo,
16+
last_committed_platform_state: &PlatformState,
17+
transaction: &Transaction,
18+
platform_version: &PlatformVersion,
19+
) -> Result<(), Error> {
20+
// Currently Core only supports using the first 2 quorums (out of 24 for mainnet).
21+
// For us, we just use the latest quorum to be extra safe.
22+
let Some(position_of_current_quorum) =
23+
last_committed_platform_state.current_validator_set_position_in_list_by_most_recent()
24+
else {
25+
tracing::warn!("Current quorum not in current validator set, not making withdrawals");
26+
return Ok(());
27+
};
28+
if position_of_current_quorum != 0 {
29+
tracing::debug!(
30+
"Current quorum is not most recent, it is in position {}, not making withdrawals",
31+
position_of_current_quorum
32+
);
33+
return Ok(());
34+
}
35+
// Version 1 changes on Version 0, by not having the Core 2 Quorum limit.
36+
// Hence we can just use the v1 here after the extra logic of v0
37+
self.rebroadcast_expired_withdrawal_documents_v1(block_info, transaction, platform_version)
38+
}
39+
}
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
use dpp::block::block_info::BlockInfo;
2+
use dpp::data_contract::accessors::v0::DataContractV0Getters;
3+
use dpp::data_contracts::withdrawals_contract::WithdrawalStatus;
4+
use dpp::document::document_methods::DocumentMethodsV0;
5+
use dpp::document::{DocumentV0Getters, DocumentV0Setters};
6+
use dpp::platform_value::btreemap_extensions::BTreeValueMapHelper;
7+
8+
use dpp::system_data_contracts::withdrawals_contract::v1::document_types::withdrawal;
9+
use dpp::version::PlatformVersion;
10+
use std::collections::BTreeSet;
11+
12+
use crate::{
13+
error::{execution::ExecutionError, Error},
14+
platform_types::platform::Platform,
15+
rpc::core::CoreRPCLike,
16+
};
17+
use dpp::withdrawal::WithdrawalTransactionIndex;
18+
use drive::grovedb::Transaction;
19+
use drive::util::batch::DriveOperation;
20+
21+
impl<C> Platform<C>
22+
where
23+
C: CoreRPCLike,
24+
{
25+
/// Version 1 changes on Version 0, by not having the Core 2 Quorum limit.
26+
/// We should switch to Version 1 once Core has fixed the issue
27+
pub(super) fn rebroadcast_expired_withdrawal_documents_v1(
28+
&self,
29+
block_info: &BlockInfo,
30+
transaction: &Transaction,
31+
platform_version: &PlatformVersion,
32+
) -> Result<(), Error> {
33+
let expired_withdrawal_documents_to_retry_signing =
34+
self.drive.fetch_oldest_withdrawal_documents_by_status(
35+
WithdrawalStatus::EXPIRED.into(),
36+
platform_version
37+
.system_limits
38+
.retry_signing_expired_withdrawal_documents_per_block_limit,
39+
transaction.into(),
40+
platform_version,
41+
)?;
42+
43+
if expired_withdrawal_documents_to_retry_signing.is_empty() {
44+
return Ok(());
45+
}
46+
47+
// Collecting unique withdrawal indices of expired documents
48+
let expired_withdrawal_indices: Vec<WithdrawalTransactionIndex> =
49+
expired_withdrawal_documents_to_retry_signing
50+
.iter()
51+
.map(|document| {
52+
document
53+
.properties()
54+
.get_optional_u64(withdrawal::properties::TRANSACTION_INDEX)?
55+
.ok_or(Error::Execution(ExecutionError::CorruptedDriveResponse(
56+
"Can't get transaction index from withdrawal document".to_string(),
57+
)))
58+
})
59+
.collect::<Result<BTreeSet<WithdrawalTransactionIndex>, Error>>()?
60+
.into_iter()
61+
.collect();
62+
63+
let mut drive_operations: Vec<DriveOperation> = vec![];
64+
65+
// Collecting only documents that have been updated
66+
let mut documents_to_update = Vec::new();
67+
68+
for mut document in expired_withdrawal_documents_to_retry_signing {
69+
document.set_u8(
70+
withdrawal::properties::STATUS,
71+
WithdrawalStatus::BROADCASTED as u8,
72+
);
73+
74+
document.set_updated_at(Some(block_info.time_ms));
75+
76+
document.increment_revision().map_err(Error::Protocol)?;
77+
78+
documents_to_update.push(document);
79+
}
80+
81+
if documents_to_update.is_empty() {
82+
return Ok(());
83+
}
84+
85+
self.drive
86+
.move_broadcasted_withdrawal_transactions_back_to_queue_operations(
87+
expired_withdrawal_indices,
88+
&mut drive_operations,
89+
platform_version,
90+
)?;
91+
92+
let withdrawals_contract = self.drive.cache.system_data_contracts.load_withdrawals();
93+
94+
self.drive.add_update_multiple_documents_operations(
95+
&documents_to_update,
96+
&withdrawals_contract,
97+
withdrawals_contract
98+
.document_type_for_name(withdrawal::NAME)
99+
.map_err(|_| {
100+
Error::Execution(ExecutionError::CorruptedCodeExecution(
101+
"Can't fetch withdrawal data contract",
102+
))
103+
})?,
104+
&mut drive_operations,
105+
&platform_version.drive,
106+
)?;
107+
108+
self.drive.apply_drive_operations(
109+
drive_operations,
110+
true,
111+
block_info,
112+
transaction.into(),
113+
platform_version,
114+
None,
115+
)?;
116+
117+
Ok(())
118+
}
119+
}

0 commit comments

Comments
 (0)