Skip to content

Commit 11213ae

Browse files
authored
health: ensure /v1/health/service/:service endpoint returns the most recent results when a filter is used with streaming (#12640)
The primary bug here is in the streaming subsystem that makes the overall v1/health/service/:service request behave incorrectly when servicing a blocking request with a filter provided. There is a secondary non-streaming bug being fixed here that is much less obvious related to when to update the `reply` variable in a `blockingQuery` evaluation. It is unlikely that it is triggerable in practical environments and I could not actually get the bug to manifest, but I fixed it anyway while investigating the original issue. Simple reproduction (streaming): 1. Register a service with a tag. curl -sL --request PUT 'http://localhost:8500/v1/agent/service/register' \ --header 'Content-Type: application/json' \ --data-raw '{ "ID": "ID1", "Name": "test", "Tags":[ "a" ], "EnableTagOverride": true }' 2. Do an initial filter query that matches on the tag. curl -sLi --get 'http://localhost:8500/v1/health/service/test' --data-urlencode 'filter=a in Service.Tags' 3. Note you get one result. Use the `X-Consul-Index` header to establish a blocking query in another terminal, this should not return yet. curl -sLi --get 'http://localhost:8500/v1/health/service/test?index=$INDEX' --data-urlencode 'filter=a in Service.Tags' 4. Re-register that service with a different tag. curl -sL --request PUT 'http://localhost:8500/v1/agent/service/register' \ --header 'Content-Type: application/json' \ --data-raw '{ "ID": "ID1", "Name": "test", "Tags":[ "b" ], "EnableTagOverride": true }' 5. Your blocking query from (3) should return with a header `X-Consul-Query-Backend: streaming` and empty results if it works correctly `[]`. Attempts to reproduce with non-streaming failed (where you add `&near=_agent` to the read queries and ensure `X-Consul-Query-Backend: blocking-query` shows up in the results).
1 parent 1a49188 commit 11213ae

File tree

6 files changed

+232
-9
lines changed

6 files changed

+232
-9
lines changed

Diff for: .changelog/12640.txt

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
```release-note:bug
2+
health: ensure /v1/health/service/:service endpoint returns the most recent results when a filter is used with streaming #12640
3+
```

Diff for: agent/consul/health_endpoint.go

+14-6
Original file line numberDiff line numberDiff line change
@@ -236,30 +236,38 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc
236236
&args.QueryOptions,
237237
&reply.QueryMeta,
238238
func(ws memdb.WatchSet, state *state.Store) error {
239+
var thisReply structs.IndexedCheckServiceNodes
240+
239241
index, nodes, err := f(ws, state, args)
240242
if err != nil {
241243
return err
242244
}
243245

244-
reply.Index, reply.Nodes = index, nodes
246+
thisReply.Index, thisReply.Nodes = index, nodes
247+
245248
if len(args.NodeMetaFilters) > 0 {
246-
reply.Nodes = nodeMetaFilter(args.NodeMetaFilters, reply.Nodes)
249+
thisReply.Nodes = nodeMetaFilter(args.NodeMetaFilters, thisReply.Nodes)
247250
}
248251

249-
raw, err := filter.Execute(reply.Nodes)
252+
raw, err := filter.Execute(thisReply.Nodes)
250253
if err != nil {
251254
return err
252255
}
253-
reply.Nodes = raw.(structs.CheckServiceNodes)
256+
thisReply.Nodes = raw.(structs.CheckServiceNodes)
254257

255258
// Note: we filter the results with ACLs *after* applying the user-supplied
256259
// bexpr filter, to ensure QueryMeta.ResultsFilteredByACLs does not include
257260
// results that would be filtered out even if the user did have permission.
258-
if err := h.srv.filterACL(args.Token, reply); err != nil {
261+
if err := h.srv.filterACL(args.Token, &thisReply); err != nil {
259262
return err
260263
}
261264

262-
return h.srv.sortNodesByDistanceFrom(args.Source, reply.Nodes)
265+
if err := h.srv.sortNodesByDistanceFrom(args.Source, thisReply.Nodes); err != nil {
266+
return err
267+
}
268+
269+
*reply = thisReply
270+
return nil
263271
})
264272

265273
// Provide some metrics

Diff for: agent/consul/health_endpoint_test.go

+79
Original file line numberDiff line numberDiff line change
@@ -663,6 +663,85 @@ func TestHealth_ServiceNodes(t *testing.T) {
663663
}
664664
}
665665

666+
func TestHealth_ServiceNodes_BlockingQuery_withFilter(t *testing.T) {
667+
if testing.Short() {
668+
t.Skip("too slow for testing.Short")
669+
}
670+
671+
t.Parallel()
672+
673+
_, s1 := testServer(t)
674+
codec := rpcClient(t, s1)
675+
676+
waitForLeaderEstablishment(t, s1)
677+
678+
register := func(t *testing.T, name, tag string) {
679+
arg := structs.RegisterRequest{
680+
Datacenter: "dc1",
681+
ID: types.NodeID("43d419c0-433b-42c3-bf8a-193eba0b41a3"),
682+
Node: "node1",
683+
Address: "127.0.0.1",
684+
Service: &structs.NodeService{
685+
ID: name,
686+
Service: name,
687+
Tags: []string{tag},
688+
},
689+
}
690+
var out struct{}
691+
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out))
692+
}
693+
694+
register(t, "web", "foo")
695+
696+
var lastIndex uint64
697+
runStep(t, "read original", func(t *testing.T) {
698+
var out structs.IndexedCheckServiceNodes
699+
req := structs.ServiceSpecificRequest{
700+
Datacenter: "dc1",
701+
ServiceName: "web",
702+
QueryOptions: structs.QueryOptions{
703+
Filter: "foo in Service.Tags",
704+
},
705+
}
706+
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Health.ServiceNodes", &req, &out))
707+
708+
require.Len(t, out.Nodes, 1)
709+
node := out.Nodes[0]
710+
require.Equal(t, "node1", node.Node.Node)
711+
require.Equal(t, "web", node.Service.Service)
712+
require.Equal(t, []string{"foo"}, node.Service.Tags)
713+
714+
require.Equal(t, structs.QueryBackendBlocking, out.Backend)
715+
lastIndex = out.Index
716+
})
717+
718+
runStep(t, "read blocking query result", func(t *testing.T) {
719+
req := structs.ServiceSpecificRequest{
720+
Datacenter: "dc1",
721+
ServiceName: "web",
722+
QueryOptions: structs.QueryOptions{
723+
Filter: "foo in Service.Tags",
724+
},
725+
}
726+
req.MinQueryIndex = lastIndex
727+
728+
var out structs.IndexedCheckServiceNodes
729+
errCh := channelCallRPC(s1, "Health.ServiceNodes", &req, &out, nil)
730+
731+
time.Sleep(200 * time.Millisecond)
732+
733+
// Change the tags
734+
register(t, "web", "bar")
735+
736+
if err := <-errCh; err != nil {
737+
require.NoError(t, err)
738+
}
739+
740+
require.Equal(t, structs.QueryBackendBlocking, out.Backend)
741+
require.Len(t, out.Nodes, 0)
742+
})
743+
}
744+
666745
func TestHealth_ServiceNodes_MultipleServiceTags(t *testing.T) {
667746
if testing.Short() {
668747
t.Skip("too slow for testing.Short")

Diff for: agent/health_endpoint_test.go

+130
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/hashicorp/consul/api"
2121
"github.com/hashicorp/consul/sdk/testutil/retry"
2222
"github.com/hashicorp/consul/testrpc"
23+
"github.com/hashicorp/consul/types"
2324
)
2425

2526
func TestHealthChecksInState(t *testing.T) {
@@ -936,6 +937,135 @@ use_streaming_backend = true
936937
}
937938
}
938939

940+
func TestHealthServiceNodes_Blocking_withFilter(t *testing.T) {
941+
cases := []struct {
942+
name string
943+
hcl string
944+
queryBackend string
945+
}{
946+
{
947+
name: "no streaming",
948+
queryBackend: "blocking-query",
949+
hcl: `use_streaming_backend = false`,
950+
},
951+
{
952+
name: "streaming",
953+
hcl: `
954+
rpc { enable_streaming = true }
955+
use_streaming_backend = true
956+
`,
957+
queryBackend: "streaming",
958+
},
959+
}
960+
961+
runStep := func(t *testing.T, name string, fn func(t *testing.T)) {
962+
t.Helper()
963+
if !t.Run(name, fn) {
964+
t.FailNow()
965+
}
966+
}
967+
968+
register := func(t *testing.T, a *TestAgent, name, tag string) {
969+
args := &structs.RegisterRequest{
970+
Datacenter: "dc1",
971+
ID: types.NodeID("43d419c0-433b-42c3-bf8a-193eba0b41a3"),
972+
Node: "node1",
973+
Address: "127.0.0.1",
974+
Service: &structs.NodeService{
975+
ID: name,
976+
Service: name,
977+
Tags: []string{tag},
978+
},
979+
}
980+
981+
var out struct{}
982+
require.NoError(t, a.RPC("Catalog.Register", args, &out))
983+
}
984+
985+
for _, tc := range cases {
986+
tc := tc
987+
t.Run(tc.name, func(t *testing.T) {
988+
a := NewTestAgent(t, tc.hcl)
989+
defer a.Shutdown()
990+
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
991+
992+
// Register one with a tag.
993+
register(t, a, "web", "foo")
994+
995+
filterUrlPart := "filter=" + url.QueryEscape("foo in Service.Tags")
996+
997+
// TODO: use other call format
998+
999+
// Initial request with a filter should return one.
1000+
var lastIndex uint64
1001+
runStep(t, "read original", func(t *testing.T) {
1002+
req, err := http.NewRequest("GET", "/v1/health/service/web?dc=dc1&"+filterUrlPart, nil)
1003+
require.NoError(t, err)
1004+
1005+
resp := httptest.NewRecorder()
1006+
obj, err := a.srv.HealthServiceNodes(resp, req)
1007+
require.NoError(t, err)
1008+
1009+
nodes := obj.(structs.CheckServiceNodes)
1010+
1011+
require.Len(t, nodes, 1)
1012+
1013+
node := nodes[0]
1014+
require.Equal(t, "node1", node.Node.Node)
1015+
require.Equal(t, "web", node.Service.Service)
1016+
require.Equal(t, []string{"foo"}, node.Service.Tags)
1017+
1018+
require.Equal(t, "blocking-query", resp.Header().Get("X-Consul-Query-Backend"))
1019+
1020+
idx := getIndex(t, resp)
1021+
require.True(t, idx > 0)
1022+
1023+
lastIndex = idx
1024+
})
1025+
1026+
const timeout = 30 * time.Second
1027+
runStep(t, "read blocking query result", func(t *testing.T) {
1028+
var (
1029+
// out and resp are not safe to read until reading from errCh
1030+
out structs.CheckServiceNodes
1031+
resp = httptest.NewRecorder()
1032+
errCh = make(chan error, 1)
1033+
)
1034+
go func() {
1035+
url := fmt.Sprintf("/v1/health/service/web?dc=dc1&index=%d&wait=%s&%s", lastIndex, timeout, filterUrlPart)
1036+
req, err := http.NewRequest("GET", url, nil)
1037+
if err != nil {
1038+
errCh <- err
1039+
return
1040+
}
1041+
1042+
obj, err := a.srv.HealthServiceNodes(resp, req)
1043+
if err != nil {
1044+
errCh <- err
1045+
return
1046+
}
1047+
1048+
nodes := obj.(structs.CheckServiceNodes)
1049+
out = nodes
1050+
errCh <- nil
1051+
}()
1052+
1053+
time.Sleep(200 * time.Millisecond)
1054+
1055+
// Change the tags.
1056+
register(t, a, "web", "bar")
1057+
1058+
if err := <-errCh; err != nil {
1059+
require.NoError(t, err)
1060+
}
1061+
1062+
require.Len(t, out, 0)
1063+
require.Equal(t, tc.queryBackend, resp.Header().Get("X-Consul-Query-Backend"))
1064+
})
1065+
})
1066+
}
1067+
}
1068+
9391069
func TestHealthServiceNodes_NodeMetaFilter(t *testing.T) {
9401070
if testing.Short() {
9411071
t.Skip("too slow for testing.Short")

Diff for: agent/rpcclient/health/view.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -80,11 +80,12 @@ func (s *healthView) Update(events []*pbsubscribe.Event) error {
8080
return errors.New("check service node was unexpectedly nil")
8181
}
8282
passed, err := s.filter.Evaluate(*csn)
83-
switch {
84-
case err != nil:
83+
if err != nil {
8584
return err
86-
case passed:
85+
} else if passed {
8786
s.state[id] = *csn
87+
} else {
88+
delete(s.state, id)
8889
}
8990

9091
case pbsubscribe.CatalogOp_Deregister:

Diff for: agent/structs/structs.go

+2
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,8 @@ const (
398398
QueryBackendStreaming
399399
)
400400

401+
func (q QueryBackend) GoString() string { return q.String() }
402+
401403
func (q QueryBackend) String() string {
402404
switch q {
403405
case QueryBackendBlocking:

0 commit comments

Comments
 (0)