diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index ccd0698161e..38019338554 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -37,13 +37,14 @@ mod validators; mod version; use crate::light_client::{get_light_client_bootstrap, get_light_client_updates}; -use crate::produce_block::{produce_blinded_block_v2, produce_block_v2, produce_block_v3}; +use crate::utils::{AnyVersionFilter, EthV1Filter}; +use crate::validator::post_validator_liveness_epoch; +use crate::validator::*; use crate::version::beacon_response; use beacon::states; use beacon_chain::{ - AttestationError as AttnError, BeaconChain, BeaconChainError, BeaconChainTypes, - WhenSlotSkipped, attestation_verification::VerifiedAttestation, - observed_operations::ObservationOutcome, validator_monitor::timestamp_now, + BeaconChain, BeaconChainError, BeaconChainTypes, WhenSlotSkipped, + observed_operations::ObservationOutcome, }; use beacon_processor::BeaconProcessorSend; pub use block_id::BlockId; @@ -51,18 +52,20 @@ use builder_states::get_next_withdrawals; use bytes::Bytes; use directory::DEFAULT_ROOT_DIR; use eth2::StatusCode; +use eth2::lighthouse::sync_state::SyncState; use eth2::types::{ self as api_types, BroadcastValidation, ContextDeserialize, EndpointVersion, ForkChoice, - ForkChoiceExtraData, ForkChoiceNode, LightClientUpdatesQuery, PublishBlockRequest, - StateId as CoreStateId, ValidatorId, ValidatorStatus, + ForkChoiceExtraData, ForkChoiceNode, LightClientUpdatesQuery, PublishBlockRequest, ValidatorId, }; use eth2::{CONSENSUS_VERSION_HEADER, CONTENT_TYPE_HEADER, SSZ_CONTENT_TYPE_HEADER}; use health_metrics::observe::Observe; -use lighthouse_network::rpc::methods::MetaData; -use lighthouse_network::{Enr, NetworkGlobals, PeerId, PubsubMessage, types::SyncState}; +use lighthouse_network::Enr; +use lighthouse_network::NetworkGlobals; +use lighthouse_network::PeerId; +use lighthouse_network::PubsubMessage; use lighthouse_version::version_with_platform; use logging::{SSELoggingComponents, crit}; -use network::{NetworkMessage, NetworkSenders, ValidatorSubscriptionMessage}; +use network::{NetworkMessage, NetworkSenders}; use network_utils::enr_ext::EnrExt; use operation_pool::ReceivedPreCapella; use parking_lot::RwLock; @@ -83,24 +86,19 @@ use std::sync::Arc; use sysinfo::{System, SystemExt}; use system_health::{observe_nat, observe_system_health_bn}; use task_spawner::{Priority, TaskSpawner}; -use tokio::sync::{ - mpsc::{Sender, UnboundedSender}, - oneshot, -}; +use tokio::sync::mpsc::UnboundedSender; use tokio_stream::{ StreamExt, wrappers::{BroadcastStream, errors::BroadcastStreamRecvError}, }; -use tracing::{debug, error, info, warn}; +use tracing::{debug, info, warn}; use types::{ - Attestation, AttestationData, AttesterSlashing, BeaconStateError, ChainSpec, Checkpoint, - ConfigAndPreset, Epoch, EthSpec, ForkName, Hash256, ProposerPreparationData, ProposerSlashing, - SignedAggregateAndProof, SignedBlindedBeaconBlock, SignedBlsToExecutionChange, - SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit, - SingleAttestation, Slot, SyncCommitteeMessage, SyncContributionData, + Attestation, AttestationData, AttesterSlashing, BeaconStateError, Checkpoint, ConfigAndPreset, + Epoch, EthSpec, ForkName, Hash256, ProposerSlashing, SignedBlindedBeaconBlock, + SignedBlsToExecutionChange, SignedVoluntaryExit, SingleAttestation, Slot, SyncCommitteeMessage, }; use version::{ - ResponseIncludesVersion, V1, V2, V3, add_consensus_version_header, add_ssz_content_type_header, + ResponseIncludesVersion, V1, V2, add_consensus_version_header, add_ssz_content_type_header, execution_optimistic_finalized_beacon_response, inconsistent_fork_rejection, unsupported_version_rejection, }; @@ -360,16 +358,18 @@ pub fn serve( } // Create a filter that extracts the endpoint version. - let any_version = warp::path(API_PREFIX).and(warp::path::param::().or_else( - |_| async move { - Err(warp_utils::reject::custom_bad_request( - "Invalid version identifier".to_string(), - )) - }, - )); + let any_version = warp::path(API_PREFIX) + .and( + warp::path::param::().or_else(|_| async move { + Err(warp_utils::reject::custom_bad_request( + "Invalid version identifier".to_string(), + )) + }), + ) + .boxed(); // Filter that enforces a single endpoint version and then discards the `EndpointVersion`. - let single_version = |reqd: EndpointVersion| { + fn single_version(any_version: AnyVersionFilter, reqd: EndpointVersion) -> EthV1Filter { any_version .and_then(move |version| async move { if version == reqd { @@ -379,10 +379,11 @@ pub fn serve( } }) .untuple_one() - }; + .boxed() + } - let eth_v1 = single_version(V1); - let eth_v2 = single_version(V2); + let eth_v1 = single_version(any_version.clone(), V1); + let eth_v2 = single_version(any_version.clone(), V2); // Create a `warp` filter that provides access to the network globals. let inner_network_globals = ctx.network_globals.clone(); @@ -403,34 +404,34 @@ pub fn serve( // Create a `warp` filter that provides access to the beacon chain. let inner_ctx = ctx.clone(); - let chain_filter = - warp::any() - .map(move || inner_ctx.chain.clone()) - .and_then(|chain| async move { - match chain { - Some(chain) => Ok(chain), - None => Err(warp_utils::reject::custom_not_found( - "Beacon chain genesis has not yet been observed.".to_string(), - )), - } - }); + let chain_filter = warp::any() + .map(move || inner_ctx.chain.clone()) + .and_then(|chain| async move { + match chain { + Some(chain) => Ok(chain), + None => Err(warp_utils::reject::custom_not_found( + "Beacon chain genesis has not yet been observed.".to_string(), + )), + } + }) + .boxed(); // Create a `warp` filter that provides access to the network sender channel. let network_tx = ctx .network_senders .as_ref() .map(|senders| senders.network_send()); - let network_tx_filter = - warp::any() - .map(move || network_tx.clone()) - .and_then(|network_tx| async move { - match network_tx { - Some(network_tx) => Ok(network_tx), - None => Err(warp_utils::reject::custom_not_found( - "The networking stack has not yet started (network_tx).".to_string(), - )), - } - }); + let network_tx_filter = warp::any() + .map(move || network_tx.clone()) + .and_then(|network_tx| async move { + match network_tx { + Some(network_tx) => Ok(network_tx), + None => Err(warp_utils::reject::custom_not_found( + "The networking stack has not yet started (network_tx).".to_string(), + )), + } + }) + .boxed(); // Create a `warp` filter that provides access to the network attestation subscription channel. let validator_subscriptions_tx = ctx @@ -447,7 +448,8 @@ pub fn serve( .to_string(), )), } - }); + }) + .boxed(); // Create a `warp` filter that rejects requests whilst the node is syncing. let not_while_syncing_filter = @@ -487,7 +489,8 @@ pub fn serve( SyncState::Stalled => Ok(()), } }, - ); + ) + .boxed(); // Create a `warp` filter that returns 404s if the light client server is disabled. let light_client_server_filter = @@ -540,8 +543,9 @@ pub fn serve( .beacon_processor_send .clone() .filter(|_| config.enable_beacon_processor); - let task_spawner_filter = - warp::any().map(move || TaskSpawner::new(beacon_processor_send.clone())); + let task_spawner_filter = warp::any() + .map(move || TaskSpawner::new(beacon_processor_send.clone())) + .boxed(); let duplicate_block_status_code = ctx.config.duplicate_block_status_code; @@ -553,6 +557,7 @@ pub fn serve( // GET beacon/genesis let get_beacon_genesis = eth_v1 + .clone() .and(warp::path("beacon")) .and(warp::path("genesis")) .and(warp::path::end()) @@ -576,6 +581,7 @@ pub fn serve( */ let beacon_states_path = eth_v1 + .clone() .and(warp::path("beacon")) .and(warp::path("states")) .and(warp::path::param::().or_else(|_| async { @@ -652,6 +658,7 @@ pub fn serve( // mechanism for arbitrary forwards block iteration, we only support iterating forwards along // the canonical chain. let get_beacon_headers = eth_v1 + .clone() .and(warp::path("beacon")) .and(warp::path("headers")) .and(warp::query::()) @@ -748,6 +755,7 @@ pub fn serve( // GET beacon/headers/{block_id} let get_beacon_headers_block_id = eth_v1 + .clone() .and(warp::path("beacon")) .and(warp::path("headers")) .and(warp::path::param::().or_else(|_| async { @@ -803,6 +811,7 @@ pub fn serve( // POST beacon/blocks let post_beacon_blocks = eth_v1 + .clone() .and(warp::path("beacon")) .and(warp::path("blocks")) .and(warp::path::end()) @@ -839,6 +848,7 @@ pub fn serve( ); let post_beacon_blocks_ssz = eth_v1 + .clone() .and(warp::path("beacon")) .and(warp::path("blocks")) .and(warp::path::end()) @@ -875,6 +885,7 @@ pub fn serve( ); let post_beacon_blocks_v2 = eth_v2 + .clone() .and(warp::path("beacon")) .and(warp::path("blocks")) .and(warp::query::()) @@ -914,6 +925,7 @@ pub fn serve( ); let post_beacon_blocks_v2_ssz = eth_v2 + .clone() .and(warp::path("beacon")) .and(warp::path("blocks")) .and(warp::query::()) @@ -957,6 +969,7 @@ pub fn serve( // POST beacon/blinded_blocks let post_beacon_blinded_blocks = eth_v1 + .clone() .and(warp::path("beacon")) .and(warp::path("blinded_blocks")) .and(warp::path::end()) @@ -984,6 +997,7 @@ pub fn serve( // POST beacon/blocks let post_beacon_blinded_blocks_ssz = eth_v1 + .clone() .and(warp::path("beacon")) .and(warp::path("blinded_blocks")) .and(warp::path::end()) @@ -1018,6 +1032,7 @@ pub fn serve( ); let post_beacon_blinded_blocks_v2 = eth_v2 + .clone() .and(warp::path("beacon")) .and(warp::path("blinded_blocks")) .and(warp::query::()) @@ -1057,6 +1072,7 @@ pub fn serve( ); let post_beacon_blinded_blocks_v2_ssz = eth_v2 + .clone() .and(warp::path("beacon")) .and(warp::path("blinded_blocks")) .and(warp::query::()) @@ -1099,6 +1115,7 @@ pub fn serve( }); let beacon_blocks_path_v1 = eth_v1 + .clone() .and(warp::path("beacon")) .and(warp::path("blocks")) .and(block_id_or_err) @@ -1106,6 +1123,7 @@ pub fn serve( .and(chain_filter.clone()); let beacon_blocks_path_any = any_version + .clone() .and(warp::path("beacon")) .and(warp::path("blocks")) .and(block_id_or_err) @@ -1231,6 +1249,7 @@ pub fn serve( // GET beacon/blinded_blocks/{block_id} let get_beacon_blinded_block = eth_v1 + .clone() .and(warp::path("beacon")) .and(warp::path("blinded_blocks")) .and(block_id_or_err) @@ -1283,6 +1302,7 @@ pub fn serve( // GET beacon/blob_sidecars/{block_id} let get_blob_sidecars = eth_v1 + .clone() .and(warp::path("beacon")) .and(warp::path("blob_sidecars")) .and(block_id_or_err) @@ -1334,6 +1354,7 @@ pub fn serve( // GET beacon/blobs/{block_id} let get_blobs = eth_v1 + .clone() .and(warp::path("beacon")) .and(warp::path("blobs")) .and(block_id_or_err) @@ -1383,18 +1404,21 @@ pub fn serve( */ let beacon_pool_path = eth_v1 + .clone() .and(warp::path("beacon")) .and(warp::path("pool")) .and(task_spawner_filter.clone()) .and(chain_filter.clone()); let beacon_pool_path_v2 = eth_v2 + .clone() .and(warp::path("beacon")) .and(warp::path("pool")) .and(task_spawner_filter.clone()) .and(chain_filter.clone()); let beacon_pool_path_any = any_version + .clone() .and(warp::path("beacon")) .and(warp::path("pool")) .and(task_spawner_filter.clone()) @@ -1524,7 +1548,7 @@ pub fn serve( .register_api_attester_slashing(slashing.to_ref()); if let ObservationOutcome::New(slashing) = outcome { - publish_pubsub_message( + utils::publish_pubsub_message( &network_tx, PubsubMessage::AttesterSlashing(Box::new( slashing.clone().into_inner(), @@ -1615,7 +1639,7 @@ pub fn serve( .register_api_proposer_slashing(&slashing); if let ObservationOutcome::New(slashing) = outcome { - publish_pubsub_message( + utils::publish_pubsub_message( &network_tx, PubsubMessage::ProposerSlashing(Box::new( slashing.clone().into_inner(), @@ -1673,7 +1697,7 @@ pub fn serve( .register_api_voluntary_exit(&exit.message); if let ObservationOutcome::New(exit) = outcome { - publish_pubsub_message( + utils::publish_pubsub_message( &network_tx, PubsubMessage::VoluntaryExit(Box::new(exit.clone().into_inner())), )?; @@ -1770,7 +1794,7 @@ pub fn serve( ReceivedPreCapella::Yes }; if matches!(received_pre_capella, ReceivedPreCapella::No) { - publish_pubsub_message( + utils::publish_pubsub_message( &network_tx, PubsubMessage::BlsToExecutionChange(Box::new( verified_address_change.as_inner().clone(), @@ -1824,6 +1848,7 @@ pub fn serve( ); let beacon_rewards_path = eth_v1 + .clone() .and(warp::path("beacon")) .and(warp::path("rewards")) .and(task_spawner_filter.clone()) @@ -1854,6 +1879,7 @@ pub fn serve( */ let builder_states_path = eth_v1 + .clone() .and(warp::path("builder")) .and(warp::path("states")) .and(chain_filter.clone()); @@ -1908,6 +1934,7 @@ pub fn serve( */ let beacon_light_client_path = eth_v1 + .clone() .and(warp::path("beacon")) .and(warp::path("light_client")) .and(light_client_server_filter) @@ -2060,6 +2087,7 @@ pub fn serve( */ let beacon_rewards_path = eth_v1 + .clone() .and(warp::path("beacon")) .and(warp::path("rewards")) .and(task_spawner_filter.clone()) @@ -2144,10 +2172,11 @@ pub fn serve( * config */ - let config_path = eth_v1.and(warp::path("config")); + let config_path = eth_v1.clone().and(warp::path("config")); // GET config/fork_schedule let get_config_fork_schedule = config_path + .clone() .and(warp::path("fork_schedule")) .and(warp::path::end()) .and(task_spawner_filter.clone()) @@ -2166,6 +2195,7 @@ pub fn serve( // GET config/spec let get_config_spec = config_path + .clone() .and(warp::path("spec")) .and(warp::path::end()) .and(task_spawner_filter.clone()) @@ -2205,6 +2235,7 @@ pub fn serve( // GET debug/beacon/data_column_sidecars/{block_id} let get_debug_data_column_sidecars = eth_v1 + .clone() .and(warp::path("debug")) .and(warp::path("beacon")) .and(warp::path("data_column_sidecars")) @@ -2254,6 +2285,7 @@ pub fn serve( // GET debug/beacon/states/{state_id} let get_debug_beacon_states = any_version + .clone() .and(warp::path("debug")) .and(warp::path("beacon")) .and(warp::path("states")) @@ -2329,6 +2361,7 @@ pub fn serve( // GET debug/beacon/heads let get_debug_beacon_heads = any_version + .clone() .and(warp::path("debug")) .and(warp::path("beacon")) .and(warp::path("heads")) @@ -2369,6 +2402,7 @@ pub fn serve( // GET debug/fork_choice let get_debug_fork_choice = eth_v1 + .clone() .and(warp::path("debug")) .and(warp::path("fork_choice")) .and(warp::path::end()) @@ -2450,6 +2484,7 @@ pub fn serve( // GET node/identity let get_node_identity = eth_v1 + .clone() .and(warp::path("node")) .and(warp::path("identity")) .and(warp::path::end()) @@ -2469,7 +2504,7 @@ pub fn serve( enr, p2p_addresses, discovery_addresses, - metadata: from_meta_data::( + metadata: utils::from_meta_data::( &network_globals.local_metadata, &chain.spec, ), @@ -2480,6 +2515,7 @@ pub fn serve( // GET node/version let get_node_version = eth_v1 + .clone() .and(warp::path("node")) .and(warp::path("version")) .and(warp::path::end()) @@ -2493,6 +2529,7 @@ pub fn serve( // GET node/syncing let get_node_syncing = eth_v1 + .clone() .and(warp::path("node")) .and(warp::path("syncing")) .and(warp::path::end()) @@ -2554,6 +2591,7 @@ pub fn serve( // GET node/health let get_node_health = eth_v1 + .clone() .and(warp::path("node")) .and(warp::path("health")) .and(warp::path::end()) @@ -2602,6 +2640,7 @@ pub fn serve( // GET node/peers/{peer_id} let get_node_peers_by_id = eth_v1 + .clone() .and(warp::path("node")) .and(warp::path("peers")) .and(warp::path::param::()) @@ -2656,6 +2695,7 @@ pub fn serve( // GET node/peers let get_node_peers = eth_v1 + .clone() .and(warp::path("node")) .and(warp::path("peers")) .and(warp::path::end()) @@ -2720,6 +2760,7 @@ pub fn serve( // GET node/peer_count let get_node_peer_count = eth_v1 + .clone() .and(warp::path("node")) .and(warp::path("peer_count")) .and(warp::path::end()) @@ -2763,804 +2804,124 @@ pub fn serve( */ // GET validator/duties/proposer/{epoch} - let get_validator_duties_proposer = eth_v1 - .and(warp::path("validator")) - .and(warp::path("duties")) - .and(warp::path("proposer")) - .and(warp::path::param::().or_else(|_| async { - Err(warp_utils::reject::custom_bad_request( - "Invalid epoch".to_string(), - )) - })) - .and(warp::path::end()) - .and(not_while_syncing_filter.clone()) - .and(task_spawner_filter.clone()) - .and(chain_filter.clone()) - .then( - |epoch: Epoch, - not_synced_filter: Result<(), Rejection>, - task_spawner: TaskSpawner, - chain: Arc>| { - task_spawner.blocking_json_task(Priority::P0, move || { - not_synced_filter?; - proposer_duties::proposer_duties(epoch, &chain) - }) - }, - ); + let get_validator_duties_proposer = get_validator_duties_proposer( + eth_v1.clone().clone(), + chain_filter.clone(), + not_while_syncing_filter.clone(), + task_spawner_filter.clone(), + ); // GET validator/blocks/{slot} - let get_validator_blocks = any_version - .and(warp::path("validator")) - .and(warp::path("blocks")) - .and(warp::path::param::().or_else(|_| async { - Err(warp_utils::reject::custom_bad_request( - "Invalid slot".to_string(), - )) - })) - .and(warp::path::end()) - .and(warp::header::optional::("accept")) - .and(not_while_syncing_filter.clone()) - .and(warp::query::()) - .and(task_spawner_filter.clone()) - .and(chain_filter.clone()) - .then( - |endpoint_version: EndpointVersion, - slot: Slot, - accept_header: Option, - not_synced_filter: Result<(), Rejection>, - query: api_types::ValidatorBlocksQuery, - task_spawner: TaskSpawner, - chain: Arc>| { - task_spawner.spawn_async_with_rejection(Priority::P0, async move { - debug!(?slot, "Block production request from HTTP API"); - - not_synced_filter?; - - if endpoint_version == V3 { - produce_block_v3(accept_header, chain, slot, query).await - } else { - produce_block_v2(accept_header, chain, slot, query).await - } - }) - }, - ); + let get_validator_blocks = get_validator_blocks( + any_version.clone().clone(), + chain_filter.clone(), + not_while_syncing_filter.clone(), + task_spawner_filter.clone(), + ); // GET validator/blinded_blocks/{slot} - let get_validator_blinded_blocks = eth_v1 - .and(warp::path("validator")) - .and(warp::path("blinded_blocks")) - .and(warp::path::param::().or_else(|_| async { - Err(warp_utils::reject::custom_bad_request( - "Invalid slot".to_string(), - )) - })) - .and(warp::path::end()) - .and(not_while_syncing_filter.clone()) - .and(warp::query::()) - .and(warp::header::optional::("accept")) - .and(task_spawner_filter.clone()) - .and(chain_filter.clone()) - .then( - |slot: Slot, - not_synced_filter: Result<(), Rejection>, - query: api_types::ValidatorBlocksQuery, - accept_header: Option, - task_spawner: TaskSpawner, - chain: Arc>| { - task_spawner.spawn_async_with_rejection(Priority::P0, async move { - not_synced_filter?; - produce_blinded_block_v2(accept_header, chain, slot, query).await - }) - }, - ); + let get_validator_blinded_blocks = get_validator_blinded_blocks( + eth_v1.clone().clone(), + chain_filter.clone(), + not_while_syncing_filter.clone(), + task_spawner_filter.clone(), + ); // GET validator/attestation_data?slot,committee_index - let get_validator_attestation_data = eth_v1 - .and(warp::path("validator")) - .and(warp::path("attestation_data")) - .and(warp::path::end()) - .and(warp::query::()) - .and(not_while_syncing_filter.clone()) - .and(task_spawner_filter.clone()) - .and(chain_filter.clone()) - .then( - |query: api_types::ValidatorAttestationDataQuery, - not_synced_filter: Result<(), Rejection>, - task_spawner: TaskSpawner, - chain: Arc>| { - task_spawner.blocking_json_task(Priority::P0, move || { - not_synced_filter?; - - let current_slot = chain.slot().map_err(warp_utils::reject::unhandled_error)?; - - // allow a tolerance of one slot to account for clock skew - if query.slot > current_slot + 1 { - return Err(warp_utils::reject::custom_bad_request(format!( - "request slot {} is more than one slot past the current slot {}", - query.slot, current_slot - ))); - } - - chain - .produce_unaggregated_attestation(query.slot, query.committee_index) - .map(|attestation| attestation.data().clone()) - .map(api_types::GenericResponse::from) - .map_err(warp_utils::reject::unhandled_error) - }) - }, - ); + let get_validator_attestation_data = get_validator_attestation_data( + eth_v1.clone().clone(), + chain_filter.clone(), + not_while_syncing_filter.clone(), + task_spawner_filter.clone(), + ); // GET validator/aggregate_attestation?attestation_data_root,slot - let get_validator_aggregate_attestation = any_version - .and(warp::path("validator")) - .and(warp::path("aggregate_attestation")) - .and(warp::path::end()) - .and(warp::query::()) - .and(not_while_syncing_filter.clone()) - .and(task_spawner_filter.clone()) - .and(chain_filter.clone()) - .then( - |endpoint_version: EndpointVersion, - query: api_types::ValidatorAggregateAttestationQuery, - not_synced_filter: Result<(), Rejection>, - task_spawner: TaskSpawner, - chain: Arc>| { - task_spawner.blocking_response_task(Priority::P0, move || { - not_synced_filter?; - crate::aggregate_attestation::get_aggregate_attestation( - query.slot, - &query.attestation_data_root, - query.committee_index, - endpoint_version, - chain, - ) - }) - }, - ); + let get_validator_aggregate_attestation = get_validator_aggregate_attestation( + any_version.clone().clone(), + chain_filter.clone(), + not_while_syncing_filter.clone(), + task_spawner_filter.clone(), + ); // POST validator/duties/attester/{epoch} - let post_validator_duties_attester = eth_v1 - .and(warp::path("validator")) - .and(warp::path("duties")) - .and(warp::path("attester")) - .and(warp::path::param::().or_else(|_| async { - Err(warp_utils::reject::custom_bad_request( - "Invalid epoch".to_string(), - )) - })) - .and(warp::path::end()) - .and(not_while_syncing_filter.clone()) - .and(warp_utils::json::json()) - .and(task_spawner_filter.clone()) - .and(chain_filter.clone()) - .then( - |epoch: Epoch, - not_synced_filter: Result<(), Rejection>, - indices: api_types::ValidatorIndexData, - task_spawner: TaskSpawner, - chain: Arc>| { - task_spawner.blocking_json_task(Priority::P0, move || { - not_synced_filter?; - attester_duties::attester_duties(epoch, &indices.0, &chain) - }) - }, - ); + let post_validator_duties_attester = post_validator_duties_attester( + eth_v1.clone().clone(), + chain_filter.clone(), + not_while_syncing_filter.clone(), + task_spawner_filter.clone(), + ); // POST validator/duties/sync/{epoch} - let post_validator_duties_sync = eth_v1 - .and(warp::path("validator")) - .and(warp::path("duties")) - .and(warp::path("sync")) - .and(warp::path::param::().or_else(|_| async { - Err(warp_utils::reject::custom_bad_request( - "Invalid epoch".to_string(), - )) - })) - .and(warp::path::end()) - .and(not_while_syncing_filter.clone()) - .and(warp_utils::json::json()) - .and(task_spawner_filter.clone()) - .and(chain_filter.clone()) - .then( - |epoch: Epoch, - not_synced_filter: Result<(), Rejection>, - indices: api_types::ValidatorIndexData, - task_spawner: TaskSpawner, - chain: Arc>| { - task_spawner.blocking_json_task(Priority::P0, move || { - not_synced_filter?; - sync_committees::sync_committee_duties(epoch, &indices.0, &chain) - }) - }, - ); + let post_validator_duties_sync = post_validator_duties_sync( + eth_v1.clone().clone(), + chain_filter.clone(), + not_while_syncing_filter.clone(), + task_spawner_filter.clone(), + ); // GET validator/sync_committee_contribution - let get_validator_sync_committee_contribution = eth_v1 - .and(warp::path("validator")) - .and(warp::path("sync_committee_contribution")) - .and(warp::path::end()) - .and(warp::query::()) - .and(not_while_syncing_filter.clone()) - .and(task_spawner_filter.clone()) - .and(chain_filter.clone()) - .then( - |sync_committee_data: SyncContributionData, - not_synced_filter: Result<(), Rejection>, - task_spawner: TaskSpawner, - chain: Arc>| { - task_spawner.blocking_json_task(Priority::P0, move || { - not_synced_filter?; - chain - .get_aggregated_sync_committee_contribution(&sync_committee_data) - .map_err(|e| { - warp_utils::reject::custom_bad_request(format!( - "unable to fetch sync contribution: {:?}", - e - )) - })? - .map(api_types::GenericResponse::from) - .ok_or_else(|| { - warp_utils::reject::custom_not_found( - "no matching sync contribution found".to_string(), - ) - }) - }) - }, - ); + let get_validator_sync_committee_contribution = get_validator_sync_committee_contribution( + eth_v1.clone().clone(), + chain_filter.clone(), + not_while_syncing_filter.clone(), + task_spawner_filter.clone(), + ); // POST validator/aggregate_and_proofs - let post_validator_aggregate_and_proofs = any_version - .and(warp::path("validator")) - .and(warp::path("aggregate_and_proofs")) - .and(warp::path::end()) - .and(not_while_syncing_filter.clone()) - .and(task_spawner_filter.clone()) - .and(chain_filter.clone()) - .and(warp_utils::json::json()) - .and(network_tx_filter.clone()) - .then( - // V1 and V2 are identical except V2 has a consensus version header in the request. - // We only require this header for SSZ deserialization, which isn't supported for - // this endpoint presently. - |_endpoint_version: EndpointVersion, - not_synced_filter: Result<(), Rejection>, - task_spawner: TaskSpawner, - chain: Arc>, - aggregates: Vec>, - network_tx: UnboundedSender>| { - task_spawner.blocking_json_task(Priority::P0, move || { - not_synced_filter?; - let seen_timestamp = timestamp_now(); - let mut verified_aggregates = Vec::with_capacity(aggregates.len()); - let mut messages = Vec::with_capacity(aggregates.len()); - let mut failures = Vec::new(); - - // Verify that all messages in the post are valid before processing further - for (index, aggregate) in aggregates.iter().enumerate() { - match chain.verify_aggregated_attestation_for_gossip(aggregate) { - Ok(verified_aggregate) => { - messages.push(PubsubMessage::AggregateAndProofAttestation(Box::new( - verified_aggregate.aggregate().clone(), - ))); - - // Notify the validator monitor. - chain - .validator_monitor - .read() - .register_api_aggregated_attestation( - seen_timestamp, - verified_aggregate.aggregate(), - verified_aggregate.indexed_attestation(), - &chain.slot_clock, - ); - - verified_aggregates.push((index, verified_aggregate)); - } - // If we already know the attestation, don't broadcast it or attempt to - // further verify it. Return success. - // - // It's reasonably likely that two different validators produce - // identical aggregates, especially if they're using the same beacon - // node. - Err(AttnError::AttestationSupersetKnown(_)) => continue, - // If we've already seen this aggregator produce an aggregate, just - // skip this one. - // - // We're likely to see this with VCs that use fallback BNs. The first - // BN might time-out *after* publishing the aggregate and then the - // second BN will indicate it's already seen the aggregate. - // - // There's no actual error for the user or the network since the - // aggregate has been successfully published by some other node. - Err(AttnError::AggregatorAlreadyKnown(_)) => continue, - Err(e) => { - error!( - error = ?e, - request_index = index, - aggregator_index = aggregate.message().aggregator_index(), - attestation_index = aggregate.message().aggregate().committee_index(), - attestation_slot = %aggregate.message().aggregate().data().slot, - "Failure verifying aggregate and proofs" - ); - failures.push(api_types::Failure::new(index, format!("Verification: {:?}", e))); - } - } - } - - // Publish aggregate attestations to the libp2p network - if !messages.is_empty() { - publish_network_message(&network_tx, NetworkMessage::Publish { messages })?; - } - - // Import aggregate attestations - for (index, verified_aggregate) in verified_aggregates { - if let Err(e) = chain.apply_attestation_to_fork_choice(&verified_aggregate) { - error!( - error = ?e, - request_index = index, - aggregator_index = verified_aggregate.aggregate().message().aggregator_index(), - attestation_index = verified_aggregate.attestation().committee_index(), - attestation_slot = %verified_aggregate.attestation().data().slot, - "Failure applying verified aggregate attestation to fork choice" - ); - failures.push(api_types::Failure::new(index, format!("Fork choice: {:?}", e))); - } - if let Err(e) = chain.add_to_block_inclusion_pool(verified_aggregate) { - warn!( - error = ?e, - request_index = index, - "Could not add verified aggregate attestation to the inclusion pool" - ); - failures.push(api_types::Failure::new(index, format!("Op pool: {:?}", e))); - } - } - - if !failures.is_empty() { - Err(warp_utils::reject::indexed_bad_request("error processing aggregate and proofs".to_string(), - failures, - )) - } else { - Ok(()) - } - }) - }, - ); + let post_validator_aggregate_and_proofs = post_validator_aggregate_and_proofs( + any_version.clone().clone(), + chain_filter.clone(), + network_tx_filter.clone(), + not_while_syncing_filter.clone(), + task_spawner_filter.clone(), + ); - let post_validator_contribution_and_proofs = eth_v1 - .and(warp::path("validator")) - .and(warp::path("contribution_and_proofs")) - .and(warp::path::end()) - .and(not_while_syncing_filter.clone()) - .and(task_spawner_filter.clone()) - .and(chain_filter.clone()) - .and(warp_utils::json::json()) - .and(network_tx_filter.clone()) - .then( - |not_synced_filter: Result<(), Rejection>, - task_spawner: TaskSpawner, - chain: Arc>, - contributions: Vec>, - network_tx: UnboundedSender>| { - task_spawner.blocking_json_task(Priority::P0, move || { - not_synced_filter?; - sync_committees::process_signed_contribution_and_proofs( - contributions, - network_tx, - &chain, - )?; - Ok(api_types::GenericResponse::from(())) - }) - }, - ); + let post_validator_contribution_and_proofs = post_validator_contribution_and_proofs( + eth_v1.clone().clone(), + chain_filter.clone(), + network_tx_filter.clone(), + not_while_syncing_filter.clone(), + task_spawner_filter.clone(), + ); // POST validator/beacon_committee_subscriptions - let post_validator_beacon_committee_subscriptions = eth_v1 - .and(warp::path("validator")) - .and(warp::path("beacon_committee_subscriptions")) - .and(warp::path::end()) - .and(warp_utils::json::json()) - .and(validator_subscription_tx_filter.clone()) - .and(task_spawner_filter.clone()) - .and(chain_filter.clone()) - .then( - |committee_subscriptions: Vec, - validator_subscription_tx: Sender, - task_spawner: TaskSpawner, - chain: Arc>| { - task_spawner.blocking_json_task(Priority::P0, move || { - let subscriptions: std::collections::BTreeSet<_> = committee_subscriptions - .iter() - .map(|subscription| { - chain - .validator_monitor - .write() - .auto_register_local_validator(subscription.validator_index); - api_types::ValidatorSubscription { - attestation_committee_index: subscription.committee_index, - slot: subscription.slot, - committee_count_at_slot: subscription.committees_at_slot, - is_aggregator: subscription.is_aggregator, - } - }) - .collect(); - - let message = - ValidatorSubscriptionMessage::AttestationSubscribe { subscriptions }; - if let Err(e) = validator_subscription_tx.try_send(message) { - warn!( - info = "the host may be overloaded or resource-constrained", - error = ?e, - "Unable to process committee subscriptions" - ); - return Err(warp_utils::reject::custom_server_error( - "unable to queue subscription, host may be overloaded or shutting down" - .to_string(), - )); - } - Ok(()) - }) - }, + let post_validator_beacon_committee_subscriptions = + post_validator_beacon_committee_subscriptions( + eth_v1.clone().clone(), + chain_filter.clone(), + validator_subscription_tx_filter.clone(), + task_spawner_filter.clone(), ); // POST validator/prepare_beacon_proposer - let post_validator_prepare_beacon_proposer = eth_v1 - .and(warp::path("validator")) - .and(warp::path("prepare_beacon_proposer")) - .and(warp::path::end()) - .and(not_while_syncing_filter.clone()) - .and(network_tx_filter.clone()) - .and(task_spawner_filter.clone()) - .and(chain_filter.clone()) - .and(warp_utils::json::json()) - .then( - |not_synced_filter: Result<(), Rejection>, - network_tx: UnboundedSender>, - task_spawner: TaskSpawner, - chain: Arc>, - preparation_data: Vec| { - task_spawner.spawn_async_with_rejection(Priority::P0, async move { - not_synced_filter?; - let execution_layer = chain - .execution_layer - .as_ref() - .ok_or(BeaconChainError::ExecutionLayerMissing) - .map_err(warp_utils::reject::unhandled_error)?; - - let current_slot = chain - .slot_clock - .now_or_genesis() - .ok_or(BeaconChainError::UnableToReadSlot) - .map_err(warp_utils::reject::unhandled_error)?; - let current_epoch = current_slot.epoch(T::EthSpec::slots_per_epoch()); - - debug!( - count = preparation_data.len(), - "Received proposer preparation data" - ); - - execution_layer - .update_proposer_preparation( - current_epoch, - preparation_data.iter().map(|data| (data, &None)), - ) - .await; - - chain - .prepare_beacon_proposer(current_slot) - .await - .map_err(|e| { - warp_utils::reject::custom_bad_request(format!( - "error updating proposer preparations: {:?}", - e - )) - })?; - - if chain.spec.is_peer_das_scheduled() { - let (finalized_beacon_state, _, _) = - StateId(CoreStateId::Finalized).state(&chain)?; - let validators_and_balances = preparation_data - .iter() - .filter_map(|preparation| { - if let Ok(effective_balance) = finalized_beacon_state - .get_effective_balance(preparation.validator_index as usize) - { - Some((preparation.validator_index as usize, effective_balance)) - } else { - None - } - }) - .collect::>(); - - let current_slot = - chain.slot().map_err(warp_utils::reject::unhandled_error)?; - if let Some(cgc_change) = chain - .data_availability_checker - .custody_context() - .register_validators(validators_and_balances, current_slot, &chain.spec) - { - chain.update_data_column_custody_info(Some( - cgc_change - .effective_epoch - .start_slot(T::EthSpec::slots_per_epoch()), - )); - - network_tx.send(NetworkMessage::CustodyCountChanged { - new_custody_group_count: cgc_change.new_custody_group_count, - sampling_count: cgc_change.sampling_count, - }).unwrap_or_else(|e| { - debug!(error = %e, "Could not send message to the network service. \ - Likely shutdown") - }); - } - } - - Ok::<_, warp::reject::Rejection>(warp::reply::json(&()).into_response()) - }) - }, - ); + let post_validator_prepare_beacon_proposer = post_validator_prepare_beacon_proposer( + eth_v1.clone().clone(), + chain_filter.clone(), + network_tx_filter.clone(), + not_while_syncing_filter.clone(), + task_spawner_filter.clone(), + ); // POST validator/register_validator - let post_validator_register_validator = eth_v1 - .and(warp::path("validator")) - .and(warp::path("register_validator")) - .and(warp::path::end()) - .and(task_spawner_filter.clone()) - .and(chain_filter.clone()) - .and(warp_utils::json::json()) - .then( - |task_spawner: TaskSpawner, - chain: Arc>, - register_val_data: Vec| async { - let (tx, rx) = oneshot::channel(); - - let initial_result = task_spawner - .spawn_async_with_rejection_no_conversion(Priority::P0, async move { - let execution_layer = chain - .execution_layer - .as_ref() - .ok_or(BeaconChainError::ExecutionLayerMissing) - .map_err(warp_utils::reject::unhandled_error)?; - let current_slot = chain - .slot_clock - .now_or_genesis() - .ok_or(BeaconChainError::UnableToReadSlot) - .map_err(warp_utils::reject::unhandled_error)?; - let current_epoch = current_slot.epoch(T::EthSpec::slots_per_epoch()); - - debug!( - count = register_val_data.len(), - "Received register validator request" - ); - - let head_snapshot = chain.head_snapshot(); - let spec = &chain.spec; - - let (preparation_data, filtered_registration_data): ( - Vec<(ProposerPreparationData, Option)>, - Vec, - ) = register_val_data - .into_iter() - .filter_map(|register_data| { - chain - .validator_index(®ister_data.message.pubkey) - .ok() - .flatten() - .and_then(|validator_index| { - let validator = head_snapshot - .beacon_state - .get_validator(validator_index) - .ok()?; - let validator_status = ValidatorStatus::from_validator( - validator, - current_epoch, - spec.far_future_epoch, - ) - .superstatus(); - let is_active_or_pending = - matches!(validator_status, ValidatorStatus::Pending) - || matches!( - validator_status, - ValidatorStatus::Active - ); - - // Filter out validators who are not 'active' or 'pending'. - is_active_or_pending.then_some({ - ( - ( - ProposerPreparationData { - validator_index: validator_index as u64, - fee_recipient: register_data - .message - .fee_recipient, - }, - Some(register_data.message.gas_limit), - ), - register_data, - ) - }) - }) - }) - .unzip(); - - // Update the prepare beacon proposer cache based on this request. - execution_layer - .update_proposer_preparation( - current_epoch, - preparation_data.iter().map(|(data, limit)| (data, limit)), - ) - .await; - - // Call prepare beacon proposer blocking with the latest update in order to make - // sure we have a local payload to fall back to in the event of the blinded block - // flow failing. - chain - .prepare_beacon_proposer(current_slot) - .await - .map_err(|e| { - warp_utils::reject::custom_bad_request(format!( - "error updating proposer preparations: {:?}", - e - )) - })?; - - info!( - count = filtered_registration_data.len(), - "Forwarding register validator request to connected builder" - ); - - // It's a waste of a `BeaconProcessor` worker to just - // wait on a response from the builder (especially since - // they have frequent timeouts). Spawn a new task and - // send the response back to our original HTTP request - // task via a channel. - let builder_future = async move { - let arc_builder = chain - .execution_layer - .as_ref() - .ok_or(BeaconChainError::ExecutionLayerMissing) - .map_err(warp_utils::reject::unhandled_error)? - .builder(); - let builder = arc_builder - .as_ref() - .ok_or(BeaconChainError::BuilderMissing) - .map_err(warp_utils::reject::unhandled_error)?; - builder - .post_builder_validators(&filtered_registration_data) - .await - .map(|resp| warp::reply::json(&resp).into_response()) - .map_err(|e| { - warn!( - num_registrations = filtered_registration_data.len(), - error = ?e, - "Relay error when registering validator(s)" - ); - // Forward the HTTP status code if we are able to, otherwise fall back - // to a server error. - if let eth2::Error::ServerMessage(message) = e { - if message.code == StatusCode::BAD_REQUEST.as_u16() { - return warp_utils::reject::custom_bad_request( - message.message, - ); - } else { - // According to the spec this response should only be a 400 or 500, - // so we fall back to a 500 here. - return warp_utils::reject::custom_server_error( - message.message, - ); - } - } - warp_utils::reject::custom_server_error(format!("{e:?}")) - }) - }; - tokio::task::spawn(async move { tx.send(builder_future.await) }); - - // Just send a generic 200 OK from this closure. We'll - // ignore the `Ok` variant and form a proper response - // from what is sent back down the channel. - Ok(warp::reply::reply().into_response()) - }) - .await; - - if initial_result.is_err() { - return convert_rejection(initial_result).await; - } - - // Await a response from the builder without blocking a - // `BeaconProcessor` worker. - convert_rejection(rx.await.unwrap_or_else(|_| { - Ok(warp::reply::with_status( - warp::reply::json(&"No response from channel"), - warp::http::StatusCode::INTERNAL_SERVER_ERROR, - ) - .into_response()) - })) - .await - }, - ); + let post_validator_register_validator = post_validator_register_validator( + eth_v1.clone().clone(), + chain_filter.clone(), + task_spawner_filter.clone(), + ); // POST validator/sync_committee_subscriptions - let post_validator_sync_committee_subscriptions = eth_v1 - .and(warp::path("validator")) - .and(warp::path("sync_committee_subscriptions")) - .and(warp::path::end()) - .and(warp_utils::json::json()) - .and(validator_subscription_tx_filter) - .and(task_spawner_filter.clone()) - .and(chain_filter.clone()) - .then( - |subscriptions: Vec, - validator_subscription_tx: Sender, - task_spawner: TaskSpawner, - chain: Arc>, - | { - task_spawner.blocking_json_task(Priority::P0, move || { - for subscription in subscriptions { - chain - .validator_monitor - .write() - .auto_register_local_validator(subscription.validator_index); - - let message = ValidatorSubscriptionMessage::SyncCommitteeSubscribe { - subscriptions: vec![subscription], - }; - if let Err(e) = validator_subscription_tx.try_send(message) { - warn!( - info = "the host may be overloaded or resource-constrained", - error = ?e, - "Unable to process sync subscriptions" - ); - return Err(warp_utils::reject::custom_server_error( - "unable to queue subscription, host may be overloaded or shutting down".to_string(), - )); - } - } - - Ok(()) - }) - }, - ); + let post_validator_sync_committee_subscriptions = post_validator_sync_committee_subscriptions( + eth_v1.clone().clone(), + chain_filter.clone(), + validator_subscription_tx_filter.clone(), + task_spawner_filter.clone(), + ); // POST validator/liveness/{epoch} - let post_validator_liveness_epoch = eth_v1 - .and(warp::path("validator")) - .and(warp::path("liveness")) - .and(warp::path::param::()) - .and(warp::path::end()) - .and(warp_utils::json::json()) - .and(task_spawner_filter.clone()) - .and(chain_filter.clone()) - .then( - |epoch: Epoch, - indices: api_types::ValidatorIndexData, - task_spawner: TaskSpawner, - chain: Arc>| { - task_spawner.blocking_json_task(Priority::P0, move || { - // Ensure the request is for either the current, previous or next epoch. - let current_epoch = - chain.epoch().map_err(warp_utils::reject::unhandled_error)?; - let prev_epoch = current_epoch.saturating_sub(Epoch::new(1)); - let next_epoch = current_epoch.saturating_add(Epoch::new(1)); - - if epoch < prev_epoch || epoch > next_epoch { - return Err(warp_utils::reject::custom_bad_request(format!( - "request epoch {} is more than one epoch from the current epoch {}", - epoch, current_epoch - ))); - } - - let liveness: Vec = indices - .0 - .iter() - .cloned() - .map(|index| { - let is_live = chain.validator_seen_at_epoch(index as usize, epoch); - api_types::StandardLivenessResponseData { index, is_live } - }) - .collect(); - - Ok(api_types::GenericResponse::from(liveness)) - }) - }, - ); + let post_validator_liveness_epoch = post_validator_liveness_epoch( + eth_v1.clone().clone(), + chain_filter.clone(), + task_spawner_filter.clone(), + ); // POST lighthouse/finalize let post_lighthouse_finalize = warp::path("lighthouse") @@ -3632,7 +2993,10 @@ pub fn serve( ); network_globals.add_trusted_peer(enr.clone()); - publish_network_message(&network_tx, NetworkMessage::ConnectTrustedPeer(enr))?; + utils::publish_network_message( + &network_tx, + NetworkMessage::ConnectTrustedPeer(enr), + )?; Ok(()) }) @@ -3663,7 +3027,7 @@ pub fn serve( ); network_globals.remove_trusted_peer(enr.clone()); - publish_network_message( + utils::publish_network_message( &network_tx, NetworkMessage::DisconnectTrustedPeer(enr), )?; @@ -4115,6 +3479,7 @@ pub fn serve( ); let get_events = eth_v1 + .clone() .and(warp::path("events")) .and(warp::path::end()) .and(multi_key_query::()) @@ -4436,70 +3801,3 @@ pub fn serve( Ok(http_server) } - -fn from_meta_data( - meta_data: &RwLock>, - spec: &ChainSpec, -) -> api_types::MetaData { - let meta_data = meta_data.read(); - let format_hex = |bytes: &[u8]| format!("0x{}", hex::encode(bytes)); - - let seq_number = *meta_data.seq_number(); - let attnets = format_hex(&meta_data.attnets().clone().into_bytes()); - let syncnets = format_hex( - &meta_data - .syncnets() - .cloned() - .unwrap_or_default() - .into_bytes(), - ); - - if spec.is_peer_das_scheduled() { - api_types::MetaData::V3(api_types::MetaDataV3 { - seq_number, - attnets, - syncnets, - custody_group_count: meta_data.custody_group_count().cloned().unwrap_or_default(), - }) - } else { - api_types::MetaData::V2(api_types::MetaDataV2 { - seq_number, - attnets, - syncnets, - }) - } -} - -/// Publish a message to the libp2p pubsub network. -fn publish_pubsub_message( - network_tx: &UnboundedSender>, - message: PubsubMessage, -) -> Result<(), warp::Rejection> { - publish_network_message( - network_tx, - NetworkMessage::Publish { - messages: vec![message], - }, - ) -} - -/// Publish a message to the libp2p pubsub network. -fn publish_pubsub_messages( - network_tx: &UnboundedSender>, - messages: Vec>, -) -> Result<(), warp::Rejection> { - publish_network_message(network_tx, NetworkMessage::Publish { messages }) -} - -/// Publish a message to the libp2p network. -fn publish_network_message( - network_tx: &UnboundedSender>, - message: NetworkMessage, -) -> Result<(), warp::Rejection> { - network_tx.send(message).map_err(|e| { - warp_utils::reject::custom_server_error(format!( - "unable to publish to network channel: {}", - e - )) - }) -} diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index 9671a72da26..b54c071eb80 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -138,9 +138,10 @@ pub async fn publish_block>( "Signed block published to network via HTTP API" ); - crate::publish_pubsub_message(&sender, PubsubMessage::BeaconBlock(block.clone())).map_err( - |_| BlockError::BeaconChainError(Box::new(BeaconChainError::UnableToPublish)), - )?; + crate::utils::publish_pubsub_message(&sender, PubsubMessage::BeaconBlock(block.clone())) + .map_err(|_| { + BlockError::BeaconChainError(Box::new(BeaconChainError::UnableToPublish)) + })?; Ok(()) }; @@ -492,7 +493,7 @@ fn publish_blob_sidecars( blob: &GossipVerifiedBlob, ) -> Result<(), BlockError> { let pubsub_message = PubsubMessage::BlobSidecar(Box::new((blob.index(), blob.clone_blob()))); - crate::publish_pubsub_message(sender_clone, pubsub_message) + crate::utils::publish_pubsub_message(sender_clone, pubsub_message) .map_err(|_| BlockError::BeaconChainError(Box::new(BeaconChainError::UnableToPublish))) } @@ -525,7 +526,7 @@ fn publish_column_sidecars( PubsubMessage::DataColumnSidecar(Box::new((subnet, data_col))) }) .collect::>(); - crate::publish_pubsub_messages(sender_clone, pubsub_messages) + crate::utils::publish_pubsub_messages(sender_clone, pubsub_messages) .map_err(|_| BlockError::BeaconChainError(Box::new(BeaconChainError::UnableToPublish))) } diff --git a/beacon_node/http_api/src/sync_committees.rs b/beacon_node/http_api/src/sync_committees.rs index edda0e60a61..b9fa24ad6a4 100644 --- a/beacon_node/http_api/src/sync_committees.rs +++ b/beacon_node/http_api/src/sync_committees.rs @@ -1,6 +1,6 @@ //! Handlers for sync committee endpoints. -use crate::publish_pubsub_message; +use crate::utils::publish_pubsub_message; use beacon_chain::sync_committee_verification::{ Error as SyncVerificationError, VerifiedSyncCommitteeMessage, }; diff --git a/beacon_node/http_api/src/utils.rs b/beacon_node/http_api/src/utils.rs index cf61fa481cb..a89780ba245 100644 --- a/beacon_node/http_api/src/utils.rs +++ b/beacon_node/http_api/src/utils.rs @@ -1,3 +1,89 @@ +use crate::task_spawner::TaskSpawner; +use beacon_chain::{BeaconChain, BeaconChainTypes}; +use eth2::types::EndpointVersion; +use lighthouse_network::PubsubMessage; +use lighthouse_network::rpc::methods::MetaData; +use network::{NetworkMessage, ValidatorSubscriptionMessage}; +use parking_lot::RwLock; +use std::sync::Arc; +use tokio::sync::mpsc::{Sender, UnboundedSender}; +use types::{ChainSpec, EthSpec}; +use warp::Rejection; use warp::filters::BoxedFilter; pub type ResponseFilter = BoxedFilter<(warp::reply::Response,)>; +pub type AnyVersionFilter = BoxedFilter<(EndpointVersion,)>; +pub type EthV1Filter = BoxedFilter<()>; +pub type ChainFilter = BoxedFilter<(Arc>,)>; +pub type NotWhileSyncingFilter = BoxedFilter<(Result<(), Rejection>,)>; +pub type TaskSpawnerFilter = BoxedFilter<(TaskSpawner<::EthSpec>,)>; +pub type ValidatorSubscriptionTxFilter = BoxedFilter<(Sender,)>; +pub type NetworkTxFilter = + BoxedFilter<(UnboundedSender::EthSpec>>,)>; + +pub fn from_meta_data( + meta_data: &RwLock>, + spec: &ChainSpec, +) -> eth2::types::MetaData { + let meta_data = meta_data.read(); + let format_hex = |bytes: &[u8]| format!("0x{}", hex::encode(bytes)); + + let seq_number = *meta_data.seq_number(); + let attnets = format_hex(&meta_data.attnets().clone().into_bytes()); + let syncnets = format_hex( + &meta_data + .syncnets() + .cloned() + .unwrap_or_default() + .into_bytes(), + ); + + if spec.is_peer_das_scheduled() { + eth2::types::MetaData::V3(eth2::types::MetaDataV3 { + seq_number, + attnets, + syncnets, + custody_group_count: meta_data.custody_group_count().cloned().unwrap_or_default(), + }) + } else { + eth2::types::MetaData::V2(eth2::types::MetaDataV2 { + seq_number, + attnets, + syncnets, + }) + } +} + +/// Publish a message to the libp2p pubsub network. +pub fn publish_pubsub_message( + network_tx: &UnboundedSender>, + message: PubsubMessage, +) -> Result<(), warp::Rejection> { + publish_network_message( + network_tx, + NetworkMessage::Publish { + messages: vec![message], + }, + ) +} + +/// Publish a message to the libp2p pubsub network. +pub fn publish_pubsub_messages( + network_tx: &UnboundedSender>, + messages: Vec>, +) -> Result<(), warp::Rejection> { + publish_network_message(network_tx, NetworkMessage::Publish { messages }) +} + +/// Publish a message to the libp2p network. +pub fn publish_network_message( + network_tx: &UnboundedSender>, + message: NetworkMessage, +) -> Result<(), warp::Rejection> { + network_tx.send(message).map_err(|e| { + warp_utils::reject::custom_server_error(format!( + "unable to publish to network channel: {}", + e + )) + }) +} diff --git a/beacon_node/http_api/src/validator.rs b/beacon_node/http_api/src/validator.rs deleted file mode 100644 index 25b0feb99e8..00000000000 --- a/beacon_node/http_api/src/validator.rs +++ /dev/null @@ -1,22 +0,0 @@ -use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes}; -use types::{BeaconState, PublicKeyBytes}; - -/// Uses the `chain.validator_pubkey_cache` to resolve a pubkey to a validator -/// index and then ensures that the validator exists in the given `state`. -pub fn pubkey_to_validator_index( - chain: &BeaconChain, - state: &BeaconState, - pubkey: &PublicKeyBytes, -) -> Result, Box> { - chain - .validator_index(pubkey) - .map_err(Box::new)? - .filter(|&index| { - state - .validators() - .get(index) - .is_some_and(|v| v.pubkey == *pubkey) - }) - .map(Result::Ok) - .transpose() -} diff --git a/beacon_node/http_api/src/validator/mod.rs b/beacon_node/http_api/src/validator/mod.rs new file mode 100644 index 00000000000..9cf1f1a33d0 --- /dev/null +++ b/beacon_node/http_api/src/validator/mod.rs @@ -0,0 +1,971 @@ +use crate::produce_block::{produce_blinded_block_v2, produce_block_v2, produce_block_v3}; +use crate::task_spawner::{Priority, TaskSpawner}; +use crate::utils::{ + AnyVersionFilter, ChainFilter, EthV1Filter, NetworkTxFilter, NotWhileSyncingFilter, + ResponseFilter, TaskSpawnerFilter, ValidatorSubscriptionTxFilter, publish_network_message, +}; +use crate::version::V3; +use crate::{StateId, attester_duties, proposer_duties, sync_committees}; +use beacon_chain::attestation_verification::VerifiedAttestation; +use beacon_chain::validator_monitor::timestamp_now; +use beacon_chain::{AttestationError, BeaconChain, BeaconChainError, BeaconChainTypes}; +use eth2::StatusCode; +use eth2::types::{ + Accept, BeaconCommitteeSubscription, EndpointVersion, Failure, GenericResponse, + StandardLivenessResponseData, StateId as CoreStateId, ValidatorAggregateAttestationQuery, + ValidatorAttestationDataQuery, ValidatorBlocksQuery, ValidatorIndexData, ValidatorStatus, +}; +use lighthouse_network::PubsubMessage; +use network::{NetworkMessage, ValidatorSubscriptionMessage}; +use slot_clock::SlotClock; +use std::sync::Arc; +use tokio::sync::mpsc::{Sender, UnboundedSender}; +use tokio::sync::oneshot; +use tracing::{debug, error, info, warn}; +use types::{ + BeaconState, Epoch, EthSpec, ProposerPreparationData, PublicKeyBytes, SignedAggregateAndProof, + SignedContributionAndProof, SignedValidatorRegistrationData, Slot, SyncContributionData, + ValidatorSubscription, +}; +use warp::{Filter, Rejection, Reply}; +use warp_utils::reject::convert_rejection; + +/// Uses the `chain.validator_pubkey_cache` to resolve a pubkey to a validator +/// index and then ensures that the validator exists in the given `state`. +pub fn pubkey_to_validator_index( + chain: &BeaconChain, + state: &BeaconState, + pubkey: &PublicKeyBytes, +) -> Result, Box> { + chain + .validator_index(pubkey) + .map_err(Box::new)? + .filter(|&index| { + state + .validators() + .get(index) + .is_some_and(|v| v.pubkey == *pubkey) + }) + .map(Result::Ok) + .transpose() +} + +// GET validator/sync_committee_contribution +pub fn get_validator_sync_committee_contribution( + eth_v1: EthV1Filter, + chain_filter: ChainFilter, + not_while_syncing_filter: NotWhileSyncingFilter, + task_spawner_filter: TaskSpawnerFilter, +) -> ResponseFilter { + eth_v1 + .and(warp::path("validator")) + .and(warp::path("sync_committee_contribution")) + .and(warp::path::end()) + .and(warp::query::()) + .and(not_while_syncing_filter.clone()) + .and(task_spawner_filter.clone()) + .and(chain_filter.clone()) + .then( + |sync_committee_data: SyncContributionData, + not_synced_filter: Result<(), Rejection>, + task_spawner: TaskSpawner, + chain: Arc>| { + task_spawner.blocking_json_task(Priority::P0, move || { + not_synced_filter?; + chain + .get_aggregated_sync_committee_contribution(&sync_committee_data) + .map_err(|e| { + warp_utils::reject::custom_bad_request(format!( + "unable to fetch sync contribution: {:?}", + e + )) + })? + .map(GenericResponse::from) + .ok_or_else(|| { + warp_utils::reject::custom_not_found( + "no matching sync contribution found".to_string(), + ) + }) + }) + }, + ) + .boxed() +} + +// POST validator/duties/sync/{epoch} +pub fn post_validator_duties_sync( + eth_v1: EthV1Filter, + chain_filter: ChainFilter, + not_while_syncing_filter: NotWhileSyncingFilter, + task_spawner_filter: TaskSpawnerFilter, +) -> ResponseFilter { + eth_v1 + .and(warp::path("validator")) + .and(warp::path("duties")) + .and(warp::path("sync")) + .and(warp::path::param::().or_else(|_| async { + Err(warp_utils::reject::custom_bad_request( + "Invalid epoch".to_string(), + )) + })) + .and(warp::path::end()) + .and(not_while_syncing_filter.clone()) + .and(warp_utils::json::json()) + .and(task_spawner_filter.clone()) + .and(chain_filter.clone()) + .then( + |epoch: Epoch, + not_synced_filter: Result<(), Rejection>, + indices: ValidatorIndexData, + task_spawner: TaskSpawner, + chain: Arc>| { + task_spawner.blocking_json_task(Priority::P0, move || { + not_synced_filter?; + sync_committees::sync_committee_duties(epoch, &indices.0, &chain) + }) + }, + ) + .boxed() +} + +// POST validator/duties/attester/{epoch} +pub fn post_validator_duties_attester( + eth_v1: EthV1Filter, + chain_filter: ChainFilter, + not_while_syncing_filter: NotWhileSyncingFilter, + task_spawner_filter: TaskSpawnerFilter, +) -> ResponseFilter { + eth_v1 + .and(warp::path("validator")) + .and(warp::path("duties")) + .and(warp::path("attester")) + .and(warp::path::param::().or_else(|_| async { + Err(warp_utils::reject::custom_bad_request( + "Invalid epoch".to_string(), + )) + })) + .and(warp::path::end()) + .and(not_while_syncing_filter.clone()) + .and(warp_utils::json::json()) + .and(task_spawner_filter.clone()) + .and(chain_filter.clone()) + .then( + |epoch: Epoch, + not_synced_filter: Result<(), Rejection>, + indices: ValidatorIndexData, + task_spawner: TaskSpawner, + chain: Arc>| { + task_spawner.blocking_json_task(Priority::P0, move || { + not_synced_filter?; + attester_duties::attester_duties(epoch, &indices.0, &chain) + }) + }, + ) + .boxed() +} + +// GET validator/aggregate_attestation?attestation_data_root,slot +pub fn get_validator_aggregate_attestation( + any_version: AnyVersionFilter, + chain_filter: ChainFilter, + not_while_syncing_filter: NotWhileSyncingFilter, + task_spawner_filter: TaskSpawnerFilter, +) -> ResponseFilter { + any_version + .and(warp::path("validator")) + .and(warp::path("aggregate_attestation")) + .and(warp::path::end()) + .and(warp::query::()) + .and(not_while_syncing_filter.clone()) + .and(task_spawner_filter.clone()) + .and(chain_filter.clone()) + .then( + |endpoint_version: EndpointVersion, + query: ValidatorAggregateAttestationQuery, + not_synced_filter: Result<(), Rejection>, + task_spawner: TaskSpawner, + chain: Arc>| { + task_spawner.blocking_response_task(Priority::P0, move || { + not_synced_filter?; + crate::aggregate_attestation::get_aggregate_attestation( + query.slot, + &query.attestation_data_root, + query.committee_index, + endpoint_version, + chain, + ) + }) + }, + ) + .boxed() +} + +// GET validator/attestation_data?slot,committee_index +pub fn get_validator_attestation_data( + eth_v1: EthV1Filter, + chain_filter: ChainFilter, + not_while_syncing_filter: NotWhileSyncingFilter, + task_spawner_filter: TaskSpawnerFilter, +) -> ResponseFilter { + eth_v1 + .and(warp::path("validator")) + .and(warp::path("attestation_data")) + .and(warp::path::end()) + .and(warp::query::()) + .and(not_while_syncing_filter.clone()) + .and(task_spawner_filter.clone()) + .and(chain_filter.clone()) + .then( + |query: ValidatorAttestationDataQuery, + not_synced_filter: Result<(), Rejection>, + task_spawner: TaskSpawner, + chain: Arc>| { + task_spawner.blocking_json_task(Priority::P0, move || { + not_synced_filter?; + + let current_slot = chain.slot().map_err(warp_utils::reject::unhandled_error)?; + + // allow a tolerance of one slot to account for clock skew + if query.slot > current_slot + 1 { + return Err(warp_utils::reject::custom_bad_request(format!( + "request slot {} is more than one slot past the current slot {}", + query.slot, current_slot + ))); + } + + chain + .produce_unaggregated_attestation(query.slot, query.committee_index) + .map(|attestation| attestation.data().clone()) + .map(GenericResponse::from) + .map_err(warp_utils::reject::unhandled_error) + }) + }, + ) + .boxed() +} + +// GET validator/blinded_blocks/{slot} +pub fn get_validator_blinded_blocks( + eth_v1: EthV1Filter, + chain_filter: ChainFilter, + not_while_syncing_filter: NotWhileSyncingFilter, + task_spawner_filter: TaskSpawnerFilter, +) -> ResponseFilter { + eth_v1 + .and(warp::path("validator")) + .and(warp::path("blinded_blocks")) + .and(warp::path::param::().or_else(|_| async { + Err(warp_utils::reject::custom_bad_request( + "Invalid slot".to_string(), + )) + })) + .and(warp::path::end()) + .and(not_while_syncing_filter.clone()) + .and(warp::query::()) + .and(warp::header::optional::("accept")) + .and(task_spawner_filter.clone()) + .and(chain_filter.clone()) + .then( + |slot: Slot, + not_synced_filter: Result<(), Rejection>, + query: ValidatorBlocksQuery, + accept_header: Option, + task_spawner: TaskSpawner, + chain: Arc>| { + task_spawner.spawn_async_with_rejection(Priority::P0, async move { + not_synced_filter?; + produce_blinded_block_v2(accept_header, chain, slot, query).await + }) + }, + ) + .boxed() +} + +// GET validator/blocks/{slot} +pub fn get_validator_blocks( + any_version: AnyVersionFilter, + chain_filter: ChainFilter, + not_while_syncing_filter: NotWhileSyncingFilter, + task_spawner_filter: TaskSpawnerFilter, +) -> ResponseFilter { + any_version + .and(warp::path("validator")) + .and(warp::path("blocks")) + .and(warp::path::param::().or_else(|_| async { + Err(warp_utils::reject::custom_bad_request( + "Invalid slot".to_string(), + )) + })) + .and(warp::path::end()) + .and(warp::header::optional::("accept")) + .and(not_while_syncing_filter) + .and(warp::query::()) + .and(task_spawner_filter) + .and(chain_filter) + .then( + |endpoint_version: EndpointVersion, + slot: Slot, + accept_header: Option, + not_synced_filter: Result<(), Rejection>, + query: ValidatorBlocksQuery, + task_spawner: TaskSpawner, + chain: Arc>| { + task_spawner.spawn_async_with_rejection(Priority::P0, async move { + debug!(?slot, "Block production request from HTTP API"); + + not_synced_filter?; + + if endpoint_version == V3 { + produce_block_v3(accept_header, chain, slot, query).await + } else { + produce_block_v2(accept_header, chain, slot, query).await + } + }) + }, + ) + .boxed() +} + +// POST validator/liveness/{epoch} +pub fn post_validator_liveness_epoch( + eth_v1: EthV1Filter, + chain_filter: ChainFilter, + task_spawner_filter: TaskSpawnerFilter, +) -> ResponseFilter { + eth_v1 + .and(warp::path("validator")) + .and(warp::path("liveness")) + .and(warp::path::param::()) + .and(warp::path::end()) + .and(warp_utils::json::json()) + .and(task_spawner_filter.clone()) + .and(chain_filter.clone()) + .then( + |epoch: Epoch, + indices: ValidatorIndexData, + task_spawner: TaskSpawner, + chain: Arc>| { + task_spawner.blocking_json_task(Priority::P0, move || { + // Ensure the request is for either the current, previous or next epoch. + let current_epoch = + chain.epoch().map_err(warp_utils::reject::unhandled_error)?; + let prev_epoch = current_epoch.saturating_sub(Epoch::new(1)); + let next_epoch = current_epoch.saturating_add(Epoch::new(1)); + + if epoch < prev_epoch || epoch > next_epoch { + return Err(warp_utils::reject::custom_bad_request(format!( + "request epoch {} is more than one epoch from the current epoch {}", + epoch, current_epoch + ))); + } + + let liveness: Vec = indices + .0 + .iter() + .cloned() + .map(|index| { + let is_live = chain.validator_seen_at_epoch(index as usize, epoch); + StandardLivenessResponseData { index, is_live } + }) + .collect(); + + Ok(GenericResponse::from(liveness)) + }) + }, + ) + .boxed() +} + +// POST validator/sync_committee_subscriptions +pub fn post_validator_sync_committee_subscriptions( + eth_v1: EthV1Filter, + chain_filter: ChainFilter, + validator_subscription_tx_filter: ValidatorSubscriptionTxFilter, + task_spawner_filter: TaskSpawnerFilter, +) -> ResponseFilter { + eth_v1 + .and(warp::path("validator")) + .and(warp::path("sync_committee_subscriptions")) + .and(warp::path::end()) + .and(warp_utils::json::json()) + .and(validator_subscription_tx_filter) + .and(task_spawner_filter.clone()) + .and(chain_filter.clone()) + .then( + |subscriptions: Vec, + validator_subscription_tx: Sender, + task_spawner: TaskSpawner, + chain: Arc>, + | { + task_spawner.blocking_json_task(Priority::P0, move || { + for subscription in subscriptions { + chain + .validator_monitor + .write() + .auto_register_local_validator(subscription.validator_index); + + let message = ValidatorSubscriptionMessage::SyncCommitteeSubscribe { + subscriptions: vec![subscription], + }; + if let Err(e) = validator_subscription_tx.try_send(message) { + warn!( + info = "the host may be overloaded or resource-constrained", + error = ?e, + "Unable to process sync subscriptions" + ); + return Err(warp_utils::reject::custom_server_error( + "unable to queue subscription, host may be overloaded or shutting down".to_string(), + )); + } + } + + Ok(()) + }) + }, + ).boxed() +} + +// POST validator/register_validator +pub fn post_validator_register_validator( + eth_v1: EthV1Filter, + chain_filter: ChainFilter, + task_spawner_filter: TaskSpawnerFilter, +) -> ResponseFilter { + eth_v1 + .and(warp::path("validator")) + .and(warp::path("register_validator")) + .and(warp::path::end()) + .and(task_spawner_filter.clone()) + .and(chain_filter.clone()) + .and(warp_utils::json::json()) + .then( + |task_spawner: TaskSpawner, + chain: Arc>, + register_val_data: Vec| async { + let (tx, rx) = oneshot::channel(); + + let initial_result = task_spawner + .spawn_async_with_rejection_no_conversion(Priority::P0, async move { + let execution_layer = chain + .execution_layer + .as_ref() + .ok_or(BeaconChainError::ExecutionLayerMissing) + .map_err(warp_utils::reject::unhandled_error)?; + let current_slot = chain + .slot_clock + .now_or_genesis() + .ok_or(BeaconChainError::UnableToReadSlot) + .map_err(warp_utils::reject::unhandled_error)?; + let current_epoch = current_slot.epoch(T::EthSpec::slots_per_epoch()); + + debug!( + count = register_val_data.len(), + "Received register validator request" + ); + + let head_snapshot = chain.head_snapshot(); + let spec = &chain.spec; + + let (preparation_data, filtered_registration_data): ( + Vec<(ProposerPreparationData, Option)>, + Vec, + ) = register_val_data + .into_iter() + .filter_map(|register_data| { + chain + .validator_index(®ister_data.message.pubkey) + .ok() + .flatten() + .and_then(|validator_index| { + let validator = head_snapshot + .beacon_state + .get_validator(validator_index) + .ok()?; + let validator_status = ValidatorStatus::from_validator( + validator, + current_epoch, + spec.far_future_epoch, + ) + .superstatus(); + let is_active_or_pending = + matches!(validator_status, ValidatorStatus::Pending) + || matches!( + validator_status, + ValidatorStatus::Active + ); + + // Filter out validators who are not 'active' or 'pending'. + is_active_or_pending.then_some({ + ( + ( + ProposerPreparationData { + validator_index: validator_index as u64, + fee_recipient: register_data + .message + .fee_recipient, + }, + Some(register_data.message.gas_limit), + ), + register_data, + ) + }) + }) + }) + .unzip(); + + // Update the prepare beacon proposer cache based on this request. + execution_layer + .update_proposer_preparation( + current_epoch, + preparation_data.iter().map(|(data, limit)| (data, limit)), + ) + .await; + + // Call prepare beacon proposer blocking with the latest update in order to make + // sure we have a local payload to fall back to in the event of the blinded block + // flow failing. + chain + .prepare_beacon_proposer(current_slot) + .await + .map_err(|e| { + warp_utils::reject::custom_bad_request(format!( + "error updating proposer preparations: {:?}", + e + )) + })?; + + info!( + count = filtered_registration_data.len(), + "Forwarding register validator request to connected builder" + ); + + // It's a waste of a `BeaconProcessor` worker to just + // wait on a response from the builder (especially since + // they have frequent timeouts). Spawn a new task and + // send the response back to our original HTTP request + // task via a channel. + let builder_future = async move { + let arc_builder = chain + .execution_layer + .as_ref() + .ok_or(BeaconChainError::ExecutionLayerMissing) + .map_err(warp_utils::reject::unhandled_error)? + .builder(); + let builder = arc_builder + .as_ref() + .ok_or(BeaconChainError::BuilderMissing) + .map_err(warp_utils::reject::unhandled_error)?; + builder + .post_builder_validators(&filtered_registration_data) + .await + .map(|resp| warp::reply::json(&resp).into_response()) + .map_err(|e| { + warn!( + num_registrations = filtered_registration_data.len(), + error = ?e, + "Relay error when registering validator(s)" + ); + // Forward the HTTP status code if we are able to, otherwise fall back + // to a server error. + if let eth2::Error::ServerMessage(message) = e { + if message.code == StatusCode::BAD_REQUEST.as_u16() { + return warp_utils::reject::custom_bad_request( + message.message, + ); + } else { + // According to the spec this response should only be a 400 or 500, + // so we fall back to a 500 here. + return warp_utils::reject::custom_server_error( + message.message, + ); + } + } + warp_utils::reject::custom_server_error(format!("{e:?}")) + }) + }; + tokio::task::spawn(async move { tx.send(builder_future.await) }); + + // Just send a generic 200 OK from this closure. We'll + // ignore the `Ok` variant and form a proper response + // from what is sent back down the channel. + Ok(warp::reply::reply().into_response()) + }) + .await; + + if initial_result.is_err() { + return convert_rejection(initial_result).await; + } + + // Await a response from the builder without blocking a + // `BeaconProcessor` worker. + convert_rejection(rx.await.unwrap_or_else(|_| { + Ok(warp::reply::with_status( + warp::reply::json(&"No response from channel"), + warp::http::StatusCode::INTERNAL_SERVER_ERROR, + ) + .into_response()) + })) + .await + }, + ) + .boxed() +} + +// POST validator/prepare_beacon_proposer +pub fn post_validator_prepare_beacon_proposer( + eth_v1: EthV1Filter, + chain_filter: ChainFilter, + network_tx_filter: NetworkTxFilter, + not_while_syncing_filter: NotWhileSyncingFilter, + task_spawner_filter: TaskSpawnerFilter, +) -> ResponseFilter { + eth_v1 + .and(warp::path("validator")) + .and(warp::path("prepare_beacon_proposer")) + .and(warp::path::end()) + .and(not_while_syncing_filter.clone()) + .and(network_tx_filter.clone()) + .and(task_spawner_filter.clone()) + .and(chain_filter.clone()) + .and(warp_utils::json::json()) + .then( + |not_synced_filter: Result<(), Rejection>, + network_tx: UnboundedSender>, + task_spawner: TaskSpawner, + chain: Arc>, + preparation_data: Vec| { + task_spawner.spawn_async_with_rejection(Priority::P0, async move { + not_synced_filter?; + let execution_layer = chain + .execution_layer + .as_ref() + .ok_or(BeaconChainError::ExecutionLayerMissing) + .map_err(warp_utils::reject::unhandled_error)?; + + let current_slot = chain + .slot_clock + .now_or_genesis() + .ok_or(BeaconChainError::UnableToReadSlot) + .map_err(warp_utils::reject::unhandled_error)?; + let current_epoch = current_slot.epoch(T::EthSpec::slots_per_epoch()); + + debug!( + count = preparation_data.len(), + "Received proposer preparation data" + ); + + execution_layer + .update_proposer_preparation( + current_epoch, + preparation_data.iter().map(|data| (data, &None)), + ) + .await; + + chain + .prepare_beacon_proposer(current_slot) + .await + .map_err(|e| { + warp_utils::reject::custom_bad_request(format!( + "error updating proposer preparations: {:?}", + e + )) + })?; + + if chain.spec.is_peer_das_scheduled() { + let (finalized_beacon_state, _, _) = + StateId(CoreStateId::Finalized).state(&chain)?; + let validators_and_balances = preparation_data + .iter() + .filter_map(|preparation| { + if let Ok(effective_balance) = finalized_beacon_state + .get_effective_balance(preparation.validator_index as usize) + { + Some((preparation.validator_index as usize, effective_balance)) + } else { + None + } + }) + .collect::>(); + + let current_slot = + chain.slot().map_err(warp_utils::reject::unhandled_error)?; + if let Some(cgc_change) = chain + .data_availability_checker + .custody_context() + .register_validators(validators_and_balances, current_slot, &chain.spec) + { + chain.update_data_column_custody_info(Some( + cgc_change + .effective_epoch + .start_slot(T::EthSpec::slots_per_epoch()), + )); + + network_tx.send(NetworkMessage::CustodyCountChanged { + new_custody_group_count: cgc_change.new_custody_group_count, + sampling_count: cgc_change.sampling_count, + }).unwrap_or_else(|e| { + debug!(error = %e, "Could not send message to the network service. \ + Likely shutdown") + }); + } + } + + Ok::<_, warp::reject::Rejection>(warp::reply::json(&()).into_response()) + }) + }, + ) + .boxed() +} + +// POST validator/beacon_committee_subscriptions +pub fn post_validator_beacon_committee_subscriptions( + eth_v1: EthV1Filter, + chain_filter: ChainFilter, + validator_subscription_tx_filter: ValidatorSubscriptionTxFilter, + task_spawner_filter: TaskSpawnerFilter, +) -> ResponseFilter { + eth_v1 + .and(warp::path("validator")) + .and(warp::path("beacon_committee_subscriptions")) + .and(warp::path::end()) + .and(warp_utils::json::json()) + .and(validator_subscription_tx_filter.clone()) + .and(task_spawner_filter.clone()) + .and(chain_filter.clone()) + .then( + |committee_subscriptions: Vec, + validator_subscription_tx: Sender, + task_spawner: TaskSpawner, + chain: Arc>| { + task_spawner.blocking_json_task(Priority::P0, move || { + let subscriptions: std::collections::BTreeSet<_> = committee_subscriptions + .iter() + .map(|subscription| { + chain + .validator_monitor + .write() + .auto_register_local_validator(subscription.validator_index); + ValidatorSubscription { + attestation_committee_index: subscription.committee_index, + slot: subscription.slot, + committee_count_at_slot: subscription.committees_at_slot, + is_aggregator: subscription.is_aggregator, + } + }) + .collect(); + + let message = + ValidatorSubscriptionMessage::AttestationSubscribe { subscriptions }; + if let Err(e) = validator_subscription_tx.try_send(message) { + warn!( + info = "the host may be overloaded or resource-constrained", + error = ?e, + "Unable to process committee subscriptions" + ); + return Err(warp_utils::reject::custom_server_error( + "unable to queue subscription, host may be overloaded or shutting down" + .to_string(), + )); + } + Ok(()) + }) + }, + ) + .boxed() +} + +pub fn post_validator_contribution_and_proofs( + eth_v1: EthV1Filter, + chain_filter: ChainFilter, + network_tx_filter: NetworkTxFilter, + not_while_syncing_filter: NotWhileSyncingFilter, + task_spawner_filter: TaskSpawnerFilter, +) -> ResponseFilter { + eth_v1 + .and(warp::path("validator")) + .and(warp::path("contribution_and_proofs")) + .and(warp::path::end()) + .and(not_while_syncing_filter.clone()) + .and(task_spawner_filter.clone()) + .and(chain_filter.clone()) + .and(warp_utils::json::json()) + .and(network_tx_filter.clone()) + .then( + |not_synced_filter: Result<(), Rejection>, + task_spawner: TaskSpawner, + chain: Arc>, + contributions: Vec>, + network_tx: UnboundedSender>| { + task_spawner.blocking_json_task(Priority::P0, move || { + not_synced_filter?; + sync_committees::process_signed_contribution_and_proofs( + contributions, + network_tx, + &chain, + )?; + Ok(GenericResponse::from(())) + }) + }, + ) + .boxed() +} + +// POST validator/aggregate_and_proofs +pub fn post_validator_aggregate_and_proofs( + any_version: AnyVersionFilter, + chain_filter: ChainFilter, + network_tx_filter: NetworkTxFilter, + not_while_syncing_filter: NotWhileSyncingFilter, + task_spawner_filter: TaskSpawnerFilter, +) -> ResponseFilter { + any_version + .and(warp::path("validator")) + .and(warp::path("aggregate_and_proofs")) + .and(warp::path::end()) + .and(not_while_syncing_filter.clone()) + .and(task_spawner_filter.clone()) + .and(chain_filter.clone()) + .and(warp_utils::json::json()) + .and(network_tx_filter.clone()) + .then( + // V1 and V2 are identical except V2 has a consensus version header in the request. + // We only require this header for SSZ deserialization, which isn't supported for + // this endpoint presently. + |_endpoint_version: EndpointVersion, + not_synced_filter: Result<(), Rejection>, + task_spawner: TaskSpawner, + chain: Arc>, + aggregates: Vec>, + network_tx: UnboundedSender>| { + task_spawner.blocking_json_task(Priority::P0, move || { + not_synced_filter?; + let seen_timestamp = timestamp_now(); + let mut verified_aggregates = Vec::with_capacity(aggregates.len()); + let mut messages = Vec::with_capacity(aggregates.len()); + let mut failures = Vec::new(); + + // Verify that all messages in the post are valid before processing further + for (index, aggregate) in aggregates.iter().enumerate() { + match chain.verify_aggregated_attestation_for_gossip(aggregate) { + Ok(verified_aggregate) => { + messages.push(PubsubMessage::AggregateAndProofAttestation(Box::new( + verified_aggregate.aggregate().clone(), + ))); + + // Notify the validator monitor. + chain + .validator_monitor + .read() + .register_api_aggregated_attestation( + seen_timestamp, + verified_aggregate.aggregate(), + verified_aggregate.indexed_attestation(), + &chain.slot_clock, + ); + + verified_aggregates.push((index, verified_aggregate)); + } + // If we already know the attestation, don't broadcast it or attempt to + // further verify it. Return success. + // + // It's reasonably likely that two different validators produce + // identical aggregates, especially if they're using the same beacon + // node. + Err(AttestationError::AttestationSupersetKnown(_)) => continue, + // If we've already seen this aggregator produce an aggregate, just + // skip this one. + // + // We're likely to see this with VCs that use fallback BNs. The first + // BN might time-out *after* publishing the aggregate and then the + // second BN will indicate it's already seen the aggregate. + // + // There's no actual error for the user or the network since the + // aggregate has been successfully published by some other node. + Err(AttestationError::AggregatorAlreadyKnown(_)) => continue, + Err(e) => { + error!( + error = ?e, + request_index = index, + aggregator_index = aggregate.message().aggregator_index(), + attestation_index = aggregate.message().aggregate().committee_index(), + attestation_slot = %aggregate.message().aggregate().data().slot, + "Failure verifying aggregate and proofs" + ); + failures.push(Failure::new(index, format!("Verification: {:?}", e))); + } + } + } + + // Publish aggregate attestations to the libp2p network + if !messages.is_empty() { + publish_network_message(&network_tx, NetworkMessage::Publish { messages })?; + } + + // Import aggregate attestations + for (index, verified_aggregate) in verified_aggregates { + if let Err(e) = chain.apply_attestation_to_fork_choice(&verified_aggregate) { + error!( + error = ?e, + request_index = index, + aggregator_index = verified_aggregate.aggregate().message().aggregator_index(), + attestation_index = verified_aggregate.attestation().committee_index(), + attestation_slot = %verified_aggregate.attestation().data().slot, + "Failure applying verified aggregate attestation to fork choice" + ); + failures.push(Failure::new(index, format!("Fork choice: {:?}", e))); + } + if let Err(e) = chain.add_to_block_inclusion_pool(verified_aggregate) { + warn!( + error = ?e, + request_index = index, + "Could not add verified aggregate attestation to the inclusion pool" + ); + failures.push(Failure::new(index, format!("Op pool: {:?}", e))); + } + } + + if !failures.is_empty() { + Err(warp_utils::reject::indexed_bad_request("error processing aggregate and proofs".to_string(), + failures, + )) + } else { + Ok(()) + } + }) + }, + ).boxed() +} + +// GET validator/duties/proposer/{epoch} +pub fn get_validator_duties_proposer( + eth_v1: EthV1Filter, + chain_filter: ChainFilter, + not_while_syncing_filter: NotWhileSyncingFilter, + task_spawner_filter: TaskSpawnerFilter, +) -> ResponseFilter { + eth_v1 + .and(warp::path("validator")) + .and(warp::path("duties")) + .and(warp::path("proposer")) + .and(warp::path::param::().or_else(|_| async { + Err(warp_utils::reject::custom_bad_request( + "Invalid epoch".to_string(), + )) + })) + .and(warp::path::end()) + .and(not_while_syncing_filter) + .and(task_spawner_filter) + .and(chain_filter) + .then( + |epoch: Epoch, + not_synced_filter: Result<(), Rejection>, + task_spawner: TaskSpawner, + chain: Arc>| { + task_spawner.blocking_json_task(Priority::P0, move || { + not_synced_filter?; + proposer_duties::proposer_duties(epoch, &chain) + }) + }, + ) + .boxed() +}