diff --git a/beacon_chain/sync/sync_queue.nim b/beacon_chain/sync/sync_queue.nim index f99c8468c1..0e59273d12 100644 --- a/beacon_chain/sync/sync_queue.nim +++ b/beacon_chain/sync/sync_queue.nim @@ -780,6 +780,13 @@ proc push*[T]( ## Push successful result to queue ``sq``. mixin updateScore, updateStats, getStats + template findPosition(sq, sr: untyped): SyncPosition = + sq.find(sr).valueOr: + debug "Request is no more relevant", + request = sr, sync_ident = sq.ident, topics = "syncman" + # Request is not in queue anymore, probably reset happened. + return + # This is backpressure handling algorithm, this algorithm is blocking # all pending `push` requests if `request` is not in range. var @@ -787,13 +794,7 @@ proc push*[T]( block: var pos: SyncPosition while true: - pos = sq.find(sr).valueOr: - debug "Request is no more relevant", - request = sr, - sync_ident = sq.ident, - topics = "syncman" - # Request is not in queue anymore, probably reset happened. - return + pos = sq.findPosition(sr) if pos.qindex == 0: # Exiting loop when request is first in queue. @@ -816,20 +817,18 @@ proc push*[T]( await sq.lock.acquire() try: - block: - position = sq.find(sr).valueOr: - # Queue has advanced, the request is no longer relevant. - debug "Request is no more relevant", - request = sr, - sync_ident = sq.ident, - topics = "syncman" - return + position = sq.findPosition(sr) if not(isNil(processingCb)): processingCb() let pres = await sq.process(sr, data, blobs, maybeFinalized) + # We need to update position, because while we waiting for `process()` to + # complete - clearAndWakeup() could be invoked which could clean whole the + # queue (invalidating all the positions). + position = sq.findPosition(sr) + case pres.code of SyncProcessError.Empty: # Empty responses does not affect failures count