Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## master / unreleased

* [ENHANCEMENT] Query-tee: added a small tolerance to floating point sample values comparison. #2994

## 1.3.0 in progress

Expand Down
8 changes: 5 additions & 3 deletions cmd/query-tee/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func main() {
}

// Run the proxy.
proxy, err := querytee.NewProxy(cfg.ProxyConfig, util.Logger, cortexReadRoutes(cfg.PathPrefix), registry)
proxy, err := querytee.NewProxy(cfg.ProxyConfig, util.Logger, cortexReadRoutes(cfg), registry)
if err != nil {
level.Error(util.Logger).Log("msg", "Unable to initialize the proxy", "err", err.Error())
os.Exit(1)
Expand All @@ -58,13 +58,15 @@ func main() {
proxy.Await()
}

func cortexReadRoutes(prefix string) []querytee.Route {
func cortexReadRoutes(cfg Config) []querytee.Route {
prefix := cfg.PathPrefix

// Strip trailing slashes.
for len(prefix) > 0 && prefix[len(prefix)-1] == '/' {
prefix = prefix[:len(prefix)-1]
}

samplesComparator := querytee.NewSamplesComparator()
samplesComparator := querytee.NewSamplesComparator(cfg.ProxyConfig.ValueComparisonTolerance)
return []querytee.Route{
{Path: prefix + "/api/v1/query", RouteName: "api_v1_query", Methods: []string{"GET"}, ResponseComparator: samplesComparator},
{Path: prefix + "/api/v1/query_range", RouteName: "api_v1_query_range", Methods: []string{"GET"}, ResponseComparator: samplesComparator},
Expand Down
4 changes: 2 additions & 2 deletions cmd/query-tee/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ import (
)

func TestCortexReadRoutes(t *testing.T) {
routes := cortexReadRoutes("")
routes := cortexReadRoutes(Config{PathPrefix: ""})
for _, r := range routes {
assert.True(t, strings.HasPrefix(r.Path, "/api/v1/"))
}

routes = cortexReadRoutes("/some/random/prefix///")
routes = cortexReadRoutes(Config{PathPrefix: "/some/random/prefix///"})
for _, r := range routes {
assert.True(t, strings.HasPrefix(r.Path, "/some/random/prefix/api/v1/"))
}
Expand Down
2 changes: 2 additions & 0 deletions docs/operations/query-tee.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ _Note: from the `query-tee` perspective, a backend request is considered success

When the comparison is enabled, the `query-tee` compares the response received from the two configured backends and logs a message for each query whose results don't match, as well as keeps track of the number of successful and failed comparison through the metric `cortex_querytee_responses_compared_total`.

Floating point sample values are compared with a small tolerance that can be configured via `-proxy.value-comparison-tolerance`. This prevents false positives caused by differences in floating point _rounding_, which are introduced by the non-deterministic series ordering within the Prometheus PromQL engine.

### Slow backends

`query-tee` sends back to the client the first viable response as soon as available, without waiting to receive a response from all backends.
Expand Down
12 changes: 7 additions & 5 deletions tools/querytee/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ var (
)

type ProxyConfig struct {
ServerServicePort int
BackendEndpoints string
PreferredBackend string
BackendReadTimeout time.Duration
CompareResponses bool
ServerServicePort int
BackendEndpoints string
PreferredBackend string
BackendReadTimeout time.Duration
CompareResponses bool
ValueComparisonTolerance float64
}

func (cfg *ProxyConfig) RegisterFlags(f *flag.FlagSet) {
Expand All @@ -37,6 +38,7 @@ func (cfg *ProxyConfig) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.PreferredBackend, "backend.preferred", "", "The hostname of the preferred backend when selecting the response to send back to the client. If no preferred backend is configured then the query-tee will send back to the client the first successful response received without waiting for other backends.")
f.DurationVar(&cfg.BackendReadTimeout, "backend.read-timeout", 90*time.Second, "The timeout when reading the response from a backend.")
f.BoolVar(&cfg.CompareResponses, "proxy.compare-responses", false, "Compare responses between preferred and secondary endpoints for supported routes.")
f.Float64Var(&cfg.ValueComparisonTolerance, "proxy.value-comparison-tolerance", 0.000001, "The tolerance to apply when comparing floating point values in the responses. 0 to disable tolerance and require exact match (not recommended).")
}

type Route struct {
Expand Down
54 changes: 36 additions & 18 deletions tools/querytee/response_comparator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package querytee
import (
"encoding/json"
"fmt"
"math"

"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
Expand All @@ -12,7 +13,7 @@ import (
)

// SamplesComparatorFunc helps with comparing different types of samples coming from /api/v1/query and /api/v1/query_range routes.
type SamplesComparatorFunc func(expected, actual json.RawMessage) error
type SamplesComparatorFunc func(expected, actual json.RawMessage, tolerance float64) error

type SamplesResponse struct {
Status string
Expand All @@ -22,15 +23,19 @@ type SamplesResponse struct {
}
}

func NewSamplesComparator() *SamplesComparator {
return &SamplesComparator{map[string]SamplesComparatorFunc{
"matrix": compareMatrix,
"vector": compareVector,
"scalar": compareScalar,
}}
// NewSamplesComparator builds a SamplesComparator that knows how to compare
// "matrix", "vector" and "scalar" results, forwarding the given tolerance to
// each of the per-type comparison functions.
func NewSamplesComparator(tolerance float64) *SamplesComparator {
	// One comparison function per supported resultType.
	byResultType := make(map[string]SamplesComparatorFunc, 3)
	byResultType["matrix"] = compareMatrix
	byResultType["vector"] = compareVector
	byResultType["scalar"] = compareScalar

	comparator := SamplesComparator{
		tolerance:             tolerance,
		sampleTypesComparator: byResultType,
	}
	return &comparator
}

// SamplesComparator compares two sample responses by dispatching to the
// comparison function registered for the response's resultType.
type SamplesComparator struct {
	// tolerance is handed to the selected comparison function on every Compare call.
	tolerance float64
	// sampleTypesComparator maps a resultType (e.g. "matrix", "vector", "scalar")
	// to the function used to compare results of that type.
	sampleTypesComparator map[string]SamplesComparatorFunc
}

Expand All @@ -44,12 +49,12 @@ func (s *SamplesComparator) Compare(expectedResponse, actualResponse []byte) err

err := json.Unmarshal(expectedResponse, &expected)
if err != nil {
return err
return errors.Wrap(err, "unable to unmarshal expected response")
}

err = json.Unmarshal(actualResponse, &actual)
if err != nil {
return err
return errors.Wrap(err, "unable to unmarshal actual response")
}

if expected.Status != actual.Status {
Expand All @@ -65,10 +70,10 @@ func (s *SamplesComparator) Compare(expectedResponse, actualResponse []byte) err
return fmt.Errorf("resultType %s not registered for comparison", expected.Data.ResultType)
}

return comparator(expected.Data.Result, actual.Data.Result)
return comparator(expected.Data.Result, actual.Data.Result, s.tolerance)
}

func compareMatrix(expectedRaw, actualRaw json.RawMessage) error {
func compareMatrix(expectedRaw, actualRaw json.RawMessage, tolerance float64) error {
var expected, actual model.Matrix

err := json.Unmarshal(expectedRaw, &expected)
Expand Down Expand Up @@ -113,7 +118,7 @@ func compareMatrix(expectedRaw, actualRaw json.RawMessage) error {

for i, expectedSamplePair := range expectedMetric.Values {
actualSamplePair := actualMetric.Values[i]
err := compareSamplePair(expectedSamplePair, actualSamplePair)
err := compareSamplePair(expectedSamplePair, actualSamplePair, tolerance)
if err != nil {
return errors.Wrapf(err, "sample pair not matching for metric %s", expectedMetric.Metric)
}
Expand All @@ -123,7 +128,7 @@ func compareMatrix(expectedRaw, actualRaw json.RawMessage) error {
return nil
}

func compareVector(expectedRaw, actualRaw json.RawMessage) error {
func compareVector(expectedRaw, actualRaw json.RawMessage, tolerance float64) error {
var expected, actual model.Vector

err := json.Unmarshal(expectedRaw, &expected)
Expand Down Expand Up @@ -159,7 +164,7 @@ func compareVector(expectedRaw, actualRaw json.RawMessage) error {
}, model.SamplePair{
Timestamp: actualMetric.Timestamp,
Value: actualMetric.Value,
})
}, tolerance)
if err != nil {
return errors.Wrapf(err, "sample pair not matching for metric %s", expectedMetric.Metric)
}
Expand All @@ -168,7 +173,7 @@ func compareVector(expectedRaw, actualRaw json.RawMessage) error {
return nil
}

func compareScalar(expectedRaw, actualRaw json.RawMessage) error {
func compareScalar(expectedRaw, actualRaw json.RawMessage, tolerance float64) error {
var expected, actual model.Scalar
err := json.Unmarshal(expectedRaw, &expected)
if err != nil {
Expand All @@ -186,16 +191,29 @@ func compareScalar(expectedRaw, actualRaw json.RawMessage) error {
}, model.SamplePair{
Timestamp: actual.Timestamp,
Value: actual.Value,
})
}, tolerance)
}

func compareSamplePair(expected, actual model.SamplePair) error {
func compareSamplePair(expected, actual model.SamplePair, tolerance float64) error {
if expected.Timestamp != actual.Timestamp {
return fmt.Errorf("expected timestamp %v but got %v", expected.Timestamp, actual.Timestamp)
}
if expected.Value != actual.Value {
if !compareSampleValue(expected.Value, actual.Value, tolerance) {
return fmt.Errorf("expected value %s for timestamp %v but got %s", expected.Value, expected.Timestamp, actual.Value)
}

return nil
}

// compareSampleValue reports whether two sample values should be treated as
// equal under the given absolute tolerance.
//
// Two NaN values always match. When tolerance is zero or negative the values
// must be bit-for-bit identical. Otherwise the values match when they are
// equal (which covers infinities of the same sign) or when their absolute
// difference does not exceed tolerance.
func compareSampleValue(first, second model.SampleValue, tolerance float64) bool {
	f, s := float64(first), float64(second)

	if math.IsNaN(f) && math.IsNaN(s) {
		return true
	}
	if tolerance <= 0 {
		return math.Float64bits(f) == math.Float64bits(s)
	}

	// Inf - Inf is NaN, and NaN never satisfies <=, so same-signed infinities
	// must be matched by direct equality before computing the difference.
	if f == s {
		return true
	}

	return math.Abs(f-s) <= tolerance
}
69 changes: 59 additions & 10 deletions tools/querytee/response_comparator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func TestCompareMatrix(t *testing.T) {
},
} {
t.Run(tc.name, func(t *testing.T) {
err := compareMatrix(tc.expected, tc.actual)
err := compareMatrix(tc.expected, tc.actual, 0)
if tc.err == nil {
require.NoError(t, err)
return
Expand Down Expand Up @@ -174,7 +174,7 @@ func TestCompareVector(t *testing.T) {
},
} {
t.Run(tc.name, func(t *testing.T) {
err := compareVector(tc.expected, tc.actual)
err := compareVector(tc.expected, tc.actual, 0)
if tc.err == nil {
require.NoError(t, err)
return
Expand Down Expand Up @@ -211,7 +211,7 @@ func TestCompareScalar(t *testing.T) {
},
} {
t.Run(tc.name, func(t *testing.T) {
err := compareScalar(tc.expected, tc.actual)
err := compareScalar(tc.expected, tc.actual, 0)
if tc.err == nil {
require.NoError(t, err)
return
Expand All @@ -223,13 +223,12 @@ func TestCompareScalar(t *testing.T) {
}

func TestCompareSamplesResponse(t *testing.T) {
samplesComparator := NewSamplesComparator()

for _, tc := range []struct {
name string
expected json.RawMessage
actual json.RawMessage
err error
name string
tolerance float64
expected json.RawMessage
actual json.RawMessage
err error
}{
{
name: "difference in response status",
Expand All @@ -250,7 +249,7 @@ func TestCompareSamplesResponse(t *testing.T) {
}`),
actual: json.RawMessage(`{
"status": "success",
"data": {"resultType":"vector","result":{"metric":{"foo":"bar"},"value":[1,"1"]}}
"data": {"resultType":"vector","result":[{"metric":{"foo":"bar"},"value":[1,"1"]}]}
}`),
err: errors.New("expected resultType scalar but got vector"),
},
Expand All @@ -277,8 +276,58 @@ func TestCompareSamplesResponse(t *testing.T) {
"data": {"resultType":"scalar","result":[1,"1"]}
}`),
},
{
name: "should pass if values are slightly different but within the tolerance",
tolerance: 0.000001,
expected: json.RawMessage(`{
"status": "success",
"data": {"resultType":"vector","result":[{"metric":{"foo":"bar"},"value":[1,"773054.5916666666"]}]}
}`),
actual: json.RawMessage(`{
"status": "success",
"data": {"resultType":"vector","result":[{"metric":{"foo":"bar"},"value":[1,"773054.59166667"]}]}
}`),
},
{
name: "should correctly compare NaN values with tolerance is disabled",
tolerance: 0,
expected: json.RawMessage(`{
"status": "success",
"data": {"resultType":"vector","result":[{"metric":{"foo":"bar"},"value":[1,"NaN"]}]}
}`),
actual: json.RawMessage(`{
"status": "success",
"data": {"resultType":"vector","result":[{"metric":{"foo":"bar"},"value":[1,"NaN"]}]}
}`),
},
{
name: "should correctly compare NaN values with tolerance is enabled",
tolerance: 0.000001,
expected: json.RawMessage(`{
"status": "success",
"data": {"resultType":"vector","result":[{"metric":{"foo":"bar"},"value":[1,"NaN"]}]}
}`),
actual: json.RawMessage(`{
"status": "success",
"data": {"resultType":"vector","result":[{"metric":{"foo":"bar"},"value":[1,"NaN"]}]}
}`),
},
{
name: "should fail if values are significantly different, over the tolerance",
tolerance: 0.000001,
expected: json.RawMessage(`{
"status": "success",
"data": {"resultType":"vector","result":[{"metric":{"foo":"bar"},"value":[1,"773054.5916666666"]}]}
}`),
actual: json.RawMessage(`{
"status": "success",
"data": {"resultType":"vector","result":[{"metric":{"foo":"bar"},"value":[1,"773054.789"]}]}
}`),
err: errors.New(`sample pair not matching for metric {foo="bar"}: expected value 773054.5916666666 for timestamp 1 but got 773054.789`),
},
} {
t.Run(tc.name, func(t *testing.T) {
samplesComparator := NewSamplesComparator(tc.tolerance)
err := samplesComparator.Compare(tc.expected, tc.actual)
if tc.err == nil {
require.NoError(t, err)
Expand Down