Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CWS][SEC-5573] add custom CWS product #14748

Merged
merged 2 commits into from
Dec 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ require (
github.com/sassoftware/go-rpmutils v0.2.0 // indirect
github.com/secure-systems-lab/go-securesystemslib v0.4.0
github.com/shopspring/decimal v1.2.0 // indirect
github.com/skydive-project/go-debouncer v1.0.0 // indirect
github.com/skydive-project/go-debouncer v1.0.0
github.com/smira/go-ftp-protocol v0.0.0-20140829150050-066b75c2b70d // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
Expand Down
46 changes: 31 additions & 15 deletions pkg/config/remote/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ type Client struct {
state *state.Repository

// Listeners
apmListeners []func(update map[string]state.APMSamplingConfig)
cwsListeners []func(update map[string]state.ConfigCWSDD)
apmListeners []func(update map[string]state.APMSamplingConfig)
cwsListeners []func(update map[string]state.ConfigCWSDD)
cwsCustomListeners []func(update map[string]state.ConfigCWSCustom)
}

// agentGRPCConfigFetcher defines how to retrieve config updates over a
Expand Down Expand Up @@ -153,19 +154,20 @@ func newClient(agentName string, updater ConfigUpdater, doTufVerification bool,
ctx, close := context.WithCancel(context.Background())

return &Client{
ID: generateID(),
startupSync: sync.Once{},
ctx: ctx,
close: close,
agentName: agentName,
agentVersion: agentVersion,
products: data.ProductListToString(products),
state: repository,
pollInterval: pollInterval,
backoffPolicy: backoffPolicy,
apmListeners: make([]func(update map[string]state.APMSamplingConfig), 0),
cwsListeners: make([]func(update map[string]state.ConfigCWSDD), 0),
updater: updater,
ID: generateID(),
startupSync: sync.Once{},
ctx: ctx,
close: close,
agentName: agentName,
agentVersion: agentVersion,
products: data.ProductListToString(products),
state: repository,
pollInterval: pollInterval,
backoffPolicy: backoffPolicy,
apmListeners: make([]func(update map[string]state.APMSamplingConfig), 0),
cwsListeners: make([]func(update map[string]state.ConfigCWSDD), 0),
cwsCustomListeners: make([]func(update map[string]state.ConfigCWSCustom), 0),
updater: updater,
}, nil
}

Expand Down Expand Up @@ -251,6 +253,11 @@ func (c *Client) update() error {
listener(c.state.CWSDDConfigs())
}
}
if containsProduct(changedProducts, state.ProductCWSCustom) {
for _, listener := range c.cwsCustomListeners {
listener(c.state.CWSCustomConfigs())
}
}

return nil
}
Expand Down Expand Up @@ -283,6 +290,15 @@ func (c *Client) RegisterCWSDDUpdate(fn func(update map[string]state.ConfigCWSDD
fn(c.state.CWSDDConfigs())
}

// RegisterCWSCustomUpdate registers a callback function to be called after a successful client update that will
// contain the current state of the CWS_CUSTOM product.
func (c *Client) RegisterCWSCustomUpdate(fn func(update map[string]state.ConfigCWSCustom)) {
c.m.Lock()
defer c.m.Unlock()
c.cwsCustomListeners = append(c.cwsCustomListeners, fn)
fn(c.state.CWSCustomConfigs())
}

func (c *Client) applyUpdate(pbUpdate *pbgo.ClientGetConfigsResponse) ([]string, error) {
fileMap := make(map[string][]byte, len(pbUpdate.TargetFiles))
for _, f := range pbUpdate.TargetFiles {
Expand Down
2 changes: 2 additions & 0 deletions pkg/config/remote/data/product.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ const (
ProductAPMSampling Product = "APM_SAMPLING"
// ProductCWSDD is the cloud workload security product
ProductCWSDD Product = "CWS_DD"
// ProductCWSCustom is the cloud workload security product
ProductCWSCustom Product = "CWS_CUSTOM"
// ProductTesting1 is a testing product
ProductTesting1 Product = "TESTING1"
)
Expand Down
39 changes: 38 additions & 1 deletion pkg/remoteconfig/state/configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ import (
4. Add a method on the `Repository` to retrieved typed configs for the product.
*/

var allProducts = []string{ProductAPMSampling, ProductCWSDD, ProductASMFeatures, ProductASMDD, ProductASMData}
var allProducts = []string{ProductAPMSampling, ProductCWSDD, ProductCWSCustom, ProductASMFeatures, ProductASMDD, ProductASMData}

const (
// ProductAPMSampling is the apm sampling product
ProductAPMSampling = "APM_SAMPLING"
// ProductCWSDD is the cloud workload security product managed by datadog employees
ProductCWSDD = "CWS_DD"
// ProductCWSCustom is the cloud workload security product managed by datadog customers
ProductCWSCustom = "CWS_CUSTOM"
// ProductASMFeatures is the ASM product used form ASM activation through remote config
ProductASMFeatures = "ASM_FEATURES"
// ProductASMDD is the application security monitoring product managed by datadog employees
Expand All @@ -50,6 +52,8 @@ func parseConfig(product string, raw []byte, metadata Metadata) (interface{}, er
c, err = parseASMFeaturesConfig(raw, metadata)
case ProductCWSDD:
c, err = parseConfigCWSDD(raw, metadata)
case ProductCWSCustom:
c, err = parseConfigCWSCustom(raw, metadata)
case ProductASMDD:
c, err = parseConfigASMDD(raw, metadata)
case ProductASMData:
Expand Down Expand Up @@ -128,6 +132,39 @@ func (r *Repository) CWSDDConfigs() map[string]ConfigCWSDD {
return typedConfigs
}

// ConfigCWSCustom is a deserialized CWS Custom configuration file along with its
// associated remote config metadata
type ConfigCWSCustom struct {
Config []byte
Metadata Metadata
}

func parseConfigCWSCustom(data []byte, metadata Metadata) (ConfigCWSCustom, error) {
return ConfigCWSCustom{
Config: data,
Metadata: metadata,
}, nil
}

// CWSCustomConfigs returns the currently active CWSCustom config files
func (r *Repository) CWSCustomConfigs() map[string]ConfigCWSCustom {
typedConfigs := make(map[string]ConfigCWSCustom)

configs := r.getConfigs(ProductCWSCustom)

for path, conf := range configs {
// We control this, so if this has gone wrong something has gone horribly wrong
typed, ok := conf.(ConfigCWSCustom)
if !ok {
panic("unexpected config stored as CWSDD Config")
}

typedConfigs[path] = typed
}

return typedConfigs
}

// ConfigASMDD is a deserialized ASM DD configuration file along with its
// associated remote config metadata
type ConfigASMDD struct {
Expand Down
63 changes: 51 additions & 12 deletions pkg/security/rconfig/rconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/Masterminds/semver/v3"
"github.com/hashicorp/go-multierror"
"github.com/skydive-project/go-debouncer"

"github.com/DataDog/datadog-agent/pkg/config/remote"
"github.com/DataDog/datadog-agent/pkg/config/remote/data"
Expand All @@ -24,48 +25,69 @@ import (
"github.com/DataDog/datadog-agent/pkg/util/log"
)

const securityAgentRCPollInterval = time.Second * 1
const (
securityAgentRCPollInterval = time.Second * 1
debounceDelay = 5 * time.Second
)

// RCPolicyProvider defines a remote config policy provider
type RCPolicyProvider struct {
sync.RWMutex

client *remote.Client
onNewPoliciesReadyCb func()
lastConfigs map[string]state.ConfigCWSDD
lastDefaults map[string]state.ConfigCWSDD
lastCustoms map[string]state.ConfigCWSCustom
debouncer *debouncer.Debouncer
}

var _ rules.PolicyProvider = (*RCPolicyProvider)(nil)

// NewRCPolicyProvider returns a new Remote Config based policy provider
func NewRCPolicyProvider(name string, agentVersion *semver.Version) (*RCPolicyProvider, error) {
c, err := remote.NewGRPCClient(name, agentVersion.String(), []data.Product{data.ProductCWSDD}, securityAgentRCPollInterval)
c, err := remote.NewGRPCClient(name, agentVersion.String(), []data.Product{data.ProductCWSDD, data.ProductCWSCustom}, securityAgentRCPollInterval)
if err != nil {
return nil, err
}

return &RCPolicyProvider{
r := &RCPolicyProvider{
client: c,
}, nil
}
r.debouncer = debouncer.New(debounceDelay, r.onNewPoliciesReady)

return r, nil
}

// Start starts the Remote Config policy provider and subscribes to updates
func (r *RCPolicyProvider) Start() {
log.Info("remote-config policies provider started")

r.client.RegisterCWSDDUpdate(r.rcConfigUpdateCallback)
r.debouncer.Start()

r.client.RegisterCWSDDUpdate(r.rcDefaultsUpdateCallback)
r.client.RegisterCWSCustomUpdate(r.rcCustomsUpdateCallback)

r.client.Start()
}

func (r *RCPolicyProvider) rcConfigUpdateCallback(configs map[string]state.ConfigCWSDD) {
func (r *RCPolicyProvider) rcDefaultsUpdateCallback(configs map[string]state.ConfigCWSDD) {
r.Lock()
r.lastDefaults = configs
r.Unlock()

log.Info("new policies from remote-config policy provider")

r.debouncer.Call()
}

func (r *RCPolicyProvider) rcCustomsUpdateCallback(configs map[string]state.ConfigCWSCustom) {
r.Lock()
r.lastConfigs = configs
r.lastCustoms = configs
r.Unlock()

log.Info("new policies from remote-config policy provider")

r.onNewPoliciesReadyCb()
r.debouncer.Call()
}

func normalize(policy *rules.Policy) {
Expand All @@ -84,17 +106,24 @@ func (r *RCPolicyProvider) LoadPolicies(macroFilters []rules.MacroFilter, ruleFi
r.RLock()
defer r.RUnlock()

for _, c := range r.lastConfigs {
reader := bytes.NewReader(c.Config)
load := func(id string, cfg []byte) {
reader := bytes.NewReader(cfg)

policy, err := rules.LoadPolicy(c.Metadata.ID, "remote-config", reader, macroFilters, ruleFilters)
policy, err := rules.LoadPolicy(id, "remote-config", reader, macroFilters, ruleFilters)
if err != nil {
errs = multierror.Append(errs, err)
}
normalize(policy)
policies = append(policies, policy)
}

for _, c := range r.lastDefaults {
load(c.Metadata.ID, c.Config)
}
for _, c := range r.lastCustoms {
load(c.Metadata.ID, c.Config)
}

return policies, errs
}

Expand All @@ -103,8 +132,18 @@ func (r *RCPolicyProvider) SetOnNewPoliciesReadyCb(cb func()) {
r.onNewPoliciesReadyCb = cb
}

func (r *RCPolicyProvider) onNewPoliciesReady() {
r.RLock()
defer r.RUnlock()

if r.onNewPoliciesReadyCb != nil {
r.onNewPoliciesReadyCb()
}
}

// Close stops the client
func (r *RCPolicyProvider) Close() error {
r.debouncer.Stop()
r.client.Close()
return nil
}