From d9fa975a29a5c1a9415b86d3db38bbf6f3557a17 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Wed, 16 Nov 2022 14:19:31 -0500 Subject: [PATCH] Catch panic from Prometheus client on invalid label strings (#4051) Partially addresses #2944. Supersedes and closes #4050. This only solves the problem in the httpmetrics package; we still have other scenarios where malformed strings can leak into Prometheus labels and cause a panic, specifically the service-name-based metrics produced by the collector. So I am keeping the original issue open. Hopefully, we can resolve it when we migrate the metrics to OTEL and allow factories to return errors to be handled gracefully. Signed-off-by: Yuri Shkuro Signed-off-by: shubbham1215 --- cmd/collector/app/server/http.go | 2 +- cmd/collector/app/server/zipkin.go | 2 +- pkg/httpmetrics/metrics.go | 63 +++++++++++++++++++++++------- pkg/httpmetrics/metrics_test.go | 37 +++++++++++++++++- 4 files changed, 86 insertions(+), 18 deletions(-) diff --git a/cmd/collector/app/server/http.go b/cmd/collector/app/server/http.go index 97024412e5b..6461ace05b2 100644 --- a/cmd/collector/app/server/http.go +++ b/cmd/collector/app/server/http.go @@ -89,7 +89,7 @@ func serveHTTP(server *http.Server, listener net.Listener, params *HTTPServerPar cfgHandler.RegisterRoutes(r) recoveryHandler := recoveryhandler.NewRecoveryHandler(params.Logger, true) - server.Handler = httpmetrics.Wrap(recoveryHandler(r), params.MetricsFactory) + server.Handler = httpmetrics.Wrap(recoveryHandler(r), params.MetricsFactory, params.Logger) go func() { var err error if params.TLSConfig.Enabled { diff --git a/cmd/collector/app/server/zipkin.go b/cmd/collector/app/server/zipkin.go index a4bf36d999d..27482aa0b84 100644 --- a/cmd/collector/app/server/zipkin.go +++ b/cmd/collector/app/server/zipkin.go @@ -93,7 +93,7 @@ func serveZipkin(server *http.Server, listener net.Listener, params *ZipkinServe }) recoveryHandler := recoveryhandler.NewRecoveryHandler(params.Logger, true) - 
server.Handler = cors.Handler(httpmetrics.Wrap(recoveryHandler(r), params.MetricsFactory)) + server.Handler = cors.Handler(httpmetrics.Wrap(recoveryHandler(r), params.MetricsFactory, params.Logger)) go func(listener net.Listener, server *http.Server) { var err error if params.TLSConfig.Enabled { diff --git a/pkg/httpmetrics/metrics.go b/pkg/httpmetrics/metrics.go index 1c13a16efd8..097c4bd1b68 100644 --- a/pkg/httpmetrics/metrics.go +++ b/pkg/httpmetrics/metrics.go @@ -20,9 +20,14 @@ import ( "sync" "time" + "go.uber.org/zap" + "github.com/jaegertracing/jaeger/pkg/metrics" ) +// limit the size of cache for timers to avoid DDOS. +const maxEntries = 1000 + type statusRecorder struct { http.ResponseWriter status int @@ -44,8 +49,8 @@ func (r *statusRecorder) WriteHeader(status int) { // // Do not use with HTTP endpoints that take parameters from URL path, such as `/user/{user_id}`, // because they will result in high cardinality metrics. -func Wrap(h http.Handler, metricsFactory metrics.Factory) http.Handler { - timers := newRequestDurations(metricsFactory) +func Wrap(h http.Handler, metricsFactory metrics.Factory, logger *zap.Logger) http.Handler { + timers := newRequestDurations(metricsFactory, logger) return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { start := time.Now() recorder := &statusRecorder{ResponseWriter: w} @@ -76,39 +81,66 @@ type recordedRequest struct { } type requestDurations struct { - lock sync.RWMutex - metrics metrics.Factory - timers map[recordedRequestKey]metrics.Timer + lock sync.RWMutex + + metrics metrics.Factory + logger *zap.Logger + maxEntries int + + timers map[recordedRequestKey]metrics.Timer + fallback metrics.Timer } -func newRequestDurations(metricsFactory metrics.Factory) *requestDurations { - return &requestDurations{ - timers: make(map[recordedRequestKey]metrics.Timer), - metrics: metricsFactory, +func newRequestDurations(metricsFactory metrics.Factory, logger *zap.Logger) *requestDurations { + r := 
&requestDurations{ + timers: make(map[recordedRequestKey]metrics.Timer), + metrics: metricsFactory, + logger: logger, + maxEntries: maxEntries, } + r.fallback = r.getTimer(recordedRequestKey{ + method: "other", + path: "other", + status: "other", + }) + return r } func (r *requestDurations) record(request recordedRequest) { - cacheKey := request.key + timer := r.getTimer(request.key) + timer.Record(request.duration) +} +func (r *requestDurations) getTimer(cacheKey recordedRequestKey) metrics.Timer { r.lock.RLock() timer, ok := r.timers[cacheKey] + size := len(r.timers) r.lock.RUnlock() if !ok { + if size >= r.maxEntries { + return r.fallback + } r.lock.Lock() timer, ok = r.timers[cacheKey] if !ok { - timer = buildTimer(r.metrics, cacheKey) + timer = r.buildTimer(r.metrics, cacheKey) r.timers[cacheKey] = timer } r.lock.Unlock() } - - timer.Record(request.duration) + return timer } -func buildTimer(metricsFactory metrics.Factory, key recordedRequestKey) metrics.Timer { - return metricsFactory.Timer(metrics.TimerOptions{ +func (r *requestDurations) buildTimer(metricsFactory metrics.Factory, key recordedRequestKey) (out metrics.Timer) { + // deal with https://github.com/jaegertracing/jaeger/issues/2944 + defer func() { + if err := recover(); err != nil { + r.logger.Error("panic in metrics factory trying to create a timer", zap.Any("error", err)) + out = metrics.NullTimer + } + }() + + out = metricsFactory.Timer(metrics.TimerOptions{ Name: "http.request.duration", Help: "Duration of HTTP requests", Tags: map[string]string{ @@ -117,4 +149,5 @@ func buildTimer(metricsFactory metrics.Factory, key recordedRequestKey) metrics. 
"method": key.method, }, }) + return } diff --git a/pkg/httpmetrics/metrics_test.go b/pkg/httpmetrics/metrics_test.go index d0971246759..89daeb62c82 100644 --- a/pkg/httpmetrics/metrics_test.go +++ b/pkg/httpmetrics/metrics_test.go @@ -21,8 +21,11 @@ import ( "time" "github.com/stretchr/testify/assert" + "go.uber.org/zap" + "github.com/jaegertracing/jaeger/internal/metrics/prometheus" "github.com/jaegertracing/jaeger/internal/metricstest" + "github.com/jaegertracing/jaeger/pkg/metrics" ) func TestNewMetricsHandler(t *testing.T) { @@ -33,7 +36,7 @@ func TestNewMetricsHandler(t *testing.T) { }) mb := metricstest.NewFactory(time.Hour) - handler := Wrap(dummyHandlerFunc, mb) + handler := Wrap(dummyHandlerFunc, mb, zap.NewNop()) req, err := http.NewRequest(http.MethodGet, "/subdir/qwerty", nil) assert.NoError(t, err) @@ -49,3 +52,35 @@ func TestNewMetricsHandler(t *testing.T) { assert.Fail(t, "gauge hasn't been updated within a reasonable amount of time") } + +func TestMaxEntries(t *testing.T) { + mf := metricstest.NewFactory(time.Hour) + r := newRequestDurations(mf, zap.NewNop()) + r.maxEntries = 1 + r.record(recordedRequest{ + key: recordedRequestKey{ + path: "/foo", + }, + duration: time.Millisecond, + }) + r.lock.RLock() + size := len(r.timers) + r.lock.RUnlock() + assert.Equal(t, 1, size) +} + +func TestIllegalPrometheusLabel(t *testing.T) { + dummyHandlerFunc := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + time.Sleep(time.Millisecond) + w.WriteHeader(http.StatusAccepted) + w.WriteHeader(http.StatusTeapot) // any subsequent statuses should be ignored + }) + + mf := prometheus.New().Namespace(metrics.NSOptions{}) + handler := Wrap(dummyHandlerFunc, mf, zap.NewNop()) + + invalidUtf8 := []byte{0xC0, 0xAE, 0xC0, 0xAE} + req, err := http.NewRequest(http.MethodGet, string(invalidUtf8), nil) + assert.NoError(t, err) + handler.ServeHTTP(httptest.NewRecorder(), req) +}