Skip to content

Commit 3357a14

Browse files
authored
server: ensure that central service config flattening properly resets the state each time (#10245)
The prior solution to call reply.Reset() aged poorly since newer fields were added to the reply, but not added to Reset() leading serial blocking query loops on the server to blend replies. This could manifest as a service-defaults protocol change from default=>http not reverting back to default after the config entry reponsible was deleted. Backport of #10239 to 1.9.x
1 parent 89180eb commit 3357a14

File tree

4 files changed

+238
-28
lines changed

4 files changed

+238
-28
lines changed

Diff for: .changelog/10239.txt

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
```release-note:bug
2+
server: ensure that central service config flattening properly resets the state each time
3+
```

Diff for: agent/consul/config_endpoint.go

+23-20
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,12 @@ import (
77
"github.com/armon/go-metrics/prometheus"
88

99
metrics "github.com/armon/go-metrics"
10+
memdb "github.com/hashicorp/go-memdb"
11+
"github.com/mitchellh/copystructure"
12+
1013
"github.com/hashicorp/consul/acl"
1114
"github.com/hashicorp/consul/agent/consul/state"
1215
"github.com/hashicorp/consul/agent/structs"
13-
memdb "github.com/hashicorp/go-memdb"
14-
"github.com/mitchellh/copystructure"
1516
)
1617

1718
var ConfigSummaries = []prometheus.SummaryDefinition{
@@ -328,9 +329,9 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
328329
&args.QueryOptions,
329330
&reply.QueryMeta,
330331
func(ws memdb.WatchSet, state *state.Store) error {
331-
reply.Reset()
332+
var thisReply structs.ServiceConfigResponse
332333

333-
reply.MeshGateway.Mode = structs.MeshGatewayModeDefault
334+
thisReply.MeshGateway.Mode = structs.MeshGatewayModeDefault
334335
// Pass the WatchSet to both the service and proxy config lookups. If either is updated
335336
// during the blocking query, this function will be rerun and these state store lookups
336337
// will both be current.
@@ -364,28 +365,28 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
364365
if err != nil {
365366
return fmt.Errorf("failed to copy global proxy-defaults: %v", err)
366367
}
367-
reply.ProxyConfig = mapCopy.(map[string]interface{})
368-
reply.MeshGateway = proxyConf.MeshGateway
369-
reply.Expose = proxyConf.Expose
368+
thisReply.ProxyConfig = mapCopy.(map[string]interface{})
369+
thisReply.MeshGateway = proxyConf.MeshGateway
370+
thisReply.Expose = proxyConf.Expose
370371
}
371372

372-
reply.Index = index
373+
thisReply.Index = index
373374

374375
if serviceConf != nil {
375376
if serviceConf.Expose.Checks {
376-
reply.Expose.Checks = true
377+
thisReply.Expose.Checks = true
377378
}
378379
if len(serviceConf.Expose.Paths) >= 1 {
379-
reply.Expose.Paths = serviceConf.Expose.Paths
380+
thisReply.Expose.Paths = serviceConf.Expose.Paths
380381
}
381382
if serviceConf.MeshGateway.Mode != structs.MeshGatewayModeDefault {
382-
reply.MeshGateway.Mode = serviceConf.MeshGateway.Mode
383+
thisReply.MeshGateway.Mode = serviceConf.MeshGateway.Mode
383384
}
384385
if serviceConf.Protocol != "" {
385-
if reply.ProxyConfig == nil {
386-
reply.ProxyConfig = make(map[string]interface{})
386+
if thisReply.ProxyConfig == nil {
387+
thisReply.ProxyConfig = make(map[string]interface{})
387388
}
388-
reply.ProxyConfig["protocol"] = serviceConf.Protocol
389+
thisReply.ProxyConfig["protocol"] = serviceConf.Protocol
389390
}
390391
}
391392

@@ -443,26 +444,28 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
443444

444445
// don't allocate the slices just to not fill them
445446
if len(usConfigs) == 0 {
447+
*reply = thisReply
446448
return nil
447449
}
448450

449451
if legacyUpstreams {
450-
if reply.UpstreamConfigs == nil {
451-
reply.UpstreamConfigs = make(map[string]map[string]interface{})
452+
if thisReply.UpstreamConfigs == nil {
453+
thisReply.UpstreamConfigs = make(map[string]map[string]interface{})
452454
}
453455
for us, conf := range usConfigs {
454-
reply.UpstreamConfigs[us.ID] = conf
456+
thisReply.UpstreamConfigs[us.ID] = conf
455457
}
456458
} else {
457-
if reply.UpstreamIDConfigs == nil {
458-
reply.UpstreamIDConfigs = make(structs.UpstreamConfigs, 0, len(usConfigs))
459+
if thisReply.UpstreamIDConfigs == nil {
460+
thisReply.UpstreamIDConfigs = make(structs.UpstreamConfigs, 0, len(usConfigs))
459461
}
460462

461463
for us, conf := range usConfigs {
462-
reply.UpstreamIDConfigs = append(reply.UpstreamIDConfigs, structs.UpstreamConfig{Upstream: us, Config: conf})
464+
thisReply.UpstreamIDConfigs = append(thisReply.UpstreamIDConfigs, structs.UpstreamConfig{Upstream: us, Config: conf})
463465
}
464466
}
465467

468+
*reply = thisReply
466469
return nil
467470
})
468471
}

Diff for: agent/consul/config_endpoint_test.go

+212-2
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,13 @@ import (
55
"testing"
66
"time"
77

8+
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
9+
"github.com/stretchr/testify/require"
10+
811
"github.com/hashicorp/consul/acl"
912
"github.com/hashicorp/consul/agent/structs"
1013
"github.com/hashicorp/consul/sdk/testutil/retry"
1114
"github.com/hashicorp/consul/testrpc"
12-
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
13-
"github.com/stretchr/testify/require"
1415
)
1516

1617
func TestConfigEntry_Apply(t *testing.T) {
@@ -858,6 +859,9 @@ func TestConfigEntry_ResolveServiceConfig_Blocking(t *testing.T) {
858859
// of the blocking query does NOT bleed over into the next run. Concretely
859860
// in this test the data present in the initial proxy-defaults should not
860861
// be present when we are woken up due to proxy-defaults being deleted.
862+
//
863+
// This test does not pertain to upstreams, see:
864+
// TestConfigEntry_ResolveServiceConfig_Upstreams_Blocking
861865

862866
state := s1.fsm.State()
863867
require.NoError(state.EnsureConfigEntry(1, &structs.ProxyConfigEntry{
@@ -1009,6 +1013,205 @@ func TestConfigEntry_ResolveServiceConfig_Blocking(t *testing.T) {
10091013
}
10101014
}
10111015

1016+
func TestConfigEntry_ResolveServiceConfig_Upstreams_Blocking(t *testing.T) {
1017+
if testing.Short() {
1018+
t.Skip("too slow for testing.Short")
1019+
}
1020+
1021+
t.Parallel()
1022+
1023+
dir1, s1 := testServer(t)
1024+
defer os.RemoveAll(dir1)
1025+
defer s1.Shutdown()
1026+
codec := rpcClient(t, s1)
1027+
defer codec.Close()
1028+
1029+
// The main thing this should test is that information from one iteration
1030+
// of the blocking query does NOT bleed over into the next run. Concretely
1031+
// in this test the data present in the initial proxy-defaults should not
1032+
// be present when we are woken up due to proxy-defaults being deleted.
1033+
//
1034+
// This test is about fields in upstreams, see:
1035+
// TestConfigEntry_ResolveServiceConfig_Blocking
1036+
1037+
state := s1.fsm.State()
1038+
require.NoError(t, state.EnsureConfigEntry(1, &structs.ServiceConfigEntry{
1039+
Kind: structs.ServiceDefaults,
1040+
Name: "foo",
1041+
Protocol: "http",
1042+
}, nil))
1043+
require.NoError(t, state.EnsureConfigEntry(2, &structs.ServiceConfigEntry{
1044+
Kind: structs.ServiceDefaults,
1045+
Name: "bar",
1046+
Protocol: "http",
1047+
}, nil))
1048+
1049+
var index uint64
1050+
1051+
runStep(t, "foo and bar should be both http", func(t *testing.T) {
1052+
// Verify that we get the results of service-defaults for 'foo' and 'bar'.
1053+
var out structs.ServiceConfigResponse
1054+
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig",
1055+
&structs.ServiceConfigRequest{
1056+
Name: "foo",
1057+
Datacenter: "dc1",
1058+
UpstreamIDs: []structs.ServiceID{
1059+
structs.NewServiceID("bar", nil),
1060+
structs.NewServiceID("other", nil),
1061+
},
1062+
},
1063+
&out,
1064+
))
1065+
1066+
expected := structs.ServiceConfigResponse{
1067+
ProxyConfig: map[string]interface{}{
1068+
"protocol": "http",
1069+
},
1070+
UpstreamIDConfigs: []structs.UpstreamConfig{
1071+
{
1072+
Upstream: structs.NewServiceID("bar", nil),
1073+
Config: map[string]interface{}{
1074+
"protocol": "http",
1075+
},
1076+
},
1077+
},
1078+
QueryMeta: out.QueryMeta, // don't care
1079+
}
1080+
1081+
require.Equal(t, expected, out)
1082+
index = out.Index
1083+
})
1084+
1085+
runStep(t, "blocking query for foo wakes on bar entry delete", func(t *testing.T) {
1086+
// Now setup a blocking query for 'foo' while we erase the
1087+
// service-defaults for bar.
1088+
1089+
// Async cause a change
1090+
start := time.Now()
1091+
go func() {
1092+
time.Sleep(100 * time.Millisecond)
1093+
err := state.DeleteConfigEntry(index+1,
1094+
structs.ServiceDefaults,
1095+
"bar",
1096+
nil,
1097+
)
1098+
if err != nil {
1099+
t.Errorf("delete config entry failed: %v", err)
1100+
}
1101+
}()
1102+
1103+
// Re-run the query
1104+
var out structs.ServiceConfigResponse
1105+
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig",
1106+
&structs.ServiceConfigRequest{
1107+
Name: "foo",
1108+
Datacenter: "dc1",
1109+
UpstreamIDs: []structs.ServiceID{
1110+
structs.NewServiceID("bar", nil),
1111+
structs.NewServiceID("other", nil),
1112+
},
1113+
QueryOptions: structs.QueryOptions{
1114+
MinQueryIndex: index,
1115+
MaxQueryTime: time.Second,
1116+
},
1117+
},
1118+
&out,
1119+
))
1120+
1121+
// Should block at least 100ms
1122+
require.True(t, time.Since(start) >= 100*time.Millisecond, "too fast")
1123+
1124+
// Check the indexes
1125+
require.Equal(t, out.Index, index+1)
1126+
1127+
expected := structs.ServiceConfigResponse{
1128+
ProxyConfig: map[string]interface{}{
1129+
"protocol": "http",
1130+
},
1131+
QueryMeta: out.QueryMeta, // don't care
1132+
}
1133+
1134+
require.Equal(t, expected, out)
1135+
index = out.Index
1136+
})
1137+
1138+
runStep(t, "foo should be http and bar should be unset", func(t *testing.T) {
1139+
// Verify that we get the results of service-defaults for just 'foo'.
1140+
var out structs.ServiceConfigResponse
1141+
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig",
1142+
&structs.ServiceConfigRequest{
1143+
Name: "foo",
1144+
Datacenter: "dc1",
1145+
UpstreamIDs: []structs.ServiceID{
1146+
structs.NewServiceID("bar", nil),
1147+
structs.NewServiceID("other", nil),
1148+
},
1149+
},
1150+
&out,
1151+
))
1152+
1153+
expected := structs.ServiceConfigResponse{
1154+
ProxyConfig: map[string]interface{}{
1155+
"protocol": "http",
1156+
},
1157+
QueryMeta: out.QueryMeta, // don't care
1158+
}
1159+
1160+
require.Equal(t, expected, out)
1161+
index = out.Index
1162+
})
1163+
1164+
runStep(t, "blocking query for foo wakes on foo entry delete", func(t *testing.T) {
1165+
// Now setup a blocking query for 'foo' while we erase the
1166+
// service-defaults for foo.
1167+
1168+
// Async cause a change
1169+
start := time.Now()
1170+
go func() {
1171+
time.Sleep(100 * time.Millisecond)
1172+
err := state.DeleteConfigEntry(index+1,
1173+
structs.ServiceDefaults,
1174+
"foo",
1175+
nil,
1176+
)
1177+
if err != nil {
1178+
t.Errorf("delete config entry failed: %v", err)
1179+
}
1180+
}()
1181+
1182+
// Re-run the query
1183+
var out structs.ServiceConfigResponse
1184+
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig",
1185+
&structs.ServiceConfigRequest{
1186+
Name: "foo",
1187+
Datacenter: "dc1",
1188+
UpstreamIDs: []structs.ServiceID{
1189+
structs.NewServiceID("bar", nil),
1190+
structs.NewServiceID("other", nil),
1191+
},
1192+
QueryOptions: structs.QueryOptions{
1193+
MinQueryIndex: index,
1194+
MaxQueryTime: time.Second,
1195+
},
1196+
},
1197+
&out,
1198+
))
1199+
1200+
// Should block at least 100ms
1201+
require.True(t, time.Since(start) >= 100*time.Millisecond, "too fast")
1202+
1203+
// Check the indexes
1204+
require.Equal(t, out.Index, index+1)
1205+
1206+
expected := structs.ServiceConfigResponse{
1207+
QueryMeta: out.QueryMeta, // don't care
1208+
}
1209+
1210+
require.Equal(t, expected, out)
1211+
index = out.Index
1212+
})
1213+
}
1214+
10121215
func TestConfigEntry_ResolveServiceConfig_UpstreamProxyDefaultsProtocol(t *testing.T) {
10131216
t.Parallel()
10141217

@@ -1266,3 +1469,10 @@ func TestConfigEntry_ProxyDefaultsExposeConfig(t *testing.T) {
12661469
require.True(t, ok)
12671470
require.Equal(t, expose, proxyConf.Expose)
12681471
}
1472+
1473+
func runStep(t *testing.T, name string, fn func(t *testing.T)) {
1474+
t.Helper()
1475+
if !t.Run(name, fn) {
1476+
t.FailNow()
1477+
}
1478+
}

Diff for: agent/structs/config_entry.go

-6
Original file line numberDiff line numberDiff line change
@@ -619,12 +619,6 @@ type ServiceConfigResponse struct {
619619
QueryMeta
620620
}
621621

622-
func (r *ServiceConfigResponse) Reset() {
623-
r.ProxyConfig = nil
624-
r.UpstreamConfigs = nil
625-
r.MeshGateway = MeshGatewayConfig{}
626-
}
627-
628622
// MarshalBinary writes ServiceConfigResponse as msgpack encoded. It's only here
629623
// because we need custom decoding of the raw interface{} values.
630624
func (r *ServiceConfigResponse) MarshalBinary() (data []byte, err error) {

0 commit comments

Comments
 (0)