Skip to content

Commit 597448d

Browse files
authored
server: ensure that central service config flattening properly resets the state each time (#10239)
The prior solution to call reply.Reset() aged poorly since newer fields were added to the reply, but not added to Reset() leading serial blocking query loops on the server to blend replies. This could manifest as a service-defaults protocol change from default=>http not reverting back to default after the config entry reponsible was deleted.
1 parent 7e1d780 commit 597448d

File tree

4 files changed

+238
-27
lines changed

4 files changed

+238
-27
lines changed

Diff for: .changelog/10239.txt

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
```release-note:bug
2+
server: ensure that central service config flattening properly resets the state each time
3+
```

Diff for: agent/consul/config_endpoint.go

+26-21
Original file line numberDiff line numberDiff line change
@@ -323,8 +323,9 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
323323
&args.QueryOptions,
324324
&reply.QueryMeta,
325325
func(ws memdb.WatchSet, state *state.Store) error {
326-
reply.Reset()
327-
reply.MeshGateway.Mode = structs.MeshGatewayModeDefault
326+
var thisReply structs.ServiceConfigResponse
327+
328+
thisReply.MeshGateway.Mode = structs.MeshGatewayModeDefault
328329
// TODO(freddy) Refactor this into smaller set of state store functions
329330
// Pass the WatchSet to both the service and proxy config lookups. If either is updated during the
330331
// blocking query, this function will be rerun and these state store lookups will both be current.
@@ -349,11 +350,11 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
349350
if err != nil {
350351
return fmt.Errorf("failed to copy global proxy-defaults: %v", err)
351352
}
352-
reply.ProxyConfig = mapCopy.(map[string]interface{})
353-
reply.Mode = proxyConf.Mode
354-
reply.TransparentProxy = proxyConf.TransparentProxy
355-
reply.MeshGateway = proxyConf.MeshGateway
356-
reply.Expose = proxyConf.Expose
353+
thisReply.ProxyConfig = mapCopy.(map[string]interface{})
354+
thisReply.Mode = proxyConf.Mode
355+
thisReply.TransparentProxy = proxyConf.TransparentProxy
356+
thisReply.MeshGateway = proxyConf.MeshGateway
357+
thisReply.Expose = proxyConf.Expose
357358

358359
// Extract the global protocol from proxyConf for upstream configs.
359360
rawProtocol := proxyConf.Config["protocol"]
@@ -369,7 +370,7 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
369370
if err != nil {
370371
return err
371372
}
372-
reply.Index = index
373+
thisReply.Index = index
373374

374375
var serviceConf *structs.ServiceConfigEntry
375376
if serviceEntry != nil {
@@ -378,25 +379,25 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
378379
return fmt.Errorf("invalid service config type %T", serviceEntry)
379380
}
380381
if serviceConf.Expose.Checks {
381-
reply.Expose.Checks = true
382+
thisReply.Expose.Checks = true
382383
}
383384
if len(serviceConf.Expose.Paths) >= 1 {
384-
reply.Expose.Paths = serviceConf.Expose.Paths
385+
thisReply.Expose.Paths = serviceConf.Expose.Paths
385386
}
386387
if serviceConf.MeshGateway.Mode != structs.MeshGatewayModeDefault {
387-
reply.MeshGateway.Mode = serviceConf.MeshGateway.Mode
388+
thisReply.MeshGateway.Mode = serviceConf.MeshGateway.Mode
388389
}
389390
if serviceConf.Protocol != "" {
390-
if reply.ProxyConfig == nil {
391-
reply.ProxyConfig = make(map[string]interface{})
391+
if thisReply.ProxyConfig == nil {
392+
thisReply.ProxyConfig = make(map[string]interface{})
392393
}
393-
reply.ProxyConfig["protocol"] = serviceConf.Protocol
394+
thisReply.ProxyConfig["protocol"] = serviceConf.Protocol
394395
}
395396
if serviceConf.TransparentProxy.OutboundListenerPort != 0 {
396-
reply.TransparentProxy.OutboundListenerPort = serviceConf.TransparentProxy.OutboundListenerPort
397+
thisReply.TransparentProxy.OutboundListenerPort = serviceConf.TransparentProxy.OutboundListenerPort
397398
}
398399
if serviceConf.Mode != structs.ProxyModeDefault {
399-
reply.Mode = serviceConf.Mode
400+
thisReply.Mode = serviceConf.Mode
400401
}
401402
}
402403

@@ -414,13 +415,14 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
414415

415416
// Check the args and the resolved value. If it was exclusively set via a config entry, then args.Mode
416417
// will never be transparent because the service config request does not use the resolved value.
417-
tproxy = args.Mode == structs.ProxyModeTransparent || reply.Mode == structs.ProxyModeTransparent
418+
tproxy = args.Mode == structs.ProxyModeTransparent || thisReply.Mode == structs.ProxyModeTransparent
418419
)
419420

420421
// The upstreams passed as arguments to this endpoint are the upstreams explicitly defined in a proxy registration.
421422
// If no upstreams were passed, then we should only returned the resolved config if the proxy in transparent mode.
422423
// Otherwise we would return a resolved upstream config to a proxy with no configured upstreams.
423424
if noUpstreamArgs && !tproxy {
425+
*reply = thisReply
424426
return nil
425427
}
426428

@@ -534,25 +536,28 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
534536

535537
// don't allocate the slices just to not fill them
536538
if len(usConfigs) == 0 {
539+
*reply = thisReply
537540
return nil
538541
}
539542

540543
if legacyUpstreams {
541544
// For legacy upstreams we return a map that is only keyed on the string ID, since they precede namespaces
542-
reply.UpstreamConfigs = make(map[string]map[string]interface{})
545+
thisReply.UpstreamConfigs = make(map[string]map[string]interface{})
543546

544547
for us, conf := range usConfigs {
545-
reply.UpstreamConfigs[us.ID] = conf
548+
thisReply.UpstreamConfigs[us.ID] = conf
546549
}
547550

548551
} else {
549-
reply.UpstreamIDConfigs = make(structs.OpaqueUpstreamConfigs, 0, len(usConfigs))
552+
thisReply.UpstreamIDConfigs = make(structs.OpaqueUpstreamConfigs, 0, len(usConfigs))
550553

551554
for us, conf := range usConfigs {
552-
reply.UpstreamIDConfigs = append(reply.UpstreamIDConfigs,
555+
thisReply.UpstreamIDConfigs = append(thisReply.UpstreamIDConfigs,
553556
structs.OpaqueUpstreamConfig{Upstream: us, Config: conf})
554557
}
555558
}
559+
560+
*reply = thisReply
556561
return nil
557562
})
558563
}

Diff for: agent/consul/config_endpoint_test.go

+209
Original file line numberDiff line numberDiff line change
@@ -1420,6 +1420,9 @@ func TestConfigEntry_ResolveServiceConfig_Blocking(t *testing.T) {
14201420
// of the blocking query does NOT bleed over into the next run. Concretely
14211421
// in this test the data present in the initial proxy-defaults should not
14221422
// be present when we are woken up due to proxy-defaults being deleted.
1423+
//
1424+
// This test does not pertain to upstreams, see:
1425+
// TestConfigEntry_ResolveServiceConfig_Upstreams_Blocking
14231426

14241427
state := s1.fsm.State()
14251428
require.NoError(state.EnsureConfigEntry(1, &structs.ProxyConfigEntry{
@@ -1571,6 +1574,205 @@ func TestConfigEntry_ResolveServiceConfig_Blocking(t *testing.T) {
15711574
}
15721575
}
15731576

1577+
func TestConfigEntry_ResolveServiceConfig_Upstreams_Blocking(t *testing.T) {
1578+
if testing.Short() {
1579+
t.Skip("too slow for testing.Short")
1580+
}
1581+
1582+
t.Parallel()
1583+
1584+
dir1, s1 := testServer(t)
1585+
defer os.RemoveAll(dir1)
1586+
defer s1.Shutdown()
1587+
codec := rpcClient(t, s1)
1588+
defer codec.Close()
1589+
1590+
// The main thing this should test is that information from one iteration
1591+
// of the blocking query does NOT bleed over into the next run. Concretely
1592+
// in this test the data present in the initial proxy-defaults should not
1593+
// be present when we are woken up due to proxy-defaults being deleted.
1594+
//
1595+
// This test is about fields in upstreams, see:
1596+
// TestConfigEntry_ResolveServiceConfig_Blocking
1597+
1598+
state := s1.fsm.State()
1599+
require.NoError(t, state.EnsureConfigEntry(1, &structs.ServiceConfigEntry{
1600+
Kind: structs.ServiceDefaults,
1601+
Name: "foo",
1602+
Protocol: "http",
1603+
}))
1604+
require.NoError(t, state.EnsureConfigEntry(2, &structs.ServiceConfigEntry{
1605+
Kind: structs.ServiceDefaults,
1606+
Name: "bar",
1607+
Protocol: "http",
1608+
}))
1609+
1610+
var index uint64
1611+
1612+
runStep(t, "foo and bar should be both http", func(t *testing.T) {
1613+
// Verify that we get the results of service-defaults for 'foo' and 'bar'.
1614+
var out structs.ServiceConfigResponse
1615+
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig",
1616+
&structs.ServiceConfigRequest{
1617+
Name: "foo",
1618+
Datacenter: "dc1",
1619+
UpstreamIDs: []structs.ServiceID{
1620+
structs.NewServiceID("bar", nil),
1621+
structs.NewServiceID("other", nil),
1622+
},
1623+
},
1624+
&out,
1625+
))
1626+
1627+
expected := structs.ServiceConfigResponse{
1628+
ProxyConfig: map[string]interface{}{
1629+
"protocol": "http",
1630+
},
1631+
UpstreamIDConfigs: []structs.OpaqueUpstreamConfig{
1632+
{
1633+
Upstream: structs.NewServiceID("bar", nil),
1634+
Config: map[string]interface{}{
1635+
"protocol": "http",
1636+
},
1637+
},
1638+
},
1639+
QueryMeta: out.QueryMeta, // don't care
1640+
}
1641+
1642+
require.Equal(t, expected, out)
1643+
index = out.Index
1644+
})
1645+
1646+
runStep(t, "blocking query for foo wakes on bar entry delete", func(t *testing.T) {
1647+
// Now setup a blocking query for 'foo' while we erase the
1648+
// service-defaults for bar.
1649+
1650+
// Async cause a change
1651+
start := time.Now()
1652+
go func() {
1653+
time.Sleep(100 * time.Millisecond)
1654+
err := state.DeleteConfigEntry(index+1,
1655+
structs.ServiceDefaults,
1656+
"bar",
1657+
nil,
1658+
)
1659+
if err != nil {
1660+
t.Errorf("delete config entry failed: %v", err)
1661+
}
1662+
}()
1663+
1664+
// Re-run the query
1665+
var out structs.ServiceConfigResponse
1666+
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig",
1667+
&structs.ServiceConfigRequest{
1668+
Name: "foo",
1669+
Datacenter: "dc1",
1670+
UpstreamIDs: []structs.ServiceID{
1671+
structs.NewServiceID("bar", nil),
1672+
structs.NewServiceID("other", nil),
1673+
},
1674+
QueryOptions: structs.QueryOptions{
1675+
MinQueryIndex: index,
1676+
MaxQueryTime: time.Second,
1677+
},
1678+
},
1679+
&out,
1680+
))
1681+
1682+
// Should block at least 100ms
1683+
require.True(t, time.Since(start) >= 100*time.Millisecond, "too fast")
1684+
1685+
// Check the indexes
1686+
require.Equal(t, out.Index, index+1)
1687+
1688+
expected := structs.ServiceConfigResponse{
1689+
ProxyConfig: map[string]interface{}{
1690+
"protocol": "http",
1691+
},
1692+
QueryMeta: out.QueryMeta, // don't care
1693+
}
1694+
1695+
require.Equal(t, expected, out)
1696+
index = out.Index
1697+
})
1698+
1699+
runStep(t, "foo should be http and bar should be unset", func(t *testing.T) {
1700+
// Verify that we get the results of service-defaults for just 'foo'.
1701+
var out structs.ServiceConfigResponse
1702+
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig",
1703+
&structs.ServiceConfigRequest{
1704+
Name: "foo",
1705+
Datacenter: "dc1",
1706+
UpstreamIDs: []structs.ServiceID{
1707+
structs.NewServiceID("bar", nil),
1708+
structs.NewServiceID("other", nil),
1709+
},
1710+
},
1711+
&out,
1712+
))
1713+
1714+
expected := structs.ServiceConfigResponse{
1715+
ProxyConfig: map[string]interface{}{
1716+
"protocol": "http",
1717+
},
1718+
QueryMeta: out.QueryMeta, // don't care
1719+
}
1720+
1721+
require.Equal(t, expected, out)
1722+
index = out.Index
1723+
})
1724+
1725+
runStep(t, "blocking query for foo wakes on foo entry delete", func(t *testing.T) {
1726+
// Now setup a blocking query for 'foo' while we erase the
1727+
// service-defaults for foo.
1728+
1729+
// Async cause a change
1730+
start := time.Now()
1731+
go func() {
1732+
time.Sleep(100 * time.Millisecond)
1733+
err := state.DeleteConfigEntry(index+1,
1734+
structs.ServiceDefaults,
1735+
"foo",
1736+
nil,
1737+
)
1738+
if err != nil {
1739+
t.Errorf("delete config entry failed: %v", err)
1740+
}
1741+
}()
1742+
1743+
// Re-run the query
1744+
var out structs.ServiceConfigResponse
1745+
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig",
1746+
&structs.ServiceConfigRequest{
1747+
Name: "foo",
1748+
Datacenter: "dc1",
1749+
UpstreamIDs: []structs.ServiceID{
1750+
structs.NewServiceID("bar", nil),
1751+
structs.NewServiceID("other", nil),
1752+
},
1753+
QueryOptions: structs.QueryOptions{
1754+
MinQueryIndex: index,
1755+
MaxQueryTime: time.Second,
1756+
},
1757+
},
1758+
&out,
1759+
))
1760+
1761+
// Should block at least 100ms
1762+
require.True(t, time.Since(start) >= 100*time.Millisecond, "too fast")
1763+
1764+
// Check the indexes
1765+
require.Equal(t, out.Index, index+1)
1766+
1767+
expected := structs.ServiceConfigResponse{
1768+
QueryMeta: out.QueryMeta, // don't care
1769+
}
1770+
1771+
require.Equal(t, expected, out)
1772+
index = out.Index
1773+
})
1774+
}
1775+
15741776
func TestConfigEntry_ResolveServiceConfig_UpstreamProxyDefaultsProtocol(t *testing.T) {
15751777
if testing.Short() {
15761778
t.Skip("too slow for testing.Short")
@@ -1848,3 +2050,10 @@ func TestConfigEntry_ProxyDefaultsExposeConfig(t *testing.T) {
18482050
require.True(t, ok)
18492051
require.Equal(t, expose, proxyConf.Expose)
18502052
}
2053+
2054+
func runStep(t *testing.T, name string, fn func(t *testing.T)) {
2055+
t.Helper()
2056+
if !t.Run(name, fn) {
2057+
t.FailNow()
2058+
}
2059+
}

Diff for: agent/structs/config_entry.go

-6
Original file line numberDiff line numberDiff line change
@@ -968,12 +968,6 @@ type ServiceConfigResponse struct {
968968
QueryMeta
969969
}
970970

971-
func (r *ServiceConfigResponse) Reset() {
972-
r.ProxyConfig = nil
973-
r.UpstreamConfigs = nil
974-
r.MeshGateway = MeshGatewayConfig{}
975-
}
976-
977971
// MarshalBinary writes ServiceConfigResponse as msgpack encoded. It's only here
978972
// because we need custom decoding of the raw interface{} values.
979973
func (r *ServiceConfigResponse) MarshalBinary() (data []byte, err error) {

0 commit comments

Comments
 (0)