Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions api/observability/tracing/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ import (
// PropagationContext contains tracing information to be passed across service boundaries
type PropagationContext map[string]string

// TraceParent is the name of the header or query parameter that contains
// tracing context across service boundaries.
const TraceParent = "traceparent"

// PropagationContextFromContext creates a PropagationContext from the given context.Context. If the context
// does not contain any tracing information, the PropagationContext will be empty.
func PropagationContextFromContext(ctx context.Context, opts ...Option) PropagationContext {
Expand Down
22 changes: 21 additions & 1 deletion lib/httplib/httplib.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,27 @@ func MakeHandler(fn HandlerFunc) httprouter.Handle {
// MakeTracingHandler returns a new httprouter.Handle func that wraps the provided handler func
// with one that will add a tracing span for each request.
func MakeTracingHandler(h http.Handler, component string) http.Handler {
return otelhttp.NewHandler(h, component, otelhttp.WithSpanNameFormatter(tracing.HTTPHandlerFormatter))
// Wrap the provided handler with one that will inject
// any propagated tracing context provided via a query parameter
// if there isn't already a header containing tracing context.
// This is required for scenarios using web sockets as headers
// cannot be modified to inject the tracing context.
handler := func(w http.ResponseWriter, r *http.Request) {
// ensure headers have priority over query parameters
if r.Header.Get(tracing.TraceParent) != "" {
h.ServeHTTP(w, r)
return
}

traceParent := r.URL.Query()[tracing.TraceParent]
if len(traceParent) > 0 {
r.Header.Add(tracing.TraceParent, traceParent[0])
}

h.ServeHTTP(w, r)
}

return otelhttp.NewHandler(http.HandlerFunc(handler), component, otelhttp.WithSpanNameFormatter(tracing.HTTPHandlerFormatter))
}

// MakeHandlerWithErrorWriter returns a httprouter.Handle from the HandlerFunc,
Expand Down
77 changes: 77 additions & 0 deletions lib/httplib/httplib_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ import (
"github.com/gravitational/roundtrip"
"github.com/julienschmidt/httprouter"
"github.com/stretchr/testify/require"

"github.com/gravitational/teleport"
"github.com/gravitational/teleport/api/observability/tracing"
)

type netError struct{}
Expand Down Expand Up @@ -186,3 +189,77 @@ func TestReadJSON_ContentType(t *testing.T) {
})
}
}

func TestMakeTracingHandler(t *testing.T) {
t.Parallel()

newRequest := func(t *testing.T) *http.Request {
req, err := http.NewRequest(http.MethodGet, "", nil)
require.NoError(t, err)

return req
}

cases := []struct {
name string
req func(t *testing.T) *http.Request
headerAssertion func(t *testing.T, req *http.Request)
}{
{
name: "no tracing context provided",
req: newRequest,
headerAssertion: func(t *testing.T, req *http.Request) {
require.Empty(t, req.Header.Get(tracing.TraceParent))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
require.Empty(t, req.Header.Get(tracing.TraceParent))
require.Empty(t, req.Header.Get(tracing.TraceParent), "TraceParent header mismatch")

Same for others.

},
},
{
name: "tracing context provided via header",
req: func(t *testing.T) *http.Request {
req := newRequest(t)
req.Header.Add(tracing.TraceParent, "test")
return req
},
headerAssertion: func(t *testing.T, req *http.Request) {
require.Equal(t, "test", req.Header.Get(tracing.TraceParent))
},
},
{
name: "tracing context provided via parameter",
req: func(t *testing.T) *http.Request {
req := newRequest(t)
q := req.URL.Query()
q.Set(tracing.TraceParent, "test")
req.URL.RawQuery = q.Encode()
return req
},
headerAssertion: func(t *testing.T, req *http.Request) {
require.Equal(t, "test", req.Header.Get(tracing.TraceParent))
},
},
{
name: "header has priority",
req: func(t *testing.T) *http.Request {
req := newRequest(t)
q := req.URL.Query()
req.Header.Add(tracing.TraceParent, "header")
q.Set(tracing.TraceParent, "parameter")
req.URL.RawQuery = q.Encode()
return req
},
headerAssertion: func(t *testing.T, req *http.Request) {
require.Equal(t, "header", req.Header.Get(tracing.TraceParent))
},
},
}

for _, tt := range cases {
t.Run(tt.name, func(t *testing.T) {
handler := MakeTracingHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
tt.headerAssertion(t, r)
}), teleport.ComponentProxy)

handler.ServeHTTP(httptest.NewRecorder(), tt.req(t))
})
}

}
17 changes: 17 additions & 0 deletions lib/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3477,6 +3477,22 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error {
proxyKubeAddr = cfg.Proxy.Kube.PublicAddrs[0]
}

traceClt := tracing.NewNoopClient()
if cfg.Tracing.Enabled {
traceConf, err := process.Config.Tracing.Config()
if err != nil {
return trace.Wrap(err)
}
traceConf.Logger = process.log.WithField(trace.Component, teleport.ComponentTracing)

clt, err := tracing.NewStartedClient(process.ExitContext(), *traceConf)
if err != nil {
return trace.Wrap(err)
}

traceClt = clt
}

webConfig := web.Config{
Proxy: tsrv,
AuthServers: cfg.AuthServerAddresses()[0],
Expand All @@ -3498,6 +3514,7 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error {
PublicProxyAddr: process.proxyPublicAddr().Addr,
ALPNHandler: alpnHandlerForWeb.HandleConnection,
ProxyKubeAddr: proxyKubeAddr,
TraceClient: traceClt,
}
webHandler, err = web.NewHandler(webConfig)
if err != nil {
Expand Down
82 changes: 78 additions & 4 deletions lib/web/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,20 @@ import (
"github.com/julienschmidt/httprouter"
lemma_secret "github.com/mailgun/lemma/secret"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
oteltrace "go.opentelemetry.io/otel/trace"
tracepb "go.opentelemetry.io/proto/otlp/trace/v1"
"golang.org/x/crypto/ssh"
"golang.org/x/exp/slices"
"golang.org/x/mod/semver"
"google.golang.org/protobuf/encoding/protojson"

"github.com/gravitational/teleport"
"github.com/gravitational/teleport/api/client/proto"
"github.com/gravitational/teleport/api/client/webclient"
"github.com/gravitational/teleport/api/constants"
apidefaults "github.com/gravitational/teleport/api/defaults"
apitracing "github.com/gravitational/teleport/api/observability/tracing"
"github.com/gravitational/teleport/api/types"
apievents "github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/teleport/api/types/installers"
Expand All @@ -65,7 +70,6 @@ import (
"github.com/gravitational/teleport/lib/httplib/csrf"
"github.com/gravitational/teleport/lib/jwt"
"github.com/gravitational/teleport/lib/limiter"
"github.com/gravitational/teleport/lib/observability/tracing"
"github.com/gravitational/teleport/lib/plugin"
"github.com/gravitational/teleport/lib/reversetunnel"
"github.com/gravitational/teleport/lib/secret"
Expand Down Expand Up @@ -206,6 +210,9 @@ type Config struct {
// ALPNHandler is the ALPN connection handler for handling upgraded ALPN
// connection through a HTTP upgrade call.
ALPNHandler ConnectionHandler

// TraceClient is used to forward spans to the upstream collector for the UI
TraceClient otlptrace.Client
}

type APIHandler struct {
Expand Down Expand Up @@ -481,6 +488,9 @@ func (h *Handler) bindDefaultEndpoints(challengeLimiter *limiter.RateLimiter) {
// Migrated this endpoint to /webapi/sessions/web below.
h.POST("/webapi/sessions", httplib.WithCSRFProtection(h.createWebSession))

// Forwards traces to the configured upstream collector
h.POST("/webapi/traces", h.WithAuth(h.traces))

// Web sessions
h.POST("/webapi/sessions/web", httplib.WithCSRFProtection(h.createWebSession))
h.POST("/webapi/sessions/app", h.WithAuth(h.createAppSession))
Expand Down Expand Up @@ -911,6 +921,70 @@ func getAuthSettings(ctx context.Context, authClient auth.ClientI) (webclient.Au
return as, nil
}

// traces forwards spans from the web ui to the upstream collector configured for the proxy. If tracing is
// disabled then the forwarding is a noop.
func (h *Handler) traces(w http.ResponseWriter, r *http.Request, _ httprouter.Params, _ *SessionContext) (interface{}, error) {
body, err := io.ReadAll(io.LimitReader(r.Body, teleport.MaxHTTPRequestSize))
if err != nil {
h.log.WithError(err).Error("Failed to read traces request")
w.WriteHeader(http.StatusBadRequest)
return nil, nil
}

if err := r.Body.Close(); err != nil {
h.log.WithError(err).Warn("Failed to close traces request body")
}

var data tracepb.TracesData
if err := protojson.Unmarshal(body, &data); err != nil {
h.log.WithError(err).Error("Failed to unmarshal traces request")
w.WriteHeader(http.StatusBadRequest)
return nil, nil
}

if len(data.ResourceSpans) == 0 {
w.WriteHeader(http.StatusBadRequest)
return nil, nil
}

// Unmarshalling of TraceId, SpanId, and ParentSpanId might all yield incorrect values. The raw values from
// OpenTelemetry-js are hex encoded, but the unmarshal call above will decode them as base64.
// In order to ensure the ids are in the right format and won't be rejected by the upstream collector
// we attempt to convert them back into the base64 and then hex decode them.
for _, resourceSpan := range data.ResourceSpans {
for _, scopeSpan := range resourceSpan.ScopeSpans {
for _, span := range scopeSpan.Spans {

// attempt to convert the trace id to the right format
if tid, err := oteltrace.TraceIDFromHex(base64.StdEncoding.EncodeToString(span.TraceId)); err == nil {
span.TraceId = tid[:]
}

// attempt to convert the span id to the right format
if sid, err := oteltrace.SpanIDFromHex(base64.StdEncoding.EncodeToString(span.SpanId)); err == nil {
span.SpanId = sid[:]
}

// attempt to convert the parent span id to the right format
if len(span.ParentSpanId) > 0 {
if psid, err := oteltrace.SpanIDFromHex(base64.StdEncoding.EncodeToString(span.ParentSpanId)); err == nil {
span.ParentSpanId = psid[:]
}
}
}
}
}

go func() {
if err := h.cfg.TraceClient.UploadTraces(r.Context(), data.ResourceSpans); err != nil {
h.log.WithError(err).Error("Failed to upload traces")
}
}()

w.WriteHeader(http.StatusOK)
return nil, nil
}

func (h *Handler) ping(w http.ResponseWriter, r *http.Request, p httprouter.Params) (interface{}, error) {
var err error
authSettings, err := getAuthSettings(r.Context(), h.cfg.ProxyClient)
Expand Down Expand Up @@ -2151,7 +2225,7 @@ func (h *Handler) siteNodeConnect(
return nil, trace.Wrap(err)
}

netConfig, err := authAccessPoint.GetClusterNetworkingConfig(h.cfg.Context)
netConfig, err := authAccessPoint.GetClusterNetworkingConfig(r.Context())
if err != nil {
h.log.WithError(err).Debug("Unable to fetch cluster networking config.")
return nil, trace.Wrap(err)
Expand All @@ -2175,7 +2249,7 @@ func (h *Handler) siteNodeConnect(

// start the websocket session with a web-based terminal:
h.log.Infof("Getting terminal to %#v.", req)
term.Serve(w, r)
httplib.MakeTracingHandler(term, teleport.ComponentProxy).ServeHTTP(w, r)

return nil, nil
}
Expand Down Expand Up @@ -3106,7 +3180,7 @@ func makeTeleportClientConfig(ctx context.Context, sesCtx *SessionContext) (*cli
DefaultPrincipal: cert.ValidPrincipals[0],
HostKeyCallback: callback,
TLSRoutingEnabled: proxyListenerMode == types.ProxyListenerMode_Multiplex,
Tracer: tracing.NoopProvider().Tracer("test"),
Tracer: apitracing.DefaultProvider().Tracer("webterminal"),
}

return config, nil
Expand Down
Loading