Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
bdab34b
loki: Add create to loki entry and New functions
kalleep Feb 27, 2026
9ebc54f
loki.source.api: Create entry using loki.NewEntryWithCreated
kalleep Feb 27, 2026
15394bf
loki.source.firehose: Use loki.NewEntry
kalleep Feb 27, 2026
7fad8fd
loki.source.azure_event_hubs: use loki.NewEntry
kalleep Feb 27, 2026
ce109cd
loki.source.cloudflare: use loki.NewEntry
kalleep Feb 27, 2026
96ba122
loki.source.docker: use loki.NewEntry
kalleep Feb 27, 2026
77ccb13
loki.source.file: Use loki.NewEntry
kalleep Feb 27, 2026
be1eb2e
loki.source.gcplog: Use loki.NewEntry
kalleep Feb 27, 2026
aa17278
loki.source.gelf: use loki.NewEntry
kalleep Feb 27, 2026
793bc7a
loki.source.heroku: Use loki.NewEntry
kalleep Feb 27, 2026
0979bce
loki.source.kafka: Use loki.NewEntry
kalleep Feb 27, 2026
51d8e61
loki.source.journal: Use loki.NewEntry
kalleep Feb 27, 2026
cd10d64
loki.source.kubernetes: Use loki.NewEntry
kalleep Feb 27, 2026
138c5e3
loki.source.kubernetes_events: Use loki.NewEntry
kalleep Feb 27, 2026
16d2894
loki.source.syslog: Use loki.New
kalleep Feb 27, 2026
10b572e
loki.source.windowsevent: Use loki.New
kalleep Feb 27, 2026
b25a879
stage.multiline: remove unnecessary clones and preserve created from
kalleep Feb 27, 2026
d6e9e28
Record entry propagation latency for non wal
kalleep Feb 27, 2026
63ebbd9
Put record back in pool
kalleep Mar 3, 2026
f107613
Remove unused map and method
kalleep Mar 3, 2026
b8dba66
add v4 of wal entries where we also encode / decode created timestamps
kalleep Mar 3, 2026
3c2b0bd
Add comment
kalleep Mar 3, 2026
c3c3a18
Update docs
kalleep Mar 3, 2026
7992171
Update internal/component/common/loki/entry.go
kalleep Mar 3, 2026
76cf40a
Update internal/component/common/loki/client/metrics.go
kalleep Mar 3, 2026
bda3d9c
Update
kalleep Mar 3, 2026
6df8449
Update comment
kalleep Mar 3, 2026
5ce9652
fix metric name, buckets and add native histogram settings
kalleep Mar 4, 2026
f4312dd
Store UnixMicro seconds instead of time.Time
kalleep Mar 4, 2026
19923f4
Add comment
kalleep Mar 4, 2026
147c3ff
Update comment
kalleep Mar 4, 2026
5825ba2
Update docs/sources/reference/components/loki/loki.write.md
kalleep Mar 9, 2026
fdc07a0
Update comments
kalleep Mar 9, 2026
c059136
Update internal/component/common/loki/wal/encoding.go
kalleep Mar 10, 2026
749a85f
Update internal/component/common/loki/wal/encoding.go
kalleep Mar 10, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/sources/reference/components/loki/loki.write.md
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ The following fields are exported and can be referenced by other components:
* `loki_write_sent_entries_total` (counter): Number of log entries sent to the ingester.
* `loki_write_request_size_bytes` (histogram): Number of bytes for encoded requests.
* `loki_write_request_duration_seconds` (histogram): Duration of sent requests.
* `loki_write_entry_propagation_latency_seconds` (histogram): Time in seconds from entry creation until it's either successfully sent or dropped.

## Examples

Expand Down
28 changes: 22 additions & 6 deletions internal/component/common/loki/client/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/grafana/loki/pkg/push"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"

"github.com/grafana/alloy/internal/component/common/loki"
Expand All @@ -31,6 +32,8 @@ type SentDataMarkerHandler interface {
// streams for each tenant are stored in a dedicated batch.
type batch struct {
streams map[string]*push.Stream
// created stores per-entry creation timestamps in unix micro seconds for latency observation.
created []int64
// createdAt is when the batch was created.
createdAt time.Time
Comment thread
thampiotr marked this conversation as resolved.
// maxSize is the maximum batch size in bytes. At least one entry is always
Expand Down Expand Up @@ -68,9 +71,10 @@ func (b *batch) add(entry loki.Entry, segmentNum int) error {
return errBatchSizeReached
}

stream.Entries = append(stream.Entries, entry.Entry)
b.size += size
b.countForSegment(segmentNum)
b.created = append(b.created, entry.Created())
stream.Entries = append(stream.Entries, entry.Entry)
return nil
}

Expand All @@ -87,12 +91,13 @@ func (b *batch) add(entry loki.Entry, segmentNum int) error {
return errBatchSizeReached
}

b.size += size
b.countForSegment(segmentNum)
b.created = append(b.created, entry.Created())
b.streams[labels] = &push.Stream{
Labels: labels,
Entries: []push.Entry{entry.Entry},
}
b.size += size
b.countForSegment(segmentNum)
return nil
}

Expand Down Expand Up @@ -127,12 +132,23 @@ func (b *batch) countForSegment(segmentNum int) {
b.segmentCounter[segmentNum] = 1
}

// reportAsSentData will report for all segments whose data is part of this batch, the amount of that data as sent to
// the provided SentDataMarkerHandler
func (b *batch) reportAsSentData(h SentDataMarkerHandler) {
// reportAsSentData reports sent data counts per segment and observes per-entry propagation latency.
func (b *batch) reportAsSentData(h SentDataMarkerHandler, obs prometheus.Observer) {
for seg, data := range b.segmentCounter {
h.UpdateSentData(seg, data)
}

now := time.Now().UnixMicro()
for _, created := range b.created {
// NOTE: Some WAL entries may not have a created timestamp, so we ignore 0.
// We also only record entries where created <= now. Since created is stored as
// Unix microseconds, monotonic time is lost. If wall clock adjustments make
// created appear in the future, we skip that sample.
if created != 0 && created <= now {
// Track entry propagation latency in seconds.
obs.Observe(float64(now-created) / 1e6)
}
Comment thread
kalleep marked this conversation as resolved.
}
}

// labelsMapToString encodes an entry's label set as a string, ignoring internal labels
Expand Down
5 changes: 3 additions & 2 deletions internal/component/common/loki/client/consumer_wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,9 @@ func (c *walEndpointAdapter) AppendEntries(entries wal.RefEntries, segment int)
)

if ok {
for _, e := range entries.Entries {
err := c.endpoint.enqueue(loki.Entry{Labels: l, Entry: e}, segment)
for i := range entries.Entries {
e := entries.EntryAt(l, i)
err := c.endpoint.enqueue(e, segment)
// We can receive errQueueIsFull if we have configured endpoint with BlockOnOverflow.
// Here we just skip the entry and try with the next one.
if errors.Is(err, errQueueIsFull) {
Expand Down
6 changes: 4 additions & 2 deletions internal/component/common/loki/client/consumer_wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,8 @@ func TestWALEndpoint(t *testing.T) {
}, 0)

_ = adapter.AppendEntries(wal.RefEntries{
Ref: chunks.HeadSeriesRef(mod),
Ref: chunks.HeadSeriesRef(mod),
Created: time.Now().UnixMicro(),
Entries: []push.Entry{{
Timestamp: time.Now(),
Line: l,
Expand Down Expand Up @@ -433,7 +434,8 @@ func runWALEndpointBenchCase(b *testing.B, bc testCase, mhFactory func(t *testin
}, 0)

_ = adapter.AppendEntries(wal.RefEntries{
Ref: chunks.HeadSeriesRef(seriesId),
Ref: chunks.HeadSeriesRef(seriesId),
Created: time.Now().UnixMicro(),
Entries: []push.Entry{{
Timestamp: time.Now(),
Line: l,
Expand Down
12 changes: 12 additions & 0 deletions internal/component/common/loki/client/metrics.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package client

import (
"time"

"github.com/grafana/alloy/internal/util"
"github.com/prometheus/client_golang/prometheus"
)
Expand All @@ -27,6 +29,7 @@ type metrics struct {
requestSize *prometheus.HistogramVec
requestDuration *prometheus.HistogramVec
batchRetries *prometheus.CounterVec
entryLatency *prometheus.HistogramVec
countersWithHostTenant []*prometheus.CounterVec
countersWithHostTenantReason []*prometheus.CounterVec
}
Expand Down Expand Up @@ -56,6 +59,14 @@ func newMetrics(reg prometheus.Registerer) *metrics {
MiB = 1024 * KiB
)

m.entryLatency = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "loki_write_entry_propagation_latency_seconds",
Help: "Write latency for entries",
Buckets: []float64{0.1, 0.5, 1, 5, 10, 30, 60, 120, 300, 600},
NativeHistogramBucketFactor: 1.1,
NativeHistogramMaxBucketNumber: 100,
NativeHistogramMinResetDuration: 1 * time.Hour,
}, []string{labelHost, labelTenant})
m.requestSize = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "loki_write_request_size_bytes",
Help: "Number of bytes for requests.",
Expand Down Expand Up @@ -83,6 +94,7 @@ func newMetrics(reg prometheus.Registerer) *metrics {
m.droppedBytes = util.MustRegisterOrGet(reg, m.droppedBytes).(*prometheus.CounterVec)
m.sentEntries = util.MustRegisterOrGet(reg, m.sentEntries).(*prometheus.CounterVec)
m.droppedEntries = util.MustRegisterOrGet(reg, m.droppedEntries).(*prometheus.CounterVec)
m.entryLatency = util.MustRegisterOrGet(reg, m.entryLatency).(*prometheus.HistogramVec)
m.requestSize = util.MustRegisterOrGet(reg, m.requestSize).(*prometheus.HistogramVec)
m.requestDuration = util.MustRegisterOrGet(reg, m.requestDuration).(*prometheus.HistogramVec)
m.batchRetries = util.MustRegisterOrGet(reg, m.batchRetries).(*prometheus.CounterVec)
Expand Down
3 changes: 2 additions & 1 deletion internal/component/common/loki/client/shards.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,8 @@ func (s *shards) initBatchMetrics(tenantID string) {

// sendBatch encodes a batch and sends it to Loki with retry logic.
func (s *shards) sendBatch(tenantID string, batch *batch, protoBuf, snappyBuf *[]byte) {
defer batch.reportAsSentData(s.markerHandler)
obs := s.metrics.entryLatency.WithLabelValues(s.cfg.URL.Host, tenantID)
defer batch.reportAsSentData(s.markerHandler, obs)

r, entriesCount := batch.request()

Expand Down
43 changes: 40 additions & 3 deletions internal/component/common/loki/entry.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,54 @@
package loki

import (
"time"

"github.com/grafana/loki/pkg/push"
"github.com/prometheus/common/model"
)

// NewEntry returns an Entry with the given labels and push entry, stamping
// the creation time (used for propagation-latency observation) with the
// current wall clock in Unix microseconds.
func NewEntry(lset model.LabelSet, e push.Entry) Entry {
	return Entry{
		Labels:  lset,
		Entry:   e,
		created: time.Now().UnixMicro(),
	}
}

// NewEntryWithCreated returns an Entry with the given labels and push entry,
// using the provided time.Time (converted to Unix microseconds) as the
// creation timestamp instead of the current time.
func NewEntryWithCreated(lset model.LabelSet, created time.Time, e push.Entry) Entry {
return Entry{
Labels: lset,
Entry: e,
created: created.UnixMicro(),
}
}

// NewEntryWithCreatedUnixMicro returns an Entry with the given labels and
// push entry, using the provided creation timestamp, already expressed in
// Unix microseconds (e.g. a value decoded from the WAL).
func NewEntryWithCreatedUnixMicro(lset model.LabelSet, created int64, e push.Entry) Entry {
return Entry{
Labels: lset,
Entry: e,
created: created,
}
}

// Entry is a push.Entry with labels.
// It should be created using one of the New* constructors (NewEntry,
// NewEntryWithCreated, NewEntryWithCreatedUnixMicro) so that the created
// timestamp is populated.
type Entry struct {
Labels model.LabelSet
push.Entry

// created is the entry creation time as a Unix timestamp in microseconds;
// 0 means unknown. It is read back via Created() for latency observation.
// FIXME(kalleep): Currently we store created for each entry.
// When moving to batching we can store it per batch.
created int64
}

// Clone returns a copy of the entry so that it can be safely fanned out.
// The label set is deep-copied; the embedded push.Entry and the created
// timestamp are copied by value.
func (e *Entry) Clone() Entry {
	return Entry{
		Labels:  e.Labels.Clone(),
		Entry:   e.Entry,
		created: e.created,
	}
}

Expand All @@ -30,3 +63,7 @@ func (e *Entry) Size() int {
}
return size
}

// Created returns the entry creation time as a Unix timestamp in
// microseconds. A value of 0 means the creation time is unknown
// (e.g. some WAL entries carry no created timestamp).
func (e *Entry) Created() int64 {
return e.created
}
Loading
Loading