diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 7764ab53ee5f..245d9608c3b0 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -10,6 +10,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d *Affecting all Beats* +- The document id field has been renamed from @metadata.id to @metadata._id {pull}15859[15859] + *Auditbeat* @@ -77,6 +79,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d *Affecting all Beats* +- Add document_id setting to decode_json_fields processor. {pull}15859[15859] + *Auditbeat* diff --git a/filebeat/docs/inputs/input-common-harvester-options.asciidoc b/filebeat/docs/inputs/input-common-harvester-options.asciidoc index c7f053dfdc01..e75906f7b64c 100644 --- a/filebeat/docs/inputs/input-common-harvester-options.asciidoc +++ b/filebeat/docs/inputs/input-common-harvester-options.asciidoc @@ -193,7 +193,7 @@ occur. *`document_id`*:: Option configuration setting that specifies the JSON key to set the document id. If configured, the field will be removed from the original -json document and stored in `@metadata.id` +json document and stored in `@metadata._id` *`ignore_decoding_error`*:: An optional configuration setting that specifies if JSON decoding errors should be logged or not. 
If set to true, errors will not diff --git a/filebeat/input/log/harvester.go b/filebeat/input/log/harvester.go index 53c5954d2b03..5f0c47c98784 100644 --- a/filebeat/input/log/harvester.go +++ b/filebeat/input/log/harvester.go @@ -444,7 +444,7 @@ func (h *Harvester) onMessage( if id != "" { meta = common.MapStr{ - "id": id, + "_id": id, } } } else if &text != nil { diff --git a/filebeat/tests/system/test_json.py b/filebeat/tests/system/test_json.py index 04794e5ccee2..bea6d9495d11 100644 --- a/filebeat/tests/system/test_json.py +++ b/filebeat/tests/system/test_json.py @@ -254,8 +254,8 @@ def test_id_in_message(self): assert len(output) == 3 for i in xrange(len(output)): - assert("@metadata.id" in output[i]) - assert(output[i]["@metadata.id"] == str(i)) + assert("@metadata._id" in output[i]) + assert(output[i]["@metadata._id"] == str(i)) assert("json.id" not in output[i]) def test_with_generic_filtering(self): diff --git a/libbeat/beat/event.go b/libbeat/beat/event.go index 54d5c9398dc4..d156a887a93c 100644 --- a/libbeat/beat/event.go +++ b/libbeat/beat/event.go @@ -51,7 +51,7 @@ func (e *Event) SetID(id string) { if e.Meta == nil { e.Meta = common.MapStr{} } - e.Meta["id"] = id + e.Meta["_id"] = id } func (e *Event) GetValue(key string) (interface{}, error) { diff --git a/libbeat/beat/event_test.go b/libbeat/beat/event_test.go index f1026654a0ad..789a3e0a994b 100644 --- a/libbeat/beat/event_test.go +++ b/libbeat/beat/event_test.go @@ -50,7 +50,7 @@ func TestEventPutGetTimestamp(t *testing.T) { func TestEventMetadata(t *testing.T) { const id = "123" - newMeta := func() common.MapStr { return common.MapStr{"id": id} } + newMeta := func() common.MapStr { return common.MapStr{"_id": id} } t.Run("put", func(t *testing.T) { evt := newEmptyEvent() @@ -75,7 +75,7 @@ func TestEventMetadata(t *testing.T) { t.Run("put sub-key", func(t *testing.T) { evt := newEmptyEvent() - evt.PutValue("@metadata.id", id) + evt.PutValue("@metadata._id", id) assert.Equal(t, newMeta(), 
evt.Meta) assert.Empty(t, evt.Fields) @@ -85,7 +85,7 @@ func TestEventMetadata(t *testing.T) { evt := newEmptyEvent() evt.Meta = newMeta() - v, err := evt.GetValue("@metadata.id") + v, err := evt.GetValue("@metadata._id") assert.NoError(t, err) assert.Equal(t, id, v) @@ -105,7 +105,7 @@ func TestEventMetadata(t *testing.T) { evt := newEmptyEvent() evt.Meta = newMeta() - err := evt.Delete("@metadata.id") + err := evt.Delete("@metadata._id") assert.NoError(t, err) assert.Empty(t, evt.Meta) diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index 10541f080131..5c3a1bc7b487 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -437,7 +437,7 @@ func createEventBulkMeta( var id string if m := event.Meta; m != nil { - if tmp := m["id"]; tmp != nil { + if tmp := m["_id"]; tmp != nil { if s, ok := tmp.(string); ok { id = s } else { diff --git a/libbeat/processors/actions/decode_json_fields.go b/libbeat/processors/actions/decode_json_fields.go index ca85a0d4bfdb..86712a81fee5 100644 --- a/libbeat/processors/actions/decode_json_fields.go +++ b/libbeat/processors/actions/decode_json_fields.go @@ -40,6 +40,7 @@ type decodeJSONFields struct { overwriteKeys bool addErrorKey bool processArray bool + documentID string target *string } @@ -50,6 +51,7 @@ type config struct { AddErrorKey bool `config:"add_error_key"` ProcessArray bool `config:"process_array"` Target *string `config:"target"` + DocumentID string `config:"document_id"` } var ( @@ -81,7 +83,15 @@ func NewDecodeJSONFields(c *common.Config) (processors.Processor, error) { return nil, fmt.Errorf("fail to unpack the decode_json_fields configuration: %s", err) } - f := &decodeJSONFields{fields: config.Fields, maxDepth: config.MaxDepth, overwriteKeys: config.OverwriteKeys, addErrorKey: config.AddErrorKey, processArray: config.ProcessArray, target: config.Target} + f := &decodeJSONFields{ + fields: config.Fields, + maxDepth: 
config.MaxDepth, + overwriteKeys: config.OverwriteKeys, + addErrorKey: config.AddErrorKey, + processArray: config.ProcessArray, + documentID: config.DocumentID, + target: config.Target, + } return f, nil } @@ -115,6 +125,18 @@ func (f *decodeJSONFields) Run(event *beat.Event) (*beat.Event, error) { target = *f.target } + var id string + if key := f.documentID; key != "" { + if dict, ok := output.(map[string]interface{}); ok { + if tmp, err := common.MapStr(dict).GetValue(key); err == nil { + if v, ok := tmp.(string); ok { + id = v + common.MapStr(dict).Delete(key) + } + } + } + } + if target != "" { _, err = event.PutValue(target, output) } else { @@ -131,6 +153,13 @@ func (f *decodeJSONFields) Run(event *beat.Event) (*beat.Event, error) { errs = append(errs, err.Error()) continue } + + if id != "" { + if event.Meta == nil { + event.Meta = common.MapStr{} + } + event.Meta["_id"] = id + } } if len(errs) > 0 { diff --git a/libbeat/processors/actions/decode_json_fields_test.go b/libbeat/processors/actions/decode_json_fields_test.go index 69650ac9a4aa..3e801d69e5e4 100644 --- a/libbeat/processors/actions/decode_json_fields_test.go +++ b/libbeat/processors/actions/decode_json_fields_test.go @@ -22,6 +22,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" @@ -94,6 +95,38 @@ func TestInvalidJSONMultiple(t *testing.T) { assert.Equal(t, expected.String(), actual.String()) } +func TestDocumentID(t *testing.T) { + logp.TestingSetup() + + input := common.MapStr{ + "msg": `{"log": "message", "myid": "myDocumentID"}`, + } + + config := common.MustNewConfigFrom(map[string]interface{}{ + "fields": []string{"msg"}, + "document_id": "myid", + }) + + p, err := NewDecodeJSONFields(config) + if err != nil { + logp.Err("Error initializing decode_json_fields") + t.Fatal(err) + } + + actual, err := p.Run(&beat.Event{Fields: input}) + require.NoError(t, err) + 
+ wantFields := common.MapStr{ + "msg": map[string]interface{}{"log": "message"}, + } + wantMeta := common.MapStr{ + "_id": "myDocumentID", + } + + assert.Equal(t, wantFields, actual.Fields) + assert.Equal(t, wantMeta, actual.Meta) +} + func TestValidJSONDepthOne(t *testing.T) { input := common.MapStr{ "msg": "{\"log\":\"{\\\"level\\\":\\\"info\\\"}\",\"stream\":\"stderr\",\"count\":3}", diff --git a/libbeat/processors/actions/docs/decode_json_fields.asciidoc b/libbeat/processors/actions/docs/decode_json_fields.asciidoc index fb86cf24fb0c..a62b8169fdac 100644 --- a/libbeat/processors/actions/docs/decode_json_fields.asciidoc +++ b/libbeat/processors/actions/docs/decode_json_fields.asciidoc @@ -34,3 +34,6 @@ default value is false. `error` field is going to be part of event with error message. If it set to false, there will not be any error in event's field. Even error occurs while decoding json keys. The default value is false +`document_id`:: (Optional) JSON key to use as the document id. 
If configured, +the field will be removed from the original json document and stored in +`@metadata._id` diff --git a/libbeat/processors/add_id/add_id_test.go b/libbeat/processors/add_id/add_id_test.go index a7bd9baa8c5a..6a3fb5b3a099 100644 --- a/libbeat/processors/add_id/add_id_test.go +++ b/libbeat/processors/add_id/add_id_test.go @@ -36,7 +36,7 @@ func TestDefaultTargetField(t *testing.T) { newEvent, err := p.Run(testEvent) assert.NoError(t, err) - v, err := newEvent.GetValue("@metadata.id") + v, err := newEvent.GetValue("@metadata._id") assert.NoError(t, err) assert.NotEmpty(t, v) } @@ -59,7 +59,7 @@ func TestNonDefaultTargetField(t *testing.T) { assert.NoError(t, err) assert.NotEmpty(t, v) - v, err = newEvent.GetValue("@metadata.id") + v, err = newEvent.GetValue("@metadata._id") assert.NoError(t, err) assert.Empty(t, v) } diff --git a/libbeat/processors/add_id/config.go b/libbeat/processors/add_id/config.go index 40b4d305de64..ca28d48d68ee 100644 --- a/libbeat/processors/add_id/config.go +++ b/libbeat/processors/add_id/config.go @@ -29,7 +29,7 @@ type config struct { func defaultConfig() config { return config{ - TargetField: "@metadata.id", + TargetField: "@metadata._id", Type: "elasticsearch", } } diff --git a/libbeat/processors/add_id/docs/add_id.asciidoc b/libbeat/processors/add_id/docs/add_id.asciidoc index 64d475669a0b..0d9f402be8ce 100644 --- a/libbeat/processors/add_id/docs/add_id.asciidoc +++ b/libbeat/processors/add_id/docs/add_id.asciidoc @@ -11,7 +11,7 @@ processors: The following settings are supported: -`target_field`:: (Optional) Field where the generated ID will be stored. Default is `@metadata.id`. +`target_field`:: (Optional) Field where the generated ID will be stored. Default is `@metadata._id`. `type`:: (Optional) Type of ID to generate. Currently only `elasticsearch` is supported and is the default. 
The `elasticsearch` type generates IDs using the same algorithm that Elasticsearch uses for auto-generating diff --git a/metricbeat/mb/event_test.go b/metricbeat/mb/event_test.go index 433dae3c2a1a..1040102de8c2 100644 --- a/metricbeat/mb/event_test.go +++ b/metricbeat/mb/event_test.go @@ -139,7 +139,7 @@ func TestEventConversionToBeatEvent(t *testing.T) { e := mbEvent.BeatEvent(module, metricSet) e = mbEvent.BeatEvent(module, metricSet) - assert.Equal(t, "foobar", e.Meta["id"]) + assert.Equal(t, "foobar", e.Meta["_id"]) assert.Equal(t, timestamp, e.Timestamp) assert.Equal(t, common.MapStr{ "type": "docker",