diff --git a/packages/beacon-node/src/api/impl/beacon/blocks/index.ts b/packages/beacon-node/src/api/impl/beacon/blocks/index.ts index 110e38113ce7..fdafcd77e68a 100644 --- a/packages/beacon-node/src/api/impl/beacon/blocks/index.ts +++ b/packages/beacon-node/src/api/impl/beacon/blocks/index.ts @@ -23,10 +23,10 @@ import { ImportBlockOpts, BlockInput, BlobsSource, - BlockInputDataBlobs, - BlockInputDataDataColumns, + BlockInputBlobs, + BlockInputDataColumns, DataColumnsSource, - BlockInputData, + BlockInputAvailableData, } from "../../../../chain/blocks/types.js"; import {promiseAllMaybeAsync} from "../../../../util/promises.js"; import {isOptimisticBlock} from "../../../../util/forkChoice.js"; @@ -77,7 +77,7 @@ export function getBeaconBlockApi({ if (isSignedBlockContents(signedBlockOrContents)) { ({signedBlock} = signedBlockOrContents); const fork = config.getForkName(signedBlock.message.slot); - let blockData: BlockInputData; + let blockData: BlockInputAvailableData; if (fork === ForkName.peerdas) { dataColumnSidecars = computeDataColumnSidecars(config, signedBlock, signedBlockOrContents); blockData = { @@ -88,7 +88,7 @@ export function getBeaconBlockApi({ dataColumns: dataColumnSidecars, dataColumnsBytes: dataColumnSidecars.map(() => null), dataColumnsSource: DataColumnsSource.api, - } as BlockInputDataDataColumns; + } as BlockInputDataColumns; blobSidecars = []; } else if (fork === ForkName.deneb) { blobSidecars = computeBlobSidecars(config, signedBlock, signedBlockOrContents); @@ -97,7 +97,7 @@ export function getBeaconBlockApi({ blobs: blobSidecars, blobsSource: BlobsSource.api, blobsBytes: blobSidecars.map(() => null), - } as BlockInputDataBlobs; + } as BlockInputBlobs; dataColumnSidecars = []; } else { throw Error(`Invalid data fork=${fork} for publish`); diff --git a/packages/beacon-node/src/chain/blocks/types.ts b/packages/beacon-node/src/chain/blocks/types.ts index 93ab80766b85..31a8c02ef950 100644 --- a/packages/beacon-node/src/chain/blocks/types.ts +++ b/packages/beacon-node/src/chain/blocks/types.ts @@ -1,8 +1,8 @@ -import {CachedBeaconStateAllForks, computeEpochAtSlot} from "@lodestar/state-transition"; -import {MaybeValidExecutionStatus, DataAvailabilityStatus} from "@lodestar/fork-choice"; -import {deneb, Slot, RootHex, SignedBeaconBlock, peerdas, ColumnIndex} from "@lodestar/types"; -import {ForkSeq, ForkName} from "@lodestar/params"; -import {ChainForkConfig} from "@lodestar/config"; +import type {ChainForkConfig} from "@lodestar/config"; +import type {DataAvailabilityStatus, MaybeValidExecutionStatus} from "@lodestar/fork-choice"; +import {type ForkName, ForkSeq} from "@lodestar/params"; +import {type CachedBeaconStateAllForks, computeEpochAtSlot} from "@lodestar/state-transition"; +import type {ColumnIndex, RootHex, SignedBeaconBlock, Slot, deneb, peerdas} from "@lodestar/types"; export enum BlockInputType { // preData is preDeneb @@ -10,7 +10,7 @@ export enum BlockInputType { // data is out of available window, can be used to sync forward and keep adding to forkchoice outOfRangeData = "outOfRangeData", availableData = "availableData", - dataPromise = "dataPromise", + dataPromise = "data_promise", } /** Enum to represent where blocks come from */ @@ -21,6 +21,25 @@ export enum BlockSource { byRoot = "req_resp_by_root", } +export enum GossipedInputType { + block = "block", + blob = "blob", + dataColumn = "data_column", +} + +interface CachedDataItem { + cacheId: number; +} +interface Availability { + availabilityPromise: Promise; + resolveAvailability: (data: T) => void; +} + +/** + * + * Deneb Blob Format Types + * + */ /** Enum to represent where blobs come from */ export enum BlobsSource { gossip = "gossip", @@ -28,56 +47,84 @@ export enum BlobsSource { byRange = "req_resp_by_range", byRoot = "req_resp_by_root", } +type ForkBlobsInfo = { + fork: ForkName.deneb; +}; +type BlobData = { + blobSidecar: deneb.BlobSidecar; + blobBytes: Uint8Array | null; +}; +export type BlockInputBlobs = ForkBlobsInfo & { + blobs: deneb.BlobSidecars; + blobsBytes: (Uint8Array | null)[]; + blobsSource: BlobsSource; +}; +export type BlobsCacheMap = Map; +type CachedBlobs = CachedDataItem & + ForkBlobsInfo & + Availability & { + blobsCache: BlobsCacheMap; + }; +/** + * + * PeerDAS Column Format Types + * + */ export enum DataColumnsSource { gossip = "gossip", api = "api", byRange = "req_resp_by_range", byRoot = "req_resp_by_root", } - -export enum GossipedInputType { - block = "block", - blob = "blob", - dataColumn = "dataColumn", -} - -export type BlobsCacheMap = Map; -export type DataColumnsCacheMap = Map< - number, - {dataColumnSidecar: peerdas.DataColumnSidecar; dataColumnBytes: Uint8Array | null} ->; - -type ForkBlobsInfo = {fork: ForkName.deneb}; -type BlobsData = {blobs: deneb.BlobSidecars; blobsBytes: (Uint8Array | null)[]; blobsSource: BlobsSource}; -export type BlockInputDataBlobs = ForkBlobsInfo & BlobsData; - -type ForkDataColumnsInfo = {fork: ForkName.peerdas}; -type DataColumnsData = { +type ForkDataColumnsInfo = { + fork: ForkName.peerdas; +}; +type DataColumnData = { + dataColumn: peerdas.DataColumnSidecar; + dataColumnBytes: Uint8Array | null; +}; +export type DataColumnsCacheMap = Map; +export type BlockInputDataColumns = ForkDataColumnsInfo & { // marker of that columns are to be custodied dataColumns: peerdas.DataColumnSidecars; dataColumnsBytes: (Uint8Array | null)[]; dataColumnsSource: DataColumnsSource; }; -export type BlockInputDataDataColumns = ForkDataColumnsInfo & DataColumnsData; -export type BlockInputData = BlockInputDataBlobs | BlockInputDataDataColumns; +type CachedDataColumns = CachedDataItem & + ForkDataColumnsInfo & + Availability & { + dataColumnsCache: DataColumnsCacheMap; + }; -type Availability = {availabilityPromise: Promise; resolveAvailability: (data: T) => void}; -type CachedBlobs = {blobsCache: BlobsCacheMap} & Availability; -type CachedDataColumns = {dataColumnsCache: DataColumnsCacheMap} & Availability; -export type CachedData = {cacheId: number} & ( - | (ForkBlobsInfo & CachedBlobs) - | (ForkDataColumnsInfo & CachedDataColumns) -); +/** + * + * Cross-Fork Data Types + * + */ +export type BlockInputAvailableData = BlockInputBlobs | BlockInputDataColumns; +export type BlockInputCachedData = CachedBlobs | CachedDataColumns; -export type BlockInput = {block: SignedBeaconBlock; source: BlockSource; blockBytes: Uint8Array | null} & ( +export type BlockInput = { + block: SignedBeaconBlock; + source: BlockSource; + blockBytes: Uint8Array | null; +} & ( | {type: BlockInputType.preData | BlockInputType.outOfRangeData} - | ({type: BlockInputType.availableData} & {blockData: BlockInputData}) + | ({type: BlockInputType.availableData} & { + blockData: BlockInputAvailableData; + }) // the blobsSource here is added to BlockInputBlobs when availability is resolved - | ({type: BlockInputType.dataPromise} & {cachedData: CachedData}) + | ({type: BlockInputType.dataPromise} & { + cachedData: BlockInputCachedData; + }) ); -export type NullBlockInput = {block: null; blockRootHex: RootHex; blockInputPromise: Promise} & { - cachedData: CachedData; +export type NullBlockInput = { + block: null; + blockRootHex: RootHex; + blockInputPromise: Promise; +} & { + cachedData: BlockInputCachedData; }; export function blockRequiresBlobs(config: ChainForkConfig, blockSlot: Slot, clockSlot: Slot): boolean { @@ -134,7 +181,7 @@ export const getBlockInput = { block: SignedBeaconBlock, source: BlockSource, blockBytes: Uint8Array | null, - blockData: BlockInputData + blockData: BlockInputAvailableData ): BlockInput { if (config.getForkSeq(block.message.slot) < ForkSeq.deneb) { throw Error(`Pre Deneb block slot ${block.message.slot}`); @@ -153,7 +200,7 @@ export const getBlockInput = { block: SignedBeaconBlock, source: BlockSource, blockBytes: Uint8Array | null, - cachedData: CachedData + cachedData: BlockInputCachedData ): BlockInput { if (config.getForkSeq(block.message.slot) < ForkSeq.deneb) { throw Error(`Pre Deneb block slot ${block.message.slot}`); @@ -168,7 +215,7 @@ export const getBlockInput = { }, }; -export function getBlockInputBlobs(blobsCache: BlobsCacheMap): Omit { +export function getBlockInputBlobs(blobsCache: BlobsCacheMap): Omit { const blobs = []; const blobsBytes = []; @@ -187,7 +234,7 @@ export function getBlockInputBlobs(blobsCache: BlobsCacheMap): Omit { +): Omit { const dataColumns = []; const dataColumnsBytes = []; @@ -197,7 +244,7 @@ export function getBlockInputDataColumns( // check if the index is correct as per the custody columns throw Error(`Missing dataColumnCache at index=${index}`); } - const {dataColumnSidecar, dataColumnBytes} = dataColumnCache; + const {dataColumn: dataColumnSidecar, dataColumnBytes} = dataColumnCache; dataColumns.push(dataColumnSidecar); dataColumnsBytes.push(dataColumnBytes); } diff --git a/packages/beacon-node/src/chain/blocks/verifyBlocksDataAvailability.ts b/packages/beacon-node/src/chain/blocks/verifyBlocksDataAvailability.ts index ac87529a30cd..0b5f771f0e86 100644 --- a/packages/beacon-node/src/chain/blocks/verifyBlocksDataAvailability.ts +++ b/packages/beacon-node/src/chain/blocks/verifyBlocksDataAvailability.ts @@ -14,7 +14,7 @@ import { ImportBlockOpts, BlobSidecarValidation, getBlockInput, - BlockInputData, + BlockInputAvailableData, } from "./types.js"; // we can now wait for full 12 seconds because unavailable block sync will try pulling @@ -105,7 +105,7 @@ async function maybeValidateBlobs( : await raceWithCutoff( chain, blockInput, - blockInput.cachedData.availabilityPromise as Promise + blockInput.cachedData.availabilityPromise as Promise ); if (blockData.fork === ForkName.deneb) { diff --git a/packages/beacon-node/src/chain/seenCache/seenGossipBlockInput.ts b/packages/beacon-node/src/chain/seenCache/seenGossipBlockInput.ts index b26017283dcd..d9b75524d3d1 100644 --- a/packages/beacon-node/src/chain/seenCache/seenGossipBlockInput.ts +++ b/packages/beacon-node/src/chain/seenCache/seenGossipBlockInput.ts @@ -9,14 +9,14 @@ import { NullBlockInput, getBlockInput, BlockSource, - BlockInputDataBlobs, - CachedData, + BlockInputBlobs, + BlockInputCachedData, GossipedInputType, getBlockInputBlobs, BlobsSource, DataColumnsSource, getBlockInputDataColumns, - BlockInputDataDataColumns, + BlockInputDataColumns, } from "../blocks/types.js"; import {Metrics} from "../../metrics/index.js"; import {CustodyConfig} from "../../util/dataColumns.js"; @@ -39,7 +39,7 @@ type BlockInputCacheType = { fork: ForkName; block?: SignedBeaconBlock; blockBytes?: Uint8Array | null; - cachedData?: CachedData; + cachedData?: BlockInputCachedData; // block promise and its callback cached for delayed resolution blockInputPromise: Promise; resolveBlockInput: (blockInput: BlockInput) => void; @@ -134,7 +134,7 @@ export class SeenGossipBlockInput { // TODO: freetheblobs check if its the same blob or a duplicate and throw/take actions blockCache.cachedData?.dataColumnsCache.set(dataColumnSidecar.index, { - dataColumnSidecar, + dataColumn: dataColumnSidecar, // easily splice out the unsigned message as blob is a fixed length type dataColumnBytes: dataColumnBytes?.slice(0, dataColumnBytes.length) ?? null, }); @@ -369,8 +369,8 @@ export function getEmptyBlockInputCacheEntry(fork: ForkName, globalCacheId: numb } if (fork === ForkName.deneb) { - let resolveAvailability: ((blobs: BlockInputDataBlobs) => void) | null = null; - const availabilityPromise = new Promise((resolveCB) => { + let resolveAvailability: ((blobs: BlockInputBlobs) => void) | null = null; + const availabilityPromise = new Promise((resolveCB) => { resolveAvailability = resolveCB; }); @@ -379,7 +379,7 @@ export function getEmptyBlockInputCacheEntry(fork: ForkName, globalCacheId: numb } const blobsCache = new Map(); - const cachedData: CachedData = { + const cachedData: BlockInputCachedData = { fork, blobsCache, availabilityPromise, @@ -388,8 +388,8 @@ export function getEmptyBlockInputCacheEntry(fork: ForkName, globalCacheId: numb }; return {fork, blockInputPromise, resolveBlockInput, cachedData}; } else if (fork === ForkName.peerdas) { - let resolveAvailability: ((blobs: BlockInputDataDataColumns) => void) | null = null; - const availabilityPromise = new Promise((resolveCB) => { + let resolveAvailability: ((blobs: BlockInputDataColumns) => void) | null = null; + const availabilityPromise = new Promise((resolveCB) => { resolveAvailability = resolveCB; }); @@ -398,7 +398,7 @@ export function getEmptyBlockInputCacheEntry(fork: ForkName, globalCacheId: numb } const dataColumnsCache = new Map(); - const cachedData: CachedData = { + const cachedData: BlockInputCachedData = { fork, dataColumnsCache, availabilityPromise, diff --git a/packages/beacon-node/src/network/processor/gossipHandlers.ts b/packages/beacon-node/src/network/processor/gossipHandlers.ts index cfa08b59a7c1..c6f63e44bdf3 100644 --- a/packages/beacon-node/src/network/processor/gossipHandlers.ts +++ b/packages/beacon-node/src/network/processor/gossipHandlers.ts @@ -55,7 +55,7 @@ import { BlobSidecarValidation, BlockInputType, NullBlockInput, - BlockInputData, + BlockInputAvailableData, } from "../../chain/blocks/types.js"; import {sszDeserialize} from "../gossip/topic.js"; import {INetworkCore} from "../core/index.js"; @@ -407,7 +407,7 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler raceWithCutoff( chain, blockSlot, - blockInput.cachedData.availabilityPromise as Promise, + blockInput.cachedData.availabilityPromise as Promise, BLOCK_AVAILABILITY_CUTOFF_MS ).catch((_e) => { chain.logger.debug("Block under processing not yet available, racing with cutoff to add to unknownBlockInput", { @@ -469,7 +469,7 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler await raceWithCutoff( chain, blobSlot, - blockInput.cachedData.availabilityPromise as Promise, + blockInput.cachedData.availabilityPromise as Promise, BLOCK_AVAILABILITY_CUTOFF_MS ).catch((_e) => { chain.logger.debug("Block under processing not yet fully available adding to unknownBlockInput", { @@ -506,7 +506,7 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler await raceWithCutoff( chain, blobSlot, - normalBlockInput.cachedData.availabilityPromise as Promise, + normalBlockInput.cachedData.availabilityPromise as Promise, BLOCK_AVAILABILITY_CUTOFF_MS ).catch((_e) => { chain.logger.debug("Block under processing not yet fully available adding to unknownBlockInput", { @@ -557,7 +557,7 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler await raceWithCutoff( chain, blobSlot, - blockInput.cachedData.availabilityPromise as Promise, + blockInput.cachedData.availabilityPromise as Promise, BLOCK_AVAILABILITY_CUTOFF_MS ).catch((_e) => { chain.logger.debug("Block under processing not yet fully available adding to unknownBlockInput", { @@ -587,7 +587,7 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler await raceWithCutoff( chain, blobSlot, - normalBlockInput.cachedData.availabilityPromise as Promise, + normalBlockInput.cachedData.availabilityPromise as Promise, BLOCK_AVAILABILITY_CUTOFF_MS ).catch((_e) => { chain.logger.debug("Block under processing not yet fully available adding to unknownBlockInput", { diff --git a/packages/beacon-node/src/network/reqresp/beaconBlocksMaybeBlobsByRange.ts b/packages/beacon-node/src/network/reqresp/beaconBlocksMaybeBlobsByRange.ts index 2d9287c2e505..8a5871c7895e 100644 --- a/packages/beacon-node/src/network/reqresp/beaconBlocksMaybeBlobsByRange.ts +++ b/packages/beacon-node/src/network/reqresp/beaconBlocksMaybeBlobsByRange.ts @@ -10,8 +10,8 @@ import { BlockInput, BlockSource, getBlockInput, - BlockInputDataBlobs, - BlockInputDataDataColumns, + BlockInputBlobs, + BlockInputDataColumns, DataColumnsSource, BlockInputType, getBlockInputDataColumns, @@ -198,7 +198,7 @@ export function matchBlockWithBlobs( blobs: blobSidecars, blobsSource, blobsBytes: Array.from({length: blobKzgCommitmentsLen}, () => null), - } as BlockInputDataBlobs; + } as BlockInputBlobs; // TODO DENEB: instead of null, pass payload in bytes blockInputs.push(getBlockInput.availableData(config, block.data, blockSource, block.bytes, blockData)); @@ -293,7 +293,7 @@ export function matchBlockWithDataColumns( dataColumns: [], dataColumnsBytes: [], dataColumnsSource, - } as BlockInputDataDataColumns; + } as BlockInputDataColumns; blockInputs.push(getBlockInput.availableData(config, block.data, blockSource, null, blockData)); } else { // Quick inspect how many blobSidecars was expected @@ -346,7 +346,7 @@ export function matchBlockWithDataColumns( } for (const dataColumnSidecar of dataColumnSidecars) { - cachedData.dataColumnsCache.set(dataColumnSidecar.index, {dataColumnSidecar, dataColumnBytes: null}); + cachedData.dataColumnsCache.set(dataColumnSidecar.index, {dataColumn: dataColumnSidecar, dataColumnBytes: null}); } if (shouldHaveAllData) { @@ -357,7 +357,7 @@ export function matchBlockWithDataColumns( dataColumns, dataColumnsBytes, dataColumnsSource, - } as BlockInputDataDataColumns; + } as BlockInputDataColumns; // TODO DENEB: instead of null, pass payload in bytes blockInputs.push(getBlockInput.availableData(config, block.data, blockSource, block.bytes, blockData)); diff --git a/packages/beacon-node/src/network/reqresp/beaconBlocksMaybeBlobsByRoot.ts b/packages/beacon-node/src/network/reqresp/beaconBlocksMaybeBlobsByRoot.ts index 81fd36adcd0c..6e17b9e42fb9 100644 --- a/packages/beacon-node/src/network/reqresp/beaconBlocksMaybeBlobsByRoot.ts +++ b/packages/beacon-node/src/network/reqresp/beaconBlocksMaybeBlobsByRoot.ts @@ -11,9 +11,9 @@ import { getBlockInput, NullBlockInput, BlobsSource, - BlockInputDataBlobs, + BlockInputBlobs, DataColumnsSource, - BlockInputDataDataColumns, + BlockInputDataColumns, } from "../../chain/blocks/types.js"; import {PeerIdStr} from "../../util/peerId.js"; import {INetwork} from "../interface.js"; @@ -228,7 +228,7 @@ export async function unavailableBeaconBlobsByRoot( if (blobs.length !== blobKzgCommitmentsLen) { throw Error(`Not all blobs fetched missingBlobs=${blobKzgCommitmentsLen - blobs.length}`); } - const blockData = {fork: cachedData.fork, ...allBlobs, blobsSource: BlobsSource.byRoot} as BlockInputDataBlobs; + const blockData = {fork: cachedData.fork, ...allBlobs, blobsSource: BlobsSource.byRoot} as BlockInputBlobs; resolveAvailability(blockData); metrics?.syncUnknownBlock.resolveAvailabilitySource.inc({source: BlockInputAvailabilitySource.UNKNOWN_SYNC}); availableBlockInput = getBlockInput.availableData(config, block, BlockSource.byRoot, blockBytes, blockData); @@ -247,7 +247,7 @@ export async function unavailableBeaconBlobsByRoot( dataColumns: [], dataColumnsBytes: [], dataColumnsSource: DataColumnsSource.gossip, - } as BlockInputDataDataColumns; + } as BlockInputDataColumns; resolveAvailability(blockData); metrics?.syncUnknownBlock.resolveAvailabilitySource.inc({source: BlockInputAvailabilitySource.UNKNOWN_SYNC}); diff --git a/packages/beacon-node/test/e2e/network/onWorker/dataSerialization.test.ts b/packages/beacon-node/test/e2e/network/onWorker/dataSerialization.test.ts index d7204a8560f9..4e377c235b0b 100644 --- a/packages/beacon-node/test/e2e/network/onWorker/dataSerialization.test.ts +++ b/packages/beacon-node/test/e2e/network/onWorker/dataSerialization.test.ts @@ -19,8 +19,8 @@ import { BlockInputType, BlockSource, BlockInput, - BlockInputDataBlobs, - CachedData, + BlockInputBlobs, + BlockInputCachedData, } from "../../../../src/chain/blocks/types.js"; import {ZERO_HASH, ZERO_HASH_HEX} from "../../../../src/constants/constants.js"; import {IteratorEventType} from "../../../../src/util/asyncIterableToEvents.js"; @@ -252,8 +252,8 @@ describe.skip("data serialization through worker boundary", function () { type Resolves> = T extends Promise ? (U extends void ? null : U) : never; function getEmptyBlockInput(): BlockInput { - let resolveAvailability: ((blobs: BlockInputDataBlobs) => void) | null = null; - const availabilityPromise = new Promise((resolveCB) => { + let resolveAvailability: ((blobs: BlockInputBlobs) => void) | null = null; + const availabilityPromise = new Promise((resolveCB) => { resolveAvailability = resolveCB; }); if (resolveAvailability === null) { @@ -261,7 +261,7 @@ function getEmptyBlockInput(): BlockInput { } const blobsCache = new Map(); - const cachedData = {fork: ForkName.deneb, blobsCache, availabilityPromise, resolveAvailability} as CachedData; + const cachedData = {fork: ForkName.deneb, blobsCache, availabilityPromise, resolveAvailability} as BlockInputCachedData; return { type: BlockInputType.dataPromise, block: ssz.deneb.SignedBeaconBlock.defaultValue(),