Skip to content

Commit

Permalink
Merge pull request #222 from ethpandaops/pk910/speedup-epoch-restore
Browse files Browse the repository at this point in the history
speedup epoch stats restore from db for old unfinalized epochs
  • Loading branch information
pk910 authored Jan 22, 2025
2 parents ba1c437 + cab9c2e commit 7c0dc09
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 8 deletions.
15 changes: 9 additions & 6 deletions indexer/beacon/epochstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,12 @@ func (es *EpochStats) getRequestedBy() []*Client {
return clients
}

func (es *EpochStats) restoreFromDb(dbDuty *dbtypes.UnfinalizedDuty, dynSsz *dynssz.DynSsz, chainState *consensus.ChainState) error {
func (es *EpochStats) restoreFromDb(dbDuty *dbtypes.UnfinalizedDuty, dynSsz *dynssz.DynSsz, chainState *consensus.ChainState, withDuties bool) error {
if es.ready {
return nil
}

values, err := es.parsePackedSSZ(dynSsz, chainState, dbDuty.DutiesSSZ)
values, err := es.parsePackedSSZ(dynSsz, chainState, dbDuty.DutiesSSZ, withDuties)
if err != nil {
return err
}
Expand Down Expand Up @@ -187,7 +187,8 @@ func (es *EpochStats) buildPackedSSZ(dynSsz *dynssz.DynSsz) ([]byte, error) {
}

// unmarshalSSZ unmarshals the EpochStats values using the provided SSZ bytes.
func (es *EpochStats) parsePackedSSZ(dynSsz *dynssz.DynSsz, chainState *consensus.ChainState, ssz []byte) (*EpochStatsValues, error) {
// skips computing attester duties if withCommittees is false to speed up the process.
func (es *EpochStats) parsePackedSSZ(dynSsz *dynssz.DynSsz, chainState *consensus.ChainState, ssz []byte, withCommittees bool) (*EpochStatsValues, error) {
if dynSsz == nil {
dynSsz = dynssz.NewDynSsz(nil)
}
Expand Down Expand Up @@ -259,8 +260,10 @@ func (es *EpochStats) parsePackedSSZ(dynSsz *dynssz.DynSsz, chainState *consensu
}

// compute committees
attesterDuties, _ := duties.GetAttesterDuties(chainState.GetSpecs(), beaconState, es.epoch)
values.AttesterDuties = attesterDuties
if withCommittees {
attesterDuties, _ := duties.GetAttesterDuties(chainState.GetSpecs(), beaconState, es.epoch)
values.AttesterDuties = attesterDuties
}

return values, nil
}
Expand Down Expand Up @@ -298,7 +301,7 @@ func (es *EpochStats) loadValuesFromDb(dynSsz *dynssz.DynSsz, chainState *consen
return nil
}

values, err := es.parsePackedSSZ(dynSsz, chainState, dbDuty.DutiesSSZ)
values, err := es.parsePackedSSZ(dynSsz, chainState, dbDuty.DutiesSSZ, true)
if err != nil {
return nil
}
Expand Down
5 changes: 3 additions & 2 deletions indexer/beacon/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,8 +256,9 @@ func (indexer *Indexer) StartIndexer() {
}()

epochStats := indexer.epochCache.createOrGetEpochStats(phase0.Epoch(dbDuty.Epoch), phase0.Root(dbDuty.DependentRoot), false)
pruneStats := dbDuty.Epoch < uint64(indexer.lastPrunedEpoch)

err := epochStats.restoreFromDb(dbDuty, indexer.dynSsz, chainState)
err := epochStats.restoreFromDb(dbDuty, indexer.dynSsz, chainState, !pruneStats)
if err != nil {
indexer.logger.WithError(err).Errorf("failed restoring epoch stats for epoch %v (%x) from db", dbDuty.Epoch, dbDuty.DependentRoot)
return
Expand All @@ -266,7 +267,7 @@ func (indexer *Indexer) StartIndexer() {
epochStats.isInDb = true

restoredEpochStats++
if dbDuty.Epoch < uint64(indexer.lastPrunedEpoch) {
if pruneStats {
epochStats.pruneValues()
}
}()
Expand Down

0 comments on commit 7c0dc09

Please sign in to comment.