Skip to content

Commit 637a054

Browse files
authored
[1.11.x] health: ensure /v1/health/service/:service endpoint returns the most recent results when a filter is used with streaming (#12866)
Backport of #12640 to 1.11.x
1 parent d0f3e42 commit 637a054

File tree

6 files changed

+230
-6
lines changed

6 files changed

+230
-6
lines changed

Diff for: .changelog/12640.txt

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
```release-note:bug
2+
health: ensure /v1/health/service/:service endpoint returns the most recent results when a filter is used with streaming #12640
3+
```

Diff for: agent/consul/health_endpoint.go

+14-6
Original file line numberDiff line numberDiff line change
@@ -235,30 +235,38 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc
235235
&args.QueryOptions,
236236
&reply.QueryMeta,
237237
func(ws memdb.WatchSet, state *state.Store) error {
238+
var thisReply structs.IndexedCheckServiceNodes
239+
238240
index, nodes, err := f(ws, state, args)
239241
if err != nil {
240242
return err
241243
}
242244

243-
reply.Index, reply.Nodes = index, nodes
245+
thisReply.Index, thisReply.Nodes = index, nodes
246+
244247
if len(args.NodeMetaFilters) > 0 {
245-
reply.Nodes = nodeMetaFilter(args.NodeMetaFilters, reply.Nodes)
248+
thisReply.Nodes = nodeMetaFilter(args.NodeMetaFilters, thisReply.Nodes)
246249
}
247250

248-
raw, err := filter.Execute(reply.Nodes)
251+
raw, err := filter.Execute(thisReply.Nodes)
249252
if err != nil {
250253
return err
251254
}
252-
reply.Nodes = raw.(structs.CheckServiceNodes)
255+
thisReply.Nodes = raw.(structs.CheckServiceNodes)
253256

254257
// Note: we filter the results with ACLs *after* applying the user-supplied
255258
// bexpr filter, to ensure QueryMeta.ResultsFilteredByACLs does not include
256259
// results that would be filtered out even if the user did have permission.
257-
if err := h.srv.filterACL(args.Token, reply); err != nil {
260+
if err := h.srv.filterACL(args.Token, &thisReply); err != nil {
258261
return err
259262
}
260263

261-
return h.srv.sortNodesByDistanceFrom(args.Source, reply.Nodes)
264+
if err := h.srv.sortNodesByDistanceFrom(args.Source, thisReply.Nodes); err != nil {
265+
return err
266+
}
267+
268+
*reply = thisReply
269+
return nil
262270
})
263271

264272
// Provide some metrics

Diff for: agent/consul/health_endpoint_test.go

+79
Original file line numberDiff line numberDiff line change
@@ -678,6 +678,85 @@ func TestHealth_ServiceNodes(t *testing.T) {
678678
}
679679
}
680680

681+
func TestHealth_ServiceNodes_BlockingQuery_withFilter(t *testing.T) {
682+
if testing.Short() {
683+
t.Skip("too slow for testing.Short")
684+
}
685+
686+
t.Parallel()
687+
688+
_, s1 := testServer(t)
689+
codec := rpcClient(t, s1)
690+
691+
waitForLeaderEstablishment(t, s1)
692+
693+
register := func(t *testing.T, name, tag string) {
694+
arg := structs.RegisterRequest{
695+
Datacenter: "dc1",
696+
ID: types.NodeID("43d419c0-433b-42c3-bf8a-193eba0b41a3"),
697+
Node: "node1",
698+
Address: "127.0.0.1",
699+
Service: &structs.NodeService{
700+
ID: name,
701+
Service: name,
702+
Tags: []string{tag},
703+
},
704+
}
705+
var out struct{}
706+
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out))
707+
}
708+
709+
register(t, "web", "foo")
710+
711+
var lastIndex uint64
712+
runStep(t, "read original", func(t *testing.T) {
713+
var out structs.IndexedCheckServiceNodes
714+
req := structs.ServiceSpecificRequest{
715+
Datacenter: "dc1",
716+
ServiceName: "web",
717+
QueryOptions: structs.QueryOptions{
718+
Filter: "foo in Service.Tags",
719+
},
720+
}
721+
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Health.ServiceNodes", &req, &out))
722+
723+
require.Len(t, out.Nodes, 1)
724+
node := out.Nodes[0]
725+
require.Equal(t, "node1", node.Node.Node)
726+
require.Equal(t, "web", node.Service.Service)
727+
require.Equal(t, []string{"foo"}, node.Service.Tags)
728+
729+
require.Equal(t, structs.QueryBackendBlocking, out.Backend)
730+
lastIndex = out.Index
731+
})
732+
733+
runStep(t, "read blocking query result", func(t *testing.T) {
734+
req := structs.ServiceSpecificRequest{
735+
Datacenter: "dc1",
736+
ServiceName: "web",
737+
QueryOptions: structs.QueryOptions{
738+
Filter: "foo in Service.Tags",
739+
},
740+
}
741+
req.MinQueryIndex = lastIndex
742+
743+
var out structs.IndexedCheckServiceNodes
744+
errCh := channelCallRPC(s1, "Health.ServiceNodes", &req, &out, nil)
745+
746+
time.Sleep(200 * time.Millisecond)
747+
748+
// Change the tags
749+
register(t, "web", "bar")
750+
751+
if err := <-errCh; err != nil {
752+
require.NoError(t, err)
753+
}
754+
755+
require.Equal(t, structs.QueryBackendBlocking, out.Backend)
756+
require.Len(t, out.Nodes, 0)
757+
})
758+
}
759+
681760
func TestHealth_ServiceNodes_MultipleServiceTags(t *testing.T) {
682761
if testing.Short() {
683762
t.Skip("too slow for testing.Short")

Diff for: agent/health_endpoint_test.go

+130
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/hashicorp/consul/api"
2222
"github.com/hashicorp/consul/sdk/testutil/retry"
2323
"github.com/hashicorp/consul/testrpc"
24+
"github.com/hashicorp/consul/types"
2425
)
2526

2627
func TestHealthChecksInState(t *testing.T) {
@@ -906,6 +907,135 @@ use_streaming_backend = true
906907
}
907908
}
908909

910+
func TestHealthServiceNodes_Blocking_withFilter(t *testing.T) {
911+
cases := []struct {
912+
name string
913+
hcl string
914+
queryBackend string
915+
}{
916+
{
917+
name: "no streaming",
918+
queryBackend: "blocking-query",
919+
hcl: `use_streaming_backend = false`,
920+
},
921+
{
922+
name: "streaming",
923+
hcl: `
924+
rpc { enable_streaming = true }
925+
use_streaming_backend = true
926+
`,
927+
queryBackend: "streaming",
928+
},
929+
}
930+
931+
runStep := func(t *testing.T, name string, fn func(t *testing.T)) {
932+
t.Helper()
933+
if !t.Run(name, fn) {
934+
t.FailNow()
935+
}
936+
}
937+
938+
register := func(t *testing.T, a *TestAgent, name, tag string) {
939+
args := &structs.RegisterRequest{
940+
Datacenter: "dc1",
941+
ID: types.NodeID("43d419c0-433b-42c3-bf8a-193eba0b41a3"),
942+
Node: "node1",
943+
Address: "127.0.0.1",
944+
Service: &structs.NodeService{
945+
ID: name,
946+
Service: name,
947+
Tags: []string{tag},
948+
},
949+
}
950+
951+
var out struct{}
952+
require.NoError(t, a.RPC("Catalog.Register", args, &out))
953+
}
954+
955+
for _, tc := range cases {
956+
tc := tc
957+
t.Run(tc.name, func(t *testing.T) {
958+
a := NewTestAgent(t, tc.hcl)
959+
defer a.Shutdown()
960+
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
961+
962+
// Register one with a tag.
963+
register(t, a, "web", "foo")
964+
965+
filterUrlPart := "filter=" + url.QueryEscape("foo in Service.Tags")
966+
967+
// TODO: use other call format
968+
969+
// Initial request with a filter should return one.
970+
var lastIndex uint64
971+
runStep(t, "read original", func(t *testing.T) {
972+
req, err := http.NewRequest("GET", "/v1/health/service/web?dc=dc1&"+filterUrlPart, nil)
973+
require.NoError(t, err)
974+
975+
resp := httptest.NewRecorder()
976+
obj, err := a.srv.HealthServiceNodes(resp, req)
977+
require.NoError(t, err)
978+
979+
nodes := obj.(structs.CheckServiceNodes)
980+
981+
require.Len(t, nodes, 1)
982+
983+
node := nodes[0]
984+
require.Equal(t, "node1", node.Node.Node)
985+
require.Equal(t, "web", node.Service.Service)
986+
require.Equal(t, []string{"foo"}, node.Service.Tags)
987+
988+
require.Equal(t, "blocking-query", resp.Header().Get("X-Consul-Query-Backend"))
989+
990+
idx := getIndex(t, resp)
991+
require.True(t, idx > 0)
992+
993+
lastIndex = idx
994+
})
995+
996+
const timeout = 30 * time.Second
997+
runStep(t, "read blocking query result", func(t *testing.T) {
998+
var (
999+
// out and resp are not safe to read until reading from errCh
1000+
out structs.CheckServiceNodes
1001+
resp = httptest.NewRecorder()
1002+
errCh = make(chan error, 1)
1003+
)
1004+
go func() {
1005+
url := fmt.Sprintf("/v1/health/service/web?dc=dc1&index=%d&wait=%s&%s", lastIndex, timeout, filterUrlPart)
1006+
req, err := http.NewRequest("GET", url, nil)
1007+
if err != nil {
1008+
errCh <- err
1009+
return
1010+
}
1011+
1012+
obj, err := a.srv.HealthServiceNodes(resp, req)
1013+
if err != nil {
1014+
errCh <- err
1015+
return
1016+
}
1017+
1018+
nodes := obj.(structs.CheckServiceNodes)
1019+
out = nodes
1020+
errCh <- nil
1021+
}()
1022+
1023+
time.Sleep(200 * time.Millisecond)
1024+
1025+
// Change the tags.
1026+
register(t, a, "web", "bar")
1027+
1028+
if err := <-errCh; err != nil {
1029+
require.NoError(t, err)
1030+
}
1031+
1032+
require.Len(t, out, 0)
1033+
require.Equal(t, tc.queryBackend, resp.Header().Get("X-Consul-Query-Backend"))
1034+
})
1035+
})
1036+
}
1037+
}
1038+
9091039
func TestHealthServiceNodes_NodeMetaFilter(t *testing.T) {
9101040
if testing.Short() {
9111041
t.Skip("too slow for testing.Short")

Diff for: agent/rpcclient/health/view.go

+2
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@ func (s *healthView) Update(events []*pbsubscribe.Event) error {
7878
return err
7979
case passed:
8080
s.state[id] = csn
81+
default:
82+
delete(s.state, id)
8183
}
8284

8385
case pbsubscribe.CatalogOp_Deregister:

Diff for: agent/structs/structs.go

+2
Original file line numberDiff line numberDiff line change
@@ -355,6 +355,8 @@ const (
355355
QueryBackendStreaming
356356
)
357357

358+
func (q QueryBackend) GoString() string { return q.String() }
359+
358360
func (q QueryBackend) String() string {
359361
switch q {
360362
case QueryBackendBlocking:

0 commit comments

Comments
 (0)