Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions node/jaeger/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,14 @@ impl Span {
}
}

/// Add an additional int tag to the span.
///
/// `tag` and `value` are passed through unchanged to the inner span's
/// `add_int_tag` when this span is `Enabled`. On a `Disabled` span the call
/// does nothing.
pub fn add_int_tag(&mut self, tag: &str, value: i64) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

match self {
// Enabled span: delegate to the wrapped inner span.
Self::Enabled(ref mut inner) => inner.add_int_tag(tag, value),
// Disabled span: nothing to do.
Self::Disabled => {},
}
}

/// Adds the `FollowsFrom` relationship to this span with respect to the given one.
pub fn add_follows_from(&mut self, other: &Self) {
match (self, other) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use polkadot_primitives::v1::{
use polkadot_subsystem::messages::{
AllMessages, AvailabilityStoreMessage, NetworkBridgeMessage,
};
use polkadot_subsystem::SubsystemContext;
use polkadot_subsystem::{SubsystemContext, jaeger};

use crate::{
error::{Error, Result},
Expand Down Expand Up @@ -119,6 +119,9 @@ struct RunningTask {

/// Prometheus metrics for reporting results.
metrics: Metrics,

/// Span tracking the fetching of this chunk.
span: jaeger::Span,
}

impl FetchTaskConfig {
Expand All @@ -142,6 +145,9 @@ impl FetchTaskConfig {
};
}

let mut span = jaeger::candidate_hash_span(&core.candidate_hash, "availability-distribution");
span.add_stage(jaeger::Stage::AvailabilityDistribution);

let prepared_running = RunningTask {
session_index: session_info.session_index,
group_index: core.group_responsible,
Expand All @@ -156,6 +162,7 @@ impl FetchTaskConfig {
relay_parent: core.candidate_descriptor.relay_parent,
metrics,
sender,
span,
};
FetchTaskConfig {
live_in,
Expand All @@ -168,6 +175,7 @@ impl FetchTask {
/// Start fetching a chunk.
///
/// A task handling the fetching of the configured chunk will be spawned.
#[tracing::instrument(level = "trace", skip(config, ctx), fields(subsystem = LOG_TARGET))]
pub async fn start<Context>(config: FetchTaskConfig, ctx: &mut Context) -> Result<Self>
where
Context: SubsystemContext,
Expand Down Expand Up @@ -240,6 +248,7 @@ enum TaskError {
}

impl RunningTask {
#[tracing::instrument(level = "trace", skip(self, kill), fields(subsystem = LOG_TARGET))]
async fn run(self, kill: oneshot::Receiver<()>) {
// Wait for completion or cancellation.
let run_it = self.run_inner();
Expand All @@ -254,8 +263,13 @@ impl RunningTask {
let mut bad_validators = Vec::new();
let mut label = FAILED;
let mut count: u32 = 0;
let mut _span = self.span.child_builder("fetch-task")
.with_chunk_index(self.request.index.0)
.with_relay_parent(&self.relay_parent)
.build();
// Try validators in reverse order:
while let Some(validator) = self.group.pop() {
let _try_span = _span.child("try");
// Report retries:
if count > 0 {
self.metrics.on_retry();
Expand Down Expand Up @@ -302,8 +316,10 @@ impl RunningTask {
// Ok, let's store it and be happy:
self.store_chunk(chunk).await;
label = SUCCEEDED;
_span.add_string_tag("success", "true");
break;
}
_span.add_int_tag("tries", count as _);
self.metrics.on_fetch(label);
self.conclude(bad_validators).await;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ fn get_test_running_task() -> (RunningTask, mpsc::Receiver<FromFetchTask>) {
relay_parent: Hash::repeat_byte(71),
sender: tx,
metrics: Metrics::new_dummy(),
span: jaeger::Span::Disabled,
},
rx
)
Expand Down
2 changes: 2 additions & 0 deletions node/network/availability-distribution/src/requester/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ impl Requester {
///
/// You must feed it with `ActiveLeavesUpdate` via `update_fetching_heads` and make it progress
/// by advancing the stream.
#[tracing::instrument(level = "trace", skip(keystore, metrics), fields(subsystem = LOG_TARGET))]
pub fn new(keystore: SyncCryptoStorePtr, metrics: Metrics) -> Self {
// All we do is forwarding messages, no need to make this big.
// Each sender will get one slot, see
Expand All @@ -90,6 +91,7 @@ impl Requester {
/// Update heads that need availability distribution.
///
/// For all active heads we will be fetching our chunks for availability distribution.
#[tracing::instrument(level = "trace", skip(self, ctx, update), fields(subsystem = LOG_TARGET))]
pub async fn update_fetching_heads<Context>(
&mut self,
ctx: &mut Context,
Expand Down
8 changes: 7 additions & 1 deletion node/network/availability-distribution/src/responder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use polkadot_node_network_protocol::request_response::{request::IncomingRequest,
use polkadot_primitives::v1::{CandidateHash, ErasureChunk, ValidatorIndex};
use polkadot_subsystem::{
messages::{AllMessages, AvailabilityStoreMessage},
SubsystemContext,
SubsystemContext, jaeger,
};

use crate::error::{Error, Result};
Expand Down Expand Up @@ -64,6 +64,12 @@ pub async fn answer_request<Context>(
where
Context: SubsystemContext,
{
let mut span = jaeger::candidate_hash_span(&req.payload.candidate_hash, "answer-request");
span.add_stage(jaeger::Stage::AvailabilityDistribution);
let _child_span = span.child_builder("answer-chunk-request")
.with_chunk_index(req.payload.index.0)
.build();

let chunk = query_chunk(ctx, req.payload.candidate_hash, req.payload.index).await?;

let result = chunk.is_some();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ impl SessionCache {
///
/// Use this function over any `fetch_session_info` if all you need is a reference to
/// `SessionInfo`, as it avoids an expensive clone.
#[tracing::instrument(level = "trace", skip(self, ctx, with_info), fields(subsystem = LOG_TARGET))]
pub async fn with_session_info<Context, F, R>(
&mut self,
ctx: &mut Context,
Expand Down