Skip to content
2 changes: 2 additions & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ var (
utils.AllowUnprotectedTxs,
utils.BatchRequestLimit,
utils.BatchResponseMaxSize,
utils.RPCTxSyncDefaultTimeoutFlag,
utils.RPCTxSyncMaxTimeoutFlag,
}

metricsFlags = []cli.Flag{
Expand Down
27 changes: 27 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,24 @@ var (
Value: ethconfig.Defaults.RPCTxFeeCap,
Category: flags.APICategory,
}
RPCGlobalLogQueryLimit = &cli.IntFlag{
Name: "rpc.logquerylimit",
Usage: "Maximum number of alternative addresses or topics allowed per search position in eth_getLogs filter criteria (0 = no cap)",
Value: ethconfig.Defaults.LogQueryLimit,
Category: flags.APICategory,
}
RPCTxSyncDefaultTimeoutFlag = &cli.DurationFlag{
Name: "rpc.txsync.defaulttimeout",
Usage: "Default timeout for eth_sendRawTransactionSync (e.g. 2s, 500ms)",
Value: ethconfig.Defaults.TxSyncDefaultTimeout,
Category: flags.APICategory,
}
RPCTxSyncMaxTimeoutFlag = &cli.DurationFlag{
Name: "rpc.txsync.maxtimeout",
Usage: "Maximum allowed timeout for eth_sendRawTransactionSync (e.g. 5m)",
Value: ethconfig.Defaults.TxSyncMaxTimeout,
Category: flags.APICategory,
}
// Authenticated RPC HTTP settings
AuthListenFlag = &cli.StringFlag{
Name: "authrpc.addr",
Expand Down Expand Up @@ -1837,6 +1855,15 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
if ctx.IsSet(CacheLogSizeFlag.Name) {
cfg.FilterLogCacheSize = ctx.Int(CacheLogSizeFlag.Name)
}
if ctx.IsSet(RPCGlobalLogQueryLimit.Name) {
cfg.LogQueryLimit = ctx.Int(RPCGlobalLogQueryLimit.Name)
}
if ctx.IsSet(RPCTxSyncDefaultTimeoutFlag.Name) {
cfg.TxSyncDefaultTimeout = ctx.Duration(RPCTxSyncDefaultTimeoutFlag.Name)
}
if ctx.IsSet(RPCTxSyncMaxTimeoutFlag.Name) {
cfg.TxSyncMaxTimeout = ctx.Duration(RPCTxSyncMaxTimeoutFlag.Name)
}
if !ctx.Bool(SnapshotFlag.Name) || cfg.SnapshotCache == 0 {
// If snap-sync is requested, this flag is also required
if cfg.SyncMode == downloader.SnapSync {
Expand Down
26 changes: 21 additions & 5 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2367,7 +2367,11 @@ func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types
if status == CanonStatTy {
bc.writeHeadBlock(block)

bc.chainFeed.Send(ChainEvent{Header: block.Header()})
bc.chainFeed.Send(ChainEvent{
Header: block.Header(),
Receipts: receipts,
Transactions: block.Transactions(),
})

if len(logs) > 0 {
bc.logsFeed.Send(logs)
Expand Down Expand Up @@ -3683,6 +3687,13 @@ func (bc *BlockChain) recoverAncestors(block *types.Block, makeWitness bool) (co
// collectLogs collects the logs that were generated or removed during the
// processing of a block. These logs are later announced as deleted or reborn.
//
// It is a thin wrapper around collectReceiptsAndLogs that discards the
// receipts; callers needing both should use collectReceiptsAndLogs directly
// to avoid a second database read.
func (bc *BlockChain) collectLogs(b *types.Block, removed bool) []*types.Log {
	_, logs := bc.collectReceiptsAndLogs(b, removed)
	return logs
}

// collectReceiptsAndLogs retrieves receipts from the database and returns both receipts and logs.
// This avoids duplicate database reads when both are needed.
func (bc *BlockChain) collectReceiptsAndLogs(b *types.Block, removed bool) ([]*types.Receipt, []*types.Log) {
var blobGasPrice *big.Int
if b.ExcessBlobGas() != nil {
blobGasPrice = eip4844.CalcBlobFee(bc.chainConfig, b.Header())
Expand All @@ -3708,8 +3719,7 @@ func (bc *BlockChain) collectLogs(b *types.Block, removed bool) []*types.Log {
logs = append(logs, log)
}
}

return logs
return receipts, logs
}

// reorg takes two blocks, an old chain and a new chain and will reconstruct the
Expand Down Expand Up @@ -3959,8 +3969,14 @@ func (bc *BlockChain) SetCanonical(head *types.Block) (common.Hash, error) {
bc.writeHeadBlock(head)

// Emit events
logs := bc.collectLogs(head, false)
bc.chainFeed.Send(ChainEvent{Header: head.Header()})
receipts, logs := bc.collectReceiptsAndLogs(head, false)

bc.chainFeed.Send(ChainEvent{
Header: head.Header(),
Receipts: receipts,
Transactions: head.Transactions(),
})

if len(logs) > 0 {
bc.logsFeed.Send(logs)
}
Expand Down
4 changes: 3 additions & 1 deletion core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ type NewMinedBlockEvent struct {
type RemovedLogsEvent struct{ Logs []*types.Log }

// ChainEvent is posted on the chain feed when a block becomes the canonical
// head. Receipts and Transactions carry the block's receipt set and its
// transaction list so that subscribers (e.g. the transactionReceipts filter
// subscription) can consume them without re-reading the database.
// Receipts[i] corresponds to Transactions[i].
type ChainEvent struct {
	Header       *types.Header
	Receipts     []*types.Receipt
	Transactions []*types.Transaction
}

type ChainSideEvent struct {
Expand Down
8 changes: 8 additions & 0 deletions eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -712,3 +712,11 @@ func (b *EthAPIBackend) WitnessByNumberOrHash(ctx context.Context, blockNrOrHash
func (b *EthAPIBackend) IsParallelImportActive() bool {
return b.eth.blockchain.IsParallelStatelessImportEnabled()
}

// RPCTxSyncDefaultTimeout returns the configured default timeout applied to
// eth_sendRawTransactionSync requests that do not specify their own (EIP-7966).
func (b *EthAPIBackend) RPCTxSyncDefaultTimeout() time.Duration {
	return b.eth.config.TxSyncDefaultTimeout
}

// RPCTxSyncMaxTimeout returns the configured upper bound on the timeout a
// caller may request for eth_sendRawTransactionSync (EIP-7966).
func (b *EthAPIBackend) RPCTxSyncMaxTimeout() time.Duration {
	return b.eth.config.TxSyncMaxTimeout
}
11 changes: 11 additions & 0 deletions eth/ethconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ var Defaults = Config{
TrieTimeout: 60 * time.Minute,
SnapshotCache: 102,
FilterLogCacheSize: 32,
LogQueryLimit: 1000,
Miner: miner.DefaultConfig,
TxPool: legacypool.DefaultConfig,
BlobPool: blobpool.DefaultConfig,
Expand All @@ -79,6 +80,8 @@ var Defaults = Config{
RPCTxFeeCap: 1, // 1 ether
FastForwardThreshold: 6400,
WitnessAPIEnabled: false,
TxSyncDefaultTimeout: 20 * time.Second,
TxSyncMaxTimeout: 1 * time.Minute,
}

//go:generate go run github.com/fjl/gencodec -type Config -formats toml -out gen_config.go
Expand Down Expand Up @@ -148,6 +151,10 @@ type Config struct {
// This is the number of blocks for which logs will be cached in the filter system.
FilterLogCacheSize int

// This is the maximum number of addresses or topics allowed in filter criteria
// for eth_getLogs.
LogQueryLimit int

// Address-specific cache sizes for biased caching (pathdb only)
// Maps account address to cache size in bytes
AddressCacheSizes map[common.Address]int
Expand Down Expand Up @@ -260,6 +267,10 @@ type Config struct {

// MaxBlindForkValidationLimit denotes the maximum number of blocks to traverse back in the database when validating blind forks
MaxBlindForkValidationLimit uint64

// EIP-7966: eth_sendRawTransactionSync timeouts
TxSyncDefaultTimeout time.Duration `toml:",omitempty"`
TxSyncMaxTimeout time.Duration `toml:",omitempty"`
}

// CreateConsensusEngine creates a consensus engine for the given chain configuration.
Expand Down
81 changes: 81 additions & 0 deletions eth/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ var (
errPendingLogsUnsupported = errors.New("pending logs are not supported")
errExceedMaxTopics = errors.New("exceed max topics")
errExceedMaxAddresses = errors.New("exceed max addresses")
errExceedMaxTxHashes = errors.New("exceed max number of transaction hashes allowed per transactionReceipts subscription")
)

const (
Expand All @@ -51,6 +52,8 @@ const (
maxTopics = 4
// The maximum number of allowed topics within a topic criteria
maxSubTopics = 1000
// The maximum number of transaction hash criteria allowed in a single subscription
maxTxHashes = 200
)

// filter is a helper struct that holds meta information over the filter type
Expand Down Expand Up @@ -305,6 +308,84 @@ func (api *FilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subsc
return rpcSub, nil
}

// TransactionReceiptsQuery defines criteria for transaction receipts subscription.
// Same as ethereum.TransactionReceiptsQuery but with UnmarshalJSON() method.
// Currently the only criterion carried is the list of transaction hashes; an
// empty list subscribes to receipts of all transactions.
type TransactionReceiptsQuery ethereum.TransactionReceiptsQuery

// UnmarshalJSON sets *args fields with given data.
func (args *TransactionReceiptsQuery) UnmarshalJSON(data []byte) error {
	// Decode into an anonymous scratch struct so that only the recognized
	// "transactionHashes" key is picked up from the payload.
	var raw struct {
		TransactionHashes []common.Hash `json:"transactionHashes"`
	}
	if err := json.Unmarshal(data, &raw); err != nil {
		return err
	}
	args.TransactionHashes = raw.TransactionHashes
	return nil
}

// TransactionReceipts creates a subscription that fires transaction receipts when transactions are included in blocks.
//
// If filter is nil or carries no hashes, the receipts of every transaction in
// each new canonical block are delivered; otherwise only receipts whose
// TxHash matches one of the requested hashes. Requests with more than
// maxTxHashes hash criteria are rejected with errExceedMaxTxHashes.
func (api *FilterAPI) TransactionReceipts(ctx context.Context, filter *TransactionReceiptsQuery) (*rpc.Subscription, error) {
	notifier, supported := rpc.NotifierFromContext(ctx)
	if !supported {
		return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
	}

	// Validate transaction hashes limit
	if filter != nil && len(filter.TransactionHashes) > maxTxHashes {
		return nil, errExceedMaxTxHashes
	}

	var (
		rpcSub          = notifier.CreateSubscription()
		matchedReceipts = make(chan []*ReceiptWithTx)
		txHashes        []common.Hash
	)

	if filter != nil {
		txHashes = filter.TransactionHashes
	}

	receiptsSub := api.events.SubscribeTransactionReceipts(txHashes, matchedReceipts)

	go func() {
		// Tear down the event-system subscription when this handler exits so
		// the filter system stops delivering into matchedReceipts.
		defer receiptsSub.Unsubscribe()

		// presumably used by MarshalReceipt to derive the sender from the
		// transaction — TODO confirm against ethapi.MarshalReceipt.
		signer := types.LatestSigner(api.sys.backend.ChainConfig())

		for {
			select {
			case receiptsWithTxs := <-matchedReceipts:
				if len(receiptsWithTxs) > 0 {
					// Convert to the same format as eth_getTransactionReceipt
					marshaledReceipts := make([]map[string]interface{}, len(receiptsWithTxs))
					for i, receiptWithTx := range receiptsWithTxs {
						marshaledReceipts[i] = ethapi.MarshalReceipt(
							receiptWithTx.Receipt,
							receiptWithTx.Receipt.BlockHash,
							receiptWithTx.Receipt.BlockNumber.Uint64(),
							signer,
							receiptWithTx.Transaction,
							int(receiptWithTx.Receipt.TransactionIndex),
							false,
						)
					}

					// Send a batch of tx receipts in one notification
					// NOTE(review): the Notify error is deliberately not
					// checked here; a dead client is detected via rpcSub.Err()
					// below — confirm this matches the other handlers in this
					// file.
					notifier.Notify(rpcSub.ID, marshaledReceipts)
				}
			case <-rpcSub.Err():
				// Client disconnected or unsubscribed.
				return
			}
		}
	}()

	return rpcSub, nil
}

// FilterCriteria represents a request to create a new filter.
// Same as ethereum.FilterQuery but with UnmarshalJSON() method.
type FilterCriteria ethereum.FilterQuery
Expand Down
68 changes: 68 additions & 0 deletions eth/filters/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/filtermaps"
"github.com/ethereum/go-ethereum/core/history"
"github.com/ethereum/go-ethereum/core/types"
Expand Down Expand Up @@ -565,3 +566,70 @@ func bloomFilter(bloom types.Bloom, addresses []common.Address, topics [][]commo

return true
}

// ReceiptWithTx contains a receipt and its corresponding transaction.
// Receipts carry only low-level execution data, so the transaction is kept
// alongside for marshalling user-facing receipt fields.
type ReceiptWithTx struct {
	Receipt     *types.Receipt
	Transaction *types.Transaction
}

// filterReceipts returns the receipts matching the given criteria.
// In addition to returning receipts, it also returns the corresponding
// transactions, because receipts only contain low-level data while
// user-facing output may require additional information from the transaction.
//
// An empty txHashes slice matches every receipt in the event. Receipts[i]
// is assumed to correspond to Transactions[i]; a length mismatch is treated
// as a malformed event and yields no results.
func filterReceipts(txHashes []common.Hash, ev core.ChainEvent) []*ReceiptWithTx {
	var ret []*ReceiptWithTx

	receipts := ev.Receipts
	txs := ev.Transactions

	if len(receipts) != len(txs) {
		log.Warn("Receipts and transactions length mismatch", "receipts", len(receipts), "transactions", len(txs))
		return ret
	}

	switch {
	case len(txHashes) == 0:
		// No filter, send all receipts with their transactions.
		ret = make([]*ReceiptWithTx, len(receipts))
		for i, receipt := range receipts {
			ret[i] = &ReceiptWithTx{
				Receipt:     receipt,
				Transaction: txs[i],
			}
		}
	case len(txHashes) == 1:
		// Filter by single transaction hash.
		// This is a common case, so it is distinguished from the multi-hash
		// path to avoid building a lookup set.
		for i, receipt := range receipts {
			if receipt.TxHash == txHashes[0] {
				ret = append(ret, &ReceiptWithTx{
					Receipt:     receipt,
					Transaction: txs[i],
				})
				break
			}
		}
	default:
		// Filter by multiple transaction hashes using a set for O(1) lookups.
		txHashSet := make(map[common.Hash]struct{}, len(txHashes))
		for _, hash := range txHashes {
			txHashSet[hash] = struct{}{}
		}

		for i, receipt := range receipts {
			if _, ok := txHashSet[receipt.TxHash]; ok {
				ret = append(ret, &ReceiptWithTx{
					Receipt:     receipt,
					Transaction: txs[i],
				})

				// Early exit once every distinct requested hash has been
				// found. Compare against the deduplicated set size rather
				// than len(txHashes): with duplicate input hashes the latter
				// can never be reached and the optimization would never fire.
				if len(ret) == len(txHashSet) {
					break
				}
			}
		}
	}

	return ret
}
Loading
Loading