Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions polkadot/node/core/dispute-coordinator/src/initialized.rs
Original file line number Diff line number Diff line change
Expand Up @@ -852,9 +852,7 @@ impl Initialized {
};
gum::trace!(target: LOG_TARGET, "Loaded recent disputes from db");

let _ = tx.send(
recent_disputes.into_iter().map(|(k, v)| (k.0, k.1, v)).collect::<Vec<_>>(),
);
let _ = tx.send(recent_disputes);
},
DisputeCoordinatorMessage::ActiveDisputes(tx) => {
gum::trace!(target: LOG_TARGET, "DisputeCoordinatorMessage::ActiveDisputes");
Expand All @@ -866,10 +864,7 @@ impl Initialized {

let _ = tx.send(
get_active_with_status(recent_disputes.into_iter(), now)
.map(|((session_idx, candidate_hash), dispute_status)| {
(session_idx, candidate_hash, dispute_status)
})
.collect(),
.collect::<BTreeMap<_, _>>(),
);
},
DisputeCoordinatorMessage::QueryCandidateVotes(query, tx) => {
Expand Down
13 changes: 6 additions & 7 deletions polkadot/node/core/dispute-coordinator/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use std::{
collections::HashMap,
collections::{BTreeMap, HashMap},
sync::{
atomic::{AtomicU64, Ordering as AtomicOrdering},
Arc,
Expand Down Expand Up @@ -773,10 +773,9 @@ fn too_many_unconfirmed_statements_are_considered_spam() {
msg: DisputeCoordinatorMessage::ActiveDisputes(tx),
})
.await;

assert_eq!(
rx.await.unwrap(),
vec![(session, candidate_hash1, DisputeStatus::Active)]
BTreeMap::from([((session, candidate_hash1), DisputeStatus::Active)])
);

let (tx, rx) = oneshot::channel();
Expand Down Expand Up @@ -910,7 +909,7 @@ fn approval_vote_import_works() {

assert_eq!(
rx.await.unwrap(),
vec![(session, candidate_hash1, DisputeStatus::Active)]
BTreeMap::from([((session, candidate_hash1), DisputeStatus::Active)])
);

let (tx, rx) = oneshot::channel();
Expand Down Expand Up @@ -1023,7 +1022,7 @@ fn dispute_gets_confirmed_via_participation() {

assert_eq!(
rx.await.unwrap(),
vec![(session, candidate_hash1, DisputeStatus::Active)]
BTreeMap::from([((session, candidate_hash1), DisputeStatus::Active)])
);

let (tx, rx) = oneshot::channel();
Expand Down Expand Up @@ -1168,7 +1167,7 @@ fn dispute_gets_confirmed_at_byzantine_threshold() {

assert_eq!(
rx.await.unwrap(),
vec![(session, candidate_hash1, DisputeStatus::Confirmed)]
BTreeMap::from([((session, candidate_hash1), DisputeStatus::Confirmed)])
);

let (tx, rx) = oneshot::channel();
Expand Down Expand Up @@ -1430,7 +1429,7 @@ fn conflicting_votes_lead_to_dispute_participation() {

assert_eq!(
rx.await.unwrap(),
vec![(session, candidate_hash, DisputeStatus::Active)]
BTreeMap::from([((session, candidate_hash), DisputeStatus::Active)])
);

let (tx, rx) = oneshot::channel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,16 @@ where
metrics.on_fetched_onchain_disputes(onchain.keys().len() as u64);

gum::trace!(target: LOG_TARGET, ?leaf, "Fetching recent disputes");
let recent_disputes = request_disputes(sender).await;
// Filter out unconfirmed disputes. However if the dispute is already onchain - don't skip it.
// In this case we'd better push as much fresh votes as possible to bring it to conclusion
// faster.
let recent_disputes = request_disputes(sender)
.await
.into_iter()
.filter(|(key, dispute_status)| {
dispute_status.is_confirmed_concluded() || onchain.contains_key(key)
})
.collect::<BTreeMap<_, _>>();
gum::trace!(
target: LOG_TARGET,
?leaf,
Expand All @@ -154,14 +163,6 @@ where

gum::trace!(target: LOG_TARGET, ?leaf, "Filtering recent disputes");

// Filter out unconfirmed disputes. However if the dispute is already onchain - don't skip it.
// In this case we'd better push as much fresh votes as possible to bring it to conclusion
// faster.
let recent_disputes = recent_disputes
.into_iter()
.filter(|d| d.2.is_confirmed_concluded() || onchain.contains_key(&(d.0, d.1)))
.collect::<Vec<_>>();

gum::trace!(target: LOG_TARGET, ?leaf, "Partitioning recent disputes");
let partitioned = partition_recent_disputes(recent_disputes, &onchain);
metrics.on_partition_recent_disputes(&partitioned);
Expand Down Expand Up @@ -337,53 +338,38 @@ fn concluded_onchain(onchain_state: &DisputeState) -> bool {
}

fn partition_recent_disputes(
recent: Vec<(SessionIndex, CandidateHash, DisputeStatus)>,
recent: BTreeMap<(SessionIndex, CandidateHash), DisputeStatus>,
onchain: &HashMap<(SessionIndex, CandidateHash), DisputeState>,
) -> PartitionedDisputes {
let mut partitioned = PartitionedDisputes::new();

// Drop any duplicates
let unique_recent = recent
.into_iter()
.map(|(session_index, candidate_hash, dispute_state)| {
((session_index, candidate_hash), dispute_state)
})
.collect::<HashMap<_, _>>();

// Split recent disputes in ACTIVE and INACTIVE
let time_now = &secs_since_epoch();
let (active, inactive): (
Vec<(SessionIndex, CandidateHash, DisputeStatus)>,
Vec<(SessionIndex, CandidateHash, DisputeStatus)>,
) = unique_recent
.into_iter()
.map(|((session_index, candidate_hash), dispute_state)| {
(session_index, candidate_hash, dispute_state)
})
.partition(|(_, _, status)| !dispute_is_inactive(status, time_now));

// Split ACTIVE in three groups...
for (session_index, candidate_hash, _) in active {
match onchain.get(&(session_index, candidate_hash)) {
Some(d) => match concluded_onchain(d) {
true => partitioned.active_concluded_onchain.push((session_index, candidate_hash)),
false =>
partitioned.active_unconcluded_onchain.push((session_index, candidate_hash)),
},
None => partitioned.active_unknown_onchain.push((session_index, candidate_hash)),
};
}

// ... and INACTIVE in three more
for (session_index, candidate_hash, _) in inactive {
match onchain.get(&(session_index, candidate_hash)) {
Some(onchain_state) =>
if concluded_onchain(onchain_state) {
partitioned.inactive_concluded_onchain.push((session_index, candidate_hash));
} else {
partitioned.inactive_unconcluded_onchain.push((session_index, candidate_hash));
for ((session_index, candidate_hash), dispute_state) in recent {
let key = (session_index, candidate_hash);
if dispute_is_inactive(&dispute_state, time_now) {
match onchain.get(&key) {
Some(onchain_state) =>
if concluded_onchain(onchain_state) {
partitioned
.inactive_concluded_onchain
.push((session_index, candidate_hash));
} else {
partitioned
.inactive_unconcluded_onchain
.push((session_index, candidate_hash));
},
None => partitioned.inactive_unknown_onchain.push((session_index, candidate_hash)),
}
} else {
match onchain.get(&(session_index, candidate_hash)) {
Some(d) => match concluded_onchain(d) {
true =>
partitioned.active_concluded_onchain.push((session_index, candidate_hash)),
false =>
partitioned.active_unconcluded_onchain.push((session_index, candidate_hash)),
},
None => partitioned.inactive_unknown_onchain.push((session_index, candidate_hash)),
None => partitioned.active_unknown_onchain.push((session_index, candidate_hash)),
}
}
}

Expand Down Expand Up @@ -441,7 +427,7 @@ fn is_vote_worth_to_keep(
/// Request disputes identified by `CandidateHash` and the `SessionIndex`.
async fn request_disputes(
sender: &mut impl overseer::ProvisionerSenderTrait,
) -> Vec<(SessionIndex, CandidateHash, DisputeStatus)> {
) -> BTreeMap<(SessionIndex, CandidateHash), DisputeStatus> {
let (tx, rx) = oneshot::channel();
let msg = DisputeCoordinatorMessage::RecentDisputes(tx);

Expand All @@ -450,7 +436,7 @@ async fn request_disputes(

let recent_disputes = rx.await.unwrap_or_else(|err| {
gum::warn!(target: LOG_TARGET, err=?err, "Unable to gather recent disputes");
Vec::new()
BTreeMap::new()
});
recent_disputes
}
Expand Down
Loading
Loading