Skip to content

Commit

Permalink
[receiver/libhoney] Libhoney receiver trace signal (open-telemetry#36902
Browse files Browse the repository at this point in the history
)

#### Description

This PR is the implementation for the traces signal related to the new
libhoney receiver.

<!-- Issue number (e.g. open-telemetry#1234) or full URL to issue, if applicable. -->
#### Link to tracking issue

open-telemetry#36693
  • Loading branch information
mterhar authored and AkhigbeEromo committed Jan 13, 2025
1 parent 7fe51d6 commit a4e421c
Show file tree
Hide file tree
Showing 7 changed files with 751 additions and 9 deletions.
27 changes: 27 additions & 0 deletions .chloggen/traces-for-libhoneyreceiver.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: libhoneyreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Implement trace signal for libhoney receiver

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [36693]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
2 changes: 1 addition & 1 deletion receiver/libhoneyreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
go.opentelemetry.io/collector/receiver v0.116.0
go.opentelemetry.io/collector/receiver/receivertest v0.116.0
go.opentelemetry.io/collector/semconv v0.116.0
go.opentelemetry.io/otel/trace v1.32.0
go.uber.org/goleak v1.3.0
go.uber.org/zap v1.27.0
google.golang.org/genproto/googleapis/rpc v0.0.0-20241015192408-796eee8c2d53
Expand Down Expand Up @@ -68,7 +69,6 @@ require (
go.opentelemetry.io/otel/metric v1.32.0 // indirect
go.opentelemetry.io/otel/sdk v1.32.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.32.0 // indirect
go.opentelemetry.io/otel/trace v1.32.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.32.0 // indirect
golang.org/x/sys v0.28.0 // indirect
Expand Down
188 changes: 186 additions & 2 deletions receiver/libhoneyreceiver/internal/libhoneyevent/libhoneyevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,21 @@
package libhoneyevent // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/libhoneyevent"

import (
"crypto/rand"
"encoding/binary"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"hash/fnv"
"slices"
"strings"
"time"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/ptrace"
trc "go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/eventtime"
Expand Down Expand Up @@ -87,8 +93,29 @@ func (l *LibhoneyEvent) DebugString() string {
}

// SignalType returns the type of signal this event represents. Only log is implemented for now.
func (l *LibhoneyEvent) SignalType() (string, error) {
return "log", nil
func (l *LibhoneyEvent) SignalType(logger zap.Logger) string {
if sig, ok := l.Data["meta.signal_type"]; ok {
switch sig {
case "trace":
if atype, ok := l.Data["meta.annotation_type"]; ok {
if atype == "span_event" {
return "span_event"
} else if atype == "link" {
return "span_link"
}
logger.Warn("invalid annotation type", zap.String("meta.annotation_type", atype.(string)))
return "span"
}
return "span"
case "log":
return "log"
default:
logger.Warn("invalid meta.signal_type", zap.String("meta.signal_type", sig.(string)))
return "log"
}
}
logger.Warn("missing meta.signal_type and meta.annotation_type")
return "log"
}

// GetService returns the service name from the event or the dataset name if no service name is found.
Expand Down Expand Up @@ -126,6 +153,36 @@ func (l *LibhoneyEvent) GetScope(fields FieldMapConfig, seen *ScopeHistory, serv
return "libhoney.receiver", errors.New("library name not found")
}

func spanIDFrom(s string) trc.SpanID {
hash := fnv.New64a()
hash.Write([]byte(s))
n := hash.Sum64()
sid := trc.SpanID{}
binary.LittleEndian.PutUint64(sid[:], n)
return sid
}

func traceIDFrom(s string) trc.TraceID {
hash := fnv.New64a()
hash.Write([]byte(s))
n1 := hash.Sum64()
hash.Write([]byte(s))
n2 := hash.Sum64()
tid := trc.TraceID{}
binary.LittleEndian.PutUint64(tid[:], n1)
binary.LittleEndian.PutUint64(tid[8:], n2)
return tid
}

func generateAnId(length int) []byte {
token := make([]byte, length)
_, err := rand.Read(token)
if err != nil {
return []byte{}
}
return token
}

// SimpleScope is a simple struct to hold the scope data
type SimpleScope struct {
ServiceName string
Expand Down Expand Up @@ -198,3 +255,130 @@ func (l *LibhoneyEvent) ToPLogRecord(newLog *plog.LogRecord, alreadyUsedFields *
}
return nil
}

// GetParentID returns the parent id from the event or an error if it's not found
func (l *LibhoneyEvent) GetParentID(fieldName string) (trc.SpanID, error) {
if pid, ok := l.Data[fieldName]; ok {
pid := strings.ReplaceAll(pid.(string), "-", "")
pidByteArray, err := hex.DecodeString(pid)
if err == nil {
if len(pidByteArray) == 32 {
pidByteArray = pidByteArray[8:24]
} else if len(pidByteArray) >= 16 {
pidByteArray = pidByteArray[0:16]
}
return trc.SpanID(pidByteArray), nil
}
return trc.SpanID{}, errors.New("parent id is not a valid span id")
}
return trc.SpanID{}, errors.New("parent id not found")
}

// ToPTraceSpan converts a LibhoneyEvent to a Pdata Span
func (l *LibhoneyEvent) ToPTraceSpan(newSpan *ptrace.Span, alreadyUsedFields *[]string, cfg FieldMapConfig, logger zap.Logger) error {
time_ns := l.MsgPackTimestamp.UnixNano()
logger.Debug("processing trace with", zap.Int64("timestamp", time_ns))

var parent_id trc.SpanID
if pid, ok := l.Data[cfg.Attributes.ParentID]; ok {
parent_id = spanIDFrom(pid.(string))
newSpan.SetParentSpanID(pcommon.SpanID(parent_id))
}

duration_ms := 0.0
for _, df := range cfg.Attributes.DurationFields {
if duration, okay := l.Data[df]; okay {
duration_ms = duration.(float64)
break
}
}
end_timestamp := time_ns + (int64(duration_ms) * 1000000)

if tid, ok := l.Data[cfg.Attributes.TraceID]; ok {
tid := strings.ReplaceAll(tid.(string), "-", "")
tidByteArray, err := hex.DecodeString(tid)
if err == nil {
if len(tidByteArray) >= 32 {
tidByteArray = tidByteArray[0:32]
}
newSpan.SetTraceID(pcommon.TraceID(tidByteArray))
} else {
newSpan.SetTraceID(pcommon.TraceID(traceIDFrom(tid)))
}
} else {
newSpan.SetTraceID(pcommon.TraceID(generateAnId(32)))
}

if sid, ok := l.Data[cfg.Attributes.SpanID]; ok {
sid := strings.ReplaceAll(sid.(string), "-", "")
sidByteArray, err := hex.DecodeString(sid)
if err == nil {
if len(sidByteArray) == 32 {
sidByteArray = sidByteArray[8:24]
} else if len(sidByteArray) >= 16 {
sidByteArray = sidByteArray[0:16]
}
newSpan.SetSpanID(pcommon.SpanID(sidByteArray))
} else {
newSpan.SetSpanID(pcommon.SpanID(spanIDFrom(sid)))
}
} else {
newSpan.SetSpanID(pcommon.SpanID(generateAnId(16)))
}

newSpan.SetStartTimestamp(pcommon.Timestamp(time_ns))
newSpan.SetEndTimestamp(pcommon.Timestamp(end_timestamp))

if spanName, ok := l.Data[cfg.Attributes.Name]; ok {
newSpan.SetName(spanName.(string))
}
if spanStatusMessge, ok := l.Data["status_message"]; ok {
newSpan.Status().SetMessage(spanStatusMessge.(string))
}
newSpan.Status().SetCode(ptrace.StatusCodeUnset)

if _, ok := l.Data[cfg.Attributes.Error]; ok {
newSpan.Status().SetCode(ptrace.StatusCodeError)
}

if spanKind, ok := l.Data[cfg.Attributes.SpanKind]; ok {
switch spanKind.(string) {
case "server":
newSpan.SetKind(ptrace.SpanKindServer)
case "client":
newSpan.SetKind(ptrace.SpanKindClient)
case "producer":
newSpan.SetKind(ptrace.SpanKindProducer)
case "consumer":
newSpan.SetKind(ptrace.SpanKindConsumer)
case "internal":
newSpan.SetKind(ptrace.SpanKindInternal)
default:
newSpan.SetKind(ptrace.SpanKindUnspecified)
}
}

newSpan.Attributes().PutInt("SampleRate", int64(l.Samplerate))

for k, v := range l.Data {
if slices.Contains(*alreadyUsedFields, k) {
continue
}
switch v := v.(type) {
case string:
newSpan.Attributes().PutStr(k, v)
case int:
newSpan.Attributes().PutInt(k, int64(v))
case int64, int16, int32:
intv := v.(int64)
newSpan.Attributes().PutInt(k, intv)
case float64:
newSpan.Attributes().PutDouble(k, v)
case bool:
newSpan.Attributes().PutBool(k, v)
default:
logger.Warn("Span data type issue", zap.String("trace.trace_id", newSpan.TraceID().String()), zap.String("trace.span_id", newSpan.SpanID().String()), zap.String("key", k))
}
}
return nil
}
107 changes: 107 additions & 0 deletions receiver/libhoneyreceiver/internal/libhoneyevent/libhoneyevent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -292,3 +293,109 @@ func TestLibhoneyEvent_GetScope(t *testing.T) {
})
}
}

func TestToPTraceSpan(t *testing.T) {
now := time.Now()
tests := []struct {
name string
event LibhoneyEvent
want func(ptrace.Span)
wantErr bool
}{
{
name: "basic span conversion",
event: LibhoneyEvent{
Time: now.Format(time.RFC3339),
MsgPackTimestamp: &now,
Data: map[string]any{
"name": "test-span",
"trace.span_id": "1234567890abcdef",
"trace.trace_id": "1234567890abcdef1234567890abcdef",
"duration_ms": 100.0,
"error": true,
"status_message": "error message",
"kind": "server",
"string_attr": "value",
"int_attr": 42,
"bool_attr": true,
},
Samplerate: 1,
},
want: func(s ptrace.Span) {
s.SetName("test-span")
s.SetSpanID([8]byte{0x12, 0x34, 0x56, 0x78, 0x90, 0xab, 0xcd, 0xef})
s.SetTraceID([16]byte{0x12, 0x34, 0x56, 0x78, 0x90, 0xab, 0xcd, 0xef, 0x12, 0x34, 0x56, 0x78, 0x90, 0xab, 0xcd, 0xef})
s.SetStartTimestamp(pcommon.NewTimestampFromTime(now))
s.SetEndTimestamp(pcommon.NewTimestampFromTime(now.Add(100 * time.Millisecond)))
s.Status().SetCode(ptrace.StatusCodeError)
s.Status().SetMessage("error message")
s.SetKind(ptrace.SpanKindServer)
s.Attributes().PutStr("string_attr", "value")
s.Attributes().PutInt("int_attr", 42)
s.Attributes().PutBool("bool_attr", true)
},
},
}

alreadyUsedFields := []string{"name", "trace.span_id", "trace.trace_id", "duration_ms", "status.code", "status.message", "kind"}
testCfg := FieldMapConfig{
Attributes: AttributesConfig{
Name: "name",
TraceID: "trace.trace_id",
SpanID: "trace.span_id",
ParentID: "trace.parent_id",
Error: "error",
SpanKind: "kind",
DurationFields: []string{"duration_ms"},
},
Resources: ResourcesConfig{
ServiceName: "service.name",
},
Scopes: ScopesConfig{
LibraryName: "library.name",
LibraryVersion: "library.version",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
span := ptrace.NewSpan()
err := tt.event.ToPTraceSpan(&span, &alreadyUsedFields, testCfg, *zap.NewNop())

if tt.wantErr {
assert.Error(t, err)
return
}

require.NoError(t, err)
if tt.want != nil {
want := ptrace.NewSpan()
tt.want(want)

// Check basic fields
assert.Equal(t, want.Name(), span.Name())
assert.Equal(t, want.SpanID(), span.SpanID())
assert.Equal(t, want.TraceID(), span.TraceID())
assert.Equal(t, want.StartTimestamp(), span.StartTimestamp())
assert.Equal(t, want.EndTimestamp(), span.EndTimestamp())
assert.Equal(t, want.Kind(), span.Kind())

// Check status
assert.Equal(t, want.Status().Code(), span.Status().Code())
assert.Equal(t, want.Status().Message(), span.Status().Message())

// Check attributes
want.Attributes().Range(func(k string, v pcommon.Value) bool {
got, ok := span.Attributes().Get(k)
assert.True(t, ok, "missing attribute %s", k)
assert.Equal(t, v.Type(), got.Type(), "wrong type for attribute %s", k)
assert.Equal(t, v, got, "wrong value for attribute %s", k)
return true
})

// Verify no fewer attributes, extras are expected
assert.LessOrEqual(t, want.Attributes().Len(), span.Attributes().Len())
}
})
}
}
Loading

0 comments on commit a4e421c

Please sign in to comment.