2 changes: 0 additions & 2 deletions beacon_chain/gossip_processing/block_processor.nim
```diff
@@ -222,13 +222,11 @@ proc verifySidecars(
 
 proc storeSidecars(self: BlockProcessor, sidecarsOpt: Opt[BlobSidecars]) =
   if sidecarsOpt.isSome():
-    debug "Inserting blobs into database", blobs = sidecarsOpt[].len
     for b in sidecarsOpt[]:
       self.consensusManager.dag.db.putBlobSidecar(b[])
 
 proc storeSidecars(self: BlockProcessor, sidecarsOpt: Opt[DataColumnSidecars]) =
   if sidecarsOpt.isSome():
-    debug "Inserting columns into database", columns = sidecarsOpt[].len
     for c in sidecarsOpt[]:
       self.consensusManager.dag.db.putDataColumnSidecar(c[])
```
8 changes: 4 additions & 4 deletions beacon_chain/nimbus_beacon_node.nim
```diff
@@ -9,7 +9,7 @@
 
 import
   system/ansi_c,
-  std/[os, random, terminal, times],
+  std/[os, random, strutils, terminal, times],
   chronos, chronicles,
   metrics, metrics/chronos_httpserver,
   stew/[byteutils, io2],
@@ -1725,7 +1725,7 @@ proc reconstructDataColumns(node: BeaconNode, slot: Slot) =
       if node.dag.db.getDataColumnSidecar(forkyBlck.root, i, colData):
         columns.add(newClone(colData))
         indices.incl(i)
-    debug "Stored data columns", columns = indices.len
+    debug "PeerDAS: Data columns before reconstruction", columns = indices.len
 
     # Make sure the node has obtained 50%+ of all the columns
     if columns.lenu64 < (maxColCount div 2):
@@ -1741,7 +1741,7 @@ proc reconstructDataColumns(node: BeaconNode, slot: Slot) =
     # Reconstruct columns
     let recovered = recover_cells_and_proofs_parallel(
       node.batchVerifier[].taskpool, columns).valueOr:
-        error "Error in data column reconstruction"
+        error "Data column reconstruction incomplete"
         return
     let rowCount = recovered.len
     var reconCounter = 0
```
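The reworded error above is emitted from a `valueOr` block (nim-results): the indented block runs only when the call returns an error, so `recovered` always holds the unwrapped value afterwards. A minimal sketch of that idiom, using a hypothetical `half`/`demo` pair rather than the node's reconstruction call:

```nim
# Minimal sketch of the `valueOr` error-path idiom (assumes nim-results);
# `half` and `demo` are hypothetical, not beacon-node code.
import results

proc half(x: int): Result[int, string] =
  if x mod 2 == 0: ok(x div 2)
  else: err("odd input")

proc demo(x: int): int =
  let h = half(x).valueOr:
    # Runs only on the error path; `error` is injected by `valueOr`.
    echo "recovery failed: ", error
    return -1
  # On the success path `h` is the plain int, already unwrapped.
  h * 10

when isMainModule:
  echo demo(8)   # 40
  echo demo(7)   # prints "recovery failed: odd input", then -1
```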
```diff
@@ -1974,7 +1974,7 @@ proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} =
 
   if (not node.config.peerdasSupernode) and
       (slot.epoch() + 1).start_slot() - slot == 1 and
-      node.quarantine.sidecarless.len == 0 and
+      node.dataColumnQuarantine[].len == 0 and
       node.attachedValidatorBalanceTotal > 0.Gwei:
     # Detect new validator custody at the last slot of every epoch
     node.validatorCustody.detectNewValidatorCustody(slot,
```
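The guard `(slot.epoch() + 1).start_slot() - slot == 1` restricts custody detection to the last slot of each epoch. A minimal sketch of that arithmetic, assuming 32 slots per epoch and plain `uint64` slots as hypothetical stand-ins for the spec's `Slot`/`Epoch` types:

```nim
# Sketch only: shows why "start of next epoch minus current slot == 1"
# picks out the last slot of an epoch. Assumes SLOTS_PER_EPOCH = 32.
const SLOTS_PER_EPOCH = 32'u64

func epoch(slot: uint64): uint64 = slot div SLOTS_PER_EPOCH
func startSlot(epoch: uint64): uint64 = epoch * SLOTS_PER_EPOCH

func isLastSlotOfEpoch(slot: uint64): bool =
  startSlot(epoch(slot) + 1) - slot == 1

when isMainModule:
  assert isLastSlotOfEpoch(31)        # slot 31 closes epoch 0
  assert not isLastSlotOfEpoch(32)    # slot 32 opens epoch 1
  assert isLastSlotOfEpoch(63)        # slot 63 closes epoch 1
```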
39 changes: 30 additions & 9 deletions beacon_chain/spec/peerdas_helpers.nim
```diff
@@ -9,7 +9,7 @@
 
 # Uncategorized helper functions from the spec
 import
-  chronicles, results, taskpools,
+  chronos, chronicles, results, taskpools,
   eth/p2p/discoveryv5/node,
   kzg4844/kzg,
   ssz_serialization/[
@@ -163,30 +163,51 @@ proc recover_cells_and_proofs_parallel*(
 
   for column in dataColumns:
     if not (blobCount == column.column.len):
-      return err ("DataColumns do not have the same length")
+      return err("DataColumns do not have the same length")
 
   # spawn threads for recovery
   var
-    pendingFuts = newSeq[Flowvar[Result[CellsAndProofs, void]]](blobCount)
+    pendingFuts: seq[Flowvar[Result[CellsAndProofs, void]]]
     res = newSeq[CellsAndProofs](blobCount)
-  for blobIdx in 0..<blobCount:
+
+  let startTime = Moment.now()
+  const reconstructionTimeout = 2.seconds
+
+  # ---- Spawn phase with time limit ----
+  for blobIdx in 0 ..< blobCount:
+    let now = Moment.now()
+    if (now - startTime) > reconstructionTimeout:
+      debug "PeerDAS reconstruction timed out while preparing columns",
+        spawned = pendingFuts.len, total = blobCount
+      break # Stop spawning new tasks
+
     var
       cellIndices = newSeq[CellIndex](columnCount)
       cells = newSeq[Cell](columnCount)
     for i in 0 ..< dataColumns.len:
       cellIndices[i] = dataColumns[i][].index
       cells[i] = dataColumns[i][].column[blobIdx]
-    pendingFuts[blobIdx] =
-      tp.spawn recoverCellsAndKzgProofsTask(cellIndices, cells)
+    pendingFuts.add(tp.spawn recoverCellsAndKzgProofsTask(cellIndices, cells))
 
-  # sync threads
-  for i in 0..<blobCount:
+  # ---- Sync phase ----
+  for i in 0 ..< pendingFuts.len:
+    let now = Moment.now()
+    if (now - startTime) > reconstructionTimeout:
+      debug "PeerDAS reconstruction timed out",
+        completed = i, totalSpawned = pendingFuts.len
+      return err("Data column reconstruction timed out")
+
     let futRes = sync pendingFuts[i]
     if futRes.isErr:
       return err("KZG cells and proofs recovery failed")
+
     res[i] = futRes.get
 
+  if pendingFuts.len < blobCount:
+    return err("Data column reconstruction timed out")
+
   ok(res)
 
 
 proc assemble_data_column_sidecars*(
     signed_beacon_block: fulu.SignedBeaconBlock | gloas.SignedBeaconBlock,
     blobs: seq[KzgBlob], cell_proofs: seq[KzgProof]): seq[fulu.DataColumnSidecar] =
```
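The new spawn/sync structure above bounds both phases with a single deadline: the spawn loop stops queueing work once the budget is spent, and the sync loop aborts (with a partial spawn also treated as a timeout) instead of blocking indefinitely on the taskpool. A self-contained sketch of that pattern, assuming the `taskpools` package and the standard-library monotonic clock, with a hypothetical `slowSquare` task standing in for `recoverCellsAndKzgProofsTask`:

```nim
# Sketch of a deadline-bounded spawn/sync loop; not the beacon-node code.
# Assumes the `taskpools` package; `slowSquare` is a hypothetical stand-in
# for the real KZG recovery task.
import std/[monotimes, os, times]
import taskpools

proc slowSquare(x: int): int =
  sleep(100)        # simulate expensive recovery work
  x * x

proc boundedRun(tp: Taskpool, inputs: seq[int], budget: Duration): seq[int] =
  let start = getMonoTime()
  var futs: seq[Flowvar[int]]

  # Spawn phase: stop scheduling new tasks once the budget is spent.
  for x in inputs:
    if getMonoTime() - start > budget:
      break
    futs.add(tp.spawn slowSquare(x))

  # Sync phase: collect results while still under the same budget.
  for i in 0 ..< futs.len:
    if getMonoTime() - start > budget:
      return @[]    # caller treats an empty result as "timed out"
    result.add(sync futs[i])

  # A partial spawn also counts as a timeout.
  if futs.len < inputs.len:
    return @[]

when isMainModule:
  var tp = Taskpool.new()
  echo boundedRun(tp, @[1, 2, 3, 4], initDuration(seconds = 2))
  tp.shutdown()
```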