Commit

Merge pull request #223 from ethpandaops/pk910/fix-filtered-slot-performance

fix performance for filtered slot requests during long unfinality
pk910 authored Jan 24, 2025
2 parents 7c0dc09 + 6830dde commit d170a55
Showing 9 changed files with 96 additions and 63 deletions.
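The changed files below all follow one pattern: during long unfinality the block cache holds many unfinalized blocks, and the old code called IsCanonicalBlock once per block, re-resolving the canonical head and walking the chain for every call. The change resolves the head once per request and reuses it for a cheap per-block check. A minimal before/after sketch of that call pattern, using simplified stand-in types rather than the repository's real Block and Indexer structures:

```go
package main

import "fmt"

// Stand-in types; the real ones live in indexer/beacon and are far richer.
type Block struct {
	Slot      uint64
	canonical bool // toy flag replacing the real ancestry walk
}

type Indexer struct{ head *Block }

func (ix *Indexer) GetCanonicalHead() *Block { return ix.head }

// Before the fix: every call re-resolves the canonical head.
func (ix *Indexer) IsCanonicalBlock(b *Block) bool {
	head := ix.GetCanonicalHead()
	return ix.IsCanonicalBlockByHead(b, head)
}

// After the fix: the head is resolved by the caller and reused.
func (ix *Indexer) IsCanonicalBlockByHead(b, head *Block) bool {
	return head != nil && b != nil && b.canonical
}

func main() {
	ix := &Indexer{head: &Block{Slot: 100, canonical: true}}
	blocks := []*Block{
		{Slot: 99, canonical: true},
		{Slot: 99, canonical: false}, // a fork block at the same slot
	}

	// Resolve the head once per request, then do the cheap per-block check.
	head := ix.GetCanonicalHead()
	for _, b := range blocks {
		fmt.Printf("slot %d canonical=%v\n", b.Slot, ix.IsCanonicalBlockByHead(b, head))
	}
}
```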
33 changes: 14 additions & 19 deletions indexer/beacon/block.go
@@ -22,7 +22,7 @@ type Block struct {
parentRoot *phase0.Root
dependentRoot *phase0.Root
forkId ForkKey
fokChecked bool
forkChecked bool
headerMutex sync.Mutex
headerChan chan bool
header *phase0.SignedBeaconBlockHeader
@@ -317,7 +317,7 @@ func (block *Block) unpruneBlockBody() {
}

// GetDbBlock returns the database representation of this block.
func (block *Block) GetDbBlock(indexer *Indexer) *dbtypes.Slot {
func (block *Block) GetDbBlock(indexer *Indexer, isCanonical bool) *dbtypes.Slot {
var epochStats *EpochStats
chainState := indexer.consensusPool.GetChainState()
if dependentBlock := indexer.blockCache.getDependentBlock(chainState, block, nil); dependentBlock != nil {
@@ -329,44 +329,39 @@ func (block *Block) GetDbBlock(indexer *Indexer) *dbtypes.Slot {
return nil
}

if !indexer.IsCanonicalBlock(block, nil) {
if !isCanonical {
dbBlock.Status = dbtypes.Orphaned
}

return dbBlock
}

// GetDbDeposits returns the database representation of the deposits in this block.
func (block *Block) GetDbDeposits(indexer *Indexer, depositIndex *uint64) []*dbtypes.Deposit {
orphaned := !indexer.IsCanonicalBlock(block, nil)
dbDeposits := indexer.dbWriter.buildDbDeposits(block, depositIndex, orphaned, nil)
dbDeposits = append(dbDeposits, indexer.dbWriter.buildDbDepositRequests(block, orphaned, nil)...)
func (block *Block) GetDbDeposits(indexer *Indexer, depositIndex *uint64, isCanonical bool) []*dbtypes.Deposit {
dbDeposits := indexer.dbWriter.buildDbDeposits(block, depositIndex, !isCanonical, nil)
dbDeposits = append(dbDeposits, indexer.dbWriter.buildDbDepositRequests(block, !isCanonical, nil)...)

return dbDeposits
}

// GetDbVoluntaryExits returns the database representation of the voluntary exits in this block.
func (block *Block) GetDbVoluntaryExits(indexer *Indexer) []*dbtypes.VoluntaryExit {
orphaned := !indexer.IsCanonicalBlock(block, nil)
return indexer.dbWriter.buildDbVoluntaryExits(block, orphaned, nil)
func (block *Block) GetDbVoluntaryExits(indexer *Indexer, isCanonical bool) []*dbtypes.VoluntaryExit {
return indexer.dbWriter.buildDbVoluntaryExits(block, !isCanonical, nil)
}

// GetDbSlashings returns the database representation of the slashings in this block.
func (block *Block) GetDbSlashings(indexer *Indexer) []*dbtypes.Slashing {
orphaned := !indexer.IsCanonicalBlock(block, nil)
return indexer.dbWriter.buildDbSlashings(block, orphaned, nil)
func (block *Block) GetDbSlashings(indexer *Indexer, isCanonical bool) []*dbtypes.Slashing {
return indexer.dbWriter.buildDbSlashings(block, !isCanonical, nil)
}

// GetDbWithdrawalRequests returns the database representation of the withdrawal requests in this block.
func (block *Block) GetDbWithdrawalRequests(indexer *Indexer) []*dbtypes.WithdrawalRequest {
orphaned := !indexer.IsCanonicalBlock(block, nil)
return indexer.dbWriter.buildDbWithdrawalRequests(block, orphaned, nil)
func (block *Block) GetDbWithdrawalRequests(indexer *Indexer, isCanonical bool) []*dbtypes.WithdrawalRequest {
return indexer.dbWriter.buildDbWithdrawalRequests(block, !isCanonical, nil)
}

// GetDbConsolidationRequests returns the database representation of the consolidation requests in this block.
func (block *Block) GetDbConsolidationRequests(indexer *Indexer) []*dbtypes.ConsolidationRequest {
orphaned := !indexer.IsCanonicalBlock(block, nil)
return indexer.dbWriter.buildDbConsolidationRequests(block, orphaned, nil)
func (block *Block) GetDbConsolidationRequests(indexer *Indexer, isCanonical bool) []*dbtypes.ConsolidationRequest {
return indexer.dbWriter.buildDbConsolidationRequests(block, !isCanonical, nil)
}

// GetForkId returns the fork ID of this block.
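With the block.go changes above, the GetDb* helpers no longer call IsCanonicalBlock themselves; the caller decides canonicality once and passes isCanonical in, and the helpers derive the orphaned flag as !isCanonical. A rough sketch of the resulting caller-side pattern, with hypothetical simplified types in place of the real dbtypes and dbWriter plumbing:

```go
package main

import "fmt"

// Hypothetical simplified stand-ins for the real dbtypes structures.
type Deposit struct{ Orphaned bool }
type Exit struct{ Orphaned bool }

type Block struct{ Slot uint64 }

// The helpers now receive isCanonical instead of re-deriving it per call.
func (b *Block) GetDbDeposits(isCanonical bool) []*Deposit {
	return []*Deposit{{Orphaned: !isCanonical}}
}

func (b *Block) GetDbVoluntaryExits(isCanonical bool) []*Exit {
	return []*Exit{{Orphaned: !isCanonical}}
}

func main() {
	block := &Block{Slot: 42}

	// The caller resolves canonicality once per block ...
	isCanonical := true // real code: indexer.IsCanonicalBlockByHead(block, head)

	// ... and threads it through every per-block extraction instead of
	// letting each helper trigger its own canonical-head lookup.
	deposits := block.GetDbDeposits(isCanonical)
	exits := block.GetDbVoluntaryExits(isCanonical)
	fmt.Println("deposit orphaned:", deposits[0].Orphaned)
	fmt.Println("exit orphaned:", exits[0].Orphaned)
}
```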
15 changes: 14 additions & 1 deletion indexer/beacon/canonical.go
@@ -105,7 +105,20 @@ func (indexer *Indexer) GetChainHeads() []*ChainHead {

func (indexer *Indexer) IsCanonicalBlock(block *Block, overrideForkId *ForkKey) bool {
canonicalHead := indexer.GetCanonicalHead(overrideForkId)
return canonicalHead != nil && indexer.blockCache.isCanonicalBlock(block.Root, canonicalHead.Root)
return indexer.IsCanonicalBlockByHead(block, canonicalHead)
}

func (indexer *Indexer) IsCanonicalBlockByHead(block *Block, headBlock *Block) bool {
if headBlock == nil || block == nil {
return false
}

if block.forkChecked && headBlock.forkChecked {
parentForkIds := indexer.forkCache.getParentForkIds(headBlock.forkId)
return slices.Contains(parentForkIds, block.forkId)
}

return indexer.blockCache.isCanonicalBlock(block.Root, headBlock.Root)
}

// computeCanonicalChain computes the canonical chain and updates the indexer's state.
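The new IsCanonicalBlockByHead adds a fast path: when both the block and the head have already been fork-checked, the block counts as canonical exactly when its fork ID appears among the parent fork IDs of the head's fork, avoiding a root-by-root walk through the block cache. A minimal sketch of that membership check, with an assumed in-memory parent map standing in for forkCache.getParentForkIds:

```go
package main

import (
	"fmt"
	"slices"
)

type ForkKey uint64

// Assumed stand-in for the fork cache: child fork -> parent fork (0 = root).
var forkParents = map[ForkKey]ForkKey{3: 2, 2: 1, 1: 0}

// getParentForkIds returns the fork itself plus all of its ancestors,
// mirroring what forkCache.getParentForkIds provides in the real code.
func getParentForkIds(forkId ForkKey) []ForkKey {
	ids := []ForkKey{forkId}
	for forkId != 0 {
		forkId = forkParents[forkId]
		ids = append(ids, forkId)
	}
	return ids
}

// isCanonicalByFork is the fast path: a block is on the canonical chain
// if its fork is the head's fork or one of that fork's ancestors.
func isCanonicalByFork(blockFork, headFork ForkKey) bool {
	return slices.Contains(getParentForkIds(headFork), blockFork)
}

func main() {
	fmt.Println(isCanonicalByFork(1, 3)) // true: fork 1 is an ancestor of fork 3
	fmt.Println(isCanonicalByFork(4, 3)) // false: fork 4 is not on that lineage
}
```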
8 changes: 4 additions & 4 deletions indexer/beacon/forkdetection.go
@@ -57,7 +57,7 @@ func (cache *forkCache) processBlock(block *Block) error {
parentIsProcessed = true
parentIsFinalized = parentSlot < chainState.GetFinalizedSlot()
}
} else if parentBlock.fokChecked {
} else if parentBlock.forkChecked {
parentForkId = parentBlock.forkId
parentSlot = parentBlock.Slot
parentIsProcessed = true
@@ -163,7 +163,7 @@ func (cache *forkCache) processBlock(block *Block) error {
// check scenario 2
childBlocks := make([]*Block, 0)
for _, child := range cache.indexer.blockCache.getBlocksByParentRoot(block.Root) {
if !child.fokChecked {
if !child.forkChecked {
continue
}

@@ -211,7 +211,7 @@ func (cache *forkCache) processBlock(block *Block) error {

// set detected fork id to the block
block.forkId = currentForkId
block.fokChecked = true
block.forkChecked = true

// update fork head block if needed
fork := cache.getForkById(currentForkId)
@@ -336,7 +336,7 @@ func (cache *forkCache) updateForkBlocks(startBlock *Block, forkId ForkKey, skip
}

nextBlock := nextBlocks[0]
if !nextBlock.fokChecked {
if !nextBlock.forkChecked {
break
}

2 changes: 1 addition & 1 deletion indexer/beacon/indexer.go
@@ -308,7 +308,7 @@ func (indexer *Indexer) StartIndexer() {

block, _ := indexer.blockCache.createOrGetBlock(phase0.Root(dbBlock.Root), phase0.Slot(dbBlock.Slot))
block.forkId = ForkKey(dbBlock.ForkId)
block.fokChecked = true
block.forkChecked = true
block.processingStatus = dbBlock.Status
block.isInUnfinalizedDb = true

10 changes: 7 additions & 3 deletions services/chainservice.go
@@ -229,13 +229,17 @@ func (bs *ChainService) GetHeadForks(readyOnly bool) []*beacon.ForkHead {
return bs.beaconIndexer.GetForkHeads()
}

func (bs *ChainService) GetCanonicalForkIds() []uint64 {
func (bs *ChainService) GetCanonicalForkKeys() []beacon.ForkKey {
canonicalHead := bs.beaconIndexer.GetCanonicalHead(nil)
if canonicalHead == nil {
return []uint64{0}
return []beacon.ForkKey{0}
}

parentForkKeys := bs.beaconIndexer.GetParentForkIds(canonicalHead.GetForkId())
return bs.beaconIndexer.GetParentForkIds(canonicalHead.GetForkId())
}

func (bs *ChainService) GetCanonicalForkIds() []uint64 {
parentForkKeys := bs.GetCanonicalForkKeys()
forkIds := make([]uint64, len(parentForkKeys))
for idx, forkId := range parentForkKeys {
forkIds[idx] = uint64(forkId)
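chainservice.go splits the old GetCanonicalForkIds in two: GetCanonicalForkKeys exposes the raw beacon.ForkKey list so indexer-adjacent callers can use it directly, while GetCanonicalForkIds keeps the previous uint64 shape by converting the keys. A small sketch of that conversion, with a trivial stand-in for the keys source:

```go
package main

import "fmt"

type ForkKey uint64

// Stand-in for ChainService.GetCanonicalForkKeys: the canonical head's
// fork plus its parent forks, or []ForkKey{0} when no head is known yet.
func getCanonicalForkKeys() []ForkKey {
	return []ForkKey{3, 2, 1, 0}
}

// getCanonicalForkIds preserves the old uint64-based API by converting
// the ForkKey slice element by element.
func getCanonicalForkIds() []uint64 {
	keys := getCanonicalForkKeys()
	ids := make([]uint64, len(keys))
	for idx, key := range keys {
		ids[idx] = uint64(key)
	}
	return ids
}

func main() {
	fmt.Println(getCanonicalForkIds()) // [3 2 1 0]
}
```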
34 changes: 25 additions & 9 deletions services/chainservice_blocks.go
@@ -281,16 +281,21 @@ func (bs *ChainService) GetDbBlocksForSlots(firstSlot uint64, slotLimit uint32,
}

// get blocks from cache
lastCanonicalBlock := bs.beaconIndexer.GetCanonicalHead(nil)
slot := phase0.Slot(firstSlot)
if slot >= prunedSlot {
for slotIdx := int64(slot); slotIdx >= int64(prunedSlot) && slotIdx >= int64(lastSlot); slotIdx-- {
slot = phase0.Slot(slotIdx)
blocks := bs.beaconIndexer.GetBlocksBySlot(slot)
for _, block := range blocks {
if !withOrphaned && !bs.beaconIndexer.IsCanonicalBlock(block, nil) {
isCanonical := bs.beaconIndexer.IsCanonicalBlockByHead(block, lastCanonicalBlock)
if isCanonical {
lastCanonicalBlock = block
}
if !withOrphaned && !isCanonical {
continue
}
dbBlock := block.GetDbBlock(bs.beaconIndexer)
dbBlock := block.GetDbBlock(bs.beaconIndexer, isCanonical)
if dbBlock != nil {
resBlocks = append(resBlocks, dbBlock)
}
@@ -353,7 +358,11 @@ func (bs *ChainService) GetDbBlocksForSlots(firstSlot uint64, slotLimit uint32,
continue
}

isCanonical := bs.beaconIndexer.IsCanonicalBlock(block, nil)
isCanonical := bs.beaconIndexer.IsCanonicalBlockByHead(block, lastCanonicalBlock)
if isCanonical {
lastCanonicalBlock = block
}

if !withOrphaned && !isCanonical {
continue
}
@@ -519,6 +528,8 @@ func (bs *ChainService) GetDbBlocksByFilter(filter *dbtypes.BlockFilter, pageIdx

// get blocks from cache
// iterate from current slot to finalized slot
lastCanonicalBlock := bs.beaconIndexer.GetCanonicalHead(nil)

for slotIdx := int64(startSlot); slotIdx >= int64(finalizedSlot); slotIdx-- {
slot := phase0.Slot(slotIdx)
blocks := bs.beaconIndexer.GetBlocksBySlot(slot)
@@ -532,13 +543,17 @@ func (bs *ChainService) GetDbBlocksByFilter(filter *dbtypes.BlockFilter, pageIdx
continue
}

isOrphaned := !bs.beaconIndexer.IsCanonicalBlock(block, nil)
isCanonical := bs.beaconIndexer.IsCanonicalBlockByHead(block, lastCanonicalBlock)
if isCanonical {
lastCanonicalBlock = block
}

if filter.WithOrphaned != 1 {
if filter.WithOrphaned == 0 && isOrphaned {
if filter.WithOrphaned == 0 && !isCanonical {
// only canonical blocks, skip
continue
}
if filter.WithOrphaned == 2 && !isOrphaned {
if filter.WithOrphaned == 2 && isCanonical {
// only orphaned blocks, skip
continue
}
@@ -582,7 +597,7 @@ func (bs *ChainService) GetDbBlocksByFilter(filter *dbtypes.BlockFilter, pageIdx
cachedMatches = append(cachedMatches, cachedDbBlock{
slot: uint64(block.Slot),
proposer: uint64(blockHeader.Message.ProposerIndex),
orphaned: isOrphaned,
orphaned: !isCanonical,
block: block,
})
}
@@ -674,7 +689,7 @@ func (bs *ChainService) GetDbBlocksByFilter(filter *dbtypes.BlockFilter, pageIdx
}
if block.block != nil {
if block.slot >= uint64(prunedSlot) {
assignedBlock.Block = block.block.GetDbBlock(bs.beaconIndexer)
assignedBlock.Block = block.block.GetDbBlock(bs.beaconIndexer, !block.orphaned)
} else {
blockRoots = append(blockRoots, block.block.Root[:])
blockRootsIdx = append(blockRootsIdx, resIdx)
@@ -730,7 +745,8 @@ func (bs *ChainService) GetDbBlocksByParentRoot(parentRoot phase0.Root) []*dbtyp
cachedMatches := bs.beaconIndexer.GetBlockByParentRoot(parentRoot)
resBlocks := make([]*dbtypes.Slot, len(cachedMatches))
for idx, block := range cachedMatches {
resBlocks[idx] = block.GetDbBlock(bs.beaconIndexer)
isCanonical := bs.beaconIndexer.IsCanonicalBlock(block, nil)
resBlocks[idx] = block.GetDbBlock(bs.beaconIndexer, isCanonical)
}
if parentBlock == nil {
resBlocks = append(resBlocks, db.GetSlotsByParentRoot(parentRoot[:])...)
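The slot loops in chainservice_blocks.go above iterate from the head downwards and remember the most recent canonical block they have seen in lastCanonicalBlock, so each later IsCanonicalBlockByHead check starts from a nearby ancestor rather than from the distant head. A toy sketch of that walking pattern under an assumed parent-pointer chain (hypothetical types, not the real block cache):

```go
package main

import "fmt"

// Toy block with a parent pointer; the real code resolves ancestry
// through the block cache and fork IDs instead.
type Block struct {
	Slot   uint64
	Parent *Block
}

// isCanonicalByHead walks up from head looking for b; the closer the
// reference head is to b, the cheaper the check.
func isCanonicalByHead(b, head *Block) bool {
	for cur := head; cur != nil; cur = cur.Parent {
		if cur == b {
			return true
		}
	}
	return false
}

func main() {
	// Canonical chain: slot 0 <- 1 <- 2 <- 3, plus one orphan at slot 2.
	genesis := &Block{Slot: 0}
	b1 := &Block{Slot: 1, Parent: genesis}
	b2 := &Block{Slot: 2, Parent: b1}
	head := &Block{Slot: 3, Parent: b2}
	orphan := &Block{Slot: 2, Parent: b1}

	blocksBySlot := map[uint64][]*Block{
		3: {head}, 2: {b2, orphan}, 1: {b1}, 0: {genesis},
	}

	// Iterate slots from the head downwards; whenever a block turns out to
	// be canonical it becomes the reference head for the remaining checks,
	// keeping every walk short even during long unfinality.
	lastCanonical := head
	for slot := head.Slot; ; slot-- {
		for _, b := range blocksBySlot[slot] {
			if isCanonicalByHead(b, lastCanonical) {
				lastCanonical = b
				fmt.Println("slot", slot, "canonical")
			} else {
				fmt.Println("slot", slot, "orphaned")
			}
		}
		if slot == 0 {
			break
		}
	}
}
```

The trick relies on canonical blocks at lower slots being ancestors of canonical blocks at higher slots, so narrowing the reference head as the loop descends does not change the result.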
8 changes: 4 additions & 4 deletions services/chainservice_consolidations.go
@@ -195,12 +195,12 @@ func (bs *ChainService) GetConsolidationRequestOperationsByFilter(filter *dbtype
if blocks != nil {
for bidx := 0; bidx < len(blocks); bidx++ {
block := blocks[bidx]
isCanonical := bs.isCanonicalForkId(uint64(block.GetForkId()), canonicalForkIds)
if filter.WithOrphaned != 1 {
isOrphaned := !bs.isCanonicalForkId(uint64(block.GetForkId()), canonicalForkIds)
if filter.WithOrphaned == 0 && isOrphaned {
if filter.WithOrphaned == 0 && !isCanonical {
continue
}
if filter.WithOrphaned == 2 && !isOrphaned {
if filter.WithOrphaned == 2 && isCanonical {
continue
}
}
Expand All @@ -211,7 +211,7 @@ func (bs *ChainService) GetConsolidationRequestOperationsByFilter(filter *dbtype
continue
}

consolidationRequests := block.GetDbConsolidationRequests(bs.beaconIndexer)
consolidationRequests := block.GetDbConsolidationRequests(bs.beaconIndexer, isCanonical)
slice.Reverse(consolidationRequests) // reverse as other datasources are ordered by descending block index too
for idx, consolidationRequest := range consolidationRequests {
if filter.MinSrcIndex > 0 && (consolidationRequest.SourceIndex == nil || *consolidationRequest.SourceIndex < filter.MinSrcIndex) {
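The filter hunks above all encode the same WithOrphaned convention: 1 returns both kinds of blocks, 0 returns canonical blocks only, and 2 returns orphaned blocks only; the refactor just flips the check from isOrphaned to isCanonical. A small sketch of that skip logic in isolation (the keepBlock helper and its uint8 mode are assumptions for illustration, not the real dbtypes filter):

```go
package main

import "fmt"

// Assumed meaning of the WithOrphaned filter field, inferred from the diff:
//   0 = canonical only, 1 = canonical and orphaned, 2 = orphaned only.
func keepBlock(withOrphaned uint8, isCanonical bool) bool {
	if withOrphaned != 1 {
		if withOrphaned == 0 && !isCanonical {
			return false // only canonical blocks wanted, skip orphaned
		}
		if withOrphaned == 2 && isCanonical {
			return false // only orphaned blocks wanted, skip canonical
		}
	}
	return true
}

func main() {
	for _, mode := range []uint8{0, 1, 2} {
		fmt.Printf("mode=%d keepCanonical=%v keepOrphaned=%v\n",
			mode, keepBlock(mode, true), keepBlock(mode, false))
	}
}
```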
(2 more changed files not shown)
