diff --git a/validator_client/validator_services/src/attestation_service.rs b/validator_client/validator_services/src/attestation_service.rs index 8211fb11f3e..b2b8bc81e22 100644 --- a/validator_client/validator_services/src/attestation_service.rs +++ b/validator_client/validator_services/src/attestation_service.rs @@ -180,8 +180,9 @@ impl AttestationService Result<(), String> { let slot = self.slot_clock.now().ok_or("Failed to read slot clock")?; let duration_to_next_slot = self @@ -189,6 +190,53 @@ impl AttestationService = self.duties_service.attesters(slot).into_iter().collect(); + let attestation_service = self.clone(); + + let attestation_data_handle = self + .inner + .executor + .spawn_handle( + async move { + let attestation_data = attestation_service + .beacon_nodes + .first_success(|beacon_node| async move { + let _timer = validator_metrics::start_timer_vec( + &validator_metrics::ATTESTATION_SERVICE_TIMES, + &[validator_metrics::ATTESTATIONS_HTTP_GET], + ); + beacon_node + .get_validator_attestation_data(slot, 0) + .await + .map_err(|e| format!("Failed to produce attestation data: {:?}", e)) + .map(|result| result.data) + }) + .await + .map_err(|e| e.to_string())?; + + attestation_service + .sign_and_publish_attestations( + slot, + &attestation_duties, + attestation_data.clone(), + ) + .await + .map_err(|e| { + crit!( + error = format!("{:?}", e), + slot = slot.as_u64(), + "Error during attestation routine" + ); + e + })?; + Ok::(attestation_data) + }, + "unaggregated attestation production", + ) + .ok_or("Failed to spawn attestation data task")?; + // If a validator needs to publish an aggregate attestation, they must do so at 2/3 // through the slot. This delay triggers at this time let aggregate_production_instant = Instant::now() @@ -196,7 +244,7 @@ impl AttestationService> = self + let aggregate_duties_by_committee_index: HashMap> = self .duties_service .attesters(slot) .into_iter() @@ -207,24 +255,45 @@ impl AttestationService data, + Ok(Some(Err(err))) => { + error!(?err, "Attestation production failed"); + return; + } + Ok(None) | Err(_) => { + info!("Aborting attestation production due to shutdown"); + return; + } + }; + + // For each committee index for this slot: + // Create and publish `SignedAggregateAndProof` for all aggregating validators. + aggregate_duties_by_committee_index.into_iter().for_each( + |(committee_index, validator_duties)| { + let attestation_service = attestation_service_clone.clone(); + let attestation_data = attestation_data.clone(); + executor.spawn_ignoring_error( + attestation_service.handle_aggregates( + slot, + committee_index, + validator_duties, + aggregate_production_instant, + attestation_data, + ), + "aggregate publish", + ); + }, + ) + }, + "attestation and aggregate publish", + ); // Schedule pruning of the slashing protection database once all unaggregated // attestations have (hopefully) been signed, i.e. at the same time as aggregate @@ -234,114 +303,76 @@ impl AttestationService, aggregate_production_instant: Instant, + attestation_data: AttestationData, ) -> Result<(), ()> { - let attestations_timer = validator_metrics::start_timer_vec( - &validator_metrics::ATTESTATION_SERVICE_TIMES, - &[validator_metrics::ATTESTATIONS], - ); - - // There's not need to produce `Attestation` or `SignedAggregateAndProof` if we do not have + // There's not need to produce `SignedAggregateAndProof` if we do not have // any validators for the given `slot` and `committee_index`. if validator_duties.is_empty() { return Ok(()); } - // Step 1. - // - // Download, sign and publish an `Attestation` for each validator. - let attestation_opt = self - .produce_and_publish_attestations(slot, committee_index, &validator_duties) - .await - .map_err(move |e| { - crit!( - error = format!("{:?}", e), - committee_index, - slot = slot.as_u64(), - "Error during attestation routine" - ) - })?; + // Wait until the `aggregation_production_instant` (2/3rds + // of the way though the slot). As verified in the + // `delay_triggers_when_in_the_past` test, this code will still run + // even if the instant has already elapsed. + sleep_until(aggregate_production_instant).await; - drop(attestations_timer); - - // Step 2. - // - // If an attestation was produced, make an aggregate. - if let Some(attestation_data) = attestation_opt { - // First, wait until the `aggregation_production_instant` (2/3rds - // of the way though the slot). As verified in the - // `delay_triggers_when_in_the_past` test, this code will still run - // even if the instant has already elapsed. - sleep_until(aggregate_production_instant).await; - - // Start the metrics timer *after* we've done the delay. - let _aggregates_timer = validator_metrics::start_timer_vec( - &validator_metrics::ATTESTATION_SERVICE_TIMES, - &[validator_metrics::AGGREGATES], - ); - - // Then download, sign and publish a `SignedAggregateAndProof` for each - // validator that is elected to aggregate for this `slot` and - // `committee_index`. - self.produce_and_publish_aggregates( - &attestation_data, - committee_index, - &validator_duties, - ) + // Start the metrics timer *after* we've done the delay. + let _aggregates_timer = validator_metrics::start_timer_vec( + &validator_metrics::ATTESTATION_SERVICE_TIMES, + &[validator_metrics::AGGREGATES], + ); + + // Download, sign and publish a `SignedAggregateAndProof` for each + // validator that is elected to aggregate for this `slot` and + // `committee_index`. + self.produce_and_publish_aggregates(&attestation_data, committee_index, &validator_duties) .await .map_err(move |e| { crit!( error = format!("{:?}", e), committee_index, slot = slot.as_u64(), - "Error during attestation routine" + "Error during aggregate attestation routine" ) })?; - } Ok(()) } - /// Performs the first step of the attesting process: downloading `Attestation` objects, - /// signing them and returning them to the validator. + /// Performs the main steps of the attesting process: signing and publishing to the BN. /// - /// https://github.com/ethereum/eth2.0-specs/blob/v0.12.1/specs/phase0/validator.md#attesting + /// https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/validator.md#attesting /// /// ## Detail /// /// The given `validator_duties` should already be filtered to only contain those that match - /// `slot` and `committee_index`. Critical errors will be logged if this is not the case. - /// - /// Only one `Attestation` is downloaded from the BN. It is then cloned and signed by each - /// validator and the list of individually-signed `Attestation` objects is returned to the BN. - #[instrument(skip_all, fields(%slot, %committee_index))] - async fn produce_and_publish_attestations( + /// `slot`. Critical errors will be logged if this is not the case. + #[instrument(skip_all, fields(%slot, %attestation_data.beacon_block_root))] + async fn sign_and_publish_attestations( &self, slot: Slot, - committee_index: CommitteeIndex, validator_duties: &[DutyAndProof], - ) -> Result, String> { + attestation_data: AttestationData, + ) -> Result<(), String> { + let _attestations_timer = validator_metrics::start_timer_vec( + &validator_metrics::ATTESTATION_SERVICE_TIMES, + &[validator_metrics::ATTESTATIONS], + ); + if validator_duties.is_empty() { - return Ok(None); + return Ok(()); } let current_epoch = self @@ -350,23 +381,6 @@ impl AttestationService AttestationService AttestationService AttestationService AttestationService