-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Kubernetes annotator #3888
Kubernetes annotator #3888
Conversation
bf4c810
to
1e3d5f4
Compare
Jenkins standing by to test this. If you aren't a maintainer, you can ignore this comment. Someone with commit access, please review this and clear it for Jenkins to run. |
1 similar comment
Jenkins standing by to test this. If you aren't a maintainer, you can ignore this comment. Someone with commit access, please review this and clear it for Jenkins to run. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Awesome contribution @vjsamuel, thank you!! I have left some questions and comments, I'll try to test this in a while
} | ||
|
||
func init() { | ||
processors.RegisterPlugin("annotate.kubernetes", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be better to use just kubernetes
instead of annotate.kubernetes
, until now we have left the processor type as something implicit
commonMeta := common.MapStr{ | ||
"pod": name, | ||
"namespace": pod.Namespace, | ||
"labels": pod.Labels, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to define these fields in _meta folder, check https://github.com/elastic/beats/blob/master/libbeat/processors/add_cloud_metadata/_meta/fields.yml as an example
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will do.
|
||
//Pod is in terminating state, remove the metadata | ||
if pod.DeletionTimestamp != nil { | ||
p.removePodMapEntry(name, endpoints, containers) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering, what happens when you delete the pod here but some logs are still being processed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typically, log flow/metric scraping stops after the pod is marked for terminated. we could also move this to another method so that we delete it after the container is completely destroyed.
default: | ||
pod, ok := podObj.(*corev1.Pod) | ||
if ok { | ||
logp.Info("Processing %s:%s", pod.Namespace, pod.Name) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe this should be Debug?
|
||
p.h.Reset() | ||
fmt.Fprint(p.h, commonMeta) | ||
commonMeta["_hash"] = p.h.Sum64() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What are you using this hash for?
return event, nil | ||
} | ||
|
||
} else if value, ok := event["pod"]; ok { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess this comes from a custom beat or module?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we leave this check to make sure that any beat that writes an event with a key pod
, we can still annotate it with the metadata.
) | ||
|
||
const ( | ||
timeout = time.Second * 5 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should probably be configurable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes. agreed.
|
||
delete(event, "pod") | ||
} else { | ||
return event, nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we log an error here? Because that means someone configured the kubernetes processor but no field was found to match and no enhancement happened.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i could have a filebeat configuration, that tails /var/log/*
which is not managed by kubernetes and /var/lib/docker/containers/*/*log
which is managed by kubernetes. the processor can be global and it would not pass through this check when a message from /var/log
come in.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That speaks for having local processors.
If you don't want to see the error with the global config, the processor checks could be used to exclude some files from processing.
func (k kubernetesAnnotator) Run(event common.MapStr) (common.MapStr, error) { | ||
|
||
metadata := common.MapStr{} | ||
if value, ok := event["source"]; ok { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure if this is the right place for this to happen. I would suggest in a first version to make it configurable in the processor config with format strings on which fields should be picked / combined for the lookup.
In a second version we should definitively have some auto detection for example based on the beat type or where it could be passed as a option to Run.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i like the idea of format strings. didnt think about it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
my use case is that I have a source file that has a container id and i need to extract it out. This is not supported by format strings today.
"github.com/elastic/beats/libbeat/logp" | ||
"k8s.io/apimachinery/pkg/apis/meta/v1" | ||
"k8s.io/apimachinery/pkg/fields" | ||
"k8s.io/apimachinery/pkg/runtime" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm curious how many of these imports we will actually need or are empty interfaces we could also have locally. The reason I bring this up because on the docker side we have currently too many dependencies because of this and in the end all we do is reading stats which is only a very small subset. If this also applies to kubernetes, perhaps we can remove most of the dependencies here with our own client or having some empty interfaces in our code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think a custom client is not an option here, as this one is more advanced than a stateless REST consumer. Check SharedIndexInformer and Watch mechanism, the client code creates a copy of the state by watching k8s API
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One issue that I am facing is that
|
I remember we had in the past our own FlagSet to prevent overlapping but I think we removed it as it was no issue anymore and made things simpler for having flags in different packages. Perhaps time to reintroduce it? I assume there isn't a trick in golang to disable some of the flags from library on compile time? |
7bfc4f7
to
84210aa
Compare
f0d45b9
to
020c563
Compare
for the record, after changing to a different k8s client the flags issue is gone. We changed to a more lightweight client to avoid increasing filebeat binary size |
70d7aef
to
1c96434
Compare
NOTICE
Outdated
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | ||
|
||
-------------------------------------------------------------------- | ||
golang.org/x/text |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have this already in the global vendor directory. Not sure if we can tell govendor somehow about this that it only adds the packages which are not there. Or add the missing packages there? @vjsamuel any idea?
BTW: Same is true for other packages and we have the same issue in the docker module.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ruflin, if there is a version mismatch between $BEATS_ROOT/vendor
and $MODULE/vendor
then there would be two copies. if we can upgrade the root vendor to the more recent version this would ideally go away
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, that's cool, I personally miss some more verbosity from govendor
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i have moved the dependency to root vendor so now they share the same commit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@vjsamuel That means now all dependencies are on the top level? Not sure I like that the kubernetes dependencies are on the top level as they are module specific. Any chance to have a mix? Means the common ones on the top level and all the other dependencies in here?
"comment": "", | ||
"ignore": "test github.com/elastic/beats", | ||
"package": [ | ||
{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
appengine can be added to ignore
NOTICE
Outdated
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | ||
|
||
-------------------------------------------------------------------- | ||
golang.org/x/text |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for k, v := range pod.Metadata.Labels { | ||
labelMap[k] = v | ||
} | ||
return common.MapStr{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Under which namespace will this end up?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We still have the namespacing discussion open, I think we should put everything under the same kubernetes
namespace but I was waiting to see the rest of modules to know if it makes sense
|
||
func newLogsPathMatcher(cfg common.Config) Matcher { | ||
config := struct { | ||
LogsPath string `config:"logs_path"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Assuming this is in filebeat, that would mean the filebeat processors have an additional config option?
} | ||
|
||
func newKubernetesAnnotator(cfg common.Config) (processors.Processor, error) { | ||
config := kubeAnnotatorConfig{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we normally have the defaults as a var external to the method. Something called defaultKubeAnnotatorConfig
in a config.go
file. Check some other config.go
files for an example. This makes it easy to find the config options.
) | ||
|
||
const ( | ||
podCheckPeriod = 20 * time.Second |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Couldn't find where this is used?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
its unused. i will remove this one.
} | ||
} | ||
|
||
func (p *PodWatcher) deletePod(pod *corev1.Pod) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would call this remove
as the Pod will still exist (hopefully) and it's only removed from the list.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is the naming convention that kube controllers have for various events that are processed which are:
onAdd
onUpdate
onDelete
we could rename it to onPodAdd
, onPodUpdate
and onPodDelete
. does that work @ruflin ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like the onPodDelete
but as it is a private method naming is not too important. I just stumbled over it when looking at the code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right now we have no tests for this processor. Would it be possible to test at least parts of the processor without predefined data structures to not have to start a kubernetes environment.
There are a few new config options. Could we add to the PR docs and example of how the config will look like and also the end output event? I would also be happy to have this in doc.go
in the package itself.
f72a4a1
to
96604da
Compare
@ruflin, i have moved the config to a dedicated |
@ruflin, basic test cases and processor documentation have been added as well. |
in_cluster: true | ||
------------------------------------------------------------------------------- | ||
|
||
The configuration below enables the processor on a beat running as a process on the kubernetes node |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This would also work outside the kubernetes node, I would just say "outside the cluster"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
outside the node is useless for logs as the filebeat needs to sit inside the same node.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking also in the metricbeat case, but works for me, it's a minor thing anyway
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Again, capitalize Beat and Kubernetes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
return kubeAnnotatorConfig{ | ||
InCluster: true, | ||
SyncPeriod: 1 * time.Second, | ||
Namespace: "kube-system", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps we should suggest default? I guess entry level users are not aware of kube-system and may miss beats when listing pods?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i wouldnt recommend that. default namespace is only for control plane components. kube-system
or monitoring
are recommended namespaces.
|
||
// AddIndexer to the register | ||
func (r *Register) AddIndexer(name string, indexer IndexConstructor) { | ||
r.indexers[name] = indexer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm missing all RWMutex locking logic in getters & setters, could you please add it? Like in https://github.com/elastic/beats/pull/3888/files#diff-2afe85a18185c27752efda431319dac3R120
//Add a log path matcher which can extract container ID from the "source" field. | ||
logsPathMatcher, err := newLogsPathMatcher(*cfg) | ||
if err == nil { | ||
kubernetes.Indexing.AddDefaultMatcher(logsPathMatcher) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not just adding the registered matcher/indexer name here, instead of registering an instance? like AddDefaultIndexer("logs_path")
This would force us to register it before marking it as default
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left some more high level comments. Will have a detailed look again tomorrow morning. In general LGTM. Lets make sure to get it in rather soonish and then iterate on top of it.
@exekias Could you already start a follow up github meta issue to track open points?
filebeat/beater/filebeat.go
Outdated
@@ -17,6 +17,9 @@ import ( | |||
"github.com/elastic/beats/filebeat/publisher" | |||
"github.com/elastic/beats/filebeat/registrar" | |||
"github.com/elastic/beats/filebeat/spooler" | |||
|
|||
//Add filebeat level processors | |||
_ "github.com/elastic/beats/filebeat/processors/annotate/kubernetes" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like that approach
@@ -0,0 +1,91 @@ | |||
package kubernetes |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have a little bit a mess with package names singular/plural. We started all new packages to be singular (like module) when it contains multiple packages with each package one module. processors
initially contained all processors in one package I think, that is why it was called processors
and not processor
. It should be also changed in libbeat later but in filebeat we should directly start with processor
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
`logs_path` matcher that takes the `source` field, extracts the container ID and uses it to retrieve | ||
metadata. | ||
|
||
The configuration below enables the processor when the beat as a pod in kubernetes. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"when the beat as a pod" ? Is a word missing here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A few minor changes to capitalization, word choice, and commas: "For example, Filebeat defines the container
indexer, which indexes pod metadata based on all container IDs, and a logs_path
matcher, which takes the source
field, extracts the container ID, and uses it to retrieve metadata.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
in_cluster: false | ||
host: <hostname> | ||
kube_config: ~/.kube/config | ||
enable_default_indexers: false |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would turn this around to default_indexers.enabled
to be consistend with enabled
in other places. With ucfg
the enabled
comes for "free" as it is already part of the config option.
Same below for default_matches.enabled
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
matchers: | ||
- fields: | ||
lookup_fields: ["metricset.host"] | ||
------------------------------------------------------------------------------- |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I miss a note on how where the enriched data for the event will be put. How will the end event look like?
metricbeat/beater/metricbeat.go
Outdated
@@ -12,6 +12,9 @@ import ( | |||
|
|||
"github.com/elastic/beats/libbeat/cfgfile" | |||
"github.com/pkg/errors" | |||
|
|||
//Add metricbeat specific processors | |||
_ "github.com/elastic/beats/metricbeat/processors/annotate/kubernetes" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should auto generate this in the future for all beats based on the convention for the namespace and make it part of the include
package.
@@ -0,0 +1,126 @@ | |||
package kubernetes |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For the package name see comment in filebeat.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some minor edits and questions about areas of the doc that will be potentially confusing to users.
=== kubernetes | ||
|
||
The `kubernetes` processor annotates each event with relevant metadata based on | ||
which kubernetes pod the event originated from. Each event is annotated with: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Capitalize Kubernetes whenever it's used as a product name.
the `ip_port` indexer can take a kubernetes pod and index the pod metadata based on all | ||
`pod_ip:container_port` combinations. | ||
|
||
Matchers are used to build lookup keys with which indices can be queried. For example, the `fields` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A little hard to follow because of the passive voice. Maybe say, "...build lookup keys for querying indices."
`pod_ip:container_port` combinations. | ||
|
||
Matchers are used to build lookup keys with which indices can be queried. For example, the `fields` | ||
matcher can take a list of fields based on whose values, indices can be queried to retrieve metadata. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can't follow your meaning here, so you'll need to reword. Might also help to have a specific example. In fact, the concepts here are a bit hard to follow (not sure if it's a problem with the doc or my lack of knowledge regarding Kubernetes). Specific examples somewhere showing how the events are enriched with Kubernetes data would be very helpful.
Matchers are used to build lookup keys with which indices can be queried. For example, the `fields` | ||
matcher can take a list of fields based on whose values, indices can be queried to retrieve metadata. | ||
|
||
Each beat can define its own default indexers and matchers which are enabled by default. For example, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Capitalize Beat and Filebeat.
`logs_path` matcher that takes the `source` field, extracts the container ID and uses it to retrieve | ||
metadata. | ||
|
||
The configuration below enables the processor when the beat as a pod in kubernetes. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A few minor changes to capitalization, word choice, and commas: "For example, Filebeat defines the container
indexer, which indexes pod metadata based on all container IDs, and a logs_path
matcher, which takes the source
field, extracts the container ID, and uses it to retrieve metadata.
[source,yaml] | ||
------------------------------------------------------------------------------- | ||
processors: | ||
- kuberenes: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this say kubernetes
? This question applies to other places where you use kuberenes instead of kubernetes.
* Namespace | ||
* Labels | ||
|
||
The `kuberentes` process has two basic building blocks which are: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this say kubernetes
processor instead of process? Note that there is also a typo: kuberentes
Matchers are used to build lookup keys with which indices can be queried. For example, the `fields` | ||
matcher can take a list of fields based on whose values, indices can be queried to retrieve metadata. | ||
|
||
Each beat can define its own default indexers and matchers which are enabled by default. For example, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How do users know which indexers and matchers are enabled for each Beat?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we would need to document individual beats on what indexers/matchers each of them enable by default. having them in libbeat documentation wouldnt be ideal.
in_cluster: true | ||
------------------------------------------------------------------------------- | ||
|
||
The configuration below enables the processor on a beat running as a process on the kubernetes node |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Again, capitalize Beat and Kubernetes.
kube_config: ~/.kube/config | ||
------------------------------------------------------------------------------- | ||
|
||
The configuration below allows custom indexers and matchers to be enabled. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this configuration enable the indexers and matchers, or does it allow you to enable them somewhere else? The passive voice used here makes it sound like the latter.
20a4e63
to
668e571
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I left a few minor comments but I'm ok with moving forward with the PR as is and address the comments later. Please also check the comment from @dedemorton
Before we merge it, all the commits should be squashed into 1 and the commit message and PR message should be updated.
A few additional things we should do after merging:
- Going through the config options again and check the naming.
- Discuss again where which part of the indexer / matcher logic should be. I could see that for example the
ip:port
matcher logic is in libbeat close to the processor, but configuration and activation as default matcher happens in metricbeat. Same for filebeat. - Discuss on how to handle dependencies best, as having it on the global level is not optimal, but also having duplicates is not good.
return nil, fmt.Errorf("fail to unpack the `logs_path` configuration: %s", err) | ||
} | ||
|
||
logPath := config.LogsPath |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be logsPath
?
|
||
indexer := kubernetes.Indexing.GetIndexer(kubernetes.ContainerIndexerName) | ||
//Add a container indexer by default. | ||
if indexer != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we split the code into 2 func init()
functions which I think Golang allows, we could make the code quite a bit nicer. Also it allows to check for err != nil
and then return which is more the "standard" golang way.
SyncPeriod time.Duration `config:"sync_period"` | ||
Indexers PluginConfig `config:"indexers"` | ||
Matchers PluginConfig `config:"matchers"` | ||
DefaultMatchers Enabled `config:"default_matchers"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You could use here config:"default_matchers.enabled"
and then just use a bool. No additional type needed.
return kubeAnnotatorConfig{ | ||
InCluster: true, | ||
SyncPeriod: 1 * time.Second, | ||
Namespace: "kube-system", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm trying to understand how this Namespace works. Is that something set in the kubernetes environment? Is there a default when I just set up Kubernetes?
return nil, fmt.Errorf("fail to unpack the kubernetes configuration: %s", err) | ||
} | ||
|
||
err = validate(config) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ucfg checks for a Validate() error
interface and calls the method automatically on unpack. See https://github.com/elastic/beats/blob/master/heartbeat/monitors/active/http/config.go#L82 This could be used here (can be done later)
return nil, err | ||
} | ||
|
||
indexers := Indexers{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need the double nesting here? Naming gets kind of interesting like this.
return kubernetesAnnotator{podWatcher: watcher, matchers: &matchers}, nil | ||
} | ||
|
||
return nil, fatalError |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this line could be removed because there is the same return in the end.
logp.Err("Unable to get indexer plugin %s", IpPortIndexerName) | ||
} | ||
|
||
matcher := kubernetes.Indexing.GetMatcher(kubernetes.FieldMatcherName) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Two init functions would simplify the logic here (see comment in filebeat)
21ec33c
to
f1e4147
Compare
This PR adds support for Kubernetes as a processor for Beats. The Kubernetes processor allows Beats to enrich events with metadata coming from the Kubernetes Pod from which the event originated. Metadata might include: * pod name * pod namespace * container name The Kubernetes processor relies on two constructs: * Indexers - used to generate metadata from Pods and store them with unique keys. Example of keys might be container IDs, IP:Port combinations, PodName. * Matchers - used to generate a lookup key from an event. Example of a matcher might be a log_path matcher that looks at the source field of an event, extract the container ID from the log path and use it as a lookup key to retrieve metadata about the Pod from which the log message originated.
f1e4147
to
2b9db99
Compare
This PR adds support for Kubernetes as a processor for Beats. The Kubernetes processor allows Beats to enrich events with metadata coming from the Kubernetes Pod from which the event originated. Metadata might include:
The Kubernetes processor relies on two constructs:
log_path
matcher that looks at thesource
field of an event, extract the container ID from the log path and use it as a lookup key to retrieve metadata about the Pod from which the log message originated.