From 574fb99bb750bbefde0857fd2711abbccf4e39ca Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Fri, 31 Oct 2025 13:28:54 +0100 Subject: [PATCH] perf: bias towards proof results --- .../src/tree/payload_processor/multiproof.rs | 116 +++++++++--------- 1 file changed, 58 insertions(+), 58 deletions(-) diff --git a/crates/engine/tree/src/tree/payload_processor/multiproof.rs b/crates/engine/tree/src/tree/payload_processor/multiproof.rs index ca3bd380d4d..73dc6a90954 100644 --- a/crates/engine/tree/src/tree/payload_processor/multiproof.rs +++ b/crates/engine/tree/src/tree/payload_processor/multiproof.rs @@ -1029,7 +1029,64 @@ impl MultiProofTask { loop { trace!(target: "engine::tree::payload_processor::multiproof", "entering main channel receiving loop"); - crossbeam_channel::select! { + crossbeam_channel::select_biased! { + recv(self.proof_result_rx) -> proof_msg => { + match proof_msg { + Ok(proof_result) => { + proofs_processed += 1; + + self.metrics + .proof_calculation_duration_histogram + .record(proof_result.elapsed); + + self.multiproof_manager.on_calculation_complete(); + + // Convert ProofResultMessage to SparseTrieUpdate + match proof_result.result { + Ok(proof_result_data) => { + debug!( + target: "engine::tree::payload_processor::multiproof", + sequence = proof_result.sequence_number, + total_proofs = proofs_processed, + "Processing calculated proof from worker" + ); + + let update = SparseTrieUpdate { + state: proof_result.state, + multiproof: proof_result_data.into_multiproof(), + }; + + if let Some(combined_update) = + self.on_proof(proof_result.sequence_number, update) + { + let _ = self.to_sparse_trie.send(combined_update); + } + } + Err(error) => { + error!(target: "engine::tree::payload_processor::multiproof", ?error, "proof calculation error from worker"); + return + } + } + + if self.is_done( + proofs_processed, + state_update_proofs_requested, + prefetch_proofs_requested, + updates_finished, + ) { + debug!( + target: "engine::tree::payload_processor::multiproof", + "State updates finished and all proofs processed, ending calculation" + ); + break + } + } + Err(_) => { + error!(target: "engine::tree::payload_processor::multiproof", "Proof result channel closed unexpectedly"); + return + } + } + }, recv(self.rx) -> message => { match message { Ok(msg) => match msg { @@ -1129,63 +1186,6 @@ impl MultiProofTask { return } } - }, - recv(self.proof_result_rx) -> proof_msg => { - match proof_msg { - Ok(proof_result) => { - proofs_processed += 1; - - self.metrics - .proof_calculation_duration_histogram - .record(proof_result.elapsed); - - self.multiproof_manager.on_calculation_complete(); - - // Convert ProofResultMessage to SparseTrieUpdate - match proof_result.result { - Ok(proof_result_data) => { - debug!( - target: "engine::tree::payload_processor::multiproof", - sequence = proof_result.sequence_number, - total_proofs = proofs_processed, - "Processing calculated proof from worker" - ); - - let update = SparseTrieUpdate { - state: proof_result.state, - multiproof: proof_result_data.into_multiproof(), - }; - - if let Some(combined_update) = - self.on_proof(proof_result.sequence_number, update) - { - let _ = self.to_sparse_trie.send(combined_update); - } - } - Err(error) => { - error!(target: "engine::tree::payload_processor::multiproof", ?error, "proof calculation error from worker"); - return - } - } - - if self.is_done( - proofs_processed, - state_update_proofs_requested, - prefetch_proofs_requested, - updates_finished, - ) { - debug!( - target: "engine::tree::payload_processor::multiproof", - "State updates finished and all proofs processed, ending calculation" - ); - break - } - } - Err(_) => { - error!(target: "engine::tree::payload_processor::multiproof", "Proof result channel closed unexpectedly"); - return - } - } } } }