diff --git a/CHANGELOG.md b/CHANGELOG.md index f088301a11..c81500710e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ## Unreleased - Jitter is applied to once per process, not once per stream. [#199](https://github.com/open-telemetry/otel-arrow/pull/199) +- Network statistics tracing instrumentation simplified. [#201](https://github.com/open-telemetry/otel-arrow/pull/201) ## [0.23.0](https://github.com/open-telemetry/otel-arrow/releases/tag/v0.23.0) - 2024-05-09 diff --git a/collector/exporter/otelarrowexporter/internal/arrow/stream.go b/collector/exporter/otelarrowexporter/internal/arrow/stream.go index b6717641d9..a5d97e53ee 100644 --- a/collector/exporter/otelarrowexporter/internal/arrow/stream.go +++ b/collector/exporter/otelarrowexporter/internal/arrow/stream.go @@ -351,13 +351,10 @@ func (s *Stream) encodeAndSend(wri writeItem, hdrsBuf *bytes.Buffer, hdrsEnc *hp // unreliable for arrow transport, so we instrument it // directly here. Only the primary direction of transport // is instrumented this way. - if wri.uncompSize != 0 { - var sized netstats.SizesStruct - sized.Method = s.method - sized.Length = int64(wri.uncompSize) - s.netReporter.CountSend(ctx, sized) - s.netReporter.SetSpanSizeAttributes(ctx, sized) - } + var sized netstats.SizesStruct + sized.Method = s.method + sized.Length = int64(wri.uncompSize) + s.netReporter.CountSend(ctx, sized) if err := s.client.Send(batch); err != nil { // The error will be sent to errCh during cleanup for this stream. diff --git a/collector/netstats/handler.go b/collector/netstats/handler.go index 5716d2e4c9..7c8224acd7 100644 --- a/collector/netstats/handler.go +++ b/collector/netstats/handler.go @@ -60,7 +60,6 @@ func (h statsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { } ss.WireLength = int64(s.WireLength) h.rep.CountReceive(ctx, ss) - h.rep.SetSpanSizeAttributes(ctx, ss) case *stats.OutPayload: var ss SizesStruct @@ -70,7 +69,6 @@ func (h statsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { } ss.WireLength = int64(s.WireLength) h.rep.CountSend(ctx, ss) - h.rep.SetSpanSizeAttributes(ctx, ss) } } diff --git a/collector/netstats/netstats.go b/collector/netstats/netstats.go index a57a6d4e40..cb14aef24a 100644 --- a/collector/netstats/netstats.go +++ b/collector/netstats/netstats.go @@ -60,6 +60,8 @@ type NetworkReporter struct { compSizeHisto metric.Int64Histogram } +var _ Interface = &NetworkReporter{} + // SizesStruct is used to pass uncompressed on-wire message lengths to // the CountSend() and CountReceive() methods. type SizesStruct struct { @@ -79,7 +81,10 @@ type Interface interface { // CountSend reports inbound bytes. CountReceive(ctx context.Context, ss SizesStruct) - // SetSpanAttributes takes a context and adds attributes to the associated span. + // SetSpanAttributes is a No-Op. Deprecated. Remove uses of + // this function as the same functionality is included in + // CountSend/CountReceive. Remove this after the collector-contrib + // components have been updated to not reference this method. SetSpanSizeAttributes(ctx context.Context, ss SizesStruct) } @@ -208,16 +213,27 @@ func (rep *NetworkReporter) CountSend(ctx context.Context, ss SizesStruct) { return } + span := trace.SpanFromContext(ctx) attrs := metric.WithAttributes(rep.staticAttr, attribute.String("method", ss.Method)) - if rep.isExporter && ss.WireLength > 0 { - rep.compSizeHisto.Record(ctx, ss.WireLength, attrs) - } - if rep.sentBytes != nil && ss.Length > 0 { - rep.sentBytes.Add(ctx, ss.Length, attrs) + if ss.Length > 0 { + if rep.sentBytes != nil { + rep.sentBytes.Add(ctx, ss.Length, attrs) + } + if span.IsRecording() { + span.SetAttributes(attribute.Int64("sent_uncompressed", ss.Length)) + } } - if rep.sentWireBytes != nil && ss.WireLength > 0 { - rep.sentWireBytes.Add(ctx, ss.WireLength, attrs) + if ss.WireLength > 0 { + if rep.isExporter && rep.compSizeHisto != nil { + rep.compSizeHisto.Record(ctx, ss.WireLength, attrs) + } + if rep.sentWireBytes != nil { + rep.sentWireBytes.Add(ctx, ss.WireLength, attrs) + } + if span.IsRecording() { + span.SetAttributes(attribute.Int64("sent_compressed", ss.WireLength)) + } } } @@ -230,41 +246,29 @@ func (rep *NetworkReporter) CountReceive(ctx context.Context, ss SizesStruct) { return } - attrs := metric.WithAttributes(rep.staticAttr, attribute.String("method", ss.Method)) - if !rep.isExporter && ss.WireLength > 0 { - rep.compSizeHisto.Record(ctx, ss.WireLength, attrs) - } - if rep.recvBytes != nil && ss.Length > 0 { - rep.recvBytes.Add(ctx, ss.Length, attrs) - } - if rep.recvWireBytes != nil && ss.WireLength > 0 { - rep.recvWireBytes.Add(ctx, ss.WireLength, attrs) - } -} - -func (rep *NetworkReporter) SetSpanSizeAttributes(ctx context.Context, ss SizesStruct) { - if rep == nil { - return - } - span := trace.SpanFromContext(ctx) - - var compressedName string - var uncompressedName string - // set attribute name based on exporter vs receiver - if rep.isExporter { - compressedName = "stream_client_compressed_bytes_sent" - uncompressedName = "stream_client_uncompressed_bytes_sent" - } else { // receiver attributes - compressedName = "stream_server_compressed_bytes_recv" - uncompressedName = "stream_server_uncompressed_bytes_recv" - } + attrs := metric.WithAttributes(rep.staticAttr, attribute.String("method", ss.Method)) if ss.Length > 0 { - span.SetAttributes(attribute.Int(uncompressedName, int(ss.Length))) + if rep.recvBytes != nil { + rep.recvBytes.Add(ctx, ss.Length, attrs) + } + if span.IsRecording() { + span.SetAttributes(attribute.Int64("received_uncompressed", ss.Length)) + } } - if ss.WireLength > 0 { - span.SetAttributes(attribute.Int(compressedName, int(ss.WireLength))) + if !rep.isExporter && rep.compSizeHisto != nil { + rep.compSizeHisto.Record(ctx, ss.WireLength, attrs) + } + if rep.recvWireBytes != nil { + rep.recvWireBytes.Add(ctx, ss.WireLength, attrs) + } + if span.IsRecording() { + span.SetAttributes(attribute.Int64("received_compressed", ss.WireLength)) + } } } + +// SetSpanSizeAttributes is a no-op. +func (*NetworkReporter) SetSpanSizeAttributes(ctx context.Context, ss SizesStruct) {} diff --git a/collector/netstats/netstats_test.go b/collector/netstats/netstats_test.go index 8d434ee503..8592b7d6c8 100644 --- a/collector/netstats/netstats_test.go +++ b/collector/netstats/netstats_test.go @@ -149,8 +149,10 @@ func TestNetStatsSetSpanAttrs(t *testing.T) { length: 1234567, wireLength: 123, attrs: []attribute.KeyValue{ - attribute.Int("stream_client_uncompressed_bytes_sent", 1234567), - attribute.Int("stream_client_compressed_bytes_sent", 123), + attribute.Int("sent_uncompressed", 1234567), + attribute.Int("sent_compressed", 123), + attribute.Int("received_uncompressed", 1234567*2), + attribute.Int("received_compressed", 123*2), }, }, { @@ -159,8 +161,10 @@ func TestNetStatsSetSpanAttrs(t *testing.T) { length: 8901234, wireLength: 890, attrs: []attribute.KeyValue{ - attribute.Int("stream_server_uncompressed_bytes_recv", 8901234), - attribute.Int("stream_server_compressed_bytes_recv", 890), + attribute.Int("sent_uncompressed", 8901234), + attribute.Int("sent_compressed", 890), + attribute.Int("received_uncompressed", 8901234*2), + attribute.Int("received_compressed", 890*2), }, }, } @@ -177,11 +181,14 @@ func TestNetStatsSetSpanAttrs(t *testing.T) { sized.Method = "test" sized.Length = int64(tc.length) sized.WireLength = int64(tc.wireLength) - enr.SetSpanSizeAttributes(ctx, sized) + enr.CountSend(ctx, sized) + sized.Length *= 2 + sized.WireLength *= 2 + enr.CountReceive(ctx, sized) actualAttrs := sp.(sdktrace.ReadOnlySpan).Attributes() - require.Equal(t, tc.attrs, actualAttrs) + require.Equal(t, attribute.NewSet(tc.attrs...), attribute.NewSet(actualAttrs...)) }) } } diff --git a/collector/receiver/otelarrowreceiver/internal/arrow/arrow.go b/collector/receiver/otelarrowreceiver/internal/arrow/arrow.go index 700d4d83a4..a1d51a7bd6 100644 --- a/collector/receiver/otelarrowreceiver/internal/arrow/arrow.go +++ b/collector/receiver/otelarrowreceiver/internal/arrow/arrow.go @@ -20,7 +20,6 @@ import ( "go.opentelemetry.io/collector/client" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configgrpc" - "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/extension/auth" @@ -606,21 +605,16 @@ func (r *Receiver) processRecords(ctx context.Context, method string, arrowConsu return nil } var uncompSize int64 - if r.telemetry.MetricsLevel > configtelemetry.LevelNormal { - defer func() { - // The netstats code knows that uncompressed size is - // unreliable for arrow transport, so we instrument it - // directly here. Only the primary direction of transport - // is instrumented this way. - var sized netstats.SizesStruct - sized.Method = method - if r.telemetry.MetricsLevel > configtelemetry.LevelNormal { - sized.Length = uncompSize - } - r.netReporter.CountReceive(ctx, sized) - r.netReporter.SetSpanSizeAttributes(ctx, sized) - }() - } + defer func() { + // The netstats code knows that uncompressed size is + // unreliable for arrow transport, so we instrument it + // directly here. Only the primary direction of transport + // is instrumented this way. + var sized netstats.SizesStruct + sized.Method = method + sized.Length = uncompSize + r.netReporter.CountReceive(ctx, sized) + }() switch payloads[0].Type { case arrowpb.ArrowPayloadType_UNIVARIATE_METRICS: