Skip to content

Commit

Permalink
Re-implement using streams
Browse files Browse the repository at this point in the history
  • Loading branch information
s0me0ne-unkn0wn committed Nov 4, 2023
1 parent 092d6a3 commit 7f1698c
Showing 1 changed file with 103 additions and 93 deletions.
196 changes: 103 additions & 93 deletions polkadot/node/core/candidate-validation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,12 @@ use polkadot_primitives::{

use parity_scale_codec::Encode;

use futures::{channel::oneshot, prelude::*, stream::FuturesUnordered};
use futures::{channel::oneshot, prelude::*, StreamExt};

use std::{
path::PathBuf,
sync::Arc,
task::Poll,
time::{Duration, Instant},
};

Expand Down Expand Up @@ -152,100 +153,109 @@ async fn run<Context>(
)
.await;
ctx.spawn_blocking("pvf-validation-host", task.boxed())?;

let mut tasks = FuturesUnordered::new();
loop {
futures::select! {
comm = ctx.recv().fuse() => {
match comm {
Ok(FromOrchestra::Signal(OverseerSignal::ActiveLeaves(_))) => {},
Ok(FromOrchestra::Signal(OverseerSignal::BlockFinalized(..))) => {},
Ok(FromOrchestra::Signal(OverseerSignal::Conclude)) => return Ok(()),
Ok(FromOrchestra::Communication { msg }) => match msg {
CandidateValidationMessage::ValidateFromChainState(
candidate_receipt,
pov,
executor_params,
timeout,
response_sender,
) => {
let mut sender = ctx.sender().clone();
let metrics = metrics.clone();
let validation_host = validation_host.clone();

tasks.push(async move {
let _timer = metrics.time_validate_from_chain_state();
let res = validate_from_chain_state(
&mut sender,
validation_host,
candidate_receipt,
pov,
executor_params,
timeout,
&metrics,
)
.await;

metrics.on_validation_event(&res);
let _ = response_sender.send(res);
}.boxed());
},
CandidateValidationMessage::ValidateFromExhaustive(
persisted_validation_data,
validation_code,
candidate_receipt,
pov,
executor_params,
timeout,
response_sender,
) => {
let metrics = metrics.clone();
let validation_host = validation_host.clone();

tasks.push(async move {
let _timer = metrics.time_validate_from_exhaustive();
let res = validate_candidate_exhaustive(
validation_host,
persisted_validation_data,
validation_code,
candidate_receipt,
pov,
executor_params,
timeout,
&metrics,
)
.await;

metrics.on_validation_event(&res);
let _ = response_sender.send(res);
}.boxed());
},
CandidateValidationMessage::PreCheck(
relay_parent,
validation_code_hash,
response_sender,
) => {
let mut sender = ctx.sender().clone();
let validation_host = validation_host.clone();

tasks.push(async move {
let precheck_result = precheck_pvf(
&mut sender,
validation_host,
relay_parent,
validation_code_hash,
)
.await;

let _ = response_sender.send(precheck_result);
}.boxed());
}
},
Err(e) => return Err(SubsystemError::from(e))
}
let mut res = Ok(());
let sender = ctx.sender().to_owned();

let read_stream = stream::poll_fn(|c| loop {
match ctx.recv().poll_unpin(c) {
Poll::Ready(Ok(FromOrchestra::Signal(OverseerSignal::Conclude))) =>
return Poll::Ready(None),
Poll::Pending => return Poll::Pending,
Poll::Ready(Ok(FromOrchestra::Signal(_))) => continue,
Poll::Ready(Ok(FromOrchestra::Communication { msg })) => return Poll::Ready(Some(msg)),
Poll::Ready(Err(e)) => {
res = Err(e);
return Poll::Ready(None)
},
_ = tasks.select_next_some() => ()
}
});

read_stream
// NB: Cloning `sender` inside `async` block of `for_each_concurrent` renders the whole
// thing not `Send`, so we `zip` the message stream with the stream of `sender` clones here.
.zip(stream::repeat(sender))
// FIXME: The backlog size here is the same as in PVF host queues. It should either use a
// common constant, or another appropriate value should be chosen
.for_each_concurrent(30, |message_and_sender| {
handle_candidate_validation_message(
message_and_sender.1,
validation_host.clone(),
metrics.clone(),
message_and_sender.0,
)
})
.await;

res
}

async fn handle_candidate_validation_message<Sender>(
mut sender: Sender,
validation_host: ValidationHost,
metrics: Metrics,
msg: CandidateValidationMessage,
) where
Sender: SubsystemSender<RuntimeApiMessage>,
{
match msg {
CandidateValidationMessage::ValidateFromChainState(
candidate_receipt,
pov,
executor_params,
timeout,
response_sender,
) => {
let _timer = metrics.time_validate_from_chain_state();
let res = validate_from_chain_state(
&mut sender,
validation_host,
candidate_receipt,
pov,
executor_params,
timeout,
&metrics,
)
.await;

metrics.on_validation_event(&res);
let _ = response_sender.send(res);
},
CandidateValidationMessage::ValidateFromExhaustive(
persisted_validation_data,
validation_code,
candidate_receipt,
pov,
executor_params,
timeout,
response_sender,
) => {
let _timer = metrics.time_validate_from_exhaustive();
let res = validate_candidate_exhaustive(
validation_host,
persisted_validation_data,
validation_code,
candidate_receipt,
pov,
executor_params,
timeout,
&metrics,
)
.await;

metrics.on_validation_event(&res);
let _ = response_sender.send(res);
},
CandidateValidationMessage::PreCheck(
relay_parent,
validation_code_hash,
response_sender,
) => {
let precheck_result =
precheck_pvf(&mut sender, validation_host, relay_parent, validation_code_hash)
.await;

let _ = response_sender.send(precheck_result);
},
}
}

Expand Down

0 comments on commit 7f1698c

Please sign in to comment.