Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pkg/clusteragent/admission/patch: poll rc on leadership switch #15062

Merged
merged 1 commit into from
Jan 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions cmd/cluster-agent/subcommands/start/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,11 +298,12 @@ func start(log log.Component, config config.Component, cliParams *command.Global
if pkgconfig.Datadog.GetBool("admission_controller.enabled") {
if pkgconfig.Datadog.GetBool("admission_controller.auto_instrumentation.patcher.enabled") {
patchCtx := admissionpatch.ControllerContext{
IsLeaderFunc: le.IsLeader,
K8sClient: apiCl.Cl,
RcClient: rcClient,
ClusterName: clusterName,
StopCh: stopCh,
IsLeaderFunc: le.IsLeader,
LeaderSubscribeFunc: le.Subscribe,
K8sClient: apiCl.Cl,
RcClient: rcClient,
ClusterName: clusterName,
StopCh: stopCh,
}
if err := admissionpatch.StartControllers(patchCtx); err != nil {
log.Errorf("Cannot start auto instrumentation patcher: %v", err)
Expand Down
4 changes: 2 additions & 2 deletions pkg/clusteragent/admission/patch/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ type patchProvider interface {
subscribe(kind TargetObjKind) chan PatchRequest
}

func newPatchProvider(rcClient *remote.Client, clusterName string) (patchProvider, error) {
func newPatchProvider(rcClient *remote.Client, isLeaderNotif <-chan struct{}, clusterName string) (patchProvider, error) {
if config.Datadog.GetBool("remote_configuration.enabled") {
return newRemoteConfigProvider(rcClient, clusterName)
return newRemoteConfigProvider(rcClient, isLeaderNotif, clusterName)
}
if config.Datadog.GetBool("admission_controller.auto_instrumentation.patcher.fallback_to_file_provider") {
// Use the file config provider for e2e testing only (it replaces RC as a source of configs)
Expand Down
36 changes: 23 additions & 13 deletions pkg/clusteragent/admission/patch/rc_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,31 +19,41 @@ import (

// remoteConfigProvider consumes tracing configs from RC and delivers them to the patcher
type remoteConfigProvider struct {
client *remote.Client
subscribers map[TargetObjKind]chan PatchRequest
clusterName string
client *remote.Client
isLeaderNotif <-chan struct{}
subscribers map[TargetObjKind]chan PatchRequest
clusterName string
}

var _ patchProvider = &remoteConfigProvider{}

func newRemoteConfigProvider(client *remote.Client, clusterName string) (*remoteConfigProvider, error) {
func newRemoteConfigProvider(client *remote.Client, isLeaderNotif <-chan struct{}, clusterName string) (*remoteConfigProvider, error) {
if client == nil {
return nil, errors.New("remote config client not initialized")
}
return &remoteConfigProvider{
client: client,
subscribers: make(map[TargetObjKind]chan PatchRequest),
clusterName: clusterName,
client: client,
isLeaderNotif: isLeaderNotif,
subscribers: make(map[TargetObjKind]chan PatchRequest),
clusterName: clusterName,
}, nil
}

func (rcp *remoteConfigProvider) start(stopCh <-chan struct{}) {
log.Info("Starting RC patch provider")
log.Info("Starting remote-config patch provider")
rcp.client.RegisterAPMTracing(rcp.process)
rcp.client.Start()
<-stopCh
log.Info("Shutting down RC patch provider")
rcp.client.Close()
for {
select {
case <-rcp.isLeaderNotif:
log.Info("Got a leader notification, polling from remote-config")
rcp.process(rcp.client.APMTracingConfigs())
case <-stopCh:
log.Info("Shutting down remote-config patch provider")
rcp.client.Close()
return
}
}
}

func (rcp *remoteConfigProvider) subscribe(kind TargetObjKind) chan PatchRequest {
Expand All @@ -54,9 +64,9 @@ func (rcp *remoteConfigProvider) subscribe(kind TargetObjKind) chan PatchRequest

// process is the event handler called by the RC client on config updates
func (rcp *remoteConfigProvider) process(update map[string]state.APMTracingConfig) {
log.Infof("Got %d updates from RC", len(update))
log.Infof("Got %d updates from remote-config", len(update))
for path, config := range update {
log.Debugf("Parsing config %s with metadata %+v from path %s", config.Config, config.Metadata, path)
log.Debugf("Parsing config %s from path %s", config.Config, path)
var req PatchRequest
err := json.Unmarshal(config.Config, &req)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/clusteragent/admission/patch/rc_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestProcess(t *testing.T) {
`
return []byte(fmt.Sprintf(base, cluster, kind))
}
rcp, err := newRemoteConfigProvider(&remote.Client{}, "dev")
rcp, err := newRemoteConfigProvider(&remote.Client{}, make(chan struct{}), "dev")
require.NoError(t, err)
notifs := rcp.subscribe(KindDeployment)
in := map[string]state.APMTracingConfig{
Expand Down
13 changes: 7 additions & 6 deletions pkg/clusteragent/admission/patch/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,18 @@ import (

// ControllerContext holds necessary context for the patch controller
type ControllerContext struct {
IsLeaderFunc func() bool
K8sClient kubernetes.Interface
RcClient *remote.Client
ClusterName string
StopCh chan struct{}
IsLeaderFunc func() bool
LeaderSubscribeFunc func() <-chan struct{}
K8sClient kubernetes.Interface
RcClient *remote.Client
ClusterName string
StopCh chan struct{}
}

// StartControllers starts the patch controllers
func StartControllers(ctx ControllerContext) error {
log.Info("Starting patch controllers")
provider, err := newPatchProvider(ctx.RcClient, ctx.ClusterName)
provider, err := newPatchProvider(ctx.RcClient, ctx.LeaderSubscribeFunc(), ctx.ClusterName)
if err != nil {
return err
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/config/remote/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,13 @@ func (c *Client) RegisterAPMTracing(fn func(update map[string]state.APMTracingCo
fn(c.state.APMTracingConfigs())
}

// APMTracingConfigs returns the current set of valid APM Tracing configs
func (c *Client) APMTracingConfigs() map[string]state.APMTracingConfig {
c.m.Lock()
defer c.m.Unlock()
return c.state.APMTracingConfigs()
}

func (c *Client) applyUpdate(pbUpdate *pbgo.ClientGetConfigsResponse) ([]string, error) {
fileMap := make(map[string][]byte, len(pbUpdate.TargetFiles))
for _, f := range pbUpdate.TargetFiles {
Expand Down