diff --git a/collector/exporter/otelarrowexporter/internal/arrow/bestofn.go b/collector/exporter/otelarrowexporter/internal/arrow/bestofn.go index f5ac6034d9..0c3d938d14 100644 --- a/collector/exporter/otelarrowexporter/internal/arrow/bestofn.go +++ b/collector/exporter/otelarrowexporter/internal/arrow/bestofn.go @@ -9,6 +9,9 @@ import ( "runtime" "sort" "time" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) // bestOfNPrioritizer is a prioritizer that selects a less-loaded stream to write. @@ -114,7 +117,7 @@ func (lp *bestOfNPrioritizer) sendAndWait(ctx context.Context, errCh <-chan erro case <-lp.done: return ErrStreamRestarting case <-ctx.Done(): - return context.Canceled + return status.Error(codes.Canceled, ctx.Err().Error()) case lp.input <- wri: return waitForWrite(ctx, errCh, lp.done) } diff --git a/collector/exporter/otelarrowexporter/internal/arrow/exporter.go b/collector/exporter/otelarrowexporter/internal/arrow/exporter.go index 3976f9982c..544a6ee20a 100644 --- a/collector/exporter/otelarrowexporter/internal/arrow/exporter.go +++ b/collector/exporter/otelarrowexporter/internal/arrow/exporter.go @@ -20,7 +20,9 @@ import ( "go.opentelemetry.io/collector/pdata/ptrace" "go.uber.org/zap" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" + "google.golang.org/grpc/status" ) // Exporter is 1:1 with exporter, isolates arrow-specific @@ -255,6 +257,12 @@ func (e *Exporter) runArrowStream(ctx context.Context, dc doneCancel, state *str // // consumer should fall back to standard OTLP, (true, nil) func (e *Exporter) SendAndWait(ctx context.Context, data any) (bool, error) { + select { + case <-ctx.Done(): + return false, status.Error(codes.Canceled, "incoming request already canceled") + default: + } + errCh := make(chan error, 1) // Note that if the OTLP exporter's gRPC Headers field was @@ -340,7 +348,7 @@ func waitForWrite(ctx context.Context, errCh <-chan error, down <-chan struct{}) select { case <-ctx.Done(): // This caller's context timed out. - return ctx.Err() + return status.Error(codes.Canceled, ctx.Err().Error()) case <-down: return ErrStreamRestarting case err := <-errCh: diff --git a/collector/exporter/otelarrowexporter/internal/arrow/exporter_test.go b/collector/exporter/otelarrowexporter/internal/arrow/exporter_test.go index 276e5f3fa4..7047d74074 100644 --- a/collector/exporter/otelarrowexporter/internal/arrow/exporter_test.go +++ b/collector/exporter/otelarrowexporter/internal/arrow/exporter_test.go @@ -6,7 +6,6 @@ package arrow import ( "context" "encoding/json" - "errors" "fmt" "sync" "sync/atomic" @@ -31,7 +30,9 @@ import ( "go.uber.org/zap/zaptest" "golang.org/x/net/http2/hpack" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" ) var AllPrioritizers = []PrioritizerName{LeastLoadedPrioritizer, LeastLoadedTwoPrioritizer} @@ -278,7 +279,10 @@ func TestArrowExporterTimeout(t *testing.T) { sent, err := tc.exporter.SendAndWait(ctx, twoTraces) require.True(t, sent) require.Error(t, err) - require.True(t, errors.Is(err, context.Canceled)) + + stat, is := status.FromError(err) + require.True(t, is, "is a gRPC status") + require.Equal(t, codes.Canceled, stat.Code()) require.NoError(t, tc.exporter.Shutdown(ctx)) }) @@ -406,7 +410,10 @@ func TestArrowExporterConnectTimeout(t *testing.T) { }() _, err := tc.exporter.SendAndWait(ctx, twoTraces) require.Error(t, err) - require.True(t, errors.Is(err, context.Canceled)) + + stat, is := status.FromError(err) + require.True(t, is, "is a gRPC status error: %v", err) + require.Equal(t, codes.Canceled, stat.Code()) require.NoError(t, tc.exporter.Shutdown(bg)) }) @@ -489,7 +496,10 @@ func TestArrowExporterStreamRace(t *testing.T) { // This blocks until the cancelation. _, err := tc.exporter.SendAndWait(callctx, twoTraces) require.Error(t, err) - require.True(t, errors.Is(err, context.Canceled)) + + stat, is := status.FromError(err) + require.True(t, is, "is a gRPC status error: %v", err) + require.Equal(t, codes.Canceled, stat.Code()) }() } diff --git a/collector/exporter/otelarrowexporter/internal/arrow/stream.go b/collector/exporter/otelarrowexporter/internal/arrow/stream.go index a5d97e53ee..78d7345c43 100644 --- a/collector/exporter/otelarrowexporter/internal/arrow/stream.go +++ b/collector/exporter/otelarrowexporter/internal/arrow/stream.go @@ -7,7 +7,6 @@ import ( "bytes" "context" "errors" - "fmt" "io" "sync" "time" @@ -16,7 +15,6 @@ import ( "github.com/open-telemetry/otel-arrow/collector/netstats" arrowRecord "github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" @@ -135,9 +133,9 @@ func (s *Stream) setBatchChannel(batchID int64, errCh chan<- error) { s.workState.waiters[batchID] = errCh } -// logStreamError decides how to log an error. `which` indicates the -// stream direction, will be "reader" or "writer". -func (s *Stream) logStreamError(which string, err error) { +// logStreamError decides how to log an error. `where` indicates the +// error location, will be "reader" or "writer". +func (s *Stream) logStreamError(where string, err error) { var code codes.Code var msg string // gRPC tends to supply status-wrapped errors, so we always @@ -156,9 +154,9 @@ func (s *Stream) logStreamError(which string, err error) { msg = err.Error() } if code == codes.Canceled { - s.telemetry.Logger.Debug("arrow stream shutdown", zap.String("which", which), zap.String("message", msg)) + s.telemetry.Logger.Debug("arrow stream shutdown", zap.String("message", msg), zap.String("where", where)) } else { - s.telemetry.Logger.Error("arrow stream error", zap.String("which", which), zap.String("message", msg), zap.Int("code", int(code))) + s.telemetry.Logger.Error("arrow stream error", zap.Int("code", int(code)), zap.String("message", msg), zap.String("where", where)) } } @@ -274,7 +272,7 @@ func (s *Stream) write(ctx context.Context) (retErr error) { return nil case wri = <-s.workState.toWrite: case <-ctx.Done(): - return ctx.Err() + return status.Error(codes.Canceled, ctx.Err().Error()) } err := s.encodeAndSend(wri, &hdrsBuf, hdrsEnc) @@ -319,8 +317,8 @@ func (s *Stream) encodeAndSend(wri writeItem, hdrsBuf *bytes.Buffer, hdrsEnc *hp if err != nil { // This is some kind of internal error. We will restart the // stream and mark this record as a permanent one. - err = fmt.Errorf("encode: %w", err) - wri.errCh <- consumererror.NewPermanent(err) + err = status.Errorf(codes.Internal, "encode: %v", err) + wri.errCh <- err return err } @@ -336,8 +334,8 @@ func (s *Stream) encodeAndSend(wri writeItem, hdrsBuf *bytes.Buffer, hdrsEnc *hp // This case is like the encode-failure case // above, we will restart the stream but consider // this a permenent error. - err = fmt.Errorf("hpack: %w", err) - wri.errCh <- consumererror.NewPermanent(err) + err = status.Errorf(codes.Internal, "hpack: %v", err) + wri.errCh <- err return err } } @@ -382,24 +380,24 @@ func (s *Stream) read(_ context.Context) error { } if err = s.processBatchStatus(resp); err != nil { - return fmt.Errorf("process: %w", err) + return err } } } // getSenderChannel takes the stream lock and removes the corresonding // sender channel. -func (sws *streamWorkState) getSenderChannel(status *arrowpb.BatchStatus) (chan<- error, error) { +func (sws *streamWorkState) getSenderChannel(bstat *arrowpb.BatchStatus) (chan<- error, error) { sws.lock.Lock() defer sws.lock.Unlock() - ch, ok := sws.waiters[status.BatchId] + ch, ok := sws.waiters[bstat.BatchId] if !ok { // Will break the stream. - return nil, fmt.Errorf("unrecognized batch ID: %d", status.BatchId) + return nil, status.Errorf(codes.Internal, "unrecognized batch ID: %d", bstat.BatchId) } - delete(sws.waiters, status.BatchId) + delete(sws.waiters, bstat.BatchId) return ch, nil } @@ -460,7 +458,7 @@ func (s *Stream) encode(records any) (_ *arrowpb.BatchArrowRecords, retErr error zap.Reflect("recovered", err), zap.Stack("stacktrace"), ) - retErr = fmt.Errorf("panic in otel-arrow-adapter: %v", err) + retErr = status.Errorf(codes.Internal, "panic in otel-arrow-adapter: %v", err) } }() var batch *arrowpb.BatchArrowRecords @@ -473,7 +471,7 @@ func (s *Stream) encode(records any) (_ *arrowpb.BatchArrowRecords, retErr error case pmetric.Metrics: batch, err = s.producer.BatchArrowRecordsFromMetrics(data) default: - return nil, fmt.Errorf("unsupported OTLP type: %T", records) + return nil, status.Errorf(codes.Unimplemented, "unsupported OTel-Arrow signal type: %T", records) } return batch, err } diff --git a/collector/exporter/otelarrowexporter/internal/arrow/stream_test.go b/collector/exporter/otelarrowexporter/internal/arrow/stream_test.go index 91a33f30d9..f653cc887e 100644 --- a/collector/exporter/otelarrowexporter/internal/arrow/stream_test.go +++ b/collector/exporter/otelarrowexporter/internal/arrow/stream_test.go @@ -15,9 +15,10 @@ import ( "github.com/open-telemetry/otel-arrow/collector/netstats" arrowRecordMock "github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record/mock" "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/consumer/consumererror" "go.uber.org/mock/gomock" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) var oneBatch = &arrowpb.BatchArrowRecords{ @@ -182,8 +183,12 @@ func TestStreamEncodeError(t *testing.T) { // sender should get a permanent testErr err := tc.mustSendAndWait() require.Error(t, err) - require.True(t, errors.Is(err, testErr)) - require.True(t, consumererror.IsPermanent(err)) + + stat, is := status.FromError(err) + require.True(t, is, "is a gRPC status error: %v", err) + require.Equal(t, codes.Internal, stat.Code()) + + require.Contains(t, stat.Message(), testErr.Error()) }) } } diff --git a/collector/receiver/otelarrowreceiver/internal/arrow/arrow.go b/collector/receiver/otelarrowreceiver/internal/arrow/arrow.go index 4159822ee5..5515b7fe06 100644 --- a/collector/receiver/otelarrowreceiver/internal/arrow/arrow.go +++ b/collector/receiver/otelarrowreceiver/internal/arrow/arrow.go @@ -83,7 +83,12 @@ type Receiver struct { recvInFlightItems metric.Int64UpDownCounter recvInFlightRequests metric.Int64UpDownCounter boundedQueue *admission.BoundedQueue - inFlightWG sync.WaitGroup +} + +// receiverStream holds the inFlightWG for a single stream. +type receiverStream struct { + *Receiver + inFlightWG sync.WaitGroup } // New creates a new Receiver reference. @@ -306,9 +311,9 @@ func (r *Receiver) logStreamError(err error, where string) { } if code == codes.Canceled { - r.telemetry.Logger.Debug("arrow stream shutdown", zap.String("message", msg)) + r.telemetry.Logger.Debug("arrow stream shutdown", zap.String("message", msg), zap.String("where", where)) } else { - r.telemetry.Logger.Error("arrow stream error", zap.String("message", msg), zap.Int("code", int(code)), zap.String("where", where)) + r.telemetry.Logger.Error("arrow stream error", zap.Int("code", int(code)), zap.String("message", msg), zap.String("where", where)) } } @@ -381,34 +386,34 @@ func (r *Receiver) anyStream(serverStream anyStreamServer, method string) (retEr // wg is used to ensure this thread returns after both // sender and recevier threads return. - var wg sync.WaitGroup - wg.Add(2) + var sendWG sync.WaitGroup + var recvWG sync.WaitGroup + sendWG.Add(1) + recvWG.Add(1) - // The inflightWG is used to wait for all data to send. The - // 1-count here is removed after srvReceiveLoop() returns, - // having this ensures that concurrent calls to Add() in the - // receiver do not race with Wait() in the sender. - r.inFlightWG.Add(1) + rstream := &receiverStream{ + Receiver: r, + } go func() { var err error - defer wg.Done() + defer recvWG.Done() defer r.recoverErr(&err) - defer r.inFlightWG.Done() - err = r.srvReceiveLoop(doneCtx, serverStream, pendingCh, method, ac) + err = rstream.srvReceiveLoop(doneCtx, serverStream, pendingCh, method, ac) streamErrCh <- err }() go func() { var err error - defer wg.Done() + defer sendWG.Done() defer r.recoverErr(&err) - err = r.srvSendLoop(doneCtx, serverStream, pendingCh) + err = rstream.srvSendLoop(doneCtx, serverStream, &recvWG, pendingCh) streamErrCh <- err }() // Wait for sender/receiver threads to return before returning. - defer wg.Wait() + defer recvWG.Wait() + defer sendWG.Wait() select { case <-doneCtx.Done(): @@ -419,17 +424,17 @@ func (r *Receiver) anyStream(serverStream anyStreamServer, method string) (retEr } } -func (r *Receiver) newInFlightData(ctx context.Context, method string, batchID int64, pendingCh chan<- batchResp) (context.Context, *inFlightData) { +func (r *receiverStream) newInFlightData(ctx context.Context, method string, batchID int64, pendingCh chan<- batchResp) (context.Context, *inFlightData) { ctx, span := r.tracer.Start(ctx, "otel_arrow_stream_inflight") r.inFlightWG.Add(1) r.recvInFlightRequests.Add(ctx, 1) id := &inFlightData{ - Receiver: r, - method: method, - batchID: batchID, - pendingCh: pendingCh, - span: span, + receiverStream: r, + method: method, + batchID: batchID, + pendingCh: pendingCh, + span: span, } id.refs.Add(1) return ctx, id @@ -438,7 +443,7 @@ func (r *Receiver) newInFlightData(ctx context.Context, method string, batchID i // inFlightData is responsible for storing the resources held by one request. type inFlightData struct { // Receiver is the owner of the resources held by this object. - *Receiver + *receiverStream method string batchID int64 @@ -539,7 +544,7 @@ func (id *inFlightData) anyDone(ctx context.Context) { // This handles constructing an inFlightData object, which itself // tracks everything that needs to be used by instrumention when the // batch finishes. -func (r *Receiver) recvOne(streamCtx context.Context, serverStream anyStreamServer, hrcv *headerReceiver, pendingCh chan<- batchResp, method string, ac arrowRecord.ConsumerAPI) (retErr error) { +func (r *receiverStream) recvOne(streamCtx context.Context, serverStream anyStreamServer, hrcv *headerReceiver, pendingCh chan<- batchResp, method string, ac arrowRecord.ConsumerAPI) (retErr error) { // Receive a batch corresponding with one ptrace.Traces, pmetric.Metrics, // or plog.Logs item. @@ -650,7 +655,7 @@ func (r *Receiver) consumeAndRespond(ctx context.Context, data any, flight *inFl } // srvReceiveLoop repeatedly receives one batch of data. -func (r *Receiver) srvReceiveLoop(ctx context.Context, serverStream anyStreamServer, pendingCh chan<- batchResp, method string, ac arrowRecord.ConsumerAPI) (retErr error) { +func (r *receiverStream) srvReceiveLoop(ctx context.Context, serverStream anyStreamServer, pendingCh chan<- batchResp, method string, ac arrowRecord.ConsumerAPI) (retErr error) { hrcv := newHeaderReceiver(ctx, r.authServer, r.gsettings.IncludeMetadata) for { select { @@ -665,7 +670,7 @@ func (r *Receiver) srvReceiveLoop(ctx context.Context, serverStream anyStreamSer } // srvReceiveLoop repeatedly sends one batch data response. -func (r *Receiver) sendOne(serverStream anyStreamServer, resp batchResp) error { +func (r *receiverStream) sendOne(serverStream anyStreamServer, resp batchResp) error { // Note: Statuses can be batched, but we do not take // advantage of this feature. bs := &arrowpb.BatchStatus{ @@ -709,19 +714,17 @@ func (r *Receiver) sendOne(serverStream anyStreamServer, resp batchResp) error { return nil } -func (r *Receiver) flushSender(serverStream anyStreamServer, pendingCh <-chan batchResp) error { - var err error - // wait for all in flight requests to be successfully - // processed or fail. this implies waiting for the receiver - // loop to exit, as it holds one additional wait count to - // avoid a race with Add() here. +func (r *receiverStream) flushSender(serverStream anyStreamServer, recvWG *sync.WaitGroup, pendingCh <-chan batchResp) error { + // wait to ensure no more items are accepted + recvWG.Wait() + + // wait for all responses to be sent r.inFlightWG.Wait() for { select { case resp := <-pendingCh: - err = r.sendOne(serverStream, resp) - if err != nil { + if err := r.sendOne(serverStream, resp); err != nil { return err } default: @@ -731,11 +734,11 @@ func (r *Receiver) flushSender(serverStream anyStreamServer, pendingCh <-chan ba } } -func (r *Receiver) srvSendLoop(ctx context.Context, serverStream anyStreamServer, pendingCh <-chan batchResp) error { +func (r *receiverStream) srvSendLoop(ctx context.Context, serverStream anyStreamServer, recvWG *sync.WaitGroup, pendingCh <-chan batchResp) error { for { select { case <-ctx.Done(): - return r.flushSender(serverStream, pendingCh) + return r.flushSender(serverStream, recvWG, pendingCh) case resp := <-pendingCh: if err := r.sendOne(serverStream, resp); err != nil { return err diff --git a/collector/test/e2e_test.go b/collector/test/e2e_test.go index 7492b00b47..d4ca5f8f56 100644 --- a/collector/test/e2e_test.go +++ b/collector/test/e2e_test.go @@ -6,8 +6,10 @@ package test import ( "context" "encoding/json" + "errors" "fmt" "math/rand" + "strings" "sync" "testing" "time" @@ -15,11 +17,14 @@ import ( "github.com/open-telemetry/otel-arrow/collector/exporter/otelarrowexporter" "github.com/open-telemetry/otel-arrow/collector/receiver/otelarrowreceiver" "github.com/open-telemetry/otel-arrow/collector/testutil" + "github.com/open-telemetry/otel-arrow/pkg/datagen" "github.com/open-telemetry/otel-arrow/pkg/otel/assert" + "github.com/open-telemetry/otel-arrow/pkg/otel/common/arrow" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/pdata/pcommon" @@ -27,14 +32,43 @@ import ( "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" "go.opentelemetry.io/collector/receiver" "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest/observer" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) +type testParams struct { + threadCount int + requestCount int +} + +var normalParams = testParams{ + threadCount: 10, + requestCount: 100, +} + +var memoryLimitParams = testParams{ + threadCount: 10, + requestCount: 10, +} + type testConsumer struct { - sink consumertest.TracesSink + sink consumertest.TracesSink + recvLogs *observer.ObservedLogs + expLogs *observer.ObservedLogs } var _ consumer.Traces = &testConsumer{} +type ExpConfig = otelarrowexporter.Config +type RecvConfig = otelarrowreceiver.Config +type CfgFunc func(*ExpConfig, *RecvConfig) +type GenFunc func(int) ptrace.Traces +type MkGen func() GenFunc +type EndFunc func(t *testing.T, tp testParams, testCon *testConsumer, expect [][]ptrace.Traces) +type ConsumerErrFunc func(t *testing.T, err error) + func (*testConsumer) Capabilities() consumer.Capabilities { return consumer.Capabilities{} } @@ -44,11 +78,20 @@ func (tc *testConsumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) err return tc.sink.ConsumeTraces(ctx, td) } -func TestIntegrationSimpleTraces(t *testing.T) { - const ( - threadCount = 10 - requestCount = 100 - ) +func testLoggerSettings(t *testing.T) (component.TelemetrySettings, *observer.ObservedLogs) { + tset := componenttest.NewNopTelemetrySettings() + + core, obslogs := observer.New(zapcore.InfoLevel) + + // Note: if you want to see these logs, use: + //tset.Logger = zap.New(zapcore.NewTee(core, zaptest.NewLogger(t).Core())) + tset.Logger = zap.New(core) + + return tset, obslogs +} + +func basicTestConfig(t *testing.T, cfgF CfgFunc) (*testConsumer, exporter.Traces, receiver.Traces) { + ctx := context.Background() efact := otelarrowexporter.NewFactory() rfact := otelarrowreceiver.NewFactory() @@ -56,8 +99,8 @@ func TestIntegrationSimpleTraces(t *testing.T) { ecfg := efact.CreateDefaultConfig() rcfg := rfact.CreateDefaultConfig() - receiverCfg := rcfg.(*otelarrowreceiver.Config) - exporterCfg := ecfg.(*otelarrowexporter.Config) + receiverCfg := rcfg.(*RecvConfig) + exporterCfg := ecfg.(*ExpConfig) addr := testutil.GetAvailableLocalAddress(t) @@ -70,28 +113,39 @@ func TestIntegrationSimpleTraces(t *testing.T) { exporterCfg.RetryConfig.Enabled = false exporterCfg.Arrow.NumStreams = 1 - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - tset := componenttest.NewNopTelemetrySettings() - tset.Logger, _ = zap.NewDevelopment() + if cfgF != nil { + cfgF(exporterCfg, receiverCfg) + } - host := componenttest.NewNopHost() + expTset, expLogs := testLoggerSettings(t) + recvTset, recvLogs := testLoggerSettings(t) - testCon := &testConsumer{} + testCon := &testConsumer{ + recvLogs: recvLogs, + expLogs: expLogs, + } receiver, err := rfact.CreateTracesReceiver(ctx, receiver.CreateSettings{ ID: component.MustNewID("otelarrowreceiver"), - TelemetrySettings: tset, + TelemetrySettings: recvTset, }, receiverCfg, testCon) require.NoError(t, err) exporter, err := efact.CreateTracesExporter(ctx, exporter.CreateSettings{ ID: component.MustNewID("otelarrowexporter"), - TelemetrySettings: tset, + TelemetrySettings: expTset, }, exporterCfg) require.NoError(t, err) + return testCon, exporter, receiver + +} + +func testIntegrationTraces(ctx context.Context, t *testing.T, tp testParams, cfgf CfgFunc, mkgen MkGen, errf ConsumerErrFunc, endf EndFunc) { + host := componenttest.NewNopHost() + + testCon, exporter, receiver := basicTestConfig(t, cfgf) + var startWG sync.WaitGroup var exporterShutdownWG sync.WaitGroup var startExporterShutdownWG sync.WaitGroup @@ -123,41 +177,17 @@ func TestIntegrationSimpleTraces(t *testing.T) { startWG.Wait() var clientDoneWG sync.WaitGroup // wait for client to finish - var expect [threadCount][]ptrace.Traces + expect := make([][]ptrace.Traces, tp.threadCount) - for num := 0; num < threadCount; num++ { + for num := 0; num < tp.threadCount; num++ { clientDoneWG.Add(1) go func() { defer clientDoneWG.Done() - for i := 0; i < requestCount; i++ { - td := ptrace.NewTraces() - td.ResourceSpans().AppendEmpty().Resource().Attributes().PutStr("resource-attr", fmt.Sprint("resource-attr-val-", i)) - - ss := td.ResourceSpans().At(0).ScopeSpans().AppendEmpty().Spans() - span := ss.AppendEmpty() - - span.SetName("operationA") - span.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Now())) - span.SetEndTimestamp(pcommon.NewTimestampFromTime(time.Now())) - - span.SetTraceID(testutil.UInt64ToTraceID(rand.Uint64(), rand.Uint64())) - span.SetSpanID(testutil.UInt64ToSpanID(rand.Uint64())) - evs := span.Events() - ev0 := evs.AppendEmpty() - ev0.SetTimestamp(pcommon.NewTimestampFromTime(time.Now())) - ev0.SetName("event-with-attr") - ev0.Attributes().PutStr("span-event-attr", "span-event-attr-val") - ev0.SetDroppedAttributesCount(2) - ev1 := evs.AppendEmpty() - ev1.SetTimestamp(pcommon.NewTimestampFromTime(time.Now())) - ev1.SetName("event") - ev1.SetDroppedAttributesCount(2) - span.SetDroppedEventsCount(1) - status := span.Status() - status.SetCode(ptrace.StatusCodeError) - status.SetMessage("status-cancelled") - - require.NoError(t, exporter.ConsumeTraces(ctx, td)) + generator := mkgen() + for i := 0; i < tp.requestCount; i++ { + td := generator(i) + + errf(t, exporter.ConsumeTraces(ctx, td)) expect[num] = append(expect[num], td) } }() @@ -172,8 +202,59 @@ func TestIntegrationSimpleTraces(t *testing.T) { // wait for receiver to shut down receiverShutdownWG.Wait() + endf(t, tp, testCon, expect[:]) +} + +func makeTestTraces(i int) ptrace.Traces { + td := ptrace.NewTraces() + td.ResourceSpans().AppendEmpty().Resource().Attributes().PutStr("resource-attr", fmt.Sprint("resource-attr-val-", i)) + + ss := td.ResourceSpans().At(0).ScopeSpans().AppendEmpty().Spans() + span := ss.AppendEmpty() + + span.SetName("operationA") + span.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Now())) + span.SetEndTimestamp(pcommon.NewTimestampFromTime(time.Now())) + + span.SetTraceID(testutil.UInt64ToTraceID(rand.Uint64(), rand.Uint64())) + span.SetSpanID(testutil.UInt64ToSpanID(rand.Uint64())) + evs := span.Events() + ev0 := evs.AppendEmpty() + ev0.SetTimestamp(pcommon.NewTimestampFromTime(time.Now())) + ev0.SetName("event-with-attr") + ev0.Attributes().PutStr("span-event-attr", "span-event-attr-val") + ev0.SetDroppedAttributesCount(2) + ev1 := evs.AppendEmpty() + ev1.SetTimestamp(pcommon.NewTimestampFromTime(time.Now())) + ev1.SetName("event") + ev1.SetDroppedAttributesCount(2) + span.SetDroppedEventsCount(1) + status := span.Status() + status.SetCode(ptrace.StatusCodeError) + status.SetMessage("status-cancelled") + + return td +} + +func bulkyGenFunc() MkGen { + return func() GenFunc { + entropy := datagen.NewTestEntropy(int64(rand.Uint64())) //nolint:gosec // only used for testing + + tracesGen := datagen.NewTracesGenerator( + entropy, + entropy.NewStandardResourceAttributes(), + entropy.NewStandardInstrumentationScopes(), + ) + return func(_ int) ptrace.Traces { + return tracesGen.Generate(1000, time.Minute) + } + } + +} + +func standardEnding(t *testing.T, tp testParams, testCon *testConsumer, expect [][]ptrace.Traces) { // Check for matching request count and data - require.Equal(t, requestCount*threadCount, testCon.sink.SpanCount()) + require.Equal(t, tp.requestCount*tp.threadCount, testCon.sink.SpanCount()) var expectJSON []json.Marshaler for _, tdn := range expect { @@ -189,3 +270,95 @@ func TestIntegrationSimpleTraces(t *testing.T) { asserter := assert.NewStdUnitTest(t) assert.Equiv(asserter, expectJSON, receivedJSON) } + +func logSigs(obs *observer.ObservedLogs) (map[string]int, []string) { + counts := map[string]int{} + var msgs []string + for _, rl := range obs.All() { + var attrs []string + for _, f := range rl.Context { + attrs = append(attrs, f.Key) + + if rl.Message == "arrow stream error" && f.Key == "message" { + msgs = append(msgs, f.String) + } + } + var sig strings.Builder + sig.WriteString(rl.Message) + sig.WriteString("|||") + sig.WriteString(strings.Join(attrs, "///")) + counts[sig.String()]++ + } + return counts, msgs +} + +func countMemoryLimitErrors(msgs []string) (cnt int) { + for _, msg := range msgs { + if _, ok := arrow.NewLimitErrorFromError(errors.New(msg)); ok { + cnt++ + } + } + return +} + +func failureMemoryLimitEnding(t *testing.T, _ testParams, testCon *testConsumer, _ [][]ptrace.Traces) { + require.Equal(t, 0, testCon.sink.SpanCount()) + + eSigs, eMsgs := logSigs(testCon.expLogs) + rSigs, rMsgs := logSigs(testCon.recvLogs) + + require.Less(t, 0, eSigs["arrow stream error|||code///message///where"], "should have exporter arrow stream errors: %v", eSigs) + require.Less(t, 0, rSigs["arrow stream error|||code///message///where"], "should have receiver arrow stream errors: %v", rSigs) + + require.Less(t, 0, countMemoryLimitErrors(rMsgs), "should have memory limit errors: %v", rMsgs) + require.Less(t, 0, countMemoryLimitErrors(eMsgs), "should have memory limit errors: %v", eMsgs) +} + +func consumerSuccess(t *testing.T, err error) { + require.NoError(t, err) +} + +func consumerFailure(t *testing.T, err error) { + require.Error(t, err) + + // there should be no permanent errors anywhere in this test. + require.True(t, !consumererror.IsPermanent(err), + "should not be permanent: %v", err) + + stat, ok := status.FromError(err) + require.True(t, ok, "should be a status error: %v", err) + + switch stat.Code() { + case codes.ResourceExhausted, codes.Canceled: + // Cool + default: + // Not cool + t.Fatalf("unexpected status code %v", stat) + } +} + +func TestIntegrationTracesSimple(t *testing.T) { + for _, n := range []int{1, 2, 4, 8} { + t.Run(fmt.Sprint(n), func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + testIntegrationTraces(ctx, t, normalParams, func(ecfg *ExpConfig, rcfg *RecvConfig) { + ecfg.Arrow.NumStreams = n + }, func() GenFunc { return makeTestTraces }, consumerSuccess, standardEnding) + }) + } +} + +func TestIntegrationMemoryLimited(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + go func() { + time.Sleep(5 * time.Second) + cancel() + }() + testIntegrationTraces(ctx, t, memoryLimitParams, func(ecfg *ExpConfig, rcfg *RecvConfig) { + rcfg.Arrow.MemoryLimitMiB = 1 + ecfg.Arrow.NumStreams = 10 + ecfg.TimeoutSettings.Timeout = 5 * time.Second + }, bulkyGenFunc(), consumerFailure, failureMemoryLimitEnding) +} diff --git a/collector/test/go.mod b/collector/test/go.mod index 2e0ce44cc9..a453883c61 100644 --- a/collector/test/go.mod +++ b/collector/test/go.mod @@ -14,6 +14,7 @@ require ( go.opentelemetry.io/collector/pdata v1.5.0 go.opentelemetry.io/collector/receiver v0.98.0 go.uber.org/zap v1.27.0 + google.golang.org/grpc v1.63.2 ) require ( @@ -22,6 +23,7 @@ require ( github.com/axiomhq/hyperloglog v0.0.0-20230201085229-3ddf4bad03dc // indirect github.com/bahlo/generic-list-go v0.2.0 // indirect github.com/beorn7/perks v1.0.1 // indirect + github.com/brianvoe/gofakeit/v6 v6.17.0 // indirect github.com/buger/jsonparser v1.1.1 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect @@ -88,7 +90,6 @@ require ( golang.org/x/tools v0.15.0 // indirect golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda // indirect - google.golang.org/grpc v1.63.2 // indirect google.golang.org/protobuf v1.33.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/pkg/otel/arrow_record/consumer.go b/pkg/otel/arrow_record/consumer.go index 8bb67172f0..62eff883fa 100644 --- a/pkg/otel/arrow_record/consumer.go +++ b/pkg/otel/arrow_record/consumer.go @@ -17,6 +17,7 @@ package arrow_record import ( "bytes" "context" + "errors" "fmt" "log" "math/rand" @@ -60,9 +61,12 @@ type ConsumerAPI interface { var _ ConsumerAPI = &Consumer{} -var ErrConsumerMemoryLimit = fmt.Errorf( - "The number of decoded records is smaller than the number of received payloads. " + - "Please increase the memory limit of the consumer.") +// ErrConsumerMemoryLimit is used by calling code to check +// errors.Is(err, ErrConsumerMemoryLimit). It is never returned. +var ErrConsumerMemoryLimit error = common.LimitError{} + +var errConsumerInternalError = errors.New( + "internal error: number of decoded records is smaller than the number of received payloads") // Consumer is a BatchArrowRecords consumer. type Consumer struct { @@ -311,12 +315,15 @@ func (c *Consumer) TracesFrom(bar *colarspb.BatchArrowRecords) ([]ptrace.Traces, // Consume takes a BatchArrowRecords protobuf message and returns an array of RecordMessage. // Note: the records wrapped in the RecordMessage must be released after use by the caller. -func (c *Consumer) Consume(bar *colarspb.BatchArrowRecords) ([]*record_message.RecordMessage, error) { +func (c *Consumer) Consume(bar *colarspb.BatchArrowRecords) (ibes []*record_message.RecordMessage, retErr error) { ctx := context.Background() - var ibes []*record_message.RecordMessage defer func() { c.recordsCounter.Add(ctx, int64(len(ibes)), c.metricOpts()...) + if retErr != nil { + releaseRecords(ibes) + ibes = nil + } }() // Transform each individual OtlpArrowPayload into RecordMessage @@ -356,8 +363,7 @@ func (c *Consumer) Consume(bar *colarspb.BatchArrowRecords) ([]*record_message.R ipc.WithZstd(), ) if err != nil { - releaseRecords(ibes) - return nil, werror.Wrap(err) + return ibes, werror.Wrap(distinguishMemoryError(err)) } sc.ipcReader = ipcReader } @@ -370,16 +376,27 @@ func (c *Consumer) Consume(bar *colarspb.BatchArrowRecords) ([]*record_message.R rec.Retain() ibes = append(ibes, record_message.NewRecordMessage(bar.BatchId, payload.GetType(), rec)) } + + if err := sc.ipcReader.Err(); err != nil { + return ibes, werror.Wrap(distinguishMemoryError(err)) + } } if len(ibes) < len(bar.ArrowPayloads) { - releaseRecords(ibes) - return nil, ErrConsumerMemoryLimit + return ibes, werror.Wrap(errConsumerInternalError) } return ibes, nil } +func distinguishMemoryError(err error) error { + limErr, ok := common.NewLimitErrorFromError(err) + if ok { + return limErr + } + return err +} + type runtimeChecker struct{} var _ memory.TestingT = &runtimeChecker{} diff --git a/pkg/otel/common/arrow/allocator.go b/pkg/otel/common/arrow/allocator.go index dac4e71b36..5974624857 100644 --- a/pkg/otel/common/arrow/allocator.go +++ b/pkg/otel/common/arrow/allocator.go @@ -16,11 +16,22 @@ package arrow import ( "fmt" - "os" + "regexp" + "strconv" + "strings" "github.com/apache/arrow/go/v14/arrow/memory" ) +// MemoryErrorStringPrefix is a prefix used to recognize memory limit errors. +// +// Note: the arrow/go package (as of v16) has a panic recovery +// mechanism which formats the error object raised through panic in +// the code below. The formatting uses a "%v" which means we lose the +// error wrapping facility that would let us easily extract the +// object. Therefore, we use a regexp to unpack memory limit errors. +const MemoryErrorStringPrefix = "allocation size exceeds limit" + type LimitedAllocator struct { Allocator memory.Allocator inuse uint64 @@ -44,8 +55,33 @@ type LimitError struct { var _ error = LimitError{} +var limitRegexp = regexp.MustCompile(`requested (\d+) out of (\d+) \(in-use=(\d+)\)`) + +// NewLimitErrorFromError extracts a formatted limit error. See +// MemoryErrorStringPrefix for an explanation. +func NewLimitErrorFromError(err error) (error, bool) { + msg := err.Error() + if !strings.Contains(msg, MemoryErrorStringPrefix) { + return err, false + } + matches := limitRegexp.FindStringSubmatch(msg) + if len(matches) != 4 { + return err, false + } + + req, _ := strconv.ParseUint(matches[1], 10, 64) + lim, _ := strconv.ParseUint(matches[2], 10, 64) + inuse, _ := strconv.ParseUint(matches[3], 10, 64) + + return LimitError{ + Request: req, + Inuse: inuse, + Limit: lim, + }, true +} + func (le LimitError) Error() string { - return fmt.Sprintf("allocation size %d exceeds limit %d (in-use=%d)", le.Request, le.Limit, le.Inuse) + return fmt.Sprintf("%s: requested %d out of %d (in-use=%d)", MemoryErrorStringPrefix, le.Request, le.Limit, le.Inuse) } func (_ LimitError) Is(tgt error) bool { @@ -65,9 +101,6 @@ func (l *LimitedAllocator) Allocate(size int) []byte { Inuse: l.inuse, Limit: l.limit, } - // Write the error to stderr so that it is visible even if the - // panic is caught. - os.Stderr.WriteString(err.Error() + "\n") panic(err) } @@ -86,9 +119,6 @@ func (l *LimitedAllocator) Reallocate(size int, b []byte) []byte { Inuse: l.inuse, Limit: l.limit, } - // Write the error to stderr so that it is visible even if the - // panic is caught. - os.Stderr.WriteString(err.Error() + "\n") panic(err) } diff --git a/pkg/otel/common/arrow/allocator_test.go b/pkg/otel/common/arrow/allocator_test.go index cc28564f4c..7925b9da67 100644 --- a/pkg/otel/common/arrow/allocator_test.go +++ b/pkg/otel/common/arrow/allocator_test.go @@ -16,13 +16,14 @@ package arrow import ( "errors" + "fmt" "testing" "github.com/apache/arrow/go/v14/arrow/memory" "github.com/stretchr/testify/require" ) -func TestLimitedAllocator(t *testing.T) { +func TestLimitedAllocatorUnformatted(t *testing.T) { const boundary = 1000000 check := memory.NewCheckedAllocator(memory.NewGoAllocator()) limit := NewLimitedAllocator(check, boundary) @@ -47,9 +48,26 @@ func TestLimitedAllocator(t *testing.T) { }() require.NotNil(t, capture) require.True(t, errors.Is(capture.(error), LimitError{})) - require.Equal(t, "allocation size 1 exceeds limit 1000000 (in-use=1000000)", capture.(error).Error()) + require.Equal(t, "allocation size exceeds limit: requested 1 out of 1000000 (in-use=1000000)", capture.(error).Error()) limit.Free(b) check.AssertSize(t, 0) } + +func TestLimitedAllocatorFormatted(t *testing.T) { + // Arrow does not wrap the error, so the consumer sees its + // formatted version. + expect := LimitError{ + Request: 1000, + Inuse: 9900, + Limit: 10000, + } + + unwrap, ok := NewLimitErrorFromError(fmt.Errorf("some sort of prefix %v some sort of suffix", expect)) + require.Error(t, unwrap) + require.True(t, ok) + require.Equal(t, expect, unwrap) + + require.True(t, errors.Is(unwrap, LimitError{})) +}