feat: Collect duplicate log line metrics #13084

Merged on Jun 26, 2024 (21 commits).
Commits:
- 774f2bd: First pass at getting duplicate log line metrics to be outputted via … (paul1r, May 15, 2024)
- 1e63ac1: Merge branch 'main' into paul1r/add_metrics_for_duplicate_log_line_bytes (paul1r, May 30, 2024)
- 40063ab: WIP: PR comments: remove err, update metric name, put metric stuff in… (paul1r, May 30, 2024)
- 00b67f0: Refactor so we can get tenant information and such in the unordered h… (paul1r, May 30, 2024)
- 3b40971: make fmt (paul1r, May 30, 2024)
- 471059f: Remove commented out code from first pass (paul1r, May 30, 2024)
- 2070530: Pass metrics and such into MemChunk and UnorderedHeadBlock (paul1r, May 31, 2024)
- faaa10a: Merge branch 'main' into paul1r/add_metrics_for_duplicate_log_line_bytes (paul1r, Jun 6, 2024)
- 074f1f9: Move back to a variant of version 1, using a bool instead of an err (paul1r, Jun 7, 2024)
- 5c5e631: Add a comment about what the new return val is (paul1r, Jun 7, 2024)
- 788b274: Merge branch 'main' into paul1r/add_metrics_for_duplicate_log_line_bytes (paul1r, Jun 7, 2024)
- c80a90f: Fix removal of return val (paul1r, Jun 7, 2024)
- 6f321d4: Lint (paul1r, Jun 7, 2024)
- 3a43546: Merge branch 'main' into paul1r/add_metrics_for_duplicate_log_line_bytes (paul1r, Jun 17, 2024)
- 19cc158: Merge branch 'main' into paul1r/add_metrics_for_duplicate_log_line_bytes (paul1r, Jun 20, 2024)
- ae10548: Rework writefailures logic to be easier to read (paul1r, Jun 20, 2024)
- 07a82f0: Break out logging of duplicate metrics/log into a function (paul1r, Jun 20, 2024)
- 6b6d3d6: Don't make a nested if, break out early if no tenant configs (paul1r, Jun 20, 2024)
- 1f33c60: Add size to duplicate log line info (paul1r, Jun 20, 2024)
- 982ebf6: PR review comments, add comments about what each implementation may r… (paul1r, Jun 26, 2024)
- 00f9b82: Merge branch 'main' into paul1r/add_metrics_for_duplicate_log_line_bytes (paul1r, Jun 26, 2024)
26 changes: 26 additions & 0 deletions pkg/chunkenc/interface.go
@@ -21,6 +21,7 @@ var (
ErrInvalidSize = errors.New("invalid size")
ErrInvalidFlag = errors.New("invalid flag")
ErrInvalidChecksum = errors.New("invalid chunk checksum")
ErrDuplicateEntry = errors.New("duplicate entry")
)

type errTooFarBehind struct {
@@ -48,6 +49,31 @@ func IsOutOfOrderErr(err error) bool {
return err == ErrOutOfOrder || IsErrTooFarBehind(err)
}

type errDuplicateEntry struct {
// original timestamp of the entry itself.
entryTs time.Time

// string representation of the stream for the entry
stream string
}

func IsErrDuplicateEntry(err error) bool {
_, ok := err.(*errDuplicateEntry)
return ok
}

func ErrDuplicateLogEntry(entryTs time.Time, stream string) error {
return &errDuplicateEntry{entryTs: entryTs, stream: stream}
}

func (m *errDuplicateEntry) Error() string {
return fmt.Sprintf("entry is a duplicate, entry timestamp is: %s, stream information is: %s", m.entryTs.Format(time.RFC3339), m.stream)
}

func IsDuplicateEntryErr(err error) bool {
return err == ErrDuplicateEntry || IsErrDuplicateEntry(err)
}

// Encoding is the identifier for a chunk encoding.
type Encoding byte

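
Taken together, the helpers added above give callers two interchangeable ways to signal a duplicate: the cheap ErrDuplicateEntry sentinel for the hot append path, and the structured errDuplicateEntry (built via ErrDuplicateLogEntry) carrying a timestamp and stream string for write-failure logging; IsDuplicateEntryErr accepts either. A minimal standalone sketch of the pattern, with the types re-declared locally since the real ones live in pkg/chunkenc:

package main

import (
	"errors"
	"fmt"
	"time"
)

// Cheap sentinel returned from the hot append path.
var ErrDuplicateEntry = errors.New("duplicate entry")

// Structured variant carrying context for write-failure logs.
type errDuplicateEntry struct {
	entryTs time.Time // original timestamp of the entry itself
	stream  string    // string representation of the stream for the entry
}

func (m *errDuplicateEntry) Error() string {
	return fmt.Sprintf("entry is a duplicate, entry timestamp is: %s, stream information is: %s",
		m.entryTs.Format(time.RFC3339), m.stream)
}

// IsDuplicateEntryErr accepts either form, mirroring the helper above.
func IsDuplicateEntryErr(err error) bool {
	if err == ErrDuplicateEntry {
		return true
	}
	_, ok := err.(*errDuplicateEntry)
	return ok
}

func main() {
	rich := &errDuplicateEntry{entryTs: time.Now(), stream: `{job="app"}`}
	fmt.Println(IsDuplicateEntryErr(ErrDuplicateEntry)) // true
	fmt.Println(IsDuplicateEntryErr(rich))              // true
	fmt.Println(IsDuplicateEntryErr(errors.New("x")))   // false
}
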
10 changes: 10 additions & 0 deletions pkg/chunkenc/interface_test.go
@@ -37,3 +37,13 @@ func TestIsOutOfOrderErr(t *testing.T) {
require.Equal(t, true, IsOutOfOrderErr(err))
}
}

func TestIsDuplicateEntryErr(t *testing.T) {
if !IsDuplicateEntryErr(ErrDuplicateEntry) {
t.Errorf("IsDuplicateEntryErr() = false, want true for ErrDuplicateEntry")
}

if IsDuplicateEntryErr(ErrOutOfOrder) {
t.Errorf("IsDuplicateEntryErr() = true, want false for errors other than ErrDuplicateEntry")
}
}
2 changes: 1 addition & 1 deletion pkg/chunkenc/unordered.go
@@ -135,7 +135,7 @@ func (hb *unorderedHeadBlock) Append(ts int64, line string, structuredMetadata l
for _, et := range displaced[0].(*nsEntries).entries {
if et.line == line {
e.entries = displaced[0].(*nsEntries).entries
return nil
return ErrDuplicateEntry
}
}
e.entries = append(displaced[0].(*nsEntries).entries, nsEntry{line, hb.symbolizer.Add(structuredMetadata)})
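
The behavioral change: Append previously swallowed an exact duplicate (same nanosecond timestamp, same line) by returning nil; it now surfaces the sentinel, so callers have to filter it out before generic error handling. A toy sketch of that caller contract, with a map-backed stand-in for the head block:

package main

import (
	"errors"
	"fmt"
)

// ErrDuplicateEntry stands in for the chunkenc sentinel in this sketch.
var ErrDuplicateEntry = errors.New("duplicate entry")

// appendEntry is a toy stand-in for unorderedHeadBlock.Append: it refuses an
// entry whose (timestamp, line) pair was already stored.
func appendEntry(seen map[string]bool, ts int64, line string) error {
	key := fmt.Sprintf("%d\x00%s", ts, line)
	if seen[key] {
		return ErrDuplicateEntry
	}
	seen[key] = true
	return nil
}

func main() {
	seen := map[string]bool{}
	for _, line := range []string{"a", "a", "b"} {
		err := appendEntry(seen, 0, line)
		switch {
		case err == nil:
			fmt.Println("appended:", line)
		case errors.Is(err, ErrDuplicateEntry):
			fmt.Println("duplicate dropped:", line) // count bytes, optionally log
		default:
			panic(err) // a real write failure
		}
	}
}
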
16 changes: 13 additions & 3 deletions pkg/chunkenc/unordered_test.go
@@ -86,6 +86,7 @@ func Test_Unordered_InsertRetrieval(t *testing.T) {
desc string
input, exp []entry
dir logproto.Direction
hasDup bool
}{
{
desc: "simple forward",
@@ -152,7 +153,8 @@
exp: []entry{
{0, "a", nil}, {0, "b", nil}, {1, "c", nil},
},
dir: logproto.FORWARD,
dir: logproto.FORWARD,
hasDup: true,
},
{
desc: "ts remove exact dupe backward",
@@ -162,7 +164,8 @@
exp: []entry{
{1, "c", nil}, {0, "b", nil}, {0, "a", nil},
},
dir: logproto.BACKWARD,
dir: logproto.BACKWARD,
hasDup: true,
},
} {
t.Run(tc.desc, func(t *testing.T) {
@@ -173,7 +176,14 @@
t.Run(format.String(), func(t *testing.T) {
hb := newUnorderedHeadBlock(format, newSymbolizer())
for _, e := range tc.input {
require.Nil(t, hb.Append(e.t, e.s, e.structuredMetadata))
err := hb.Append(e.t, e.s, e.structuredMetadata)
if tc.hasDup {
if err != nil && err != ErrDuplicateEntry {
require.Equal(t, err, ErrDuplicateEntry)
}
} else {
require.Nil(t, err)
}
}

itr := hb.Iterator(
3 changes: 2 additions & 1 deletion pkg/distributor/writefailures/manager.go
@@ -39,7 +39,8 @@ func (m *Manager) Log(tenantID string, err error) {
return
}

if !m.tenantCfgs.LimitedLogPushErrors(tenantID) {
if !m.tenantCfgs.LimitedLogPushErrors(tenantID) &&
!m.tenantCfgs.LogDuplicateStreamInfo(tenantID) {
return
}

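
The reworked gate now stays open when either per-tenant flag is enabled; the early return fires only when both are off. A tiny sketch of the equivalent predicate, with plain booleans standing in for the tenantCfgs lookups:

package main

import "fmt"

// shouldLog mirrors the reworked early-return: skip only when both
// per-tenant flags are off, i.e. log when either one is on.
func shouldLog(limitedLogPushErrors, logDuplicateStreamInfo bool) bool {
	if !limitedLogPushErrors && !logDuplicateStreamInfo {
		return false
	}
	return true
}

func main() {
	fmt.Println(shouldLog(false, false)) // false: nothing enabled, skip
	fmt.Println(shouldLog(true, false))  // true: limited push-error logging
	fmt.Println(shouldLog(false, true))  // true: duplicate stream info logging
}
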
4 changes: 2 additions & 2 deletions pkg/ingester/instance.go
@@ -320,7 +320,7 @@ func (i *instance) createStream(ctx context.Context, pushReqStream logproto.Stre
return nil, fmt.Errorf("failed to create stream: %w", err)
}

s := newStream(chunkfmt, headfmt, i.cfg, i.limiter, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.streamRateCalculator, i.metrics, i.writeFailures)
s := newStream(chunkfmt, headfmt, i.cfg, i.limiter, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.streamRateCalculator, i.metrics, i.writeFailures, i.configs)

// record will be nil when replaying the wal (we don't want to rewrite wal entries as we replay them).
if record != nil {
@@ -358,7 +358,7 @@ func (i *instance) createStreamByFP(ls labels.Labels, fp model.Fingerprint) (*st
return nil, fmt.Errorf("failed to create stream for fingerprint: %w", err)
}

s := newStream(chunkfmt, headfmt, i.cfg, i.limiter, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.streamRateCalculator, i.metrics, i.writeFailures)
s := newStream(chunkfmt, headfmt, i.cfg, i.limiter, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.streamRateCalculator, i.metrics, i.writeFailures, i.configs)

i.streamsCreatedTotal.Inc()
memoryStreams.WithLabelValues(i.instanceID).Inc()
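
Both createStream call sites now thread i.configs (a *runtime.TenantConfigs) into newStream, and the test changes below pass nil for the new parameter, so the config lookups need to be safe when no configs are supplied. One nil-tolerant shape for that, sketched with a stand-in config type:

package main

import "fmt"

// TenantConfigs is a stand-in for Loki's runtime.TenantConfigs in this sketch.
type TenantConfigs struct {
	logDuplicateMetrics    map[string]bool
	logDuplicateStreamInfo map[string]bool
}

// Accessors tolerate a nil receiver, since test call sites pass nil configs.
func (c *TenantConfigs) LogDuplicateMetrics(tenant string) bool {
	return c != nil && c.logDuplicateMetrics[tenant]
}

func (c *TenantConfigs) LogDuplicateStreamInfo(tenant string) bool {
	return c != nil && c.logDuplicateStreamInfo[tenant]
}

func main() {
	cfgs := &TenantConfigs{logDuplicateMetrics: map[string]bool{"tenant-a": true}}
	for _, tenant := range []string{"tenant-a", "tenant-b"} {
		fmt.Printf("%s: metrics=%v streamInfo=%v\n",
			tenant, cfgs.LogDuplicateMetrics(tenant), cfgs.LogDuplicateStreamInfo(tenant))
	}
}
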
4 changes: 2 additions & 2 deletions pkg/ingester/instance_test.go
@@ -308,7 +308,7 @@ func setupTestStreams(t *testing.T) (*instance, time.Time, int) {
require.NoError(t, err)
chunkfmt, headfmt, err := instance.chunkFormatAt(minTs(&testStream))
require.NoError(t, err)
chunk := newStream(chunkfmt, headfmt, cfg, limiter, "fake", 0, nil, true, NewStreamRateCalculator(), NilMetrics, nil).NewChunk()
chunk := newStream(chunkfmt, headfmt, cfg, limiter, "fake", 0, nil, true, NewStreamRateCalculator(), NilMetrics, nil, nil).NewChunk()
for _, entry := range testStream.Entries {
err = chunk.Append(&entry)
require.NoError(t, err)
@@ -567,7 +567,7 @@ func Benchmark_instance_addNewTailer(b *testing.B) {

b.Run("addTailersToNewStream", func(b *testing.B) {
for n := 0; n < b.N; n++ {
inst.addTailersToNewStream(newStream(chunkfmt, headfmt, nil, limiter, "fake", 0, lbs, true, NewStreamRateCalculator(), NilMetrics, nil))
inst.addTailersToNewStream(newStream(chunkfmt, headfmt, nil, limiter, "fake", 0, lbs, true, NewStreamRateCalculator(), NilMetrics, nil, nil))
}
})
}
40 changes: 34 additions & 6 deletions pkg/ingester/stream.go
@@ -8,6 +8,8 @@ import (
"sync"
"time"

"github.com/grafana/loki/v3/pkg/runtime"

"github.com/go-kit/log/level"
"github.com/grafana/dskit/httpgrpc"
"github.com/opentracing/opentracing-go"
@@ -78,6 +80,8 @@ type stream struct {

chunkFormat byte
chunkHeadBlockFormat chunkenc.HeadBlockFmt

configs *runtime.TenantConfigs
}

type chunkDesc struct {
@@ -107,6 +111,7 @@ func newStream(
streamRateCalculator *StreamRateCalculator,
metrics *ingesterMetrics,
writeFailures *writefailures.Manager,
configs *runtime.TenantConfigs,
) *stream {
hashNoShard, _ := labels.HashWithoutLabels(make([]byte, 0, 1024), ShardLbName)
return &stream{
@@ -126,6 +131,8 @@
writeFailures: writeFailures,
chunkFormat: chunkFormat,
chunkHeadBlockFormat: headBlockFmt,

configs: configs,
}
}

@@ -334,13 +341,23 @@ func (s *stream) storeEntries(ctx context.Context, entries []logproto.Entry, usa

chunk.lastUpdated = time.Now()
if err := chunk.chunk.Append(&entries[i]); err != nil {
invalid = append(invalid, entryWithError{&entries[i], err})
if chunkenc.IsOutOfOrderErr(err) {
s.writeFailures.Log(s.tenant, err)
outOfOrderSamples++
outOfOrderBytes += len(entries[i].Line)
if chunkenc.IsDuplicateEntryErr(err) {
if s.configs.LogDuplicateMetrics(s.tenant) {
s.reportDuplicateMetrics(len(entries[i].Line))
}
if s.configs.LogDuplicateStreamInfo(s.tenant) {
err = chunkenc.ErrDuplicateLogEntry(entries[i].Timestamp, s.labelsString)
s.writeFailures.Log(s.tenant, err)
}
} else {
invalid = append(invalid, entryWithError{&entries[i], err})
if chunkenc.IsOutOfOrderErr(err) {
s.writeFailures.Log(s.tenant, err)
outOfOrderSamples++
outOfOrderBytes += len(entries[i].Line)
}
continue
}
continue
}

s.entryCt++
@@ -380,6 +397,13 @@ func (s *stream) validateEntries(ctx context.Context, entries []logproto.Entry,
// NOTE: it's still possible for duplicates to be appended if a stream is
// deleted from inactivity.
if entries[i].Timestamp.Equal(lastLine.ts) && entries[i].Line == lastLine.content {
if s.configs.LogDuplicateMetrics(s.tenant) {
s.reportDuplicateMetrics(len(entries[i].Line))
}
if s.configs.LogDuplicateStreamInfo(s.tenant) {
err := chunkenc.ErrDuplicateLogEntry(entries[i].Timestamp, s.labelsString)
s.writeFailures.Log(s.tenant, err)
}
continue
}

@@ -456,6 +480,10 @@ func (s *stream) reportMetrics(ctx context.Context, outOfOrderSamples, outOfOrde
}
}

func (s *stream) reportDuplicateMetrics(duplicateLogLineBytes int) {
validation.DuplicateLogEntries.WithLabelValues(validation.DiscardedBytesTotal, s.tenant).Add(float64(duplicateLogLineBytes))
}

func (s *stream) cutChunk(ctx context.Context) *chunkDesc {
if sp := opentracing.SpanFromContext(ctx); sp != nil {
sp.LogKV("event", "stream started to cut chunk")
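
reportDuplicateMetrics feeds validation.DuplicateLogEntries, a Prometheus counter vector that takes two label values (the validation.DiscardedBytesTotal constant plus the tenant) and accumulates duplicate line sizes in bytes. A minimal sketch of that shape with prometheus/client_golang; the metric name, label names, and label values here are assumptions, not the identifiers Loki actually registers:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// Placeholder counter vector with the same shape as validation.DuplicateLogEntries:
// duplicate log line bytes, partitioned by a discard-reason constant and tenant.
var duplicateLogEntries = prometheus.NewCounterVec(prometheus.CounterOpts{
	Namespace: "loki",
	Name:      "duplicate_log_entries_bytes_total", // assumed name for this sketch
	Help:      "Total bytes of duplicate log lines discarded per tenant.",
}, []string{"reason", "tenant"})

func reportDuplicateMetrics(tenant string, duplicateLogLineBytes int) {
	duplicateLogEntries.WithLabelValues("duplicate_log_lines", tenant).
		Add(float64(duplicateLogLineBytes))
}

func main() {
	prometheus.MustRegister(duplicateLogEntries)
	reportDuplicateMetrics("fake", 42) // e.g. one 42-byte duplicate line
	fmt.Println("recorded duplicate bytes for tenant 'fake'")
}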