Skip to content
23 changes: 20 additions & 3 deletions consensus/beacon/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package beacon

import (
"context"
"errors"
"fmt"
"math/big"
Expand All @@ -29,6 +30,7 @@ import (
"github.com/ethereum/go-ethereum/core/tracing"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/internal/telemetry"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/trie"
"github.com/holiman/uint256"
Expand Down Expand Up @@ -351,9 +353,17 @@ func (beacon *Beacon) Finalize(chain consensus.ChainHeaderReader, header *types.

// FinalizeAndAssemble implements consensus.Engine, setting the final state and
// assembling the block.
func (beacon *Beacon) FinalizeAndAssemble(chain consensus.ChainHeaderReader, header *types.Header, state *state.StateDB, body *types.Body, receipts []*types.Receipt) (*types.Block, error) {
func (beacon *Beacon) FinalizeAndAssemble(ctx context.Context, chain consensus.ChainHeaderReader, header *types.Header, state *state.StateDB, body *types.Body, receipts []*types.Receipt) (result *types.Block, err error) {
ctx, _, spanEnd := telemetry.StartSpan(ctx, "consensus.beacon.FinalizeAndAssemble",
telemetry.Int64Attribute("block.number", int64(header.Number.Uint64())),
telemetry.Int64Attribute("txs.count", int64(len(body.Transactions))),
telemetry.Int64Attribute("withdrawals.count", int64(len(body.Withdrawals))),
)
defer spanEnd(&err)

if !beacon.IsPoSHeader(header) {
return beacon.ethone.FinalizeAndAssemble(chain, header, state, body, receipts)
block, delegateErr := beacon.ethone.FinalizeAndAssemble(ctx, chain, header, state, body, receipts)
return block, delegateErr
}
shanghai := chain.Config().IsShanghai(header.Number, header.Time)
if shanghai {
Expand All @@ -367,13 +377,20 @@ func (beacon *Beacon) FinalizeAndAssemble(chain consensus.ChainHeaderReader, hea
}
}
// Finalize and assemble the block.
_, _, finalizeSpanEnd := telemetry.StartSpan(ctx, "consensus.beacon.Finalize")
beacon.Finalize(chain, header, state, body)
finalizeSpanEnd(nil)

// Assign the final state root to header.
_, _, rootSpanEnd := telemetry.StartSpan(ctx, "consensus.beacon.IntermediateRoot")
header.Root = state.IntermediateRoot(true)
rootSpanEnd(nil)

// Assemble the final block.
return types.NewBlock(header, body, receipts, trie.NewStackTrie(nil)), nil
_, _, blockSpanEnd := telemetry.StartSpan(ctx, "consensus.beacon.NewBlock")
block := types.NewBlock(header, body, receipts, trie.NewStackTrie(nil))
blockSpanEnd(nil)
return block, nil
}

// Seal generates a new sealing request for the given input block and pushes
Expand Down
3 changes: 2 additions & 1 deletion consensus/clique/clique.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package clique

import (
"bytes"
"context"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -581,7 +582,7 @@ func (c *Clique) Finalize(chain consensus.ChainHeaderReader, header *types.Heade

// FinalizeAndAssemble implements consensus.Engine, ensuring no uncles are set,
// nor block rewards given, and returns the final block.
func (c *Clique) FinalizeAndAssemble(chain consensus.ChainHeaderReader, header *types.Header, state *state.StateDB, body *types.Body, receipts []*types.Receipt) (*types.Block, error) {
func (c *Clique) FinalizeAndAssemble(ctx context.Context, chain consensus.ChainHeaderReader, header *types.Header, state *state.StateDB, body *types.Body, receipts []*types.Receipt) (*types.Block, error) {
if len(body.Withdrawals) > 0 {
return nil, errors.New("clique does not support withdrawals")
}
Expand Down
3 changes: 2 additions & 1 deletion consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package consensus

import (
"context"
"math/big"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -92,7 +93,7 @@ type Engine interface {
//
// Note: The block header and state database might be updated to reflect any
// consensus rules that happen at finalization (e.g. block rewards).
FinalizeAndAssemble(chain ChainHeaderReader, header *types.Header, state *state.StateDB, body *types.Body, receipts []*types.Receipt) (*types.Block, error)
FinalizeAndAssemble(ctx context.Context, chain ChainHeaderReader, header *types.Header, state *state.StateDB, body *types.Body, receipts []*types.Receipt) (*types.Block, error)

// Seal generates a new sealing request for the given input block and pushes
// the result into the given channel.
Expand Down
3 changes: 2 additions & 1 deletion consensus/ethash/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package ethash

import (
"context"
"errors"
"fmt"
"math/big"
Expand Down Expand Up @@ -513,7 +514,7 @@ func (ethash *Ethash) Finalize(chain consensus.ChainHeaderReader, header *types.

// FinalizeAndAssemble implements consensus.Engine, accumulating the block and
// uncle rewards, setting the final state and assembling the block.
func (ethash *Ethash) FinalizeAndAssemble(chain consensus.ChainHeaderReader, header *types.Header, state *state.StateDB, body *types.Body, receipts []*types.Receipt) (*types.Block, error) {
func (ethash *Ethash) FinalizeAndAssemble(ctx context.Context, chain consensus.ChainHeaderReader, header *types.Header, state *state.StateDB, body *types.Body, receipts []*types.Receipt) (*types.Block, error) {
if len(body.Withdrawals) > 0 {
return nil, errors.New("ethash does not support withdrawals")
}
Expand Down
3 changes: 2 additions & 1 deletion core/chain_makers.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package core

import (
"context"
"fmt"
"math/big"

Expand Down Expand Up @@ -411,7 +412,7 @@ func GenerateChain(config *params.ChainConfig, parent *types.Block, engine conse
}

body := types.Body{Transactions: b.txs, Uncles: b.uncles, Withdrawals: b.withdrawals}
block, err := b.engine.FinalizeAndAssemble(cm, b.header, statedb, &body, b.receipts)
block, err := b.engine.FinalizeAndAssemble(context.Background(), cm, b.header, statedb, &body, b.receipts)
if err != nil {
panic(err)
}
Expand Down
8 changes: 5 additions & 3 deletions core/txpool/blobpool/blobpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -1865,11 +1865,11 @@ func (p *BlobPool) drop() {
//
// The transactions can also be pre-filtered by the dynamic fee components to
// reduce allocations and load on downstream subsystems.
func (p *BlobPool) Pending(filter txpool.PendingFilter) map[common.Address][]*txpool.LazyTransaction {
func (p *BlobPool) Pending(filter txpool.PendingFilter) (map[common.Address][]*txpool.LazyTransaction, int) {
// If only plain transactions are requested, this pool is unsuitable as it
// contains none, don't even bother.
if !filter.BlobTxs {
return nil
return nil, 0
}
// Track the amount of time waiting to retrieve the list of pending blob txs
// from the pool and the amount of time actually spent on assembling the data.
Expand All @@ -1885,6 +1885,7 @@ func (p *BlobPool) Pending(filter txpool.PendingFilter) map[common.Address][]*tx
pendtimeHist.Update(time.Since(execStart).Nanoseconds())
}()

var count int
pending := make(map[common.Address][]*txpool.LazyTransaction, len(p.index))
for addr, txs := range p.index {
lazies := make([]*txpool.LazyTransaction, 0, len(txs))
Expand Down Expand Up @@ -1930,9 +1931,10 @@ func (p *BlobPool) Pending(filter txpool.PendingFilter) map[common.Address][]*tx
}
if len(lazies) > 0 {
pending[addr] = lazies
count += len(lazies)
}
}
return pending
return pending, count
}

// updateStorageMetrics retrieves a bunch of stats from the data store and pushes
Expand Down
2 changes: 1 addition & 1 deletion core/txpool/blobpool/blobpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2122,7 +2122,7 @@ func benchmarkPoolPending(b *testing.B, datacap uint64) {
b.ReportAllocs()

for i := 0; i < b.N; i++ {
p := pool.Pending(txpool.PendingFilter{
p, _ := pool.Pending(txpool.PendingFilter{
MinTip: uint256.NewInt(1),
BaseFee: chain.basefee,
BlobFee: chain.blobfee,
Expand Down
8 changes: 5 additions & 3 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,15 +494,16 @@ func (pool *LegacyPool) ContentFrom(addr common.Address) ([]*types.Transaction,
//
// The transactions can also be pre-filtered by the dynamic fee components to
// reduce allocations and load on downstream subsystems.
func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address][]*txpool.LazyTransaction {
func (pool *LegacyPool) Pending(filter txpool.PendingFilter) (map[common.Address][]*txpool.LazyTransaction, int) {
// If only blob transactions are requested, this pool is unsuitable as it
// contains none, don't even bother.
if filter.BlobTxs {
return nil
return nil, 0
}
pool.mu.Lock()
defer pool.mu.Unlock()

var count int
pending := make(map[common.Address][]*txpool.LazyTransaction, len(pool.pending))
for addr, list := range pool.pending {
txs := list.Flatten()
Expand Down Expand Up @@ -539,9 +540,10 @@ func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address]
}
}
pending[addr] = lazies
count += len(lazies)
}
}
return pending
return pending, count
}

// ValidateTxBasics checks whether a transaction is valid according to the consensus
Expand Down
2 changes: 1 addition & 1 deletion core/txpool/subpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ type SubPool interface {
//
// The transactions can also be pre-filtered by the dynamic fee components to
// reduce allocations and load on downstream subsystems.
Pending(filter PendingFilter) map[common.Address][]*LazyTransaction
Pending(filter PendingFilter) (map[common.Address][]*LazyTransaction, int)

// SubscribeTransactions subscribes to new transaction events. The subscriber
// can decide whether to receive notifications only for newly seen transactions
Expand Down
11 changes: 7 additions & 4 deletions core/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,14 +359,17 @@ func (p *TxPool) Add(txs []*types.Transaction, sync bool) []error {
//
// The transactions can also be pre-filtered by the dynamic fee components to
// reduce allocations and load on downstream subsystems.
func (p *TxPool) Pending(filter PendingFilter) map[common.Address][]*LazyTransaction {
func (p *TxPool) Pending(filter PendingFilter) (map[common.Address][]*LazyTransaction, int) {
var count int
txs := make(map[common.Address][]*LazyTransaction)
for _, subpool := range p.subpools {
for addr, set := range subpool.Pending(filter) {
txs[addr] = set
set, n := subpool.Pending(filter)
for addr, list := range set {
txs[addr] = list
}
count += n
}
return txs
return txs, count
}

// SubscribeTransactions registers a subscription for new transaction events,
Expand Down
2 changes: 1 addition & 1 deletion eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction)
}

func (b *EthAPIBackend) GetPoolTransactions() (types.Transactions, error) {
pending := b.eth.txPool.Pending(txpool.PendingFilter{})
pending, _ := b.eth.txPool.Pending(txpool.PendingFilter{})
var txs types.Transactions
for _, batch := range pending {
for _, lazy := range batch {
Expand Down
22 changes: 12 additions & 10 deletions eth/catalyst/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func newConsensusAPIWithoutHeartbeat(eth *eth.Ethereum) *ConsensusAPI {
//
// If there are payloadAttributes: we try to assemble a block with the payloadAttributes
// and return its payloadID.
func (api *ConsensusAPI) ForkchoiceUpdatedV1(update engine.ForkchoiceStateV1, payloadAttributes *engine.PayloadAttributes) (engine.ForkChoiceResponse, error) {
func (api *ConsensusAPI) ForkchoiceUpdatedV1(ctx context.Context, update engine.ForkchoiceStateV1, payloadAttributes *engine.PayloadAttributes) (engine.ForkChoiceResponse, error) {
if payloadAttributes != nil {
switch {
case payloadAttributes.Withdrawals != nil || payloadAttributes.BeaconRoot != nil:
Expand All @@ -171,12 +171,12 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV1(update engine.ForkchoiceStateV1, pa
return engine.STATUS_INVALID, paramsErr("fcuV1 called post-shanghai")
}
}
return api.forkchoiceUpdated(update, payloadAttributes, engine.PayloadV1, false)
return api.forkchoiceUpdated(ctx, update, payloadAttributes, engine.PayloadV1, false)
}

// ForkchoiceUpdatedV2 is equivalent to V1 with the addition of withdrawals in the payload
// attributes. It supports both PayloadAttributesV1 and PayloadAttributesV2.
func (api *ConsensusAPI) ForkchoiceUpdatedV2(update engine.ForkchoiceStateV1, params *engine.PayloadAttributes) (engine.ForkChoiceResponse, error) {
func (api *ConsensusAPI) ForkchoiceUpdatedV2(ctx context.Context, update engine.ForkchoiceStateV1, params *engine.PayloadAttributes) (engine.ForkChoiceResponse, error) {
if params != nil {
switch {
case params.BeaconRoot != nil:
Expand All @@ -189,12 +189,12 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV2(update engine.ForkchoiceStateV1, pa
return engine.STATUS_INVALID, unsupportedForkErr("fcuV2 must only be called with paris or shanghai payloads")
}
}
return api.forkchoiceUpdated(update, params, engine.PayloadV2, false)
return api.forkchoiceUpdated(ctx, update, params, engine.PayloadV2, false)
}

// ForkchoiceUpdatedV3 is equivalent to V2 with the addition of parent beacon block root
// in the payload attributes. It supports only PayloadAttributesV3.
func (api *ConsensusAPI) ForkchoiceUpdatedV3(update engine.ForkchoiceStateV1, params *engine.PayloadAttributes) (engine.ForkChoiceResponse, error) {
func (api *ConsensusAPI) ForkchoiceUpdatedV3(ctx context.Context, update engine.ForkchoiceStateV1, params *engine.PayloadAttributes) (engine.ForkChoiceResponse, error) {
if params != nil {
switch {
case params.Withdrawals == nil:
Expand All @@ -209,12 +209,12 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV3(update engine.ForkchoiceStateV1, pa
// hash, even if params are wrong. To do this we need to split up
// forkchoiceUpdate into a function that only updates the head and then a
// function that kicks off block construction.
return api.forkchoiceUpdated(update, params, engine.PayloadV3, false)
return api.forkchoiceUpdated(ctx, update, params, engine.PayloadV3, false)
}

// ForkchoiceUpdatedV4 is equivalent to V3 with the addition of slot number
// in the payload attributes. It supports only PayloadAttributesV4.
func (api *ConsensusAPI) ForkchoiceUpdatedV4(update engine.ForkchoiceStateV1, params *engine.PayloadAttributes) (engine.ForkChoiceResponse, error) {
func (api *ConsensusAPI) ForkchoiceUpdatedV4(ctx context.Context, update engine.ForkchoiceStateV1, params *engine.PayloadAttributes) (engine.ForkChoiceResponse, error) {
if params != nil {
switch {
case params.Withdrawals == nil:
Expand All @@ -231,10 +231,12 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV4(update engine.ForkchoiceStateV1, pa
// hash, even if params are wrong. To do this we need to split up
// forkchoiceUpdate into a function that only updates the head and then a
// function that kicks off block construction.
return api.forkchoiceUpdated(update, params, engine.PayloadV4, false)
return api.forkchoiceUpdated(ctx, update, params, engine.PayloadV4, false)
}

func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payloadAttributes *engine.PayloadAttributes, payloadVersion engine.PayloadVersion, payloadWitness bool) (engine.ForkChoiceResponse, error) {
func (api *ConsensusAPI) forkchoiceUpdated(ctx context.Context, update engine.ForkchoiceStateV1, payloadAttributes *engine.PayloadAttributes, payloadVersion engine.PayloadVersion, payloadWitness bool) (result engine.ForkChoiceResponse, err error) {
ctx, _, spanEnd := telemetry.StartSpan(ctx, "engine.forkchoiceUpdated")
defer spanEnd(&err)
api.forkchoiceLock.Lock()
defer api.forkchoiceLock.Unlock()

Expand Down Expand Up @@ -375,7 +377,7 @@ func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payl
if api.localBlocks.has(id) {
return valid(&id), nil
}
payload, err := api.eth.Miner().BuildPayload(args, payloadWitness)
payload, err := api.eth.Miner().BuildPayload(ctx, args, payloadWitness)
if err != nil {
log.Error("Failed to build payload", "err", err)
return valid(nil), engine.InvalidPayloadAttributes.With(err)
Expand Down
Loading