Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -286,10 +286,15 @@ jobs:
gotestsum --junitfile /test-results/op-batcher.xml -- -coverpkg=github.com/ethereum-optimism/optimism/... -coverprofile=coverage.out -covermode=atomic ./...
working_directory: op-batcher
- run:
name: test op-e2e
name: test op-e2e (WS)
command: |
gotestsum --format standard-verbose --junitfile /test-results/op-e2e.xml -- -coverpkg=github.com/ethereum-optimism/optimism/... -coverprofile=coverage.out -covermode=atomic ./...
working_directory: op-e2e
- run:
name: test op-e2e (HTTP)
command: |
OP_E2E_USE_HTTP=true gotestsum --junitfile /test-results/op-e2e.xml -- -coverpkg=github.com/ethereum-optimism/optimism/... -coverprofile=coverage.out -covermode=atomic ./...
working_directory: op-e2e
- run:
name: test op-service
command: |
Expand Down
2 changes: 1 addition & 1 deletion op-e2e/actions/l1_replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (s *L1Replica) EthClient() *ethclient.Client {
func (s *L1Replica) RPCClient() client.RPC {
cl, _ := s.node.Attach() // never errors
return testutils.RPCErrFaker{
RPC: cl,
RPC: client.NewBaseRPCClient(cl),
ErrFn: func() error {
err := s.failL1RPC
s.failL1RPC = nil // reset back, only error once.
Expand Down
2 changes: 1 addition & 1 deletion op-e2e/actions/l2_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (s *L2Engine) EthClient() *ethclient.Client {
func (e *L2Engine) RPCClient() client.RPC {
cl, _ := e.node.Attach() // never errors
return testutils.RPCErrFaker{
RPC: cl,
RPC: client.NewBaseRPCClient(cl),
ErrFn: func() error {
err := e.failL2RPC
e.failL2RPC = nil // reset back, only error once.
Expand Down
2 changes: 2 additions & 0 deletions op-e2e/geth.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ func initL1Geth(cfg *SystemConfig, wallet *hdwallet.Wallet, genesis *core.Genesi
}
nodeConfig := &node.Config{
Name: "l1-geth",
HTTPHost: "127.0.0.1",
HTTPPort: 0,
WSHost: "127.0.0.1",
WSPort: 0,
WSModules: []string{"debug", "admin", "eth", "txpool", "net", "rpc", "web3", "personal", "engine"},
Expand Down
17 changes: 15 additions & 2 deletions op-e2e/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"math/big"
"os"
"strings"
"time"

Expand Down Expand Up @@ -337,13 +338,25 @@ func (cfg SystemConfig) start() (*System, error) {

// Configure connections to L1 and L2 for rollup nodes.
// TODO: refactor testing to use in-process rpc connections instead of websockets.

l1EndpointConfig := l1Node.WSEndpoint()
useHTTP := os.Getenv("OP_E2E_USE_HTTP") == "true"
if useHTTP {
log.Info("using HTTP client")
l1EndpointConfig = l1Node.HTTPEndpoint()
}

for name, rollupCfg := range cfg.Nodes {
l2EndpointConfig := sys.nodes[name].WSAuthEndpoint()
if useHTTP {
l2EndpointConfig = sys.nodes[name].HTTPAuthEndpoint()
}
rollupCfg.L1 = &rollupNode.L1EndpointConfig{
L1NodeAddr: l1Node.WSEndpoint(),
L1NodeAddr: l1EndpointConfig,
L1TrustRPC: false,
}
rollupCfg.L2 = &rollupNode.L2EndpointConfig{
L2EngineAddr: sys.nodes[name].WSAuthEndpoint(),
L2EngineAddr: l2EndpointConfig,
L2EngineJWTSecret: cfg.JWTSecret,
}
}
Expand Down
6 changes: 4 additions & 2 deletions op-node/client/polling.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ var ErrSubscriberClosed = errors.New("subscriber closed")
// via a polling loop. It's designed for HTTP endpoints, but WS will
// work too.
type PollingClient struct {
c RPCGeneric
c RPC
lgr log.Logger
pollRate time.Duration
ctx context.Context
Expand Down Expand Up @@ -52,7 +52,7 @@ func WithPollRate(duration time.Duration) WrappedHTTPClientOption {
// NewPollingClient returns a new PollingClient. Canceling the passed-in context
// will close the client. Callers are responsible for closing the client in order
// to prevent resource leaks.
func NewPollingClient(ctx context.Context, lgr log.Logger, c RPCGeneric, opts ...WrappedHTTPClientOption) *PollingClient {
func NewPollingClient(ctx context.Context, lgr log.Logger, c RPC, opts ...WrappedHTTPClientOption) *PollingClient {
ctx, cancel := context.WithCancel(ctx)
res := &PollingClient{
c: c,
Expand Down Expand Up @@ -143,6 +143,8 @@ func (w *PollingClient) pollHeads() {
time.AfterFunc(w.pollRate, w.reqPoll)
}

reqPollAfter()

defer close(w.closedCh)

for {
Expand Down
2 changes: 1 addition & 1 deletion op-node/client/polling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,6 @@ func requireChansEqual(t *testing.T, chans []chan *types.Header, root common.Has
}
}

func doSubscribe(client RPCGeneric, ch chan<- *types.Header) (ethereum.Subscription, error) {
func doSubscribe(client RPC, ch chan<- *types.Header) (ethereum.Subscription, error) {
return client.EthSubscribe(context.Background(), ch, "newHeads")
}
91 changes: 72 additions & 19 deletions op-node/client/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,42 +2,99 @@ package client

import (
"context"
"fmt"
"regexp"

"github.com/ethereum-optimism/optimism/op-node/backoff"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/log"
"github.com/prometheus/client_golang/prometheus"

"github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum/go-ethereum/rpc"
)

var httpRegex = regexp.MustCompile("^http(s)?://")

type RPC interface {
Close()
CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error
BatchCallContext(ctx context.Context, b []rpc.BatchElem) error
EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*rpc.ClientSubscription, error)
EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (ethereum.Subscription, error)
}

// RPCGeneric is a temporary interface added to make compilation work until interfaces
// are updated to support the generic EthSubscribe that returns ethereum.Subscription
// below
type RPCGeneric interface {
Close()
CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error
BatchCallContext(ctx context.Context, b []rpc.BatchElem) error
EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (ethereum.Subscription, error)
// NewRPC returns the correct client.RPC instance for a given RPC url.
func NewRPC(ctx context.Context, lgr log.Logger, addr string, opts ...rpc.ClientOption) (RPC, error) {
underlying, err := DialRPCClientWithBackoff(ctx, lgr, addr, opts...)
if err != nil {
return nil, err
}

wrapped := &BaseRPCClient{
c: underlying,
}
if httpRegex.MatchString(addr) {
return NewPollingClient(ctx, lgr, wrapped), nil
}
return wrapped, nil
}

// Dials a JSON-RPC endpoint repeatedly, with a backoff, until a client connection is established. Auth is optional.
func DialRPCClientWithBackoff(ctx context.Context, log log.Logger, addr string, opts ...rpc.ClientOption) (*rpc.Client, error) {
bOff := backoff.Exponential()
var ret *rpc.Client
err := backoff.Do(10, bOff, func() error {
client, err := rpc.DialOptions(ctx, addr, opts...)
if err != nil {
if client == nil {
return fmt.Errorf("failed to dial address (%s): %w", addr, err)
}
log.Warn("failed to dial address, but may connect later", "addr", addr, "err", err)
}
ret = client
return nil
})
if err != nil {
return nil, err
}
return ret, nil
}

// BaseRPCClient is a wrapper around a concrete *rpc.Client instance to make it compliant
// with the client.RPC interface.
type BaseRPCClient struct {
c *rpc.Client
}

func NewBaseRPCClient(c *rpc.Client) *BaseRPCClient {
return &BaseRPCClient{c: c}
}

func (b *BaseRPCClient) Close() {
b.c.Close()
}

func (b *BaseRPCClient) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error {
return b.c.CallContext(ctx, result, method, args...)
}

func (b *BaseRPCClient) BatchCallContext(ctx context.Context, batch []rpc.BatchElem) error {
return b.c.BatchCallContext(ctx, batch)
}

func (b *BaseRPCClient) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (ethereum.Subscription, error) {
return b.c.EthSubscribe(ctx, channel, args...)
}

// InstrumentedRPCClient is an RPC client that tracks
// Prometheus metrics for each call.
type InstrumentedRPCClient struct {
c *rpc.Client
c RPC
m *metrics.Metrics
}

// NewInstrumentedRPC creates a new instrumented RPC client. It takes
// a concrete *rpc.Client to prevent people from passing in an already
// instrumented client.
func NewInstrumentedRPC(c *rpc.Client, m *metrics.Metrics) *InstrumentedRPCClient {
// NewInstrumentedRPC creates a new instrumented RPC client.
func NewInstrumentedRPC(c RPC, m *metrics.Metrics) *InstrumentedRPCClient {
return &InstrumentedRPCClient{
c: c,
m: m,
Expand All @@ -60,14 +117,10 @@ func (ic *InstrumentedRPCClient) BatchCallContext(ctx context.Context, b []rpc.B
}, b)
}

func (ic *InstrumentedRPCClient) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*rpc.ClientSubscription, error) {
func (ic *InstrumentedRPCClient) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (ethereum.Subscription, error) {
return ic.c.EthSubscribe(ctx, channel, args...)
}

func (ic *InstrumentedRPCClient) Client() Client {
return NewInstrumentedClient(ic.c, ic.m)
}

// instrumentBatch handles metrics for batch calls. Request metrics are
// increased for each batch element. Request durations are tracked for
// the batch as a whole using a special <batch> method. Errors are tracked
Expand Down
43 changes: 11 additions & 32 deletions op-node/node/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,21 @@ import (
"errors"
"fmt"

"github.com/ethereum-optimism/optimism/op-node/backoff"
"github.com/ethereum-optimism/optimism/op-node/client"
"github.com/ethereum/go-ethereum/log"
gn "github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/rpc"
)

type L2EndpointSetup interface {
// Setup a RPC client to a L2 execution engine to process rollup blocks with.
Setup(ctx context.Context, log log.Logger) (cl *rpc.Client, err error)
Setup(ctx context.Context, log log.Logger) (cl client.RPC, err error)
Check() error
}

type L1EndpointSetup interface {
// Setup a RPC client to a L1 node to pull rollup input-data from.
Setup(ctx context.Context, log log.Logger) (cl *rpc.Client, trust bool, err error)
Setup(ctx context.Context, log log.Logger) (cl client.RPC, trust bool, err error)
}

type L2EndpointConfig struct {
Expand All @@ -40,12 +40,12 @@ func (cfg *L2EndpointConfig) Check() error {
return nil
}

func (cfg *L2EndpointConfig) Setup(ctx context.Context, log log.Logger) (*rpc.Client, error) {
func (cfg *L2EndpointConfig) Setup(ctx context.Context, log log.Logger) (client.RPC, error) {
if err := cfg.Check(); err != nil {
return nil, err
}
auth := rpc.WithHTTPAuth(gn.NewJWTAuth(cfg.L2EngineJWTSecret))
l2Node, err := dialRPCClientWithBackoff(ctx, log, cfg.L2EngineAddr, auth)
l2Node, err := client.NewRPC(ctx, log, cfg.L2EngineAddr, auth)
if err != nil {
return nil, err
}
Expand All @@ -55,7 +55,7 @@ func (cfg *L2EndpointConfig) Setup(ctx context.Context, log log.Logger) (*rpc.Cl

// PreparedL2Endpoints enables testing with in-process pre-setup RPC connections to L2 engines
type PreparedL2Endpoints struct {
Client *rpc.Client
Client client.RPC
}

func (p *PreparedL2Endpoints) Check() error {
Expand All @@ -67,7 +67,7 @@ func (p *PreparedL2Endpoints) Check() error {

var _ L2EndpointSetup = (*PreparedL2Endpoints)(nil)

func (p *PreparedL2Endpoints) Setup(ctx context.Context, log log.Logger) (*rpc.Client, error) {
func (p *PreparedL2Endpoints) Setup(ctx context.Context, log log.Logger) (client.RPC, error) {
return p.Client, nil
}

Expand All @@ -82,8 +82,8 @@ type L1EndpointConfig struct {

var _ L1EndpointSetup = (*L1EndpointConfig)(nil)

func (cfg *L1EndpointConfig) Setup(ctx context.Context, log log.Logger) (cl *rpc.Client, trust bool, err error) {
l1Node, err := dialRPCClientWithBackoff(ctx, log, cfg.L1NodeAddr)
func (cfg *L1EndpointConfig) Setup(ctx context.Context, log log.Logger) (cl client.RPC, trust bool, err error) {
l1Node, err := client.NewRPC(ctx, log, cfg.L1NodeAddr)
if err != nil {
return nil, false, fmt.Errorf("failed to dial L1 address (%s): %w", cfg.L1NodeAddr, err)
}
Expand All @@ -92,33 +92,12 @@ func (cfg *L1EndpointConfig) Setup(ctx context.Context, log log.Logger) (cl *rpc

// PreparedL1Endpoint enables testing with an in-process pre-setup RPC connection to L1
type PreparedL1Endpoint struct {
Client *rpc.Client
Client client.RPC
TrustRPC bool
}

var _ L1EndpointSetup = (*PreparedL1Endpoint)(nil)

func (p *PreparedL1Endpoint) Setup(ctx context.Context, log log.Logger) (cl *rpc.Client, trust bool, err error) {
func (p *PreparedL1Endpoint) Setup(ctx context.Context, log log.Logger) (cl client.RPC, trust bool, err error) {
return p.Client, p.TrustRPC, nil
}

// Dials a JSON-RPC endpoint repeatedly, with a backoff, until a client connection is established. Auth is optional.
func dialRPCClientWithBackoff(ctx context.Context, log log.Logger, addr string, opts ...rpc.ClientOption) (*rpc.Client, error) {
bOff := backoff.Exponential()
var ret *rpc.Client
err := backoff.Do(10, bOff, func() error {
client, err := rpc.DialOptions(ctx, addr, opts...)
if err != nil {
if client == nil {
return fmt.Errorf("failed to dial address (%s): %w", addr, err)
}
log.Warn("failed to dial address, but may connect later", "addr", addr, "err", err)
}
ret = client
return nil
})
if err != nil {
return nil, err
}
return ret, nil
}
7 changes: 4 additions & 3 deletions op-node/node/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"math/rand"
"testing"

rpcclient "github.com/ethereum-optimism/optimism/op-node/client"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

Expand Down Expand Up @@ -101,7 +102,7 @@ func TestOutputAtBlock(t *testing.T) {
assert.NoError(t, server.Start())
defer server.Stop()

client, err := dialRPCClientWithBackoff(context.Background(), log, "http://"+server.Addr().String())
client, err := rpcclient.DialRPCClientWithBackoff(context.Background(), log, "http://"+server.Addr().String())
assert.NoError(t, err)

var out []eth.Bytes32
Expand All @@ -127,7 +128,7 @@ func TestVersion(t *testing.T) {
assert.NoError(t, server.Start())
defer server.Stop()

client, err := dialRPCClientWithBackoff(context.Background(), log, "http://"+server.Addr().String())
client, err := rpcclient.DialRPCClientWithBackoff(context.Background(), log, "http://"+server.Addr().String())
assert.NoError(t, err)

var out string
Expand Down Expand Up @@ -162,7 +163,7 @@ func TestSyncStatus(t *testing.T) {
assert.NoError(t, server.Start())
defer server.Stop()

client, err := dialRPCClientWithBackoff(context.Background(), log, "http://"+server.Addr().String())
client, err := rpcclient.DialRPCClientWithBackoff(context.Background(), log, "http://"+server.Addr().String())
assert.NoError(t, err)

var out *eth.SyncStatus
Expand Down
Loading