diff --git a/docs/sources/reference/components/loki/loki.write.md b/docs/sources/reference/components/loki/loki.write.md index ba036cfcdf8..48355ddb8c6 100644 --- a/docs/sources/reference/components/loki/loki.write.md +++ b/docs/sources/reference/components/loki/loki.write.md @@ -203,6 +203,7 @@ The following fields are exported and can be referenced by other components: * `loki_write_sent_entries_total` (counter): Number of log entries sent to the ingester. * `loki_write_request_size_bytes` (histogram): Number of bytes for encoded requests. * `loki_write_request_duration_seconds` (histogram): Duration of sent requests. +* `loki_write_entry_propagation_latency_seconds` (histogram): Time in seconds from entry creation until it's either successfully sent or dropped. ## Examples diff --git a/internal/component/common/loki/client/batch.go b/internal/component/common/loki/client/batch.go index 4bcd63e677d..f6e7053853e 100644 --- a/internal/component/common/loki/client/batch.go +++ b/internal/component/common/loki/client/batch.go @@ -9,6 +9,7 @@ import ( "time" "github.com/grafana/loki/pkg/push" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/grafana/alloy/internal/component/common/loki" @@ -31,6 +32,8 @@ type SentDataMarkerHandler interface { // streams for each tenant are stored in a dedicated batch. type batch struct { streams map[string]*push.Stream + // created stores per-entry creation timestamps in unix micro seconds for latency observation. + created []int64 // createdAt is when the batch was created. createdAt time.Time // maxSize is the maximum batch size in bytes. At least one entry is always @@ -68,9 +71,10 @@ func (b *batch) add(entry loki.Entry, segmentNum int) error { return errBatchSizeReached } - stream.Entries = append(stream.Entries, entry.Entry) b.size += size b.countForSegment(segmentNum) + b.created = append(b.created, entry.Created()) + stream.Entries = append(stream.Entries, entry.Entry) return nil } @@ -87,12 +91,13 @@ func (b *batch) add(entry loki.Entry, segmentNum int) error { return errBatchSizeReached } + b.size += size + b.countForSegment(segmentNum) + b.created = append(b.created, entry.Created()) b.streams[labels] = &push.Stream{ Labels: labels, Entries: []push.Entry{entry.Entry}, } - b.size += size - b.countForSegment(segmentNum) return nil } @@ -127,12 +132,23 @@ func (b *batch) countForSegment(segmentNum int) { b.segmentCounter[segmentNum] = 1 } -// reportAsSentData will report for all segments whose data is part of this batch, the amount of that data as sent to -// the provided SentDataMarkerHandler -func (b *batch) reportAsSentData(h SentDataMarkerHandler) { +// reportAsSentData reports sent data counts per segment and observes per-entry propagation latency. +func (b *batch) reportAsSentData(h SentDataMarkerHandler, obs prometheus.Observer) { for seg, data := range b.segmentCounter { h.UpdateSentData(seg, data) } + + now := time.Now().UnixMicro() + for _, created := range b.created { + // NOTE: Some WAL entries may not have a created timestamp, so we ignore 0. + // We also only record entries where created <= now. Since created is stored as + // Unix microseconds, monotonic time is lost. If wall clock adjustments make + // created appear in the future, we skip that sample. + if created != 0 && created <= now { + // Track entry propagation latency in seconds. + obs.Observe(float64(now-created) / 1e6) + } + } } // labelsMapToString encodes an entry's label set as a string, ignoring internal labels diff --git a/internal/component/common/loki/client/consumer_wal.go b/internal/component/common/loki/client/consumer_wal.go index 438e11bf38a..2347d4befbd 100644 --- a/internal/component/common/loki/client/consumer_wal.go +++ b/internal/component/common/loki/client/consumer_wal.go @@ -205,8 +205,9 @@ func (c *walEndpointAdapter) AppendEntries(entries wal.RefEntries, segment int) ) if ok { - for _, e := range entries.Entries { - err := c.endpoint.enqueue(loki.Entry{Labels: l, Entry: e}, segment) + for i := range entries.Entries { + e := entries.EntryAt(l, i) + err := c.endpoint.enqueue(e, segment) // We can receive errQueueIsFull if we have configured endpoint with BlockOnOverflow. // Here we just skip the entry and try with the next one. if errors.Is(err, errQueueIsFull) { diff --git a/internal/component/common/loki/client/consumer_wal_test.go b/internal/component/common/loki/client/consumer_wal_test.go index 137875779d0..fa6a01cf41b 100644 --- a/internal/component/common/loki/client/consumer_wal_test.go +++ b/internal/component/common/loki/client/consumer_wal_test.go @@ -287,7 +287,8 @@ func TestWALEndpoint(t *testing.T) { }, 0) _ = adapter.AppendEntries(wal.RefEntries{ - Ref: chunks.HeadSeriesRef(mod), + Ref: chunks.HeadSeriesRef(mod), + Created: time.Now().UnixMicro(), Entries: []push.Entry{{ Timestamp: time.Now(), Line: l, @@ -433,7 +434,8 @@ func runWALEndpointBenchCase(b *testing.B, bc testCase, mhFactory func(t *testin }, 0) _ = adapter.AppendEntries(wal.RefEntries{ - Ref: chunks.HeadSeriesRef(seriesId), + Ref: chunks.HeadSeriesRef(seriesId), + Created: time.Now().UnixMicro(), Entries: []push.Entry{{ Timestamp: time.Now(), Line: l, diff --git a/internal/component/common/loki/client/metrics.go b/internal/component/common/loki/client/metrics.go index ea0696cd190..c859e7ecf3b 100644 --- a/internal/component/common/loki/client/metrics.go +++ b/internal/component/common/loki/client/metrics.go @@ -1,6 +1,8 @@ package client import ( + "time" + "github.com/grafana/alloy/internal/util" "github.com/prometheus/client_golang/prometheus" ) @@ -27,6 +29,7 @@ type metrics struct { requestSize *prometheus.HistogramVec requestDuration *prometheus.HistogramVec batchRetries *prometheus.CounterVec + entryLatency *prometheus.HistogramVec countersWithHostTenant []*prometheus.CounterVec countersWithHostTenantReason []*prometheus.CounterVec } @@ -56,6 +59,14 @@ func newMetrics(reg prometheus.Registerer) *metrics { MiB = 1024 * KiB ) + m.entryLatency = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: "loki_write_entry_propagation_latency_seconds", + Help: "Write latency for entries", + Buckets: []float64{0.1, 0.5, 1, 5, 10, 30, 60, 120, 300, 600}, + NativeHistogramBucketFactor: 1.1, + NativeHistogramMaxBucketNumber: 100, + NativeHistogramMinResetDuration: 1 * time.Hour, + }, []string{labelHost, labelTenant}) m.requestSize = prometheus.NewHistogramVec(prometheus.HistogramOpts{ Name: "loki_write_request_size_bytes", Help: "Number of bytes for requests.", @@ -83,6 +94,7 @@ func newMetrics(reg prometheus.Registerer) *metrics { m.droppedBytes = util.MustRegisterOrGet(reg, m.droppedBytes).(*prometheus.CounterVec) m.sentEntries = util.MustRegisterOrGet(reg, m.sentEntries).(*prometheus.CounterVec) m.droppedEntries = util.MustRegisterOrGet(reg, m.droppedEntries).(*prometheus.CounterVec) + m.entryLatency = util.MustRegisterOrGet(reg, m.entryLatency).(*prometheus.HistogramVec) m.requestSize = util.MustRegisterOrGet(reg, m.requestSize).(*prometheus.HistogramVec) m.requestDuration = util.MustRegisterOrGet(reg, m.requestDuration).(*prometheus.HistogramVec) m.batchRetries = util.MustRegisterOrGet(reg, m.batchRetries).(*prometheus.CounterVec) diff --git a/internal/component/common/loki/client/shards.go b/internal/component/common/loki/client/shards.go index b4ee5b6c93b..457c0b35356 100644 --- a/internal/component/common/loki/client/shards.go +++ b/internal/component/common/loki/client/shards.go @@ -401,7 +401,8 @@ func (s *shards) initBatchMetrics(tenantID string) { // sendBatch encodes a batch and sends it to Loki with retry logic. func (s *shards) sendBatch(tenantID string, batch *batch, protoBuf, snappyBuf *[]byte) { - defer batch.reportAsSentData(s.markerHandler) + obs := s.metrics.entryLatency.WithLabelValues(s.cfg.URL.Host, tenantID) + defer batch.reportAsSentData(s.markerHandler, obs) r, entriesCount := batch.request() diff --git a/internal/component/common/loki/entry.go b/internal/component/common/loki/entry.go index 9f996ee752d..3d22f704abc 100644 --- a/internal/component/common/loki/entry.go +++ b/internal/component/common/loki/entry.go @@ -1,21 +1,54 @@ package loki import ( + "time" + "github.com/grafana/loki/pkg/push" "github.com/prometheus/common/model" ) -// Entry is a log entry with labels. +func NewEntry(lset model.LabelSet, e push.Entry) Entry { + return Entry{ + Labels: lset, + Entry: e, + created: time.Now().UnixMicro(), + } +} + +func NewEntryWithCreated(lset model.LabelSet, created time.Time, e push.Entry) Entry { + return Entry{ + Labels: lset, + Entry: e, + created: created.UnixMicro(), + } +} + +func NewEntryWithCreatedUnixMicro(lset model.LabelSet, created int64, e push.Entry) Entry { + return Entry{ + Labels: lset, + Entry: e, + created: created, + } +} + +// Entry is a push.Entry with labels. +// It should be created using either NewEntry or NewEntryWithCreated. type Entry struct { Labels model.LabelSet push.Entry + + // Created is a unix timestamp in micro seconds. + // FIXME(kalleep): Currently we store created for each entry. + // When moving to batching we can store it per batch. + created int64 } // Clone returns a copy of the entry so that it can be safely fanned out. func (e *Entry) Clone() Entry { return Entry{ - Labels: e.Labels.Clone(), - Entry: e.Entry, + Labels: e.Labels.Clone(), + Entry: e.Entry, + created: e.created, } } @@ -30,3 +63,7 @@ func (e *Entry) Size() int { } return size } + +func (e *Entry) Created() int64 { + return e.created +} diff --git a/internal/component/common/loki/wal/encoding.go b/internal/component/common/loki/wal/encoding.go index 89af7d5176c..1ffb8951837 100644 --- a/internal/component/common/loki/wal/encoding.go +++ b/internal/component/common/loki/wal/encoding.go @@ -6,8 +6,11 @@ import ( "time" "github.com/grafana/loki/pkg/push" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/record" + + "github.com/grafana/alloy/internal/component/common/loki" ) // RecordType represents the type of the WAL/Checkpoint record. @@ -26,22 +29,37 @@ const ( WALRecordEntriesV2 // WALRecordEntriesV3 is the type for the WAL record for samples with structured metadata. WALRecordEntriesV3 + // WALRecordEntriesV4 are entries with included created time. + WALRecordEntriesV4 ) -// The current type of Entries that this distribution writes. -// Loki can read in a backwards compatible manner, but will write the newest variant. -const CurrentEntriesRec = WALRecordEntriesV3 +// The current type of Entries that WAL writes. +const CurrentEntriesRec = WALRecordEntriesV4 + +type RefEntries struct { + // Counter is unused. + Counter int64 + // Created is a Unix timestamp in microseconds that represents + // the time the entries were ingested. + Created int64 + // Ref identifies the series these entries belong to. + Ref chunks.HeadSeriesRef + // Entries are log entries belonging to the same series. + Entries []push.Entry +} + +// EntryAt returns the entry at i with the provided label set. +// i must be a valid index into Entries. +func (r RefEntries) EntryAt(lset model.LabelSet, i int) loki.Entry { + return loki.NewEntryWithCreatedUnixMicro(lset, r.Created, r.Entries[i]) +} // Record is a struct combining the series and samples record. type Record struct { - UserID string - Series []record.RefSeries - - // entryIndexMap coordinates the RefEntries index associated with a particular fingerprint. - // This is helpful for constant time lookups during ingestion and is ignored when restoring - // from the WAL. - entryIndexMap map[uint64]int - RefEntries []RefEntries + // UserID is unused. + UserID string + Series []record.RefSeries + RefEntries []RefEntries } func (r *Record) IsEmpty() bool { @@ -55,28 +73,6 @@ func (r *Record) Reset() { } r.RefEntries = r.RefEntries[:0] - r.entryIndexMap = make(map[uint64]int) -} - -func (r *Record) AddEntries(fp uint64, counter int64, entries ...push.Entry) { - if idx, ok := r.entryIndexMap[fp]; ok { - r.RefEntries[idx].Entries = append(r.RefEntries[idx].Entries, entries...) - r.RefEntries[idx].Counter = counter - return - } - - r.entryIndexMap[fp] = len(r.RefEntries) - r.RefEntries = append(r.RefEntries, RefEntries{ - Counter: counter, - Ref: chunks.HeadSeriesRef(fp), - Entries: entries, - }) -} - -type RefEntries struct { - Counter int64 - Ref chunks.HeadSeriesRef - Entries []push.Entry } func (r *Record) EncodeSeries(b []byte) []byte { @@ -117,13 +113,22 @@ outer: if len(ref.Entries) < 1 { continue } - buf.PutBE64(uint64(ref.Ref)) // write fingerprint + + // Write fingerprint. + buf.PutBE64(uint64(ref.Ref)) if version >= WALRecordEntriesV2 { - buf.PutBE64int64(ref.Counter) // write highest counter value + // Write highest counter value. + buf.PutBE64int64(ref.Counter) + } + + if version >= WALRecordEntriesV4 { + // V4 has one created timestamp per RefEntries. + buf.PutBE64int64(ref.Created) } - buf.PutUvarint(len(ref.Entries)) // write number of entries + // Write number of entries. + buf.PutUvarint(len(ref.Entries)) for _, s := range ref.Entries { buf.PutVarint64(s.Timestamp.UnixNano() - first) @@ -131,7 +136,7 @@ outer: buf.PutString(s.Line) if version >= WALRecordEntriesV3 { - // structured metadata + // Write structured metadata. buf.PutUvarint(len(s.StructuredMetadata)) for _, l := range s.StructuredMetadata { buf.PutUvarint(len(l.Name)) @@ -151,6 +156,7 @@ func DecodeEntries(b []byte, version RecordType, rec *Record) error { } dec := decWith(b) + baseTime := dec.Be64int64() for len(dec.B) > 0 && dec.Err() == nil { @@ -162,9 +168,14 @@ func DecodeEntries(b []byte, version RecordType, rec *Record) error { refEntries.Counter = dec.Be64int64() } - nEntries := dec.Uvarint() - refEntries.Entries = make([]push.Entry, 0, nEntries) - rem := nEntries + if version >= WALRecordEntriesV4 { + refEntries.Created = dec.Be64int64() + } + + n := dec.Uvarint() + refEntries.Entries = make([]push.Entry, 0, n) + rem := n + for ; dec.Err() == nil && rem > 0; rem-- { timeOffset := dec.Varint64() lineLength := dec.Uvarint() @@ -196,7 +207,7 @@ func DecodeEntries(b []byte, version RecordType, rec *Record) error { } if dec.Err() != nil { - return fmt.Errorf("entry decode error after %d RefEntries: %w", nEntries-rem, dec.Err()) + return fmt.Errorf("entry decode error after decoding %d entries in current RefEntries: %w", n-rem, dec.Err()) } rec.RefEntries = append(rec.RefEntries, refEntries) @@ -209,6 +220,7 @@ func DecodeEntries(b []byte, version RecordType, rec *Record) error { if len(dec.B) > 0 { return fmt.Errorf("unexpected %d bytes left in entry", len(dec.B)) } + return nil } @@ -226,7 +238,7 @@ func DecodeRecord(b []byte, walRec *Record) (err error) { case WALRecordSeries: userID = decbuf.UvarintStr() rSeries, err = dec.Series(decbuf.B, walRec.Series) - case WALRecordEntriesV1, WALRecordEntriesV2, WALRecordEntriesV3: + case WALRecordEntriesV1, WALRecordEntriesV2, WALRecordEntriesV3, WALRecordEntriesV4: userID = decbuf.UvarintStr() err = DecodeEntries(decbuf.B, t, walRec) default: diff --git a/internal/component/common/loki/wal/encoding_test.go b/internal/component/common/loki/wal/encoding_test.go index ae6ab427baf..6b96ce2058f 100644 --- a/internal/component/common/loki/wal/encoding_test.go +++ b/internal/component/common/loki/wal/encoding_test.go @@ -14,8 +14,7 @@ import ( func Test_Encoding_Series(t *testing.T) { record := &Record{ - entryIndexMap: make(map[uint64]int), - UserID: "123", + UserID: "123", Series: []record.RefSeries{ { Ref: 456, @@ -57,11 +56,11 @@ func Test_Encoding_Entries(t *testing.T) { { desc: "v1", rec: &Record{ - entryIndexMap: make(map[uint64]int), - UserID: "123", + UserID: "123", RefEntries: []RefEntries{ { - Ref: 456, + Ref: 456, + Created: 0, Entries: []push.Entry{ { Timestamp: time.Unix(1000, 0), @@ -82,7 +81,8 @@ func Test_Encoding_Entries(t *testing.T) { }, }, { - Ref: 789, + Ref: 789, + Created: 0, Entries: []push.Entry{ { Timestamp: time.Unix(3000, 0), @@ -109,12 +109,12 @@ func Test_Encoding_Entries(t *testing.T) { { desc: "v2", rec: &Record{ - entryIndexMap: make(map[uint64]int), - UserID: "123", + UserID: "123", RefEntries: []RefEntries{ { Ref: 456, Counter: 1, // v2 uses counter for WAL replay + Created: 0, Entries: []push.Entry{ { Timestamp: time.Unix(1000, 0), @@ -137,6 +137,7 @@ func Test_Encoding_Entries(t *testing.T) { { Ref: 789, Counter: 2, // v2 uses counter for WAL replay + Created: 0, Entries: []push.Entry{ { Timestamp: time.Unix(3000, 0), @@ -163,12 +164,12 @@ func Test_Encoding_Entries(t *testing.T) { { desc: "v3", rec: &Record{ - entryIndexMap: make(map[uint64]int), - UserID: "123", + UserID: "123", RefEntries: []RefEntries{ { Ref: 456, Counter: 1, // v2 uses counter for WAL replay + Created: 0, Entries: []push.Entry{ { Timestamp: time.Unix(1000, 0), @@ -191,6 +192,7 @@ func Test_Encoding_Entries(t *testing.T) { { Ref: 789, Counter: 2, // v2 uses counter for WAL replay + Created: 0, Entries: []push.Entry{ { Timestamp: time.Unix(3000, 0), @@ -214,6 +216,61 @@ func Test_Encoding_Entries(t *testing.T) { }, version: WALRecordEntriesV3, }, + { + desc: "v4", + rec: &Record{ + UserID: "123", + RefEntries: []RefEntries{ + { + Ref: 456, + Counter: 1, // v2 uses counter for WAL replay + Created: time.Now().UnixMicro(), + Entries: []push.Entry{ + { + Timestamp: time.Unix(1000, 0), + Line: "first", + StructuredMetadata: push.LabelsAdapter{ + {Name: "traceID", Value: "123"}, + {Name: "userID", Value: "a"}, + }, + }, + { + Timestamp: time.Unix(2000, 0), + Line: "second", + StructuredMetadata: push.LabelsAdapter{ + {Name: "traceID", Value: "456"}, + {Name: "userID", Value: "b"}, + }, + }, + }, + }, + { + Ref: 789, + Counter: 2, // v2 uses counter for WAL replay + Created: time.Now().UnixMicro(), + Entries: []push.Entry{ + { + Timestamp: time.Unix(3000, 0), + Line: "third", + StructuredMetadata: push.LabelsAdapter{ + {Name: "traceID", Value: "789"}, + {Name: "userID", Value: "c"}, + }, + }, + { + Timestamp: time.Unix(4000, 0), + Line: "fourth", + StructuredMetadata: push.LabelsAdapter{ + {Name: "traceID", Value: "123"}, + {Name: "userID", Value: "d"}, + }, + }, + }, + }, + }, + }, + version: WALRecordEntriesV4, + }, } { t.Run(tc.desc, func(t *testing.T) { decoded := recordPool.GetRecord() @@ -221,7 +278,6 @@ func Test_Encoding_Entries(t *testing.T) { err := DecodeRecord(buf, decoded) require.Nil(t, err) - // If the version is less than v3, we need to remove the structured metadata. expectedRecords := tc.rec if tc.version < WALRecordEntriesV3 { for i := range expectedRecords.RefEntries { @@ -231,7 +287,27 @@ func Test_Encoding_Entries(t *testing.T) { } } - require.Equal(t, expectedRecords, decoded) + require.Equal(t, expectedRecords.UserID, decoded.UserID) + require.Equal(t, expectedRecords.Series, decoded.Series) + + require.Len(t, decoded.RefEntries, len(expectedRecords.RefEntries)) + + for i := range expectedRecords.RefEntries { + require.Equal(t, expectedRecords.RefEntries[i].Ref, decoded.RefEntries[i].Ref) + require.Equal(t, expectedRecords.RefEntries[i].Counter, decoded.RefEntries[i].Counter) + require.Equal(t, expectedRecords.RefEntries[i].Created, decoded.RefEntries[i].Created) + + for j := range expectedRecords.RefEntries[i].Entries { + require.Equal(t, expectedRecords.RefEntries[i].Entries[j].Line, decoded.RefEntries[i].Entries[j].Line) + require.Equal(t, expectedRecords.RefEntries[i].Entries[j].Timestamp, decoded.RefEntries[i].Entries[j].Timestamp) + // If the version is less than v3 we don't have structured metadata. + if tc.version < WALRecordEntriesV3 { + require.Nil(t, decoded.RefEntries[i].Entries[j].StructuredMetadata) + } else { + require.Equal(t, expectedRecords.RefEntries[i].Entries[j].StructuredMetadata, decoded.RefEntries[i].Entries[j].StructuredMetadata) + } + } + } }) } } @@ -256,8 +332,7 @@ func Benchmark_EncodeEntries(b *testing.B) { entries = append(entries, entry) } record := &Record{ - entryIndexMap: make(map[uint64]int), - UserID: "123", + UserID: "123", RefEntries: []RefEntries{ { Ref: 456, @@ -301,8 +376,7 @@ func Benchmark_DecodeWAL(b *testing.B) { entries = append(entries, entry) } record := &Record{ - entryIndexMap: make(map[uint64]int), - UserID: "123", + UserID: "123", RefEntries: []RefEntries{ { Ref: 456, diff --git a/internal/component/common/loki/wal/watcher.go b/internal/component/common/loki/wal/watcher.go index eae19c8f369..3b5f85e9e10 100644 --- a/internal/component/common/loki/wal/watcher.go +++ b/internal/component/common/loki/wal/watcher.go @@ -331,6 +331,8 @@ func (w *Watcher) decodeAndDispatch(b []byte, segmentNum int) (bool, error) { var readData bool rec := recordPool.GetRecord() + defer func() { recordPool.PutRecord(rec) }() + if err := DecodeRecord(b, rec); err != nil { w.metrics.recordDecodeFails.WithLabelValues(w.id).Inc() return readData, err diff --git a/internal/component/common/loki/wal/writer.go b/internal/component/common/loki/wal/writer.go index 31e367e54e5..0fbfdea1f85 100644 --- a/internal/component/common/loki/wal/writer.go +++ b/internal/component/common/loki/wal/writer.go @@ -261,16 +261,18 @@ func (ew *entryWriter) WriteEntry(entry loki.Entry, wl WAL, _ log.Logger) error var fp uint64 lbs := labels.FromMap(util.ModelLabelSetToMap(entry.Labels)) fp, _ = lbs.HashWithoutLabels(nil, []string(nil)...) + ref := chunks.HeadSeriesRef(fp) // Append the entry to an already existing stream (if any) ew.reusableWALRecord.RefEntries = append(ew.reusableWALRecord.RefEntries, RefEntries{ - Ref: chunks.HeadSeriesRef(fp), + Ref: ref, Entries: []push.Entry{ entry.Entry, }, + Created: entry.Created(), }) ew.reusableWALRecord.Series = append(ew.reusableWALRecord.Series, record.RefSeries{ - Ref: chunks.HeadSeriesRef(fp), + Ref: ref, Labels: lbs, }) diff --git a/internal/component/loki/process/stages/multiline.go b/internal/component/loki/process/stages/multiline.go index 75286a98bde..429694ac838 100644 --- a/internal/component/loki/process/stages/multiline.go +++ b/internal/component/loki/process/stages/multiline.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "regexp" - "slices" "strings" "sync" "time" @@ -13,9 +12,7 @@ import ( "github.com/go-kit/log" "github.com/prometheus/common/model" - "github.com/grafana/alloy/internal/component/common/loki" "github.com/grafana/alloy/internal/runtime/logging/level" - "github.com/grafana/loki/pkg/push" ) // Configuration errors. @@ -80,6 +77,12 @@ type multilineState struct { currentLines uint64 // The number of lines of the current multiline block. } +func (s *multilineState) Reset() { + s.buffer.Reset() + s.currentLines = 0 + s.startLineEntry = Entry{} +} + // newMultilineStage creates a MulitlineStage from config func newMultilineStage(logger log.Logger, config MultilineConfig) (Stage, error) { regex, err := validateMultilineConfig(config) @@ -169,6 +172,7 @@ func (m *multilineStage) runMultiline(in chan Entry, out chan Entry, wg *sync.Wa if state.buffer.Len() > 0 { state.buffer.WriteRune('\n') } + line := e.Line if m.cfg.TrimNewlines { line = strings.TrimRight(line, "\r\n") @@ -188,26 +192,12 @@ func (m *multilineStage) flush(out chan Entry, s *multilineState) { level.Debug(m.logger).Log("msg", "nothing to flush", "buffer_len", s.buffer.Len()) return } - // copy extracted data. - extracted := make(map[string]any, len(s.startLineEntry.Extracted)) - for k, v := range s.startLineEntry.Extracted { - extracted[k] = v - } - collapsed := Entry{ - Extracted: extracted, - Entry: loki.Entry{ - Labels: s.startLineEntry.Entry.Labels.Clone(), - Entry: push.Entry{ - Timestamp: s.startLineEntry.Entry.Entry.Timestamp, - Line: s.buffer.String(), - StructuredMetadata: slices.Clone(s.startLineEntry.Entry.Entry.StructuredMetadata), - }, - }, - } - s.buffer.Reset() - s.currentLines = 0 - out <- collapsed + entry := s.startLineEntry + entry.Line = s.buffer.String() + + s.Reset() + out <- entry } // Cleanup implements Stage. diff --git a/internal/component/loki/source/api/internal/lokipush/push_api_server.go b/internal/component/loki/source/api/internal/lokipush/push_api_server.go index d1c959c5f36..f4a03f4c650 100644 --- a/internal/component/loki/source/api/internal/lokipush/push_api_server.go +++ b/internal/component/loki/source/api/internal/lokipush/push_api_server.go @@ -204,6 +204,7 @@ func (s *PushAPIServer) handleLoki(w http.ResponseWriter, r *http.Request) { entries []loki.Entry lastErr error + created = time.Now() tenantID, _ = tenant.TenantID(r.Context()) ) for _, stream := range req.Streams { @@ -241,14 +242,8 @@ func (s *PushAPIServer) handleLoki(w http.ResponseWriter, r *http.Request) { } for _, entry := range stream.Entries { - e := loki.Entry{ - Labels: filtered.Clone(), - Entry: lokipush.Entry{ - Line: entry.Line, - StructuredMetadata: entry.StructuredMetadata, - Parsed: entry.Parsed, - }, - } + // TODO(kalleep): pretty sure we don't have to clone here. + e := loki.NewEntryWithCreated(filtered.Clone(), created, entry) if keepTimestamp { e.Timestamp = entry.Timestamp } else { @@ -288,7 +283,10 @@ func (s *PushAPIServer) handlePlaintext(w http.ResponseWriter, r *http.Request) body := bufio.NewReader(r.Body) addLabels := s.getLabels() - var entries []loki.Entry + var ( + entries []loki.Entry + created = time.Now() + ) for { line, err := body.ReadString('\n') @@ -305,7 +303,10 @@ func (s *PushAPIServer) handlePlaintext(w http.ResponseWriter, r *http.Request) continue } - entries = append(entries, loki.Entry{Labels: addLabels, Entry: lokipush.Entry{Timestamp: time.Now(), Line: line}}) + entries = append( + entries, + loki.NewEntryWithCreated(addLabels, created, lokipush.Entry{Timestamp: time.Now(), Line: line}), + ) if err == io.EOF { break } diff --git a/internal/component/loki/source/aws_firehose/internal/handler.go b/internal/component/loki/source/aws_firehose/internal/handler.go index 7e6f253f9d2..c6cf4d228b1 100644 --- a/internal/component/loki/source/aws_firehose/internal/handler.go +++ b/internal/component/loki/source/aws_firehose/internal/handler.go @@ -136,9 +136,9 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) { for l, v := range requestStaticLabels { commonLabels.Set(string(l), string(v)) } - h.metrics.batchSize.WithLabelValues().Observe(float64(len(firehoseReq.Records))) + created := time.Now() for _, rec := range firehoseReq.Records { // cleanup err since it might have failed in the previous iteration err = nil @@ -159,15 +159,10 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) { switch recordType { case OriginDirectPUT: - h.sender.Send(req.Context(), loki.Entry{ - Labels: h.postProcessLabels(commonLabels.Labels()), - Entry: push.Entry{ - Timestamp: ts, - Line: string(decodedRecord), - }, - }) + lset := h.postProcessLabels(commonLabels.Labels()) + h.sender.Send(req.Context(), loki.NewEntryWithCreated(lset, created, push.Entry{Timestamp: ts, Line: string(decodedRecord)})) case OriginCloudwatchLogs: - err = h.handleCloudwatchLogsRecord(req.Context(), decodedRecord, commonLabels.Labels(), ts) + err = h.handleCloudwatchLogsRecord(req.Context(), decodedRecord, commonLabels.Labels(), ts, created) } if err != nil { h.metrics.errorsRecord.WithLabelValues(getReason(err)).Inc() @@ -260,7 +255,7 @@ func (h *Handler) decodeRecord(rec string) ([]byte, RecordOrigin, error) { // handleCloudwatchLogsRecord explodes the cloudwatch logs record into each log message. Also, it adds all properties // sent in the envelope as internal labels, available for relabel. -func (h *Handler) handleCloudwatchLogsRecord(ctx context.Context, data []byte, commonLabels labels.Labels, timestamp time.Time) error { +func (h *Handler) handleCloudwatchLogsRecord(ctx context.Context, data []byte, commonLabels labels.Labels, timestamp, created time.Time) error { cwRecord := CloudwatchLogsRecord{} if err := json.Unmarshal(data, &cwRecord); err != nil { return errWithReason{ @@ -280,13 +275,9 @@ func (h *Handler) handleCloudwatchLogsRecord(ctx context.Context, data []byte, c if h.useIncomingTs { timestamp = time.UnixMilli(event.Timestamp) } - h.sender.Send(ctx, loki.Entry{ - Labels: h.postProcessLabels(cwLogsLabels.Labels()), - Entry: push.Entry{ - Timestamp: timestamp, - Line: event.Message, - }, - }) + + lset := h.postProcessLabels(cwLogsLabels.Labels()) + h.sender.Send(ctx, loki.NewEntryWithCreated(lset, created, push.Entry{Timestamp: timestamp, Line: event.Message})) } return nil diff --git a/internal/component/loki/source/azure_event_hubs/internal/parser/parser.go b/internal/component/loki/source/azure_event_hubs/internal/parser/parser.go index 1d91af1aa2c..a858ddb9c3b 100644 --- a/internal/component/loki/source/azure_event_hubs/internal/parser/parser.go +++ b/internal/component/loki/source/azure_event_hubs/internal/parser/parser.go @@ -132,14 +132,11 @@ func (e *AzureEventHubsTargetMessageParser) tryUnmarshal(message []byte) (*azure return data, nil } -func (e *AzureEventHubsTargetMessageParser) entryWithCustomPayload(body []byte, labelSet model.LabelSet, messageTime time.Time) loki.Entry { - return loki.Entry{ - Labels: labelSet, - Entry: push.Entry{ - Timestamp: messageTime, - Line: string(body), - }, - } +func (e *AzureEventHubsTargetMessageParser) entryWithCustomPayload(body []byte, lset model.LabelSet, messageTime time.Time) loki.Entry { + return loki.NewEntry(lset, push.Entry{ + Timestamp: messageTime, + Line: string(body), + }) } // processRecords handles the case when message is a valid json with a key `records`. It can be either a custom payload or a resource log. @@ -158,7 +155,7 @@ func (e *AzureEventHubsTargetMessageParser) processRecords(labelSet model.LabelS // parseRecord parses a single value from the "records" in the original message. // It can also handle a case when the record contains custom data and doesn't match the schema for Azure resource logs. -func (e *AzureEventHubsTargetMessageParser) parseRecord(record []byte, labelSet model.LabelSet, relabelConfig []*relabel.Config, useIncomingTimestamp bool, messageTime time.Time) (loki.Entry, error) { +func (e *AzureEventHubsTargetMessageParser) parseRecord(record []byte, lset model.LabelSet, relabelConfig []*relabel.Config, useIncomingTimestamp bool, messageTime time.Time) (loki.Entry, error) { logRecord := &azureMonitorResourceLog{} err := json.Unmarshal(record, logRecord) if err == nil { @@ -170,19 +167,15 @@ func (e *AzureEventHubsTargetMessageParser) parseRecord(record []byte, labelSet return loki.Entry{}, err } - return e.entryWithCustomPayload(record, labelSet, messageTime), nil + return e.entryWithCustomPayload(record, lset, messageTime), nil } logLabels := e.getLabels(logRecord, relabelConfig) - ts := e.getTime(messageTime, useIncomingTimestamp, logRecord) - - return loki.Entry{ - Labels: labelSet.Merge(logLabels), - Entry: push.Entry{ - Timestamp: ts, - Line: string(record), - }, - }, nil + + return loki.NewEntry(lset.Merge(logLabels), push.Entry{ + Timestamp: e.getTime(messageTime, useIncomingTimestamp, logRecord), + Line: string(record), + }), nil } func (e *AzureEventHubsTargetMessageParser) getTime(messageTime time.Time, useIncomingTimestamp bool, logRecord *azureMonitorResourceLog) time.Time { diff --git a/internal/component/loki/source/cloudflare/tailer.go b/internal/component/loki/source/cloudflare/tailer.go index 6b5ed4cd4e1..8fbbd6e5f1c 100644 --- a/internal/component/loki/source/cloudflare/tailer.go +++ b/internal/component/loki/source/cloudflare/tailer.go @@ -180,13 +180,13 @@ func (t *tailer) pull(ctx context.Context, start, end time.Time) error { if err != nil { ts = time.Now().UnixNano() } - t.handler.Chan() <- loki.Entry{ - Labels: t.config.Labels.Clone(), - Entry: push.Entry{ - Timestamp: time.Unix(0, ts), - Line: string(line), - }, - } + + // TODO(kalleep): pretty sure we don't have to clone here. + t.handler.Chan() <- loki.NewEntry(t.config.Labels.Clone(), push.Entry{ + Timestamp: time.Unix(0, ts), + Line: string(line), + }) + lineRead++ t.metrics.Entries.Inc() } diff --git a/internal/component/loki/source/docker/tailer.go b/internal/component/loki/source/docker/tailer.go index 26950ece6b4..509bd477bec 100644 --- a/internal/component/loki/source/docker/tailer.go +++ b/internal/component/loki/source/docker/tailer.go @@ -323,13 +323,10 @@ func (t *tailer) process(r io.Reader, logStreamLset model.LabelSet) { continue } - t.recv.Chan() <- loki.Entry{ - Labels: logStreamLset, - Entry: push.Entry{ - Timestamp: ts, - Line: string(content), - }, - } + t.recv.Chan() <- loki.NewEntry(logStreamLset, push.Entry{ + Timestamp: ts, + Line: string(content), + }) t.metrics.dockerEntries.Inc() // NOTE(@tpaschalis) We don't save the positions entry with the diff --git a/internal/component/loki/source/file/tailer.go b/internal/component/loki/source/file/tailer.go index 0ff4f515194..b38417f5ca3 100644 --- a/internal/component/loki/source/file/tailer.go +++ b/internal/component/loki/source/file/tailer.go @@ -219,13 +219,10 @@ func (t *tailer) readLines(pos int64, done chan struct{}) { } t.metrics.readLines.WithLabelValues(t.key.Path).Inc() - entries <- loki.Entry{ - Labels: t.labels, - Entry: push.Entry{ - Timestamp: line.Time, - Line: line.Text, - }, - } + entries <- loki.NewEntry(t.labels, push.Entry{ + Timestamp: line.Time, + Line: line.Text, + }) lastOffset = line.Offset if time.Since(lastUpdatedPosition) >= positionInterval { diff --git a/internal/component/loki/source/gcplog/internal/gcplogtarget/formatter.go b/internal/component/loki/source/gcplog/internal/gcplogtarget/formatter.go index 91df957ac4b..64dc5cb2e48 100644 --- a/internal/component/loki/source/gcplog/internal/gcplogtarget/formatter.go +++ b/internal/component/loki/source/gcplog/internal/gcplogtarget/formatter.go @@ -123,11 +123,5 @@ func parseGCPLogsEntry(data []byte, other model.LabelSet, otherInternal labels.L line = ge.TextPayload } - return loki.Entry{ - Labels: lbls, - Entry: push.Entry{ - Timestamp: ts, - Line: line, - }, - }, nil + return loki.NewEntry(lbls, push.Entry{Timestamp: ts, Line: line}), nil } diff --git a/internal/component/loki/source/gelf/internal/target/gelftarget.go b/internal/component/loki/source/gelf/internal/target/gelftarget.go index 64aab7bb6e7..f03717e2f82 100644 --- a/internal/component/loki/source/gelf/internal/target/gelftarget.go +++ b/internal/component/loki/source/gelf/internal/target/gelftarget.go @@ -148,13 +148,11 @@ func (t *Target) handleMessage(msg *gelf.Message) { t.metrics.gelfErrors.Inc() return } - t.handler.Chan() <- loki.Entry{ - Labels: filtered, - Entry: push.Entry{ - Timestamp: timestamp, - Line: t.encodeBuff.String(), - }, - } + + t.handler.Chan() <- loki.NewEntry(filtered, push.Entry{ + Timestamp: timestamp, + Line: t.encodeBuff.String(), + }) } func secondsToUnixTimestamp(seconds float64) time.Time { diff --git a/internal/component/loki/source/heroku/internal/herokutarget/herokutarget.go b/internal/component/loki/source/heroku/internal/herokutarget/herokutarget.go index 925631305ae..ba671dff236 100644 --- a/internal/component/loki/source/heroku/internal/herokutarget/herokutarget.go +++ b/internal/component/loki/source/heroku/internal/herokutarget/herokutarget.go @@ -121,13 +121,10 @@ func (h *HerokuTarget) drain(w http.ResponseWriter, r *http.Request) { filtered[ReservedLabelTenantID] = model.LabelValue(tenantIDHeaderValue) } - entries <- loki.Entry{ - Labels: filtered, - Entry: push.Entry{ - Timestamp: ts, - Line: message.Message, - }, - } + entries <- loki.NewEntry(filtered, push.Entry{ + Timestamp: ts, + Line: message.Message, + }) h.metrics.herokuEntries.Inc() } err := herokuScanner.Err() diff --git a/internal/component/loki/source/internal/kafkatarget/parser.go b/internal/component/loki/source/internal/kafkatarget/parser.go index 304d7b9c049..f0349e646d7 100644 --- a/internal/component/loki/source/internal/kafkatarget/parser.go +++ b/internal/component/loki/source/internal/kafkatarget/parser.go @@ -14,12 +14,9 @@ type KafkaTargetMessageParser struct{} func (p *KafkaTargetMessageParser) Parse(message *sarama.ConsumerMessage, labels model.LabelSet, relabels []*relabel.Config, useIncomingTimestamp bool) ([]loki.Entry, error) { return []loki.Entry{ - { - Labels: labels, - Entry: push.Entry{ - Timestamp: timestamp(useIncomingTimestamp, message.Timestamp), - Line: string(message.Value), - }, - }, + loki.NewEntry(labels, push.Entry{ + Timestamp: timestamp(useIncomingTimestamp, message.Timestamp), + Line: string(message.Value), + }), }, nil } diff --git a/internal/component/loki/source/journal/tailer.go b/internal/component/loki/source/journal/tailer.go index 25905f7ff2f..4db07edf0ea 100644 --- a/internal/component/loki/source/journal/tailer.go +++ b/internal/component/loki/source/journal/tailer.go @@ -339,13 +339,11 @@ func (t *tailer) formatter(entry *sdjournal.JournalEntry) (string, error) { t.metrics.journalLines.Inc() t.positions.PutString(t.positionPath, "", entry.Cursor) - t.recv.Chan() <- loki.Entry{ - Labels: lbls, - Entry: push.Entry{ - Line: msg, - Timestamp: ts, - }, - } + + t.recv.Chan() <- loki.NewEntry(lbls, push.Entry{ + Line: msg, + Timestamp: ts, + }) return journalEmptyStr, nil } diff --git a/internal/component/loki/source/kubernetes/kubetail/tailer.go b/internal/component/loki/source/kubernetes/kubetail/tailer.go index 2121873f11f..be928b9bd18 100644 --- a/internal/component/loki/source/kubernetes/kubetail/tailer.go +++ b/internal/component/loki/source/kubernetes/kubetail/tailer.go @@ -288,13 +288,10 @@ func (t *tailer) processLogStream(ctx context.Context, stream io.ReadCloser, han } lastReadTime = entryTimestamp - entry := loki.Entry{ - Labels: t.lset.Clone(), - Entry: push.Entry{ - Timestamp: entryTimestamp, - Line: entryLine, - }, - } + entry := loki.NewEntry(t.lset.Clone(), push.Entry{ + Timestamp: entryTimestamp, + Line: entryLine, + }) select { case <-ctx.Done(): diff --git a/internal/component/loki/source/kubernetes_events/event_controller.go b/internal/component/loki/source/kubernetes_events/event_controller.go index 83d2d2181b2..65bb5c09947 100644 --- a/internal/component/loki/source/kubernetes_events/event_controller.go +++ b/internal/component/loki/source/kubernetes_events/event_controller.go @@ -211,13 +211,10 @@ func (ctrl *eventController) handleEvent(ctx context.Context, event *corev1.Even return err } - entry := loki.Entry{ - Entry: push.Entry{ - Timestamp: eventTs, - Line: msg, - }, - Labels: lset, - } + entry := loki.NewEntry(lset, push.Entry{ + Timestamp: eventTs, + Line: msg, + }) select { case <-ctx.Done(): diff --git a/internal/component/loki/source/syslog/internal/syslogtarget/syslogtarget.go b/internal/component/loki/source/syslog/internal/syslogtarget/syslogtarget.go index 947d0e00682..b1bea63c47b 100644 --- a/internal/component/loki/source/syslog/internal/syslogtarget/syslogtarget.go +++ b/internal/component/loki/source/syslog/internal/syslogtarget/syslogtarget.go @@ -338,13 +338,10 @@ func (t *SyslogTarget) handleMessage(connLabels labels.Labels, msg syslog.Messag func (t *SyslogTarget) messageSender(entries chan<- loki.Entry) { for msg := range t.messages { - entries <- loki.Entry{ - Labels: msg.labels, - Entry: push.Entry{ - Timestamp: msg.timestamp, - Line: msg.message, - }, - } + entries <- loki.NewEntry(msg.labels, push.Entry{ + Timestamp: msg.timestamp, + Line: msg.message, + }) t.metrics.syslogEntries.Inc() } t.messagesDone <- struct{}{} diff --git a/internal/component/loki/source/syslog/internal/syslogtarget/syslogtarget_test.go b/internal/component/loki/source/syslog/internal/syslogtarget/syslogtarget_test.go index 0060b8405f6..781944f06f8 100644 --- a/internal/component/loki/source/syslog/internal/syslogtarget/syslogtarget_test.go +++ b/internal/component/loki/source/syslog/internal/syslogtarget/syslogtarget_test.go @@ -738,8 +738,9 @@ func TestSyslogTarget_RFC3164CiscoComponents(t *testing.T) { received := handler.Received() require.NotEmpty(t, received) - msg := received[0] - require.Equal(t, tc.expect, msg) + got := received[0] + require.Equal(t, tc.expect.Labels, got.Labels) + require.Equal(t, tc.expect.Entry, got.Entry) }) } } diff --git a/internal/component/loki/source/windowsevent/target.go b/internal/component/loki/source/windowsevent/target.go index 85a96fb44b8..ff57fcdf7a8 100644 --- a/internal/component/loki/source/windowsevent/target.go +++ b/internal/component/loki/source/windowsevent/target.go @@ -12,6 +12,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/grafana/loki/pkg/push" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/relabel" @@ -157,9 +158,7 @@ func (t *Target) renderEntries(events []win_eventlog.Event) []loki.Entry { res := make([]loki.Entry, 0, len(events)) lbs := labels.NewBuilder(labels.EmptyLabels()) for _, event := range events { - entry := loki.Entry{ - Labels: make(model.LabelSet), - } + entry := loki.NewEntry(make(model.LabelSet), push.Entry{}) entry.Timestamp = time.Now() if t.cfg.UseIncomingTimestamp {