Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RCM-598] upgrade(remote-config): Use layered gRPC client between trace-agent & core-agent #15100

Merged
merged 4 commits into from
Jan 19, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 3 additions & 8 deletions cmd/trace-agent/remote_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ import (
"sync"
"time"

"google.golang.org/grpc/metadata"

"github.com/DataDog/datadog-agent/pkg/config/remote"
"github.com/DataDog/datadog-agent/pkg/proto/pbgo"
"github.com/DataDog/datadog-agent/pkg/trace/api"
"github.com/DataDog/datadog-agent/pkg/trace/config"
Expand All @@ -41,7 +40,7 @@ func putBuffer(buffer *bytes.Buffer) {
bufferPool.Put(buffer)
}

func remoteConfigHandler(r *api.HTTPReceiver, client pbgo.AgentSecureClient, token string, cfg *config.AgentConfig) http.Handler {
func remoteConfigHandler(r *api.HTTPReceiver, client remote.ConfigUpdater, cfg *config.AgentConfig) http.Handler {
cidProvider := api.NewIDProvider(cfg.ContainerProcRoot)
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
defer timing.Since("datadog.trace_agent.receiver.config_process_ms", time.Now())
Expand All @@ -66,10 +65,6 @@ func remoteConfigHandler(r *api.HTTPReceiver, client pbgo.AgentSecureClient, tok
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
md := metadata.MD{
"authorization": []string{fmt.Sprintf("Bearer %s", token)},
}
ctx := metadata.NewOutgoingContext(req.Context(), md)
if configsRequest.GetClient().GetClientTracer() != nil {
normalize(&configsRequest)
if configsRequest.Client.ClientTracer.Tags == nil {
Expand All @@ -79,7 +74,7 @@ func remoteConfigHandler(r *api.HTTPReceiver, client pbgo.AgentSecureClient, tok
configsRequest.Client.ClientTracer.Tags = append(configsRequest.Client.ClientTracer.Tags, tag)
}
}
cfg, err := client.ClientGetConfigs(ctx, &configsRequest)
cfg, err := client.ClientGetConfigs(req.Context(), &configsRequest)
if err != nil {
statusCode = http.StatusInternalServerError
http.Error(w, err.Error(), statusCode)
Expand Down
4 changes: 2 additions & 2 deletions cmd/trace-agent/remote_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestConfigEndpoint(t *testing.T) {
rcv := api.NewHTTPReceiver(config.New(), sampler.NewDynamicConfig(), make(chan *api.Payload, 5000), nil)
mux := http.NewServeMux()
cfg := &config.AgentConfig{}
mux.Handle("/v0.7/config", remoteConfigHandler(rcv, &grpc, "", cfg))
mux.Handle("/v0.7/config", remoteConfigHandler(rcv, &grpc, cfg))
server := httptest.NewServer(mux)
if tc.valid {
var request pbgo.ClientGetConfigsRequest
Expand Down Expand Up @@ -139,7 +139,7 @@ func TestUpstreamRequest(t *testing.T) {
grpc.On("ClientGetConfigs", mock.Anything, &request, mock.Anything).Return(&pbgo.ClientGetConfigsResponse{Targets: []byte("test")}, nil)

mux := http.NewServeMux()
mux.Handle("/v0.7/config", remoteConfigHandler(rcv, &grpc, "", tc.cfg))
mux.Handle("/v0.7/config", remoteConfigHandler(rcv, &grpc, tc.cfg))
server := httptest.NewServer(mux)

req, _ := http.NewRequest("POST", server.URL+"/v0.7/config", strings.NewReader(tc.tracerReq))
Expand Down
12 changes: 4 additions & 8 deletions cmd/trace-agent/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import (
cmdconfig "github.com/DataDog/datadog-agent/cmd/trace-agent/config"
"github.com/DataDog/datadog-agent/cmd/trace-agent/internal/flags"
"github.com/DataDog/datadog-agent/cmd/trace-agent/internal/osutil"
"github.com/DataDog/datadog-agent/pkg/api/security"
coreconfig "github.com/DataDog/datadog-agent/pkg/config"
rc "github.com/DataDog/datadog-agent/pkg/config/remote"
"github.com/DataDog/datadog-agent/pkg/pidfile"
"github.com/DataDog/datadog-agent/pkg/tagger"
"github.com/DataDog/datadog-agent/pkg/tagger/local"
Expand All @@ -34,7 +34,6 @@ import (
"github.com/DataDog/datadog-agent/pkg/trace/metrics/timing"
"github.com/DataDog/datadog-agent/pkg/trace/watchdog"
"github.com/DataDog/datadog-agent/pkg/util"
"github.com/DataDog/datadog-agent/pkg/util/grpc"
"github.com/DataDog/datadog-agent/pkg/util/log"
"github.com/DataDog/datadog-agent/pkg/util/profiling"
"github.com/DataDog/datadog-agent/pkg/version"
Expand Down Expand Up @@ -188,17 +187,14 @@ func Run(ctx context.Context) {
}()

if coreconfig.Datadog.GetBool("remote_configuration.enabled") {
client, err := grpc.GetDDAgentSecureClient(context.Background())
// Auth tokens are handled by the rcClient
rcClient, err := rc.NewAgentGRPCConfigFetcher()
if err != nil {
osutil.Exitf("could not instantiate the tracer remote config client: %v", err)
}
token, err := security.FetchAuthToken()
if err != nil {
osutil.Exitf("could obtain the auth token for the tracer remote config client: %v", err)
}
api.AttachEndpoint(api.Endpoint{
Pattern: "/v0.7/config",
Handler: func(r *api.HTTPReceiver) http.Handler { return remoteConfigHandler(r, client, token, cfg) },
Handler: func(r *api.HTTPReceiver) http.Handler { return remoteConfigHandler(r, rcClient, cfg) },
})
}

Expand Down
22 changes: 14 additions & 8 deletions pkg/config/remote/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"time"

"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"

"github.com/DataDog/datadog-agent/pkg/api/security"
Expand All @@ -22,7 +23,7 @@ import (
"github.com/DataDog/datadog-agent/pkg/proto/pbgo"
"github.com/DataDog/datadog-agent/pkg/remoteconfig/state"
"github.com/DataDog/datadog-agent/pkg/util/backoff"
"github.com/DataDog/datadog-agent/pkg/util/grpc"
ddgrpc "github.com/DataDog/datadog-agent/pkg/util/grpc"
"github.com/DataDog/datadog-agent/pkg/util/log"
)

Expand All @@ -31,12 +32,14 @@ const (
maximalMaxBackoffTime = 90 * time.Second
minBackoffFactor = 2.0
recoveryInterval = 2

maxMessageSize = 1024 * 1024 * 110 // 110MB, current backend limit
)

// ConfigUpdater defines the interface that an agent client uses to get config updates
// from the core remote-config service
type ConfigUpdater interface {
ClientGetConfigs(context.Context, *pbgo.ClientGetConfigsRequest) (*pbgo.ClientGetConfigsResponse, error)
ClientGetConfigs(context.Context, *pbgo.ClientGetConfigsRequest, ...grpc.CallOption) (*pbgo.ClientGetConfigsResponse, error)
}

// Client is a remote-configuration client to obtain configurations from the local API
Expand Down Expand Up @@ -75,8 +78,11 @@ type agentGRPCConfigFetcher struct {
client pbgo.AgentSecureClient
}

func newAgentGRPCConfigFetcher() (*agentGRPCConfigFetcher, error) {
c, err := grpc.GetDDAgentSecureClient(context.Background())
// NewAgentGRPCConfigFetcher returns a gRPC config fetcher using the secure agent client
func NewAgentGRPCConfigFetcher() (*agentGRPCConfigFetcher, error) {
c, err := ddgrpc.GetDDAgentSecureClient(context.Background(), grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(maxMessageSize),
))
if err != nil {
return nil, err
}
Expand All @@ -87,7 +93,7 @@ func newAgentGRPCConfigFetcher() (*agentGRPCConfigFetcher, error) {
}

// ClientGetConfigs implements the ConfigUpdater interface for agentGRPCConfigFetcher
func (g *agentGRPCConfigFetcher) ClientGetConfigs(ctx context.Context, request *pbgo.ClientGetConfigsRequest) (*pbgo.ClientGetConfigsResponse, error) {
func (g *agentGRPCConfigFetcher) ClientGetConfigs(ctx context.Context, request *pbgo.ClientGetConfigsRequest, opts ...grpc.CallOption) (*pbgo.ClientGetConfigsResponse, error) {
// When communicating with the core service via grpc, the auth token is handled
// by the core-agent, which runs independently. It's not guaranteed it starts before us,
// or that if it restarts that the auth token remains the same. Thus we need to do this every request.
Expand All @@ -101,7 +107,7 @@ func (g *agentGRPCConfigFetcher) ClientGetConfigs(ctx context.Context, request *

ctx = metadata.NewOutgoingContext(ctx, md)

return g.client.ClientGetConfigs(ctx, request)
return g.client.ClientGetConfigs(ctx, request, opts...)
}

// NewClient creates a new client
Expand All @@ -111,7 +117,7 @@ func NewClient(agentName string, updater ConfigUpdater, agentVersion string, pro

// NewGRPCClient creates a new client that retrieves updates over the datadog-agent's secure GRPC client
func NewGRPCClient(agentName string, agentVersion string, products []data.Product, pollInterval time.Duration) (*Client, error) {
grpcClient, err := newAgentGRPCConfigFetcher()
grpcClient, err := NewAgentGRPCConfigFetcher()
if err != nil {
return nil, err
}
Expand All @@ -121,7 +127,7 @@ func NewGRPCClient(agentName string, agentVersion string, products []data.Produc

// NewUnverifiedClient creates a new client that does not perform any TUF verification
func NewUnverifiedClient(agentName string, agentVersion string, products []data.Product, pollInterval time.Duration) (*Client, error) {
grpcClient, err := newAgentGRPCConfigFetcher()
grpcClient, err := NewAgentGRPCConfigFetcher()
if err != nil {
return nil, err
}
Expand Down