Skip to content

Commit 2f9c448

Browse files
rboyer and hc-github-team-consul-core
authored and committed
server: ensure that central service config flattening properly resets the state each time (#10245)
The prior solution to call reply.Reset() aged poorly since newer fields were added to the reply, but not added to Reset(), leading serial blocking query loops on the server to blend replies. This could manifest as a service-defaults protocol change from default=>http not reverting back to default after the config entry responsible was deleted. Backport of #10239 to 1.9.x.
1 parent 63d03e3 commit 2f9c448

File tree

4 files changed

+238
-28
lines changed

4 files changed

+238
-28
lines changed

Diff for: .changelog/10239.txt

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
```release-note:bug
2+
server: ensure that central service config flattening properly resets the state each time
3+
```

Diff for: agent/consul/config_endpoint.go

+23-20
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,12 @@ import (
55
"time"
66

77
metrics "github.com/armon/go-metrics"
8+
memdb "github.com/hashicorp/go-memdb"
9+
"github.com/mitchellh/copystructure"
10+
811
"github.com/hashicorp/consul/acl"
912
"github.com/hashicorp/consul/agent/consul/state"
1013
"github.com/hashicorp/consul/agent/structs"
11-
memdb "github.com/hashicorp/go-memdb"
12-
"github.com/mitchellh/copystructure"
1314
)
1415

1516
// The ConfigEntry endpoint is used to query centralized config information
@@ -263,9 +264,9 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
263264
&args.QueryOptions,
264265
&reply.QueryMeta,
265266
func(ws memdb.WatchSet, state *state.Store) error {
266-
reply.Reset()
267+
var thisReply structs.ServiceConfigResponse
267268

268-
reply.MeshGateway.Mode = structs.MeshGatewayModeDefault
269+
thisReply.MeshGateway.Mode = structs.MeshGatewayModeDefault
269270
// Pass the WatchSet to both the service and proxy config lookups. If either is updated
270271
// during the blocking query, this function will be rerun and these state store lookups
271272
// will both be current.
@@ -299,28 +300,28 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
299300
if err != nil {
300301
return fmt.Errorf("failed to copy global proxy-defaults: %v", err)
301302
}
302-
reply.ProxyConfig = mapCopy.(map[string]interface{})
303-
reply.MeshGateway = proxyConf.MeshGateway
304-
reply.Expose = proxyConf.Expose
303+
thisReply.ProxyConfig = mapCopy.(map[string]interface{})
304+
thisReply.MeshGateway = proxyConf.MeshGateway
305+
thisReply.Expose = proxyConf.Expose
305306
}
306307

307-
reply.Index = index
308+
thisReply.Index = index
308309

309310
if serviceConf != nil {
310311
if serviceConf.Expose.Checks {
311-
reply.Expose.Checks = true
312+
thisReply.Expose.Checks = true
312313
}
313314
if len(serviceConf.Expose.Paths) >= 1 {
314-
reply.Expose.Paths = serviceConf.Expose.Paths
315+
thisReply.Expose.Paths = serviceConf.Expose.Paths
315316
}
316317
if serviceConf.MeshGateway.Mode != structs.MeshGatewayModeDefault {
317-
reply.MeshGateway.Mode = serviceConf.MeshGateway.Mode
318+
thisReply.MeshGateway.Mode = serviceConf.MeshGateway.Mode
318319
}
319320
if serviceConf.Protocol != "" {
320-
if reply.ProxyConfig == nil {
321-
reply.ProxyConfig = make(map[string]interface{})
321+
if thisReply.ProxyConfig == nil {
322+
thisReply.ProxyConfig = make(map[string]interface{})
322323
}
323-
reply.ProxyConfig["protocol"] = serviceConf.Protocol
324+
thisReply.ProxyConfig["protocol"] = serviceConf.Protocol
324325
}
325326
}
326327

@@ -378,26 +379,28 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
378379

379380
// don't allocate the slices just to not fill them
380381
if len(usConfigs) == 0 {
382+
*reply = thisReply
381383
return nil
382384
}
383385

384386
if legacyUpstreams {
385-
if reply.UpstreamConfigs == nil {
386-
reply.UpstreamConfigs = make(map[string]map[string]interface{})
387+
if thisReply.UpstreamConfigs == nil {
388+
thisReply.UpstreamConfigs = make(map[string]map[string]interface{})
387389
}
388390
for us, conf := range usConfigs {
389-
reply.UpstreamConfigs[us.ID] = conf
391+
thisReply.UpstreamConfigs[us.ID] = conf
390392
}
391393
} else {
392-
if reply.UpstreamIDConfigs == nil {
393-
reply.UpstreamIDConfigs = make(structs.UpstreamConfigs, 0, len(usConfigs))
394+
if thisReply.UpstreamIDConfigs == nil {
395+
thisReply.UpstreamIDConfigs = make(structs.UpstreamConfigs, 0, len(usConfigs))
394396
}
395397

396398
for us, conf := range usConfigs {
397-
reply.UpstreamIDConfigs = append(reply.UpstreamIDConfigs, structs.UpstreamConfig{Upstream: us, Config: conf})
399+
thisReply.UpstreamIDConfigs = append(thisReply.UpstreamIDConfigs, structs.UpstreamConfig{Upstream: us, Config: conf})
398400
}
399401
}
400402

403+
*reply = thisReply
401404
return nil
402405
})
403406
}

Diff for: agent/consul/config_endpoint_test.go

+212-2
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,13 @@ import (
55
"testing"
66
"time"
77

8+
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
9+
"github.com/stretchr/testify/require"
10+
811
"github.com/hashicorp/consul/acl"
912
"github.com/hashicorp/consul/agent/structs"
1013
"github.com/hashicorp/consul/sdk/testutil/retry"
1114
"github.com/hashicorp/consul/testrpc"
12-
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
13-
"github.com/stretchr/testify/require"
1415
)
1516

1617
func TestConfigEntry_Apply(t *testing.T) {
@@ -811,6 +812,9 @@ func TestConfigEntry_ResolveServiceConfig_Blocking(t *testing.T) {
811812
// of the blocking query does NOT bleed over into the next run. Concretely
812813
// in this test the data present in the initial proxy-defaults should not
813814
// be present when we are woken up due to proxy-defaults being deleted.
815+
//
816+
// This test does not pertain to upstreams, see:
817+
// TestConfigEntry_ResolveServiceConfig_Upstreams_Blocking
814818

815819
state := s1.fsm.State()
816820
require.NoError(state.EnsureConfigEntry(1, &structs.ProxyConfigEntry{
@@ -962,6 +966,205 @@ func TestConfigEntry_ResolveServiceConfig_Blocking(t *testing.T) {
962966
}
963967
}
964968

969+
func TestConfigEntry_ResolveServiceConfig_Upstreams_Blocking(t *testing.T) {
970+
if testing.Short() {
971+
t.Skip("too slow for testing.Short")
972+
}
973+
974+
t.Parallel()
975+
976+
dir1, s1 := testServer(t)
977+
defer os.RemoveAll(dir1)
978+
defer s1.Shutdown()
979+
codec := rpcClient(t, s1)
980+
defer codec.Close()
981+
982+
// The main thing this should test is that information from one iteration
983+
// of the blocking query does NOT bleed over into the next run. Concretely
984+
// in this test the data present in the initial proxy-defaults should not
985+
// be present when we are woken up due to proxy-defaults being deleted.
986+
//
987+
// This test is about fields in upstreams, see:
988+
// TestConfigEntry_ResolveServiceConfig_Blocking
989+
990+
state := s1.fsm.State()
991+
require.NoError(t, state.EnsureConfigEntry(1, &structs.ServiceConfigEntry{
992+
Kind: structs.ServiceDefaults,
993+
Name: "foo",
994+
Protocol: "http",
995+
}, nil))
996+
require.NoError(t, state.EnsureConfigEntry(2, &structs.ServiceConfigEntry{
997+
Kind: structs.ServiceDefaults,
998+
Name: "bar",
999+
Protocol: "http",
1000+
}, nil))
1001+
1002+
var index uint64
1003+
1004+
runStep(t, "foo and bar should be both http", func(t *testing.T) {
1005+
// Verify that we get the results of service-defaults for 'foo' and 'bar'.
1006+
var out structs.ServiceConfigResponse
1007+
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig",
1008+
&structs.ServiceConfigRequest{
1009+
Name: "foo",
1010+
Datacenter: "dc1",
1011+
UpstreamIDs: []structs.ServiceID{
1012+
structs.NewServiceID("bar", nil),
1013+
structs.NewServiceID("other", nil),
1014+
},
1015+
},
1016+
&out,
1017+
))
1018+
1019+
expected := structs.ServiceConfigResponse{
1020+
ProxyConfig: map[string]interface{}{
1021+
"protocol": "http",
1022+
},
1023+
UpstreamIDConfigs: []structs.UpstreamConfig{
1024+
{
1025+
Upstream: structs.NewServiceID("bar", nil),
1026+
Config: map[string]interface{}{
1027+
"protocol": "http",
1028+
},
1029+
},
1030+
},
1031+
QueryMeta: out.QueryMeta, // don't care
1032+
}
1033+
1034+
require.Equal(t, expected, out)
1035+
index = out.Index
1036+
})
1037+
1038+
runStep(t, "blocking query for foo wakes on bar entry delete", func(t *testing.T) {
1039+
// Now setup a blocking query for 'foo' while we erase the
1040+
// service-defaults for bar.
1041+
1042+
// Async cause a change
1043+
start := time.Now()
1044+
go func() {
1045+
time.Sleep(100 * time.Millisecond)
1046+
err := state.DeleteConfigEntry(index+1,
1047+
structs.ServiceDefaults,
1048+
"bar",
1049+
nil,
1050+
)
1051+
if err != nil {
1052+
t.Errorf("delete config entry failed: %v", err)
1053+
}
1054+
}()
1055+
1056+
// Re-run the query
1057+
var out structs.ServiceConfigResponse
1058+
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig",
1059+
&structs.ServiceConfigRequest{
1060+
Name: "foo",
1061+
Datacenter: "dc1",
1062+
UpstreamIDs: []structs.ServiceID{
1063+
structs.NewServiceID("bar", nil),
1064+
structs.NewServiceID("other", nil),
1065+
},
1066+
QueryOptions: structs.QueryOptions{
1067+
MinQueryIndex: index,
1068+
MaxQueryTime: time.Second,
1069+
},
1070+
},
1071+
&out,
1072+
))
1073+
1074+
// Should block at least 100ms
1075+
require.True(t, time.Since(start) >= 100*time.Millisecond, "too fast")
1076+
1077+
// Check the indexes
1078+
require.Equal(t, out.Index, index+1)
1079+
1080+
expected := structs.ServiceConfigResponse{
1081+
ProxyConfig: map[string]interface{}{
1082+
"protocol": "http",
1083+
},
1084+
QueryMeta: out.QueryMeta, // don't care
1085+
}
1086+
1087+
require.Equal(t, expected, out)
1088+
index = out.Index
1089+
})
1090+
1091+
runStep(t, "foo should be http and bar should be unset", func(t *testing.T) {
1092+
// Verify that we get the results of service-defaults for just 'foo'.
1093+
var out structs.ServiceConfigResponse
1094+
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig",
1095+
&structs.ServiceConfigRequest{
1096+
Name: "foo",
1097+
Datacenter: "dc1",
1098+
UpstreamIDs: []structs.ServiceID{
1099+
structs.NewServiceID("bar", nil),
1100+
structs.NewServiceID("other", nil),
1101+
},
1102+
},
1103+
&out,
1104+
))
1105+
1106+
expected := structs.ServiceConfigResponse{
1107+
ProxyConfig: map[string]interface{}{
1108+
"protocol": "http",
1109+
},
1110+
QueryMeta: out.QueryMeta, // don't care
1111+
}
1112+
1113+
require.Equal(t, expected, out)
1114+
index = out.Index
1115+
})
1116+
1117+
runStep(t, "blocking query for foo wakes on foo entry delete", func(t *testing.T) {
1118+
// Now setup a blocking query for 'foo' while we erase the
1119+
// service-defaults for foo.
1120+
1121+
// Async cause a change
1122+
start := time.Now()
1123+
go func() {
1124+
time.Sleep(100 * time.Millisecond)
1125+
err := state.DeleteConfigEntry(index+1,
1126+
structs.ServiceDefaults,
1127+
"foo",
1128+
nil,
1129+
)
1130+
if err != nil {
1131+
t.Errorf("delete config entry failed: %v", err)
1132+
}
1133+
}()
1134+
1135+
// Re-run the query
1136+
var out structs.ServiceConfigResponse
1137+
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig",
1138+
&structs.ServiceConfigRequest{
1139+
Name: "foo",
1140+
Datacenter: "dc1",
1141+
UpstreamIDs: []structs.ServiceID{
1142+
structs.NewServiceID("bar", nil),
1143+
structs.NewServiceID("other", nil),
1144+
},
1145+
QueryOptions: structs.QueryOptions{
1146+
MinQueryIndex: index,
1147+
MaxQueryTime: time.Second,
1148+
},
1149+
},
1150+
&out,
1151+
))
1152+
1153+
// Should block at least 100ms
1154+
require.True(t, time.Since(start) >= 100*time.Millisecond, "too fast")
1155+
1156+
// Check the indexes
1157+
require.Equal(t, out.Index, index+1)
1158+
1159+
expected := structs.ServiceConfigResponse{
1160+
QueryMeta: out.QueryMeta, // don't care
1161+
}
1162+
1163+
require.Equal(t, expected, out)
1164+
index = out.Index
1165+
})
1166+
}
1167+
9651168
func TestConfigEntry_ResolveServiceConfig_UpstreamProxyDefaultsProtocol(t *testing.T) {
9661169
t.Parallel()
9671170

@@ -1219,3 +1422,10 @@ func TestConfigEntry_ProxyDefaultsExposeConfig(t *testing.T) {
12191422
require.True(t, ok)
12201423
require.Equal(t, expose, proxyConf.Expose)
12211424
}
1425+
1426+
func runStep(t *testing.T, name string, fn func(t *testing.T)) {
1427+
t.Helper()
1428+
if !t.Run(name, fn) {
1429+
t.FailNow()
1430+
}
1431+
}

Diff for: agent/structs/config_entry.go

-6
Original file line numberDiff line numberDiff line change
@@ -567,12 +567,6 @@ type ServiceConfigResponse struct {
567567
QueryMeta
568568
}
569569

570-
func (r *ServiceConfigResponse) Reset() {
571-
r.ProxyConfig = nil
572-
r.UpstreamConfigs = nil
573-
r.MeshGateway = MeshGatewayConfig{}
574-
}
575-
576570
// MarshalBinary writes ServiceConfigResponse as msgpack encoded. It's only here
577571
// because we need custom decoding of the raw interface{} values.
578572
func (r *ServiceConfigResponse) MarshalBinary() (data []byte, err error) {

0 commit comments

Comments
 (0)