Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: collect and serve pre-aggregated bytes and counts #13020

Merged
merged 47 commits into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
dc620e7
feat: collect and serve pre-agg bytes and count
trevorwhitney May 8, 2024
f0d6a92
feat: reject filter queries to /patterns endpoint
trevorwhitney May 23, 2024
68aa188
feat: guard aggregation behavior behind a feature flag
trevorwhitney May 23, 2024
0bfd0ad
Merge branch 'main' into sample-count-and-bytes
trevorwhitney May 23, 2024
b897fc5
fix: ring proxy methods on pattern ring_client
trevorwhitney May 24, 2024
2587657
fix: grouping
trevorwhitney May 24, 2024
6dd77ae
feat: refactor metric samples to be it's own endpoint
trevorwhitney May 31, 2024
eb84303
chore: a bit of cleanup
trevorwhitney May 31, 2024
33ead60
feat: hook up samples endpoint
trevorwhitney May 31, 2024
abb31a8
Merge branch 'main' into sample-count-and-bytes
trevorwhitney May 31, 2024
87f7282
chore: clean up linting
trevorwhitney May 31, 2024
29febb7
chore: make format
trevorwhitney May 31, 2024
cbf9fc0
docs: update docs
trevorwhitney May 31, 2024
6ed195e
fix: nanosecond values in test with non-decimal seconds value
trevorwhitney Jun 3, 2024
7942e57
Merge branch 'main' into sample-count-and-bytes
trevorwhitney Jun 3, 2024
1822b88
fix: formatting
trevorwhitney Jun 3, 2024
35585db
Merge branch 'main' into sample-count-and-bytes
trevorwhitney Jun 3, 2024
81e27a4
Merge branch 'main' into sample-count-and-bytes
trevorwhitney Jun 4, 2024
3a01880
fix: move /explore/query_range case up
trevorwhitney Jun 4, 2024
30c5b90
chore: add metrics and debug logging
trevorwhitney Jun 5, 2024
d94349c
chore: add more debug logging to chunk iteration
trevorwhitney Jun 5, 2024
d3f760b
fix: use pointers for chunks and samples, add chunk locking
trevorwhitney Jun 5, 2024
a2c601e
feat: add sum merge sample iterator
trevorwhitney Jun 11, 2024
e3777bc
Merge branch 'main' into sample-count-and-bytes
trevorwhitney Jun 11, 2024
f1cfd81
test: read metric batch test
trevorwhitney Jun 11, 2024
f42f523
fix: formatting
trevorwhitney Jun 12, 2024
5c0abde
fix: more linting
trevorwhitney Jun 13, 2024
0f7e473
fix: check-mod
trevorwhitney Jun 13, 2024
6cdeeca
Merge branch 'main' into sample-count-and-bytes
trevorwhitney Jun 13, 2024
ac83fcb
fix: mod again
trevorwhitney Jun 13, 2024
e5e23c8
fix: chunk test
trevorwhitney Jun 13, 2024
1664017
Merge branch 'main' into sample-count-and-bytes
trevorwhitney Jun 13, 2024
721e7c1
fix: docs
trevorwhitney Jun 13, 2024
12b3636
fix: logger in sample evaluator tests
trevorwhitney Jun 14, 2024
1133763
chore: more debug logging
trevorwhitney Jun 17, 2024
d59dd4c
use sum merge sample iterator
trevorwhitney Jun 17, 2024
e4b5dd7
more debug logging
trevorwhitney Jun 17, 2024
426d143
more debug logging
trevorwhitney Jun 17, 2024
0c5a436
test: add test coverage around reading batches
trevorwhitney Jun 18, 2024
96cccef
various fixes for building series
cyriltovena Jun 18, 2024
2d13e8a
Removes comments
cyriltovena Jun 18, 2024
74c245d
fix: Prune chunks older than a specified duration and only delete str…
cyriltovena Jun 18, 2024
fefa6b5
test: more test coverage
trevorwhitney Jun 18, 2024
b255b80
Merge branch 'main' into sample-count-and-bytes
trevorwhitney Jun 18, 2024
796d8d9
fix: truncate timestamp to nearest step
trevorwhitney Jun 18, 2024
b2d72a6
fix: tests
trevorwhitney Jun 18, 2024
fa7016b
fix: lint
trevorwhitney Jun 18, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cmd/loki/loki-local-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ auth_enabled: false
server:
http_listen_port: 3100
grpc_listen_port: 9096
log_level: debug

common:
instance_addr: 127.0.0.1
Expand Down Expand Up @@ -33,6 +34,12 @@ schema_config:
prefix: index_
period: 24h

pattern_ingester:
enabled: true
metric_aggregation:
enabled: true
log_push_observations: true

ruler:
alertmanager_url: http://localhost:9093

Expand Down
11 changes: 11 additions & 0 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,17 @@ pattern_ingester:
# CLI flag: -pattern-ingester.flush-check-period
[flush_check_period: <duration> | default = 30s]

# Configures the metric aggregation and storage behavior of the pattern
# ingester.
metric_aggregation:
# Whether the pattern ingester metric aggregation is enabled.
# CLI flag: -pattern-ingester.metric-aggregation.enabled
[enabled: <boolean> | default = false]

# Whether to log push observations.
# CLI flag: -pattern-ingester.metric-aggregation.log-push-observations
[log_push_observations: <boolean> | default = false]

# The index_gateway block configures the Loki index gateway server, responsible
# for serving index queries without the need to constantly interact with the
# object store.
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ require (

require (
github.com/dlclark/regexp2 v1.4.0 // indirect
github.com/go-kit/kit v0.12.0 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/pires/go-proxyproto v0.7.0 // indirect
Expand Down Expand Up @@ -230,7 +231,6 @@ require (
github.com/envoyproxy/go-control-plane v0.12.0 // indirect
github.com/envoyproxy/protoc-gen-validate v1.0.4 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/go-kit/kit v0.12.0 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-openapi/analysis v0.22.2 // indirect
Expand Down
2 changes: 1 addition & 1 deletion pkg/canary/comparator/comparator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ func TestCacheTest(t *testing.T) {

queryResultsDiff = &mockCounter{} // reset counter
mr.countOverTime = 2.3 // value not important
mr.noCacheCountOvertime = 2.30000005 // different than `countOverTime` value but withing tolerance
mr.noCacheCountOvertime = 2.30000005 // different than `countOverTime` value but within tolerance
c.cacheTest(now)
assert.Equal(t, 0, queryResultsDiff.(*mockCounter).count)

Expand Down
30 changes: 18 additions & 12 deletions pkg/iter/sample_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,25 +119,31 @@ func (it *peekingSampleIterator) Error() error {
return it.iter.Error()
}

type sampleIteratorHeap struct {
type SampleIteratorHeap struct {
its []SampleIterator
}

func (h sampleIteratorHeap) Len() int { return len(h.its) }
func (h sampleIteratorHeap) Swap(i, j int) { h.its[i], h.its[j] = h.its[j], h.its[i] }
func (h sampleIteratorHeap) Peek() SampleIterator { return h.its[0] }
func (h *sampleIteratorHeap) Push(x interface{}) {
func NewSampleIteratorHeap(its []SampleIterator) SampleIteratorHeap {
return SampleIteratorHeap{
its: its,
}
}

func (h SampleIteratorHeap) Len() int { return len(h.its) }
func (h SampleIteratorHeap) Swap(i, j int) { h.its[i], h.its[j] = h.its[j], h.its[i] }
func (h SampleIteratorHeap) Peek() SampleIterator { return h.its[0] }
func (h *SampleIteratorHeap) Push(x interface{}) {
h.its = append(h.its, x.(SampleIterator))
}

func (h *sampleIteratorHeap) Pop() interface{} {
func (h *SampleIteratorHeap) Pop() interface{} {
n := len(h.its)
x := h.its[n-1]
h.its = h.its[0 : n-1]
return x
}

func (h sampleIteratorHeap) Less(i, j int) bool {
func (h SampleIteratorHeap) Less(i, j int) bool {
s1, s2 := h.its[i].Sample(), h.its[j].Sample()
if s1.Timestamp == s2.Timestamp {
if h.its[i].StreamHash() == 0 {
Expand All @@ -150,7 +156,7 @@ func (h sampleIteratorHeap) Less(i, j int) bool {

// mergeSampleIterator iterates over a heap of iterators by merging samples.
type mergeSampleIterator struct {
heap *sampleIteratorHeap
heap *SampleIteratorHeap
is []SampleIterator
prefetched bool
stats *stats.Context
Expand All @@ -170,7 +176,7 @@ type mergeSampleIterator struct {
// This means using this iterator with a single iterator will result in the same result as the input iterator.
// If you don't need to deduplicate sample, use `NewSortSampleIterator` instead.
func NewMergeSampleIterator(ctx context.Context, is []SampleIterator) SampleIterator {
h := sampleIteratorHeap{
h := SampleIteratorHeap{
its: make([]SampleIterator, 0, len(is)),
}
return &mergeSampleIterator{
Expand Down Expand Up @@ -350,7 +356,7 @@ func (i *mergeSampleIterator) Close() error {

// sortSampleIterator iterates over a heap of iterators by sorting samples.
type sortSampleIterator struct {
heap *sampleIteratorHeap
heap *SampleIteratorHeap
is []SampleIterator
prefetched bool

Expand All @@ -369,7 +375,7 @@ func NewSortSampleIterator(is []SampleIterator) SampleIterator {
if len(is) == 1 {
return is[0]
}
h := sampleIteratorHeap{
h := SampleIteratorHeap{
its: make([]SampleIterator, 0, len(is)),
}
return &sortSampleIterator{
Expand All @@ -378,7 +384,7 @@ func NewSortSampleIterator(is []SampleIterator) SampleIterator {
}
}

// init initialize the underlaying heap
// init initialize the underlying heap
func (i *sortSampleIterator) init() {
if i.prefetched {
return
Expand Down
35 changes: 35 additions & 0 deletions pkg/loghttp/samples.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package loghttp

import (
"net/http"

"github.com/grafana/loki/v3/pkg/logproto"
)

func ParseSamplesQuery(r *http.Request) (*logproto.QuerySamplesRequest, error) {
req := &logproto.QuerySamplesRequest{}

req.Query = query(r)
start, end, err := bounds(r)
if err != nil {
return nil, err
}
req.Start = start
req.End = end

calculatedStep, err := step(r, start, end)
if err != nil {
return nil, err
}
if calculatedStep <= 0 {
return nil, errZeroOrNegativeStep
}
// For safety, limit the number of returned points per timeseries.
// This is sufficient for 60s resolution for a week or 1h resolution for a year.
if (req.End.Sub(req.Start) / calculatedStep) > 11000 {
return nil, errStepTooSmall
}
req.Step = calculatedStep.Milliseconds()

return req, nil
}
122 changes: 122 additions & 0 deletions pkg/loghttp/samples_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package loghttp

import (
"net/http"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/logproto"
)

func TestParseSamplesQuery(t *testing.T) {
t.Parallel()

tests := []struct {
name string
path string
want *logproto.QuerySamplesRequest
wantErr bool
}{
{
name: "should correctly parse valid params",
path: "/loki/api/v1/patterns?query={}&start=100000000000&end=3600000000000&step=5s",
want: &logproto.QuerySamplesRequest{
Query: "{}",
Start: time.Unix(100, 0),
End: time.Unix(3600, 0),
Step: (5 * time.Second).Milliseconds(),
},
},
{
name: "should default empty step param to sensible step for the range",
path: "/loki/api/v1/patterns?query={}&start=100000000000&end=3600000000000",
want: &logproto.QuerySamplesRequest{
Query: "{}",
Start: time.Unix(100, 0),
End: time.Unix(3600, 0),
Step: (14 * time.Second).Milliseconds(),
},
},
{
name: "should default start to zero for empty start param",
path: "/loki/api/v1/patterns?query={}&end=3600000000000",
want: &logproto.QuerySamplesRequest{
Query: "{}",
Start: time.Unix(0, 0),
End: time.Unix(3600, 0),
Step: (14 * time.Second).Milliseconds(),
},
},
{
name: "should accept step with no units as seconds",
path: "/loki/api/v1/patterns?query={}&start=100000000000&end=3600000000000&step=10",
want: &logproto.QuerySamplesRequest{
Query: "{}",
Start: time.Unix(100, 0),
End: time.Unix(3600, 0),
Step: (10 * time.Second).Milliseconds(),
},
},
{
name: "should accept step as string duration in seconds",
path: "/loki/api/v1/patterns?query={}&start=100000000000&end=3600000000000&step=15s",
want: &logproto.QuerySamplesRequest{
Query: "{}",
Start: time.Unix(100, 0),
End: time.Unix(3600, 0),
Step: (15 * time.Second).Milliseconds(),
},
},
{
name: "should correctly parse long duration for step",
path: "/loki/api/v1/patterns?query={}&start=100000000000&end=3600000000000&step=10h",
want: &logproto.QuerySamplesRequest{
Query: "{}",
Start: time.Unix(100, 0),
End: time.Unix(3600, 0),
Step: (10 * time.Hour).Milliseconds(),
},
},
{
name: "should reject negative step value",
path: "/loki/api/v1/patterns?query={}&start=100000000000&end=3600000000000&step=-5s",
want: nil,
wantErr: true,
},
{
name: "should reject very small step for big range",
path: "/loki/api/v1/patterns?query={}&start=100000000000&end=3600000000000&step=50ms",
want: nil,
wantErr: true,
},
{
name: "should accept very small step for small range",
path: "/loki/api/v1/patterns?query={}&start=100000000000&end=110000000000&step=50ms",
want: &logproto.QuerySamplesRequest{
Query: "{}",
Start: time.Unix(100, 0),
End: time.Unix(110, 0),
Step: (50 * time.Millisecond).Milliseconds(),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
req, err := http.NewRequest(http.MethodGet, tt.path, nil)
require.NoError(t, err)
err = req.ParseForm()
require.NoError(t, err)

got, err := ParseSamplesQuery(req)
if tt.wantErr {
require.Error(t, err)
} else {
require.NoError(t, err)
}
assert.Equalf(t, tt.want, got, "Incorrect response from input path: %s", tt.path)
})
}
}
33 changes: 33 additions & 0 deletions pkg/logproto/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ func FromMetricsToLabelAdapters(metric model.Metric) []LabelAdapter {
return result
}

func FromMetricsToLabels(metric model.Metric) labels.Labels {
return FromLabelAdaptersToLabels(FromMetricsToLabelAdapters(metric))
}

type byLabel []LabelAdapter

func (s byLabel) Len() int { return len(s) }
Expand Down Expand Up @@ -591,3 +595,32 @@ func (m *DetectedLabelsRequest) LogToSpan(sp opentracing.Span) {
}
sp.LogFields(fields...)
}

func (m *QuerySamplesRequest) GetCachingOptions() (res definitions.CachingOptions) { return }

func (m *QuerySamplesRequest) WithStartEnd(start, end time.Time) definitions.Request {
clone := *m
clone.Start = start
clone.End = end
return &clone
}

func (m *QuerySamplesRequest) WithStartEndForCache(start, end time.Time) resultscache.Request {
return m.WithStartEnd(start, end).(resultscache.Request)
}

func (m *QuerySamplesRequest) WithQuery(query string) definitions.Request {
clone := *m
clone.Query = query
return &clone
}

func (m *QuerySamplesRequest) LogToSpan(sp opentracing.Span) {
fields := []otlog.Field{
otlog.String("query", m.GetQuery()),
otlog.String("start", m.Start.String()),
otlog.String("end", m.End.String()),
otlog.String("step", time.Duration(m.Step).String()),
}
sp.LogFields(fields...)
}
Loading
Loading