Skip to content

Commit 2676e39

Browse files
authored
Merge pull request #1033 from openmina/fix/p2p/webrtc_to_libp2p_propagation
Fix WebRTC to LibP2P propagation
2 parents d548cfb + d8c3cb0 commit 2676e39

19 files changed

+352
-69
lines changed

node/common/src/service/p2p.rs

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -123,28 +123,23 @@ impl P2pCryptoService for NodeService {
123123
fn sign_key(&mut self, key: &[u8; 32]) -> Vec<u8> {
124124
// TODO: make deterministic
125125
let msg = [b"noise-libp2p-static-key:", key.as_slice()].concat();
126-
let sig = self
127-
.p2p
128-
.mio
129-
.keypair()
130-
.sign(&msg)
131-
.expect("unable to create signature");
126+
let sig = self.p2p.sec_key.sign(&msg);
127+
let libp2p_sec_key = libp2p_identity::Keypair::try_from(self.p2p.sec_key.clone()).unwrap();
132128

133129
let mut payload = vec![];
134130
payload.extend_from_slice(b"\x0a\x24");
135-
payload.extend_from_slice(&self.p2p.mio.keypair().public().encode_protobuf());
131+
payload.extend_from_slice(&libp2p_sec_key.public().encode_protobuf());
136132
payload.extend_from_slice(b"\x12\x40");
137-
payload.extend_from_slice(&sig);
133+
payload.extend_from_slice(&sig.to_bytes());
138134
payload
139135
}
140136

141137
fn sign_publication(&mut self, publication: &[u8]) -> Vec<u8> {
142-
let msg = [b"libp2p-pubsub:", publication].concat();
143138
self.p2p
144-
.mio
145-
.keypair()
146-
.sign(&msg)
147-
.expect("unable to create signature")
139+
.sec_key
140+
.libp2p_pubsub_sign(publication)
141+
.to_bytes()
142+
.to_vec()
148143
}
149144

150145
fn verify_publication(

node/src/action_kind.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,7 @@ pub enum ActionKind {
425425
P2pNetworkPubsubSignError,
426426
P2pNetworkPubsubValidateIncomingMessage,
427427
P2pNetworkPubsubValidateIncomingMessages,
428+
P2pNetworkPubsubWebRtcRebroadcast,
428429
P2pNetworkPubsubEffectfulSign,
429430
P2pNetworkPubsubEffectfulValidateIncomingMessages,
430431
P2pNetworkRpcHeartbeatSend,
@@ -717,7 +718,7 @@ pub enum ActionKind {
717718
}
718719

719720
impl ActionKind {
720-
pub const COUNT: u16 = 607;
721+
pub const COUNT: u16 = 608;
721722
}
722723

723724
impl std::fmt::Display for ActionKind {
@@ -1961,6 +1962,7 @@ impl ActionKindGet for P2pNetworkPubsubAction {
19611962
}
19621963
Self::Graft { .. } => ActionKind::P2pNetworkPubsubGraft,
19631964
Self::Prune { .. } => ActionKind::P2pNetworkPubsubPrune,
1965+
Self::WebRtcRebroadcast { .. } => ActionKind::P2pNetworkPubsubWebRtcRebroadcast,
19641966
Self::Broadcast { .. } => ActionKind::P2pNetworkPubsubBroadcast,
19651967
Self::Sign { .. } => ActionKind::P2pNetworkPubsubSign,
19661968
Self::SignError { .. } => ActionKind::P2pNetworkPubsubSignError,

node/src/snark_pool/snark_pool_reducer.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -202,15 +202,13 @@ impl SnarkPoolState {
202202
}
203203
}
204204

205-
// TODO: we only rebroadcast locally produced snarks here.
206-
// libp2p logic already broadcasts everything right now and doesn't
205+
// TODO: libp2p logic already broadcasts everything right now and doesn't
207206
// wait for validation, thad needs to be fixed. See #952
208-
if *is_sender_local {
209-
dispatcher.push(P2pChannelsSnarkAction::Libp2pBroadcast {
210-
snark: snark.clone(),
211-
nonce: 0,
212-
});
213-
}
207+
dispatcher.push(P2pChannelsSnarkAction::Libp2pBroadcast {
208+
snark: snark.clone(),
209+
nonce: 0,
210+
is_local: *is_sender_local,
211+
});
214212
}
215213
SnarkPoolAction::P2pSendAll { .. } => {
216214
let (dispatcher, global_state) = state_context.into_dispatcher_and_state();

node/src/transaction_pool/transaction_pool_actions.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ pub enum TransactionPoolAction {
6969
Rebroadcast {
7070
accepted: Vec<ValidCommandWithHash>,
7171
rejected: Vec<(ValidCommandWithHash, diff::Error)>,
72+
is_local: bool,
7273
},
7374
CollectTransactionsByFee,
7475
#[action_event(level = trace)]
@@ -115,9 +116,9 @@ impl redux::EnablingCondition<crate::State> for TransactionPoolAction {
115116
last_index,
116117
)
117118
}),
118-
TransactionPoolAction::Rebroadcast { accepted, rejected } => {
119-
!(accepted.is_empty() && rejected.is_empty())
120-
}
119+
TransactionPoolAction::Rebroadcast {
120+
accepted, rejected, ..
121+
} => !(accepted.is_empty() && rejected.is_empty()),
121122
_ => true,
122123
}
123124
}

node/src/transaction_pool/transaction_pool_reducer.rs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -300,11 +300,14 @@ impl TransactionPoolState {
300300
if let Some(rpc_action) = rpc_action {
301301
dispatcher.push(rpc_action);
302302
}
303-
// TODO: we only rebroadcast locally injected transactions here.
304-
// libp2p logic already broadcasts everything right now and doesn't
303+
// TODO: libp2p logic already broadcasts everything right now and doesn't
305304
// wait for validation, thad needs to be fixed. See #952
306-
if is_sender_local && was_accepted {
307-
dispatcher.push(TransactionPoolAction::Rebroadcast { accepted, rejected });
305+
if was_accepted {
306+
dispatcher.push(TransactionPoolAction::Rebroadcast {
307+
accepted,
308+
rejected,
309+
is_local: is_sender_local,
310+
});
308311
}
309312
}
310313
TransactionPoolAction::ApplyTransitionFrontierDiff {
@@ -372,7 +375,11 @@ impl TransactionPoolState {
372375
);
373376
}
374377
}
375-
TransactionPoolAction::Rebroadcast { accepted, rejected } => {
378+
TransactionPoolAction::Rebroadcast {
379+
accepted,
380+
rejected,
381+
is_local,
382+
} => {
376383
let rejected = rejected.iter().map(|(cmd, _)| cmd.data.forget_check());
377384

378385
let all_commands = accepted
@@ -387,6 +394,7 @@ impl TransactionPoolState {
387394
dispatcher.push(P2pChannelsTransactionAction::Libp2pBroadcast {
388395
transaction: Box::new((&cmd).into()),
389396
nonce: 0,
397+
is_local: *is_local,
390398
});
391399
}
392400
}

node/src/transition_frontier/sync/transition_frontier_sync_effects.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -322,11 +322,6 @@ impl TransitionFrontierSyncAction {
322322
if !store.dispatch(TransitionFrontierSyncAction::BlocksNextApplyInit) {
323323
store.dispatch(TransitionFrontierSyncAction::BlocksSuccess);
324324
}
325-
326-
// TODO this should be handled by a callback
327-
store.dispatch(P2pNetworkPubsubAction::BroadcastValidatedMessage {
328-
message_id: p2p::BroadcastMessageId::BlockHash { hash: hash.clone() },
329-
});
330325
}
331326
TransitionFrontierSyncAction::BlocksSuccess => {}
332327
// Bootstrap/Catchup is practically complete at this point.

node/src/transition_frontier/transition_frontier_effects.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
1+
use mina_p2p_messages::gossip::GossipNetMessageV2;
12
use redux::Timestamp;
23

34
use crate::block_producer::BlockProducerAction;
45
use crate::consensus::ConsensusAction;
56
use crate::ledger::LEDGER_DEPTH;
67
use crate::p2p::channels::best_tip::P2pChannelsBestTipAction;
8+
use crate::p2p::P2pNetworkPubsubAction;
79
use crate::snark_pool::{SnarkPoolAction, SnarkWork};
810
use crate::stats::sync::SyncingLedger;
911
use crate::{Store, TransactionPoolAction};
@@ -305,6 +307,18 @@ fn synced_effects<S: crate::Service>(
305307
best_tip: best_tip.block.clone(),
306308
});
307309
}
310+
// TODO this should be handled by a callback
311+
// If this get dispatched, we received block from libp2p.
312+
if !store.dispatch(P2pNetworkPubsubAction::BroadcastValidatedMessage {
313+
message_id: p2p::BroadcastMessageId::BlockHash {
314+
hash: best_tip.hash().clone(),
315+
},
316+
}) {
317+
// Otherwise block was received from WebRTC so inject it in libp2p.
318+
store.dispatch(P2pNetworkPubsubAction::WebRtcRebroadcast {
319+
message: GossipNetMessageV2::NewState(best_tip.block().clone()),
320+
});
321+
}
308322

309323
let best_tip_hash = best_tip.merkle_root_hash().clone();
310324
store.dispatch(ConsensusAction::Prune);

p2p/src/channels/snark/p2p_channels_snark_actions.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ pub enum P2pChannelsSnarkAction {
5454
Libp2pBroadcast {
5555
snark: Snark,
5656
nonce: u32,
57+
is_local: bool,
5758
},
5859
}
5960

p2p/src/channels/snark/p2p_channels_snark_reducer.rs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -210,18 +210,27 @@ impl P2pChannelsSnarkState {
210210
}
211211
Ok(())
212212
}
213+
#[cfg(not(feature = "p2p-libp2p"))]
214+
P2pChannelsSnarkAction::Libp2pBroadcast { .. } => Ok(()),
213215
#[cfg(feature = "p2p-libp2p")]
214-
P2pChannelsSnarkAction::Libp2pBroadcast { snark, nonce } => {
216+
P2pChannelsSnarkAction::Libp2pBroadcast {
217+
snark,
218+
nonce,
219+
is_local,
220+
} => {
215221
let dispatcher = state_context.into_dispatcher();
216222
let message = Box::new((snark.statement(), (&snark).into()));
217223
let message = v2::NetworkPoolSnarkPoolDiffVersionedStableV2::AddSolvedWork(message);
218224
let nonce = nonce.into();
219225
let message = GossipNetMessageV2::SnarkPoolDiff { message, nonce };
220-
dispatcher.push(P2pNetworkPubsubAction::Broadcast { message });
226+
if is_local {
227+
dispatcher.push(P2pNetworkPubsubAction::Broadcast { message });
228+
} else {
229+
// rebroadcast snark if received from webrtc network, otherwise noop.
230+
dispatcher.push(P2pNetworkPubsubAction::WebRtcRebroadcast { message });
231+
}
221232
Ok(())
222233
}
223-
#[cfg(not(feature = "p2p-libp2p"))]
224-
P2pChannelsSnarkAction::Libp2pBroadcast { .. } => Ok(()),
225234
P2pChannelsSnarkAction::Libp2pReceived { peer_id, snark, .. } => {
226235
let (dispatcher, state) = state_context.into_dispatcher_and_state();
227236
let p2p_state: &P2pState = state.substate()?;

p2p/src/channels/transaction/p2p_channels_transaction_actions.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ pub enum P2pChannelsTransactionAction {
5252
Libp2pBroadcast {
5353
transaction: Box<Transaction>,
5454
nonce: u32,
55+
is_local: bool,
5556
},
5657
}
5758

0 commit comments

Comments
 (0)