Skip to content

health: ensure /v1/health/service/:service endpoint returns the most recent results when a filter is used with streaming #12640

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/12640.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
health: ensure /v1/health/service/:service endpoint returns the most recent results when a filter is used with streaming #12640
```
20 changes: 14 additions & 6 deletions agent/consul/health_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,30 +236,38 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
var thisReply structs.IndexedCheckServiceNodes

index, nodes, err := f(ws, state, args)
if err != nil {
return err
}

reply.Index, reply.Nodes = index, nodes
thisReply.Index, thisReply.Nodes = index, nodes

if len(args.NodeMetaFilters) > 0 {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general it is not safe to mutate reply until just before returning. This is not the first time this kind of bug has manifested.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ugh. This makes me sad. This basically means the entire endpoint is not thread-safe if the response (aka reply) pointer is mutated? It would only apply to those endpoints that handle blocking queries, correct? There are a lot of endpoints that are implemented this way afaict and seems like a trap.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's only one thread/goroutine involved, but it loops around during retry without resetting the reply var, so depending upon how the access goes and how the body of the blocking query function proceeds you can get "carry over" between attempts that you didn't intend.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Example from the last time this kind of thing specifically caused a bug: #10239

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reply.Nodes = nodeMetaFilter(args.NodeMetaFilters, reply.Nodes)
thisReply.Nodes = nodeMetaFilter(args.NodeMetaFilters, thisReply.Nodes)
}

raw, err := filter.Execute(reply.Nodes)
raw, err := filter.Execute(thisReply.Nodes)
if err != nil {
return err
}
reply.Nodes = raw.(structs.CheckServiceNodes)
thisReply.Nodes = raw.(structs.CheckServiceNodes)

// Note: we filter the results with ACLs *after* applying the user-supplied
// bexpr filter, to ensure QueryMeta.ResultsFilteredByACLs does not include
// results that would be filtered out even if the user did have permission.
if err := h.srv.filterACL(args.Token, reply); err != nil {
if err := h.srv.filterACL(args.Token, &thisReply); err != nil {
return err
}

return h.srv.sortNodesByDistanceFrom(args.Source, reply.Nodes)
if err := h.srv.sortNodesByDistanceFrom(args.Source, thisReply.Nodes); err != nil {
return err
}

*reply = thisReply
return nil
})

// Provide some metrics
Expand Down
79 changes: 79 additions & 0 deletions agent/consul/health_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,85 @@ func TestHealth_ServiceNodes(t *testing.T) {
}
}

func TestHealth_ServiceNodes_BlockingQuery_withFilter(t *testing.T) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: we can't test streaming here because streaming happens elsewhere.

if testing.Short() {
t.Skip("too slow for testing.Short")
}

t.Parallel()

_, s1 := testServer(t)
codec := rpcClient(t, s1)

waitForLeaderEstablishment(t, s1)

register := func(t *testing.T, name, tag string) {
arg := structs.RegisterRequest{
Datacenter: "dc1",
ID: types.NodeID("43d419c0-433b-42c3-bf8a-193eba0b41a3"),
Node: "node1",
Address: "127.0.0.1",
Service: &structs.NodeService{
ID: name,
Service: name,
Tags: []string{tag},
},
}
var out struct{}
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out))
}

register(t, "web", "foo")

var lastIndex uint64
runStep(t, "read original", func(t *testing.T) {
var out structs.IndexedCheckServiceNodes
req := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web",
QueryOptions: structs.QueryOptions{
Filter: "foo in Service.Tags",
},
}
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Health.ServiceNodes", &req, &out))

require.Len(t, out.Nodes, 1)
node := out.Nodes[0]
require.Equal(t, "node1", node.Node.Node)
require.Equal(t, "web", node.Service.Service)
require.Equal(t, []string{"foo"}, node.Service.Tags)

require.Equal(t, structs.QueryBackendBlocking, out.Backend)
lastIndex = out.Index
})

runStep(t, "read blocking query result", func(t *testing.T) {
req := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web",
QueryOptions: structs.QueryOptions{
Filter: "foo in Service.Tags",
},
}
req.MinQueryIndex = lastIndex

var out structs.IndexedCheckServiceNodes
errCh := channelCallRPC(s1, "Health.ServiceNodes", &req, &out, nil)

time.Sleep(200 * time.Millisecond)

// Change the tags
register(t, "web", "bar")

if err := <-errCh; err != nil {
require.NoError(t, err)
}

require.Equal(t, structs.QueryBackendBlocking, out.Backend)
require.Len(t, out.Nodes, 0)
})
}

func TestHealth_ServiceNodes_MultipleServiceTags(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
Expand Down
130 changes: 130 additions & 0 deletions agent/health_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/consul/types"
)

func TestHealthChecksInState(t *testing.T) {
Expand Down Expand Up @@ -902,6 +903,135 @@ use_streaming_backend = true
}
}

func TestHealthServiceNodes_Blocking_withFilter(t *testing.T) {
cases := []struct {
name string
hcl string
queryBackend string
}{
{
name: "no streaming",
queryBackend: "blocking-query",
hcl: `use_streaming_backend = false`,
},
{
name: "streaming",
hcl: `
rpc { enable_streaming = true }
use_streaming_backend = true
`,
queryBackend: "streaming",
},
}

runStep := func(t *testing.T, name string, fn func(t *testing.T)) {
t.Helper()
if !t.Run(name, fn) {
t.FailNow()
}
}

register := func(t *testing.T, a *TestAgent, name, tag string) {
args := &structs.RegisterRequest{
Datacenter: "dc1",
ID: types.NodeID("43d419c0-433b-42c3-bf8a-193eba0b41a3"),
Node: "node1",
Address: "127.0.0.1",
Service: &structs.NodeService{
ID: name,
Service: name,
Tags: []string{tag},
},
}

var out struct{}
require.NoError(t, a.RPC("Catalog.Register", args, &out))
}

for _, tc := range cases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
a := NewTestAgent(t, tc.hcl)
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")

// Register one with a tag.
register(t, a, "web", "foo")

filterUrlPart := "filter=" + url.QueryEscape("foo in Service.Tags")

// TODO: use other call format

// Initial request with a filter should return one.
var lastIndex uint64
runStep(t, "read original", func(t *testing.T) {
req, err := http.NewRequest("GET", "/v1/health/service/web?dc=dc1&"+filterUrlPart, nil)
require.NoError(t, err)

resp := httptest.NewRecorder()
obj, err := a.srv.HealthServiceNodes(resp, req)
require.NoError(t, err)

nodes := obj.(structs.CheckServiceNodes)

require.Len(t, nodes, 1)

node := nodes[0]
require.Equal(t, "node1", node.Node.Node)
require.Equal(t, "web", node.Service.Service)
require.Equal(t, []string{"foo"}, node.Service.Tags)

require.Equal(t, "blocking-query", resp.Header().Get("X-Consul-Query-Backend"))

idx := getIndex(t, resp)
require.True(t, idx > 0)

lastIndex = idx
})

const timeout = 30 * time.Second
runStep(t, "read blocking query result", func(t *testing.T) {
var (
// out and resp are not safe to read until reading from errCh
out structs.CheckServiceNodes
resp = httptest.NewRecorder()
errCh = make(chan error, 1)
)
go func() {
url := fmt.Sprintf("/v1/health/service/web?dc=dc1&index=%d&wait=%s&%s", lastIndex, timeout, filterUrlPart)
req, err := http.NewRequest("GET", url, nil)
if err != nil {
errCh <- err
return
}

obj, err := a.srv.HealthServiceNodes(resp, req)
if err != nil {
errCh <- err
return
}

nodes := obj.(structs.CheckServiceNodes)
out = nodes
errCh <- nil
}()

time.Sleep(200 * time.Millisecond)

// Change the tags.
register(t, a, "web", "bar")

if err := <-errCh; err != nil {
require.NoError(t, err)
}

require.Len(t, out, 0)
require.Equal(t, tc.queryBackend, resp.Header().Get("X-Consul-Query-Backend"))
})
})
}
}

func TestHealthServiceNodes_NodeMetaFilter(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
Expand Down
7 changes: 4 additions & 3 deletions agent/rpcclient/health/view.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,12 @@ func (s *healthView) Update(events []*pbsubscribe.Event) error {
return errors.New("check service node was unexpectedly nil")
}
passed, err := s.filter.Evaluate(*csn)
switch {
case err != nil:
if err != nil {
return err
case passed:
} else if passed {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the filter no longer matched an entry we did not GC the prior record.

s.state[id] = *csn
} else {
delete(s.state, id)
}

case pbsubscribe.CatalogOp_Deregister:
Expand Down
2 changes: 2 additions & 0 deletions agent/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,8 @@ const (
QueryBackendStreaming
)

func (q QueryBackend) GoString() string { return q.String() }

func (q QueryBackend) String() string {
switch q {
case QueryBackendBlocking:
Expand Down