Skip to content

Commit 3df4ce8

Browse files
committed
Provide syncer session call back handlers that can be intercepted
1 parent a7ac803 commit 3df4ce8

File tree

9 files changed

+241
-41
lines changed

9 files changed

+241
-41
lines changed

execution_chain/sync/beacon.nim

Lines changed: 66 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,10 @@ import
1515
pkg/stew/[interval_set, sorted_set],
1616
../core/chain,
1717
../networking/p2p,
18-
./beacon/worker/headers/headers_target,
1918
./beacon/[beacon_desc, worker],
19+
./beacon/worker/blocks/[blocks_fetch, blocks_import],
20+
./beacon/worker/headers/[headers_fetch, headers_target],
21+
./beacon/worker/update,
2022
./[sync_sched, wire_protocol]
2123

2224
export
@@ -25,33 +27,62 @@ export
2527
logScope:
2628
topics = "beacon sync"
2729

30+
# ------------------------------------------------------------------------------
31+
# Interceptable handlers
32+
# ------------------------------------------------------------------------------
33+
34+
proc schedDaemonCB(
35+
ctx: BeaconCtxRef;
36+
): Future[Duration]
37+
{.async: (raises: []).} =
38+
return worker.runDaemon(ctx, "RunDaemon") # async/template
39+
40+
proc schedStartCB(buddy: BeaconBuddyRef): bool =
41+
return worker.start(buddy, "RunStart")
42+
43+
proc schedStopCB(buddy: BeaconBuddyRef) =
44+
worker.stop(buddy, "RunStop")
45+
46+
proc schedPoolCB(buddy: BeaconBuddyRef; last: bool; laps: int): bool =
47+
return worker.runPool(buddy, last, laps, "RunPool")
48+
49+
proc schedPeerCB(
50+
buddy: BeaconBuddyRef;
51+
): Future[Duration]
52+
{.async: (raises: []).} =
53+
return worker.runPeer(buddy, "RunPeer") # async/template
54+
55+
proc noOpFn(buddy: BeaconBuddyRef) = discard
56+
proc noOpEx(self: BeaconHandlersSyncRef) = discard
57+
2858
# ------------------------------------------------------------------------------
2959
# Virtual methods/interface, `mixin` functions
3060
# ------------------------------------------------------------------------------
3161

3262
proc runSetup(ctx: BeaconCtxRef): bool =
33-
worker.setup(ctx, "RunSetup")
63+
return worker.setup(ctx, "RunSetup")
3464

3565
proc runRelease(ctx: BeaconCtxRef) =
3666
worker.release(ctx, "RunRelease")
3767

38-
proc runDaemon(ctx: BeaconCtxRef): Future[Duration] {.async: (raises: []).} =
39-
return worker.runDaemon(ctx, "RunDaemon")
40-
4168
proc runTicker(ctx: BeaconCtxRef) =
4269
worker.runTicker(ctx, "RunTicker")
4370

71+
72+
proc runDaemon(ctx: BeaconCtxRef): Future[Duration] {.async: (raises: []).} =
73+
return await ctx.handler.schedDaemon(ctx)
74+
4475
proc runStart(buddy: BeaconBuddyRef): bool =
45-
worker.start(buddy, "RunStart")
76+
return buddy.ctx.handler.schedStart(buddy)
4677

4778
proc runStop(buddy: BeaconBuddyRef) =
48-
worker.stop(buddy, "RunStop")
79+
buddy.ctx.handler.schedStop(buddy)
4980

5081
proc runPool(buddy: BeaconBuddyRef; last: bool; laps: int): bool =
51-
worker.runPool(buddy, last, laps, "RunPool")
82+
return buddy.ctx.handler.schedPool(buddy, last, laps)
5283

5384
proc runPeer(buddy: BeaconBuddyRef): Future[Duration] {.async: (raises: []).} =
54-
return worker.runPeer(buddy, "RunPeer")
85+
return await buddy.ctx.handler.schedPeer(buddy)
5586

5687
# ------------------------------------------------------------------------------
5788
# Public functions
@@ -83,6 +114,25 @@ proc config*(
83114
desc.initSync(ethNode, maxPeers)
84115
desc.ctx.pool.chain = chain
85116

117+
# Set up handlers so they can be overlayed
118+
desc.ctx.pool.handlers = BeaconHandlersSyncRef(
119+
version: 0,
120+
activate: updateActivateCB,
121+
suspend: updateSuspendCB,
122+
schedDaemon: schedDaemonCB,
123+
schedStart: schedStartCB,
124+
schedStop: schedStopCB,
125+
schedPool: schedPoolCB,
126+
schedPeer: schedPeerCB,
127+
getBlockHeaders: getBlockHeadersCB,
128+
syncBlockHeaders: noOpFn,
129+
getBlockBodies: getBlockBodiesCB,
130+
syncBlockBodies: noOpFn,
131+
importBlock: importBlockCB,
132+
syncImportBlock: noOpFn,
133+
startSync: noOpEx,
134+
stopSync: noOpEx)
135+
86136
if not desc.lazyConfigHook.isNil:
87137
desc.lazyConfigHook(desc)
88138
desc.lazyConfigHook = nil
@@ -99,10 +149,16 @@ proc configTarget*(desc: BeaconSyncRef; hex: string; isFinal: bool): bool =
99149

100150
proc start*(desc: BeaconSyncRef): bool =
101151
doAssert not desc.ctx.isNil
102-
desc.startSync()
152+
if desc.startSync():
153+
let w = BeaconHandlersSyncRef(desc.ctx.pool.handlers)
154+
w.startSync(w)
155+
return true
156+
# false
103157

104158
proc stop*(desc: BeaconSyncRef) {.async.} =
105159
doAssert not desc.ctx.isNil
160+
let w = BeaconHandlersSyncRef(desc.ctx.pool.handlers)
161+
w.stopSync(w)
106162
await desc.stopSync()
107163

108164
# ------------------------------------------------------------------------------

execution_chain/sync/beacon/beacon_desc.nim

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,10 @@ type
2525
## Instance descriptor, extends scheduler object
2626
lazyConfigHook*: BeaconSyncConfigHook
2727

28+
BeaconHandlersSyncRef* = ref object of BeaconHandlersRef
29+
## Add start/stop helpers to function list. By default, this functiona
30+
## are no-ops.
31+
startSync*: proc(self: BeaconHandlersSyncRef) {.gcsafe, raises: [].}
32+
stopSync*: proc(self: BeaconHandlersSyncRef) {.gcsafe, raises: [].}
33+
2834
# End

execution_chain/sync/beacon/worker/blocks/blocks_blocks.nim

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,27 @@ import
1717
../../../../networking/p2p,
1818
../../../wire_protocol/types,
1919
../[update, worker_desc],
20-
./[blocks_fetch, blocks_helpers, blocks_import, blocks_unproc]
20+
./[blocks_fetch, blocks_helpers, blocks_unproc]
2121

2222
# ------------------------------------------------------------------------------
2323
# Private helpers
2424
# ------------------------------------------------------------------------------
2525

26+
template importBlock(
27+
buddy: BeaconBuddyRef;
28+
blk: EthBlock;
29+
effPeerID: Hash;
30+
): Result[Duration,BeaconError] =
31+
## Async/template
32+
##
33+
## Wrapper around `importBlock()` handler
34+
##
35+
let
36+
ctx = buddy.ctx
37+
rc = await ctx.handler.importBlock(buddy, blk, effPeerID)
38+
ctx.handler.syncImportBlock(buddy) # debugging, trace, replay
39+
rc
40+
2641
proc getNthHash(ctx: BeaconCtxRef; blocks: seq[EthBlock]; n: int): Hash32 =
2742
ctx.hdrCache.getHash(blocks[n].header.number).valueOr:
2843
return zeroHash32
@@ -201,7 +216,7 @@ template blocksImport*(
201216

202217
for n in 0 ..< blocks.len:
203218
let nBn = blocks[n].header.number
204-
discard (await buddy.importBlock(blocks[n], peerID)).valueOr:
219+
buddy.importBlock(blocks[n], peerID).isOkOr:
205220
if error.excp != ECancelledError:
206221
isError = true
207222

execution_chain/sync/beacon/worker/blocks/blocks_fetch.nim

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,30 @@ import
1818
../worker_desc,
1919
./blocks_helpers
2020

21+
logScope:
22+
topics = "beacon sync"
23+
24+
# ------------------------------------------------------------------------------
25+
# Private helper
26+
# -----------------------------------------------------------------------------
27+
28+
template getBlockBodies(
29+
buddy: BeaconBuddyRef;
30+
req: BlockBodiesRequest;
31+
): Result[FetchBodiesData,BeaconError] =
32+
## Async/template
33+
##
34+
## Wrapper around `getBlockBodies()` handler
35+
##
36+
let rc = await buddy.ctx.handler.getBlockBodies(buddy, req)
37+
buddy.ctx.handler.syncBlockBodies(buddy) # debugging, sync, replay
38+
rc
39+
2140
# ------------------------------------------------------------------------------
22-
# Private helpers
41+
# Public handler
2342
# -----------------------------------------------------------------------------
2443

25-
proc getBlockBodies(
44+
proc getBlockBodiesCB*(
2645
buddy: BeaconBuddyRef;
2746
req: BlockBodiesRequest;
2847
): Future[Result[FetchBodiesData,BeaconError]]
@@ -70,7 +89,7 @@ template fetchBodies*(
7089
trace trEthSendSendingGetBlockBodies,
7190
peer, nReq, bdyErrors=buddy.bdyErrors
7291

73-
let rc = await buddy.getBlockBodies(request)
92+
let rc = buddy.getBlockBodies(request)
7493
var elapsed: Duration
7594
if rc.isOk:
7695
elapsed = rc.value.elapsed

execution_chain/sync/beacon/worker/blocks/blocks_import.nim

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,14 @@ import
1616
../../../wire_protocol,
1717
../worker_desc
1818

19+
logScope:
20+
topics = "beacon sync"
21+
1922
# ------------------------------------------------------------------------------
20-
# Public function
23+
# Public handler
2124
# ------------------------------------------------------------------------------
2225

23-
proc importBlock*(
26+
proc importBlockCB*(
2427
buddy: BeaconBuddyRef;
2528
blk: EthBlock;
2629
effPeerID: Hash;

execution_chain/sync/beacon/worker/headers/headers_fetch.nim

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,30 @@ import
1818
../worker_desc,
1919
./headers_helpers
2020

21+
logScope:
22+
topics = "beacon sync"
23+
2124
# ------------------------------------------------------------------------------
2225
# Private helpers
26+
# -----------------------------------------------------------------------------
27+
28+
template getBlockHeaders(
29+
buddy: BeaconBuddyRef;
30+
req: BlockHeadersRequest;
31+
): Result[FetchHeadersData,BeaconError] =
32+
## Async/template
33+
##
34+
## Wrapper around `getBlockHeaders()` handler
35+
##
36+
let rc = await buddy.ctx.handler.getBlockHeaders(buddy, req)
37+
buddy.ctx.handler.syncBlockHeaders(buddy) # debugging, sync, replay
38+
rc
39+
40+
# ------------------------------------------------------------------------------
41+
# Public handler
2342
# ------------------------------------------------------------------------------
2443

25-
proc getBlockHeaders(
44+
proc getBlockHeadersCB*(
2645
buddy: BeaconBuddyRef;
2746
req: BlockHeadersRequest;
2847
): Future[Result[FetchHeadersData,BeaconError]]
@@ -88,7 +107,7 @@ template fetchHeadersReversed*(
88107
trace trEthSendSendingGetBlockHeaders & " reverse", peer, req=ivReq,
89108
nReq=req.maxResults, hash=topHash.toStr, hdrErrors=buddy.hdrErrors
90109

91-
let rc = await buddy.getBlockHeaders(req)
110+
let rc = buddy.getBlockHeaders(req)
92111
var elapsed: Duration
93112
if rc.isOk:
94113
elapsed = rc.value.elapsed

execution_chain/sync/beacon/worker/start_stop.nim

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import
1414
pkg/[chronicles, chronos, eth/common, metrics],
1515
../../../networking/p2p,
1616
../../wire_protocol,
17-
./[blocks, headers, update, worker_desc]
17+
./[blocks, headers, worker_desc]
1818

1919
type
2020
SyncStateData = tuple
@@ -59,8 +59,8 @@ proc setupServices*(ctx: BeaconCtxRef; info: static[string]) =
5959

6060
# Set up the notifier informing when a new syncer session has started.
6161
ctx.hdrCache.start proc() =
62-
# Activates the syncer. Work will be picked up by peers when available.
63-
ctx.updateActivateSyncer()
62+
# This directive captures `ctx` for calling the activation handler.
63+
ctx.handler.activate(ctx)
6464

6565
# Provide progress info call back handler
6666
ctx.pool.chain.com.beaconSyncerProgress = proc(): SyncStateData =

execution_chain/sync/beacon/worker/update.nim

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -30,23 +30,6 @@ declareGauge nec_sync_head, "" &
3030
# Private functions, state handler helpers
3131
# ------------------------------------------------------------------------------
3232

33-
proc updateSuspendSyncer(ctx: BeaconCtxRef) =
34-
## Clean up sync target buckets, stop syncer activity, and and get ready
35-
## for awaiting a new request from the `CL`.
36-
##
37-
ctx.hdrCache.clear()
38-
39-
ctx.pool.failedPeers.clear()
40-
ctx.pool.seenData = false
41-
42-
ctx.hibernate = true
43-
44-
metrics.set(nec_sync_last_block_imported, 0)
45-
metrics.set(nec_sync_head, 0)
46-
47-
info "Suspending syncer", base=ctx.chain.baseNumber.bnStr,
48-
head=ctx.chain.latestNumber.bnStr, nSyncPeers=ctx.pool.nBuddies
49-
5033
proc commitCollectHeaders(ctx: BeaconCtxRef; info: static[string]): bool =
5134
## Link header chain into `FC` module. Gets ready for block import.
5235
##
@@ -227,7 +210,7 @@ proc updateSyncState*(ctx: BeaconCtxRef; info: static[string]) =
227210

228211
# Final sync scrum layout reached or inconsistent/impossible state
229212
if newState == idle:
230-
ctx.updateSuspendSyncer()
213+
ctx.handler.suspend(ctx)
231214

232215

233216
proc updateLastBlockImported*(ctx: BeaconCtxRef; bn: BlockNumber) =
@@ -238,7 +221,7 @@ proc updateLastBlockImported*(ctx: BeaconCtxRef; bn: BlockNumber) =
238221
# Public functions, call-back handler ready
239222
# ------------------------------------------------------------------------------
240223

241-
proc updateActivateSyncer*(ctx: BeaconCtxRef) =
224+
proc updateActivateCB*(ctx: BeaconCtxRef) =
242225
## If in hibernate mode, accept a cache session and activate syncer
243226
##
244227
if ctx.hibernate and # only in idle mode
@@ -277,6 +260,24 @@ proc updateActivateSyncer*(ctx: BeaconCtxRef) =
277260
head=ctx.chain.latestNumber.bnStr, state=ctx.hdrCache.state,
278261
initTarget=ctx.pool.initTarget.isSome(), nSyncPeers=ctx.pool.nBuddies
279262

263+
264+
proc updateSuspendCB*(ctx: BeaconCtxRef) =
265+
## Clean up sync target buckets, stop syncer activity, and and get ready
266+
## for a new sync request from the `CL`.
267+
##
268+
ctx.hdrCache.clear()
269+
270+
ctx.pool.failedPeers.clear()
271+
ctx.pool.seenData = false
272+
273+
ctx.hibernate = true
274+
275+
metrics.set(nec_sync_last_block_imported, 0)
276+
metrics.set(nec_sync_head, 0)
277+
278+
info "Suspending syncer", base=ctx.chain.baseNumber.bnStr,
279+
head=ctx.chain.latestNumber.bnStr, nSyncPeers=ctx.pool.nBuddies
280+
280281
# ------------------------------------------------------------------------------
281282
# End
282283
# ------------------------------------------------------------------------------

0 commit comments

Comments
 (0)