Commit 20d725a

Refactor: Unify metadata and samples sends to ingesters (#2410)
* Refactor: Unify metadata and samples sends to ingesters

Previously, samples and metadata each used the ring on their own to send data to the ingesters. Since the `WriteRequest` supports metadata alongside samples, there is no reason to split the calls, so we unify them. There is an additional tiny refactor to inline the metadata validation, as it is just a single function call.

Signed-off-by: gotjosh <[email protected]>

* Address review comments

Signed-off-by: gotjosh <[email protected]>
1 parent 50f53db commit 20d725a
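In short, the distributor now hashes series and metadata into one combined key slice and fans both out through a single ring.DoBatch call; the callback tells the two apart by index. The following is a minimal, self-contained sketch of that partitioning idea, using toy types rather than the actual Cortex structs:

package main

import "fmt"

// Toy stand-ins for the validated series and metadata batches; these names are
// illustrative only and are not the actual Cortex types.
type series string
type metricMetadata string

func main() {
	seriesKeys := []uint32{1, 2, 3}
	metadataKeys := []uint32{10, 11}
	validatedTimeseries := []series{"s1", "s2", "s3"}
	validatedMetadata := []metricMetadata{"m1", "m2"}

	// One combined key slice: series keys first, metadata keys appended after.
	keys := append(seriesKeys, metadataKeys...)
	initialMetadataIndex := len(seriesKeys)
	fmt.Println("combined keys:", keys)

	// A batching callback receives indexes into the combined key slice; anything
	// at or beyond initialMetadataIndex is metadata, everything before it is a series.
	sendBatch := func(indexes []int) {
		var timeseries []series
		var metadata []metricMetadata
		for _, i := range indexes {
			if i >= initialMetadataIndex {
				metadata = append(metadata, validatedMetadata[i-initialMetadataIndex])
			} else {
				timeseries = append(timeseries, validatedTimeseries[i])
			}
		}
		fmt.Println("series:", timeseries, "metadata:", metadata)
	}

	sendBatch([]int{0, 2, 3}) // series: [s1 s3] metadata: [m1]
	sendBatch([]int{1, 4})    // series: [s2] metadata: [m2]
}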

File tree

3 files changed: +103 -93 lines

pkg/distributor/distributor.go

Lines changed: 33 additions & 64 deletions
@@ -8,7 +8,6 @@ import (
 	"strings"
 	"time"
 
-	"github.com/go-kit/kit/log/level"
 	opentracing "github.com/opentracing/opentracing-go"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
@@ -354,10 +353,6 @@ func (d *Distributor) validateSeries(ts ingester_client.PreallocTimeseries, user
 		nil
 }
 
-func (d *Distributor) validateMetadata(m *ingester_client.MetricMetadata, userID string) error {
-	return validation.ValidateMetadata(d.limits, userID, m)
-}
-
 // Push implements client.IngesterServer
 func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*client.WriteResponse, error) {
 	userID, err := user.ExtractOrgID(ctx)
@@ -470,7 +465,7 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*client.WriteResponse, error) {
 	}
 
 	for _, m := range req.Metadata {
-		err := d.validateMetadata(m, userID)
+		err := validation.ValidateMetadata(d.limits, userID, m)
 
 		if err != nil {
 			if firstPartialErr == nil {
@@ -519,49 +514,32 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*client.WriteResponse, error) {
 		}
 	}
 
-	if len(metadataKeys) > 0 {
-		err = ring.DoBatch(ctx, subRing, metadataKeys, func(ingester ring.IngesterDesc, indexes []int) error {
-			metadata := make([]*client.MetricMetadata, 0, len(indexes))
-			for _, i := range indexes {
-				metadata = append(metadata, validatedMetadata[i])
-			}
+	keys := append(seriesKeys, metadataKeys...)
+	initialMetadataIndex := len(seriesKeys)
 
-			localCtx, cancel := context.WithTimeout(context.Background(), d.cfg.RemoteTimeout)
-			defer cancel()
-			localCtx = user.InjectUserID(localCtx, userID)
-
-			if sp := opentracing.SpanFromContext(ctx); sp != nil {
-				localCtx = opentracing.ContextWithSpan(localCtx, sp)
-			}
-
-			return d.sendMetadata(localCtx, ingester, metadata)
-		}, func() {})
-		if err != nil {
-			// Metadata is a best-effort approach so we consider failures non-fatal, log them, and move on.
-			logger := util.WithContext(ctx, util.Logger)
-			level.Error(logger).Log("msg", "Failed to send metadata to ingesters", "err", err)
-		}
-	}
+	err = ring.DoBatch(ctx, subRing, keys, func(ingester ring.IngesterDesc, indexes []int) error {
+		timeseries := make([]client.PreallocTimeseries, 0, len(indexes))
+		var metadata []*client.MetricMetadata
 
-	if len(seriesKeys) > 0 {
-		err = ring.DoBatch(ctx, subRing, seriesKeys, func(ingester ring.IngesterDesc, indexes []int) error {
-			timeseries := make([]client.PreallocTimeseries, 0, len(indexes))
-			for _, i := range indexes {
+		for _, i := range indexes {
+			if i >= initialMetadataIndex {
+				metadata = append(metadata, validatedMetadata[i-initialMetadataIndex])
+			} else {
 				timeseries = append(timeseries, validatedTimeseries[i])
 			}
+		}
 
-			// Use a background context to make sure all ingesters get samples even if we return early
-			localCtx, cancel := context.WithTimeout(context.Background(), d.cfg.RemoteTimeout)
-			defer cancel()
-			localCtx = user.InjectOrgID(localCtx, userID)
-			if sp := opentracing.SpanFromContext(ctx); sp != nil {
-				localCtx = opentracing.ContextWithSpan(localCtx, sp)
-			}
-			return d.sendSamples(localCtx, ingester, timeseries)
-		}, func() { client.ReuseSlice(req.Timeseries) })
-		if err != nil {
-			return nil, err
+		// Use a background context to make sure all ingesters get samples even if we return early
+		localCtx, cancel := context.WithTimeout(context.Background(), d.cfg.RemoteTimeout)
+		defer cancel()
+		localCtx = user.InjectOrgID(localCtx, userID)
+		if sp := opentracing.SpanFromContext(ctx); sp != nil {
+			localCtx = opentracing.ContextWithSpan(localCtx, sp)
		}
+		return d.send(localCtx, ingester, timeseries, metadata)
+	}, func() { client.ReuseSlice(req.Timeseries) })
+	if err != nil {
+		return nil, err
+	}
 	return &client.WriteResponse{}, firstPartialErr
 }
@@ -588,7 +566,7 @@ func sortLabelsIfNeeded(labels []client.LabelAdapter) {
 	})
 }
 
-func (d *Distributor) sendSamples(ctx context.Context, ingester ring.IngesterDesc, timeseries []client.PreallocTimeseries) error {
+func (d *Distributor) send(ctx context.Context, ingester ring.IngesterDesc, timeseries []client.PreallocTimeseries, metadata []*client.MetricMetadata) error {
 	h, err := d.ingesterPool.GetClientFor(ingester.Addr)
 	if err != nil {
 		return err
@@ -597,32 +575,23 @@ func (d *Distributor) sendSamples(ctx context.Context, ingester ring.IngesterDesc, timeseries []client.PreallocTimeseries) error {
 
 	req := client.WriteRequest{
 		Timeseries: timeseries,
+		Metadata:   metadata,
 	}
 	_, err = c.Push(ctx, &req)
 
-	ingesterAppends.WithLabelValues(ingester.Addr, typeSamples).Inc()
-	if err != nil {
-		ingesterAppendFailures.WithLabelValues(ingester.Addr, typeSamples).Inc()
-	}
-	return err
-}
-
-func (d *Distributor) sendMetadata(ctx context.Context, ingester ring.IngesterDesc, metadata []*client.MetricMetadata) error {
-	h, err := d.ingesterPool.GetClientFor(ingester.Addr)
-	if err != nil {
-		return err
+	if len(metadata) > 0 {
+		ingesterAppends.WithLabelValues(ingester.Addr, typeMetadata).Inc()
+		if err != nil {
+			ingesterAppendFailures.WithLabelValues(ingester.Addr, typeMetadata).Inc()
+		}
 	}
-	c := h.(ingester_client.IngesterClient)
-
-	req := client.WriteRequest{
-		Metadata: metadata,
+	if len(timeseries) > 0 {
+		ingesterAppends.WithLabelValues(ingester.Addr, typeSamples).Inc()
+		if err != nil {
+			ingesterAppendFailures.WithLabelValues(ingester.Addr, typeSamples).Inc()
+		}
 	}
-	_, err = c.Push(ctx, &req)
 
-	ingesterAppends.WithLabelValues(ingester.Addr, typeMetadata).Inc()
-	if err != nil {
-		ingesterAppendFailures.WithLabelValues(ingester.Addr, typeMetadata).Inc()
-	}
 	return err
 }
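A consequence of the unified send path is that the append and failure counters are now bumped per payload type within a single call, and only when that type is present in the request. Below is a self-contained sketch of that accounting, with hypothetical metric names and an illustrative record helper standing in for the real send method:

package main

import (
	"errors"
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	appends := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "example_ingester_appends_total",
		Help: "Batch appends sent to ingesters, by payload type.",
	}, []string{"ingester", "type"})
	failures := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "example_ingester_append_failures_total",
		Help: "Failed batch appends sent to ingesters, by payload type.",
	}, []string{"ingester", "type"})

	// record mirrors the accounting idea: one push can carry both payload
	// types, and each type present gets its own append/failure counters.
	record := func(addr string, numSeries, numMetadata int, err error) {
		if numMetadata > 0 {
			appends.WithLabelValues(addr, "metadata").Inc()
			if err != nil {
				failures.WithLabelValues(addr, "metadata").Inc()
			}
		}
		if numSeries > 0 {
			appends.WithLabelValues(addr, "samples").Inc()
			if err != nil {
				failures.WithLabelValues(addr, "samples").Inc()
			}
		}
	}

	record("ingester-1", 5, 2, nil)                        // both types appended
	record("ingester-2", 3, 0, errors.New("push failed"))  // samples only, failed

	fmt.Println(testutil.ToFloat64(appends.WithLabelValues("ingester-1", "samples")))   // 1
	fmt.Println(testutil.ToFloat64(appends.WithLabelValues("ingester-1", "metadata")))  // 1
	fmt.Println(testutil.ToFloat64(failures.WithLabelValues("ingester-2", "samples")))  // 1
}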

pkg/distributor/distributor_test.go

Lines changed: 70 additions & 27 deletions
@@ -48,15 +48,20 @@ var (
 )
 
 func TestDistributor_Push(t *testing.T) {
-	metricNames := []string{
-		"cortex_distributor_latest_seen_sample_timestamp_seconds",
-	}
+	// Metrics to assert on.
+	lastSeenTimestamp := "cortex_distributor_latest_seen_sample_timestamp_seconds"
+	distributorAppend := "cortex_distributor_ingester_appends_total"
+	distributorAppendFailure := "cortex_distributor_ingester_append_failures_total"
 
+	type samplesIn struct {
+		num              int
+		startTimestampMs int64
+	}
 	for name, tc := range map[string]struct {
+		metricNames      []string
 		numIngesters     int
 		happyIngesters   int
-		samples          int
-		startTimestampMs int64
+		samples          samplesIn
 		metadata         int
 		expectedResponse *client.WriteResponse
 		expectedError    error
@@ -70,10 +75,10 @@ func TestDistributor_Push(t *testing.T) {
 		"A push to 3 happy ingesters should succeed": {
 			numIngesters:     3,
 			happyIngesters:   3,
-			samples:          5,
+			samples:          samplesIn{num: 5, startTimestampMs: 123456789000},
 			metadata:         5,
 			expectedResponse: success,
-			startTimestampMs: 123456789000,
+			metricNames:      []string{lastSeenTimestamp},
 			expectedMetrics: `
 				# HELP cortex_distributor_latest_seen_sample_timestamp_seconds Unix timestamp of latest received sample per user.
 				# TYPE cortex_distributor_latest_seen_sample_timestamp_seconds gauge
@@ -83,57 +88,95 @@ func TestDistributor_Push(t *testing.T) {
 		"A push to 2 happy ingesters should succeed": {
 			numIngesters:     3,
 			happyIngesters:   2,
-			samples:          5,
+			samples:          samplesIn{num: 5, startTimestampMs: 123456789000},
 			metadata:         5,
 			expectedResponse: success,
-			startTimestampMs: 123456789000,
+			metricNames:      []string{lastSeenTimestamp},
 			expectedMetrics: `
 				# HELP cortex_distributor_latest_seen_sample_timestamp_seconds Unix timestamp of latest received sample per user.
 				# TYPE cortex_distributor_latest_seen_sample_timestamp_seconds gauge
 				cortex_distributor_latest_seen_sample_timestamp_seconds{user="user"} 123456789.004
 			`,
 		},
 		"A push to 1 happy ingesters should fail": {
-			numIngesters:     3,
-			happyIngesters:   1,
-			samples:          10,
-			expectedError:    errFail,
-			startTimestampMs: 123456789000,
+			numIngesters:   3,
+			happyIngesters: 1,
+			samples:        samplesIn{num: 10, startTimestampMs: 123456789000},
+			expectedError:  errFail,
+			metricNames:    []string{lastSeenTimestamp},
 			expectedMetrics: `
 				# HELP cortex_distributor_latest_seen_sample_timestamp_seconds Unix timestamp of latest received sample per user.
 				# TYPE cortex_distributor_latest_seen_sample_timestamp_seconds gauge
 				cortex_distributor_latest_seen_sample_timestamp_seconds{user="user"} 123456789.009
 			`,
 		},
 		"A push to 0 happy ingesters should fail": {
-			numIngesters:     3,
-			happyIngesters:   0,
-			samples:          10,
-			expectedError:    errFail,
-			startTimestampMs: 123456789000,
+			numIngesters:   3,
+			happyIngesters: 0,
+			samples:        samplesIn{num: 10, startTimestampMs: 123456789000},
+			expectedError:  errFail,
+			metricNames:    []string{lastSeenTimestamp},
 			expectedMetrics: `
 				# HELP cortex_distributor_latest_seen_sample_timestamp_seconds Unix timestamp of latest received sample per user.
 				# TYPE cortex_distributor_latest_seen_sample_timestamp_seconds gauge
 				cortex_distributor_latest_seen_sample_timestamp_seconds{user="user"} 123456789.009
 			`,
 		},
 		"A push exceeding burst size should fail": {
-			numIngesters:     3,
-			happyIngesters:   3,
-			samples:          25,
-			metadata:         5,
-			expectedError:    httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (20) exceeded while adding 25 samples and 5 metadata"),
-			startTimestampMs: 123456789000,
+			numIngesters:   3,
+			happyIngesters: 3,
+			samples:        samplesIn{num: 25, startTimestampMs: 123456789000},
+			metadata:       5,
+			expectedError:  httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (20) exceeded while adding 25 samples and 5 metadata"),
+			metricNames:    []string{lastSeenTimestamp},
 			expectedMetrics: `
 				# HELP cortex_distributor_latest_seen_sample_timestamp_seconds Unix timestamp of latest received sample per user.
 				# TYPE cortex_distributor_latest_seen_sample_timestamp_seconds gauge
 				cortex_distributor_latest_seen_sample_timestamp_seconds{user="user"} 123456789.024
 			`,
 		},
+		"A push to ingesters should report the correct metrics with no metadata": {
+			numIngesters:     3,
+			happyIngesters:   2,
+			samples:          samplesIn{num: 1, startTimestampMs: 123456789000},
+			metadata:         0,
+			metricNames:      []string{distributorAppend, distributorAppendFailure},
+			expectedResponse: success,
+			expectedMetrics: `
+				# HELP cortex_distributor_ingester_append_failures_total The total number of failed batch appends sent to ingesters.
+				# TYPE cortex_distributor_ingester_append_failures_total counter
+				cortex_distributor_ingester_append_failures_total{ingester="2",type="samples"} 1
+				# HELP cortex_distributor_ingester_appends_total The total number of batch appends sent to ingesters.
+				# TYPE cortex_distributor_ingester_appends_total counter
+				cortex_distributor_ingester_appends_total{ingester="0",type="samples"} 1
+				cortex_distributor_ingester_appends_total{ingester="1",type="samples"} 1
+				cortex_distributor_ingester_appends_total{ingester="2",type="samples"} 1
+			`,
+		},
+		"A push to ingesters should report the correct metrics with no samples": {
+			numIngesters:     3,
+			happyIngesters:   2,
+			samples:          samplesIn{num: 0, startTimestampMs: 123456789000},
+			metadata:         1,
+			metricNames:      []string{distributorAppend, distributorAppendFailure},
+			expectedResponse: success,
+			expectedMetrics: `
+				# HELP cortex_distributor_ingester_append_failures_total The total number of failed batch appends sent to ingesters.
+				# TYPE cortex_distributor_ingester_append_failures_total counter
+				cortex_distributor_ingester_append_failures_total{ingester="2",type="metadata"} 1
+				# HELP cortex_distributor_ingester_appends_total The total number of batch appends sent to ingesters.
+				# TYPE cortex_distributor_ingester_appends_total counter
+				cortex_distributor_ingester_appends_total{ingester="0",type="metadata"} 1
+				cortex_distributor_ingester_appends_total{ingester="1",type="metadata"} 1
+				cortex_distributor_ingester_appends_total{ingester="2",type="metadata"} 1
+			`,
+		},
 	} {
 		for _, shardByAllLabels := range []bool{true, false} {
 			t.Run(fmt.Sprintf("[%s](shardByAllLabels=%v)", name, shardByAllLabels), func(t *testing.T) {
 				latestSeenSampleTimestampPerUser.Reset()
+				ingesterAppends.Reset()
+				ingesterAppendFailures.Reset()
 
 				limits := &validation.Limits{}
 				flagext.DefaultValues(limits)
@@ -143,14 +186,14 @@ func TestDistributor_Push(t *testing.T) {
 				d, _ := prepare(t, tc.numIngesters, tc.happyIngesters, 0, shardByAllLabels, limits, nil)
 				defer services.StopAndAwaitTerminated(context.Background(), d) //nolint:errcheck
 
-				request := makeWriteRequest(tc.startTimestampMs, tc.samples, tc.metadata)
+				request := makeWriteRequest(tc.samples.startTimestampMs, tc.samples.num, tc.metadata)
 				response, err := d.Push(ctx, request)
 				assert.Equal(t, tc.expectedResponse, response)
 				assert.Equal(t, tc.expectedError, err)
 
 				// Check tracked Prometheus metrics.
 				if tc.expectedMetrics != "" {
-					err = testutil.GatherAndCompare(prometheus.DefaultGatherer, strings.NewReader(tc.expectedMetrics), metricNames...)
+					err = testutil.GatherAndCompare(prometheus.DefaultGatherer, strings.NewReader(tc.expectedMetrics), tc.metricNames...)
					assert.NoError(t, err)
				}
			})
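The per-case metricNames field above feeds straight into testutil.GatherAndCompare, which compares only the metric families it is given. Here is a standalone sketch of that assertion pattern, using a hypothetical metric name and a private registry instead of prometheus.DefaultGatherer:

package main

import (
	"fmt"
	"strings"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	reg := prometheus.NewRegistry()
	appends := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "example_ingester_appends_total",
		Help: "Batch appends sent to ingesters, by payload type.",
	}, []string{"ingester", "type"})
	reg.MustRegister(appends)

	appends.WithLabelValues("0", "samples").Inc()
	appends.WithLabelValues("1", "samples").Inc()

	expected := `# HELP example_ingester_appends_total Batch appends sent to ingesters, by payload type.
# TYPE example_ingester_appends_total counter
example_ingester_appends_total{ingester="0",type="samples"} 1
example_ingester_appends_total{ingester="1",type="samples"} 1
`
	// Only the families named in the final arguments are compared, so a test
	// case can opt into exactly the metrics it cares about.
	err := testutil.GatherAndCompare(reg, strings.NewReader(expected), "example_ingester_appends_total")
	fmt.Println(err) // <nil>
}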

pkg/ingester/ingester.go

Lines changed: 0 additions & 2 deletions
@@ -375,10 +375,8 @@ func (i *Ingester) Push(ctx context.Context, req *client.WriteRequest) (*client.WriteResponse, error) {
 	}
 
 	if len(req.Metadata) > 0 {
-		// Given requests can only contain either metadata or samples, no-op if there is metadata for now.
 		logger := util.WithContext(ctx, util.Logger)
 		level.Debug(logger).Log("msg", "metadata received in the ingester", "count", len(req.Metadata))
-		return &client.WriteResponse{}, nil
 	}
 
 	for _, ts := range req.Timeseries {
