Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 33 additions & 64 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"strings"
"time"

"github.com/go-kit/kit/log/level"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -354,10 +353,6 @@ func (d *Distributor) validateSeries(ts ingester_client.PreallocTimeseries, user
nil
}

func (d *Distributor) validateMetadata(m *ingester_client.MetricMetadata, userID string) error {
return validation.ValidateMetadata(d.limits, userID, m)
}

// Push implements client.IngesterServer
func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*client.WriteResponse, error) {
userID, err := user.ExtractOrgID(ctx)
Expand Down Expand Up @@ -470,7 +465,7 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie
}

for _, m := range req.Metadata {
err := d.validateMetadata(m, userID)
err := validation.ValidateMetadata(d.limits, userID, m)

if err != nil {
if firstPartialErr == nil {
Expand Down Expand Up @@ -519,49 +514,32 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie
}
}

if len(metadataKeys) > 0 {
err = ring.DoBatch(ctx, subRing, metadataKeys, func(ingester ring.IngesterDesc, indexes []int) error {
metadata := make([]*client.MetricMetadata, 0, len(indexes))
for _, i := range indexes {
metadata = append(metadata, validatedMetadata[i])
}
keys := append(seriesKeys, metadataKeys...)
initialMetadataIndex := len(seriesKeys)

localCtx, cancel := context.WithTimeout(context.Background(), d.cfg.RemoteTimeout)
defer cancel()
localCtx = user.InjectUserID(localCtx, userID)

if sp := opentracing.SpanFromContext(ctx); sp != nil {
localCtx = opentracing.ContextWithSpan(localCtx, sp)
}

return d.sendMetadata(localCtx, ingester, metadata)
}, func() {})
if err != nil {
// Metadata is a best-effort approach so we consider failures non-fatal, log them, and move on.
logger := util.WithContext(ctx, util.Logger)
level.Error(logger).Log("msg", "Failed to send metadata to ingesters", "err", err)
}
}
err = ring.DoBatch(ctx, subRing, keys, func(ingester ring.IngesterDesc, indexes []int) error {
timeseries := make([]client.PreallocTimeseries, 0, len(indexes))
metadata := make([]*client.MetricMetadata, 0, len(indexes))

if len(seriesKeys) > 0 {
err = ring.DoBatch(ctx, subRing, seriesKeys, func(ingester ring.IngesterDesc, indexes []int) error {
timeseries := make([]client.PreallocTimeseries, 0, len(indexes))
for _, i := range indexes {
for _, i := range indexes {
if i >= initialMetadataIndex {
metadata = append(metadata, validatedMetadata[i-initialMetadataIndex])
} else {
timeseries = append(timeseries, validatedTimeseries[i])
}
}

// Use a background context to make sure all ingesters get samples even if we return early
localCtx, cancel := context.WithTimeout(context.Background(), d.cfg.RemoteTimeout)
defer cancel()
localCtx = user.InjectOrgID(localCtx, userID)
if sp := opentracing.SpanFromContext(ctx); sp != nil {
localCtx = opentracing.ContextWithSpan(localCtx, sp)
}
return d.sendSamples(localCtx, ingester, timeseries)
}, func() { client.ReuseSlice(req.Timeseries) })
if err != nil {
return nil, err
// Use a background context to make sure all ingesters get samples even if we return early
localCtx, cancel := context.WithTimeout(context.Background(), d.cfg.RemoteTimeout)
defer cancel()
localCtx = user.InjectOrgID(localCtx, userID)
if sp := opentracing.SpanFromContext(ctx); sp != nil {
localCtx = opentracing.ContextWithSpan(localCtx, sp)
}
return d.send(localCtx, ingester, timeseries, metadata)
}, func() { client.ReuseSlice(req.Timeseries) })
if err != nil {
return nil, err
}
return &client.WriteResponse{}, firstPartialErr
}
Expand All @@ -588,7 +566,7 @@ func sortLabelsIfNeeded(labels []client.LabelAdapter) {
})
}

func (d *Distributor) sendSamples(ctx context.Context, ingester ring.IngesterDesc, timeseries []client.PreallocTimeseries) error {
func (d *Distributor) send(ctx context.Context, ingester ring.IngesterDesc, timeseries []client.PreallocTimeseries, metadata []*client.MetricMetadata) error {
h, err := d.ingesterPool.GetClientFor(ingester.Addr)
if err != nil {
return err
Expand All @@ -597,32 +575,23 @@ func (d *Distributor) sendSamples(ctx context.Context, ingester ring.IngesterDes

req := client.WriteRequest{
Timeseries: timeseries,
Metadata: metadata,
}
_, err = c.Push(ctx, &req)

ingesterAppends.WithLabelValues(ingester.Addr, typeSamples).Inc()
if err != nil {
ingesterAppendFailures.WithLabelValues(ingester.Addr, typeSamples).Inc()
}
return err
}

func (d *Distributor) sendMetadata(ctx context.Context, ingester ring.IngesterDesc, metadata []*client.MetricMetadata) error {
h, err := d.ingesterPool.GetClientFor(ingester.Addr)
if err != nil {
return err
}
c := h.(ingester_client.IngesterClient)

req := client.WriteRequest{
Metadata: metadata,
if len(metadata) > 0 {
ingesterAppends.WithLabelValues(ingester.Addr, typeSamples).Inc()
if err != nil {
ingesterAppendFailures.WithLabelValues(ingester.Addr, typeSamples).Inc()
}
}
_, err = c.Push(ctx, &req)

ingesterAppends.WithLabelValues(ingester.Addr, typeMetadata).Inc()
if err != nil {
if len(timeseries) > 0 {
if err != nil {
ingesterAppends.WithLabelValues(ingester.Addr, typeMetadata).Inc()
}
ingesterAppendFailures.WithLabelValues(ingester.Addr, typeMetadata).Inc()
}

return err
}

Expand Down
2 changes: 0 additions & 2 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,10 +373,8 @@ func (i *Ingester) Push(ctx context.Context, req *client.WriteRequest) (*client.
}

if len(req.Metadata) > 0 {
// Given requests can only contain either metadata or samples, no-op if there is metadata for now.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically, this is "no longer true" from an implementation perspective. The remote write protocol only sends metadata or samples (for now) but IMO that comment/responsibility belongs in the distributor. The ingester should operate under no assumption that you'd receive one or the other if we can (e.g. no increase of complexity).

Obviously, this is my own personal opinion with what I know about the system, but others feel free to suggest any alternatives.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we're going to handle samples and metadata in the same push, then I would suggest that we return errors for them separately. As it is now, errors reported by ingesters will be counted for both samples and metadata (provided both were present in the request).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry Peter, I'm not following your comment. With this change, we're no longer making assumptions that a WriteRequest can come with one or the other.

The error handling in send is purely for metrics reporting. In the ingesters, we simply log when we receive metadata, so erroring out on that is not an option at the moment.

The comment I originally left in the distributors is an operational concern, but for the code itself, it wouldn't make a difference - that is indented.

logger := util.WithContext(ctx, util.Logger)
level.Debug(logger).Log("msg", "metadata received in the ingester", "count", len(req.Metadata))
return &client.WriteResponse{}, nil
}

for _, ts := range req.Timeseries {
Expand Down