From f46981be08927eff06b8f490ed6f51360ad5df49 Mon Sep 17 00:00:00 2001 From: Wing924 Date: Tue, 14 Apr 2020 18:43:02 +0900 Subject: [PATCH 01/10] [BUGFIX] Distributor: `/all_user_stats` now show API and Rule Ingest Rate correctly. Signed-off-by: Wing924 --- CHANGELOG.md | 1 + pkg/distributor/distributor.go | 6 ++++-- pkg/ruler/compat.go | 5 ++++- pkg/util/push/push.go | 2 +- 4 files changed, 10 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 98ad35808e9..d3fbd645a8e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ * [BUGFIX] Cassandra Storage: Fix endpoint TLS host verification. #2109 * [BUGFIX] Experimental TSDB: fixed response status code from `422` to `500` when an error occurs while iterating chunks with the experimental blocks storage. #2402 * [BUGFIX] Ring: Fixed a situation where upgrading from pre-1.0 cortex with a rolling strategy caused new 1.0 ingesters to lose their zone value in the ring until manually forced to re-register. #2404 +* [BUGFIX] Distributor: `/all_user_stats` now show API and Rule Ingest Rate correctly. ## 1.0.0 / 2020-04-02 diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 1ceb07e7c25..840f745d588 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -360,6 +360,7 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie return nil, err } + source := req.Source var firstPartialErr error removeReplica := false @@ -536,7 +537,7 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie if sp := opentracing.SpanFromContext(ctx); sp != nil { localCtx = opentracing.ContextWithSpan(localCtx, sp) } - return d.send(localCtx, ingester, timeseries, metadata) + return d.send(localCtx, ingester, timeseries, metadata, source) }, func() { client.ReuseSlice(req.Timeseries) }) if err != nil { return nil, err @@ -566,7 +567,7 @@ func sortLabelsIfNeeded(labels []client.LabelAdapter) { }) } -func (d *Distributor) send(ctx context.Context, ingester ring.IngesterDesc, timeseries []client.PreallocTimeseries, metadata []*client.MetricMetadata) error { +func (d *Distributor) send(ctx context.Context, ingester ring.IngesterDesc, timeseries []client.PreallocTimeseries, metadata []*client.MetricMetadata, source client.WriteRequest_SourceEnum) error { h, err := d.ingesterPool.GetClientFor(ingester.Addr) if err != nil { return err @@ -576,6 +577,7 @@ func (d *Distributor) send(ctx context.Context, ingester ring.IngesterDesc, time req := client.WriteRequest{ Timeseries: timeseries, Metadata: metadata, + Source: source, } _, err = c.Push(ctx, &req) diff --git a/pkg/ruler/compat.go b/pkg/ruler/compat.go index e62107218f3..aae5db69920 100644 --- a/pkg/ruler/compat.go +++ b/pkg/ruler/compat.go @@ -39,9 +39,12 @@ func (a *appender) AddFast(l labels.Labels, ref uint64, t int64, v float64) erro } func (a *appender) Commit() error { - _, err := a.pusher.Push(user.InjectOrgID(context.Background(), a.userID), client.ToWriteRequest(a.labels, a.samples, nil, client.RULE)) + wq := client.ToWriteRequest(a.labels, a.samples, nil, client.RULE) + _, err := a.pusher.Push(user.InjectOrgID(context.Background(), a.userID), wq) + client.ReuseSlice(wq.Timeseries) a.labels = nil a.samples = nil + return err } diff --git a/pkg/util/push/push.go b/pkg/util/push/push.go index 3af2448e01a..1d1addb56c6 100644 --- a/pkg/util/push/push.go +++ b/pkg/util/push/push.go @@ -17,7 +17,6 @@ func Handler(cfg distributor.Config, push func(context.Context, *client.WriteReq return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { compressionType := util.CompressionTypeFor(r.Header.Get("X-Prometheus-Remote-Write-Version")) var req client.PreallocWriteRequest - req.Source = client.API _, err := util.ParseProtoReader(r.Context(), r.Body, int(r.ContentLength), cfg.MaxRecvMsgSize, &req, compressionType) logger := util.WithContext(r.Context(), util.Logger) if err != nil { @@ -25,6 +24,7 @@ func Handler(cfg distributor.Config, push func(context.Context, *client.WriteReq http.Error(w, err.Error(), http.StatusBadRequest) return } + req.Source = client.API if _, err := push(r.Context(), &req.WriteRequest); err != nil { resp, ok := httpgrpc.HTTPResponseFromError(err) From be5079c9dec5b7e61339cfc6f8aeb9554eab8c0c Mon Sep 17 00:00:00 2001 From: Wing924 Date: Tue, 14 Apr 2020 18:44:22 +0900 Subject: [PATCH 02/10] fix Signed-off-by: Wing924 --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d3fbd645a8e..262a4c49d00 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,7 +26,7 @@ * [BUGFIX] Cassandra Storage: Fix endpoint TLS host verification. #2109 * [BUGFIX] Experimental TSDB: fixed response status code from `422` to `500` when an error occurs while iterating chunks with the experimental blocks storage. #2402 * [BUGFIX] Ring: Fixed a situation where upgrading from pre-1.0 cortex with a rolling strategy caused new 1.0 ingesters to lose their zone value in the ring until manually forced to re-register. #2404 -* [BUGFIX] Distributor: `/all_user_stats` now show API and Rule Ingest Rate correctly. +* [BUGFIX] Distributor: `/all_user_stats` now show API and Rule Ingest Rate correctly. #2457 ## 1.0.0 / 2020-04-02 From cd509372cb8803333bbc6c833e12e8ee796483f6 Mon Sep 17 00:00:00 2001 From: Wing924 Date: Tue, 14 Apr 2020 18:50:01 +0900 Subject: [PATCH 03/10] fix Signed-off-by: Wing924 --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 262a4c49d00..14b3f055c86 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ * [BUGFIX] Experimental TSDB: fixed response status code from `422` to `500` when an error occurs while iterating chunks with the experimental blocks storage. #2402 * [BUGFIX] Ring: Fixed a situation where upgrading from pre-1.0 cortex with a rolling strategy caused new 1.0 ingesters to lose their zone value in the ring until manually forced to re-register. #2404 * [BUGFIX] Distributor: `/all_user_stats` now show API and Rule Ingest Rate correctly. #2457 +* [BUGFIX] Ruler: reuse `[]Timeseries` to reduce memory allocation. #2457 ## 1.0.0 / 2020-04-02 From ae19eb6285ba2b5554738c71085cd33f72c37aba Mon Sep 17 00:00:00 2001 From: Wing924 Date: Wed, 15 Apr 2020 08:17:25 +0900 Subject: [PATCH 04/10] fix Signed-off-by: Wing924 --- CHANGELOG.md | 1 - pkg/distributor/distributor.go | 3 +-- pkg/ruler/compat.go | 6 +++--- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 14b3f055c86..262a4c49d00 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,7 +27,6 @@ * [BUGFIX] Experimental TSDB: fixed response status code from `422` to `500` when an error occurs while iterating chunks with the experimental blocks storage. #2402 * [BUGFIX] Ring: Fixed a situation where upgrading from pre-1.0 cortex with a rolling strategy caused new 1.0 ingesters to lose their zone value in the ring until manually forced to re-register. #2404 * [BUGFIX] Distributor: `/all_user_stats` now show API and Rule Ingest Rate correctly. #2457 -* [BUGFIX] Ruler: reuse `[]Timeseries` to reduce memory allocation. #2457 ## 1.0.0 / 2020-04-02 diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 840f745d588..546dfa15784 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -360,7 +360,6 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie return nil, err } - source := req.Source var firstPartialErr error removeReplica := false @@ -537,7 +536,7 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie if sp := opentracing.SpanFromContext(ctx); sp != nil { localCtx = opentracing.ContextWithSpan(localCtx, sp) } - return d.send(localCtx, ingester, timeseries, metadata, source) + return d.send(localCtx, ingester, timeseries, metadata, req.Source) }, func() { client.ReuseSlice(req.Timeseries) }) if err != nil { return nil, err diff --git a/pkg/ruler/compat.go b/pkg/ruler/compat.go index aae5db69920..3267762f5e7 100644 --- a/pkg/ruler/compat.go +++ b/pkg/ruler/compat.go @@ -39,9 +39,9 @@ func (a *appender) AddFast(l labels.Labels, ref uint64, t int64, v float64) erro } func (a *appender) Commit() error { - wq := client.ToWriteRequest(a.labels, a.samples, nil, client.RULE) - _, err := a.pusher.Push(user.InjectOrgID(context.Background(), a.userID), wq) - client.ReuseSlice(wq.Timeseries) + // Since a.pusher is distributor, client.ReuseSlice will be called in a.pusher.Push. + // We shouldn't call client.ReuseSlice here. + _, err := a.pusher.Push(user.InjectOrgID(context.Background(), a.userID), client.ToWriteRequest(a.labels, a.samples, nil, client.RULE)) a.labels = nil a.samples = nil From c3ba3c2fa0f64dc501d7c74d839659439fe292df Mon Sep 17 00:00:00 2001 From: Wing924 Date: Wed, 15 Apr 2020 08:22:57 +0900 Subject: [PATCH 05/10] fix Signed-off-by: Wing924 --- pkg/ruler/compat.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/ruler/compat.go b/pkg/ruler/compat.go index 3267762f5e7..70494c4e675 100644 --- a/pkg/ruler/compat.go +++ b/pkg/ruler/compat.go @@ -44,7 +44,6 @@ func (a *appender) Commit() error { _, err := a.pusher.Push(user.InjectOrgID(context.Background(), a.userID), client.ToWriteRequest(a.labels, a.samples, nil, client.RULE)) a.labels = nil a.samples = nil - return err } From 0c7a3647790d400136c794b07bece0bc209a9455 Mon Sep 17 00:00:00 2001 From: Wing924 Date: Wed, 15 Apr 2020 18:46:50 +0900 Subject: [PATCH 06/10] fix Signed-off-by: Wing924 --- pkg/util/push/push.go | 4 +- pkg/util/push/push_test.go | 95 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 98 insertions(+), 1 deletion(-) create mode 100644 pkg/util/push/push_test.go diff --git a/pkg/util/push/push.go b/pkg/util/push/push.go index 1d1addb56c6..f8eeec5d516 100644 --- a/pkg/util/push/push.go +++ b/pkg/util/push/push.go @@ -24,7 +24,9 @@ func Handler(cfg distributor.Config, push func(context.Context, *client.WriteReq http.Error(w, err.Error(), http.StatusBadRequest) return } - req.Source = client.API + if req.Source == 0 { + req.Source = client.API + } if _, err := push(r.Context(), &req.WriteRequest); err != nil { resp, ok := httpgrpc.HTTPResponseFromError(err) diff --git a/pkg/util/push/push_test.go b/pkg/util/push/push_test.go new file mode 100644 index 00000000000..91dfa562fb2 --- /dev/null +++ b/pkg/util/push/push_test.go @@ -0,0 +1,95 @@ +package push + +import ( + "bytes" + "context" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/golang/snappy" + "github.com/prometheus/prometheus/prompb" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/cortexproject/cortex/pkg/distributor" + "github.com/cortexproject/cortex/pkg/ingester/client" +) + +func TestHandler_remoteWrite(t *testing.T) { + req := createRemoteWriteRequest(t) + resp := httptest.NewRecorder() + + handler := Handler(distributor.Config{MaxRecvMsgSize: 100000}, verifyWriteRequestHandler(t, client.API)) + handler.ServeHTTP(resp, req) + assert.Equal(t, 200, resp.Code) +} + +func TestHandler_cortexWriteRequest(t *testing.T) { + req := createCortexRemoteWriteRequest(t) + resp := httptest.NewRecorder() + + handler := Handler(distributor.Config{MaxRecvMsgSize: 100000}, verifyWriteRequestHandler(t, client.RULE)) + handler.ServeHTTP(resp, req) + assert.Equal(t, 200, resp.Code) +} + +func verifyWriteRequestHandler(t *testing.T, expectSource client.WriteRequest_SourceEnum) func(ctx context.Context, request *client.WriteRequest) (response *client.WriteResponse, err error) { + t.Helper() + return func(ctx context.Context, request *client.WriteRequest) (response *client.WriteResponse, err error) { + assert.Equal(t, expectSource, request.Source) + return &client.WriteResponse{}, nil + } +} + +func createRemoteWriteRequest(t *testing.T) *http.Request { + t.Helper() + input := prompb.WriteRequest{ + Timeseries: []prompb.TimeSeries{ + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "foo"}, + }, + Samples: []prompb.Sample{ + {Value: 1, Timestamp: time.Date(2020, 4, 1, 0, 0, 0, 0, time.UTC).UnixNano()}, + }, + }, + }, + } + inoutBytes, err := input.Marshal() + require.NoError(t, err) + inoutBytes = snappy.Encode(nil, inoutBytes) + req, err := http.NewRequest("POST", "http://localhost/", bytes.NewReader(inoutBytes)) + require.NoError(t, err) + req.Header.Add("Content-Encoding", "snappy") + req.Header.Set("Content-Type", "application/x-protobuf") + req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0") + return req +} + +func createCortexRemoteWriteRequest(t *testing.T) *http.Request { + t.Helper() + input := client.WriteRequest{ + Timeseries: []client.PreallocTimeseries{ + {&client.TimeSeries{ + Labels: []client.LabelAdapter{ + {Name: "__name__", Value: "foo"}, + }, + Samples: []client.Sample{ + {Value: 1, TimestampMs: time.Date(2020, 4, 1, 0, 0, 0, 0, time.UTC).UnixNano()}, + }, + }}, + }, + Source: client.RULE, + } + inoutBytes, err := input.Marshal() + require.NoError(t, err) + inoutBytes = snappy.Encode(nil, inoutBytes) + req, err := http.NewRequest("POST", "http://localhost/", bytes.NewReader(inoutBytes)) + require.NoError(t, err) + req.Header.Add("Content-Encoding", "snappy") + req.Header.Set("Content-Type", "application/x-protobuf") + req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0") + return req +} From 12612d9e3c93e94a7ce2e5469810df9f3cc47977 Mon Sep 17 00:00:00 2001 From: Wing924 Date: Wed, 15 Apr 2020 18:55:51 +0900 Subject: [PATCH 07/10] fix Signed-off-by: Wing924 --- pkg/util/push/push_test.go | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/pkg/util/push/push_test.go b/pkg/util/push/push_test.go index 91dfa562fb2..445ef09a677 100644 --- a/pkg/util/push/push_test.go +++ b/pkg/util/push/push_test.go @@ -70,16 +70,17 @@ func createRemoteWriteRequest(t *testing.T) *http.Request { func createCortexRemoteWriteRequest(t *testing.T) *http.Request { t.Helper() + ts := &client.TimeSeries{ + Labels: []client.LabelAdapter{ + {Name: "__name__", Value: "foo"}, + }, + Samples: []client.Sample{ + {Value: 1, TimestampMs: time.Date(2020, 4, 1, 0, 0, 0, 0, time.UTC).UnixNano()}, + }, + } input := client.WriteRequest{ Timeseries: []client.PreallocTimeseries{ - {&client.TimeSeries{ - Labels: []client.LabelAdapter{ - {Name: "__name__", Value: "foo"}, - }, - Samples: []client.Sample{ - {Value: 1, TimestampMs: time.Date(2020, 4, 1, 0, 0, 0, 0, time.UTC).UnixNano()}, - }, - }}, + {ts}, }, Source: client.RULE, } From 03c01b18288fa6fe402a65c3044b5c2b10ee9234 Mon Sep 17 00:00:00 2001 From: Wing924 Date: Wed, 15 Apr 2020 18:58:25 +0900 Subject: [PATCH 08/10] fix Signed-off-by: Wing924 --- pkg/util/push/push_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/util/push/push_test.go b/pkg/util/push/push_test.go index 445ef09a677..be827e9f22e 100644 --- a/pkg/util/push/push_test.go +++ b/pkg/util/push/push_test.go @@ -80,7 +80,7 @@ func createCortexRemoteWriteRequest(t *testing.T) *http.Request { } input := client.WriteRequest{ Timeseries: []client.PreallocTimeseries{ - {ts}, + client.PreallocTimeseries{ts}, }, Source: client.RULE, } From 985c089947c5bbe288dde789c4389c56d796cfe6 Mon Sep 17 00:00:00 2001 From: Wing924 Date: Wed, 15 Apr 2020 19:10:26 +0900 Subject: [PATCH 09/10] fix Signed-off-by: Wing924 --- pkg/util/push/push_test.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/pkg/util/push/push_test.go b/pkg/util/push/push_test.go index be827e9f22e..8bbaacab285 100644 --- a/pkg/util/push/push_test.go +++ b/pkg/util/push/push_test.go @@ -70,19 +70,19 @@ func createRemoteWriteRequest(t *testing.T) *http.Request { func createCortexRemoteWriteRequest(t *testing.T) *http.Request { t.Helper() - ts := &client.TimeSeries{ - Labels: []client.LabelAdapter{ - {Name: "__name__", Value: "foo"}, - }, - Samples: []client.Sample{ - {Value: 1, TimestampMs: time.Date(2020, 4, 1, 0, 0, 0, 0, time.UTC).UnixNano()}, + ts := client.PreallocTimeseries{ + TimeSeries: &client.TimeSeries{ + Labels: []client.LabelAdapter{ + {Name: "__name__", Value: "foo"}, + }, + Samples: []client.Sample{ + {Value: 1, TimestampMs: time.Date(2020, 4, 1, 0, 0, 0, 0, time.UTC).UnixNano()}, + }, }, } input := client.WriteRequest{ - Timeseries: []client.PreallocTimeseries{ - client.PreallocTimeseries{ts}, - }, - Source: client.RULE, + Timeseries: []client.PreallocTimeseries{ts}, + Source: client.RULE, } inoutBytes, err := input.Marshal() require.NoError(t, err) From c49d6a15ea2aafe884aa7a63368ba6a0b237c6b1 Mon Sep 17 00:00:00 2001 From: Wing924 Date: Wed, 15 Apr 2020 19:20:11 +0900 Subject: [PATCH 10/10] fix Signed-off-by: Wing924 --- pkg/util/push/push_test.go | 41 +++++++++++++++++++------------------- 1 file changed, 20 insertions(+), 21 deletions(-) diff --git a/pkg/util/push/push_test.go b/pkg/util/push/push_test.go index 8bbaacab285..ab87d4f5dc3 100644 --- a/pkg/util/push/push_test.go +++ b/pkg/util/push/push_test.go @@ -18,18 +18,16 @@ import ( ) func TestHandler_remoteWrite(t *testing.T) { - req := createRemoteWriteRequest(t) + req := createRequest(t, createPrometheusRemoteWriteProtobuf(t)) resp := httptest.NewRecorder() - handler := Handler(distributor.Config{MaxRecvMsgSize: 100000}, verifyWriteRequestHandler(t, client.API)) handler.ServeHTTP(resp, req) assert.Equal(t, 200, resp.Code) } func TestHandler_cortexWriteRequest(t *testing.T) { - req := createCortexRemoteWriteRequest(t) + req := createRequest(t, createCortexWriteRequestProtobuf(t)) resp := httptest.NewRecorder() - handler := Handler(distributor.Config{MaxRecvMsgSize: 100000}, verifyWriteRequestHandler(t, client.RULE)) handler.ServeHTTP(resp, req) assert.Equal(t, 200, resp.Code) @@ -38,12 +36,26 @@ func TestHandler_cortexWriteRequest(t *testing.T) { func verifyWriteRequestHandler(t *testing.T, expectSource client.WriteRequest_SourceEnum) func(ctx context.Context, request *client.WriteRequest) (response *client.WriteResponse, err error) { t.Helper() return func(ctx context.Context, request *client.WriteRequest) (response *client.WriteResponse, err error) { + assert.Len(t, request.Timeseries, 1) + assert.Equal(t, "__name__", request.Timeseries[0].Labels[0].Name) + assert.Equal(t, "foo", request.Timeseries[0].Labels[0].Value) assert.Equal(t, expectSource, request.Source) return &client.WriteResponse{}, nil } } -func createRemoteWriteRequest(t *testing.T) *http.Request { +func createRequest(t *testing.T, protobuf []byte) *http.Request { + t.Helper() + inoutBytes := snappy.Encode(nil, protobuf) + req, err := http.NewRequest("POST", "http://localhost/", bytes.NewReader(inoutBytes)) + require.NoError(t, err) + req.Header.Add("Content-Encoding", "snappy") + req.Header.Set("Content-Type", "application/x-protobuf") + req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0") + return req +} + +func createPrometheusRemoteWriteProtobuf(t *testing.T) []byte { t.Helper() input := prompb.WriteRequest{ Timeseries: []prompb.TimeSeries{ @@ -59,16 +71,9 @@ func createRemoteWriteRequest(t *testing.T) *http.Request { } inoutBytes, err := input.Marshal() require.NoError(t, err) - inoutBytes = snappy.Encode(nil, inoutBytes) - req, err := http.NewRequest("POST", "http://localhost/", bytes.NewReader(inoutBytes)) - require.NoError(t, err) - req.Header.Add("Content-Encoding", "snappy") - req.Header.Set("Content-Type", "application/x-protobuf") - req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0") - return req + return inoutBytes } - -func createCortexRemoteWriteRequest(t *testing.T) *http.Request { +func createCortexWriteRequestProtobuf(t *testing.T) []byte { t.Helper() ts := client.PreallocTimeseries{ TimeSeries: &client.TimeSeries{ @@ -86,11 +91,5 @@ func createCortexRemoteWriteRequest(t *testing.T) *http.Request { } inoutBytes, err := input.Marshal() require.NoError(t, err) - inoutBytes = snappy.Encode(nil, inoutBytes) - req, err := http.NewRequest("POST", "http://localhost/", bytes.NewReader(inoutBytes)) - require.NoError(t, err) - req.Header.Add("Content-Encoding", "snappy") - req.Header.Set("Content-Type", "application/x-protobuf") - req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0") - return req + return inoutBytes }