Skip to content
This repository was archived by the owner on Sep 13, 2022. It is now read-only.

Commit 8ece029

Browse files
authored
fix(sync): Avoid requesting redundant transactions (#259)
* fix(sync): Avoid requesting redundant transactions * remove insert
1 parent be3c139 commit 8ece029

File tree

10 files changed

+47
-164
lines changed

10 files changed

+47
-164
lines changed

Diff for: core/consensus/src/adapter.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,7 @@ where
321321
&self,
322322
ctx: Context,
323323
height: u64,
324-
txs: Vec<Hash>,
324+
txs: Vec<SignedTransaction>,
325325
) -> ProtocolResult<()> {
326326
if let Err(e) = self.mempool.ensure_order_txs_sync(ctx.clone(), txs).await {
327327
log::error!("verify_txs error {:?}", e);

Diff for: core/consensus/src/synchronization.rs

+23-48
Original file line numberDiff line numberDiff line change
@@ -150,8 +150,6 @@ impl<Adapter: SynchronizationAdapter> OverlordSynchronization<Adapter> {
150150
) -> ProtocolResult<()> {
151151
let mut current_consented_height = current_height;
152152

153-
let mut prepared_rich_block: Option<RichBlock> = None;
154-
155153
while current_consented_height < remote_height {
156154
let consenting_height = current_consented_height + 1;
157155
log::info!(
@@ -160,47 +158,28 @@ impl<Adapter: SynchronizationAdapter> OverlordSynchronization<Adapter> {
160158
consenting_height
161159
);
162160

163-
let consenting_rich_block: RichBlock = match prepared_rich_block.as_ref() {
164-
None => self
165-
.get_rich_block_from_remote(ctx.clone(), consenting_height)
166-
.await
167-
.map_err(|e| {
168-
log::error!(
169-
"[synchronization]: get_rich_block_from_remote error, height: {:?}",
170-
consenting_height
171-
);
172-
e
173-
})?,
174-
175-
Some(_) => prepared_rich_block.take().unwrap(),
176-
};
177-
178-
let consenting_proof: Proof = if consenting_height < remote_height {
179-
let proof_block = self
180-
.get_rich_block_from_remote(ctx.clone(), consenting_height + 1)
181-
.await
182-
.map_err(|e| {
183-
log::error!(
184-
"[synchronization]: get_rich_block_from_remote error, height: {:?}",
185-
consenting_height + 1
186-
);
187-
e
188-
})?;
189-
190-
prepared_rich_block = Some(proof_block.clone());
191-
proof_block.block.header.proof
192-
} else {
193-
self.adapter
194-
.get_proof_from_remote(ctx.clone(), consenting_height)
195-
.await
196-
.map_err(|e| {
197-
log::error!(
198-
"[synchronization]: get_proof_from_remote error, height: {:?}",
199-
consenting_height
200-
);
201-
e
202-
})?
203-
};
161+
let consenting_rich_block: RichBlock = self
162+
.get_rich_block_from_remote(ctx.clone(), consenting_height)
163+
.await
164+
.map_err(|e| {
165+
log::error!(
166+
"[synchronization]: get_rich_block_from_remote error, height: {:?}",
167+
consenting_height
168+
);
169+
e
170+
})?;
171+
172+
let consenting_proof: Proof = self
173+
.adapter
174+
.get_proof_from_remote(ctx.clone(), consenting_height)
175+
.await
176+
.map_err(|e| {
177+
log::error!(
178+
"[synchronization]: get_proof_from_remote error, height: {:?}",
179+
consenting_height
180+
);
181+
e
182+
})?;
204183

205184
self.adapter
206185
.verify_block_header(ctx.clone(), consenting_rich_block.block.clone())
@@ -263,11 +242,7 @@ impl<Adapter: SynchronizationAdapter> OverlordSynchronization<Adapter> {
263242
.verify_txs_sync(
264243
ctx.clone(),
265244
consenting_height,
266-
consenting_rich_block
267-
.txs
268-
.iter()
269-
.map(|signed_tx| signed_tx.tx_hash.clone())
270-
.collect(),
245+
consenting_rich_block.txs.clone(),
271246
)
272247
.await
273248
.map_err(|e| {

Diff for: core/consensus/src/tests/synchronization.rs

+6-1
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,12 @@ impl SynchronizationAdapter for MockCommonConsensusAdapter {
206206
Ok(self.remote_proofs.read().get(&height).unwrap().clone())
207207
}
208208

209-
async fn verify_txs_sync(&self, _: Context, _: u64, _: Vec<Hash>) -> ProtocolResult<()> {
209+
async fn verify_txs_sync(
210+
&self,
211+
_: Context,
212+
_: u64,
213+
_: Vec<SignedTransaction>,
214+
) -> ProtocolResult<()> {
210215
Ok(())
211216
}
212217
}

Diff for: core/mempool/src/adapter/message.rs

+1-42
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::sync::Arc;
33
use async_trait::async_trait;
44
use futures::future::{try_join_all, TryFutureExt};
55
use protocol::{
6-
traits::{Context, MemPool, MessageHandler, Priority, Rpc, Storage},
6+
traits::{Context, MemPool, MessageHandler, Priority, Rpc},
77
types::{Hash, SignedTransaction},
88
};
99
use serde_derive::{Deserialize, Serialize};
@@ -13,7 +13,6 @@ use crate::context::TxContext;
1313
pub const END_GOSSIP_NEW_TXS: &str = "/gossip/mempool/new_txs";
1414
pub const RPC_PULL_TXS: &str = "/rpc_call/mempool/pull_txs";
1515
pub const RPC_RESP_PULL_TXS: &str = "/rpc_resp/mempool/pull_txs";
16-
pub const RPC_PULL_TXS_SYNC: &str = "/rpc_call/mempool/pull_txs_sync";
1716
pub const RPC_RESP_PULL_TXS_SYNC: &str = "/rpc_resp/mempool/pull_txs_sync";
1817

1918
#[derive(Debug, Serialize, Deserialize)]
@@ -121,43 +120,3 @@ where
121120
.await;
122121
}
123122
}
124-
125-
pub struct PullTxsSyncHandler<N, M> {
126-
network: Arc<N>,
127-
storage: Arc<M>,
128-
}
129-
130-
impl<N, M> PullTxsSyncHandler<N, M>
131-
where
132-
N: Rpc + 'static,
133-
M: Storage + 'static,
134-
{
135-
pub fn new(network: Arc<N>, storage: Arc<M>) -> Self {
136-
PullTxsSyncHandler { network, storage }
137-
}
138-
}
139-
140-
#[async_trait]
141-
impl<N, M> MessageHandler for PullTxsSyncHandler<N, M>
142-
where
143-
N: Rpc + 'static,
144-
M: Storage + 'static,
145-
{
146-
type Message = MsgPullTxs;
147-
148-
async fn process(&self, ctx: Context, msg: Self::Message) {
149-
let futs = msg
150-
.hashes
151-
.into_iter()
152-
.map(|tx_hash| self.storage.get_transaction_by_hash(tx_hash))
153-
.collect::<Vec<_>>();
154-
let ret = try_join_all(futs)
155-
.await
156-
.map(|sig_txs| MsgPushTxs { sig_txs });
157-
158-
self.network
159-
.response(ctx, RPC_RESP_PULL_TXS_SYNC, ret, Priority::High)
160-
.unwrap_or_else(move |e| log::warn!("[core_mempool] push txs {}", e))
161-
.await;
162-
}
163-
}

Diff for: core/mempool/src/adapter/mod.rs

+1-16
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use protocol::{
3030
};
3131

3232
use crate::adapter::message::{
33-
MsgNewTxs, MsgPullTxs, MsgPushTxs, END_GOSSIP_NEW_TXS, RPC_PULL_TXS, RPC_PULL_TXS_SYNC,
33+
MsgNewTxs, MsgPullTxs, MsgPushTxs, END_GOSSIP_NEW_TXS, RPC_PULL_TXS,
3434
};
3535
use crate::MemPoolError;
3636

@@ -211,21 +211,6 @@ where
211211
Ok(resp_msg.sig_txs)
212212
}
213213

214-
async fn pull_txs_sync(
215-
&self,
216-
ctx: Context,
217-
tx_hashes: Vec<Hash>,
218-
) -> ProtocolResult<Vec<SignedTransaction>> {
219-
let pull_msg = MsgPullTxs { hashes: tx_hashes };
220-
221-
let resp_msg = self
222-
.network
223-
.call::<MsgPullTxs, MsgPushTxs>(ctx, RPC_PULL_TXS_SYNC, pull_msg, Priority::High)
224-
.await?;
225-
226-
Ok(resp_msg.sig_txs)
227-
}
228-
229214
async fn broadcast_tx(&self, _ctx: Context, stx: SignedTransaction) -> ProtocolResult<()> {
230215
self.stx_tx
231216
.unbounded_send(stx)

Diff for: core/mempool/src/lib.rs

+12-31
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ mod tests;
88
mod tx_cache;
99

1010
pub use adapter::message::{
11-
MsgPushTxs, NewTxsHandler, PullTxsHandler, PullTxsSyncHandler, END_GOSSIP_NEW_TXS,
12-
RPC_PULL_TXS, RPC_PULL_TXS_SYNC, RPC_RESP_PULL_TXS, RPC_RESP_PULL_TXS_SYNC,
11+
MsgPushTxs, NewTxsHandler, PullTxsHandler, END_GOSSIP_NEW_TXS, RPC_PULL_TXS, RPC_RESP_PULL_TXS,
12+
RPC_RESP_PULL_TXS_SYNC,
1313
};
1414
pub use adapter::DefaultMemPoolAdapter;
1515
pub use adapter::{DEFAULT_BROADCAST_TXS_INTERVAL, DEFAULT_BROADCAST_TXS_SIZE};
@@ -227,37 +227,18 @@ where
227227
async fn ensure_order_txs_sync(
228228
&self,
229229
ctx: Context,
230-
order_tx_hashes: Vec<Hash>,
230+
order_txs: Vec<SignedTransaction>,
231231
) -> ProtocolResult<()> {
232-
let unknown_hashes = self.show_unknown_txs(order_tx_hashes);
233-
if !unknown_hashes.is_empty() {
234-
let unknown_len = unknown_hashes.len();
235-
let txs = self
236-
.adapter
237-
.pull_txs_sync(ctx.clone(), unknown_hashes)
232+
for signed_tx in order_txs.into_iter() {
233+
self.adapter
234+
.check_signature(ctx.clone(), signed_tx.clone())
235+
.await?;
236+
self.adapter
237+
.check_transaction(ctx.clone(), signed_tx.clone())
238+
.await?;
239+
self.adapter
240+
.check_storage_exist(ctx.clone(), signed_tx.tx_hash.clone())
238241
.await?;
239-
// Make sure response signed_txs is the same size of request hashes.
240-
if txs.len() != unknown_len {
241-
return Err(MemPoolError::EnsureBreak {
242-
require: unknown_len,
243-
response: txs.len(),
244-
}
245-
.into());
246-
}
247-
248-
for signed_tx in txs.into_iter() {
249-
self.adapter
250-
.check_signature(ctx.clone(), signed_tx.clone())
251-
.await?;
252-
self.adapter
253-
.check_transaction(ctx.clone(), signed_tx.clone())
254-
.await?;
255-
self.adapter
256-
.check_storage_exist(ctx.clone(), signed_tx.tx_hash.clone())
257-
.await?;
258-
self.callback_cache
259-
.insert(signed_tx.tx_hash.clone(), signed_tx);
260-
}
261242
}
262243

263244
Ok(())

Diff for: core/mempool/src/tests/mod.rs

-8
Original file line numberDiff line numberDiff line change
@@ -61,14 +61,6 @@ impl MemPoolAdapter for HashMemPoolAdapter {
6161
Ok(vec)
6262
}
6363

64-
async fn pull_txs_sync(
65-
&self,
66-
_: Context,
67-
_: Vec<Hash>,
68-
) -> ProtocolResult<Vec<SignedTransaction>> {
69-
Ok(Vec::new())
70-
}
71-
7264
async fn broadcast_tx(&self, _ctx: Context, tx: SignedTransaction) -> ProtocolResult<()> {
7365
self.network_txs.insert(tx.tx_hash.clone(), tx);
7466
Ok(())

Diff for: protocol/src/traits/consensus.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ pub trait SynchronizationAdapter: CommonConsensusAdapter + Send + Sync {
7979
&self,
8080
ctx: Context,
8181
height: u64,
82-
txs: Vec<Hash>,
82+
txs: Vec<SignedTransaction>,
8383
) -> ProtocolResult<()>;
8484
}
8585

Diff for: protocol/src/traits/mempool.rs

+1-7
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ pub trait MemPool: Send + Sync {
4444
async fn ensure_order_txs_sync(
4545
&self,
4646
ctx: Context,
47-
order_tx_hashes: Vec<Hash>,
47+
order_txs: Vec<SignedTransaction>,
4848
) -> ProtocolResult<()>;
4949

5050
async fn sync_propose_txs(
@@ -64,12 +64,6 @@ pub trait MemPoolAdapter: Send + Sync {
6464
tx_hashes: Vec<Hash>,
6565
) -> ProtocolResult<Vec<SignedTransaction>>;
6666

67-
async fn pull_txs_sync(
68-
&self,
69-
ctx: Context,
70-
tx_hashes: Vec<Hash>,
71-
) -> ProtocolResult<Vec<SignedTransaction>>;
72-
7367
async fn broadcast_tx(&self, ctx: Context, tx: SignedTransaction) -> ProtocolResult<()>;
7468

7569
async fn check_signature(&self, ctx: Context, tx: SignedTransaction) -> ProtocolResult<()>;

Diff for: src/default_start.rs

+1-9
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,7 @@ use core_consensus::{
3030
};
3131
use core_mempool::{
3232
DefaultMemPoolAdapter, HashMemPool, MsgPushTxs, NewTxsHandler, PullTxsHandler,
33-
PullTxsSyncHandler, END_GOSSIP_NEW_TXS, RPC_PULL_TXS, RPC_PULL_TXS_SYNC, RPC_RESP_PULL_TXS,
34-
RPC_RESP_PULL_TXS_SYNC,
33+
END_GOSSIP_NEW_TXS, RPC_PULL_TXS, RPC_RESP_PULL_TXS, RPC_RESP_PULL_TXS_SYNC,
3534
};
3635
use core_network::{NetworkConfig, NetworkService};
3736
use core_storage::{adapter::rocks::RocksAdapter, ImplStorage};
@@ -266,13 +265,6 @@ pub async fn start<Mapping: 'static + ServiceMapping>(
266265
)?;
267266
network_service.register_rpc_response::<MsgPushTxs>(RPC_RESP_PULL_TXS)?;
268267

269-
network_service.register_endpoint_handler(
270-
RPC_PULL_TXS_SYNC,
271-
Box::new(PullTxsSyncHandler::new(
272-
Arc::new(network_service.handle()),
273-
Arc::clone(&storage),
274-
)),
275-
)?;
276268
network_service.register_rpc_response::<MsgPushTxs>(RPC_RESP_PULL_TXS_SYNC)?;
277269

278270
// Init Consensus

0 commit comments

Comments
 (0)