-
Notifications
You must be signed in to change notification settings - Fork 5k
Add ID processor #14524
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add ID processor #14524
Changes from all commits
e0dc306
4269333
bfa27b1
98a891d
1133141
f35064b
f6ed19c
e2d85fe
477fe7f
998a85f
e4a2af6
229d57d
c9a7bae
4cc8c56
a6b3646
950cd5b
02cdbf6
6881da2
b3af9e8
ae02eb2
b60e88c
881c040
f405381
7245204
3f9652a
63fe346
63a96fc
fc0e6bb
539188d
ef1e22c
848c35e
6d604d9
c89d3c4
1e62ad7
fb6e89d
e00cb7b
439b56f
92637a5
b7fe6fb
d45e591
0a76462
1b44e75
aa8739b
7c79403
99c01b7
e4f9fe9
f2faf14
51f8eed
72b97c5
5e8420c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,76 @@ | ||
| // Licensed to Elasticsearch B.V. under one or more contributor | ||
| // license agreements. See the NOTICE file distributed with | ||
| // this work for additional information regarding copyright | ||
| // ownership. Elasticsearch B.V. licenses this file to you under | ||
| // the Apache License, Version 2.0 (the "License"); you may | ||
| // not use this file except in compliance with the License. | ||
| // You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, | ||
| // software distributed under the License is distributed on an | ||
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| // KIND, either express or implied. See the License for the | ||
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| package add_id | ||
|
ycombinator marked this conversation as resolved.
|
||
|
|
||
| import ( | ||
| "fmt" | ||
|
|
||
| "github.com/elastic/beats/libbeat/processors/add_id/generator" | ||
|
|
||
| "github.com/elastic/beats/libbeat/beat" | ||
| "github.com/elastic/beats/libbeat/common" | ||
| "github.com/elastic/beats/libbeat/processors" | ||
| jsprocessor "github.com/elastic/beats/libbeat/processors/script/javascript/module/processor" | ||
| ) | ||
|
|
||
| func init() { | ||
| processors.RegisterPlugin("add_id", New) | ||
| jsprocessor.RegisterPlugin("AddID", New) | ||
| } | ||
|
|
||
| const processorName = "add_id" | ||
|
|
||
| type addID struct { | ||
| config config | ||
| gen generator.IDGenerator | ||
| } | ||
|
|
||
| // New constructs a new Add ID processor. | ||
| func New(cfg *common.Config) (processors.Processor, error) { | ||
| config := defaultConfig() | ||
| if err := cfg.Unpack(&config); err != nil { | ||
| return nil, makeErrConfigUnpack(err) | ||
| } | ||
|
|
||
| gen, err := generator.Factory(config.Type) | ||
| if err != nil { | ||
| return nil, makeErrComputeID(err) | ||
| } | ||
|
|
||
| p := &addID{ | ||
| config, | ||
| gen, | ||
| } | ||
|
|
||
| return p, nil | ||
| } | ||
|
|
||
| // Run enriches the given event with an ID | ||
| func (p *addID) Run(event *beat.Event) (*beat.Event, error) { | ||
| id := p.gen.NextID() | ||
|
|
||
| if _, err := event.PutValue(p.config.TargetField, id); err != nil { | ||
| return nil, makeErrComputeID(err) | ||
| } | ||
|
|
||
| return event, nil | ||
| } | ||
|
|
||
| func (p *addID) String() string { | ||
| return fmt.Sprintf("%v=[target_field=[%v]]", processorName, p.config.TargetField) | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,65 @@ | ||
| // Licensed to Elasticsearch B.V. under one or more contributor | ||
| // license agreements. See the NOTICE file distributed with | ||
| // this work for additional information regarding copyright | ||
| // ownership. Elasticsearch B.V. licenses this file to you under | ||
| // the Apache License, Version 2.0 (the "License"); you may | ||
| // not use this file except in compliance with the License. | ||
| // You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, | ||
| // software distributed under the License is distributed on an | ||
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| // KIND, either express or implied. See the License for the | ||
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| package add_id | ||
|
ycombinator marked this conversation as resolved.
|
||
|
|
||
| import ( | ||
| "testing" | ||
|
|
||
| "github.com/elastic/beats/libbeat/common" | ||
|
|
||
| "github.com/elastic/beats/libbeat/beat" | ||
|
|
||
| "github.com/stretchr/testify/assert" | ||
| ) | ||
|
|
||
| func TestDefaultTargetField(t *testing.T) { | ||
| p, err := New(common.MustNewConfigFrom(nil)) | ||
| assert.NoError(t, err) | ||
|
|
||
| testEvent := &beat.Event{} | ||
|
|
||
| newEvent, err := p.Run(testEvent) | ||
| assert.NoError(t, err) | ||
|
|
||
| v, err := newEvent.GetValue("@metadata.id") | ||
| assert.NoError(t, err) | ||
| assert.NotEmpty(t, v) | ||
| } | ||
|
|
||
| func TestNonDefaultTargetField(t *testing.T) { | ||
| cfg := common.MustNewConfigFrom(common.MapStr{ | ||
| "target_field": "foo", | ||
| }) | ||
| p, err := New(cfg) | ||
| assert.NoError(t, err) | ||
|
|
||
| testEvent := &beat.Event{ | ||
| Fields: common.MapStr{}, | ||
| } | ||
|
|
||
| newEvent, err := p.Run(testEvent) | ||
| assert.NoError(t, err) | ||
|
|
||
| v, err := newEvent.GetValue("foo") | ||
| assert.NoError(t, err) | ||
| assert.NotEmpty(t, v) | ||
|
|
||
| v, err = newEvent.GetValue("@metadata.id") | ||
| assert.NoError(t, err) | ||
| assert.Empty(t, v) | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,44 @@ | ||
| // Licensed to Elasticsearch B.V. under one or more contributor | ||
| // license agreements. See the NOTICE file distributed with | ||
| // this work for additional information regarding copyright | ||
| // ownership. Elasticsearch B.V. licenses this file to you under | ||
| // the Apache License, Version 2.0 (the "License"); you may | ||
| // not use this file except in compliance with the License. | ||
| // You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, | ||
| // software distributed under the License is distributed on an | ||
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| // KIND, either express or implied. See the License for the | ||
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| package add_id | ||
|
ycombinator marked this conversation as resolved.
|
||
|
|
||
| import ( | ||
| "github.com/elastic/beats/libbeat/processors/add_id/generator" | ||
| ) | ||
|
|
||
// config holds the settings of the Add ID processor.
type config struct {
	// TargetField is the target field for the ID.
	TargetField string `config:"target_field"`
	// Type is the type of ID to generate.
	Type string `config:"type"`
}
|
ycombinator marked this conversation as resolved.
|
||
|
|
||
| func defaultConfig() config { | ||
| return config{ | ||
| TargetField: "@metadata.id", | ||
| Type: "elasticsearch", | ||
| } | ||
| } | ||
|
|
||
| func (c *config) Validate() error { | ||
| // Validate type of ID generator | ||
| if !generator.Exists(c.Type) { | ||
| return makeErrUnknownType(c.Type) | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,18 @@ | ||
| [[add-id]] | ||
| === Generate an ID for an event | ||
|
|
||
| The `add_id` processor generates a unique ID for an event. | ||
|
|
||
| [source,yaml] | ||
| ----------------------------------------------------- | ||
| processors: | ||
| - add_id: ~ | ||
| ----------------------------------------------------- | ||
|
|
||
| The following settings are supported: | ||
|
|
||
| `target_field`:: (Optional) Field where the generated ID will be stored. Default is `@metadata.id`. | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. somewhere we should document that the Elasticsearch output will use
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Good idea. I was initially thinking of doing this in a blog post (that would cover the introduction of this processor and the […]). I wonder if this sort of documentation belongs in the ES output and LS output docs, since it's strictly agnostic to what generated the ID (e.g. this processor, but could be something else too). @dedemorton WDYT?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @ycombinator I'm planning to work on a related issue this week: #13739. We should talk about your plans for the blog post and see what makes sense for the docs. I'd like to avoid burying the content too deeply under the config settings. I think we need a topic that describes what the document ID is (with links to the ES docs), how it's set, and why you want to set it (deduplication etc). Maybe we can chat later this week. |
||
|
|
||
| `type`:: (Optional) Type of ID to generate. Currently, only `elasticsearch` is supported, and it is the default. | ||
| The `elasticsearch` type generates IDs using the same algorithm that Elasticsearch uses to auto-generate | ||
| document IDs. | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,55 @@ | ||
| // Licensed to Elasticsearch B.V. under one or more contributor | ||
| // license agreements. See the NOTICE file distributed with | ||
| // this work for additional information regarding copyright | ||
| // ownership. Elasticsearch B.V. licenses this file to you under | ||
| // the Apache License, Version 2.0 (the "License"); you may | ||
| // not use this file except in compliance with the License. | ||
| // You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, | ||
| // software distributed under the License is distributed on an | ||
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| // KIND, either express or implied. See the License for the | ||
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| package add_id | ||
|
ycombinator marked this conversation as resolved.
|
||
|
|
||
| import ( | ||
| "fmt" | ||
| ) | ||
|
|
||
// errConfigUnpack wraps the error returned while unpacking the configuration.
type errConfigUnpack struct{ cause error }

// errComputeID wraps an error encountered while producing or storing an ID.
type errComputeID struct{ cause error }

// errUnknownType carries the name of an ID type that was rejected as invalid.
type errUnknownType struct{ typ string }
|
|
||
| func makeErrConfigUnpack(cause error) errConfigUnpack { | ||
| return errConfigUnpack{cause} | ||
| } | ||
| func (e errConfigUnpack) Error() string { | ||
| return fmt.Sprintf("failed to unpack %v processor configuration: %v", processorName, e.cause) | ||
| } | ||
| func (e errConfigUnpack) Unwrap() error { | ||
| return e.cause | ||
| } | ||
|
|
||
| func makeErrComputeID(cause error) errComputeID { | ||
| return errComputeID{cause} | ||
| } | ||
| func (e errComputeID) Error() string { | ||
| return fmt.Sprintf("failed to compute ID: %v", e.cause) | ||
| } | ||
| func (e errComputeID) Unwrap() error { | ||
| return e.cause | ||
| } | ||
|
|
||
| func makeErrUnknownType(typ string) errUnknownType { | ||
| return errUnknownType{typ} | ||
| } | ||
| func (e errUnknownType) Error() string { | ||
| return fmt.Sprintf("invalid type [%s]", e.typ) | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,33 @@ | ||
| // Licensed to Elasticsearch B.V. under one or more contributor | ||
| // license agreements. See the NOTICE file distributed with | ||
| // this work for additional information regarding copyright | ||
| // ownership. Elasticsearch B.V. licenses this file to you under | ||
| // the Apache License, Version 2.0 (the "License"); you may | ||
| // not use this file except in compliance with the License. | ||
| // You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, | ||
| // software distributed under the License is distributed on an | ||
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| // KIND, either express or implied. See the License for the | ||
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| package generator | ||
|
|
||
| import ( | ||
| "fmt" | ||
| ) | ||
|
|
||
// errUnknownType carries the name of an ID type that was rejected as invalid.
type errUnknownType struct {
	typ string // the rejected type name
}

// makeErrUnknownType returns an errUnknownType for the given type name.
func makeErrUnknownType(typ string) errUnknownType {
	return errUnknownType{typ: typ}
}

// Error implements the error interface; the message quotes the rejected type
// name in square brackets.
func (e errUnknownType) Error() string {
	return fmt.Sprintf("invalid type [%s]", e.typ)
}
Uh oh!
There was an error while loading. Please reload this page.