diff --git a/cmd/rpcdaemon/cli/config.go b/cmd/rpcdaemon/cli/config.go index b0e29f729f9..71dcb9e7c35 100644 --- a/cmd/rpcdaemon/cli/config.go +++ b/cmd/rpcdaemon/cli/config.go @@ -143,6 +143,11 @@ func RootCommand() (*cobra.Command, *httpcfg.HttpCfg) { rootCmd.PersistentFlags().DurationVar(&cfg.EvmCallTimeout, "rpc.evmtimeout", rpccfg.DefaultEvmCallTimeout, "Maximum amount of time to wait for the answer from EVM call.") rootCmd.PersistentFlags().DurationVar(&cfg.OverlayGetLogsTimeout, "rpc.overlay.getlogstimeout", rpccfg.DefaultOverlayGetLogsTimeout, "Maximum amount of time to wait for the answer from the overlay_getLogs call.") rootCmd.PersistentFlags().DurationVar(&cfg.OverlayReplayBlockTimeout, "rpc.overlay.replayblocktimeout", rpccfg.DefaultOverlayReplayBlockTimeout, "Maximum amount of time to wait for the answer to replay a single block when called from an overlay_getLogs call.") + rootCmd.PersistentFlags().IntVar(&cfg.RpcFiltersConfig.RpcSubscriptionFiltersMaxLogs, "rpc.subscription.filters.maxlogs", rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxLogs, "Maximum number of logs to store per subscription.") + rootCmd.PersistentFlags().IntVar(&cfg.RpcFiltersConfig.RpcSubscriptionFiltersMaxHeaders, "rpc.subscription.filters.maxheaders", rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxHeaders, "Maximum number of block headers to store per subscription.") + rootCmd.PersistentFlags().IntVar(&cfg.RpcFiltersConfig.RpcSubscriptionFiltersMaxTxs, "rpc.subscription.filters.maxtxs", rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxTxs, "Maximum number of transactions to store per subscription.") + rootCmd.PersistentFlags().IntVar(&cfg.RpcFiltersConfig.RpcSubscriptionFiltersMaxAddresses, "rpc.subscription.filters.maxaddresses", rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxAddresses, "Maximum number of addresses per subscription to filter logs by.") + rootCmd.PersistentFlags().IntVar(&cfg.RpcFiltersConfig.RpcSubscriptionFiltersMaxTopics, "rpc.subscription.filters.maxtopics", rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxTopics, "Maximum number of topics per subscription to filter logs by.") rootCmd.PersistentFlags().IntVar(&cfg.BatchLimit, utils.RpcBatchLimit.Name, utils.RpcBatchLimit.Value, utils.RpcBatchLimit.Usage) rootCmd.PersistentFlags().IntVar(&cfg.ReturnDataLimit, utils.RpcReturnDataLimit.Name, utils.RpcReturnDataLimit.Value, utils.RpcReturnDataLimit.Usage) rootCmd.PersistentFlags().Uint64Var(&cfg.LogsMaxRange, utils.RpcLogsMaxRange.Name, utils.RpcLogsMaxRange.Value, utils.RpcLogsMaxRange.Usage) @@ -277,6 +282,7 @@ func checkDbCompatibility(ctx context.Context, db kv.RoDB) error { func EmbeddedServices(ctx context.Context, erigonDB kv.RoDB, stateCacheCfg kvcache.CoherentConfig, + rpcFiltersConfig rpchelper.FiltersConfig, blockReader services.FullBlockReader, ethBackendServer remote.ETHBACKENDServer, txPoolServer txpool.TxpoolServer, miningServer txpool.MiningServer, stateDiffClient StateChangesClient, logger log.Logger, @@ -299,7 +305,7 @@ func EmbeddedServices(ctx context.Context, txPool = direct.NewTxPoolClient(txPoolServer) mining = direct.NewMiningClient(miningServer) - ff = rpchelper.New(ctx, eth, txPool, mining, func() {}, logger) + ff = rpchelper.New(ctx, rpcFiltersConfig, eth, txPool, mining, func() {}, logger) return } @@ -556,7 +562,7 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger } }() - ff = rpchelper.New(ctx, eth, txPool, mining, onNewSnapshot, logger) + ff = rpchelper.New(ctx, 
cfg.RpcFiltersConfig, eth, txPool, mining, onNewSnapshot, logger) return db, dbsmt, eth, txPool, mining, stateCache, blockReader, engine, ff, agg, err } diff --git a/cmd/rpcdaemon/cli/httpcfg/http_cfg.go b/cmd/rpcdaemon/cli/httpcfg/http_cfg.go index 8f3fa4dc772..88ee22b4a2d 100644 --- a/cmd/rpcdaemon/cli/httpcfg/http_cfg.go +++ b/cmd/rpcdaemon/cli/httpcfg/http_cfg.go @@ -1,6 +1,7 @@ package httpcfg import ( + "github.com/ledgerwatch/erigon/turbo/rpchelper" "time" "github.com/ledgerwatch/erigon-lib/common/datadir" @@ -54,6 +55,7 @@ type HttpCfg struct { RpcAllowListFilePath string RpcBatchConcurrency uint RpcStreamingDisable bool + RpcFiltersConfig rpchelper.FiltersConfig DBReadConcurrency int TraceCompatibility bool // Bug for bug compatibility for trace_ routines with OpenEthereum TxPoolApiAddr string diff --git a/erigon-lib/common/concurrent/concurrent.go b/erigon-lib/common/concurrent/concurrent.go new file mode 100644 index 00000000000..a29301ea79b --- /dev/null +++ b/erigon-lib/common/concurrent/concurrent.go @@ -0,0 +1,79 @@ +package concurrent + +import "sync" + +// NewSyncMap initializes and returns a new instance of SyncMap. +func NewSyncMap[K comparable, T any]() *SyncMap[K, T] { + return &SyncMap[K, T]{ + m: make(map[K]T), + } +} + +// SyncMap is a generic map that uses a read-write mutex to ensure thread-safe access. +type SyncMap[K comparable, T any] struct { + m map[K]T + mu sync.RWMutex +} + +// Get retrieves the value associated with the given key. +func (m *SyncMap[K, T]) Get(k K) (res T, ok bool) { + m.mu.RLock() + defer m.mu.RUnlock() + res, ok = m.m[k] + return res, ok +} + +// Put sets the value for the given key, returning the previous value if present. +func (m *SyncMap[K, T]) Put(k K, v T) (T, bool) { + m.mu.Lock() + defer m.mu.Unlock() + old, ok := m.m[k] + m.m[k] = v + return old, ok +} + +// Do performs a custom operation on the value associated with the given key. +func (m *SyncMap[K, T]) Do(k K, fn func(T, bool) (T, bool)) (after T, ok bool) { + m.mu.Lock() + defer m.mu.Unlock() + val, ok := m.m[k] + nv, save := fn(val, ok) + if save { + m.m[k] = nv + } else { + delete(m.m, k) + } + return nv, ok +} + +// DoAndStore performs a custom operation on the value associated with the given key and stores the result. +func (m *SyncMap[K, T]) DoAndStore(k K, fn func(t T, ok bool) T) (after T, ok bool) { + return m.Do(k, func(t T, b bool) (T, bool) { + res := fn(t, b) + return res, true + }) +} + +// Range calls a function for each key-value pair in the map. +func (m *SyncMap[K, T]) Range(fn func(k K, v T) error) error { + m.mu.RLock() + defer m.mu.RUnlock() + for k, v := range m.m { + if err := fn(k, v); err != nil { + return err + } + } + return nil +} + +// Delete removes the value associated with the given key, if present. +func (m *SyncMap[K, T]) Delete(k K) (t T, deleted bool) { + m.mu.Lock() + defer m.mu.Unlock() + val, ok := m.m[k] + if !ok { + return t, false + } + delete(m.m, k) + return val, true +} diff --git a/erigon-lib/metrics/register.go b/erigon-lib/metrics/register.go index 2ac13a6b4ca..46624d2216a 100644 --- a/erigon-lib/metrics/register.go +++ b/erigon-lib/metrics/register.go @@ -2,6 +2,7 @@ package metrics import ( "fmt" + "github.com/prometheus/client_golang/prometheus" ) // NewCounter registers and returns new counter with the given name. 
@@ -88,6 +89,26 @@ func GetOrCreateGauge(name string) Gauge { return &gauge{g} } +// GetOrCreateGaugeVec returns registered GaugeVec with the given name +// or creates a new GaugeVec if the registry doesn't contain a GaugeVec with +// the given name and labels. +// +// name must be a valid Prometheus-compatible metric with possible labels. +// labels are the names of the dimensions associated with the gauge vector. +// For instance, +// +// - foo, with labels []string{"bar", "baz"} +// +// The returned GaugeVec is safe to use from concurrent goroutines. +func GetOrCreateGaugeVec(name string, labels []string, help ...string) *prometheus.GaugeVec { + gv, err := defaultSet.GetOrCreateGaugeVec(name, labels, help...) + if err != nil { + panic(fmt.Errorf("could not get or create new gaugevec: %w", err)) + } + + return gv +} + // NewSummary creates and returns new summary with the given name. // // name must be valid Prometheus-compatible metric with possible labels. diff --git a/erigon-lib/metrics/set.go b/erigon-lib/metrics/set.go index 2b0418fd2bd..289ac6970d0 100644 --- a/erigon-lib/metrics/set.go +++ b/erigon-lib/metrics/set.go @@ -2,6 +2,7 @@ package metrics import ( "fmt" + "reflect" "sort" "strings" "sync" @@ -16,15 +17,23 @@ type namedMetric struct { isAux bool } +type namedMetricVec struct { + name string + metric *prometheus.GaugeVec + isAux bool +} + // Set is a set of metrics. // // Metrics belonging to a set are exported separately from global metrics. // // Set.WritePrometheus must be called for exporting metrics from the set. type Set struct { - mu sync.Mutex - a []*namedMetric - m map[string]*namedMetric + mu sync.Mutex + a []*namedMetric + av []*namedMetricVec + m map[string]*namedMetric + vecs map[string]*namedMetricVec } var defaultSet = NewSet() @@ -34,7 +43,8 @@ var defaultSet = NewSet() // Pass the set to RegisterSet() function in order to export its metrics via global WritePrometheus() call. func NewSet() *Set { return &Set{ - m: make(map[string]*namedMetric), + m: make(map[string]*namedMetric), + vecs: make(map[string]*namedMetricVec), } } @@ -46,11 +56,18 @@ func (s *Set) Describe(ch chan<- *prometheus.Desc) { if !sort.SliceIsSorted(s.a, lessFunc) { sort.Slice(s.a, lessFunc) } + if !sort.SliceIsSorted(s.av, lessFunc) { + sort.Slice(s.av, lessFunc) + } sa := append([]*namedMetric(nil), s.a...) + sav := append([]*namedMetricVec(nil), s.av...) s.mu.Unlock() for _, nm := range sa { ch <- nm.metric.Desc() } + for _, nmv := range sav { + nmv.metric.Describe(ch) + } } func (s *Set) Collect(ch chan<- prometheus.Metric) { @@ -61,11 +78,18 @@ func (s *Set) Collect(ch chan<- prometheus.Metric) { if !sort.SliceIsSorted(s.a, lessFunc) { sort.Slice(s.a, lessFunc) } + if !sort.SliceIsSorted(s.av, lessFunc) { + sort.Slice(s.av, lessFunc) + } sa := append([]*namedMetric(nil), s.a...) + sav := append([]*namedMetricVec(nil), s.av...) s.mu.Unlock() for _, nm := range sa { ch <- nm.metric } + for _, nmv := range sav { + nmv.metric.Collect(ch) + } } // NewHistogram creates and returns new histogram in s with the given name. @@ -307,6 +331,78 @@ func (s *Set) GetOrCreateGauge(name string, help ...string) (prometheus.Gauge, e return g, nil } +// GetOrCreateGaugeVec returns registered GaugeVec in s with the given name +// or creates new GaugeVec if s doesn't contain GaugeVec with the given name. +// +// name must be valid Prometheus-compatible metric with possible labels. 
+// For instance, +// +// - foo +// - foo{bar="baz"} +// - foo{bar="baz",aaa="b"} +// +// labels are the labels associated with the GaugeVec. +// +// The returned GaugeVec is safe to use from concurrent goroutines. +func (s *Set) GetOrCreateGaugeVec(name string, labels []string, help ...string) (*prometheus.GaugeVec, error) { + s.mu.Lock() + nm := s.vecs[name] + s.mu.Unlock() + if nm == nil { + metric, err := newGaugeVec(name, labels, help...) + if err != nil { + return nil, fmt.Errorf("invalid metric name %q: %w", name, err) + } + + nmNew := &namedMetricVec{ + name: name, + metric: metric, + } + + s.mu.Lock() + nm = s.vecs[name] + if nm == nil { + nm = nmNew + s.vecs[name] = nm + s.av = append(s.av, nm) + } + s.mu.Unlock() + s.registerMetricVec(name, metric, false) + } + + if nm.metric == nil { + return nil, fmt.Errorf("metric %q is nil", name) + } + + metricType := reflect.TypeOf(nm.metric) + if metricType != reflect.TypeOf(&prometheus.GaugeVec{}) { + return nil, fmt.Errorf("metric %q isn't a GaugeVec. It is %s", name, metricType) + } + + return nm.metric, nil +} + +// newGaugeVec creates a new Prometheus GaugeVec. +func newGaugeVec(name string, labels []string, help ...string) (*prometheus.GaugeVec, error) { + name, constLabels, err := parseMetric(name) + if err != nil { + return nil, err + } + + helpStr := "gauge metric" + if len(help) > 0 { + helpStr = strings.Join(help, ", ") + } + + gv := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: name, + Help: helpStr, + ConstLabels: constLabels, + }, labels) + + return gv, nil +} + const defaultSummaryWindow = 5 * time.Minute var defaultSummaryQuantiles = map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.97: 0.003, 0.99: 0.001} @@ -431,6 +527,21 @@ func (s *Set) registerMetric(name string, m prometheus.Metric) { s.mustRegisterLocked(name, m) } +func (s *Set) registerMetricVec(name string, mv *prometheus.GaugeVec, isAux bool) { + s.mu.Lock() + defer s.mu.Unlock() + + if _, exists := s.vecs[name]; !exists { + nmv := &namedMetricVec{ + name: name, + metric: mv, + isAux: isAux, + } + s.vecs[name] = nmv + s.av = append(s.av, nmv) + } +} + // mustRegisterLocked registers given metric with the given name. // // Panics if the given name was already registered before. 
diff --git a/eth/backend.go b/eth/backend.go index 1d1428fc67b..42320576480 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -1489,7 +1489,7 @@ func (s *Ethereum) Init(stack *node.Node, config *ethconfig.Config, chainConfig } // start HTTP API httpRpcCfg := stack.Config().Http - ethRpcClient, txPoolRpcClient, miningRpcClient, stateCache, ff, err := cli.EmbeddedServices(ctx, chainKv, httpRpcCfg.StateCache, blockReader, ethBackendRPC, + ethRpcClient, txPoolRpcClient, miningRpcClient, stateCache, ff, err := cli.EmbeddedServices(ctx, chainKv, httpRpcCfg.StateCache, httpRpcCfg.RpcFiltersConfig, blockReader, ethBackendRPC, s.txPool2GrpcServer, miningRPC, stateDiffClient, s.logger) if err != nil { return err diff --git a/go.mod b/go.mod index a489bd5df60..7dddc7dae34 100644 --- a/go.mod +++ b/go.mod @@ -88,7 +88,7 @@ require ( github.com/pion/randutil v0.1.0 github.com/pion/stun v0.3.5 github.com/pkg/errors v0.9.1 - github.com/prometheus/client_golang v1.19.0 + github.com/prometheus/client_golang v1.19.1 github.com/prometheus/client_model v0.6.1 github.com/prometheus/common v0.48.0 github.com/protolambda/ztyp v0.2.2 diff --git a/go.sum b/go.sum index ef06ff873fa..f3801cb86b1 100644 --- a/go.sum +++ b/go.sum @@ -53,8 +53,6 @@ filippo.io/edwards25519 v1.0.0-rc.1/go.mod h1:N1IkdkCkiLB6tki+MYJoSx2JTY9NUlxZE7 gfx.cafe/util/go/generic v0.0.0-20230721185457-c559e86c829c h1:alCfDKmPC0EC0KGlZWrNF0hilVWBkzMz+aAYTJ/2hY4= gfx.cafe/util/go/generic v0.0.0-20230721185457-c559e86c829c/go.mod h1:WvSX4JsCRBuIXj0FRBFX9YLg+2SoL3w8Ww19uZO9yNE= git.apache.org/thrift.git v0.0.0-20180902110319-2566ecd5d999/go.mod h1:fPE2ZNJGynbRyZ4dJvy6G277gSllfV2HJqblrnkyeyg= -github.com/0xPolygonHermez/zkevm-data-streamer v0.2.8 h1:0Sc91seArqR3BQs49SGwGXWjf0MQYW98sEUYrao7duU= -github.com/0xPolygonHermez/zkevm-data-streamer v0.2.8/go.mod h1:7nM7Ihk+fTG1TQPwdZoGOYd3wprqqyIyjtS514uHzWE= github.com/0xPolygonHermez/zkevm-data-streamer v0.2.9 h1:9F73F/hDSwG7tF85TVdu9t47/qZ6KHXc4WZhsirCSxw= github.com/0xPolygonHermez/zkevm-data-streamer v0.2.9/go.mod h1:7nM7Ihk+fTG1TQPwdZoGOYd3wprqqyIyjtS514uHzWE= github.com/99designs/gqlgen v0.17.40 h1:/l8JcEVQ93wqIfmH9VS1jsAkwm6eAF1NwQn3N+SDqBY= @@ -948,8 +946,8 @@ github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXP github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829/go.mod h1:p2iRAGwDERtqlqzRXnrOVns+ignqQo//hLXqYxZYVNs= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= github.com/prometheus/client_golang v1.5.1/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3OK1iX/F2sw+iXX5zU= -github.com/prometheus/client_golang v1.19.0 h1:ygXvpU1AoN1MhdzckN+PyD9QJOSD4x7kmXYlnfbA6JU= -github.com/prometheus/client_golang v1.19.0/go.mod h1:ZRM9uEAypZakd+q/x7+gmsvXdURP+DABIEIjnmDdp+k= +github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE= +github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= diff --git a/turbo/cli/default_flags.go b/turbo/cli/default_flags.go index b6058a955b3..cb69d3d1096 100644 --- a/turbo/cli/default_flags.go +++ b/turbo/cli/default_flags.go @@ -100,6 +100,12 @@ var 
DefaultFlags = []cli.Flag{ &OverlayGetLogsFlag, &OverlayReplayBlockFlag, + &RpcSubscriptionFiltersMaxLogsFlag, + &RpcSubscriptionFiltersMaxHeadersFlag, + &RpcSubscriptionFiltersMaxTxsFlag, + &RpcSubscriptionFiltersMaxAddressesFlag, + &RpcSubscriptionFiltersMaxTopicsFlag, + &utils.SnapKeepBlocksFlag, &utils.SnapStopFlag, &utils.DbPageSizeFlag, diff --git a/turbo/cli/flags.go b/turbo/cli/flags.go index 0aa4056bffc..781902b0c3f 100644 --- a/turbo/cli/flags.go +++ b/turbo/cli/flags.go @@ -27,6 +27,7 @@ import ( "github.com/ledgerwatch/erigon/eth/ethconfig" "github.com/ledgerwatch/erigon/ethdb/prune" "github.com/ledgerwatch/erigon/node/nodecfg" + "github.com/ledgerwatch/erigon/turbo/rpchelper" ) var ( @@ -247,6 +248,32 @@ var ( Value: rpccfg.DefaultOverlayReplayBlockTimeout, } + RpcSubscriptionFiltersMaxLogsFlag = cli.IntFlag{ + Name: "rpc.subscription.filters.maxlogs", + Usage: "Maximum number of logs to store per subscription.", + Value: rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxLogs, + } + RpcSubscriptionFiltersMaxHeadersFlag = cli.IntFlag{ + Name: "rpc.subscription.filters.maxheaders", + Usage: "Maximum number of block headers to store per subscription.", + Value: rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxHeaders, + } + RpcSubscriptionFiltersMaxTxsFlag = cli.IntFlag{ + Name: "rpc.subscription.filters.maxtxs", + Usage: "Maximum number of transactions to store per subscription.", + Value: rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxTxs, + } + RpcSubscriptionFiltersMaxAddressesFlag = cli.IntFlag{ + Name: "rpc.subscription.filters.maxaddresses", + Usage: "Maximum number of addresses per subscription to filter logs by.", + Value: rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxAddresses, + } + RpcSubscriptionFiltersMaxTopicsFlag = cli.IntFlag{ + Name: "rpc.subscription.filters.maxtopics", + Usage: "Maximum number of topics per subscription to filter logs by.", + Value: rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxTopics, + } + TxPoolCommitEvery = cli.DurationFlag{ Name: "txpool.commit.every", Usage: "How often transactions should be committed to the storage", @@ -500,15 +527,22 @@ func setEmbeddedRpcDaemon(ctx *cli.Context, cfg *nodecfg.Config, logger log.Logg RpcStreamingDisable: ctx.Bool(utils.RpcStreamingDisableFlag.Name), DBReadConcurrency: ctx.Int(utils.DBReadConcurrencyFlag.Name), RpcAllowListFilePath: ctx.String(utils.RpcAccessListFlag.Name), - Gascap: ctx.Uint64(utils.RpcGasCapFlag.Name), - Feecap: ctx.Float64(utils.RPCGlobalTxFeeCapFlag.Name), - MaxTraces: ctx.Uint64(utils.TraceMaxtracesFlag.Name), - TraceCompatibility: ctx.Bool(utils.RpcTraceCompatFlag.Name), - BatchLimit: ctx.Int(utils.RpcBatchLimit.Name), - ReturnDataLimit: ctx.Int(utils.RpcReturnDataLimit.Name), - LogsMaxRange: ctx.Uint64(utils.RpcLogsMaxRange.Name), - AllowUnprotectedTxs: ctx.Bool(utils.AllowUnprotectedTxs.Name), - MaxGetProofRewindBlockCount: ctx.Int(utils.RpcMaxGetProofRewindBlockCount.Name), + RpcFiltersConfig: rpchelper.FiltersConfig{ + RpcSubscriptionFiltersMaxLogs: ctx.Int(RpcSubscriptionFiltersMaxLogsFlag.Name), + RpcSubscriptionFiltersMaxHeaders: ctx.Int(RpcSubscriptionFiltersMaxHeadersFlag.Name), + RpcSubscriptionFiltersMaxTxs: ctx.Int(RpcSubscriptionFiltersMaxTxsFlag.Name), + RpcSubscriptionFiltersMaxAddresses: ctx.Int(RpcSubscriptionFiltersMaxAddressesFlag.Name), + RpcSubscriptionFiltersMaxTopics: ctx.Int(RpcSubscriptionFiltersMaxTopicsFlag.Name), + }, + Gascap: ctx.Uint64(utils.RpcGasCapFlag.Name), + Feecap: 
ctx.Float64(utils.RPCGlobalTxFeeCapFlag.Name), + MaxTraces: ctx.Uint64(utils.TraceMaxtracesFlag.Name), + TraceCompatibility: ctx.Bool(utils.RpcTraceCompatFlag.Name), + BatchLimit: ctx.Int(utils.RpcBatchLimit.Name), + ReturnDataLimit: ctx.Int(utils.RpcReturnDataLimit.Name), + LogsMaxRange: ctx.Uint64(utils.RpcLogsMaxRange.Name), + AllowUnprotectedTxs: ctx.Bool(utils.AllowUnprotectedTxs.Name), + MaxGetProofRewindBlockCount: ctx.Int(utils.RpcMaxGetProofRewindBlockCount.Name), TxPoolApiAddr: ctx.String(utils.TxpoolApiAddrFlag.Name), diff --git a/turbo/jsonrpc/eth_block_test.go b/turbo/jsonrpc/eth_block_test.go index 699475d6ae6..808a03c0f4f 100644 --- a/turbo/jsonrpc/eth_block_test.go +++ b/turbo/jsonrpc/eth_block_test.go @@ -77,7 +77,7 @@ func TestGetBlockByNumberWithPendingTag(t *testing.T) { ctx, conn := rpcdaemontest.CreateTestGrpcConn(t, m) txPool := txpool.NewTxpoolClient(conn) - ff := rpchelper.New(ctx, nil, txPool, txpool.NewMiningClient(conn), func() {}, m.Log) + ff := rpchelper.New(ctx, rpchelper.DefaultFiltersConfig, nil, txPool, txpool.NewMiningClient(conn), func() {}, m.Log) expected := 1 header := &types.Header{ diff --git a/turbo/jsonrpc/eth_call_test.go b/turbo/jsonrpc/eth_call_test.go index 565e86373c4..ffe1a8c6330 100644 --- a/turbo/jsonrpc/eth_call_test.go +++ b/turbo/jsonrpc/eth_call_test.go @@ -46,7 +46,7 @@ func TestEstimateGas(t *testing.T) { stateCache := kvcache.New(kvcache.DefaultCoherentConfig) ctx, conn := rpcdaemontest.CreateTestGrpcConn(t, mock.Mock(t)) mining := txpool.NewMiningClient(conn) - ff := rpchelper.New(ctx, nil, nil, mining, func() {}, m.Log) + ff := rpchelper.New(ctx, rpchelper.DefaultFiltersConfig, nil, nil, mining, func() {}, m.Log) api := NewEthAPI(NewBaseApi(ff, stateCache, m.BlockReader, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine, m.Dirs), m.DB, nil, nil, nil, 5000000, 1e18, 100_000, false, 100_000, 128, log.New()) var from = libcommon.HexToAddress("0x71562b71999873db5b286df957af199ec94617f7") var to = libcommon.HexToAddress("0x0d3ab14bbad3d99f4203bd7a11acb94882050e7e") diff --git a/turbo/jsonrpc/eth_filters_test.go b/turbo/jsonrpc/eth_filters_test.go index 523d0893049..d2d7a68c6e1 100644 --- a/turbo/jsonrpc/eth_filters_test.go +++ b/turbo/jsonrpc/eth_filters_test.go @@ -37,7 +37,7 @@ func TestNewFilters(t *testing.T) { stateCache := kvcache.New(kvcache.DefaultCoherentConfig) ctx, conn := rpcdaemontest.CreateTestGrpcConn(t, m) mining := txpool.NewMiningClient(conn) - ff := rpchelper.New(ctx, nil, nil, mining, func() {}, m.Log) + ff := rpchelper.New(ctx, rpchelper.DefaultFiltersConfig, nil, nil, mining, func() {}, m.Log) api := NewEthAPI(NewBaseApi(ff, stateCache, m.BlockReader, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine, m.Dirs), m.DB, nil, nil, nil, 5000000, 1e18, 100_000, false, 100_000, 128, log.New()) ptf, err := api.NewPendingTransactionFilter(ctx) @@ -66,7 +66,7 @@ func TestLogsSubscribeAndUnsubscribe_WithoutConcurrentMapIssue(t *testing.T) { m, _, _ := rpcdaemontest.CreateTestSentry(t) ctx, conn := rpcdaemontest.CreateTestGrpcConn(t, m) mining := txpool.NewMiningClient(conn) - ff := rpchelper.New(ctx, nil, nil, mining, func() {}, log.New()) + ff := rpchelper.New(ctx, rpchelper.DefaultFiltersConfig, nil, nil, mining, func() {}, log.New()) // generate some random topics topics := make([][]libcommon.Hash, 0) diff --git a/turbo/jsonrpc/eth_mining_test.go b/turbo/jsonrpc/eth_mining_test.go index ef1190e3e6b..7d12ecd3429 100644 --- a/turbo/jsonrpc/eth_mining_test.go +++ b/turbo/jsonrpc/eth_mining_test.go @@ -26,7 +26,7 @@ func 
TestPendingBlock(t *testing.T) { m := mock.Mock(t) ctx, conn := rpcdaemontest.CreateTestGrpcConn(t, mock.Mock(t)) mining := txpool.NewMiningClient(conn) - ff := rpchelper.New(ctx, nil, nil, mining, func() {}, m.Log) + ff := rpchelper.New(ctx, rpchelper.DefaultFiltersConfig, nil, nil, mining, func() {}, m.Log) stateCache := kvcache.New(kvcache.DefaultCoherentConfig) engine := ethash.NewFaker() api := NewEthAPI(NewBaseApi(ff, stateCache, m.BlockReader, nil, false, rpccfg.DefaultEvmCallTimeout, engine, @@ -53,7 +53,7 @@ func TestPendingLogs(t *testing.T) { m := mock.Mock(t) ctx, conn := rpcdaemontest.CreateTestGrpcConn(t, m) mining := txpool.NewMiningClient(conn) - ff := rpchelper.New(ctx, nil, nil, mining, func() {}, m.Log) + ff := rpchelper.New(ctx, rpchelper.DefaultFiltersConfig, nil, nil, mining, func() {}, m.Log) expect := []byte{211} ch, id := ff.SubscribePendingLogs(1) diff --git a/turbo/jsonrpc/eth_subscribe_test.go b/turbo/jsonrpc/eth_subscribe_test.go index e78e2f464d6..c02c46e8454 100644 --- a/turbo/jsonrpc/eth_subscribe_test.go +++ b/turbo/jsonrpc/eth_subscribe_test.go @@ -52,7 +52,7 @@ func TestEthSubscribe(t *testing.T) { backendServer := privateapi.NewEthBackendServer(ctx, nil, m.DB, m.Notifications.Events, m.BlockReader, logger, builder.NewLatestBlockBuiltStore()) backendClient := direct.NewEthBackendClientDirect(backendServer) backend := rpcservices.NewRemoteBackend(backendClient, m.DB, m.BlockReader) - ff := rpchelper.New(ctx, backend, nil, nil, func() {}, m.Log) + ff := rpchelper.New(ctx, rpchelper.DefaultFiltersConfig, backend, nil, nil, func() {}, m.Log) newHeads, id := ff.SubscribeNewHeads(16) defer ff.UnsubscribeHeads(id) diff --git a/turbo/jsonrpc/send_transaction_test.go b/turbo/jsonrpc/send_transaction_test.go index f26b73cc1fb..fedb383f490 100644 --- a/turbo/jsonrpc/send_transaction_test.go +++ b/turbo/jsonrpc/send_transaction_test.go @@ -88,7 +88,7 @@ func TestSendRawTransaction(t *testing.T) { ctx, conn := rpcdaemontest.CreateTestGrpcConn(t, mockSentry) txPool := txpool.NewTxpoolClient(conn) - ff := rpchelper.New(ctx, nil, txPool, txpool.NewMiningClient(conn), func() {}, mockSentry.Log) + ff := rpchelper.New(ctx, rpchelper.DefaultFiltersConfig, nil, txPool, txpool.NewMiningClient(conn), func() {}, mockSentry.Log) api := jsonrpc.NewEthAPI(newBaseApiForTest(mockSentry), mockSentry.DB, mockSentry.DBSMT, nil, txPool, nil, 5000000, 1e18, 100_000, &ethconfig.Defaults, false, 100_000, 128, logger, nil, 1000) api.BadTxAllowance = 1 @@ -143,7 +143,7 @@ func TestSendRawTransactionUnprotected(t *testing.T) { ctx, conn := rpcdaemontest.CreateTestGrpcConn(t, mockSentry) txPool := txpool.NewTxpoolClient(conn) - ff := rpchelper.New(ctx, nil, txPool, txpool.NewMiningClient(conn), func() {}, mockSentry.Log) + ff := rpchelper.New(ctx, rpchelper.DefaultFiltersConfig, nil, txPool, txpool.NewMiningClient(conn), func() {}, mockSentry.Log) api := jsonrpc.NewEthAPI(newBaseApiForTest(mockSentry), mockSentry.DB, mockSentry.DBSMT, nil, txPool, nil, 5000000, 1e18, 100_000, &ethconfig.Defaults, false, 100_000, 128, logger, nil, 1000) api.BadTxAllowance = 1 diff --git a/turbo/jsonrpc/txpool_api_test.go b/turbo/jsonrpc/txpool_api_test.go index e8238d93566..57f264a0898 100644 --- a/turbo/jsonrpc/txpool_api_test.go +++ b/turbo/jsonrpc/txpool_api_test.go @@ -37,7 +37,7 @@ func TestTxPoolContent(t *testing.T) { ctx, conn := rpcdaemontest.CreateTestGrpcConn(t, m) txPool := txpool.NewTxpoolClient(conn) - ff := rpchelper.New(ctx, nil, txPool, txpool.NewMiningClient(conn), func() {}, m.Log) + ff := 
rpchelper.New(ctx, rpchelper.DefaultFiltersConfig, nil, txPool, txpool.NewMiningClient(conn), func() {}, m.Log) agg := m.HistoryV3Components() api := NewTxPoolAPI(NewBaseApi(ff, kvcache.New(kvcache.DefaultCoherentConfig), m.BlockReader, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine, m.Dirs), m.DB, txPool, "") diff --git a/turbo/rpchelper/config.go b/turbo/rpchelper/config.go new file mode 100644 index 00000000000..21610b6a508 --- /dev/null +++ b/turbo/rpchelper/config.go @@ -0,0 +1,22 @@ +package rpchelper + +// FiltersConfig defines the configuration settings for RPC subscription filters. +// Each field represents a limit on the number of respective items that can be stored per subscription. +type FiltersConfig struct { + RpcSubscriptionFiltersMaxLogs int // Maximum number of logs to store per subscription. Default: 0 (no limit) + RpcSubscriptionFiltersMaxHeaders int // Maximum number of block headers to store per subscription. Default: 0 (no limit) + RpcSubscriptionFiltersMaxTxs int // Maximum number of transactions to store per subscription. Default: 0 (no limit) + RpcSubscriptionFiltersMaxAddresses int // Maximum number of addresses per subscription to filter logs by. Default: 0 (no limit) + RpcSubscriptionFiltersMaxTopics int // Maximum number of topics per subscription to filter logs by. Default: 0 (no limit) +} + +// DefaultFiltersConfig defines the default settings for filter configurations. +// These default values set no limits on the number of logs, block headers, transactions, +// addresses, or topics that can be stored per subscription. +var DefaultFiltersConfig = FiltersConfig{ + RpcSubscriptionFiltersMaxLogs: 0, // No limit on the number of logs per subscription + RpcSubscriptionFiltersMaxHeaders: 0, // No limit on the number of block headers per subscription + RpcSubscriptionFiltersMaxTxs: 0, // No limit on the number of transactions per subscription + RpcSubscriptionFiltersMaxAddresses: 0, // No limit on the number of addresses per subscription to filter logs by + RpcSubscriptionFiltersMaxTopics: 0, // No limit on the number of topics per subscription to filter logs by +} diff --git a/turbo/rpchelper/filters.go b/turbo/rpchelper/filters.go index 4265716fd5a..c5ead362835 100644 --- a/turbo/rpchelper/filters.go +++ b/turbo/rpchelper/filters.go @@ -12,12 +12,14 @@ import ( "sync/atomic" "time" + "github.com/prometheus/client_golang/prometheus" + libcommon "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon-lib/common/concurrent" "github.com/ledgerwatch/erigon-lib/gointerfaces" "github.com/ledgerwatch/erigon-lib/gointerfaces/grpcutil" "github.com/ledgerwatch/erigon-lib/gointerfaces/remote" "github.com/ledgerwatch/erigon-lib/gointerfaces/txpool" - txpool2 "github.com/ledgerwatch/erigon/zk/txpool" "github.com/ledgerwatch/log/v3" "google.golang.org/grpc" @@ -26,49 +28,59 @@ import ( "github.com/ledgerwatch/erigon/rlp" ) +// Filters holds the state for managing subscriptions to various Ethereum events. +// It allows for the subscription and management of events such as new blocks, pending transactions, +// logs, and other Ethereum-related activities. 
type Filters struct { mu sync.RWMutex pendingBlock *types.Block - headsSubs *SyncMap[HeadsSubID, Sub[*types.Header]] - pendingLogsSubs *SyncMap[PendingLogsSubID, Sub[types.Logs]] - pendingBlockSubs *SyncMap[PendingBlockSubID, Sub[*types.Block]] - pendingTxsSubs *SyncMap[PendingTxsSubID, Sub[[]types.Transaction]] + headsSubs *concurrent.SyncMap[HeadsSubID, Sub[*types.Header]] + pendingLogsSubs *concurrent.SyncMap[PendingLogsSubID, Sub[types.Logs]] + pendingBlockSubs *concurrent.SyncMap[PendingBlockSubID, Sub[*types.Block]] + pendingTxsSubs *concurrent.SyncMap[PendingTxsSubID, Sub[[]types.Transaction]] logsSubs *LogsFilterAggregator logsRequestor atomic.Value onNewSnapshot func() - storeMu sync.Mutex - logsStores *SyncMap[LogsSubID, []*types.Log] - pendingHeadsStores *SyncMap[HeadsSubID, []*types.Header] - pendingTxsStores *SyncMap[PendingTxsSubID, [][]types.Transaction] + logsStores *concurrent.SyncMap[LogsSubID, []*types.Log] + pendingHeadsStores *concurrent.SyncMap[HeadsSubID, []*types.Header] + pendingTxsStores *concurrent.SyncMap[PendingTxsSubID, [][]types.Transaction] logger log.Logger + + config FiltersConfig } -func New(ctx context.Context, ethBackend ApiBackend, txPool txpool.TxpoolClient, mining txpool.MiningClient, onNewSnapshot func(), logger log.Logger) *Filters { +// New creates a new Filters instance, initializes it, and starts subscription goroutines for Ethereum events. +// It requires a context, Ethereum backend, transaction pool client, mining client, snapshot callback function, +// and a logger for logging events. +func New(ctx context.Context, config FiltersConfig, ethBackend ApiBackend, txPool txpool.TxpoolClient, mining txpool.MiningClient, onNewSnapshot func(), logger log.Logger) *Filters { logger.Info("rpc filters: subscribing to Erigon events") ff := &Filters{ - headsSubs: NewSyncMap[HeadsSubID, Sub[*types.Header]](), - pendingTxsSubs: NewSyncMap[PendingTxsSubID, Sub[[]types.Transaction]](), - pendingLogsSubs: NewSyncMap[PendingLogsSubID, Sub[types.Logs]](), - pendingBlockSubs: NewSyncMap[PendingBlockSubID, Sub[*types.Block]](), + headsSubs: concurrent.NewSyncMap[HeadsSubID, Sub[*types.Header]](), + pendingTxsSubs: concurrent.NewSyncMap[PendingTxsSubID, Sub[[]types.Transaction]](), + pendingLogsSubs: concurrent.NewSyncMap[PendingLogsSubID, Sub[types.Logs]](), + pendingBlockSubs: concurrent.NewSyncMap[PendingBlockSubID, Sub[*types.Block]](), logsSubs: NewLogsFilterAggregator(), onNewSnapshot: onNewSnapshot, - logsStores: NewSyncMap[LogsSubID, []*types.Log](), - pendingHeadsStores: NewSyncMap[HeadsSubID, []*types.Header](), - pendingTxsStores: NewSyncMap[PendingTxsSubID, [][]types.Transaction](), + logsStores: concurrent.NewSyncMap[LogsSubID, []*types.Log](), + pendingHeadsStores: concurrent.NewSyncMap[HeadsSubID, []*types.Header](), + pendingTxsStores: concurrent.NewSyncMap[PendingTxsSubID, [][]types.Transaction](), logger: logger, + config: config, } go func() { if ethBackend == nil { return } + activeSubscriptionsLogsClientGauge.With(prometheus.Labels{clientLabelName: "ethBackend_Events"}).Inc() for { select { case <-ctx.Done(): + activeSubscriptionsLogsClientGauge.With(prometheus.Labels{clientLabelName: "ethBackend_Events"}).Dec() return default: } @@ -76,6 +88,7 @@ func New(ctx context.Context, ethBackend ApiBackend, txPool txpool.TxpoolClient, if err := ethBackend.Subscribe(ctx, ff.OnNewEvent); err != nil { select { case <-ctx.Done(): + activeSubscriptionsLogsClientGauge.With(prometheus.Labels{clientLabelName: "ethBackend_Events"}).Dec() return default: } @@ 
-92,15 +105,18 @@ func New(ctx context.Context, ethBackend ApiBackend, txPool txpool.TxpoolClient, if ethBackend == nil { return } + activeSubscriptionsLogsClientGauge.With(prometheus.Labels{clientLabelName: "ethBackend_Logs"}).Inc() for { select { case <-ctx.Done(): + activeSubscriptionsLogsClientGauge.With(prometheus.Labels{clientLabelName: "ethBackend_Logs"}).Dec() return default: } if err := ethBackend.SubscribeLogs(ctx, ff.OnNewLogs, &ff.logsRequestor); err != nil { select { case <-ctx.Done(): + activeSubscriptionsLogsClientGauge.With(prometheus.Labels{clientLabelName: "ethBackend_Logs"}).Dec() return default: } @@ -115,19 +131,22 @@ func New(ctx context.Context, ethBackend ApiBackend, txPool txpool.TxpoolClient, if txPool != nil { go func() { + activeSubscriptionsLogsClientGauge.With(prometheus.Labels{clientLabelName: "txPool_PendingTxs"}).Inc() for { select { case <-ctx.Done(): + activeSubscriptionsLogsClientGauge.With(prometheus.Labels{clientLabelName: "txPool_PendingTxs"}).Dec() return default: } if err := ff.subscribeToPendingTransactions(ctx, txPool); err != nil { select { case <-ctx.Done(): + activeSubscriptionsLogsClientGauge.With(prometheus.Labels{clientLabelName: "txPool_PendingTxs"}).Dec() return default: } - if grpcutil.IsEndOfStream(err) || grpcutil.IsRetryLater(err) || grpcutil.ErrIs(err, txpool2.ErrPoolDisabled) { + if grpcutil.IsEndOfStream(err) || grpcutil.IsRetryLater(err) || isTxPoolDisabledErr(err) { time.Sleep(3 * time.Second) continue } @@ -138,15 +157,18 @@ func New(ctx context.Context, ethBackend ApiBackend, txPool txpool.TxpoolClient, if !reflect.ValueOf(mining).IsNil() { //https://groups.google.com/g/golang-nuts/c/wnH302gBa4I go func() { + activeSubscriptionsLogsClientGauge.With(prometheus.Labels{clientLabelName: "txPool_PendingBlock"}).Inc() for { select { case <-ctx.Done(): + activeSubscriptionsLogsClientGauge.With(prometheus.Labels{clientLabelName: "txPool_PendingBlock"}).Dec() return default: } if err := ff.subscribeToPendingBlocks(ctx, mining); err != nil { select { case <-ctx.Done(): + activeSubscriptionsLogsClientGauge.With(prometheus.Labels{clientLabelName: "txPool_PendingBlock"}).Dec() return default: } @@ -159,15 +181,18 @@ func New(ctx context.Context, ethBackend ApiBackend, txPool txpool.TxpoolClient, } }() go func() { + activeSubscriptionsLogsClientGauge.With(prometheus.Labels{clientLabelName: "txPool_PendingLogs"}).Inc() for { select { case <-ctx.Done(): + activeSubscriptionsLogsClientGauge.With(prometheus.Labels{clientLabelName: "txPool_PendingLogs"}).Dec() return default: } if err := ff.subscribeToPendingLogs(ctx, mining); err != nil { select { case <-ctx.Done(): + activeSubscriptionsLogsClientGauge.With(prometheus.Labels{clientLabelName: "txPool_PendingLogs"}).Dec() return default: } @@ -185,12 +210,15 @@ func New(ctx context.Context, ethBackend ApiBackend, txPool txpool.TxpoolClient, return ff } +// LastPendingBlock returns the last pending block that was received. func (ff *Filters) LastPendingBlock() *types.Block { ff.mu.RLock() defer ff.mu.RUnlock() return ff.pendingBlock } +// subscribeToPendingTransactions subscribes to pending transactions using the given transaction pool client. +// It listens for new transactions and processes them as they arrive. 
func (ff *Filters) subscribeToPendingTransactions(ctx context.Context, txPool txpool.TxpoolClient) error { subscription, err := txPool.OnAdd(ctx, &txpool.OnAddRequest{}, grpc.WaitForReady(true)) if err != nil { @@ -211,6 +239,8 @@ func (ff *Filters) subscribeToPendingTransactions(ctx context.Context, txPool tx return nil } +// subscribeToPendingBlocks subscribes to pending blocks using the given mining client. +// It listens for new pending blocks and processes them as they arrive. func (ff *Filters) subscribeToPendingBlocks(ctx context.Context, mining txpool.MiningClient) error { subscription, err := mining.OnPendingBlock(ctx, &txpool.OnPendingBlockRequest{}, grpc.WaitForReady(true)) if err != nil { @@ -237,6 +267,8 @@ func (ff *Filters) subscribeToPendingBlocks(ctx context.Context, mining txpool.M return nil } +// HandlePendingBlock handles a new pending block received from the mining client. +// It updates the internal state and notifies subscribers about the new block. func (ff *Filters) HandlePendingBlock(reply *txpool.OnPendingBlockReply) { b := &types.Block{} if reply == nil || len(reply.RplBlock) == 0 { @@ -256,6 +288,8 @@ func (ff *Filters) HandlePendingBlock(reply *txpool.OnPendingBlockReply) { }) } +// subscribeToPendingLogs subscribes to pending logs using the given mining client. +// It listens for new pending logs and processes them as they arrive. func (ff *Filters) subscribeToPendingLogs(ctx context.Context, mining txpool.MiningClient) error { subscription, err := mining.OnPendingLogs(ctx, &txpool.OnPendingLogsRequest{}, grpc.WaitForReady(true)) if err != nil { @@ -281,6 +315,8 @@ func (ff *Filters) subscribeToPendingLogs(ctx context.Context, mining txpool.Min return nil } +// HandlePendingLogs handles new pending logs received from the mining client. +// It updates the internal state and notifies subscribers about the new logs. func (ff *Filters) HandlePendingLogs(reply *txpool.OnPendingLogsReply) { if len(reply.RplLogs) == 0 { return @@ -295,6 +331,8 @@ func (ff *Filters) HandlePendingLogs(reply *txpool.OnPendingLogsReply) { }) } +// SubscribeNewHeads subscribes to new block headers and returns a channel to receive the headers +// and a subscription ID to manage the subscription. func (ff *Filters) SubscribeNewHeads(size int) (<-chan *types.Header, HeadsSubID) { id := HeadsSubID(generateSubscriptionID()) sub := newChanSub[*types.Header](size) @@ -302,6 +340,8 @@ func (ff *Filters) SubscribeNewHeads(size int) (<-chan *types.Header, HeadsSubID return sub.ch, id } +// UnsubscribeHeads unsubscribes from new block headers using the given subscription ID. +// It returns true if the unsubscription was successful, otherwise false. func (ff *Filters) UnsubscribeHeads(id HeadsSubID) bool { ch, ok := ff.headsSubs.Get(id) if !ok { @@ -315,6 +355,8 @@ func (ff *Filters) UnsubscribeHeads(id HeadsSubID) bool { return true } +// SubscribePendingLogs subscribes to pending logs and returns a channel to receive the logs +// and a subscription ID to manage the subscription. It uses the specified filter criteria. func (ff *Filters) SubscribePendingLogs(size int) (<-chan types.Logs, PendingLogsSubID) { id := PendingLogsSubID(generateSubscriptionID()) sub := newChanSub[types.Logs](size) @@ -322,6 +364,7 @@ func (ff *Filters) SubscribePendingLogs(size int) (<-chan types.Logs, PendingLog return sub.ch, id } +// UnsubscribePendingLogs unsubscribes from pending logs using the given subscription ID. 
func (ff *Filters) UnsubscribePendingLogs(id PendingLogsSubID) { ch, ok := ff.pendingLogsSubs.Get(id) if !ok { @@ -331,6 +374,8 @@ func (ff *Filters) UnsubscribePendingLogs(id PendingLogsSubID) { ff.pendingLogsSubs.Delete(id) } +// SubscribePendingBlock subscribes to pending blocks and returns a channel to receive the blocks +// and a subscription ID to manage the subscription. func (ff *Filters) SubscribePendingBlock(size int) (<-chan *types.Block, PendingBlockSubID) { id := PendingBlockSubID(generateSubscriptionID()) sub := newChanSub[*types.Block](size) @@ -338,6 +383,7 @@ func (ff *Filters) SubscribePendingBlock(size int) (<-chan *types.Block, Pending return sub.ch, id } +// UnsubscribePendingBlock unsubscribes from pending blocks using the given subscription ID. func (ff *Filters) UnsubscribePendingBlock(id PendingBlockSubID) { ch, ok := ff.pendingBlockSubs.Get(id) if !ok { @@ -347,6 +393,8 @@ func (ff *Filters) UnsubscribePendingBlock(id PendingBlockSubID) { ff.pendingBlockSubs.Delete(id) } +// SubscribePendingTxs subscribes to pending transactions and returns a channel to receive the transactions +// and a subscription ID to manage the subscription. func (ff *Filters) SubscribePendingTxs(size int) (<-chan []types.Transaction, PendingTxsSubID) { id := PendingTxsSubID(generateSubscriptionID()) sub := newChanSub[[]types.Transaction](size) @@ -354,6 +402,8 @@ func (ff *Filters) SubscribePendingTxs(size int) (<-chan []types.Transaction, Pe return sub.ch, id } +// UnsubscribePendingTxs unsubscribes from pending transactions using the given subscription ID. +// It returns true if the unsubscription was successful, otherwise false. func (ff *Filters) UnsubscribePendingTxs(id PendingTxsSubID) bool { ch, ok := ff.pendingTxsSubs.Get(id) if !ok { @@ -367,31 +417,63 @@ func (ff *Filters) UnsubscribePendingTxs(id PendingTxsSubID) bool { return true } -func (ff *Filters) SubscribeLogs(size int, crit filters.FilterCriteria) (<-chan *types.Log, LogsSubID) { +// SubscribeLogs subscribes to logs using the specified filter criteria and returns a channel to receive the logs +// and a subscription ID to manage the subscription. 
+func (ff *Filters) SubscribeLogs(size int, criteria filters.FilterCriteria) (<-chan *types.Log, LogsSubID) { sub := newChanSub[*types.Log](size) id, f := ff.logsSubs.insertLogsFilter(sub) - f.addrs = map[libcommon.Address]int{} - if len(crit.Addresses) == 0 { + + // Initialize address and topic maps + f.addrs = concurrent.NewSyncMap[libcommon.Address, int]() + f.topics = concurrent.NewSyncMap[libcommon.Hash, int]() + + // Handle addresses + if len(criteria.Addresses) == 0 { + // If no addresses are specified, it means all addresses should be included f.allAddrs = 1 } else { - for _, addr := range crit.Addresses { - f.addrs[addr] = 1 + // Limit the number of addresses + addressCount := 0 + for _, addr := range criteria.Addresses { + if ff.config.RpcSubscriptionFiltersMaxAddresses == 0 || addressCount < ff.config.RpcSubscriptionFiltersMaxAddresses { + f.addrs.Put(addr, 1) + addressCount++ + } else { + break + } } } - f.topics = map[libcommon.Hash]int{} - if len(crit.Topics) == 0 { + + // Handle topics and track the allowed topics + if len(criteria.Topics) == 0 { + // If no topics are specified, it means all topics should be included f.allTopics = 1 } else { - for _, topics := range crit.Topics { + // Limit the number of topics + topicCount := 0 + allowedTopics := [][]libcommon.Hash{} + for _, topics := range criteria.Topics { + allowedTopicsRow := []libcommon.Hash{} for _, topic := range topics { - f.topics[topic] = 1 + if ff.config.RpcSubscriptionFiltersMaxTopics == 0 || topicCount < ff.config.RpcSubscriptionFiltersMaxTopics { + f.topics.Put(topic, 1) + allowedTopicsRow = append(allowedTopicsRow, topic) + topicCount++ + } else { + break + } + } + if len(allowedTopicsRow) > 0 { + allowedTopics = append(allowedTopics, allowedTopicsRow) } } + f.topicsOriginal = allowedTopics } - f.topicsOriginal = crit.Topics + + // Add the filter to the list of log filters ff.logsSubs.addLogsFilters(f) - // if any filter in the aggregate needs all addresses or all topics then the global log subscription needs to - // allow all addresses or topics through + + // Create a filter request based on the aggregated filters lfr := ff.logsSubs.createFilterRequest() addresses, topics := ff.logsSubs.getAggMaps() for addr := range addresses { @@ -412,12 +494,15 @@ func (ff *Filters) SubscribeLogs(size int, crit filters.FilterCriteria) (<-chan return sub.ch, id } +// loadLogsRequester loads the current logs requester and returns it. func (ff *Filters) loadLogsRequester() any { ff.mu.Lock() defer ff.mu.Unlock() return ff.logsRequestor.Load() } +// UnsubscribeLogs unsubscribes from logs using the given subscription ID. +// It returns true if the unsubscription was successful, otherwise false. func (ff *Filters) UnsubscribeLogs(id LogsSubID) bool { isDeleted := ff.logsSubs.removeLogsFilter(id) // if any filters in the aggregate need all addresses or all topics then the request to the central @@ -445,11 +530,12 @@ func (ff *Filters) UnsubscribeLogs(id LogsSubID) bool { return isDeleted } +// deleteLogStore deletes the log store associated with the given subscription ID. func (ff *Filters) deleteLogStore(id LogsSubID) { ff.logsStores.Delete(id) } -// OnNewEvent is called when there is a new Event from the remote +// OnNewEvent is called when there is a new event from the remote and processes it. 
func (ff *Filters) OnNewEvent(event *remote.SubscribeReply) { err := ff.onNewEvent(event) if err != nil { @@ -457,6 +543,7 @@ func (ff *Filters) OnNewEvent(event *remote.SubscribeReply) { } } +// onNewEvent processes the given event from the remote and updates the internal state. func (ff *Filters) onNewEvent(event *remote.SubscribeReply) error { switch event.Type { case remote.Event_HEADER: @@ -474,6 +561,7 @@ func (ff *Filters) onNewEvent(event *remote.SubscribeReply) error { } // TODO: implement? +// onPendingLog handles a new pending log event from the remote. func (ff *Filters) onPendingLog(event *remote.SubscribeReply) error { // payload := event.Data // var logs types.Logs @@ -490,6 +578,7 @@ func (ff *Filters) onPendingLog(event *remote.SubscribeReply) error { } // TODO: implement? +// onPendingBlock handles a new pending block event from the remote. func (ff *Filters) onPendingBlock(event *remote.SubscribeReply) error { // payload := event.Data // var block types.Block @@ -505,6 +594,7 @@ func (ff *Filters) onPendingBlock(event *remote.SubscribeReply) error { return nil } +// onNewHeader handles a new block header event from the remote and updates the internal state. func (ff *Filters) onNewHeader(event *remote.SubscribeReply) error { payload := event.Data var header types.Header @@ -521,6 +611,7 @@ func (ff *Filters) onNewHeader(event *remote.SubscribeReply) error { }) } +// OnNewTx handles a new transaction event from the transaction pool and processes it. func (ff *Filters) OnNewTx(reply *txpool.OnAddReply) { txs := make([]types.Transaction, len(reply.RplTxs)) for i, rlpTx := range reply.RplTxs { @@ -541,21 +632,41 @@ func (ff *Filters) OnNewTx(reply *txpool.OnAddReply) { }) } -// OnNewLogs is called when there is a new log +// OnNewLogs handles a new log event from the remote and processes it. func (ff *Filters) OnNewLogs(reply *remote.SubscribeLogsReply) { ff.logsSubs.distributeLog(reply) } -func (ff *Filters) AddLogs(id LogsSubID, logs *types.Log) { +// AddLogs adds logs to the store associated with the given subscription ID. +func (ff *Filters) AddLogs(id LogsSubID, log *types.Log) { ff.logsStores.DoAndStore(id, func(st []*types.Log, ok bool) []*types.Log { if !ok { st = make([]*types.Log, 0) } - st = append(st, logs) + + maxLogs := ff.config.RpcSubscriptionFiltersMaxLogs + if maxLogs > 0 && len(st)+1 > maxLogs { + // Calculate the number of logs to remove + excessLogs := len(st) + 1 - maxLogs + if excessLogs > 0 { + if excessLogs >= len(st) { + // If excessLogs is greater than or equal to the length of st, remove all + st = []*types.Log{} + } else { + // Otherwise, remove the oldest logs + st = st[excessLogs:] + } + } + } + + // Append the new log + st = append(st, log) return st }) } +// ReadLogs reads logs from the store associated with the given subscription ID. +// It returns the logs and a boolean indicating whether the logs were found. func (ff *Filters) ReadLogs(id LogsSubID) ([]*types.Log, bool) { res, ok := ff.logsStores.Delete(id) if !ok { @@ -564,16 +675,36 @@ func (ff *Filters) ReadLogs(id LogsSubID) ([]*types.Log, bool) { return res, true } +// AddPendingBlock adds a pending block header to the store associated with the given subscription ID. 
func (ff *Filters) AddPendingBlock(id HeadsSubID, block *types.Header) { ff.pendingHeadsStores.DoAndStore(id, func(st []*types.Header, ok bool) []*types.Header { if !ok { st = make([]*types.Header, 0) } + + maxHeaders := ff.config.RpcSubscriptionFiltersMaxHeaders + if maxHeaders > 0 && len(st)+1 > maxHeaders { + // Calculate the number of headers to remove + excessHeaders := len(st) + 1 - maxHeaders + if excessHeaders > 0 { + if excessHeaders >= len(st) { + // If excessHeaders is greater than or equal to the length of st, remove all + st = []*types.Header{} + } else { + // Otherwise, remove the oldest headers + st = st[excessHeaders:] + } + } + } + + // Append the new header st = append(st, block) return st }) } +// ReadPendingBlocks reads pending block headers from the store associated with the given subscription ID. +// It returns the block headers and a boolean indicating whether the headers were found. func (ff *Filters) ReadPendingBlocks(id HeadsSubID) ([]*types.Header, bool) { res, ok := ff.pendingHeadsStores.Delete(id) if !ok { @@ -582,16 +713,52 @@ func (ff *Filters) ReadPendingBlocks(id HeadsSubID) ([]*types.Header, bool) { return res, true } +// AddPendingTxs adds pending transactions to the store associated with the given subscription ID. func (ff *Filters) AddPendingTxs(id PendingTxsSubID, txs []types.Transaction) { ff.pendingTxsStores.DoAndStore(id, func(st [][]types.Transaction, ok bool) [][]types.Transaction { if !ok { st = make([][]types.Transaction, 0) } + + // Calculate the total number of transactions in st + totalTxs := 0 + for _, txBatch := range st { + totalTxs += len(txBatch) + } + + maxTxs := ff.config.RpcSubscriptionFiltersMaxTxs + // If adding the new transactions would exceed maxTxs, remove oldest transactions + if maxTxs > 0 && totalTxs+len(txs) > maxTxs { + // Flatten st to a single slice + flatSt := make([]types.Transaction, 0, totalTxs) + for _, txBatch := range st { + flatSt = append(flatSt, txBatch...) + } + + // Calculate how many transactions need to be removed + excessTxs := len(flatSt) + len(txs) - maxTxs + if excessTxs > 0 { + if excessTxs >= len(flatSt) { + // If excessTxs is greater than or equal to the length of flatSt, remove all + flatSt = []types.Transaction{} + } else { + // Otherwise, remove the oldest transactions + flatSt = flatSt[excessTxs:] + } + } + + // Convert flatSt back to [][]types.Transaction with a single batch + st = [][]types.Transaction{flatSt} + } + + // Append the new transactions as a new batch st = append(st, txs) return st }) } +// ReadPendingTxs reads pending transactions from the store associated with the given subscription ID. +// It returns the transactions and a boolean indicating whether the transactions were found. 
func (ff *Filters) ReadPendingTxs(id PendingTxsSubID) ([][]types.Transaction, bool) { res, ok := ff.pendingTxsStores.Delete(id) if !ok { diff --git a/turbo/rpchelper/filters_deadlock_test.go b/turbo/rpchelper/filters_deadlock_test.go index 1646ec19701..dc31fb9178d 100644 --- a/turbo/rpchelper/filters_deadlock_test.go +++ b/turbo/rpchelper/filters_deadlock_test.go @@ -18,7 +18,8 @@ import ( func TestFiltersDeadlock_Test(t *testing.T) { t.Parallel() logger := log.New() - f := rpchelper.New(context.TODO(), nil, nil, nil, func() {}, logger) + config := rpchelper.FiltersConfig{} + f := rpchelper.New(context.TODO(), config, nil, nil, nil, func() {}, logger) crit := filters.FilterCriteria{ Addresses: nil, Topics: [][]libcommon.Hash{}, diff --git a/turbo/rpchelper/filters_test.go b/turbo/rpchelper/filters_test.go index 087a027348d..3be33ff7f9b 100644 --- a/turbo/rpchelper/filters_test.go +++ b/turbo/rpchelper/filters_test.go @@ -2,6 +2,8 @@ package rpchelper import ( "context" + "github.com/holiman/uint256" + "github.com/ledgerwatch/erigon/core/types" "testing" libcommon "github.com/ledgerwatch/erigon-lib/common" @@ -49,7 +51,7 @@ func TestFilters_GenerateSubscriptionID(t *testing.T) { v := <-subs _, ok := set[v] if ok { - t.Errorf("SubscriptionID Confict: %s", v) + t.Errorf("SubscriptionID Conflict: %s", v) return } set[v] = struct{}{} @@ -58,7 +60,8 @@ func TestFilters_GenerateSubscriptionID(t *testing.T) { func TestFilters_SingleSubscription_OnlyTopicsSubscribedAreBroadcast(t *testing.T) { t.Parallel() - f := New(context.TODO(), nil, nil, nil, func() {}, log.New()) + config := FiltersConfig{} + f := New(context.TODO(), config, nil, nil, nil, func() {}, log.New()) subbedTopic := libcommon.BytesToHash([]byte{10, 20}) @@ -90,7 +93,8 @@ func TestFilters_SingleSubscription_OnlyTopicsSubscribedAreBroadcast(t *testing. 
func TestFilters_SingleSubscription_EmptyTopicsInCriteria_OnlyTopicsSubscribedAreBroadcast(t *testing.T) { t.Parallel() - f := New(context.TODO(), nil, nil, nil, func() {}, log.New()) + config := FiltersConfig{} + f := New(context.TODO(), config, nil, nil, nil, func() {}, log.New()) var nilTopic libcommon.Hash subbedTopic := libcommon.BytesToHash([]byte{10, 20}) @@ -123,7 +127,8 @@ func TestFilters_SingleSubscription_EmptyTopicsInCriteria_OnlyTopicsSubscribedAr func TestFilters_TwoSubscriptionsWithDifferentCriteria(t *testing.T) { t.Parallel() - f := New(context.TODO(), nil, nil, nil, func() {}, log.New()) + config := FiltersConfig{} + f := New(context.TODO(), config, nil, nil, nil, func() {}, log.New()) criteria1 := filters.FilterCriteria{ Addresses: nil, @@ -163,7 +168,8 @@ func TestFilters_TwoSubscriptionsWithDifferentCriteria(t *testing.T) { func TestFilters_ThreeSubscriptionsWithDifferentCriteria(t *testing.T) { t.Parallel() - f := New(context.TODO(), nil, nil, nil, func() {}, log.New()) + config := FiltersConfig{} + f := New(context.TODO(), config, nil, nil, nil, func() {}, log.New()) criteria1 := filters.FilterCriteria{ Addresses: nil, @@ -238,7 +244,8 @@ func TestFilters_SubscribeLogsGeneratesCorrectLogFilterRequest(t *testing.T) { return nil } - f := New(context.TODO(), nil, nil, nil, func() {}, log.New()) + config := FiltersConfig{} + f := New(context.TODO(), config, nil, nil, nil, func() {}, log.New()) f.logsRequestor.Store(loadRequester) // first request has no filters @@ -270,7 +277,7 @@ func TestFilters_SubscribeLogsGeneratesCorrectLogFilterRequest(t *testing.T) { if lastFilterRequest.AllTopics == false { t.Error("2: expected all topics to be true") } - if len(lastFilterRequest.Addresses) != 1 && lastFilterRequest.Addresses[0] != address1H160 { + if len(lastFilterRequest.Addresses) != 1 && gointerfaces.ConvertH160toAddress(lastFilterRequest.Addresses[0]) != gointerfaces.ConvertH160toAddress(address1H160) { t.Error("2: expected the address to match the last request") } @@ -288,10 +295,10 @@ func TestFilters_SubscribeLogsGeneratesCorrectLogFilterRequest(t *testing.T) { if lastFilterRequest.AllTopics == false { t.Error("3: expected all topics to be true") } - if len(lastFilterRequest.Addresses) != 1 && lastFilterRequest.Addresses[0] != address1H160 { + if len(lastFilterRequest.Addresses) != 1 && gointerfaces.ConvertH160toAddress(lastFilterRequest.Addresses[0]) != gointerfaces.ConvertH160toAddress(address1H160) { t.Error("3: expected the address to match the previous request") } - if len(lastFilterRequest.Topics) != 1 && lastFilterRequest.Topics[0] != topic1H256 { + if len(lastFilterRequest.Topics) != 1 && gointerfaces.ConvertH256ToHash(lastFilterRequest.Topics[0]) != gointerfaces.ConvertH256ToHash(topic1H256) { t.Error("3: expected the topics to match the last request") } @@ -307,10 +314,10 @@ func TestFilters_SubscribeLogsGeneratesCorrectLogFilterRequest(t *testing.T) { if lastFilterRequest.AllTopics == false { t.Error("4: expected all topics to be true") } - if len(lastFilterRequest.Addresses) != 1 && lastFilterRequest.Addresses[0] != address1H160 { + if len(lastFilterRequest.Addresses) != 1 && gointerfaces.ConvertH160toAddress(lastFilterRequest.Addresses[0]) != gointerfaces.ConvertH160toAddress(address1H160) { t.Error("4: expected an address to be present") } - if len(lastFilterRequest.Topics) != 1 && lastFilterRequest.Topics[0] != topic1H256 { + if len(lastFilterRequest.Topics) != 1 && gointerfaces.ConvertH256ToHash(lastFilterRequest.Topics[0]) != 
gointerfaces.ConvertH256ToHash(topic1H256) { t.Error("4: expected a topic to be present") } @@ -327,7 +334,7 @@ func TestFilters_SubscribeLogsGeneratesCorrectLogFilterRequest(t *testing.T) { if len(lastFilterRequest.Addresses) != 0 { t.Error("5: expected addresses to be empty") } - if len(lastFilterRequest.Topics) != 1 && lastFilterRequest.Topics[0] != topic1H256 { + if len(lastFilterRequest.Topics) != 1 && gointerfaces.ConvertH256ToHash(lastFilterRequest.Topics[0]) != gointerfaces.ConvertH256ToHash(topic1H256) { t.Error("5: expected a topic to be present") } @@ -335,15 +342,150 @@ func TestFilters_SubscribeLogsGeneratesCorrectLogFilterRequest(t *testing.T) { // and nothing in the address or topics lists f.UnsubscribeLogs(id3) if lastFilterRequest.AllAddresses == true { - t.Error("5: expected all addresses to be false") + t.Error("6: expected all addresses to be false") } if lastFilterRequest.AllTopics == true { - t.Error("5: expected all topics to be false") + t.Error("6: expected all topics to be false") } if len(lastFilterRequest.Addresses) != 0 { - t.Error("5: expected addresses to be empty") + t.Error("6: expected addresses to be empty") } if len(lastFilterRequest.Topics) != 0 { - t.Error("5: expected topics to be empty") + t.Error("6: expected topics to be empty") + } +} + +func TestFilters_AddLogs(t *testing.T) { + tests := []struct { + name string + maxLogs int + numToAdd int + expectedLen int + }{ + {"WithinLimit", 5, 5, 5}, + {"ExceedingLimit", 2, 3, 2}, + {"UnlimitedLogs", 0, 10, 10}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + config := FiltersConfig{RpcSubscriptionFiltersMaxLogs: tt.maxLogs} + f := New(context.TODO(), config, nil, nil, nil, func() {}, log.New()) + logID := LogsSubID("test-log") + logEntry := &types.Log{Address: libcommon.HexToAddress("095e7baea6a6c7c4c2dfeb977efac326af552d87")} + + for i := 0; i < tt.numToAdd; i++ { + f.AddLogs(logID, logEntry) + } + + logs, found := f.logsStores.Get(logID) + if !found { + t.Fatal("Expected to find logs in the store") + } + if len(logs) != tt.expectedLen { + t.Fatalf("Expected %d logs, but got %d", tt.expectedLen, len(logs)) + } + }) + } +} + +func TestFilters_AddPendingBlocks(t *testing.T) { + tests := []struct { + name string + maxHeaders int + numToAdd int + expectedLen int + }{ + {"WithinLimit", 3, 3, 3}, + {"ExceedingLimit", 2, 5, 2}, + {"UnlimitedHeaders", 0, 10, 10}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + config := FiltersConfig{RpcSubscriptionFiltersMaxHeaders: tt.maxHeaders} + f := New(context.TODO(), config, nil, nil, nil, func() {}, log.New()) + blockID := HeadsSubID("test-block") + header := &types.Header{} + + for i := 0; i < tt.numToAdd; i++ { + f.AddPendingBlock(blockID, header) + } + + blocks, found := f.pendingHeadsStores.Get(blockID) + if !found { + t.Fatal("Expected to find blocks in the store") + } + if len(blocks) != tt.expectedLen { + t.Fatalf("Expected %d blocks, but got %d", tt.expectedLen, len(blocks)) + } + }) + } +} + +func TestFilters_AddPendingTxs(t *testing.T) { + tests := []struct { + name string + maxTxs int + numToAdd int + expectedLen int + }{ + {"WithinLimit", 5, 5, 5}, + {"ExceedingLimit", 2, 6, 2}, + {"UnlimitedTxs", 0, 10, 10}, + {"TriggerPanic", 5, 10, 0}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + config := FiltersConfig{RpcSubscriptionFiltersMaxTxs: tt.maxTxs} + f := New(context.TODO(), config, nil, nil, nil, func() {}, log.New()) + txID := PendingTxsSubID("test-tx") + var tx 
types.Transaction = types.NewTransaction(0, libcommon.HexToAddress("095e7baea6a6c7c4c2dfeb977efac326af552d87"), uint256.NewInt(10), 50000, uint256.NewInt(10), nil) + tx, _ = tx.WithSignature(*types.LatestSignerForChainID(nil), libcommon.Hex2Bytes("9bea4c4daac7c7c52e093e6a4c35dbbcf8856f1af7b059ba20253e70848d094f8a8fae537ce25ed8cb5af9adac3f141af69bd515bd2ba031522df09b97dd72b100")) + + // Testing for panic + if tt.name == "TriggerPanic" { + defer func() { + if r := recover(); r != nil { + t.Errorf("AddPendingTxs caused a panic: %v", r) + } + }() + + // Add transactions to trigger panic + // Initial batch to set the stage + for i := 0; i < 4; i++ { + f.AddPendingTxs(txID, []types.Transaction{tx}) + } + + // Adding more transactions in smaller increments to ensure the panic + for i := 0; i < 2; i++ { + f.AddPendingTxs(txID, []types.Transaction{tx}) + } + + // Adding another large batch to ensure it exceeds the limit and triggers the panic + largeBatch := make([]types.Transaction, 10) + for i := range largeBatch { + largeBatch[i] = tx + } + f.AddPendingTxs(txID, largeBatch) + } else { + for i := 0; i < tt.numToAdd; i++ { + f.AddPendingTxs(txID, []types.Transaction{tx}) + } + + txs, found := f.ReadPendingTxs(txID) + if !found { + t.Fatal("Expected to find transactions in the store") + } + totalTxs := 0 + for _, batch := range txs { + totalTxs += len(batch) + } + if totalTxs != tt.expectedLen { + t.Fatalf("Expected %d transactions, but got %d", tt.expectedLen, totalTxs) + } + } + }) } } diff --git a/turbo/rpchelper/filters_xlayer.go b/turbo/rpchelper/filters_xlayer.go new file mode 100644 index 00000000000..8965df9b9a8 --- /dev/null +++ b/turbo/rpchelper/filters_xlayer.go @@ -0,0 +1,16 @@ +package rpchelper + +import "strings" + +// isTxPoolDisabledErr returns true if err represents a disabled txpool without importing zk/txpool +// It checks error text to avoid a package dependency which would create an import cycle. +func isTxPoolDisabledErr(err error) bool { + if err == nil { + return false + } + // Known messages used by txpool when disabled + // - fmt.Errorf("TxPool Disabled") in zk/txpool/txpool_grpc_server.go + // - wrapped errors may contain this substring + const disabledMsg = "TxPool Disabled" + return strings.Contains(err.Error(), disabledMsg) +} diff --git a/turbo/rpchelper/logsfilter.go b/turbo/rpchelper/logsfilter.go index 9b14eb2ea7e..a588d370f59 100644 --- a/turbo/rpchelper/logsfilter.go +++ b/turbo/rpchelper/logsfilter.go @@ -4,6 +4,7 @@ import ( "sync" libcommon "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon-lib/common/concurrent" "github.com/ledgerwatch/erigon-lib/gointerfaces" "github.com/ledgerwatch/erigon-lib/gointerfaces/remote" @@ -11,56 +12,76 @@ import ( ) type LogsFilterAggregator struct { - aggLogsFilter LogsFilter // Aggregation of all current log filters - logsFilters *SyncMap[LogsSubID, *LogsFilter] // Filter for each subscriber, keyed by filterID + aggLogsFilter LogsFilter // Aggregation of all current log filters + logsFilters *concurrent.SyncMap[LogsSubID, *LogsFilter] // Filter for each subscriber, keyed by filterID logsFilterLock sync.RWMutex } // LogsFilter is used for both representing log filter for a specific subscriber (RPC daemon usually) // and "aggregated" log filter representing a union of all subscribers. 
Therefore, the values in -// the mappings are counters (of type int) and they get deleted when counter goes back to 0 -// Also, addAddr and allTopic are int instead of bool because they are also counter, counting -// how many subscribers have this set on +// the mappings are counters (of type int) and they get deleted when counter goes back to 0. +// Also, addAddr and allTopic are int instead of bool because they are also counters, counting +// how many subscribers have this set on. type LogsFilter struct { allAddrs int - addrs map[libcommon.Address]int + addrs *concurrent.SyncMap[libcommon.Address, int] allTopics int - topics map[libcommon.Hash]int + topics *concurrent.SyncMap[libcommon.Hash, int] topicsOriginal [][]libcommon.Hash // Original topic filters to be applied before distributing to individual subscribers sender Sub[*types2.Log] // nil for aggregate subscriber, for appropriate stream server otherwise } +// Send sends a log to the subscriber represented by the LogsFilter. +// It forwards the log to the subscriber's sender. func (l *LogsFilter) Send(lg *types2.Log) { l.sender.Send(lg) } + +// Close closes the sender associated with the LogsFilter. +// It is used to properly clean up and release resources associated with the sender. func (l *LogsFilter) Close() { l.sender.Close() } +// NewLogsFilterAggregator creates and returns a new instance of LogsFilterAggregator. +// It initializes the aggregated log filter and the map of individual log filters. func NewLogsFilterAggregator() *LogsFilterAggregator { return &LogsFilterAggregator{ aggLogsFilter: LogsFilter{ - addrs: make(map[libcommon.Address]int), - topics: make(map[libcommon.Hash]int), + addrs: concurrent.NewSyncMap[libcommon.Address, int](), + topics: concurrent.NewSyncMap[libcommon.Hash, int](), }, - logsFilters: NewSyncMap[LogsSubID, *LogsFilter](), + logsFilters: concurrent.NewSyncMap[LogsSubID, *LogsFilter](), } } +// insertLogsFilter inserts a new log filter into the LogsFilterAggregator with the specified sender. +// It generates a new filter ID, creates a new LogsFilter, and adds it to the logsFilters map. func (a *LogsFilterAggregator) insertLogsFilter(sender Sub[*types2.Log]) (LogsSubID, *LogsFilter) { + a.logsFilterLock.Lock() + defer a.logsFilterLock.Unlock() filterId := LogsSubID(generateSubscriptionID()) - filter := &LogsFilter{addrs: map[libcommon.Address]int{}, topics: map[libcommon.Hash]int{}, sender: sender} + filter := &LogsFilter{ + addrs: concurrent.NewSyncMap[libcommon.Address, int](), + topics: concurrent.NewSyncMap[libcommon.Hash, int](), + sender: sender, + } a.logsFilters.Put(filterId, filter) return filterId, filter } +// removeLogsFilter removes a log filter identified by filterId from the LogsFilterAggregator. +// It closes the filter and subtracts its addresses and topics from the aggregated filter. func (a *LogsFilterAggregator) removeLogsFilter(filterId LogsSubID) bool { + a.logsFilterLock.Lock() + defer a.logsFilterLock.Unlock() + filter, ok := a.logsFilters.Get(filterId) if !ok { return false } filter.Close() - filter, ok = a.logsFilters.Delete(filterId) + _, ok = a.logsFilters.Delete(filterId) if !ok { return false } @@ -68,6 +89,8 @@ func (a *LogsFilterAggregator) removeLogsFilter(filterId LogsSubID) bool { return true } +// createFilterRequest creates a LogsFilterRequest from the current state of the LogsFilterAggregator. +// It generates a request that represents the union of all current log filters. 
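The per-filter address and topic maps above are instances of the generic SyncMap that now lives in erigon-lib/common/concurrent (the former rpchelper copy is deleted at the end of this patch). A small, self-contained sketch of its basic operations, with arbitrary example keys and values:

package main

import (
	"fmt"

	"github.com/ledgerwatch/erigon-lib/common/concurrent"
)

func main() {
	// Same shape as LogsFilterAggregator.logsFilters: key -> value, guarded internally by an RWMutex.
	m := concurrent.NewSyncMap[string, int]()

	m.Put("sub-1", 1)                // insert, returning any previous value
	if v, ok := m.Get("sub-1"); ok { // read
		fmt.Println("sub-1 =", v)
	}
	_ = m.Range(func(k string, v int) error { // iterate under the read lock
		fmt.Println(k, v)
		return nil
	})
	if old, deleted := m.Delete("sub-1"); deleted { // remove, returning the old value
		fmt.Println("removed", old)
	}
}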
func (a *LogsFilterAggregator) createFilterRequest() *remote.LogsFilterRequest { a.logsFilterLock.RLock() defer a.logsFilterLock.RUnlock() @@ -77,90 +100,159 @@ func (a *LogsFilterAggregator) createFilterRequest() *remote.LogsFilterRequest { } } +// subtractLogFilters subtracts the counts of addresses and topics in the given LogsFilter from the aggregated filter. +// It decrements the counters for each address and topic in the aggregated filter by the corresponding counts in the +// provided LogsFilter. If the count for any address or topic reaches zero, it is removed from the aggregated filter. func (a *LogsFilterAggregator) subtractLogFilters(f *LogsFilter) { - a.logsFilterLock.Lock() - defer a.logsFilterLock.Unlock() a.aggLogsFilter.allAddrs -= f.allAddrs - for addr, count := range f.addrs { - a.aggLogsFilter.addrs[addr] -= count - if a.aggLogsFilter.addrs[addr] == 0 { - delete(a.aggLogsFilter.addrs, addr) - } + if f.allAddrs > 0 { + // Decrement the count for AllAddresses + activeSubscriptionsLogsAllAddressesGauge.Dec() } + f.addrs.Range(func(addr libcommon.Address, count int) error { + a.aggLogsFilter.addrs.Do(addr, func(value int, exists bool) (int, bool) { + if exists { + // Decrement the count for subscribed address + activeSubscriptionsLogsAddressesGauge.Dec() + newValue := value - count + if newValue <= 0 { + return 0, false + } + return newValue, true + } + return 0, false + }) + return nil + }) a.aggLogsFilter.allTopics -= f.allTopics - for topic, count := range f.topics { - a.aggLogsFilter.topics[topic] -= count - if a.aggLogsFilter.topics[topic] == 0 { - delete(a.aggLogsFilter.topics, topic) - } + if f.allTopics > 0 { + // Decrement the count for AllTopics + activeSubscriptionsLogsAllTopicsGauge.Dec() } + f.topics.Range(func(topic libcommon.Hash, count int) error { + a.aggLogsFilter.topics.Do(topic, func(value int, exists bool) (int, bool) { + if exists { + // Decrement the count for subscribed topic + activeSubscriptionsLogsTopicsGauge.Dec() + newValue := value - count + if newValue <= 0 { + return 0, false + } + return newValue, true + } + return 0, false + }) + return nil + }) } +// addLogsFilters adds the counts of addresses and topics in the given LogsFilter to the aggregated filter. +// It increments the counters for each address and topic in the aggregated filter by the corresponding counts in the +// provided LogsFilter. 
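subtractLogFilters relies on SyncMap.Do for its decrement-and-delete step: the callback receives the current value plus an existence flag and returns the new value plus a keep/drop flag, so a count that reaches zero disappears from the aggregate. A sketch of that pattern in isolation; decRef and the hex address are hypothetical:

package main

import (
	"fmt"

	libcommon "github.com/ledgerwatch/erigon-lib/common"
	"github.com/ledgerwatch/erigon-lib/common/concurrent"
)

// decRef decrements the reference count for addr by n and drops the entry
// once it reaches zero, mirroring the aggregate-filter bookkeeping above.
func decRef(m *concurrent.SyncMap[libcommon.Address, int], addr libcommon.Address, n int) {
	m.Do(addr, func(value int, exists bool) (int, bool) {
		if !exists {
			return 0, false // nothing to decrement
		}
		if value-n <= 0 {
			return 0, false // returning false removes the entry from the map
		}
		return value - n, true // returning true stores the decremented count
	})
}

func main() {
	m := concurrent.NewSyncMap[libcommon.Address, int]()
	addr := libcommon.HexToAddress("095e7baea6a6c7c4c2dfeb977efac326af552d87")
	m.Put(addr, 2)
	decRef(m, addr, 1)
	fmt.Println(m.Get(addr)) // 1 true
	decRef(m, addr, 1)
	fmt.Println(m.Get(addr)) // 0 false — the key is gone
}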
func (a *LogsFilterAggregator) addLogsFilters(f *LogsFilter) { a.logsFilterLock.Lock() defer a.logsFilterLock.Unlock() a.aggLogsFilter.allAddrs += f.allAddrs - for addr, count := range f.addrs { - a.aggLogsFilter.addrs[addr] += count + if f.allAddrs > 0 { + // Increment the count for AllAddresses + activeSubscriptionsLogsAllAddressesGauge.Inc() } + f.addrs.Range(func(addr libcommon.Address, count int) error { + // Increment the count for subscribed address + activeSubscriptionsLogsAddressesGauge.Inc() + a.aggLogsFilter.addrs.DoAndStore(addr, func(value int, exists bool) int { + return value + count + }) + return nil + }) a.aggLogsFilter.allTopics += f.allTopics - for topic, count := range f.topics { - a.aggLogsFilter.topics[topic] += count + if f.allTopics > 0 { + // Increment the count for AllTopics + activeSubscriptionsLogsAllTopicsGauge.Inc() } + f.topics.Range(func(topic libcommon.Hash, count int) error { + // Increment the count for subscribed topic + activeSubscriptionsLogsTopicsGauge.Inc() + a.aggLogsFilter.topics.DoAndStore(topic, func(value int, exists bool) int { + return value + count + }) + return nil + }) } +// getAggMaps returns the aggregated maps of addresses and topics from the LogsFilterAggregator. +// It creates copies of the current state of the aggregated addresses and topics filters. func (a *LogsFilterAggregator) getAggMaps() (map[libcommon.Address]int, map[libcommon.Hash]int) { a.logsFilterLock.RLock() defer a.logsFilterLock.RUnlock() addresses := make(map[libcommon.Address]int) - for k, v := range a.aggLogsFilter.addrs { + a.aggLogsFilter.addrs.Range(func(k libcommon.Address, v int) error { addresses[k] = v - } + return nil + }) topics := make(map[libcommon.Hash]int) - for k, v := range a.aggLogsFilter.topics { + a.aggLogsFilter.topics.Range(func(k libcommon.Hash, v int) error { topics[k] = v - } + return nil + }) return addresses, topics } +// distributeLog processes an event log and distributes it to all subscribed log filters. +// It checks each filter to determine if the log should be sent based on the filter's address and topic settings. 
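addLogsFilters and getAggMaps use the two complementary helpers: DoAndStore for an unconditional upsert (the increment happens even when the key is absent) and Range for copying a consistent snapshot out under the read lock. A short sketch with illustrative names:

package main

import (
	"fmt"

	libcommon "github.com/ledgerwatch/erigon-lib/common"
	"github.com/ledgerwatch/erigon-lib/common/concurrent"
)

func main() {
	topics := concurrent.NewSyncMap[libcommon.Hash, int]()
	t1 := libcommon.BytesToHash([]byte{10, 20})

	// Upsert: the callback sees the zero value when the key is absent, and
	// whatever it returns is stored (what addLogsFilters does per topic).
	topics.DoAndStore(t1, func(value int, exists bool) int { return value + 1 })
	topics.DoAndStore(t1, func(value int, exists bool) int { return value + 1 })

	// Snapshot: Range holds the read lock while we copy into a plain map,
	// the same shape getAggMaps hands back to callers.
	snapshot := make(map[libcommon.Hash]int)
	_ = topics.Range(func(k libcommon.Hash, v int) error {
		snapshot[k] = v
		return nil
	})
	fmt.Println(snapshot[t1]) // 2
}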
func (a *LogsFilterAggregator) distributeLog(eventLog *remote.SubscribeLogsReply) error { + a.logsFilterLock.RLock() + defer a.logsFilterLock.RUnlock() + + var lg types2.Log + var topics []libcommon.Hash + a.logsFilters.Range(func(k LogsSubID, filter *LogsFilter) error { if filter.allAddrs == 0 { - _, addrOk := filter.addrs[gointerfaces.ConvertH160toAddress(eventLog.Address)] + _, addrOk := filter.addrs.Get(gointerfaces.ConvertH160toAddress(eventLog.Address)) if !addrOk { return nil } } - var topics []libcommon.Hash + + // Pre-allocate topics slice to the required size to avoid multiple allocations + topics = topics[:0] + if cap(topics) < len(eventLog.Topics) { + topics = make([]libcommon.Hash, 0, len(eventLog.Topics)) + } for _, topic := range eventLog.Topics { topics = append(topics, gointerfaces.ConvertH256ToHash(topic)) } + if filter.allTopics == 0 { if !a.chooseTopics(filter, topics) { return nil } } - lg := &types2.Log{ - Address: gointerfaces.ConvertH160toAddress(eventLog.Address), - Topics: topics, - Data: eventLog.Data, - BlockNumber: eventLog.BlockNumber, - TxHash: gointerfaces.ConvertH256ToHash(eventLog.TransactionHash), - TxIndex: uint(eventLog.TransactionIndex), - BlockHash: gointerfaces.ConvertH256ToHash(eventLog.BlockHash), - Index: uint(eventLog.LogIndex), - Removed: eventLog.Removed, - } - filter.sender.Send(lg) + + // Reuse lg object to avoid creating new instances + lg.Address = gointerfaces.ConvertH160toAddress(eventLog.Address) + lg.Topics = topics + lg.Data = eventLog.Data + lg.BlockNumber = eventLog.BlockNumber + lg.TxHash = gointerfaces.ConvertH256ToHash(eventLog.TransactionHash) + lg.TxIndex = uint(eventLog.TransactionIndex) + lg.BlockHash = gointerfaces.ConvertH256ToHash(eventLog.BlockHash) + lg.Index = uint(eventLog.LogIndex) + lg.Removed = eventLog.Removed + + filter.sender.Send(&lg) return nil }) return nil } +// chooseTopics checks if the log topics match the filter's topics. +// It returns true if the log topics match the filter's topics, otherwise false. 
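The delivery check in distributeLog reduces to: a positive allAddrs (or allTopics) count accepts everything on that dimension; otherwise the converted address, or at least one converted topic, must be present in the filter's map. The conversion matters because the gRPC H160/H256 values are pointer types, which is also why the test assertions earlier in this patch convert both sides before comparing rather than using != on the raw protobuf values. Below is a standalone sketch of the address half of that predicate; wantAddress is a hypothetical helper, and the gointerfaces/types import path is assumed from the rest of this codebase:

package main

import (
	"fmt"

	libcommon "github.com/ledgerwatch/erigon-lib/common"
	"github.com/ledgerwatch/erigon-lib/common/concurrent"
	"github.com/ledgerwatch/erigon-lib/gointerfaces"
	types "github.com/ledgerwatch/erigon-lib/gointerfaces/types"
)

// wantAddress mirrors the first check in distributeLog: a positive allAddrs
// count means every address matches; otherwise the pointer-typed H160 is
// converted to a comparable value and looked up in the filter's address map.
func wantAddress(allAddrs int, addrs *concurrent.SyncMap[libcommon.Address, int], h *types.H160) bool {
	if allAddrs != 0 {
		return true
	}
	_, ok := addrs.Get(gointerfaces.ConvertH160toAddress(h))
	return ok
}

func main() {
	addrs := concurrent.NewSyncMap[libcommon.Address, int]()
	a := libcommon.HexToAddress("095e7baea6a6c7c4c2dfeb977efac326af552d87")
	addrs.Put(a, 1)

	fmt.Println(wantAddress(0, addrs, gointerfaces.ConvertAddressToH160(a))) // true: subscribed address
	fmt.Println(wantAddress(1, addrs, nil))                                  // true: filter accepts all addresses
}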
func (a *LogsFilterAggregator) chooseTopics(filter *LogsFilter, logTopics []libcommon.Hash) bool { var found bool for _, logTopic := range logTopics { - if _, ok := filter.topics[logTopic]; ok { + if _, ok := filter.topics.Get(logTopic); ok { found = true break } diff --git a/turbo/rpchelper/metrics.go b/turbo/rpchelper/metrics.go new file mode 100644 index 00000000000..41e963d3285 --- /dev/null +++ b/turbo/rpchelper/metrics.go @@ -0,0 +1,19 @@ +package rpchelper + +import ( + "github.com/ledgerwatch/erigon-lib/metrics" +) + +const ( + filterLabelName = "filter" + clientLabelName = "client" +) + +var ( + activeSubscriptionsGauge = metrics.GetOrCreateGaugeVec("subscriptions", []string{filterLabelName}, "Current number of subscriptions") + activeSubscriptionsLogsAllAddressesGauge = metrics.GetOrCreateGauge("subscriptions_logs_all_addresses") + activeSubscriptionsLogsAllTopicsGauge = metrics.GetOrCreateGauge("subscriptions_logs_all_topics") + activeSubscriptionsLogsAddressesGauge = metrics.GetOrCreateGauge("subscriptions_logs_addresses") + activeSubscriptionsLogsTopicsGauge = metrics.GetOrCreateGauge("subscriptions_logs_topics") + activeSubscriptionsLogsClientGauge = metrics.GetOrCreateGaugeVec("subscriptions_logs_client", []string{clientLabelName}, "Current number of subscriptions by client") +) diff --git a/turbo/rpchelper/subscription.go b/turbo/rpchelper/subscription.go index 6fb57b151d0..e86e46f52de 100644 --- a/turbo/rpchelper/subscription.go +++ b/turbo/rpchelper/subscription.go @@ -45,69 +45,3 @@ func (s *chan_sub[T]) Close() { s.closed = true close(s.ch) } - -func NewSyncMap[K comparable, T any]() *SyncMap[K, T] { - return &SyncMap[K, T]{ - m: make(map[K]T), - } -} - -type SyncMap[K comparable, T any] struct { - m map[K]T - mu sync.RWMutex -} - -func (m *SyncMap[K, T]) Get(k K) (res T, ok bool) { - m.mu.RLock() - defer m.mu.RUnlock() - res, ok = m.m[k] - return res, ok -} - -func (m *SyncMap[K, T]) Put(k K, v T) (T, bool) { - m.mu.Lock() - defer m.mu.Unlock() - old, ok := m.m[k] - m.m[k] = v - return old, ok -} - -func (m *SyncMap[K, T]) Do(k K, fn func(T, bool) (T, bool)) (after T, ok bool) { - m.mu.Lock() - defer m.mu.Unlock() - val, ok := m.m[k] - nv, save := fn(val, ok) - if save { - m.m[k] = nv - } - return nv, ok -} - -func (m *SyncMap[K, T]) DoAndStore(k K, fn func(t T, ok bool) T) (after T, ok bool) { - return m.Do(k, func(t T, b bool) (T, bool) { - res := fn(t, b) - return res, true - }) -} - -func (m *SyncMap[K, T]) Range(fn func(k K, v T) error) error { - m.mu.RLock() - defer m.mu.RUnlock() - for k, v := range m.m { - if err := fn(k, v); err != nil { - return err - } - } - return nil -} - -func (m *SyncMap[K, T]) Delete(k K) (t T, deleted bool) { - m.mu.Lock() - defer m.mu.Unlock() - val, ok := m.m[k] - if !ok { - return t, false - } - delete(m.m, k) - return val, true -}
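The new metrics.go keeps the subscription gauges next to the code that moves them. For anyone adding a similar counter, a rough sketch of the plain-gauge pattern follows; only GetOrCreateGauge, Inc and Dec are taken from this patch, while the metric name and the trackSubscription helper are made up for illustration:

package sketch

import (
	"github.com/ledgerwatch/erigon-lib/metrics"
)

// Hypothetical gauge, registered by name at package init like the gauges in metrics.go above.
var exampleSubscriptionsGauge = metrics.GetOrCreateGauge("subscriptions_example")

// trackSubscription is an illustrative helper: bump the gauge when a
// subscription is created and drop it again once the unsubscribe signal fires.
func trackSubscription(unsubscribed <-chan struct{}) {
	exampleSubscriptionsGauge.Inc()
	go func() {
		<-unsubscribed
		exampleSubscriptionsGauge.Dec()
	}()
}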