Skip to content

Commit

Permalink
Merge branch 'main' into feature/elasticsearch_otel_model
Browse files Browse the repository at this point in the history
  • Loading branch information
aleksmaus authored Jun 28, 2024
2 parents 3fba3f7 + 90a0c90 commit 3efdbec
Show file tree
Hide file tree
Showing 13 changed files with 292 additions and 59 deletions.
27 changes: 27 additions & 0 deletions .chloggen/call_cpu_info_only_on_demand.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: resourcedetectionprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fetch CPU info only if related attributes are enabled

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [33774]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
27 changes: 27 additions & 0 deletions .chloggen/elasticsearch-deprecate-dedot.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: deprecation

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: exporter/elasticsearch

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Deprecate the "dedot" configuration.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [33772]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: dedot has been deprecated, and will always be enabled in ECS mode and disabled for other modes in future

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
27 changes: 27 additions & 0 deletions .chloggen/elasticsearch-deprecate-dedup.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: deprecation

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: exporter/elasticsearch

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Deprecate the "dedup" configuration.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [33773]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: dedup has been deprecated, and will always be enabled in future.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
13 changes: 7 additions & 6 deletions exporter/elasticsearchexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,12 +130,13 @@ behaviours, which may be configured throug the following settings:
field names for span events.
- `fields` (optional): Configure additional fields mappings.
- `file` (optional): Read additional field mappings from the provided YAML file.
- `dedup` (default=true): Try to find and remove duplicate fields/attributes
from events before publishing to Elasticsearch. Some structured logging
libraries can produce duplicate fields (for example zap). Elasticsearch
will reject documents that have duplicate fields.
- `dedot` (default=true): When enabled attributes with `.` will be split into
proper json objects.
- `dedup` (default=true; DEPRECATED, in future deduplication will always be enabled):
Try to find and remove duplicate fields/attributes from events before publishing
to Elasticsearch. Some structured logging libraries can produce duplicate fields
(for example zap). Elasticsearch will reject documents that have duplicate fields.
- `dedot` (default=true; DEPRECATED, in future dedotting will always be enabled
for ECS mode, and never for other modes): When enabled attributes with `.`
will be split into proper json objects.

#### ECS mapping mode

Expand Down
17 changes: 17 additions & 0 deletions exporter/elasticsearchexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configopaque"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.uber.org/zap"
)

// Config defines configuration for Elastic exporter.
Expand Down Expand Up @@ -158,8 +159,15 @@ type MappingsSettings struct {
File string `mapstructure:"file"`

// Try to find and remove duplicate fields
//
// Deprecated: [v0.104.0] deduplication will always be applied in future,
// with no option to disable. Disabling deduplication is not meaningful,
// as Elasticsearch will reject documents with duplicate JSON object keys.
Dedup bool `mapstructure:"dedup"`

// Deprecated: [v0.104.0] dedotting will always be applied for ECS mode
// in future, and never for other modes. Elasticsearch's "dot_expander"
// Ingest processor may be used as an alternative for non-ECS modes.
Dedot bool `mapstructure:"dedot"`
}

Expand Down Expand Up @@ -314,3 +322,12 @@ func parseCloudID(input string) (*url.URL, error) {
func (cfg *Config) MappingMode() MappingMode {
return mappingModes[cfg.Mapping.Mode]
}

func logConfigDeprecationWarnings(cfg *Config, logger *zap.Logger) {
if !cfg.Mapping.Dedup {
logger.Warn("dedup has been deprecated, and will always be enabled in future")
}
if cfg.Mapping.Dedot && cfg.MappingMode() != MappingECS || !cfg.Mapping.Dedot && cfg.MappingMode() == MappingECS {
logger.Warn("dedot has been deprecated: in the future, dedotting will always be performed in ECS mode only")
}
}
3 changes: 3 additions & 0 deletions exporter/elasticsearchexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func createLogsExporter(
set.Logger.Warn("index option are deprecated and replaced with logs_index and traces_index.")
index = cf.Index
}
logConfigDeprecationWarnings(cf, set.Logger)

exporter, err := newExporter(cf, set, index, cf.LogsDynamicIndex.Enabled)
if err != nil {
Expand All @@ -115,6 +116,7 @@ func createMetricsExporter(
cfg component.Config,
) (exporter.Metrics, error) {
cf := cfg.(*Config)
logConfigDeprecationWarnings(cf, set.Logger)

exporter, err := newExporter(cf, set, cf.MetricsIndex, cf.MetricsDynamicIndex.Enabled)
if err != nil {
Expand All @@ -136,6 +138,7 @@ func createTracesExporter(ctx context.Context,
cfg component.Config) (exporter.Traces, error) {

cf := cfg.(*Config)
logConfigDeprecationWarnings(cf, set.Logger)

exporter, err := newExporter(cf, set, cf.TracesIndex, cf.TracesDynamicIndex.Enabled)
if err != nil {
Expand Down
72 changes: 72 additions & 0 deletions exporter/elasticsearchexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.uber.org/zap"
"go.uber.org/zap/zaptest/observer"
)

func TestCreateDefaultConfig(t *testing.T) {
Expand Down Expand Up @@ -94,3 +96,73 @@ func TestFactory_CreateLogsAndTracesExporterWithDeprecatedIndexOption(t *testing
require.NotNil(t, tracesExporter)
require.NoError(t, tracesExporter.Shutdown(context.Background()))
}

func TestFactory_DedupDeprecated(t *testing.T) {
factory := NewFactory()
cfg := withDefaultConfig(func(cfg *Config) {
cfg.Endpoint = "http://testing.invalid:9200"
cfg.Mapping.Dedup = false
cfg.Mapping.Dedot = false // avoid dedot warnings
})

loggerCore, logObserver := observer.New(zap.WarnLevel)
set := exportertest.NewNopSettings()
set.Logger = zap.New(loggerCore)

logsExporter, err := factory.CreateLogsExporter(context.Background(), set, cfg)
require.NoError(t, err)
require.NoError(t, logsExporter.Shutdown(context.Background()))

tracesExporter, err := factory.CreateTracesExporter(context.Background(), set, cfg)
require.NoError(t, err)
require.NoError(t, tracesExporter.Shutdown(context.Background()))

metricsExporter, err := factory.CreateMetricsExporter(context.Background(), set, cfg)
require.NoError(t, err)
require.NoError(t, metricsExporter.Shutdown(context.Background()))

records := logObserver.AllUntimed()
assert.Len(t, records, 3)
assert.Equal(t, "dedup has been deprecated, and will always be enabled in future", records[0].Message)
assert.Equal(t, "dedup has been deprecated, and will always be enabled in future", records[1].Message)
assert.Equal(t, "dedup has been deprecated, and will always be enabled in future", records[2].Message)
}

func TestFactory_DedotDeprecated(t *testing.T) {
loggerCore, logObserver := observer.New(zap.WarnLevel)
set := exportertest.NewNopSettings()
set.Logger = zap.New(loggerCore)

cfgNoDedotECS := withDefaultConfig(func(cfg *Config) {
cfg.Endpoint = "http://testing.invalid:9200"
cfg.Mapping.Dedot = false
cfg.Mapping.Mode = "ecs"
})

cfgDedotRaw := withDefaultConfig(func(cfg *Config) {
cfg.Endpoint = "http://testing.invalid:9200"
cfg.Mapping.Dedot = true
cfg.Mapping.Mode = "raw"
})

for _, cfg := range []*Config{cfgNoDedotECS, cfgDedotRaw} {
factory := NewFactory()
logsExporter, err := factory.CreateLogsExporter(context.Background(), set, cfg)
require.NoError(t, err)
require.NoError(t, logsExporter.Shutdown(context.Background()))

tracesExporter, err := factory.CreateTracesExporter(context.Background(), set, cfg)
require.NoError(t, err)
require.NoError(t, tracesExporter.Shutdown(context.Background()))

metricsExporter, err := factory.CreateMetricsExporter(context.Background(), set, cfg)
require.NoError(t, err)
require.NoError(t, metricsExporter.Shutdown(context.Background()))
}

records := logObserver.AllUntimed()
assert.Len(t, records, 6)
for _, record := range records {
assert.Equal(t, "dedot has been deprecated: in the future, dedotting will always be performed in ECS mode only", record.Message)
}
}
74 changes: 24 additions & 50 deletions exporter/sumologicexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ type sumologicexporter struct {
stickySessionCookieLock sync.RWMutex
stickySessionCookie string

id component.ID
id component.ID
sender *sender
}

func initExporter(cfg *Config, createSettings exporter.Settings) *sumologicexporter {
Expand Down Expand Up @@ -229,6 +230,21 @@ func (se *sumologicexporter) configure(ctx context.Context) error {
}

se.setHTTPClient(client)

logsURL, metricsURL, tracesURL := se.getDataURLs()
se.sender = newSender(
se.logger,
se.config,
se.getHTTPClient(),
se.prometheusFormatter,
metricsURL,
logsURL,
tracesURL,
se.StickySessionCookie,
se.SetStickySessionCookie,
se.id,
)

return nil
}

Expand Down Expand Up @@ -265,23 +281,9 @@ func (se *sumologicexporter) shutdown(context.Context) error {
// It returns the number of unsent logs and an error which contains a list of dropped records
// so they can be handled by OTC retry mechanism
func (se *sumologicexporter) pushLogsData(ctx context.Context, ld plog.Logs) error {
logsURL, metricsURL, tracesURL := se.getDataURLs()
sdr := newSender(
se.logger,
se.config,
se.getHTTPClient(),
se.prometheusFormatter,
metricsURL,
logsURL,
tracesURL,
se.StickySessionCookie,
se.SetStickySessionCookie,
se.id,
)

// Follow different execution path for OTLP format
if sdr.config.LogFormat == OTLPLogFormat {
if err := sdr.sendOTLPLogs(ctx, ld); err != nil {
if se.sender.config.LogFormat == OTLPLogFormat {
if err := se.sender.sendOTLPLogs(ctx, ld); err != nil {
se.handleUnauthorizedErrors(ctx, err)
return consumererror.NewLogs(err, ld)
}
Expand All @@ -304,7 +306,7 @@ func (se *sumologicexporter) pushLogsData(ctx context.Context, ld plog.Logs) err

currentMetadata := newFields(rl.Resource().Attributes())

if droppedRecords, err := sdr.sendNonOTLPLogs(ctx, rl, currentMetadata); err != nil {
if droppedRecords, err := se.sender.sendNonOTLPLogs(ctx, rl, currentMetadata); err != nil {
dropped = append(dropped, droppedResourceRecords{
resource: rl.Resource(),
records: droppedRecords,
Expand Down Expand Up @@ -342,29 +344,15 @@ func (se *sumologicexporter) pushLogsData(ctx context.Context, ld plog.Logs) err
// it returns number of unsent metrics and error which contains list of dropped records
// so they can be handle by the OTC retry mechanism
func (se *sumologicexporter) pushMetricsData(ctx context.Context, md pmetric.Metrics) error {
logsURL, metricsURL, tracesURL := se.getDataURLs()
sdr := newSender(
se.logger,
se.config,
se.getHTTPClient(),
se.prometheusFormatter,
metricsURL,
logsURL,
tracesURL,
se.StickySessionCookie,
se.SetStickySessionCookie,
se.id,
)

var droppedMetrics pmetric.Metrics
var errs []error
if sdr.config.MetricFormat == OTLPMetricFormat {
if err := sdr.sendOTLPMetrics(ctx, md); err != nil {
if se.sender.config.MetricFormat == OTLPMetricFormat {
if err := se.sender.sendOTLPMetrics(ctx, md); err != nil {
droppedMetrics = md
errs = []error{err}
}
} else {
droppedMetrics, errs = sdr.sendNonOTLPMetrics(ctx, md)
droppedMetrics, errs = se.sender.sendNonOTLPMetrics(ctx, md)
}

if len(errs) > 0 {
Expand Down Expand Up @@ -394,21 +382,7 @@ func (se *sumologicexporter) handleUnauthorizedErrors(ctx context.Context, errs
}

func (se *sumologicexporter) pushTracesData(ctx context.Context, td ptrace.Traces) error {
logsURL, metricsURL, tracesURL := se.getDataURLs()
sdr := newSender(
se.logger,
se.config,
se.getHTTPClient(),
se.prometheusFormatter,
metricsURL,
logsURL,
tracesURL,
se.StickySessionCookie,
se.SetStickySessionCookie,
se.id,
)

err := sdr.sendTraces(ctx, td)
err := se.sender.sendTraces(ctx, td)
se.handleUnauthorizedErrors(ctx, err)
return err
}
Expand Down
Loading

0 comments on commit 3efdbec

Please sign in to comment.