Skip to content

Commit

Permalink
chore: remove the metric aggregation experiment
Browse files Browse the repository at this point in the history
* this will make way for a new, simpler approach
  • Loading branch information
trevorwhitney committed Jul 31, 2024
1 parent dad6fb5 commit d51cecc
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 149 deletions.
2 changes: 1 addition & 1 deletion pkg/pattern/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

ring_client "github.com/grafana/dskit/ring/client"

"github.com/grafana/loki/v3/pkg/pattern/iter"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/pattern/clientpool"
"github.com/grafana/loki/v3/pkg/pattern/drain"
Expand Down Expand Up @@ -288,7 +289,6 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { /
i.logger,
i.metrics,
i.drainCfg,
i.cfg.MetricAggregation,
)
if err != nil {
return nil, err
Expand Down
64 changes: 11 additions & 53 deletions pkg/pattern/ingester_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (

"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/pattern/drain"
"github.com/grafana/loki/v3/pkg/pattern/iter"
)

Expand Down Expand Up @@ -73,46 +72,20 @@ func (q *IngesterQuerier) Patterns(ctx context.Context, req *logproto.QueryPatte
return prunePatterns(resp, minClusterSize, q.ingesterQuerierMetrics), nil
}

func prunePatterns(resp *logproto.QueryPatternsResponse, minClusterSize int, metrics *ingesterQuerierMetrics) *logproto.QueryPatternsResponse {
pruneConfig := drain.DefaultConfig()
pruneConfig.SimTh = 1.0 // Merge & de-dup patterns but don't modify them

func prunePatterns(resp *logproto.QueryPatternsResponse, minClusterSize int64, metrics *ingesterQuerierMetrics) *logproto.QueryPatternsResponse {
patternsBefore := len(resp.Series)
d := drain.New(pruneConfig, nil)
for _, p := range resp.Series {
d.TrainPattern(p.Pattern, p.Samples)
}
total := make([]int64, len(resp.Series))

resp.Series = resp.Series[:0]
for _, cluster := range d.Clusters() {
if cluster.Size < minClusterSize {
continue
}
pattern := d.PatternString(cluster)
if pattern == "" {
continue
for i, p := range resp.Series {
for _, s := range p.Samples {
total[i] += s.Value
}
resp.Series = append(resp.Series, &logproto.PatternSeries{
Pattern: pattern,
Samples: cluster.Samples(),
})
}
metrics.patternsPrunedTotal.Add(float64(patternsBefore - len(resp.Series)))
metrics.patternsRetainedTotal.Add(float64(len(resp.Series)))
return resp
}

// ForAllIngesters runs f, in parallel, for all ingesters
func (q *IngesterQuerier) forAllIngesters(ctx context.Context, f func(context.Context, logproto.PatternClient) (interface{}, error)) ([]ResponseFromIngesters, error) {
replicationSet, err := q.ringClient.ring.GetReplicationSetForOperation(ring.Read)
if err != nil {
return nil, err
}

return q.forGivenIngesters(ctx, replicationSet, f)
}

// Create a slice of structs to keep Series and total together
type SeriesWithTotal struct {
Series *logproto.PatternSeries
Total int64
}

seriesWithTotals := make([]SeriesWithTotal, len(resp.Series))
Expand Down Expand Up @@ -155,7 +128,7 @@ func (q *IngesterQuerier) forAllIngesters(ctx context.Context, f func(context.Co

// ForAllIngesters runs f, in parallel, for all ingesters
func (q *IngesterQuerier) forAllIngesters(ctx context.Context, f func(context.Context, logproto.PatternClient) (interface{}, error)) ([]ResponseFromIngesters, error) {
replicationSet, err := q.ringClient.ring.GetReplicationSetForOperation(ring.Read)
replicationSet, err := q.ringClient.ring.GetAllHealthy(ring.Read)
if err != nil {
return nil, err
}
Expand All @@ -176,7 +149,7 @@ func (q *IngesterQuerier) forGivenIngesters(ctx context.Context, replicationSet
ingester := ingester
i := i
g.Go(func() error {
client, err := q.ringClient.Pool().GetClientFor(ingester.Addr)
client, err := q.ringClient.pool.GetClientFor(ingester.Addr)
if err != nil {
return err
}
Expand All @@ -189,22 +162,7 @@ func (q *IngesterQuerier) forGivenIngesters(ctx context.Context, replicationSet
return nil
})
}
results, err := ring.DoUntilQuorum(ctx, replicationSet, cfg, func(ctx context.Context, ingester *ring.InstanceDesc) (ResponseFromIngesters, error) {
client, err := q.ringClient.pool.GetClientFor(ingester.Addr)
if err != nil {
return ResponseFromIngesters{addr: ingester.Addr}, err
}

resp, err := f(ctx, client.(logproto.PatternClient))
if err != nil {
return ResponseFromIngesters{addr: ingester.Addr}, err
}

return ResponseFromIngesters{ingester.Addr, resp}, nil
}, func(ResponseFromIngesters) {
// Nothing to do
})
if err != nil {
if err := g.Wait(); err != nil {
return nil, err
}
return responses, nil
Expand Down
166 changes: 74 additions & 92 deletions pkg/pattern/ingester_querier_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package pattern

import (
"bufio"
"os"
"testing"

"github.com/prometheus/client_golang/prometheus"
Expand All @@ -11,97 +9,81 @@ import (
"github.com/grafana/loki/v3/pkg/logproto"
)

func Test_prunePatterns(t *testing.T) {
file, err := os.Open(`testdata/patterns.txt`)
require.NoError(t, err)
defer file.Close()

resp := new(logproto.QueryPatternsResponse)
scanner := bufio.NewScanner(file)
for scanner.Scan() {
resp.Series = append(resp.Series, &logproto.PatternSeries{
Pattern: scanner.Text(),
})
func TestPrunePatterns(t *testing.T) {
metrics := newIngesterQuerierMetrics(prometheus.NewRegistry(), "test")
testCases := []struct {
name string
inputSeries []*logproto.PatternSeries
minClusterSize int64
expectedSeries []*logproto.PatternSeries
expectedPruned int
expectedRetained int
}{
{
name: "No pruning needed",
inputSeries: []*logproto.PatternSeries{
{Pattern: `{app="test1"}`, Samples: []*logproto.PatternSample{{Value: 40}}},
{Pattern: `{app="test2"}`, Samples: []*logproto.PatternSample{{Value: 35}}},
},
minClusterSize: 20,
expectedSeries: []*logproto.PatternSeries{
{Pattern: `{app="test1"}`, Samples: []*logproto.PatternSample{{Value: 40}}},
{Pattern: `{app="test2"}`, Samples: []*logproto.PatternSample{{Value: 35}}},
},
expectedPruned: 0,
expectedRetained: 2,
},
{
name: "Pruning some patterns",
inputSeries: []*logproto.PatternSeries{
{Pattern: `{app="test1"}`, Samples: []*logproto.PatternSample{{Value: 10}}},
{Pattern: `{app="test2"}`, Samples: []*logproto.PatternSample{{Value: 5}}},
{Pattern: `{app="test3"}`, Samples: []*logproto.PatternSample{{Value: 50}}},
},
minClusterSize: 20,
expectedSeries: []*logproto.PatternSeries{
{Pattern: `{app="test3"}`, Samples: []*logproto.PatternSample{{Value: 50}}},
},
expectedPruned: 2,
expectedRetained: 1,
},
{
name: "Limit patterns to maxPatterns",
inputSeries: func() []*logproto.PatternSeries {
series := make([]*logproto.PatternSeries, maxPatterns+10)
for i := 0; i < maxPatterns+10; i++ {
series[i] = &logproto.PatternSeries{
Pattern: `{app="test"}`,
Samples: []*logproto.PatternSample{{Value: int64(maxPatterns + 10 - i)}},
}
}
return series
}(),
minClusterSize: 0,
expectedSeries: func() []*logproto.PatternSeries {
series := make([]*logproto.PatternSeries, maxPatterns)
for i := 0; i < maxPatterns; i++ {
series[i] = &logproto.PatternSeries{
Pattern: `{app="test"}`,
Samples: []*logproto.PatternSample{{Value: int64(maxPatterns + 10 - i)}},
}
}
return series
}(),
expectedPruned: 10,
expectedRetained: maxPatterns,
},
}
require.NoError(t, scanner.Err())

func (f *fakeRingClient) Ring() ring.ReadRing {
return &fakeRing{}
}

type fakeRing struct{}

// InstancesWithTokensCount returns the number of instances in the ring that have tokens.
func (f *fakeRing) InstancesWithTokensCount() int {
panic("not implemented") // TODO: Implement
}

// InstancesInZoneCount returns the number of instances in the ring that are registered in given zone.
func (f *fakeRing) InstancesInZoneCount(_ string) int {
panic("not implemented") // TODO: Implement
}

// InstancesWithTokensInZoneCount returns the number of instances in the ring that are registered in given zone and have tokens.
func (f *fakeRing) InstancesWithTokensInZoneCount(_ string) int {
panic("not implemented") // TODO: Implement
}

// ZonesCount returns the number of zones for which there's at least 1 instance registered in the ring.
func (f *fakeRing) ZonesCount() int {
panic("not implemented") // TODO: Implement
}

func (f *fakeRing) Get(
_ uint32,
_ ring.Operation,
_ []ring.InstanceDesc,
_ []string,
_ []string,
) (ring.ReplicationSet, error) {
panic("not implemented")
}

func (f *fakeRing) GetAllHealthy(_ ring.Operation) (ring.ReplicationSet, error) {
return ring.ReplicationSet{}, nil
}

func (f *fakeRing) GetReplicationSetForOperation(_ ring.Operation) (ring.ReplicationSet, error) {
return ring.ReplicationSet{}, nil
}

func (f *fakeRing) ReplicationFactor() int {
panic("not implemented")
}

func (f *fakeRing) InstancesCount() int {
panic("not implemented")
}

func (f *fakeRing) ShuffleShard(_ string, _ int) ring.ReadRing {
panic("not implemented")
}

func (f *fakeRing) GetInstanceState(_ string) (ring.InstanceState, error) {
panic("not implemented")
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
resp := &logproto.QueryPatternsResponse{
Series: tc.inputSeries,
}
result := prunePatterns(resp, tc.minClusterSize, metrics)

func (f *fakeRing) ShuffleShardWithLookback(
_ string,
_ int,
_ time.Duration,
_ time.Time,
) ring.ReadRing {
panic("not implemented")
}

func (f *fakeRing) HasInstance(_ string) bool {
panic("not implemented")
}

func (f *fakeRing) CleanupShuffleShardCache(_ string) {
panic("not implemented")
}

require.Equal(t, expectedPatterns, patterns)
require.Less(t, len(patterns), startingPatterns, "prunePatterns should remove duplicates")
require.Equal(t, len(tc.expectedSeries), len(result.Series))
require.Equal(t, tc.expectedSeries, result.Series)
})
}
}
8 changes: 7 additions & 1 deletion pkg/pattern/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,17 @@ import (
"github.com/grafana/loki/v3/pkg/pattern/iter"

"github.com/grafana/loki/pkg/push"
"github.com/grafana/loki/v3/pkg/pattern/drain"
)

func TestInstancePushQuery(t *testing.T) {
lbs := labels.New(labels.Label{Name: "test", Value: "test"})
inst, err := newInstance("foo", log.NewNopLogger(), newIngesterMetrics(nil, "test"))
inst, err := newInstance(
"foo",
log.NewNopLogger(),
newIngesterMetrics(nil, "test"),
drain.DefaultConfig(),
)
require.NoError(t, err)

err = inst.Push(context.Background(), &push.PushRequest{
Expand Down
2 changes: 1 addition & 1 deletion pkg/pattern/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (i *instance) createStream(_ context.Context, pushReqStream logproto.Stream
fp := i.getHashForLabels(labels)
sortedLabels := i.index.Add(logproto.FromLabelsToLabelAdapters(labels), fp)
firstEntryLine := pushReqStream.Entries[0].Line
s, err := newStream(fp, sortedLabels, i.metrics, i.chunkMetrics, i.aggregationCfg, i.logger, drain.DetectLogFormat(firstEntryLine), i.instanceID, i.drainCfg)
s, err := newStream(fp, sortedLabels, i.metrics, i.logger, drain.DetectLogFormat(firstEntryLine), i.instanceID, i.drainCfg)
if err != nil {
return nil, fmt.Errorf("failed to create stream: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/pattern/iter/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func ReadBatch(it Iterator, batchSize int) (*logproto.QueryPatternsResponse, err
Samples: samples,
})
}
return &result, it.Error()
return &result, it.Err()
}

func ReadAll(it Iterator) (*logproto.QueryPatternsResponse, error) {
Expand Down

0 comments on commit d51cecc

Please sign in to comment.