diff --git a/.chloggen/use-observed-timestamp-if-timestamp-is-missing.yaml b/.chloggen/use-observed-timestamp-if-timestamp-is-missing.yaml new file mode 100644 index 000000000000..26138dcce0f6 --- /dev/null +++ b/.chloggen/use-observed-timestamp-if-timestamp-is-missing.yaml @@ -0,0 +1,31 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: clickhouseexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Use observed timestamp if timestamp is zero + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [34150] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + Some opentelemetry libraries do not send timestamp for logs, but they should always send + | the observed timestamp. In these cases the clickhouse exporter just stored a zero timestamp + | to the database. This changes the behavior to look into the observed timestamp if the timestamp + | is zero. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/clickhouseexporter/exporter_logs.go b/exporter/clickhouseexporter/exporter_logs.go index 921e468fd1e0..9fa18e3f21d4 100644 --- a/exporter/clickhouseexporter/exporter_logs.go +++ b/exporter/clickhouseexporter/exporter_logs.go @@ -72,6 +72,7 @@ func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error { _ = statement.Close() }() var serviceName string + for i := 0; i < ld.ResourceLogs().Len(); i++ { logs := ld.ResourceLogs().At(i) res := logs.Resource() @@ -80,17 +81,25 @@ func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error { if v, ok := res.Attributes().Get(conventions.AttributeServiceName); ok { serviceName = v.Str() } + for j := 0; j < logs.ScopeLogs().Len(); j++ { rs := logs.ScopeLogs().At(j).LogRecords() scopeURL := logs.ScopeLogs().At(j).SchemaUrl() scopeName := logs.ScopeLogs().At(j).Scope().Name() scopeVersion := logs.ScopeLogs().At(j).Scope().Version() scopeAttr := attributesToMap(logs.ScopeLogs().At(j).Scope().Attributes()) + for k := 0; k < rs.Len(); k++ { r := rs.At(k) + + timestamp := r.Timestamp() + if timestamp == 0 { + timestamp = r.ObservedTimestamp() + } + logAttr := attributesToMap(r.Attributes()) _, err = statement.ExecContext(ctx, - r.Timestamp().AsTime(), + timestamp.AsTime(), traceutil.TraceIDToHexOrEmptyString(r.TraceID()), traceutil.SpanIDToHexOrEmptyString(r.SpanID()), uint32(r.Flags()), diff --git a/exporter/clickhouseexporter/exporter_logs_test.go b/exporter/clickhouseexporter/exporter_logs_test.go index aa3ff11acded..4856317a7cdd 100644 --- a/exporter/clickhouseexporter/exporter_logs_test.go +++ b/exporter/clickhouseexporter/exporter_logs_test.go @@ -119,6 +119,17 @@ func TestExporter_pushLogsData(t *testing.T) { exporter := newTestLogsExporter(t, defaultEndpoint) mustPushLogsData(t, exporter, simpleLogs(1)) }) + t.Run("test with only observed timestamp", func(t *testing.T) { + initClickhouseTestServer(t, func(query string, values []driver.Value) error { + if strings.HasPrefix(query, "INSERT") { + require.NotEqual(t, "0", values[0]) + } + return nil + }) + + exporter := newTestLogsExporter(t, defaultEndpoint) + mustPushLogsData(t, exporter, simpleLogsWithNoTimestamp(1)) + }) } func TestLogsClusterConfig(t *testing.T) { @@ -181,6 +192,31 @@ func simpleLogs(count int) plog.Logs { return logs } +func simpleLogsWithNoTimestamp(count int) plog.Logs { + logs := plog.NewLogs() + rl := logs.ResourceLogs().AppendEmpty() + rl.SetSchemaUrl("https://opentelemetry.io/schemas/1.4.0") + rl.Resource().Attributes().PutStr("service.name", "test-service") + sl := rl.ScopeLogs().AppendEmpty() + sl.SetSchemaUrl("https://opentelemetry.io/schemas/1.7.0") + sl.Scope().SetName("io.opentelemetry.contrib.clickhouse") + sl.Scope().SetVersion("1.0.0") + sl.Scope().Attributes().PutStr("lib", "clickhouse") + timestamp := time.Unix(1703498029, 0) + for i := 0; i < count; i++ { + r := sl.LogRecords().AppendEmpty() + r.SetObservedTimestamp(pcommon.NewTimestampFromTime(timestamp)) + r.SetSeverityNumber(plog.SeverityNumberError2) + r.SetSeverityText("error") + r.Body().SetStr("error message") + r.Attributes().PutStr(conventions.AttributeServiceNamespace, "default") + r.SetFlags(plog.DefaultLogRecordFlags) + r.SetTraceID([16]byte{1, 2, 3, byte(i)}) + r.SetSpanID([8]byte{1, 2, 3, byte(i)}) + } + return logs +} + func mustPushLogsData(t *testing.T, exporter *logsExporter, ld plog.Logs) { err := exporter.pushLogsData(context.TODO(), ld) require.NoError(t, err)