diff --git a/beacon_chain/gossip_processing/block_processor.nim b/beacon_chain/gossip_processing/block_processor.nim
index a3c004fb90..58a2f305ca 100644
--- a/beacon_chain/gossip_processing/block_processor.nim
+++ b/beacon_chain/gossip_processing/block_processor.nim
@@ -222,13 +222,11 @@ proc verifySidecars(
 
 proc storeSidecars(self: BlockProcessor, sidecarsOpt: Opt[BlobSidecars]) =
   if sidecarsOpt.isSome():
-    debug "Inserting blobs into database", blobs = sidecarsOpt[].len
     for b in sidecarsOpt[]:
       self.consensusManager.dag.db.putBlobSidecar(b[])
 
 proc storeSidecars(self: BlockProcessor, sidecarsOpt: Opt[DataColumnSidecars]) =
   if sidecarsOpt.isSome():
-    debug "Inserting columns into database", columns = sidecarsOpt[].len
     for c in sidecarsOpt[]:
       self.consensusManager.dag.db.putDataColumnSidecar(c[])
 
diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim
index 53b78b5a7e..540e56662e 100644
--- a/beacon_chain/nimbus_beacon_node.nim
+++ b/beacon_chain/nimbus_beacon_node.nim
@@ -9,7 +9,7 @@
 
 import
   system/ansi_c,
-  std/[os, random, terminal, times],
+  std/[os, random, strutils, terminal, times],
   chronos, chronicles, metrics, metrics/chronos_httpserver,
   stew/[byteutils, io2],
@@ -1725,7 +1725,7 @@ proc reconstructDataColumns(node: BeaconNode, slot: Slot) =
       if node.dag.db.getDataColumnSidecar(forkyBlck.root, i, colData):
         columns.add(newClone(colData))
         indices.incl(i)
-    debug "Stored data columns", columns = indices.len
+    debug "PeerDAS: Data columns before reconstruction", columns = indices.len
 
     # Make sure the node has obtained 50%+ of all the columns
     if columns.lenu64 < (maxColCount div 2):
@@ -1741,7 +1741,7 @@ proc reconstructDataColumns(node: BeaconNode, slot: Slot) =
     # Reconstruct columns
     let recovered = recover_cells_and_proofs_parallel(
      node.batchVerifier[].taskpool, columns).valueOr:
-        error "Error in data column reconstruction"
+        error "Data column reconstruction incomplete"
        return
    let rowCount = recovered.len
    var reconCounter = 0
@@ -1974,7 +1974,7 @@ proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} =
 
   if (not node.config.peerdasSupernode) and
       (slot.epoch() + 1).start_slot() - slot == 1 and
-      node.quarantine.sidecarless.len == 0 and
+      node.dataColumnQuarantine[].len == 0 and
       node.attachedValidatorBalanceTotal > 0.Gwei:
     # Detect new validator custody at the last slot of every epoch
     node.validatorCustody.detectNewValidatorCustody(slot,
diff --git a/beacon_chain/spec/peerdas_helpers.nim b/beacon_chain/spec/peerdas_helpers.nim
index ba0738dd2f..cf43fd386d 100644
--- a/beacon_chain/spec/peerdas_helpers.nim
+++ b/beacon_chain/spec/peerdas_helpers.nim
@@ -9,7 +9,7 @@
 # Uncategorized helper functions from the spec
 
 import
-  chronicles, results, taskpools,
+  chronos, chronicles, results, taskpools,
   eth/p2p/discoveryv5/node,
   kzg4844/kzg,
   ssz_serialization/[
@@ -163,30 +163,51 @@ proc recover_cells_and_proofs_parallel*(
 
   for column in dataColumns:
     if not (blobCount == column.column.len):
-      return err ("DataColumns do not have the same length")
+      return err("DataColumns do not have the same length")
 
-  # spawn threads for recovery
   var
-    pendingFuts = newSeq[Flowvar[Result[CellsAndProofs, void]]](blobCount)
+    pendingFuts: seq[Flowvar[Result[CellsAndProofs, void]]]
     res = newSeq[CellsAndProofs](blobCount)
 
-  for blobIdx in 0..<blobCount:
+  let startTime = Moment.now()
+
+  for blobIdx in 0 ..< blobCount:
+    if (Moment.now() - startTime) > reconstructionTimeout:
+      debug "PeerDAS reconstruction timed out while preparing columns",
+        spawned = pendingFuts.len, total = blobCount
+      break # Stop spawning new tasks
     var
       cellIndices = newSeq[CellIndex](columnCount)
       cells = newSeq[Cell](columnCount)
     for i in 0 ..< dataColumns.len:
      cellIndices[i] = dataColumns[i][].index
      cells[i] = dataColumns[i][].column[blobIdx]
-    pendingFuts[blobIdx] =
-      tp.spawn recoverCellsAndKzgProofsTask(cellIndices, cells)
+    pendingFuts.add(tp.spawn recoverCellsAndKzgProofsTask(cellIndices, cells))
+
+  # ---- Sync phase ----
+  for i in 0 ..< pendingFuts.len:
+    let now = Moment.now()
+    if (now - startTime) > reconstructionTimeout:
+      debug "PeerDAS reconstruction timed out",
+        completed = i, totalSpawned = pendingFuts.len
+      return err("Data column reconstruction timed out")
 
-  # sync threads
-  for i in 0..<pendingFuts.len: