Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[k8sprocessor] Add ability to associate metadata tags using pod UID rather than just IP #2199

Merged
31 changes: 17 additions & 14 deletions processor/k8sprocessor/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ import (

// fakeClient is used as a replacement for WatchClient in test cases.
type fakeClient struct {
Pods map[string]*kube.Pod
Rules kube.ExtractionRules
Filters kube.Filters
Informer cache.SharedInformer
StopCh chan struct{}
Pods map[string]*kube.Pod
tigrannajaryan marked this conversation as resolved.
Show resolved Hide resolved
Rules kube.ExtractionRules
Filters kube.Filters
Associations []kube.Association
Informer cache.SharedInformer
StopCh chan struct{}
}

func selectors() (labels.Selector, fields.Selector) {
Expand All @@ -40,22 +41,24 @@ func selectors() (labels.Selector, fields.Selector) {
}

// newFakeClient instantiates a new FakeClient object and satisfies the ClientProvider type
func newFakeClient(_ *zap.Logger, apiCfg k8sconfig.APIConfig, rules kube.ExtractionRules, filters kube.Filters, _ kube.APIClientsetProvider, _ kube.InformerProvider) (kube.Client, error) {
func newFakeClient(_ *zap.Logger, apiCfg k8sconfig.APIConfig, rules kube.ExtractionRules, filters kube.Filters, associations []kube.Association, _ kube.APIClientsetProvider, _ kube.InformerProvider) (kube.Client, error) {
cs := fake.NewSimpleClientset()

ls, fs := selectors()
return &fakeClient{
Pods: map[string]*kube.Pod{},
Rules: rules,
Filters: filters,
Informer: kube.NewFakeInformer(cs, "", ls, fs),
StopCh: make(chan struct{}),
Pods: map[string]*kube.Pod{},
Rules: rules,
Filters: filters,
Associations: associations,
Informer: kube.NewFakeInformer(cs, "", ls, fs),
StopCh: make(chan struct{}),
}, nil
}

// GetPodByIP looks up FakeClient.Pods map by the provided string.
func (f *fakeClient) GetPodByIP(ip string) (*kube.Pod, bool) {
p, ok := f.Pods[ip]
// GetPod looks up FakeClient.Pods map by the provided string,
// which might represent either IP address or Pod UID.
func (f *fakeClient) GetPod(identifier string) (*kube.Pod, bool) {
tigrannajaryan marked this conversation as resolved.
Show resolved Hide resolved
p, ok := f.Pods[identifier]
return p, ok
}

Expand Down
14 changes: 14 additions & 0 deletions processor/k8sprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ type Config struct {
// Filter section allows specifying filters to filter
// pods by labels, fields, namespaces, nodes, etc.
Filter FilterConfig `mapstructure:"filter"`

Association []PodAssociationConfig `mapstructure:"pod_association"`
tigrannajaryan marked this conversation as resolved.
Show resolved Hide resolved
}

// ExtractConfig section allows specifying extraction rules to extract
Expand Down Expand Up @@ -173,3 +175,15 @@ type FieldFilterConfig struct {
// equals, not-equals, exists, does-not-exist.
Op string `mapstructure:"op"`
}

// PodAssociationConfig allows specifying rules for associating resources
tigrannajaryan marked this conversation as resolved.
Show resolved Hide resolved
// with pod metadata
type PodAssociationConfig struct {
// From represents the source of the association.
// Allowed values are connection and labels
pmatyjasek-sumo marked this conversation as resolved.
Show resolved Hide resolved
From string `mapstructure:"from"`

// Name represents the name of the association.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the name of the association or the name of the pod's attribute that is used for association?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is name of association. For example pod_uid, k8s.pod.ip etc.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the answer below you wrote:

name (representing the extracted key name).

If "name" is extracted key name (which I think means the attribute name if I understand correctly) then what is "name of association"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, my bad. You're right it is extracted key name.

// e.g. ip, pod_uid
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are the possible values? Is "ip" and "pod_uid" the only possible values?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, there are few more

Name string `mapstructure:"name"`
}
22 changes: 22 additions & 0 deletions processor/k8sprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,5 +89,27 @@ func TestLoadConfig(t *testing.T) {
{Key: "key2", Value: "value2", Op: "not-equals"},
},
},
Association: []PodAssociationConfig{
{
From: "labels",
Name: "ip",
},
{
From: "labels",
Name: "k8s.pod.ip",
},
{
From: "labels",
Name: "host.hostname",
pmatyjasek-sumo marked this conversation as resolved.
Show resolved Hide resolved
},
{
From: "connection",
Name: "ip",
},
{
From: "labels",
Name: "pod_uid",
},
},
})
}
28 changes: 21 additions & 7 deletions processor/k8sprocessor/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,27 @@
// Package k8sprocessor allow automatic tagging of spans, metrics and logs with k8s metadata.
//
// The processor automatically discovers k8s resources (pods), extracts metadata from them and adds the
// extracted metadata to the relevant spans, metrics and logs. The processor use the kubernetes API to discover all pods
// running in a cluster, keeps a record of their IP addresses and interesting metadata. Upon receiving telemetry data,
// the processor looks for presence of well-known resource attributes which might contain IP address ("ip",
// "k8s.pod.ip" for logs, metrics or traces and "host.name" for metrics). If this field is not available, or it
// does not contain a valid IP address, the processor tries to identify the source IP address of the service
// that sent the telemetry data.
// If a match is found, the cached metadata is added to the data as resource attributes.
// extracted metadata to the relevant spans, metrics and logs. The processor uses the kubernetes API to discover all pods
// running in a cluster, keeps a record of their IP addresses, pod UIDs and interesting metadata.
jpkrohling marked this conversation as resolved.
Show resolved Hide resolved
// The rules for associating the source record with specific Pod Metadata are configured via "pod_association" key.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"source record" is not an entirely clear term. Does this refer to the telemetry in pdata that is passing through the processor?

// It represents a list of rules that are executed in order until the first one is able to do the match.
pmatyjasek-sumo marked this conversation as resolved.
Show resolved Hide resolved
// Each rule is specified as a pair of from (representing the rule type) and name (representing the extracted key name).
tigrannajaryan marked this conversation as resolved.
Show resolved Hide resolved
// Following rule types are available:
// from: "labels" - takes the attribute from resource attributes (the value can contain either IP address or Pod UID)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does "resource" here refer to pdata.Resource or to the pod? Is this pdata.Resource attributes or pod labels? I am confused by mixed usage of "labels" and "attributes" here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok I'll clarify that

// from: "connection" - takes the IP attribute from connection context (if available) and automatically
tigrannajaryan marked this conversation as resolved.
Show resolved Hide resolved
// associates it with "k8s.pod.ip" attribute
// Pod association configuration.
// pod_association:
tigrannajaryan marked this conversation as resolved.
Show resolved Hide resolved
// - from: labels
tigrannajaryan marked this conversation as resolved.
Show resolved Hide resolved
// name: ip
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have anywhere documented the possible values for "name"? Is it a limited list?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now it is possible to add any name that will match the label.

// - from: labels
// name: k8s.pod.ip
// - from: labels
// name: host.hostname
pmatyjasek-sumo marked this conversation as resolved.
Show resolved Hide resolved
// - from: connection
// name: ip
// - from: labels
// name: pod_uid
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still do not understand why this is "pod_uid" and not "k8s.pod.uid" as Otel conventions define.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is just example

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I may be wrong but if I understand correctly we cannot expect to see "pod_uid" as a Resource attribute in normal use cases. Let's use an attribute name that is realistic, even if it is just an example.

//
// RBAC
//
Expand Down
2 changes: 2 additions & 0 deletions processor/k8sprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,5 +190,7 @@ func createProcessorOpts(cfg configmodels.Processor) []Option {
opts = append(opts, WithFilterFields(oCfg.Filter.Fields...))
opts = append(opts, WithAPIConfig(oCfg.APIConfig))

opts = append(opts, WithExtractPodAssociations(oCfg.Association...))

return opts
}
58 changes: 0 additions & 58 deletions processor/k8sprocessor/ip_extractor.go

This file was deleted.

85 changes: 57 additions & 28 deletions processor/k8sprocessor/kube/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,26 @@ type WatchClient struct {
deleteQueue []deleteRequest
stopCh chan struct{}

Pods map[string]*Pod
Rules ExtractionRules
Filters Filters
Pods map[string]*Pod
tigrannajaryan marked this conversation as resolved.
Show resolved Hide resolved
Rules ExtractionRules
Filters Filters
Associations []Association
}

// Extract deployment name from the pod name. Pod name is created using
// format: [deployment-name]-[Random-String-For-ReplicaSet]-[Random-String-For-Pod]
var dRegex = regexp.MustCompile(`^(.*)-[0-9a-zA-Z]*-[0-9a-zA-Z]*$`)

// New initializes a new k8s Client.
func New(logger *zap.Logger, apiCfg k8sconfig.APIConfig, rules ExtractionRules, filters Filters, newClientSet APIClientsetProvider, newInformer InformerProvider) (Client, error) {
c := &WatchClient{logger: logger, Rules: rules, Filters: filters, deploymentRegex: dRegex, stopCh: make(chan struct{})}
func New(logger *zap.Logger, apiCfg k8sconfig.APIConfig, rules ExtractionRules, filters Filters, associations []Association, newClientSet APIClientsetProvider, newInformer InformerProvider) (Client, error) {
c := &WatchClient{
logger: logger,
Rules: rules,
Filters: filters,
Associations: associations,
deploymentRegex: dRegex,
stopCh: make(chan struct{}),
}
go c.deleteLoop(time.Second*30, defaultPodDeleteGracePeriod)

c.Pods = map[string]*Pod{}
Expand Down Expand Up @@ -109,6 +117,8 @@ func (c *WatchClient) handlePodAdd(obj interface{}) {
} else {
c.logger.Error("object received was not of type api_v1.Pod", zap.Any("received", obj))
}
podTableSize := len(c.Pods)
observability.RecordPodTableSize(int64(podTableSize))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 for improving observability.

}

func (c *WatchClient) handlePodUpdate(old, new interface{}) {
Expand All @@ -119,6 +129,8 @@ func (c *WatchClient) handlePodUpdate(old, new interface{}) {
} else {
c.logger.Error("object received was not of type api_v1.Pod", zap.Any("received", new))
}
podTableSize := len(c.Pods)
observability.RecordPodTableSize(int64(podTableSize))
}

func (c *WatchClient) handlePodDelete(obj interface{}) {
Expand All @@ -128,6 +140,8 @@ func (c *WatchClient) handlePodDelete(obj interface{}) {
} else {
c.logger.Error("object received was not of type api_v1.Pod", zap.Any("received", obj))
}
podTableSize := len(c.Pods)
observability.RecordPodTableSize(int64(podTableSize))
}

func (c *WatchClient) deleteLoop(interval time.Duration, gracePeriod time.Duration) {
Expand All @@ -152,14 +166,16 @@ func (c *WatchClient) deleteLoop(interval time.Duration, gracePeriod time.Durati

c.m.Lock()
for _, d := range toDelete {
if p, ok := c.Pods[d.ip]; ok {
if p, ok := c.Pods[d.id]; ok {
// Sanity check: make sure we are deleting the same pod
// and the underlying state (ip<>pod mapping) has not changed.
if p.Name == d.name {
delete(c.Pods, d.ip)
delete(c.Pods, d.id)
}
}
}
podTableSize := len(c.Pods)
observability.RecordPodTableSize(int64(podTableSize))
c.m.Unlock()

case <-c.stopCh:
Expand All @@ -168,10 +184,10 @@ func (c *WatchClient) deleteLoop(interval time.Duration, gracePeriod time.Durati
}
}

// GetPodByIP takes an IP address and returns the pod the IP address is associated with.
func (c *WatchClient) GetPodByIP(ip string) (*Pod, bool) {
// GetPod takes an IP address or Pod UID and returns the pod the identifier is associated with.
func (c *WatchClient) GetPod(identifier string) (*Pod, bool) {
c.m.RLock()
pod, ok := c.Pods[ip]
pod, ok := c.Pods[identifier]
c.m.RUnlock()
if ok {
if pod.Ignore {
Expand Down Expand Up @@ -253,24 +269,12 @@ func (c *WatchClient) extractField(v string, r FieldExtractionRule) string {
}

func (c *WatchClient) addOrUpdatePod(pod *api_v1.Pod) {
if pod.Status.PodIP == "" {
return
}

c.m.Lock()
defer c.m.Unlock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: move the lock after line 285. Here it is locking earlier than needed.

// compare initial scheduled timestamp for existing pod and new pod with same IP
// and only replace old pod if scheduled time of new pod is newer? This should fix
// the case where scheduler has assigned the same IP to a new pod but update event for
// the old pod came in later
if p, ok := c.Pods[pod.Status.PodIP]; ok {
if p.StartTime != nil && pod.Status.StartTime.Before(p.StartTime) {
return
}
}
newPod := &Pod{
Name: pod.Name,
Address: pod.Status.PodIP,
PodUID: string(pod.UID),
StartTime: pod.Status.StartTime,
}

Expand All @@ -279,21 +283,46 @@ func (c *WatchClient) addOrUpdatePod(pod *api_v1.Pod) {
} else {
newPod.Attributes = c.extractPodAttributes(pod)
}
c.Pods[pod.Status.PodIP] = newPod
if pod.UID != "" {
c.Pods[string(pod.UID)] = newPod
}
if pod.Status.PodIP != "" {
// compare initial scheduled timestamp for existing pod and new pod with same IP
// and only replace old pod if scheduled time of new pod is newer? This should fix
// the case where scheduler has assigned the same IP to a new pod but update event for
// the old pod came in later.
if p, ok := c.Pods[pod.Status.PodIP]; ok {
if p.StartTime != nil && pod.Status.StartTime.Before(p.StartTime) {
return
}
}
c.Pods[pod.Status.PodIP] = newPod
}
}

func (c *WatchClient) forgetPod(pod *api_v1.Pod) {
if pod.Status.PodIP == "" {
return
c.m.RLock()
p, ok := c.GetPod(pod.Status.PodIP)
c.m.RUnlock()

if ok && p.Name == pod.Name {
c.deleteMut.Lock()
c.deleteQueue = append(c.deleteQueue, deleteRequest{
id: pod.Status.PodIP,
name: pod.Name,
ts: time.Now(),
})
c.deleteMut.Unlock()
}

c.m.RLock()
p, ok := c.GetPodByIP(pod.Status.PodIP)
p, ok = c.GetPod(string(pod.UID))
c.m.RUnlock()

if ok && p.Name == pod.Name {
c.deleteMut.Lock()
c.deleteQueue = append(c.deleteQueue, deleteRequest{
ip: pod.Status.PodIP,
id: string(pod.UID),
name: pod.Name,
ts: time.Now(),
})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: looks very repetitive. could be extracted into a function.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch

Expand Down
Loading