Skip to content

Commit bd0d70b

Browse files
authored
[1.10.x] health: ensure /v1/health/service/:service endpoint returns the most recent results when a filter is used with streaming (#12868)
Backport of #12640 to 1.10.x
1 parent 91baf56 commit bd0d70b

File tree

6 files changed

+230
-6
lines changed

6 files changed

+230
-6
lines changed

Diff for: .changelog/12640.txt

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
```release-note:bug
2+
health: ensure /v1/health/service/:service endpoint returns the most recent results when a filter is used with streaming #12640
3+
```

Diff for: agent/consul/health_endpoint.go

+14-6
Original file line numberDiff line numberDiff line change
@@ -222,27 +222,35 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc
222222
&args.QueryOptions,
223223
&reply.QueryMeta,
224224
func(ws memdb.WatchSet, state *state.Store) error {
225+
var thisReply structs.IndexedCheckServiceNodes
226+
225227
index, nodes, err := f(ws, state, args)
226228
if err != nil {
227229
return err
228230
}
229231

230-
reply.Index, reply.Nodes = index, nodes
232+
thisReply.Index, thisReply.Nodes = index, nodes
233+
231234
if len(args.NodeMetaFilters) > 0 {
232-
reply.Nodes = nodeMetaFilter(args.NodeMetaFilters, reply.Nodes)
235+
thisReply.Nodes = nodeMetaFilter(args.NodeMetaFilters, thisReply.Nodes)
233236
}
234237

235-
if err := h.srv.filterACL(args.Token, reply); err != nil {
238+
if err := h.srv.filterACL(args.Token, &thisReply); err != nil {
236239
return err
237240
}
238241

239-
raw, err := filter.Execute(reply.Nodes)
242+
raw, err := filter.Execute(thisReply.Nodes)
240243
if err != nil {
241244
return err
242245
}
243-
reply.Nodes = raw.(structs.CheckServiceNodes)
246+
thisReply.Nodes = raw.(structs.CheckServiceNodes)
244247

245-
return h.srv.sortNodesByDistanceFrom(args.Source, reply.Nodes)
248+
if err := h.srv.sortNodesByDistanceFrom(args.Source, thisReply.Nodes); err != nil {
249+
return err
250+
}
251+
252+
*reply = thisReply
253+
return nil
246254
})
247255

248256
// Provide some metrics

Diff for: agent/consul/health_endpoint_test.go

+79
Original file line numberDiff line numberDiff line change
@@ -677,6 +677,85 @@ func TestHealth_ServiceNodes(t *testing.T) {
677677
}
678678
}
679679

680+
func TestHealth_ServiceNodes_BlockingQuery_withFilter(t *testing.T) {
681+
if testing.Short() {
682+
t.Skip("too slow for testing.Short")
683+
}
684+
685+
t.Parallel()
686+
687+
_, s1 := testServer(t)
688+
codec := rpcClient(t, s1)
689+
690+
waitForLeaderEstablishment(t, s1)
691+
692+
register := func(t *testing.T, name, tag string) {
693+
arg := structs.RegisterRequest{
694+
Datacenter: "dc1",
695+
ID: types.NodeID("43d419c0-433b-42c3-bf8a-193eba0b41a3"),
696+
Node: "node1",
697+
Address: "127.0.0.1",
698+
Service: &structs.NodeService{
699+
ID: name,
700+
Service: name,
701+
Tags: []string{tag},
702+
},
703+
}
704+
var out struct{}
705+
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out))
706+
}
707+
708+
register(t, "web", "foo")
709+
710+
var lastIndex uint64
711+
runStep(t, "read original", func(t *testing.T) {
712+
var out structs.IndexedCheckServiceNodes
713+
req := structs.ServiceSpecificRequest{
714+
Datacenter: "dc1",
715+
ServiceName: "web",
716+
QueryOptions: structs.QueryOptions{
717+
Filter: "foo in Service.Tags",
718+
},
719+
}
720+
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Health.ServiceNodes", &req, &out))
721+
722+
require.Len(t, out.Nodes, 1)
723+
node := out.Nodes[0]
724+
require.Equal(t, "node1", node.Node.Node)
725+
require.Equal(t, "web", node.Service.Service)
726+
require.Equal(t, []string{"foo"}, node.Service.Tags)
727+
728+
require.Equal(t, structs.QueryBackendBlocking, out.Backend)
729+
lastIndex = out.Index
730+
})
731+
732+
runStep(t, "read blocking query result", func(t *testing.T) {
733+
req := structs.ServiceSpecificRequest{
734+
Datacenter: "dc1",
735+
ServiceName: "web",
736+
QueryOptions: structs.QueryOptions{
737+
Filter: "foo in Service.Tags",
738+
},
739+
}
740+
req.MinQueryIndex = lastIndex
741+
742+
var out structs.IndexedCheckServiceNodes
743+
errCh := channelCallRPC(s1, "Health.ServiceNodes", &req, &out, nil)
744+
745+
time.Sleep(200 * time.Millisecond)
746+
747+
// Change the tags
748+
register(t, "web", "bar")
749+
750+
if err := <-errCh; err != nil {
751+
require.NoError(t, err)
752+
}
753+
754+
require.Equal(t, structs.QueryBackendBlocking, out.Backend)
755+
require.Len(t, out.Nodes, 0)
756+
})
757+
}
758+
680759
func TestHealth_ServiceNodes_MultipleServiceTags(t *testing.T) {
681760
if testing.Short() {
682761
t.Skip("too slow for testing.Short")

Diff for: agent/health_endpoint_test.go

+130
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/hashicorp/consul/api"
2222
"github.com/hashicorp/consul/sdk/testutil/retry"
2323
"github.com/hashicorp/consul/testrpc"
24+
"github.com/hashicorp/consul/types"
2425
)
2526

2627
func TestHealthChecksInState(t *testing.T) {
@@ -906,6 +907,135 @@ use_streaming_backend = true
906907
}
907908
}
908909

910+
func TestHealthServiceNodes_Blocking_withFilter(t *testing.T) {
911+
cases := []struct {
912+
name string
913+
hcl string
914+
queryBackend string
915+
}{
916+
{
917+
name: "no streaming",
918+
queryBackend: "blocking-query",
919+
hcl: `use_streaming_backend = false`,
920+
},
921+
{
922+
name: "streaming",
923+
hcl: `
924+
rpc { enable_streaming = true }
925+
use_streaming_backend = true
926+
`,
927+
queryBackend: "streaming",
928+
},
929+
}
930+
931+
runStep := func(t *testing.T, name string, fn func(t *testing.T)) {
932+
t.Helper()
933+
if !t.Run(name, fn) {
934+
t.FailNow()
935+
}
936+
}
937+
938+
register := func(t *testing.T, a *TestAgent, name, tag string) {
939+
args := &structs.RegisterRequest{
940+
Datacenter: "dc1",
941+
ID: types.NodeID("43d419c0-433b-42c3-bf8a-193eba0b41a3"),
942+
Node: "node1",
943+
Address: "127.0.0.1",
944+
Service: &structs.NodeService{
945+
ID: name,
946+
Service: name,
947+
Tags: []string{tag},
948+
},
949+
}
950+
951+
var out struct{}
952+
require.NoError(t, a.RPC("Catalog.Register", args, &out))
953+
}
954+
955+
for _, tc := range cases {
956+
tc := tc
957+
t.Run(tc.name, func(t *testing.T) {
958+
a := NewTestAgent(t, tc.hcl)
959+
defer a.Shutdown()
960+
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
961+
962+
// Register one with a tag.
963+
register(t, a, "web", "foo")
964+
965+
filterUrlPart := "filter=" + url.QueryEscape("foo in Service.Tags")
966+
967+
// TODO: use other call format
968+
969+
// Initial request with a filter should return one.
970+
var lastIndex uint64
971+
runStep(t, "read original", func(t *testing.T) {
972+
req, err := http.NewRequest("GET", "/v1/health/service/web?dc=dc1&"+filterUrlPart, nil)
973+
require.NoError(t, err)
974+
975+
resp := httptest.NewRecorder()
976+
obj, err := a.srv.HealthServiceNodes(resp, req)
977+
require.NoError(t, err)
978+
979+
nodes := obj.(structs.CheckServiceNodes)
980+
981+
require.Len(t, nodes, 1)
982+
983+
node := nodes[0]
984+
require.Equal(t, "node1", node.Node.Node)
985+
require.Equal(t, "web", node.Service.Service)
986+
require.Equal(t, []string{"foo"}, node.Service.Tags)
987+
988+
require.Equal(t, "blocking-query", resp.Header().Get("X-Consul-Query-Backend"))
989+
990+
idx := getIndex(t, resp)
991+
require.True(t, idx > 0)
992+
993+
lastIndex = idx
994+
})
995+
996+
const timeout = 30 * time.Second
997+
runStep(t, "read blocking query result", func(t *testing.T) {
998+
var (
999+
// out and resp are not safe to read until reading from errCh
1000+
out structs.CheckServiceNodes
1001+
resp = httptest.NewRecorder()
1002+
errCh = make(chan error, 1)
1003+
)
1004+
go func() {
1005+
url := fmt.Sprintf("/v1/health/service/web?dc=dc1&index=%d&wait=%s&%s", lastIndex, timeout, filterUrlPart)
1006+
req, err := http.NewRequest("GET", url, nil)
1007+
if err != nil {
1008+
errCh <- err
1009+
return
1010+
}
1011+
1012+
obj, err := a.srv.HealthServiceNodes(resp, req)
1013+
if err != nil {
1014+
errCh <- err
1015+
return
1016+
}
1017+
1018+
nodes := obj.(structs.CheckServiceNodes)
1019+
out = nodes
1020+
errCh <- nil
1021+
}()
1022+
1023+
time.Sleep(200 * time.Millisecond)
1024+
1025+
// Change the tags.
1026+
register(t, a, "web", "bar")
1027+
1028+
if err := <-errCh; err != nil {
1029+
require.NoError(t, err)
1030+
}
1031+
1032+
require.Len(t, out, 0)
1033+
require.Equal(t, tc.queryBackend, resp.Header().Get("X-Consul-Query-Backend"))
1034+
})
1035+
})
1036+
}
1037+
}
1038+
9091039
func TestHealthServiceNodes_NodeMetaFilter(t *testing.T) {
9101040
if testing.Short() {
9111041
t.Skip("too slow for testing.Short")

Diff for: agent/rpcclient/health/view.go

+2
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ func (s *healthView) Update(events []*pbsubscribe.Event) error {
7777
return err
7878
case passed:
7979
s.state[id] = csn
80+
default:
81+
delete(s.state, id)
8082
}
8183

8284
case pbsubscribe.CatalogOp_Deregister:

Diff for: agent/structs/structs.go

+2
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,8 @@ const (
321321
QueryBackendStreaming
322322
)
323323

324+
func (q QueryBackend) GoString() string { return q.String() }
325+
324326
func (q QueryBackend) String() string {
325327
switch q {
326328
case QueryBackendBlocking:

0 commit comments

Comments
 (0)