Skip to content

Commit d60f8be

Browse files
authored
Merge of #5136
2 parents b3774da + 5a55dac commit d60f8be

File tree

1 file changed

+34
-8
lines changed
  • beacon_node/http_api/src

1 file changed

+34
-8
lines changed

beacon_node/http_api/src/lib.rs

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ use warp::http::StatusCode;
9696
use warp::hyper::Body;
9797
use warp::sse::Event;
9898
use warp::Reply;
99-
use warp::{http::Response, Filter};
99+
use warp::{http::Response, Filter, Rejection};
100100
use warp_utils::{query::multi_key_query, uor::UnifyingOrFilter};
101101

102102
const API_PREFIX: &str = "eth";
@@ -453,7 +453,7 @@ pub fn serve<T: BeaconChainTypes>(
453453
warp::any()
454454
.and(network_globals.clone())
455455
.and(chain_filter.clone())
456-
.and_then(
456+
.then(
457457
move |network_globals: Arc<NetworkGlobals<T::EthSpec>>,
458458
chain: Arc<BeaconChain<T>>| async move {
459459
match *network_globals.sync_state.read() {
@@ -488,8 +488,7 @@ pub fn serve<T: BeaconChainTypes>(
488488
)),
489489
}
490490
},
491-
)
492-
.untuple_one();
491+
);
493492

494493
// Create a `warp` filter that provides access to the logger.
495494
let inner_ctx = ctx.clone();
@@ -3058,10 +3057,12 @@ pub fn serve<T: BeaconChainTypes>(
30583057
.and(log_filter.clone())
30593058
.then(
30603059
|epoch: Epoch,
3060+
not_synced_filter: Result<(), Rejection>,
30613061
task_spawner: TaskSpawner<T::EthSpec>,
30623062
chain: Arc<BeaconChain<T>>,
30633063
log: Logger| {
30643064
task_spawner.blocking_json_task(Priority::P0, move || {
3065+
not_synced_filter?;
30653066
proposer_duties::proposer_duties(epoch, &chain, &log)
30663067
})
30673068
},
@@ -3087,6 +3088,7 @@ pub fn serve<T: BeaconChainTypes>(
30873088
|endpoint_version: EndpointVersion,
30883089
slot: Slot,
30893090
accept_header: Option<api_types::Accept>,
3091+
not_synced_filter: Result<(), Rejection>,
30903092
query: api_types::ValidatorBlocksQuery,
30913093
task_spawner: TaskSpawner<T::EthSpec>,
30923094
chain: Arc<BeaconChain<T>>,
@@ -3098,6 +3100,8 @@ pub fn serve<T: BeaconChainTypes>(
30983100
"slot" => slot
30993101
);
31003102

3103+
not_synced_filter?;
3104+
31013105
if endpoint_version == V3 {
31023106
produce_block_v3(accept_header, chain, slot, query).await
31033107
} else {
@@ -3124,11 +3128,13 @@ pub fn serve<T: BeaconChainTypes>(
31243128
.and(chain_filter.clone())
31253129
.then(
31263130
|slot: Slot,
3131+
not_synced_filter: Result<(), Rejection>,
31273132
query: api_types::ValidatorBlocksQuery,
31283133
accept_header: Option<api_types::Accept>,
31293134
task_spawner: TaskSpawner<T::EthSpec>,
31303135
chain: Arc<BeaconChain<T>>| {
31313136
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
3137+
not_synced_filter?;
31323138
produce_blinded_block_v2(EndpointVersion(2), accept_header, chain, slot, query)
31333139
.await
31343140
})
@@ -3146,9 +3152,12 @@ pub fn serve<T: BeaconChainTypes>(
31463152
.and(chain_filter.clone())
31473153
.then(
31483154
|query: api_types::ValidatorAttestationDataQuery,
3155+
not_synced_filter: Result<(), Rejection>,
31493156
task_spawner: TaskSpawner<T::EthSpec>,
31503157
chain: Arc<BeaconChain<T>>| {
31513158
task_spawner.blocking_json_task(Priority::P0, move || {
3159+
not_synced_filter?;
3160+
31523161
let current_slot = chain
31533162
.slot()
31543163
.map_err(warp_utils::reject::beacon_chain_error)?;
@@ -3181,9 +3190,11 @@ pub fn serve<T: BeaconChainTypes>(
31813190
.and(chain_filter.clone())
31823191
.then(
31833192
|query: api_types::ValidatorAggregateAttestationQuery,
3193+
not_synced_filter: Result<(), Rejection>,
31843194
task_spawner: TaskSpawner<T::EthSpec>,
31853195
chain: Arc<BeaconChain<T>>| {
31863196
task_spawner.blocking_json_task(Priority::P0, move || {
3197+
not_synced_filter?;
31873198
chain
31883199
.get_aggregated_attestation_by_slot_and_root(
31893200
query.slot,
@@ -3222,10 +3233,12 @@ pub fn serve<T: BeaconChainTypes>(
32223233
.and(chain_filter.clone())
32233234
.then(
32243235
|epoch: Epoch,
3236+
not_synced_filter: Result<(), Rejection>,
32253237
indices: api_types::ValidatorIndexData,
32263238
task_spawner: TaskSpawner<T::EthSpec>,
32273239
chain: Arc<BeaconChain<T>>| {
32283240
task_spawner.blocking_json_task(Priority::P0, move || {
3241+
not_synced_filter?;
32293242
attester_duties::attester_duties(epoch, &indices.0, &chain)
32303243
})
32313244
},
@@ -3248,10 +3261,12 @@ pub fn serve<T: BeaconChainTypes>(
32483261
.and(chain_filter.clone())
32493262
.then(
32503263
|epoch: Epoch,
3264+
not_synced_filter: Result<(), Rejection>,
32513265
indices: api_types::ValidatorIndexData,
32523266
task_spawner: TaskSpawner<T::EthSpec>,
32533267
chain: Arc<BeaconChain<T>>| {
32543268
task_spawner.blocking_json_task(Priority::P0, move || {
3269+
not_synced_filter?;
32553270
sync_committees::sync_committee_duties(epoch, &indices.0, &chain)
32563271
})
32573272
},
@@ -3268,9 +3283,11 @@ pub fn serve<T: BeaconChainTypes>(
32683283
.and(chain_filter.clone())
32693284
.then(
32703285
|sync_committee_data: SyncContributionData,
3286+
not_synced_filter: Result<(), Rejection>,
32713287
task_spawner: TaskSpawner<T::EthSpec>,
32723288
chain: Arc<BeaconChain<T>>| {
32733289
task_spawner.blocking_json_task(Priority::P0, move || {
3290+
not_synced_filter?;
32743291
chain
32753292
.get_aggregated_sync_committee_contribution(&sync_committee_data)
32763293
.map_err(|e| {
@@ -3301,11 +3318,13 @@ pub fn serve<T: BeaconChainTypes>(
33013318
.and(network_tx_filter.clone())
33023319
.and(log_filter.clone())
33033320
.then(
3304-
|task_spawner: TaskSpawner<T::EthSpec>,
3321+
|not_synced_filter: Result<(), Rejection>,
3322+
task_spawner: TaskSpawner<T::EthSpec>,
33053323
chain: Arc<BeaconChain<T>>,
33063324
aggregates: Vec<SignedAggregateAndProof<T::EthSpec>>,
33073325
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>, log: Logger| {
33083326
task_spawner.blocking_json_task(Priority::P0, move || {
3327+
not_synced_filter?;
33093328
let seen_timestamp = timestamp_now();
33103329
let mut verified_aggregates = Vec::with_capacity(aggregates.len());
33113330
let mut messages = Vec::with_capacity(aggregates.len());
@@ -3414,12 +3433,14 @@ pub fn serve<T: BeaconChainTypes>(
34143433
.and(network_tx_filter)
34153434
.and(log_filter.clone())
34163435
.then(
3417-
|task_spawner: TaskSpawner<T::EthSpec>,
3436+
|not_synced_filter: Result<(), Rejection>,
3437+
task_spawner: TaskSpawner<T::EthSpec>,
34183438
chain: Arc<BeaconChain<T>>,
34193439
contributions: Vec<SignedContributionAndProof<T::EthSpec>>,
34203440
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
34213441
log: Logger| {
34223442
task_spawner.blocking_json_task(Priority::P0, move || {
3443+
not_synced_filter?;
34233444
sync_committees::process_signed_contribution_and_proofs(
34243445
contributions,
34253446
network_tx,
@@ -3494,11 +3515,13 @@ pub fn serve<T: BeaconChainTypes>(
34943515
.and(log_filter.clone())
34953516
.and(warp_utils::json::json())
34963517
.then(
3497-
|task_spawner: TaskSpawner<T::EthSpec>,
3518+
|not_synced_filter: Result<(), Rejection>,
3519+
task_spawner: TaskSpawner<T::EthSpec>,
34983520
chain: Arc<BeaconChain<T>>,
34993521
log: Logger,
35003522
preparation_data: Vec<ProposerPreparationData>| {
35013523
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
3524+
not_synced_filter?;
35023525
let execution_layer = chain
35033526
.execution_layer
35043527
.as_ref()
@@ -4197,8 +4220,11 @@ pub fn serve<T: BeaconChainTypes>(
41974220
.and(task_spawner_filter.clone())
41984221
.and(chain_filter.clone())
41994222
.then(
4200-
|task_spawner: TaskSpawner<T::EthSpec>, chain: Arc<BeaconChain<T>>| {
4223+
|not_synced_filter: Result<(), Rejection>,
4224+
task_spawner: TaskSpawner<T::EthSpec>,
4225+
chain: Arc<BeaconChain<T>>| {
42014226
task_spawner.blocking_json_task(Priority::P1, move || {
4227+
not_synced_filter?;
42024228
chain.store_migrator.process_reconstruction();
42034229
Ok("success")
42044230
})

0 commit comments

Comments
 (0)