Skip to content
Merged
27 changes: 27 additions & 0 deletions .chloggen/f_add-status.code-to-failed-log-messages.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
component: exporter/elasticsearch

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add `http.response.status_code` to failed document logs to allow for better filtering and error analysis.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [45829]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
1 change: 1 addition & 0 deletions exporter/elasticsearchexporter/bulkindexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ func flushBulkIndexer(
zap.String("index", resp.Index),
zap.String("error.type", resp.Error.Type),
zap.String("error.reason", resp.Error.Reason),
zap.Int("http.response.status_code", resp.Status),
)

if hint := getErrorHint(resp.Index, resp.Error.Type); hint != "" {
Expand Down
66 changes: 66 additions & 0 deletions exporter/elasticsearchexporter/bulkindexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,72 @@ func TestSyncBulkIndexer(t *testing.T) {
}
}

func TestBulkIndexerLogsStatusCode(t *testing.T) {
responseBody := `{"errors": true, "items":[
{"create":{"_index":"foo-200","status":200}},
{"create":{"_index":"foo-400","status":400,"error":{"type":"error_400","reason":"status 400"}}},
{"create":{"_index":"foo-401","status":401,"error":{"type":"error_401","reason":"status 401"}}},
{"create":{"_index":"foo-429","status":429,"error":{"type":"error_429","reason":"status 429"}}},
{"create":{"_index":"foo-500","status":500,"error":{"type":"error_500","reason":"status 500"}}}
]}`
statuses := []int{400, 401, 429, 500}

cfg := Config{
QueueBatchConfig: configoptional.Default(exporterhelper.QueueBatchConfig{
NumConsumers: 1,
}),
}
esClient, err := elastictransport.New(elastictransport.Config{
URLs: []*url.URL{{Scheme: "http", Host: "localhost:9200"}},
Transport: &mockTransport{
RoundTripFunc: func(*http.Request) (*http.Response, error) {
return &http.Response{
Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}},
Body: io.NopCloser(strings.NewReader(responseBody)),
StatusCode: http.StatusOK,
}, nil
},
},
})
require.NoError(t, err)

ct := componenttest.NewTelemetry()
tb, err := metadata.NewTelemetryBuilder(
metadatatest.NewSettings(ct).TelemetrySettings,
)
require.NoError(t, err)

core, observed := observer.New(zap.NewAtomicLevelAt(zapcore.DebugLevel))
bi := newSyncBulkIndexer(esClient, &cfg, false, tb, zap.New(core))

ctx := t.Context()
session := bi.StartSession(ctx)
// Add initial document to ensure we have at least one document to process.
require.NoError(t, session.Add(ctx, "foo", "", "", strings.NewReader(`{"foo": "bar"}`), nil, docappender.ActionCreate))
for range statuses {
require.NoError(t, session.Add(ctx, "foo", "", "", strings.NewReader(`{"foo": "bar"}`), nil, docappender.ActionCreate))
}
require.NoError(t, session.Flush(ctx))
session.End()
assert.NoError(t, bi.Close(ctx))

messages := observed.FilterMessage("failed to index document").FilterFieldKey("http.response.status_code")
require.Equal(t, len(statuses), messages.Len(), "message not found; observed.All()=%v", observed.All())
for i, status := range statuses {
if i >= messages.Len() {
t.Errorf("expected at least %d log messages, got %d", i+1, messages.Len())
continue
}
msg := messages.All()[i]
statusCode, ok := msg.ContextMap()["http.response.status_code"]
if !ok {
t.Errorf("http.response.status_code missing in log at index %d; msg: %+v", i, msg.ContextMap())
continue
}
assert.Equal(t, int64(status), statusCode, "http.response.status_code does not match at index %d; msg: %s", i, msg.Message)
}
}

func TestQueryParamsParsedFromEndpoints(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Endpoints = []string{"http://localhost:9200?pipeline=test-pipeline"}
Expand Down
Loading