Skip to content

Commit 95f678d

Browse files
agnxsh and tersec authored
fusaka devnet 2 branch (#7269)
* some clarifications
* push event topic update for data column sidecar
* prevent inhibiting validator custody
* increase vcus poll interval
* not so cool devnet hack for now
* some validator custody and status v fixes
* few more fixes to event stream and vcus
* fixes
* few more changes
* clarifications regarding blob parameters
* revert reqman hack and remove assertion
* add more logging and reduce getblobs timeout
* cancel blob loop post fulu fork epoch
* rework validator custody counting logic and remove another assertion
* rman hack 2
* clarifications in validator custody logic
* reduce validator custody polling duration
* have validator custody detection and custody backfill on separate loops
* oops
* added extra logging for clarity
* use total attached balance instead of active balance
* some more rework on getBlobsV2
* other fixes
* off vcus for supernodes
* make the BN pass min DA requirements to catch missing blocks
* some fixes to rman
* reword peer filtering and scoring
* revise score
* bump up parallel requests as there are more number of cancellations
* omit minDA criteria
* bump down ll requests for supernodes
* fix beacon block broadcast not using BPO forkdigests (#7285)
* gate status vx
* patch blob schedule
* oops bug
---------
Co-authored-by: tersec <[email protected]>
1 parent 6d364fb commit 95f678d

21 files changed

+294
-289
lines changed

beacon_chain/consensus_object_pools/block_pools_types.nim

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ type
137137

138138
erSlot*: Slot
139139
## Earliest refilled slot is the earliest slot at which excess
140-
## DataColumnSidecar downloading starts, if erSlot = GENESIS_SLOT
140+
## DataColumnSidecar downloading finishes, if erSlot = GENESIS_SLOT
141141
## we can deduce that validator custody is inactive.
142142

143143
validatorMonitor*: ref ValidatorMonitor
@@ -411,14 +411,13 @@ func earliestAvailableSlot*(dag: ChainDAGRef): Slot =
411411
dag.backfill.slot != GENESIS_SLOT:
412412
# When the BN is backfilling, backfill slot is the earliest
413413
# persisted block.
414-
dag.eaSlot = dag.backfill.slot
415414
dag.backfill.slot
416415
elif dag.erSlot != GENESIS_SLOT:
417-
dag.eaSlot = dag.erSlot
416+
# This indicates column filling due to validator custody
417+
# is in progress
418418
dag.erSlot
419419
else:
420420
# When the BN has backfilled, tail moves progressively.
421-
dag.eaSlot = dag.tail.slot
422421
dag.tail.slot
423422

424423
template epoch*(e: EpochRef): Epoch = e.key.epoch

beacon_chain/consensus_object_pools/block_quarantine.nim

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -397,6 +397,14 @@ func popColumnless*(
397397
else:
398398
Opt.none(ForkedSignedBeaconBlock)
399399

400+
func getColumnless*(
401+
quarantine: var Quarantine,
402+
root: Eth2Digest): Opt[ForkedSignedBeaconBlock] =
403+
try:
404+
Opt.some(quarantine.columnless[root])
405+
except KeyError:
406+
Opt.none(ForkedSignedBeaconBlock)
407+
400408
iterator peekBlobless*(quarantine: var Quarantine): ForkedSignedBeaconBlock =
401409
for k, v in quarantine.blobless.mpairs():
402410
yield v

beacon_chain/el/el_manager.nim

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ const
6666
# https://github.com/ethereum/execution-apis/blob/v1.0.0-beta.4/src/engine/shanghai.md#request-2
6767
GETPAYLOAD_TIMEOUT = 1.seconds
6868

69-
GETBLOBS_TIMEOUT = 200.milliseconds
69+
GETBLOBS_TIMEOUT = 100.milliseconds
7070

7171
connectionStateChangeHysteresisThreshold = 15
7272
## How many unsuccessful/successful requests we must see

beacon_chain/gossip_processing/block_processor.nim

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -979,7 +979,7 @@ proc storeBlock(
979979
MsgSource.gossip, quarantined, Opt.none(BlobSidecars),
980980
cres)
981981
else:
982-
discard self.consensusManager.quarantine[].addBlobless(
982+
discard self.consensusManager.quarantine[].addColumnless(
983983
dag.finalizedHead.slot, forkyBlck)
984984
elif typeof(forkyBlck).kind >= ConsensusFork.Deneb and
985985
typeof(forkyBlck).kind < ConsensusFork.Fulu:

beacon_chain/gossip_processing/eth2_processor.nim

Lines changed: 9 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -354,11 +354,10 @@ proc processBlobSidecar*(
354354

355355
proc validateDataColumnSidecarFromEL*(
356356
self: ref Eth2Processor,
357-
block_root: Eth2Digest):
358-
Future[ValidationRes]
357+
block_root: Eth2Digest)
359358
{.async: (raises: [CancelledError]).} =
360359
let elManager = self.blockProcessor[].consensusManager.elManager
361-
if (let o = self.quarantine[].popColumnless(block_root); o.isSome):
360+
if (let o = self.quarantine[].getColumnless(block_root); o.isSome):
362361
let columnless = o.unsafeGet()
363362
withBlck(columnless):
364363
when consensusFork >= ConsensusFork.Fulu:
@@ -387,28 +386,12 @@ proc validateDataColumnSidecarFromEL*(
387386
blobsEl.mapIt(kzg.KzgBlob(bytes: it.blob.data)),
388387
flat_proof)
389388

390-
# Pop out the column sidecars as we have all columns from the EL
391-
discard self.dataColumnQuarantine[].popSidecars(block_root,
392-
forkyBlck)
393-
394-
let end_time = Moment.now()
395-
debug "Time taken to get 100% response from EL and bypass blob gossip validation",
396-
time_taken = end_time - start_time
397-
debug "Pulled blobs from EL, bypassing blob gossip validation",
398-
blobs_from_el = blobsEl.len
399-
self.blockProcessor[].enqueueBlock(
400-
MsgSource.gossip, columnless,
401-
Opt.none(BlobSidecars),
402-
Opt.some(recovered_columns.mapIt(newClone it)))
403-
return ok()
404-
405-
else:
406-
discard self.quarantine[].addColumnless(
407-
self.dag.finalizedHead.slot, forkyBlck)
408-
else:
409-
raiseAssert "Could not have been added as columnless"
410-
else:
411-
return errIgnore ("Could not pull blobs and proofs from EL")
389+
# Send notification to event stream
390+
# and add these columns to column quarantine
391+
for col in recovered_columns:
392+
393+
if col.index in self.dataColumnQuarantine[].custodyColumns:
394+
self.dataColumnQuarantine[].put(block_root, newClone(col))
412395

413396
proc processDataColumnSidecar*(
414397
self: ref Eth2Processor, src: MsgSource,
@@ -417,11 +400,7 @@ proc processDataColumnSidecar*(
417400
template block_header: untyped = dataColumnSidecar.signed_block_header.message
418401
let block_root = hash_tree_root(block_header)
419402

420-
let vEL =
421-
await self.validateDataColumnSidecarFromEL(block_root)
422-
423-
if vEL.isOk():
424-
return vEL
403+
await self.validateDataColumnSidecarFromEL(block_root)
425404

426405
let
427406
wallTime = self.getCurrentBeaconTime()
@@ -446,7 +425,6 @@ proc processDataColumnSidecar*(
446425

447426
debug "Data column validated, putting data column in quarantine"
448427
self.dataColumnQuarantine[].put(block_root, newClone(dataColumnSidecar))
449-
self.dag.db.putDataColumnSidecar(dataColumnSidecar)
450428

451429
if (let o = self.quarantine[].popColumnless(block_root); o.isSome):
452430
let columnless = o.unsafeGet()

beacon_chain/networking/eth2_network.nim

Lines changed: 3 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -2793,45 +2793,10 @@ proc broadcastAggregateAndProof*(
27932793
node.broadcast(topic, proof)
27942794

27952795
proc broadcastBeaconBlock*(
2796-
node: Eth2Node, blck: phase0.SignedBeaconBlock):
2796+
node: Eth2Node, blck: SomeForkySignedBeaconBlock):
27972797
Future[SendResult] {.async: (raises: [CancelledError], raw: true).} =
2798-
let topic = getBeaconBlocksTopic(node.forkDigests.phase0)
2799-
node.broadcast(topic, blck)
2800-
2801-
proc broadcastBeaconBlock*(
2802-
node: Eth2Node, blck: altair.SignedBeaconBlock):
2803-
Future[SendResult] {.async: (raises: [CancelledError], raw: true).} =
2804-
let topic = getBeaconBlocksTopic(node.forkDigests.altair)
2805-
node.broadcast(topic, blck)
2806-
2807-
proc broadcastBeaconBlock*(
2808-
node: Eth2Node, blck: bellatrix.SignedBeaconBlock):
2809-
Future[SendResult] {.async: (raises: [CancelledError], raw: true).} =
2810-
let topic = getBeaconBlocksTopic(node.forkDigests.bellatrix)
2811-
node.broadcast(topic, blck)
2812-
2813-
proc broadcastBeaconBlock*(
2814-
node: Eth2Node, blck: capella.SignedBeaconBlock):
2815-
Future[SendResult] {.async: (raises: [CancelledError], raw: true).} =
2816-
let topic = getBeaconBlocksTopic(node.forkDigests.capella)
2817-
node.broadcast(topic, blck)
2818-
2819-
proc broadcastBeaconBlock*(
2820-
node: Eth2Node, blck: deneb.SignedBeaconBlock):
2821-
Future[SendResult] {.async: (raises: [CancelledError], raw: true).} =
2822-
let topic = getBeaconBlocksTopic(node.forkDigests.deneb)
2823-
node.broadcast(topic, blck)
2824-
2825-
proc broadcastBeaconBlock*(
2826-
node: Eth2Node, blck: electra.SignedBeaconBlock):
2827-
Future[SendResult] {.async: (raises: [CancelledError], raw: true).} =
2828-
let topic = getBeaconBlocksTopic(node.forkDigests.electra)
2829-
node.broadcast(topic, blck)
2830-
2831-
proc broadcastBeaconBlock*(
2832-
node: Eth2Node, blck: fulu.SignedBeaconBlock):
2833-
Future[SendResult] {.async: (raises: [CancelledError], raw: true).} =
2834-
let topic = getBeaconBlocksTopic(node.forkDigests.fulu)
2798+
let topic = getBeaconBlocksTopic(
2799+
node.forkDigestAtEpoch(blck.message.slot.epoch))
28352800
node.broadcast(topic, blck)
28362801

28372802
proc broadcastBlobSidecar*(

beacon_chain/networking/peer_protocol.nim

Lines changed: 116 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ func forkDigestAtEpoch(state: PeerSyncNetworkState,
7676
state.forkDigests[].atEpoch(epoch, state.cfg)
7777

7878
# https://github.com/ethereum/consensus-specs/blob/v1.5.0-beta.0/specs/phase0/p2p-interface.md#status
79-
proc getCurrentStatus(state: PeerSyncNetworkState): StatusMsg =
79+
proc getCurrentStatusV1(state: PeerSyncNetworkState): StatusMsg =
8080
let
8181
dag = state.dag
8282
wallSlot = state.getBeaconTime().slotOrZero
@@ -166,13 +166,15 @@ proc checkStatusMsg(state: PeerSyncNetworkState, status: StatusMsg | StatusMsgV2
166166
# apparently don't use spec ZERO_HASH as of this writing
167167
if not (status.finalizedRoot in [state.genesisBlockRoot, ZERO_HASH]):
168168
return err("peer following different finality")
169-
170169
ok()
171170

172-
proc handleStatus(peer: Peer,
173-
state: PeerSyncNetworkState,
174-
theirStatus: StatusMsg): Future[bool] {.async: (raises: [CancelledError]).}
171+
proc handleStatusV1(peer: Peer,
172+
state: PeerSyncNetworkState,
173+
theirStatus: StatusMsg): Future[bool] {.async: (raises: [CancelledError]).}
175174

175+
proc handleStatusV2(peer: Peer,
176+
state: PeerSyncNetworkState,
177+
theirStatus: StatusMsgV2): Future[bool] {.async: (raises: [CancelledError]).}
176178

177179
{.pop.} # TODO fix p2p macro for raises
178180

@@ -195,27 +197,55 @@ p2pProtocol PeerSync(version = 1,
195197
# need a dedicated flow in libp2p that resolves the race conditions -
196198
# this needs more thinking around the ordering of events and the
197199
# given incoming flag
200+
198201
let
199-
ourStatus = peer.networkState.getCurrentStatus()
200-
theirStatus = await peer.status(ourStatus, timeout = RESP_TIMEOUT_DUR)
201-
ourStatusV2 = peer.networkState.getCurrentStatusV2()
202+
remoteFork = peer.networkState.getBeaconTime().slotOrZero.epoch()
203+
204+
if remoteFork >= peer.networkState.cfg.FULU_FORK_EPOCH:
205+
let
206+
ourStatus = peer.networkState.getCurrentStatusV2()
207+
theirStatus =
208+
await peer.statusV2(ourStatus, timeout = RESP_TIMEOUT_DUR)
209+
210+
if theirStatus.isOk:
211+
discard await peer.handleStatusV2(peer.networkState, theirStatus.get())
212+
peer.updateAgent()
213+
else:
214+
debug "Status response not received in time",
215+
peer, errorKind = theirStatus.error.kind
216+
await peer.disconnect(FaultOrError)
202217

203-
if theirStatus.isOk:
204-
discard await peer.handleStatus(peer.networkState, theirStatus.get())
205-
peer.updateAgent()
206218
else:
207-
debug "Status response not received in time",
208-
peer, errorKind = theirStatus.error.kind
209-
await peer.disconnect(FaultOrError)
210-
211-
proc status(peer: Peer,
212-
theirStatus: StatusMsg,
213-
response: SingleChunkResponse[StatusMsg])
214-
{.async, libp2pProtocol("status", 1).} =
215-
let ourStatus = peer.networkState.getCurrentStatus()
216-
trace "Sending status message", peer = peer, status = ourStatus
219+
let
220+
ourStatus = peer.networkState.getCurrentStatusV1()
221+
theirStatus =
222+
await peer.statusV1(ourStatus, timeout = RESP_TIMEOUT_DUR)
223+
224+
if theirStatus.isOk:
225+
discard await peer.handleStatusV1(peer.networkState, theirStatus.get())
226+
peer.updateAgent()
227+
else:
228+
debug "Status response not received in time",
229+
peer, errorKind = theirStatus.error.kind
230+
await peer.disconnect(FaultOrError)
231+
232+
proc statusV1(peer: Peer,
233+
theirStatus: StatusMsg,
234+
response: SingleChunkResponse[StatusMsg])
235+
{.async, libp2pProtocol("status", 1).} =
236+
let ourStatus = peer.networkState.getCurrentStatusV1()
237+
trace "Sending status (v1)", peer = peer, status = ourStatus
217238
await response.send(ourStatus)
218-
discard await peer.handleStatus(peer.networkState, theirStatus)
239+
discard await peer.handleStatusV1(peer.networkState, theirStatus)
240+
241+
proc statusV2(peer: Peer,
242+
theirStatus: StatusMsgV2,
243+
response: SingleChunkResponse[StatusMsgV2])
244+
{.async, libp2pProtocol("status", 2).} =
245+
let ourStatus = peer.networkState.getCurrentStatusV2()
246+
trace "Sending status (v2)", peer = peer, status = ourStatus
247+
await response.send(ourStatus)
248+
discard await peer.handleStatusV2(peer.networkState, theirStatus)
219249

220250
proc ping(peer: Peer, value: uint64): uint64
221251
{.libp2pProtocol("ping", 1).} =
@@ -251,10 +281,10 @@ proc setStatusV2Msg(peer: Peer, statusMsg: StatusMsgV2) =
251281
peer.state(PeerSync).statusMsgV2 = statusMsg
252282
peer.state(PeerSync).statusLastTime = Moment.now()
253283

254-
proc handleStatus(peer: Peer,
255-
state: PeerSyncNetworkState,
256-
theirStatus: StatusMsg): Future[bool]
257-
{.async: (raises: [CancelledError]).} =
284+
proc handleStatusV1(peer: Peer,
285+
state: PeerSyncNetworkState,
286+
theirStatus: StatusMsg): Future[bool]
287+
{.async: (raises: [CancelledError]).} =
258288
let
259289
res = checkStatusMsg(state, theirStatus)
260290

@@ -271,28 +301,77 @@ proc handleStatus(peer: Peer,
271301
await peer.handlePeer()
272302
true
273303

304+
proc handleStatusV2(peer: Peer,
305+
state: PeerSyncNetworkState,
306+
theirStatus: StatusMsgV2): Future[bool]
307+
{.async: (raises: [CancelledError]).} =
308+
let
309+
res = checkStatusMsg(state, theirStatus)
310+
311+
return if res.isErr():
312+
debug "Irrelevant peer", peer, theirStatus, err = res.error()
313+
await peer.disconnect(IrrelevantNetwork)
314+
false
315+
else:
316+
peer.setStatusV2Msg(theirStatus)
317+
318+
if peer.connectionState == Connecting:
319+
# As soon as we get here it means that we passed handshake successfully. So
320+
# we can add this peer to PeerPool.
321+
await peer.handlePeer()
322+
true
323+
274324
proc updateStatus*(peer: Peer): Future[bool] {.async: (raises: [CancelledError]).} =
275325
## Request `status` of remote peer ``peer``.
276326
let
277327
nstate = peer.networkState(PeerSync)
278-
ourStatus = getCurrentStatus(nstate)
279-
theirStatus =
280-
(await peer.status(ourStatus, timeout = RESP_TIMEOUT_DUR)).valueOr:
281-
return false
282328

283-
await peer.handleStatus(nstate, theirStatus)
329+
if nstate.getBeaconTime().slotOrZero.epoch() >= nstate.cfg.FULU_FORK_EPOCH:
330+
let
331+
ourStatus = getCurrentStatusV2(nstate)
332+
theirStatus =
333+
(await peer.statusV2(ourStatus, timeout = RESP_TIMEOUT_DUR)).valueOr:
334+
return false
335+
336+
await peer.handleStatusV2(nstate, theirStatus)
337+
else:
338+
let
339+
ourStatus = getCurrentStatusV1(nstate)
340+
theirStatus =
341+
(await peer.statusV1(ourStatus, timeout = RESP_TIMEOUT_DUR)).valueOr:
342+
return false
343+
344+
await peer.handleStatusV1(nstate, theirStatus)
284345

285346
proc getHeadRoot*(peer: Peer): Eth2Digest =
286-
## Returns head root for specific peer ``peer``.
287-
peer.state(PeerSync).statusMsg.headRoot
347+
let
348+
state = peer.networkState(PeerSync)
349+
pstate = peer.state(PeerSync)
350+
remoteFork = state.getBeaconTime().slotOrZero.epoch()
351+
if remoteFork >= state.cfg.FULU_FORK_EPOCH:
352+
pstate.statusMsgV2.headRoot
353+
else:
354+
pstate.statusMsg.headRoot
288355

289356
proc getHeadSlot*(peer: Peer): Slot =
290-
## Returns head slot for specific peer ``peer``.
291-
peer.state(PeerSync).statusMsg.headSlot
357+
let
358+
state = peer.networkState(PeerSync)
359+
pstate = peer.state(PeerSync)
360+
remoteFork = state.getBeaconTime().slotOrZero.epoch()
361+
if remoteFork >= state.cfg.FULU_FORK_EPOCH:
362+
pstate.statusMsgV2.headSlot
363+
else:
364+
pstate.statusMsg.headSlot
292365

293366
proc getFinalizedEpoch*(peer: Peer): Epoch =
294-
## Returns head slot for specific peer ``peer``.
295-
peer.state(PeerSync).statusMsg.finalizedEpoch
367+
let
368+
state = peer.networkState(PeerSync)
369+
pstate = peer.state(PeerSync)
370+
remoteFork = state.getBeaconTime().slotOrZero.epoch()
371+
if remoteFork >= state.cfg.FULU_FORK_EPOCH:
372+
pstate.statusMsgV2.finalizedEpoch
373+
else:
374+
pstate.statusMsg.finalizedEpoch
296375

297376
proc getStatusLastTime*(peer: Peer): chronos.Moment =
298377
## Returns head slot for specific peer ``peer``.

beacon_chain/networking/peer_scores.nim

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ const
3535
## Peer's answer to our request is fine.
3636
PeerScoreBadValues* = -1000
3737
## Peer's response contains incorrect data.
38+
PeerScoreBadColumnIntersection* = -2
39+
## Peer custodies irrelevant custody columns
3840
PeerScoreBadResponse* = -1000
3941
## Peer's response is not in requested range.
4042
PeerScoreMissingValues* = -25

0 commit comments

Comments
 (0)