Skip to content
60 changes: 50 additions & 10 deletions beacon-chain/cache/sync_committee.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,30 @@ func (s *SyncCommitteeCache) Clear() {
s.cache = cache.NewFIFO(keyFn)
}

// CurrentPeriodPositions returns current period positions of validator indices with respect with
// sync committee. If any input validator index has no assignment, an empty list will be returned
// for that validator. If the input root does not exist in cache, `ErrNonExistingSyncCommitteeKey` is returned.
// Manual checking of state for index position in state is recommended when `ErrNonExistingSyncCommitteeKey` is returned.
func (s *SyncCommitteeCache) CurrentPeriodPositions(root [32]byte, indices []primitives.ValidatorIndex) ([][]primitives.CommitteeIndex, error) {
s.lock.RLock()
defer s.lock.RUnlock()

pos, err := s.positionsInCommittee(root, indices)
if err != nil {
return nil, err
}
result := make([][]primitives.CommitteeIndex, len(pos))
for i, p := range pos {
if p == nil {
result[i] = []primitives.CommitteeIndex{}
} else {
result[i] = p.currentPeriod
}
}

return result, nil
}

// CurrentPeriodIndexPosition returns current period index position of a validator index with respect with
// sync committee. If the input validator index has no assignment, an empty list will be returned.
// If the input root does not exist in cache, `ErrNonExistingSyncCommitteeKey` is returned.
Expand Down Expand Up @@ -104,11 +128,7 @@ func (s *SyncCommitteeCache) NextPeriodIndexPosition(root [32]byte, valIdx primi
return pos.nextPeriod, nil
}

// Helper function for `CurrentPeriodIndexPosition` and `NextPeriodIndexPosition` to return a mapping
// of validator index to its index(s) position in the sync committee.
func (s *SyncCommitteeCache) idxPositionInCommittee(
root [32]byte, valIdx primitives.ValidatorIndex,
) (*positionInCommittee, error) {
func (s *SyncCommitteeCache) positionsInCommittee(root [32]byte, indices []primitives.ValidatorIndex) ([]*positionInCommittee, error) {
obj, exists, err := s.cache.GetByKey(key(root))
if err != nil {
return nil, err
Expand All @@ -121,13 +141,33 @@ func (s *SyncCommitteeCache) idxPositionInCommittee(
if !ok {
return nil, errNotSyncCommitteeIndexPosition
}
idxInCommittee, ok := item.vIndexToPositionMap[valIdx]
if !ok {
SyncCommitteeCacheMiss.Inc()
result := make([]*positionInCommittee, len(indices))
for i, idx := range indices {
idxInCommittee, ok := item.vIndexToPositionMap[idx]
if ok {
SyncCommitteeCacheHit.Inc()
result[i] = idxInCommittee
} else {
SyncCommitteeCacheMiss.Inc()
result[i] = nil
}
}
return result, nil
}

// Helper function for `CurrentPeriodIndexPosition` and `NextPeriodIndexPosition` to return a mapping
// of validator index to its index(s) position in the sync committee.
func (s *SyncCommitteeCache) idxPositionInCommittee(
root [32]byte, valIdx primitives.ValidatorIndex,
) (*positionInCommittee, error) {
positions, err := s.positionsInCommittee(root, []primitives.ValidatorIndex{valIdx})
if err != nil {
return nil, err
}
if len(positions) == 0 {
return nil, nil
}
SyncCommitteeCacheHit.Inc()
return idxInCommittee, nil
return positions[0], nil
}

// UpdatePositionsInCommittee updates caching of validators position in sync committee in respect to
Expand Down
5 changes: 5 additions & 0 deletions beacon-chain/cache/sync_committee_disabled.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ func NewSyncCommittee() *FakeSyncCommitteeCache {
return &FakeSyncCommitteeCache{}
}

// CurrentPeriodPositions -- fake
func (s *FakeSyncCommitteeCache) CurrentPeriodPositions(root [32]byte, indices []primitives.ValidatorIndex) ([][]primitives.CommitteeIndex, error) {
return nil, nil
}

// CurrentEpochIndexPosition -- fake.
func (s *FakeSyncCommitteeCache) CurrentPeriodIndexPosition(root [32]byte, valIdx primitives.ValidatorIndex) ([]primitives.CommitteeIndex, error) {
return nil, nil
Expand Down
33 changes: 33 additions & 0 deletions beacon-chain/core/helpers/sync_committee.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,39 @@ var (
syncCommitteeCache = cache.NewSyncCommittee()
)

// CurrentPeriodPositions returns committee indices of the current period sync committee for input validators.
func CurrentPeriodPositions(st state.BeaconState, indices []primitives.ValidatorIndex) ([][]primitives.CommitteeIndex, error) {
root, err := SyncPeriodBoundaryRoot(st)
if err != nil {
return nil, err
}
pos, err := syncCommitteeCache.CurrentPeriodPositions(root, indices)
if errors.Is(err, cache.ErrNonExistingSyncCommitteeKey) {
committee, err := st.CurrentSyncCommittee()
if err != nil {
return nil, err
}

// Fill in the cache on miss.
go func() {
if err := syncCommitteeCache.UpdatePositionsInCommittee(root, st); err != nil {
log.WithError(err).Error("Could not fill sync committee cache on miss")
}
}()

pos = make([][]primitives.CommitteeIndex, len(indices))
for i, idx := range indices {
pubkey := st.PubkeyAtIndex(idx)
pos[i] = findSubCommitteeIndices(pubkey[:], committee.Pubkeys)
}
return pos, nil
}
if err != nil {
return nil, err
}
return pos, nil
}

// IsCurrentPeriodSyncCommittee returns true if the input validator index belongs in the current period sync committee
// along with the sync committee root.
// 1. Checks if the public key exists in the sync committee cache
Expand Down
32 changes: 32 additions & 0 deletions beacon-chain/core/helpers/sync_committee_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,38 @@ import (
"github.com/OffchainLabs/prysm/v6/testing/require"
)

func TestCurrentPeriodPositions(t *testing.T) {
helpers.ClearCache()

validators := make([]*ethpb.Validator, params.BeaconConfig().SyncCommitteeSize)
syncCommittee := &ethpb.SyncCommittee{
Pubkeys: make([][]byte, params.BeaconConfig().SyncCommitteeSize),
}
for i := 0; i < len(validators); i++ {
k := make([]byte, 48)
copy(k, strconv.Itoa(i))
validators[i] = &ethpb.Validator{
PublicKey: k,
}
syncCommittee.Pubkeys[i] = bytesutil.PadTo(k, 48)
}
state, err := state_native.InitializeFromProtoAltair(&ethpb.BeaconStateAltair{
Validators: validators,
})
require.NoError(t, err)
require.NoError(t, state.SetCurrentSyncCommittee(syncCommittee))
require.NoError(t, state.SetNextSyncCommittee(syncCommittee))
require.NoError(t, err, helpers.SyncCommitteeCache().UpdatePositionsInCommittee([32]byte{}, state))

positions, err := helpers.CurrentPeriodPositions(state, []primitives.ValidatorIndex{0, 1})
require.NoError(t, err)
require.Equal(t, 2, len(positions))
require.Equal(t, 1, len(positions[0]))
assert.Equal(t, primitives.CommitteeIndex(0), positions[0][0])
require.Equal(t, 1, len(positions[1]))
assert.Equal(t, primitives.CommitteeIndex(1), positions[1][0])
}

func TestIsCurrentEpochSyncCommittee_UsingCache(t *testing.T) {
helpers.ClearCache()

Expand Down
128 changes: 121 additions & 7 deletions beacon-chain/rpc/prysm/v1alpha1/validator/proposer_altair.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package validator

import (
"bytes"
"context"

"github.com/OffchainLabs/prysm/v6/beacon-chain/core/helpers"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
Expand All @@ -15,6 +17,7 @@ import (
"github.com/OffchainLabs/prysm/v6/runtime/version"
"github.com/OffchainLabs/prysm/v6/time/slots"
"github.com/pkg/errors"
"github.com/prysmaticlabs/go-bitfield"
)

func (vs *Server) setSyncAggregate(ctx context.Context, blk interfaces.SignedBeaconBlock) {
Expand Down Expand Up @@ -51,21 +54,28 @@ func (vs *Server) getSyncAggregate(ctx context.Context, slot primitives.Slot, ro
if vs.SyncCommitteePool == nil {
return nil, errors.New("sync committee pool is nil")
}
// Contributions have to match the input root
contributions, err := vs.SyncCommitteePool.SyncCommitteeContributions(slot)

poolContributions, err := vs.SyncCommitteePool.SyncCommitteeContributions(slot)
if err != nil {
return nil, err
}
proposerContributions := proposerSyncContributions(contributions).filterByBlockRoot(root)
// Contributions have to match the input root
proposerContributions := proposerSyncContributions(poolContributions).filterByBlockRoot(root)

aggregatedContributions, err := vs.aggregatedSyncCommitteeMessages(ctx, slot, root, poolContributions)
if err != nil {
return nil, errors.Wrap(err, "could not get aggregated sync committee messages")
}
proposerContributions = append(proposerContributions, aggregatedContributions...)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this adding aggregated contributions to the current list of already aggregated contributions proposerContributions? shouldn't we aggregate them together otherwise we are likely to create non-trivial intersections among the signing indices rendering the aggregates unaggregatable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I forgot that if we aggregate messages that are already part of some aggregate with messages that are not, it can be hard to get a better total aggregate. One mitigation I see is to discard messages that are already in proposerContributions by looking at appropriate bits. Did you have another solution in mind?


// Each sync subcommittee is 128 bits and the sync committee is 512 bits for mainnet.
subcommitteeCount := params.BeaconConfig().SyncCommitteeSubnetCount
var bitsHolder [][]byte
for i := uint64(0); i < params.BeaconConfig().SyncCommitteeSubnetCount; i++ {
for i := uint64(0); i < subcommitteeCount; i++ {
bitsHolder = append(bitsHolder, ethpb.NewSyncCommitteeAggregationBits())
}
sigsHolder := make([]bls.Signature, 0, params.BeaconConfig().SyncCommitteeSize/params.BeaconConfig().SyncCommitteeSubnetCount)
sigsHolder := make([]bls.Signature, 0, params.BeaconConfig().SyncCommitteeSize/subcommitteeCount)

for i := uint64(0); i < params.BeaconConfig().SyncCommitteeSubnetCount; i++ {
for i := uint64(0); i < subcommitteeCount; i++ {
cs := proposerContributions.filterBySubIndex(i)
aggregates, err := synccontribution.Aggregate(cs)
if err != nil {
Expand Down Expand Up @@ -107,3 +117,107 @@ func (vs *Server) getSyncAggregate(ctx context.Context, slot primitives.Slot, ro
SyncCommitteeSignature: syncSigBytes[:],
}, nil
}

func (vs *Server) aggregatedSyncCommitteeMessages(
ctx context.Context,
slot primitives.Slot,
root [32]byte,
poolContributions []*ethpb.SyncCommitteeContribution,
) ([]*ethpb.SyncCommitteeContribution, error) {
subcommitteeCount := params.BeaconConfig().SyncCommitteeSubnetCount
subcommitteeSize := params.BeaconConfig().SyncCommitteeSize / subcommitteeCount
sigsPerSubcommittee := make([][][]byte, subcommitteeCount)
bitsPerSubcommittee := make([]bitfield.Bitfield, subcommitteeCount)
for i := uint64(0); i < subcommitteeCount; i++ {
sigsPerSubcommittee[i] = make([][]byte, 0, subcommitteeSize)
bitsPerSubcommittee[i] = ethpb.NewSyncCommitteeAggregationBits()
}

// Get committee position(s) for each message's validator index.
scMessages, err := vs.SyncCommitteePool.SyncCommitteeMessages(slot)
if err != nil {
return nil, errors.Wrap(err, "could not get sync committee messages")
}
messageIndices := make([]primitives.ValidatorIndex, 0, len(scMessages))
messageSigs := make([][]byte, 0, len(scMessages))
for _, msg := range scMessages {
if bytes.Equal(root[:], msg.BlockRoot) {
messageIndices = append(messageIndices, msg.ValidatorIndex)
messageSigs = append(messageSigs, msg.Signature)
}
}
st, err := vs.HeadFetcher.HeadState(ctx)
if err != nil {
return nil, errors.Wrap(err, "could not get head state")
}
positions, err := helpers.CurrentPeriodPositions(st, messageIndices)
if err != nil {
return nil, errors.Wrap(err, "could not get sync committee positions")
}

// Based on committee position(s), set the appropriate subcommittee bit and signature.
for i, ci := range positions {
for _, index := range ci {
k := uint64(index)
subnetIndex := k / subcommitteeSize
indexMod := k % subcommitteeSize

// Existing aggregated contributions from the pool intersecting with aggregates
// created from single sync committee messages can result in bit intersections
// that fail to produce the best possible final aggregate. Ignoring bits that are
// already set in pool contributions makes intersections impossible.
intersects := false
for _, poolContrib := range poolContributions {
if poolContrib.SubcommitteeIndex == subnetIndex && poolContrib.AggregationBits.BitAt(indexMod) {
intersects = true
}
}
if !intersects && !bitsPerSubcommittee[subnetIndex].BitAt(indexMod) {
bitsPerSubcommittee[subnetIndex].SetBitAt(indexMod, true)
sigsPerSubcommittee[subnetIndex] = append(sigsPerSubcommittee[subnetIndex], messageSigs[i])
}
}
}

// Aggregate.
result := make([]*ethpb.SyncCommitteeContribution, 0, subcommitteeCount)
for i := uint64(0); i < subcommitteeCount; i++ {
aggregatedSig := make([]byte, 96)
aggregatedSig[0] = 0xC0
if len(sigsPerSubcommittee[i]) != 0 {
contrib, err := aggregateSyncSubcommitteeMessages(slot, root, i, bitsPerSubcommittee[i], sigsPerSubcommittee[i])
if err != nil {
// Skip aggregating this subcommittee
log.WithError(err).Errorf("Could not aggregate sync messages for subcommittee %d", i)
continue
}
result = append(result, contrib)
}
}

return result, nil
}

func aggregateSyncSubcommitteeMessages(
slot primitives.Slot,
root [32]byte,
subcommitteeIndex uint64,
bits bitfield.Bitfield,
sigs [][]byte,
) (*ethpb.SyncCommitteeContribution, error) {
var err error
uncompressedSigs := make([]bls.Signature, len(sigs))
for i, sig := range sigs {
uncompressedSigs[i], err = bls.SignatureFromBytesNoValidation(sig)
if err != nil {
return nil, errors.Wrap(err, "could not create signature from bytes")
}
}
return &ethpb.SyncCommitteeContribution{
Slot: slot,
BlockRoot: root[:],
SubcommitteeIndex: subcommitteeIndex,
AggregationBits: bits.Bytes(),
Signature: bls.AggregateSignatures(uncompressedSigs).Marshal(),
}, nil
}
Loading
Loading