From 128b62e946806ca5850c1b4c3aaf5a52da6ea3f7 Mon Sep 17 00:00:00 2001 From: Mike Terhar Date: Thu, 19 Dec 2024 10:17:03 -0500 Subject: [PATCH] [receiver/libhoney] Libhoney receiver log signal (#36827) #### Description This PR is the implementation for the logs signal related to the new libhoney receiver. #### Link to tracking issue #36693 --- .chloggen/logs-for-libhoneyreceiver.yaml | 27 ++ receiver/libhoneyreceiver/README.md | 29 +- receiver/libhoneyreceiver/config.go | 34 +- receiver/libhoneyreceiver/config_test.go | 29 ++ receiver/libhoneyreceiver/encoder/encoder.go | 125 ++++++++ receiver/libhoneyreceiver/factory.go | 33 +- receiver/libhoneyreceiver/factory_test.go | 47 +++ receiver/libhoneyreceiver/go.mod | 26 +- receiver/libhoneyreceiver/go.sum | 6 + .../internal/eventtime/eventtime.go | 64 ++++ .../internal/libhoneyevent/libhoneyevent.go | 200 ++++++++++++ .../libhoneyevent/libhoneyevent_test.go | 294 ++++++++++++++++++ .../internal/parser/parser.go | 88 ++++++ receiver/libhoneyreceiver/libhoney.go | 127 -------- receiver/libhoneyreceiver/receiver.go | 291 +++++++++++++++++ receiver/libhoneyreceiver/receiver_test.go | 276 ++++++++++++++++ 16 files changed, 1508 insertions(+), 188 deletions(-) create mode 100644 .chloggen/logs-for-libhoneyreceiver.yaml create mode 100644 receiver/libhoneyreceiver/config_test.go create mode 100644 receiver/libhoneyreceiver/encoder/encoder.go create mode 100644 receiver/libhoneyreceiver/factory_test.go create mode 100644 receiver/libhoneyreceiver/internal/eventtime/eventtime.go create mode 100644 receiver/libhoneyreceiver/internal/libhoneyevent/libhoneyevent.go create mode 100644 receiver/libhoneyreceiver/internal/libhoneyevent/libhoneyevent_test.go create mode 100644 receiver/libhoneyreceiver/internal/parser/parser.go delete mode 100644 receiver/libhoneyreceiver/libhoney.go create mode 100644 receiver/libhoneyreceiver/receiver.go create mode 100644 receiver/libhoneyreceiver/receiver_test.go diff --git a/.chloggen/logs-for-libhoneyreceiver.yaml b/.chloggen/logs-for-libhoneyreceiver.yaml new file mode 100644 index 000000000000..dc90a4fbe50b --- /dev/null +++ b/.chloggen/logs-for-libhoneyreceiver.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: libhoneyreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Implement log signal for libhoney receiver + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [36693] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] \ No newline at end of file diff --git a/receiver/libhoneyreceiver/README.md b/receiver/libhoneyreceiver/README.md index a87c8735d5d0..a765c45383f4 100644 --- a/receiver/libhoneyreceiver/README.md +++ b/receiver/libhoneyreceiver/README.md @@ -45,20 +45,21 @@ The following setting is required for refinery traffic since: - "/1/batch" include_metadata: true auth_api: https://api.honeycomb.io - resources: - service_name: service_name - scopes: - library_name: library.name - library_version: library.version - attributes: - trace_id: trace_id - parent_id: parent_id - span_id: span_id - name: name - error: error - spankind: span.kind - durationFields: - - duration_ms + fields: + resources: + service_name: service_name + scopes: + library_name: library.name + library_version: library.version + attributes: + trace_id: trace_id + parent_id: parent_id + span_id: span_id + name: name + error: error + spankind: span.kind + durationFields: + - duration_ms ``` ### Telemetry data types supported diff --git a/receiver/libhoneyreceiver/config.go b/receiver/libhoneyreceiver/config.go index abfd6476dbd1..49602fcfa9d3 100644 --- a/receiver/libhoneyreceiver/config.go +++ b/receiver/libhoneyreceiver/config.go @@ -11,18 +11,19 @@ import ( "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/confmap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/libhoneyevent" ) // Config represents the receiver config settings within the collector's config.yaml type Config struct { - HTTP *HTTPConfig `mapstructure:"http"` - AuthAPI string `mapstructure:"auth_api"` - Wrapper string `mapstructure:"wrapper"` - Resources ResourcesConfig `mapstructure:"resources"` - Scopes ScopesConfig `mapstructure:"scopes"` - Attributes AttributesConfig `mapstructure:"attributes"` + HTTP *HTTPConfig `mapstructure:"http"` + AuthAPI string `mapstructure:"auth_api"` + Wrapper string `mapstructure:"wrapper"` + FieldMapConfig libhoneyevent.FieldMapConfig `mapstructure:"fields"` } +// HTTPConfig defines the configuration for the HTTP server receiving traces. type HTTPConfig struct { *confighttp.ServerConfig `mapstructure:",squash"` @@ -30,25 +31,7 @@ type HTTPConfig struct { TracesURLPaths []string `mapstructure:"traces_url_paths,omitempty"` } -type ResourcesConfig struct { - ServiceName string `mapstructure:"service_name"` -} - -type ScopesConfig struct { - LibraryName string `mapstructure:"library_name"` - LibraryVersion string `mapstructure:"library_version"` -} - -type AttributesConfig struct { - TraceID string `mapstructure:"trace_id"` - ParentID string `mapstructure:"parent_id"` - SpanID string `mapstructure:"span_id"` - Name string `mapstructure:"name"` - Error string `mapstructure:"error"` - SpanKind string `mapstructure:"spankind"` - DurationFields []string `mapstructure:"durationFields"` -} - +// Validate ensures the HTTP configuration is set. func (cfg *Config) Validate() error { if cfg.HTTP == nil { return errors.New("must specify at least one protocol when using the arbitrary JSON receiver") @@ -56,6 +39,7 @@ func (cfg *Config) Validate() error { return nil } +// Unmarshal unmarshals the configuration from the given configuration and then checks for errors. func (cfg *Config) Unmarshal(conf *confmap.Conf) error { // first load the config normally err := conf.Unmarshal(cfg) diff --git a/receiver/libhoneyreceiver/config_test.go b/receiver/libhoneyreceiver/config_test.go new file mode 100644 index 000000000000..1d3f6b55dc71 --- /dev/null +++ b/receiver/libhoneyreceiver/config_test.go @@ -0,0 +1,29 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package libhoneyreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver" + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" +) + +func TestCreateDefaultConfig(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + assert.NotNil(t, cfg, "failed to create default config") + assert.NoError(t, componenttest.CheckConfigStruct(cfg)) + + libhoneyCfg, ok := cfg.(*Config) + require.True(t, ok, "invalid Config type") + + assert.Equal(t, "localhost:8080", libhoneyCfg.HTTP.Endpoint) + assert.Equal(t, []string{"/events", "/event", "/batch"}, libhoneyCfg.HTTP.TracesURLPaths) + assert.Equal(t, "", libhoneyCfg.AuthAPI) + assert.Equal(t, "service.name", libhoneyCfg.FieldMapConfig.Resources.ServiceName) + assert.Equal(t, "library.name", libhoneyCfg.FieldMapConfig.Scopes.LibraryName) + assert.Equal(t, []string{"duration_ms"}, libhoneyCfg.FieldMapConfig.Attributes.DurationFields) +} diff --git a/receiver/libhoneyreceiver/encoder/encoder.go b/receiver/libhoneyreceiver/encoder/encoder.go new file mode 100644 index 000000000000..b0a998ef310c --- /dev/null +++ b/receiver/libhoneyreceiver/encoder/encoder.go @@ -0,0 +1,125 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package encoder // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/encoder" + +import ( + "bytes" + + "github.com/gogo/protobuf/jsonpb" + "go.opentelemetry.io/collector/pdata/plog/plogotlp" + "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" + "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" + spb "google.golang.org/genproto/googleapis/rpc/status" +) + +const ( + PbContentType = "application/x-protobuf" + JsonContentType = "application/json" + MsgpackContentType = "application/x-msgpack" +) + +var ( + JsEncoder = &JsonEncoder{} + JsonPbMarshaler = &jsonpb.Marshaler{} + MpEncoder = &msgpackEncoder{} +) + +type Encoder interface { + UnmarshalTracesRequest(buf []byte) (ptraceotlp.ExportRequest, error) + UnmarshalMetricsRequest(buf []byte) (pmetricotlp.ExportRequest, error) + UnmarshalLogsRequest(buf []byte) (plogotlp.ExportRequest, error) + + MarshalTracesResponse(ptraceotlp.ExportResponse) ([]byte, error) + MarshalMetricsResponse(pmetricotlp.ExportResponse) ([]byte, error) + MarshalLogsResponse(plogotlp.ExportResponse) ([]byte, error) + + MarshalStatus(rsp *spb.Status) ([]byte, error) + + ContentType() string +} + +type JsonEncoder struct{} + +func (JsonEncoder) UnmarshalTracesRequest(buf []byte) (ptraceotlp.ExportRequest, error) { + req := ptraceotlp.NewExportRequest() + err := req.UnmarshalJSON(buf) + return req, err +} + +func (JsonEncoder) UnmarshalMetricsRequest(buf []byte) (pmetricotlp.ExportRequest, error) { + req := pmetricotlp.NewExportRequest() + err := req.UnmarshalJSON(buf) + return req, err +} + +func (JsonEncoder) UnmarshalLogsRequest(buf []byte) (plogotlp.ExportRequest, error) { + req := plogotlp.NewExportRequest() + err := req.UnmarshalJSON(buf) + return req, err +} + +func (JsonEncoder) MarshalTracesResponse(resp ptraceotlp.ExportResponse) ([]byte, error) { + return resp.MarshalJSON() +} + +func (JsonEncoder) MarshalMetricsResponse(resp pmetricotlp.ExportResponse) ([]byte, error) { + return resp.MarshalJSON() +} + +func (JsonEncoder) MarshalLogsResponse(resp plogotlp.ExportResponse) ([]byte, error) { + return resp.MarshalJSON() +} + +func (JsonEncoder) MarshalStatus(resp *spb.Status) ([]byte, error) { + buf := new(bytes.Buffer) + err := JsonPbMarshaler.Marshal(buf, resp) + return buf.Bytes(), err +} + +func (JsonEncoder) ContentType() string { + return JsonContentType +} + +// messagepack responses seem to work in JSON so leaving this alone for now. +type msgpackEncoder struct{} + +func (msgpackEncoder) UnmarshalTracesRequest(buf []byte) (ptraceotlp.ExportRequest, error) { + req := ptraceotlp.NewExportRequest() + err := req.UnmarshalJSON(buf) + return req, err +} + +func (msgpackEncoder) UnmarshalMetricsRequest(buf []byte) (pmetricotlp.ExportRequest, error) { + req := pmetricotlp.NewExportRequest() + err := req.UnmarshalJSON(buf) + return req, err +} + +func (msgpackEncoder) UnmarshalLogsRequest(buf []byte) (plogotlp.ExportRequest, error) { + req := plogotlp.NewExportRequest() + err := req.UnmarshalJSON(buf) + return req, err +} + +func (msgpackEncoder) MarshalTracesResponse(resp ptraceotlp.ExportResponse) ([]byte, error) { + return resp.MarshalJSON() +} + +func (msgpackEncoder) MarshalMetricsResponse(resp pmetricotlp.ExportResponse) ([]byte, error) { + return resp.MarshalJSON() +} + +func (msgpackEncoder) MarshalLogsResponse(resp plogotlp.ExportResponse) ([]byte, error) { + return resp.MarshalJSON() +} + +func (msgpackEncoder) MarshalStatus(resp *spb.Status) ([]byte, error) { + buf := new(bytes.Buffer) + err := JsonPbMarshaler.Marshal(buf, resp) + return buf.Bytes(), err +} + +func (msgpackEncoder) ContentType() string { + return MsgpackContentType +} diff --git a/receiver/libhoneyreceiver/factory.go b/receiver/libhoneyreceiver/factory.go index 4d0d0fa25cfa..02ab9dcf1855 100644 --- a/receiver/libhoneyreceiver/factory.go +++ b/receiver/libhoneyreceiver/factory.go @@ -13,6 +13,7 @@ import ( "go.opentelemetry.io/collector/receiver" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/libhoneyevent" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/metadata" ) @@ -44,21 +45,23 @@ func createDefaultConfig() component.Config { TracesURLPaths: defaultTracesURLPaths, }, AuthAPI: "", - Resources: ResourcesConfig{ - ServiceName: "service.name", - }, - Scopes: ScopesConfig{ - LibraryName: "library.name", - LibraryVersion: "library.version", - }, - Attributes: AttributesConfig{ - TraceID: "trace.trace_id", - SpanID: "trace.span_id", - ParentID: "trace.parent_id", - Name: "name", - Error: "error", - SpanKind: "span.kind", - DurationFields: durationFieldsArr, + FieldMapConfig: libhoneyevent.FieldMapConfig{ + Resources: libhoneyevent.ResourcesConfig{ + ServiceName: "service.name", + }, + Scopes: libhoneyevent.ScopesConfig{ + LibraryName: "library.name", + LibraryVersion: "library.version", + }, + Attributes: libhoneyevent.AttributesConfig{ + TraceID: "trace.trace_id", + SpanID: "trace.span_id", + ParentID: "trace.parent_id", + Name: "name", + Error: "error", + SpanKind: "span.kind", + DurationFields: durationFieldsArr, + }, }, } } diff --git a/receiver/libhoneyreceiver/factory_test.go b/receiver/libhoneyreceiver/factory_test.go new file mode 100644 index 000000000000..9e369d4fd17e --- /dev/null +++ b/receiver/libhoneyreceiver/factory_test.go @@ -0,0 +1,47 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package libhoneyreceiver + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/receiver/receivertest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/metadata" +) + +func TestCreateTracesReceiver(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + set := receivertest.NewNopSettings() + tReceiver, err := factory.CreateTraces(context.Background(), set, cfg, consumertest.NewNop()) + + assert.NoError(t, err, "receiver creation failed") + assert.NotNil(t, tReceiver, "receiver creation failed") + + assert.NoError(t, tReceiver.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, tReceiver.Shutdown(context.Background())) +} + +func TestCreateLogsReceiver(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + set := receivertest.NewNopSettings() + lReceiver, err := factory.CreateLogs(context.Background(), set, cfg, consumertest.NewNop()) + + assert.NoError(t, err, "receiver creation failed") + assert.NotNil(t, lReceiver, "receiver creation failed") + + assert.NoError(t, lReceiver.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, lReceiver.Shutdown(context.Background())) +} + +func TestType(t *testing.T) { + factory := NewFactory() + assert.Equal(t, metadata.Type, factory.Type()) +} diff --git a/receiver/libhoneyreceiver/go.mod b/receiver/libhoneyreceiver/go.mod index 8a845b28a26c..e3e9a5634fc2 100644 --- a/receiver/libhoneyreceiver/go.mod +++ b/receiver/libhoneyreceiver/go.mod @@ -3,17 +3,28 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhon go 1.22.0 require ( + github.com/gogo/protobuf v1.3.2 + github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.115.0 + github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.116.0 github.com/stretchr/testify v1.10.0 + github.com/vmihailenco/msgpack/v5 v5.4.1 + go.opentelemetry.io/collector/component v0.116.0 go.opentelemetry.io/collector/component/componenttest v0.116.0 go.opentelemetry.io/collector/config/confighttp v0.116.0 go.opentelemetry.io/collector/confmap v1.22.0 go.opentelemetry.io/collector/consumer v1.22.0 go.opentelemetry.io/collector/consumer/consumertest v0.116.0 + go.opentelemetry.io/collector/pdata v1.22.0 + go.opentelemetry.io/collector/receiver v0.116.0 go.opentelemetry.io/collector/receiver/receivertest v0.116.0 + go.opentelemetry.io/collector/semconv v0.116.0 go.uber.org/goleak v1.3.0 + go.uber.org/zap v1.27.0 + google.golang.org/genproto/googleapis/rpc v0.0.0-20241015192408-796eee8c2d53 ) require ( + github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect go.opentelemetry.io/collector/consumer/xconsumer v0.116.0 // indirect go.opentelemetry.io/collector/receiver/xreceiver v0.116.0 // indirect ) @@ -25,7 +36,6 @@ require ( github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-viper/mapstructure/v2 v2.2.1 // indirect - github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/uuid v1.6.0 // indirect github.com/json-iterator/go v1.1.12 // indirect @@ -37,12 +47,10 @@ require ( github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect - github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.116.0 github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rs/cors v1.11.1 // indirect go.opentelemetry.io/collector/client v1.22.0 // indirect - go.opentelemetry.io/collector/component v0.116.0 go.opentelemetry.io/collector/component/componentstatus v0.116.0 go.opentelemetry.io/collector/config/configauth v0.116.0 // indirect go.opentelemetry.io/collector/config/configcompression v1.22.0 // indirect @@ -53,10 +61,8 @@ require ( go.opentelemetry.io/collector/consumer/consumererror v0.116.0 // indirect go.opentelemetry.io/collector/extension v0.116.0 // indirect go.opentelemetry.io/collector/extension/auth v0.116.0 // indirect - go.opentelemetry.io/collector/pdata v1.22.0 // indirect go.opentelemetry.io/collector/pdata/pprofile v0.116.0 // indirect go.opentelemetry.io/collector/pipeline v0.116.0 // indirect - go.opentelemetry.io/collector/receiver v0.116.0 go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 // indirect go.opentelemetry.io/otel v1.32.0 // indirect go.opentelemetry.io/otel/metric v1.32.0 // indirect @@ -64,11 +70,9 @@ require ( go.opentelemetry.io/otel/sdk/metric v1.32.0 // indirect go.opentelemetry.io/otel/trace v1.32.0 // indirect go.uber.org/multierr v1.11.0 // indirect - go.uber.org/zap v1.27.0 golang.org/x/net v0.32.0 // indirect golang.org/x/sys v0.28.0 // indirect golang.org/x/text v0.21.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20241015192408-796eee8c2d53 // indirect google.golang.org/grpc v1.69.0 // indirect google.golang.org/protobuf v1.35.2 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect @@ -77,3 +81,11 @@ require ( replace google.golang.org/genproto => google.golang.org/genproto v0.0.0-20240701130421-f6361c86f094 replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent => ../../internal/sharedcomponent + +replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal => ../../internal/coreinternal + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden diff --git a/receiver/libhoneyreceiver/go.sum b/receiver/libhoneyreceiver/go.sum index d20a4d1fa883..32bb83e1775b 100644 --- a/receiver/libhoneyreceiver/go.sum +++ b/receiver/libhoneyreceiver/go.sum @@ -60,6 +60,10 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8= +github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok= +github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= +github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.opentelemetry.io/collector/client v1.22.0 h1:AAUzHuqYQqxoNqacw1WXgGF/MxtBTwNZuhBvJIorgA0= @@ -114,6 +118,8 @@ go.opentelemetry.io/collector/receiver/receivertest v0.116.0 h1:ZF4QVcots0OUiutb go.opentelemetry.io/collector/receiver/receivertest v0.116.0/go.mod h1:7GGvtHhW3o6457/wGtSWXJtCtlW6VGFUZSlf6wboNTw= go.opentelemetry.io/collector/receiver/xreceiver v0.116.0 h1:Kc+ixqgMjU2sHhzNrFn5TttVNiJlJwTLL3sQrM9uH6s= go.opentelemetry.io/collector/receiver/xreceiver v0.116.0/go.mod h1:H2YGSNFoMbWMIDvB8tzkReHSVqvogihjtet+ppHfYv8= +go.opentelemetry.io/collector/semconv v0.116.0 h1:63xCZomsKJAWmKGWD3lnORiE3WKW6AO4LjnzcHzGx3Y= +go.opentelemetry.io/collector/semconv v0.116.0/go.mod h1:N6XE8Q0JKgBN2fAhkUQtqK9LT7rEGR6+Wu/Rtbal1iI= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 h1:UP6IpuHFkUgOQL9FFQFrZ+5LiwhhYRbi7VZSIx6Nj5s= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0/go.mod h1:qxuZLtbq5QDtdeSHsS7bcf6EH6uO6jUAgk764zd3rhM= go.opentelemetry.io/otel v1.32.0 h1:WnBN+Xjcteh0zdk01SVqV55d/m62NJLJdIyb4y/WO5U= diff --git a/receiver/libhoneyreceiver/internal/eventtime/eventtime.go b/receiver/libhoneyreceiver/internal/eventtime/eventtime.go new file mode 100644 index 000000000000..b7317b9f06f8 --- /dev/null +++ b/receiver/libhoneyreceiver/internal/eventtime/eventtime.go @@ -0,0 +1,64 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package eventtime // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/eventtime" + +import ( + "math" + "strconv" + "time" +) + +func GetNowTime() time.Time { + return time.Now() +} + +func GetEventTime(etHeader string) time.Time { + var eventTime time.Time + if etHeader != "" { + // Great, they sent us a time header. let's try and parse it. + // RFC3339Nano is the default that we send from all our SDKs + eventTime, _ = time.Parse(time.RFC3339Nano, etHeader) + if eventTime.IsZero() { + // the default didn't catch it, let's try a few other things + // is it all numeric? then try unix epoch times + epochInt, err := strconv.ParseInt(etHeader, 0, 64) + if err == nil { + // it might be seconds or it might be milliseconds! Who can know! + // 10-digit numbers are seconds, 13-digit milliseconds, 16 microseconds + if len(etHeader) == 10 { + eventTime = time.Unix(epochInt, 0) + } else if len(etHeader) > 10 { + // turn it into seconds and fractional seconds + fractionalTime := etHeader[:10] + "." + etHeader[10:] + // then chop it into the int part and the fractional part + if epochFloat, err := strconv.ParseFloat(fractionalTime, 64); err == nil { + sec, dec := math.Modf(epochFloat) + eventTime = time.Unix(int64(sec), int64(dec*(1e9))) + } + } + } else { + epochFloat, err := strconv.ParseFloat(etHeader, 64) + if err == nil { + sec, dec := math.Modf(epochFloat) + eventTime = time.Unix(int64(sec), int64(dec*(1e9))) + } + } + } + } + return eventTime.UTC() +} + +func GetEventTimeSec(etHeader string) int64 { + eventTime := GetEventTime(etHeader) + return eventTime.Unix() +} + +func GetEventTimeNano(etHeader string) int64 { + eventTime := GetEventTime(etHeader) + return eventTime.UnixNano() +} + +func GetEventTimeDefaultString() string { + return time.Now().Format(time.RFC3339Nano) +} diff --git a/receiver/libhoneyreceiver/internal/libhoneyevent/libhoneyevent.go b/receiver/libhoneyreceiver/internal/libhoneyevent/libhoneyevent.go new file mode 100644 index 000000000000..5519e304f138 --- /dev/null +++ b/receiver/libhoneyreceiver/internal/libhoneyevent/libhoneyevent.go @@ -0,0 +1,200 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package libhoneyevent // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/libhoneyevent" + +import ( + "encoding/json" + "errors" + "fmt" + "slices" + "time" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/eventtime" +) + +// FieldMapConfig is used to map the fields from the LibhoneyEvent to PData formats +type FieldMapConfig struct { + Resources ResourcesConfig `mapstructure:"resources"` + Scopes ScopesConfig `mapstructure:"scopes"` + Attributes AttributesConfig `mapstructure:"attributes"` +} + +// ResourcesConfig is used to map the fields from the LibhoneyEvent to PData formats +type ResourcesConfig struct { + ServiceName string `mapstructure:"service_name"` +} + +// ScopesConfig is used to map the fields from the LibhoneyEvent to PData formats +type ScopesConfig struct { + LibraryName string `mapstructure:"library_name"` + LibraryVersion string `mapstructure:"library_version"` +} + +// AttributesConfig is used to map the fields from the LibhoneyEvent to PData formats +type AttributesConfig struct { + TraceID string `mapstructure:"trace_id"` + ParentID string `mapstructure:"parent_id"` + SpanID string `mapstructure:"span_id"` + Name string `mapstructure:"name"` + Error string `mapstructure:"error"` + SpanKind string `mapstructure:"spankind"` + DurationFields []string `mapstructure:"durationFields"` +} + +// LibhoneyEvent is the event structure from libhoney +type LibhoneyEvent struct { + Samplerate int `json:"samplerate" msgpack:"samplerate"` + MsgPackTimestamp *time.Time `msgpack:"time"` + Time string `json:"time"` // should not be trusted. use MsgPackTimestamp + Data map[string]any `json:"data" msgpack:"data"` +} + +// UnmarshalJSON overrides the unmarshall to make sure the MsgPackTimestamp is set +func (l *LibhoneyEvent) UnmarshalJSON(j []byte) error { + type _libhoneyEvent LibhoneyEvent + tstr := eventtime.GetEventTimeDefaultString() + tzero := time.Time{} + tmp := _libhoneyEvent{Time: "none", MsgPackTimestamp: &tzero, Samplerate: 1} + + err := json.Unmarshal(j, &tmp) + if err != nil { + return err + } + if tmp.MsgPackTimestamp.IsZero() && tmp.Time == "none" { + // neither timestamp was set. give it right now. + tmp.Time = tstr + tnow := time.Now() + tmp.MsgPackTimestamp = &tnow + } + if tmp.MsgPackTimestamp.IsZero() { + propertime := eventtime.GetEventTime(tmp.Time) + tmp.MsgPackTimestamp = &propertime + } + + *l = LibhoneyEvent(tmp) + return nil +} + +// DebugString returns a string representation of the LibhoneyEvent +func (l *LibhoneyEvent) DebugString() string { + return fmt.Sprintf("%#v", l) +} + +// SignalType returns the type of signal this event represents. Only log is implemented for now. +func (l *LibhoneyEvent) SignalType() (string, error) { + return "log", nil +} + +// GetService returns the service name from the event or the dataset name if no service name is found. +func (l *LibhoneyEvent) GetService(fields FieldMapConfig, seen *ServiceHistory, dataset string) (string, error) { + if serviceName, ok := l.Data[fields.Resources.ServiceName]; ok { + seen.NameCount[serviceName.(string)]++ + return serviceName.(string), nil + } + return dataset, errors.New("no service.name found in event") +} + +// GetScope returns the scope key for the event. If the scope has not been seen before, it creates a new one. +func (l *LibhoneyEvent) GetScope(fields FieldMapConfig, seen *ScopeHistory, serviceName string) (string, error) { + if scopeLibraryName, ok := l.Data[fields.Scopes.LibraryName]; ok { + scopeKey := serviceName + scopeLibraryName.(string) + if _, ok := seen.Scope[scopeKey]; ok { + // if we've seen it, we don't expect it to be different right away so we'll just return it. + return scopeKey, nil + } + // otherwise, we need to make a new found scope + scopeLibraryVersion := "unset" + if scopeLibVer, ok := l.Data[fields.Scopes.LibraryVersion]; ok { + scopeLibraryVersion = scopeLibVer.(string) + } + newScope := SimpleScope{ + ServiceName: serviceName, // we only set the service name once. If the same library comes from multiple services in the same batch, we're in trouble. + LibraryName: scopeLibraryName.(string), + LibraryVersion: scopeLibraryVersion, + ScopeSpans: ptrace.NewSpanSlice(), + ScopeLogs: plog.NewLogRecordSlice(), + } + seen.Scope[scopeKey] = newScope + return scopeKey, nil + } + return "libhoney.receiver", errors.New("library name not found") +} + +// SimpleScope is a simple struct to hold the scope data +type SimpleScope struct { + ServiceName string + LibraryName string + LibraryVersion string + ScopeSpans ptrace.SpanSlice + ScopeLogs plog.LogRecordSlice +} + +// ScopeHistory is a map of scope keys to the SimpleScope object +type ScopeHistory struct { + Scope map[string]SimpleScope // key here is service.name+library.name +} + +// ServiceHistory is a map of service names to the number of times they've been seen +type ServiceHistory struct { + NameCount map[string]int +} + +// ToPLogRecord converts a LibhoneyEvent to a Pdata LogRecord +func (l *LibhoneyEvent) ToPLogRecord(newLog *plog.LogRecord, alreadyUsedFields *[]string, logger zap.Logger) error { + timeNs := l.MsgPackTimestamp.UnixNano() + logger.Debug("processing log with", zap.Int64("timestamp", timeNs)) + newLog.SetTimestamp(pcommon.Timestamp(timeNs)) + + if logSevCode, ok := l.Data["severity_code"]; ok { + logSevInt := int32(logSevCode.(int64)) + newLog.SetSeverityNumber(plog.SeverityNumber(logSevInt)) + } + + if logSevText, ok := l.Data["severity_text"]; ok { + newLog.SetSeverityText(logSevText.(string)) + } + + if logFlags, ok := l.Data["flags"]; ok { + logFlagsUint := uint32(logFlags.(uint64)) + newLog.SetFlags(plog.LogRecordFlags(logFlagsUint)) + } + + // undoing this is gonna be complicated: https://github.com/honeycombio/husky/blob/91c0498333cd9f5eed1fdb8544ca486db7dea565/otlp/logs.go#L61 + if logBody, ok := l.Data["body"]; ok { + newLog.Body().SetStr(logBody.(string)) + } + + newLog.Attributes().PutInt("SampleRate", int64(l.Samplerate)) + + logFieldsAlready := []string{"severity_text", "severity_code", "flags", "body"} + for k, v := range l.Data { + if slices.Contains(*alreadyUsedFields, k) { + continue + } + if slices.Contains(logFieldsAlready, k) { + continue + } + switch v := v.(type) { + case string: + newLog.Attributes().PutStr(k, v) + case int: + newLog.Attributes().PutInt(k, int64(v)) + case int64, int16, int32: + intv := v.(int64) + newLog.Attributes().PutInt(k, intv) + case float64: + newLog.Attributes().PutDouble(k, v) + case bool: + newLog.Attributes().PutBool(k, v) + default: + logger.Warn("Span data type issue", zap.Int64("timestamp", timeNs), zap.String("key", k)) + } + } + return nil +} diff --git a/receiver/libhoneyreceiver/internal/libhoneyevent/libhoneyevent_test.go b/receiver/libhoneyreceiver/internal/libhoneyevent/libhoneyevent_test.go new file mode 100644 index 000000000000..40348d2a640a --- /dev/null +++ b/receiver/libhoneyreceiver/internal/libhoneyevent/libhoneyevent_test.go @@ -0,0 +1,294 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package libhoneyevent + +import ( + "encoding/json" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + "go.uber.org/zap" +) + +func TestLibhoneyEvent_UnmarshalJSON(t *testing.T) { + tests := []struct { + name string + json string + want LibhoneyEvent + wantErr bool + }{ + { + name: "basic event", + json: `{ + "time": "2024-01-01T00:00:00Z", + "data": {"key": "value"}, + "samplerate": 1 + }`, + want: LibhoneyEvent{ + Time: "2024-01-01T00:00:00Z", + Data: map[string]any{"key": "value"}, + Samplerate: 1, + }, + }, + { + name: "invalid json", + json: `{invalid`, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var got LibhoneyEvent + err := json.Unmarshal([]byte(tt.json), &got) + + if tt.wantErr { + assert.Error(t, err) + return + } + + require.NoError(t, err) + assert.Equal(t, tt.want.Time, got.Time) + assert.Equal(t, tt.want.Data, got.Data) + assert.Equal(t, tt.want.Samplerate, got.Samplerate) + assert.NotNil(t, got.MsgPackTimestamp) + }) + } + + test := struct { + name string + json string + want LibhoneyEvent + wantErr bool + }{ + name: "missing time uses current", + json: `{ + "data": {"key": "value"}, + "samplerate": 2 + }`, + want: LibhoneyEvent{ + Time: "", + Data: map[string]any{"key": "value"}, + Samplerate: 2, + }, + } + t.Run(test.name, func(t *testing.T) { + var got LibhoneyEvent + err := json.Unmarshal([]byte(test.json), &got) + + require.NoError(t, err) + assert.Equal(t, test.want.Data, got.Data) + gotTime, timeErr := time.Parse(time.RFC3339Nano, got.Time) + assert.NoError(t, timeErr) + assert.WithinDuration(t, time.Now(), gotTime, time.Second) + assert.Equal(t, test.want.Samplerate, got.Samplerate) + assert.NotNil(t, got.MsgPackTimestamp) + }) +} + +func TestLibHoneyEvent_ToPLogRecord(t *testing.T) { + logger := zap.NewNop() + now := time.Now() + tests := []struct { + name string + event LibhoneyEvent + alreadyUsedFields []string + want func(plog.LogRecord) + wantErr bool + }{ + { + name: "basic conversion", + event: LibhoneyEvent{ + Samplerate: 1, + MsgPackTimestamp: &now, + Data: map[string]any{ + "severity_text": "ERROR", + "severity_code": int64(2), + "body": "test message", + "string_attr": "value", + "int_attr": 42, + "float_attr": 3.14, + "bool_attr": true, + }, + }, + want: func(lr plog.LogRecord) { + lr.SetSeverityText("ERROR") + lr.SetSeverityNumber(plog.SeverityNumber(2)) + lr.Body().SetStr("test message") + lr.Attributes().PutStr("string_attr", "value") + lr.Attributes().PutInt("int_attr", 42) + lr.Attributes().PutDouble("float_attr", 3.14) + lr.Attributes().PutBool("bool_attr", true) + lr.Attributes().PutInt("SampleRate", 1) + }, + }, + { + name: "skip already used fields", + event: LibhoneyEvent{ + MsgPackTimestamp: &now, + Data: map[string]any{ + "skip_me": "value", + "keep_me": "value", + }, + }, + alreadyUsedFields: []string{"skip_me"}, + want: func(lr plog.LogRecord) { + lr.Attributes().PutStr("keep_me", "value") + lr.Attributes().PutInt("SampleRate", 0) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + newLog := plog.NewLogRecord() + err := tt.event.ToPLogRecord(&newLog, &tt.alreadyUsedFields, *logger) + + if tt.wantErr { + assert.Error(t, err) + return + } + require.NoError(t, err) + if tt.want != nil { + want := plog.NewLogRecord() + tt.want(want) + + // Check severity + assert.Equal(t, want.SeverityText(), newLog.SeverityText()) + assert.Equal(t, want.SeverityNumber(), newLog.SeverityNumber()) + + // Check body + assert.Equal(t, want.Body().AsString(), newLog.Body().AsString()) + + // Check each attribute has correct type and value + want.Attributes().Range(func(k string, v pcommon.Value) bool { + got, ok := newLog.Attributes().Get(k) + assert.True(t, ok, "missing attribute %s", k) + assert.Equal(t, v.Type(), got.Type(), "wrong type for attribute %s", k) + assert.Equal(t, v, got, "wrong value for attribute %s", k) + + return true + }) + + // Verify no extra attributes + assert.Equal(t, want.Attributes().Len(), newLog.Attributes().Len()) + } + }) + } +} + +func TestLibHoneyEvent_GetService(t *testing.T) { + tests := []struct { + name string + event LibhoneyEvent + fields FieldMapConfig + dataset string + want string + wantErr bool + }{ + { + name: "service name found", + event: LibhoneyEvent{ + Data: map[string]any{ + "service.name": "test-service", + }, + }, + fields: FieldMapConfig{ + Resources: ResourcesConfig{ + ServiceName: "service.name", + }, + }, + want: "test-service", + }, + { + name: "service name not found", + event: LibhoneyEvent{ + Data: map[string]any{}, + }, + fields: FieldMapConfig{ + Resources: ResourcesConfig{ + ServiceName: "service.name", + }, + }, + dataset: "default-dataset", + want: "default-dataset", + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + seen := &ServiceHistory{NameCount: make(map[string]int)} + got, err := tt.event.GetService(tt.fields, seen, tt.dataset) + + if tt.wantErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + assert.Equal(t, tt.want, got) + }) + } +} + +func TestLibhoneyEvent_GetScope(t *testing.T) { + tests := []struct { + name string + event LibhoneyEvent + fields FieldMapConfig + serviceName string + want string + wantErr bool + }{ + { + name: "scope found", + event: LibhoneyEvent{ + Data: map[string]any{ + "library.name": "test-lib", + "library.version": "1.0.0", + }, + }, + fields: FieldMapConfig{ + Scopes: ScopesConfig{ + LibraryName: "library.name", + LibraryVersion: "library.version", + }, + }, + serviceName: "test-service", + want: "test-servicetest-lib", + }, + { + name: "scope not found", + event: LibhoneyEvent{ + Data: map[string]any{}, + }, + fields: FieldMapConfig{ + Scopes: ScopesConfig{ + LibraryName: "library.name", + }, + }, + serviceName: "test-service", + want: "libhoney.receiver", + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + seen := &ScopeHistory{Scope: make(map[string]SimpleScope)} + got, err := tt.event.GetScope(tt.fields, seen, tt.serviceName) + + if tt.wantErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + assert.Equal(t, tt.want, got) + }) + } +} diff --git a/receiver/libhoneyreceiver/internal/parser/parser.go b/receiver/libhoneyreceiver/internal/parser/parser.go new file mode 100644 index 000000000000..d2818dadd80a --- /dev/null +++ b/receiver/libhoneyreceiver/internal/parser/parser.go @@ -0,0 +1,88 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package parser // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/parser" + +import ( + "fmt" + "net/url" + + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/ptrace" + semconv "go.opentelemetry.io/collector/semconv/v1.16.0" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/libhoneyevent" +) + +// GetDatasetFromRequest extracts the dataset name from the request path +func GetDatasetFromRequest(path string) (string, error) { + if path == "" { + return "", fmt.Errorf("missing dataset name") + } + dataset, err := url.PathUnescape(path) + if err != nil { + return "", err + } + return dataset, nil +} + +// ToPdata converts a list of LibhoneyEvents to a Pdata Logs object +func ToPdata(dataset string, lhes []libhoneyevent.LibhoneyEvent, cfg libhoneyevent.FieldMapConfig, logger zap.Logger) plog.Logs { + foundServices := libhoneyevent.ServiceHistory{} + foundServices.NameCount = make(map[string]int) + foundScopes := libhoneyevent.ScopeHistory{} + foundScopes.Scope = make(map[string]libhoneyevent.SimpleScope) + + foundScopes.Scope = make(map[string]libhoneyevent.SimpleScope) // a list of already seen scopes + foundScopes.Scope["libhoney.receiver"] = libhoneyevent.SimpleScope{ + ServiceName: dataset, + LibraryName: "libhoney.receiver", + LibraryVersion: "1.0.0", + ScopeSpans: ptrace.NewSpanSlice(), + ScopeLogs: plog.NewLogRecordSlice(), + } // seed a default + + alreadyUsedFields := []string{cfg.Resources.ServiceName, cfg.Scopes.LibraryName, cfg.Scopes.LibraryVersion} + alreadyUsedFields = append(alreadyUsedFields, cfg.Attributes.Name, + cfg.Attributes.TraceID, cfg.Attributes.ParentID, cfg.Attributes.SpanID, + cfg.Attributes.Error, cfg.Attributes.SpanKind, + ) + alreadyUsedFields = append(alreadyUsedFields, cfg.Attributes.DurationFields...) + + for _, lhe := range lhes { + action, err := lhe.SignalType() + if err != nil { + logger.Warn("signal type unclear") + } + switch action { + case "span": + // not implemented + case "log": + logService, _ := lhe.GetService(cfg, &foundServices, dataset) + logScopeKey, _ := lhe.GetScope(cfg, &foundScopes, logService) // adds a new found scope if needed + newLog := foundScopes.Scope[logScopeKey].ScopeLogs.AppendEmpty() + err := lhe.ToPLogRecord(&newLog, &alreadyUsedFields, logger) + if err != nil { + logger.Warn("log could not be converted from libhoney to plog", zap.String("span.object", lhe.DebugString())) + } + } + } + + resultLogs := plog.NewLogs() + + for scopeName, ss := range foundScopes.Scope { + if ss.ScopeLogs.Len() > 0 { + lr := resultLogs.ResourceLogs().AppendEmpty() + lr.SetSchemaUrl(semconv.SchemaURL) + lr.Resource().Attributes().PutStr(semconv.AttributeServiceName, ss.ServiceName) + + ls := lr.ScopeLogs().AppendEmpty() + ls.Scope().SetName(ss.LibraryName) + ls.Scope().SetVersion(ss.LibraryVersion) + foundScopes.Scope[scopeName].ScopeLogs.MoveAndAppendTo(ls.LogRecords()) + } + } + + return resultLogs +} diff --git a/receiver/libhoneyreceiver/libhoney.go b/receiver/libhoneyreceiver/libhoney.go deleted file mode 100644 index 4ad1faab8fbb..000000000000 --- a/receiver/libhoneyreceiver/libhoney.go +++ /dev/null @@ -1,127 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package libhoneyreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver" - -import ( - "context" - "errors" - "net" - "net/http" - "sync" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/component/componentstatus" - "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/receiver" - "go.opentelemetry.io/collector/receiver/receiverhelper" - "go.uber.org/zap" -) - -type libhoneyReceiver struct { - cfg *Config - serverHTTP *http.Server - - nextTraces consumer.Traces - nextLogs consumer.Logs - shutdownWG sync.WaitGroup - - obsrepHTTP *receiverhelper.ObsReport - - settings *receiver.Settings -} - -type TeamInfo struct { - Slug string `json:"slug"` -} - -type EnvironmentInfo struct { - Slug string `json:"slug"` - Name string `json:"name"` -} - -type AuthInfo struct { - APIKeyAccess map[string]bool `json:"api_key_access"` - Team TeamInfo `json:"team"` - Environment EnvironmentInfo `json:"environment"` -} - -func newLibhoneyReceiver(cfg *Config, set *receiver.Settings) (*libhoneyReceiver, error) { - r := &libhoneyReceiver{ - cfg: cfg, - nextTraces: nil, - settings: set, - } - - var err error - r.obsrepHTTP, err = receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ - ReceiverID: set.ID, - Transport: "http", - ReceiverCreateSettings: *set, - }) - if err != nil { - return nil, err - } - - return r, nil -} - -func (r *libhoneyReceiver) startHTTPServer(ctx context.Context, host component.Host) error { - // If HTTP is not enabled, nothing to start. - if r.cfg.HTTP == nil { - return nil - } - - if r.nextTraces != nil { - // initialize routes - r.settings.Logger.Debug("r.nextTraces found and ready to go") - } else { - r.settings.Logger.Debug("r.nextTraces is nil for some reason") - } - - // start server - var err error - r.settings.Logger.Info("Starting HTTP server", zap.String("endpoint", r.cfg.HTTP.ServerConfig.Endpoint)) - var hln net.Listener - if hln, err = r.cfg.HTTP.ServerConfig.ToListener(ctx); err != nil { - return err - } - - r.shutdownWG.Add(1) - go func() { - defer r.shutdownWG.Done() - - if errHTTP := r.serverHTTP.Serve(hln); errHTTP != nil && !errors.Is(errHTTP, http.ErrServerClosed) { - componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(errHTTP)) - } - }() - return nil -} - -func (r *libhoneyReceiver) Start(ctx context.Context, host component.Host) error { - if err := r.startHTTPServer(ctx, host); err != nil { - return errors.Join(err, r.Shutdown(ctx)) - } - - return nil -} - -// Shutdown is a method to turn off receiving. -func (r *libhoneyReceiver) Shutdown(ctx context.Context) error { - var err error - - if r.serverHTTP != nil { - err = r.serverHTTP.Shutdown(ctx) - } - - r.shutdownWG.Wait() - return err -} - -func (r *libhoneyReceiver) registerTraceConsumer(tc consumer.Traces) { - r.nextTraces = tc -} - -func (r *libhoneyReceiver) registerLogConsumer(tc consumer.Logs) { - r.nextLogs = tc -} diff --git a/receiver/libhoneyreceiver/receiver.go b/receiver/libhoneyreceiver/receiver.go new file mode 100644 index 000000000000..84ad68e4638a --- /dev/null +++ b/receiver/libhoneyreceiver/receiver.go @@ -0,0 +1,291 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package libhoneyreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver" + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "mime" + "net" + "net/http" + "strings" + "sync" + + "github.com/vmihailenco/msgpack/v5" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componentstatus" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/collector/receiver/receiverhelper" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/errorutil" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/encoder" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/libhoneyevent" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/parser" +) + +type libhoneyReceiver struct { + cfg *Config + server *http.Server + nextTraces consumer.Traces + nextLogs consumer.Logs + shutdownWG sync.WaitGroup + obsreport *receiverhelper.ObsReport + settings *receiver.Settings +} + +// TeamInfo is part of the AuthInfo struct that stores the team slug +type TeamInfo struct { + Slug string `json:"slug"` +} + +// EnvironmentInfo is part of the AuthInfo struct that stores the environment slug and name +type EnvironmentInfo struct { + Slug string `json:"slug"` + Name string `json:"name"` +} + +// AuthInfo is used by Libhoney to validate team and environment information against Honeycomb's Auth API +type AuthInfo struct { + APIKeyAccess map[string]bool `json:"api_key_access"` + Team TeamInfo `json:"team"` + Environment EnvironmentInfo `json:"environment"` +} + +func newLibhoneyReceiver(cfg *Config, set *receiver.Settings) (*libhoneyReceiver, error) { + r := &libhoneyReceiver{ + cfg: cfg, + nextTraces: nil, + settings: set, + } + + var err error + r.obsreport, err = receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ + ReceiverID: set.ID, + Transport: "http", + ReceiverCreateSettings: *set, + }) + if err != nil { + return nil, err + } + + return r, nil +} + +func (r *libhoneyReceiver) startHTTPServer(ctx context.Context, host component.Host) error { + // If HTTP is not enabled, nothing to start. + if r.cfg.HTTP == nil { + return nil + } + + httpMux := http.NewServeMux() + + r.settings.Logger.Info("r.nextTraces is not null so httpTracesReciever was added", zap.Int("paths", len(r.cfg.HTTP.TracesURLPaths))) + for _, path := range r.cfg.HTTP.TracesURLPaths { + httpMux.HandleFunc(path, func(resp http.ResponseWriter, req *http.Request) { + r.handleEvent(resp, req) + }) + r.settings.Logger.Debug("Added path to HTTP server", zap.String("path", path)) + } + + if r.cfg.AuthAPI != "" { + httpMux.HandleFunc("/1/auth", func(resp http.ResponseWriter, req *http.Request) { + r.handleAuth(resp, req) + }) + } + + var err error + if r.server, err = r.cfg.HTTP.ToServer(ctx, host, r.settings.TelemetrySettings, httpMux); err != nil { + return err + } + + r.settings.Logger.Info("Starting HTTP server", zap.String("endpoint", r.cfg.HTTP.ServerConfig.Endpoint)) + var hln net.Listener + if hln, err = r.cfg.HTTP.ServerConfig.ToListener(ctx); err != nil { + return err + } + + r.shutdownWG.Add(1) + go func() { + defer r.shutdownWG.Done() + + if err := r.server.Serve(hln); err != nil && !errors.Is(err, http.ErrServerClosed) { + componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err)) + } + }() + return nil +} + +func (r *libhoneyReceiver) Start(ctx context.Context, host component.Host) error { + if err := r.startHTTPServer(ctx, host); err != nil { + return errors.Join(err, r.Shutdown(ctx)) + } + + return nil +} + +// Shutdown is a method to turn off receiving. +func (r *libhoneyReceiver) Shutdown(ctx context.Context) error { + var err error + + if r.server != nil { + err = r.server.Shutdown(ctx) + } + + r.shutdownWG.Wait() + return err +} + +func (r *libhoneyReceiver) registerTraceConsumer(tc consumer.Traces) { + r.nextTraces = tc +} + +func (r *libhoneyReceiver) registerLogConsumer(tc consumer.Logs) { + r.nextLogs = tc +} + +func (r *libhoneyReceiver) handleAuth(resp http.ResponseWriter, req *http.Request) { + authURL := fmt.Sprintf("%s/1/auth", r.cfg.AuthAPI) + authReq, err := http.NewRequest(http.MethodGet, authURL, nil) + if err != nil { + errJSON, _ := json.Marshal(`{"error": "failed to create AuthInfo request"}`) + writeResponse(resp, "json", http.StatusBadRequest, errJSON) + return + } + authReq.Header.Set("x-honeycomb-team", req.Header.Get("x-honeycomb-team")) + var authClient http.Client + authResp, err := authClient.Do(authReq) + if err != nil { + errJSON, _ := json.Marshal(fmt.Sprintf(`"error": "failed to send request to auth api endpoint", "message", "%s"}`, err.Error())) + writeResponse(resp, "json", http.StatusBadRequest, errJSON) + return + } + defer authResp.Body.Close() + + switch { + case authResp.StatusCode == http.StatusUnauthorized: + errJSON, _ := json.Marshal(`"error": "received 401 response for AuthInfo request from Honeycomb API - check your API key"}`) + writeResponse(resp, "json", http.StatusBadRequest, errJSON) + return + case authResp.StatusCode > 299: + errJSON, _ := json.Marshal(fmt.Sprintf(`"error": "bad response code from API", "status_code", %d}`, authResp.StatusCode)) + writeResponse(resp, "json", http.StatusBadRequest, errJSON) + return + } + authRawBody, _ := io.ReadAll(authResp.Body) + _, err = resp.Write(authRawBody) + if err != nil { + r.settings.Logger.Info("couldn't write http response") + } +} + +func (r *libhoneyReceiver) handleEvent(resp http.ResponseWriter, req *http.Request) { + enc, ok := readContentType(resp, req) + if !ok { + return + } + + dataset, err := parser.GetDatasetFromRequest(req.RequestURI) + if err != nil { + r.settings.Logger.Info("No dataset found in URL", zap.String("req.RequstURI", req.RequestURI)) + } + + for _, p := range r.cfg.HTTP.TracesURLPaths { + dataset = strings.Replace(dataset, p, "", 1) + r.settings.Logger.Debug("dataset parsed", zap.String("dataset.parsed", dataset)) + } + + body, err := io.ReadAll(req.Body) + if err != nil { + errorutil.HTTPError(resp, err) + } + if err = req.Body.Close(); err != nil { + errorutil.HTTPError(resp, err) + } + libhoneyevents := make([]libhoneyevent.LibhoneyEvent, 0) + switch req.Header.Get("Content-Type") { + case "application/x-msgpack", "application/msgpack": + decoder := msgpack.NewDecoder(bytes.NewReader(body)) + decoder.UseLooseInterfaceDecoding(true) + err = decoder.Decode(&libhoneyevents) + if err != nil { + r.settings.Logger.Info("messagepack decoding failed") + } + if len(libhoneyevents) > 0 { + r.settings.Logger.Debug("Decoding with msgpack worked", zap.Time("timestamp.first.msgpacktimestamp", *libhoneyevents[0].MsgPackTimestamp), zap.String("timestamp.first.time", libhoneyevents[0].Time)) + r.settings.Logger.Debug("event zero", zap.String("event.data", libhoneyevents[0].DebugString())) + } + case encoder.JsonContentType: + err = json.Unmarshal(body, &libhoneyevents) + if err != nil { + errorutil.HTTPError(resp, err) + } + if len(libhoneyevents) > 0 { + r.settings.Logger.Debug("Decoding with json worked", zap.Time("timestamp.first.msgpacktimestamp", *libhoneyevents[0].MsgPackTimestamp), zap.String("timestamp.first.time", libhoneyevents[0].Time)) + } + } + + otlpLogs := parser.ToPdata(dataset, libhoneyevents, r.cfg.FieldMapConfig, *r.settings.Logger) + + numLogs := otlpLogs.LogRecordCount() + if numLogs > 0 { + ctx := r.obsreport.StartLogsOp(context.Background()) + err = r.nextLogs.ConsumeLogs(ctx, otlpLogs) + r.obsreport.EndLogsOp(ctx, "protobuf", numLogs, err) + } + + if err != nil { + errorutil.HTTPError(resp, err) + return + } + + noErrors := []byte(`{"errors":[]}`) + writeResponse(resp, enc.ContentType(), http.StatusAccepted, noErrors) +} + +func readContentType(resp http.ResponseWriter, req *http.Request) (encoder.Encoder, bool) { + if req.Method != http.MethodPost { + handleUnmatchedMethod(resp) + return nil, false + } + + switch getMimeTypeFromContentType(req.Header.Get("Content-Type")) { + case encoder.JsonContentType: + return encoder.JsEncoder, true + case "application/x-msgpack", "application/msgpack": + return encoder.MpEncoder, true + default: + handleUnmatchedContentType(resp) + return nil, false + } +} + +func writeResponse(w http.ResponseWriter, contentType string, statusCode int, msg []byte) { + w.Header().Set("Content-Type", contentType) + w.WriteHeader(statusCode) + _, _ = w.Write(msg) +} + +func getMimeTypeFromContentType(contentType string) string { + mediatype, _, err := mime.ParseMediaType(contentType) + if err != nil { + return "" + } + return mediatype +} + +func handleUnmatchedMethod(resp http.ResponseWriter) { + status := http.StatusMethodNotAllowed + writeResponse(resp, "text/plain", status, []byte(fmt.Sprintf("%v method not allowed, supported: [POST]", status))) +} + +func handleUnmatchedContentType(resp http.ResponseWriter) { + status := http.StatusUnsupportedMediaType + writeResponse(resp, "text/plain", status, []byte(fmt.Sprintf("%v unsupported media type, supported: [%s, %s]", status, encoder.JsonContentType, encoder.PbContentType))) +} diff --git a/receiver/libhoneyreceiver/receiver_test.go b/receiver/libhoneyreceiver/receiver_test.go new file mode 100644 index 000000000000..31674c515b11 --- /dev/null +++ b/receiver/libhoneyreceiver/receiver_test.go @@ -0,0 +1,276 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package libhoneyreceiver + +import ( + "bytes" + "context" + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/vmihailenco/msgpack/v5" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/receiver/receivertest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/libhoneyevent" +) + +func TestNewLibhoneyReceiver(t *testing.T) { + defaultCfg := createDefaultConfig() + httpCfg := defaultCfg.(*Config).HTTP + tests := []struct { + name string + config *Config + wantError bool + }{ + { + name: "valid_config", + config: &Config{ + HTTP: httpCfg, + }, + }, + { + name: "nil_config", + config: nil, + wantError: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + set := receivertest.NewNopSettings() + r, err := newLibhoneyReceiver(tt.config, &set) + if tt.wantError { + assert.Error(t, err) + assert.Nil(t, r) + return + } + assert.NoError(t, err) + assert.NotNil(t, r) + }) + } +} + +func TestLibhoneyReceiver_Start(t *testing.T) { + cfg := createDefaultConfig() + + set := receivertest.NewNopSettings() + r, err := newLibhoneyReceiver(cfg.(*Config), &set) + require.NoError(t, err) + + r.registerTraceConsumer(consumertest.NewNop()) + r.registerLogConsumer(consumertest.NewNop()) + + err = r.Start(context.Background(), componenttest.NewNopHost()) + assert.NoError(t, err) + + err = r.Shutdown(context.Background()) + assert.NoError(t, err) +} + +func TestLibhoneyReceiver_HandleEvent(t *testing.T) { + now := time.Now() + tests := []struct { + name string + events []libhoneyevent.LibhoneyEvent + contentType string + expectedStatus int + wantError bool + }{ + { + name: "valid_json_event", + events: []libhoneyevent.LibhoneyEvent{ + { + Time: now.Format(time.RFC3339), + MsgPackTimestamp: &now, + Data: map[string]any{ + "message": "test event", + }, + Samplerate: 1, + }, + }, + contentType: "application/json", + expectedStatus: http.StatusAccepted, + }, + { + name: "valid_msgpack_event", + events: []libhoneyevent.LibhoneyEvent{ + { + Time: now.Format(time.RFC3339), + MsgPackTimestamp: &now, + Data: map[string]any{ + "message": "test event", + }, + Samplerate: 1, + }, + }, + contentType: "application/msgpack", + expectedStatus: http.StatusAccepted, + }, + { + name: "invalid_content_type", + events: []libhoneyevent.LibhoneyEvent{}, + contentType: "text/plain", + expectedStatus: http.StatusUnsupportedMediaType, + wantError: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg := createDefaultConfig() + set := receivertest.NewNopSettings() + r, err := newLibhoneyReceiver(cfg.(*Config), &set) + require.NoError(t, err) + + sink := &consumertest.LogsSink{} + r.registerLogConsumer(sink) + + var body []byte + switch tt.contentType { + case "application/json": + body, err = json.Marshal(tt.events) + case "application/msgpack": + body, err = msgpack.Marshal(tt.events) + default: + body = []byte("invalid content") + } + require.NoError(t, err) + + req := httptest.NewRequest(http.MethodPost, "/1/events/test_dataset", bytes.NewReader(body)) + req.Header.Set("Content-Type", tt.contentType) + w := httptest.NewRecorder() + + r.handleEvent(w, req) + + resp := w.Result() + assert.Equal(t, tt.expectedStatus, resp.StatusCode) + + if !tt.wantError { + assert.Eventually(t, func() bool { + return sink.LogRecordCount() > 0 + }, time.Second, 10*time.Millisecond) + } + }) + } +} + +func TestLibhoneyReceiver_AuthEndpoint(t *testing.T) { + tests := []struct { + name string + authAPI string + apiKey string + mockResponse *http.Response + expectedStatus int + }{ + { + name: "valid_auth", + authAPI: "http://mock-auth-api", + apiKey: "test-key", + mockResponse: &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(bytes.NewBufferString(`{ + "team": {"slug": "test-team"}, + "environment": {"slug": "test-env", "name": "Test Env"} + }`)), + }, + expectedStatus: http.StatusOK, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.AuthAPI = tt.authAPI + set := receivertest.NewNopSettings() + r, err := newLibhoneyReceiver(cfg, &set) + require.NoError(t, err) + + // Create test server to mock auth API + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, tt.apiKey, r.Header.Get("x-honeycomb-team")) + w.WriteHeader(tt.mockResponse.StatusCode) + _, err := io.Copy(w, tt.mockResponse.Body) + assert.NoError(t, err, "failed to copy response body") + })) + defer ts.Close() + + req := httptest.NewRequest(http.MethodGet, "/1/auth", nil) + req.Header.Set("x-honeycomb-team", tt.apiKey) + w := httptest.NewRecorder() + + r.server = &http.Server{ + Handler: http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) { + r.handleAuth(resp, req) + }), + ReadHeaderTimeout: 3 * time.Second, + } + + resp := w.Result() + assert.Equal(t, tt.expectedStatus, resp.StatusCode) + }) + } +} + +func TestReadContentType(t *testing.T) { + tests := []struct { + name string + method string + contentType string + expectedStatus int + wantEncoder bool + }{ + { + name: "valid_json", + method: http.MethodPost, + contentType: "application/json", + expectedStatus: http.StatusOK, + wantEncoder: true, + }, + { + name: "valid_msgpack", + method: http.MethodPost, + contentType: "application/msgpack", + expectedStatus: http.StatusOK, + wantEncoder: true, + }, + { + name: "invalid_method", + method: http.MethodGet, + contentType: "application/json", + expectedStatus: http.StatusMethodNotAllowed, + wantEncoder: false, + }, + { + name: "invalid_content_type", + method: http.MethodPost, + contentType: "text/plain", + expectedStatus: http.StatusUnsupportedMediaType, + wantEncoder: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + req := httptest.NewRequest(tt.method, "/test", nil) + req.Header.Set("Content-Type", tt.contentType) + w := httptest.NewRecorder() + + enc, ok := readContentType(w, req) + assert.Equal(t, tt.wantEncoder, ok) + if tt.wantEncoder { + assert.NotNil(t, enc) + } else { + assert.Equal(t, tt.expectedStatus, w.Code) + } + }) + } +}