Skip to content

Commit 3975cb8

Browse files
authored
agent: blocking central config RPCs iterations should not interfere with each other (#6316)
1 parent 6d99524 commit 3975cb8

File tree

3 files changed

+173
-0
lines changed

3 files changed

+173
-0
lines changed

agent/consul/config_endpoint.go

+2
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,8 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
231231
&args.QueryOptions,
232232
&reply.QueryMeta,
233233
func(ws memdb.WatchSet, state *state.Store) error {
234+
reply.Reset()
235+
234236
reply.MeshGateway.Mode = structs.MeshGatewayModeDefault
235237
// Pass the WatchSet to both the service and proxy config lookups. If either is updated
236238
// during the blocking query, this function will be rerun and these state store lookups

agent/consul/config_endpoint_test.go

+165
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package consul
33
import (
44
"os"
55
"testing"
6+
"time"
67

78
"github.com/hashicorp/consul/acl"
89
"github.com/hashicorp/consul/agent/structs"
@@ -733,6 +734,170 @@ func TestConfigEntry_ResolveServiceConfig(t *testing.T) {
733734
require.Equal(map[string]interface{}{"foo": 1}, proxyConf.Config)
734735
}
735736

737+
func TestConfigEntry_ResolveServiceConfig_Blocking(t *testing.T) {
738+
t.Parallel()
739+
740+
require := require.New(t)
741+
742+
dir1, s1 := testServer(t)
743+
defer os.RemoveAll(dir1)
744+
defer s1.Shutdown()
745+
codec := rpcClient(t, s1)
746+
defer codec.Close()
747+
748+
// The main thing this should test is that information from one iteration
749+
// of the blocking query does NOT bleed over into the next run. Concretely
750+
// in this test the data present in the initial proxy-defaults should not
751+
// be present when we are woken up due to proxy-defaults being deleted.
752+
753+
state := s1.fsm.State()
754+
require.NoError(state.EnsureConfigEntry(1, &structs.ProxyConfigEntry{
755+
Kind: structs.ProxyDefaults,
756+
Name: structs.ProxyConfigGlobal,
757+
Config: map[string]interface{}{
758+
"global": 1,
759+
},
760+
}))
761+
require.NoError(state.EnsureConfigEntry(2, &structs.ServiceConfigEntry{
762+
Kind: structs.ServiceDefaults,
763+
Name: "foo",
764+
Protocol: "grpc",
765+
}))
766+
require.NoError(state.EnsureConfigEntry(3, &structs.ServiceConfigEntry{
767+
Kind: structs.ServiceDefaults,
768+
Name: "bar",
769+
Protocol: "http",
770+
}))
771+
772+
var index uint64
773+
774+
{ // Verify that we get the results of proxy-defaults and service-defaults for 'foo'.
775+
var out structs.ServiceConfigResponse
776+
require.NoError(msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig",
777+
&structs.ServiceConfigRequest{
778+
Name: "foo",
779+
Datacenter: "dc1",
780+
},
781+
&out,
782+
))
783+
784+
expected := structs.ServiceConfigResponse{
785+
ProxyConfig: map[string]interface{}{
786+
"global": int64(1),
787+
"protocol": "grpc",
788+
},
789+
QueryMeta: out.QueryMeta,
790+
}
791+
require.Equal(expected, out)
792+
index = out.Index
793+
}
794+
795+
// Now setup a blocking query for 'foo' while we erase the service-defaults for foo.
796+
{
797+
// Async cause a change
798+
start := time.Now()
799+
go func() {
800+
time.Sleep(100 * time.Millisecond)
801+
require.NoError(state.DeleteConfigEntry(index+1,
802+
structs.ServiceDefaults,
803+
"foo",
804+
))
805+
}()
806+
807+
// Re-run the query
808+
var out structs.ServiceConfigResponse
809+
require.NoError(msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig",
810+
&structs.ServiceConfigRequest{
811+
Name: "foo",
812+
Datacenter: "dc1",
813+
QueryOptions: structs.QueryOptions{
814+
MinQueryIndex: index,
815+
MaxQueryTime: time.Second,
816+
},
817+
},
818+
&out,
819+
))
820+
821+
// Should block at least 100ms
822+
require.True(time.Since(start) >= 100*time.Millisecond, "too fast")
823+
824+
// Check the indexes
825+
require.Equal(out.Index, index+1)
826+
827+
expected := structs.ServiceConfigResponse{
828+
ProxyConfig: map[string]interface{}{
829+
"global": int64(1),
830+
},
831+
QueryMeta: out.QueryMeta,
832+
}
833+
require.Equal(expected, out)
834+
835+
index = out.Index
836+
}
837+
838+
{ // Verify that we get the results of proxy-defaults and service-defaults for 'bar'.
839+
var out structs.ServiceConfigResponse
840+
require.NoError(msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig",
841+
&structs.ServiceConfigRequest{
842+
Name: "bar",
843+
Datacenter: "dc1",
844+
},
845+
&out,
846+
))
847+
848+
expected := structs.ServiceConfigResponse{
849+
ProxyConfig: map[string]interface{}{
850+
"global": int64(1),
851+
"protocol": "http",
852+
},
853+
QueryMeta: out.QueryMeta,
854+
}
855+
require.Equal(expected, out)
856+
index = out.Index
857+
}
858+
859+
// Now setup a blocking query for 'bar' while we erase the global proxy-defaults.
860+
{
861+
// Async cause a change
862+
start := time.Now()
863+
go func() {
864+
time.Sleep(100 * time.Millisecond)
865+
require.NoError(state.DeleteConfigEntry(index+1,
866+
structs.ProxyDefaults,
867+
structs.ProxyConfigGlobal,
868+
))
869+
}()
870+
871+
// Re-run the query
872+
var out structs.ServiceConfigResponse
873+
require.NoError(msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig",
874+
&structs.ServiceConfigRequest{
875+
Name: "bar",
876+
Datacenter: "dc1",
877+
QueryOptions: structs.QueryOptions{
878+
MinQueryIndex: index,
879+
MaxQueryTime: time.Second,
880+
},
881+
},
882+
&out,
883+
))
884+
885+
// Should block at least 100ms
886+
require.True(time.Since(start) >= 100*time.Millisecond, "too fast")
887+
888+
// Check the indexes
889+
require.Equal(out.Index, index+1)
890+
891+
expected := structs.ServiceConfigResponse{
892+
ProxyConfig: map[string]interface{}{
893+
"protocol": "http",
894+
},
895+
QueryMeta: out.QueryMeta,
896+
}
897+
require.Equal(expected, out)
898+
}
899+
}
900+
736901
func TestConfigEntry_ResolveServiceConfig_UpstreamProxyDefaultsProtocol(t *testing.T) {
737902
t.Parallel()
738903

agent/structs/config_entry.go

+6
Original file line numberDiff line numberDiff line change
@@ -538,6 +538,12 @@ type ServiceConfigResponse struct {
538538
QueryMeta
539539
}
540540

541+
func (r *ServiceConfigResponse) Reset() {
542+
r.ProxyConfig = nil
543+
r.UpstreamConfigs = nil
544+
r.MeshGateway = MeshGatewayConfig{}
545+
}
546+
541547
// MarshalBinary writes ServiceConfigResponse as msgpack encoded. It's only here
542548
// because we need custom decoding of the raw interface{} values.
543549
func (r *ServiceConfigResponse) MarshalBinary() (data []byte, err error) {

0 commit comments

Comments
 (0)