From d8764ed419a19e4f6ea9fd50fb1900ac4ec408ec Mon Sep 17 00:00:00 2001 From: protolambda Date: Wed, 22 Mar 2023 19:29:01 +0100 Subject: [PATCH 1/2] op-node: implement RPC poll interval and rate-limit options and provide L1 RPC CLI options --- go.mod | 2 +- op-e2e/migration_test.go | 9 ++-- op-e2e/op_geth.go | 2 +- op-e2e/setup.go | 9 ++-- op-node/client/polling.go | 2 +- op-node/client/rate_limited.go | 52 ++++++++++++++++++++ op-node/client/rpc.go | 73 ++++++++++++++++++++++++--- op-node/flags/flags.go | 21 ++++++++ op-node/node/client.go | 87 ++++++++++++++++++++++++--------- op-node/node/node.go | 17 +++---- op-node/node/server_test.go | 6 +-- op-node/service.go | 9 ++-- op-service/backoff/operation.go | 3 ++ 13 files changed, 237 insertions(+), 55 deletions(-) create mode 100644 op-node/client/rate_limited.go diff --git a/go.mod b/go.mod index 1ee69c817630b..02a428b0adad3 100644 --- a/go.mod +++ b/go.mod @@ -34,6 +34,7 @@ require ( golang.org/x/crypto v0.6.0 golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb golang.org/x/term v0.5.0 + golang.org/x/time v0.0.0-20220922220347-f3bd1da661af ) require ( @@ -178,7 +179,6 @@ require ( golang.org/x/sync v0.1.0 // indirect golang.org/x/sys v0.5.0 // indirect golang.org/x/text v0.7.0 // indirect - golang.org/x/time v0.0.0-20220922220347-f3bd1da661af // indirect golang.org/x/tools v0.6.0 // indirect google.golang.org/protobuf v1.28.1 // indirect gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect diff --git a/op-e2e/migration_test.go b/op-e2e/migration_test.go index d3f3e9a4dfbb8..f541e036337d6 100644 --- a/op-e2e/migration_test.go +++ b/op-e2e/migration_test.go @@ -270,9 +270,12 @@ func TestMigration(t *testing.T) { snapLog.SetHandler(log.DiscardHandler()) rollupNodeConfig := &node.Config{ L1: &node.L1EndpointConfig{ - L1NodeAddr: forkedL1URL, - L1TrustRPC: false, - L1RPCKind: sources.RPCKindBasic, + L1NodeAddr: forkedL1URL, + L1TrustRPC: false, + L1RPCKind: sources.RPCKindBasic, + RateLimit: 0, + BatchSize: 20, + HttpPollInterval: 12 * time.Second, }, L2: &node.L2EndpointConfig{ L2EngineAddr: gethNode.HTTPAuthEndpoint(), diff --git a/op-e2e/op_geth.go b/op-e2e/op_geth.go index 6477cbd4f7094..688a5a9a9d9bf 100644 --- a/op-e2e/op_geth.go +++ b/op-e2e/op_geth.go @@ -75,7 +75,7 @@ func NewOpGeth(t *testing.T, ctx context.Context, cfg *SystemConfig) (*OpGeth, e require.Nil(t, node.Start()) auth := rpc.WithHTTPAuth(gn.NewJWTAuth(cfg.JWTSecret)) - l2Node, err := client.NewRPC(ctx, logger, node.WSAuthEndpoint(), auth) + l2Node, err := client.NewRPC(ctx, logger, node.WSAuthEndpoint(), client.WithGethRPCOptions(auth)) require.Nil(t, err) // Finally create the engine client diff --git a/op-e2e/setup.go b/op-e2e/setup.go index e6082a9bc5194..898710df1ad5e 100644 --- a/op-e2e/setup.go +++ b/op-e2e/setup.go @@ -413,9 +413,12 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) { l2EndpointConfig = sys.Nodes[name].HTTPAuthEndpoint() } rollupCfg.L1 = &rollupNode.L1EndpointConfig{ - L1NodeAddr: l1EndpointConfig, - L1TrustRPC: false, - L1RPCKind: sources.RPCKindBasic, + L1NodeAddr: l1EndpointConfig, + L1TrustRPC: false, + L1RPCKind: sources.RPCKindBasic, + RateLimit: 0, + BatchSize: 20, + HttpPollInterval: time.Duration(cfg.DeployConfig.L1BlockTime) * time.Second, } rollupCfg.L2 = &rollupNode.L2EndpointConfig{ L2EngineAddr: l2EndpointConfig, diff --git a/op-node/client/polling.go b/op-node/client/polling.go index 8794feac5f5d6..033a8942de7ff 100644 --- a/op-node/client/polling.go +++ b/op-node/client/polling.go @@ -57,7 +57,7 @@ func NewPollingClient(ctx context.Context, lgr log.Logger, c RPC, opts ...Wrappe res := &PollingClient{ c: c, lgr: lgr, - pollRate: 250 * time.Millisecond, + pollRate: 12 * time.Second, ctx: ctx, cancel: cancel, pollReqCh: make(chan struct{}, 1), diff --git a/op-node/client/rate_limited.go b/op-node/client/rate_limited.go new file mode 100644 index 0000000000000..9f4bfa159b5d9 --- /dev/null +++ b/op-node/client/rate_limited.go @@ -0,0 +1,52 @@ +package client + +import ( + "context" + "time" + + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/rpc" + "golang.org/x/time/rate" +) + +// RateLimitingClient is a wrapper around a pure RPC that implements a global rate-limit on requests. +type RateLimitingClient struct { + c RPC + rl *rate.Limiter +} + +// NewRateLimitingClient implements a global rate-limit for all RPC requests. +// A limit of N will ensure that over a long enough time-frame the given number of tokens per second is targeted. +// Burst limits how far off we can be from the target, by specifying how many requests are allowed at once. +func NewRateLimitingClient(c RPC, limit rate.Limit, burst int) *RateLimitingClient { + return &RateLimitingClient{c: c, rl: rate.NewLimiter(limit, burst)} +} + +func (b *RateLimitingClient) Close() { + b.c.Close() +} + +func (b *RateLimitingClient) CallContext(ctx context.Context, result any, method string, args ...any) error { + if err := b.rl.Wait(ctx); err != nil { + return err + } + cCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + return b.c.CallContext(cCtx, result, method, args...) +} + +func (b *RateLimitingClient) BatchCallContext(ctx context.Context, batch []rpc.BatchElem) error { + if err := b.rl.WaitN(ctx, len(batch)); err != nil { + return err + } + cCtx, cancel := context.WithTimeout(ctx, 20*time.Second) + defer cancel() + return b.c.BatchCallContext(cCtx, batch) +} + +func (b *RateLimitingClient) EthSubscribe(ctx context.Context, channel any, args ...any) (ethereum.Subscription, error) { + if err := b.rl.Wait(ctx); err != nil { + return nil, err + } + return b.c.EthSubscribe(ctx, channel, args...) +} diff --git a/op-node/client/rpc.go b/op-node/client/rpc.go index 694a87275859b..ec8b471a7f7eb 100644 --- a/op-node/client/rpc.go +++ b/op-node/client/rpc.go @@ -10,6 +10,7 @@ import ( "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/log" "github.com/prometheus/client_golang/prometheus" + "golang.org/x/time/rate" "github.com/ethereum-optimism/optimism/op-node/metrics" "github.com/ethereum/go-ethereum/rpc" @@ -24,27 +25,85 @@ type RPC interface { EthSubscribe(ctx context.Context, channel any, args ...any) (ethereum.Subscription, error) } +type rpcConfig struct { + gethRPCOptions []rpc.ClientOption + httpPollInterval time.Duration + backoffAttempts int + limit float64 + burst int +} + +type RPCOption func(cfg *rpcConfig) error + +// WithDialBackoff configures the number of attempts for the initial dial to the RPC, +// attempts are executed with an exponential backoff strategy. +func WithDialBackoff(attempts int) RPCOption { + return func(cfg *rpcConfig) error { + cfg.backoffAttempts = attempts + return nil + } +} + +// WithHttpPollInterval configures the RPC to poll at the given rate, in case RPC subscriptions are not available. +func WithHttpPollInterval(duration time.Duration) RPCOption { + return func(cfg *rpcConfig) error { + cfg.httpPollInterval = duration + return nil + } +} + +// WithGethRPCOptions passes the list of go-ethereum RPC options to the internal RPC instance. +func WithGethRPCOptions(gethRPCOptions ...rpc.ClientOption) RPCOption { + return func(cfg *rpcConfig) error { + cfg.gethRPCOptions = append(cfg.gethRPCOptions, gethRPCOptions...) + return nil + } +} + +// WithRateLimit configures the RPC to target the given rate limit (in requests / second). +// See NewRateLimitingClient for more details. +func WithRateLimit(rateLimit float64, burst int) RPCOption { + return func(cfg *rpcConfig) error { + cfg.limit = rateLimit + cfg.burst = burst + return nil + } +} + // NewRPC returns the correct client.RPC instance for a given RPC url. -func NewRPC(ctx context.Context, lgr log.Logger, addr string, opts ...rpc.ClientOption) (RPC, error) { - underlying, err := DialRPCClientWithBackoff(ctx, lgr, addr, opts...) +func NewRPC(ctx context.Context, lgr log.Logger, addr string, opts ...RPCOption) (RPC, error) { + var cfg rpcConfig + for i, opt := range opts { + if err := opt(&cfg); err != nil { + return nil, fmt.Errorf("rpc option %d failed to apply to RPC config: %w", i, err) + } + } + if cfg.backoffAttempts < 1 { // default to at least 1 attempt, or it always fails to dial. + cfg.backoffAttempts = 1 + } + underlying, err := dialRPCClientWithBackoff(ctx, lgr, addr, cfg.backoffAttempts, cfg.gethRPCOptions...) if err != nil { return nil, err } - wrapped := &BaseRPCClient{ - c: underlying, + var wrapped RPC = &BaseRPCClient{c: underlying} + + if cfg.limit != 0 { + wrapped = NewRateLimitingClient(wrapped, rate.Limit(cfg.limit), cfg.burst) } + if httpRegex.MatchString(addr) { - return NewPollingClient(ctx, lgr, wrapped), nil + wrapped = NewPollingClient(ctx, lgr, wrapped, WithPollRate(cfg.httpPollInterval)) } + return wrapped, nil } // Dials a JSON-RPC endpoint repeatedly, with a backoff, until a client connection is established. Auth is optional. -func DialRPCClientWithBackoff(ctx context.Context, log log.Logger, addr string, opts ...rpc.ClientOption) (*rpc.Client, error) { +func dialRPCClientWithBackoff(ctx context.Context, log log.Logger, addr string, attempts int, opts ...rpc.ClientOption) (*rpc.Client, error) { bOff := backoff.Exponential() var ret *rpc.Client - err := backoff.DoCtx(ctx, 10, bOff, func() error { + err := backoff.DoCtx(ctx, attempts, bOff, func() error { client, err := rpc.DialOptions(ctx, addr, opts...) if err != nil { if client == nil { diff --git a/op-node/flags/flags.go b/op-node/flags/flags.go index 9c17cd762e41b..8a475afb07e99 100644 --- a/op-node/flags/flags.go +++ b/op-node/flags/flags.go @@ -75,6 +75,24 @@ var ( return &out }(), } + L1RPCRateLimit = cli.Float64Flag{ + Name: "l1.rpc-rate-limit", + Usage: "Optional self-imposed global rate-limit on L1 RPC requests, specified in requests / second. Disabled if set to 0.", + EnvVar: prefixEnvVar("L1_RPC_RATE_LIMIT"), + Value: 0, + } + L1RPCMaxBatchSize = cli.IntFlag{ + Name: "l1.rpc-max-batch-size", + Usage: "Maximum number of RPC requests to bundle, e.g. during L1 blocks receipt fetching. The L1 RPC rate limit counts this as N items, but allows it to burst at once.", + EnvVar: prefixEnvVar("L1_RPC_MAX_BATCH_SIZE"), + Value: 20, + } + L1HTTPPollInterval = cli.DurationFlag{ + Name: "l1.http-poll-interval", + Usage: "Polling interval for latest-block subscription when using an HTTP RPC provider. Ignored for other types of RPC endpoints.", + EnvVar: prefixEnvVar("L1_HTTP_POLL_INTERVAL"), + Value: time.Second * 12, + } L2EngineJWTSecret = cli.StringFlag{ Name: "l2.jwt-secret", Usage: "Path to JWT secret key. Keys are 32 bytes, hex encoded in a file. A new key will be generated if left empty.", @@ -196,6 +214,9 @@ var optionalFlags = []cli.Flag{ Network, L1TrustRPC, L1RPCProviderKind, + L1RPCRateLimit, + L1RPCMaxBatchSize, + L1HTTPPollInterval, L2EngineJWTSecret, VerifierL1Confs, SequencerEnabledFlag, diff --git a/op-node/node/client.go b/op-node/node/client.go index 73a2f7e4654ec..4d70abe813567 100644 --- a/op-node/node/client.go +++ b/op-node/node/client.go @@ -4,8 +4,10 @@ import ( "context" "errors" "fmt" + "time" "github.com/ethereum-optimism/optimism/op-node/client" + "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/sources" "github.com/ethereum/go-ethereum/log" @@ -15,14 +17,14 @@ import ( type L2EndpointSetup interface { // Setup a RPC client to a L2 execution engine to process rollup blocks with. - Setup(ctx context.Context, log log.Logger) (cl client.RPC, err error) + Setup(ctx context.Context, log log.Logger, rollupCfg *rollup.Config) (cl client.RPC, rpcCfg *sources.EngineClientConfig, err error) Check() error } type L2SyncEndpointSetup interface { // Setup a RPC client to another L2 node to sync L2 blocks from. // It may return a nil client with nil error if RPC based sync is not enabled. - Setup(ctx context.Context, log log.Logger) (cl client.RPC, trust bool, err error) + Setup(ctx context.Context, log log.Logger, rollupCfg *rollup.Config) (cl client.RPC, rpcCfg *sources.SyncClientConfig, err error) Check() error } @@ -30,7 +32,8 @@ type L1EndpointSetup interface { // Setup a RPC client to a L1 node to pull rollup input-data from. // The results of the RPC client may be trusted for faster processing, or strictly validated. // The kind of the RPC may be non-basic, to optimize RPC usage. - Setup(ctx context.Context, log log.Logger) (cl client.RPC, trust bool, kind sources.RPCProviderKind, err error) + Setup(ctx context.Context, log log.Logger, rollupCfg *rollup.Config) (cl client.RPC, rpcCfg *sources.L1ClientConfig, err error) + Check() error } type L2EndpointConfig struct { @@ -51,17 +54,17 @@ func (cfg *L2EndpointConfig) Check() error { return nil } -func (cfg *L2EndpointConfig) Setup(ctx context.Context, log log.Logger) (client.RPC, error) { +func (cfg *L2EndpointConfig) Setup(ctx context.Context, log log.Logger, rollupCfg *rollup.Config) (client.RPC, *sources.EngineClientConfig, error) { if err := cfg.Check(); err != nil { - return nil, err + return nil, nil, err } auth := rpc.WithHTTPAuth(gn.NewJWTAuth(cfg.L2EngineJWTSecret)) - l2Node, err := client.NewRPC(ctx, log, cfg.L2EngineAddr, auth) + l2Node, err := client.NewRPC(ctx, log, cfg.L2EngineAddr, client.WithGethRPCOptions(auth)) if err != nil { - return nil, err + return nil, nil, err } - return l2Node, nil + return l2Node, sources.EngineClientDefaultConfig(rollupCfg), nil } // PreparedL2Endpoints enables testing with in-process pre-setup RPC connections to L2 engines @@ -78,8 +81,8 @@ func (p *PreparedL2Endpoints) Check() error { var _ L2EndpointSetup = (*PreparedL2Endpoints)(nil) -func (p *PreparedL2Endpoints) Setup(ctx context.Context, log log.Logger) (client.RPC, error) { - return p.Client, nil +func (p *PreparedL2Endpoints) Setup(ctx context.Context, log log.Logger, rollupCfg *rollup.Config) (client.RPC, *sources.EngineClientConfig, error) { + return p.Client, sources.EngineClientDefaultConfig(rollupCfg), nil } // L2SyncEndpointConfig contains configuration for the fallback sync endpoint @@ -93,16 +96,16 @@ var _ L2SyncEndpointSetup = (*L2SyncEndpointConfig)(nil) // Setup creates an RPC client to sync from. // It will return nil without error if no sync method is configured. -func (cfg *L2SyncEndpointConfig) Setup(ctx context.Context, log log.Logger) (cl client.RPC, trust bool, err error) { +func (cfg *L2SyncEndpointConfig) Setup(ctx context.Context, log log.Logger, rollupCfg *rollup.Config) (client.RPC, *sources.SyncClientConfig, error) { if cfg.L2NodeAddr == "" { - return nil, false, nil + return nil, nil, nil } l2Node, err := client.NewRPC(ctx, log, cfg.L2NodeAddr) if err != nil { - return nil, false, err + return nil, nil, err } - return l2Node, cfg.TrustRPC, nil + return l2Node, sources.SyncClientDefaultConfig(rollupCfg, cfg.TrustRPC), nil } func (cfg *L2SyncEndpointConfig) Check() error { @@ -118,8 +121,8 @@ type PreparedL2SyncEndpoint struct { var _ L2SyncEndpointSetup = (*PreparedL2SyncEndpoint)(nil) -func (cfg *PreparedL2SyncEndpoint) Setup(ctx context.Context, log log.Logger) (cl client.RPC, trust bool, err error) { - return cfg.Client, cfg.TrustRPC, nil +func (cfg *PreparedL2SyncEndpoint) Setup(ctx context.Context, log log.Logger, rollupCfg *rollup.Config) (client.RPC, *sources.SyncClientConfig, error) { + return cfg.Client, sources.SyncClientDefaultConfig(rollupCfg, cfg.TrustRPC), nil } func (cfg *PreparedL2SyncEndpoint) Check() error { @@ -137,16 +140,48 @@ type L1EndpointConfig struct { // L1RPCKind identifies the RPC provider kind that serves the RPC, // to inform the optimal usage of the RPC for transaction receipts fetching. L1RPCKind sources.RPCProviderKind + + // RateLimit specifies a self-imposed rate-limit on L1 requests. 0 is no rate-limit. + RateLimit float64 + + // BatchSize specifies the maximum batch-size, which also applies as L1 rate-limit burst amount (if set). + BatchSize int + + // HttpPollInterval specifies the interval between polling for the latest L1 block, + // when the RPC is detected to be an HTTP type. + // It is recommended to use websockets or IPC for efficient following of the changing block. + // Setting this to 0 disables polling. + HttpPollInterval time.Duration } var _ L1EndpointSetup = (*L1EndpointConfig)(nil) -func (cfg *L1EndpointConfig) Setup(ctx context.Context, log log.Logger) (cl client.RPC, trust bool, kind sources.RPCProviderKind, err error) { - l1Node, err := client.NewRPC(ctx, log, cfg.L1NodeAddr) +func (cfg *L1EndpointConfig) Check() error { + if cfg.BatchSize < 1 || cfg.BatchSize > 500 { + return fmt.Errorf("batch size is invalid or unreasonable: %d", cfg.BatchSize) + } + if cfg.RateLimit < 0 { + return fmt.Errorf("rate limit cannot be negative") + } + return nil +} + +func (cfg *L1EndpointConfig) Setup(ctx context.Context, log log.Logger, rollupCfg *rollup.Config) (client.RPC, *sources.L1ClientConfig, error) { + opts := []client.RPCOption{ + client.WithHttpPollInterval(cfg.HttpPollInterval), + client.WithDialBackoff(10), + } + if cfg.RateLimit != 0 { + opts = append(opts, client.WithRateLimit(cfg.RateLimit, cfg.BatchSize)) + } + + l1Node, err := client.NewRPC(ctx, log, cfg.L1NodeAddr, opts...) if err != nil { - return nil, false, sources.RPCKindBasic, fmt.Errorf("failed to dial L1 address (%s): %w", cfg.L1NodeAddr, err) + return nil, nil, fmt.Errorf("failed to dial L1 address (%s): %w", cfg.L1NodeAddr, err) } - return l1Node, cfg.L1TrustRPC, cfg.L1RPCKind, nil + rpcCfg := sources.L1ClientDefaultConfig(rollupCfg, cfg.L1TrustRPC, cfg.L1RPCKind) + rpcCfg.MaxRequestsPerBatch = cfg.BatchSize + return l1Node, rpcCfg, nil } // PreparedL1Endpoint enables testing with an in-process pre-setup RPC connection to L1 @@ -158,6 +193,14 @@ type PreparedL1Endpoint struct { var _ L1EndpointSetup = (*PreparedL1Endpoint)(nil) -func (p *PreparedL1Endpoint) Setup(ctx context.Context, log log.Logger) (cl client.RPC, trust bool, kind sources.RPCProviderKind, err error) { - return p.Client, p.TrustRPC, p.RPCProviderKind, nil +func (p *PreparedL1Endpoint) Setup(ctx context.Context, log log.Logger, rollupCfg *rollup.Config) (client.RPC, *sources.L1ClientConfig, error) { + return p.Client, sources.L1ClientDefaultConfig(rollupCfg, p.TrustRPC, p.RPCProviderKind), nil +} + +func (cfg *PreparedL1Endpoint) Check() error { + if cfg.Client == nil { + return errors.New("rpc client cannot be nil") + } + + return nil } diff --git a/op-node/node/node.go b/op-node/node/node.go index d9b38c0166b1b..5de10d74246d2 100644 --- a/op-node/node/node.go +++ b/op-node/node/node.go @@ -116,14 +116,13 @@ func (n *OpNode) initTracer(ctx context.Context, cfg *Config) error { } func (n *OpNode) initL1(ctx context.Context, cfg *Config) error { - l1Node, trustRPC, rpcProvKind, err := cfg.L1.Setup(ctx, n.log) + l1Node, rpcCfg, err := cfg.L1.Setup(ctx, n.log, &cfg.Rollup) if err != nil { return fmt.Errorf("failed to get L1 RPC client: %w", err) } n.l1Source, err = sources.NewL1Client( - client.NewInstrumentedRPC(l1Node, n.metrics), n.log, n.metrics.L1SourceCache, - sources.L1ClientDefaultConfig(&cfg.Rollup, trustRPC, rpcProvKind)) + client.NewInstrumentedRPC(l1Node, n.metrics), n.log, n.metrics.L1SourceCache, rpcCfg) if err != nil { return fmt.Errorf("failed to create L1 source: %w", err) } @@ -184,14 +183,13 @@ func (n *OpNode) initRuntimeConfig(ctx context.Context, cfg *Config) error { } func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger) error { - rpcClient, err := cfg.L2.Setup(ctx, n.log) + rpcClient, rpcCfg, err := cfg.L2.Setup(ctx, n.log, &cfg.Rollup) if err != nil { return fmt.Errorf("failed to setup L2 execution-engine RPC client: %w", err) } n.l2Source, err = sources.NewEngineClient( - client.NewInstrumentedRPC(rpcClient, n.metrics), n.log, n.metrics.L2SourceCache, - sources.EngineClientDefaultConfig(&cfg.Rollup), + client.NewInstrumentedRPC(rpcClient, n.metrics), n.log, n.metrics.L2SourceCache, rpcCfg, ) if err != nil { return fmt.Errorf("failed to create Engine client: %w", err) @@ -207,17 +205,14 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger } func (n *OpNode) initRPCSync(ctx context.Context, cfg *Config) error { - rpcSyncClient, trustRPC, err := cfg.L2Sync.Setup(ctx, n.log) + rpcSyncClient, rpcCfg, err := cfg.L2Sync.Setup(ctx, n.log, &cfg.Rollup) if err != nil { return fmt.Errorf("failed to setup L2 execution-engine RPC client for backup sync: %w", err) } if rpcSyncClient == nil { // if no RPC client is configured to sync from, then don't add the RPC sync client return nil } - - config := sources.SyncClientDefaultConfig(&cfg.Rollup, trustRPC) - - syncClient, err := sources.NewSyncClient(n.OnUnsafeL2Payload, rpcSyncClient, n.log, n.metrics.L2SourceCache, config) + syncClient, err := sources.NewSyncClient(n.OnUnsafeL2Payload, rpcSyncClient, n.log, n.metrics.L2SourceCache, rpcCfg) if err != nil { return fmt.Errorf("failed to create sync client: %w", err) } diff --git a/op-node/node/server_test.go b/op-node/node/server_test.go index adaccc6274fd6..ec617e9987066 100644 --- a/op-node/node/server_test.go +++ b/op-node/node/server_test.go @@ -115,7 +115,7 @@ func TestOutputAtBlock(t *testing.T) { require.NoError(t, server.Start()) defer server.Stop() - client, err := rpcclient.DialRPCClientWithBackoff(context.Background(), log, "http://"+server.Addr().String()) + client, err := rpcclient.NewRPC(context.Background(), log, "http://"+server.Addr().String(), rpcclient.WithDialBackoff(3)) require.NoError(t, err) var out *eth.OutputResponse @@ -147,7 +147,7 @@ func TestVersion(t *testing.T) { assert.NoError(t, server.Start()) defer server.Stop() - client, err := rpcclient.DialRPCClientWithBackoff(context.Background(), log, "http://"+server.Addr().String()) + client, err := rpcclient.NewRPC(context.Background(), log, "http://"+server.Addr().String(), rpcclient.WithDialBackoff(3)) assert.NoError(t, err) var out string @@ -189,7 +189,7 @@ func TestSyncStatus(t *testing.T) { assert.NoError(t, server.Start()) defer server.Stop() - client, err := rpcclient.DialRPCClientWithBackoff(context.Background(), log, "http://"+server.Addr().String()) + client, err := rpcclient.NewRPC(context.Background(), log, "http://"+server.Addr().String(), rpcclient.WithDialBackoff(3)) assert.NoError(t, err) var out *eth.SyncStatus diff --git a/op-node/service.go b/op-node/service.go index 0656affac6f70..54983a2097be3 100644 --- a/op-node/service.go +++ b/op-node/service.go @@ -95,9 +95,12 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) { func NewL1EndpointConfig(ctx *cli.Context) *node.L1EndpointConfig { return &node.L1EndpointConfig{ - L1NodeAddr: ctx.GlobalString(flags.L1NodeAddr.Name), - L1TrustRPC: ctx.GlobalBool(flags.L1TrustRPC.Name), - L1RPCKind: sources.RPCProviderKind(strings.ToLower(ctx.GlobalString(flags.L1RPCProviderKind.Name))), + L1NodeAddr: ctx.GlobalString(flags.L1NodeAddr.Name), + L1TrustRPC: ctx.GlobalBool(flags.L1TrustRPC.Name), + L1RPCKind: sources.RPCProviderKind(strings.ToLower(ctx.GlobalString(flags.L1RPCProviderKind.Name))), + RateLimit: ctx.GlobalFloat64(flags.L1RPCRateLimit.Name), + BatchSize: ctx.GlobalInt(flags.L1RPCMaxBatchSize.Name), + HttpPollInterval: ctx.Duration(flags.L1HTTPPollInterval.Name), } } diff --git a/op-service/backoff/operation.go b/op-service/backoff/operation.go index cecd67cca3957..5e0f7601a82da 100644 --- a/op-service/backoff/operation.go +++ b/op-service/backoff/operation.go @@ -29,6 +29,9 @@ func Do(maxAttempts int, strategy Strategy, op Operation) error { } func DoCtx(ctx context.Context, maxAttempts int, strategy Strategy, op Operation) error { + if maxAttempts < 1 { + return fmt.Errorf("need at least 1 attempt to run op, but have %d max attempts", maxAttempts) + } var attempt int reattemptCh := make(chan struct{}, 1) From 6d433639932d3e405f16b8a2e9cdf60ed6820e4c Mon Sep 17 00:00:00 2001 From: protolambda Date: Fri, 24 Mar 2023 13:32:58 +0100 Subject: [PATCH 2/2] op-e2e: faster http poll interval for less flaky test --- op-e2e/setup.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/op-e2e/setup.go b/op-e2e/setup.go index 898710df1ad5e..a1b31fd0e09da 100644 --- a/op-e2e/setup.go +++ b/op-e2e/setup.go @@ -418,7 +418,7 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) { L1RPCKind: sources.RPCKindBasic, RateLimit: 0, BatchSize: 20, - HttpPollInterval: time.Duration(cfg.DeployConfig.L1BlockTime) * time.Second, + HttpPollInterval: time.Duration(cfg.DeployConfig.L1BlockTime) * time.Second / 10, } rollupCfg.L2 = &rollupNode.L2EndpointConfig{ L2EngineAddr: l2EndpointConfig,