Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
* [BUGFIX] Cassandra Storage: Fix endpoint TLS host verification. #2109
* [BUGFIX] Experimental TSDB: fixed response status code from `422` to `500` when an error occurs while iterating chunks with the experimental blocks storage. #2402
* [BUGFIX] Ring: Fixed a situation where upgrading from pre-1.0 cortex with a rolling strategy caused new 1.0 ingesters to lose their zone value in the ring until manually forced to re-register. #2404
* [BUGFIX] Distributor: `/all_user_stats` now show API and Rule Ingest Rate correctly. #2457

## 1.0.0 / 2020-04-02

Expand Down
6 changes: 4 additions & 2 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie
return nil, err
}

source := req.Source
var firstPartialErr error
removeReplica := false

Expand Down Expand Up @@ -536,7 +537,7 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie
if sp := opentracing.SpanFromContext(ctx); sp != nil {
localCtx = opentracing.ContextWithSpan(localCtx, sp)
}
return d.send(localCtx, ingester, timeseries, metadata)
return d.send(localCtx, ingester, timeseries, metadata, source)
}, func() { client.ReuseSlice(req.Timeseries) })
if err != nil {
return nil, err
Expand Down Expand Up @@ -566,7 +567,7 @@ func sortLabelsIfNeeded(labels []client.LabelAdapter) {
})
}

func (d *Distributor) send(ctx context.Context, ingester ring.IngesterDesc, timeseries []client.PreallocTimeseries, metadata []*client.MetricMetadata) error {
func (d *Distributor) send(ctx context.Context, ingester ring.IngesterDesc, timeseries []client.PreallocTimeseries, metadata []*client.MetricMetadata, source client.WriteRequest_SourceEnum) error {
h, err := d.ingesterPool.GetClientFor(ingester.Addr)
if err != nil {
return err
Expand All @@ -576,6 +577,7 @@ func (d *Distributor) send(ctx context.Context, ingester ring.IngesterDesc, time
req := client.WriteRequest{
Timeseries: timeseries,
Metadata: metadata,
Source: source,
}
_, err = c.Push(ctx, &req)

Expand Down
5 changes: 4 additions & 1 deletion pkg/ruler/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,12 @@ func (a *appender) AddFast(l labels.Labels, ref uint64, t int64, v float64) erro
}

func (a *appender) Commit() error {
_, err := a.pusher.Push(user.InjectOrgID(context.Background(), a.userID), client.ToWriteRequest(a.labels, a.samples, nil, client.RULE))
wq := client.ToWriteRequest(a.labels, a.samples, nil, client.RULE)
_, err := a.pusher.Push(user.InjectOrgID(context.Background(), a.userID), wq)
client.ReuseSlice(wq.Timeseries)
a.labels = nil
a.samples = nil

return err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/util/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ func Handler(cfg distributor.Config, push func(context.Context, *client.WriteReq
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
compressionType := util.CompressionTypeFor(r.Header.Get("X-Prometheus-Remote-Write-Version"))
var req client.PreallocWriteRequest
req.Source = client.API
_, err := util.ParseProtoReader(r.Context(), r.Body, int(r.ContentLength), cfg.MaxRecvMsgSize, &req, compressionType)
logger := util.WithContext(r.Context(), util.Logger)
if err != nil {
level.Error(logger).Log("err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
req.Source = client.API
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why did this move?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_, err := util.ParseProtoReader(r.Context(), r.Body, int(r.ContentLength), cfg.MaxRecvMsgSize, &req, compressionType) overwrite that value.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe that was deliberate: if the caller supplies a value then we use that, otherwise we fall back to API.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tested here and I'm sure util.ParseProtoReader will overwrite req.Source to 0.
Because client.API equals to 0, you may not notice it.

reproduce

diff --git a/pkg/ingester/client/cortex.proto b/pkg/ingester/client/cortex.proto
index 6a341b59..a9dfcfee 100644
--- a/pkg/ingester/client/cortex.proto
+++ b/pkg/ingester/client/cortex.proto
@@ -30,7 +30,8 @@ service Ingester {
 message WriteRequest {
   repeated TimeSeries timeseries = 1 [(gogoproto.nullable) = false, (gogoproto.customtype) = "PreallocTimeseries"];
   enum SourceEnum {
-    API = 0;
+    UNKNOWN_SOURCE = 0;
+    API = 2;
     RULE = 1;
   }
   SourceEnum Source = 2;

and print req.Source before and afterutil.ParseProtoReader.

result:

  • before is API
  • after is UNKNOWN_SOURCE

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you test it with and without the caller supplying a value in the protobuf?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just test it with real remote write from distributor.
I don't think remote write protobuf have Source field.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree on @Wing924 on this. The source is not part of the remote write protocol, so we need to inject it after the WriteRequest has been deseralized, otherwise the deserialization will overwrite it with the default value.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added test to cover both prometheus remote write and cortex writerequest.


if _, err := push(r.Context(), &req.WriteRequest); err != nil {
resp, ok := httpgrpc.HTTPResponseFromError(err)
Expand Down