Skip to content

Commit

Permalink
[RCM-598] upgrade(remote-config): Use layered gRPC client between tra…
Browse files Browse the repository at this point in the history
…ce-agent & core-agent (#15100)

* upgrade(remote-config): Bump message size limit to 500MB

* fix(size): Size down to 110MB max

* refactor(auth): Refactor RC auth

* fix(interface): Remove opts
  • Loading branch information
BaptisteFoy authored Jan 19, 2023
1 parent c0dadbd commit c22e601
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 49 deletions.
11 changes: 3 additions & 8 deletions cmd/trace-agent/remote_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ import (
"sync"
"time"

"google.golang.org/grpc/metadata"

"github.com/DataDog/datadog-agent/pkg/config/remote"
"github.com/DataDog/datadog-agent/pkg/proto/pbgo"
"github.com/DataDog/datadog-agent/pkg/trace/api"
"github.com/DataDog/datadog-agent/pkg/trace/config"
Expand All @@ -41,7 +40,7 @@ func putBuffer(buffer *bytes.Buffer) {
bufferPool.Put(buffer)
}

func remoteConfigHandler(r *api.HTTPReceiver, client pbgo.AgentSecureClient, token string, cfg *config.AgentConfig) http.Handler {
func remoteConfigHandler(r *api.HTTPReceiver, client remote.ConfigUpdater, cfg *config.AgentConfig) http.Handler {
cidProvider := api.NewIDProvider(cfg.ContainerProcRoot)
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
defer timing.Since("datadog.trace_agent.receiver.config_process_ms", time.Now())
Expand All @@ -66,10 +65,6 @@ func remoteConfigHandler(r *api.HTTPReceiver, client pbgo.AgentSecureClient, tok
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
md := metadata.MD{
"authorization": []string{fmt.Sprintf("Bearer %s", token)},
}
ctx := metadata.NewOutgoingContext(req.Context(), md)
if configsRequest.GetClient().GetClientTracer() != nil {
normalize(&configsRequest)
if configsRequest.Client.ClientTracer.Tags == nil {
Expand All @@ -79,7 +74,7 @@ func remoteConfigHandler(r *api.HTTPReceiver, client pbgo.AgentSecureClient, tok
configsRequest.Client.ClientTracer.Tags = append(configsRequest.Client.ClientTracer.Tags, tag)
}
}
cfg, err := client.ClientGetConfigs(ctx, &configsRequest)
cfg, err := client.ClientGetConfigs(req.Context(), &configsRequest)
if err != nil {
statusCode = http.StatusInternalServerError
http.Error(w, err.Error(), statusCode)
Expand Down
35 changes: 7 additions & 28 deletions cmd/trace-agent/remote_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"google.golang.org/grpc"
)

func TestConfigEndpoint(t *testing.T) {
Expand Down Expand Up @@ -55,11 +54,11 @@ func TestConfigEndpoint(t *testing.T) {
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
assert := assert.New(t)
grpc := mockAgentSecureServer{}
grpc := agentGRPCConfigFetcher{}
rcv := api.NewHTTPReceiver(config.New(), sampler.NewDynamicConfig(), make(chan *api.Payload, 5000), nil)
mux := http.NewServeMux()
cfg := &config.AgentConfig{}
mux.Handle("/v0.7/config", remoteConfigHandler(rcv, &grpc, "", cfg))
mux.Handle("/v0.7/config", remoteConfigHandler(rcv, &grpc, cfg))
server := httptest.NewServer(mux)
if tc.valid {
var request pbgo.ClientGetConfigsRequest
Expand Down Expand Up @@ -130,7 +129,7 @@ func TestUpstreamRequest(t *testing.T) {
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
assert := assert.New(t)
grpc := mockAgentSecureServer{}
grpc := agentGRPCConfigFetcher{}
rcv := api.NewHTTPReceiver(config.New(), sampler.NewDynamicConfig(), make(chan *api.Payload, 5000), nil)

var request pbgo.ClientGetConfigsRequest
Expand All @@ -139,7 +138,7 @@ func TestUpstreamRequest(t *testing.T) {
grpc.On("ClientGetConfigs", mock.Anything, &request, mock.Anything).Return(&pbgo.ClientGetConfigsResponse{Targets: []byte("test")}, nil)

mux := http.NewServeMux()
mux.Handle("/v0.7/config", remoteConfigHandler(rcv, &grpc, "", tc.cfg))
mux.Handle("/v0.7/config", remoteConfigHandler(rcv, &grpc, tc.cfg))
server := httptest.NewServer(mux)

req, _ := http.NewRequest("POST", server.URL+"/v0.7/config", strings.NewReader(tc.tracerReq))
Expand All @@ -156,32 +155,12 @@ func TestUpstreamRequest(t *testing.T) {
}
}

type mockAgentSecureServer struct {
type agentGRPCConfigFetcher struct {
pbgo.AgentSecureClient
mock.Mock
}

func (a *mockAgentSecureServer) TaggerStreamEntities(ctx context.Context, in *pbgo.StreamTagsRequest, opts ...grpc.CallOption) (pbgo.AgentSecure_TaggerStreamEntitiesClient, error) {
args := a.Called(ctx, in, opts)
return args.Get(0).(pbgo.AgentSecure_TaggerStreamEntitiesClient), args.Error(1)
}

func (a *mockAgentSecureServer) TaggerFetchEntity(ctx context.Context, in *pbgo.FetchEntityRequest, opts ...grpc.CallOption) (*pbgo.FetchEntityResponse, error) {
args := a.Called(ctx, in, opts)
return args.Get(0).(*pbgo.FetchEntityResponse), args.Error(1)
}

func (a *mockAgentSecureServer) DogstatsdCaptureTrigger(ctx context.Context, in *pbgo.CaptureTriggerRequest, opts ...grpc.CallOption) (*pbgo.CaptureTriggerResponse, error) {
args := a.Called(ctx, in, opts)
return args.Get(0).(*pbgo.CaptureTriggerResponse), args.Error(1)
}

func (a *mockAgentSecureServer) DogstatsdSetTaggerState(ctx context.Context, in *pbgo.TaggerState, opts ...grpc.CallOption) (*pbgo.TaggerStateResponse, error) {
args := a.Called(ctx, in, opts)
return args.Get(0).(*pbgo.TaggerStateResponse), args.Error(1)
}

func (a *mockAgentSecureServer) ClientGetConfigs(ctx context.Context, in *pbgo.ClientGetConfigsRequest, opts ...grpc.CallOption) (*pbgo.ClientGetConfigsResponse, error) {
args := a.Called(ctx, in, opts)
func (a *agentGRPCConfigFetcher) ClientGetConfigs(ctx context.Context, in *pbgo.ClientGetConfigsRequest) (*pbgo.ClientGetConfigsResponse, error) {
args := a.Called(ctx, in)
return args.Get(0).(*pbgo.ClientGetConfigsResponse), args.Error(1)
}
12 changes: 4 additions & 8 deletions cmd/trace-agent/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import (
cmdconfig "github.com/DataDog/datadog-agent/cmd/trace-agent/config"
"github.com/DataDog/datadog-agent/cmd/trace-agent/internal/flags"
"github.com/DataDog/datadog-agent/cmd/trace-agent/internal/osutil"
"github.com/DataDog/datadog-agent/pkg/api/security"
coreconfig "github.com/DataDog/datadog-agent/pkg/config"
rc "github.com/DataDog/datadog-agent/pkg/config/remote"
"github.com/DataDog/datadog-agent/pkg/pidfile"
"github.com/DataDog/datadog-agent/pkg/tagger"
"github.com/DataDog/datadog-agent/pkg/tagger/local"
Expand All @@ -34,7 +34,6 @@ import (
"github.com/DataDog/datadog-agent/pkg/trace/metrics/timing"
"github.com/DataDog/datadog-agent/pkg/trace/watchdog"
"github.com/DataDog/datadog-agent/pkg/util"
"github.com/DataDog/datadog-agent/pkg/util/grpc"
"github.com/DataDog/datadog-agent/pkg/util/log"
"github.com/DataDog/datadog-agent/pkg/util/profiling"
"github.com/DataDog/datadog-agent/pkg/version"
Expand Down Expand Up @@ -188,17 +187,14 @@ func Run(ctx context.Context) {
}()

if coreconfig.Datadog.GetBool("remote_configuration.enabled") {
client, err := grpc.GetDDAgentSecureClient(context.Background())
// Auth tokens are handled by the rcClient
rcClient, err := rc.NewAgentGRPCConfigFetcher()
if err != nil {
osutil.Exitf("could not instantiate the tracer remote config client: %v", err)
}
token, err := security.FetchAuthToken()
if err != nil {
osutil.Exitf("could obtain the auth token for the tracer remote config client: %v", err)
}
api.AttachEndpoint(api.Endpoint{
Pattern: "/v0.7/config",
Handler: func(r *api.HTTPReceiver) http.Handler { return remoteConfigHandler(r, client, token, cfg) },
Handler: func(r *api.HTTPReceiver) http.Handler { return remoteConfigHandler(r, rcClient, cfg) },
})
}

Expand Down
16 changes: 11 additions & 5 deletions pkg/config/remote/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"time"

"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"

"github.com/DataDog/datadog-agent/pkg/api/security"
Expand All @@ -22,7 +23,7 @@ import (
"github.com/DataDog/datadog-agent/pkg/proto/pbgo"
"github.com/DataDog/datadog-agent/pkg/remoteconfig/state"
"github.com/DataDog/datadog-agent/pkg/util/backoff"
"github.com/DataDog/datadog-agent/pkg/util/grpc"
ddgrpc "github.com/DataDog/datadog-agent/pkg/util/grpc"
"github.com/DataDog/datadog-agent/pkg/util/log"
)

Expand All @@ -31,6 +32,8 @@ const (
maximalMaxBackoffTime = 90 * time.Second
minBackoffFactor = 2.0
recoveryInterval = 2

maxMessageSize = 1024 * 1024 * 110 // 110MB, current backend limit
)

// ConfigUpdater defines the interface that an agent client uses to get config updates
Expand Down Expand Up @@ -75,8 +78,11 @@ type agentGRPCConfigFetcher struct {
client pbgo.AgentSecureClient
}

func newAgentGRPCConfigFetcher() (*agentGRPCConfigFetcher, error) {
c, err := grpc.GetDDAgentSecureClient(context.Background())
// NewAgentGRPCConfigFetcher returns a gRPC config fetcher using the secure agent client
func NewAgentGRPCConfigFetcher() (*agentGRPCConfigFetcher, error) {
c, err := ddgrpc.GetDDAgentSecureClient(context.Background(), grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(maxMessageSize),
))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -111,7 +117,7 @@ func NewClient(agentName string, updater ConfigUpdater, agentVersion string, pro

// NewGRPCClient creates a new client that retrieves updates over the datadog-agent's secure GRPC client
func NewGRPCClient(agentName string, agentVersion string, products []data.Product, pollInterval time.Duration) (*Client, error) {
grpcClient, err := newAgentGRPCConfigFetcher()
grpcClient, err := NewAgentGRPCConfigFetcher()
if err != nil {
return nil, err
}
Expand All @@ -121,7 +127,7 @@ func NewGRPCClient(agentName string, agentVersion string, products []data.Produc

// NewUnverifiedClient creates a new client that does not perform any TUF verification
func NewUnverifiedClient(agentName string, agentVersion string, products []data.Product, pollInterval time.Duration) (*Client, error) {
grpcClient, err := newAgentGRPCConfigFetcher()
grpcClient, err := NewAgentGRPCConfigFetcher()
if err != nil {
return nil, err
}
Expand Down

0 comments on commit c22e601

Please sign in to comment.