diff --git a/api/breaker/round_tripper.go b/api/breaker/round_tripper.go index 7b741eb003d39..5eec8c61b1520 100644 --- a/api/breaker/round_tripper.go +++ b/api/breaker/round_tripper.go @@ -32,6 +32,17 @@ func NewRoundTripper(cb *CircuitBreaker, tripper http.RoundTripper) *RoundTrippe } } +// CloseIdleConnections ensures idle connections of the wrapped +// [http.RoundTripper] are closed. +func (t *RoundTripper) CloseIdleConnections() { + type closeIdler interface { + CloseIdleConnections() + } + if tr, ok := t.tripper.(closeIdler); ok { + tr.CloseIdleConnections() + } +} + // RoundTrip forwards the request on to the provided http.RoundTripper if // the CircuitBreaker allows it func (t *RoundTripper) RoundTrip(request *http.Request) (*http.Response, error) { diff --git a/api/client/proxy/proxy.go b/api/client/proxy/proxy.go index c1e3da4322c46..b7f85b682274c 100644 --- a/api/client/proxy/proxy.go +++ b/api/client/proxy/proxy.go @@ -92,6 +92,16 @@ func NewHTTPRoundTripper(transport *http.Transport, extraHeaders map[string]stri } } +// CloseIdleConnections forwards closing of idle connections on to the wrapped +// transport. This is required to ensure that the underlying [http.Transport] has +// its idle connections closed per the [http.Client] docs: +// +// > If the Client's Transport does not have a CloseIdleConnections method +// > then this method does nothing. +func (rt *HTTPRoundTripper) CloseIdleConnections() { + rt.Transport.CloseIdleConnections() +} + // RoundTrip executes a single HTTP transaction. Part of the RoundTripper interface. func (rt *HTTPRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { // Add extra HTTP headers. diff --git a/api/client/webclient/webclient.go b/api/client/webclient/webclient.go index 505c8648b3372..ae7341369b923 100644 --- a/api/client/webclient/webclient.go +++ b/api/client/webclient/webclient.go @@ -34,7 +34,6 @@ import ( "github.com/gravitational/trace" log "github.com/sirupsen/logrus" - "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" oteltrace "go.opentelemetry.io/otel/trace" "golang.org/x/net/http/httpproxy" @@ -42,6 +41,7 @@ import ( "github.com/gravitational/teleport/api/constants" "github.com/gravitational/teleport/api/defaults" "github.com/gravitational/teleport/api/observability/tracing" + tracehttp "github.com/gravitational/teleport/api/observability/tracing/http" "github.com/gravitational/teleport/api/utils" "github.com/gravitational/teleport/api/utils/keys" ) @@ -87,12 +87,13 @@ func (c *Config) CheckAndSetDefaults() error { return nil } -// newWebClient creates a new client to the HTTPS web proxy. +// newWebClient creates a new client to the Proxy Web API. func newWebClient(cfg *Config) (*http.Client, error) { if err := cfg.CheckAndSetDefaults(); err != nil { return nil, trace.Wrap(err) } - transport := http.Transport{ + + rt := proxy.NewHTTPRoundTripper(&http.Transport{ TLSClientConfig: &tls.Config{ InsecureSkipVerify: cfg.Insecure, RootCAs: cfg.Pool, @@ -100,13 +101,11 @@ func newWebClient(cfg *Config) (*http.Client, error) { Proxy: func(req *http.Request) (*url.URL, error) { return httpproxy.FromEnvironment().ProxyFunc()(req.URL) }, - } + }, nil) + return &http.Client{ - Transport: otelhttp.NewTransport( - proxy.NewHTTPRoundTripper(&transport, nil), - otelhttp.WithSpanNameFormatter(tracing.HTTPTransportFormatter), - ), - Timeout: cfg.Timeout, + Transport: tracehttp.NewTransport(rt), + Timeout: cfg.Timeout, }, nil } diff --git a/api/client/webclient/webclient_test.go b/api/client/webclient/webclient_test.go index 7ef45d1292f4d..45d34a4144b01 100644 --- a/api/client/webclient/webclient_test.go +++ b/api/client/webclient/webclient_test.go @@ -22,8 +22,11 @@ import ( "net" "net/http" "net/http/httptest" + "strings" "testing" + "time" + "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/require" "github.com/gravitational/teleport/api/defaults" @@ -442,3 +445,57 @@ func TestSSHProxyHostPort(t *testing.T) { }) } } + +// TestWebClientClosesIdleConnections verifies that all http connections +// are closed when the http.Client created by newWebClient is no longer +// being used. +func TestWebClientClosesIdleConnections(t *testing.T) { + expectedResponse := &PingResponse{ + Proxy: ProxySettings{ + TLSRoutingEnabled: true, + }, + ServerVersion: "1.2.3", + MinClientVersion: "0.1.2", + ClusterName: "test", + } + + expectedStates := []http.ConnState{ + http.StateNew, http.StateActive, http.StateClosed, // the https request will fail and cause us to fallback to http + http.StateNew, http.StateActive, http.StateIdle, http.StateClosed, // the http request should be processed and closed + } + + srv := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/webapi/find": + json.NewEncoder(w).Encode(expectedResponse) + default: + w.WriteHeader(http.StatusBadRequest) + } + })) + + stateChange := make(chan http.ConnState, len(expectedStates)) + srv.Config.ConnState = func(conn net.Conn, state http.ConnState) { + stateChange <- state + } + + srv.Start() + t.Cleanup(srv.Close) + + resp, err := Find(&Config{ + Context: context.Background(), + ProxyAddr: strings.TrimPrefix(srv.URL, "http://"), + Insecure: true, + }) + require.NoError(t, err) + require.Empty(t, cmp.Diff(expectedResponse, resp)) + + for _, expected := range expectedStates { + select { + case state := <-stateChange: + require.Equal(t, expected, state, "expected connection state %s got %s", expected.String(), state.String()) + case <-time.After(3 * time.Second): + t.Fatalf("timeout waiting for expected connection state %s", expected.String()) + } + } + +} diff --git a/api/observability/tracing/http/http.go b/api/observability/tracing/http/http.go new file mode 100644 index 0000000000000..8d8eeca7bfe12 --- /dev/null +++ b/api/observability/tracing/http/http.go @@ -0,0 +1,81 @@ +// Copyright 2023 Gravitational, Inc +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package http + +import ( + nethttp "net/http" + + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" +) + +// TransportFormatter is a span formatter that may be provided to +// [otelhttp.WithSpanNameFormatter] to include the url path in the span +// names generated by an [otelhttp.Transport]. +func TransportFormatter(_ string, r *nethttp.Request) string { + return "HTTP " + r.Method + " " + r.URL.Path +} + +// HandlerFormatter is a span formatter that may be provided to +// [otelhttp.WithSpanNameFormatter] to include the component and url path in the span +// names generated by [otelhttp.NewHandler]. +func HandlerFormatter(operation string, r *nethttp.Request) string { + return operation + " " + r.Method + " " + r.URL.Path +} + +// NewTransport wraps the provided [nethttp.RoundTripper] with one +// that automatically adds spans for each http request. +// +// Note: special care has been taken to ensure that the returned +// [nethttp.RoundTripper] has a CloseIdleConnections method because +// the [otelhttp.Transport] does not implement it: +// https://github.com/open-telemetry/opentelemetry-go-contrib/issues/3543. +// Once the issue is resolved the wrapper may be discarded. +func NewTransport(rt nethttp.RoundTripper) nethttp.RoundTripper { + return enforceCloseIdleConnections( + otelhttp.NewTransport(rt, + otelhttp.WithSpanNameFormatter(TransportFormatter), + ), rt) +} + +// enforceCloseIdleConnections ensures that the returned [nethttp.RoundTripper] +// has a CloseIdleConnections method. Since [otelhttp.Transport] does not implement +// this any [nethttp.Client.CloseIdleConnections] calls result in a noop instead +// of forwarding the request onto its wrapped [nethttp.RoundTripper]. +// +// DELETE WHEN https://github.com/open-telemetry/opentelemetry-go-contrib/issues/3543 +// has been resolved. +func enforceCloseIdleConnections(wrapper, wrapped nethttp.RoundTripper) nethttp.RoundTripper { + type closeIdler interface { + CloseIdleConnections() + } + + type unwrapper struct { + nethttp.RoundTripper + closeIdler + } + + if _, ok := wrapper.(closeIdler); ok { + return wrapper + } + + if c, ok := wrapped.(closeIdler); ok { + return &unwrapper{ + RoundTripper: wrapper, + closeIdler: c, + } + } + + return wrapper +} diff --git a/api/observability/tracing/tracing.go b/api/observability/tracing/tracing.go index c5613b3f95b69..6e6f848404135 100644 --- a/api/observability/tracing/tracing.go +++ b/api/observability/tracing/tracing.go @@ -16,7 +16,6 @@ package tracing import ( "context" - "net/http" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/propagation" @@ -48,17 +47,3 @@ func WithPropagationContext(ctx context.Context, pc PropagationContext, opts ... func DefaultProvider() oteltrace.TracerProvider { return otel.GetTracerProvider() } - -// HTTPTransportFormatter is a span formatter that may be provided to -// otelhttp.WithSpanNameFormatter to include the url path in the span -// names generated by an otelhttp.Transport -func HTTPTransportFormatter(_ string, r *http.Request) string { - return "HTTP " + r.Method + " " + r.URL.Path -} - -// HTTPHandlerFormatter is a span formatter that may be provided to -// otelhttp.WithSpanNameFormatter to include the component and url path in the span -// names generated by otelhttp.NewHandler -func HTTPHandlerFormatter(operation string, r *http.Request) string { - return operation + " " + r.Method + " " + r.URL.Path -} diff --git a/lib/auth/clt.go b/lib/auth/clt.go index 0c6abd57fac3e..d7c2b08178653 100644 --- a/lib/auth/clt.go +++ b/lib/auth/clt.go @@ -30,7 +30,6 @@ import ( "github.com/gravitational/roundtrip" "github.com/gravitational/trace" - "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "github.com/gravitational/teleport" "github.com/gravitational/teleport/api/breaker" @@ -41,7 +40,7 @@ import ( devicepb "github.com/gravitational/teleport/api/gen/proto/go/teleport/devicetrust/v1" loginrulepb "github.com/gravitational/teleport/api/gen/proto/go/teleport/loginrule/v1" samlidppb "github.com/gravitational/teleport/api/gen/proto/go/teleport/samlidp/v1" - "github.com/gravitational/teleport/api/observability/tracing" + tracehttp "github.com/gravitational/teleport/api/observability/tracing/http" "github.com/gravitational/teleport/api/types" apievents "github.com/gravitational/teleport/api/types/events" "github.com/gravitational/teleport/lib/defaults" @@ -198,11 +197,8 @@ func NewHTTPClient(cfg client.Config, tls *tls.Config, params ...roundtrip.Clien clientParams := append( []roundtrip.ClientParam{ roundtrip.HTTPClient(&http.Client{ - Timeout: defaults.HTTPRequestTimeout, - Transport: otelhttp.NewTransport( - breaker.NewRoundTripper(cb, transport), - otelhttp.WithSpanNameFormatter(tracing.HTTPTransportFormatter), - ), + Timeout: defaults.HTTPRequestTimeout, + Transport: tracehttp.NewTransport(breaker.NewRoundTripper(cb, transport)), }), roundtrip.SanitizerEnabled(true), }, diff --git a/lib/auth/trustedcluster.go b/lib/auth/trustedcluster.go index 232202c170dfb..8793a5edcd003 100644 --- a/lib/auth/trustedcluster.go +++ b/lib/auth/trustedcluster.go @@ -27,11 +27,10 @@ import ( "github.com/google/go-cmp/cmp" "github.com/gravitational/roundtrip" "github.com/gravitational/trace" - "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "github.com/gravitational/teleport" "github.com/gravitational/teleport/api/constants" - "github.com/gravitational/teleport/api/observability/tracing" + tracehttp "github.com/gravitational/teleport/api/observability/tracing/http" "github.com/gravitational/teleport/api/types" apievents "github.com/gravitational/teleport/api/types/events" "github.com/gravitational/teleport/lib" @@ -645,7 +644,7 @@ func (a *Server) sendValidateRequestToProxy(host string, validateRequest *Valida tr.TLSClientConfig = tlsConfig insecureWebClient := &http.Client{ - Transport: otelhttp.NewTransport(tr, otelhttp.WithSpanNameFormatter(tracing.HTTPTransportFormatter)), + Transport: tracehttp.NewTransport(tr), } opts = append(opts, roundtrip.HTTPClient(insecureWebClient)) } diff --git a/lib/client/https_client.go b/lib/client/https_client.go index 38d9b4d80339c..c25164581ad99 100644 --- a/lib/client/https_client.go +++ b/lib/client/https_client.go @@ -25,12 +25,11 @@ import ( "github.com/gravitational/roundtrip" "github.com/gravitational/trace" - "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "golang.org/x/net/http/httpproxy" "github.com/gravitational/teleport" apiproxy "github.com/gravitational/teleport/api/client/proxy" - "github.com/gravitational/teleport/api/observability/tracing" + tracehttp "github.com/gravitational/teleport/api/observability/tracing/http" apiutils "github.com/gravitational/teleport/api/utils" "github.com/gravitational/teleport/lib/httplib" "github.com/gravitational/teleport/lib/utils" @@ -42,10 +41,7 @@ func NewInsecureWebClient() *http.Client { func newClient(insecure bool, pool *x509.CertPool, extraHeaders map[string]string) *http.Client { return &http.Client{ - Transport: otelhttp.NewTransport( - apiproxy.NewHTTPRoundTripper(httpTransport(insecure, pool), extraHeaders), - otelhttp.WithSpanNameFormatter(tracing.HTTPTransportFormatter), - ), + Transport: tracehttp.NewTransport(apiproxy.NewHTTPRoundTripper(httpTransport(insecure, pool), extraHeaders)), } } diff --git a/lib/httplib/httplib.go b/lib/httplib/httplib.go index 713243f8e8c2d..1e26d590df118 100644 --- a/lib/httplib/httplib.go +++ b/lib/httplib/httplib.go @@ -36,6 +36,7 @@ import ( "github.com/gravitational/teleport" "github.com/gravitational/teleport/api/observability/tracing" + tracehttp "github.com/gravitational/teleport/api/observability/tracing/http" "github.com/gravitational/teleport/lib/httplib/csrf" "github.com/gravitational/teleport/lib/utils" ) @@ -93,7 +94,7 @@ func MakeTracingHandler(h http.Handler, component string) http.Handler { h.ServeHTTP(w, r) } - return otelhttp.NewHandler(http.HandlerFunc(handler), component, otelhttp.WithSpanNameFormatter(tracing.HTTPHandlerFormatter)) + return otelhttp.NewHandler(http.HandlerFunc(handler), component, otelhttp.WithSpanNameFormatter(tracehttp.HandlerFormatter)) } // MakeHandlerWithErrorWriter returns a httprouter.Handle from the HandlerFunc, diff --git a/lib/kube/proxy/forwarder.go b/lib/kube/proxy/forwarder.go index e1b849e652536..15de36c5b50ec 100644 --- a/lib/kube/proxy/forwarder.go +++ b/lib/kube/proxy/forwarder.go @@ -46,7 +46,6 @@ import ( "github.com/jonboulle/clockwork" "github.com/julienschmidt/httprouter" "github.com/sirupsen/logrus" - "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "golang.org/x/crypto/ssh" "golang.org/x/exp/slices" "golang.org/x/net/http2" @@ -65,7 +64,7 @@ import ( "github.com/gravitational/teleport" "github.com/gravitational/teleport/api/constants" apidefaults "github.com/gravitational/teleport/api/defaults" - "github.com/gravitational/teleport/api/observability/tracing" + tracehttp "github.com/gravitational/teleport/api/observability/tracing/http" "github.com/gravitational/teleport/api/types" apievents "github.com/gravitational/teleport/api/types/events" apiutils "github.com/gravitational/teleport/api/utils" @@ -1888,6 +1887,8 @@ func (f *Forwarder) getExecutor(ctx authContext, sess *clusterSession, req *http return nil, trace.Wrap(err) } } + rt = tracehttp.NewTransport(rt) + return remotecommand.NewSPDYExecutorForTransports(rt, upgradeRoundTripper, req.Method, req.URL) } @@ -1909,7 +1910,7 @@ func (f *Forwarder) getDialer(ctx authContext, sess *clusterSession, req *http.R } } client := &http.Client{ - Transport: otelhttp.NewTransport(rt, otelhttp.WithSpanNameFormatter(tracing.HTTPTransportFormatter)), + Transport: tracehttp.NewTransport(rt), } return spdy.NewDialer(upgradeRoundTripper, client, req.Method, req.URL), nil @@ -2165,6 +2166,8 @@ func (f *Forwarder) makeSessionForwarder(sess *clusterSession) (*forward.Forward } } + rt = tracehttp.NewTransport(rt) + forwarder, err := forward.New( forward.FlushInterval(100*time.Millisecond), forward.RoundTripper(rt), diff --git a/lib/kube/proxy/kube_creds.go b/lib/kube/proxy/kube_creds.go index 46b8da5864c32..ead415ad8feaa 100644 --- a/lib/kube/proxy/kube_creds.go +++ b/lib/kube/proxy/kube_creds.go @@ -88,7 +88,41 @@ func (s *staticKubeCreds) wrapTransport(rt http.RoundTripper) (http.RoundTripper if s == nil { return rt, nil } - return transport.HTTPWrappersForConfig(s.transportConfig, rt) + + wrapped, err := transport.HTTPWrappersForConfig(s.transportConfig, rt) + if err != nil { + return nil, trace.Wrap(err) + } + + return enforceCloseIdleConnections(wrapped, rt), nil +} + +// enforceCloseIdleConnections ensures that the returned [http.RoundTripper] +// has a CloseIdleConnections method. [transport.HTTPWrappersForConfig] returns +// a [http.RoundTripper] that does not implement it so any calls to [http.Client.CloseIdleConnections] +// will result in a noop instead of forwarding the request onto its wrapped [http.RoundTripper]. +func enforceCloseIdleConnections(wrapper, wrapped http.RoundTripper) http.RoundTripper { + type closeIdler interface { + CloseIdleConnections() + } + + type unwrapper struct { + http.RoundTripper + closeIdler + } + + if _, ok := wrapper.(closeIdler); ok { + return wrapper + } + + if c, ok := wrapped.(closeIdler); ok { + return &unwrapper{ + RoundTripper: wrapper, + closeIdler: c, + } + } + + return wrapper } func (s *staticKubeCreds) close() error { diff --git a/tool/tsh/resolve_default_addr.go b/tool/tsh/resolve_default_addr.go index 23fc21bf3203a..82ebfea53de32 100644 --- a/tool/tsh/resolve_default_addr.go +++ b/tool/tsh/resolve_default_addr.go @@ -29,10 +29,9 @@ import ( "time" "github.com/gravitational/trace" - "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "github.com/gravitational/teleport/api/client/proxy" - "github.com/gravitational/teleport/api/observability/tracing" + tracehttp "github.com/gravitational/teleport/api/observability/tracing/http" ) type raceResult struct { @@ -116,7 +115,7 @@ func pickDefaultAddr(ctx context.Context, insecure bool, host string, ports []in } httpClient := &http.Client{ - Transport: otelhttp.NewTransport( + Transport: tracehttp.NewTransport( &http.Transport{ TLSClientConfig: &tls.Config{ InsecureSkipVerify: insecure, @@ -125,7 +124,6 @@ func pickDefaultAddr(ctx context.Context, insecure bool, host string, ports []in return proxy.GetProxyURL(req.URL.String()), nil }, }, - otelhttp.WithSpanNameFormatter(tracing.HTTPTransportFormatter), ), }