Skip to content

Commit

Permalink
feat: optimistically verify blocks even before all blobs available
Browse files Browse the repository at this point in the history
cleanup pr and add metrics to track

simplify

improvements and type fixes

increase bucket precision

time fixes

improve metrics collection

improve metrics collection

some comments improv

fix the missing writing blobs for blobspromise
  • Loading branch information
g11tech committed Nov 20, 2023
1 parent fa30bcf commit 1c1f667
Show file tree
Hide file tree
Showing 13 changed files with 377 additions and 140 deletions.
15 changes: 3 additions & 12 deletions packages/beacon-node/src/chain/blocks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,7 @@ export async function processBlocks(
}

try {
const {relevantBlocks, dataAvailabilityStatuses, parentSlots, parentBlock} = verifyBlocksSanityChecks(
this,
blocks,
opts
);
const {relevantBlocks, parentSlots, parentBlock} = verifyBlocksSanityChecks(this, blocks, opts);

// No relevant blocks, skip verifyBlocksInEpoch()
if (relevantBlocks.length === 0 || parentBlock === null) {
Expand All @@ -72,13 +68,8 @@ export async function processBlocks(

// Fully verify a block to be imported immediately after. Does not produce any side-effects besides adding intermediate
// states in the state cache through regen.
const {postStates, proposerBalanceDeltas, segmentExecStatus} = await verifyBlocksInEpoch.call(
this,
parentBlock,
relevantBlocks,
dataAvailabilityStatuses,
opts
);
const {postStates, dataAvailabilityStatuses, proposerBalanceDeltas, segmentExecStatus} =
await verifyBlocksInEpoch.call(this, parentBlock, relevantBlocks, opts);

// If segmentExecStatus has lvhForkchoice then, the entire segment should be invalid
// and we need to further propagate
Expand Down
137 changes: 96 additions & 41 deletions packages/beacon-node/src/chain/blocks/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {pruneSetToMax} from "@lodestar/utils";
export enum BlockInputType {
preDeneb = "preDeneb",
postDeneb = "postDeneb",
blobsPromise = "blobsPromise",
}

/** Enum to represent where blocks come from */
Expand All @@ -19,9 +20,13 @@ export enum BlockSource {
byRoot = "req_resp_by_root",
}

type BlobsCache = Map<number, {blobSidecar: deneb.BlobSidecar; blobBytes: Uint8Array | null}>;
type BlockInputBlobs = {blobs: deneb.BlobSidecars; blobsBytes: (Uint8Array | null)[]};

export type BlockInput = {block: allForks.SignedBeaconBlock; source: BlockSource; blockBytes: Uint8Array | null} & (
| {type: BlockInputType.preDeneb}
| {type: BlockInputType.postDeneb; blobs: deneb.BlobSidecars; blobsBytes: (Uint8Array | null)[]}
| ({type: BlockInputType.postDeneb} & BlockInputBlobs)
| {type: BlockInputType.blobsPromise; blobsCache: BlobsCache; availabilityPromise: Promise<BlockInputBlobs>}
);

export function blockRequiresBlobs(config: ChainForkConfig, blockSlot: Slot, clockSlot: Slot): boolean {
Expand All @@ -42,8 +47,10 @@ type GossipedBlockInput =
type BlockInputCacheType = {
block?: allForks.SignedBeaconBlock;
blockBytes?: Uint8Array | null;
blobs: Map<number, deneb.BlobSidecar>;
blobsBytes: Map<number, Uint8Array | null>;
blobsCache: BlobsCache;
// promise and its callback cached for delayed resolution
availabilityPromise: Promise<BlockInputBlobs>;
resolveAvailability: (blobs: BlockInputBlobs) => void;
};

const MAX_GOSSIPINPUT_CACHE = 5;
Expand All @@ -53,13 +60,19 @@ const BLOBSIDECAR_FIXED_SIZE = 131256;
export const getBlockInput = {
blockInputCache: new Map<RootHex, BlockInputCacheType>(),

prune(): void {
pruneSetToMax(this.blockInputCache, MAX_GOSSIPINPUT_CACHE);
},

getGossipBlockInput(
config: ChainForkConfig,
gossipedInput: GossipedBlockInput
):
| {blockInput: BlockInput; blockInputMeta: {pending: null; haveBlobs: number; expectedBlobs: number}}
| {blockInput: null; blockInputMeta: {pending: GossipedInputType.block; haveBlobs: number; expectedBlobs: null}}
| {blockInput: null; blockInputMeta: {pending: GossipedInputType.blob; haveBlobs: number; expectedBlobs: number}} {
| {
blockInput: BlockInput;
blockInputMeta: {pending: GossipedInputType.blob | null; haveBlobs: number; expectedBlobs: number};
}
| {blockInput: null; blockInputMeta: {pending: GossipedInputType.block; haveBlobs: number; expectedBlobs: null}} {
let blockHex;
let blockCache;

Expand All @@ -69,75 +82,66 @@ export const getBlockInput = {
blockHex = toHexString(
config.getForkTypes(signedBlock.message.slot).BeaconBlock.hashTreeRoot(signedBlock.message)
);
blockCache = this.blockInputCache.get(blockHex) ?? {
blobs: new Map<number, deneb.BlobSidecar>(),
blobsBytes: new Map<number, Uint8Array | null>(),
};
blockCache = this.blockInputCache.get(blockHex) ?? getEmptyBlockInputCacheEntry();

blockCache.block = signedBlock;
blockCache.blockBytes = blockBytes;
} else {
const {signedBlob, blobBytes} = gossipedInput;
blockHex = toHexString(signedBlob.message.blockRoot);
blockCache = this.blockInputCache.get(blockHex);

// If a new entry is going to be inserted, prune out old ones
if (blockCache === undefined) {
pruneSetToMax(this.blockInputCache, MAX_GOSSIPINPUT_CACHE);
blockCache = {blobs: new Map<number, deneb.BlobSidecar>(), blobsBytes: new Map<number, Uint8Array | null>()};
}
blockCache = this.blockInputCache.get(blockHex) ?? getEmptyBlockInputCacheEntry();

// TODO: freetheblobs check if its the same blob or a duplicate and throw/take actions
blockCache.blobs.set(signedBlob.message.index, signedBlob.message);
// easily splice out the unsigned message as blob is a fixed length type
blockCache.blobsBytes.set(signedBlob.message.index, blobBytes?.slice(0, BLOBSIDECAR_FIXED_SIZE) ?? null);
blockCache.blobsCache.set(signedBlob.message.index, {
blobSidecar: signedBlob.message,
// easily splice out the unsigned message as blob is a fixed length type
blobBytes: blobBytes?.slice(0, BLOBSIDECAR_FIXED_SIZE) ?? null,
});
}

this.blockInputCache.set(blockHex, blockCache);
const {block: signedBlock, blockBytes} = blockCache;
const {block: signedBlock, blockBytes, blobsCache, availabilityPromise, resolveAvailability} = blockCache;

if (signedBlock !== undefined) {
// block is available, check if all blobs have shown up
const {slot, body} = signedBlock.message;
const {blobKzgCommitments} = body as deneb.BeaconBlockBody;
const blockInfo = `blockHex=${blockHex}, slot=${slot}`;

if (blobKzgCommitments.length < blockCache.blobs.size) {
if (blobKzgCommitments.length < blobsCache.size) {
throw Error(
`Received more blobs=${blockCache.blobs.size} than commitments=${blobKzgCommitments.length} for ${blockInfo}`
`Received more blobs=${blobsCache.size} than commitments=${blobKzgCommitments.length} for ${blockInfo}`
);
}
if (blobKzgCommitments.length === blockCache.blobs.size) {
const blobSidecars = [];
const blobsBytes = [];

for (let index = 0; index < blobKzgCommitments.length; index++) {
const blobSidecar = blockCache.blobs.get(index);
if (blobSidecar === undefined) {
throw Error(`Missing blobSidecar at index=${index} for ${blockInfo}`);
}
blobSidecars.push(blobSidecar);
blobsBytes.push(blockCache.blobsBytes.get(index) ?? null);
}

if (blobKzgCommitments.length === blobsCache.size) {
const allBlobs = getBlockInputBlobs(blobsCache);
resolveAvailability(allBlobs);
const {blobs, blobsBytes} = allBlobs;
return {
// TODO freetheblobs: collate and add serialized data for the postDeneb blockinput
blockInput: getBlockInput.postDeneb(
config,
signedBlock,
BlockSource.gossip,
blobSidecars,
blobs,
blockBytes ?? null,
blobsBytes
),
blockInputMeta: {pending: null, haveBlobs: blockCache.blobs.size, expectedBlobs: blobKzgCommitments.length},
blockInputMeta: {pending: null, haveBlobs: blobs.length, expectedBlobs: blobKzgCommitments.length},
};
} else {
return {
blockInput: null,
blockInput: getBlockInput.blobsPromise(
config,
signedBlock,
BlockSource.gossip,
blobsCache,
blockBytes ?? null,
availabilityPromise
),
blockInputMeta: {
pending: GossipedInputType.blob,
haveBlobs: blockCache.blobs.size,
haveBlobs: blobsCache.size,
expectedBlobs: blobKzgCommitments.length,
},
};
Expand All @@ -146,7 +150,7 @@ export const getBlockInput = {
// will need to wait for the block to showup
return {
blockInput: null,
blockInputMeta: {pending: GossipedInputType.block, haveBlobs: blockCache.blobs.size, expectedBlobs: null},
blockInputMeta: {pending: GossipedInputType.block, haveBlobs: blobsCache.size, expectedBlobs: null},
};
}
},
Expand Down Expand Up @@ -188,8 +192,59 @@ export const getBlockInput = {
blobsBytes,
};
},

blobsPromise(
config: ChainForkConfig,
block: allForks.SignedBeaconBlock,
source: BlockSource,
blobsCache: BlobsCache,
blockBytes: Uint8Array | null,
availabilityPromise: Promise<BlockInputBlobs>
): BlockInput {
if (config.getForkSeq(block.message.slot) < ForkSeq.deneb) {
throw Error(`Pre Deneb block slot ${block.message.slot}`);
}
return {
type: BlockInputType.blobsPromise,
block,
source,
blobsCache,
blockBytes,
availabilityPromise,
};
},
};

function getEmptyBlockInputCacheEntry(): BlockInputCacheType {
// Capture both the promise and its callbacks.
// It is not spec'ed but in tests in Firefox and NodeJS the promise constructor is run immediately
let resolveAvailability: ((blobs: BlockInputBlobs) => void) | null = null;
const availabilityPromise = new Promise<BlockInputBlobs>((resolveCB) => {
resolveAvailability = resolveCB;
});
if (resolveAvailability === null) {
throw Error("Promise Constructor was not executed immediately");
}
const blobsCache = new Map();
return {availabilityPromise, resolveAvailability, blobsCache};
}

function getBlockInputBlobs(blobsCache: BlobsCache): BlockInputBlobs {
const blobs = [];
const blobsBytes = [];

for (let index = 0; index < blobsCache.size; index++) {
const blobCache = blobsCache.get(index);
if (blobCache === undefined) {
throw Error(`Missing blobSidecar at index=${index}`);
}
const {blobSidecar, blobBytes} = blobCache;
blobs.push(blobSidecar);
blobsBytes.push(blobBytes);
}
return {blobs, blobsBytes};
}

export enum AttestationImportOpt {
Skip,
Force,
Expand Down
57 changes: 50 additions & 7 deletions packages/beacon-node/src/chain/blocks/verifyBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,23 @@ import {
isStateValidatorsNodesPopulated,
DataAvailableStatus,
} from "@lodestar/state-transition";
import {bellatrix} from "@lodestar/types";
import {bellatrix, deneb} from "@lodestar/types";
import {ForkName} from "@lodestar/params";
import {ProtoBlock} from "@lodestar/fork-choice";
import {ProtoBlock, ExecutionStatus} from "@lodestar/fork-choice";
import {ChainForkConfig} from "@lodestar/config";
import {Logger} from "@lodestar/utils";
import {BlockError, BlockErrorCode} from "../errors/index.js";
import {BlockProcessOpts} from "../options.js";
import {RegenCaller} from "../regen/index.js";
import type {BeaconChain} from "../chain.js";
import {BlockInput, ImportBlockOpts} from "./types.js";
import {BlockInput, ImportBlockOpts, BlockInputType} from "./types.js";
import {POS_PANDA_MERGE_TRANSITION_BANNER} from "./utils/pandaMergeTransitionBanner.js";
import {CAPELLA_OWL_BANNER} from "./utils/ownBanner.js";
import {DENEB_BLOWFISH_BANNER} from "./utils/blowfishBanner.js";
import {verifyBlocksStateTransitionOnly} from "./verifyBlocksStateTransitionOnly.js";
import {verifyBlocksSignatures} from "./verifyBlocksSignatures.js";
import {verifyBlocksExecutionPayload, SegmentExecStatus} from "./verifyBlocksExecutionPayloads.js";
import {verifyBlocksDataAvailability} from "./verifyBlocksDataAvailability.js";
import {writeBlockInputToDb} from "./writeBlockInputToDb.js";

/**
Expand All @@ -38,12 +39,12 @@ export async function verifyBlocksInEpoch(
this: BeaconChain,
parentBlock: ProtoBlock,
blocksInput: BlockInput[],
dataAvailabilityStatuses: DataAvailableStatus[],
opts: BlockProcessOpts & ImportBlockOpts
): Promise<{
postStates: CachedBeaconStateAllForks[];
proposerBalanceDeltas: number[];
segmentExecStatus: SegmentExecStatus;
dataAvailabilityStatuses: DataAvailableStatus[];
}> {
const blocks = blocksInput.map(({block}) => block);
if (blocks.length === 0) {
Expand Down Expand Up @@ -88,15 +89,23 @@ export async function verifyBlocksInEpoch(

try {
// batch all I/O operations to reduce overhead
const [segmentExecStatus, {postStates, proposerBalanceDeltas}] = await Promise.all([
const [
segmentExecStatus,
{dataAvailabilityStatuses, availableTime},
{postStates, proposerBalanceDeltas, verifyStateTime},
{verifySignaturesTime},
] = await Promise.all([
// Execution payloads
verifyBlocksExecutionPayload(this, parentBlock, blocks, preState0, abortController.signal, opts),
// data availability for the blobs
verifyBlocksDataAvailability(this, blocksInput, opts),
// Run state transition only
// TODO: Ensure it yields to allow flushing to workers and engine API
verifyBlocksStateTransitionOnly(
preState0,
blocksInput,
dataAvailabilityStatuses,
// hack availability for state transition eval as availability is separately determined
blocks.map(() => DataAvailableStatus.available),
this.logger,
this.metrics,
abortController.signal,
Expand Down Expand Up @@ -138,7 +147,41 @@ export async function verifyBlocksInEpoch(
}
}

return {postStates, proposerBalanceDeltas, segmentExecStatus};
if (segmentExecStatus.execAborted === null) {
const {executionStatuses, executionTime} = segmentExecStatus;
if (
blocksInput.length === 1 &&
// gossip blocks have seenTimestampSec
opts.seenTimestampSec !== undefined &&
blocksInput[0].type !== BlockInputType.preDeneb &&
executionStatuses[0] === ExecutionStatus.Valid
) {
// Find the max time when the block was actually verified
const fullyVerifiedTime = Math.max(executionTime, verifyStateTime, verifySignaturesTime);
const recvTofullyVerifedTime = fullyVerifiedTime / 1000 - opts.seenTimestampSec;
this.metrics?.gossipBlock.receivedTofullyVerifedTime.observe(recvTofullyVerifedTime);

const verifiedToBlobsAvailabiltyTime = Math.max(availableTime - fullyVerifiedTime, 0) / 1000;
const numBlobs = (blocksInput[0].block as deneb.SignedBeaconBlock).message.body.blobKzgCommitments.length;

this.metrics?.gossipBlock.verifiedToBlobsAvailabiltyTime.observe({numBlobs}, verifiedToBlobsAvailabiltyTime);
this.logger.verbose("Verified blockInput fully with blobs availability", {
slot: blocksInput[0].block.message.slot,
recvTofullyVerifedTime,
verifiedToBlobsAvailabiltyTime,
type: blocksInput[0].type,
numBlobs,
});
}
}

if (blocksInput[blocksInput.length - 1].type === BlockInputType.blobsPromise) {
// if the last block was unavailable (only last block should be unavailable) then
// record the availability delay
this.metrics?.gossipBlock.elapsedTimeTillProcessed;
}

return {postStates, dataAvailabilityStatuses, proposerBalanceDeltas, segmentExecStatus};
} finally {
abortController.abort();
}
Expand Down
Loading

0 comments on commit 1c1f667

Please sign in to comment.