Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions beacon_node/http_api/src/publish_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,19 @@ async fn post_block_import_logging_and_response<T: BeaconChainTypes>(
seen_timestamp: Duration,
chain: &Arc<BeaconChain<T>>,
) -> Result<Response, Rejection> {
// Notify lookup sync of completing a block import. Necessary if a lookup was created after
// an HTTP block start import but before it completes.
//
// TODO: sync_tx not available in the http_api crate...
sync_tx.send_sync_message(SyncMessage::BlockProcessResult {
block_root,
imported: matches!(
result,
Ok(AvailabilityProcessingStatus::Imported(_))
| Err(BlockError::DuplicateFullyImported(_))
),
});

match result {
// The `DuplicateFullyImported` case here captures the case where the block finishes
// being imported after gossip verification. It could be that it finished imported as a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1006,7 +1006,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
// imported. A block can become imported both after processing a block or blob. If a
// importing a block results in `Imported`, notify. Do not notify of blob errors.
if matches!(result, Ok(AvailabilityProcessingStatus::Imported(_))) {
self.send_sync_message(SyncMessage::GossipBlockProcessResult {
self.send_sync_message(SyncMessage::BlockProcessResult {
block_root,
imported: true,
});
Expand Down Expand Up @@ -1119,7 +1119,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
// imported. A block can become imported both after processing a block or data column. If a
// importing a block results in `Imported`, notify. Do not notify of data column errors.
if matches!(result, Ok(AvailabilityProcessingStatus::Imported(_))) {
self.send_sync_message(SyncMessage::GossipBlockProcessResult {
self.send_sync_message(SyncMessage::BlockProcessResult {
block_root,
imported: true,
});
Expand Down Expand Up @@ -1614,9 +1614,13 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
self.maybe_store_invalid_block(&invalid_block_storage, block_root, &block, e);
}

self.send_sync_message(SyncMessage::GossipBlockProcessResult {
self.send_sync_message(SyncMessage::BlockProcessResult {
block_root,
imported: matches!(result, Ok(AvailabilityProcessingStatus::Imported(_))),
imported: matches!(
result,
Ok(AvailabilityProcessingStatus::Imported(_))
| Err(BlockError::DuplicateFullyImported(_))
),
});
}

Expand Down
6 changes: 3 additions & 3 deletions beacon_node/network/src/network_beacon_processor/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1712,21 +1712,21 @@ async fn test_data_column_import_notifies_sync() {
.await
.expect("should receive sync message");

// Verify we received the expected GossipBlockProcessResult message
// Verify we received the expected BlockProcessResult message
assert_eq!(
sync_messages.len(),
1,
"should receive exactly one sync message"
);
match &sync_messages[0] {
SyncMessage::GossipBlockProcessResult {
SyncMessage::BlockProcessResult {
block_root: msg_block_root,
imported,
} => {
assert_eq!(*msg_block_root, block_root, "block root should match");
assert!(*imported, "block should be marked as imported");
}
other => panic!("expected GossipBlockProcessResult, got {:?}", other),
other => panic!("expected BlockProcessResult, got {:?}", other),
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions beacon_node/network/src/sync/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ pub enum SyncMessage<E: EthSpec> {
},

/// A block from gossip has completed processing,
GossipBlockProcessResult { block_root: Hash256, imported: bool },
BlockProcessResult { block_root: Hash256, imported: bool },
}

/// The type of processing specified for a received block.
Expand Down Expand Up @@ -831,7 +831,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
} => self
.block_lookups
.on_processing_result(process_type, result, &mut self.network),
SyncMessage::GossipBlockProcessResult {
SyncMessage::BlockProcessResult {
block_root,
imported,
} => self.block_lookups.on_external_processing_result(
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/network/src/sync/network_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -839,7 +839,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
// processing.
BlockProcessStatus::NotValidated { .. } => {
// Lookup sync event safety: If the block is currently in the processing cache, we
// are guaranteed to receive a `SyncMessage::GossipBlockProcessResult` that will
// are guaranteed to receive a `SyncMessage::BlockProcessResult` that will
// make progress on this lookup
return Ok(LookupRequestResult::Pending("block in processing cache"));
}
Expand Down
4 changes: 2 additions & 2 deletions beacon_node/network/src/sync/tests/lookups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1123,7 +1123,7 @@ impl TestRig {
.data_availability_checker
.remove_block_on_execution_error(&block_root);

self.send_sync_message(SyncMessage::GossipBlockProcessResult {
self.send_sync_message(SyncMessage::BlockProcessResult {
block_root,
imported: false,
});
Expand All @@ -1137,7 +1137,7 @@ impl TestRig {

self.insert_block_to_da_checker(block);

self.send_sync_message(SyncMessage::GossipBlockProcessResult {
self.send_sync_message(SyncMessage::BlockProcessResult {
block_root,
imported: false,
});
Expand Down
Loading