From b4f0b520d1c1fd0dd7c17a3710c83e7838d57d89 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Tue, 4 Jun 2024 14:55:25 -0700 Subject: [PATCH 1/2] Use gRPC Status codes in the Arrow exporter --- .../internal/arrow/bestofn.go | 5 ++- .../internal/arrow/exporter.go | 10 +++++- .../internal/arrow/exporter_test.go | 18 +++++++--- .../internal/arrow/stream.go | 36 +++++++++---------- .../internal/arrow/stream_test.go | 11 ++++-- 5 files changed, 52 insertions(+), 28 deletions(-) diff --git a/collector/exporter/otelarrowexporter/internal/arrow/bestofn.go b/collector/exporter/otelarrowexporter/internal/arrow/bestofn.go index f5ac6034d9..1c4f2f34e2 100644 --- a/collector/exporter/otelarrowexporter/internal/arrow/bestofn.go +++ b/collector/exporter/otelarrowexporter/internal/arrow/bestofn.go @@ -9,6 +9,9 @@ import ( "runtime" "sort" "time" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) // bestOfNPrioritizer is a prioritizer that selects a less-loaded stream to write. @@ -114,7 +117,7 @@ func (lp *bestOfNPrioritizer) sendAndWait(ctx context.Context, errCh <-chan erro case <-lp.done: return ErrStreamRestarting case <-ctx.Done(): - return context.Canceled + return status.Errorf(codes.Canceled, "stream wait: %v", ctx.Err()) case lp.input <- wri: return waitForWrite(ctx, errCh, lp.done) } diff --git a/collector/exporter/otelarrowexporter/internal/arrow/exporter.go b/collector/exporter/otelarrowexporter/internal/arrow/exporter.go index 3976f9982c..3eaae9dbd5 100644 --- a/collector/exporter/otelarrowexporter/internal/arrow/exporter.go +++ b/collector/exporter/otelarrowexporter/internal/arrow/exporter.go @@ -20,7 +20,9 @@ import ( "go.opentelemetry.io/collector/pdata/ptrace" "go.uber.org/zap" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" + "google.golang.org/grpc/status" ) // Exporter is 1:1 with exporter, isolates arrow-specific @@ -255,6 +257,12 @@ func (e *Exporter) runArrowStream(ctx context.Context, dc doneCancel, state *str // // consumer should fall back to standard OTLP, (true, nil) func (e *Exporter) SendAndWait(ctx context.Context, data any) (bool, error) { + select { + case <-ctx.Done(): + return false, status.Errorf(codes.Canceled, "incoming context: %v", ctx.Err()) + default: + } + errCh := make(chan error, 1) // Note that if the OTLP exporter's gRPC Headers field was @@ -340,7 +348,7 @@ func waitForWrite(ctx context.Context, errCh <-chan error, down <-chan struct{}) select { case <-ctx.Done(): // This caller's context timed out. - return ctx.Err() + return status.Errorf(codes.Canceled, "send wait: %v", ctx.Err()) case <-down: return ErrStreamRestarting case err := <-errCh: diff --git a/collector/exporter/otelarrowexporter/internal/arrow/exporter_test.go b/collector/exporter/otelarrowexporter/internal/arrow/exporter_test.go index 276e5f3fa4..7047d74074 100644 --- a/collector/exporter/otelarrowexporter/internal/arrow/exporter_test.go +++ b/collector/exporter/otelarrowexporter/internal/arrow/exporter_test.go @@ -6,7 +6,6 @@ package arrow import ( "context" "encoding/json" - "errors" "fmt" "sync" "sync/atomic" @@ -31,7 +30,9 @@ import ( "go.uber.org/zap/zaptest" "golang.org/x/net/http2/hpack" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" ) var AllPrioritizers = []PrioritizerName{LeastLoadedPrioritizer, LeastLoadedTwoPrioritizer} @@ -278,7 +279,10 @@ func TestArrowExporterTimeout(t *testing.T) { sent, err := tc.exporter.SendAndWait(ctx, twoTraces) require.True(t, sent) require.Error(t, err) - require.True(t, errors.Is(err, context.Canceled)) + + stat, is := status.FromError(err) + require.True(t, is, "is a gRPC status") + require.Equal(t, codes.Canceled, stat.Code()) require.NoError(t, tc.exporter.Shutdown(ctx)) }) @@ -406,7 +410,10 @@ func TestArrowExporterConnectTimeout(t *testing.T) { }() _, err := tc.exporter.SendAndWait(ctx, twoTraces) require.Error(t, err) - require.True(t, errors.Is(err, context.Canceled)) + + stat, is := status.FromError(err) + require.True(t, is, "is a gRPC status error: %v", err) + require.Equal(t, codes.Canceled, stat.Code()) require.NoError(t, tc.exporter.Shutdown(bg)) }) @@ -489,7 +496,10 @@ func TestArrowExporterStreamRace(t *testing.T) { // This blocks until the cancelation. _, err := tc.exporter.SendAndWait(callctx, twoTraces) require.Error(t, err) - require.True(t, errors.Is(err, context.Canceled)) + + stat, is := status.FromError(err) + require.True(t, is, "is a gRPC status error: %v", err) + require.Equal(t, codes.Canceled, stat.Code()) }() } diff --git a/collector/exporter/otelarrowexporter/internal/arrow/stream.go b/collector/exporter/otelarrowexporter/internal/arrow/stream.go index a5d97e53ee..ae991241ca 100644 --- a/collector/exporter/otelarrowexporter/internal/arrow/stream.go +++ b/collector/exporter/otelarrowexporter/internal/arrow/stream.go @@ -7,7 +7,6 @@ import ( "bytes" "context" "errors" - "fmt" "io" "sync" "time" @@ -16,7 +15,6 @@ import ( "github.com/open-telemetry/otel-arrow/collector/netstats" arrowRecord "github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" @@ -135,9 +133,9 @@ func (s *Stream) setBatchChannel(batchID int64, errCh chan<- error) { s.workState.waiters[batchID] = errCh } -// logStreamError decides how to log an error. `which` indicates the -// stream direction, will be "reader" or "writer". -func (s *Stream) logStreamError(which string, err error) { +// logStreamError decides how to log an error. `where` indicates the +// error location, will be "reader" or "writer". +func (s *Stream) logStreamError(where string, err error) { var code codes.Code var msg string // gRPC tends to supply status-wrapped errors, so we always @@ -156,9 +154,9 @@ func (s *Stream) logStreamError(which string, err error) { msg = err.Error() } if code == codes.Canceled { - s.telemetry.Logger.Debug("arrow stream shutdown", zap.String("which", which), zap.String("message", msg)) + s.telemetry.Logger.Debug("arrow stream shutdown", zap.String("message", msg), zap.String("where", where)) } else { - s.telemetry.Logger.Error("arrow stream error", zap.String("which", which), zap.String("message", msg), zap.Int("code", int(code))) + s.telemetry.Logger.Error("arrow stream error", zap.Int("code", int(code)), zap.String("message", msg), zap.String("where", where)) } } @@ -274,7 +272,7 @@ func (s *Stream) write(ctx context.Context) (retErr error) { return nil case wri = <-s.workState.toWrite: case <-ctx.Done(): - return ctx.Err() + return status.Errorf(codes.Canceled, "stream input: %v", ctx.Err()) } err := s.encodeAndSend(wri, &hdrsBuf, hdrsEnc) @@ -319,8 +317,8 @@ func (s *Stream) encodeAndSend(wri writeItem, hdrsBuf *bytes.Buffer, hdrsEnc *hp if err != nil { // This is some kind of internal error. We will restart the // stream and mark this record as a permanent one. - err = fmt.Errorf("encode: %w", err) - wri.errCh <- consumererror.NewPermanent(err) + err = status.Errorf(codes.Internal, "encode: %v", err) + wri.errCh <- err return err } @@ -336,8 +334,8 @@ func (s *Stream) encodeAndSend(wri writeItem, hdrsBuf *bytes.Buffer, hdrsEnc *hp // This case is like the encode-failure case // above, we will restart the stream but consider // this a permenent error. - err = fmt.Errorf("hpack: %w", err) - wri.errCh <- consumererror.NewPermanent(err) + err = status.Errorf(codes.Internal, "hpack: %v", err) + wri.errCh <- err return err } } @@ -382,24 +380,24 @@ func (s *Stream) read(_ context.Context) error { } if err = s.processBatchStatus(resp); err != nil { - return fmt.Errorf("process: %w", err) + return err } } } // getSenderChannel takes the stream lock and removes the corresonding // sender channel. -func (sws *streamWorkState) getSenderChannel(status *arrowpb.BatchStatus) (chan<- error, error) { +func (sws *streamWorkState) getSenderChannel(bstat *arrowpb.BatchStatus) (chan<- error, error) { sws.lock.Lock() defer sws.lock.Unlock() - ch, ok := sws.waiters[status.BatchId] + ch, ok := sws.waiters[bstat.BatchId] if !ok { // Will break the stream. - return nil, fmt.Errorf("unrecognized batch ID: %d", status.BatchId) + return nil, status.Errorf(codes.Internal, "unrecognized batch ID: %d", bstat.BatchId) } - delete(sws.waiters, status.BatchId) + delete(sws.waiters, bstat.BatchId) return ch, nil } @@ -460,7 +458,7 @@ func (s *Stream) encode(records any) (_ *arrowpb.BatchArrowRecords, retErr error zap.Reflect("recovered", err), zap.Stack("stacktrace"), ) - retErr = fmt.Errorf("panic in otel-arrow-adapter: %v", err) + retErr = status.Errorf(codes.Internal, "panic in otel-arrow-adapter: %v", err) } }() var batch *arrowpb.BatchArrowRecords @@ -473,7 +471,7 @@ func (s *Stream) encode(records any) (_ *arrowpb.BatchArrowRecords, retErr error case pmetric.Metrics: batch, err = s.producer.BatchArrowRecordsFromMetrics(data) default: - return nil, fmt.Errorf("unsupported OTLP type: %T", records) + return nil, status.Errorf(codes.Unimplemented, "unsupported OTel-Arrow signal type: %T", records) } return batch, err } diff --git a/collector/exporter/otelarrowexporter/internal/arrow/stream_test.go b/collector/exporter/otelarrowexporter/internal/arrow/stream_test.go index 91a33f30d9..f653cc887e 100644 --- a/collector/exporter/otelarrowexporter/internal/arrow/stream_test.go +++ b/collector/exporter/otelarrowexporter/internal/arrow/stream_test.go @@ -15,9 +15,10 @@ import ( "github.com/open-telemetry/otel-arrow/collector/netstats" arrowRecordMock "github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record/mock" "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/consumer/consumererror" "go.uber.org/mock/gomock" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) var oneBatch = &arrowpb.BatchArrowRecords{ @@ -182,8 +183,12 @@ func TestStreamEncodeError(t *testing.T) { // sender should get a permanent testErr err := tc.mustSendAndWait() require.Error(t, err) - require.True(t, errors.Is(err, testErr)) - require.True(t, consumererror.IsPermanent(err)) + + stat, is := status.FromError(err) + require.True(t, is, "is a gRPC status error: %v", err) + require.Equal(t, codes.Internal, stat.Code()) + + require.Contains(t, stat.Message(), testErr.Error()) }) } } From 68e8062f86df79f29a2b40bed080211060e0ce76 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Tue, 4 Jun 2024 15:25:50 -0700 Subject: [PATCH 2/2] Do not skip canceled contexts here --- .../exporter/otelarrowexporter/internal/arrow/exporter.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/collector/exporter/otelarrowexporter/internal/arrow/exporter.go b/collector/exporter/otelarrowexporter/internal/arrow/exporter.go index 3eaae9dbd5..4b9af2def8 100644 --- a/collector/exporter/otelarrowexporter/internal/arrow/exporter.go +++ b/collector/exporter/otelarrowexporter/internal/arrow/exporter.go @@ -257,12 +257,6 @@ func (e *Exporter) runArrowStream(ctx context.Context, dc doneCancel, state *str // // consumer should fall back to standard OTLP, (true, nil) func (e *Exporter) SendAndWait(ctx context.Context, data any) (bool, error) { - select { - case <-ctx.Done(): - return false, status.Errorf(codes.Canceled, "incoming context: %v", ctx.Err()) - default: - } - errCh := make(chan error, 1) // Note that if the OTLP exporter's gRPC Headers field was