Skip to content

Commit

Permalink
Speed up altair block processing 2x (#3115)
Browse files Browse the repository at this point in the history
* Speed up altair block processing >2x

Like #3089, this PR drastially speeds up historical REST queries and
other long state replays.

* cache sync committee validator indices
* use ~80mb less memory for validator pubkey mappings
* batch-verify sync aggregate signature (fixes #2985)
* document sync committee hack with head block vs sync message block
* add batch signature verification failure tests

Before:

```
../env.sh nim c -d:release -r ncli_db --db:mainnet_0/db bench --start-slot:-1000
All time are ms
     Average,       StdDev,          Min,          Max,      Samples,         Test
Validation is turned off meaning that no BLS operations are performed
    5830.675,        0.000,     5830.675,     5830.675,            1, Initialize DB
       0.481,        1.878,        0.215,       59.167,          981, Load block from database
    8422.566,        0.000,     8422.566,     8422.566,            1, Load state from database
       6.996,        1.678,        0.042,       14.385,          969, Advance slot, non-epoch
      93.217,        8.318,       84.192,      122.209,           32, Advance slot, epoch
      20.513,       23.665,       11.510,      201.561,          981, Apply block, no slot processing
       0.000,        0.000,        0.000,        0.000,            0, Database load
       0.000,        0.000,        0.000,        0.000,            0, Database store
```

After:

```
    7081.422,        0.000,     7081.422,     7081.422,            1, Initialize DB
       0.553,        2.122,        0.175,       66.692,          981, Load block from database
    5439.446,        0.000,     5439.446,     5439.446,            1, Load state from database
       6.829,        1.575,        0.043,       12.156,          969, Advance slot, non-epoch
      94.716,        2.749,       88.395,      100.026,           32, Advance slot, epoch
      11.636,       23.766,        4.889,      205.250,          981, Apply block, no slot processing
       0.000,        0.000,        0.000,        0.000,            0, Database load
       0.000,        0.000,        0.000,        0.000,            0, Database store
```

* add comment
  • Loading branch information
arnetheduck authored Nov 24, 2021
1 parent 88c623e commit 9c2f43e
Show file tree
Hide file tree
Showing 14 changed files with 252 additions and 88 deletions.
9 changes: 7 additions & 2 deletions AllTests-mainnet.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ OK: 16/16 Fail: 0/16 Skip: 0/16
+ latest_block_root OK
```
OK: 3/3 Fail: 0/3 Skip: 0/3
## Block pool altair processing [Preset: mainnet]
```diff
+ Invalid signatures [Preset: mainnet] OK
```
OK: 1/1 Fail: 0/1 Skip: 0/1
## Block pool processing [Preset: mainnet]
```diff
+ Adding the same block twice returns a Duplicate error [Preset: mainnet] OK
Expand Down Expand Up @@ -165,7 +170,7 @@ OK: 7/7 Fail: 0/7 Skip: 0/7
## Gossip validation [Preset: mainnet]
```diff
+ Any committee index is valid OK
+ Validation sanity OK
+ validateAttestation OK
```
OK: 2/2 Fail: 0/2 Skip: 0/2
## Gossip validation - Extra
Expand Down Expand Up @@ -366,4 +371,4 @@ OK: 1/1 Fail: 0/1 Skip: 0/1
OK: 1/1 Fail: 0/1 Skip: 0/1

---TOTAL---
OK: 206/208 Fail: 0/208 Skip: 2/208
OK: 207/209 Fail: 0/209 Skip: 2/209
6 changes: 6 additions & 0 deletions beacon_chain/consensus_object_pools/block_pools_types.nim
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,12 @@ type
onFinHappened*: OnFinalizedCallback
## On finalization callback

headSyncCommittees*: SyncCommitteeCache ##\
## A cache of the sync committees, as they appear in the head state -
## using the head state is slightly wrong - if a reorg deeper than
## EPOCHS_PER_SYNC_COMMITTEE_PERIOD is happening, some valid sync
## committee messages will be rejected

EpochKey* = object
## The epoch key fully determines the shuffling for proposers and
## committees in a beacon state - the epoch level information in the state
Expand Down
39 changes: 22 additions & 17 deletions beacon_chain/consensus_object_pools/blockchain_dag.nim
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func init*(
finalized_checkpoint: getStateField(state.data, finalized_checkpoint),
shuffled_active_validator_indices:
cache.get_shuffled_active_validator_indices(state.data, epoch)
)
)

for i in 0'u64..<SLOTS_PER_EPOCH:
epochRef.beacon_proposers[i] = get_beacon_proposer_index(
Expand Down Expand Up @@ -548,6 +548,10 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB,
# have a cache
dag.updateValidatorKeys(getStateField(dag.headState.data, validators).asSeq())

withState(dag.headState.data):
when stateFork >= BeaconStateFork.Altair:
dag.headSyncCommittees = state.data.get_sync_committee_cache(cache)

info "Block dag initialized",
head = shortLog(headRef),
finalizedHead = shortLog(dag.finalizedHead),
Expand Down Expand Up @@ -1037,8 +1041,8 @@ proc pruneBlocksDAG(dag: ChainDAGRef) =
dagPruneDur = Moment.now() - startTick

iterator syncSubcommittee*(
syncCommittee: openArray[ValidatorPubKey],
subcommitteeIdx: SyncSubcommitteeIndex): ValidatorPubKey =
syncCommittee: openArray[ValidatorIndex],
subcommitteeIdx: SyncSubcommitteeIndex): ValidatorIndex =
var
i = subcommitteeIdx.asInt * SYNC_SUBCOMMITTEE_SIZE
onePastEndIdx = min(syncCommittee.len, i + SYNC_SUBCOMMITTEE_SIZE)
Expand All @@ -1060,36 +1064,33 @@ iterator syncSubcommitteePairs*(
inc i

func syncCommitteeParticipants*(dag: ChainDAGRef,
slot: Slot): seq[ValidatorPubKey] =
slot: Slot): seq[ValidatorIndex] =
withState(dag.headState.data):
when stateFork >= BeaconStateFork.Altair:
let
period = sync_committee_period(slot)
curPeriod = sync_committee_period(state.data.slot)

if period == curPeriod:
@(state.data.current_sync_committee.pubkeys.data)
@(dag.headSyncCommittees.current_sync_committee)
elif period == curPeriod + 1:
@(state.data.current_sync_committee.pubkeys.data)
@(dag.headSyncCommittees.next_sync_committee)
else: @[]
else:
@[]

func getSubcommitteePositionsAux(
dag: ChainDAGRef,
syncCommittee: openarray[ValidatorPubKey],
syncCommittee: openArray[ValidatorIndex],
subcommitteeIdx: SyncSubcommitteeIndex,
validatorIdx: uint64): seq[uint64] =
# TODO Can we avoid the key conversions by getting a compressed key
# out of ImmutableValidatorData2? If we had this, we can define
# the function `dag.validatorKeyBytes` and use it here.
let validatorKey = dag.validatorKey(validatorIdx)
if validatorKey.isNone():
return @[]
let validatorPubKey = validatorKey.get().toPubKey

for pos, key in toSeq(syncCommittee.syncSubcommittee(subcommitteeIdx)):
if validatorPubKey == key:
if validatorIdx == uint64(key):
result.add uint64(pos)

func getSubcommitteePositions*(dag: ChainDAGRef,
Expand All @@ -1102,31 +1103,31 @@ func getSubcommitteePositions*(dag: ChainDAGRef,
period = sync_committee_period(slot)
curPeriod = sync_committee_period(state.data.slot)

template search(syncCommittee: openarray[ValidatorPubKey]): seq[uint64] =
template search(syncCommittee: openArray[ValidatorIndex]): seq[uint64] =
dag.getSubcommitteePositionsAux(
syncCommittee, subcommitteeIdx, validatorIdx)

if period == curPeriod:
search(state.data.current_sync_committee.pubkeys.data)
search(dag.headSyncCommittees.current_sync_committee)
elif period == curPeriod + 1:
search(state.data.current_sync_committee.pubkeys.data)
search(dag.headSyncCommittees.next_sync_committee)
else: @[]
else:
@[]

template syncCommitteeParticipants*(
dag: ChainDAGRef,
slot: Slot,
subcommitteeIdx: SyncSubcommitteeIndex): seq[ValidatorPubKey] =
subcommitteeIdx: SyncSubcommitteeIndex): seq[ValidatorIndex] =
toSeq(syncSubcommittee(dag.syncCommitteeParticipants(slot), subcommitteeIdx))

iterator syncCommitteeParticipants*(
dag: ChainDAGRef,
slot: Slot,
subcommitteeIdx: SyncSubcommitteeIndex,
aggregationBits: SyncCommitteeAggregationBits): ValidatorPubKey =
aggregationBits: SyncCommitteeAggregationBits): ValidatorIndex =
for pos, valIdx in pairs(dag.syncCommitteeParticipants(slot, subcommitteeIdx)):
if aggregationBits[pos]:
if pos < aggregationBits.bits and aggregationBits[pos]:
yield valIdx

func needStateCachesAndForkChoicePruning*(dag: ChainDAGRef): bool =
Expand Down Expand Up @@ -1207,6 +1208,10 @@ proc updateHead*(

dag.db.putHeadBlock(newHead.root)

withState(dag.headState.data):
when stateFork >= BeaconStateFork.Altair:
dag.headSyncCommittees = state.data.get_sync_committee_cache(cache)

let
finalizedHead = newHead.atEpochStart(
getStateField(dag.headState.data, finalized_checkpoint).epoch)
Expand Down
16 changes: 11 additions & 5 deletions beacon_chain/gossip_processing/gossip_validation.nim
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,7 @@ proc validateSyncCommitteeMessage*(

ok((positionsInSubcommittee, cookedSignature.get()))

# https://github.com/ethereum/eth2.0-specs/blob/v1.1.0-alpha.8/specs/altair/p2p-interface.md#sync_committee_contribution_and_proof
# https://github.com/ethereum/eth2.0-specs/blob/v1.1.5/specs/altair/p2p-interface.md#sync_committee_contribution_and_proof
proc validateSignedContributionAndProof*(
dag: ChainDAGRef,
syncCommitteeMsgPool: var SyncCommitteeMsgPool,
Expand Down Expand Up @@ -855,16 +855,22 @@ proc validateSignedContributionAndProof*(
initialized = false
syncCommitteeSlot = msg.message.contribution.slot + 1

for validatorPubKey in dag.syncCommitteeParticipants(
for validatorIndex in dag.syncCommitteeParticipants(
syncCommitteeSlot,
committeeIdx,
msg.message.contribution.aggregation_bits):
let validatorPubKey = validatorPubKey.loadWithCache.get
let validatorPubKey = dag.validatorKey(validatorIndex)
if not validatorPubKey.isSome():
# This should never happen (!)
warn "Invalid validator index in committee cache",
validatorIndex
return errIgnore("SignedContributionAndProof: Invalid committee cache")

if not initialized:
initialized = true
committeeAggKey.init(validatorPubKey)
committeeAggKey.init(validatorPubKey.get())
else:
committeeAggKey.aggregate(validatorPubKey)
committeeAggKey.aggregate(validatorPubKey.get())

if not initialized:
# [REJECT] The contribution has participants
Expand Down
44 changes: 40 additions & 4 deletions beacon_chain/spec/beaconstate.nim
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ proc initialize_beacon_state_from_eth1*(
deposits.len)
state.eth1_deposit_index = deposits.lenu64

var pubkeyToIndex = initTable[ValidatorPubKey, int]()
var pubkeyToIndex = initTable[ValidatorPubKey, ValidatorIndex]()
for idx, deposit in deposits:
let
pubkey = deposit.pubkey
Expand All @@ -249,7 +249,7 @@ proc initialize_beacon_state_from_eth1*(
do:
if skipBlsValidation in flags or
verify_deposit_signature(cfg, deposit):
pubkeyToIndex[pubkey] = state.validators.len
pubkeyToIndex[pubkey] = ValidatorIndex(state.validators.len)
if not state.validators.add(get_validator_from_deposit(deposit)):
raiseAssert "too many validators"
if not state.balances.add(amount):
Expand Down Expand Up @@ -707,6 +707,9 @@ func get_next_sync_committee_keys(state: altair.BeaconState | merge.BeaconState)
## Return the sequence of sync committee indices (which may include
## duplicate indices) for the next sync committee, given a ``state`` at a
## sync committee period boundary.
# The sync committe depends on seed and effective balance - it can
# thus only be computed for the current epoch of the state, after balance
# updates have been performed

let epoch = get_current_epoch(state) + 1

Expand Down Expand Up @@ -744,9 +747,9 @@ proc get_next_sync_committee*(state: altair.BeaconState | merge.BeaconState):
# see signatures_batch, TODO shouldn't be here
# Deposit processing ensures all keys are valid
var attestersAgg: AggregatePublicKey
attestersAgg.init(res.pubkeys.data[0].loadWithCache().get)
attestersAgg.init(res.pubkeys.data[0].load().get)
for i in 1 ..< res.pubkeys.data.len:
attestersAgg.aggregate(res.pubkeys.data[i].loadWithCache().get)
attestersAgg.aggregate(res.pubkeys.data[i].load().get)

res.aggregate_pubkey = finish(attestersAgg).toPubKey()
res
Expand Down Expand Up @@ -922,3 +925,36 @@ func latest_block_root*(state: ForkyBeaconState, state_root: Eth2Digest): Eth2Di

func latest_block_root*(state: ForkyHashedBeaconState): Eth2Digest =
latest_block_root(state.data, state.root)

func get_sync_committee_cache*(
state: altair.BeaconState | merge.BeaconState, cache: var StateCache):
SyncCommitteeCache =
let period = state.slot.sync_committee_period()

cache.sync_committees.withValue(period, v) do:
return v[]

var
s = toHashSet(state.current_sync_committee.pubkeys.data)

for pk in state.next_sync_committee.pubkeys.data:
s.incl(pk)

var pubkeyIndices: Table[ValidatorPubKey, ValidatorIndex]
for i, v in state.validators:
if v.pubkey in s:
pubkeyIndices[v.pubkey] = i.ValidatorIndex

var res: SyncCommitteeCache
try:
for i in 0..<res.current_sync_committee.len():
res.current_sync_committee[i] =
pubkeyIndices[state.current_sync_committee.pubkeys[i]]
res.next_sync_committee[i] =
pubkeyIndices[state.next_sync_committee.pubkeys[i]]
except KeyError:
raiseAssert "table constructed just above"

cache.sync_committees[period] = res

res
4 changes: 3 additions & 1 deletion beacon_chain/spec/datatypes/altair.nim
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,9 @@ type
## - ProposerSlashing (SignedBeaconBlockHeader)
## - AttesterSlashing (IndexedAttestation)
## - SignedVoluntaryExits
## - SyncAggregate
##
## However:
## - ETH1Data (Deposits) can contain invalid BLS signatures
##
## The block state transition has NOT been verified
Expand All @@ -373,7 +375,7 @@ type
voluntary_exits*: List[TrustedSignedVoluntaryExit, Limit MAX_VOLUNTARY_EXITS]

# [New in Altair]
sync_aggregate*: SyncAggregate # TODO TrustedSyncAggregate after batching
sync_aggregate*: TrustedSyncAggregate

SyncnetBits* = BitArray[SYNC_COMMITTEE_SUBNET_COUNT]

Expand Down
5 changes: 5 additions & 0 deletions beacon_chain/spec/datatypes/base.nim
Original file line number Diff line number Diff line change
Expand Up @@ -380,12 +380,17 @@ type
message*: AggregateAndProof
signature*: ValidatorSig

SyncCommitteeCache* = object
current_sync_committee*: array[SYNC_COMMITTEE_SIZE, ValidatorIndex]
next_sync_committee*: array[SYNC_COMMITTEE_SIZE, ValidatorIndex]

# This doesn't know about forks or branches in the DAG. It's for straight,
# linear chunks of the chain.
StateCache* = object
shuffled_active_validator_indices*:
Table[Epoch, seq[ValidatorIndex]]
beacon_proposer_indices*: Table[Slot, Option[ValidatorIndex]]
sync_committees*: Table[SyncCommitteePeriod, SyncCommitteeCache]

# This matches the mutable state of the Solidity deposit contract
# https://github.com/ethereum/consensus-specs/blob/v1.1.2/solidity_deposit_contract/deposit_contract.sol
Expand Down
5 changes: 1 addition & 4 deletions beacon_chain/spec/eth2_apis/eth2_json_rpc_serialization.nim
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,7 @@ proc toJsonHex(data: openArray[byte]): string =

proc fromJson*(n: JsonNode, argName: string, result: var ValidatorPubKey) {.raises: [Defect, ValueError].} =
n.kind.expect(JString, argName)
var tmp = ValidatorPubKey.fromHex(n.getStr()).tryGet()
if not tmp.loadWithCache().isSome():
raise (ref ValueError)(msg: "Invalid public BLS key")
result = tmp
result = ValidatorPubKey.fromHex(n.getStr()).tryGet()

proc `%`*(pubkey: ValidatorPubKey): JsonNode =
newJString(toJsonHex(toRaw(pubkey)))
Expand Down
44 changes: 44 additions & 0 deletions beacon_chain/spec/signatures_batch.nim
Original file line number Diff line number Diff line change
Expand Up @@ -414,4 +414,48 @@ proc collectSignatureSets*(
volex.message.epoch,
DOMAIN_VOLUNTARY_EXIT)

block:
# 7. SyncAggregate
# ----------------------------------------------------
withState(state):
when stateFork >= BeaconStateFork.Altair and
(signed_block is altair.SignedBeaconBlock or
signed_block is merge.SignedBeaconBlock):
let
current_sync_committee =
state.data.get_sync_committee_cache(cache).current_sync_committee

var inited = false
var attestersAgg{.noInit.}: AggregatePublicKey
for i in 0 ..< current_sync_committee.len:
if signed_block.message.body.sync_aggregate.sync_committee_bits[i]:
let key = validatorKeys.load(current_sync_committee[i])
if not key.isSome():
return err("Invalid key cache")

if not inited: # first iteration
attestersAgg.init(key.get())
inited = true
else:
attestersAgg.aggregate(key.get())

if not inited:
if signed_block.message.body.sync_aggregate.sync_committee_signature !=
default(CookedSig).toValidatorSig():
return err("process_sync_aggregate: empty sync aggregates need signature of point at infinity")
else:
let
attesters = finish(attestersAgg)
previous_slot = max(state.data.slot, Slot(1)) - 1

sigs.addSignatureSet(
attesters,
get_block_root_at_slot(state.data, previous_slot),
signed_block.message.body.sync_aggregate.sync_committee_signature.loadOrExit(
"process_sync_aggregate: cannot load signature"),
state.data.fork,
state.data.genesis_validators_root,
previous_slot.epoch,
DOMAIN_SYNC_COMMITTEE)

ok()
Loading

0 comments on commit 9c2f43e

Please sign in to comment.