diff --git a/.circleci/config.yml b/.circleci/config.yml index b63a5a0c00ae2..c2a40ba883eb4 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -286,10 +286,15 @@ jobs: gotestsum --junitfile /test-results/op-batcher.xml -- -coverpkg=github.com/ethereum-optimism/optimism/... -coverprofile=coverage.out -covermode=atomic ./... working_directory: op-batcher - run: - name: test op-e2e + name: test op-e2e (WS) command: | gotestsum --format standard-verbose --junitfile /test-results/op-e2e.xml -- -coverpkg=github.com/ethereum-optimism/optimism/... -coverprofile=coverage.out -covermode=atomic ./... working_directory: op-e2e + - run: + name: test op-e2e (HTTP) + command: | + OP_E2E_USE_HTTP=true gotestsum --junitfile /test-results/op-e2e.xml -- -coverpkg=github.com/ethereum-optimism/optimism/... -coverprofile=coverage.out -covermode=atomic ./... + working_directory: op-e2e - run: name: test op-service command: | diff --git a/op-e2e/actions/l1_replica.go b/op-e2e/actions/l1_replica.go index 719ac00f43cde..97c0210d19b67 100644 --- a/op-e2e/actions/l1_replica.go +++ b/op-e2e/actions/l1_replica.go @@ -161,7 +161,7 @@ func (s *L1Replica) EthClient() *ethclient.Client { func (s *L1Replica) RPCClient() client.RPC { cl, _ := s.node.Attach() // never errors return testutils.RPCErrFaker{ - RPC: cl, + RPC: client.NewBaseRPCClient(cl), ErrFn: func() error { err := s.failL1RPC s.failL1RPC = nil // reset back, only error once. diff --git a/op-e2e/actions/l2_engine.go b/op-e2e/actions/l2_engine.go index 1dc7ebb62285d..60a5a32e266d1 100644 --- a/op-e2e/actions/l2_engine.go +++ b/op-e2e/actions/l2_engine.go @@ -121,7 +121,7 @@ func (s *L2Engine) EthClient() *ethclient.Client { func (e *L2Engine) RPCClient() client.RPC { cl, _ := e.node.Attach() // never errors return testutils.RPCErrFaker{ - RPC: cl, + RPC: client.NewBaseRPCClient(cl), ErrFn: func() error { err := e.failL2RPC e.failL2RPC = nil // reset back, only error once. diff --git a/op-e2e/geth.go b/op-e2e/geth.go index 49591241f4733..1445848018379 100644 --- a/op-e2e/geth.go +++ b/op-e2e/geth.go @@ -98,6 +98,8 @@ func initL1Geth(cfg *SystemConfig, wallet *hdwallet.Wallet, genesis *core.Genesi } nodeConfig := &node.Config{ Name: "l1-geth", + HTTPHost: "127.0.0.1", + HTTPPort: 0, WSHost: "127.0.0.1", WSPort: 0, WSModules: []string{"debug", "admin", "eth", "txpool", "net", "rpc", "web3", "personal", "engine"}, diff --git a/op-e2e/setup.go b/op-e2e/setup.go index c3c0ad758971e..4e71de5581ab5 100644 --- a/op-e2e/setup.go +++ b/op-e2e/setup.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "math/big" + "os" "strings" "time" @@ -337,13 +338,25 @@ func (cfg SystemConfig) start() (*System, error) { // Configure connections to L1 and L2 for rollup nodes. // TODO: refactor testing to use in-process rpc connections instead of websockets. + + l1EndpointConfig := l1Node.WSEndpoint() + useHTTP := os.Getenv("OP_E2E_USE_HTTP") == "true" + if useHTTP { + log.Info("using HTTP client") + l1EndpointConfig = l1Node.HTTPEndpoint() + } + for name, rollupCfg := range cfg.Nodes { + l2EndpointConfig := sys.nodes[name].WSAuthEndpoint() + if useHTTP { + l2EndpointConfig = sys.nodes[name].HTTPAuthEndpoint() + } rollupCfg.L1 = &rollupNode.L1EndpointConfig{ - L1NodeAddr: l1Node.WSEndpoint(), + L1NodeAddr: l1EndpointConfig, L1TrustRPC: false, } rollupCfg.L2 = &rollupNode.L2EndpointConfig{ - L2EngineAddr: sys.nodes[name].WSAuthEndpoint(), + L2EngineAddr: l2EndpointConfig, L2EngineJWTSecret: cfg.JWTSecret, } } diff --git a/op-node/client/polling.go b/op-node/client/polling.go index 2f51c1d965dcd..637371477cc69 100644 --- a/op-node/client/polling.go +++ b/op-node/client/polling.go @@ -19,7 +19,7 @@ var ErrSubscriberClosed = errors.New("subscriber closed") // via a polling loop. It's designed for HTTP endpoints, but WS will // work too. type PollingClient struct { - c RPCGeneric + c RPC lgr log.Logger pollRate time.Duration ctx context.Context @@ -52,7 +52,7 @@ func WithPollRate(duration time.Duration) WrappedHTTPClientOption { // NewPollingClient returns a new PollingClient. Canceling the passed-in context // will close the client. Callers are responsible for closing the client in order // to prevent resource leaks. -func NewPollingClient(ctx context.Context, lgr log.Logger, c RPCGeneric, opts ...WrappedHTTPClientOption) *PollingClient { +func NewPollingClient(ctx context.Context, lgr log.Logger, c RPC, opts ...WrappedHTTPClientOption) *PollingClient { ctx, cancel := context.WithCancel(ctx) res := &PollingClient{ c: c, @@ -143,6 +143,8 @@ func (w *PollingClient) pollHeads() { time.AfterFunc(w.pollRate, w.reqPoll) } + reqPollAfter() + defer close(w.closedCh) for { diff --git a/op-node/client/polling_test.go b/op-node/client/polling_test.go index 66bc514ff1d4c..63c5b8df619dc 100644 --- a/op-node/client/polling_test.go +++ b/op-node/client/polling_test.go @@ -203,6 +203,6 @@ func requireChansEqual(t *testing.T, chans []chan *types.Header, root common.Has } } -func doSubscribe(client RPCGeneric, ch chan<- *types.Header) (ethereum.Subscription, error) { +func doSubscribe(client RPC, ch chan<- *types.Header) (ethereum.Subscription, error) { return client.EthSubscribe(context.Background(), ch, "newHeads") } diff --git a/op-node/client/rpc.go b/op-node/client/rpc.go index dd466aca393a8..3e6bc71796208 100644 --- a/op-node/client/rpc.go +++ b/op-node/client/rpc.go @@ -2,42 +2,99 @@ package client import ( "context" + "fmt" + "regexp" + "github.com/ethereum-optimism/optimism/op-node/backoff" "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/log" "github.com/prometheus/client_golang/prometheus" "github.com/ethereum-optimism/optimism/op-node/metrics" "github.com/ethereum/go-ethereum/rpc" ) +var httpRegex = regexp.MustCompile("^http(s)?://") + type RPC interface { Close() CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error BatchCallContext(ctx context.Context, b []rpc.BatchElem) error - EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*rpc.ClientSubscription, error) + EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (ethereum.Subscription, error) } -// RPCGeneric is a temporary interface added to make compilation work until interfaces -// are updated to support the generic EthSubscribe that returns ethereum.Subscription -// below -type RPCGeneric interface { - Close() - CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error - BatchCallContext(ctx context.Context, b []rpc.BatchElem) error - EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (ethereum.Subscription, error) +// NewRPC returns the correct client.RPC instance for a given RPC url. +func NewRPC(ctx context.Context, lgr log.Logger, addr string, opts ...rpc.ClientOption) (RPC, error) { + underlying, err := DialRPCClientWithBackoff(ctx, lgr, addr, opts...) + if err != nil { + return nil, err + } + + wrapped := &BaseRPCClient{ + c: underlying, + } + if httpRegex.MatchString(addr) { + return NewPollingClient(ctx, lgr, wrapped), nil + } + return wrapped, nil +} + +// Dials a JSON-RPC endpoint repeatedly, with a backoff, until a client connection is established. Auth is optional. +func DialRPCClientWithBackoff(ctx context.Context, log log.Logger, addr string, opts ...rpc.ClientOption) (*rpc.Client, error) { + bOff := backoff.Exponential() + var ret *rpc.Client + err := backoff.Do(10, bOff, func() error { + client, err := rpc.DialOptions(ctx, addr, opts...) + if err != nil { + if client == nil { + return fmt.Errorf("failed to dial address (%s): %w", addr, err) + } + log.Warn("failed to dial address, but may connect later", "addr", addr, "err", err) + } + ret = client + return nil + }) + if err != nil { + return nil, err + } + return ret, nil +} + +// BaseRPCClient is a wrapper around a concrete *rpc.Client instance to make it compliant +// with the client.RPC interface. +type BaseRPCClient struct { + c *rpc.Client +} + +func NewBaseRPCClient(c *rpc.Client) *BaseRPCClient { + return &BaseRPCClient{c: c} +} + +func (b *BaseRPCClient) Close() { + b.c.Close() +} + +func (b *BaseRPCClient) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error { + return b.c.CallContext(ctx, result, method, args...) +} + +func (b *BaseRPCClient) BatchCallContext(ctx context.Context, batch []rpc.BatchElem) error { + return b.c.BatchCallContext(ctx, batch) +} + +func (b *BaseRPCClient) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (ethereum.Subscription, error) { + return b.c.EthSubscribe(ctx, channel, args...) } // InstrumentedRPCClient is an RPC client that tracks // Prometheus metrics for each call. type InstrumentedRPCClient struct { - c *rpc.Client + c RPC m *metrics.Metrics } -// NewInstrumentedRPC creates a new instrumented RPC client. It takes -// a concrete *rpc.Client to prevent people from passing in an already -// instrumented client. -func NewInstrumentedRPC(c *rpc.Client, m *metrics.Metrics) *InstrumentedRPCClient { +// NewInstrumentedRPC creates a new instrumented RPC client. +func NewInstrumentedRPC(c RPC, m *metrics.Metrics) *InstrumentedRPCClient { return &InstrumentedRPCClient{ c: c, m: m, @@ -60,14 +117,10 @@ func (ic *InstrumentedRPCClient) BatchCallContext(ctx context.Context, b []rpc.B }, b) } -func (ic *InstrumentedRPCClient) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*rpc.ClientSubscription, error) { +func (ic *InstrumentedRPCClient) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (ethereum.Subscription, error) { return ic.c.EthSubscribe(ctx, channel, args...) } -func (ic *InstrumentedRPCClient) Client() Client { - return NewInstrumentedClient(ic.c, ic.m) -} - // instrumentBatch handles metrics for batch calls. Request metrics are // increased for each batch element. Request durations are tracked for // the batch as a whole using a special method. Errors are tracked diff --git a/op-node/node/client.go b/op-node/node/client.go index ebf4f6c404ca4..6e7f1e36043b3 100644 --- a/op-node/node/client.go +++ b/op-node/node/client.go @@ -5,7 +5,7 @@ import ( "errors" "fmt" - "github.com/ethereum-optimism/optimism/op-node/backoff" + "github.com/ethereum-optimism/optimism/op-node/client" "github.com/ethereum/go-ethereum/log" gn "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/rpc" @@ -13,13 +13,13 @@ import ( type L2EndpointSetup interface { // Setup a RPC client to a L2 execution engine to process rollup blocks with. - Setup(ctx context.Context, log log.Logger) (cl *rpc.Client, err error) + Setup(ctx context.Context, log log.Logger) (cl client.RPC, err error) Check() error } type L1EndpointSetup interface { // Setup a RPC client to a L1 node to pull rollup input-data from. - Setup(ctx context.Context, log log.Logger) (cl *rpc.Client, trust bool, err error) + Setup(ctx context.Context, log log.Logger) (cl client.RPC, trust bool, err error) } type L2EndpointConfig struct { @@ -40,12 +40,12 @@ func (cfg *L2EndpointConfig) Check() error { return nil } -func (cfg *L2EndpointConfig) Setup(ctx context.Context, log log.Logger) (*rpc.Client, error) { +func (cfg *L2EndpointConfig) Setup(ctx context.Context, log log.Logger) (client.RPC, error) { if err := cfg.Check(); err != nil { return nil, err } auth := rpc.WithHTTPAuth(gn.NewJWTAuth(cfg.L2EngineJWTSecret)) - l2Node, err := dialRPCClientWithBackoff(ctx, log, cfg.L2EngineAddr, auth) + l2Node, err := client.NewRPC(ctx, log, cfg.L2EngineAddr, auth) if err != nil { return nil, err } @@ -55,7 +55,7 @@ func (cfg *L2EndpointConfig) Setup(ctx context.Context, log log.Logger) (*rpc.Cl // PreparedL2Endpoints enables testing with in-process pre-setup RPC connections to L2 engines type PreparedL2Endpoints struct { - Client *rpc.Client + Client client.RPC } func (p *PreparedL2Endpoints) Check() error { @@ -67,7 +67,7 @@ func (p *PreparedL2Endpoints) Check() error { var _ L2EndpointSetup = (*PreparedL2Endpoints)(nil) -func (p *PreparedL2Endpoints) Setup(ctx context.Context, log log.Logger) (*rpc.Client, error) { +func (p *PreparedL2Endpoints) Setup(ctx context.Context, log log.Logger) (client.RPC, error) { return p.Client, nil } @@ -82,8 +82,8 @@ type L1EndpointConfig struct { var _ L1EndpointSetup = (*L1EndpointConfig)(nil) -func (cfg *L1EndpointConfig) Setup(ctx context.Context, log log.Logger) (cl *rpc.Client, trust bool, err error) { - l1Node, err := dialRPCClientWithBackoff(ctx, log, cfg.L1NodeAddr) +func (cfg *L1EndpointConfig) Setup(ctx context.Context, log log.Logger) (cl client.RPC, trust bool, err error) { + l1Node, err := client.NewRPC(ctx, log, cfg.L1NodeAddr) if err != nil { return nil, false, fmt.Errorf("failed to dial L1 address (%s): %w", cfg.L1NodeAddr, err) } @@ -92,33 +92,12 @@ func (cfg *L1EndpointConfig) Setup(ctx context.Context, log log.Logger) (cl *rpc // PreparedL1Endpoint enables testing with an in-process pre-setup RPC connection to L1 type PreparedL1Endpoint struct { - Client *rpc.Client + Client client.RPC TrustRPC bool } var _ L1EndpointSetup = (*PreparedL1Endpoint)(nil) -func (p *PreparedL1Endpoint) Setup(ctx context.Context, log log.Logger) (cl *rpc.Client, trust bool, err error) { +func (p *PreparedL1Endpoint) Setup(ctx context.Context, log log.Logger) (cl client.RPC, trust bool, err error) { return p.Client, p.TrustRPC, nil } - -// Dials a JSON-RPC endpoint repeatedly, with a backoff, until a client connection is established. Auth is optional. -func dialRPCClientWithBackoff(ctx context.Context, log log.Logger, addr string, opts ...rpc.ClientOption) (*rpc.Client, error) { - bOff := backoff.Exponential() - var ret *rpc.Client - err := backoff.Do(10, bOff, func() error { - client, err := rpc.DialOptions(ctx, addr, opts...) - if err != nil { - if client == nil { - return fmt.Errorf("failed to dial address (%s): %w", addr, err) - } - log.Warn("failed to dial address, but may connect later", "addr", addr, "err", err) - } - ret = client - return nil - }) - if err != nil { - return nil, err - } - return ret, nil -} diff --git a/op-node/node/server_test.go b/op-node/node/server_test.go index bac92cf3c1b8a..83b9206551875 100644 --- a/op-node/node/server_test.go +++ b/op-node/node/server_test.go @@ -6,6 +6,7 @@ import ( "math/rand" "testing" + rpcclient "github.com/ethereum-optimism/optimism/op-node/client" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -101,7 +102,7 @@ func TestOutputAtBlock(t *testing.T) { assert.NoError(t, server.Start()) defer server.Stop() - client, err := dialRPCClientWithBackoff(context.Background(), log, "http://"+server.Addr().String()) + client, err := rpcclient.DialRPCClientWithBackoff(context.Background(), log, "http://"+server.Addr().String()) assert.NoError(t, err) var out []eth.Bytes32 @@ -127,7 +128,7 @@ func TestVersion(t *testing.T) { assert.NoError(t, server.Start()) defer server.Stop() - client, err := dialRPCClientWithBackoff(context.Background(), log, "http://"+server.Addr().String()) + client, err := rpcclient.DialRPCClientWithBackoff(context.Background(), log, "http://"+server.Addr().String()) assert.NoError(t, err) var out string @@ -162,7 +163,7 @@ func TestSyncStatus(t *testing.T) { assert.NoError(t, server.Start()) defer server.Stop() - client, err := dialRPCClientWithBackoff(context.Background(), log, "http://"+server.Addr().String()) + client, err := rpcclient.DialRPCClientWithBackoff(context.Background(), log, "http://"+server.Addr().String()) assert.NoError(t, err) var out *eth.SyncStatus diff --git a/op-node/sources/eth_client_test.go b/op-node/sources/eth_client_test.go index 16535f3888584..bca40e087a73b 100644 --- a/op-node/sources/eth_client_test.go +++ b/op-node/sources/eth_client_test.go @@ -6,6 +6,7 @@ import ( "math/rand" "testing" + "github.com/ethereum/go-ethereum" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -30,7 +31,7 @@ func (m *mockRPC) CallContext(ctx context.Context, result interface{}, method st return m.MethodCalled("CallContext", ctx, result, method, args).Get(0).([]error)[0] } -func (m *mockRPC) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*rpc.ClientSubscription, error) { +func (m *mockRPC) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (ethereum.Subscription, error) { called := m.MethodCalled("EthSubscribe", channel, args) return called.Get(0).(*rpc.ClientSubscription), called.Get(1).([]error)[0] } diff --git a/op-node/sources/limit.go b/op-node/sources/limit.go index 9de8933ffe596..d722631e31b43 100644 --- a/op-node/sources/limit.go +++ b/op-node/sources/limit.go @@ -5,6 +5,7 @@ import ( "sync" "github.com/ethereum-optimism/optimism/op-node/client" + "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/rpc" ) @@ -39,7 +40,7 @@ func (lc *limitClient) CallContext(ctx context.Context, result interface{}, meth return lc.c.CallContext(ctx, result, method, args...) } -func (lc *limitClient) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*rpc.ClientSubscription, error) { +func (lc *limitClient) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (ethereum.Subscription, error) { // subscription doesn't count towards request limit return lc.c.EthSubscribe(ctx, channel, args...) } diff --git a/op-node/testutils/rpc_err_faker.go b/op-node/testutils/rpc_err_faker.go index c1e9810b4b9b9..8aa5735a47255 100644 --- a/op-node/testutils/rpc_err_faker.go +++ b/op-node/testutils/rpc_err_faker.go @@ -3,6 +3,7 @@ package testutils import ( "context" + "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum-optimism/optimism/op-node/client" @@ -39,7 +40,7 @@ func (r RPCErrFaker) BatchCallContext(ctx context.Context, b []rpc.BatchElem) er return r.RPC.BatchCallContext(ctx, b) } -func (r RPCErrFaker) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*rpc.ClientSubscription, error) { +func (r RPCErrFaker) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (ethereum.Subscription, error) { if r.ErrFn != nil { if err := r.ErrFn(); err != nil { return nil, err