Skip to content

Commit 84a902a

Browse files
authored
Reduce load on validator subscription channels (#5311)
* Fix tests * Merge branch 'unstable' into unclog-channels * Avoid reallocations * Reduce subscription load on beacon node
1 parent 8cd2b1c commit 84a902a

File tree

5 files changed

+39
-54
lines changed

5 files changed

+39
-54
lines changed

beacon_node/http_api/src/lib.rs

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -3448,34 +3448,34 @@ pub fn serve<T: BeaconChainTypes>(
34483448
chain: Arc<BeaconChain<T>>,
34493449
log: Logger| {
34503450
task_spawner.blocking_json_task(Priority::P0, move || {
3451-
for subscription in &subscriptions {
3452-
chain
3453-
.validator_monitor
3454-
.write()
3455-
.auto_register_local_validator(subscription.validator_index);
3456-
3457-
let validator_subscription = api_types::ValidatorSubscription {
3458-
validator_index: subscription.validator_index,
3459-
attestation_committee_index: subscription.committee_index,
3460-
slot: subscription.slot,
3461-
committee_count_at_slot: subscription.committees_at_slot,
3462-
is_aggregator: subscription.is_aggregator,
3463-
};
3464-
3465-
let message = ValidatorSubscriptionMessage::AttestationSubscribe {
3466-
subscriptions: vec![validator_subscription],
3467-
};
3468-
if let Err(e) = validator_subscription_tx.try_send(message) {
3469-
warn!(
3470-
log,
3471-
"Unable to process committee subscriptions";
3472-
"info" => "the host may be overloaded or resource-constrained",
3473-
"error" => ?e,
3474-
);
3475-
return Err(warp_utils::reject::custom_server_error(
3476-
"unable to queue subscription, host may be overloaded or shutting down".to_string(),
3477-
));
3478-
}
3451+
let subscriptions: std::collections::BTreeSet<_> = subscriptions
3452+
.iter()
3453+
.map(|subscription| {
3454+
chain
3455+
.validator_monitor
3456+
.write()
3457+
.auto_register_local_validator(subscription.validator_index);
3458+
api_types::ValidatorSubscription {
3459+
attestation_committee_index: subscription.committee_index,
3460+
slot: subscription.slot,
3461+
committee_count_at_slot: subscription.committees_at_slot,
3462+
is_aggregator: subscription.is_aggregator,
3463+
}
3464+
})
3465+
.collect();
3466+
let message =
3467+
ValidatorSubscriptionMessage::AttestationSubscribe { subscriptions };
3468+
if let Err(e) = validator_subscription_tx.try_send(message) {
3469+
warn!(
3470+
log,
3471+
"Unable to process committee subscriptions";
3472+
"info" => "the host may be overloaded or resource-constrained",
3473+
"error" => ?e,
3474+
);
3475+
return Err(warp_utils::reject::custom_server_error(
3476+
"unable to queue subscription, host may be overloaded or shutting down"
3477+
.to_string(),
3478+
));
34793479
}
34803480

34813481
Ok(())

beacon_node/network/src/service.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use lighthouse_network::{
2727
MessageId, NetworkEvent, NetworkGlobals, PeerId,
2828
};
2929
use slog::{crit, debug, error, info, o, trace, warn};
30+
use std::collections::BTreeSet;
3031
use std::{collections::HashSet, pin::Pin, sync::Arc, time::Duration};
3132
use store::HotColdDB;
3233
use strum::IntoStaticStr;
@@ -119,7 +120,7 @@ pub enum NetworkMessage<T: EthSpec> {
119120
pub enum ValidatorSubscriptionMessage {
120121
/// Subscribes a list of validators to specific slots for attestation duties.
121122
AttestationSubscribe {
122-
subscriptions: Vec<ValidatorSubscription>,
123+
subscriptions: BTreeSet<ValidatorSubscription>,
123124
},
124125
SyncCommitteeSubscribe {
125126
subscriptions: Vec<SyncCommitteeSubscription>,
@@ -783,7 +784,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
783784
ValidatorSubscriptionMessage::AttestationSubscribe { subscriptions } => {
784785
if let Err(e) = self
785786
.attestation_service
786-
.validator_subscriptions(subscriptions)
787+
.validator_subscriptions(subscriptions.into_iter())
787788
{
788789
warn!(self.log, "Attestation validator subscription failed"; "error" => e);
789790
}

beacon_node/network/src/subnet_service/attestation_subnets.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ impl<T: BeaconChainTypes> AttestationService<T> {
196196
/// safely dropped.
197197
pub fn validator_subscriptions(
198198
&mut self,
199-
subscriptions: Vec<ValidatorSubscription>,
199+
subscriptions: impl Iterator<Item = ValidatorSubscription>,
200200
) -> Result<(), String> {
201201
// If the node is in a proposer-only state, we ignore all subnet subscriptions.
202202
if self.proposer_only {
@@ -227,7 +227,6 @@ impl<T: BeaconChainTypes> AttestationService<T> {
227227
warn!(self.log,
228228
"Failed to compute subnet id for validator subscription";
229229
"error" => ?e,
230-
"validator_index" => subscription.validator_index
231230
);
232231
continue;
233232
}
@@ -257,13 +256,11 @@ impl<T: BeaconChainTypes> AttestationService<T> {
257256
warn!(self.log,
258257
"Subscription to subnet error";
259258
"error" => e,
260-
"validator_index" => subscription.validator_index,
261259
);
262260
} else {
263261
trace!(self.log,
264262
"Subscribed to subnet for aggregator duties";
265263
"exact_subnet" => ?exact_subnet,
266-
"validator_index" => subscription.validator_index
267264
);
268265
}
269266
}

beacon_node/network/src/subnet_service/tests/mod.rs

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -180,14 +180,12 @@ mod attestation_service {
180180
use super::*;
181181

182182
fn get_subscription(
183-
validator_index: u64,
184183
attestation_committee_index: CommitteeIndex,
185184
slot: Slot,
186185
committee_count_at_slot: u64,
187186
is_aggregator: bool,
188187
) -> ValidatorSubscription {
189188
ValidatorSubscription {
190-
validator_index,
191189
attestation_committee_index,
192190
slot,
193191
committee_count_at_slot,
@@ -204,7 +202,6 @@ mod attestation_service {
204202
(0..validator_count)
205203
.map(|validator_index| {
206204
get_subscription(
207-
validator_index,
208205
validator_index,
209206
slot,
210207
committee_count_at_slot,
@@ -217,7 +214,6 @@ mod attestation_service {
217214
#[tokio::test]
218215
async fn subscribe_current_slot_wait_for_unsubscribe() {
219216
// subscription config
220-
let validator_index = 1;
221217
let committee_index = 1;
222218
// Keep a low subscription slot so that there are no additional subnet discovery events.
223219
let subscription_slot = 0;
@@ -233,7 +229,6 @@ mod attestation_service {
233229
.expect("Could not get current slot");
234230

235231
let subscriptions = vec![get_subscription(
236-
validator_index,
237232
committee_index,
238233
current_slot + Slot::new(subscription_slot),
239234
committee_count,
@@ -242,7 +237,7 @@ mod attestation_service {
242237

243238
// submit the subscriptions
244239
attestation_service
245-
.validator_subscriptions(subscriptions)
240+
.validator_subscriptions(subscriptions.into_iter())
246241
.unwrap();
247242

248243
// not enough time for peer discovery, just subscribe, unsubscribe
@@ -293,7 +288,6 @@ mod attestation_service {
293288
#[tokio::test]
294289
async fn test_same_subnet_unsubscription() {
295290
// subscription config
296-
let validator_index = 1;
297291
let committee_count = 1;
298292
let subnets_per_node = MainnetEthSpec::default_spec().subnets_per_node as usize;
299293

@@ -313,15 +307,13 @@ mod attestation_service {
313307
.expect("Could not get current slot");
314308

315309
let sub1 = get_subscription(
316-
validator_index,
317310
com1,
318311
current_slot + Slot::new(subscription_slot1),
319312
committee_count,
320313
true,
321314
);
322315

323316
let sub2 = get_subscription(
324-
validator_index,
325317
com2,
326318
current_slot + Slot::new(subscription_slot2),
327319
committee_count,
@@ -350,7 +342,7 @@ mod attestation_service {
350342

351343
// submit the subscriptions
352344
attestation_service
353-
.validator_subscriptions(vec![sub1, sub2])
345+
.validator_subscriptions(vec![sub1, sub2].into_iter())
354346
.unwrap();
355347

356348
// Unsubscription event should happen at slot 2 (since subnet id's are the same, unsubscription event should be at higher slot + 1)
@@ -431,7 +423,7 @@ mod attestation_service {
431423

432424
// submit the subscriptions
433425
attestation_service
434-
.validator_subscriptions(subscriptions)
426+
.validator_subscriptions(subscriptions.into_iter())
435427
.unwrap();
436428

437429
let events = get_events(&mut attestation_service, Some(131), 10).await;
@@ -501,7 +493,7 @@ mod attestation_service {
501493

502494
// submit the subscriptions
503495
attestation_service
504-
.validator_subscriptions(subscriptions)
496+
.validator_subscriptions(subscriptions.into_iter())
505497
.unwrap();
506498

507499
let events = get_events(&mut attestation_service, None, 3).await;
@@ -538,7 +530,6 @@ mod attestation_service {
538530
#[tokio::test]
539531
async fn test_subscribe_same_subnet_several_slots_apart() {
540532
// subscription config
541-
let validator_index = 1;
542533
let committee_count = 1;
543534
let subnets_per_node = MainnetEthSpec::default_spec().subnets_per_node as usize;
544535

@@ -558,15 +549,13 @@ mod attestation_service {
558549
.expect("Could not get current slot");
559550

560551
let sub1 = get_subscription(
561-
validator_index,
562552
com1,
563553
current_slot + Slot::new(subscription_slot1),
564554
committee_count,
565555
true,
566556
);
567557

568558
let sub2 = get_subscription(
569-
validator_index,
570559
com2,
571560
current_slot + Slot::new(subscription_slot2),
572561
committee_count,
@@ -595,7 +584,7 @@ mod attestation_service {
595584

596585
// submit the subscriptions
597586
attestation_service
598-
.validator_subscriptions(vec![sub1, sub2])
587+
.validator_subscriptions(vec![sub1, sub2].into_iter())
599588
.unwrap();
600589

601590
// Unsubscription event should happen at the end of the slot.
@@ -668,7 +657,7 @@ mod attestation_service {
668657

669658
// submit the subscriptions
670659
attestation_service
671-
.validator_subscriptions(subscriptions)
660+
.validator_subscriptions(subscriptions.into_iter())
672661
.unwrap();
673662

674663
// There should only be the same subscriptions as there are in the specification,

consensus/types/src/validator_subscription.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,8 @@ use ssz_derive::{Decode, Encode};
44

55
/// A validator subscription, created when a validator subscribes to a slot to perform optional aggregation
66
/// duties.
7-
#[derive(PartialEq, Debug, Serialize, Deserialize, Clone, Encode, Decode)]
7+
#[derive(PartialEq, Debug, Serialize, Deserialize, Clone, Encode, Decode, Eq, PartialOrd, Ord)]
88
pub struct ValidatorSubscription {
9-
/// The validators index.
10-
pub validator_index: u64,
119
/// The index of the committee within `slot` of which the validator is a member. Used by the
1210
/// beacon node to quickly evaluate the associated `SubnetId`.
1311
pub attestation_committee_index: CommitteeIndex,

0 commit comments

Comments
 (0)