From a6e136051f3b2ad7482549ca4834a1fcaab4563b Mon Sep 17 00:00:00 2001 From: "Lee E. Hinman" Date: Mon, 27 Jan 2025 12:05:32 -0600 Subject: [PATCH 1/2] add dynamic document pipeline support for logs --- ...earchexporter_logs_pipeline_per_event.yaml | 27 ++++ exporter/elasticsearchexporter/README.md | 4 + exporter/elasticsearchexporter/bulkindexer.go | 8 +- .../elasticsearchexporter/bulkindexer_test.go | 8 +- exporter/elasticsearchexporter/config.go | 7 + exporter/elasticsearchexporter/config_test.go | 9 ++ exporter/elasticsearchexporter/exporter.go | 56 +++++--- .../elasticsearchexporter/exporter_test.go | 127 +++++++++++++++--- exporter/elasticsearchexporter/factory.go | 3 + .../internal/elasticsearch/attribute.go | 3 + .../serializer/otelserializer/common.go | 2 +- 11 files changed, 209 insertions(+), 45 deletions(-) create mode 100644 .chloggen/elasticsearchexporter_logs_pipeline_per_event.yaml diff --git a/.chloggen/elasticsearchexporter_logs_pipeline_per_event.yaml b/.chloggen/elasticsearchexporter_logs_pipeline_per_event.yaml new file mode 100644 index 000000000000..792428659ec2 --- /dev/null +++ b/.chloggen/elasticsearchexporter_logs_pipeline_per_event.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: elasticsearchexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add config `logs_dynamic_pipeline` to dynamically set the document pipeline of log records using log record attribute `elasticsearch.document_pipeline`. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [37419] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index f08ac7ac3167..a8a566bbe2c8 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -149,6 +149,9 @@ This can be customised through the following settings: - `logs_dynamic_id` (optional): Dynamically determines the document ID to be used in Elasticsearch based on a log record attribute. - `enabled`(default=false): Enable/Disable dynamic ID for log records. If `elasticsearch.document_id` exists and is not an empty string in the log record attributes, it will be used as the document ID. Otherwise, the document ID will be generated by Elasticsearch. The attribute `elasticsearch.document_id` is removed from the final document. See [Setting a document id dynamically](#setting-a-document-id-dynamically). 
+- `logs_dynamic_pipeline` (optional): Dynamically determines the ingest pipeline to be used in Elasticsearch based on a log record attribute.
+  - `enabled`(default=false): Enable/Disable dynamic pipeline for log records. If `elasticsearch.document_pipeline` exists and is not an empty string in the log record attributes, it will be used as the Elasticsearch ingest pipeline. The attribute `elasticsearch.document_pipeline` is removed from the final document.
+
 ### Elasticsearch document mapping
 
 The Elasticsearch exporter supports several document schemas and preprocessing
@@ -188,6 +191,7 @@ Documents may be optionally passed through an [Elasticsearch Ingest pipeline] pr
 This can be configured through the following settings:
 
 - `pipeline` (optional): ID of an [Elasticsearch Ingest pipeline] used for processing documents published by the exporter.
+- If `logs_dynamic_pipeline` is enabled and `elasticsearch.document_pipeline` exists and is not an empty string in the log record attributes, that pipeline will be used for that log record, taking precedence over the `pipeline` setting.
 
 ### Elasticsearch bulk indexing
 
diff --git a/exporter/elasticsearchexporter/bulkindexer.go b/exporter/elasticsearchexporter/bulkindexer.go
index 973c9f7b3042..8ff013033280 100644
--- a/exporter/elasticsearchexporter/bulkindexer.go
+++ b/exporter/elasticsearchexporter/bulkindexer.go
@@ -31,7 +31,7 @@ type bulkIndexer interface {
 
 type bulkIndexerSession interface {
 	// Add adds a document to the bulk indexing session.
-	Add(ctx context.Context, index string, docID string, document io.WriterTo, dynamicTemplates map[string]string, action string) error
+	Add(ctx context.Context, index string, docID string, pipeline string, document io.WriterTo, dynamicTemplates map[string]string, action string) error
 
 	// End must be called on the session object once it is no longer
 	// needed, in order to release any associated resources.
@@ -126,13 +126,14 @@ type syncBulkIndexerSession struct {
 }
 
 // Add adds an item to the sync bulk indexer session.
-func (s *syncBulkIndexerSession) Add(ctx context.Context, index string, docID string, document io.WriterTo, dynamicTemplates map[string]string, action string) error {
+func (s *syncBulkIndexerSession) Add(ctx context.Context, index string, docID string, pipeline string, document io.WriterTo, dynamicTemplates map[string]string, action string) error {
 	doc := docappender.BulkIndexerItem{
 		Index:            index,
 		Body:             document,
 		DocumentID:       docID,
 		DynamicTemplates: dynamicTemplates,
 		Action:           action,
+		Pipeline:         pipeline,
 	}
 	err := s.bi.Add(doc)
 	if err != nil {
@@ -255,13 +256,14 @@ func (a *asyncBulkIndexer) Close(ctx context.Context) error {
 // Add adds an item to the async bulk indexer session.
 //
 // Adding an item after a call to Close() will panic.
-func (s asyncBulkIndexerSession) Add(ctx context.Context, index string, docID string, document io.WriterTo, dynamicTemplates map[string]string, action string) error { +func (s asyncBulkIndexerSession) Add(ctx context.Context, index string, docID string, pipeline string, document io.WriterTo, dynamicTemplates map[string]string, action string) error { item := docappender.BulkIndexerItem{ Index: index, Body: document, DocumentID: docID, DynamicTemplates: dynamicTemplates, Action: action, + Pipeline: pipeline, } select { case <-ctx.Done(): diff --git a/exporter/elasticsearchexporter/bulkindexer_test.go b/exporter/elasticsearchexporter/bulkindexer_test.go index 1ca17772e9a2..ee0fbd9614fd 100644 --- a/exporter/elasticsearchexporter/bulkindexer_test.go +++ b/exporter/elasticsearchexporter/bulkindexer_test.go @@ -103,7 +103,7 @@ func TestAsyncBulkIndexer_flush(t *testing.T) { session, err := bulkIndexer.StartSession(context.Background()) require.NoError(t, err) - assert.NoError(t, session.Add(context.Background(), "foo", "", strings.NewReader(`{"foo": "bar"}`), nil, docappender.ActionCreate)) + assert.NoError(t, session.Add(context.Background(), "foo", "", "", strings.NewReader(`{"foo": "bar"}`), nil, docappender.ActionCreate)) // should flush time.Sleep(100 * time.Millisecond) assert.Equal(t, int64(1), bulkIndexer.stats.docsIndexed.Load()) @@ -180,7 +180,7 @@ func TestAsyncBulkIndexer_flush_error(t *testing.T) { session, err := bulkIndexer.StartSession(context.Background()) require.NoError(t, err) - assert.NoError(t, session.Add(context.Background(), "foo", "", strings.NewReader(`{"foo": "bar"}`), nil, docappender.ActionCreate)) + assert.NoError(t, session.Add(context.Background(), "foo", "", "", strings.NewReader(`{"foo": "bar"}`), nil, docappender.ActionCreate)) // should flush time.Sleep(100 * time.Millisecond) assert.Equal(t, int64(0), bulkIndexer.stats.docsIndexed.Load()) @@ -259,7 +259,7 @@ func runBulkIndexerOnce(t *testing.T, config *Config, client *elasticsearch.Clie session, err := bulkIndexer.StartSession(context.Background()) require.NoError(t, err) - assert.NoError(t, session.Add(context.Background(), "foo", "", strings.NewReader(`{"foo": "bar"}`), nil, docappender.ActionCreate)) + assert.NoError(t, session.Add(context.Background(), "foo", "", "", strings.NewReader(`{"foo": "bar"}`), nil, docappender.ActionCreate)) assert.NoError(t, bulkIndexer.Close(context.Background())) return bulkIndexer @@ -286,7 +286,7 @@ func TestSyncBulkIndexer_flushBytes(t *testing.T) { session, err := bi.StartSession(context.Background()) require.NoError(t, err) - assert.NoError(t, session.Add(context.Background(), "foo", "", strings.NewReader(`{"foo": "bar"}`), nil, docappender.ActionCreate)) + assert.NoError(t, session.Add(context.Background(), "foo", "", "", strings.NewReader(`{"foo": "bar"}`), nil, docappender.ActionCreate)) assert.Equal(t, int64(1), reqCnt.Load()) // flush due to flush::bytes assert.NoError(t, bi.Close(context.Background())) } diff --git a/exporter/elasticsearchexporter/config.go b/exporter/elasticsearchexporter/config.go index 3d4756d4ef32..b2f984468b12 100644 --- a/exporter/elasticsearchexporter/config.go +++ b/exporter/elasticsearchexporter/config.go @@ -56,6 +56,9 @@ type Config struct { // LogsDynamicID configures whether log record attribute `elasticsearch.document_id` is set as the document ID in ES. 
LogsDynamicID DynamicIDSettings `mapstructure:"logs_dynamic_id"` + // LogsDynamicPipeline configures whether log record attribute `elasticsearch.document_pipeline` is set as the document ingest pipeline for ES. + LogsDynamicPipeline DynamicPipelineSettings `mapstructure:"logs_dynamic_pipeline"` + // Pipeline configures the ingest node pipeline name that should be used to process the // events. // @@ -119,6 +122,10 @@ type DynamicIDSettings struct { Enabled bool `mapstructure:"enabled"` } +type DynamicPipelineSettings struct { + Enabled bool `mapstructure:"enabled"` +} + // AuthenticationSettings defines user authentication related settings. type AuthenticationSettings struct { // User is used to configure HTTP Basic Authentication. diff --git a/exporter/elasticsearchexporter/config_test.go b/exporter/elasticsearchexporter/config_test.go index 59df53f4fd3c..329bc6b9f715 100644 --- a/exporter/elasticsearchexporter/config_test.go +++ b/exporter/elasticsearchexporter/config_test.go @@ -77,6 +77,9 @@ func TestConfig(t *testing.T) { LogsDynamicID: DynamicIDSettings{ Enabled: false, }, + LogsDynamicPipeline: DynamicPipelineSettings{ + Enabled: false, + }, Pipeline: "mypipeline", ClientConfig: withDefaultHTTPClientConfig(func(cfg *confighttp.ClientConfig) { cfg.Timeout = 2 * time.Minute @@ -150,6 +153,9 @@ func TestConfig(t *testing.T) { LogsDynamicID: DynamicIDSettings{ Enabled: false, }, + LogsDynamicPipeline: DynamicPipelineSettings{ + Enabled: false, + }, Pipeline: "mypipeline", ClientConfig: withDefaultHTTPClientConfig(func(cfg *confighttp.ClientConfig) { cfg.Timeout = 2 * time.Minute @@ -223,6 +229,9 @@ func TestConfig(t *testing.T) { LogsDynamicID: DynamicIDSettings{ Enabled: false, }, + LogsDynamicPipeline: DynamicPipelineSettings{ + Enabled: false, + }, Pipeline: "mypipeline", ClientConfig: withDefaultHTTPClientConfig(func(cfg *confighttp.ClientConfig) { cfg.Timeout = 2 * time.Minute diff --git a/exporter/elasticsearchexporter/exporter.go b/exporter/elasticsearchexporter/exporter.go index e38e468abc59..acd80eacf784 100644 --- a/exporter/elasticsearchexporter/exporter.go +++ b/exporter/elasticsearchexporter/exporter.go @@ -222,13 +222,14 @@ func (e *elasticsearchExporter) pushLogRecord( buf := e.bufferPool.NewPooledBuffer() docID := e.extractDocumentIDAttribute(record.Attributes()) + pipeline := e.extractDocumentPipelineAttribute(record.Attributes()) if err := e.model.encodeLog(resource, resourceSchemaURL, record, scope, scopeSchemaURL, index, buf.Buffer); err != nil { buf.Recycle() return fmt.Errorf("failed to encode log event: %w", err) } // not recycling after Add returns an error as we don't know if it's already recycled - return bulkIndexerSession.Add(ctx, index.Index, docID, buf, nil, docappender.ActionCreate) + return bulkIndexerSession.Add(ctx, index.Index, docID, pipeline, buf, nil, docappender.ActionCreate) } type dataPointsGroup struct { @@ -371,7 +372,7 @@ func (e *elasticsearchExporter) pushMetricsData( errs = append(errs, err) continue } - if err := session.Add(ctx, index.Index, "", buf, dynamicTemplates, docappender.ActionCreate); err != nil { + if err := session.Add(ctx, index.Index, "", "", buf, dynamicTemplates, docappender.ActionCreate); err != nil { // not recycling after Add returns an error as we don't know if it's already recycled if cerr := ctx.Err(); cerr != nil { return cerr @@ -467,7 +468,7 @@ func (e *elasticsearchExporter) pushTraceRecord( return fmt.Errorf("failed to encode trace record: %w", err) } // not recycling after Add returns an error as we don't know 
if it's already recycled
-	return bulkIndexerSession.Add(ctx, index.Index, "", buf, nil, docappender.ActionCreate)
+	return bulkIndexerSession.Add(ctx, index.Index, "", "", buf, nil, docappender.ActionCreate)
 }
 
 func (e *elasticsearchExporter) pushSpanEvent(
@@ -493,19 +494,7 @@ func (e *elasticsearchExporter) pushSpanEvent(
 		return nil
 	}
 	// not recycling after Add returns an error as we don't know if it's already recycled
-	return bulkIndexerSession.Add(ctx, index.Index, "", buf, nil, docappender.ActionCreate)
-}
-
-func (e *elasticsearchExporter) extractDocumentIDAttribute(m pcommon.Map) string {
-	if !e.config.LogsDynamicID.Enabled {
-		return ""
-	}
-
-	v, ok := m.Get(elasticsearch.DocumentIDAttributeName)
-	if !ok {
-		return ""
-	}
-	return v.AsString()
+	return bulkIndexerSession.Add(ctx, index.Index, "", "", buf, nil, docappender.ActionCreate)
 }
 
 func (e *elasticsearchExporter) pushProfilesData(ctx context.Context, pd pprofile.Profiles) error {
@@ -609,15 +598,40 @@ func (e *elasticsearchExporter) pushProfileRecord(
 	return e.model.encodeProfile(resource, scope, record, func(buf *bytes.Buffer, docID, index string) error {
 		switch index {
 		case otelserializer.StackTraceIndex:
-			return stackTracesSession.Add(ctx, index, docID, buf, nil, docappender.ActionCreate)
+			return stackTracesSession.Add(ctx, index, docID, "", buf, nil, docappender.ActionCreate)
 		case otelserializer.StackFrameIndex:
-			return stackFramesSession.Add(ctx, index, docID, buf, nil, docappender.ActionCreate)
+			return stackFramesSession.Add(ctx, index, docID, "", buf, nil, docappender.ActionCreate)
 		case otelserializer.AllEventsIndex:
-			return eventsSession.Add(ctx, index, docID, buf, nil, docappender.ActionCreate)
+			return eventsSession.Add(ctx, index, docID, "", buf, nil, docappender.ActionCreate)
 		case otelserializer.ExecutablesIndex:
-			return executablesSession.Add(ctx, index, docID, buf, nil, docappender.ActionUpdate)
+			return executablesSession.Add(ctx, index, docID, "", buf, nil, docappender.ActionUpdate)
 		default:
-			return defaultSession.Add(ctx, index, docID, buf, nil, docappender.ActionCreate)
+			return defaultSession.Add(ctx, index, docID, "", buf, nil, docappender.ActionCreate)
 		}
 	})
 }
+
+func (e *elasticsearchExporter) extractDocumentIDAttribute(m pcommon.Map) string {
+	if !e.config.LogsDynamicID.Enabled {
+		return ""
+	}
+
+	v, ok := m.Get(elasticsearch.DocumentIDAttributeName)
+	if !ok {
+		return ""
+	}
+	return v.AsString()
+}
+
+func (e *elasticsearchExporter) extractDocumentPipelineAttribute(m pcommon.Map) string {
+	// An empty result leaves the per-document pipeline unset, so the exporter-wide `pipeline` setting still applies.
+	if !e.config.LogsDynamicPipeline.Enabled {
+		return ""
+	}
+
+	v, ok := m.Get(elasticsearch.DocumentPipelineAttributeName)
+	if !ok {
+		return ""
+	}
+	return v.AsString()
+}
diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go
index 1e00f2226eb9..e4f0201a4b64 100644
--- a/exporter/elasticsearchexporter/exporter_test.go
+++ b/exporter/elasticsearchexporter/exporter_test.go
@@ -816,6 +816,84 @@ func TestExporterLogs(t *testing.T) {
 			}
 		}
 	})
+
+	t.Run("publish logs with per event pipeline", func(t *testing.T) {
+		t.Parallel()
+		examplePipeline := "abc123"
+		tableTests := []struct {
+			name             string
+			expectedPipeline string // "" means the pipeline will not be set
+			recordAttrs      map[string]any
+		}{
+			{
+				name:             "missing document pipeline attribute should not set pipeline",
+				expectedPipeline: "",
+			},
+			{
+				name:             "empty document pipeline attribute should not set pipeline",
+				expectedPipeline: "",
+				recordAttrs: map[string]any{
+					elasticsearch.DocumentPipelineAttributeName: "",
+				},
+			},
+			{
+				name:             "record attributes",
+				expectedPipeline: examplePipeline,
+				recordAttrs: map[string]any{
+					elasticsearch.DocumentPipelineAttributeName: examplePipeline,
+				},
+			},
+		}
+
+		cfgs := map[string]func(*Config){
+			"async": func(cfg *Config) {
+				batcherEnabled := false
+				cfg.Batcher.Enabled = &batcherEnabled
+			},
+			"sync": func(cfg *Config) {
+				batcherEnabled := true
+				cfg.Batcher.Enabled = &batcherEnabled
+				cfg.Batcher.FlushTimeout = 10 * time.Millisecond
+			},
+		}
+		for _, tt := range tableTests {
+			for cfgName, cfgFn := range cfgs {
+				t.Run(tt.name+"/"+cfgName, func(t *testing.T) {
+					t.Parallel()
+					rec := newBulkRecorder()
+					server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
+						rec.Record(docs)
+
+						if tt.expectedPipeline == "" {
+							assert.NotContainsf(t, string(docs[0].Action), "pipeline", "%s: expected pipeline to not be set", tt.name)
+						} else {
+							assert.Equalf(t, tt.expectedPipeline, actionJSONToPipeline(t, docs[0].Action), "%s: expected pipeline to be set in action: %s", tt.name, docs[0].Action)
+						}
+
+						// Ensure the document pipeline attribute is removed from the final document.
+						assert.NotContainsf(t, string(docs[0].Document), elasticsearch.DocumentPipelineAttributeName, "%s: expected document pipeline attribute to be removed", tt.name)
+						return itemsAllOK(docs)
+					})
+
+					exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) {
+						cfg.Mapping.Mode = "otel"
+						cfg.LogsDynamicPipeline.Enabled = true
+						cfgFn(cfg)
+					})
+					logs := newLogsWithAttributes(
+						tt.recordAttrs,
+						map[string]any{},
+						map[string]any{},
+					)
+					logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().SetStr("hello world")
+					mustSendLogs(t, exporter, logs)
+
+					rec.WaitItems(1)
+				})
+			}
+		}
+	})
+
 	t.Run("otel mode attribute complex value", func(t *testing.T) {
 		rec := newBulkRecorder()
 		server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
@@ -2049,23 +2127,40 @@ func (f roundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) {
 }
 
 func actionJSONToIndex(t *testing.T, actionJSON json.RawMessage) string {
-	action := struct {
-		Create struct {
-			Index string `json:"_index"`
-		} `json:"create"`
-	}{}
-	err := json.Unmarshal(actionJSON, &action)
-	require.NoError(t, err)
-	return action.Create.Index
+	t.Helper()
+	return actionGetValue(t, actionJSON, "_index")
 }
 
 func actionJSONToID(t *testing.T, actionJSON json.RawMessage) string {
-	action := struct {
-		Create struct {
-			ID string `json:"_id"`
-		} `json:"create"`
-	}{}
-	err := json.Unmarshal(actionJSON, &action)
-	require.NoError(t, err)
-	return action.Create.ID
+	t.Helper()
+	return actionGetValue(t, actionJSON, "_id")
}
+
+func actionJSONToPipeline(t *testing.T, actionJSON json.RawMessage) string {
+	t.Helper()
+	return actionGetValue(t, actionJSON, "pipeline")
+}
+
+// actionGetValue extracts the string value of the target key from the
+// "create" object in actionJSON. actionJSON must be an object whose
+// "create" key holds another object, and the value of target must be a string.
+func actionGetValue(t *testing.T, actionJSON json.RawMessage, target string) string {
+	t.Helper()
+	a := map[string]any{}
+
+	err := json.Unmarshal(actionJSON, &a)
+	require.NoErrorf(t, err, "error unmarshalling action: %s", actionJSON)
+
+	create, prs := a["create"]
+	require.True(t, prs, "create was not present in action")
+
+	createMap, ok := create.(map[string]any)
+	require.True(t, ok, "create was not a map[string]any")
+
+	v, prs := createMap[target]
+	require.Truef(t, prs, "%s was not present in action.create", target)
+
+	vString, ok := v.(string)
+	require.Truef(t, ok, "the type of action.create.%s was not string", target)
+	return vString
+}
diff --git a/exporter/elasticsearchexporter/factory.go b/exporter/elasticsearchexporter/factory.go
index 8941cec61574..40882f5ce519 100644
--- a/exporter/elasticsearchexporter/factory.go
+++ b/exporter/elasticsearchexporter/factory.go
@@ -68,6 +68,9 @@ func createDefaultConfig() component.Config {
 		LogsDynamicID: DynamicIDSettings{
 			Enabled: false,
 		},
+		LogsDynamicPipeline: DynamicPipelineSettings{
+			Enabled: false,
+		},
 		Retry: RetrySettings{
 			Enabled:    true,
 			MaxRetries: 0, // default is set in exporter code
diff --git a/exporter/elasticsearchexporter/internal/elasticsearch/attribute.go b/exporter/elasticsearchexporter/internal/elasticsearch/attribute.go
index 12d4411a5759..c7d53f674d55 100644
--- a/exporter/elasticsearchexporter/internal/elasticsearch/attribute.go
+++ b/exporter/elasticsearchexporter/internal/elasticsearch/attribute.go
@@ -11,4 +11,7 @@ const (
 	// DocumentIDAttributeName is the attribute name used to specify the document ID.
 	DocumentIDAttributeName = "elasticsearch.document_id"
+
+	// DocumentPipelineAttributeName is the attribute name used to specify the document ingest pipeline.
+	DocumentPipelineAttributeName = "elasticsearch.document_pipeline"
 )
diff --git a/exporter/elasticsearchexporter/internal/serializer/otelserializer/common.go b/exporter/elasticsearchexporter/internal/serializer/otelserializer/common.go
index 4f44e7cfe067..ae7a3c0f9e10 100644
--- a/exporter/elasticsearchexporter/internal/serializer/otelserializer/common.go
+++ b/exporter/elasticsearchexporter/internal/serializer/otelserializer/common.go
@@ -59,7 +59,7 @@ func writeAttributes(v *json.Visitor, attributes pcommon.Map, stringifyMapValues
 	_ = v.OnObjectStart(-1, structform.AnyType)
 	attributes.Range(func(k string, val pcommon.Value) bool {
 		switch k {
-		case elasticsearch.DataStreamType, elasticsearch.DataStreamDataset, elasticsearch.DataStreamNamespace, elasticsearch.MappingHintsAttrKey, elasticsearch.DocumentIDAttributeName:
+		case elasticsearch.DataStreamType, elasticsearch.DataStreamDataset, elasticsearch.DataStreamNamespace, elasticsearch.MappingHintsAttrKey, elasticsearch.DocumentIDAttributeName, elasticsearch.DocumentPipelineAttributeName:
 			return true
 		}
 		if isGeoAttribute(k, val) {

From 8ef035c6082f4d2ae0c1891faa2b8c3406c1f8b6 Mon Sep 17 00:00:00 2001
From: "Lee E. Hinman"
Date: Thu, 13 Feb 2025 12:52:07 -0600
Subject: [PATCH 2/2] improve docs for logs_dynamic_pipeline and logs_dynamic_id

- Added caveat that the attributes are only removed when `otel` mapping
  mode is used

---
 exporter/elasticsearchexporter/README.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md
index a8a566bbe2c8..67fcb16ed87a 100644
--- a/exporter/elasticsearchexporter/README.md
+++ b/exporter/elasticsearchexporter/README.md
@@ -147,10 +147,10 @@ This can be customised through the following settings:
   - `date_format`(default=`%Y.%m.%d`): Time format (based on strftime) to generate the second part of the Index name.
 
 - `logs_dynamic_id` (optional): Dynamically determines the document ID to be used in Elasticsearch based on a log record attribute.
-  - `enabled`(default=false): Enable/Disable dynamic ID for log records. If `elasticsearch.document_id` exists and is not an empty string in the log record attributes, it will be used as the document ID. Otherwise, the document ID will be generated by Elasticsearch. The attribute `elasticsearch.document_id` is removed from the final document. See [Setting a document id dynamically](#setting-a-document-id-dynamically).
+  - `enabled`(default=false): Enable/Disable dynamic ID for log records. If `elasticsearch.document_id` exists and is not an empty string in the log record attributes, it will be used as the document ID. Otherwise, the document ID will be generated by Elasticsearch. The attribute `elasticsearch.document_id` is removed from the final document when the `otel` mapping mode is used. See [Setting a document id dynamically](#setting-a-document-id-dynamically).
 
 - `logs_dynamic_pipeline` (optional): Dynamically determines the ingest pipeline to be used in Elasticsearch based on a log record attribute.
-  - `enabled`(default=false): Enable/Disable dynamic pipeline for log records. If `elasticsearch.document_pipeline` exists and is not an empty string in the log record attributes, it will be used as the Elasticsearch ingest pipeline. The attribute `elasticsearch.document_pipeline` is removed from the final document.
+ - `enabled`(default=false): Enable/Disable dynamic pipeline for log records. If `elasticsearch.document_pipeline` exists and is not an empty string in the log record attributes, it will be used as the Elasticsearch ingest pipeline. The attribute `elasticsearch.document_pipeline` is removed from the final document when the `otel` mapping mode is used. ### Elasticsearch document mapping
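
Beyond the patch itself, a minimal collector configuration sketch shows how the new setting fits together end to end. The `transform` processor stage, the `otlp` receiver, and the pipeline name `logs-custom` are illustrative assumptions and not part of this change; any component that sets the `elasticsearch.document_pipeline` log record attribute would work:

```yaml
receivers:
  otlp:
    protocols:
      grpc:

processors:
  transform:
    log_statements:
      - context: log
        statements:
          # Illustrative only: route every log record through the
          # hypothetical "logs-custom" ingest pipeline.
          - set(attributes["elasticsearch.document_pipeline"], "logs-custom")

exporters:
  elasticsearch:
    endpoints: ["https://elasticsearch:9200"]
    logs_dynamic_pipeline:
      # Read the per-record pipeline from the
      # elasticsearch.document_pipeline attribute.
      enabled: true

service:
  pipelines:
    logs:
      receivers: [otlp]
      processors: [transform]
      exporters: [elasticsearch]
```

Records carrying the attribute would be indexed through `logs-custom`, while records without it fall back to the exporter-wide `pipeline` setting, if one is configured.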