Skip to content

Commit 0f0993f

Browse files
committed
fix orphan issues
1 parent 7ea6dae commit 0f0993f

File tree

5 files changed

+252
-25
lines changed

5 files changed

+252
-25
lines changed

test/src/main.rs

+3
Original file line numberDiff line numberDiff line change
@@ -426,6 +426,9 @@ fn all_specs() -> Vec<Box<dyn Spec>> {
426426
Box::new(DeclaredWrongCyclesChunk),
427427
Box::new(DeclaredWrongCyclesAndRelayAgain),
428428
Box::new(OrphanTxAccepted),
429+
Box::new(TxPoolOrphanNormal),
430+
Box::new(TxPoolOrphanReverse),
431+
Box::new(TxPoolOrphanUnordered),
429432
Box::new(OrphanTxRejected),
430433
Box::new(GetRawTxPool),
431434
Box::new(PoolReconcile),

test/src/node.rs

+20-3
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,13 @@ impl Node {
346346
self.submit_transaction(&self.new_transaction_spend_tip_cellbase())
347347
}
348348

349+
// generate a transaction which spend tip block's cellbase and capacity
350+
pub fn new_transaction_with_capacity(&self, capacity: Capacity) -> TransactionView {
351+
let block = self.get_tip_block();
352+
let cellbase = &block.transactions()[0];
353+
self.new_transaction_with_since_capacity(cellbase.hash(), 0, capacity)
354+
}
355+
349356
// generate a transaction which spend tip block's cellbase
350357
pub fn new_transaction_spend_tip_cellbase(&self) -> TransactionView {
351358
let block = self.get_tip_block();
@@ -539,11 +546,12 @@ impl Node {
539546
self.new_transaction_with_since_capacity(hash, since, capacity_bytes!(100))
540547
}
541548

542-
pub fn new_transaction_with_since_capacity(
549+
pub fn new_transaction_with_capacity_and_index(
543550
&self,
544551
hash: Byte32,
545-
since: u64,
546552
capacity: Capacity,
553+
index: u32,
554+
since: u64,
547555
) -> TransactionView {
548556
let always_success_cell_dep = self.always_success_cell_dep();
549557
let always_success_script = self.always_success_script();
@@ -557,10 +565,19 @@ impl Node {
557565
.build(),
558566
)
559567
.output_data(Default::default())
560-
.input(CellInput::new(OutPoint::new(hash, 0), since))
568+
.input(CellInput::new(OutPoint::new(hash, index), since))
561569
.build()
562570
}
563571

572+
pub fn new_transaction_with_since_capacity(
573+
&self,
574+
hash: Byte32,
575+
since: u64,
576+
capacity: Capacity,
577+
) -> TransactionView {
578+
self.new_transaction_with_capacity_and_index(hash, capacity, 0, since)
579+
}
580+
564581
pub fn new_always_failure_transaction(&self, hash: Byte32) -> TransactionView {
565582
let always_failure_cell_dep = self.always_failure_cell_dep();
566583
let always_failure_script = self.always_failure_script();

test/src/specs/tx_pool/orphan_tx.rs

+205
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,12 @@ use crate::utils::wait_until;
33
use crate::{Net, Node, Spec};
44
use ckb_jsonrpc_types::Status;
55
use ckb_network::SupportProtocols;
6+
use ckb_types::core::{capacity_bytes, Capacity, TransactionBuilder, TransactionView};
7+
use ckb_types::packed::CellOutputBuilder;
8+
use ckb_types::{
9+
packed::{CellInput, OutPoint},
10+
prelude::*,
11+
};
612

713
const ALWAYS_SUCCESS_SCRIPT_CYCLE: u64 = 537;
814
// always_failure, as the name implies, so it doesn't matter what the cycles are
@@ -97,3 +103,202 @@ impl Spec for OrphanTxRejected {
97103
assert!(matches!(ret.tx_status.status, Status::Rejected));
98104
}
99105
}
106+
107+
// construct a tx chain with such structure:
108+
//
109+
// parent
110+
// |
111+
// tx1
112+
// / | \
113+
// tx11 tx12 tx13
114+
// \ | /
115+
// final_tx
116+
//
117+
fn build_tx_chain(
118+
node0: &Node,
119+
) -> (
120+
Net,
121+
(
122+
TransactionView,
123+
TransactionView,
124+
TransactionView,
125+
TransactionView,
126+
TransactionView,
127+
TransactionView,
128+
),
129+
) {
130+
node0.mine_until_out_bootstrap_period();
131+
let parent = node0.new_transaction_with_capacity(capacity_bytes!(800));
132+
133+
let script = node0.always_success_script();
134+
let new_output1 = CellOutputBuilder::default()
135+
.capacity(capacity_bytes!(200).pack())
136+
.lock(script.clone())
137+
.build();
138+
let new_output2 = new_output1.clone();
139+
let new_output3 = new_output1.clone();
140+
141+
let tx1 = parent
142+
.as_advanced_builder()
143+
.set_inputs(vec![CellInput::new(OutPoint::new(parent.hash(), 0), 0)])
144+
.set_outputs(vec![new_output1, new_output2, new_output3])
145+
.set_outputs_data(vec![Default::default(); 3])
146+
.build();
147+
148+
let tx11 =
149+
node0.new_transaction_with_capacity_and_index(tx1.hash(), capacity_bytes!(100), 0, 0);
150+
let tx12 =
151+
node0.new_transaction_with_capacity_and_index(tx1.hash(), capacity_bytes!(100), 1, 0);
152+
let tx13 =
153+
node0.new_transaction_with_capacity_and_index(tx1.hash(), capacity_bytes!(100), 2, 0);
154+
155+
let cell_dep = node0.always_success_cell_dep();
156+
let final_output = CellOutputBuilder::default()
157+
.capacity(capacity_bytes!(80).pack())
158+
.lock(script)
159+
.build();
160+
let final_tx = TransactionBuilder::default()
161+
.cell_dep(cell_dep)
162+
.set_inputs(vec![
163+
CellInput::new(OutPoint::new(tx11.hash(), 0), 0),
164+
CellInput::new(OutPoint::new(tx12.hash(), 0), 0),
165+
CellInput::new(OutPoint::new(tx13.hash(), 0), 0),
166+
])
167+
.set_outputs(vec![final_output])
168+
.set_outputs_data(vec![Default::default(); 1])
169+
.build();
170+
171+
let mut net = Net::new(
172+
"orphan_tx_test",
173+
node0.consensus(),
174+
vec![SupportProtocols::RelayV3],
175+
);
176+
net.connect(node0);
177+
178+
(net, (parent, tx1, tx11, tx12, tx13, final_tx))
179+
}
180+
181+
fn run_replay_tx(
182+
net: &Net,
183+
node0: &Node,
184+
tx: TransactionView,
185+
orphan_tx_cnt: u64,
186+
pending_cnt: u64,
187+
) -> bool {
188+
relay_tx(net, node0, tx, ALWAYS_SUCCESS_SCRIPT_CYCLE);
189+
190+
wait_until(5, || {
191+
let tx_pool_info = node0.get_tip_tx_pool_info();
192+
tx_pool_info.orphan.value() == orphan_tx_cnt && tx_pool_info.pending.value() == pending_cnt
193+
})
194+
}
195+
196+
pub struct TxPoolOrphanNormal;
197+
impl Spec for TxPoolOrphanNormal {
198+
fn run(&self, nodes: &mut Vec<Node>) {
199+
let node0 = &nodes[0];
200+
let (net, (parent, tx1, tx11, tx12, tx13, final_tx)) = build_tx_chain(node0);
201+
202+
assert!(
203+
run_replay_tx(&net, node0, parent, 0, 1),
204+
"parent sended expect nothing in orphan pool"
205+
);
206+
assert!(
207+
run_replay_tx(&net, node0, tx1, 0, 2),
208+
"tx1 is send expect nothing in orphan pool"
209+
);
210+
assert!(
211+
run_replay_tx(&net, node0, tx11, 0, 3),
212+
"tx11 is send expect nothing in orphan pool"
213+
);
214+
assert!(
215+
run_replay_tx(&net, node0, tx12, 0, 4),
216+
"tx12 is send expect nothing in orphan pool"
217+
);
218+
assert!(
219+
run_replay_tx(&net, node0, tx13, 0, 5),
220+
"tx13 is send expect nothing in orphan pool"
221+
);
222+
assert!(
223+
run_replay_tx(&net, node0, final_tx, 0, 6),
224+
"final_tx is send expect nothing in orphan pool"
225+
);
226+
}
227+
}
228+
229+
pub struct TxPoolOrphanReverse;
230+
impl Spec for TxPoolOrphanReverse {
231+
fn run(&self, nodes: &mut Vec<Node>) {
232+
let node0 = &nodes[0];
233+
let (net, (parent, tx1, tx11, tx12, tx13, final_tx)) = build_tx_chain(node0);
234+
235+
assert!(
236+
run_replay_tx(&net, node0, final_tx, 1, 0),
237+
"expect final_tx is in orphan pool"
238+
);
239+
240+
assert!(
241+
run_replay_tx(&net, node0, tx13, 2, 0),
242+
"tx13 in orphan pool"
243+
);
244+
assert!(
245+
run_replay_tx(&net, node0, tx12, 3, 0),
246+
"tx12 is in orphan pool"
247+
);
248+
assert!(run_replay_tx(&net, node0, tx11, 4, 0), "tx11 is in orphan");
249+
250+
assert!(run_replay_tx(&net, node0, tx1, 5, 0), "tx1 is in orphan");
251+
252+
assert!(
253+
run_replay_tx(&net, node0, parent, 0, 6),
254+
"all is in pending"
255+
);
256+
}
257+
}
258+
259+
pub struct TxPoolOrphanUnordered;
260+
impl Spec for TxPoolOrphanUnordered {
261+
fn run(&self, nodes: &mut Vec<Node>) {
262+
let node0 = &nodes[0];
263+
let (net, (parent, tx1, tx11, tx12, tx13, final_tx)) = build_tx_chain(node0);
264+
265+
assert!(
266+
run_replay_tx(&net, node0, final_tx, 1, 0),
267+
"expect final_tx is in orphan pool"
268+
);
269+
270+
assert!(
271+
run_replay_tx(&net, node0, tx11, 2, 0),
272+
"tx11 in orphan pool"
273+
);
274+
let tx12_clone = tx12.clone();
275+
assert!(
276+
run_replay_tx(&net, node0, tx12, 3, 0),
277+
"tx12 is in orphan pool"
278+
);
279+
280+
// set tx12_clone with rpc
281+
let ret = node0
282+
.rpc_client()
283+
.send_transaction_result(tx12_clone.data().into());
284+
assert!(ret
285+
.err()
286+
.unwrap()
287+
.to_string()
288+
.contains("already exist in transaction_pool"));
289+
290+
assert!(
291+
run_replay_tx(&net, node0, parent, 3, 1),
292+
"parent is sent, should be in pending without change orphan pool"
293+
);
294+
assert!(
295+
run_replay_tx(&net, node0, tx1, 1, 4),
296+
"tx1 is send, orphan pool only contains final_tx"
297+
);
298+
299+
assert!(
300+
run_replay_tx(&net, node0, tx13, 0, 6),
301+
"tx13 is send, orphan pool is empty"
302+
);
303+
}
304+
}

tx-pool/src/component/orphan.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -135,9 +135,10 @@ impl OrphanPool {
135135
self.limit_size();
136136
}
137137

138-
pub fn find_by_previous(&self, tx: &TransactionView) -> Option<ProposalShortId> {
138+
pub fn find_by_previous(&self, tx: &TransactionView) -> Vec<ProposalShortId> {
139139
tx.output_pts()
140140
.iter()
141-
.find_map(|out_point| self.by_out_point.get(out_point).cloned())
141+
.filter_map(|out_point| self.by_out_point.get(out_point).cloned())
142+
.collect::<Vec<_>>()
142143
}
143144
}

tx-pool/src/process.rs

+21-20
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,12 @@ impl TxPoolService {
277277
// non contextual verify first
278278
self.non_contextual_verify(&tx, None)?;
279279

280-
if self.chunk_contains(&tx).await || self.orphan_contains(&tx).await {
280+
if self.chunk_contains(&tx).await {
281+
return Err(Reject::Duplicated(tx.hash()));
282+
}
283+
284+
if self.orphan_contains(&tx).await {
285+
debug!("reject tx {} already in orphan pool", tx.hash());
281286
return Err(Reject::Duplicated(tx.hash()));
282287
}
283288

@@ -466,15 +471,12 @@ impl TxPoolService {
466471
.add_orphan_tx(tx, peer, declared_cycle)
467472
}
468473

469-
pub(crate) async fn find_orphan_by_previous(
470-
&self,
471-
tx: &TransactionView,
472-
) -> Option<OrphanEntry> {
474+
pub(crate) async fn find_orphan_by_previous(&self, tx: &TransactionView) -> Vec<OrphanEntry> {
473475
let orphan = self.orphan.read().await;
474-
if let Some(id) = orphan.find_by_previous(tx) {
475-
return orphan.get(&id).cloned();
476-
}
477-
None
476+
let ids = orphan.find_by_previous(tx);
477+
ids.iter()
478+
.map(|id| orphan.get(id).cloned().unwrap())
479+
.collect::<Vec<_>>()
478480
}
479481

480482
pub(crate) async fn remove_orphan_tx(&self, id: &ProposalShortId) {
@@ -486,12 +488,13 @@ impl TxPoolService {
486488
orphan_queue.push_back(tx.clone());
487489

488490
while let Some(previous) = orphan_queue.pop_front() {
489-
if let Some(orphan) = self.find_orphan_by_previous(&previous).await {
491+
let orphans = self.find_orphan_by_previous(&previous).await;
492+
for orphan in orphans.into_iter() {
490493
if orphan.cycle > self.tx_pool_config.max_tx_verify_cycles {
491494
debug!(
492-
"process_orphan {} add to chunk, find previous from {}",
495+
"process_orphan {} add to chunk, find previous from {}",
496+
orphan.tx.hash(),
493497
tx.hash(),
494-
orphan.tx.hash()
495498
);
496499
self.remove_orphan_tx(&orphan.tx.proposal_short_id()).await;
497500
self.chunk
@@ -522,19 +525,20 @@ impl TxPoolService {
522525
});
523526
debug!(
524527
"process_orphan {} success, find previous from {}",
525-
tx.hash(),
526-
orphan.tx.hash()
528+
orphan.tx.hash(),
529+
tx.hash()
527530
);
528531
self.remove_orphan_tx(&orphan.tx.proposal_short_id()).await;
529532
orphan_queue.push_back(orphan.tx);
530533
}
531534
Err(reject) => {
532535
debug!(
533536
"process_orphan {} reject {}, find previous from {}",
534-
tx.hash(),
537+
orphan.tx.hash(),
535538
reject,
536-
orphan.tx.hash()
539+
tx.hash(),
537540
);
541+
538542
if !is_missing_input(&reject) {
539543
self.remove_orphan_tx(&orphan.tx.proposal_short_id()).await;
540544
if reject.is_malformed_tx() {
@@ -554,7 +558,6 @@ impl TxPoolService {
554558
self.put_recent_reject(&orphan.tx.hash(), &reject).await;
555559
}
556560
}
557-
break;
558561
}
559562
}
560563
}
@@ -928,9 +931,7 @@ impl TxPoolService {
928931
let mut ids = vec![];
929932
for tx in txs {
930933
ids.push(tx.proposal_short_id());
931-
if let Some(orphan) = self.find_orphan_by_previous(&tx).await {
932-
ids.push(orphan.tx.proposal_short_id());
933-
}
934+
self.process_orphan_tx(tx).await;
934935
}
935936
let mut orphan = self.orphan.write().await;
936937
orphan.remove_orphan_txs(ids.into_iter());

0 commit comments

Comments
 (0)