Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import (
"runtime"
"sort"
"time"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// bestOfNPrioritizer is a prioritizer that selects a less-loaded stream to write.
Expand Down Expand Up @@ -114,7 +117,7 @@ func (lp *bestOfNPrioritizer) sendAndWait(ctx context.Context, errCh <-chan erro
case <-lp.done:
return ErrStreamRestarting
case <-ctx.Done():
return context.Canceled
return status.Error(codes.Canceled, ctx.Err().Error())
case lp.input <- wri:
return waitForWrite(ctx, errCh, lp.done)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/status"
)

// Exporter is 1:1 with exporter, isolates arrow-specific
Expand Down Expand Up @@ -255,6 +257,12 @@ func (e *Exporter) runArrowStream(ctx context.Context, dc doneCancel, state *str
//
// consumer should fall back to standard OTLP, (true, nil)
func (e *Exporter) SendAndWait(ctx context.Context, data any) (bool, error) {
select {
case <-ctx.Done():
return false, status.Error(codes.Canceled, "incoming request already canceled")
default:
}

errCh := make(chan error, 1)

// Note that if the OTLP exporter's gRPC Headers field was
Expand Down Expand Up @@ -340,7 +348,7 @@ func waitForWrite(ctx context.Context, errCh <-chan error, down <-chan struct{})
select {
case <-ctx.Done():
// This caller's context timed out.
return ctx.Err()
return status.Error(codes.Canceled, ctx.Err().Error())
case <-down:
return ErrStreamRestarting
case err := <-errCh:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package arrow
import (
"context"
"encoding/json"
"errors"
"fmt"
"sync"
"sync/atomic"
Expand All @@ -31,7 +30,9 @@ import (
"go.uber.org/zap/zaptest"
"golang.org/x/net/http2/hpack"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)

var AllPrioritizers = []PrioritizerName{LeastLoadedPrioritizer, LeastLoadedTwoPrioritizer}
Expand Down Expand Up @@ -278,7 +279,10 @@ func TestArrowExporterTimeout(t *testing.T) {
sent, err := tc.exporter.SendAndWait(ctx, twoTraces)
require.True(t, sent)
require.Error(t, err)
require.True(t, errors.Is(err, context.Canceled))

stat, is := status.FromError(err)
require.True(t, is, "is a gRPC status")
require.Equal(t, codes.Canceled, stat.Code())

require.NoError(t, tc.exporter.Shutdown(ctx))
})
Expand Down Expand Up @@ -406,7 +410,10 @@ func TestArrowExporterConnectTimeout(t *testing.T) {
}()
_, err := tc.exporter.SendAndWait(ctx, twoTraces)
require.Error(t, err)
require.True(t, errors.Is(err, context.Canceled))

stat, is := status.FromError(err)
require.True(t, is, "is a gRPC status error: %v", err)
require.Equal(t, codes.Canceled, stat.Code())

require.NoError(t, tc.exporter.Shutdown(bg))
})
Expand Down Expand Up @@ -489,7 +496,10 @@ func TestArrowExporterStreamRace(t *testing.T) {
// This blocks until the cancelation.
_, err := tc.exporter.SendAndWait(callctx, twoTraces)
require.Error(t, err)
require.True(t, errors.Is(err, context.Canceled))

stat, is := status.FromError(err)
require.True(t, is, "is a gRPC status error: %v", err)
require.Equal(t, codes.Canceled, stat.Code())
}()
}

Expand Down
36 changes: 17 additions & 19 deletions collector/exporter/otelarrowexporter/internal/arrow/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"bytes"
"context"
"errors"
"fmt"
"io"
"sync"
"time"
Expand All @@ -16,7 +15,6 @@ import (
"github.com/open-telemetry/otel-arrow/collector/netstats"
arrowRecord "github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
Expand Down Expand Up @@ -135,9 +133,9 @@ func (s *Stream) setBatchChannel(batchID int64, errCh chan<- error) {
s.workState.waiters[batchID] = errCh
}

// logStreamError decides how to log an error. `which` indicates the
// stream direction, will be "reader" or "writer".
func (s *Stream) logStreamError(which string, err error) {
// logStreamError decides how to log an error. `where` indicates the
// error location, will be "reader" or "writer".
func (s *Stream) logStreamError(where string, err error) {
var code codes.Code
var msg string
// gRPC tends to supply status-wrapped errors, so we always
Expand All @@ -156,9 +154,9 @@ func (s *Stream) logStreamError(which string, err error) {
msg = err.Error()
}
if code == codes.Canceled {
s.telemetry.Logger.Debug("arrow stream shutdown", zap.String("which", which), zap.String("message", msg))
s.telemetry.Logger.Debug("arrow stream shutdown", zap.String("message", msg), zap.String("where", where))
} else {
s.telemetry.Logger.Error("arrow stream error", zap.String("which", which), zap.String("message", msg), zap.Int("code", int(code)))
s.telemetry.Logger.Error("arrow stream error", zap.Int("code", int(code)), zap.String("message", msg), zap.String("where", where))
}
}

Expand Down Expand Up @@ -274,7 +272,7 @@ func (s *Stream) write(ctx context.Context) (retErr error) {
return nil
case wri = <-s.workState.toWrite:
case <-ctx.Done():
return ctx.Err()
return status.Error(codes.Canceled, ctx.Err().Error())
}

err := s.encodeAndSend(wri, &hdrsBuf, hdrsEnc)
Expand Down Expand Up @@ -319,8 +317,8 @@ func (s *Stream) encodeAndSend(wri writeItem, hdrsBuf *bytes.Buffer, hdrsEnc *hp
if err != nil {
// This is some kind of internal error. We will restart the
// stream and mark this record as a permanent one.
err = fmt.Errorf("encode: %w", err)
wri.errCh <- consumererror.NewPermanent(err)
err = status.Errorf(codes.Internal, "encode: %v", err)
wri.errCh <- err
return err
}

Expand All @@ -336,8 +334,8 @@ func (s *Stream) encodeAndSend(wri writeItem, hdrsBuf *bytes.Buffer, hdrsEnc *hp
// This case is like the encode-failure case
// above, we will restart the stream but consider
// this a permanent error.
err = fmt.Errorf("hpack: %w", err)
wri.errCh <- consumererror.NewPermanent(err)
err = status.Errorf(codes.Internal, "hpack: %v", err)
wri.errCh <- err
return err
}
}
Expand Down Expand Up @@ -382,24 +380,24 @@ func (s *Stream) read(_ context.Context) error {
}

if err = s.processBatchStatus(resp); err != nil {
return fmt.Errorf("process: %w", err)
return err
}
}
}

// getSenderChannel takes the stream lock and removes the corresponding
// sender channel.
func (sws *streamWorkState) getSenderChannel(status *arrowpb.BatchStatus) (chan<- error, error) {
func (sws *streamWorkState) getSenderChannel(bstat *arrowpb.BatchStatus) (chan<- error, error) {
sws.lock.Lock()
defer sws.lock.Unlock()

ch, ok := sws.waiters[status.BatchId]
ch, ok := sws.waiters[bstat.BatchId]
if !ok {
// Will break the stream.
return nil, fmt.Errorf("unrecognized batch ID: %d", status.BatchId)
return nil, status.Errorf(codes.Internal, "unrecognized batch ID: %d", bstat.BatchId)
}

delete(sws.waiters, status.BatchId)
delete(sws.waiters, bstat.BatchId)
return ch, nil
}

Expand Down Expand Up @@ -460,7 +458,7 @@ func (s *Stream) encode(records any) (_ *arrowpb.BatchArrowRecords, retErr error
zap.Reflect("recovered", err),
zap.Stack("stacktrace"),
)
retErr = fmt.Errorf("panic in otel-arrow-adapter: %v", err)
retErr = status.Errorf(codes.Internal, "panic in otel-arrow-adapter: %v", err)
}
}()
var batch *arrowpb.BatchArrowRecords
Expand All @@ -473,7 +471,7 @@ func (s *Stream) encode(records any) (_ *arrowpb.BatchArrowRecords, retErr error
case pmetric.Metrics:
batch, err = s.producer.BatchArrowRecordsFromMetrics(data)
default:
return nil, fmt.Errorf("unsupported OTLP type: %T", records)
return nil, status.Errorf(codes.Unimplemented, "unsupported OTel-Arrow signal type: %T", records)
}
return batch, err
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ import (
"github.com/open-telemetry/otel-arrow/collector/netstats"
arrowRecordMock "github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record/mock"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.uber.org/mock/gomock"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

var oneBatch = &arrowpb.BatchArrowRecords{
Expand Down Expand Up @@ -182,8 +183,12 @@ func TestStreamEncodeError(t *testing.T) {
// sender should get an Internal gRPC status whose message contains testErr
err := tc.mustSendAndWait()
require.Error(t, err)
require.True(t, errors.Is(err, testErr))
require.True(t, consumererror.IsPermanent(err))

stat, is := status.FromError(err)
require.True(t, is, "is a gRPC status error: %v", err)
require.Equal(t, codes.Internal, stat.Code())

require.Contains(t, stat.Message(), testErr.Error())
})
}
}
Expand Down
Loading