Skip to content

Commit c464ace

Browse files
committed
Review fixes
1 parent 89735e3 commit c464ace

File tree

9 files changed

+159
-73
lines changed

9 files changed

+159
-73
lines changed

node/src/action_kind.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -391,6 +391,7 @@ pub enum ActionKind {
391391
P2pNetworkPnetEffectfulOutgoingData,
392392
P2pNetworkPnetEffectfulSetupNonce,
393393
P2pNetworkPubsubBroadcast,
394+
P2pNetworkPubsubBroadcastMessage,
394395
P2pNetworkPubsubBroadcastSigned,
395396
P2pNetworkPubsubBroadcastValidatedMessage,
396397
P2pNetworkPubsubGraft,
@@ -719,7 +720,7 @@ pub enum ActionKind {
719720
}
720721

721722
impl ActionKind {
722-
pub const COUNT: u16 = 609;
723+
pub const COUNT: u16 = 610;
723724
}
724725

725726
impl std::fmt::Display for ActionKind {
@@ -2003,6 +2004,7 @@ impl ActionKindGet for P2pNetworkPubsubAction {
20032004
Self::BroadcastValidatedMessage { .. } => {
20042005
ActionKind::P2pNetworkPubsubBroadcastValidatedMessage
20052006
}
2007+
Self::BroadcastMessage { .. } => ActionKind::P2pNetworkPubsubBroadcastMessage,
20062008
}
20072009
}
20082010
}

node/src/snark_pool/candidate/snark_pool_candidate_actions.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ pub enum SnarkPoolCandidateAction {
4949
WorkVerifyError {
5050
peer_id: PeerId,
5151
verify_id: SnarkWorkVerifyId,
52-
batch: Vec<Snark>,
52+
batch: Vec<SnarkJobId>,
5353
},
5454
WorkVerifySuccess {
5555
peer_id: PeerId,

node/src/snark_pool/candidate/snark_pool_candidate_reducer.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::collections::BTreeMap;
22

33
use crate::{p2p_ready, SnarkPoolAction};
4-
use openmina_core::snark::Snark;
4+
use openmina_core::snark::{Snark, SnarkJobId};
55
use p2p::{
66
channels::rpc::{P2pChannelsRpcAction, P2pRpcId, P2pRpcRequest},
77
disconnection::{P2pDisconnectionAction, P2pDisconnectionReason},
@@ -124,7 +124,7 @@ impl SnarkPoolCandidatesState {
124124
}
125125
}),
126126
on_error: redux::callback!(
127-
on_snark_pool_candidate_work_verify_error((req_id: SnarkWorkVerifyId, sender: String, batch: Vec<Snark>)) -> crate::Action {
127+
on_snark_pool_candidate_work_verify_error((req_id: SnarkWorkVerifyId, sender: String, batch: Vec<SnarkJobId>)) -> crate::Action {
128128
SnarkPoolCandidateAction::WorkVerifyError {
129129
peer_id: sender.parse().unwrap(),
130130
verify_id: req_id,
@@ -160,10 +160,10 @@ impl SnarkPoolCandidatesState {
160160
reason: P2pDisconnectionReason::SnarkPoolVerifyError,
161161
});
162162

163-
for snark in batch {
163+
for snark_job_id in batch {
164164
dispatcher.push(P2pNetworkPubsubAction::RejectMessage {
165165
message_id: Some(BroadcastMessageId::Snark {
166-
job_id: snark.job_id(),
166+
job_id: snark_job_id.clone(),
167167
}),
168168
peer_id: None,
169169
reason: "Snark work verification failed".to_string(),

p2p/src/network/pubsub/p2p_network_pubsub_actions.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,10 @@ pub enum P2pNetworkPubsubAction {
182182
BroadcastValidatedMessage {
183183
message_id: BroadcastMessageId,
184184
},
185+
186+
BroadcastMessage {
187+
message_id: P2pNetworkPubsubMessageCacheId,
188+
},
185189
}
186190

187191
impl From<P2pNetworkPubsubAction> for crate::P2pAction {

p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs

Lines changed: 74 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -637,61 +637,35 @@ impl P2pNetworkPubsubState {
637637
Ok(())
638638
}
639639
P2pNetworkPubsubAction::BroadcastValidatedMessage { message_id } => {
640-
let Some((mcache_message_id, message)) =
640+
let Some((mcache_message_id, _)) =
641641
pubsub_state.mcache.get_message_id_and_message(&message_id)
642642
else {
643643
bug_condition!("Message with id: {:?} not found", message_id);
644644
return Ok(());
645645
};
646646

647-
if let BroadcastMessageId::Transaction { tx } = &message_id {
648-
if let P2pNetworkPubsubMessageCacheMessage::PreValidatedTransactions {
649-
tx_hashes,
650-
..
651-
} = message
652-
{
653-
if let Some(value) = tx_hashes.get_mut(tx) {
654-
*value = true;
655-
} else {
656-
bug_condition!("Transaction with hash: {} not found", tx);
657-
return Ok(());
658-
}
647+
match message_id {
648+
BroadcastMessageId::Transaction { tx } => {
649+
pubsub_state.mcache.update_transaction_messages(&tx, true);
650+
let transaction_message_ids =
651+
pubsub_state.mcache.get_validated_transaction_messages();
659652

660-
let all_vertified = tx_hashes.values().all(|v| *v);
661-
if !all_vertified {
662-
return Ok(());
653+
let dispatcher = state_context.into_dispatcher();
654+
for message_id in transaction_message_ids {
655+
dispatcher
656+
.push(P2pNetworkPubsubAction::BroadcastMessage { message_id });
663657
}
664-
} else {
665-
bug_condition!("Invalid state for message id with type transaction");
666-
return Ok(());
658+
659+
Ok(())
660+
}
661+
_ => {
662+
let dispatcher = state_context.into_dispatcher();
663+
dispatcher.push(P2pNetworkPubsubAction::BroadcastMessage {
664+
message_id: mcache_message_id,
665+
});
666+
Ok(())
667667
}
668668
}
669-
670-
let raw_message = message.message().clone();
671-
let peer_id = *message.peer_id();
672-
673-
pubsub_state.reduce_incoming_validated_message(
674-
mcache_message_id,
675-
peer_id,
676-
&raw_message,
677-
);
678-
679-
let Some((_message_id, message)) =
680-
pubsub_state.mcache.get_message_id_and_message(&message_id)
681-
else {
682-
bug_condition!("Message with id: {:?} not found", message_id);
683-
return Ok(());
684-
};
685-
686-
*message = P2pNetworkPubsubMessageCacheMessage::Validated {
687-
message: raw_message,
688-
peer_id,
689-
time: *message.time(),
690-
};
691-
692-
let (dispatcher, state) = state_context.into_dispatcher_and_state();
693-
694-
Self::broadcast(dispatcher, state)
695669
}
696670
P2pNetworkPubsubAction::PruneMessages {} => {
697671
let messages = pubsub_state
@@ -717,25 +691,43 @@ impl P2pNetworkPubsubState {
717691
peer_id,
718692
..
719693
} => {
720-
let mut peer_id = peer_id;
694+
let mut involved_peers = peer_id.into_iter().collect::<Vec<_>>();
695+
let mut add_peer = |peer: &PeerId| {
696+
if !involved_peers.contains(peer) {
697+
involved_peers.push(*peer);
698+
}
699+
};
700+
721701
if let Some(message_id) = message_id {
722-
let Some((_message_id, message)) =
723-
pubsub_state.mcache.get_message_id_and_message(&message_id)
724-
else {
725-
bug_condition!("Message not found for id: {:?}", message_id);
726-
return Ok(());
727-
};
702+
match message_id {
703+
BroadcastMessageId::Transaction { tx } => {
704+
let invalid_messages =
705+
pubsub_state.mcache.get_messages_with_transaction(&tx);
706+
707+
for id in invalid_messages {
708+
let message = pubsub_state.mcache.remove_message(id);
709+
if let Some(message) = message {
710+
add_peer(message.peer_id());
711+
}
712+
}
713+
}
714+
message_id => {
715+
let Some((message_id, message)) =
716+
pubsub_state.mcache.get_message_id_and_message(&message_id)
717+
else {
718+
bug_condition!("Message not found for id: {:?}", message_id);
719+
return Ok(());
720+
};
728721

729-
if peer_id.is_none() {
730-
peer_id = Some(*message.peer_id());
722+
add_peer(message.peer_id());
723+
pubsub_state.mcache.remove_message(message_id);
724+
}
731725
}
732-
733-
pubsub_state.mcache.remove_message(_message_id);
734726
}
735727

736728
let dispatcher = state_context.into_dispatcher();
737729

738-
if let Some(peer_id) = peer_id {
730+
for peer_id in involved_peers {
739731
dispatcher.push(P2pDisconnectionAction::Init {
740732
peer_id,
741733
reason: P2pDisconnectionReason::InvalidMessage,
@@ -745,6 +737,31 @@ impl P2pNetworkPubsubState {
745737
Ok(())
746738
}
747739
P2pNetworkPubsubAction::IgnoreMessage { .. } => Ok(()),
740+
P2pNetworkPubsubAction::BroadcastMessage { message_id } => {
741+
let Some(message) = pubsub_state.mcache.map.get(&message_id) else {
742+
bug_condition!("Message with id: {:?} not found", message_id);
743+
return Ok(());
744+
};
745+
746+
let raw_message = message.message().clone();
747+
let peer_id = *message.peer_id();
748+
749+
pubsub_state.reduce_incoming_validated_message(message_id, peer_id, &raw_message);
750+
751+
let Some(message) = pubsub_state.mcache.map.get_mut(&message_id) else {
752+
bug_condition!("Message with id: {:?} not found", message_id);
753+
return Ok(());
754+
};
755+
756+
*message = P2pNetworkPubsubMessageCacheMessage::Validated {
757+
message: raw_message,
758+
peer_id,
759+
time: *message.time(),
760+
};
761+
762+
let (dispatcher, state) = state_context.into_dispatcher_and_state();
763+
Self::broadcast(dispatcher, state)
764+
}
748765
}
749766
}
750767

p2p/src/network/pubsub/p2p_network_pubsub_state.rs

Lines changed: 64 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -415,11 +415,15 @@ impl P2pNetworkPubsubMessageCache {
415415
}
416416
}
417417

418-
pub fn remove_message(&mut self, message_id: P2pNetworkPubsubMessageCacheId) {
419-
let _ = self.map.remove(&message_id);
418+
pub fn remove_message(
419+
&mut self,
420+
message_id: P2pNetworkPubsubMessageCacheId,
421+
) -> Option<P2pNetworkPubsubMessageCacheMessage> {
422+
let message = self.map.remove(&message_id);
420423
if let Some(position) = self.queue.iter().position(|id| id == &message_id) {
421424
self.queue.remove(position);
422425
}
426+
message
423427
}
424428

425429
pub fn get_message_from_raw_message_id(
@@ -434,6 +438,64 @@ impl P2pNetworkPubsubMessageCache {
434438
}
435439
})
436440
}
441+
442+
pub fn update_transaction_messages(&mut self, tx: &TransactionHash, set_value: bool) {
443+
self.map.iter_mut().for_each(|(_, value)| {
444+
if let P2pNetworkPubsubMessageCacheMessage::PreValidatedTransactions {
445+
tx_hashes, ..
446+
} = value
447+
{
448+
if let Some(value) = tx_hashes.get_mut(tx) {
449+
*value = set_value;
450+
}
451+
}
452+
});
453+
}
454+
455+
pub fn get_validated_transaction_messages(&self) -> Vec<P2pNetworkPubsubMessageCacheId> {
456+
self.map
457+
.iter()
458+
.filter_map(|(key, value)| {
459+
if let P2pNetworkPubsubMessageCacheMessage::PreValidatedTransactions {
460+
tx_hashes,
461+
..
462+
} = value
463+
{
464+
if tx_hashes.iter().all(|(_, value)| *value) {
465+
Some(*key)
466+
} else {
467+
None
468+
}
469+
} else {
470+
None
471+
}
472+
})
473+
.collect()
474+
}
475+
476+
pub fn get_messages_with_transaction(
477+
&self,
478+
tx: &TransactionHash,
479+
) -> Vec<P2pNetworkPubsubMessageCacheId> {
480+
self.map
481+
.iter()
482+
.filter_map(|(key, value)| {
483+
if let P2pNetworkPubsubMessageCacheMessage::PreValidatedTransactions {
484+
tx_hashes,
485+
..
486+
} = value
487+
{
488+
if tx_hashes.get(tx).is_some() {
489+
Some(*key)
490+
} else {
491+
None
492+
}
493+
} else {
494+
None
495+
}
496+
})
497+
.collect()
498+
}
437499
}
438500

439501
pub fn source_from_message(message: &pb::Message) -> Result<libp2p_identity::PeerId, ParseError> {

snark/src/work_verify/snark_work_verify_actions.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use openmina_core::SubstateAccess;
1+
use openmina_core::{snark::SnarkJobId, SubstateAccess};
22
use serde::{Deserialize, Serialize};
33

44
use openmina_core::{snark::Snark, ActionEvent};
@@ -17,7 +17,7 @@ pub enum SnarkWorkVerifyAction {
1717
batch: Vec<Snark>,
1818
sender: String,
1919
on_success: redux::Callback<(SnarkWorkVerifyId, String, Vec<Snark>)>,
20-
on_error: redux::Callback<(SnarkWorkVerifyId, String, Vec<Snark>)>,
20+
on_error: redux::Callback<(SnarkWorkVerifyId, String, Vec<SnarkJobId>)>,
2121
},
2222
Pending {
2323
req_id: SnarkWorkVerifyId,

snark/src/work_verify/snark_work_verify_reducer.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use openmina_core::{bug_condition, Substate, SubstateAccess};
1+
use openmina_core::{bug_condition, snark::Snark, Substate, SubstateAccess};
22
use redux::EnablingCondition;
33

44
use crate::work_verify_effectful::SnarkWorkVerifyEffectfulAction;
@@ -95,15 +95,16 @@ pub fn reducer<State, Action>(
9595
let callback = on_error.clone();
9696
let sender = std::mem::take(sender);
9797
let batch = std::mem::take(batch);
98+
let job_ids = batch.iter().map(Snark::job_id).collect();
9899
*req = SnarkWorkVerifyStatus::Error {
99100
time: meta.time(),
100-
batch: batch.clone(),
101+
batch,
101102
sender: sender.clone(),
102103
error: error.clone(),
103104
};
104105
// Dispatch
105106
let dispatcher = state_context.into_dispatcher();
106-
dispatcher.push_callback(callback, (*req_id, sender, batch));
107+
dispatcher.push_callback(callback, (*req_id, sender, job_ids));
107108
dispatcher.push(SnarkWorkVerifyAction::Finish { req_id: *req_id });
108109
}
109110
SnarkWorkVerifyAction::Success { req_id } => {

snark/src/work_verify/snark_work_verify_state.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ use std::sync::Arc;
22

33
use serde::{Deserialize, Serialize};
44

5-
use openmina_core::requests::PendingRequests;
65
use openmina_core::snark::Snark;
6+
use openmina_core::{requests::PendingRequests, snark::SnarkJobId};
77

88
use crate::{TransactionVerifier, VerifierSRS};
99

@@ -50,14 +50,14 @@ pub enum SnarkWorkVerifyStatus {
5050
// `PeerId` here.
5151
sender: String,
5252
on_success: redux::Callback<(SnarkWorkVerifyId, String, Vec<Snark>)>,
53-
on_error: redux::Callback<(SnarkWorkVerifyId, String, Vec<Snark>)>,
53+
on_error: redux::Callback<(SnarkWorkVerifyId, String, Vec<SnarkJobId>)>,
5454
},
5555
Pending {
5656
time: redux::Timestamp,
5757
batch: Vec<Snark>,
5858
sender: String,
5959
on_success: redux::Callback<(SnarkWorkVerifyId, String, Vec<Snark>)>,
60-
on_error: redux::Callback<(SnarkWorkVerifyId, String, Vec<Snark>)>,
60+
on_error: redux::Callback<(SnarkWorkVerifyId, String, Vec<SnarkJobId>)>,
6161
},
6262
Error {
6363
time: redux::Timestamp,

0 commit comments

Comments
 (0)