diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 5c36f841bda4..80e0354bd662 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -243,6 +243,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Allow cgroup self-monitoring to see alternate `hostfs` paths {pull}24334[24334] - Add `expand_keys` to the list of permitted config fields for `decode_json_fields` {24862}[24862] - Fix 'make setup' instructions for a new beat {pull}24944[24944] +- Fix discovery of short-living and failing pods in Kubernetes autodiscover {issue}22718[22718] {pull}24742[24742] - Fix inode removal tracking code when files are replaced by files with the same name {pull}25002[25002] - Fix `mage GenerateCustomBeat` instructions for a new beat {pull}17679[17679] - Fix bug with annotations dedot config on k8s not used {pull}25111[25111] diff --git a/filebeat/autodiscover/builder/hints/logs.go b/filebeat/autodiscover/builder/hints/logs.go index 05014134106c..037dd3f402eb 100644 --- a/filebeat/autodiscover/builder/hints/logs.go +++ b/filebeat/autodiscover/builder/hints/logs.go @@ -93,11 +93,6 @@ func (l *logHints) CreateConfig(event bus.Event, options ...ucfg.Option) []*comm return []*common.Config{} } - host, _ := event["host"].(string) - if host == "" { - return []*common.Config{} - } - if inputConfig != nil { configs := []*common.Config{} for _, cfg := range inputConfig { diff --git a/libbeat/autodiscover/providers/kubernetes/pod.go b/libbeat/autodiscover/providers/kubernetes/pod.go index 09c74ac4d37b..eabb18c81704 100644 --- a/libbeat/autodiscover/providers/kubernetes/pod.go +++ b/libbeat/autodiscover/providers/kubernetes/pod.go @@ -39,7 +39,7 @@ type pod struct { config *Config metagen metadata.MetaGen logger *logp.Logger - publish func([]bus.Event) + publishFunc func([]bus.Event) watcher kubernetes.Watcher nodeWatcher kubernetes.Watcher namespaceWatcher kubernetes.Watcher @@ -106,7 +106,7 @@ func NewPodEventer(uuid 
uuid.UUID, cfg *common.Config, client k8s.Interface, pub p := &pod{ config: config, uuid: uuid, - publish: publish, + publishFunc: publish, metagen: metaGen, logger: logger, watcher: watcher, @@ -124,7 +124,7 @@ func NewPodEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, pub return p, nil } -// OnAdd ensures processing of pod objects that are newly added +// OnAdd ensures processing of pod objects that are newly added. func (p *pod) OnAdd(obj interface{}) { p.crossUpdate.RLock() defer p.crossUpdate.RUnlock() @@ -133,9 +133,7 @@ func (p *pod) OnAdd(obj interface{}) { p.emit(obj.(*kubernetes.Pod), "start") } -// OnUpdate emits events for a given pod depending on the state of the pod, -// if it is terminating, a stop event is scheduled, if not, a stop and a start -// events are sent sequentially to recreate the resources assotiated to the pod. +// OnUpdate handles events for pods that have been updated. func (p *pod) OnUpdate(obj interface{}) { p.crossUpdate.RLock() defer p.crossUpdate.RUnlock() @@ -144,52 +142,21 @@ func (p *pod) OnUpdate(obj interface{}) { } func (p *pod) unlockedUpdate(obj interface{}) { - pod := obj.(*kubernetes.Pod) - - p.logger.Debugf("Watcher Pod update for pod: %+v, status: %+v", pod.Name, pod.Status.Phase) - switch pod.Status.Phase { - case kubernetes.PodSucceeded, kubernetes.PodFailed: - // If Pod is in a phase where all containers in the have terminated emit a stop event - p.logger.Debugf("Watcher Pod update (terminated): %+v", obj) - time.AfterFunc(p.config.CleanupTimeout, func() { p.emit(pod, "stop") }) - return - case kubernetes.PodPending: - p.logger.Debugf("Watcher Pod update (pending): don't know what to do with this Pod yet, skipping for now: %+v", obj) - return - } - - // here handle the case when a Pod is in `Terminating` phase. - // In this case the pod is neither `PodSucceeded` nor `PodFailed` and - // hence requires special handling. 
- if pod.GetObjectMeta().GetDeletionTimestamp() != nil { - p.logger.Debugf("Watcher Pod update (terminating): %+v", obj) - // Pod is terminating, don't reload its configuration and ignore the event - // if some pod is still running, we will receive more events when containers - // terminate. - for _, container := range pod.Status.ContainerStatuses { - if container.State.Running != nil { - return - } - } - time.AfterFunc(p.config.CleanupTimeout, func() { p.emit(pod, "stop") }) - return - } - p.logger.Debugf("Watcher Pod update: %+v", obj) - p.emit(pod, "stop") - p.emit(pod, "start") + p.emit(obj.(*kubernetes.Pod), "stop") + p.emit(obj.(*kubernetes.Pod), "start") } -// OnDelete stops pod objects that are deleted +// OnDelete stops pod objects that are deleted. func (p *pod) OnDelete(obj interface{}) { p.crossUpdate.RLock() defer p.crossUpdate.RUnlock() p.logger.Debugf("Watcher Pod delete: %+v", obj) - time.AfterFunc(p.config.CleanupTimeout, func() { p.emit(obj.(*kubernetes.Pod), "stop") }) + p.emit(obj.(*kubernetes.Pod), "stop") } -// GenerateHints creates hints needed for hints builder +// GenerateHints creates hints needed for hints builder. func (p *pod) GenerateHints(event bus.Event) bus.Event { // Try to build a config with enabled builders. Send a provider agnostic payload. // Builders are Beat specific. @@ -211,9 +178,9 @@ func (p *pod) GenerateHints(event bus.Event) bus.Event { // Look at all the namespace level default annotations and do a merge with priority going to the pod annotations. 
if rawNsAnn, ok := kubeMeta["namespace_annotations"]; ok { - nsAnn, _ := rawNsAnn.(common.MapStr) - if len(nsAnn) != 0 { - annotations.DeepUpdateNoOverwrite(nsAnn) + namespaceAnnotations, _ := rawNsAnn.(common.MapStr) + if len(namespaceAnnotations) != 0 { + annotations.DeepUpdateNoOverwrite(namespaceAnnotations) } } } @@ -280,195 +247,286 @@ func (p *pod) Stop() { } } -func (p *pod) emit(pod *kubernetes.Pod, flag string) { - containers, statuses := getContainersInPod(pod) - p.emitEvents(pod, flag, containers, statuses) +type containerInPod struct { + id string + runtime string + spec kubernetes.Container + status kubernetes.PodContainerStatus } // getContainersInPod returns all the containers defined in a pod and their statuses. // It includes init and ephemeral containers. -func getContainersInPod(pod *kubernetes.Pod) ([]kubernetes.Container, []kubernetes.PodContainerStatus) { - var containers []kubernetes.Container - var statuses []kubernetes.PodContainerStatus +func getContainersInPod(pod *kubernetes.Pod) []*containerInPod { + var containers []*containerInPod + for _, c := range pod.Spec.Containers { + containers = append(containers, &containerInPod{spec: c}) + } + for _, c := range pod.Spec.InitContainers { + containers = append(containers, &containerInPod{spec: c}) + } + for _, c := range pod.Spec.EphemeralContainers { + c := kubernetes.Container(c.EphemeralContainerCommon) + containers = append(containers, &containerInPod{spec: c}) + } - // Emit events for all containers - containers = append(containers, pod.Spec.Containers...) - statuses = append(statuses, pod.Status.ContainerStatuses...) 
+ statuses := make(map[string]*kubernetes.PodContainerStatus) + mapStatuses := func(s []kubernetes.PodContainerStatus) { + for i := range s { + statuses[s[i].Name] = &s[i] + } + } + mapStatuses(pod.Status.ContainerStatuses) + mapStatuses(pod.Status.InitContainerStatuses) + mapStatuses(pod.Status.EphemeralContainerStatuses) + for _, c := range containers { + if s, ok := statuses[c.spec.Name]; ok { + c.id, c.runtime = kubernetes.ContainerIDWithRuntime(*s) + c.status = *s + } + } - // Emit events for all initContainers - containers = append(containers, pod.Spec.InitContainers...) - statuses = append(statuses, pod.Status.InitContainerStatuses...) + return containers +} - // Emit events for all ephemeralContainers - // Ephemeral containers are alpha feature in k8s and this code may require some changes, if their - // api change in the future. - for _, c := range pod.Spec.EphemeralContainers { - containers = append(containers, kubernetes.Container(c.EphemeralContainerCommon)) +// emit emits the events for the given pod according to its state and +// the given flag. +// It emits a pod event if the pod has at least a running container, +// and a container event for each one of the ports defined in each +// container. +// If a container doesn't have any defined port, it emits a single +// container event with "port" set to 0. +// "start" events are only generated for containers that have an id. +// "stop" events are always generated to ensure that configurations are +// deleted. +// If the pod is terminated, "stop" events are delayed during the grace +// period defined in `CleanupTimeout`. +// Network information is only included in events for running containers +// and for pods with at least one running container. 
+func (p *pod) emit(pod *kubernetes.Pod, flag string) { + annotations := podAnnotations(pod) + namespaceAnnotations := podNamespaceAnnotations(pod, p.namespaceWatcher) + + eventList := make([][]bus.Event, 0) + portsMap := common.MapStr{} + containers := getContainersInPod(pod) + anyContainerRunning := false + for _, c := range containers { + if c.status.State.Running != nil { + anyContainerRunning = true + } + + events, ports := p.containerPodEvents(flag, pod, c, annotations, namespaceAnnotations) + if len(events) != 0 { + eventList = append(eventList, events) + } + if len(ports) > 0 { + portsMap.DeepUpdate(ports) + } + } + if len(eventList) != 0 { + event := p.podEvent(flag, pod, portsMap, anyContainerRunning, annotations, namespaceAnnotations) + // Ensure that the pod level event is published first to avoid + // pod metadata overriding a valid container metadata. + eventList = append([][]bus.Event{{event}}, eventList...) } - statuses = append(statuses, pod.Status.EphemeralContainerStatuses...) - return containers, statuses + delay := (flag == "stop" && podTerminated(pod, containers)) + p.publishAll(eventList, delay) } -func (p *pod) emitEvents(pod *kubernetes.Pod, flag string, containers []kubernetes.Container, - containerstatuses []kubernetes.PodContainerStatus) { - host := pod.Status.PodIP +// containerPodEvents creates the events for a container in a pod. +// One event is created for each configured port. If there is no +// configured port, a single event is created, with the port set to 0. +// Host and port information is only included if the container is +// running. +// If the container ID is unknown, only "stop" events are generated. +// It also returns a map with the named ports. 
+func (p *pod) containerPodEvents(flag string, pod *kubernetes.Pod, c *containerInPod, annotations, namespaceAnnotations common.MapStr) ([]bus.Event, common.MapStr) { + if c.id == "" && flag != "stop" { + return nil, nil + } - // If the container doesn't exist in the runtime or its network - // is not configured, it won't have an IP. Skip it as we cannot - // generate configs without host, and an update will arrive when - // the container is ready. - // If stopping, emit the event in any case to ensure cleanup. - if host == "" && flag != "stop" { - return + // This must be an id that doesn't depend on the state of the container + // so it works also on `stop` if containers have been already deleted. + eventID := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), c.spec.Name) + + meta := p.metagen.Generate(pod, metadata.WithFields("container.name", c.spec.Name)) + + cmeta := common.MapStr{ + "id": c.id, + "runtime": c.runtime, + "image": common.MapStr{ + "name": c.spec.Image, + }, } - // Collect all runtimes from status information. - containerIDs := map[string]string{} - runtimes := map[string]string{} - for _, c := range containerstatuses { - // If the container is not being stopped then add the container only if it is in running state. - // This makes sure that we dont keep tailing init container logs after they have stopped. - // Emit the event in case that the pod is being stopped. 
- if flag == "stop" || c.State.Running != nil { - cid, runtime := kubernetes.ContainerIDWithRuntime(c) - containerIDs[c.Name] = cid - runtimes[c.Name] = runtime - } + // Information that can be used in discovering a workload + kubemeta := meta.Clone() + kubemeta["annotations"] = annotations + kubemeta["container"] = common.MapStr{ + "id": c.id, + "name": c.spec.Name, + "image": c.spec.Image, + "runtime": c.runtime, + } + if len(namespaceAnnotations) != 0 { + kubemeta["namespace_annotations"] = namespaceAnnotations } - // Pass annotations to all events so that it can be used in templating and by annotation builders. - var ( - annotations = common.MapStr{} - nsAnn = common.MapStr{} - podPorts = common.MapStr{} - eventList = make([][]bus.Event, 0) - ) - for k, v := range pod.GetObjectMeta().GetAnnotations() { - safemapstr.Put(annotations, k, v) + ports := c.spec.Ports + if len(ports) == 0 { + // Ensure that at least one event is generated for this container. + // Set port to zero to signify that the event is from a container + // and not from a pod. + ports = []kubernetes.ContainerPort{{ContainerPort: 0}} } - if p.namespaceWatcher != nil { - if rawNs, ok, err := p.namespaceWatcher.Store().GetByKey(pod.Namespace); ok && err == nil { - if namespace, ok := rawNs.(*kubernetes.Namespace); ok { - for k, v := range namespace.GetAnnotations() { - safemapstr.Put(nsAnn, k, v) - } + var events []bus.Event + portsMap := common.MapStr{} + for _, port := range ports { + event := bus.Event{ + "provider": p.uuid, + "id": eventID, + flag: true, + "kubernetes": kubemeta, + // Actual metadata that will enrich the event. + "meta": common.MapStr{ + "kubernetes": meta, + "container": cmeta, + }, + } + // Include network information only if the container is running, + // so templates that need network don't generate a config. 
+ if c.status.State.Running != nil { + if port.Name != "" && port.ContainerPort != 0 { + portsMap[port.Name] = port.ContainerPort } + event["host"] = pod.Status.PodIP + event["port"] = port.ContainerPort } + + events = append(events, event) } - // Emit container and port information - for _, c := range containers { - // If it doesn't have an ID, container doesn't exist in - // the runtime, emit only an event if we are stopping, so - // we are sure of cleaning up configurations. - cid := containerIDs[c.Name] - if cid == "" && flag != "stop" { - continue + return events, portsMap +} + +// podEvent creates an event for a pod. +// It only includes network information if `includeNetwork` is true. +func (p *pod) podEvent(flag string, pod *kubernetes.Pod, ports common.MapStr, includeNetwork bool, annotations, namespaceAnnotations common.MapStr) bus.Event { + meta := p.metagen.Generate(pod) + + // Information that can be used in discovering a workload + kubemeta := meta.Clone() + kubemeta["annotations"] = annotations + if len(namespaceAnnotations) != 0 { + kubemeta["namespace_annotations"] = namespaceAnnotations + } + + // Don't set a port on the event + event := bus.Event{ + "provider": p.uuid, + "id": fmt.Sprint(pod.GetObjectMeta().GetUID()), + flag: true, + "kubernetes": kubemeta, + "meta": common.MapStr{ + "kubernetes": meta, + }, + } + + // Include network information only if the pod has an IP and there is any + // running container that could handle requests. + if pod.Status.PodIP != "" && includeNetwork { + event["host"] = pod.Status.PodIP + if len(ports) > 0 { + event["ports"] = ports } + } - // This must be an id that doesn't depend on the state of the container - // so it works also on `stop` if containers have been already deleted. 
- eventID := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), c.Name) + return event +} - meta := p.metagen.Generate(pod, metadata.WithFields("container.name", c.Name)) +// podAnnotations returns the annotations in a pod +func podAnnotations(pod *kubernetes.Pod) common.MapStr { + annotations := common.MapStr{} + for k, v := range pod.GetObjectMeta().GetAnnotations() { + safemapstr.Put(annotations, k, v) + } + return annotations +} - cmeta := common.MapStr{ - "id": cid, - "runtime": runtimes[c.Name], - "image": common.MapStr{ - "name": c.Image, - }, - } +// podNamespaceAnnotations returns the annotations of the namespace of the pod +func podNamespaceAnnotations(pod *kubernetes.Pod, watcher kubernetes.Watcher) common.MapStr { + if watcher == nil { + return nil + } - // Information that can be used in discovering a workload - kubemeta := meta.Clone() - kubemeta["annotations"] = annotations - kubemeta["container"] = common.MapStr{ - "id": cid, - "name": c.Name, - "image": c.Image, - "runtime": runtimes[c.Name], - } - if len(nsAnn) != 0 { - kubemeta["namespace_annotations"] = nsAnn - } + rawNs, ok, err := watcher.Store().GetByKey(pod.Namespace) + if !ok || err != nil { + return nil + } - var events []bus.Event - // Without this check there would be overlapping configurations with and without ports. 
- if len(c.Ports) == 0 { - // Set a zero port on the event to signify that the event is from a container - event := bus.Event{ - "provider": p.uuid, - "id": eventID, - flag: true, - "host": host, - "port": 0, - "kubernetes": kubemeta, - //Actual metadata that will enrich the event - "meta": common.MapStr{ - "kubernetes": meta, - "container": cmeta, - }, - } - events = append(events, event) - } + namespace, ok := rawNs.(*kubernetes.Namespace) + if !ok { + return nil + } - for _, port := range c.Ports { - podPorts[port.Name] = port.ContainerPort - event := bus.Event{ - "provider": p.uuid, - "id": eventID, - flag: true, - "host": host, - "port": port.ContainerPort, - "kubernetes": kubemeta, - "meta": common.MapStr{ - "kubernetes": meta, - "container": cmeta, - }, - } - events = append(events, event) - } - if len(events) != 0 { - eventList = append(eventList, events) - } + annotations := common.MapStr{} + for k, v := range namespace.GetAnnotations() { + safemapstr.Put(annotations, k, v) } + return annotations +} - // Publish a pod level event so that hints that have no exposed ports can get processed. - // Log hints would just ignore this event as there is no ${data.container.id} - // Publish the pod level hint only if at least one container level hint was generated. This ensures that there is - // no unnecessary pod level events emitted prematurely. - // We publish the pod level hint first so that it doesn't override a valid container level event. - if len(eventList) != 0 { - meta := p.metagen.Generate(pod) +// podTerminating returns true if a pod is marked for deletion or is in a phase beyond running. 
+func podTerminating(pod *kubernetes.Pod) bool { + if pod.GetObjectMeta().GetDeletionTimestamp() != nil { + return true + } - // Information that can be used in discovering a workload - kubemeta := meta.Clone() - kubemeta["annotations"] = annotations - if len(nsAnn) != 0 { - kubemeta["namespace_annotations"] = nsAnn - } + switch pod.Status.Phase { + case kubernetes.PodRunning, kubernetes.PodPending: + default: + return true + } - // Don't set a port on the event - event := bus.Event{ - "provider": p.uuid, - "id": fmt.Sprint(pod.GetObjectMeta().GetUID()), - flag: true, - "host": host, - "ports": podPorts, - "kubernetes": kubemeta, - "meta": common.MapStr{ - "kubernetes": meta, - }, + return false +} + +// podTerminated returns true if a pod is terminated; this method considers a +// pod as terminated if none of its containers are running (or going to be running). +func podTerminated(pod *kubernetes.Pod, containers []*containerInPod) bool { + // Pod is not marked for termination, so it is not terminated. + if !podTerminating(pod) { + return false + } + + // If any container is running, the pod is not terminated yet. + for _, container := range containers { + if container.status.State.Running != nil { + return false } - p.publish([]bus.Event{event}) } - // Ensure that the pod level event is published first to avoid pod metadata overriding a valid container metadata + return true +} + +// publishAll publishes all events in the event list in the same order. If delay is true +// publishAll schedules the publication of the events after the configured `CleanupTimeout` +// and returns immediately. +// Order of published events matters, so this function will always publish a given eventList +// in the same goroutine. 
+func (p *pod) publishAll(eventList [][]bus.Event, delay bool) { + if delay && p.config.CleanupTimeout > 0 { + p.logger.Debug("Publish will wait for the cleanup timeout") + time.AfterFunc(p.config.CleanupTimeout, func() { + p.publishAll(eventList, false) + }) + return + } + for _, events := range eventList { - p.publish(events) + p.publishFunc(events) } } diff --git a/libbeat/autodiscover/providers/kubernetes/pod_test.go b/libbeat/autodiscover/providers/kubernetes/pod_test.go index 3f874b649d4c..b1fed1fddfe7 100644 --- a/libbeat/autodiscover/providers/kubernetes/pod_test.go +++ b/libbeat/autodiscover/providers/kubernetes/pod_test.go @@ -370,6 +370,7 @@ func TestEmitEvent(t *testing.T) { TypeMeta: typeMeta, Status: v1.PodStatus{ PodIP: podIP, + Phase: kubernetes.PodRunning, ContainerStatuses: []kubernetes.PodContainerStatus{ { Name: name, @@ -396,7 +397,6 @@ func TestEmitEvent(t *testing.T) { "host": "127.0.0.1", "id": uid, "provider": UUID, - "ports": common.MapStr{}, "kubernetes": common.MapStr{ "pod": common.MapStr{ "name": "filebeat", @@ -426,7 +426,7 @@ func TestEmitEvent(t *testing.T) { { "start": true, "host": "127.0.0.1", - "port": 0, + "port": int32(0), "id": cid, "provider": UUID, "kubernetes": common.MapStr{ @@ -484,6 +484,7 @@ func TestEmitEvent(t *testing.T) { TypeMeta: typeMeta, Status: v1.PodStatus{ PodIP: podIP, + Phase: kubernetes.PodRunning, ContainerStatuses: []kubernetes.PodContainerStatus{ { Name: name, @@ -643,9 +644,10 @@ func TestEmitEvent(t *testing.T) { }, }, { - Message: "Test pod without host", + // This could be a succeeded pod from a short-living cron job. 
+ Message: "Test succeeded pod start with multiple ports exposed", Flag: "start", - Pod: &v1.Pod{ + Pod: &kubernetes.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: name, UID: types.UID(uid), @@ -655,10 +657,174 @@ func TestEmitEvent(t *testing.T) { }, TypeMeta: typeMeta, Status: v1.PodStatus{ + PodIP: podIP, + Phase: kubernetes.PodSucceeded, ContainerStatuses: []kubernetes.PodContainerStatus{ { Name: name, ContainerID: containerID, + State: v1.ContainerState{ + Terminated: &v1.ContainerStateTerminated{}, + }, + }, + }, + }, + Spec: v1.PodSpec{ + NodeName: node, + Containers: []kubernetes.Container{ + { + Image: containerImage, + Name: name, + Ports: []v1.ContainerPort{ + { + ContainerPort: 8080, + Name: "port1", + }, + { + ContainerPort: 9090, + Name: "port2", + }, + }, + }, + }, + }, + }, + Expected: []bus.Event{ + { + "start": true, + "id": uid, + "provider": UUID, + "kubernetes": common.MapStr{ + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + "ip": podIP, + }, + "node": common.MapStr{ + "name": "node", + }, + "namespace": "default", + "annotations": common.MapStr{}, + }, + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + "ip": podIP, + }, "node": common.MapStr{ + "name": "node", + }, + }, + }, + "config": []*common.Config{}, + }, + { + "start": true, + "id": cid, + "provider": UUID, + "kubernetes": common.MapStr{ + "container": common.MapStr{ + "id": "foobar", + "name": "filebeat", + "image": "elastic/filebeat:6.3.0", + "runtime": "docker", + }, + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + "ip": podIP, + }, + "node": common.MapStr{ + "name": "node", + }, + "namespace": "default", + "annotations": common.MapStr{}, + }, + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "pod": common.MapStr{ + "name": "filebeat", 
+ "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + "ip": podIP, + }, "node": common.MapStr{ + "name": "node", + }, "container": common.MapStr{ + "name": "filebeat", + }, + }, + "container": common.MapStr{ + "image": common.MapStr{"name": "elastic/filebeat:6.3.0"}, + "runtime": "docker", + "id": "foobar", + }, + }, + "config": []*common.Config{}, + }, + { + "start": true, + "id": cid, + "provider": UUID, + "kubernetes": common.MapStr{ + "container": common.MapStr{ + "id": "foobar", + "name": "filebeat", + "image": "elastic/filebeat:6.3.0", + "runtime": "docker", + }, + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + "ip": podIP, + }, + "node": common.MapStr{ + "name": "node", + }, + "namespace": "default", + "annotations": common.MapStr{}, + }, + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + "ip": podIP, + }, "node": common.MapStr{ + "name": "node", + }, "container": common.MapStr{ + "name": "filebeat", + }, + }, + "container": common.MapStr{ + "image": common.MapStr{"name": "elastic/filebeat:6.3.0"}, + "id": "foobar", + "runtime": "docker", + }, + }, + "config": []*common.Config{}, + }, + }, + }, + { + Message: "Test pod without host", + Flag: "start", + Pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + UID: types.UID(uid), + Namespace: namespace, + Labels: map[string]string{}, + Annotations: map[string]string{}, + }, + TypeMeta: typeMeta, + Status: v1.PodStatus{ + Phase: kubernetes.PodPending, + ContainerStatuses: []kubernetes.PodContainerStatus{ + { + Name: name, }, }, }, @@ -688,6 +854,7 @@ func TestEmitEvent(t *testing.T) { TypeMeta: typeMeta, Status: v1.PodStatus{ PodIP: podIP, + Phase: kubernetes.PodPending, ContainerStatuses: []kubernetes.PodContainerStatus{ { Name: name, @@ -738,10 +905,8 @@ func TestEmitEvent(t *testing.T) { Expected: []bus.Event{ { "stop": true, - 
"host": "", "id": uid, "provider": UUID, - "ports": common.MapStr{}, "kubernetes": common.MapStr{ "pod": common.MapStr{ "name": "filebeat", @@ -768,9 +933,7 @@ func TestEmitEvent(t *testing.T) { }, { "stop": true, - "host": "", "id": cid, - "port": 0, "provider": UUID, "kubernetes": common.MapStr{ "container": common.MapStr{ @@ -844,10 +1007,8 @@ func TestEmitEvent(t *testing.T) { Expected: []bus.Event{ { "stop": true, - "host": "127.0.0.1", "id": uid, "provider": UUID, - "ports": common.MapStr{}, "kubernetes": common.MapStr{ "pod": common.MapStr{ "name": "filebeat", @@ -876,8 +1037,6 @@ func TestEmitEvent(t *testing.T) { }, { "stop": true, - "host": "127.0.0.1", - "port": 0, "id": cid, "provider": UUID, "kubernetes": common.MapStr{ @@ -922,9 +1081,10 @@ func TestEmitEvent(t *testing.T) { }, }, { - Message: "Test stop pod without container id", + // This could be a succeeded pod from a short-living cron job. + Message: "Test succeeded pod stop with multiple ports exposed", Flag: "stop", - Pod: &v1.Pod{ + Pod: &kubernetes.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: name, UID: types.UID(uid), @@ -935,9 +1095,14 @@ func TestEmitEvent(t *testing.T) { TypeMeta: typeMeta, Status: v1.PodStatus{ PodIP: podIP, + Phase: kubernetes.PodSucceeded, ContainerStatuses: []kubernetes.PodContainerStatus{ { - Name: name, + Name: name, + ContainerID: containerID, + State: v1.ContainerState{ + Terminated: &v1.ContainerStateTerminated{}, + }, }, }, }, @@ -947,6 +1112,16 @@ func TestEmitEvent(t *testing.T) { { Image: containerImage, Name: name, + Ports: []v1.ContainerPort{ + { + ContainerPort: 8080, + Name: "port1", + }, + { + ContainerPort: 9090, + Name: "port2", + }, + }, }, }, }, @@ -954,10 +1129,8 @@ func TestEmitEvent(t *testing.T) { Expected: []bus.Event{ { "stop": true, - "host": "127.0.0.1", "id": uid, "provider": UUID, - "ports": common.MapStr{}, "kubernetes": common.MapStr{ "pod": common.MapStr{ "name": "filebeat", @@ -986,16 +1159,195 @@ func TestEmitEvent(t *testing.T) { }, { 
"stop": true, + "id": cid, + "provider": UUID, + "kubernetes": common.MapStr{ + "container": common.MapStr{ + "id": "foobar", + "name": "filebeat", + "image": "elastic/filebeat:6.3.0", + "runtime": "docker", + }, + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + "ip": podIP, + }, + "node": common.MapStr{ + "name": "node", + }, + "namespace": "default", + "annotations": common.MapStr{}, + }, + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + "ip": podIP, + }, "node": common.MapStr{ + "name": "node", + }, "container": common.MapStr{ + "name": "filebeat", + }, + }, + "container": common.MapStr{ + "image": common.MapStr{"name": "elastic/filebeat:6.3.0"}, + "runtime": "docker", + "id": "foobar", + }, + }, + "config": []*common.Config{}, + }, + { + "stop": true, + "id": cid, + "provider": UUID, + "kubernetes": common.MapStr{ + "container": common.MapStr{ + "id": "foobar", + "name": "filebeat", + "image": "elastic/filebeat:6.3.0", + "runtime": "docker", + }, + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + "ip": podIP, + }, + "node": common.MapStr{ + "name": "node", + }, + "namespace": "default", + "annotations": common.MapStr{}, + }, + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + "ip": podIP, + }, "node": common.MapStr{ + "name": "node", + }, "container": common.MapStr{ + "name": "filebeat", + }, + }, + "container": common.MapStr{ + "image": common.MapStr{"name": "elastic/filebeat:6.3.0"}, + "id": "foobar", + "runtime": "docker", + }, + }, + "config": []*common.Config{}, + }, + }, + }, + { + Message: "Test terminated init container in started common pod", + Flag: "start", + Pod: &kubernetes.Pod{ + ObjectMeta: 
metav1.ObjectMeta{ + Name: name, + UID: types.UID(uid), + Namespace: namespace, + Labels: map[string]string{}, + Annotations: map[string]string{}, + }, + TypeMeta: typeMeta, + Status: v1.PodStatus{ + PodIP: podIP, + Phase: kubernetes.PodRunning, + InitContainerStatuses: []kubernetes.PodContainerStatus{ + { + Name: name + "-init", + ContainerID: containerID, + State: v1.ContainerState{ + Terminated: &v1.ContainerStateTerminated{}, + }, + }, + }, + ContainerStatuses: []kubernetes.PodContainerStatus{ + { + Name: name, + ContainerID: containerID, + State: v1.ContainerState{ + Running: &v1.ContainerStateRunning{}, + }, + }, + }, + }, + Spec: v1.PodSpec{ + NodeName: node, + Containers: []kubernetes.Container{ + { + Image: containerImage, + Name: name, + Ports: []v1.ContainerPort{ + { + ContainerPort: 8080, + Name: "http", + }, + }, + }, + }, + InitContainers: []kubernetes.Container{ + { + Image: containerImage, + Name: name + "-init", + }, + }, + }, + }, + Expected: []bus.Event{ + { + "start": true, + "host": "127.0.0.1", + "id": uid, + "ports": common.MapStr{ + "http": int32(8080), + }, + "provider": UUID, + "kubernetes": common.MapStr{ + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + "ip": "127.0.0.1", + }, + "node": common.MapStr{ + "name": "node", + }, + "namespace": "default", + "annotations": common.MapStr{}, + }, + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + "ip": "127.0.0.1", + }, "node": common.MapStr{ + "name": "node", + }, + }, + }, + "config": []*common.Config{}, + }, + { + "start": true, "host": "127.0.0.1", - "port": 0, + "port": int32(8080), "id": cid, "provider": UUID, "kubernetes": common.MapStr{ "container": common.MapStr{ - "id": "", + "id": "foobar", "name": "filebeat", "image": "elastic/filebeat:6.3.0", - "runtime": "", + "runtime": "docker", }, "pod": common.MapStr{ 
"name": "filebeat", @@ -1023,8 +1375,51 @@ func TestEmitEvent(t *testing.T) { }, "container": common.MapStr{ "image": common.MapStr{"name": "elastic/filebeat:6.3.0"}, - "runtime": "", - "id": "", + "id": "foobar", + "runtime": "docker", + }, + }, + "config": []*common.Config{}, + }, + { + "start": true, + "id": cid + "-init", + "provider": UUID, + "kubernetes": common.MapStr{ + "container": common.MapStr{ + "id": "foobar", + "name": "filebeat-init", + "image": "elastic/filebeat:6.3.0", + "runtime": "docker", + }, + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + "ip": podIP, + }, + "node": common.MapStr{ + "name": "node", + }, + "namespace": "default", + "annotations": common.MapStr{}, + }, + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + "ip": podIP, + }, "node": common.MapStr{ + "name": "node", + }, "container": common.MapStr{ + "name": "filebeat-init", + }, + }, + "container": common.MapStr{ + "image": common.MapStr{"name": "elastic/filebeat:6.3.0"}, + "id": "foobar", + "runtime": "docker", }, }, "config": []*common.Config{}, @@ -1045,6 +1440,7 @@ func TestEmitEvent(t *testing.T) { TypeMeta: typeMeta, Status: v1.PodStatus{ PodIP: podIP, + Phase: kubernetes.PodPending, InitContainerStatuses: []kubernetes.PodContainerStatus{ { Name: name, @@ -1071,7 +1467,6 @@ func TestEmitEvent(t *testing.T) { "host": "127.0.0.1", "id": uid, "provider": UUID, - "ports": common.MapStr{}, "kubernetes": common.MapStr{ "pod": common.MapStr{ "name": "filebeat", @@ -1101,7 +1496,7 @@ func TestEmitEvent(t *testing.T) { { "start": true, "host": "127.0.0.1", - "port": 0, + "port": int32(0), "id": cid, "provider": UUID, "kubernetes": common.MapStr{ @@ -1159,6 +1554,7 @@ func TestEmitEvent(t *testing.T) { TypeMeta: typeMeta, Status: v1.PodStatus{ PodIP: podIP, + Phase: kubernetes.PodRunning, 
EphemeralContainerStatuses: []kubernetes.PodContainerStatus{ { Name: name, @@ -1187,7 +1583,6 @@ func TestEmitEvent(t *testing.T) { "host": "127.0.0.1", "id": uid, "provider": UUID, - "ports": common.MapStr{}, "kubernetes": common.MapStr{ "pod": common.MapStr{ "name": "filebeat", @@ -1217,7 +1612,7 @@ func TestEmitEvent(t *testing.T) { { "start": true, "host": "127.0.0.1", - "port": 0, + "port": int32(0), "id": cid, "provider": UUID, "kubernetes": common.MapStr{ @@ -1275,6 +1670,7 @@ func TestEmitEvent(t *testing.T) { TypeMeta: typeMeta, Status: v1.PodStatus{ PodIP: podIP, + Phase: kubernetes.PodRunning, InitContainerStatuses: []kubernetes.PodContainerStatus{ { Name: name, @@ -1334,7 +1730,6 @@ func TestEmitEvent(t *testing.T) { "host": "127.0.0.1", "id": uid, "provider": UUID, - "ports": common.MapStr{}, "kubernetes": common.MapStr{ "pod": common.MapStr{ "name": "filebeat", @@ -1365,7 +1760,7 @@ func TestEmitEvent(t *testing.T) { { "start": true, "host": "127.0.0.1", - "port": 0, + "port": int32(0), "id": cid, "provider": UUID, "kubernetes": common.MapStr{ @@ -1411,7 +1806,7 @@ func TestEmitEvent(t *testing.T) { { "start": true, "host": "127.0.0.1", - "port": 0, + "port": int32(0), "id": cid + "-init", "provider": UUID, "kubernetes": common.MapStr{ @@ -1457,7 +1852,7 @@ func TestEmitEvent(t *testing.T) { { "start": true, "host": "127.0.0.1", - "port": 0, + "port": int32(0), "id": cid + "-ephemeral", "provider": UUID, "kubernetes": common.MapStr{ @@ -1520,11 +1915,11 @@ func TestEmitEvent(t *testing.T) { pub := &publisher{b: p.bus} pod := &pod{ - metagen: metaGen, - config: defaultConfig(), - publish: pub.publish, - uuid: UUID, - logger: logp.NewLogger("kubernetes.pod"), + metagen: metaGen, + config: defaultConfig(), + publishFunc: pub.publish, + uuid: UUID, + logger: logp.NewLogger("kubernetes.pod"), } p.eventManager = NewMockPodEventerManager(pod) diff --git a/libbeat/common/kubernetes/informer.go b/libbeat/common/kubernetes/informer.go index 
b51640a248a2..b092847dca34 100644 --- a/libbeat/common/kubernetes/informer.go +++ b/libbeat/common/kubernetes/informer.go @@ -153,9 +153,8 @@ func NewInformer(client kubernetes.Interface, resource Resource, opts WatchOptio return nil, "", fmt.Errorf("unsupported resource type for watching %T", resource) } - if indexers != nil { - return cache.NewSharedIndexInformer(listwatch, resource, opts.SyncTimeout, indexers), objType, nil + if indexers == nil { + indexers = cache.Indexers{} } - - return cache.NewSharedInformer(listwatch, resource, opts.SyncTimeout), objType, nil + return cache.NewSharedIndexInformer(listwatch, resource, opts.SyncTimeout, indexers), objType, nil } diff --git a/libbeat/common/kubernetes/watcher.go b/libbeat/common/kubernetes/watcher.go index a7e2a6319774..df58cf84a3ea 100644 --- a/libbeat/common/kubernetes/watcher.go +++ b/libbeat/common/kubernetes/watcher.go @@ -86,7 +86,7 @@ type watcher struct { client kubernetes.Interface informer cache.SharedInformer store cache.Store - queue workqueue.RateLimitingInterface + queue workqueue.Interface ctx context.Context stop context.CancelFunc handler ResourceEventHandler @@ -97,16 +97,15 @@ type watcher struct { // resource from the cluster (filtered to the given node) func NewWatcher(client kubernetes.Interface, resource Resource, opts WatchOptions, indexers cache.Indexers) (Watcher, error) { var store cache.Store - var queue workqueue.RateLimitingInterface + var queue workqueue.Interface - informer, objType, err := NewInformer(client, resource, opts, indexers) + informer, _, err := NewInformer(client, resource, opts, indexers) if err != nil { return nil, err } store = informer.GetStore() - queue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), objType) - ctx, cancel := context.WithCancel(context.Background()) + queue = workqueue.New() if opts.IsUpdated == nil { opts.IsUpdated = func(o, n interface{}) bool { @@ -121,6 +120,7 @@ func NewWatcher(client kubernetes.Interface, 
resource Resource, opts WatchOption } } + ctx, cancel := context.WithCancel(context.TODO()) w := &watcher{ client: client, informer: informer, @@ -216,53 +216,43 @@ func (w *watcher) enqueue(obj interface{}, state string) { // process gets the top of the work queue and processes the object that is received. func (w *watcher) process(ctx context.Context) bool { - keyObj, quit := w.queue.Get() + obj, quit := w.queue.Get() if quit { return false } + defer w.queue.Done(obj) - err := func(obj interface{}) error { - defer w.queue.Done(obj) - - var entry *item - var ok bool - if entry, ok = obj.(*item); !ok { - w.queue.Forget(obj) - utilruntime.HandleError(fmt.Errorf("expected *item in workqueue but got %#v", obj)) - return nil - } - - key := entry.object.(string) - - o, exists, err := w.store.GetByKey(key) - if err != nil { - return nil - } - if !exists { - if entry.state == delete { - w.logger.Debugf("Object %+v was not found in the store, deleting anyway!", key) - // delete anyway in order to clean states - w.handler.OnDelete(entry.objectRaw) - } - return nil - } - - switch entry.state { - case add: - w.handler.OnAdd(o) - case update: - w.handler.OnUpdate(o) - case delete: - w.handler.OnDelete(o) - } + var entry *item + var ok bool + if entry, ok = obj.(*item); !ok { + utilruntime.HandleError(fmt.Errorf("expected *item in workqueue but got %#v", obj)) + return true + } - return nil - }(keyObj) + key := entry.object.(string) + o, exists, err := w.store.GetByKey(key) if err != nil { - utilruntime.HandleError(err) + utilruntime.HandleError(fmt.Errorf("getting object %#v from cache: %w", obj, err)) + return true + } + if !exists { + if entry.state == delete { + w.logger.Debugf("Object %+v was not found in the store, deleting anyway!", key) + // delete anyway in order to clean states + w.handler.OnDelete(entry.objectRaw) + } return true } + switch entry.state { + case add: + w.handler.OnAdd(o) + case update: + w.handler.OnUpdate(o) + case delete: + w.handler.OnDelete(o) + } 
+ return true }