diff --git a/src/ipld/util.rs b/src/ipld/util.rs index 565a4e51e689..4127eac10deb 100644 --- a/src/ipld/util.rs +++ b/src/ipld/util.rs @@ -119,8 +119,14 @@ pin_project! { } impl ChainStream { - pub fn with_seen(self, seen: CidHashSet) -> Self { - ChainStream { seen, ..self } + pub fn with_seen(mut self, seen: CidHashSet) -> Self { + self.seen = seen; + self + } + + pub fn fail_on_dead_links(mut self, fail_on_dead_links: bool) -> Self { + self.fail_on_dead_links = fail_on_dead_links; + self } #[allow(dead_code)] @@ -162,14 +168,7 @@ pub fn stream_graph, ITER: Iterator tipset_iter: ITER, stateroot_limit: ChainEpoch, ) -> ChainStream { - ChainStream { - tipset_iter, - db, - dfs: VecDeque::new(), - seen: CidHashSet::default(), - stateroot_limit, - fail_on_dead_links: false, - } + stream_chain(db, tipset_iter, stateroot_limit).fail_on_dead_links(false) } impl, ITER: Iterator + Unpin> Stream @@ -287,7 +286,7 @@ pin_project! { block_receiver: flume::Receiver>, extract_sender: flume::Sender, stateroot_limit: ChainEpoch, - queue: Vec<(Cid,Option>)>, + queue: Vec<(Cid, Option>)>, fail_on_dead_links: bool, } @@ -308,18 +307,7 @@ impl UnorderedChainStream { } } -/// Stream all blocks that are reachable before the `stateroot_limit` epoch in an unordered fashion. -/// After this limit, only block headers are streamed. Any dead links are reported as errors. -/// -/// # Arguments -/// -/// * `db` - A database that implements [`Blockstore`] interface. -/// * `tipset_iter` - An iterator of [`Tipset`], descending order `$child -> $parent`. -/// * `stateroot_limit` - An epoch that signifies how far back we need to inspect tipsets, in-depth. -/// This has to be pre-calculated using this formula: `$cur_epoch - $depth`, where `$depth` is the -/// number of `[`Tipset`]` that needs inspection. -#[allow(dead_code)] -pub fn unordered_stream_chain< +fn unordered_stream_chain_inner< DB: Blockstore + Sync + Send + 'static, T: Borrow, ITER: Iterator + Unpin + Send + 'static, @@ -327,10 +315,10 @@ pub fn unordered_stream_chain< db: Arc, tipset_iter: ITER, stateroot_limit: ChainEpoch, + fail_on_dead_links: bool, ) -> UnorderedChainStream { let (sender, receiver) = flume::bounded(BLOCK_CHANNEL_LIMIT); let (extract_sender, extract_receiver) = flume::unbounded(); - let fail_on_dead_links = true; let seen = Arc::new(Mutex::new(CidHashSet::default())); let handle = UnorderedChainStream::::start_workers( db.clone(), @@ -353,6 +341,29 @@ pub fn unordered_stream_chain< } } +/// Stream all blocks that are reachable before the `stateroot_limit` epoch in an unordered fashion. +/// After this limit, only block headers are streamed. Any dead links are reported as errors. +/// +/// # Arguments +/// +/// * `db` - A database that implements [`Blockstore`] interface. +/// * `tipset_iter` - An iterator of [`Tipset`], descending order `$child -> $parent`. +/// * `stateroot_limit` - An epoch that signifies how far back we need to inspect tipsets, in-depth. +/// This has to be pre-calculated using this formula: `$cur_epoch - $depth`, where `$depth` is the +/// number of `[`Tipset`]` that needs inspection. +#[allow(dead_code)] +pub fn unordered_stream_chain< + DB: Blockstore + Sync + Send + 'static, + T: Borrow, + ITER: Iterator + Unpin + Send + 'static, +>( + db: Arc, + tipset_iter: ITER, + stateroot_limit: ChainEpoch, +) -> UnorderedChainStream { + unordered_stream_chain_inner(db, tipset_iter, stateroot_limit, true) +} + // Stream available graph in unordered search. All reachable nodes are touched and dead-links // are ignored. pub fn unordered_stream_graph< @@ -364,29 +375,7 @@ pub fn unordered_stream_graph< tipset_iter: ITER, stateroot_limit: ChainEpoch, ) -> UnorderedChainStream { - let (sender, receiver) = flume::bounded(2048); - let (extract_sender, extract_receiver) = flume::unbounded(); - let fail_on_dead_links = false; - let seen = Arc::new(Mutex::new(CidHashSet::default())); - let handle = UnorderedChainStream::::start_workers( - db.clone(), - sender.clone(), - extract_receiver, - seen.clone(), - fail_on_dead_links, - ); - - UnorderedChainStream { - seen, - db, - worker_handle: handle, - block_receiver: receiver, - queue: Vec::new(), - tipset_iter, - extract_sender, - stateroot_limit, - fail_on_dead_links, - } + unordered_stream_chain_inner(db, tipset_iter, stateroot_limit, false) } impl, ITER: Iterator + Unpin>