Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 14 additions & 15 deletions beacon_chain/sync/sync_queue.nim
Original file line number Diff line number Diff line change
Expand Up @@ -780,20 +780,21 @@ proc push*[T](
## Push successful result to queue ``sq``.
mixin updateScore, updateStats, getStats

template findPosition(sq, sr: untyped): SyncPosition =
sq.find(sr).valueOr:
debug "Request is no more relevant",
request = sr, sync_ident = sq.ident, topics = "syncman"
# Request is not in queue anymore, probably reset happened.
return

# This is backpressure handling algorithm, this algorithm is blocking
# all pending `push` requests if `request` is not in range.
var
position =
block:
var pos: SyncPosition
while true:
pos = sq.find(sr).valueOr:
debug "Request is no more relevant",
request = sr,
sync_ident = sq.ident,
topics = "syncman"
# Request is not in queue anymore, probably reset happened.
return
pos = sq.findPosition(sr)

if pos.qindex == 0:
# Exiting loop when request is first in queue.
Expand All @@ -816,20 +817,18 @@ proc push*[T](

await sq.lock.acquire()
try:
block:
position = sq.find(sr).valueOr:
# Queue has advanced, the request is no longer relevant.
debug "Request is no more relevant",
request = sr,
sync_ident = sq.ident,
topics = "syncman"
return
position = sq.findPosition(sr)

if not(isNil(processingCb)):
processingCb()

let pres = await sq.process(sr, data, blobs, maybeFinalized)

# We need to update position, because while we waiting for `process()` to
# complete - clearAndWakeup() could be invoked which could clean whole the
# queue (invalidating all the positions).
position = sq.findPosition(sr)

case pres.code
of SyncProcessError.Empty:
# Empty responses does not affect failures count
Expand Down