Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Changelog

## master / unreleased
* [ENHANCEMENT] Ingester: Handle runtime errors in query path #6769
* [CHANGE] Ingester: Remove EnableNativeHistograms config flag and instead gate keep through new per-tenant limit at ingestion. #6718
* [CHANGE] StoreGateway/Alertmanager: Add default 5s connection timeout on client. #6603
* [CHANGE] Validate a tenantID when to use a single tenant resolver. #6727
Expand Down
55 changes: 41 additions & 14 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/http"
"os"
"path/filepath"
"runtime"
"slices"
"strings"
"sync"
Expand Down Expand Up @@ -1637,8 +1638,9 @@ func (u *userTSDB) releaseAppendLock() {
}

// QueryExemplars implements service.IngesterServer
func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQueryRequest) (*client.ExemplarQueryResponse, error) {
if err := i.checkRunning(); err != nil {
func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQueryRequest) (resp *client.ExemplarQueryResponse, err error) {
defer recoverIngester(i.logger, &err)
if err = i.checkRunning(); err != nil {
return nil, err
}

Expand All @@ -1659,7 +1661,7 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery
return &client.ExemplarQueryResponse{}, nil
}

if err := db.acquireReadLock(); err != nil {
if err = db.acquireReadLock(); err != nil {
return &client.ExemplarQueryResponse{}, nil
}
defer db.releaseReadLock()
Expand Down Expand Up @@ -1701,14 +1703,16 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery
}

// LabelValues returns all label values that are associated with a given label name.
func (i *Ingester) LabelValues(ctx context.Context, req *client.LabelValuesRequest) (*client.LabelValuesResponse, error) {
func (i *Ingester) LabelValues(ctx context.Context, req *client.LabelValuesRequest) (resp *client.LabelValuesResponse, err error) {
defer recoverIngester(i.logger, &err)
resp, cleanup, err := i.labelsValuesCommon(ctx, req)
defer cleanup()
return resp, err
}

// LabelValuesStream returns all label values that are associated with a given label name.
func (i *Ingester) LabelValuesStream(req *client.LabelValuesRequest, stream client.Ingester_LabelValuesStreamServer) error {
func (i *Ingester) LabelValuesStream(req *client.LabelValuesRequest, stream client.Ingester_LabelValuesStreamServer) (err error) {
defer recoverIngester(i.logger, &err)
resp, cleanup, err := i.labelsValuesCommon(stream.Context(), req)
defer cleanup()

Expand Down Expand Up @@ -1796,14 +1800,16 @@ func (i *Ingester) labelsValuesCommon(ctx context.Context, req *client.LabelValu
}

// LabelNames return all the label names.
func (i *Ingester) LabelNames(ctx context.Context, req *client.LabelNamesRequest) (*client.LabelNamesResponse, error) {
func (i *Ingester) LabelNames(ctx context.Context, req *client.LabelNamesRequest) (resp *client.LabelNamesResponse, err error) {
defer recoverIngester(i.logger, &err)
resp, cleanup, err := i.labelNamesCommon(ctx, req)
defer cleanup()
return resp, err
}

// LabelNamesStream return all the label names.
func (i *Ingester) LabelNamesStream(req *client.LabelNamesRequest, stream client.Ingester_LabelNamesStreamServer) error {
func (i *Ingester) LabelNamesStream(req *client.LabelNamesRequest, stream client.Ingester_LabelNamesStreamServer) (err error) {
defer recoverIngester(i.logger, &err)
resp, cleanup, err := i.labelNamesCommon(stream.Context(), req)
defer cleanup()

Expand All @@ -1819,7 +1825,7 @@ func (i *Ingester) LabelNamesStream(req *client.LabelNamesRequest, stream client
resp := &client.LabelNamesStreamResponse{
LabelNames: resp.LabelNames[i:j],
}
err := client.SendLabelNamesStream(stream, resp)
err = client.SendLabelNamesStream(stream, resp)
if err != nil {
return err
}
Expand Down Expand Up @@ -1891,8 +1897,9 @@ func (i *Ingester) labelNamesCommon(ctx context.Context, req *client.LabelNamesR
}

// MetricsForLabelMatchers returns all the metrics which match a set of matchers.
func (i *Ingester) MetricsForLabelMatchers(ctx context.Context, req *client.MetricsForLabelMatchersRequest) (*client.MetricsForLabelMatchersResponse, error) {
result := &client.MetricsForLabelMatchersResponse{}
func (i *Ingester) MetricsForLabelMatchers(ctx context.Context, req *client.MetricsForLabelMatchersRequest) (result *client.MetricsForLabelMatchersResponse, err error) {
defer recoverIngester(i.logger, &err)
result = &client.MetricsForLabelMatchersResponse{}
cleanup, err := i.metricsForLabelMatchersCommon(ctx, req, func(l labels.Labels) error {
result.Metric = append(result.Metric, &cortexpb.Metric{
Labels: cortexpb.FromLabelsToLabelAdapters(l),
Expand All @@ -1903,7 +1910,8 @@ func (i *Ingester) MetricsForLabelMatchers(ctx context.Context, req *client.Metr
return result, err
}

func (i *Ingester) MetricsForLabelMatchersStream(req *client.MetricsForLabelMatchersRequest, stream client.Ingester_MetricsForLabelMatchersStreamServer) error {
func (i *Ingester) MetricsForLabelMatchersStream(req *client.MetricsForLabelMatchersRequest, stream client.Ingester_MetricsForLabelMatchersStreamServer) (err error) {
defer recoverIngester(i.logger, &err)
result := &client.MetricsForLabelMatchersStreamResponse{}

cleanup, err := i.metricsForLabelMatchersCommon(stream.Context(), req, func(l labels.Labels) error {
Expand All @@ -1927,7 +1935,7 @@ func (i *Ingester) MetricsForLabelMatchersStream(req *client.MetricsForLabelMatc

// Send last batch
if len(result.Metric) > 0 {
err := client.SendMetricsForLabelMatchersStream(stream, result)
err = client.SendMetricsForLabelMatchersStream(stream, result)
if err != nil {
return err
}
Expand Down Expand Up @@ -2160,8 +2168,10 @@ const queryStreamBatchMessageSize = 1 * 1024 * 1024

// QueryStream implements service.IngesterServer
// Streams metrics from a TSDB. This implements the client.IngesterServer interface
func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_QueryStreamServer) error {
if err := i.checkRunning(); err != nil {
func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_QueryStreamServer) (err error) {
defer recoverIngester(i.logger, &err)

if err = i.checkRunning(); err != nil {
return err
}

Expand Down Expand Up @@ -3443,3 +3453,20 @@ func (c *labelSetReasonCounters) increment(matchedLabelSetLimits []validation.Li
}
}
}

func recoverIngester(logger log.Logger, errp *error) {
e := recover()
if e == nil {
return
}

switch err := e.(type) {
case runtime.Error:
// Print the stack trace but do not inhibit the running application.
buf := make([]byte, 64<<10)
buf = buf[:runtime.Stack(buf, false)]

level.Error(logger).Log("msg", "runtime panic in ingester", "err", err, "stacktrace", string(buf))
*errp = errors.Wrap(err, "unexpected error")
}
}
86 changes: 84 additions & 2 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"math"
Expand All @@ -13,6 +14,7 @@ import (
"net/url"
"os"
"path/filepath"
"runtime"
"sort"
"strconv"
"strings"
Expand All @@ -22,7 +24,6 @@ import (

"github.com/go-kit/log"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/common/model"
Expand All @@ -40,6 +41,7 @@ import (
"github.com/thanos-io/objstore"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/shipper"
storecache "github.com/thanos-io/thanos/pkg/store/cache"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/user"
Expand Down Expand Up @@ -3814,12 +3816,16 @@ func (w *wrappedExpandedPostingsCache) PurgeExpiredItems() {

type mockQueryStreamServer struct {
grpc.ServerStream
ctx context.Context
ctx context.Context
shouldPanic bool

series []client.TimeSeriesChunk
}

func (m *mockQueryStreamServer) Send(response *client.QueryStreamResponse) error {
if m.shouldPanic {
panic("runtime error")
}
m.series = append(m.series, response.Chunkseries...)
return nil
}
Expand Down Expand Up @@ -6937,6 +6943,74 @@ func TestIngester_UpdateLabelSetMetrics(t *testing.T) {
`), "cortex_discarded_samples_total", "cortex_discarded_samples_per_labelset_total"))
}

func TestIngesterPanicHandling(t *testing.T) {
ctx := context.Background()
cfg := defaultIngesterTestConfig(t)
cfg.BlocksStorageConfig.TSDB.CloseIdleTSDBTimeout = 0 // Will not run the loop, but will allow us to close any TSDB fast.
cfg.BlocksStorageConfig.TSDB.KeepUserTSDBOpenOnShutdown = true

// Create ingester
i, err := prepareIngesterWithBlocksStorage(t, cfg, prometheus.NewRegistry())
require.NoError(t, err)

// Induce panic in matchers cache calls
i.matchersCache = &panickingMatchersCache{}

require.NoError(t, services.StartAndAwaitRunning(ctx, i))
defer services.StopAndAwaitTerminated(ctx, i) //nolint:errcheck

// Wait until it's ACTIVE
test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} {
return i.lifecycler.GetState()
})

pushSingleSampleAtTime(t, i, 1*time.Minute.Milliseconds())

ctx = user.InjectOrgID(context.Background(), userID)
checkRuntimeError := func(err error) {
var re runtime.Error
ok := errors.As(err, &re)
require.True(t, ok, "expected runtime.Error")
}

err = i.QueryStream(&client.QueryRequest{
StartTimestampMs: 0,
EndTimestampMs: math.MaxInt64,
Matchers: []*client.LabelMatcher{{Type: client.EQUAL, Name: labels.MetricName, Value: "foo"}},
}, &mockQueryStreamServer{ctx: ctx})
require.Error(t, err)
checkRuntimeError(err)

_, err = i.LabelNames(ctx, &client.LabelNamesRequest{
Matchers: &client.LabelMatchers{
Matchers: []*client.LabelMatcher{{Type: client.EQUAL, Name: labels.MetricName, Value: "foo"}},
},
Limit: int64(1),
})
require.Error(t, err)
checkRuntimeError(err)

_, err = i.LabelValues(ctx, &client.LabelValuesRequest{
Matchers: &client.LabelMatchers{
Matchers: []*client.LabelMatcher{{Type: client.EQUAL, Name: labels.MetricName, Value: "foo"}},
},
Limit: int64(1),
})
require.Error(t, err)
checkRuntimeError(err)

_, err = i.MetricsForLabelMatchers(ctx, &client.MetricsForLabelMatchersRequest{
MatchersSet: []*client.LabelMatchers{{
Matchers: []*client.LabelMatcher{
{Type: client.EQUAL, Name: model.MetricNameLabel, Value: "unknown"},
},
}},
Limit: int64(1),
})
require.Error(t, err)
checkRuntimeError(err)
}

// mockTenantLimits exposes per-tenant limits based on a provided map
type mockTenantLimits struct {
limits map[string]*validation.Limits
Expand Down Expand Up @@ -7004,3 +7078,11 @@ func CreateBlock(t *testing.T, ctx context.Context, dir string, mint, maxt int64

return block
}

type panickingMatchersCache struct{}

func (_ *panickingMatchersCache) GetOrSet(_ storecache.ConversionLabelMatcher, _ storecache.NewItemFunc) (*labels.Matcher, error) {
var a []int
a[1] = 2 // index out of range
return nil, nil
}
Loading