Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
* [BUGFIX] Cassandra Storage: Fix endpoint TLS host verification. #2109
* [BUGFIX] Experimental TSDB: fixed response status code from `422` to `500` when an error occurs while iterating chunks with the experimental blocks storage. #2402
* [BUGFIX] Ring: Fixed a situation where upgrading from pre-1.0 cortex with a rolling strategy caused new 1.0 ingesters to lose their zone value in the ring until manually forced to re-register. #2404
* [BUGFIX] Distributor: `/all_user_stats` now show API and Rule Ingest Rate correctly. #2457

## 1.0.0 / 2020-04-02

Expand Down
5 changes: 3 additions & 2 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie
if sp := opentracing.SpanFromContext(ctx); sp != nil {
localCtx = opentracing.ContextWithSpan(localCtx, sp)
}
return d.send(localCtx, ingester, timeseries, metadata)
return d.send(localCtx, ingester, timeseries, metadata, req.Source)
}, func() { client.ReuseSlice(req.Timeseries) })
if err != nil {
return nil, err
Expand Down Expand Up @@ -566,7 +566,7 @@ func sortLabelsIfNeeded(labels []client.LabelAdapter) {
})
}

func (d *Distributor) send(ctx context.Context, ingester ring.IngesterDesc, timeseries []client.PreallocTimeseries, metadata []*client.MetricMetadata) error {
func (d *Distributor) send(ctx context.Context, ingester ring.IngesterDesc, timeseries []client.PreallocTimeseries, metadata []*client.MetricMetadata, source client.WriteRequest_SourceEnum) error {
h, err := d.ingesterPool.GetClientFor(ingester.Addr)
if err != nil {
return err
Expand All @@ -576,6 +576,7 @@ func (d *Distributor) send(ctx context.Context, ingester ring.IngesterDesc, time
req := client.WriteRequest{
Timeseries: timeseries,
Metadata: metadata,
Source: source,
}
_, err = c.Push(ctx, &req)

Expand Down
2 changes: 2 additions & 0 deletions pkg/ruler/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ func (a *appender) AddFast(l labels.Labels, ref uint64, t int64, v float64) erro
}

func (a *appender) Commit() error {
// Since a.pusher is distributor, client.ReuseSlice will be called in a.pusher.Push.
// We shouldn't call client.ReuseSlice here.
_, err := a.pusher.Push(user.InjectOrgID(context.Background(), a.userID), client.ToWriteRequest(a.labels, a.samples, nil, client.RULE))
a.labels = nil
a.samples = nil
Expand Down
4 changes: 3 additions & 1 deletion pkg/util/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@ func Handler(cfg distributor.Config, push func(context.Context, *client.WriteReq
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
compressionType := util.CompressionTypeFor(r.Header.Get("X-Prometheus-Remote-Write-Version"))
var req client.PreallocWriteRequest
req.Source = client.API
_, err := util.ParseProtoReader(r.Context(), r.Body, int(r.ContentLength), cfg.MaxRecvMsgSize, &req, compressionType)
logger := util.WithContext(r.Context(), util.Logger)
if err != nil {
level.Error(logger).Log("err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if req.Source == 0 {
req.Source = client.API
}
Comment on lines +27 to +29
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As per our instructions, every commit must explain the changes made. This is an extraordinary change, so needs a very good explanation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll send another PR to add comment here.

Comment on lines +27 to +29
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Getting back to this change, @bboreham as right and I was wrong. The req.Source is not overridden by the unmarshalling if not included in the request, so we should have fixed the issue to the root: ensuring Source is always set within Cortex.

https://github.com/cortexproject/cortex/blob/master/pkg/ingester/client/cortex.pb.go#L5502-L5520

@Wing924 May you re-iterate on this?

Copy link
Contributor Author

@Wing924 Wing924 Apr 16, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The req.Source is not overridden by the unmarshalling if not included in the request

It's not true.

Let's change the code like this:

+ req.Source = 2
		_, err := util.ParseProtoReader(r.Context(), r.Body, int(r.ContentLength), cfg.MaxRecvMsgSize, &req, compressionType)
		logger := util.WithContext(r.Context(), util.Logger)
		if err != nil {
			level.Error(logger).Log("err", err.Error())
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
- if req.Source == 0 {
- 	req.Source = client.API
- }

if Source is not included in the request, we expect req.Source is 2, right?
But actually it's 0.
You can check my test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about you explain your change, where I have substituted one const value with its definition for clarity:

if req.Source == 0 {
	req.Source = 0
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know client.API is equals to 0 so that you think these code is useless.
This code is explicitly set default value if req.Source is not set.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Physically, client.API equals to 0, but logically, they are different things.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, the Reset() is indeed the important line, thanks.
So the original code didn't do anything (it set to 0 which was then overridden to 0) and the new code doesn't do anything.


if _, err := push(r.Context(), &req.WriteRequest); err != nil {
resp, ok := httpgrpc.HTTPResponseFromError(err)
Expand Down
95 changes: 95 additions & 0 deletions pkg/util/push/push_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package push

import (
"bytes"
"context"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/golang/snappy"
"github.com/prometheus/prometheus/prompb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/cortexproject/cortex/pkg/distributor"
"github.com/cortexproject/cortex/pkg/ingester/client"
)

func TestHandler_remoteWrite(t *testing.T) {
req := createRequest(t, createPrometheusRemoteWriteProtobuf(t))
resp := httptest.NewRecorder()
handler := Handler(distributor.Config{MaxRecvMsgSize: 100000}, verifyWriteRequestHandler(t, client.API))
handler.ServeHTTP(resp, req)
assert.Equal(t, 200, resp.Code)
}

func TestHandler_cortexWriteRequest(t *testing.T) {
req := createRequest(t, createCortexWriteRequestProtobuf(t))
resp := httptest.NewRecorder()
handler := Handler(distributor.Config{MaxRecvMsgSize: 100000}, verifyWriteRequestHandler(t, client.RULE))
handler.ServeHTTP(resp, req)
assert.Equal(t, 200, resp.Code)
}

func verifyWriteRequestHandler(t *testing.T, expectSource client.WriteRequest_SourceEnum) func(ctx context.Context, request *client.WriteRequest) (response *client.WriteResponse, err error) {
t.Helper()
return func(ctx context.Context, request *client.WriteRequest) (response *client.WriteResponse, err error) {
assert.Len(t, request.Timeseries, 1)
assert.Equal(t, "__name__", request.Timeseries[0].Labels[0].Name)
assert.Equal(t, "foo", request.Timeseries[0].Labels[0].Value)
assert.Equal(t, expectSource, request.Source)
return &client.WriteResponse{}, nil
}
}

func createRequest(t *testing.T, protobuf []byte) *http.Request {
t.Helper()
inoutBytes := snappy.Encode(nil, protobuf)
req, err := http.NewRequest("POST", "http://localhost/", bytes.NewReader(inoutBytes))
require.NoError(t, err)
req.Header.Add("Content-Encoding", "snappy")
req.Header.Set("Content-Type", "application/x-protobuf")
req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
return req
}

func createPrometheusRemoteWriteProtobuf(t *testing.T) []byte {
t.Helper()
input := prompb.WriteRequest{
Timeseries: []prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "__name__", Value: "foo"},
},
Samples: []prompb.Sample{
{Value: 1, Timestamp: time.Date(2020, 4, 1, 0, 0, 0, 0, time.UTC).UnixNano()},
},
},
},
}
inoutBytes, err := input.Marshal()
require.NoError(t, err)
return inoutBytes
}
func createCortexWriteRequestProtobuf(t *testing.T) []byte {
t.Helper()
ts := client.PreallocTimeseries{
TimeSeries: &client.TimeSeries{
Labels: []client.LabelAdapter{
{Name: "__name__", Value: "foo"},
},
Samples: []client.Sample{
{Value: 1, TimestampMs: time.Date(2020, 4, 1, 0, 0, 0, 0, time.UTC).UnixNano()},
},
},
}
input := client.WriteRequest{
Timeseries: []client.PreallocTimeseries{ts},
Source: client.RULE,
}
inoutBytes, err := input.Marshal()
require.NoError(t, err)
return inoutBytes
}