Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions exporter/otelarrowexporter/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"fmt"
"net"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -86,6 +87,8 @@ func TestSendTracesWithMetadata(t *testing.T) {

requestCount := 3
spansPerRequest := 33
var wg sync.WaitGroup
wg.Add(requestCount)
for requestNum := 0; requestNum < requestCount; requestNum++ {
td := testdata.GenerateTraces(spansPerRequest)
spans := td.ResourceSpans().At(0).ScopeSpans().At(0).Spans()
Expand All @@ -96,21 +99,18 @@ func TestSendTracesWithMetadata(t *testing.T) {
num := requestNum % len(callCtxs)
expectByContext[num] += spansPerRequest
go func(n int) {
defer wg.Done()
assert.NoError(t, exp.ConsumeTraces(callCtxs[n], td))
}(num)
}
wg.Wait()

assert.Eventually(t, func() bool {
return rcv.requestCount.Load() == int32(requestCount)
}, 1*time.Second, 5*time.Millisecond)
assert.Eventually(t, func() bool {
return rcv.totalItems.Load() == int32(requestCount*spansPerRequest)
}, 1*time.Second, 5*time.Millisecond)
assert.Eventually(t, func() bool {
rcv.mux.Lock()
defer rcv.mux.Unlock()
return len(callCtxs) == len(rcv.spanCountByMetadata)
}, 1*time.Second, 5*time.Millisecond)
rcv.mux.Lock()
defer rcv.mux.Unlock()

assert.Equal(t, rcv.requestCount.Load(), int32(requestCount))
assert.Equal(t, rcv.totalItems.Load(), int32(requestCount*spansPerRequest))
assert.Len(t, rcv.spanCountByMetadata, len(callCtxs))

for idx, ctx := range callCtxs {
md := client.FromContext(ctx).Metadata
Expand Down
4 changes: 2 additions & 2 deletions internal/otelarrow/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ require (
github.com/stretchr/testify v1.11.1
go.opentelemetry.io/collector/component v1.41.1-0.20250911155607-37a3ace6274c
go.opentelemetry.io/collector/component/componenttest v0.135.1-0.20250911155607-37a3ace6274c
go.opentelemetry.io/collector/config/configgrpc v0.135.1-0.20250911155607-37a3ace6274c
go.opentelemetry.io/collector/config/configoptional v0.135.1-0.20250911155607-37a3ace6274c
go.opentelemetry.io/collector/config/configtelemetry v0.135.1-0.20250911155607-37a3ace6274c
go.opentelemetry.io/collector/consumer v1.41.1-0.20250911155607-37a3ace6274c
go.opentelemetry.io/collector/consumer/consumererror v0.135.1-0.20250911155607-37a3ace6274c
Expand Down Expand Up @@ -74,9 +72,11 @@ require (
go.opentelemetry.io/collector/component/componentstatus v0.135.1-0.20250911155607-37a3ace6274c // indirect
go.opentelemetry.io/collector/config/configauth v0.135.1-0.20250911155607-37a3ace6274c // indirect
go.opentelemetry.io/collector/config/configcompression v1.41.1-0.20250911155607-37a3ace6274c // indirect
go.opentelemetry.io/collector/config/configgrpc v0.135.1-0.20250911155607-37a3ace6274c // indirect
go.opentelemetry.io/collector/config/configmiddleware v1.41.1-0.20250911155607-37a3ace6274c // indirect
go.opentelemetry.io/collector/config/confignet v1.41.1-0.20250911155607-37a3ace6274c // indirect
go.opentelemetry.io/collector/config/configopaque v1.41.1-0.20250911155607-37a3ace6274c // indirect
go.opentelemetry.io/collector/config/configoptional v0.135.1-0.20250911155607-37a3ace6274c // indirect
go.opentelemetry.io/collector/config/configretry v1.41.1-0.20250911155607-37a3ace6274c // indirect
go.opentelemetry.io/collector/config/configtls v1.41.1-0.20250911155607-37a3ace6274c // indirect
go.opentelemetry.io/collector/confmap v1.41.1-0.20250911155607-37a3ace6274c // indirect
Expand Down
83 changes: 0 additions & 83 deletions internal/otelarrow/test/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/configoptional"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/consumertest"
Expand Down Expand Up @@ -545,87 +543,6 @@ func TestIntegrationMemoryLimited(t *testing.T) {
}, bulkyGenFunc(), consumerFailure, failureMemoryLimitEnding)
}

func multiStreamEnding(t *testing.T, p testParams, testCon *testConsumer, td [][]ptrace.Traces) (_, _ map[string]int) {
recvOps, expOps := standardEnding(t, p, testCon, td)

const streamName = "opentelemetry.proto.experimental.arrow.v1.ArrowTracesService/ArrowTraces"

total := int(testCon.sentSpans.Load())

// Exporter spans:
//
// This span is the Arrow gRPC client stream. Should have no
// stream errors, > 1 streams.
expStreamsUnset := expOps[streamName+"/Unset"]
expStreamsError := expOps[streamName+"/Error"]
require.Less(t, 1, expStreamsUnset+expStreamsError)
require.Equal(t, 1, expStreamsError)

// Number of export requests: exact match. This is the
// exporterhelper's base span.
require.Equal(t, total, expOps["exporter/otelarrow/traces/Unset"])

// Number of export requests: exact match. This span covers
// handling one request in the Arrow exporter.
require.Equal(t, total, expOps["otel_arrow_stream_send/Unset"])

// Receiver spans
//
// This span is the Arrow gRPC server stream, instrumented by
// otelgrpc. Because of
// https://github.com/open-telemetry/opentelemetry-go-contrib/issues/2644
// we expect either an error or unset. There should be > 1
// streams.
recvStreamsUnset := recvOps[streamName+"/Unset"]
recvStreamsError := recvOps[streamName+"/Error"]
require.Equal(t, 0, recvStreamsError)
require.Less(t, 1, recvStreamsUnset+recvStreamsError)

// For each stream, there is one Recv() span at the end that ends
// in cancelation (or EOF). So we expect total to be less than
// this span count.
require.Equal(t, total+recvStreamsUnset+recvStreamsError, recvOps["otel_arrow_stream_inflight/Unset"])

// This is in request context, the Arrow stream handling one request.
require.Equal(t, total, recvOps["otel_arrow_stream_recv/Unset"])

// This is in request context, the receiverhelper's per-request span.
require.Equal(t, total, recvOps["receiver/otelarrow/TraceDataReceived/Unset"])

// Exporter and Receiver stream span counts match:
require.Equal(t, expStreamsUnset+expStreamsError, recvStreamsUnset+recvStreamsError)

return recvOps, expOps
}

func TestIntegrationSelfTracing(t *testing.T) {
ctx, cancel := context.WithCancel(t.Context())
defer cancel()

// until 2 Arrow stream spans are received from self instrumentation
params := testParams{
threadCount: 10,
requestWhileTrue: func(test *testConsumer) bool {
cnt := 0
for _, span := range test.expSpans.GetSpans() {
if span.Name == "opentelemetry.proto.experimental.arrow.v1.ArrowTracesService/ArrowTraces" {
cnt++
}
}
return cnt < 2
},
}

testIntegrationTraces(ctx, t, params, func(_ *ExpConfig, rcfg *RecvConfig) {
rcfg.GRPC.Keepalive = configoptional.Some(configgrpc.KeepaliveServerConfig{
ServerParameters: configoptional.Some(configgrpc.KeepaliveServerParameters{
MaxConnectionAge: time.Second,
MaxConnectionAgeGrace: 5 * time.Second,
}),
})
}, func() GenFunc { return makeTestTraces }, consumerSuccess, multiStreamEnding)
}

func nearLimitGenFunc() MkGen {
var sizer ptrace.ProtoMarshaler
const nearLimit = 900 << 10 // close to 1 MiB
Expand Down
Loading