diff --git a/exporter/otelarrowexporter/metadata_test.go b/exporter/otelarrowexporter/metadata_test.go index 95a80af0c0847..92ccea71e0558 100644 --- a/exporter/otelarrowexporter/metadata_test.go +++ b/exporter/otelarrowexporter/metadata_test.go @@ -6,6 +6,7 @@ import ( "context" "fmt" "net" + "sync" "testing" "time" @@ -86,6 +87,8 @@ func TestSendTracesWithMetadata(t *testing.T) { requestCount := 3 spansPerRequest := 33 + var wg sync.WaitGroup + wg.Add(requestCount) for requestNum := 0; requestNum < requestCount; requestNum++ { td := testdata.GenerateTraces(spansPerRequest) spans := td.ResourceSpans().At(0).ScopeSpans().At(0).Spans() @@ -96,21 +99,18 @@ func TestSendTracesWithMetadata(t *testing.T) { num := requestNum % len(callCtxs) expectByContext[num] += spansPerRequest go func(n int) { + defer wg.Done() assert.NoError(t, exp.ConsumeTraces(callCtxs[n], td)) }(num) } + wg.Wait() - assert.Eventually(t, func() bool { - return rcv.requestCount.Load() == int32(requestCount) - }, 1*time.Second, 5*time.Millisecond) - assert.Eventually(t, func() bool { - return rcv.totalItems.Load() == int32(requestCount*spansPerRequest) - }, 1*time.Second, 5*time.Millisecond) - assert.Eventually(t, func() bool { - rcv.mux.Lock() - defer rcv.mux.Unlock() - return len(callCtxs) == len(rcv.spanCountByMetadata) - }, 1*time.Second, 5*time.Millisecond) + rcv.mux.Lock() + defer rcv.mux.Unlock() + + assert.Equal(t, rcv.requestCount.Load(), int32(requestCount)) + assert.Equal(t, rcv.totalItems.Load(), int32(requestCount*spansPerRequest)) + assert.Len(t, rcv.spanCountByMetadata, len(callCtxs)) for idx, ctx := range callCtxs { md := client.FromContext(ctx).Metadata diff --git a/internal/otelarrow/go.mod b/internal/otelarrow/go.mod index 8ffdd81908d23..362728684d55d 100644 --- a/internal/otelarrow/go.mod +++ b/internal/otelarrow/go.mod @@ -10,8 +10,6 @@ require ( github.com/stretchr/testify v1.11.1 go.opentelemetry.io/collector/component v1.41.1-0.20250911155607-37a3ace6274c go.opentelemetry.io/collector/component/componenttest v0.135.1-0.20250911155607-37a3ace6274c - go.opentelemetry.io/collector/config/configgrpc v0.135.1-0.20250911155607-37a3ace6274c - go.opentelemetry.io/collector/config/configoptional v0.135.1-0.20250911155607-37a3ace6274c go.opentelemetry.io/collector/config/configtelemetry v0.135.1-0.20250911155607-37a3ace6274c go.opentelemetry.io/collector/consumer v1.41.1-0.20250911155607-37a3ace6274c go.opentelemetry.io/collector/consumer/consumererror v0.135.1-0.20250911155607-37a3ace6274c @@ -74,9 +72,11 @@ require ( go.opentelemetry.io/collector/component/componentstatus v0.135.1-0.20250911155607-37a3ace6274c // indirect go.opentelemetry.io/collector/config/configauth v0.135.1-0.20250911155607-37a3ace6274c // indirect go.opentelemetry.io/collector/config/configcompression v1.41.1-0.20250911155607-37a3ace6274c // indirect + go.opentelemetry.io/collector/config/configgrpc v0.135.1-0.20250911155607-37a3ace6274c // indirect go.opentelemetry.io/collector/config/configmiddleware v1.41.1-0.20250911155607-37a3ace6274c // indirect go.opentelemetry.io/collector/config/confignet v1.41.1-0.20250911155607-37a3ace6274c // indirect go.opentelemetry.io/collector/config/configopaque v1.41.1-0.20250911155607-37a3ace6274c // indirect + go.opentelemetry.io/collector/config/configoptional v0.135.1-0.20250911155607-37a3ace6274c // indirect go.opentelemetry.io/collector/config/configretry v1.41.1-0.20250911155607-37a3ace6274c // indirect go.opentelemetry.io/collector/config/configtls v1.41.1-0.20250911155607-37a3ace6274c // indirect go.opentelemetry.io/collector/confmap v1.41.1-0.20250911155607-37a3ace6274c // indirect diff --git a/internal/otelarrow/test/e2e_test.go b/internal/otelarrow/test/e2e_test.go index e5cc9267730ed..1bf9e0df10574 100644 --- a/internal/otelarrow/test/e2e_test.go +++ b/internal/otelarrow/test/e2e_test.go @@ -21,8 +21,6 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/config/configgrpc" - "go.opentelemetry.io/collector/config/configoptional" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumertest" @@ -545,87 +543,6 @@ func TestIntegrationMemoryLimited(t *testing.T) { }, bulkyGenFunc(), consumerFailure, failureMemoryLimitEnding) } -func multiStreamEnding(t *testing.T, p testParams, testCon *testConsumer, td [][]ptrace.Traces) (_, _ map[string]int) { - recvOps, expOps := standardEnding(t, p, testCon, td) - - const streamName = "opentelemetry.proto.experimental.arrow.v1.ArrowTracesService/ArrowTraces" - - total := int(testCon.sentSpans.Load()) - - // Exporter spans: - // - // This span is the Arrow gRPC client stream. Should have no - // stream errors, > 1 streams. - expStreamsUnset := expOps[streamName+"/Unset"] - expStreamsError := expOps[streamName+"/Error"] - require.Less(t, 1, expStreamsUnset+expStreamsError) - require.Equal(t, 1, expStreamsError) - - // Number of export requests: exact match. This is the - // exporterhelper's base span. - require.Equal(t, total, expOps["exporter/otelarrow/traces/Unset"]) - - // Number of export requests: exact match. This span covers - // handling one request in the Arrow exporter. - require.Equal(t, total, expOps["otel_arrow_stream_send/Unset"]) - - // Receiver spans - // - // This span is the Arrow gRPC server stream, instrumented by - // otelgrpc. Because of - // https://github.com/open-telemetry/opentelemetry-go-contrib/issues/2644 - // we expect either an error or unset. There should be > 1 - // streams. - recvStreamsUnset := recvOps[streamName+"/Unset"] - recvStreamsError := recvOps[streamName+"/Error"] - require.Equal(t, 0, recvStreamsError) - require.Less(t, 1, recvStreamsUnset+recvStreamsError) - - // For each stream, there is one Recv() span at the end that ends - // in cancelation (or EOF). So we expect total to be less than - // this span count. - require.Equal(t, total+recvStreamsUnset+recvStreamsError, recvOps["otel_arrow_stream_inflight/Unset"]) - - // This is in request context, the Arrow stream handling one request. - require.Equal(t, total, recvOps["otel_arrow_stream_recv/Unset"]) - - // This is in request context, the receiverhelper's per-request span. - require.Equal(t, total, recvOps["receiver/otelarrow/TraceDataReceived/Unset"]) - - // Exporter and Receiver stream span counts match: - require.Equal(t, expStreamsUnset+expStreamsError, recvStreamsUnset+recvStreamsError) - - return recvOps, expOps -} - -func TestIntegrationSelfTracing(t *testing.T) { - ctx, cancel := context.WithCancel(t.Context()) - defer cancel() - - // until 2 Arrow stream spans are received from self instrumentation - params := testParams{ - threadCount: 10, - requestWhileTrue: func(test *testConsumer) bool { - cnt := 0 - for _, span := range test.expSpans.GetSpans() { - if span.Name == "opentelemetry.proto.experimental.arrow.v1.ArrowTracesService/ArrowTraces" { - cnt++ - } - } - return cnt < 2 - }, - } - - testIntegrationTraces(ctx, t, params, func(_ *ExpConfig, rcfg *RecvConfig) { - rcfg.GRPC.Keepalive = configoptional.Some(configgrpc.KeepaliveServerConfig{ - ServerParameters: configoptional.Some(configgrpc.KeepaliveServerParameters{ - MaxConnectionAge: time.Second, - MaxConnectionAgeGrace: 5 * time.Second, - }), - }) - }, func() GenFunc { return makeTestTraces }, consumerSuccess, multiStreamEnding) -} - func nearLimitGenFunc() MkGen { var sizer ptrace.ProtoMarshaler const nearLimit = 900 << 10 // close to 1 MiB