Skip to content

Commit

Permalink
fix: backport #13485 to k210 (#13492)
Browse files Browse the repository at this point in the history
  • Loading branch information
cstyan authored Jul 11, 2024
1 parent fb0c90a commit 716a7af
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 33 deletions.
3 changes: 1 addition & 2 deletions pkg/logql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,11 +378,10 @@ func (q *query) evalSample(ctx context.Context, expr syntax.SampleExpr) (promql_
return nil, fmt.Errorf("unsupported result type: %T", r)
}
}
return nil, nil
return nil, errors.New("unexpected empty result")
}

func (q *query) JoinSampleVector(next bool, r StepResult, stepEvaluator StepEvaluator, maxSeries int) (promql_parser.Value, error) {

seriesIndex := map[uint64]*promql.Series{}

vec := promql.Vector{}
Expand Down
47 changes: 38 additions & 9 deletions pkg/logql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,17 @@ func TestEngine_LogsRateUnwrap(t *testing.T) {
{newSeries(testSize, offset(46, constantValue(1)), `{app="foo"}`)},
},
[]SelectSampleParams{
{&logproto.SampleQueryRequest{
Start: time.Unix(30, 0),
End: time.Unix(60, 0),
Selector: `rate({app="foo"} | unwrap foo[30s])`,
Plan: &plan.QueryPlan{
AST: syntax.MustParseExpr(`rate({app="foo"} | unwrap foo[30s])`),
{
&logproto.SampleQueryRequest{
Start: time.Unix(30, 0),
End: time.Unix(60, 0),
Selector: `rate({app="foo"} | unwrap foo[30s])`,
Plan: &plan.QueryPlan{
AST: syntax.MustParseExpr(`rate({app="foo"} | unwrap foo[30s])`),
},
},
},
}},
},
// there are 15 samples (from 47 to 61) matched from the generated series
// SUM(n=47, 61, 1) = 15
// 15 / 30 = 0.5
Expand Down Expand Up @@ -1611,10 +1613,12 @@ func TestEngine_RangeQuery(t *testing.T) {
promql.Series{
// vector result
Metric: labels.Labels(nil),
Floats: []promql.FPoint{{T: 60000, F: 0}, {T: 80000, F: 0}, {T: 100000, F: 0}, {T: 120000, F: 0}, {T: 140000, F: 0}, {T: 160000, F: 0}, {T: 180000, F: 0}}},
Floats: []promql.FPoint{{T: 60000, F: 0}, {T: 80000, F: 0}, {T: 100000, F: 0}, {T: 120000, F: 0}, {T: 140000, F: 0}, {T: 160000, F: 0}, {T: 180000, F: 0}},
},
promql.Series{
Metric: labels.FromStrings("app", "foo"),
Floats: []promql.FPoint{{T: 60000, F: 0.03333333333333333}, {T: 80000, F: 0.06666666666666667}, {T: 100000, F: 0.06666666666666667}, {T: 120000, F: 0.03333333333333333}, {T: 180000, F: 0.03333333333333333}}},
Floats: []promql.FPoint{{T: 60000, F: 0.03333333333333333}, {T: 80000, F: 0.06666666666666667}, {T: 100000, F: 0.06666666666666667}, {T: 120000, F: 0.03333333333333333}, {T: 180000, F: 0.03333333333333333}},
},
},
},
{
Expand Down Expand Up @@ -2656,6 +2660,31 @@ func TestHashingStability(t *testing.T) {
}
}

func TestUnexpectedEmptyResults(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "fake")

mock := &mockEvaluatorFactory{SampleEvaluatorFunc(func(context.Context, SampleEvaluatorFactory, syntax.SampleExpr, Params) (StepEvaluator, error) {
return EmptyEvaluator[SampleVector]{value: nil}, nil
})}

eng := NewEngine(EngineOpts{}, nil, NoLimits, log.NewNopLogger())
params, err := NewLiteralParams(`first_over_time({a=~".+"} | logfmt | unwrap value [1s])`, time.Now(), time.Now(), 0, 0, logproto.BACKWARD, 0, nil, nil)
require.NoError(t, err)
q := eng.Query(params).(*query)
q.evaluator = mock

_, err = q.Exec(ctx)
require.Error(t, err)
}

type mockEvaluatorFactory struct {
SampleEvaluatorFactory
}

func (*mockEvaluatorFactory) NewIterator(context.Context, syntax.LogSelectorExpr, Params) (iter.EntryIterator, error) {
return nil, errors.New("unimplemented mock EntryEvaluatorFactory")
}

func getLocalQuerier(size int64) Querier {
return &querierRecorder{
series: map[string][]logproto.Series{
Expand Down
27 changes: 5 additions & 22 deletions pkg/logql/first_last_over_time.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ import (
// of a windowed aggregation.
func newFirstWithTimestampIterator(
it iter.PeekingSampleIterator,
selRange, step, start, end, offset int64) RangeVectorIterator {
selRange, step, start, end, offset int64,
) RangeVectorIterator {
inner := &batchRangeVectorIterator{
iter: it,
step: step,
Expand Down Expand Up @@ -67,7 +68,8 @@ func (r *firstWithTimestampBatchRangeVectorIterator) agg(samples []promql.FPoint

func newLastWithTimestampIterator(
it iter.PeekingSampleIterator,
selRange, step, start, end, offset int64) RangeVectorIterator {
selRange, step, start, end, offset int64,
) RangeVectorIterator {
inner := &batchRangeVectorIterator{
iter: it,
step: step,
Expand Down Expand Up @@ -129,10 +131,7 @@ type mergeOverTimeStepEvaluator struct {

// Next returns the first or last element within one step of each matrix.
func (e *mergeOverTimeStepEvaluator) Next() (bool, int64, StepResult) {

var (
vec promql.Vector
)
var vec promql.Vector

e.ts = e.ts.Add(e.step)
if e.ts.After(e.end) {
Expand All @@ -158,10 +157,6 @@ func (e *mergeOverTimeStepEvaluator) Next() (bool, int64, StepResult) {
vec[i].T = ts
}

if len(vec) == 0 {
return e.hasNext(), ts, SampleVector(vec)
}

return true, ts, SampleVector(vec)
}

Expand All @@ -179,18 +174,6 @@ func (e *mergeOverTimeStepEvaluator) inRange(t, ts int64) bool {
return (ts-e.step.Milliseconds()) <= t && t < ts
}

func (e *mergeOverTimeStepEvaluator) hasNext() bool {
for _, m := range e.matrices {
for _, s := range m {
if len(s.Floats) != 0 {
return true
}
}
}

return false
}

func (*mergeOverTimeStepEvaluator) Close() error { return nil }

func (*mergeOverTimeStepEvaluator) Error() error { return nil }
Expand Down

0 comments on commit 716a7af

Please sign in to comment.