Skip to content

Commit

Permalink
Catch panic from Prometheus client on invalid label strings (jaegertr…
Browse files Browse the repository at this point in the history
…acing#4051)

Partially addresses jaegertracing#2944.
Supersedes and closes jaegertracing#4050.

This only solves the problem in the httpmetrics package; there are still other
scenarios where malformed strings can leak into Prometheus labels and
cause a panic, specifically the service-name-based metrics produced by the
collector. So I am keeping the original issue open. Hopefully, we can
resolve it when we migrate the metrics to OTEL and allow factories to
return errors that can be handled gracefully.

Signed-off-by: Yuri Shkuro <[email protected]>
Signed-off-by: shubbham1215 <[email protected]>
  • Loading branch information
yurishkuro authored and shubbham1215 committed Mar 5, 2023
1 parent 20f3e87 commit d9fa975
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 18 deletions.
2 changes: 1 addition & 1 deletion cmd/collector/app/server/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func serveHTTP(server *http.Server, listener net.Listener, params *HTTPServerPar
cfgHandler.RegisterRoutes(r)

recoveryHandler := recoveryhandler.NewRecoveryHandler(params.Logger, true)
server.Handler = httpmetrics.Wrap(recoveryHandler(r), params.MetricsFactory)
server.Handler = httpmetrics.Wrap(recoveryHandler(r), params.MetricsFactory, params.Logger)
go func() {
var err error
if params.TLSConfig.Enabled {
Expand Down
2 changes: 1 addition & 1 deletion cmd/collector/app/server/zipkin.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func serveZipkin(server *http.Server, listener net.Listener, params *ZipkinServe
})

recoveryHandler := recoveryhandler.NewRecoveryHandler(params.Logger, true)
server.Handler = cors.Handler(httpmetrics.Wrap(recoveryHandler(r), params.MetricsFactory))
server.Handler = cors.Handler(httpmetrics.Wrap(recoveryHandler(r), params.MetricsFactory, params.Logger))
go func(listener net.Listener, server *http.Server) {
var err error
if params.TLSConfig.Enabled {
Expand Down
63 changes: 48 additions & 15 deletions pkg/httpmetrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,14 @@ import (
"sync"
"time"

"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/metrics"
)

// maxEntries is the default cap on the number of timers cached by
// requestDurations. Each distinct (method, path, status) key gets its own
// timer, so without a cap a flood of requests with unique keys (e.g. a DDoS)
// could grow the cache without bound. Once the cap is reached, getTimer
// returns a shared fallback timer instead of creating new entries.
const maxEntries = 1000

type statusRecorder struct {
http.ResponseWriter
status int
Expand All @@ -44,8 +49,8 @@ func (r *statusRecorder) WriteHeader(status int) {
//
// Do not use with HTTP endpoints that take parameters from URL path, such as `/user/{user_id}`,
// because they will result in high cardinality metrics.
func Wrap(h http.Handler, metricsFactory metrics.Factory) http.Handler {
timers := newRequestDurations(metricsFactory)
func Wrap(h http.Handler, metricsFactory metrics.Factory, logger *zap.Logger) http.Handler {
timers := newRequestDurations(metricsFactory, logger)
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
start := time.Now()
recorder := &statusRecorder{ResponseWriter: w}
Expand Down Expand Up @@ -76,39 +81,66 @@ type recordedRequest struct {
}

type requestDurations struct {
lock sync.RWMutex
metrics metrics.Factory
timers map[recordedRequestKey]metrics.Timer
lock sync.RWMutex

metrics metrics.Factory
logger *zap.Logger
maxEntries int

timers map[recordedRequestKey]metrics.Timer
fallback metrics.Timer
}

func newRequestDurations(metricsFactory metrics.Factory) *requestDurations {
return &requestDurations{
timers: make(map[recordedRequestKey]metrics.Timer),
metrics: metricsFactory,
func newRequestDurations(metricsFactory metrics.Factory, logger *zap.Logger) *requestDurations {
r := &requestDurations{
timers: make(map[recordedRequestKey]metrics.Timer),
metrics: metricsFactory,
logger: logger,
maxEntries: maxEntries,
}
r.fallback = r.getTimer(recordedRequestKey{
method: "other",
path: "other",
status: "other",
})
return r
}

func (r *requestDurations) record(request recordedRequest) {
cacheKey := request.key
timer := r.getTimer(request.key)
timer.Record(request.duration)
}

func (r *requestDurations) getTimer(cacheKey recordedRequestKey) metrics.Timer {
r.lock.RLock()
timer, ok := r.timers[cacheKey]
size := len(r.timers)
r.lock.RUnlock()
if !ok {
if size >= r.maxEntries {
return r.fallback
}
r.lock.Lock()
timer, ok = r.timers[cacheKey]
if !ok {
timer = buildTimer(r.metrics, cacheKey)
timer = r.buildTimer(r.metrics, cacheKey)
r.timers[cacheKey] = timer
}
r.lock.Unlock()
}

timer.Record(request.duration)
return timer
}

func buildTimer(metricsFactory metrics.Factory, key recordedRequestKey) metrics.Timer {
return metricsFactory.Timer(metrics.TimerOptions{
func (r *requestDurations) buildTimer(metricsFactory metrics.Factory, key recordedRequestKey) (out metrics.Timer) {
// deal with https://github.com/jaegertracing/jaeger/issues/2944
defer func() {
if err := recover(); err != nil {
r.logger.Error("panic in metrics factory trying to create a timer", zap.Any("error", err))
out = metrics.NullTimer
}
}()

out = metricsFactory.Timer(metrics.TimerOptions{
Name: "http.request.duration",
Help: "Duration of HTTP requests",
Tags: map[string]string{
Expand All @@ -117,4 +149,5 @@ func buildTimer(metricsFactory metrics.Factory, key recordedRequestKey) metrics.
"method": key.method,
},
})
return
}
37 changes: 36 additions & 1 deletion pkg/httpmetrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ import (
"time"

"github.com/stretchr/testify/assert"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/internal/metrics/prometheus"
"github.com/jaegertracing/jaeger/internal/metricstest"
"github.com/jaegertracing/jaeger/pkg/metrics"
)

func TestNewMetricsHandler(t *testing.T) {
Expand All @@ -33,7 +36,7 @@ func TestNewMetricsHandler(t *testing.T) {
})

mb := metricstest.NewFactory(time.Hour)
handler := Wrap(dummyHandlerFunc, mb)
handler := Wrap(dummyHandlerFunc, mb, zap.NewNop())

req, err := http.NewRequest(http.MethodGet, "/subdir/qwerty", nil)
assert.NoError(t, err)
Expand All @@ -49,3 +52,35 @@ func TestNewMetricsHandler(t *testing.T) {

assert.Fail(t, "gauge hasn't been updated within a reasonable amount of time")
}

// TestMaxEntries verifies that the timer cache stops growing once it holds
// maxEntries timers.
func TestMaxEntries(t *testing.T) {
mf := metricstest.NewFactory(time.Hour)
// The constructor already stores the "other" fallback timer in r.timers
// (via getTimer, while the default cap is still in effect), so the cache
// holds one entry before any request is recorded.
r := newRequestDurations(mf, zap.NewNop())
// Lower the cap to the current size so that the next new key is rejected.
r.maxEntries = 1
r.record(recordedRequest{
key: recordedRequestKey{
path: "/foo",
},
duration: time.Millisecond,
})
// Read the map size under the lock, as getTimer does.
r.lock.RLock()
size := len(r.timers)
r.lock.RUnlock()
// Only the fallback entry should be present; "/foo" must not have been added.
assert.Equal(t, 1, size)
}

// TestIllegalPrometheusLabel sends a request whose URL is an invalid UTF-8
// byte sequence through a handler wrapped with a real Prometheus-backed
// metrics factory. Nothing is asserted after ServeHTTP: the test passes as
// long as the call returns without panicking.
// See https://github.com/jaegertracing/jaeger/issues/2944.
func TestIllegalPrometheusLabel(t *testing.T) {
dummyHandlerFunc := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
time.Sleep(time.Millisecond)
w.WriteHeader(http.StatusAccepted)
w.WriteHeader(http.StatusTeapot) // any subsequent statuses should be ignored
})

// Use the Prometheus factory rather than the metricstest one.
// NOTE(review): presumably only the real client rejects invalid label
// strings by panicking — confirm.
mf := prometheus.New().Namespace(metrics.NSOptions{})
handler := Wrap(dummyHandlerFunc, mf, zap.NewNop())

// These bytes are not valid UTF-8.
// NOTE(review): presumably the request path ends up as a metric tag value — confirm against Wrap.
invalidUtf8 := []byte{0xC0, 0xAE, 0xC0, 0xAE}
req, err := http.NewRequest(http.MethodGet, string(invalidUtf8), nil)
assert.NoError(t, err)
handler.ServeHTTP(httptest.NewRecorder(), req)
}

0 comments on commit d9fa975

Please sign in to comment.