Skip to content

Commit 53f685d

Browse files
committed
fix orphan issues
1 parent dc95426 commit 53f685d

File tree

5 files changed

+253
-23
lines changed

5 files changed

+253
-23
lines changed

test/src/main.rs

+3
Original file line numberDiff line numberDiff line change
@@ -426,6 +426,9 @@ fn all_specs() -> Vec<Box<dyn Spec>> {
426426
Box::new(DeclaredWrongCyclesChunk),
427427
Box::new(DeclaredWrongCyclesAndRelayAgain),
428428
Box::new(OrphanTxAccepted),
429+
Box::new(TxPoolOrphanNormal),
430+
Box::new(TxPoolOrphanReverse),
431+
Box::new(TxPoolOrphanUnordered),
429432
Box::new(OrphanTxRejected),
430433
Box::new(GetRawTxPool),
431434
Box::new(PoolReconcile),

test/src/node.rs

+20-3
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,13 @@ impl Node {
344344
self.submit_transaction(&self.new_transaction_spend_tip_cellbase())
345345
}
346346

347+
// generate a transaction which spend tip block's cellbase and capacity
348+
pub fn new_transaction_with_capacity(&self, capacity: Capacity) -> TransactionView {
349+
let block = self.get_tip_block();
350+
let cellbase = &block.transactions()[0];
351+
self.new_transaction_with_since_capacity(cellbase.hash(), 0, capacity)
352+
}
353+
347354
// generate a transaction which spend tip block's cellbase
348355
pub fn new_transaction_spend_tip_cellbase(&self) -> TransactionView {
349356
let block = self.get_tip_block();
@@ -522,11 +529,12 @@ impl Node {
522529
self.new_transaction_with_since_capacity(hash, since, capacity_bytes!(100))
523530
}
524531

525-
pub fn new_transaction_with_since_capacity(
532+
pub fn new_transaction_with_capacity_and_index(
526533
&self,
527534
hash: Byte32,
528-
since: u64,
529535
capacity: Capacity,
536+
index: u32,
537+
since: u64,
530538
) -> TransactionView {
531539
let always_success_cell_dep = self.always_success_cell_dep();
532540
let always_success_script = self.always_success_script();
@@ -540,10 +548,19 @@ impl Node {
540548
.build(),
541549
)
542550
.output_data(Default::default())
543-
.input(CellInput::new(OutPoint::new(hash, 0), since))
551+
.input(CellInput::new(OutPoint::new(hash, index), since))
544552
.build()
545553
}
546554

555+
pub fn new_transaction_with_since_capacity(
556+
&self,
557+
hash: Byte32,
558+
since: u64,
559+
capacity: Capacity,
560+
) -> TransactionView {
561+
self.new_transaction_with_capacity_and_index(hash, capacity, 0, since)
562+
}
563+
547564
pub fn new_always_failure_transaction(&self, hash: Byte32) -> TransactionView {
548565
let always_failure_cell_dep = self.always_failure_cell_dep();
549566
let always_failure_script = self.always_failure_script();

test/src/specs/tx_pool/orphan_tx.rs

+205
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,12 @@ use crate::utils::wait_until;
33
use crate::{Net, Node, Spec};
44
use ckb_jsonrpc_types::Status;
55
use ckb_network::SupportProtocols;
6+
use ckb_types::core::{capacity_bytes, Capacity, TransactionBuilder, TransactionView};
7+
use ckb_types::packed::CellOutputBuilder;
8+
use ckb_types::{
9+
packed::{CellInput, OutPoint},
10+
prelude::*,
11+
};
612

713
const ALWAYS_SUCCESS_SCRIPT_CYCLE: u64 = 537;
814
// always_failure, as the name implies, so it doesn't matter what the cycles are
@@ -97,3 +103,202 @@ impl Spec for OrphanTxRejected {
97103
assert!(matches!(ret.tx_status.status, Status::Rejected));
98104
}
99105
}
106+
107+
// construct a tx chain with such structure:
108+
//
109+
// parent
110+
// |
111+
// tx1
112+
// / | \
113+
// tx11 tx12 tx13
114+
// \ | /
115+
// final_tx
116+
//
117+
fn build_tx_chain(
118+
node0: &Node,
119+
) -> (
120+
Net,
121+
(
122+
TransactionView,
123+
TransactionView,
124+
TransactionView,
125+
TransactionView,
126+
TransactionView,
127+
TransactionView,
128+
),
129+
) {
130+
node0.mine_until_out_bootstrap_period();
131+
let parent = node0.new_transaction_with_capacity(capacity_bytes!(800));
132+
133+
let script = node0.always_success_script();
134+
let new_output1 = CellOutputBuilder::default()
135+
.capacity(capacity_bytes!(200).pack())
136+
.lock(script.clone())
137+
.build();
138+
let new_output2 = new_output1.clone();
139+
let new_output3 = new_output1.clone();
140+
141+
let tx1 = parent
142+
.as_advanced_builder()
143+
.set_inputs(vec![CellInput::new(OutPoint::new(parent.hash(), 0), 0)])
144+
.set_outputs(vec![new_output1, new_output2, new_output3])
145+
.set_outputs_data(vec![Default::default(); 3])
146+
.build();
147+
148+
let tx11 =
149+
node0.new_transaction_with_capacity_and_index(tx1.hash(), capacity_bytes!(100), 0, 0);
150+
let tx12 =
151+
node0.new_transaction_with_capacity_and_index(tx1.hash(), capacity_bytes!(100), 1, 0);
152+
let tx13 =
153+
node0.new_transaction_with_capacity_and_index(tx1.hash(), capacity_bytes!(100), 2, 0);
154+
155+
let cell_dep = node0.always_success_cell_dep();
156+
let final_output = CellOutputBuilder::default()
157+
.capacity(capacity_bytes!(80).pack())
158+
.lock(script)
159+
.build();
160+
let final_tx = TransactionBuilder::default()
161+
.cell_dep(cell_dep)
162+
.set_inputs(vec![
163+
CellInput::new(OutPoint::new(tx11.hash(), 0), 0),
164+
CellInput::new(OutPoint::new(tx12.hash(), 0), 0),
165+
CellInput::new(OutPoint::new(tx13.hash(), 0), 0),
166+
])
167+
.set_outputs(vec![final_output])
168+
.set_outputs_data(vec![Default::default(); 1])
169+
.build();
170+
171+
let mut net = Net::new(
172+
"orphan_tx_test",
173+
node0.consensus(),
174+
vec![SupportProtocols::RelayV3],
175+
);
176+
net.connect(node0);
177+
178+
(net, (parent, tx1, tx11, tx12, tx13, final_tx))
179+
}
180+
181+
fn run_replay_tx(
182+
net: &Net,
183+
node0: &Node,
184+
tx: TransactionView,
185+
orphan_tx_cnt: u64,
186+
pending_cnt: u64,
187+
) -> bool {
188+
relay_tx(net, node0, tx, ALWAYS_SUCCESS_SCRIPT_CYCLE);
189+
190+
wait_until(5, || {
191+
let tx_pool_info = node0.get_tip_tx_pool_info();
192+
tx_pool_info.orphan.value() == orphan_tx_cnt && tx_pool_info.pending.value() == pending_cnt
193+
})
194+
}
195+
196+
pub struct TxPoolOrphanNormal;
197+
impl Spec for TxPoolOrphanNormal {
198+
fn run(&self, nodes: &mut Vec<Node>) {
199+
let node0 = &nodes[0];
200+
let (net, (parent, tx1, tx11, tx12, tx13, final_tx)) = build_tx_chain(node0);
201+
202+
assert!(
203+
run_replay_tx(&net, node0, parent, 0, 1),
204+
"parent sended expect nothing in orphan pool"
205+
);
206+
assert!(
207+
run_replay_tx(&net, node0, tx1, 0, 2),
208+
"tx1 is send expect nothing in orphan pool"
209+
);
210+
assert!(
211+
run_replay_tx(&net, node0, tx11, 0, 3),
212+
"tx11 is send expect nothing in orphan pool"
213+
);
214+
assert!(
215+
run_replay_tx(&net, node0, tx12, 0, 4),
216+
"tx12 is send expect nothing in orphan pool"
217+
);
218+
assert!(
219+
run_replay_tx(&net, node0, tx13, 0, 5),
220+
"tx13 is send expect nothing in orphan pool"
221+
);
222+
assert!(
223+
run_replay_tx(&net, node0, final_tx, 0, 6),
224+
"final_tx is send expect nothing in orphan pool"
225+
);
226+
}
227+
}
228+
229+
pub struct TxPoolOrphanReverse;
230+
impl Spec for TxPoolOrphanReverse {
231+
fn run(&self, nodes: &mut Vec<Node>) {
232+
let node0 = &nodes[0];
233+
let (net, (parent, tx1, tx11, tx12, tx13, final_tx)) = build_tx_chain(node0);
234+
235+
assert!(
236+
run_replay_tx(&net, node0, final_tx, 1, 0),
237+
"expect final_tx is in orphan pool"
238+
);
239+
240+
assert!(
241+
run_replay_tx(&net, node0, tx13, 2, 0),
242+
"tx13 in orphan pool"
243+
);
244+
assert!(
245+
run_replay_tx(&net, node0, tx12, 3, 0),
246+
"tx12 is in orphan pool"
247+
);
248+
assert!(run_replay_tx(&net, node0, tx11, 4, 0), "tx11 is in orphan");
249+
250+
assert!(run_replay_tx(&net, node0, tx1, 5, 0), "tx1 is in orphan");
251+
252+
assert!(
253+
run_replay_tx(&net, node0, parent, 0, 6),
254+
"all is in pending"
255+
);
256+
}
257+
}
258+
259+
pub struct TxPoolOrphanUnordered;
260+
impl Spec for TxPoolOrphanUnordered {
261+
fn run(&self, nodes: &mut Vec<Node>) {
262+
let node0 = &nodes[0];
263+
let (net, (parent, tx1, tx11, tx12, tx13, final_tx)) = build_tx_chain(node0);
264+
265+
assert!(
266+
run_replay_tx(&net, node0, final_tx, 1, 0),
267+
"expect final_tx is in orphan pool"
268+
);
269+
270+
assert!(
271+
run_replay_tx(&net, node0, tx11, 2, 0),
272+
"tx11 in orphan pool"
273+
);
274+
let tx12_clone = tx12.clone();
275+
assert!(
276+
run_replay_tx(&net, node0, tx12, 3, 0),
277+
"tx12 is in orphan pool"
278+
);
279+
280+
// set tx12_clone with rpc
281+
let ret = node0
282+
.rpc_client()
283+
.send_transaction_result(tx12_clone.data().into());
284+
assert!(ret
285+
.err()
286+
.unwrap()
287+
.to_string()
288+
.contains("already exist in transaction_pool"));
289+
290+
assert!(
291+
run_replay_tx(&net, node0, parent, 3, 1),
292+
"parent is sent, should be in pending without change orphan pool"
293+
);
294+
assert!(
295+
run_replay_tx(&net, node0, tx1, 1, 4),
296+
"tx1 is send, orphan pool only contains final_tx"
297+
);
298+
299+
assert!(
300+
run_replay_tx(&net, node0, tx13, 0, 6),
301+
"tx13 is send, orphan pool is empty"
302+
);
303+
}
304+
}

tx-pool/src/component/orphan.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -135,9 +135,10 @@ impl OrphanPool {
135135
self.limit_size();
136136
}
137137

138-
pub fn find_by_previous(&self, tx: &TransactionView) -> Option<ProposalShortId> {
138+
pub fn find_by_previous(&self, tx: &TransactionView) -> Vec<ProposalShortId> {
139139
tx.output_pts()
140140
.iter()
141-
.find_map(|out_point| self.by_out_point.get(out_point).cloned())
141+
.filter_map(|out_point| self.by_out_point.get(out_point).cloned())
142+
.collect::<Vec<_>>()
142143
}
143144
}

tx-pool/src/process.rs

+22-18
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,12 @@ impl TxPoolService {
238238
// non contextual verify first
239239
self.non_contextual_verify(&tx, None)?;
240240

241-
if self.chunk_contains(&tx).await || self.orphan_contains(&tx).await {
241+
if self.chunk_contains(&tx).await {
242+
return Err(Reject::Duplicated(tx.hash()));
243+
}
244+
245+
if self.orphan_contains(&tx).await {
246+
debug!("reject tx {} already in orphan pool", tx.hash());
242247
return Err(Reject::Duplicated(tx.hash()));
243248
}
244249

@@ -419,15 +424,12 @@ impl TxPoolService {
419424
.add_orphan_tx(tx, peer, declared_cycle)
420425
}
421426

422-
pub(crate) async fn find_orphan_by_previous(
423-
&self,
424-
tx: &TransactionView,
425-
) -> Option<OrphanEntry> {
427+
pub(crate) async fn find_orphan_by_previous(&self, tx: &TransactionView) -> Vec<OrphanEntry> {
426428
let orphan = self.orphan.read().await;
427-
if let Some(id) = orphan.find_by_previous(tx) {
428-
return orphan.get(&id).cloned();
429-
}
430-
None
429+
let ids = orphan.find_by_previous(tx);
430+
ids.iter()
431+
.map(|id| orphan.get(id).cloned().unwrap())
432+
.collect::<Vec<_>>()
431433
}
432434

433435
pub(crate) async fn remove_orphan_tx(&self, id: &ProposalShortId) {
@@ -439,12 +441,13 @@ impl TxPoolService {
439441
orphan_queue.push_back(tx.clone());
440442

441443
while let Some(previous) = orphan_queue.pop_front() {
442-
if let Some(orphan) = self.find_orphan_by_previous(&previous).await {
444+
let orphans = self.find_orphan_by_previous(&previous).await;
445+
for orphan in orphans.into_iter() {
443446
if orphan.cycle > self.tx_pool_config.max_tx_verify_cycles {
444447
debug!(
445-
"process_orphan {} add to chunk, find previous from {}",
448+
"process_orphan {} add to chunk, find previous from {}",
449+
orphan.tx.hash(),
446450
tx.hash(),
447-
orphan.tx.hash()
448451
);
449452
self.remove_orphan_tx(&orphan.tx.proposal_short_id()).await;
450453
self.chunk
@@ -475,19 +478,20 @@ impl TxPoolService {
475478
});
476479
debug!(
477480
"process_orphan {} success, find previous from {}",
478-
tx.hash(),
479-
orphan.tx.hash()
481+
orphan.tx.hash(),
482+
tx.hash()
480483
);
481484
self.remove_orphan_tx(&orphan.tx.proposal_short_id()).await;
482485
orphan_queue.push_back(orphan.tx);
483486
}
484487
Err(reject) => {
485488
debug!(
486489
"process_orphan {} reject {}, find previous from {}",
487-
tx.hash(),
490+
orphan.tx.hash(),
488491
reject,
489-
orphan.tx.hash()
492+
tx.hash(),
490493
);
494+
491495
if !is_missing_input(&reject) {
492496
self.remove_orphan_tx(&orphan.tx.proposal_short_id()).await;
493497
if reject.is_malformed_tx() {
@@ -503,7 +507,6 @@ impl TxPoolService {
503507
self.put_recent_reject(&orphan.tx.hash(), &reject).await;
504508
}
505509
}
506-
break;
507510
}
508511
}
509512
}
@@ -877,7 +880,8 @@ impl TxPoolService {
877880
let mut ids = vec![];
878881
for tx in txs {
879882
ids.push(tx.proposal_short_id());
880-
if let Some(orphan) = self.find_orphan_by_previous(&tx).await {
883+
let orphans = self.find_orphan_by_previous(tx).await;
884+
for orphan in orphans.into_iter() {
881885
ids.push(orphan.tx.proposal_short_id());
882886
}
883887
}

0 commit comments

Comments
 (0)