Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
## Unreleased

- Jitter is applied to once per process, not once per stream. [#199](https://github.com/open-telemetry/otel-arrow/pull/199)
- Network statistics tracing instrumentation simplified. [#201](https://github.com/open-telemetry/otel-arrow/pull/201)

## [0.23.0](https://github.com/open-telemetry/otel-arrow/releases/tag/v0.23.0) - 2024-05-09

Expand Down
11 changes: 4 additions & 7 deletions collector/exporter/otelarrowexporter/internal/arrow/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,13 +351,10 @@ func (s *Stream) encodeAndSend(wri writeItem, hdrsBuf *bytes.Buffer, hdrsEnc *hp
// unreliable for arrow transport, so we instrument it
// directly here. Only the primary direction of transport
// is instrumented this way.
if wri.uncompSize != 0 {
var sized netstats.SizesStruct
sized.Method = s.method
sized.Length = int64(wri.uncompSize)
s.netReporter.CountSend(ctx, sized)
s.netReporter.SetSpanSizeAttributes(ctx, sized)
}
var sized netstats.SizesStruct
sized.Method = s.method
sized.Length = int64(wri.uncompSize)
s.netReporter.CountSend(ctx, sized)

if err := s.client.Send(batch); err != nil {
// The error will be sent to errCh during cleanup for this stream.
Expand Down
2 changes: 0 additions & 2 deletions collector/netstats/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ func (h statsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
}
ss.WireLength = int64(s.WireLength)
h.rep.CountReceive(ctx, ss)
h.rep.SetSpanSizeAttributes(ctx, ss)

case *stats.OutPayload:
var ss SizesStruct
Expand All @@ -70,7 +69,6 @@ func (h statsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
}
ss.WireLength = int64(s.WireLength)
h.rep.CountSend(ctx, ss)
h.rep.SetSpanSizeAttributes(ctx, ss)
}
}

Expand Down
82 changes: 43 additions & 39 deletions collector/netstats/netstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ type NetworkReporter struct {
compSizeHisto metric.Int64Histogram
}

var _ Interface = &NetworkReporter{}

// SizesStruct is used to pass uncompressed on-wire message lengths to
// the CountSend() and CountReceive() methods.
type SizesStruct struct {
Expand All @@ -79,7 +81,10 @@ type Interface interface {
// CountSend reports inbound bytes.
CountReceive(ctx context.Context, ss SizesStruct)

// SetSpanAttributes takes a context and adds attributes to the associated span.
// SetSpanAttributes is a No-Op. Deprecated. Remove uses of
// this function as the same functionality is included in
// CountSend/CountReceive. Remove this after the collector-contrib
// components have been updated to not reference this method.
SetSpanSizeAttributes(ctx context.Context, ss SizesStruct)
}

Expand Down Expand Up @@ -208,16 +213,27 @@ func (rep *NetworkReporter) CountSend(ctx context.Context, ss SizesStruct) {
return
}

span := trace.SpanFromContext(ctx)
attrs := metric.WithAttributes(rep.staticAttr, attribute.String("method", ss.Method))

if rep.isExporter && ss.WireLength > 0 {
rep.compSizeHisto.Record(ctx, ss.WireLength, attrs)
}
if rep.sentBytes != nil && ss.Length > 0 {
rep.sentBytes.Add(ctx, ss.Length, attrs)
if ss.Length > 0 {
if rep.sentBytes != nil {
rep.sentBytes.Add(ctx, ss.Length, attrs)
}
if span.IsRecording() {
span.SetAttributes(attribute.Int64("sent_uncompressed", ss.Length))
}
}
if rep.sentWireBytes != nil && ss.WireLength > 0 {
rep.sentWireBytes.Add(ctx, ss.WireLength, attrs)
if ss.WireLength > 0 {
if rep.isExporter && rep.compSizeHisto != nil {
rep.compSizeHisto.Record(ctx, ss.WireLength, attrs)
}
if rep.sentWireBytes != nil {
rep.sentWireBytes.Add(ctx, ss.WireLength, attrs)
}
if span.IsRecording() {
span.SetAttributes(attribute.Int64("sent_compressed", ss.WireLength))
}
}
}

Expand All @@ -230,41 +246,29 @@ func (rep *NetworkReporter) CountReceive(ctx context.Context, ss SizesStruct) {
return
}

attrs := metric.WithAttributes(rep.staticAttr, attribute.String("method", ss.Method))
if !rep.isExporter && ss.WireLength > 0 {
rep.compSizeHisto.Record(ctx, ss.WireLength, attrs)
}
if rep.recvBytes != nil && ss.Length > 0 {
rep.recvBytes.Add(ctx, ss.Length, attrs)
}
if rep.recvWireBytes != nil && ss.WireLength > 0 {
rep.recvWireBytes.Add(ctx, ss.WireLength, attrs)
}
}

func (rep *NetworkReporter) SetSpanSizeAttributes(ctx context.Context, ss SizesStruct) {
if rep == nil {
return
}

span := trace.SpanFromContext(ctx)

var compressedName string
var uncompressedName string
// set attribute name based on exporter vs receiver
if rep.isExporter {
compressedName = "stream_client_compressed_bytes_sent"
uncompressedName = "stream_client_uncompressed_bytes_sent"
} else { // receiver attributes
compressedName = "stream_server_compressed_bytes_recv"
uncompressedName = "stream_server_uncompressed_bytes_recv"
}
attrs := metric.WithAttributes(rep.staticAttr, attribute.String("method", ss.Method))

if ss.Length > 0 {
span.SetAttributes(attribute.Int(uncompressedName, int(ss.Length)))
if rep.recvBytes != nil {
rep.recvBytes.Add(ctx, ss.Length, attrs)
}
if span.IsRecording() {
span.SetAttributes(attribute.Int64("received_uncompressed", ss.Length))
}
}

if ss.WireLength > 0 {
span.SetAttributes(attribute.Int(compressedName, int(ss.WireLength)))
if !rep.isExporter && rep.compSizeHisto != nil {
rep.compSizeHisto.Record(ctx, ss.WireLength, attrs)
}
if rep.recvWireBytes != nil {
rep.recvWireBytes.Add(ctx, ss.WireLength, attrs)
}
if span.IsRecording() {
span.SetAttributes(attribute.Int64("received_compressed", ss.WireLength))
}
}
}

// SetSpanSizeAttributes is a no-op.
func (*NetworkReporter) SetSpanSizeAttributes(ctx context.Context, ss SizesStruct) {}
19 changes: 13 additions & 6 deletions collector/netstats/netstats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,10 @@ func TestNetStatsSetSpanAttrs(t *testing.T) {
length: 1234567,
wireLength: 123,
attrs: []attribute.KeyValue{
attribute.Int("stream_client_uncompressed_bytes_sent", 1234567),
attribute.Int("stream_client_compressed_bytes_sent", 123),
attribute.Int("sent_uncompressed", 1234567),
attribute.Int("sent_compressed", 123),
attribute.Int("received_uncompressed", 1234567*2),
attribute.Int("received_compressed", 123*2),
},
},
{
Expand All @@ -159,8 +161,10 @@ func TestNetStatsSetSpanAttrs(t *testing.T) {
length: 8901234,
wireLength: 890,
attrs: []attribute.KeyValue{
attribute.Int("stream_server_uncompressed_bytes_recv", 8901234),
attribute.Int("stream_server_compressed_bytes_recv", 890),
attribute.Int("sent_uncompressed", 8901234),
attribute.Int("sent_compressed", 890),
attribute.Int("received_uncompressed", 8901234*2),
attribute.Int("received_compressed", 890*2),
},
},
}
Expand All @@ -177,11 +181,14 @@ func TestNetStatsSetSpanAttrs(t *testing.T) {
sized.Method = "test"
sized.Length = int64(tc.length)
sized.WireLength = int64(tc.wireLength)
enr.SetSpanSizeAttributes(ctx, sized)
enr.CountSend(ctx, sized)
sized.Length *= 2
sized.WireLength *= 2
enr.CountReceive(ctx, sized)

actualAttrs := sp.(sdktrace.ReadOnlySpan).Attributes()

require.Equal(t, tc.attrs, actualAttrs)
require.Equal(t, attribute.NewSet(tc.attrs...), attribute.NewSet(actualAttrs...))
})
}
}
Expand Down
26 changes: 10 additions & 16 deletions collector/receiver/otelarrowreceiver/internal/arrow/arrow.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"go.opentelemetry.io/collector/client"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/extension/auth"
Expand Down Expand Up @@ -606,21 +605,16 @@ func (r *Receiver) processRecords(ctx context.Context, method string, arrowConsu
return nil
}
var uncompSize int64
if r.telemetry.MetricsLevel > configtelemetry.LevelNormal {
defer func() {
// The netstats code knows that uncompressed size is
// unreliable for arrow transport, so we instrument it
// directly here. Only the primary direction of transport
// is instrumented this way.
var sized netstats.SizesStruct
sized.Method = method
if r.telemetry.MetricsLevel > configtelemetry.LevelNormal {
sized.Length = uncompSize
}
r.netReporter.CountReceive(ctx, sized)
r.netReporter.SetSpanSizeAttributes(ctx, sized)
}()
}
defer func() {
// The netstats code knows that uncompressed size is
// unreliable for arrow transport, so we instrument it
// directly here. Only the primary direction of transport
// is instrumented this way.
var sized netstats.SizesStruct
sized.Method = method
sized.Length = uncompSize
r.netReporter.CountReceive(ctx, sized)
}()

switch payloads[0].Type {
case arrowpb.ArrowPayloadType_UNIVARIATE_METRICS:
Expand Down