diff --git a/op-alt-da/daserver.go b/op-alt-da/daserver.go index ccdc2a0cb4d36..0be7438e8e633 100644 --- a/op-alt-da/daserver.go +++ b/op-alt-da/daserver.go @@ -14,7 +14,7 @@ import ( "strconv" "time" - "github.com/ethereum-optimism/optimism/op-service/rpc" + "github.com/ethereum-optimism/optimism/op-service/httputil" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/log" ) @@ -30,7 +30,7 @@ type DAServer struct { log log.Logger endpoint string store KVStore - tls *rpc.ServerTLSConfig + tls *httputil.ServerTLSConfig httpServer *http.Server listener net.Listener useGenericComm bool diff --git a/op-node/p2p/gossip_test.go b/op-node/p2p/gossip_test.go index 3943f0f3154b0..b05a9974cbe0e 100644 --- a/op-node/p2p/gossip_test.go +++ b/op-node/p2p/gossip_test.go @@ -128,13 +128,11 @@ func TestVerifyBlockSignatureWithRemoteSigner(t *testing.T) { "127.0.0.1", 0, "test", - oprpc.WithAPIs([]rpc.API{ - { - Namespace: "opsigner", - Service: remoteSigner, - }, - }), ) + server.AddAPI(rpc.API{ + Namespace: "opsigner", + Service: remoteSigner, + }) require.NoError(t, server.Start()) defer func() { diff --git a/op-node/rollup/interop/managed/system.go b/op-node/rollup/interop/managed/system.go index 98f2e91758697..f5238f40bb583 100644 --- a/op-node/rollup/interop/managed/system.go +++ b/op-node/rollup/interop/managed/system.go @@ -65,14 +65,12 @@ func NewManagedMode(log log.Logger, cfg *rollup.Config, addr string, port int, j out.srv = rpc.NewServer(addr, port, "v0.0.0", rpc.WithWebsocketEnabled(), rpc.WithLogger(log), - rpc.WithJWTSecret(jwtSecret[:]), - rpc.WithAPIs([]gethrpc.API{ - { - Namespace: "interop", - Service: &InteropAPI{backend: out}, - Authenticated: true, - }, - })) + rpc.WithJWTSecret(jwtSecret[:])) + out.srv.AddAPI(gethrpc.API{ + Namespace: "interop", + Service: &InteropAPI{backend: out}, + Authenticated: true, + }) return out } diff --git a/op-service/httputil/http.go b/op-service/httputil/http.go deleted file mode 100644 index 8d15b9542a3eb..0000000000000 --- a/op-service/httputil/http.go +++ /dev/null @@ -1,20 +0,0 @@ -package httputil - -import ( - "net/http" - - "github.com/ethereum/go-ethereum/rpc" -) - -// Use default timeouts from Geth as battle tested default values -var timeouts = rpc.DefaultHTTPTimeouts - -func NewHttpServer(handler http.Handler) *http.Server { - return &http.Server{ - Handler: handler, - ReadTimeout: timeouts.ReadTimeout, - ReadHeaderTimeout: timeouts.ReadHeaderTimeout, - WriteTimeout: timeouts.WriteTimeout, - IdleTimeout: timeouts.IdleTimeout, - } -} diff --git a/op-service/httputil/options.go b/op-service/httputil/options.go new file mode 100644 index 0000000000000..01912177d67aa --- /dev/null +++ b/op-service/httputil/options.go @@ -0,0 +1,58 @@ +package httputil + +import ( + "crypto/tls" + "net/http" + + optls "github.com/ethereum-optimism/optimism/op-service/tls" +) + +type config struct { + // listenAddr is the configured address to listen to when started. + // use listener.Addr to retrieve the address when online. + listenAddr string + + tls *ServerTLSConfig + + handler http.Handler + + httpOpts []HTTPOption +} + +func (c *config) ApplyOptions(opts ...Option) { + for _, opt := range opts { + opt(c) + } +} + +// Option is a general config option. +type Option func(cfg *config) + +// HTTPOption applies a change to an HTTP server, just before standup. +// HTTPOption options are be re-executed on server shutdown/startup cycles, +// for each new underlying Go *http.Server instance. +type HTTPOption func(config *http.Server) error + +func WithHTTPOptions(options ...HTTPOption) Option { + return func(cfg *config) { + cfg.httpOpts = append(cfg.httpOpts, options...) + } +} + +func WithMaxHeaderBytes(max int) HTTPOption { + return func(srv *http.Server) error { + srv.MaxHeaderBytes = max + return nil + } +} + +type ServerTLSConfig struct { + Config *tls.Config + CLIConfig *optls.CLIConfig // paths to certificate and key files +} + +func WithServerTLS(tlsCfg *ServerTLSConfig) Option { + return func(cfg *config) { + cfg.tls = tlsCfg + } +} diff --git a/op-service/httputil/server.go b/op-service/httputil/server.go index 3caa428e77ab2..727201bc7cc44 100644 --- a/op-service/httputil/server.go +++ b/op-service/httputil/server.go @@ -6,29 +6,65 @@ import ( "fmt" "net" "net/http" - "sync/atomic" + "sync" + "time" ) // HTTPServer wraps a http.Server, while providing conveniences // like exposing the running state and address. +// +// It can be started with HTTPServer.Start and closed with +// HTTPServer.Stop, HTTPServer.Close and HTTPServer.Shutdown (convenience functions for different gracefulness). +// +// The addr contains both host and port. A 0 port may be used to make the system bind to an available one. +// The resulting address can be retrieved with HTTPServer.Addr or HTTPServer.HTTPEndpoint. +// +// The server may be started, stopped and started back up. type HTTPServer struct { + // mu is the lock used for bringing the server online/offline, and accessing the address of the server. + mu sync.RWMutex + + // listener that the server is bound to. Nil if online. listener net.Listener - srv *http.Server - closed atomic.Bool + + srv *http.Server + + // used as BaseContext of the http.Server + srvCtx context.Context + srvCancel context.CancelFunc + + config *config } -// HTTPOption applies a change to an HTTP server -type HTTPOption func(srv *HTTPServer) error +// NewHTTPServer creates an HTTPServer that serves the given HTTP handler. +// The server is inactive and has to be started explicitly. +func NewHTTPServer(addr string, handler http.Handler, opts ...Option) *HTTPServer { + cfg := &config{ + listenAddr: addr, + tls: nil, + handler: handler, + httpOpts: nil, + } + cfg.ApplyOptions(opts...) + return &HTTPServer{config: cfg} +} -func StartHTTPServer(addr string, handler http.Handler, opts ...HTTPOption) (*HTTPServer, error) { - listener, err := net.Listen("tcp", addr) - if err != nil { - return nil, fmt.Errorf("failed to bind to address %q: %w", addr, err) +func StartHTTPServer(addr string, handler http.Handler, opts ...Option) (*HTTPServer, error) { + out := NewHTTPServer(addr, handler, opts...) + return out, out.Start() +} + +// Start starts the server, and checks if it comes online fully. +func (s *HTTPServer) Start() error { + s.mu.Lock() + defer s.mu.Unlock() + if s.srv != nil { + return errors.New("already have existing server") } srvCtx, srvCancel := context.WithCancel(context.Background()) srv := &http.Server{ - Handler: handler, + Handler: s.config.handler, ReadTimeout: DefaultTimeouts.ReadTimeout, ReadHeaderTimeout: DefaultTimeouts.ReadHeaderTimeout, WriteTimeout: DefaultTimeouts.WriteTimeout, @@ -37,28 +73,56 @@ func StartHTTPServer(addr string, handler http.Handler, opts ...HTTPOption) (*HT return srvCtx }, } - out := &HTTPServer{listener: listener, srv: srv} - for _, opt := range opts { - if err := opt(out); err != nil { + + if s.config.tls != nil { + srv.TLSConfig = s.config.tls.Config + } + + for _, opt := range s.config.httpOpts { + if err := opt(srv); err != nil { srvCancel() - return nil, errors.Join(fmt.Errorf("failed to apply HTTP option: %w", err), listener.Close()) + return fmt.Errorf("failed to apply HTTP option: %w", err) } } - go func() { - err := out.srv.Serve(listener) + + listener, err := net.Listen("tcp", s.config.listenAddr) + if err != nil { srvCancel() - // no error, unless ErrServerClosed (or unused base context closes, or unused http2 config error) - if errors.Is(err, http.ErrServerClosed) { - out.closed.Store(true) + return fmt.Errorf("failed to bind to address %q: %w", s.config.listenAddr, err) + } + s.listener = listener + + s.srv = srv + s.srvCtx = srvCtx + s.srvCancel = srvCancel + + // cap of 1, to not block on non-immediate shutdown + errCh := make(chan error, 1) + go func() { + if s.config.tls != nil { + errCh <- s.srv.ServeTLS(s.listener, "", "") } else { - panic(fmt.Errorf("unexpected serve error: %w", err)) + errCh <- s.srv.Serve(s.listener) } }() - return out, nil + + // verify that the server comes up + standupTimer := time.NewTimer(10 * time.Millisecond) + defer standupTimer.Stop() + + select { + case err := <-errCh: + s.cleanup() + return fmt.Errorf("http server failed: %w", err) + case <-standupTimer.C: + return nil + } } func (s *HTTPServer) Closed() bool { - return s.closed.Load() + s.mu.RLock() + defer s.mu.RUnlock() + return s.srv == nil } // Stop is a convenience method to gracefully shut down the server, but force-close if the ctx is cancelled. @@ -73,28 +137,73 @@ func (s *HTTPServer) Stop(ctx context.Context) error { return nil } +func (s *HTTPServer) cleanup() { + s.srv = nil + s.listener = nil + s.srvCtx = nil + s.srvCancel = nil +} + // Shutdown shuts down the HTTP server and its listener, // but allows active connections to close gracefully. // If the function exits due to a ctx cancellation the listener is closed but active connections may remain, // a call to Close() can force-close any remaining active connections. func (s *HTTPServer) Shutdown(ctx context.Context) error { + s.mu.Lock() + defer s.mu.Unlock() + if s.srv == nil { + return nil + } + s.srvCancel() // closes the underlying listener too. - return s.srv.Shutdown(ctx) + err := s.srv.Shutdown(ctx) + if err != nil { + return err + } + s.cleanup() + return nil } // Close force-closes the HTTPServer, its listener, and all its active connections. func (s *HTTPServer) Close() error { + s.mu.Lock() + defer s.mu.Unlock() + if s.srv == nil { + return nil + } + s.srvCancel() // closes the underlying listener too - return s.srv.Close() + err := s.srv.Close() + if err != nil { + return err + } + s.cleanup() + return nil } +// Addr returns the address that the server is listening on. +// It returns nil if the server is not online. func (s *HTTPServer) Addr() net.Addr { + s.mu.RLock() + defer s.mu.RUnlock() + if s.listener == nil { + return nil + } return s.listener.Addr() } -func WithMaxHeaderBytes(max int) HTTPOption { - return func(srv *HTTPServer) error { - srv.srv.MaxHeaderBytes = max - return nil +// HTTPEndpoint returns the http(s) endpoint the server is serving. +// It returns an empty string if the server is not online. +func (s *HTTPServer) HTTPEndpoint() string { + s.mu.RLock() + defer s.mu.RUnlock() + if s.listener == nil { + return "" + } + addr := s.listener.Addr().String() + if s.config.tls != nil { + return "https://" + addr + } else { + return "http://" + addr } } diff --git a/op-service/httputil/server_test.go b/op-service/httputil/server_test.go index 1ea197ac6e4fa..aed4c833be9d8 100644 --- a/op-service/httputil/server_test.go +++ b/op-service/httputil/server_test.go @@ -27,12 +27,12 @@ func TestStartHTTPServer(t *testing.T) { } }) - srv, err := StartHTTPServer("localhost:0", h, WithTimeouts(HTTPTimeouts{ + srv, err := StartHTTPServer("localhost:0", h, WithHTTPOptions(WithTimeouts(HTTPTimeouts{ ReadTimeout: time.Minute, ReadHeaderTimeout: time.Minute, WriteTimeout: time.Minute, IdleTimeout: time.Minute, - })) + }))) require.NoError(t, err) require.False(t, srv.Closed()) return srv, reqRespBlock @@ -97,4 +97,33 @@ func TestStartHTTPServer(t *testing.T) { wg.Wait() require.True(t, srv.Closed()) }) + + t.Run("restart", func(t *testing.T) { + srv, reqRespBlock := testSetup(t) + + request := func() { + t.Log("making request") + // test basics + go func() { + req := <-reqRespBlock // take request + block := make(chan struct{}) + req <- block // start response + <-block // unblock response + }() + resp, err := http.Get("http://" + srv.Addr().String() + "/") + require.NoError(t, err) + require.NoError(t, resp.Body.Close()) + require.Equal(t, http.StatusTeapot, resp.StatusCode, "I am a teapot") + } + request() + t.Log("closing 1") + require.NoError(t, srv.Close()) + require.True(t, srv.Closed()) + t.Log("starting back up") + require.NoError(t, srv.Start()) + request() + t.Log("closing 2") + require.NoError(t, srv.Close()) + require.True(t, srv.Closed()) + }) } diff --git a/op-service/httputil/timeout.go b/op-service/httputil/timeout.go index 17ea8d01c14fb..28e7e041baa10 100644 --- a/op-service/httputil/timeout.go +++ b/op-service/httputil/timeout.go @@ -1,6 +1,9 @@ package httputil -import "time" +import ( + "net/http" + "time" +) // DefaultTimeouts for HTTP server, based on the RPC timeouts that geth uses. var DefaultTimeouts = HTTPTimeouts{ @@ -45,11 +48,11 @@ type HTTPTimeouts struct { } func WithTimeouts(timeouts HTTPTimeouts) HTTPOption { - return func(s *HTTPServer) error { - s.srv.ReadTimeout = timeouts.ReadTimeout - s.srv.ReadHeaderTimeout = timeouts.ReadHeaderTimeout - s.srv.WriteTimeout = timeouts.WriteTimeout - s.srv.IdleTimeout = timeouts.IdleTimeout + return func(s *http.Server) error { + s.ReadTimeout = timeouts.ReadTimeout + s.ReadHeaderTimeout = timeouts.ReadHeaderTimeout + s.WriteTimeout = timeouts.WriteTimeout + s.IdleTimeout = timeouts.IdleTimeout return nil } } diff --git a/op-service/rpc/handler.go b/op-service/rpc/handler.go new file mode 100644 index 0000000000000..40563a25ad329 --- /dev/null +++ b/op-service/rpc/handler.go @@ -0,0 +1,246 @@ +package rpc + +import ( + "encoding/json" + "fmt" + "net/http" + "strings" + "sync" + + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/rpc" + + oplog "github.com/ethereum-optimism/optimism/op-service/log" + opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics" + optls "github.com/ethereum-optimism/optimism/op-service/tls" +) + +// the root is "", since the "/" prefix is already assumed to be stripped. +const rootRoute = "" + +var wildcardHosts = []string{"*"} + +// Handler is an http Handler, serving a default RPC server on the root path. +// +// Additional RPC servers can be attached to this on sub-routes using AddRPC. +// Each sub-route has its own RPCs that can be served, registered with AddAPIToRPC. +// These inherit the same RPC settings, and each have their own health handlers, +// and websocket support if configured. +// +// Custom routes can also be added with AddHandler, these are registered to the underlying http.ServeMux. +// +// If more customization is needed, this Handler can be composed in a HTTP stack of your own. +// Use http.StripPrefix to clean the URL route prefix that this Handler is registered on (leave no prefix). +type Handler struct { + appVersion string + healthzHandler http.Handler + corsHosts []string + vHosts []string + jwtSecret []byte + wsEnabled bool + httpRecorder opmetrics.HTTPRecorder + + log log.Logger + middlewares []Middleware + + // rpcRoutes is a collection of RPC servers + rpcRoutes map[string]*rpc.Server + rpcRoutesLock sync.Mutex + + mux *http.ServeMux + + // What we serve to users of this Handler, see ServeHTTP + outer http.Handler +} + +func NewHandler(appVersion string, opts ...Option) *Handler { + bs := &Handler{ + appVersion: appVersion, + healthzHandler: defaultHealthzHandler(appVersion), + corsHosts: wildcardHosts, + vHosts: wildcardHosts, + httpRecorder: opmetrics.NoopHTTPRecorder, + log: log.Root(), + mux: &http.ServeMux{}, + rpcRoutes: make(map[string]*rpc.Server), + } + for _, opt := range opts { + opt(bs) + } + bs.log.Debug("Creating RPC handler") + + var handler http.Handler + handler = bs.mux + // Outer-most middlewares: logging, metrics, TLS + handler = optls.NewPeerTLSMiddleware(handler) + handler = opmetrics.NewHTTPRecordingMiddleware(bs.httpRecorder, handler) + handler = oplog.NewLoggingMiddleware(bs.log, handler) + bs.outer = handler + + if err := bs.AddRPC(rootRoute); err != nil { + panic(fmt.Errorf("failed to register root RPC server: %w", err)) + } + + return bs +} + +var _ http.Handler = (*Handler)(nil) + +// ServeHTTP implements http.Handler +func (b *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { + b.outer.ServeHTTP(writer, request) +} + +// AddAPI adds a backend to the given RPC namespace, on the default RPC route of the server. +func (b *Handler) AddAPI(api rpc.API) error { + return b.AddAPIToRPC(rootRoute, api) +} + +// AddAPIToRPC adds a backend to the given RPC namespace, on the RPC corresponding to the given route. +func (b *Handler) AddAPIToRPC(route string, api rpc.API) error { + b.rpcRoutesLock.Lock() + defer b.rpcRoutesLock.Unlock() + server, ok := b.rpcRoutes[route] + if !ok { + return fmt.Errorf("route %q not found", route) + } + if err := server.RegisterName(api.Namespace, api.Service); err != nil { + return fmt.Errorf("failed to register API namespace %s on route %q: %w", api.Namespace, route, err) + } + b.log.Info("registered API", "route", route, "namespace", api.Namespace) + return nil +} + +// AddHandler adds a custom http.Handler, mapped to an absolute path +func (b *Handler) AddHandler(path string, handler http.Handler) { + if !strings.HasPrefix(path, "/") { + path = "/" + path + } + b.mux.Handle(path, handler) +} + +// AddRPC creates a default RPC handler at the given route, +// with a health sub-route, HTTP endpoint, and websocket endpoint if configured. +// Once the route is added, RPC namespaces can be registered with AddAPIToRPC. +// The route must not have a "/" suffix, since the trailing "/" is ambiguous. +func (b *Handler) AddRPC(route string) error { + b.rpcRoutesLock.Lock() + defer b.rpcRoutesLock.Unlock() + if strings.HasSuffix(route, "/") { + return fmt.Errorf("routes must not have a / suffix, got %q", route) + } + _, ok := b.rpcRoutes[route] + if ok { + return fmt.Errorf("route %q already exists", route) + } + + srv := rpc.NewServer() + + if err := srv.RegisterName("health", &healthzAPI{ + appVersion: b.appVersion, + }); err != nil { + return fmt.Errorf("failed to setup default health RPC namespace") + } + + // http handler stack. + var handler http.Handler + + // default to 404 not-found + handler = http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { + b.log.Info("oh no!") + http.NotFound(writer, request) + }) + + // Health endpoint is lowest priority. + handler = b.newHealthMiddleware(handler) + + // serve RPC on configured RPC path (but not on arbitrary paths) + handler = b.newHttpRPCMiddleware(srv, handler) + + // Conditionally enable Websocket support. + if b.wsEnabled { // prioritize WS RPC, if it's an upgrade request + handler = b.newWsMiddleWare(srv, handler) + } + + // Apply user middlewares + for _, middleware := range b.middlewares { + handler = middleware(handler) + } + b.rpcRoutes[route] = srv + + b.mux.Handle(route+"/", http.StripPrefix(route+"/", handler)) + if route != "" { + b.mux.Handle(route, http.StripPrefix(route, handler)) + } + return nil +} + +func (b *Handler) newHealthMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // URL is already stripped with http.StripPrefix + if r.URL.Path == "healthz" || r.URL.Path == "healthz/" { + b.healthzHandler.ServeHTTP(w, r) + return + } + next.ServeHTTP(w, r) + }) +} + +func (b *Handler) newHttpRPCMiddleware(server *rpc.Server, next http.Handler) http.Handler { + // Only allow RPC handlers behind the appropriate CORS / vhost / JWT (optional) setup. + // Note that websockets have their own handler-stack, also configured with CORS and JWT, separately. + httpHandler := node.NewHTTPHandlerStack(server, b.corsHosts, b.vHosts, b.jwtSecret) + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // URL is already stripped with http.StripPrefix + if r.URL.Path == "" { + httpHandler.ServeHTTP(w, r) + return + } + next.ServeHTTP(w, r) + }) +} + +func (b *Handler) newWsMiddleWare(server *rpc.Server, next http.Handler) http.Handler { + wsHandler := node.NewWSHandlerStack(server.WebsocketHandler(b.corsHosts), b.jwtSecret) + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // URL is already stripped with http.StripPrefix + if isWebsocket(r) && (r.URL.Path == "" || r.URL.Path == "ws" || r.URL.Path == "ws/") { + wsHandler.ServeHTTP(w, r) + return + } + next.ServeHTTP(w, r) + }) +} + +func (b *Handler) Stop() { + for route, s := range b.rpcRoutes { + b.log.Debug("Stopping RPC", "route", route) + s.Stop() + } +} + +type HealthzResponse struct { + Version string `json:"version"` +} + +func defaultHealthzHandler(appVersion string) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + enc := json.NewEncoder(w) + _ = enc.Encode(&HealthzResponse{Version: appVersion}) + } +} + +type healthzAPI struct { + appVersion string +} + +func (h *healthzAPI) Status() string { + return h.appVersion +} + +func isWebsocket(r *http.Request) bool { + return strings.EqualFold(r.Header.Get("Upgrade"), "websocket") && + strings.Contains(strings.ToLower(r.Header.Get("Connection")), "upgrade") +} diff --git a/op-service/rpc/handler_options.go b/op-service/rpc/handler_options.go new file mode 100644 index 0000000000000..cec12a57a8c4b --- /dev/null +++ b/op-service/rpc/handler_options.go @@ -0,0 +1,67 @@ +package rpc + +import ( + "net/http" + + "github.com/ethereum/go-ethereum/log" + + opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics" +) + +type Option func(b *Handler) + +type Middleware func(next http.Handler) http.Handler + +func WithHealthzHandler(hdlr http.Handler) Option { + return func(b *Handler) { + b.healthzHandler = hdlr + } +} + +func WithCORSHosts(hosts []string) Option { + return func(b *Handler) { + b.corsHosts = hosts + } +} + +func WithVHosts(hosts []string) Option { + return func(b *Handler) { + b.vHosts = hosts + } +} + +// WithWebsocketEnabled allows `ws://host:port/`, `ws://host:port/ws` and `ws://host:port/ws/` +// to be upgraded to a websocket JSON RPC connection. +func WithWebsocketEnabled() Option { + return func(b *Handler) { + b.wsEnabled = true + } +} + +// WithJWTSecret adds authentication to the RPCs (HTTP, and WS pre-upgrade if enabled). +// The health endpoint is still available without authentication. +func WithJWTSecret(secret []byte) Option { + return func(b *Handler) { + b.jwtSecret = secret + } +} + +func WithHTTPRecorder(recorder opmetrics.HTTPRecorder) Option { + return func(b *Handler) { + b.httpRecorder = recorder + } +} + +func WithLogger(lgr log.Logger) Option { + return func(b *Handler) { + b.log = lgr + } +} + +// WithMiddleware adds an http.Handler to the rpc server handler stack +// The added middleware is invoked directly before the RPC callback +func WithMiddleware(middleware func(http.Handler) (hdlr http.Handler)) Option { + return func(b *Handler) { + b.middlewares = append(b.middlewares, middleware) + } +} diff --git a/op-service/rpc/handler_test.go b/op-service/rpc/handler_test.go new file mode 100644 index 0000000000000..c958e57d61e4a --- /dev/null +++ b/op-service/rpc/handler_test.go @@ -0,0 +1,31 @@ +package rpc + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rpc" + + "github.com/ethereum-optimism/optimism/op-service/testlog" +) + +func TestHandler(t *testing.T) { + logger := testlog.Logger(t, log.LevelInfo) + h := NewHandler("v1.2.3", WithLogger(logger)) + t.Cleanup(h.Stop) + + rpcEntry := rpc.API{ + Namespace: "foo", + Service: new(testAPI), + } + + require.ErrorContains(t, h.AddRPC("/"), "suffix") + require.ErrorContains(t, h.AddRPC(""), "already exists") + require.ErrorContains(t, h.AddAPIToRPC("/extra", rpcEntry), "not found") + require.NoError(t, h.AddRPC("/extra")) + require.NoError(t, h.AddAPIToRPC("/extra", rpcEntry)) + + // WS-RPC / HTTP-RPC / health are tested in server_test.go +} diff --git a/op-service/rpc/server.go b/op-service/rpc/server.go index f0bead5084fe3..40c21627d9276 100644 --- a/op-service/rpc/server.go +++ b/op-service/rpc/server.go @@ -2,338 +2,75 @@ package rpc import ( "context" - "crypto/tls" - "encoding/json" "fmt" "net" - "net/http" "strconv" - "strings" "time" - "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/rpc" - oplog "github.com/ethereum-optimism/optimism/op-service/log" - opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics" - optls "github.com/ethereum-optimism/optimism/op-service/tls" + "github.com/ethereum-optimism/optimism/op-service/httputil" ) -var wildcardHosts = []string{"*"} - +// Server is a convenience util, that wraps an httputil.HTTPServer and provides an RPC Handler type Server struct { - endpoint string - apis []rpc.API - appVersion string - healthzHandler http.Handler - corsHosts []string - vHosts []string - jwtSecret []byte - wsEnabled bool - rpcPath string - healthzPath string - httpRecorder opmetrics.HTTPRecorder - httpServer *http.Server - listener net.Listener - log log.Logger - tls *ServerTLSConfig - middlewares []Middleware - rpcServer *rpc.Server - handlers map[string]http.Handler -} - -type ServerTLSConfig struct { - Config *tls.Config - CLIConfig *optls.CLIConfig // paths to certificate and key files -} - -type ServerOption func(b *Server) - -type Middleware func(next http.Handler) http.Handler - -func WithAPIs(apis []rpc.API) ServerOption { - return func(b *Server) { - b.apis = apis - } -} - -func WithHealthzHandler(hdlr http.Handler) ServerOption { - return func(b *Server) { - b.healthzHandler = hdlr - } -} - -func WithCORSHosts(hosts []string) ServerOption { - return func(b *Server) { - b.corsHosts = hosts - } -} - -func WithVHosts(hosts []string) ServerOption { - return func(b *Server) { - b.vHosts = hosts - } -} - -// WithWebsocketEnabled allows `ws://host:port/`, `ws://host:port/ws` and `ws://host:port/ws/` -// to be upgraded to a websocket JSON RPC connection. -func WithWebsocketEnabled() ServerOption { - return func(b *Server) { - b.wsEnabled = true - } -} - -// WithJWTSecret adds authentication to the RPCs (HTTP, and WS pre-upgrade if enabled). -// The health endpoint is still available without authentication. -func WithJWTSecret(secret []byte) ServerOption { - return func(b *Server) { - b.jwtSecret = secret - } -} - -func WithRPCPath(path string) ServerOption { - return func(b *Server) { - b.rpcPath = path - } -} - -func WithHealthzPath(path string) ServerOption { - return func(b *Server) { - b.healthzPath = path - } -} - -func WithHTTPRecorder(recorder opmetrics.HTTPRecorder) ServerOption { - return func(b *Server) { - b.httpRecorder = recorder - } -} + httpServer *httputil.HTTPServer -func WithLogger(lgr log.Logger) ServerOption { - return func(b *Server) { - b.log = lgr - } -} - -// WithTLSConfig configures TLS for the RPC server -// If this option is passed, the server will use ListenAndServeTLS -func WithTLSConfig(tls *ServerTLSConfig) ServerOption { - return func(b *Server) { - b.tls = tls - } -} - -// WithMiddleware adds an http.Handler to the rpc server handler stack -// The added middleware is invoked directly before the RPC callback -func WithMiddleware(middleware func(http.Handler) (hdlr http.Handler)) ServerOption { - return func(b *Server) { - b.middlewares = append(b.middlewares, middleware) - } -} - -func NewServer(host string, port int, appVersion string, opts ...ServerOption) *Server { - endpoint := net.JoinHostPort(host, strconv.Itoa(port)) - bs := &Server{ - endpoint: endpoint, - appVersion: appVersion, - healthzHandler: defaultHealthzHandler(appVersion), - corsHosts: wildcardHosts, - vHosts: wildcardHosts, - rpcPath: "/", - healthzPath: "/healthz", - httpRecorder: opmetrics.NoopHTTPRecorder, - httpServer: &http.Server{ - Addr: endpoint, - }, - log: log.Root(), - rpcServer: rpc.NewServer(), - handlers: make(map[string]http.Handler), - } - for _, opt := range opts { - opt(bs) - } - if bs.tls != nil { - bs.httpServer.TLSConfig = bs.tls.Config - } - bs.AddAPI(rpc.API{ - Namespace: "health", - Service: &healthzAPI{ - appVersion: appVersion, - }, - }) - return bs + // embedded, for easy access as caller + *Handler } // Endpoint returns the HTTP endpoint without http / ws protocol prefix. func (b *Server) Endpoint() string { - if b.listener == nil { - panic("Server has not started yet, no endpoint is known") - } - return b.listener.Addr().String() -} - -func (b *Server) AddAPI(api rpc.API) { - b.apis = append(b.apis, api) -} - -// AddHandler adds a custom http.Handler to the server, mapped to an absolute path -func (b *Server) AddHandler(path string, handler http.Handler) { - if !strings.HasPrefix(path, "/") { - path = "/" + path - } - b.handlers[path] = handler + return b.httpServer.Addr().String() } func (b *Server) Start() error { - // Register all APIs to the RPC server. - for _, api := range b.apis { - if err := b.rpcServer.RegisterName(api.Namespace, api.Service); err != nil { - return fmt.Errorf("failed to register API %s: %w", api.Namespace, err) - } - b.log.Info("registered API", "namespace", api.Namespace) - } - - // http handler stack. - var handler http.Handler - - // default to 404 not-found - handler = http.HandlerFunc(http.NotFound) - - // Health endpoint is lowest priority. - handler = b.newHealthMiddleware(handler) - - // serve RPC on configured RPC path (but not on arbitrary paths) - handler = b.newHttpRPCMiddleware(handler) - - // Conditionally enable Websocket support. - if b.wsEnabled { // prioritize WS RPC, if it's an upgrade request - handler = b.newWsMiddleWare(handler) - } - - // Apply user middlewares - for _, middleware := range b.middlewares { - handler = middleware(handler) - } - - // Outer-most middlewares: logging, metrics, TLS - handler = optls.NewPeerTLSMiddleware(handler) - handler = opmetrics.NewHTTPRecordingMiddleware(b.httpRecorder, handler) - handler = oplog.NewLoggingMiddleware(b.log, handler) - - // Add custom handlers - handler = b.newUserHandlersMiddleware(handler) - - b.httpServer.Handler = handler - - listener, err := net.Listen("tcp", b.endpoint) + err := b.httpServer.Start() if err != nil { - return fmt.Errorf("failed to listen: %w", err) + return err } - b.listener = listener - // override endpoint with the actual listener address, in case the port was 0 during test. - b.httpServer.Addr = listener.Addr().String() - b.endpoint = listener.Addr().String() - errCh := make(chan error, 1) - go func() { - if b.tls != nil { - if err := b.httpServer.ServeTLS(b.listener, "", ""); err != nil { - errCh <- err - } - } else { - if err := b.httpServer.Serve(b.listener); err != nil { - errCh <- err - } - } - }() - - // verify that the server comes up - tick := time.NewTimer(10 * time.Millisecond) - defer tick.Stop() - - select { - case err := <-errCh: - return fmt.Errorf("http server failed: %w", err) - case <-tick.C: - return nil - } -} - -func (b *Server) newHealthMiddleware(next http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.URL.Path == b.healthzPath { - b.healthzHandler.ServeHTTP(w, r) - return - } - next.ServeHTTP(w, r) - }) -} - -func (b *Server) newHttpRPCMiddleware(next http.Handler) http.Handler { - // Only allow RPC handlers behind the appropriate CORS / vhost / JWT (optional) setup. - // Note that websockets have their own handler-stack, also configured with CORS and JWT, separately. - httpHandler := node.NewHTTPHandlerStack(b.rpcServer, b.corsHosts, b.vHosts, b.jwtSecret) - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.URL.Path == b.rpcPath { - httpHandler.ServeHTTP(w, r) - return - } - next.ServeHTTP(w, r) - }) -} - -func (b *Server) newWsMiddleWare(next http.Handler) http.Handler { - wsHandler := node.NewWSHandlerStack(b.rpcServer.WebsocketHandler(b.corsHosts), b.jwtSecret) - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if isWebsocket(r) && (r.URL.Path == "/" || r.URL.Path == "/ws" || r.URL.Path == "/ws/") { - wsHandler.ServeHTTP(w, r) - return - } - next.ServeHTTP(w, r) - }) -} - -func (b *Server) newUserHandlersMiddleware(next http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - for path, handler := range b.handlers { - if strings.HasPrefix(r.URL.Path, path) { - handler.ServeHTTP(w, r) - return - } - } - next.ServeHTTP(w, r) - }) + b.log.Info("Started RPC server", "endpoint", b.httpServer.HTTPEndpoint()) + return nil } func (b *Server) Stop() error { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() _ = b.httpServer.Shutdown(ctx) - b.rpcServer.Stop() + b.Handler.Stop() + b.log.Info("Stopped RPC server") return nil } -type HealthzResponse struct { - Version string `json:"version"` -} - -func defaultHealthzHandler(appVersion string) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/json") - enc := json.NewEncoder(w) - _ = enc.Encode(&HealthzResponse{Version: appVersion}) +func (b *Server) AddAPI(api rpc.API) { + err := b.Handler.AddAPI(api) + if err != nil { + panic(fmt.Errorf("invalid API: %w", err)) } } -type healthzAPI struct { - appVersion string +type ServerConfig struct { + HttpOptions []httputil.Option + RpcOptions []Option + Host string + Port int + AppVersion string } -func (h *healthzAPI) Status() string { - return h.appVersion +func NewServer(host string, port int, appVersion string, opts ...Option) *Server { + return ServerFromConfig(&ServerConfig{ + HttpOptions: nil, + RpcOptions: opts, + Host: host, + Port: port, + AppVersion: appVersion, + }) } -func isWebsocket(r *http.Request) bool { - return strings.EqualFold(r.Header.Get("Upgrade"), "websocket") && - strings.Contains(strings.ToLower(r.Header.Get("Connection")), "upgrade") +func ServerFromConfig(cfg *ServerConfig) *Server { + endpoint := net.JoinHostPort(cfg.Host, strconv.Itoa(cfg.Port)) + h := NewHandler(cfg.AppVersion, cfg.RpcOptions...) + s := httputil.NewHTTPServer(endpoint, h, cfg.HttpOptions...) + return &Server{httpServer: s, Handler: h} } diff --git a/op-service/rpc/server_test.go b/op-service/rpc/server_test.go index 4e1404b5752ed..900ebbba26d52 100644 --- a/op-service/rpc/server_test.go +++ b/op-service/rpc/server_test.go @@ -13,6 +13,7 @@ import ( "github.com/stretchr/testify/require" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum-optimism/optimism/op-service/testlog" @@ -27,28 +28,59 @@ func (t *testAPI) Frobnicate(n int) int { func TestBaseServer(t *testing.T) { appVersion := "test" logger := testlog.Logger(t, log.LevelTrace) - log.SetDefault(log.NewLogger(logger.Handler())) - server := NewServer( - "127.0.0.1", - 0, - appVersion, - WithLogger(logger), - WithAPIs([]rpc.API{ - { - Namespace: "test", - Service: new(testAPI), - }, - }), - WithWebsocketEnabled(), - ) + server := ServerFromConfig(&ServerConfig{ + HttpOptions: nil, + RpcOptions: []Option{ + WithLogger(logger), + WithWebsocketEnabled(), + }, + Host: "127.0.0.1", + Port: 0, + AppVersion: appVersion, + }) + server.AddAPI(rpc.API{ + Namespace: "test", + Service: new(testAPI), + }) require.NoError(t, server.Start(), "must start") - rpcClient, err := rpc.Dial(fmt.Sprintf("http://%s", server.endpoint)) + t.Cleanup(func() { + err := server.Stop() + if err != nil { + panic(err) + } + }) + + t.Run("supports 0 port", func(t *testing.T) { + _, portStr, err := net.SplitHostPort(server.Endpoint()) + require.NoError(t, err) + port, err := strconv.Atoi(portStr) + require.NoError(t, err) + require.Greater(t, port, 0) + }) + + require.NoError(t, server.AddRPC("/extra")) + require.NoError(t, server.AddAPIToRPC("/extra", rpc.API{ + Namespace: "test2", + Service: new(testAPI), + })) + + t.Run("regular", func(t *testing.T) { + testServer(t, server.Endpoint(), appVersion, "test") + }) + t.Run("extra route", func(t *testing.T) { + testServer(t, server.Endpoint()+"/extra", appVersion, "test2") + }) +} + +func testServer(t *testing.T, endpoint string, appVersion string, namespace string) { + + httpRPCClient, err := rpc.Dial("http://" + endpoint) require.NoError(t, err) - t.Cleanup(rpcClient.Close) + t.Cleanup(httpRPCClient.Close) t.Run("supports GET /healthz", func(t *testing.T) { - res, err := http.Get(fmt.Sprintf("http://%s/healthz", server.endpoint)) + res, err := http.Get("http://" + endpoint + "/healthz") require.NoError(t, err) defer res.Body.Close() body, err := io.ReadAll(res.Body) @@ -58,37 +90,104 @@ func TestBaseServer(t *testing.T) { t.Run("supports health_status", func(t *testing.T) { var res string - require.NoError(t, rpcClient.Call(&res, "health_status")) + require.NoError(t, httpRPCClient.Call(&res, "health_status")) require.Equal(t, appVersion, res) }) t.Run("supports additional RPC APIs", func(t *testing.T) { var res int - require.NoError(t, rpcClient.Call(&res, "test_frobnicate", 2)) + require.NoError(t, httpRPCClient.Call(&res, namespace+"_frobnicate", 2)) require.Equal(t, 4, res) }) - t.Run("supports 0 port", func(t *testing.T) { - endpoint := server.Endpoint() - _, portStr, err := net.SplitHostPort(endpoint) - require.NoError(t, err) - port, err := strconv.Atoi(portStr) - require.NoError(t, err) - require.Greater(t, port, 0) - }) - t.Run("supports websocket", func(t *testing.T) { - endpoint := "ws://" + server.Endpoint() - t.Log("connecting to", endpoint) + wsEndpoint := "ws://" + endpoint + t.Log("connecting to", wsEndpoint) ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() - wsCl, err := rpc.DialContext(ctx, endpoint) + wsCl, err := rpc.DialContext(ctx, wsEndpoint) require.NoError(t, err) - defer wsCl.Close() + t.Cleanup(wsCl.Close) var res int - require.NoError(t, wsCl.Call(&res, "test_frobnicate", 42)) + require.NoError(t, wsCl.Call(&res, namespace+"_frobnicate", 42)) require.Equal(t, 42*2, res) }) +} + +func TestAuthServer(t *testing.T) { + secret := [32]byte{0: 4} + badSecret := [32]byte{0: 5} + + appVersion := "test" + logger := testlog.Logger(t, log.LevelTrace) + server := ServerFromConfig(&ServerConfig{ + HttpOptions: nil, + RpcOptions: []Option{ + WithLogger(logger), + WithWebsocketEnabled(), + WithJWTSecret(secret[:]), + }, + Host: "127.0.0.1", + Port: 0, + AppVersion: appVersion, + }) + server.AddAPI(rpc.API{ + Namespace: "test", + Service: new(testAPI), + }) + require.NoError(t, server.Start(), "must start") + + // verify we can add routes after Start() while we are at it + require.NoError(t, server.AddRPC("/other")) + require.NoError(t, server.AddAPIToRPC("/other", rpc.API{ + Namespace: "test", + Service: new(testAPI), + })) + + t.Cleanup(func() { + err := server.Stop() + if err != nil { + panic(err) + } + }) + + testAuth := func(t *testing.T, endpoint string) { + httpRPCClient, err := rpc.DialOptions(context.Background(), "http://"+endpoint, + rpc.WithHTTPAuth(node.NewJWTAuth(secret))) + require.NoError(t, err) + t.Cleanup(httpRPCClient.Close) + + var res int + require.NoError(t, httpRPCClient.Call(&res, "test_frobnicate", 2)) + require.Equal(t, 4, res) + + httpRPCClient2, err := rpc.DialOptions(context.Background(), "http://"+endpoint, + rpc.WithHTTPAuth(node.NewJWTAuth(badSecret))) + require.NoError(t, err) + t.Cleanup(httpRPCClient2.Close) - require.NoError(t, server.Stop(), "must stop") + var resNo int + require.Error(t, httpRPCClient2.Call(&resNo, "test_frobnicate", 10), + "http is lazy-auth and should error with bad secret on first call") + + _, err = rpc.DialOptions(context.Background(), "ws://"+endpoint, + rpc.WithHTTPAuth(node.NewJWTAuth(badSecret))) + require.Error(t, err, "websocket is immediate auth and should error with bad secret on dial") + + wsRPCClient, err := rpc.DialOptions(context.Background(), "ws://"+endpoint, + rpc.WithHTTPAuth(node.NewJWTAuth(secret))) + require.NoError(t, err) + t.Cleanup(wsRPCClient.Close) + + var res2 int + require.NoError(t, wsRPCClient.Call(&res2, "test_frobnicate", 3)) + require.Equal(t, 6, res2) + } + t.Run("regular", func(t *testing.T) { + testAuth(t, server.Endpoint()) + }) + // auth should be applied to all routes that serve RPC + t.Run("other", func(t *testing.T) { + testAuth(t, server.Endpoint()+"/other") + }) } diff --git a/op-service/rpc/stream_test.go b/op-service/rpc/stream_test.go index 919ee332f30b8..f1f8f9078d2b7 100644 --- a/op-service/rpc/stream_test.go +++ b/op-service/rpc/stream_test.go @@ -203,22 +203,22 @@ func TestStreamFallback(t *testing.T) { outOfEvents: make(chan struct{}, 100), } // Create an HTTP server, this won't support RPC subscriptions - server := NewServer( - "127.0.0.1", - 0, - appVersion, - WithLogger(logger), - WithAPIs([]rpc.API{ - { - Namespace: "custom", - Service: api, - }, - }), - ) + server := ServerFromConfig(&ServerConfig{ + Host: "127.0.0.1", + Port: 0, + AppVersion: appVersion, + RpcOptions: []Option{ + WithLogger(logger), + }, + }) + server.AddAPI(rpc.API{ + Namespace: "custom", + Service: api, + }) require.NoError(t, server.Start(), "must start") // Dial via HTTP, to ensure no subscription support - rpcClient, err := rpc.Dial(fmt.Sprintf("http://%s", server.endpoint)) + rpcClient, err := rpc.Dial(fmt.Sprintf("http://%s", server.Endpoint())) require.NoError(t, err) t.Cleanup(rpcClient.Close) diff --git a/op-service/txmgr/rpc_test.go b/op-service/txmgr/rpc_test.go index ed76d57cda1a8..8991f84f61bb0 100644 --- a/op-service/txmgr/rpc_test.go +++ b/op-service/txmgr/rpc_test.go @@ -31,10 +31,8 @@ func TestTxmgrRPC(t *testing.T) { "127.0.0.1", 0, appVersion, - oprpc.WithAPIs([]rpc.API{ - h.mgr.API(), - }), ) + server.AddAPI(h.mgr.API()) require.NoError(t, server.Start()) defer func() { _ = server.Stop()