Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: optimistically verify blocks even before all blobs available #6087

Merged
merged 2 commits into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 7 additions & 14 deletions packages/beacon-node/src/api/impl/beacon/blocks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import {
computeTimeAtSlot,
parseSignedBlindedBlockOrContents,
reconstructFullBlockOrContents,
DataAvailableStatus,
} from "@lodestar/state-transition";
import {SLOTS_PER_HISTORICAL_ROOT} from "@lodestar/params";
import {sleep, toHex} from "@lodestar/utils";
Expand Down Expand Up @@ -121,19 +120,13 @@ export function getBeaconBlockApi({
}

try {
await verifyBlocksInEpoch.call(
chain as BeaconChain,
parentBlock,
[blockForImport],
[DataAvailableStatus.available],
{
...opts,
verifyOnly: true,
skipVerifyBlockSignatures: true,
skipVerifyExecutionPayload: true,
seenTimestampSec,
}
);
await verifyBlocksInEpoch.call(chain as BeaconChain, parentBlock, [blockForImport], {
...opts,
verifyOnly: true,
skipVerifyBlockSignatures: true,
skipVerifyExecutionPayload: true,
seenTimestampSec,
});
} catch (error) {
chain.logger.error("Consensus checks failed while publishing the block", valLogMeta, error as Error);
chain.persistInvalidSszValue(
Expand Down
15 changes: 3 additions & 12 deletions packages/beacon-node/src/chain/blocks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,7 @@ export async function processBlocks(
}

try {
const {relevantBlocks, dataAvailabilityStatuses, parentSlots, parentBlock} = verifyBlocksSanityChecks(
this,
blocks,
opts
);
const {relevantBlocks, parentSlots, parentBlock} = verifyBlocksSanityChecks(this, blocks, opts);

// No relevant blocks, skip verifyBlocksInEpoch()
if (relevantBlocks.length === 0 || parentBlock === null) {
Expand All @@ -72,13 +68,8 @@ export async function processBlocks(

// Fully verify a block to be imported immediately after. Does not produce any side-effects besides adding intermediate
// states in the state cache through regen.
const {postStates, proposerBalanceDeltas, segmentExecStatus} = await verifyBlocksInEpoch.call(
this,
parentBlock,
relevantBlocks,
dataAvailabilityStatuses,
opts
);
const {postStates, dataAvailabilityStatuses, proposerBalanceDeltas, segmentExecStatus} =
await verifyBlocksInEpoch.call(this, parentBlock, relevantBlocks, opts);

// If segmentExecStatus has lvhForkchoice then, the entire segment should be invalid
// and we need to further propagate
Expand Down
155 changes: 33 additions & 122 deletions packages/beacon-node/src/chain/blocks/types.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
import {toHexString} from "@chainsafe/ssz";
import {CachedBeaconStateAllForks, computeEpochAtSlot, DataAvailableStatus} from "@lodestar/state-transition";
import {MaybeValidExecutionStatus} from "@lodestar/fork-choice";
import {allForks, deneb, Slot, RootHex} from "@lodestar/types";
import {allForks, deneb, Slot} from "@lodestar/types";
import {ForkSeq, MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS} from "@lodestar/params";
import {ChainForkConfig} from "@lodestar/config";
import {pruneSetToMax} from "@lodestar/utils";

export enum BlockInputType {
preDeneb = "preDeneb",
postDeneb = "postDeneb",
blobsPromise = "blobsPromise",
}

/** Enum to represent where blocks come from */
Expand All @@ -19,9 +18,18 @@ export enum BlockSource {
byRoot = "req_resp_by_root",
}

export enum GossipedInputType {
block = "block",
blob = "blob",
}

export type BlobsCache = Map<number, {blobSidecar: deneb.BlobSidecar; blobBytes: Uint8Array | null}>;
export type BlockInputBlobs = {blobs: deneb.BlobSidecars; blobsBytes: (Uint8Array | null)[]};

export type BlockInput = {block: allForks.SignedBeaconBlock; source: BlockSource; blockBytes: Uint8Array | null} & (
| {type: BlockInputType.preDeneb}
| {type: BlockInputType.postDeneb; blobs: deneb.BlobSidecars; blobsBytes: (Uint8Array | null)[]}
| ({type: BlockInputType.postDeneb} & BlockInputBlobs)
| {type: BlockInputType.blobsPromise; blobsCache: BlobsCache; availabilityPromise: Promise<BlockInputBlobs>}
);

export function blockRequiresBlobs(config: ChainForkConfig, blockSlot: Slot, clockSlot: Slot): boolean {
Expand All @@ -32,125 +40,7 @@ export function blockRequiresBlobs(config: ChainForkConfig, blockSlot: Slot, clo
);
}

export enum GossipedInputType {
block = "block",
blob = "blob",
}
type GossipedBlockInput =
| {type: GossipedInputType.block; signedBlock: allForks.SignedBeaconBlock; blockBytes: Uint8Array | null}
| {type: GossipedInputType.blob; signedBlob: deneb.SignedBlobSidecar; blobBytes: Uint8Array | null};
type BlockInputCacheType = {
block?: allForks.SignedBeaconBlock;
blockBytes?: Uint8Array | null;
blobs: Map<number, deneb.BlobSidecar>;
blobsBytes: Map<number, Uint8Array | null>;
};

const MAX_GOSSIPINPUT_CACHE = 5;
// ssz.deneb.BlobSidecars.elementType.fixedSize;
const BLOBSIDECAR_FIXED_SIZE = 131256;

export const getBlockInput = {
blockInputCache: new Map<RootHex, BlockInputCacheType>(),

getGossipBlockInput(
config: ChainForkConfig,
gossipedInput: GossipedBlockInput
):
| {blockInput: BlockInput; blockInputMeta: {pending: null; haveBlobs: number; expectedBlobs: number}}
| {blockInput: null; blockInputMeta: {pending: GossipedInputType.block; haveBlobs: number; expectedBlobs: null}}
| {blockInput: null; blockInputMeta: {pending: GossipedInputType.blob; haveBlobs: number; expectedBlobs: number}} {
let blockHex;
let blockCache;

if (gossipedInput.type === GossipedInputType.block) {
const {signedBlock, blockBytes} = gossipedInput;

blockHex = toHexString(
config.getForkTypes(signedBlock.message.slot).BeaconBlock.hashTreeRoot(signedBlock.message)
);
blockCache = this.blockInputCache.get(blockHex) ?? {
blobs: new Map<number, deneb.BlobSidecar>(),
blobsBytes: new Map<number, Uint8Array | null>(),
};

blockCache.block = signedBlock;
blockCache.blockBytes = blockBytes;
} else {
const {signedBlob, blobBytes} = gossipedInput;
blockHex = toHexString(signedBlob.message.blockRoot);
blockCache = this.blockInputCache.get(blockHex);

// If a new entry is going to be inserted, prune out old ones
if (blockCache === undefined) {
pruneSetToMax(this.blockInputCache, MAX_GOSSIPINPUT_CACHE);
blockCache = {blobs: new Map<number, deneb.BlobSidecar>(), blobsBytes: new Map<number, Uint8Array | null>()};
}

// TODO: freetheblobs check if its the same blob or a duplicate and throw/take actions
blockCache.blobs.set(signedBlob.message.index, signedBlob.message);
// easily splice out the unsigned message as blob is a fixed length type
blockCache.blobsBytes.set(signedBlob.message.index, blobBytes?.slice(0, BLOBSIDECAR_FIXED_SIZE) ?? null);
}

this.blockInputCache.set(blockHex, blockCache);
const {block: signedBlock, blockBytes} = blockCache;

if (signedBlock !== undefined) {
// block is available, check if all blobs have shown up
const {slot, body} = signedBlock.message;
const {blobKzgCommitments} = body as deneb.BeaconBlockBody;
const blockInfo = `blockHex=${blockHex}, slot=${slot}`;

if (blobKzgCommitments.length < blockCache.blobs.size) {
throw Error(
`Received more blobs=${blockCache.blobs.size} than commitments=${blobKzgCommitments.length} for ${blockInfo}`
);
}
if (blobKzgCommitments.length === blockCache.blobs.size) {
const blobSidecars = [];
const blobsBytes = [];

for (let index = 0; index < blobKzgCommitments.length; index++) {
const blobSidecar = blockCache.blobs.get(index);
if (blobSidecar === undefined) {
throw Error(`Missing blobSidecar at index=${index} for ${blockInfo}`);
}
blobSidecars.push(blobSidecar);
blobsBytes.push(blockCache.blobsBytes.get(index) ?? null);
}

return {
// TODO freetheblobs: collate and add serialized data for the postDeneb blockinput
blockInput: getBlockInput.postDeneb(
config,
signedBlock,
BlockSource.gossip,
blobSidecars,
blockBytes ?? null,
blobsBytes
),
blockInputMeta: {pending: null, haveBlobs: blockCache.blobs.size, expectedBlobs: blobKzgCommitments.length},
};
} else {
return {
blockInput: null,
blockInputMeta: {
pending: GossipedInputType.blob,
haveBlobs: blockCache.blobs.size,
expectedBlobs: blobKzgCommitments.length,
},
};
}
} else {
// will need to wait for the block to showup
return {
blockInput: null,
blockInputMeta: {pending: GossipedInputType.block, haveBlobs: blockCache.blobs.size, expectedBlobs: null},
};
}
},

preDeneb(
config: ChainForkConfig,
block: allForks.SignedBeaconBlock,
Expand Down Expand Up @@ -188,6 +78,27 @@ export const getBlockInput = {
blobsBytes,
};
},

blobsPromise(
config: ChainForkConfig,
block: allForks.SignedBeaconBlock,
source: BlockSource,
blobsCache: BlobsCache,
blockBytes: Uint8Array | null,
availabilityPromise: Promise<BlockInputBlobs>
): BlockInput {
if (config.getForkSeq(block.message.slot) < ForkSeq.deneb) {
throw Error(`Pre Deneb block slot ${block.message.slot}`);
}
return {
type: BlockInputType.blobsPromise,
block,
source,
blobsCache,
blockBytes,
availabilityPromise,
};
},
};

export enum AttestationImportOpt {
Expand Down
52 changes: 45 additions & 7 deletions packages/beacon-node/src/chain/blocks/verifyBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
isStateValidatorsNodesPopulated,
DataAvailableStatus,
} from "@lodestar/state-transition";
import {bellatrix} from "@lodestar/types";
import {bellatrix, deneb} from "@lodestar/types";
import {ForkName} from "@lodestar/params";
import {ProtoBlock, ExecutionStatus} from "@lodestar/fork-choice";
import {ChainForkConfig} from "@lodestar/config";
Expand All @@ -14,13 +14,14 @@ import {BlockError, BlockErrorCode} from "../errors/index.js";
import {BlockProcessOpts} from "../options.js";
import {RegenCaller} from "../regen/index.js";
import type {BeaconChain} from "../chain.js";
import {BlockInput, ImportBlockOpts} from "./types.js";
import {BlockInput, ImportBlockOpts, BlockInputType} from "./types.js";
import {POS_PANDA_MERGE_TRANSITION_BANNER} from "./utils/pandaMergeTransitionBanner.js";
import {CAPELLA_OWL_BANNER} from "./utils/ownBanner.js";
import {DENEB_BLOWFISH_BANNER} from "./utils/blowfishBanner.js";
import {verifyBlocksStateTransitionOnly} from "./verifyBlocksStateTransitionOnly.js";
import {verifyBlocksSignatures} from "./verifyBlocksSignatures.js";
import {verifyBlocksExecutionPayload, SegmentExecStatus} from "./verifyBlocksExecutionPayloads.js";
import {verifyBlocksDataAvailability} from "./verifyBlocksDataAvailability.js";
import {writeBlockInputToDb} from "./writeBlockInputToDb.js";

/**
Expand All @@ -38,12 +39,12 @@ export async function verifyBlocksInEpoch(
this: BeaconChain,
parentBlock: ProtoBlock,
blocksInput: BlockInput[],
dataAvailabilityStatuses: DataAvailableStatus[],
opts: BlockProcessOpts & ImportBlockOpts
): Promise<{
postStates: CachedBeaconStateAllForks[];
proposerBalanceDeltas: number[];
segmentExecStatus: SegmentExecStatus;
dataAvailabilityStatuses: DataAvailableStatus[];
}> {
const blocks = blocksInput.map(({block}) => block);
if (blocks.length === 0) {
Expand Down Expand Up @@ -88,7 +89,12 @@ export async function verifyBlocksInEpoch(

try {
// batch all I/O operations to reduce overhead
const [segmentExecStatus, {postStates, proposerBalanceDeltas}] = await Promise.all([
const [
segmentExecStatus,
{dataAvailabilityStatuses, availableTime},
{postStates, proposerBalanceDeltas, verifyStateTime},
{verifySignaturesTime},
] = await Promise.all([
// Execution payloads
opts.skipVerifyExecutionPayload !== true
? verifyBlocksExecutionPayload(this, parentBlock, blocks, preState0, abortController.signal, opts)
Expand All @@ -98,12 +104,16 @@ export async function verifyBlocksInEpoch(
mergeBlockFound: null,
} as SegmentExecStatus),

// data availability for the blobs
verifyBlocksDataAvailability(this, blocksInput, opts),

// Run state transition only
// TODO: Ensure it yields to allow flushing to workers and engine API
verifyBlocksStateTransitionOnly(
preState0,
blocksInput,
dataAvailabilityStatuses,
// hack availability for state transition eval as availability is separately determined
blocks.map(() => DataAvailableStatus.available),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should base on block.slot to determine DataAvailableStatus preDeneb vs available?

Copy link
Contributor Author

@g11tech g11tech Dec 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

internally in state transition it doesn't matter, but for correctness sake may be we can do it, although its inconsequential here as availability is actually verified as part of full block verification

this.logger,
this.metrics,
abortController.signal,
Expand All @@ -113,7 +123,7 @@ export async function verifyBlocksInEpoch(
// All signatures at once
opts.skipVerifyBlockSignatures !== true
? verifyBlocksSignatures(this.bls, this.logger, this.metrics, preState0, blocks, opts)
: Promise.resolve(),
: Promise.resolve({verifySignaturesTime: Date.now()}),

// ideally we want to only persist blocks after verifying them however the reality is there are
// rarely invalid blocks we'll batch all I/O operation here to reduce the overhead if there's
Expand Down Expand Up @@ -151,7 +161,35 @@ export async function verifyBlocksInEpoch(
}
}

return {postStates, proposerBalanceDeltas, segmentExecStatus};
if (segmentExecStatus.execAborted === null) {
const {executionStatuses, executionTime} = segmentExecStatus;
if (
blocksInput.length === 1 &&
// gossip blocks have seenTimestampSec
opts.seenTimestampSec !== undefined &&
blocksInput[0].type !== BlockInputType.preDeneb &&
executionStatuses[0] === ExecutionStatus.Valid
) {
// Find the max time when the block was actually verified
const fullyVerifiedTime = Math.max(executionTime, verifyStateTime, verifySignaturesTime);
const recvTofullyVerifedTime = fullyVerifiedTime / 1000 - opts.seenTimestampSec;
this.metrics?.gossipBlock.receivedToFullyVerifiedTime.observe(recvTofullyVerifedTime);

const verifiedToBlobsAvailabiltyTime = Math.max(availableTime - fullyVerifiedTime, 0) / 1000;
const numBlobs = (blocksInput[0].block as deneb.SignedBeaconBlock).message.body.blobKzgCommitments.length;

this.metrics?.gossipBlock.verifiedToBlobsAvailabiltyTime.observe({numBlobs}, verifiedToBlobsAvailabiltyTime);
this.logger.verbose("Verified blockInput fully with blobs availability", {
slot: blocksInput[0].block.message.slot,
recvTofullyVerifedTime,
verifiedToBlobsAvailabiltyTime,
type: blocksInput[0].type,
numBlobs,
});
}
}

return {postStates, dataAvailabilityStatuses, proposerBalanceDeltas, segmentExecStatus};
} finally {
abortController.abort();
}
Expand Down
Loading
Loading