diff --git a/pkg/ingester-rf1/ingester.go b/pkg/ingester-rf1/ingester.go index 546a42a56d93..8182449c6d59 100644 --- a/pkg/ingester-rf1/ingester.go +++ b/pkg/ingester-rf1/ingester.go @@ -697,11 +697,6 @@ func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logpro return nil, ErrReadOnly } - // Set profiling tags - defer pprof.SetGoroutineLabels(ctx) - ctx = pprof.WithLabels(ctx, pprof.Labels("path", "write", "tenant", instanceID)) - pprof.SetGoroutineLabels(ctx) - instance, err := i.GetOrCreateInstance(instanceID) if err != nil { return &logproto.PushResponse{}, err diff --git a/pkg/ingester-rf1/stream.go b/pkg/ingester-rf1/stream.go index b0b47a2c8ff9..32ccf454c41c 100644 --- a/pkg/ingester-rf1/stream.go +++ b/pkg/ingester-rf1/stream.go @@ -133,7 +133,7 @@ func (s *stream) Push( return 0, nil, errorForFailedEntries(s, invalid, len(entries)) } - bytesAdded, res, err := s.storeEntries(ctx, wal, toStore, usageTracker) + bytesAdded, res, err := s.storeEntries(ctx, wal, toStore) if err != nil { return 0, nil, err } @@ -189,13 +189,11 @@ func hasRateLimitErr(errs []entryWithError) bool { return ok } -func (s *stream) storeEntries(ctx context.Context, w *wal.Manager, entries []*logproto.Entry, usageTracker push.UsageTracker) (int, *wal.AppendResult, error) { - if sp := opentracing.SpanFromContext(ctx); sp != nil { - sp.LogKV("event", "stream started to store entries", "labels", s.labelsString) - defer sp.LogKV("event", "stream finished to store entries") - } +func (s *stream) storeEntries(ctx context.Context, w *wal.Manager, entries []*logproto.Entry) (int, *wal.AppendResult, error) { + sp, _ := opentracing.StartSpanFromContext(ctx, "storeEntries") + defer sp.Finish() - var bytesAdded, outOfOrderSamples, outOfOrderBytes int + var bytesAdded int for i := 0; i < len(entries); i++ { s.entryCt++ @@ -211,17 +209,18 @@ func (s *stream) storeEntries(ctx context.Context, w *wal.Manager, entries []*lo res, err := w.Append(wal.AppendRequest{ TenantID: s.tenant, Labels: s.labels, - LabelsStr: s.labels.String(), + LabelsStr: s.labelsString, Entries: entries, }) if err != nil { return 0, nil, err } - s.reportMetrics(ctx, outOfOrderSamples, outOfOrderBytes, 0, 0, usageTracker) return bytesAdded, res, nil } func (s *stream) validateEntries(ctx context.Context, entries []logproto.Entry, rateLimitWholeStream bool, usageTracker push.UsageTracker) ([]*logproto.Entry, []entryWithError) { + sp, ctx := opentracing.StartSpanFromContext(ctx, "validateEntries") + defer sp.Finish() var ( outOfOrderSamples, outOfOrderBytes int rateLimitedSamples, rateLimitedBytes int