Skip to content

Commit

Permalink
pkg/clusteragent/admission/patch: poll rc on leadership switch (#15062)
Browse files Browse the repository at this point in the history
  • Loading branch information
ahmed-mez authored Jan 16, 2023
1 parent e0b997b commit 2f567d6
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 27 deletions.
11 changes: 6 additions & 5 deletions cmd/cluster-agent/subcommands/start/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,11 +298,12 @@ func start(log log.Component, config config.Component, cliParams *command.Global
if pkgconfig.Datadog.GetBool("admission_controller.enabled") {
if pkgconfig.Datadog.GetBool("admission_controller.auto_instrumentation.patcher.enabled") {
patchCtx := admissionpatch.ControllerContext{
IsLeaderFunc: le.IsLeader,
K8sClient: apiCl.Cl,
RcClient: rcClient,
ClusterName: clusterName,
StopCh: stopCh,
IsLeaderFunc: le.IsLeader,
LeaderSubscribeFunc: le.Subscribe,
K8sClient: apiCl.Cl,
RcClient: rcClient,
ClusterName: clusterName,
StopCh: stopCh,
}
if err := admissionpatch.StartControllers(patchCtx); err != nil {
log.Errorf("Cannot start auto instrumentation patcher: %v", err)
Expand Down
4 changes: 2 additions & 2 deletions pkg/clusteragent/admission/patch/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ type patchProvider interface {
subscribe(kind TargetObjKind) chan PatchRequest
}

func newPatchProvider(rcClient *remote.Client, clusterName string) (patchProvider, error) {
func newPatchProvider(rcClient *remote.Client, isLeaderNotif <-chan struct{}, clusterName string) (patchProvider, error) {
if config.Datadog.GetBool("remote_configuration.enabled") {
return newRemoteConfigProvider(rcClient, clusterName)
return newRemoteConfigProvider(rcClient, isLeaderNotif, clusterName)
}
if config.Datadog.GetBool("admission_controller.auto_instrumentation.patcher.fallback_to_file_provider") {
// Use the file config provider for e2e testing only (it replaces RC as a source of configs)
Expand Down
36 changes: 23 additions & 13 deletions pkg/clusteragent/admission/patch/rc_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,31 +19,41 @@ import (

// remoteConfigProvider consumes tracing configs from RC and delivers them to the patcher
type remoteConfigProvider struct {
client *remote.Client
subscribers map[TargetObjKind]chan PatchRequest
clusterName string
client *remote.Client
isLeaderNotif <-chan struct{}
subscribers map[TargetObjKind]chan PatchRequest
clusterName string
}

var _ patchProvider = &remoteConfigProvider{}

func newRemoteConfigProvider(client *remote.Client, clusterName string) (*remoteConfigProvider, error) {
func newRemoteConfigProvider(client *remote.Client, isLeaderNotif <-chan struct{}, clusterName string) (*remoteConfigProvider, error) {
if client == nil {
return nil, errors.New("remote config client not initialized")
}
return &remoteConfigProvider{
client: client,
subscribers: make(map[TargetObjKind]chan PatchRequest),
clusterName: clusterName,
client: client,
isLeaderNotif: isLeaderNotif,
subscribers: make(map[TargetObjKind]chan PatchRequest),
clusterName: clusterName,
}, nil
}

func (rcp *remoteConfigProvider) start(stopCh <-chan struct{}) {
log.Info("Starting RC patch provider")
log.Info("Starting remote-config patch provider")
rcp.client.RegisterAPMTracing(rcp.process)
rcp.client.Start()
<-stopCh
log.Info("Shutting down RC patch provider")
rcp.client.Close()
for {
select {
case <-rcp.isLeaderNotif:
log.Info("Got a leader notification, polling from remote-config")
rcp.process(rcp.client.APMTracingConfigs())
case <-stopCh:
log.Info("Shutting down remote-config patch provider")
rcp.client.Close()
return
}
}
}

func (rcp *remoteConfigProvider) subscribe(kind TargetObjKind) chan PatchRequest {
Expand All @@ -54,9 +64,9 @@ func (rcp *remoteConfigProvider) subscribe(kind TargetObjKind) chan PatchRequest

// process is the event handler called by the RC client on config updates
func (rcp *remoteConfigProvider) process(update map[string]state.APMTracingConfig) {
log.Infof("Got %d updates from RC", len(update))
log.Infof("Got %d updates from remote-config", len(update))
for path, config := range update {
log.Debugf("Parsing config %s with metadata %+v from path %s", config.Config, config.Metadata, path)
log.Debugf("Parsing config %s from path %s", config.Config, path)
var req PatchRequest
err := json.Unmarshal(config.Config, &req)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/clusteragent/admission/patch/rc_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestProcess(t *testing.T) {
`
return []byte(fmt.Sprintf(base, cluster, kind))
}
rcp, err := newRemoteConfigProvider(&remote.Client{}, "dev")
rcp, err := newRemoteConfigProvider(&remote.Client{}, make(chan struct{}), "dev")
require.NoError(t, err)
notifs := rcp.subscribe(KindDeployment)
in := map[string]state.APMTracingConfig{
Expand Down
13 changes: 7 additions & 6 deletions pkg/clusteragent/admission/patch/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,18 @@ import (

// ControllerContext holds necessary context for the patch controller
type ControllerContext struct {
IsLeaderFunc func() bool
K8sClient kubernetes.Interface
RcClient *remote.Client
ClusterName string
StopCh chan struct{}
IsLeaderFunc func() bool
LeaderSubscribeFunc func() <-chan struct{}
K8sClient kubernetes.Interface
RcClient *remote.Client
ClusterName string
StopCh chan struct{}
}

// StartControllers starts the patch controllers
func StartControllers(ctx ControllerContext) error {
log.Info("Starting patch controllers")
provider, err := newPatchProvider(ctx.RcClient, ctx.ClusterName)
provider, err := newPatchProvider(ctx.RcClient, ctx.LeaderSubscribeFunc(), ctx.ClusterName)
if err != nil {
return err
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/config/remote/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,13 @@ func (c *Client) RegisterAPMTracing(fn func(update map[string]state.APMTracingCo
fn(c.state.APMTracingConfigs())
}

// APMTracingConfigs returns the current set of valid APM Tracing configs
func (c *Client) APMTracingConfigs() map[string]state.APMTracingConfig {
c.m.Lock()
defer c.m.Unlock()
return c.state.APMTracingConfigs()
}

func (c *Client) applyUpdate(pbUpdate *pbgo.ClientGetConfigsResponse) ([]string, error) {
fileMap := make(map[string][]byte, len(pbUpdate.TargetFiles))
for _, f := range pbUpdate.TargetFiles {
Expand Down

0 comments on commit 2f567d6

Please sign in to comment.