Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1096,6 +1096,104 @@
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "Prometheus",
"fieldConfig": {
"defaults": {
"custom": {}
},
"overrides": []
},
"fill": 1,
"fillGradient": 0,
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 32
},
"hiddenSeries": false,
"id": 14,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"show": true,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"nullPointMode": "null",
"percentage": false,
"pluginVersion": "7.1.3",
"pointradius": 2,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"expr": "Millau_to_Rialto_MessageLane_00000000_unprofitable_delivery_transactions",
"interval": "",
"legendFormat": "Millau -> Rialto, lane 00000000",
"refId": "A"
},
{
"expr": "Rialto_to_Millau_MessageLane_00000000_unprofitable_delivery_transactions",
"interval": "",
"legendFormat": "Rialto -> Millau, lane 00000000",
"refId": "B"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Count of unprofitable message delivery transactions",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
}
],
"refresh": "10s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ MESSAGE_LANE=${MSG_EXCHANGE_GEN_LANE:-00000000}

/home/user/substrate-relay relay-messages millau-to-rialto \
--lane $MESSAGE_LANE \
--relayer-mode=altruistic \
--source-host millau-node-bob \
--source-port 9944 \
--source-signer //Rialto.OutboundMessagesRelay.Lane00000001 \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ MESSAGE_LANE=${MSG_EXCHANGE_GEN_LANE:-00000000}

/home/user/substrate-relay relay-messages rialto-to-millau \
--lane $MESSAGE_LANE \
--relayer-mode=altruistic \
--source-host rialto-node-bob \
--source-port 9944 \
--source-signer //Millau.OutboundMessagesRelay.Lane00000001 \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ SEND_MESSAGE="$SHARED_CMD $SHARED_HOST $SOURCE_SIGNER"

SOURCE_CHAIN="Rialto"
TARGET_CHAIN="Millau"
EXTRA_ARGS=""
EXTRA_ARGS="--use-xcm-pallet"
REGULAR_PAYLOAD="020419ac"
BATCH_PAYLOAD="020419ac"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,5 @@ sleep 6
--rialto-transactions-mortality=64 \
--lane=00000000 \
--lane=73776170 \
--relayer-mode=altruistic \
--prometheus-host=0.0.0.0
6 changes: 5 additions & 1 deletion relays/messages/src/message_lane_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,11 @@ pub async fn run<P: MessageLane, Strategy: RelayStrategy>(
relay_utils::relay_loop(source_client, target_client)
.reconnect_delay(params.reconnect_delay)
.with_metrics(metrics_params)
.loop_metric(MessageLaneLoopMetrics::new(Some(&metrics_prefix::<P>(&params.lane)))?)?
.loop_metric(MessageLaneLoopMetrics::new(
Some(&metrics_prefix::<P>(&params.lane)),
P::SOURCE_NAME,
P::TARGET_NAME,
)?)?
.expose()
.await?
.run(metrics_prefix::<P>(&params.lane), move |source_client, target_client, metrics| {
Expand Down
7 changes: 6 additions & 1 deletion relays/messages/src/message_race_delivery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ pub async fn run<P: MessageLane, Strategy: RelayStrategy>(
source_state_updates,
MessageDeliveryRaceTarget {
client: target_client.clone(),
metrics_msg,
metrics_msg: metrics_msg.clone(),
_phantom: Default::default(),
},
target_state_updates,
Expand All @@ -74,6 +74,7 @@ pub async fn run<P: MessageLane, Strategy: RelayStrategy>(
latest_confirmed_nonces_at_source: VecDeque::new(),
target_nonces: None,
strategy: BasicStrategy::new(),
metrics_msg,
},
)
.await
Expand Down Expand Up @@ -255,6 +256,8 @@ struct MessageDeliveryStrategy<P: MessageLane, Strategy: RelayStrategy, SC, TC>
target_nonces: Option<TargetClientNonces<DeliveryRaceTargetNoncesData>>,
/// Basic delivery strategy.
strategy: MessageDeliveryStrategyBase<P>,
/// Message lane metrics.
metrics_msg: Option<MessageLaneLoopMetrics>,
}

type MessageDeliveryStrategyBase<P> = BasicStrategy<
Expand Down Expand Up @@ -519,6 +522,7 @@ where
lane_target_client: lane_target_client.clone(),
nonces_queue: source_queue.clone(),
nonces_queue_range: 0..maximal_source_queue_index + 1,
metrics: self.metrics_msg.clone(),
};

let mut strategy = EnforcementStrategy::new(self.relay_strategy.clone());
Expand Down Expand Up @@ -631,6 +635,7 @@ mod tests {
latest_confirmed_nonces_at_source: vec![(header_id(1), 19)].into_iter().collect(),
lane_source_client: TestSourceClient::default(),
lane_target_client: TestTargetClient::default(),
metrics_msg: None,
target_nonces: Some(TargetClientNonces {
latest_nonce: 19,
nonces_data: DeliveryRaceTargetNoncesData {
Expand Down
23 changes: 21 additions & 2 deletions relays/messages/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::{
use bp_messages::MessageNonce;
use finality_relay::SyncLoopMetrics;
use relay_utils::metrics::{
metric_name, register, GaugeVec, Metric, Opts, PrometheusError, Registry, U64,
metric_name, register, Counter, GaugeVec, Metric, Opts, PrometheusError, Registry, U64,
};

/// Message lane relay metrics.
Expand All @@ -39,11 +39,17 @@ pub struct MessageLaneLoopMetrics {
/// Lane state nonces: "source_latest_generated", "source_latest_confirmed",
/// "target_latest_received", "target_latest_confirmed".
lane_state_nonces: GaugeVec<U64>,
/// Count of unprofitable message delivery transactions that we have submitted so far.
unprofitable_delivery_transactions: Counter<U64>,
}

impl MessageLaneLoopMetrics {
/// Create and register messages loop metrics.
pub fn new(prefix: Option<&str>) -> Result<Self, PrometheusError> {
pub fn new(
prefix: Option<&str>,
source_name: &str,
target_name: &str,
) -> Result<Self, PrometheusError> {
Ok(MessageLaneLoopMetrics {
source_to_target_finality_metrics: SyncLoopMetrics::new(
prefix,
Expand All @@ -59,6 +65,13 @@ impl MessageLaneLoopMetrics {
Opts::new(metric_name(prefix, "lane_state_nonces"), "Nonces of the lane state"),
&["type"],
)?,
unprofitable_delivery_transactions: Counter::new(
metric_name(prefix, "unprofitable_delivery_transactions"),
format!(
"Count of unprofitable message delivery transactions from {} to {}",
source_name, target_name
),
)?,
})
}

Expand Down Expand Up @@ -127,13 +140,19 @@ impl MessageLaneLoopMetrics {
.with_label_values(&["target_latest_confirmed"])
.set(target_latest_confirmed_nonce);
}

/// Note unprofitable delivery transaction.
pub fn note_unprofitable_delivery_transactions(&self) {
self.unprofitable_delivery_transactions.inc()
}
}

impl Metric for MessageLaneLoopMetrics {
fn register(&self, registry: &Registry) -> Result<(), PrometheusError> {
self.source_to_target_finality_metrics.register(registry)?;
self.target_to_source_finality_metrics.register(registry)?;
register(self.lane_state_nonces.clone(), registry)?;
register(self.unprofitable_delivery_transactions.clone(), registry)?;
Ok(())
}
}
32 changes: 30 additions & 2 deletions relays/messages/src/relay_strategy/altruistic_strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::{
message_lane_loop::{
SourceClient as MessageLaneSourceClient, TargetClient as MessageLaneTargetClient,
},
relay_strategy::{RelayReference, RelayStrategy},
relay_strategy::{RationalStrategy, RelayReference, RelayStrategy},
};

/// The relayer doesn't care about rewards.
Expand All @@ -38,8 +38,36 @@ impl RelayStrategy for AltruisticStrategy {
TargetClient: MessageLaneTargetClient<P>,
>(
&mut self,
_reference: &mut RelayReference<P, SourceClient, TargetClient>,
reference: &mut RelayReference<P, SourceClient, TargetClient>,
) -> bool {
// we don't care about costs and rewards, but we want to report unprofitable transactions
// => let rational strategy fill required fields
let _ = RationalStrategy.decide(reference).await;
Comment on lines +43 to +45
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we move the entire logic that computes the costs and rewards to a common method (could be defined on the RelayStrategy trait) ? It seems strange to use RationalStrategy.decide() inside AltruisticStrategy.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd leave it outside trait, but feel free to implement what you think is better

true
}

async fn final_decision<
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2 questions:

  1. Do we need this method to be async ?
  2. This method doesn't seem to be used for taking a decision. Could we rename it to something like update_metrics ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. If it works without async, let's remove it - it has probably leftover of some early experiments
  2. I mean something like "notify strategy about final decision", but feel free to rename

P: MessageLane,
SourceClient: MessageLaneSourceClient<P>,
TargetClient: MessageLaneTargetClient<P>,
>(
&self,
reference: &RelayReference<P, SourceClient, TargetClient>,
) {
if let Some(ref metrics) = reference.metrics {
if reference.total_cost > reference.total_reward {
log::debug!(
target: "bridge",
"The relayer has submitted unprofitable {} -> {} message delivery trabsaction with {} messages: total cost = {:?}, total reward = {:?}",
P::SOURCE_NAME,
P::TARGET_NAME,
reference.index + 1,
reference.total_cost,
reference.total_reward,
);

metrics.note_unprofitable_delivery_transactions();
}
}
}
}
6 changes: 5 additions & 1 deletion relays/messages/src/relay_strategy/enforcement_strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ impl<Strategy: RelayStrategy> EnforcementStrategy<Strategy> {
let mut relay_reference = RelayReference {
lane_source_client: reference.lane_source_client.clone(),
lane_target_client: reference.lane_target_client.clone(),
metrics: reference.metrics.clone(),

selected_reward: P::SourceChainBalance::zero(),
selected_cost: P::SourceChainBalance::zero(),
Expand Down Expand Up @@ -211,7 +212,10 @@ impl<Strategy: RelayStrategy> EnforcementStrategy<Strategy> {
);
}

Some(hard_selected_begin_nonce + hard_selected_count as MessageNonce - 1)
let selected_max_nonce =
hard_selected_begin_nonce + hard_selected_count as MessageNonce - 1;
self.strategy.final_decision(&relay_reference).await;
Some(selected_max_nonce)
} else {
None
}
Expand Down
14 changes: 14 additions & 0 deletions relays/messages/src/relay_strategy/mix_strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,18 @@ impl RelayStrategy for MixStrategy {
RelayerMode::Rational => RationalStrategy.decide(reference).await,
}
}

async fn final_decision<
P: MessageLane,
SourceClient: MessageLaneSourceClient<P>,
TargetClient: MessageLaneTargetClient<P>,
>(
&self,
reference: &RelayReference<P, SourceClient, TargetClient>,
) {
match self.relayer_mode {
RelayerMode::Altruistic => AltruisticStrategy.final_decision(reference).await,
RelayerMode::Rational => RationalStrategy.final_decision(reference).await,
}
}
}
19 changes: 16 additions & 3 deletions relays/messages/src/relay_strategy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@

//! Relayer strategy

use std::ops::Range;

use async_trait::async_trait;

use bp_messages::{MessageNonce, Weight};
use std::ops::Range;

use crate::{
message_lane::MessageLane,
Expand All @@ -29,6 +27,7 @@ use crate::{
TargetClient as MessageLaneTargetClient,
},
message_race_strategy::SourceRangesQueue,
metrics::MessageLaneLoopMetrics,
};

pub(crate) use self::enforcement_strategy::*;
Expand All @@ -55,6 +54,16 @@ pub trait RelayStrategy: 'static + Clone + Send + Sync {
&mut self,
reference: &mut RelayReference<P, SourceClient, TargetClient>,
) -> bool;

/// Notification that the following maximal nonce has been selected for the delivery.
async fn final_decision<
P: MessageLane,
SourceClient: MessageLaneSourceClient<P>,
TargetClient: MessageLaneTargetClient<P>,
>(
&self,
reference: &RelayReference<P, SourceClient, TargetClient>,
);
}

/// Reference data for participating in relay
Expand All @@ -67,6 +76,8 @@ pub struct RelayReference<
pub lane_source_client: SourceClient,
/// The client that is connected to the message lane target node.
pub lane_target_client: TargetClient,
/// Metrics reference.
pub metrics: Option<MessageLaneLoopMetrics>,
/// Current block reward summary
pub selected_reward: P::SourceChainBalance,
/// Current block cost summary
Expand Down Expand Up @@ -112,6 +123,8 @@ pub struct RelayMessagesBatchReference<
pub lane_source_client: SourceClient,
/// The client that is connected to the message lane target node.
pub lane_target_client: TargetClient,
/// Metrics reference.
pub metrics: Option<MessageLaneLoopMetrics>,
/// Source queue.
pub nonces_queue: SourceRangesQueue<
P::SourceHeaderHash,
Expand Down
Loading