diff --git a/.chloggen/38610_additional_es_telemetry.yaml b/.chloggen/38610_additional_es_telemetry.yaml new file mode 100644 index 0000000000000..4cf6993717846 --- /dev/null +++ b/.chloggen/38610_additional_es_telemetry.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: elasticsearchexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add telemetry for bulk indexers used to index documents to Elasticsearch. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [38610] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]' +change_logs: [user] diff --git a/exporter/elasticsearchexporter/bulkindexer.go b/exporter/elasticsearchexporter/bulkindexer.go index 6c9095cb50770..ca12cc962dc2e 100644 --- a/exporter/elasticsearchexporter/bulkindexer.go +++ b/exporter/elasticsearchexporter/bulkindexer.go @@ -8,10 +8,10 @@ import ( "errors" "fmt" "io" + "net/http" "runtime" "strings" "sync" - "sync/atomic" "time" "github.com/elastic/go-docappender/v2" @@ -19,9 +19,13 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configcompression" "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + semconv "go.opentelemetry.io/otel/semconv/v1.25.0" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/logging" + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/metadata" ) type bulkIndexer interface { @@ -59,11 +63,17 @@ type bulkIndexerSession interface { const defaultMaxRetries = 2 -func newBulkIndexer(logger *zap.Logger, client esapi.Transport, config *Config, requireDataStream bool) (bulkIndexer, error) { +func newBulkIndexer( + client esapi.Transport, + config *Config, + requireDataStream bool, + tb *metadata.TelemetryBuilder, + logger *zap.Logger, +) (bulkIndexer, error) { if config.Batcher.enabledSet { - return newSyncBulkIndexer(logger, client, config, requireDataStream), nil + return newSyncBulkIndexer(client, config, requireDataStream, tb, logger), nil } - return newAsyncBulkIndexer(logger, client, config, requireDataStream) + return newAsyncBulkIndexer(client, config, requireDataStream, tb, logger) } func bulkIndexerConfig(client esapi.Transport, config *Config, requireDataStream bool) docappender.BulkIndexerConfig { @@ -100,12 +110,19 @@ func bulkIndexerIncludeSourceOnError(includeSourceOnError *bool) docappender.Val return docappender.False } -func 
newSyncBulkIndexer(logger *zap.Logger, client esapi.Transport, config *Config, requireDataStream bool) *syncBulkIndexer { +func newSyncBulkIndexer( + client esapi.Transport, + config *Config, + requireDataStream bool, + tb *metadata.TelemetryBuilder, + logger *zap.Logger, +) *syncBulkIndexer { return &syncBulkIndexer{ config: bulkIndexerConfig(client, config, requireDataStream), flushTimeout: config.Timeout, flushBytes: config.Flush.Bytes, retryConfig: config.Retry, + telemetryBuilder: tb, logger: logger, failedDocsInputLogger: newFailedDocsInputLogger(logger, config), } @@ -118,6 +135,7 @@ type syncBulkIndexer struct { retryConfig RetrySettings logger *zap.Logger failedDocsInputLogger *zap.Logger + telemetryBuilder *metadata.TelemetryBuilder } // StartSession creates a new docappender.BulkIndexer, and wraps @@ -158,6 +176,7 @@ func (s *syncBulkIndexerSession) Add(ctx context.Context, index string, docID st if err != nil { return err } + s.s.telemetryBuilder.ElasticsearchDocsReceived.Add(ctx, 1) // flush bytes should operate on uncompressed length // as Elasticsearch http.max_content_length measures uncompressed length. 
if s.bi.UncompressedLen() >= s.s.flushBytes { @@ -175,7 +194,14 @@ func (s *syncBulkIndexerSession) End() { func (s *syncBulkIndexerSession) Flush(ctx context.Context) error { var retryBackoff func(int) time.Duration for attempts := 0; ; attempts++ { - if _, err := flushBulkIndexer(ctx, s.bi, s.s.flushTimeout, s.s.logger, s.s.failedDocsInputLogger); err != nil { + if err := flushBulkIndexer( + ctx, + s.bi, + s.s.flushTimeout, + s.s.telemetryBuilder, + s.s.logger, + s.s.failedDocsInputLogger, + ); err != nil { return err } if s.bi.Items() == 0 { @@ -202,16 +228,22 @@ func (s *syncBulkIndexerSession) Flush(ctx context.Context) error { } } -func newAsyncBulkIndexer(logger *zap.Logger, client esapi.Transport, config *Config, requireDataStream bool) (*asyncBulkIndexer, error) { +func newAsyncBulkIndexer( + client esapi.Transport, + config *Config, + requireDataStream bool, + tb *metadata.TelemetryBuilder, + logger *zap.Logger, +) (*asyncBulkIndexer, error) { numWorkers := config.NumWorkers if numWorkers == 0 { numWorkers = runtime.NumCPU() } pool := &asyncBulkIndexer{ - wg: sync.WaitGroup{}, - items: make(chan docappender.BulkIndexerItem, config.NumWorkers), - stats: bulkIndexerStats{}, + wg: sync.WaitGroup{}, + items: make(chan docappender.BulkIndexerItem, config.NumWorkers), + telemetryBuilder: tb, } pool.wg.Add(numWorkers) @@ -226,9 +258,9 @@ func newAsyncBulkIndexer(logger *zap.Logger, client esapi.Transport, config *Con flushInterval: config.Flush.Interval, flushTimeout: config.Timeout, flushBytes: config.Flush.Bytes, + telemetryBuilder: tb, logger: logger, failedDocsInputLogger: newFailedDocsInputLogger(logger, config), - stats: &pool.stats, } go func() { defer pool.wg.Done() @@ -238,14 +270,10 @@ func newAsyncBulkIndexer(logger *zap.Logger, client esapi.Transport, config *Con return pool, nil } -type bulkIndexerStats struct { - docsIndexed atomic.Int64 -} - type asyncBulkIndexer struct { - items chan docappender.BulkIndexerItem - wg sync.WaitGroup - stats 
bulkIndexerStats + items chan docappender.BulkIndexerItem + wg sync.WaitGroup + telemetryBuilder *metadata.TelemetryBuilder } type asyncBulkIndexerSession struct { @@ -289,8 +317,9 @@ func (s asyncBulkIndexerSession) Add(ctx context.Context, index string, docID st case <-ctx.Done(): return ctx.Err() case s.items <- item: - return nil } + s.telemetryBuilder.ElasticsearchDocsReceived.Add(ctx, 1) + return nil } // End is a no-op. @@ -309,10 +338,9 @@ type asyncBulkIndexerWorker struct { flushTimeout time.Duration flushBytes int - stats *bulkIndexerStats - logger *zap.Logger failedDocsInputLogger *zap.Logger + telemetryBuilder *metadata.TelemetryBuilder } func (w *asyncBulkIndexerWorker) run() { @@ -346,28 +374,100 @@ func (w *asyncBulkIndexerWorker) run() { } func (w *asyncBulkIndexerWorker) flush() { + // TODO (lahsivjar): Should use proper context else client metadata will not be accessible ctx := context.Background() - stat, _ := flushBulkIndexer(ctx, w.indexer, w.flushTimeout, w.logger, w.failedDocsInputLogger) - w.stats.docsIndexed.Add(stat.Indexed) + // Ignore the error: it has already been logged, and async flushes don't propagate errors. + _ = flushBulkIndexer( + ctx, + w.indexer, + w.flushTimeout, + w.telemetryBuilder, + w.logger, + w.failedDocsInputLogger, + ) } func flushBulkIndexer( ctx context.Context, bi *docappender.BulkIndexer, timeout time.Duration, + tb *metadata.TelemetryBuilder, logger *zap.Logger, failedDocsInputLogger *zap.Logger, -) (docappender.BulkIndexerResponseStat, error) { +) error { + itemsCount := bi.Items() + if itemsCount == 0 { + return nil + } if timeout > 0 { var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, timeout) defer cancel() } stat, err := bi.Flush(ctx) + if flushed := bi.BytesFlushed(); flushed > 0 { + tb.ElasticsearchFlushedBytes.Add(ctx, int64(flushed)) + } + if flushed := bi.BytesUncompressedFlushed(); flushed > 0 { + tb.ElasticsearchFlushedUncompressedBytes.Add(ctx, int64(flushed)) + } if err != nil {
logger.Error("bulk indexer flush error", zap.Error(err)) - } + var bulkFailedErr docappender.ErrorFlushFailed + switch { + case errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded): + attrSet := metric.WithAttributeSet(attribute.NewSet( + attribute.String("outcome", "timeout"), + )) + tb.ElasticsearchDocsProcessed.Add(ctx, int64(itemsCount), attrSet) + tb.ElasticsearchBulkRequestsCount.Add(ctx, int64(1), attrSet) + case errors.As(err, &bulkFailedErr): + var outcome string + code := bulkFailedErr.StatusCode() + switch { + case code == http.StatusTooManyRequests: + outcome = "too_many" + case code >= 500: + outcome = "failed_server" + case code >= 400: + outcome = "failed_client" + } + attrSet := metric.WithAttributeSet(attribute.NewSet( + semconv.HTTPResponseStatusCode(bulkFailedErr.StatusCode()), + attribute.String("outcome", outcome), + )) + tb.ElasticsearchDocsProcessed.Add(ctx, int64(itemsCount), attrSet) + tb.ElasticsearchBulkRequestsCount.Add(ctx, int64(1), attrSet) + default: + attrSet := metric.WithAttributeSet(attribute.NewSet( + attribute.String("outcome", "internal_server_error"), + )) + tb.ElasticsearchDocsProcessed.Add(ctx, int64(itemsCount), attrSet) + tb.ElasticsearchBulkRequestsCount.Add(ctx, int64(1), attrSet) + } + } else { + // Record a successful completed bulk request + tb.ElasticsearchBulkRequestsCount.Add( + ctx, + int64(1), + metric.WithAttributeSet(attribute.NewSet( + attribute.String("outcome", "success"), + )), + ) + } + + var tooManyReqs, clientFailed, serverFailed int64 for _, resp := range stat.FailedDocs { + // Collect telemetry + switch { + case resp.Status == http.StatusTooManyRequests: + tooManyReqs++ + case resp.Status >= 500: + serverFailed++ + case resp.Status >= 400: + clientFailed++ + } + // Log failed docs fields := []zap.Field{ zap.String("index", resp.Index), zap.String("error.type", resp.Error.Type), @@ -383,7 +483,76 @@ func flushBulkIndexer( } failedDocsInputLogger.Debug("failed to index document; 
input may contain sensitive data", fields...) } - return stat, err + if stat.Indexed > 0 { + tb.ElasticsearchDocsProcessed.Add( + ctx, + stat.Indexed, + metric.WithAttributeSet(attribute.NewSet( + attribute.String("outcome", "success"), + )), + ) + } + if tooManyReqs > 0 { + tb.ElasticsearchDocsProcessed.Add( + ctx, + tooManyReqs, + metric.WithAttributeSet(attribute.NewSet( + attribute.String("outcome", "too_many"), + )), + ) + } + if clientFailed > 0 { + tb.ElasticsearchDocsProcessed.Add( + ctx, + clientFailed, + metric.WithAttributeSet(attribute.NewSet( + attribute.String("outcome", "failed_client"), + )), + ) + } + if serverFailed > 0 { + tb.ElasticsearchDocsProcessed.Add( + ctx, + serverFailed, + metric.WithAttributeSet(attribute.NewSet( + attribute.String("outcome", "failed_server"), + )), + ) + } + if stat.FailureStoreDocs.Used > 0 { + tb.ElasticsearchDocsProcessed.Add( + ctx, + stat.FailureStoreDocs.Used, + metric.WithAttributeSet(attribute.NewSet( + attribute.String("outcome", "failure_store"), + attribute.String("failure_store", string(docappender.FailureStoreStatusUsed)), + )), + ) + } + if stat.FailureStoreDocs.Failed > 0 { + tb.ElasticsearchDocsProcessed.Add( + ctx, + stat.FailureStoreDocs.Failed, + metric.WithAttributeSet(attribute.NewSet( + attribute.String("outcome", "failure_store"), + attribute.String("failure_store", string(docappender.FailureStoreStatusFailed)), + )), + ) + } + if stat.FailureStoreDocs.NotEnabled > 0 { + tb.ElasticsearchDocsProcessed.Add( + ctx, + stat.FailureStoreDocs.NotEnabled, + metric.WithAttributeSet(attribute.NewSet( + attribute.String("outcome", "failure_store"), + attribute.String("failure_store", string(docappender.FailureStoreStatusNotEnabled)), + )), + ) + } + if stat.RetriedDocs > 0 { + tb.ElasticsearchDocsRetried.Add(ctx, stat.RetriedDocs) + } + return err } func getErrorHint(index, errorType string) string { @@ -415,6 +584,8 @@ type bulkIndexers struct { profilingStackTraces bulkIndexer // For profiling-stacktraces 
profilingStackFrames bulkIndexer // For profiling-stackframes profilingExecutables bulkIndexer // For profiling-executables + + telemetryBuilder *metadata.TelemetryBuilder } func (b *bulkIndexers) start( @@ -439,32 +610,32 @@ func (b *bulkIndexers) start( for _, mode := range allowedMappingModes { var bi bulkIndexer - bi, err = newBulkIndexer(set.Logger, esClient, cfg, mode == MappingOTel) + bi, err = newBulkIndexer(esClient, cfg, mode == MappingOTel, b.telemetryBuilder, set.Logger) if err != nil { return err } b.modes[mode] = &wgTrackingBulkIndexer{bulkIndexer: bi, wg: &b.wg} } - profilingEvents, err := newBulkIndexer(set.Logger, esClient, cfg, true) + profilingEvents, err := newBulkIndexer(esClient, cfg, true, b.telemetryBuilder, set.Logger) if err != nil { return err } b.profilingEvents = &wgTrackingBulkIndexer{bulkIndexer: profilingEvents, wg: &b.wg} - profilingStackTraces, err := newBulkIndexer(set.Logger, esClient, cfg, false) + profilingStackTraces, err := newBulkIndexer(esClient, cfg, false, b.telemetryBuilder, set.Logger) if err != nil { return err } b.profilingStackTraces = &wgTrackingBulkIndexer{bulkIndexer: profilingStackTraces, wg: &b.wg} - profilingStackFrames, err := newBulkIndexer(set.Logger, esClient, cfg, false) + profilingStackFrames, err := newBulkIndexer(esClient, cfg, false, b.telemetryBuilder, set.Logger) if err != nil { return err } b.profilingStackFrames = &wgTrackingBulkIndexer{bulkIndexer: profilingStackFrames, wg: &b.wg} - profilingExecutables, err := newBulkIndexer(set.Logger, esClient, cfg, false) + profilingExecutables, err := newBulkIndexer(esClient, cfg, false, b.telemetryBuilder, set.Logger) if err != nil { return err } diff --git a/exporter/elasticsearchexporter/bulkindexer_test.go b/exporter/elasticsearchexporter/bulkindexer_test.go index 673e88a6e1490..0621034e1c870 100644 --- a/exporter/elasticsearchexporter/bulkindexer_test.go +++ b/exporter/elasticsearchexporter/bulkindexer_test.go @@ -17,11 +17,19 @@ import ( 
"github.com/elastic/go-elasticsearch/v8" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configcompression" "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" + semconv "go.opentelemetry.io/otel/semconv/v1.25.0" "go.uber.org/zap" "go.uber.org/zap/zapcore" "go.uber.org/zap/zaptest/observer" + + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/metadata" + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/metadatatest" ) var defaultRoundTripFunc = func(*http.Request) (*http.Response, error) { @@ -66,9 +74,7 @@ func TestAsyncBulkIndexer_flushOnClose(t *testing.T) { }}) require.NoError(t, err) - bulkIndexer := runBulkIndexerOnce(t, &cfg, client) - - assert.Equal(t, int64(1), bulkIndexer.stats.docsIndexed.Load()) + runBulkIndexerOnce(t, &cfg, client) } func TestAsyncBulkIndexer_flush(t *testing.T) { @@ -99,28 +105,62 @@ func TestAsyncBulkIndexer_flush(t *testing.T) { }}) require.NoError(t, err) - bulkIndexer, err := newAsyncBulkIndexer(zap.NewNop(), client, &tt.config, false) + ct := componenttest.NewTelemetry() + tb, err := metadata.NewTelemetryBuilder( + metadatatest.NewSettings(ct).TelemetrySettings, + ) + require.NoError(t, err) + bulkIndexer, err := newAsyncBulkIndexer(client, &tt.config, false, tb, zap.NewNop()) require.NoError(t, err) session := bulkIndexer.StartSession(context.Background()) assert.NoError(t, session.Add(context.Background(), "foo", "", "", strings.NewReader(`{"foo": "bar"}`), nil, docappender.ActionCreate)) // should flush time.Sleep(100 * time.Millisecond) - assert.Equal(t, int64(1), bulkIndexer.stats.docsIndexed.Load()) assert.NoError(t, session.Flush(context.Background())) 
session.End() assert.NoError(t, bulkIndexer.Close(context.Background())) + // Assert internal telemetry metrics + metadatatest.AssertEqualElasticsearchBulkRequestsCount(t, ct, []metricdata.DataPoint[int64]{ + { + Value: 1, + Attributes: attribute.NewSet( + attribute.String("outcome", "success"), + ), + }, + }, metricdatatest.IgnoreTimestamp()) + metadatatest.AssertEqualElasticsearchDocsReceived(t, ct, []metricdata.DataPoint[int64]{ + {Value: 1}, + }, metricdatatest.IgnoreTimestamp()) + metadatatest.AssertEqualElasticsearchDocsProcessed(t, ct, []metricdata.DataPoint[int64]{ + { + Value: 1, + Attributes: attribute.NewSet( + attribute.String("outcome", "success"), + ), + }, + }, metricdatatest.IgnoreTimestamp()) + metadatatest.AssertEqualElasticsearchFlushedUncompressedBytes(t, ct, []metricdata.DataPoint[int64]{ + {Value: 43}, // hard-coding the flush bytes since the input is fixed + }, metricdatatest.IgnoreTimestamp()) + metadatatest.AssertEqualElasticsearchFlushedBytes(t, ct, []metricdata.DataPoint[int64]{ + {Value: 43}, // hard-coding the flush bytes since the input is fixed + }, metricdatatest.IgnoreTimestamp()) }) } } func TestAsyncBulkIndexer_flush_error(t *testing.T) { tests := []struct { - name string - roundTripFunc func(*http.Request) (*http.Response, error) - logFailedDocsInput bool - wantMessage string - wantFields []zap.Field + name string + roundTripFunc func(*http.Request) (*http.Response, error) + logFailedDocsInput bool + retrySettings RetrySettings + wantMessage string + wantFields []zap.Field + wantESBulkReqs *metricdata.DataPoint[int64] + wantESDocsProcessed *metricdata.DataPoint[int64] + wantESDocsRetried *metricdata.DataPoint[int64] }{ { name: "500", @@ -132,6 +172,20 @@ func TestAsyncBulkIndexer_flush_error(t *testing.T) { }, nil }, wantMessage: "bulk indexer flush error", + wantESBulkReqs: &metricdata.DataPoint[int64]{ + Value: 1, + Attributes: attribute.NewSet( + attribute.String("outcome", "failed_server"), + 
semconv.HTTPResponseStatusCode(500), + ), + }, + wantESDocsProcessed: &metricdata.DataPoint[int64]{ + Value: 1, + Attributes: attribute.NewSet( + attribute.String("outcome", "failed_server"), + semconv.HTTPResponseStatusCode(500), + ), + }, }, { name: "429", @@ -143,6 +197,62 @@ func TestAsyncBulkIndexer_flush_error(t *testing.T) { }, nil }, wantMessage: "bulk indexer flush error", + wantESBulkReqs: &metricdata.DataPoint[int64]{ + Value: 1, + Attributes: attribute.NewSet( + attribute.String("outcome", "too_many"), + semconv.HTTPResponseStatusCode(429), + ), + }, + wantESDocsProcessed: &metricdata.DataPoint[int64]{ + Value: 1, + Attributes: attribute.NewSet( + attribute.String("outcome", "too_many"), + semconv.HTTPResponseStatusCode(429), + ), + }, + }, + { + name: "429/with_retry", + roundTripFunc: func(*http.Request) (*http.Response, error) { + return &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}}, + Body: io.NopCloser(strings.NewReader( + `{"items":[{"create":{"_index":"test","status":429}}]}`)), + }, nil + }, + retrySettings: RetrySettings{Enabled: true, MaxRetries: 5, RetryOnStatus: []int{429}}, + wantESBulkReqs: &metricdata.DataPoint[int64]{ + Value: 1, + Attributes: attribute.NewSet( + attribute.String("outcome", "success"), + ), + }, + wantESDocsRetried: &metricdata.DataPoint[int64]{Value: 1}, + }, + { + name: "500/doc_level", + roundTripFunc: func(*http.Request) (*http.Response, error) { + return &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}}, + Body: io.NopCloser(strings.NewReader( + `{"items":[{"create":{"_index":"test","status":500,"error":{"type":"internal_server_error","reason":""}}}]}`)), + }, nil + }, + wantESBulkReqs: &metricdata.DataPoint[int64]{ + Value: 1, + Attributes: attribute.NewSet( + attribute.String("outcome", "success"), + ), + }, + wantESDocsProcessed: &metricdata.DataPoint[int64]{ + Value: 1, + 
Attributes: attribute.NewSet( + attribute.String("outcome", "failed_server"), + ), + }, }, { name: "transport error", @@ -150,6 +260,18 @@ func TestAsyncBulkIndexer_flush_error(t *testing.T) { return nil, errors.New("transport error") }, wantMessage: "bulk indexer flush error", + wantESBulkReqs: &metricdata.DataPoint[int64]{ + Value: 1, + Attributes: attribute.NewSet( + attribute.String("outcome", "internal_server_error"), + ), + }, + wantESDocsProcessed: &metricdata.DataPoint[int64]{ + Value: 1, + Attributes: attribute.NewSet( + attribute.String("outcome", "internal_server_error"), + ), + }, }, { name: "known version conflict error", @@ -163,6 +285,18 @@ func TestAsyncBulkIndexer_flush_error(t *testing.T) { }, wantMessage: "failed to index document", wantFields: []zap.Field{zap.String("hint", "check the \"Known issues\" section of Elasticsearch Exporter docs")}, + wantESBulkReqs: &metricdata.DataPoint[int64]{ + Value: 1, + Attributes: attribute.NewSet( + attribute.String("outcome", "success"), + ), + }, + wantESDocsProcessed: &metricdata.DataPoint[int64]{ + Value: 1, + Attributes: attribute.NewSet( + attribute.String("outcome", "failed_client"), + ), + }, }, { name: "known version conflict error with logFailedDocsInput", @@ -182,13 +316,29 @@ func TestAsyncBulkIndexer_flush_error(t *testing.T) { {"foo": "bar"} `), }, + wantESBulkReqs: &metricdata.DataPoint[int64]{ + Value: 1, + Attributes: attribute.NewSet( + attribute.String("outcome", "success"), + ), + }, + wantESDocsProcessed: &metricdata.DataPoint[int64]{ + Value: 1, + Attributes: attribute.NewSet( + attribute.String("outcome", "failed_client"), + ), + }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { t.Parallel() - cfg := Config{NumWorkers: 1, Flush: FlushSettings{Interval: time.Hour, Bytes: 1}} + cfg := Config{ + NumWorkers: 1, + Flush: FlushSettings{Interval: time.Hour, Bytes: 1}, + Retry: tt.retrySettings, + } if tt.logFailedDocsInput { cfg.LogFailedDocsInput = true } @@ -198,7 
+348,12 @@ func TestAsyncBulkIndexer_flush_error(t *testing.T) { require.NoError(t, err) core, observed := observer.New(zap.NewAtomicLevelAt(zapcore.DebugLevel)) - bulkIndexer, err := newAsyncBulkIndexer(zap.New(core), client, &cfg, false) + ct := componenttest.NewTelemetry() + tb, err := metadata.NewTelemetryBuilder( + metadatatest.NewSettings(ct).TelemetrySettings, + ) + require.NoError(t, err) + bulkIndexer, err := newAsyncBulkIndexer(client, &cfg, false, tb, zap.New(core)) require.NoError(t, err) defer bulkIndexer.Close(context.Background()) @@ -206,14 +361,37 @@ func TestAsyncBulkIndexer_flush_error(t *testing.T) { assert.NoError(t, session.Add(context.Background(), "foo", "", "", strings.NewReader(`{"foo": "bar"}`), nil, docappender.ActionCreate)) // should flush time.Sleep(100 * time.Millisecond) - assert.Equal(t, int64(0), bulkIndexer.stats.docsIndexed.Load()) - messages := observed.FilterMessage(tt.wantMessage) - require.Equal(t, 1, messages.Len(), "message not found; observed.All()=%v", observed.All()) - for _, wantField := range tt.wantFields { - assert.Equal(t, 1, messages.FilterField(wantField).Len(), "message with field not found; observed.All()=%v", observed.All()) + if tt.wantMessage != "" { + messages := observed.FilterMessage(tt.wantMessage) + require.Equal(t, 1, messages.Len(), "message not found; observed.All()=%v", observed.All()) + for _, wantField := range tt.wantFields { + assert.Equal(t, 1, messages.FilterField(wantField).Len(), "message with field not found; observed.All()=%v", observed.All()) + } } assert.NoError(t, session.Flush(context.Background())) session.End() + // Assert internal telemetry metrics + if tt.wantESBulkReqs != nil { + metadatatest.AssertEqualElasticsearchBulkRequestsCount( + t, ct, + []metricdata.DataPoint[int64]{*tt.wantESBulkReqs}, + metricdatatest.IgnoreTimestamp(), + ) + } + if tt.wantESDocsProcessed != nil { + metadatatest.AssertEqualElasticsearchDocsProcessed( + t, ct, + 
[]metricdata.DataPoint[int64]{*tt.wantESDocsProcessed}, + metricdatatest.IgnoreTimestamp(), + ) + } + if tt.wantESDocsRetried != nil { + metadatatest.AssertEqualElasticsearchDocsRetried( + t, ct, + []metricdata.DataPoint[int64]{*tt.wantESDocsRetried}, + metricdatatest.IgnoreTimestamp(), + ) + } }) } } @@ -287,7 +465,12 @@ func TestAsyncBulkIndexer_logRoundTrip(t *testing.T) { } func runBulkIndexerOnce(t *testing.T, config *Config, client *elasticsearch.Client) *asyncBulkIndexer { - bulkIndexer, err := newAsyncBulkIndexer(zap.NewNop(), client, config, false) + ct := componenttest.NewTelemetry() + tb, err := metadata.NewTelemetryBuilder( + metadatatest.NewSettings(ct).TelemetrySettings, + ) + require.NoError(t, err) + bulkIndexer, err := newAsyncBulkIndexer(client, config, false, tb, zap.NewNop()) require.NoError(t, err) session := bulkIndexer.StartSession(context.Background()) @@ -295,6 +478,35 @@ func runBulkIndexerOnce(t *testing.T, config *Config, client *elasticsearch.Clie assert.NoError(t, session.Flush(context.Background())) session.End() assert.NoError(t, bulkIndexer.Close(context.Background())) + // Assert internal telemetry metrics + metadatatest.AssertEqualElasticsearchBulkRequestsCount(t, ct, []metricdata.DataPoint[int64]{ + { + Value: 1, + Attributes: attribute.NewSet( + attribute.String("outcome", "success"), + ), + }, + }, metricdatatest.IgnoreTimestamp()) + metadatatest.AssertEqualElasticsearchDocsReceived(t, ct, []metricdata.DataPoint[int64]{ + {Value: 1}, + }, metricdatatest.IgnoreTimestamp()) + metadatatest.AssertEqualElasticsearchDocsProcessed(t, ct, []metricdata.DataPoint[int64]{ + { + Value: 1, + Attributes: attribute.NewSet( + attribute.String("outcome", "success"), + ), + }, + }, metricdatatest.IgnoreTimestamp()) + metadatatest.AssertEqualElasticsearchFlushedUncompressedBytes(t, ct, []metricdata.DataPoint[int64]{ + {Value: 43}, // hard-coding the flush bytes since the input is fixed + }, metricdatatest.IgnoreTimestamp()) + 
metadatatest.AssertEqualElasticsearchFlushedBytes( + t, ct, + []metricdata.DataPoint[int64]{{}}, + metricdatatest.IgnoreTimestamp(), + metricdatatest.IgnoreValue(), // compression can change in test, ignore value + ) return bulkIndexer } @@ -316,7 +528,12 @@ func TestSyncBulkIndexer_flushBytes(t *testing.T) { }}) require.NoError(t, err) - bi := newSyncBulkIndexer(zap.NewNop(), client, &cfg, false) + ct := componenttest.NewTelemetry() + tb, err := metadata.NewTelemetryBuilder( + metadatatest.NewSettings(ct).TelemetrySettings, + ) + require.NoError(t, err) + bi := newSyncBulkIndexer(client, &cfg, false, tb, zap.NewNop()) session := bi.StartSession(context.Background()) assert.NoError(t, session.Add(context.Background(), "foo", "", "", strings.NewReader(`{"foo": "bar"}`), nil, docappender.ActionCreate)) @@ -324,4 +541,30 @@ func TestSyncBulkIndexer_flushBytes(t *testing.T) { assert.NoError(t, session.Flush(context.Background())) session.End() assert.NoError(t, bi.Close(context.Background())) + // Assert internal telemetry metrics + metadatatest.AssertEqualElasticsearchBulkRequestsCount(t, ct, []metricdata.DataPoint[int64]{ + { + Value: 1, // empty session flush should be a no-op + Attributes: attribute.NewSet( + attribute.String("outcome", "success"), + ), + }, + }, metricdatatest.IgnoreTimestamp()) + metadatatest.AssertEqualElasticsearchDocsReceived(t, ct, []metricdata.DataPoint[int64]{ + {Value: 1}, + }, metricdatatest.IgnoreTimestamp()) + metadatatest.AssertEqualElasticsearchDocsProcessed(t, ct, []metricdata.DataPoint[int64]{ + { + Value: 1, + Attributes: attribute.NewSet( + attribute.String("outcome", "success"), + ), + }, + }, metricdatatest.IgnoreTimestamp()) + metadatatest.AssertEqualElasticsearchFlushedBytes(t, ct, []metricdata.DataPoint[int64]{ + {Value: 43}, // hard-coding the flush bytes since the input is fixed + }, metricdatatest.IgnoreTimestamp()) + metadatatest.AssertEqualElasticsearchFlushedUncompressedBytes(t, ct, []metricdata.DataPoint[int64]{ + 
{Value: 43}, // hard-coding the flush bytes since the input is fixed + }, metricdatatest.IgnoreTimestamp()) } diff --git a/exporter/elasticsearchexporter/documentation.md b/exporter/elasticsearchexporter/documentation.md new file mode 100644 index 0000000000000..4b9c50420f2c8 --- /dev/null +++ b/exporter/elasticsearchexporter/documentation.md @@ -0,0 +1,72 @@ +[comment]: <> (Code generated by mdatagen. DO NOT EDIT.) + +# elasticsearch + +## Internal Telemetry + +The following telemetry is emitted by this component. + +### otelcol.elasticsearch.bulk_requests.count + +Count of the completed bulk requests. [alpha] + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| 1 | Sum | Int | true | + +#### Attributes + +| Name | Description | Values | +| ---- | ----------- | ------ | +| outcome | The operation outcome. | Str: ``success``, ``failed_client``, ``failed_server``, ``timeout``, ``too_many``, ``failure_store``, ``internal_server_error`` | +| http_status_code | HTTP status code. | Any Int | + +### otelcol.elasticsearch.docs.processed + +Count of documents flushed to Elasticsearch. [alpha] + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| 1 | Sum | Int | true | + +#### Attributes + +| Name | Description | Values | +| ---- | ----------- | ------ | +| outcome | The operation outcome. | Str: ``success``, ``failed_client``, ``failed_server``, ``timeout``, ``too_many``, ``failure_store``, ``internal_server_error`` | +| http_status_code | HTTP status code. | Any Int | +| failure_store | The status of the failure store. | Str: ``unknown``, ``not_enabled``, ``used``, ``failed`` | + +### otelcol.elasticsearch.docs.received + +Count of Elasticsearch documents successfully received to be buffered. 
[alpha] + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| 1 | Sum | Int | true | + +### otelcol.elasticsearch.docs.retried + +Count of document retries. [alpha] + +Only document level retries are captured, whole bulk request retries are not captured. + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| 1 | Sum | Int | true | + +### otelcol.elasticsearch.flushed.bytes + +Number of bytes flushed by the indexer. [alpha] + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| By | Sum | Int | true | + +### otelcol.elasticsearch.flushed.uncompressed_bytes + +Number of uncompressed bytes flushed by the indexer. [alpha] + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| By | Sum | Int | true | diff --git a/exporter/elasticsearchexporter/exporter.go b/exporter/elasticsearchexporter/exporter.go index 58d7591dffce8..30c78bbab47ce 100644 --- a/exporter/elasticsearchexporter/exporter.go +++ b/exporter/elasticsearchexporter/exporter.go @@ -23,6 +23,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/datapoints" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/elasticsearch" + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/metadata" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/metricgroup" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/pool" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/serializer/otelserializer" @@ -44,6 +45,11 @@ type elasticsearchExporter struct { } func newExporter(cfg *Config, set exporter.Settings, index string) (*elasticsearchExporter, 
error) { + telemetryBuilder, err := metadata.NewTelemetryBuilder(set.TelemetrySettings) + if err != nil { + return nil, fmt.Errorf("failed to initialize internal telemetry: %w", err) + } + allowedMappingModes := cfg.allowedMappingModes() defaultMappingMode := allowedMappingModes[canonicalMappingModeName(cfg.Mapping.Mode)] exporter := &elasticsearchExporter{ @@ -54,6 +60,7 @@ func newExporter(cfg *Config, set exporter.Settings, index string) (*elasticsear allowedMappingModes: allowedMappingModes, defaultMappingMode: defaultMappingMode, bufferPool: pool.NewBufferPool(), + bulkIndexers: bulkIndexers{telemetryBuilder: telemetryBuilder}, } for mappingMode := range NumMappingModes { encoder, err := newEncoder(mappingMode) diff --git a/exporter/elasticsearchexporter/go.mod b/exporter/elasticsearchexporter/go.mod index 3b501574b70fa..8e20a6f2266f1 100644 --- a/exporter/elasticsearchexporter/go.mod +++ b/exporter/elasticsearchexporter/go.mod @@ -38,6 +38,9 @@ require ( go.opentelemetry.io/collector/pdata/pprofile v0.129.1-0.20250710172626-1ebb9b20cfa5 go.opentelemetry.io/ebpf-profiler v0.0.0-20250212075250-7bf12d3f962f go.opentelemetry.io/otel v1.37.0 + go.opentelemetry.io/otel/metric v1.37.0 + go.opentelemetry.io/otel/sdk/metric v1.37.0 + go.opentelemetry.io/otel/trace v1.37.0 go.uber.org/goleak v1.3.0 go.uber.org/zap v1.27.0 golang.org/x/sync v0.15.0 @@ -103,10 +106,7 @@ require ( go.opentelemetry.io/contrib/bridges/otelzap v0.12.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.62.0 // indirect go.opentelemetry.io/otel/log v0.13.0 // indirect - go.opentelemetry.io/otel/metric v1.37.0 // indirect go.opentelemetry.io/otel/sdk v1.37.0 // indirect - go.opentelemetry.io/otel/sdk/metric v1.37.0 // indirect - go.opentelemetry.io/otel/trace v1.37.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.yaml.in/yaml/v3 v3.0.4 // indirect golang.org/x/crypto v0.39.0 // indirect diff --git 
a/exporter/elasticsearchexporter/internal/metadata/generated_telemetry.go b/exporter/elasticsearchexporter/internal/metadata/generated_telemetry.go new file mode 100644 index 0000000000000..9c3fbf0b270a1 --- /dev/null +++ b/exporter/elasticsearchexporter/internal/metadata/generated_telemetry.go @@ -0,0 +1,103 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadata + +import ( + "errors" + "sync" + + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/trace" + + "go.opentelemetry.io/collector/component" +) + +func Meter(settings component.TelemetrySettings) metric.Meter { + return settings.MeterProvider.Meter("github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter") +} + +func Tracer(settings component.TelemetrySettings) trace.Tracer { + return settings.TracerProvider.Tracer("github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter") +} + +// TelemetryBuilder provides an interface for components to report telemetry +// as defined in metadata and user config. +type TelemetryBuilder struct { + meter metric.Meter + mu sync.Mutex + registrations []metric.Registration + ElasticsearchBulkRequestsCount metric.Int64Counter + ElasticsearchDocsProcessed metric.Int64Counter + ElasticsearchDocsReceived metric.Int64Counter + ElasticsearchDocsRetried metric.Int64Counter + ElasticsearchFlushedBytes metric.Int64Counter + ElasticsearchFlushedUncompressedBytes metric.Int64Counter +} + +// TelemetryBuilderOption applies changes to default builder. +type TelemetryBuilderOption interface { + apply(*TelemetryBuilder) +} + +type telemetryBuilderOptionFunc func(mb *TelemetryBuilder) + +func (tbof telemetryBuilderOptionFunc) apply(mb *TelemetryBuilder) { + tbof(mb) +} + +// Shutdown unregister all registered callbacks for async instruments. 
+func (builder *TelemetryBuilder) Shutdown() { + builder.mu.Lock() + defer builder.mu.Unlock() + for _, reg := range builder.registrations { + reg.Unregister() + } +} + +// NewTelemetryBuilder provides a struct with methods to update all internal telemetry +// for a component +func NewTelemetryBuilder(settings component.TelemetrySettings, options ...TelemetryBuilderOption) (*TelemetryBuilder, error) { + builder := TelemetryBuilder{} + for _, op := range options { + op.apply(&builder) + } + builder.meter = Meter(settings) + var err, errs error + builder.ElasticsearchBulkRequestsCount, err = builder.meter.Int64Counter( + "otelcol.elasticsearch.bulk_requests.count", + metric.WithDescription("Count of the completed bulk requests. [alpha]"), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) + builder.ElasticsearchDocsProcessed, err = builder.meter.Int64Counter( + "otelcol.elasticsearch.docs.processed", + metric.WithDescription("Count of documents flushed to Elasticsearch. [alpha]"), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) + builder.ElasticsearchDocsReceived, err = builder.meter.Int64Counter( + "otelcol.elasticsearch.docs.received", + metric.WithDescription("Count of Elasticsearch documents successfully received to be buffered. [alpha]"), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) + builder.ElasticsearchDocsRetried, err = builder.meter.Int64Counter( + "otelcol.elasticsearch.docs.retried", + metric.WithDescription("Count of document retries. [alpha]"), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) + builder.ElasticsearchFlushedBytes, err = builder.meter.Int64Counter( + "otelcol.elasticsearch.flushed.bytes", + metric.WithDescription("Number of bytes flushed by the indexer. 
[alpha]"), + metric.WithUnit("By"), + ) + errs = errors.Join(errs, err) + builder.ElasticsearchFlushedUncompressedBytes, err = builder.meter.Int64Counter( + "otelcol.elasticsearch.flushed.uncompressed_bytes", + metric.WithDescription("Number of uncompressed bytes flushed by the indexer. [alpha]"), + metric.WithUnit("By"), + ) + errs = errors.Join(errs, err) + return &builder, errs +} diff --git a/exporter/elasticsearchexporter/internal/metadata/generated_telemetry_test.go b/exporter/elasticsearchexporter/internal/metadata/generated_telemetry_test.go new file mode 100644 index 0000000000000..c88b62bec8215 --- /dev/null +++ b/exporter/elasticsearchexporter/internal/metadata/generated_telemetry_test.go @@ -0,0 +1,74 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadata + +import ( + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/metric" + embeddedmetric "go.opentelemetry.io/otel/metric/embedded" + noopmetric "go.opentelemetry.io/otel/metric/noop" + "go.opentelemetry.io/otel/trace" + embeddedtrace "go.opentelemetry.io/otel/trace/embedded" + nooptrace "go.opentelemetry.io/otel/trace/noop" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" +) + +type mockMeter struct { + noopmetric.Meter + name string +} +type mockMeterProvider struct { + embeddedmetric.MeterProvider +} + +func (m mockMeterProvider) Meter(name string, opts ...metric.MeterOption) metric.Meter { + return mockMeter{name: name} +} + +type mockTracer struct { + nooptrace.Tracer + name string +} + +type mockTracerProvider struct { + embeddedtrace.TracerProvider +} + +func (m mockTracerProvider) Tracer(name string, opts ...trace.TracerOption) trace.Tracer { + return mockTracer{name: name} +} + +func TestProviders(t *testing.T) { + set := component.TelemetrySettings{ + MeterProvider: mockMeterProvider{}, + TracerProvider: mockTracerProvider{}, + } + + meter := Meter(set) + if m, ok := meter.(mockMeter); ok { + 
require.Equal(t, "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter", m.name) + } else { + require.Fail(t, "returned Meter not mockMeter") + } + + tracer := Tracer(set) + if m, ok := tracer.(mockTracer); ok { + require.Equal(t, "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter", m.name) + } else { + require.Fail(t, "returned Meter not mockTracer") + } +} + +func TestNewTelemetryBuilder(t *testing.T) { + set := componenttest.NewNopTelemetrySettings() + applied := false + _, err := NewTelemetryBuilder(set, telemetryBuilderOptionFunc(func(b *TelemetryBuilder) { + applied = true + })) + require.NoError(t, err) + require.True(t, applied) +} diff --git a/exporter/elasticsearchexporter/internal/metadatatest/generated_telemetrytest.go b/exporter/elasticsearchexporter/internal/metadatatest/generated_telemetrytest.go new file mode 100644 index 0000000000000..9f429ca8cfdff --- /dev/null +++ b/exporter/elasticsearchexporter/internal/metadatatest/generated_telemetrytest.go @@ -0,0 +1,118 @@ +// Code generated by mdatagen. DO NOT EDIT. 
+ +package metadatatest + +import ( + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/exportertest" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" +) + +func NewSettings(tt *componenttest.Telemetry) exporter.Settings { + set := exportertest.NewNopSettings(exportertest.NopType) + set.ID = component.NewID(component.MustNewType("elasticsearch")) + set.TelemetrySettings = tt.NewTelemetrySettings() + return set +} + +func AssertEqualElasticsearchBulkRequestsCount(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { + want := metricdata.Metrics{ + Name: "otelcol.elasticsearch.bulk_requests.count", + Description: "Count of the completed bulk requests. [alpha]", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: dps, + }, + } + got, err := tt.GetMetric("otelcol.elasticsearch.bulk_requests.count") + require.NoError(t, err) + metricdatatest.AssertEqual(t, want, got, opts...) +} + +func AssertEqualElasticsearchDocsProcessed(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { + want := metricdata.Metrics{ + Name: "otelcol.elasticsearch.docs.processed", + Description: "Count of documents flushed to Elasticsearch. [alpha]", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: dps, + }, + } + got, err := tt.GetMetric("otelcol.elasticsearch.docs.processed") + require.NoError(t, err) + metricdatatest.AssertEqual(t, want, got, opts...) 
+} + +func AssertEqualElasticsearchDocsReceived(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { + want := metricdata.Metrics{ + Name: "otelcol.elasticsearch.docs.received", + Description: "Count of Elasticsearch documents successfully received to be buffered. [alpha]", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: dps, + }, + } + got, err := tt.GetMetric("otelcol.elasticsearch.docs.received") + require.NoError(t, err) + metricdatatest.AssertEqual(t, want, got, opts...) +} + +func AssertEqualElasticsearchDocsRetried(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { + want := metricdata.Metrics{ + Name: "otelcol.elasticsearch.docs.retried", + Description: "Count of document retries. [alpha]", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: dps, + }, + } + got, err := tt.GetMetric("otelcol.elasticsearch.docs.retried") + require.NoError(t, err) + metricdatatest.AssertEqual(t, want, got, opts...) +} + +func AssertEqualElasticsearchFlushedBytes(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { + want := metricdata.Metrics{ + Name: "otelcol.elasticsearch.flushed.bytes", + Description: "Number of bytes flushed by the indexer. [alpha]", + Unit: "By", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: dps, + }, + } + got, err := tt.GetMetric("otelcol.elasticsearch.flushed.bytes") + require.NoError(t, err) + metricdatatest.AssertEqual(t, want, got, opts...) 
+} + +func AssertEqualElasticsearchFlushedUncompressedBytes(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { + want := metricdata.Metrics{ + Name: "otelcol.elasticsearch.flushed.uncompressed_bytes", + Description: "Number of uncompressed bytes flushed by the indexer. [alpha]", + Unit: "By", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: dps, + }, + } + got, err := tt.GetMetric("otelcol.elasticsearch.flushed.uncompressed_bytes") + require.NoError(t, err) + metricdatatest.AssertEqual(t, want, got, opts...) +} diff --git a/exporter/elasticsearchexporter/internal/metadatatest/generated_telemetrytest_test.go b/exporter/elasticsearchexporter/internal/metadatatest/generated_telemetrytest_test.go new file mode 100644 index 0000000000000..7d94ee7e777e1 --- /dev/null +++ b/exporter/elasticsearchexporter/internal/metadatatest/generated_telemetrytest_test.go @@ -0,0 +1,49 @@ +// Code generated by mdatagen. DO NOT EDIT. 
+ +package metadatatest + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/metadata" + + "go.opentelemetry.io/collector/component/componenttest" +) + +func TestSetupTelemetry(t *testing.T) { + testTel := componenttest.NewTelemetry() + tb, err := metadata.NewTelemetryBuilder(testTel.NewTelemetrySettings()) + require.NoError(t, err) + defer tb.Shutdown() + tb.ElasticsearchBulkRequestsCount.Add(context.Background(), 1) + tb.ElasticsearchDocsProcessed.Add(context.Background(), 1) + tb.ElasticsearchDocsReceived.Add(context.Background(), 1) + tb.ElasticsearchDocsRetried.Add(context.Background(), 1) + tb.ElasticsearchFlushedBytes.Add(context.Background(), 1) + tb.ElasticsearchFlushedUncompressedBytes.Add(context.Background(), 1) + AssertEqualElasticsearchBulkRequestsCount(t, testTel, + []metricdata.DataPoint[int64]{{Value: 1}}, + metricdatatest.IgnoreTimestamp()) + AssertEqualElasticsearchDocsProcessed(t, testTel, + []metricdata.DataPoint[int64]{{Value: 1}}, + metricdatatest.IgnoreTimestamp()) + AssertEqualElasticsearchDocsReceived(t, testTel, + []metricdata.DataPoint[int64]{{Value: 1}}, + metricdatatest.IgnoreTimestamp()) + AssertEqualElasticsearchDocsRetried(t, testTel, + []metricdata.DataPoint[int64]{{Value: 1}}, + metricdatatest.IgnoreTimestamp()) + AssertEqualElasticsearchFlushedBytes(t, testTel, + []metricdata.DataPoint[int64]{{Value: 1}}, + metricdatatest.IgnoreTimestamp()) + AssertEqualElasticsearchFlushedUncompressedBytes(t, testTel, + []metricdata.DataPoint[int64]{{Value: 1}}, + metricdatatest.IgnoreTimestamp()) + + require.NoError(t, testTel.Shutdown(context.Background())) +} diff --git a/exporter/elasticsearchexporter/metadata.yaml b/exporter/elasticsearchexporter/metadata.yaml index 71b7be98a7c20..d287dbf04b806 
100644 --- a/exporter/elasticsearchexporter/metadata.yaml +++ b/exporter/elasticsearchexporter/metadata.yaml @@ -9,6 +9,85 @@ status: codeowners: active: [JaredTan95, carsonip, lahsivjar] +attributes: + http_status_code: + description: HTTP status code. + type: int + outcome: + description: The operation outcome. + type: string + enum: [success, failed_client, failed_server, timeout, too_many, failure_store, internal_server_error] + failure_store: + description: The status of the failure store. + type: string + enum: [unknown, not_enabled, used, failed] + +telemetry: + metrics: + elasticsearch.bulk_requests.count: + prefix: otelcol. + stability: + level: alpha + enabled: true + description: Count of the completed bulk requests. + unit: "1" + sum: + value_type: int + monotonic: true + attributes: [outcome, http_status_code] + elasticsearch.docs.received: + prefix: otelcol. + stability: + level: alpha + enabled: true + description: Count of Elasticsearch documents successfully received to be buffered. + unit: "1" + sum: + value_type: int + monotonic: true + elasticsearch.docs.processed: + prefix: otelcol. + stability: + level: alpha + enabled: true + description: Count of documents flushed to Elasticsearch. + unit: "1" + sum: + value_type: int + monotonic: true + attributes: [outcome, http_status_code, failure_store] + elasticsearch.docs.retried: + prefix: otelcol. + stability: + level: alpha + enabled: true + description: Count of document retries. + extended_documentation: Only document level retries are captured, whole bulk request retries are not captured. + unit: "1" + sum: + value_type: int + monotonic: true + elasticsearch.flushed.bytes: + prefix: otelcol. + stability: + level: alpha + enabled: true + description: Number of bytes flushed by the indexer. + unit: By + sum: + value_type: int + monotonic: true + elasticsearch.flushed.uncompressed_bytes: + prefix: otelcol. 
+ stability: + level: alpha + enabled: true + description: Number of uncompressed bytes flushed by the indexer. + unit: By + sum: + value_type: int + monotonic: true + tests: config: endpoints: [http://localhost:9200]