diff --git a/collector/cmd/otelarrowcol/components.go b/collector/cmd/otelarrowcol/components.go index ec25b23125..149ffbb00e 100644 --- a/collector/cmd/otelarrowcol/components.go +++ b/collector/cmd/otelarrowcol/components.go @@ -1,16 +1,18 @@ package main import ( - "github.com/f5/otel-arrow-adapter/collector/gen/exporter/otlpexporter" + "github.com/f5/otel-arrow-adapter/collector/connector/validationconnector" "github.com/f5/otel-arrow-adapter/collector/gen/exporter/fileexporter" + "github.com/f5/otel-arrow-adapter/collector/gen/exporter/otlpexporter" "github.com/f5/otel-arrow-adapter/collector/gen/receiver/otlpreceiver" - "github.com/f5/otel-arrow-adapter/collector/receiver/filereceiver" - "github.com/f5/otel-arrow-adapter/collector/processor/obfuscationprocessor" "github.com/f5/otel-arrow-adapter/collector/processor/experimentprocessor" + "github.com/f5/otel-arrow-adapter/collector/processor/obfuscationprocessor" + "github.com/f5/otel-arrow-adapter/collector/receiver/filereceiver" "github.com/lightstep/telemetry-generator/generatorreceiver" "github.com/open-telemetry/opentelemetry-collector-contrib/extension/basicauthextension" "github.com/open-telemetry/opentelemetry-collector-contrib/extension/headerssetterextension" + "go.opentelemetry.io/collector/connector" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/loggingexporter" "go.opentelemetry.io/collector/exporter/otlphttpexporter" @@ -67,5 +69,9 @@ func components() (otelcol.Factories, error) { return otelcol.Factories{}, err } + factories.Connectors, err = connector.MakeFactoryMap( + validationconnector.NewFactory(), + ) + return factories, nil } diff --git a/collector/connector/validationconnector/README.md b/collector/connector/validationconnector/README.md new file mode 100644 index 0000000000..987c3a8dc8 --- /dev/null +++ b/collector/connector/validationconnector/README.md @@ -0,0 +1,137 @@ +# Validation Connector + +This connector allows verifying that an exporter and receiver pair +produce correct data. + +![Diagram](validation.png) + +## Current status: under development + +Unfortuantely, until +https://github.com/open-telemetry/opentelemetry-collector/issues/8104 +is resolved this component cannot be reliably used. + +## Configuration + +This connector can be used to test any exporter and receiver pair that +are able to propagate metadata using the Colector's +`client.Metadata.Info` mechanism. For example, to test an OTel Arrow +collector configuration, configure a loopback receiver: + +``` +receivers: + otlp/loopback: + protocols: + grpc: + endpoint: 127.0.0.1:4000 + include_metadata: true + arrow: + disabled: false +``` + +Note that the receiver needs to set `include_metadata: true`. + +The exporter should be connected to the loopback receiver port with a +`headers_setter` extension to ensure propagation + +``` +exporters: + otlp/arrow: + endpoint: 127.0.0.1:4000 + wait_for_ready: true + tls: + insecure: true + arrow: + disabled: false + disable_downgrade: true + num_streams: 1 + retry_on_failure: + enabled: false + sending_queue: + enabled: false + auth: + authenticator: headers_setter + +extensions: + headers_setter: + headers: + - key: validation-sequence + from_context: X-Validation-Sequence +``` + +Two validation connectors are initialized, one labeled "verify" and +one labeled "expect". The "expect" connector names the pipeline of +the "verify" conector, as follows: + +``` +connectors: + validation/verify/traces: + validation/expect/traces: + follower: traces/validate + +... + +service: + extensions: [headers_setter] + pipelines: + # The input is routed to the first validation connector. + traces/input: + receivers: [INPUT] + exporters: [validation/expect/traces] + + # This pipeline provides the real/expected input from the first + # validation connector to the second connector and the system under + # test. This pipeline is named as the "follower" of the validation + # connector, above. + traces/validate: + receivers: [validation/expect/traces] + exporters: [validation/verify/traces, otlp/arrow] + + # This pipeline provides the actual input from the system under test + # to the second validation connector. + traces/loop: + receivers: [otlp/loopback] + exporters: [validation/verify/traces] + + # This pipeline continues following validation. + traces/output: + receivers: [validation/verify/traces] + exporters: [OUTPUT1, OUTPUT2, ...] +``` + +### Example + +A complete example of how to use this connector is given in +[../../examples/synthesize/](../../examples/synthesize/README.md) +that uses `filereceiver` and `fileexporter` for its inputs and outputs. + +## Detailed design + +There are always two instances of the validation connector in use. + +The first validation connector will output to the second validation +connector and the exporter-under-test. + +The second validation connector will receive from the +receiver-under-test. + +The second validation connector expects to see the same input twice, +once from the other connector to set an expectation, and once from the +receiver with the actual data. The connector distinguishes these +cases using two kinds of context variable. + +1. The first-stage-to-second-stage context contains a Go Context + variable which does not escape the process or propagate via any + RPC. This variable contains the sequence number as observed by the + first connector. It stores the expected data in its map as a + `ptrace.Traces`, `pmetric.Metrics` or `plog.Logs`. +2. The receiver-to-second-stage context MUST contain a metadata + variable indicating the sequence number of the data item. The + connector will look up the sequence number to find expected + matching data. + +For each expected and actual data item, the support method +`AssertEquiv` library in `../../../pkg/otel/assert/` is used to ensure +that the data is equivalent. This package uses a definition of +equivalence that tolerates reordering of unordered fields, which is +necesary to validate the OTel Arrow components in this repository. diff --git a/collector/connector/validationconnector/validation.go b/collector/connector/validationconnector/validation.go new file mode 100644 index 0000000000..cb8166214a --- /dev/null +++ b/collector/connector/validationconnector/validation.go @@ -0,0 +1,360 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package validationconnector + +import ( + "context" + "encoding/json" + "fmt" + "strconv" + "sync" + + "github.com/f5/otel-arrow-adapter/pkg/otel/assert" + "go.opentelemetry.io/collector/client" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/connector" + + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumererror" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/plog/plogotlp" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" + "go.uber.org/zap" +) + +const ( + typeStr = "validation" +) + +type inputToOutputContext struct{} // value is int64 +type inputToValidateContext struct{} // value is int64 + +type Config struct { + // Follower is the name of a pipeline where the second validation + // connector is a receiver. This should be set only for the first + // validation connector, as it is how they distinguish themselves. + Follower component.ID `mapstructure:"follower"` +} + +// validation is used for any signal. +type validation struct { + // lock protects store and sendSequence. + lock sync.Mutex + + cfg *Config + logger *zap.Logger + + // store is holds all the expected data that has yet to arrive. + store map[int64]any + + // sendSequence is used to generate new sequence numbers. + sendSequence int64 + + // only one of the following fields will be set in any given pipeline + + nextTraces consumer.Traces + nextMetrics consumer.Metrics + nextLogs consumer.Logs +} + +var ( + errUnexpectedConsumer = fmt.Errorf("expected a connector router as consumer") + errMissingFollower = fmt.Errorf("validation input should have validation output as follower") + + asserter = assert.NewStandaloneTest() +) + +func NewFactory() connector.Factory { + return connector.NewFactory( + typeStr, + createDefaultConfig, + connector.WithTracesToTraces(createTracesToTraces, component.StabilityLevelBeta), + connector.WithMetricsToMetrics(createMetricsToMetrics, component.StabilityLevelBeta), + connector.WithLogsToLogs(createLogsToLogs, component.StabilityLevelBeta), + ) +} + +func createDefaultConfig() component.Config { + return &Config{} +} + +func (c *Config) hasFollower() bool { + return c.Follower.Type() != "" +} + +// reorder _was an attempt_ to solve the problem described in +// https://github.com/open-telemetry/opentelemetry-collector/issues/8104, +// however it does not work. This places the follower first in the list +// of components because we want it to receive the data first. +func (v *validation) reorder(ids []component.ID) ([]component.ID, error) { + var ordered []component.ID + + found := false + if v.cfg.hasFollower() { + ordered = append(ordered, v.cfg.Follower) + } + for _, pid := range ids { + if v.cfg.hasFollower() && v.cfg.Follower == pid { + found = true + continue + } + ordered = append(ordered, pid) + } + if v.cfg.hasFollower() && !found { + return nil, errMissingFollower + } + return ordered, nil +} + +// expecting places an expectation by sequence number. +func (v *validation) expecting(seq int64, data any) error { + v.lock.Lock() + defer v.lock.Unlock() + + if v.store[seq] != nil { + return fmt.Errorf("duplicate sequence number received: %d", seq) + } + v.store[seq] = data + return nil +} + +// received calls assert.Equiv for the received data item, assuming we +// have the expectation. +func (v *validation) received(seq int64, data any) error { + v.lock.Lock() + defer v.lock.Unlock() + + expect := v.store[seq] + if expect == nil { + return fmt.Errorf("missing expectation for sequence %d", seq) + } + + if td, ok := data.(ptrace.Traces); ok { + assert.Equiv(asserter, []json.Marshaler{ + ptraceotlp.NewExportRequestFromTraces(expect.(ptrace.Traces)), + }, []json.Marshaler{ + ptraceotlp.NewExportRequestFromTraces(td), + }) + } else if md, ok := data.(pmetric.Metrics); ok { + assert.Equiv(asserter, []json.Marshaler{ + pmetricotlp.NewExportRequestFromMetrics(expect.(pmetric.Metrics)), + }, []json.Marshaler{ + pmetricotlp.NewExportRequestFromMetrics(md), + }) + } else if ld, ok := data.(plog.Logs); ok { + assert.Equiv(asserter, []json.Marshaler{ + plogotlp.NewExportRequestFromLogs(expect.(plog.Logs)), + }, []json.Marshaler{ + plogotlp.NewExportRequestFromLogs(ld), + }) + } else { + return fmt.Errorf("unrecognized data type") + } + + return nil +} + +func newValidation(cfg *Config, logger *zap.Logger) *validation { + return &validation{ + cfg: cfg, + logger: logger, + store: map[int64]any{}, + } +} + +func createTracesToTraces( + ctx context.Context, + set connector.CreateSettings, + cfg component.Config, + nextConsumer consumer.Traces, +) (connector.Traces, error) { + v := newValidation(cfg.(*Config), set.Logger) + + tr, ok := nextConsumer.(connector.TracesRouter) + if !ok { + return nil, errUnexpectedConsumer + } + ordered, err := v.reorder(tr.PipelineIDs()) + if err != nil { + return nil, err + } + next, err := tr.Consumer(ordered...) + if err != nil { + return nil, err + } + v.nextTraces = next + return v, nil +} + +func createMetricsToMetrics( + ctx context.Context, + set connector.CreateSettings, + cfg component.Config, + nextConsumer consumer.Metrics, +) (connector.Metrics, error) { + v := newValidation(cfg.(*Config), set.Logger) + + tr, ok := nextConsumer.(connector.MetricsRouter) + if !ok { + return nil, errUnexpectedConsumer + } + ordered, err := v.reorder(tr.PipelineIDs()) + if err != nil { + return nil, err + } + next, err := tr.Consumer(ordered...) + if err != nil { + return nil, err + } + v.nextMetrics = next + return v, nil +} + +func createLogsToLogs( + ctx context.Context, + set connector.CreateSettings, + cfg component.Config, + nextConsumer consumer.Logs, +) (connector.Logs, error) { + v := newValidation(cfg.(*Config), set.Logger) + + tr, ok := nextConsumer.(connector.LogsRouter) + if !ok { + return nil, errUnexpectedConsumer + } + ordered, err := v.reorder(tr.PipelineIDs()) + if err != nil { + return nil, err + } + next, err := tr.Consumer(ordered...) + if err != nil { + return nil, err + } + v.nextLogs = next + + return v, nil +} + +func (v *validation) Capabilities() consumer.Capabilities { + return consumer.Capabilities{MutatesData: false} +} + +func (v *validation) Start(ctx context.Context, host component.Host) error { + return nil +} + +func (v *validation) Shutdown(ctx context.Context) error { + return nil +} + +func (v *validation) nextSequence() int64 { + v.lock.Lock() + defer v.lock.Unlock() + v.sendSequence++ + return v.sendSequence +} + +func (v *validation) consumeNext(ctx context.Context, data any) error { + var err error + switch { + case v.nextTraces != nil: + err = v.nextTraces.ConsumeTraces(ctx, data.(ptrace.Traces)) + case v.nextMetrics != nil: + err = v.nextMetrics.ConsumeMetrics(ctx, data.(pmetric.Metrics)) + case v.nextLogs != nil: + err = v.nextLogs.ConsumeLogs(ctx, data.(plog.Logs)) + default: + err = fmt.Errorf("unhandled data type") + } + if err != nil { + return consumererror.NewPermanent(err) + } + return nil +} + +// consume is the central logic of the validation connector, both +// instances. +func (v *validation) consume(ctx context.Context, data any) error { + if v.cfg.hasFollower() { + // Here, the first validation connector. + sequence := v.nextSequence() + + // Here, insert a propagating marker that we expect to + // propagate by context headers. + info := client.FromContext(ctx) + + // TODO: the OTel client Metadata doesn't allow itself + // to be copied or modified without an explicit list + // of keys, which is completely bogus and makes the + // intended correct action here impossible. So, clobber + // the metadata and file an issue about the problem. + info.Metadata = client.NewMetadata(map[string][]string{ + "X-Validation-Sequence": []string{ + fmt.Sprint(sequence), + }, + }) + ctx = client.NewContext(ctx, info) + + // Insert a non-propagating context marker for the + // second validation connector. + ctx = context.WithValue(ctx, inputToOutputContext{}, sequence) + + return v.consumeNext(ctx, data) + } + + // TODO: Because of the problem documented in + // https://github.com/open-telemetry/opentelemetry-collector/issues/8104 + // this code does not reliably receive the expectation input before + // the actual value input. We await a solution. + if directExpect := ctx.Value(inputToOutputContext{}); directExpect != nil { + // In this branch, the second validation connector is + // receiving data directly from the first validation + // connector. + sequence := directExpect.(int64) + + v.logger.Debug("Received expectation", zap.Int64("sequence", sequence)) + + // Expected test input. Do not consume. + return v.expecting(sequence, data) + } + + // In this branch, the second validation connector is + // receiving the data from the pipeline under test. + info := client.FromContext(ctx) + seqHdrs := info.Metadata.Get("validation-sequence") + var sequence int64 + if len(seqHdrs) == 1 { + sequence, _ = strconv.ParseInt(seqHdrs[0], 10, 64) + } + if sequence == 0 { + return consumererror.NewPermanent( + fmt.Errorf("missing first validation connector sequence expectation")) + } + + if err := v.received(sequence, data); err != nil { + v.logger.Info("Validation failure", zap.Error(err)) + + // Output validating actual input failed. + return consumererror.NewPermanent(err) + } + + // Success, pass the data. + v.logger.Info("Validation success", zap.Int64("sequence", sequence)) + return v.consumeNext(ctx, data) +} + +func (v *validation) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { + return v.consume(ctx, md) +} + +func (v *validation) ConsumeLogs(ctx context.Context, ld plog.Logs) error { + return v.consume(ctx, ld) +} + +func (v *validation) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { + return v.consume(ctx, td) +} diff --git a/collector/connector/validationconnector/validation.png b/collector/connector/validationconnector/validation.png new file mode 100644 index 0000000000..858dbeb8e8 Binary files /dev/null and b/collector/connector/validationconnector/validation.png differ diff --git a/collector/examples/synthesize/record.yaml b/collector/examples/synthesize/record.yaml index df6a79a961..2e2756b675 100644 --- a/collector/examples/synthesize/record.yaml +++ b/collector/examples/synthesize/record.yaml @@ -1,38 +1,38 @@ receivers: - otlp: - protocols: - grpc: generator: path: ./generator/hipster_shop.yaml exporters: file/traces: path: ./recorded_traces.json - compression: zstd + compression: format: json file/metrics: path: ./recorded_metrics.json - compression: zstd + compression: format: json logging: + loglevel: info processors: batch: + send_batch_max_size: 15 + send_batch_size: 10 + timeout: 1s obfuscation: - encrypt_key: "some-32-byte-long-key-to-be-safe" - encrypt_round: 128 - # obfuscate all attributes with string values - encrypt_all: true + rounds: 20 + key_length: 256 -# Note: to disable obfuscation, simply remove "obfuscation" from the -# list of processors below. service: pipelines: - traces: + # This pipeline generates trace data, obfuscates + batches the data, and + # send it to the otlp receiver of this same collector + traces/generate: receivers: [generator] processors: [obfuscation, batch] exporters: [file/traces, logging] + metrics: receivers: [generator] processors: [obfuscation, batch] diff --git a/collector/examples/synthesize/replay.yaml b/collector/examples/synthesize/replay.yaml index 55860a9552..5dee7094bc 100644 --- a/collector/examples/synthesize/replay.yaml +++ b/collector/examples/synthesize/replay.yaml @@ -1,17 +1,37 @@ receivers: - otlp: + otlp/loopback: protocols: grpc: + endpoint: 127.0.0.1:4000 + include_metadata: true + arrow: + disabled: false file/traces: path: ./recorded_traces.json - compression: zstd + compression: format: json file/metrics: path: ./recorded_metrics.json - compression: zstd + compression: format: json exporters: + otlp/arrow: + endpoint: 127.0.0.1:4000 + wait_for_ready: true + tls: + insecure: true + arrow: + disabled: false + disable_downgrade: true + num_streams: 1 + retry_on_failure: + enabled: false + sending_queue: + enabled: false + auth: + authenticator: headers_setter + file/traces: path: ./replayed_traces.json format: json @@ -20,16 +40,58 @@ exporters: format: json logging: -processors: - batch: +connectors: + validation/verify/traces: + validation/expect/traces: + follower: traces/validate + + validation/verify/metrics: + validation/expect/metrics: + follower: metrics/validate + +extensions: + headers_setter: + headers: + - key: validation-sequence + from_context: X-Validation-Sequence service: + extensions: [headers_setter] pipelines: - traces: + traces/input: receivers: [file/traces] - processors: [batch] + exporters: [validation/expect/traces] + metrics/input: + receivers: [file/metrics] + exporters: [validation/expect/metrics] + + traces/validate: + receivers: [validation/expect/traces] + exporters: [validation/verify/traces, otlp/arrow] + metrics/validate: + receivers: [validation/expect/metrics] + exporters: [validation/verify/metrics, otlp/arrow] + + traces/loop: + receivers: [otlp/loopback] + exporters: [validation/verify/traces] + metrics/loop: + receivers: [otlp/loopback] + exporters: [validation/verify/metrics] + + traces/output: + receivers: [validation/verify/traces] exporters: [file/traces, logging] + metrics/output: + receivers: [validation/verify/metrics] + exporters: [file/metrics, logging] + + telemetry: + resource: + "service.name": "replayer" metrics: - receivers: [file/metrics] - processors: [batch] - exporters: [file/metrics, logging] \ No newline at end of file + address: 127.0.0.1:8888 + level: detailed + logs: + level: debug + #level: info diff --git a/collector/gen/exporter/fileexporter/factory.go b/collector/gen/exporter/fileexporter/factory.go index aec2a5b250..8c0a635ba6 100644 --- a/collector/gen/exporter/fileexporter/factory.go +++ b/collector/gen/exporter/fileexporter/factory.go @@ -37,7 +37,8 @@ func NewFactory() exporter.Factory { createDefaultConfig, exporter.WithTraces(createTracesExporter, metadata.TracesStability), exporter.WithMetrics(createMetricsExporter, metadata.MetricsStability), - exporter.WithLogs(createLogsExporter, metadata.LogsStability)) + exporter.WithLogs(createLogsExporter, metadata.LogsStability), + ) } func createDefaultConfig() component.Config { @@ -71,6 +72,12 @@ func createTracesExporter( exporterhelper.WithStart(fe.Start), exporterhelper.WithShutdown(fe.Shutdown), exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), + exporterhelper.WithRetry(exporterhelper.RetrySettings{ + Enabled: false, + }), + exporterhelper.WithQueue(exporterhelper.QueueSettings{ + Enabled: false, + }), ) } @@ -98,6 +105,12 @@ func createMetricsExporter( exporterhelper.WithStart(fe.Start), exporterhelper.WithShutdown(fe.Shutdown), exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), + exporterhelper.WithRetry(exporterhelper.RetrySettings{ + Enabled: false, + }), + exporterhelper.WithQueue(exporterhelper.QueueSettings{ + Enabled: false, + }), ) } @@ -125,6 +138,12 @@ func createLogsExporter( exporterhelper.WithStart(fe.Start), exporterhelper.WithShutdown(fe.Shutdown), exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), + exporterhelper.WithRetry(exporterhelper.RetrySettings{ + Enabled: false, + }), + exporterhelper.WithQueue(exporterhelper.QueueSettings{ + Enabled: false, + }), ) } @@ -172,4 +191,4 @@ func buildFileWriter(cfg *Config, logger *zap.Logger) (WriteCloseFlusher, error) // We maintain this map because the Factory is asked trace and metric receivers separately // when it gets CreateTracesReceiver() and CreateMetricsReceiver() but they must not // create separate objects, they must use one Receiver object per configuration. -var exporters = sharedcomponent.NewSharedComponents[*Config, component.Component]() \ No newline at end of file +var exporters = sharedcomponent.NewSharedComponents[*Config, component.Component]() diff --git a/collector/gen/exporter/fileexporter/file_exporter.go b/collector/gen/exporter/fileexporter/file_exporter.go index 63105c1e5e..534a6a3d87 100644 --- a/collector/gen/exporter/fileexporter/file_exporter.go +++ b/collector/gen/exporter/fileexporter/file_exporter.go @@ -124,10 +124,14 @@ func (lw *lineWriter) Close() error { } func (lw *lineWriter) Flush() error { + lw.mutex.Lock() + defer lw.mutex.Unlock() return lw.file.Flush() } func (lw *lineWriter) getFile() io.WriteCloser { + lw.mutex.Lock() + defer lw.mutex.Unlock() return lw.file } @@ -223,4 +227,4 @@ func (e *fileExporter) Shutdown(context.Context) error { close(e.stopTicker) } return e.file.Close() -} \ No newline at end of file +} diff --git a/collector/gen/exporter/otlpexporter/config.go b/collector/gen/exporter/otlpexporter/config.go index 0bf4ee20d5..68b4ad8d9c 100644 --- a/collector/gen/exporter/otlpexporter/config.go +++ b/collector/gen/exporter/otlpexporter/config.go @@ -5,6 +5,7 @@ package otlpexporter // import "github.com/f5/otel-arrow-adapter/collector/gen/e import ( "fmt" + "time" "google.golang.org/grpc" @@ -33,10 +34,11 @@ type Config struct { // ArrowSettings includes whether Arrow is enabled and the number of // concurrent Arrow streams. type ArrowSettings struct { - Disabled bool `mapstructure:"disabled"` - NumStreams int `mapstructure:"num_streams"` - DisableDowngrade bool `mapstructure:"disable_downgrade"` - EnableMixedSignals bool `mapstructure:"enable_mixed_signals"` + Disabled bool `mapstructure:"disabled"` + NumStreams int `mapstructure:"num_streams"` + DisableDowngrade bool `mapstructure:"disable_downgrade"` + EnableMixedSignals bool `mapstructure:"enable_mixed_signals"` + MaxStreamLifetime time.Duration `mapstructure:"max_stream_lifetime"` } var _ component.Config = (*Config)(nil) @@ -59,5 +61,9 @@ func (cfg *ArrowSettings) Validate() error { return fmt.Errorf("stream count must be > 0: %d", cfg.NumStreams) } + if cfg.MaxStreamLifetime.Seconds() < float64(1) { + return fmt.Errorf("max stream life must be > 0: %d", cfg.MaxStreamLifetime) + } + return nil } diff --git a/collector/gen/exporter/otlpexporter/config_test.go b/collector/gen/exporter/otlpexporter/config_test.go index d688885a21..809902e8e6 100644 --- a/collector/gen/exporter/otlpexporter/config_test.go +++ b/collector/gen/exporter/otlpexporter/config_test.go @@ -79,27 +79,32 @@ func TestUnmarshalConfig(t *testing.T) { Arrow: ArrowSettings{ NumStreams: 2, EnableMixedSignals: true, + MaxStreamLifetime: 2 * time.Hour, }, }, cfg) } func TestArrowSettingsValidate(t *testing.T) { - settings := func(enabled bool, numStreams int) *ArrowSettings { - return &ArrowSettings{Disabled: !enabled, NumStreams: numStreams} + settings := func(enabled bool, numStreams int, maxStreamLifetime time.Duration) *ArrowSettings { + return &ArrowSettings{Disabled: !enabled, NumStreams: numStreams, MaxStreamLifetime: maxStreamLifetime} } - require.NoError(t, settings(true, 1).Validate()) - require.NoError(t, settings(false, 1).Validate()) - require.NoError(t, settings(true, 2).Validate()) - require.NoError(t, settings(true, math.MaxInt).Validate()) + require.NoError(t, settings(true, 1, 10*time.Second).Validate()) + require.NoError(t, settings(false, 1, 10*time.Second).Validate()) + require.NoError(t, settings(true, 2, 1*time.Second).Validate()) + require.NoError(t, settings(true, math.MaxInt, 10*time.Second).Validate()) - require.Error(t, settings(true, 0).Validate()) - require.Contains(t, settings(true, 0).Validate().Error(), "stream count must be") - require.Error(t, settings(false, -1).Validate()) - require.Error(t, settings(true, math.MinInt).Validate()) + require.Error(t, settings(true, 0, 10*time.Second).Validate()) + require.Contains(t, settings(true, 0, 10*time.Second).Validate().Error(), "stream count must be") + require.Contains(t, settings(true, 1, -1*time.Second).Validate().Error(), "max stream life must be") + require.Error(t, settings(false, -1, 10*time.Second).Validate()) + require.Error(t, settings(false, 1, -1*time.Second).Validate()) + require.Error(t, settings(true, math.MinInt, 10*time.Second).Validate()) } func TestDefaultSettingsValid(t *testing.T) { cfg := createDefaultConfig() + // this must be set by the user and config + // validation always checks that a value is set. + cfg.(*Config).Arrow.MaxStreamLifetime = 2 * time.Second require.NoError(t, cfg.(*Config).Validate()) - } diff --git a/collector/gen/exporter/otlpexporter/factory.go b/collector/gen/exporter/otlpexporter/factory.go index 0d1df0ea0e..9a96533a30 100644 --- a/collector/gen/exporter/otlpexporter/factory.go +++ b/collector/gen/exporter/otlpexporter/factory.go @@ -6,10 +6,12 @@ package otlpexporter // import "github.com/f5/otel-arrow-adapter/collector/gen/e import ( "context" "runtime" + "time" arrowpb "github.com/f5/otel-arrow-adapter/api/experimental/arrow/v1" "google.golang.org/grpc" + "github.com/f5/otel-arrow-adapter/collector/gen/exporter/otlpexporter/internal/arrow" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configcompression" "go.opentelemetry.io/collector/config/configgrpc" @@ -17,7 +19,6 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper" - "github.com/f5/otel-arrow-adapter/collector/gen/exporter/otlpexporter/internal/arrow" ) const ( @@ -49,7 +50,8 @@ func createDefaultConfig() component.Config { WriteBufferSize: 512 * 1024, }, Arrow: ArrowSettings{ - NumStreams: runtime.NumCPU(), + NumStreams: runtime.NumCPU(), + MaxStreamLifetime: time.Hour, }, } } diff --git a/collector/gen/exporter/otlpexporter/factory_test.go b/collector/gen/exporter/otlpexporter/factory_test.go index c4d75c005d..5a4c69d570 100644 --- a/collector/gen/exporter/otlpexporter/factory_test.go +++ b/collector/gen/exporter/otlpexporter/factory_test.go @@ -13,6 +13,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/f5/otel-arrow-adapter/collector/gen/internal/testutil" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configcompression" "go.opentelemetry.io/collector/config/configgrpc" @@ -20,7 +21,6 @@ import ( "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/exporter/exporterhelper" "go.opentelemetry.io/collector/exporter/exportertest" - "github.com/f5/otel-arrow-adapter/collector/gen/internal/testutil" ) func TestCreateDefaultConfig(t *testing.T) { @@ -34,7 +34,7 @@ func TestCreateDefaultConfig(t *testing.T) { assert.Equal(t, ocfg.QueueSettings, exporterhelper.NewDefaultQueueSettings()) assert.Equal(t, ocfg.TimeoutSettings, exporterhelper.NewDefaultTimeoutSettings()) assert.Equal(t, ocfg.Compression, configcompression.Gzip) - assert.Equal(t, ocfg.Arrow, ArrowSettings{Disabled: false, NumStreams: runtime.NumCPU()}) + assert.Equal(t, ocfg.Arrow, ArrowSettings{Disabled: false, NumStreams: runtime.NumCPU(), MaxStreamLifetime: time.Hour}) } func TestCreateMetricsExporter(t *testing.T) { diff --git a/collector/gen/exporter/otlpexporter/internal/arrow/common_test.go b/collector/gen/exporter/otlpexporter/internal/arrow/common_test.go index e429d7b1b0..6361d12452 100644 --- a/collector/gen/exporter/otlpexporter/internal/arrow/common_test.go +++ b/collector/gen/exporter/otlpexporter/internal/arrow/common_test.go @@ -97,6 +97,7 @@ type commonTestStream struct { ctxCall *gomock.Call sendCall *gomock.Call recvCall *gomock.Call + closeSendCall *gomock.Call } func (ctc *commonTestCase) newMockStream(ctx context.Context) *commonTestStream { @@ -109,6 +110,7 @@ func (ctc *commonTestCase) newMockStream(ctx context.Context) *commonTestStream gomock.Any(), // *arrowpb.BatchArrowRecords ).Times(0), recvCall: client.EXPECT().Recv().Times(0), + closeSendCall: client.EXPECT().CloseSend().AnyTimes().Return(nil), } return testStream } diff --git a/collector/gen/exporter/otlpexporter/internal/arrow/exporter.go b/collector/gen/exporter/otlpexporter/internal/arrow/exporter.go index fc439fce6e..f355e1a3ae 100644 --- a/collector/gen/exporter/otlpexporter/internal/arrow/exporter.go +++ b/collector/gen/exporter/otlpexporter/internal/arrow/exporter.go @@ -7,6 +7,7 @@ import ( "context" "errors" "sync" + "time" arrowpb "github.com/f5/otel-arrow-adapter/api/experimental/arrow/v1" arrowRecord "github.com/f5/otel-arrow-adapter/pkg/otel/arrow_record" @@ -23,6 +24,8 @@ type Exporter struct { // numStreams is the number of streams that will be used. numStreams int + maxStreamLifetime time.Duration + // disableDowngrade prevents downgrade from occurring, supports // forcing Arrow transport. disableDowngrade bool @@ -90,6 +93,7 @@ func MakeAnyStreamClient[T AnyStreamClient](clientFunc func(ctx context.Context, // NewExporter configures a new Exporter. func NewExporter( + maxStreamLifetime time.Duration, numStreams int, disableDowngrade bool, telemetry component.TelemetrySettings, @@ -99,6 +103,7 @@ func NewExporter( perRPCCredentials credentials.PerRPCCredentials, ) *Exporter { return &Exporter{ + maxStreamLifetime: maxStreamLifetime, numStreams: numStreams, disableDowngrade: disableDowngrade, telemetry: telemetry, @@ -175,6 +180,7 @@ func (e *Exporter) runArrowStream(ctx context.Context) { producer := e.newProducer() stream := newStream(producer, e.ready, e.telemetry, e.perRPCCredentials) + stream.maxStreamLifetime = e.maxStreamLifetime defer func() { if err := producer.Close(); err != nil { diff --git a/collector/gen/exporter/otlpexporter/internal/arrow/exporter_test.go b/collector/gen/exporter/otlpexporter/internal/arrow/exporter_test.go index f0c94eecca..b6bb886b43 100644 --- a/collector/gen/exporter/otlpexporter/internal/arrow/exporter_test.go +++ b/collector/gen/exporter/otlpexporter/internal/arrow/exporter_test.go @@ -13,10 +13,6 @@ import ( "testing" "time" - arrowpb "github.com/f5/otel-arrow-adapter/api/experimental/arrow/v1" - arrowRecord "github.com/f5/otel-arrow-adapter/pkg/otel/arrow_record" - arrowRecordMock "github.com/f5/otel-arrow-adapter/pkg/otel/arrow_record/mock" - otelAssert "github.com/f5/otel-arrow-adapter/pkg/otel/assert" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -24,12 +20,19 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/metadata" - "github.com/f5/otel-arrow-adapter/collector/gen/internal/testdata" + arrowpb "github.com/f5/otel-arrow-adapter/api/experimental/arrow/v1" + arrowRecord "github.com/f5/otel-arrow-adapter/pkg/otel/arrow_record" + arrowRecordMock "github.com/f5/otel-arrow-adapter/pkg/otel/arrow_record/mock" + otelAssert "github.com/f5/otel-arrow-adapter/pkg/otel/assert" + "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/f5/otel-arrow-adapter/collector/gen/internal/testdata" ) +const defaultMaxStreamLifetime = 11 * time.Second type compareJSONTraces struct{ ptrace.Traces } type compareJSONMetrics struct{ pmetric.Metrics } type compareJSONLogs struct{ plog.Logs } @@ -123,7 +126,7 @@ func newExporterTestCaseCommon(t *testing.T, noisy noisyTest, numStreams int, di }) } - exp := NewExporter(numStreams, disableDowngrade, ctc.telset, nil, func() arrowRecord.ProducerAPI { + exp := NewExporter(defaultMaxStreamLifetime, numStreams, disableDowngrade, ctc.telset, nil, func() arrowRecord.ProducerAPI { // Mock the close function, use a real producer for testing dataflow. mock := arrowRecordMock.NewMockProducerAPI(ctc.ctrl) prod := arrowRecord.NewProducer() @@ -177,6 +180,7 @@ func statusUnrecognizedFor(id int64) *arrowpb.BatchStatus { // TestArrowExporterSuccess tests a single Send through a healthy channel. func TestArrowExporterSuccess(t *testing.T) { + stdTesting := otelAssert.NewStdUnitTest(t) for _, inputData := range []interface{}{twoTraces, twoMetrics, twoLogs} { tc := newSingleStreamTestCase(t) channel := newHealthyTestChannel() @@ -207,7 +211,7 @@ func TestArrowExporterSuccess(t *testing.T) { traces, err := testCon.TracesFrom(outputData) require.NoError(t, err) require.Equal(t, 1, len(traces)) - otelAssert.Equiv(t, []json.Marshaler{ + otelAssert.Equiv(stdTesting, []json.Marshaler{ compareJSONTraces{testData}, }, []json.Marshaler{ compareJSONTraces{traces[0]}, @@ -216,7 +220,7 @@ func TestArrowExporterSuccess(t *testing.T) { logs, err := testCon.LogsFrom(outputData) require.NoError(t, err) require.Equal(t, 1, len(logs)) - otelAssert.Equiv(t, []json.Marshaler{ + otelAssert.Equiv(stdTesting, []json.Marshaler{ compareJSONLogs{testData}, }, []json.Marshaler{ compareJSONLogs{logs[0]}, @@ -225,7 +229,7 @@ func TestArrowExporterSuccess(t *testing.T) { metrics, err := testCon.MetricsFrom(outputData) require.NoError(t, err) require.Equal(t, 1, len(metrics)) - otelAssert.Equiv(t, []json.Marshaler{ + otelAssert.Equiv(stdTesting, []json.Marshaler{ compareJSONMetrics{testData}, }, []json.Marshaler{ compareJSONMetrics{metrics[0]}, diff --git a/collector/gen/exporter/otlpexporter/internal/arrow/stream.go b/collector/gen/exporter/otlpexporter/internal/arrow/stream.go index d8cc5da0b4..0b8987bc62 100644 --- a/collector/gen/exporter/otlpexporter/internal/arrow/stream.go +++ b/collector/gen/exporter/otlpexporter/internal/arrow/stream.go @@ -11,6 +11,7 @@ import ( "io" "strings" "sync" + "time" arrowpb "github.com/f5/otel-arrow-adapter/api/experimental/arrow/v1" arrowRecord "github.com/f5/otel-arrow-adapter/pkg/otel/arrow_record" @@ -31,6 +32,12 @@ import ( // Stream is 1:1 with gRPC stream. type Stream struct { + // maxStreamLifetime is the max timeout before stream + // should be closed on the client side. This ensures a + // graceful shutdown before max_connection_age is reached + // on the server side. + maxStreamLifetime time.Duration + // producer is exclusive to the holder of the stream. producer arrowRecord.ProducerAPI @@ -149,8 +156,10 @@ func (s *Stream) run(bgctx context.Context, streamClient StreamClientFunc, grpcO ww.Add(1) go func() { defer ww.Done() - defer cancel() writeErr = s.write(ctx) + if writeErr != nil { + cancel() + } }() // the result from read() is processed after cancel and wait, @@ -257,6 +266,8 @@ func (s *Stream) write(ctx context.Context) error { var hdrsBuf bytes.Buffer hdrsEnc := hpack.NewEncoder(&hdrsBuf) + timer := time.NewTimer(s.maxStreamLifetime) + for { // Note: this can't block b/c stream has capacity & // individual streams shut down synchronously. @@ -265,8 +276,17 @@ func (s *Stream) write(ctx context.Context) error { // this can block, and if the context is canceled we // wait for the reader to find this stream. var wri writeItem + var ok bool select { - case wri = <-s.toWrite: + case <-timer.C: + s.prioritizer.removeReady(s) + s.client.CloseSend() + return nil + case wri, ok = <-s.toWrite: + // channel is closed + if !ok { + return nil + } case <-ctx.Done(): // Because we did not <-stream.toWrite, there // is a potential sender race since the stream @@ -325,8 +345,17 @@ func (s *Stream) read(_ context.Context) error { // cancel a call to Recv() but the call to processBatchStatus // is non-blocking. for { + // Note: if the client has called CloseSend() and is waiting for a response from the server. + // And if the server fails for some reason, we will wait until some other condition, such as a context + // timeout. TODO: possibly, improve to wait for no outstanding requests and then stop reading. resp, err := s.client.Recv() if err != nil { + // Once the send direction of stream is closed the server should return + // an error that mentions an EOF. The expected error code is codes.Unknown. + status, ok := status.FromError(err) + if ok && status.Message() == "EOF" && status.Code() == codes.Unknown { + return nil + } // Note: do not wrap, contains a Status. return err } @@ -334,6 +363,7 @@ func (s *Stream) read(_ context.Context) error { if err = s.processBatchStatus(resp); err != nil { return fmt.Errorf("process: %w", err) } + } } diff --git a/collector/gen/exporter/otlpexporter/internal/arrow/stream_test.go b/collector/gen/exporter/otlpexporter/internal/arrow/stream_test.go index e6ee6a7df6..718b89351f 100644 --- a/collector/gen/exporter/otlpexporter/internal/arrow/stream_test.go +++ b/collector/gen/exporter/otlpexporter/internal/arrow/stream_test.go @@ -53,6 +53,7 @@ func newStreamTestCase(t *testing.T) *streamTestCase { ctc.requestMetadataCall.AnyTimes().Return(nil, nil) stream := newStream(producer, prio, ctc.telset, ctc.perRPCCredentials) + stream.maxStreamLifetime = 10 * time.Second fromTracesCall := producer.EXPECT().BatchArrowRecordsFromTraces(gomock.Any()).Times(0) fromMetricsCall := producer.EXPECT().BatchArrowRecordsFromMetrics(gomock.Any()).Times(0) @@ -118,6 +119,38 @@ func (tc *streamTestCase) get() *Stream { return <-tc.prioritizer.readyChannel() } +// TestStreamEncodeError verifies that exceeding the +// max_stream_lifetime results in shutdown that +// simply restarts the stream. +func TestStreamGracefulShutdown(t *testing.T) { + tc := newStreamTestCase(t) + maxStreamLifetime := 1 * time.Second + tc.stream.maxStreamLifetime = maxStreamLifetime + + tc.fromTracesCall.Times(1).Return(oneBatch, nil) + tc.closeSendCall.Times(1).Return(nil) + + channel := newHealthyTestChannel() + tc.start(channel) + defer tc.cancelAndWaitForShutdown() + var wg sync.WaitGroup + wg.Add(1) + defer wg.Wait() + go func() { + defer wg.Done() + batch := <-channel.sent + channel.recv <- statusOKFor(batch.BatchId) + }() + + err := tc.get().SendAndWait(tc.bgctx, twoTraces) + require.NoError(t, err) + // let stream get closed and send again. + time.Sleep(maxStreamLifetime) + err = tc.get().SendAndWait(tc.bgctx, twoTraces) + require.Error(t, err) + require.True(t, errors.Is(err, ErrStreamRestarting)) +} + // TestStreamEncodeError verifies that an encoder error in the sender // yields a permanent error. func TestStreamEncodeError(t *testing.T) { diff --git a/collector/gen/exporter/otlpexporter/otlp.go b/collector/gen/exporter/otlpexporter/otlp.go index cde409f2e0..6722aeab77 100644 --- a/collector/gen/exporter/otlpexporter/otlp.go +++ b/collector/gen/exporter/otlpexporter/otlp.go @@ -20,12 +20,12 @@ import ( "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" + "github.com/f5/otel-arrow-adapter/collector/gen/exporter/otlpexporter/internal/arrow" + "github.com/f5/otel-arrow-adapter/collector/gen/internal/netstats" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper" - "github.com/f5/otel-arrow-adapter/collector/gen/exporter/otlpexporter/internal/arrow" - "github.com/f5/otel-arrow-adapter/collector/gen/internal/netstats" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/plog/plogotlp" "go.opentelemetry.io/collector/pdata/pmetric" @@ -131,7 +131,7 @@ func (e *baseExporter) start(ctx context.Context, host component.Host) (err erro } } - e.arrow = arrow.NewExporter(e.config.Arrow.NumStreams, e.config.Arrow.DisableDowngrade, e.settings.TelemetrySettings, e.callOptions, func() arrowRecord.ProducerAPI { + e.arrow = arrow.NewExporter(e.config.Arrow.MaxStreamLifetime, e.config.Arrow.NumStreams, e.config.Arrow.DisableDowngrade, e.settings.TelemetrySettings, e.callOptions, func() arrowRecord.ProducerAPI { return arrowRecord.NewProducer() }, e.streamClientFactory(e.config, e.clientConn), perRPCCreds) diff --git a/collector/gen/exporter/otlpexporter/otlp_test.go b/collector/gen/exporter/otlpexporter/otlp_test.go index d12bcc739c..b4666394c2 100644 --- a/collector/gen/exporter/otlpexporter/otlp_test.go +++ b/collector/gen/exporter/otlpexporter/otlp_test.go @@ -31,6 +31,8 @@ import ( "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/durationpb" + "github.com/f5/otel-arrow-adapter/collector/gen/exporter/otlpexporter/internal/arrow/grpcmock" + "github.com/f5/otel-arrow-adapter/collector/gen/internal/testdata" "go.opentelemetry.io/collector/client" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" @@ -40,10 +42,8 @@ import ( "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exportertest" - "github.com/f5/otel-arrow-adapter/collector/gen/exporter/otlpexporter/internal/arrow/grpcmock" "go.opentelemetry.io/collector/extension" "go.opentelemetry.io/collector/extension/auth" - "github.com/f5/otel-arrow-adapter/collector/gen/internal/testdata" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/plog/plogotlp" "go.opentelemetry.io/collector/pdata/pmetric" @@ -457,6 +457,7 @@ func TestSendTracesWhenEndpointHasHttpScheme(t *testing.T) { cfg := factory.CreateDefaultConfig().(*Config) cfg.GRPCClientSettings = test.gRPCClientSettings cfg.GRPCClientSettings.Endpoint = test.scheme + ln.Addr().String() + cfg.Arrow.MaxStreamLifetime = 100 * time.Second if test.useTLS { cfg.GRPCClientSettings.TLSSetting.InsecureSkipVerify = true } @@ -513,6 +514,7 @@ func TestSendMetrics(t *testing.T) { "header": "header-value", }, } + cfg.Arrow.MaxStreamLifetime = 100 * time.Second set := exportertest.NewNopCreateSettings() set.BuildInfo.Description = "Collector" set.BuildInfo.Version = "1.2.3test" @@ -611,6 +613,7 @@ func TestSendTraceDataServerDownAndUp(t *testing.T) { // Do not rely on external retry logic here, if that is intended set InitialInterval to 100ms. WaitForReady: true, } + cfg.Arrow.MaxStreamLifetime = 100 * time.Second set := exportertest.NewNopCreateSettings() exp, err := factory.CreateTracesExporter(context.Background(), set, cfg) require.NoError(t, err) @@ -668,6 +671,7 @@ func TestSendTraceDataServerStartWhileRequest(t *testing.T) { Insecure: true, }, } + cfg.Arrow.MaxStreamLifetime = 100 * time.Second set := exportertest.NewNopCreateSettings() exp, err := factory.CreateTracesExporter(context.Background(), set, cfg) require.NoError(t, err) @@ -721,6 +725,7 @@ func TestSendTracesOnResourceExhaustion(t *testing.T) { Insecure: true, }, } + cfg.Arrow.MaxStreamLifetime = 100 * time.Second set := exportertest.NewNopCreateSettings() exp, err := factory.CreateTracesExporter(context.Background(), set, cfg) require.NoError(t, err) @@ -803,6 +808,7 @@ func TestSendLogData(t *testing.T) { Insecure: true, }, } + cfg.Arrow.MaxStreamLifetime = 100 * time.Second set := exportertest.NewNopCreateSettings() set.BuildInfo.Description = "Collector" set.BuildInfo.Version = "1.2.3test" @@ -920,6 +926,7 @@ func testSendArrowTraces(t *testing.T, mixedSignals, clientWaitForReady, streamS cfg.Arrow = ArrowSettings{ NumStreams: 1, EnableMixedSignals: mixedSignals, + MaxStreamLifetime: 100 * time.Second, } set := exportertest.NewNopCreateSettings() @@ -1092,6 +1099,7 @@ func TestSendArrowFailedTraces(t *testing.T) { cfg.Arrow = ArrowSettings{ NumStreams: 1, EnableMixedSignals: true, + MaxStreamLifetime: 100 * time.Second, } cfg.QueueSettings.Enabled = false diff --git a/collector/gen/exporter/otlpexporter/testdata/config.yaml b/collector/gen/exporter/otlpexporter/testdata/config.yaml index 0120d78fca..b755ef5c5d 100644 --- a/collector/gen/exporter/otlpexporter/testdata/config.yaml +++ b/collector/gen/exporter/otlpexporter/testdata/config.yaml @@ -29,3 +29,4 @@ arrow: num_streams: 2 disabled: false enable_mixed_signals: true + max_stream_lifetime: "2h" diff --git a/collector/gen/receiver/otlpreceiver/internal/arrow/arrow_test.go b/collector/gen/receiver/otlpreceiver/internal/arrow/arrow_test.go index f00b510fd4..385f02fa25 100644 --- a/collector/gen/receiver/otlpreceiver/internal/arrow/arrow_test.go +++ b/collector/gen/receiver/otlpreceiver/internal/arrow/arrow_test.go @@ -14,11 +14,6 @@ import ( "sync" "testing" - arrowpb "github.com/f5/otel-arrow-adapter/api/experimental/arrow/v1" - arrowCollectorMock "github.com/f5/otel-arrow-adapter/api/experimental/arrow/v1/mock" - arrowRecord "github.com/f5/otel-arrow-adapter/pkg/otel/arrow_record" - arrowRecordMock "github.com/f5/otel-arrow-adapter/pkg/otel/arrow_record/mock" - otelAssert "github.com/f5/otel-arrow-adapter/pkg/otel/assert" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -26,18 +21,25 @@ import ( "golang.org/x/net/http2/hpack" "google.golang.org/grpc/metadata" + arrowpb "github.com/f5/otel-arrow-adapter/api/experimental/arrow/v1" + arrowCollectorMock "github.com/f5/otel-arrow-adapter/api/experimental/arrow/v1/mock" + arrowRecord "github.com/f5/otel-arrow-adapter/pkg/otel/arrow_record" + arrowRecordMock "github.com/f5/otel-arrow-adapter/pkg/otel/arrow_record/mock" + otelAssert "github.com/f5/otel-arrow-adapter/pkg/otel/assert" + "go.opentelemetry.io/collector/client" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/extension/auth" - "github.com/f5/otel-arrow-adapter/collector/gen/internal/testdata" "go.opentelemetry.io/collector/obsreport" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/receiver" + + "github.com/f5/otel-arrow-adapter/collector/gen/internal/testdata" "github.com/f5/otel-arrow-adapter/collector/gen/receiver/otlpreceiver/internal/arrow/mock" ) @@ -369,6 +371,7 @@ func TestReceiverLogs(t *testing.T) { func TestReceiverMetrics(t *testing.T) { tc := healthyTestChannel{} ctc := newCommonTestCase(t, tc) + stdTesting := otelAssert.NewStdUnitTest(t) md := testdata.GenerateMetrics(2) batch, err := ctc.testProducer.BatchArrowRecordsFromMetrics(md) @@ -379,7 +382,7 @@ func TestReceiverMetrics(t *testing.T) { ctc.start(ctc.newRealConsumer) ctc.putBatch(batch, nil) - otelAssert.Equiv(t, []json.Marshaler{ + otelAssert.Equiv(stdTesting, []json.Marshaler{ compareJSONMetrics{md}, }, []json.Marshaler{ compareJSONMetrics{(<-ctc.consume).Data.(pmetric.Metrics)}, @@ -424,6 +427,8 @@ func TestReceiverSendError(t *testing.T) { } func TestReceiverConsumeError(t *testing.T) { + stdTesting := otelAssert.NewStdUnitTest(t) + data := []interface{}{ testdata.GenerateTraces(2), testdata.GenerateMetrics(2), @@ -458,19 +463,19 @@ func TestReceiverConsumeError(t *testing.T) { switch input := item.(type) { case ptrace.Traces: - otelAssert.Equiv(t, []json.Marshaler{ + otelAssert.Equiv(stdTesting, []json.Marshaler{ compareJSONTraces{input}, }, []json.Marshaler{ compareJSONTraces{(<-ctc.consume).Data.(ptrace.Traces)}, }) case plog.Logs: - otelAssert.Equiv(t, []json.Marshaler{ + otelAssert.Equiv(stdTesting, []json.Marshaler{ compareJSONLogs{input}, }, []json.Marshaler{ compareJSONLogs{(<-ctc.consume).Data.(plog.Logs)}, }) case pmetric.Metrics: - otelAssert.Equiv(t, []json.Marshaler{ + otelAssert.Equiv(stdTesting, []json.Marshaler{ compareJSONMetrics{input}, }, []json.Marshaler{ compareJSONMetrics{(<-ctc.consume).Data.(pmetric.Metrics)}, diff --git a/collector/processor/obfuscationprocessor/config.go b/collector/processor/obfuscationprocessor/config.go index 766fc493ac..5d35238012 100644 --- a/collector/processor/obfuscationprocessor/config.go +++ b/collector/processor/obfuscationprocessor/config.go @@ -1,9 +1,19 @@ package obfuscationprocessor type Config struct { - EncryptKey string `mapstructure:"encrypt_key"` - EncryptRound int `mapstructure:"encrypt_round"` - EncryptAll bool `mapstructure:"encrypt_all"` + // Rounds is a Fiestel parameter which determines the + // difficulty of uncovering the original data. Default 10. + Rounds int `mapstructure:"rounds"` + // KeyLength is a Fiestel parameter which determines the + // length of the keyt used to obfuscate. Default 128. + KeyLength int `mapstructure:"key_length"` + + // EncryptAll indicates that all byte-array and string values + // should be obfuscated. + EncryptAll bool `mapstructure:"encrypt_all"` + + // EncryptAttributes indicates a specific list of attributes + // to obfuscate. EncryptAttributes []string `mapstructure:"encrypt_attributes"` -} \ No newline at end of file +} diff --git a/collector/processor/obfuscationprocessor/factory.go b/collector/processor/obfuscationprocessor/factory.go index 0d23c47e9b..653f827c14 100644 --- a/collector/processor/obfuscationprocessor/factory.go +++ b/collector/processor/obfuscationprocessor/factory.go @@ -2,6 +2,9 @@ package obfuscationprocessor import ( "context" + + "crypto/rand" + "github.com/cyrildever/feistel" "github.com/cyrildever/feistel/common/utils/hash" "go.opentelemetry.io/collector/component" @@ -16,7 +19,8 @@ const ( // The stability level of the exporter. stability = component.StabilityLevelAlpha - defaultRound = 10 + defaultRounds = 10 + defaultKeyLength = 128 ) // NewFactory creates a factory for the obfuscation processor. @@ -25,15 +29,23 @@ func NewFactory() processor.Factory { typeStr, createDefaultConfig, processor.WithTraces(createTracesProcessor, stability), + processor.WithLogs(createLogsProcessor, stability), processor.WithMetrics(createMetricsProcessor, stability), ) } -func createDefaultConfig() component.Config { +func newKey(keyLength int) string { + buf := make([]byte, keyLength) + rand.Reader.Read(buf) + return string(buf) +} + +func createDefaultConfig() component.Config { return &Config{ - EncryptRound: defaultRound, + Rounds: defaultRounds, + KeyLength: defaultKeyLength, // encrypt all string attributes by default - EncryptAll: true, + EncryptAll: true, } } @@ -47,7 +59,7 @@ func createMetricsProcessor( processor := &obfuscation{ logger: set.Logger, nextMetrics: next, - encrypt: feistel.NewFPECipher(hash.SHA_256, oCfg.EncryptKey, oCfg.EncryptRound), + encrypt: feistel.NewFPECipher(hash.SHA_256, newKey(oCfg.KeyLength), oCfg.Rounds), encryptAttributes: makeEncryptList(oCfg), encryptAll: oCfg.EncryptAll, } @@ -73,7 +85,7 @@ func createTracesProcessor( processor := &obfuscation{ logger: set.Logger, nextTraces: next, - encrypt: feistel.NewFPECipher(hash.SHA_256, oCfg.EncryptKey, oCfg.EncryptRound), + encrypt: feistel.NewFPECipher(hash.SHA_256, newKey(oCfg.KeyLength), oCfg.Rounds), encryptAttributes: makeEncryptList(oCfg), encryptAll: oCfg.EncryptAll, } @@ -88,6 +100,32 @@ func createTracesProcessor( processorhelper.WithShutdown(processor.Shutdown)) } +// createTracesProcessor creates an instance of obfuscation for processing traces +func createLogsProcessor( + ctx context.Context, + set processor.CreateSettings, + cfg component.Config, + next consumer.Logs, +) (processor.Logs, error) { + oCfg := cfg.(*Config) + processor := &obfuscation{ + logger: set.Logger, + nextLogs: next, + encrypt: feistel.NewFPECipher(hash.SHA_256, newKey(oCfg.KeyLength), oCfg.Rounds), + encryptAttributes: makeEncryptList(oCfg), + encryptAll: oCfg.EncryptAll, + } + return processorhelper.NewLogsProcessor( + ctx, + set, + cfg, + next, + processor.processLogs, + processorhelper.WithCapabilities(processor.Capabilities()), + processorhelper.WithStart(processor.Start), + processorhelper.WithShutdown(processor.Shutdown)) +} + // makeEncryptList sets up a lookup table of span attribute keys which need to be encrypted. func makeEncryptList(c *Config) map[string]struct{} { allowList := make(map[string]struct{}, len(c.EncryptAttributes)) @@ -98,4 +136,4 @@ func makeEncryptList(c *Config) map[string]struct{} { c.EncryptAll = false } return allowList -} \ No newline at end of file +} diff --git a/collector/processor/obfuscationprocessor/processor.go b/collector/processor/obfuscationprocessor/processor.go index cc6ee7586c..83531270a5 100644 --- a/collector/processor/obfuscationprocessor/processor.go +++ b/collector/processor/obfuscationprocessor/processor.go @@ -19,6 +19,7 @@ type obfuscation struct { // Next trace consumer in line nextTraces consumer.Traces nextMetrics consumer.Metrics + nextLogs consumer.Logs encryptAttributes map[string]struct{} encrypt *feistel.FPECipher diff --git a/docs/img/OTEL - Chaos engineering.png b/docs/img/OTEL - Chaos engineering.png new file mode 100644 index 0000000000..af6e89a17a Binary files /dev/null and b/docs/img/OTEL - Chaos engineering.png differ diff --git a/docs/img/OTEL - validation process.png b/docs/img/OTEL - validation process.png new file mode 100644 index 0000000000..ab2571ef2e Binary files /dev/null and b/docs/img/OTEL - validation process.png differ diff --git a/docs/validation_process.md b/docs/validation_process.md index fef1502033..a9ceb6ed04 100644 --- a/docs/validation_process.md +++ b/docs/validation_process.md @@ -1,4 +1,4 @@ -# Encoding/Decoding Validation Process +# Validation Process This document describes the validation process employed to confirm the correctness and resilience of encoding and decoding process of OTLP entities to/from OTel Arrow @@ -20,16 +20,20 @@ Mandatory fields will not be generated systematically in order to test the robustness of the encoding/decoding process (the original Protobuf encoding doesn't enforce mandatory fields). -## Generic comparison of OTLP entities before and after encoding/decoding +## Encoding/Decoding validation -OTLP entities are serialized in their JSON representation before encoding and -after decoding. These two JSON representations must be logically equivalent. -This comparison is implemented by a dedicated function that recursively traverses -the 2 JSON trees and compares the values of the fields taking into account that -the fields are not ordered and that the batching process may have changed the -internal organization of the data. +OTLP entities are encoded to Arrow records and then decoded back to OTLP +entities. The validation process compares the original OTLP entities with the +decoded OTLP entities. The comparison is done at the JSON level. These two JSON +representations must be logically equivalent. This comparison is implemented by +a dedicated function that recursively traverses the 2 JSON trees and compares +the values of the fields taking into account that the fields are not ordered +and that the batching process may have changed the internal organization of the +data. The `assert.Equiv` function implements this comparison. -## Decoding of invalid data +![General validation process](./img/OTEL%20-%20validation%20process.png) + +## Encoding/Decoding of invalid data A generic process has been implemented to inject specific errors and data changes in the encoded data. For example, the existing process keep the Arrow records @@ -37,7 +41,13 @@ but randomly change their payload types. The goal is to test the resilience of the decoding process to invalid data. The decoding layer must be able to handle any invalid data and return appropriate error messages without crashing. -## Capturing and Replaying production data +![Decoding of invalid data](./img/OTEL%20-%20Chaos%20Engineering.png) + +## Collector validation + +ToDo: describe the validation process of the collector (@jmacd). + +## Capturing and Replaying staging or production data A new version of the OTel file exporter has been implemented to capture OTLP traffic in a generic JSON format (with ZSTD compression). A set of tools have diff --git a/go.mod b/go.mod index db63b43cfb..70108a5237 100644 --- a/go.mod +++ b/go.mod @@ -31,6 +31,7 @@ require ( go.opentelemetry.io/collector/config/configtelemetry v0.80.0 go.opentelemetry.io/collector/config/configtls v0.80.0 go.opentelemetry.io/collector/confmap v0.80.0 + go.opentelemetry.io/collector/connector v0.80.0 go.opentelemetry.io/collector/consumer v0.80.0 go.opentelemetry.io/collector/exporter v0.80.0 go.opentelemetry.io/collector/exporter/loggingexporter v0.80.0 @@ -137,7 +138,6 @@ require ( go.mongodb.org/mongo-driver v1.11.6 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/collector/config/internal v0.80.0 // indirect - go.opentelemetry.io/collector/connector v0.80.0 // indirect go.opentelemetry.io/collector/featuregate v1.0.0-rcv0013 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.42.1-0.20230612162650-64be7e574a17 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0 // indirect diff --git a/pkg/arrow/from_record.go b/pkg/arrow/from_record.go index ba694b809c..9cc7c0b767 100644 --- a/pkg/arrow/from_record.go +++ b/pkg/arrow/from_record.go @@ -130,6 +130,35 @@ func U64FromRecord(record arrow.Record, fieldID int, row int) (uint64, error) { } } +// NullableU16FromRecord returns the uint16 value for a specific row and column in an +// Arrow record. If the value is null, it returns nil. +func NullableU16FromRecord(record arrow.Record, fieldID int, row int) (*uint16, error) { + if fieldID == AbsentFieldID { + return nil, nil + } + + arr := record.Column(fieldID) + if arr == nil { + return nil, nil + } + + if arr.IsNull(row) { + return nil, nil + } + + switch arr := arr.(type) { + case *array.Uint16: + if arr.IsNull(row) { + return nil, nil + } else { + val := arr.Value(row) + return &val, nil + } + default: + return nil, werror.WrapWithMsg(ErrInvalidArrayType, "not a uint16 array") + } +} + // NullableU32FromRecord returns the uint32 value for a specific row and column in an // Arrow record. If the value is null, it returns nil. func NullableU32FromRecord(record arrow.Record, fieldID int, row int) (*uint32, error) { diff --git a/pkg/datagen/metrics.go b/pkg/datagen/metrics.go index acd27681ed..c44b025a96 100644 --- a/pkg/datagen/metrics.go +++ b/pkg/datagen/metrics.go @@ -82,6 +82,65 @@ func (mg *MetricsGenerator) GenerateAllKindOfMetrics(batchSize int, collectInter return result } +func (mg *MetricsGenerator) GenerateRandomMetrics(batchSize int, collectInterval time.Duration) pmetric.Metrics { + result := pmetric.NewMetrics() + + // Generate 4 resource spans per batch. + for i := 0; i < 4; i++ { + resourceMetrics := result.ResourceMetrics().AppendEmpty() + // ~50% of the time, generate a random resource + if mg.GenBool() { + pick(mg.TestEntropy, mg.resourceAttributes).CopyTo(resourceMetrics.Resource().Attributes()) + } + + // Generate 4 scope metrics per resource metric. + for j := 0; j < 4; j++ { + scopeMetrics := resourceMetrics.ScopeMetrics().AppendEmpty() + // ~50% of the time, generate a random scope + if mg.GenBool() { + pick(mg.TestEntropy, mg.instrumentationScopes).CopyTo(scopeMetrics.Scope()) + } + + if mg.GenBool() { + scopeMetrics.SetSchemaUrl(fmt.Sprintf("https://opentelemetry.io/schemas/1.%d.%d", i, j)) + } + + metrics := scopeMetrics.Metrics() + + spanCount := mg.rng.Intn(batchSize) + 1 + for i := 0; i < spanCount; i++ { + mg.AdvanceTime(time.Duration(mg.rng.Intn(int(collectInterval)))) + + if mg.GenBool() { + mg.SystemCpuTime(metrics.AppendEmpty(), 1) + } + if mg.GenBool() { + mg.SystemMemoryUsage(metrics.AppendEmpty()) + } + if mg.GenBool() { + mg.SystemCpuLoadAverage1m(metrics.AppendEmpty()) + } + if mg.GenBool() { + mg.FakeSummary(metrics.AppendEmpty()) + } + if mg.GenBool() { + mg.FakeHistogram(metrics.AppendEmpty()) + } + if mg.GenBool() { + mg.ExpHistogramWithEverything(metrics.AppendEmpty()) + } + } + } + if mg.GenBool() { + resourceMetrics.SetSchemaUrl(fmt.Sprintf("https://opentelemetry.io/schemas/1.0.%d", i)) + } + } + + mg.generation++ + + return result +} + func (mg *MetricsGenerator) GenerateGauges(batchSize int, collectInterval time.Duration) pmetric.Metrics { result, metrics := mg.newResult() diff --git a/pkg/otel/arrow_record/logs_dict_test.go b/pkg/otel/arrow_record/logs_dict_test.go index 661fdfe0ec..0fdd1b44d9 100644 --- a/pkg/otel/arrow_record/logs_dict_test.go +++ b/pkg/otel/arrow_record/logs_dict_test.go @@ -54,6 +54,8 @@ func TestLogsWithNoDictionary(t *testing.T) { } }() + stdTesting := assert.NewStdUnitTest(t) + for i := 0; i < 300; i++ { logs := GenerateLogs(0, math.MaxUint8+1) batch, err := producer.BatchArrowRecordsFromLogs(logs) @@ -65,7 +67,7 @@ func TestLogsWithNoDictionary(t *testing.T) { require.Equal(t, 1, len(received)) assert.Equiv( - t, + stdTesting, []json.Marshaler{plogotlp.NewExportRequestFromLogs(logs)}, []json.Marshaler{plogotlp.NewExportRequestFromLogs(received[0])}, ) @@ -102,6 +104,8 @@ func TestLogsMultiBatchWithDictionaryIndexChanges(t *testing.T) { } }() + stdTesting := assert.NewStdUnitTest(t) + for i := 0; i < 10; i++ { logs := GenerateLogs(0, math.MaxUint8+1) batch, err := producer.BatchArrowRecordsFromLogs(logs) @@ -113,7 +117,7 @@ func TestLogsMultiBatchWithDictionaryIndexChanges(t *testing.T) { require.Equal(t, 1, len(received)) assert.Equiv( - t, + stdTesting, []json.Marshaler{plogotlp.NewExportRequestFromLogs(logs)}, []json.Marshaler{plogotlp.NewExportRequestFromLogs(received[0])}, ) @@ -153,6 +157,8 @@ func TestLogsMultiBatchWithDictionaryOverflow(t *testing.T) { } }() + stdTesting := assert.NewStdUnitTest(t) + for i := 0; i < 10; i++ { logs := GenerateLogs(i*((math.MaxUint8/2)+1), (math.MaxUint8/2)+1) batch, err := producer.BatchArrowRecordsFromLogs(logs) @@ -164,7 +170,7 @@ func TestLogsMultiBatchWithDictionaryOverflow(t *testing.T) { require.Equal(t, 1, len(received)) assert.Equiv( - t, + stdTesting, []json.Marshaler{plogotlp.NewExportRequestFromLogs(logs)}, []json.Marshaler{plogotlp.NewExportRequestFromLogs(received[0])}, ) @@ -203,6 +209,8 @@ func TestLogsMultiBatchWithDictionaryLimit(t *testing.T) { } }() + stdTesting := assert.NewStdUnitTest(t) + for i := 0; i < 10; i++ { logs := GenerateLogs(0, math.MaxUint8+1) batch, err := producer.BatchArrowRecordsFromLogs(logs) @@ -214,7 +222,7 @@ func TestLogsMultiBatchWithDictionaryLimit(t *testing.T) { require.Equal(t, 1, len(received)) assert.Equiv( - t, + stdTesting, []json.Marshaler{plogotlp.NewExportRequestFromLogs(logs)}, []json.Marshaler{plogotlp.NewExportRequestFromLogs(received[0])}, ) diff --git a/pkg/otel/arrow_record/metrics_dict_test.go b/pkg/otel/arrow_record/metrics_dict_test.go index 43a6362fa6..a234fbbacf 100644 --- a/pkg/otel/arrow_record/metrics_dict_test.go +++ b/pkg/otel/arrow_record/metrics_dict_test.go @@ -53,6 +53,8 @@ func TestMetricsWithNoDictionary(t *testing.T) { } }() + stdTesting := assert.NewStdUnitTest(t) + for i := 0; i < 300; i++ { metrics := GenerateMetrics(0, math.MaxUint8+1) batch, err := producer.BatchArrowRecordsFromMetrics(metrics) @@ -64,7 +66,7 @@ func TestMetricsWithNoDictionary(t *testing.T) { require.Equal(t, 1, len(received)) assert.Equiv( - t, + stdTesting, []json.Marshaler{pmetricotlp.NewExportRequestFromMetrics(metrics)}, []json.Marshaler{pmetricotlp.NewExportRequestFromMetrics(received[0])}, ) @@ -101,6 +103,8 @@ func TestMetricsMultiBatchWithDictionaryIndexChanges(t *testing.T) { } }() + stdTesting := assert.NewStdUnitTest(t) + for i := 0; i < 10; i++ { metrics := GenerateMetrics(0, math.MaxUint8+1) batch, err := producer.BatchArrowRecordsFromMetrics(metrics) @@ -112,7 +116,7 @@ func TestMetricsMultiBatchWithDictionaryIndexChanges(t *testing.T) { require.Equal(t, 1, len(received)) assert.Equiv( - t, + stdTesting, []json.Marshaler{pmetricotlp.NewExportRequestFromMetrics(metrics)}, []json.Marshaler{pmetricotlp.NewExportRequestFromMetrics(received[0])}, ) @@ -153,6 +157,8 @@ func TestMetricsMultiBatchWithDictionaryOverflow(t *testing.T) { } }() + stdTesting := assert.NewStdUnitTest(t) + for i := 0; i < 10; i++ { metrics := GenerateMetrics(i*((math.MaxUint8/2)+1), (math.MaxUint8/2)+1) batch, err := producer.BatchArrowRecordsFromMetrics(metrics) @@ -164,7 +170,7 @@ func TestMetricsMultiBatchWithDictionaryOverflow(t *testing.T) { require.Equal(t, 1, len(received)) assert.Equiv( - t, + stdTesting, []json.Marshaler{pmetricotlp.NewExportRequestFromMetrics(metrics)}, []json.Marshaler{pmetricotlp.NewExportRequestFromMetrics(received[0])}, ) @@ -204,6 +210,8 @@ func TestMetricsMultiBatchWithDictionaryLimit(t *testing.T) { } }() + stdTesting := assert.NewStdUnitTest(t) + for i := 0; i < 10; i++ { metrics := GenerateMetrics(0, math.MaxUint8+1) batch, err := producer.BatchArrowRecordsFromMetrics(metrics) @@ -215,7 +223,7 @@ func TestMetricsMultiBatchWithDictionaryLimit(t *testing.T) { require.Equal(t, 1, len(received)) assert.Equiv( - t, + stdTesting, []json.Marshaler{pmetricotlp.NewExportRequestFromMetrics(metrics)}, []json.Marshaler{pmetricotlp.NewExportRequestFromMetrics(received[0])}, ) diff --git a/pkg/otel/arrow_record/producer_consumer_test.go b/pkg/otel/arrow_record/producer_consumer_test.go index af34cb250f..1fdd68ad1e 100644 --- a/pkg/otel/arrow_record/producer_consumer_test.go +++ b/pkg/otel/arrow_record/producer_consumer_test.go @@ -200,6 +200,8 @@ func FuzzProducerTraces1(f *testing.F) { func TestProducerConsumerTraces(t *testing.T) { ent := datagen.NewTestEntropy(int64(rand.Uint64())) //nolint:gosec // only used for testing + stdTesting := assert.NewStdUnitTest(t) + for idx, dg := range []*datagen.TraceGenerator{ datagen.NewTracesGenerator( ent, @@ -260,7 +262,7 @@ func TestProducerConsumerTraces(t *testing.T) { require.Equal(t, 1, len(received)) assert.Equiv( - t, + stdTesting, []json.Marshaler{ptraceotlp.NewExportRequestFromTraces(traces)}, []json.Marshaler{ptraceotlp.NewExportRequestFromTraces(received[0])}, ) @@ -271,6 +273,8 @@ func TestProducerConsumerTraces(t *testing.T) { func TestProducerConsumerLogs(t *testing.T) { ent := datagen.NewTestEntropy(int64(rand.Uint64())) //nolint:gosec // only used for testing + stdTesting := assert.NewStdUnitTest(t) + for idx, dg := range []*datagen.LogsGenerator{ datagen.NewLogsGenerator( ent, @@ -312,7 +316,7 @@ func TestProducerConsumerLogs(t *testing.T) { require.Equal(t, 1, len(received)) assert.Equiv( - t, + stdTesting, []json.Marshaler{plogotlp.NewExportRequestFromLogs(logs)}, []json.Marshaler{plogotlp.NewExportRequestFromLogs(received[0])}, ) @@ -323,6 +327,8 @@ func TestProducerConsumerLogs(t *testing.T) { func TestProducerConsumerMetrics(t *testing.T) { ent := datagen.NewTestEntropy(int64(rand.Uint64())) //nolint:gosec // only used for testing + stdTesting := assert.NewStdUnitTest(t) + for idx, dg := range []*datagen.MetricsGenerator{ datagen.NewMetricsGenerator( ent, @@ -365,7 +371,7 @@ func TestProducerConsumerMetrics(t *testing.T) { require.Equal(t, 1, len(received)) assert.Equiv( - t, + stdTesting, []json.Marshaler{pmetricotlp.NewExportRequestFromMetrics(metrics)}, []json.Marshaler{pmetricotlp.NewExportRequestFromMetrics(received[0])}, ) @@ -380,7 +386,7 @@ func TestProducerConsumerMetrics(t *testing.T) { require.Equal(t, 1, len(received)) assert.Equiv( - t, + stdTesting, []json.Marshaler{pmetricotlp.NewExportRequestFromMetrics(metrics)}, []json.Marshaler{pmetricotlp.NewExportRequestFromMetrics(received[0])}, ) diff --git a/pkg/otel/arrow_record/traces_dict_test.go b/pkg/otel/arrow_record/traces_dict_test.go index fe0688c8de..2ae31a4046 100644 --- a/pkg/otel/arrow_record/traces_dict_test.go +++ b/pkg/otel/arrow_record/traces_dict_test.go @@ -54,6 +54,8 @@ func TestTracesWithNoDictionary(t *testing.T) { } }() + stdTesting := assert.NewStdUnitTest(t) + for i := 0; i < 300; i++ { traces := GenerateTraces(0, math.MaxUint8+1) batch, err := producer.BatchArrowRecordsFromTraces(traces) @@ -65,7 +67,7 @@ func TestTracesWithNoDictionary(t *testing.T) { require.Equal(t, 1, len(received)) assert.Equiv( - t, + stdTesting, []json.Marshaler{ptraceotlp.NewExportRequestFromTraces(traces)}, []json.Marshaler{ptraceotlp.NewExportRequestFromTraces(received[0])}, ) @@ -102,6 +104,8 @@ func TestTracesMultiBatchWithDictionaryIndexChanges(t *testing.T) { } }() + stdTesting := assert.NewStdUnitTest(t) + for i := 0; i < 10; i++ { traces := GenerateTraces(0, math.MaxUint8+1) batch, err := producer.BatchArrowRecordsFromTraces(traces) @@ -113,7 +117,7 @@ func TestTracesMultiBatchWithDictionaryIndexChanges(t *testing.T) { require.Equal(t, 1, len(received)) assert.Equiv( - t, + stdTesting, []json.Marshaler{ptraceotlp.NewExportRequestFromTraces(traces)}, []json.Marshaler{ptraceotlp.NewExportRequestFromTraces(received[0])}, ) @@ -153,6 +157,8 @@ func TestTracesMultiBatchWithDictionaryOverflow(t *testing.T) { } }() + stdTesting := assert.NewStdUnitTest(t) + for i := 0; i < 10; i++ { traces := GenerateTraces(i*((math.MaxUint8/2)+1), (math.MaxUint8/2)+1) batch, err := producer.BatchArrowRecordsFromTraces(traces) @@ -164,7 +170,7 @@ func TestTracesMultiBatchWithDictionaryOverflow(t *testing.T) { require.Equal(t, 1, len(received)) assert.Equiv( - t, + stdTesting, []json.Marshaler{ptraceotlp.NewExportRequestFromTraces(traces)}, []json.Marshaler{ptraceotlp.NewExportRequestFromTraces(received[0])}, ) @@ -203,6 +209,8 @@ func TestTracesMultiBatchWithDictionaryLimit(t *testing.T) { } }() + stdTesting := assert.NewStdUnitTest(t) + for i := 0; i < 10; i++ { traces := GenerateTraces(0, math.MaxUint8+1) batch, err := producer.BatchArrowRecordsFromTraces(traces) @@ -214,7 +222,7 @@ func TestTracesMultiBatchWithDictionaryLimit(t *testing.T) { require.Equal(t, 1, len(received)) assert.Equiv( - t, + stdTesting, []json.Marshaler{ptraceotlp.NewExportRequestFromTraces(traces)}, []json.Marshaler{ptraceotlp.NewExportRequestFromTraces(received[0])}, ) diff --git a/pkg/otel/assert/equiv.go b/pkg/otel/assert/equiv.go index d7da2253aa..0ce6d55e14 100644 --- a/pkg/otel/assert/equiv.go +++ b/pkg/otel/assert/equiv.go @@ -25,9 +25,85 @@ import ( "testing" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) +// Testing is an interface that makes `assert.Equiv` independent of the testing +// framework. It is then possible to use `assert.Equiv` in contexts where the +// testing framework is not available. +type Testing interface { + Helper() + FailNow(failureMessage string, msgAndArgs ...any) bool + NoError(err error, msgAndArgs ...any) + Equal(expected, actual interface{}, msgAndArgs ...any) bool +} + +// StdUnitTest adapts the Equiv() method for a standard unit test. +type StdUnitTest struct { + t *testing.T +} + +func NewStdUnitTest(t *testing.T) Testing { + return &StdUnitTest{t: t} +} + +func (a *StdUnitTest) Helper() { + a.t.Helper() +} + +func (a *StdUnitTest) FailNow(failureMessage string, msgAndArgs ...any) bool { + return assert.FailNow(a.t, failureMessage, msgAndArgs...) +} + +func (a *StdUnitTest) NoError(err error, msgAndArgs ...any) { + assert.NoError(a.t, err, msgAndArgs...) +} + +func (a *StdUnitTest) Equal(expected, actual interface{}, msgAndArgs ...any) bool { + return assert.Equal(a.t, expected, actual, msgAndArgs...) +} + +// StandaloneTest adapts the Equiv() method for a standard unit test. +type StandaloneTest struct { +} + +func NewStandaloneTest() Testing { + return &StandaloneTest{} +} + +func (a *StandaloneTest) Helper() { + // n/a +} + +// sprintf is handle's handles `...any` the way the testify library +// does, meaning to expect a printf formating directive in the first +// position and format appropriately. +func sprintf(msgAndArgs ...any) string { + if len(msgAndArgs) == 0 { + return "" // empty + } + if s, ok := msgAndArgs[0].(string); ok { + return fmt.Sprintf(s, msgAndArgs[1:]...) + } + return fmt.Sprint(msgAndArgs...) // fallback to sprint() +} + +func (a *StandaloneTest) FailNow(failureMessage string, msgAndArgs ...any) bool { + panic(fmt.Sprint(failureMessage, ": ", sprintf(msgAndArgs...))) +} + +func (a *StandaloneTest) NoError(err error, msgAndArgs ...any) { + if err != nil { + panic(fmt.Sprint("unexpected error: ", sprintf(msgAndArgs...))) + } +} + +func (a *StandaloneTest) Equal(expected, actual interface{}, msgAndArgs ...any) bool { + if expected != actual { + panic(fmt.Sprint("unexpected mismatch: ", expected, "!=", actual, sprintf(msgAndArgs...))) + } + return true +} + // Equiv asserts that two arrays of json.Marshaler are equivalent. Metrics, logs, and traces requests implement // json.Marshaler and are considered equivalent if they have the same set of vPaths. A vPath is a path to a value // in a json object. For example the vPath "resource.attributes.service.name=myservice" refers to the value "myservice" @@ -41,15 +117,15 @@ import ( // This concept of equivalence is useful for testing the conversion OTLP to/from OTLP Arrow as this conversion doesn't // necessarily preserve the structure of the original OTLP entity. Resource spans or scope spans can be split or merged // during the conversion if the semantic is preserved. -func Equiv(t *testing.T, expected []json.Marshaler, actual []json.Marshaler) { +func Equiv(t Testing, expected []json.Marshaler, actual []json.Marshaler) { t.Helper() expectedVPaths, err := vPaths(expected) if err != nil { - assert.FailNow(t, "Failed to convert expected traces to canonical representation", err) + t.FailNow("Failed to convert expected traces to canonical representation", err) } actualVPaths, err := vPaths(actual) if err != nil { - assert.FailNow(t, "Failed to convert actual traces to canonical representation", err) + t.FailNow("Failed to convert actual traces to canonical representation", err) } missingExpectedVPaths := difference(expectedVPaths, actualVPaths) @@ -76,19 +152,19 @@ func Equiv(t *testing.T, expected []json.Marshaler, actual []json.Marshaler) { actualJSON, _ := json.MarshalIndent(actual, "", " ") println("actual json: " + string(actualJSON)) - assert.FailNow(t, "Traces are not equivalent") + t.FailNow("Traces are not equivalent") } } -func EquivFromBytes(t *testing.T, expected []byte, actual []byte) { +func EquivFromBytes(t Testing, expected []byte, actual []byte) { t.Helper() expectedVPaths, err := vPathsFromBytes(expected) if err != nil { - assert.FailNow(t, "Failed to convert expected traces to canonical representation", err) + t.FailNow("Failed to convert expected traces to canonical representation", err) } actualVPaths, err := vPathsFromBytes(actual) if err != nil { - assert.FailNow(t, "Failed to convert actual traces to canonical representation", err) + t.FailNow("Failed to convert actual traces to canonical representation", err) } missingExpectedVPaths := difference(expectedVPaths, actualVPaths) @@ -107,27 +183,27 @@ func EquivFromBytes(t *testing.T, expected []byte, actual []byte) { } } if len(missingExpectedVPaths) > 0 || len(missingActualVPaths) > 0 { - assert.FailNow(t, "Traces are not equivalent") + t.FailNow("Traces are not equivalent") } } // NotEquiv asserts that two arrays of json.Marshaler are not equivalent. See Equiv for the definition of equivalence. -func NotEquiv(t *testing.T, expected []json.Marshaler, actual []json.Marshaler) { +func NotEquiv(t Testing, expected []json.Marshaler, actual []json.Marshaler) { t.Helper() expectedVPaths, err := vPaths(expected) if err != nil { - assert.FailNow(t, "Failed to convert expected traces to canonical representation", err) + t.FailNow("Failed to convert expected traces to canonical representation", err) } actualVPaths, err := vPaths(actual) if err != nil { - assert.FailNow(t, "Failed to convert actual traces to canonical representation", err) + t.FailNow("Failed to convert actual traces to canonical representation", err) } missingExpectedVPaths := difference(expectedVPaths, actualVPaths) missingActualVPaths := difference(actualVPaths, expectedVPaths) if len(missingExpectedVPaths) == 0 && len(missingActualVPaths) == 0 { - assert.FailNow(t, "Traces should not be equivalent") + t.FailNow("Traces should not be equivalent") } } @@ -224,9 +300,9 @@ func exportAllVPaths(traces map[string]interface{}, currentVPath string, vPaths } } -// nonPositionalIndex returns a string that can be used to identify: -// - a resource, -// - a scope, +// nonPositionalIndex returns a string that can be used to identify: resource, +// scope, event, link, attribute, span, metrics, dataPoints. +// // Note: The string `_` is returned if the key is not supported. func nonPositionalIndex(key string, vMap map[string]interface{}) string { switch key { @@ -240,11 +316,8 @@ func nonPositionalIndex(key string, vMap map[string]interface{}) string { if ok { return sig(scope) } - case "events", "links": - return sig(vMap) - case "attributes": - return sig(vMap) - case "spans": + case "events", "links", "attributes", "spans", "quantileValues", + "filteredAttributes", "exemplars", "dataPoints", "metrics", "logRecords", "values": return sig(vMap) } return "_" @@ -308,8 +381,9 @@ func mapSig(vMap map[string]interface{}) string { sigBuilder.WriteString(",") } - // Special case for attributes, which are sorted by key. - if key == "attributes" { + // Special case for attributes (and filtered attributes), which are + // sorted by key. + if key == "attributes" || key == "filteredAttributes" { attributes, ok := vMap[key].([]interface{}) if ok { attrsSig, done := tryAttributesSig(attributes) @@ -322,9 +396,9 @@ func mapSig(vMap map[string]interface{}) string { } } - // Special case for events and links, which are sorted by non-positional - // index. - if key == "events" || key == "links" { + // Special case for events, links, and exemplars, which are sorted by + // non-positional index. + if key == "events" || key == "links" || key == "exemplars" || key == "values" { items, ok := vMap[key].([]interface{}) if ok { sig, done := itemsSig(key, items) @@ -453,18 +527,18 @@ func jsonifyFromBytes(jsonBytes []byte) (map[string]interface{}, error) { // JSONCanonicalEq compares two JSON objects for equality after converting // them to a canonical form. This is useful for comparing JSON objects that may // have different key orders or array orders. -func JSONCanonicalEq(t *testing.T, expected interface{}, actual interface{}) { +func JSONCanonicalEq(t Testing, expected interface{}, actual interface{}) { t.Helper() expected, err := jsonFrom(expected) - require.NoError(t, err) + t.NoError(err) actual, err = jsonFrom(actual) - require.NoError(t, err) + t.NoError(err) expectedID := CanonicalObjectID(expected) actualID := CanonicalObjectID(actual) - assert.Equal(t, expectedID, actualID) + t.Equal(expectedID, actualID) } // CanonicalObjectID computes a unique ID for an object. diff --git a/pkg/otel/assert/equiv_test.go b/pkg/otel/assert/equiv_test.go index 1924fa79c4..c9c8c52433 100644 --- a/pkg/otel/assert/equiv_test.go +++ b/pkg/otel/assert/equiv_test.go @@ -27,6 +27,7 @@ import ( func TestEquiv(t *testing.T) { t.Parallel() + stdTesting := NewStdUnitTest(t) traces := ptrace.NewTraces() rs := traces.ResourceSpans().AppendEmpty() rs.Resource().Attributes().PutStr("foo1", "bar") @@ -43,7 +44,7 @@ func TestEquiv(t *testing.T) { ptraceotlp.NewExportRequestFromTraces(traces), ptraceotlp.NewExportRequestFromTraces(traces), } - Equiv(t, expectedTraces, actualTraces) + Equiv(stdTesting, expectedTraces, actualTraces) traces = ptrace.NewTraces() rs = traces.ResourceSpans().AppendEmpty() @@ -53,12 +54,13 @@ func TestEquiv(t *testing.T) { actualTraces = []json.Marshaler{ ptraceotlp.NewExportRequestFromTraces(traces), } - NotEquiv(t, expectedTraces, actualTraces) + NotEquiv(stdTesting, expectedTraces, actualTraces) } func TestEquivSortAndMerge(t *testing.T) { t.Parallel() + stdTesting := NewStdUnitTest(t) split_res_and_scope := ptrace.NewTraces() rs := split_res_and_scope.ResourceSpans().AppendEmpty() rs.Resource().Attributes().PutStr("k2", "v2") @@ -187,12 +189,13 @@ func TestEquivSortAndMerge(t *testing.T) { actualTraces := []json.Marshaler{ ptraceotlp.NewExportRequestFromTraces(split_res_and_scope), } - Equiv(t, expectedTraces, actualTraces) + Equiv(stdTesting, expectedTraces, actualTraces) } func TestSingleResScope(t *testing.T) { t.Parallel() + stdTesting := NewStdUnitTest(t) expected := "{\n \"resourceSpans\": [\n {\n \"resource\": {\n \"attributes\": [\n {\n \"key\": \"hostname\",\n \"value\": {\n \"stringValue\": \"host3.mydomain.com\"\n }\n },\n {\n \"key\": \"unique3\",\n \"value\": {\n \"stringValue\": \"uv3\"\n }\n },\n {\n \"key\": \"ip\",\n \"value\": {\n \"stringValue\": \"192.168.0.3\"\n }\n },\n {\n \"key\": \"version\",\n \"value\": {\n \"doubleValue\": 1.5\n }\n },\n {\n \"key\": \"status\",\n \"value\": {\n \"intValue\": \"500\"\n }\n },\n {\n \"key\": \"up\",\n \"value\": {\n \"boolValue\": false\n }\n }\n ]\n },\n \"scopeSpans\": [\n {\n \"scope\": {\n \"name\": \"fake_generator\",\n \"version\": \"1.0.1\"\n },\n \"spans\": [\n {\n \"traceId\": \"6d759c9c5e1a049927ca069a497b0508\",\n \"spanId\": \"90d5ead3745935bd\",\n \"traceState\": \"maiores\",\n \"parentSpanId\": \"\",\n \"kind\": 2,\n \"droppedAttributesCount\": 9,\n \"droppedEventsCount\": 9,\n \"droppedLinksCount\": 6,\n \"status\": {\n \"message\": \"OK\"\n }\n },\n {\n \"traceId\": \"72e8551d2f079f29231aa57088384785\",\n \"spanId\": \"35ce5d0711df60f2\",\n \"parentSpanId\": \"35ce5d0711df60f2\",\n \"name\": \"GET /user-info\",\n \"startTimeUnixNano\": \"1668124800000010667\",\n \"endTimeUnixNano\": \"1668124800000010668\",\n \"droppedAttributesCount\": 8,\n \"events\": [\n {\n \"timeUnixNano\": \"1668124800000010672\"\n },\n {\n \"timeUnixNano\": \"1668124800000010674\",\n \"name\": \"odit\",\n \"droppedAttributesCount\": 2\n },\n {\n \"timeUnixNano\": \"1668124800000010672\",\n \"name\": \"velit\",\n \"attributes\": [\n {\n \"key\": \"attr_0\",\n \"value\": {\n \"stringValue\": \"est\"\n }\n },\n {\n \"key\": \"attr_1\",\n \"value\": {\n \"doubleValue\": 0.017895097521176077\n }\n },\n {\n \"key\": \"attr_2\",\n \"value\": {\n \"stringValue\": \"consectetur\"\n }\n }\n ],\n \"droppedAttributesCount\": 9\n },\n {\n \"name\": \"exercitationem\"\n },\n {\n \"timeUnixNano\": \"1668124800000010672\",\n \"name\": \"soluta\",\n \"droppedAttributesCount\": 9\n },\n {\n \"timeUnixNano\": \"1668124800000010672\",\n \"droppedAttributesCount\": 7\n },\n {}\n ],\n \"links\": [\n {\n \"traceId\": \"72e8551d2f079f29231aa57088384785\",\n \"spanId\": \"\",\n \"traceState\": \"ut\",\n \"attributes\": [\n {\n \"key\": \"attr_0\",\n \"value\": {\n \"intValue\": \"4055508854307121380\"\n }\n },\n {\n \"key\": \"attr_1\",\n \"value\": {\n \"intValue\": \"2603754219448080514\"\n }\n },\n {\n \"key\": \"attr_2\",\n \"value\": {\n \"stringValue\": \"ut\"\n }\n },\n {\n \"key\": \"attr_3\",\n \"value\": {\n \"intValue\": \"542986775976848616\"\n }\n },\n {\n \"key\": \"attr_4\",\n \"value\": {\n \"intValue\": \"5562030613432072994\"\n }\n }\n ],\n \"droppedAttributesCount\": 8\n },\n {\n \"traceId\": \"\",\n \"spanId\": \"\",\n \"traceState\": \"vel\",\n \"droppedAttributesCount\": 6\n }\n ],\n \"status\": {\n \"code\": 1\n }\n }\n ]\n }\n ],\n \"schemaUrl\": \"https://opentelemetry.io/schemas/1.0.0\"\n }\n ]\n }" actual := ptrace.NewTraces() @@ -271,12 +274,13 @@ func TestSingleResScope(t *testing.T) { actualTraces, err := ptraceotlp.NewExportRequestFromTraces(actual).MarshalJSON() assert.NoError(t, err) - EquivFromBytes(t, []byte(expected), actualTraces) + EquivFromBytes(stdTesting, []byte(expected), actualTraces) } func TestNotEquivSortAndMerge(t *testing.T) { t.Parallel() + stdTesting := NewStdUnitTest(t) traces_1 := ptrace.NewTraces() rs := traces_1.ResourceSpans().AppendEmpty() rs.Resource().Attributes().PutStr("k2", "v2") @@ -351,7 +355,7 @@ func TestNotEquivSortAndMerge(t *testing.T) { actualTraces := []json.Marshaler{ ptraceotlp.NewExportRequestFromTraces(traces_1), } - NotEquiv(t, expectedTraces, actualTraces) + NotEquiv(stdTesting, expectedTraces, actualTraces) } func TestNonPositionalIndex(t *testing.T) { diff --git a/pkg/otel/logs/arrow/all_test.go b/pkg/otel/logs/arrow/all_test.go index 82d9bd08e8..7520bb7d36 100644 --- a/pkg/otel/logs/arrow/all_test.go +++ b/pkg/otel/logs/arrow/all_test.go @@ -44,6 +44,7 @@ var DefaultDictConfig = cfg.NewDictionary(math.MaxUint16) func TestLogs(t *testing.T) { t.Parallel() + stdTesting := jsonassert.NewStdUnitTest(t) producerStats := stats.NewProducerStats() pool := memory.NewCheckedAllocator(memory.NewGoAllocator()) defer pool.AssertSize(t, 0) @@ -88,7 +89,7 @@ func TestLogs(t *testing.T) { ,{"body":{"str":"body2","type":1},"dropped_attributes_count":1,"flags":2,"id":1,"observed_time_unix_nano":"1970-01-01 00:00:00.000000004","resource":{"dropped_attributes_count":1,"id":1,"schema_url":"schema2"},"schema_url":"schema2","scope":{"dropped_attributes_count":1,"id":0,"name":"scope2","version":"1.0.2"},"severity_number":2,"severity_text":"severity2","span_id":"qgAAAAAAAAA=","time_unix_nano":"1970-01-01 00:00:00.000000003","trace_id":"qgAAAAAAAAAAAAAAAAAAAA=="} ]` - jsonassert.JSONCanonicalEq(t, expected, actual) + jsonassert.JSONCanonicalEq(stdTesting, expected, actual) for _, relatedRecord := range relatedRecords { switch relatedRecord.PayloadType() { diff --git a/pkg/otel/logs/otlp/logs.go b/pkg/otel/logs/otlp/logs.go index a7bdeb15aa..e0a8555acf 100644 --- a/pkg/otel/logs/otlp/logs.go +++ b/pkg/otel/logs/otlp/logs.go @@ -124,11 +124,10 @@ func LogsFrom(record arrow.Record, relatedData *RelatedData) (plog.Logs, error) // Process log record fields logRecord := logRecordSlice.AppendEmpty() - deltaID, err := arrowutils.U16FromRecord(record, logRecordIDs.ID, row) + deltaID, err := arrowutils.NullableU16FromRecord(record, logRecordIDs.ID, row) if err != nil { return logs, werror.Wrap(err) } - ID := relatedData.LogRecordIDFromDelta(deltaID) timeUnixNano, err := arrowutils.TimestampFromRecord(record, logRecordIDs.TimeUnixNano, row) if err != nil { @@ -229,10 +228,15 @@ func LogsFrom(record arrow.Record, relatedData *RelatedData) (plog.Logs, error) } logRecordAttrs := logRecord.Attributes() - attrs := relatedData.LogRecordAttrMapStore.AttributesByID(ID) - if attrs != nil { - attrs.CopyTo(logRecordAttrs) + + if deltaID != nil { + ID := relatedData.LogRecordIDFromDelta(*deltaID) + attrs := relatedData.LogRecordAttrMapStore.AttributesByID(ID) + if attrs != nil { + attrs.CopyTo(logRecordAttrs) + } } + droppedAttributesCount, err := arrowutils.U32FromRecord(record, logRecordIDs.DropAttributesCount, row) if err != nil { return logs, werror.WrapWithContext(err, map[string]interface{}{"row": row}) diff --git a/pkg/otel/logs/validation_test.go b/pkg/otel/logs/validation_test.go index dc7ad56e75..f7f9a49cee 100644 --- a/pkg/otel/logs/validation_test.go +++ b/pkg/otel/logs/validation_test.go @@ -80,6 +80,8 @@ func CheckEncodeDecode( t *testing.T, expectedRequest plogotlp.ExportRequest, ) { + stdTesting := assert.NewStdUnitTest(t) + // Convert the OTLP logs request to Arrow. pool := memory.NewCheckedAllocator(memory.NewGoAllocator()) defer pool.AssertSize(t, 0) @@ -118,7 +120,7 @@ func CheckEncodeDecode( record.Release() - assert.Equiv(t, []json.Marshaler{expectedRequest}, []json.Marshaler{plogotlp.NewExportRequestFromLogs(logs)}) + assert.Equiv(stdTesting, []json.Marshaler{expectedRequest}, []json.Marshaler{plogotlp.NewExportRequestFromLogs(logs)}) } func MultiRoundOfCheckEncodeMessUpDecode( diff --git a/pkg/otel/metrics/arrow/ehistogram_dp.go b/pkg/otel/metrics/arrow/ehistogram_dp.go index 1c873f7553..48e3e67f32 100644 --- a/pkg/otel/metrics/arrow/ehistogram_dp.go +++ b/pkg/otel/metrics/arrow/ehistogram_dp.go @@ -180,6 +180,7 @@ func (b *EHistogramDataPointBuilder) Build() (record arrow.Record, err error) { // Intermediaries steps may be required to update the schema. for { b.attrsAccu.Reset() + b.exemplarAccumulator.Reset() record, err = b.TryBuild(b.attrsAccu) if err != nil { if record != nil { diff --git a/pkg/otel/metrics/arrow/histogram_dp.go b/pkg/otel/metrics/arrow/histogram_dp.go index a920dc4905..980363289a 100644 --- a/pkg/otel/metrics/arrow/histogram_dp.go +++ b/pkg/otel/metrics/arrow/histogram_dp.go @@ -181,6 +181,7 @@ func (b *HistogramDataPointBuilder) Build() (record arrow.Record, err error) { // Intermediaries steps may be required to update the schema. for { b.attrsAccu.Reset() + b.exemplarAccumulator.Reset() record, err = b.TryBuild(b.attrsAccu) if err != nil { if record != nil { diff --git a/pkg/otel/metrics/arrow/number_data_point.go b/pkg/otel/metrics/arrow/number_data_point.go index ca909bf89a..77d0c61a7b 100644 --- a/pkg/otel/metrics/arrow/number_data_point.go +++ b/pkg/otel/metrics/arrow/number_data_point.go @@ -171,6 +171,7 @@ func (b *DataPointBuilder) Build() (record arrow.Record, err error) { // Intermediaries steps may be required to update the schema. for { b.attrsAccu.Reset() + b.exemplarAccumulator.Reset() record, err = b.TryBuild(b.attrsAccu) if err != nil { if record != nil { diff --git a/pkg/otel/metrics/validation_test.go b/pkg/otel/metrics/validation_test.go index 20e6c29bb1..57e74acb45 100644 --- a/pkg/otel/metrics/validation_test.go +++ b/pkg/otel/metrics/validation_test.go @@ -48,7 +48,7 @@ func TestMetricsEncodingDecoding(t *testing.T) { t.Parallel() metricsGen := MetricsGenerator() - expectedRequest := pmetricotlp.NewExportRequestFromMetrics(metricsGen.GenerateAllKindOfMetrics(100, 100)) + expectedRequest := pmetricotlp.NewExportRequestFromMetrics(metricsGen.GenerateRandomMetrics(50, 100)) CheckEncodeDecode(t, expectedRequest) } @@ -127,6 +127,8 @@ func MetricsGenerator() *datagen.MetricsGenerator { } func CheckEncodeDecode(t *testing.T, expectedRequest pmetricotlp.ExportRequest) { + stdTesting := assert.NewStdUnitTest(t) + // Convert the OTLP metrics request to Arrow. pool := memory.NewCheckedAllocator(memory.NewGoAllocator()) defer pool.AssertSize(t, 0) @@ -165,7 +167,7 @@ func CheckEncodeDecode(t *testing.T, expectedRequest pmetricotlp.ExportRequest) record.Release() - assert.Equiv(t, []json.Marshaler{expectedRequest}, []json.Marshaler{pmetricotlp.NewExportRequestFromMetrics(metrics)}) + assert.Equiv(stdTesting, []json.Marshaler{expectedRequest}, []json.Marshaler{pmetricotlp.NewExportRequestFromMetrics(metrics)}) } // MultiRoundOfCheckEncodeMessUpDecode tests the robustness of the conversion of diff --git a/pkg/otel/traces/arrow/all_test.go b/pkg/otel/traces/arrow/all_test.go index 64a1ea356d..0b1418a62e 100644 --- a/pkg/otel/traces/arrow/all_test.go +++ b/pkg/otel/traces/arrow/all_test.go @@ -245,6 +245,8 @@ func TestLink(t *testing.T) { func TestTraces(t *testing.T) { t.Parallel() + stdTesting := jsonassert.NewStdUnitTest(t) + producerStats := stats.NewProducerStats() pool := memory.NewCheckedAllocator(memory.NewGoAllocator()) defer pool.AssertSize(t, 0) @@ -292,7 +294,7 @@ func TestTraces(t *testing.T) { ,{"dropped_attributes_count":1,"dropped_events_count":1,"dropped_links_count":1,"duration_time_unix_nano":"1ms","id":1,"kind":3,"name":"span2","parent_span_id":"qgAAAAAAAAA=","resource":{"dropped_attributes_count":1,"id":1,"schema_url":"schema2"},"schema_url":"schema2","scope":{"dropped_attributes_count":1,"id":0,"name":"scope2","version":"1.0.2"},"span_id":"qgAAAAAAAAA=","start_time_unix_nano":"1970-01-01 00:00:00.000000003","status":{"code":2,"status_message":"message2"},"trace_id":"qgAAAAAAAAAAAAAAAAAAAA==","trace_state":"key1=value2"} ]` - jsonassert.JSONCanonicalEq(t, expected, actual) + jsonassert.JSONCanonicalEq(stdTesting, expected, actual) for _, relatedRecord := range relatedRecords { switch relatedRecord.PayloadType() { diff --git a/pkg/otel/traces/arrow/traces.go b/pkg/otel/traces/arrow/traces.go index 045b0746be..aa9e1931b2 100644 --- a/pkg/otel/traces/arrow/traces.go +++ b/pkg/otel/traces/arrow/traces.go @@ -204,8 +204,13 @@ func (b *TracesBuilder) Append(traces ptrace.Traces) error { spanLinks := span.Span.Links() ID := spanID - b.ib.Append(ID) - spanID++ + if spanAttrs.Len() == 0 && spanEvents.Len() == 0 && spanLinks.Len() == 0 { + // No related data found + b.ib.AppendNull() + } else { + b.ib.Append(ID) + spanID++ + } // === Process resource and schema URL === resAttrs := span.Resource.Attributes() diff --git a/pkg/otel/traces/otlp/traces.go b/pkg/otel/traces/otlp/traces.go index e80737eecb..7a3eece6f8 100644 --- a/pkg/otel/traces/otlp/traces.go +++ b/pkg/otel/traces/otlp/traces.go @@ -132,11 +132,10 @@ func TracesFrom(record arrow.Record, relatedData *RelatedData) (ptrace.Traces, e // Process span fields span := spanSlice.AppendEmpty() - deltaID, err := arrowutils.U16FromRecord(record, traceIDs.ID, row) + deltaID, err := arrowutils.NullableU16FromRecord(record, traceIDs.ID, row) if err != nil { return traces, werror.Wrap(err) } - ID := relatedData.SpanIDFromDelta(deltaID) traceID, err := arrowutils.FixedSizeBinaryFromRecord(record, traceIDs.TraceID, row) if err != nil { @@ -210,22 +209,27 @@ func TracesFrom(record arrow.Record, relatedData *RelatedData) (ptrace.Traces, e } span.Status().SetCode(ptrace.StatusCode(code)) } - spanAttrs := span.Attributes() - attrs := relatedData.SpanAttrMapStore.AttributesByID(ID) - if attrs != nil { - attrs.CopyTo(spanAttrs) - } - events := relatedData.SpanEventsStore.EventsByID(ID) - eventSlice := span.Events() - for _, event := range events { - event.MoveTo(eventSlice.AppendEmpty()) - } + if deltaID != nil { + ID := relatedData.SpanIDFromDelta(*deltaID) + + spanAttrs := span.Attributes() + attrs := relatedData.SpanAttrMapStore.AttributesByID(ID) + if attrs != nil { + attrs.CopyTo(spanAttrs) + } - links := relatedData.SpanLinksStore.LinksByID(ID) - linkSlice := span.Links() - for _, link := range links { - link.MoveTo(linkSlice.AppendEmpty()) + events := relatedData.SpanEventsStore.EventsByID(ID) + eventSlice := span.Events() + for _, event := range events { + event.MoveTo(eventSlice.AppendEmpty()) + } + + links := relatedData.SpanLinksStore.LinksByID(ID) + linkSlice := span.Links() + for _, link := range links { + link.MoveTo(linkSlice.AppendEmpty()) + } } var tid pcommon.TraceID diff --git a/pkg/otel/traces/validation_test.go b/pkg/otel/traces/validation_test.go index a8ead48059..71fd348cd4 100644 --- a/pkg/otel/traces/validation_test.go +++ b/pkg/otel/traces/validation_test.go @@ -284,6 +284,8 @@ func CheckEncodeDecode( t *testing.T, expectedRequest ptraceotlp.ExportRequest, ) { + stdTesting := assert.NewStdUnitTest(t) + // Convert the OTLP traces request to Arrow. pool := memory.NewCheckedAllocator(memory.NewGoAllocator()) defer pool.AssertSize(t, 0) @@ -322,7 +324,7 @@ func CheckEncodeDecode( record.Release() - assert.Equiv(t, []json.Marshaler{expectedRequest}, []json.Marshaler{ptraceotlp.NewExportRequestFromTraces(traces)}) + assert.Equiv(stdTesting, []json.Marshaler{expectedRequest}, []json.Marshaler{ptraceotlp.NewExportRequestFromTraces(traces)}) } func MultiRoundOfCheckEncodeMessUpDecode( @@ -411,6 +413,7 @@ func TestConversionFromRealData(t *testing.T) { func checkTracesConversion(t *testing.T, expectedRequest ptraceotlp.ExportRequest) { //nolint:unused // only used for testing t.Helper() + stdTesting := assert.NewStdUnitTest(t) pool := memory.NewCheckedAllocator(memory.NewGoAllocator()) defer pool.AssertSize(t, 0) @@ -447,7 +450,7 @@ func checkTracesConversion(t *testing.T, expectedRequest ptraceotlp.ExportReques defer record.Release() - assert.Equiv(t, []json.Marshaler{expectedRequest}, []json.Marshaler{ptraceotlp.NewExportRequestFromTraces(traces)}) + assert.Equiv(stdTesting, []json.Marshaler{expectedRequest}, []json.Marshaler{ptraceotlp.NewExportRequestFromTraces(traces)}) } func traceID(id string) [16]byte { diff --git a/tools/trace_verify/main.go b/tools/trace_verify/main.go new file mode 100644 index 0000000000..4dfda02e3a --- /dev/null +++ b/tools/trace_verify/main.go @@ -0,0 +1,85 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package main + +import ( + "bufio" + "encoding/json" + "flag" + "log" + "os" + + "github.com/f5/otel-arrow-adapter/pkg/otel/arrow_record" + "github.com/f5/otel-arrow-adapter/pkg/otel/assert" + + "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" +) + +func main() { + flag.Parse() + + producer := arrow_record.NewProducer() + consumer := arrow_record.NewConsumer() + + args := flag.Args() + + args = os.Args[1:] + + asserter := assert.NewStandaloneTest() + + for _, file := range args { + f, err := os.Open(file) + if err != nil { + log.Fatalf("open: %s: %v", file, err) + return + } + scanner := bufio.NewScanner(f) + for scanner.Scan() { + var un ptrace.JSONUnmarshaler + + expected, err := un.UnmarshalTraces([]byte(scanner.Text())) + if err != nil { + log.Fatalf("parse: %v", err) + } + + batch, err := producer.BatchArrowRecordsFromTraces(expected) + if err != nil { + log.Fatalf("produce arrow: %v", err) + } + + received, err := consumer.TracesFrom(batch) + if err != nil { + log.Fatalf("consume arrow: %v", err) + } + if len(received) != 1 { + log.Fatalf("expecting 1 traces: %d", len(received)) + } + + assert.Equiv(asserter, []json.Marshaler{ + ptraceotlp.NewExportRequestFromTraces(expected), + }, []json.Marshaler{ + ptraceotlp.NewExportRequestFromTraces(received[0]), + }) + + log.Printf("Verified %d traces\n", expected.SpanCount()) + } + if err := scanner.Err(); err != nil { + log.Fatalf("read: %v", err) + } + } +}