Skip to content

Commit

Permalink
test: add push and downsample test
Browse files Browse the repository at this point in the history
  • Loading branch information
trevorwhitney committed Aug 1, 2024
1 parent 019df76 commit 3dcc50d
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 3 deletions.
154 changes: 154 additions & 0 deletions pkg/pattern/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,21 @@ import (
"time"

"github.com/go-kit/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/grafana/dskit/ring"

"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/pattern/aggregation"
"github.com/grafana/loki/v3/pkg/pattern/iter"

"github.com/grafana/loki/v3/pkg/pattern/drain"

"github.com/grafana/loki/pkg/push"
loghttp_push "github.com/grafana/loki/v3/pkg/loghttp/push"
)

func TestInstancePushQuery(t *testing.T) {
Expand Down Expand Up @@ -96,6 +99,157 @@ func TestInstancePushQuery(t *testing.T) {
require.Equal(t, 2, len(res.Series))
}

func TestInstancePushAggregateMetrics(t *testing.T) {
lbs := labels.New(
labels.Label{Name: "test", Value: "test"},
labels.Label{Name: "service_name", Value: "test_service"},
labels.Label{Name: "level", Value: "info"},
)
lbs2 := labels.New(
labels.Label{Name: "foo", Value: "bar"},
labels.Label{Name: "service_name", Value: "foo_service"},
labels.Label{Name: "level", Value: "error"},
)

setup := func() (*instance, *mockEntryWriter) {
ingesterID := "foo"
replicationSet := ring.ReplicationSet{
Instances: []ring.InstanceDesc{
{Id: ingesterID, Addr: "ingester0"},
{Id: "bar", Addr: "ingester1"},
{Id: "baz", Addr: "ingester2"},
},
}

fakeRing := &fakeRing{}
fakeRing.On("Get", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(replicationSet, nil)

ringClient := &fakeRingClient{
ring: fakeRing,
}

mockWriter := &mockEntryWriter{}
mockWriter.On("WriteEntry", mock.Anything, mock.Anything, mock.Anything)

inst, err := newInstance(
"foo",
log.NewNopLogger(),
newIngesterMetrics(nil, "test"),
drain.DefaultConfig(),
ringClient,
ingesterID,
mockWriter,
)
require.NoError(t, err)

err = inst.Push(context.Background(), &push.PushRequest{
Streams: []push.Stream{
{
Labels: lbs.String(),
Entries: []push.Entry{
{
Timestamp: time.Unix(20, 0),
Line: "ts=1 msg=hello",
},
},
},
{
Labels: lbs2.String(),
Entries: []push.Entry{
{
Timestamp: time.Unix(20, 0),
Line: "ts=1 msg=hello",
},
},
},
},
})
for i := 0; i < 30; i++ {
err = inst.Push(context.Background(), &push.PushRequest{
Streams: []push.Stream{
{
Labels: lbs.String(),
Entries: []push.Entry{
{
Timestamp: time.Unix(20, 0),
Line: "foo bar foo bar",
},
},
},
{
Labels: lbs2.String(),
Entries: []push.Entry{
{
Timestamp: time.Unix(20, 0),
Line: "foo bar foo bar",
},
},
},
},
})
require.NoError(t, err)
}
require.NoError(t, err)

return inst, mockWriter
}

t.Run("accumulates bytes and count on every push", func(t *testing.T) {
inst, _ := setup()

require.Len(t, inst.aggMetricsByStream, 2)

require.Equal(t, uint64(14+(15*30)), inst.aggMetricsByStream[lbs.String()].bytes)
require.Equal(t, uint64(14+(15*30)), inst.aggMetricsByStream[lbs2.String()].bytes)

require.Equal(t, uint64(31), inst.aggMetricsByStream[lbs.String()].count)
require.Equal(t, uint64(31), inst.aggMetricsByStream[lbs2.String()].count)
})

t.Run("downsamples aggregated metrics", func(t *testing.T) {
inst, mockWriter := setup()
now := model.Now()
inst.Downsample(now)

mockWriter.AssertCalled(
t,
"WriteEntry",
now.Time(),
aggregation.AggregatedMetricEntry(
now,
uint64(14+(15*30)),
uint64(31),
"test_service",
lbs,
),
labels.New(
labels.Label{Name: loghttp_push.AggregatedMetricLabel, Value: "test_service"},
labels.Label{Name: "level", Value: "info"},
),
)

mockWriter.AssertCalled(
t,
"WriteEntry",
now.Time(),
aggregation.AggregatedMetricEntry(
now,
uint64(14+(15*30)),
uint64(31),
"foo_service",
lbs2,
),
labels.New(
labels.Label{Name: loghttp_push.AggregatedMetricLabel, Value: "foo_service"},
labels.Label{Name: "level", Value: "error"},
),
)

require.Equal(t, 0, len(inst.aggMetricsByStream))
})
}

type mockEntryWriter struct {
mock.Mock
}
Expand Down
5 changes: 2 additions & 3 deletions pkg/pattern/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,9 +276,6 @@ func (i *instance) Downsample(now model.Time) {
}()

for stream, metrics := range i.aggMetricsByStream {
// TODO(twhitney)
// c.metrics.samples.Inc()

lbls, err := syntax.ParseLabels(stream)
if err != nil {
continue
Expand Down Expand Up @@ -314,5 +311,7 @@ func (i *instance) writeAggregatedMetrics(
aggregation.AggregatedMetricEntry(now, totalBytes, totalCount, service, streamLbls),
newLbls,
)

i.metrics.samples.WithLabelValues(service).Inc()
}
}
7 changes: 7 additions & 0 deletions pkg/pattern/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type ingesterMetrics struct {
patternsDetectedTotal *prometheus.CounterVec
tokensPerLine *prometheus.HistogramVec
statePerLine *prometheus.HistogramVec
samples *prometheus.CounterVec
}

func newIngesterMetrics(r prometheus.Registerer, metricsNamespace string) *ingesterMetrics {
Expand Down Expand Up @@ -47,6 +48,12 @@ func newIngesterMetrics(r prometheus.Registerer, metricsNamespace string) *inges
Help: "The number of items of additional state returned alongside tokens for pattern recognition.",
Buckets: []float64{20, 40, 80, 120, 160, 320, 640, 1280},
}, []string{"tenant", "format"}),
samples: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: "pattern_ingester",
Name: "metric_samples",
Help: "The total number of samples created to write back to Loki.",
}, []string{"service_name"}),
}
}

Expand Down

0 comments on commit 3dcc50d

Please sign in to comment.