Skip to content

Commit

Permalink
[exporter/clickhouse] use observed timestamp in logs if timestamp is …
Browse files Browse the repository at this point in the history
…missing

Some senders, such as the rust-opentelemetry do _not_ send timestamp
in their logs. In these cases, if the timestamp is not set, we must
use the observed timestamp instead.
  • Loading branch information
pimeys committed Jul 26, 2024
1 parent 79c0bf1 commit 56cceb8
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 1 deletion.
31 changes: 31 additions & 0 deletions .chloggen/use-observed-timestamp-if-timestamp-is-missing.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: clickhouseexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Use observed timestamp if timestamp is zero

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [34150]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
Some opentelemetry libraries do not send timestamp for logs, but they should always send
| the observed timestamp. In these cases the clickhouse exporter just stored a zero timestamp
| to the database. This changes the behavior to look into the observed timestamp if the timestamp
| is zero.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
11 changes: 10 additions & 1 deletion exporter/clickhouseexporter/exporter_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error {
_ = statement.Close()
}()
var serviceName string

for i := 0; i < ld.ResourceLogs().Len(); i++ {
logs := ld.ResourceLogs().At(i)
res := logs.Resource()
Expand All @@ -80,17 +81,25 @@ func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error {
if v, ok := res.Attributes().Get(conventions.AttributeServiceName); ok {
serviceName = v.Str()
}

for j := 0; j < logs.ScopeLogs().Len(); j++ {
rs := logs.ScopeLogs().At(j).LogRecords()
scopeURL := logs.ScopeLogs().At(j).SchemaUrl()
scopeName := logs.ScopeLogs().At(j).Scope().Name()
scopeVersion := logs.ScopeLogs().At(j).Scope().Version()
scopeAttr := attributesToMap(logs.ScopeLogs().At(j).Scope().Attributes())

for k := 0; k < rs.Len(); k++ {
r := rs.At(k)

timestamp := r.Timestamp()
if timestamp == 0 {
timestamp = r.ObservedTimestamp()
}

logAttr := attributesToMap(r.Attributes())
_, err = statement.ExecContext(ctx,
r.Timestamp().AsTime(),
timestamp.AsTime(),
traceutil.TraceIDToHexOrEmptyString(r.TraceID()),
traceutil.SpanIDToHexOrEmptyString(r.SpanID()),
uint32(r.Flags()),
Expand Down
36 changes: 36 additions & 0 deletions exporter/clickhouseexporter/exporter_logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,17 @@ func TestExporter_pushLogsData(t *testing.T) {
exporter := newTestLogsExporter(t, defaultEndpoint)
mustPushLogsData(t, exporter, simpleLogs(1))
})
t.Run("test with only observed timestamp", func(t *testing.T) {
initClickhouseTestServer(t, func(query string, values []driver.Value) error {
if strings.HasPrefix(query, "INSERT") {
require.NotEqual(t, "0", values[0])
}
return nil
})

exporter := newTestLogsExporter(t, defaultEndpoint)
mustPushLogsData(t, exporter, simpleLogsWithNoTimestamp(1))
})
}

func TestLogsClusterConfig(t *testing.T) {
Expand Down Expand Up @@ -181,6 +192,31 @@ func simpleLogs(count int) plog.Logs {
return logs
}

func simpleLogsWithNoTimestamp(count int) plog.Logs {
logs := plog.NewLogs()
rl := logs.ResourceLogs().AppendEmpty()
rl.SetSchemaUrl("https://opentelemetry.io/schemas/1.4.0")
rl.Resource().Attributes().PutStr("service.name", "test-service")
sl := rl.ScopeLogs().AppendEmpty()
sl.SetSchemaUrl("https://opentelemetry.io/schemas/1.7.0")
sl.Scope().SetName("io.opentelemetry.contrib.clickhouse")
sl.Scope().SetVersion("1.0.0")
sl.Scope().Attributes().PutStr("lib", "clickhouse")
timestamp := time.Unix(1703498029, 0)
for i := 0; i < count; i++ {
r := sl.LogRecords().AppendEmpty()
r.SetObservedTimestamp(pcommon.NewTimestampFromTime(timestamp))
r.SetSeverityNumber(plog.SeverityNumberError2)
r.SetSeverityText("error")
r.Body().SetStr("error message")
r.Attributes().PutStr(conventions.AttributeServiceNamespace, "default")
r.SetFlags(plog.DefaultLogRecordFlags)
r.SetTraceID([16]byte{1, 2, 3, byte(i)})
r.SetSpanID([8]byte{1, 2, 3, byte(i)})
}
return logs
}

func mustPushLogsData(t *testing.T, exporter *logsExporter, ld plog.Logs) {
err := exporter.pushLogsData(context.TODO(), ld)
require.NoError(t, err)
Expand Down

0 comments on commit 56cceb8

Please sign in to comment.