Skip to content
238 changes: 125 additions & 113 deletions validator_client/validator_services/src/attestation_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,23 +180,71 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
Ok(())
}

/// For each each required attestation, spawn a new task that downloads, signs and uploads the
/// attestation to the beacon node.
/// Spawn only one new task for attestations post-Electra.
/// For each required aggregate, spawn a new task that downloads, signs and uploads the
/// aggregate to the beacon node.
fn spawn_attestation_tasks(&self, slot_duration: Duration) -> Result<(), String> {
let slot = self.slot_clock.now().ok_or("Failed to read slot clock")?;
let duration_to_next_slot = self
.slot_clock
.duration_to_next_slot()
.ok_or("Unable to determine duration to next slot")?;

// Create and publish an `Attestation` for all validators only once
// as the committee_index is not included in AttestationData post-Electra
let attestation_duties: Vec<_> = self.duties_service.attesters(slot).into_iter().collect();
let attestation_service = self.clone();

let attestation_data_handle = self
.inner
.executor
.spawn_handle(
async move {
let attestation_data = attestation_service
.beacon_nodes
.first_success(|beacon_node| async move {
let _timer = validator_metrics::start_timer_vec(
&validator_metrics::ATTESTATION_SERVICE_TIMES,
&[validator_metrics::ATTESTATIONS_HTTP_GET],
);
beacon_node
.get_validator_attestation_data(slot, 0)
.await
.map_err(|e| format!("Failed to produce attestation data: {:?}", e))
.map(|result| result.data)
})
.await
.map_err(|e| e.to_string())?;

attestation_service
.sign_and_publish_attestations(
slot,
&attestation_duties,
attestation_data.clone(),
)
.await
.map_err(|e| {
crit!(
error = format!("{:?}", e),
slot = slot.as_u64(),
"Error during attestation routine"
);
e
})?;
Ok::<AttestationData, String>(attestation_data)
},
"unaggregated attestation production",
)
.ok_or("Failed to spawn attestation data task")?;

// If a validator needs to publish an aggregate attestation, they must do so 2/3 of the way
// through the slot. This delay triggers at that time.
let aggregate_production_instant = Instant::now()
+ duration_to_next_slot
.checked_sub(slot_duration / 3)
.unwrap_or_else(|| Duration::from_secs(0));

let duties_by_committee_index: HashMap<CommitteeIndex, Vec<DutyAndProof>> = self
let aggregate_duties_by_committee_index: HashMap<CommitteeIndex, Vec<DutyAndProof>> = self
.duties_service
.attesters(slot)
.into_iter()
Expand All @@ -207,24 +255,45 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
map
});

// For each committee index for this slot:
//
// - Create and publish an `Attestation` for all required validators.
// - Create and publish `SignedAggregateAndProof` for all aggregating validators.
duties_by_committee_index
.into_iter()
.for_each(|(committee_index, validator_duties)| {
// Spawn a separate task for each attestation.
self.inner.executor.spawn_ignoring_error(
self.clone().publish_attestations_and_aggregates(
slot,
committee_index,
validator_duties,
aggregate_production_instant,
),
"attestation publish",
);
});
// Spawn a task that awaits the attestation data handle and then spawns aggregate tasks
let attestation_service_clone = self.clone();
let executor = self.inner.executor.clone();
self.inner.executor.spawn(
async move {
// Log an error if the handle fails and return, skipping aggregates
let attestation_data = match attestation_data_handle.await {
Ok(Some(Ok(data))) => data,
Ok(Some(Err(err))) => {
error!(?err, "Attestation production failed");
return;
}
Ok(None) | Err(_) => {
info!("Aborting attestation production due to shutdown");
return;
}
};

// For each committee index for this slot:
// Create and publish `SignedAggregateAndProof` for all aggregating validators.
aggregate_duties_by_committee_index.into_iter().for_each(
|(committee_index, validator_duties)| {
let attestation_service = attestation_service_clone.clone();
let attestation_data = attestation_data.clone();
executor.spawn_ignoring_error(
attestation_service.handle_aggregates(
slot,
committee_index,
validator_duties,
aggregate_production_instant,
attestation_data,
),
"aggregate publish",
);
},
)
},
"attestation and aggregate publish",
);

// Schedule pruning of the slashing protection database once all unaggregated
// attestations have (hopefully) been signed, i.e. at the same time as aggregate
Expand All @@ -234,114 +303,76 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
Ok(())
}

/// Performs the first step of the attesting process: downloading `Attestation` objects,
/// signing them and returning them to the validator.
///
/// https://github.com/ethereum/eth2.0-specs/blob/v0.12.1/specs/phase0/validator.md#attesting
///
/// ## Detail
///
/// The given `validator_duties` should already be filtered to only contain those that match
/// `slot` and `committee_index`. Critical errors will be logged if this is not the case.
#[instrument(
name = "attestation_duty_cycle",
name = "handle_aggregates",
skip_all,
fields(%slot, %committee_index)
)]
async fn publish_attestations_and_aggregates(
async fn handle_aggregates(
self,
slot: Slot,
committee_index: CommitteeIndex,
validator_duties: Vec<DutyAndProof>,
aggregate_production_instant: Instant,
attestation_data: AttestationData,
) -> Result<(), ()> {
let attestations_timer = validator_metrics::start_timer_vec(
&validator_metrics::ATTESTATION_SERVICE_TIMES,
&[validator_metrics::ATTESTATIONS],
);

// There's not need to produce `Attestation` or `SignedAggregateAndProof` if we do not have
// There's no need to produce `SignedAggregateAndProof` if we do not have
// any validators for the given `slot` and `committee_index`.
if validator_duties.is_empty() {
return Ok(());
}

// Step 1.
//
// Download, sign and publish an `Attestation` for each validator.
let attestation_opt = self
.produce_and_publish_attestations(slot, committee_index, &validator_duties)
.await
.map_err(move |e| {
crit!(
error = format!("{:?}", e),
committee_index,
slot = slot.as_u64(),
"Error during attestation routine"
)
})?;
// Wait until the `aggregate_production_instant` (2/3rds
// of the way through the slot). As verified in the
// `delay_triggers_when_in_the_past` test, this code will still run
// even if the instant has already elapsed.
sleep_until(aggregate_production_instant).await;

drop(attestations_timer);

// Step 2.
//
// If an attestation was produced, make an aggregate.
if let Some(attestation_data) = attestation_opt {
// First, wait until the `aggregation_production_instant` (2/3rds
// of the way though the slot). As verified in the
// `delay_triggers_when_in_the_past` test, this code will still run
// even if the instant has already elapsed.
sleep_until(aggregate_production_instant).await;

// Start the metrics timer *after* we've done the delay.
let _aggregates_timer = validator_metrics::start_timer_vec(
&validator_metrics::ATTESTATION_SERVICE_TIMES,
&[validator_metrics::AGGREGATES],
);

// Then download, sign and publish a `SignedAggregateAndProof` for each
// validator that is elected to aggregate for this `slot` and
// `committee_index`.
self.produce_and_publish_aggregates(
&attestation_data,
committee_index,
&validator_duties,
)
// Start the metrics timer *after* we've done the delay.
let _aggregates_timer = validator_metrics::start_timer_vec(
&validator_metrics::ATTESTATION_SERVICE_TIMES,
&[validator_metrics::AGGREGATES],
);

// Download, sign and publish a `SignedAggregateAndProof` for each
// validator that is elected to aggregate for this `slot` and
// `committee_index`.
self.produce_and_publish_aggregates(&attestation_data, committee_index, &validator_duties)
.await
.map_err(move |e| {
crit!(
error = format!("{:?}", e),
committee_index,
slot = slot.as_u64(),
"Error during attestation routine"
"Error during aggregate attestation routine"
)
})?;
}

Ok(())
}

/// Performs the first step of the attesting process: downloading `Attestation` objects,
/// signing them and returning them to the validator.
/// Performs the main steps of the attesting process: signing and publishing to the BN.
///
/// https://github.com/ethereum/eth2.0-specs/blob/v0.12.1/specs/phase0/validator.md#attesting
/// https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/validator.md#attesting
///
/// ## Detail
///
/// The given `validator_duties` should already be filtered to only contain those that match
/// `slot` and `committee_index`. Critical errors will be logged if this is not the case.
///
/// Only one `Attestation` is downloaded from the BN. It is then cloned and signed by each
/// validator and the list of individually-signed `Attestation` objects is returned to the BN.
#[instrument(skip_all, fields(%slot, %committee_index))]
async fn produce_and_publish_attestations(
/// `slot`. Critical errors will be logged if this is not the case.
#[instrument(skip_all, fields(%slot, %attestation_data.beacon_block_root))]
async fn sign_and_publish_attestations(
&self,
slot: Slot,
committee_index: CommitteeIndex,
validator_duties: &[DutyAndProof],
) -> Result<Option<AttestationData>, String> {
attestation_data: AttestationData,
) -> Result<(), String> {
let _attestations_timer = validator_metrics::start_timer_vec(
&validator_metrics::ATTESTATION_SERVICE_TIMES,
&[validator_metrics::ATTESTATIONS],
);

if validator_duties.is_empty() {
return Ok(None);
return Ok(());
}

let current_epoch = self
Expand All @@ -350,23 +381,6 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
.ok_or("Unable to determine current slot from clock")?
.epoch(S::E::slots_per_epoch());

let attestation_data = self
.beacon_nodes
.first_success(|beacon_node| async move {
let _timer = validator_metrics::start_timer_vec(
&validator_metrics::ATTESTATION_SERVICE_TIMES,
&[validator_metrics::ATTESTATIONS_HTTP_GET],
);
beacon_node
.get_validator_attestation_data(slot, committee_index)
.await
.map_err(|e| format!("Failed to produce attestation data: {:?}", e))
.map(|result| result.data)
})
.instrument(info_span!("fetch_attestation_data"))
.await
.map_err(|e| e.to_string())?;

// Create futures to produce signed `Attestation` objects.
let attestation_data_ref = &attestation_data;
let signing_futures = validator_duties.iter().map(|duty_and_proof| {
Expand Down Expand Up @@ -426,7 +440,6 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
info = "a validator may have recently been removed from this VC",
pubkey = ?pubkey,
validator = ?duty.pubkey,
committee_index = committee_index,
slot = slot.as_u64(),
"Missing pubkey for attestation"
);
Expand All @@ -436,7 +449,6 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
crit!(
error = ?e,
validator = ?duty.pubkey,
committee_index,
slot = slot.as_u64(),
"Failed to sign attestation"
);
Expand All @@ -460,7 +472,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,

if attestations.is_empty() {
warn!("No attestations were published");
return Ok(None);
return Ok(());
}
let fork_name = self
.chain_spec
Expand Down Expand Up @@ -525,7 +537,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
),
}

Ok(Some(attestation_data))
Ok(())
}

/// Performs the second step of the attesting process: downloading an aggregated `Attestation`,
Expand Down