Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions cmd/rpcdaemon/cli/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ func RootCommand() (*cobra.Command, *httpcfg.HttpCfg) {
rootCmd.PersistentFlags().DurationVar(&cfg.EvmCallTimeout, "rpc.evmtimeout", rpccfg.DefaultEvmCallTimeout, "Maximum amount of time to wait for the answer from EVM call.")
rootCmd.PersistentFlags().DurationVar(&cfg.OverlayGetLogsTimeout, "rpc.overlay.getlogstimeout", rpccfg.DefaultOverlayGetLogsTimeout, "Maximum amount of time to wait for the answer from the overlay_getLogs call.")
rootCmd.PersistentFlags().DurationVar(&cfg.OverlayReplayBlockTimeout, "rpc.overlay.replayblocktimeout", rpccfg.DefaultOverlayReplayBlockTimeout, "Maximum amount of time to wait for the answer to replay a single block when called from an overlay_getLogs call.")
rootCmd.PersistentFlags().IntVar(&cfg.RpcFiltersConfig.RpcSubscriptionFiltersMaxLogs, "rpc.subscription.filters.maxlogs", rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxLogs, "Maximum number of logs to store per subscription.")
rootCmd.PersistentFlags().IntVar(&cfg.RpcFiltersConfig.RpcSubscriptionFiltersMaxHeaders, "rpc.subscription.filters.maxheaders", rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxHeaders, "Maximum number of block headers to store per subscription.")
rootCmd.PersistentFlags().IntVar(&cfg.RpcFiltersConfig.RpcSubscriptionFiltersMaxTxs, "rpc.subscription.filters.maxtxs", rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxTxs, "Maximum number of transactions to store per subscription.")
rootCmd.PersistentFlags().IntVar(&cfg.RpcFiltersConfig.RpcSubscriptionFiltersMaxAddresses, "rpc.subscription.filters.maxaddresses", rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxAddresses, "Maximum number of addresses per subscription to filter logs by.")
rootCmd.PersistentFlags().IntVar(&cfg.RpcFiltersConfig.RpcSubscriptionFiltersMaxTopics, "rpc.subscription.filters.maxtopics", rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxTopics, "Maximum number of topics per subscription to filter logs by.")
rootCmd.PersistentFlags().IntVar(&cfg.BatchLimit, utils.RpcBatchLimit.Name, utils.RpcBatchLimit.Value, utils.RpcBatchLimit.Usage)
rootCmd.PersistentFlags().IntVar(&cfg.ReturnDataLimit, utils.RpcReturnDataLimit.Name, utils.RpcReturnDataLimit.Value, utils.RpcReturnDataLimit.Usage)
rootCmd.PersistentFlags().Uint64Var(&cfg.LogsMaxRange, utils.RpcLogsMaxRange.Name, utils.RpcLogsMaxRange.Value, utils.RpcLogsMaxRange.Usage)
Expand Down Expand Up @@ -277,6 +282,7 @@ func checkDbCompatibility(ctx context.Context, db kv.RoDB) error {

func EmbeddedServices(ctx context.Context,
erigonDB kv.RoDB, stateCacheCfg kvcache.CoherentConfig,
rpcFiltersConfig rpchelper.FiltersConfig,
blockReader services.FullBlockReader, ethBackendServer remote.ETHBACKENDServer, txPoolServer txpool.TxpoolServer,
miningServer txpool.MiningServer, stateDiffClient StateChangesClient,
logger log.Logger,
Expand All @@ -299,7 +305,7 @@ func EmbeddedServices(ctx context.Context,

txPool = direct.NewTxPoolClient(txPoolServer)
mining = direct.NewMiningClient(miningServer)
ff = rpchelper.New(ctx, eth, txPool, mining, func() {}, logger)
ff = rpchelper.New(ctx, rpcFiltersConfig, eth, txPool, mining, func() {}, logger)

return
}
Expand Down Expand Up @@ -556,7 +562,7 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
}
}()

ff = rpchelper.New(ctx, eth, txPool, mining, onNewSnapshot, logger)
ff = rpchelper.New(ctx, cfg.RpcFiltersConfig, eth, txPool, mining, onNewSnapshot, logger)
return db, dbsmt, eth, txPool, mining, stateCache, blockReader, engine, ff, agg, err
}

Expand Down
2 changes: 2 additions & 0 deletions cmd/rpcdaemon/cli/httpcfg/http_cfg.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package httpcfg

import (
"github.com/ledgerwatch/erigon/turbo/rpchelper"
"time"

"github.com/ledgerwatch/erigon-lib/common/datadir"
Expand Down Expand Up @@ -54,6 +55,7 @@ type HttpCfg struct {
RpcAllowListFilePath string
RpcBatchConcurrency uint
RpcStreamingDisable bool
RpcFiltersConfig rpchelper.FiltersConfig
DBReadConcurrency int
TraceCompatibility bool // Bug for bug compatibility for trace_ routines with OpenEthereum
TxPoolApiAddr string
Expand Down
79 changes: 79 additions & 0 deletions erigon-lib/common/concurrent/concurrent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package concurrent

import "sync"

// NewSyncMap initializes and returns a new instance of SyncMap.
func NewSyncMap[K comparable, T any]() *SyncMap[K, T] {
return &SyncMap[K, T]{
m: make(map[K]T),
}
}

// SyncMap is a generic map that uses a read-write mutex to ensure thread-safe access.
type SyncMap[K comparable, T any] struct {
m map[K]T
mu sync.RWMutex
}

// Get retrieves the value associated with the given key.
func (m *SyncMap[K, T]) Get(k K) (res T, ok bool) {
m.mu.RLock()
defer m.mu.RUnlock()
res, ok = m.m[k]
return res, ok
}

// Put sets the value for the given key, returning the previous value if present.
func (m *SyncMap[K, T]) Put(k K, v T) (T, bool) {
m.mu.Lock()
defer m.mu.Unlock()
old, ok := m.m[k]
m.m[k] = v
return old, ok
}

// Do performs a custom operation on the value associated with the given key.
func (m *SyncMap[K, T]) Do(k K, fn func(T, bool) (T, bool)) (after T, ok bool) {
m.mu.Lock()
defer m.mu.Unlock()
val, ok := m.m[k]
nv, save := fn(val, ok)
if save {
m.m[k] = nv
} else {
delete(m.m, k)
}
return nv, ok
}

// DoAndStore performs a custom operation on the value associated with the given key and stores the result.
func (m *SyncMap[K, T]) DoAndStore(k K, fn func(t T, ok bool) T) (after T, ok bool) {
return m.Do(k, func(t T, b bool) (T, bool) {
res := fn(t, b)
return res, true
})
}

// Range calls a function for each key-value pair in the map.
func (m *SyncMap[K, T]) Range(fn func(k K, v T) error) error {
m.mu.RLock()
defer m.mu.RUnlock()
for k, v := range m.m {
if err := fn(k, v); err != nil {
return err
}
}
return nil
}

// Delete removes the value associated with the given key, if present.
func (m *SyncMap[K, T]) Delete(k K) (t T, deleted bool) {
m.mu.Lock()
defer m.mu.Unlock()
val, ok := m.m[k]
if !ok {
return t, false
}
delete(m.m, k)
return val, true
}
21 changes: 21 additions & 0 deletions erigon-lib/metrics/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package metrics

import (
"fmt"
"github.com/prometheus/client_golang/prometheus"
)

// NewCounter registers and returns new counter with the given name.
Expand Down Expand Up @@ -88,6 +89,26 @@ func GetOrCreateGauge(name string) Gauge {
return &gauge{g}
}

// GetOrCreateGaugeVec returns registered GaugeVec with the given name
// or creates a new GaugeVec if the registry doesn't contain a GaugeVec with
// the given name and labels.
//
// name must be a valid Prometheus-compatible metric with possible labels.
// labels are the names of the dimensions associated with the gauge vector.
// For instance,
//
// - foo, with labels []string{"bar", "baz"}
//
// The returned GaugeVec is safe to use from concurrent goroutines.
func GetOrCreateGaugeVec(name string, labels []string, help ...string) *prometheus.GaugeVec {
gv, err := defaultSet.GetOrCreateGaugeVec(name, labels, help...)
if err != nil {
panic(fmt.Errorf("could not get or create new gaugevec: %w", err))
}

return gv
}

// NewSummary creates and returns new summary with the given name.
//
// name must be valid Prometheus-compatible metric with possible labels.
Expand Down
119 changes: 115 additions & 4 deletions erigon-lib/metrics/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package metrics

import (
"fmt"
"reflect"
"sort"
"strings"
"sync"
Expand All @@ -16,15 +17,23 @@ type namedMetric struct {
isAux bool
}

type namedMetricVec struct {
name string
metric *prometheus.GaugeVec
isAux bool
}

// Set is a set of metrics.
//
// Metrics belonging to a set are exported separately from global metrics.
//
// Set.WritePrometheus must be called for exporting metrics from the set.
type Set struct {
mu sync.Mutex
a []*namedMetric
m map[string]*namedMetric
mu sync.Mutex
a []*namedMetric
av []*namedMetricVec
m map[string]*namedMetric
vecs map[string]*namedMetricVec
}

var defaultSet = NewSet()
Expand All @@ -34,7 +43,8 @@ var defaultSet = NewSet()
// Pass the set to RegisterSet() function in order to export its metrics via global WritePrometheus() call.
func NewSet() *Set {
return &Set{
m: make(map[string]*namedMetric),
m: make(map[string]*namedMetric),
vecs: make(map[string]*namedMetricVec),
}
}

Expand All @@ -46,11 +56,18 @@ func (s *Set) Describe(ch chan<- *prometheus.Desc) {
if !sort.SliceIsSorted(s.a, lessFunc) {
sort.Slice(s.a, lessFunc)
}
if !sort.SliceIsSorted(s.av, lessFunc) {
sort.Slice(s.av, lessFunc)
}
sa := append([]*namedMetric(nil), s.a...)
sav := append([]*namedMetricVec(nil), s.av...)
s.mu.Unlock()
for _, nm := range sa {
ch <- nm.metric.Desc()
}
for _, nmv := range sav {
nmv.metric.Describe(ch)
}
}

func (s *Set) Collect(ch chan<- prometheus.Metric) {
Expand All @@ -61,11 +78,18 @@ func (s *Set) Collect(ch chan<- prometheus.Metric) {
if !sort.SliceIsSorted(s.a, lessFunc) {
sort.Slice(s.a, lessFunc)
}
if !sort.SliceIsSorted(s.av, lessFunc) {
sort.Slice(s.av, lessFunc)
}
sa := append([]*namedMetric(nil), s.a...)
sav := append([]*namedMetricVec(nil), s.av...)
s.mu.Unlock()
for _, nm := range sa {
ch <- nm.metric
}
for _, nmv := range sav {
nmv.metric.Collect(ch)
}
}

// NewHistogram creates and returns new histogram in s with the given name.
Expand Down Expand Up @@ -307,6 +331,78 @@ func (s *Set) GetOrCreateGauge(name string, help ...string) (prometheus.Gauge, e
return g, nil
}

// GetOrCreateGaugeVec returns registered GaugeVec in s with the given name
// or creates new GaugeVec if s doesn't contain GaugeVec with the given name.
//
// name must be valid Prometheus-compatible metric with possible labels.
// For instance,
//
// - foo
// - foo{bar="baz"}
// - foo{bar="baz",aaa="b"}
//
// labels are the labels associated with the GaugeVec.
//
// The returned GaugeVec is safe to use from concurrent goroutines.
func (s *Set) GetOrCreateGaugeVec(name string, labels []string, help ...string) (*prometheus.GaugeVec, error) {
s.mu.Lock()
nm := s.vecs[name]
s.mu.Unlock()
if nm == nil {
metric, err := newGaugeVec(name, labels, help...)
if err != nil {
return nil, fmt.Errorf("invalid metric name %q: %w", name, err)
}

nmNew := &namedMetricVec{
name: name,
metric: metric,
}

s.mu.Lock()
nm = s.vecs[name]
if nm == nil {
nm = nmNew
s.vecs[name] = nm
s.av = append(s.av, nm)
}
s.mu.Unlock()
s.registerMetricVec(name, metric, false)
}

if nm.metric == nil {
return nil, fmt.Errorf("metric %q is nil", name)
}

metricType := reflect.TypeOf(nm.metric)
if metricType != reflect.TypeOf(&prometheus.GaugeVec{}) {
return nil, fmt.Errorf("metric %q isn't a GaugeVec. It is %s", name, metricType)
}

return nm.metric, nil
}

// newGaugeVec creates a new Prometheus GaugeVec.
func newGaugeVec(name string, labels []string, help ...string) (*prometheus.GaugeVec, error) {
name, constLabels, err := parseMetric(name)
if err != nil {
return nil, err
}

helpStr := "gauge metric"
if len(help) > 0 {
helpStr = strings.Join(help, ", ")
}

gv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: name,
Help: helpStr,
ConstLabels: constLabels,
}, labels)

return gv, nil
}

const defaultSummaryWindow = 5 * time.Minute

var defaultSummaryQuantiles = map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.97: 0.003, 0.99: 0.001}
Expand Down Expand Up @@ -431,6 +527,21 @@ func (s *Set) registerMetric(name string, m prometheus.Metric) {
s.mustRegisterLocked(name, m)
}

func (s *Set) registerMetricVec(name string, mv *prometheus.GaugeVec, isAux bool) {
s.mu.Lock()
defer s.mu.Unlock()

if _, exists := s.vecs[name]; !exists {
nmv := &namedMetricVec{
name: name,
metric: mv,
isAux: isAux,
}
s.vecs[name] = nmv
s.av = append(s.av, nmv)
}
}

// mustRegisterLocked registers given metric with the given name.
//
// Panics if the given name was already registered before.
Expand Down
2 changes: 1 addition & 1 deletion eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -1489,7 +1489,7 @@ func (s *Ethereum) Init(stack *node.Node, config *ethconfig.Config, chainConfig
}
// start HTTP API
httpRpcCfg := stack.Config().Http
ethRpcClient, txPoolRpcClient, miningRpcClient, stateCache, ff, err := cli.EmbeddedServices(ctx, chainKv, httpRpcCfg.StateCache, blockReader, ethBackendRPC,
ethRpcClient, txPoolRpcClient, miningRpcClient, stateCache, ff, err := cli.EmbeddedServices(ctx, chainKv, httpRpcCfg.StateCache, httpRpcCfg.RpcFiltersConfig, blockReader, ethBackendRPC,
s.txPool2GrpcServer, miningRPC, stateDiffClient, s.logger)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ require (
github.com/pion/randutil v0.1.0
github.com/pion/stun v0.3.5
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.19.0
github.com/prometheus/client_golang v1.19.1
github.com/prometheus/client_model v0.6.1
github.com/prometheus/common v0.48.0
github.com/protolambda/ztyp v0.2.2
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@ filippo.io/edwards25519 v1.0.0-rc.1/go.mod h1:N1IkdkCkiLB6tki+MYJoSx2JTY9NUlxZE7
gfx.cafe/util/go/generic v0.0.0-20230721185457-c559e86c829c h1:alCfDKmPC0EC0KGlZWrNF0hilVWBkzMz+aAYTJ/2hY4=
gfx.cafe/util/go/generic v0.0.0-20230721185457-c559e86c829c/go.mod h1:WvSX4JsCRBuIXj0FRBFX9YLg+2SoL3w8Ww19uZO9yNE=
git.apache.org/thrift.git v0.0.0-20180902110319-2566ecd5d999/go.mod h1:fPE2ZNJGynbRyZ4dJvy6G277gSllfV2HJqblrnkyeyg=
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.8 h1:0Sc91seArqR3BQs49SGwGXWjf0MQYW98sEUYrao7duU=
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.8/go.mod h1:7nM7Ihk+fTG1TQPwdZoGOYd3wprqqyIyjtS514uHzWE=
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.9 h1:9F73F/hDSwG7tF85TVdu9t47/qZ6KHXc4WZhsirCSxw=
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.9/go.mod h1:7nM7Ihk+fTG1TQPwdZoGOYd3wprqqyIyjtS514uHzWE=
github.com/99designs/gqlgen v0.17.40 h1:/l8JcEVQ93wqIfmH9VS1jsAkwm6eAF1NwQn3N+SDqBY=
Expand Down Expand Up @@ -948,8 +946,8 @@ github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXP
github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829/go.mod h1:p2iRAGwDERtqlqzRXnrOVns+ignqQo//hLXqYxZYVNs=
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
github.com/prometheus/client_golang v1.5.1/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3OK1iX/F2sw+iXX5zU=
github.com/prometheus/client_golang v1.19.0 h1:ygXvpU1AoN1MhdzckN+PyD9QJOSD4x7kmXYlnfbA6JU=
github.com/prometheus/client_golang v1.19.0/go.mod h1:ZRM9uEAypZakd+q/x7+gmsvXdURP+DABIEIjnmDdp+k=
github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE=
github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
Expand Down
Loading