Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/rpcdaemon/cli/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ func RootCommand() (*cobra.Command, *httpcfg.HttpCfg) {
rootCmd.PersistentFlags().IntVar(&cfg.RpcFiltersConfig.RpcSubscriptionFiltersMaxTxs, "rpc.subscription.filters.maxtxs", rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxTxs, "Maximum number of transactions to store per subscription.")
rootCmd.PersistentFlags().IntVar(&cfg.RpcFiltersConfig.RpcSubscriptionFiltersMaxAddresses, "rpc.subscription.filters.maxaddresses", rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxAddresses, "Maximum number of addresses per subscription to filter logs by.")
rootCmd.PersistentFlags().IntVar(&cfg.RpcFiltersConfig.RpcSubscriptionFiltersMaxTopics, "rpc.subscription.filters.maxtopics", rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxTopics, "Maximum number of topics per subscription to filter logs by.")
rootCmd.PersistentFlags().DurationVar(&cfg.RpcFiltersConfig.RpcSubscriptionFiltersTimeout, "rpc.subscription.filters.timeout", rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersTimeout, "Timeout before idle filters are evicted. Defaults to 0 to disable eviction.")
rootCmd.PersistentFlags().IntVar(&cfg.BatchLimit, utils.RpcBatchLimit.Name, utils.RpcBatchLimit.Value, utils.RpcBatchLimit.Usage)
rootCmd.PersistentFlags().IntVar(&cfg.ReturnDataLimit, utils.RpcReturnDataLimit.Name, utils.RpcReturnDataLimit.Value, utils.RpcReturnDataLimit.Usage)
rootCmd.PersistentFlags().BoolVar(&cfg.AllowUnprotectedTxs, utils.AllowUnprotectedTxs.Name, utils.AllowUnprotectedTxs.Value, utils.AllowUnprotectedTxs.Usage)
Expand Down
2 changes: 1 addition & 1 deletion execution/tests/execution-spec-tests
1 change: 1 addition & 0 deletions node/cli/default_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ var DefaultFlags = []cli.Flag{
&RpcSubscriptionFiltersMaxTxsFlag,
&RpcSubscriptionFiltersMaxAddressesFlag,
&RpcSubscriptionFiltersMaxTopicsFlag,
&RpcSubscriptionFiltersTimeoutFlag,

&utils.SnapKeepBlocksFlag,
&utils.SnapStopFlag,
Expand Down
6 changes: 6 additions & 0 deletions node/cli/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,11 @@ var (
Usage: "Maximum number of topics per subscription to filter logs by.",
Value: rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxTopics,
}
RpcSubscriptionFiltersTimeoutFlag = cli.DurationFlag{
Name: "rpc.subscription.filters.timeout",
Usage: "Timeout before idle filters are evicted. Set to 0 to disable eviction.",
Value: rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersTimeout,
}
)

func ApplyFlagsForEthConfig(ctx *cli.Context, cfg *ethconfig.Config, logger log.Logger) {
Expand Down Expand Up @@ -443,6 +448,7 @@ func setEmbeddedRpcDaemon(ctx *cli.Context, cfg *nodecfg.Config, logger log.Logg
RpcSubscriptionFiltersMaxTxs: ctx.Int(RpcSubscriptionFiltersMaxTxsFlag.Name),
RpcSubscriptionFiltersMaxAddresses: ctx.Int(RpcSubscriptionFiltersMaxAddressesFlag.Name),
RpcSubscriptionFiltersMaxTopics: ctx.Int(RpcSubscriptionFiltersMaxTopicsFlag.Name),
RpcSubscriptionFiltersTimeout: ctx.Duration(RpcSubscriptionFiltersTimeoutFlag.Name),
},
Gascap: ctx.Uint64(utils.RpcGasCapFlag.Name),
Feecap: ctx.Float64(utils.RPCGlobalTxFeeCapFlag.Name),
Expand Down
12 changes: 12 additions & 0 deletions rpc/jsonrpc/eth_filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func (api *APIImpl) NewPendingTransactionFilter(_ context.Context) (string, erro
return "", rpc.ErrNotificationsUnsupported
}
txsCh, id := api.filters.SubscribePendingTxs(32)
api.filters.TrackSubscription(rpchelper.SubscriptionID(id), rpchelper.FilterTypePendingTxs, rpchelper.ProtocolHTTP)
go func() {
for txs := range txsCh {
api.filters.AddPendingTxs(id, txs)
Expand All @@ -49,6 +50,7 @@ func (api *APIImpl) NewBlockFilter(_ context.Context) (string, error) {
return "", rpc.ErrNotificationsUnsupported
}
ch, id := api.filters.SubscribeNewHeads(32)
api.filters.TrackSubscription(rpchelper.SubscriptionID(id), rpchelper.FilterTypeHeads, rpchelper.ProtocolHTTP)
go func() {
for block := range ch {
api.filters.AddPendingBlock(id, block)
Expand All @@ -63,6 +65,7 @@ func (api *APIImpl) NewFilter(_ context.Context, crit filters.FilterCriteria) (s
return "", rpc.ErrNotificationsUnsupported
}
logs, id := api.filters.SubscribeLogs(256, crit)
api.filters.TrackSubscription(rpchelper.SubscriptionID(id), rpchelper.FilterTypeLogs, rpchelper.ProtocolHTTP)
go func() {
for lg := range logs {
api.filters.AddLogs(id, lg)
Expand Down Expand Up @@ -110,12 +113,14 @@ func (api *APIImpl) GetFilterChanges(_ context.Context, index string) ([]any, er

// Identify the subscription type by probing each store; if none have data yet, return empty slice
if blocks, ok := api.filters.ReadPendingBlocks(rpchelper.HeadsSubID(cutIndex)); ok {
api.filters.TouchSubscription(rpchelper.SubscriptionID(cutIndex), rpchelper.FilterTypeHeads)
for _, v := range blocks {
stub = append(stub, v.Hash())
}
return stub, nil
}
if txs, ok := api.filters.ReadPendingTxs(rpchelper.PendingTxsSubID(cutIndex)); ok {
api.filters.TouchSubscription(rpchelper.SubscriptionID(cutIndex), rpchelper.FilterTypePendingTxs)
if len(txs) > 0 {
for _, txn := range txs[0] {
stub = append(stub, txn.Hash())
Expand All @@ -125,6 +130,7 @@ func (api *APIImpl) GetFilterChanges(_ context.Context, index string) ([]any, er
return stub, nil
}
if logs, ok := api.filters.ReadLogs(rpchelper.LogsSubID(cutIndex)); ok {
api.filters.TouchSubscription(rpchelper.SubscriptionID(cutIndex), rpchelper.FilterTypeLogs)
for _, v := range logs {
stub = append(stub, v)
}
Expand All @@ -144,6 +150,8 @@ func (api *APIImpl) GetFilterLogs(_ context.Context, index string) ([]*types.Log
if found := api.filters.HasSubscription(rpchelper.LogsSubID(cutIndex)); !found {
return nil, rpc.ErrFilterNotFound
}
// Reset the filter deadline since it was just accessed
api.filters.TouchSubscription(rpchelper.SubscriptionID(cutIndex), rpchelper.FilterTypeLogs)
if logs, ok := api.filters.ReadLogs(rpchelper.LogsSubID(cutIndex)); ok {
return logs, nil
}
Expand All @@ -165,6 +173,7 @@ func (api *APIImpl) NewHeads(ctx context.Context) (*rpc.Subscription, error) {
go func() {
defer dbg.LogPanic()
headers, id := api.filters.SubscribeNewHeads(32)
api.filters.SetSubscriptionProtocol(rpchelper.SubscriptionID(id), rpchelper.FilterTypeHeads, rpchelper.ProtocolWS)
defer api.filters.UnsubscribeHeads(id)
for {
select {
Expand Down Expand Up @@ -203,6 +212,7 @@ func (api *APIImpl) NewPendingTransactions(ctx context.Context, fullTx *bool) (*
go func() {
defer dbg.LogPanic()
txsCh, id := api.filters.SubscribePendingTxs(256)
api.filters.SetSubscriptionProtocol(rpchelper.SubscriptionID(id), rpchelper.FilterTypePendingTxs, rpchelper.ProtocolWS)
defer api.filters.UnsubscribePendingTxs(id)

for {
Expand Down Expand Up @@ -250,6 +260,7 @@ func (api *APIImpl) NewPendingTransactionsWithBody(ctx context.Context) (*rpc.Su
go func() {
defer dbg.LogPanic()
txsCh, id := api.filters.SubscribePendingTxs(512)
api.filters.SetSubscriptionProtocol(rpchelper.SubscriptionID(id), rpchelper.FilterTypePendingTxs, rpchelper.ProtocolWS)
defer api.filters.UnsubscribePendingTxs(id)

for {
Expand Down Expand Up @@ -291,6 +302,7 @@ func (api *APIImpl) Logs(ctx context.Context, crit filters.FilterCriteria) (*rpc
go func() {
defer dbg.LogPanic()
logs, id := api.filters.SubscribeLogs(api.SubscribeLogsChannelSize, crit)
api.filters.SetSubscriptionProtocol(rpchelper.SubscriptionID(id), rpchelper.FilterTypeLogs, rpchelper.ProtocolWS)
defer api.filters.UnsubscribeLogs(id)

for {
Expand Down
26 changes: 16 additions & 10 deletions rpc/rpchelper/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,29 @@

package rpchelper

import "time"

const DefaultFilterTimeout = 0 * time.Minute

// FiltersConfig defines the configuration settings for RPC subscription filters.
// Each field represents a limit on the number of respective items that can be stored per subscription.
type FiltersConfig struct {
RpcSubscriptionFiltersMaxLogs int // Maximum number of logs to store per subscription. Default: 0 (no limit)
RpcSubscriptionFiltersMaxHeaders int // Maximum number of block headers to store per subscription. Default: 0 (no limit)
RpcSubscriptionFiltersMaxTxs int // Maximum number of transactions to store per subscription. Default: 0 (no limit)
RpcSubscriptionFiltersMaxAddresses int // Maximum number of addresses per subscription to filter logs by. Default: 0 (no limit)
RpcSubscriptionFiltersMaxTopics int // Maximum number of topics per subscription to filter logs by. Default: 0 (no limit)
RpcSubscriptionFiltersMaxLogs int // Maximum number of logs to store per subscription. Default: 0 (no limit)
RpcSubscriptionFiltersMaxHeaders int // Maximum number of block headers to store per subscription. Default: 0 (no limit)
RpcSubscriptionFiltersMaxTxs int // Maximum number of transactions to store per subscription. Default: 0 (no limit)
RpcSubscriptionFiltersMaxAddresses int // Maximum number of addresses per subscription to filter logs by. Default: 0 (no limit)
RpcSubscriptionFiltersMaxTopics int // Maximum number of topics per subscription to filter logs by. Default: 0 (no limit)
RpcSubscriptionFiltersTimeout time.Duration // Timeout before idle filters are evicted. Default: 0 (no eviction)
}

// DefaultFiltersConfig defines the default settings for filter configurations.
// These default values set no limits on the number of logs, block headers, transactions,
// addresses, or topics that can be stored per subscription.
var DefaultFiltersConfig = FiltersConfig{
RpcSubscriptionFiltersMaxLogs: 0, // No limit on the number of logs per subscription
RpcSubscriptionFiltersMaxHeaders: 0, // No limit on the number of block headers per subscription
RpcSubscriptionFiltersMaxTxs: 0, // No limit on the number of transactions per subscription
RpcSubscriptionFiltersMaxAddresses: 0, // No limit on the number of addresses per subscription to filter logs by
RpcSubscriptionFiltersMaxTopics: 0, // No limit on the number of topics per subscription to filter logs by
RpcSubscriptionFiltersMaxLogs: 0, // No limit on the number of logs per subscription
RpcSubscriptionFiltersMaxHeaders: 0, // No limit on the number of block headers per subscription
RpcSubscriptionFiltersMaxTxs: 0, // No limit on the number of transactions per subscription
RpcSubscriptionFiltersMaxAddresses: 0, // No limit on the number of addresses per subscription to filter logs by
RpcSubscriptionFiltersMaxTopics: 0, // No limit on the number of topics per subscription to filter logs by
RpcSubscriptionFiltersTimeout: DefaultFilterTimeout, // Evict filters not polled within this duration
}
Loading
Loading