Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: collect and serve pre-aggregated bytes and counts #13020

Merged
merged 47 commits into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
dc620e7
feat: collect and serve pre-agg bytes and count
trevorwhitney May 8, 2024
f0d6a92
feat: reject filter queries to /patterns endpoint
trevorwhitney May 23, 2024
68aa188
feat: guard aggregation behavior behind a feature flag
trevorwhitney May 23, 2024
0bfd0ad
Merge branch 'main' into sample-count-and-bytes
trevorwhitney May 23, 2024
b897fc5
fix: ring proxy methods on pattern ring_client
trevorwhitney May 24, 2024
2587657
fix: grouping
trevorwhitney May 24, 2024
6dd77ae
feat: refactor metric samples to be it's own endpoint
trevorwhitney May 31, 2024
eb84303
chore: a bit of cleanup
trevorwhitney May 31, 2024
33ead60
feat: hook up samples endpoint
trevorwhitney May 31, 2024
abb31a8
Merge branch 'main' into sample-count-and-bytes
trevorwhitney May 31, 2024
87f7282
chore: clean up linting
trevorwhitney May 31, 2024
29febb7
chore: make format
trevorwhitney May 31, 2024
cbf9fc0
docs: update docs
trevorwhitney May 31, 2024
6ed195e
fix: nanosecond values in test with non-decimal seconds value
trevorwhitney Jun 3, 2024
7942e57
Merge branch 'main' into sample-count-and-bytes
trevorwhitney Jun 3, 2024
1822b88
fix: formatting
trevorwhitney Jun 3, 2024
35585db
Merge branch 'main' into sample-count-and-bytes
trevorwhitney Jun 3, 2024
81e27a4
Merge branch 'main' into sample-count-and-bytes
trevorwhitney Jun 4, 2024
3a01880
fix: move /explore/query_range case up
trevorwhitney Jun 4, 2024
30c5b90
chore: add metrics and debug logging
trevorwhitney Jun 5, 2024
d94349c
chore: add more debug logging to chunk iteration
trevorwhitney Jun 5, 2024
d3f760b
fix: use pointers for chunks and samples, add chunk locking
trevorwhitney Jun 5, 2024
a2c601e
feat: add sum merge sample iterator
trevorwhitney Jun 11, 2024
e3777bc
Merge branch 'main' into sample-count-and-bytes
trevorwhitney Jun 11, 2024
f1cfd81
test: read metric batch test
trevorwhitney Jun 11, 2024
f42f523
fix: formatting
trevorwhitney Jun 12, 2024
5c0abde
fix: more linting
trevorwhitney Jun 13, 2024
0f7e473
fix: check-mod
trevorwhitney Jun 13, 2024
6cdeeca
Merge branch 'main' into sample-count-and-bytes
trevorwhitney Jun 13, 2024
ac83fcb
fix: mod again
trevorwhitney Jun 13, 2024
e5e23c8
fix: chunk test
trevorwhitney Jun 13, 2024
1664017
Merge branch 'main' into sample-count-and-bytes
trevorwhitney Jun 13, 2024
721e7c1
fix: docs
trevorwhitney Jun 13, 2024
12b3636
fix: logger in sample evaluator tests
trevorwhitney Jun 14, 2024
1133763
chore: more debug logging
trevorwhitney Jun 17, 2024
d59dd4c
use sum merge sample iterator
trevorwhitney Jun 17, 2024
e4b5dd7
more debug logging
trevorwhitney Jun 17, 2024
426d143
more debug logging
trevorwhitney Jun 17, 2024
0c5a436
test: add test coverage around reading batches
trevorwhitney Jun 18, 2024
96cccef
various fixes for building series
cyriltovena Jun 18, 2024
2d13e8a
Removes comments
cyriltovena Jun 18, 2024
74c245d
fix: Prune chunks older than a specified duration and only delete str…
cyriltovena Jun 18, 2024
fefa6b5
test: more test coverage
trevorwhitney Jun 18, 2024
b255b80
Merge branch 'main' into sample-count-and-bytes
trevorwhitney Jun 18, 2024
796d8d9
fix: truncate timestamp to nearest step
trevorwhitney Jun 18, 2024
b2d72a6
fix: tests
trevorwhitney Jun 18, 2024
fa7016b
fix: lint
trevorwhitney Jun 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cmd/loki/loki-local-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ schema_config:
prefix: index_
period: 24h

pattern_ingester:
enabled: true
metric_aggregation:
enabled: true

ruler:
alertmanager_url: http://localhost:9093

Expand Down
7 changes: 7 additions & 0 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,13 @@ pattern_ingester:
# CLI flag: -pattern-ingester.flush-check-period
[flush_check_period: <duration> | default = 30s]

# Configures the metric aggregation and storage behavior of the pattern
# ingester.
metric_aggregation:
# Whether the pattern ingester metric aggregation is enabled.
# CLI flag: -pattern-ingester.metric-aggregation.enabled
[enabled: <boolean> | default = false]

# The index_gateway block configures the Loki index gateway server, responsible
# for serving index queries without the need to constantly interact with the
# object store.
Expand Down
35 changes: 35 additions & 0 deletions pkg/loghttp/samples.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package loghttp

import (
"net/http"

"github.com/grafana/loki/v3/pkg/logproto"
)

func ParseSamplesQuery(r *http.Request) (*logproto.QuerySamplesRequest, error) {
req := &logproto.QuerySamplesRequest{}

req.Query = query(r)
start, end, err := bounds(r)
if err != nil {
return nil, err
}
req.Start = start
req.End = end

calculatedStep, err := step(r, start, end)
if err != nil {
return nil, err
}
if calculatedStep <= 0 {
return nil, errZeroOrNegativeStep
}
// For safety, limit the number of returned points per timeseries.
// This is sufficient for 60s resolution for a week or 1h resolution for a year.
if (req.End.Sub(req.Start) / calculatedStep) > 11000 {
return nil, errStepTooSmall
}
req.Step = calculatedStep.Milliseconds()

return req, nil
}
122 changes: 122 additions & 0 deletions pkg/loghttp/samples_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package loghttp

import (
"net/http"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/logproto"
)

func TestParseSamplesQuery(t *testing.T) {
t.Parallel()

tests := []struct {
name string
path string
want *logproto.QuerySamplesRequest
wantErr bool
}{
{
name: "should correctly parse valid params",
path: "/loki/api/v1/patterns?query={}&start=100000000000&end=3600000000000&step=5s",
want: &logproto.QuerySamplesRequest{
Query: "{}",
Start: time.Unix(100, 0),
End: time.Unix(3600, 0),
Step: (5 * time.Second).Milliseconds(),
},
},
{
name: "should default empty step param to sensible step for the range",
path: "/loki/api/v1/patterns?query={}&start=100000000000&end=3600000000000",
want: &logproto.QuerySamplesRequest{
Query: "{}",
Start: time.Unix(100, 0),
End: time.Unix(3600, 0),
Step: (14 * time.Second).Milliseconds(),
},
},
{
name: "should default start to zero for empty start param",
path: "/loki/api/v1/patterns?query={}&end=3600000000000",
want: &logproto.QuerySamplesRequest{
Query: "{}",
Start: time.Unix(0, 0),
End: time.Unix(3600, 0),
Step: (14 * time.Second).Milliseconds(),
},
},
{
name: "should accept step with no units as seconds",
path: "/loki/api/v1/patterns?query={}&start=100000000000&end=3600000000000&step=10",
want: &logproto.QuerySamplesRequest{
Query: "{}",
Start: time.Unix(100, 0),
End: time.Unix(3600, 0),
Step: (10 * time.Second).Milliseconds(),
},
},
{
name: "should accept step as string duration in seconds",
path: "/loki/api/v1/patterns?query={}&start=100000000000&end=3600000000000&step=15s",
want: &logproto.QuerySamplesRequest{
Query: "{}",
Start: time.Unix(100, 0),
End: time.Unix(3600, 0),
Step: (15 * time.Second).Milliseconds(),
},
},
{
name: "should correctly parse long duration for step",
path: "/loki/api/v1/patterns?query={}&start=100000000000&end=3600000000000&step=10h",
want: &logproto.QuerySamplesRequest{
Query: "{}",
Start: time.Unix(100, 0),
End: time.Unix(3600, 0),
Step: (10 * time.Hour).Milliseconds(),
},
},
{
name: "should reject negative step value",
path: "/loki/api/v1/patterns?query={}&start=100000000000&end=3600000000000&step=-5s",
want: nil,
wantErr: true,
},
{
name: "should reject very small step for big range",
path: "/loki/api/v1/patterns?query={}&start=100000000000&end=3600000000000&step=50ms",
want: nil,
wantErr: true,
},
{
name: "should accept very small step for small range",
path: "/loki/api/v1/patterns?query={}&start=100000000000&end=110000000000&step=50ms",
want: &logproto.QuerySamplesRequest{
Query: "{}",
Start: time.Unix(100, 0),
End: time.Unix(110, 0),
Step: (50 * time.Millisecond).Milliseconds(),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
req, err := http.NewRequest(http.MethodGet, tt.path, nil)
require.NoError(t, err)
err = req.ParseForm()
require.NoError(t, err)

got, err := ParseSamplesQuery(req)
if tt.wantErr {
require.Error(t, err)
} else {
require.NoError(t, err)
}
assert.Equalf(t, tt.want, got, "Incorrect response from input path: %s", tt.path)
})
}
}
33 changes: 33 additions & 0 deletions pkg/logproto/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ func FromMetricsToLabelAdapters(metric model.Metric) []LabelAdapter {
return result
}

func FromMetricsToLabels(metric model.Metric) labels.Labels {
return FromLabelAdaptersToLabels(FromMetricsToLabelAdapters(metric))
}

type byLabel []LabelAdapter

func (s byLabel) Len() int { return len(s) }
Expand Down Expand Up @@ -534,3 +538,32 @@ func (m *QueryPatternsRequest) LogToSpan(sp opentracing.Span) {
}
sp.LogFields(fields...)
}

func (m *QuerySamplesRequest) GetCachingOptions() (res definitions.CachingOptions) { return }

func (m *QuerySamplesRequest) WithStartEnd(start, end time.Time) definitions.Request {
clone := *m
clone.Start = start
clone.End = end
return &clone
}

func (m *QuerySamplesRequest) WithQuery(query string) definitions.Request {
clone := *m
clone.Query = query
return &clone
}

func (m *QuerySamplesRequest) WithStartEndForCache(start, end time.Time) resultscache.Request {
return m.WithStartEnd(start, end).(resultscache.Request)
}

func (m *QuerySamplesRequest) LogToSpan(sp opentracing.Span) {
fields := []otlog.Field{
otlog.String("query", m.GetQuery()),
otlog.String("start", m.Start.String()),
otlog.String("end", m.End.String()),
otlog.String("step", time.Duration(m.Step).String()),
}
sp.LogFields(fields...)
}
46 changes: 46 additions & 0 deletions pkg/logproto/extensions.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package logproto

import (
"encoding/json"
"sort"
"strings"
"sync/atomic" //lint:ignore faillint we can't use go.uber.org/atomic with a protobuf struct without wrapping it.

"github.com/buger/jsonparser"
"github.com/cespare/xxhash/v2"
"github.com/dustin/go-humanize"
jsoniter "github.com/json-iterator/go"
Expand Down Expand Up @@ -188,3 +190,47 @@ func (m *ShardsResponse) Merge(other *ShardsResponse) {
m.ChunkGroups = append(m.ChunkGroups, other.ChunkGroups...)
m.Statistics.Merge(other.Statistics)
}

func NewPatternSeries(pattern string, samples []*PatternSample) *PatternSeries {
return &PatternSeries{Pattern: pattern, Samples: samples}
}

// UnmarshalJSON implements the json.Unmarshaler interface.
// QuerySamplesResponse json representation is different from the proto
func (r *QuerySamplesResponse) UnmarshalJSON(data []byte) error {
return jsonparser.ObjectEach(
data,
func(key, value []byte, dataType jsonparser.ValueType, offset int) error {
if string(key) == "data" {
var m []model.SampleStream
if err := json.Unmarshal(value, &m); err != nil {
return err
}
series := make([]Series, len(m))

for i, s := range m {
lbls := FromMetricsToLabels(s.Metric)

newSeries := Series{
Labels: s.Metric.String(),
StreamHash: lbls.Hash(),
Samples: make([]Sample, len(s.Values)),
}

for j, samplePair := range s.Values {
newSeries.Samples[j] = Sample{
Timestamp: samplePair.Timestamp.UnixNano(),
Value: float64(samplePair.Value),
}
}

series[i] = newSeries
}

r.Series = series
}

return nil
},
)
}
91 changes: 91 additions & 0 deletions pkg/logproto/extensions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ package logproto
import (
"testing"

"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/logql/syntax"
)

func TestShard_SpaceFor(t *testing.T) {
Expand Down Expand Up @@ -40,3 +43,91 @@ func TestShard_SpaceFor(t *testing.T) {
})
}
}

func TestQueryPatternsResponse_UnmarshalJSON(t *testing.T) {
mockData := []byte(`{
"status": "success",
"data": [
{
"pattern": "foo <*> bar",
"samples": [[1609459200, 10], [1609545600, 15]]
},
{
"pattern": "foo <*> buzz",
"samples": [[1609459200, 20], [1609545600, 25]]
}
]
}`)

expectedSeries := []*PatternSeries{
NewPatternSeries("foo <*> bar", []*PatternSample{
{Timestamp: model.TimeFromUnix(1609459200), Value: 10},
{Timestamp: model.TimeFromUnix(1609545600), Value: 15},
}),
NewPatternSeries("foo <*> buzz", []*PatternSample{
{Timestamp: model.TimeFromUnix(1609459200), Value: 20},
{Timestamp: model.TimeFromUnix(1609545600), Value: 25},
}),
}

r := &QueryPatternsResponse{}
err := r.UnmarshalJSON(mockData)

require.Nil(t, err)
require.Equal(t, expectedSeries, r.Series)
}

func TestQuerySamplesResponse_UnmarshalJSON(t *testing.T) {
mockData := []byte(`{
"status": "success",
"data": [{
"metric": {
"foo": "bar"
},
"values": [
[0.001, "1"],
[0.002, "2"]
]
},
{
"metric": {
"foo": "baz",
"bar": "qux"
},
"values": [
[0.003, "3"],
[0.004, "4"]
]
}]
}`)

lbls1, err := syntax.ParseLabels(`{foo="bar"}`)
require.NoError(t, err)
lbls2, err := syntax.ParseLabels(`{bar="qux", foo="baz"}`)
require.NoError(t, err)

expectedSamples := []Series{
{
Labels: lbls1.String(),
Samples: []Sample{
{Timestamp: 1e6, Value: 1}, // 1ms after epoch in ns
{Timestamp: 2e6, Value: 2}, // 2ms after epoch in ns
},
StreamHash: lbls1.Hash(),
},
{
Labels: lbls2.String(),
Samples: []Sample{
{Timestamp: 3e6, Value: 3}, // 3ms after epoch in ns
{Timestamp: 4e6, Value: 4}, // 4ms after epoch in ns
},
StreamHash: lbls2.Hash(),
},
}

r := &QuerySamplesResponse{}
err = r.UnmarshalJSON(mockData)

require.Nil(t, err)
require.Equal(t, expectedSamples, r.Series)
}
Loading
Loading