Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
610 changes: 360 additions & 250 deletions collector/receiver/otelarrowreceiver/internal/arrow/arrow.go

Large diffs are not rendered by default.

115 changes: 78 additions & 37 deletions collector/receiver/otelarrowreceiver/internal/arrow/arrow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"strings"
"sync"
"testing"
"time"

arrowpb "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1"
arrowCollectorMock "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1/mock"
Expand Down Expand Up @@ -48,7 +49,9 @@ import (
"github.com/open-telemetry/otel-arrow/collector/receiver/otelarrowreceiver/internal/arrow/mock"
)

var defaultBQ = admission.NewBoundedQueue(int64(100000), int64(10))
func defaultBQ() *admission.BoundedQueue {
return admission.NewBoundedQueue(int64(100000), int64(10))
}

type compareJSONTraces struct{ ptrace.Traces }
type compareJSONMetrics struct{ pmetric.Metrics }
Expand Down Expand Up @@ -106,7 +109,7 @@ func (healthyTestChannel) onConsume() error {
type unhealthyTestChannel struct{}

func (unhealthyTestChannel) onConsume() error {
return fmt.Errorf("consumer unhealthy")
return status.Errorf(codes.Unavailable, "consumer unhealthy")
}

type recvResult struct {
Expand Down Expand Up @@ -282,6 +285,14 @@ func statusInvalidFor(batchID int64, msg string) *arrowpb.BatchStatus {
}
}

func statusInternalFor(batchID int64, msg string) *arrowpb.BatchStatus {
return &arrowpb.BatchStatus{
BatchId: batchID,
StatusCode: arrowpb.StatusCode_INTERNAL,
StatusMessage: msg,
}
}

func statusExhaustedFor(batchID int64, msg string) *arrowpb.BatchStatus {
return &arrowpb.BatchStatus{
BatchId: batchID,
Expand All @@ -290,6 +301,14 @@ func statusExhaustedFor(batchID int64, msg string) *arrowpb.BatchStatus {
}
}

func statusUnauthenticatedFor(batchID int64, msg string) *arrowpb.BatchStatus {
return &arrowpb.BatchStatus{
BatchId: batchID,
StatusCode: arrowpb.StatusCode_INVALID_ARGUMENT,
StatusMessage: msg,
}
}

func (ctc *commonTestCase) newRealConsumer() arrowRecord.ConsumerAPI {
mock := arrowRecordMock.NewMockConsumerAPI(ctc.ctrl)
cons := arrowRecord.NewConsumer()
Expand Down Expand Up @@ -358,10 +377,26 @@ func (ctc *commonTestCase) start(newConsumer func() arrowRecord.ConsumerAPI, bq
}

func requireCanceledStatus(t *testing.T, err error) {
requireStatus(t, codes.Canceled, err)
}

func requireUnavailableStatus(t *testing.T, err error) {
requireStatus(t, codes.Unavailable, err)
}

func requireInternalStatus(t *testing.T, err error) {
requireStatus(t, codes.Internal, err)
}

func requireExhaustedStatus(t *testing.T, err error) {
requireStatus(t, codes.ResourceExhausted, err)
}

func requireStatus(t *testing.T, code codes.Code, err error) {
require.Error(t, err)
status, ok := status.FromError(err)
require.True(t, ok, "is status-wrapped %v", err)
require.Equal(t, codes.Canceled, status.Code())
require.Equal(t, code, status.Code())
}

func TestBoundedQueueWithPdataHeaders(t *testing.T) {
Expand Down Expand Up @@ -437,23 +472,21 @@ func TestBoundedQueueWithPdataHeaders(t *testing.T) {

var bq *admission.BoundedQueue
if tt.rejected {
ctc.stream.EXPECT().Send(statusUnavailableFor(batch.BatchId, "rejecting request, request size larger than configured limit")).Times(1).Return(fmt.Errorf("rejecting request, request size larger than configured limit"))
// make the boundedqueue limit be slightly less than the uncompressed size
bq = admission.NewBoundedQueue(int64(sizer.TracesSize(td)-100), int64(10))
ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(0)
bq = admission.NewBoundedQueue(int64(sizer.TracesSize(td)-100), 10)
} else {
ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(1).Return(nil)
bq = admission.NewBoundedQueue(defaultBoundedQueueLimit, int64(10))
bq = admission.NewBoundedQueue(defaultBoundedQueueLimit, 10)
}

ctc.start(ctc.newRealConsumer, bq)
ctc.putBatch(batch, nil)

if tt.rejected {
ctc.cancel()
}

select {
case data := <-ctc.consume:
err := ctc.wait()
requireExhaustedStatus(t, err)
} else {
data := <-ctc.consume
actualTD := data.Data.(ptrace.Traces)
otelAssert.Equiv(stdTesting, []json.Marshaler{
compareJSONTraces{td},
Expand All @@ -462,8 +495,6 @@ func TestBoundedQueueWithPdataHeaders(t *testing.T) {
})
err = ctc.cancelAndWait()
requireCanceledStatus(t, err)
case err = <-ctc.streamErr:
requireCanceledStatus(t, err)
}
})
}
Expand All @@ -480,7 +511,7 @@ func TestReceiverTraces(t *testing.T) {

ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(1).Return(nil)

ctc.start(ctc.newRealConsumer, defaultBQ)
ctc.start(ctc.newRealConsumer, defaultBQ())
ctc.putBatch(batch, nil)

otelAssert.Equiv(stdTesting, []json.Marshaler{
Expand All @@ -503,7 +534,7 @@ func TestReceiverLogs(t *testing.T) {

ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(1).Return(nil)

ctc.start(ctc.newRealConsumer, defaultBQ)
ctc.start(ctc.newRealConsumer, defaultBQ())
ctc.putBatch(batch, nil)

assert.EqualValues(t, []json.Marshaler{compareJSONLogs{ld}}, []json.Marshaler{compareJSONLogs{(<-ctc.consume).Data.(plog.Logs)}})
Expand All @@ -523,7 +554,7 @@ func TestReceiverMetrics(t *testing.T) {

ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(1).Return(nil)

ctc.start(ctc.newRealConsumer, defaultBQ)
ctc.start(ctc.newRealConsumer, defaultBQ())
ctc.putBatch(batch, nil)

otelAssert.Equiv(stdTesting, []json.Marshaler{
Expand All @@ -540,7 +571,7 @@ func TestReceiverRecvError(t *testing.T) {
tc := healthyTestChannel{}
ctc := newCommonTestCase(t, tc)

ctc.start(ctc.newRealConsumer, defaultBQ)
ctc.start(ctc.newRealConsumer, defaultBQ())

ctc.putBatch(nil, fmt.Errorf("test recv error"))

Expand All @@ -557,16 +588,27 @@ func TestReceiverSendError(t *testing.T) {
batch, err := ctc.testProducer.BatchArrowRecordsFromLogs(ld)
require.NoError(t, err)

ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(1).Return(fmt.Errorf("test send error"))
ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(1).Return(status.Errorf(codes.Unavailable, "test send error"))

ctc.start(ctc.newRealConsumer, defaultBQ)
ctc.start(ctc.newRealConsumer, defaultBQ())
ctc.putBatch(batch, nil)

assert.EqualValues(t, ld, (<-ctc.consume).Data)

start := time.Now()
for time.Since(start) < 10*time.Second {
if ctc.ctrl.Satisfied() {
break
}
time.Sleep(time.Second)
}

// Release the receiver -- the sender has seen an error by
// now and should return the stream. (Oddly, gRPC has no way
// to signal the receive call to fail using context.)
close(ctc.receive)
err = ctc.wait()
require.Error(t, err)
require.Contains(t, err.Error(), "test send error")
requireUnavailableStatus(t, err)
}

func TestReceiverConsumeError(t *testing.T) {
Expand Down Expand Up @@ -600,7 +642,7 @@ func TestReceiverConsumeError(t *testing.T) {

ctc.stream.EXPECT().Send(statusUnavailableFor(batch.BatchId, "consumer unhealthy")).Times(1).Return(nil)

ctc.start(ctc.newRealConsumer, defaultBQ)
ctc.start(ctc.newRealConsumer, defaultBQ())

ctc.putBatch(batch, nil)

Expand Down Expand Up @@ -638,7 +680,7 @@ func TestReceiverInvalidData(t *testing.T) {
}

for _, item := range data {
tc := unhealthyTestChannel{}
tc := healthyTestChannel{}
ctc := newCommonTestCase(t, tc)

var batch *arrowpb.BatchArrowRecords
Expand All @@ -657,13 +699,12 @@ func TestReceiverInvalidData(t *testing.T) {

batch = copyBatch(batch)

ctc.stream.EXPECT().Send(statusInvalidFor(batch.BatchId, "Permanent error: test invalid error")).Times(1).Return(nil)

ctc.start(ctc.newErrorConsumer, defaultBQ)
// newErrorConsumer determines the internal error in decoding above
ctc.start(ctc.newErrorConsumer, defaultBQ())
ctc.putBatch(batch, nil)

err = ctc.cancelAndWait()
requireCanceledStatus(t, err)
err = ctc.wait()
requireInternalStatus(t, err)
}
}

Expand Down Expand Up @@ -694,13 +735,13 @@ func TestReceiverMemoryLimit(t *testing.T) {

batch = copyBatch(batch)

ctc.stream.EXPECT().Send(statusExhaustedFor(batch.BatchId, "Permanent error: test oom error "+arrowRecord.ErrConsumerMemoryLimit.Error())).Times(1).Return(nil)
// The Recv() returns an error, there are no Send() calls.

ctc.start(ctc.newOOMConsumer, defaultBQ)
ctc.start(ctc.newOOMConsumer, defaultBQ())
ctc.putBatch(batch, nil)

err = ctc.cancelAndWait()
requireCanceledStatus(t, err)
err = ctc.wait()
requireExhaustedStatus(t, err)
}
}

Expand Down Expand Up @@ -743,7 +784,7 @@ func TestReceiverEOF(t *testing.T) {

ctc.stream.EXPECT().Send(gomock.Any()).Times(times).Return(nil)

ctc.start(ctc.newRealConsumer, defaultBQ)
ctc.start(ctc.newRealConsumer, defaultBQ())

go func() {
for i := 0; i < times; i++ {
Expand Down Expand Up @@ -808,7 +849,7 @@ func testReceiverHeaders(t *testing.T, includeMeta bool) {

ctc.stream.EXPECT().Send(gomock.Any()).Times(len(expectData)).Return(nil)

ctc.start(ctc.newRealConsumer, defaultBQ, func(gsettings *configgrpc.ServerConfig, _ *auth.Server) {
ctc.start(ctc.newRealConsumer, defaultBQ(), func(gsettings *configgrpc.ServerConfig, _ *auth.Server) {
gsettings.IncludeMetadata = includeMeta
})

Expand Down Expand Up @@ -880,7 +921,7 @@ func TestReceiverCancel(t *testing.T) {
ctc := newCommonTestCase(t, tc)

ctc.cancel()
ctc.start(ctc.newRealConsumer, defaultBQ)
ctc.start(ctc.newRealConsumer, defaultBQ())

err := ctc.wait()
requireCanceledStatus(t, err)
Expand Down Expand Up @@ -1170,7 +1211,7 @@ func testReceiverAuthHeaders(t *testing.T, includeMeta bool, dataAuth bool) {
})

var authCall *gomock.Call
ctc.start(ctc.newRealConsumer, defaultBQ, func(gsettings *configgrpc.ServerConfig, authPtr *auth.Server) {
ctc.start(ctc.newRealConsumer, defaultBQ(), func(gsettings *configgrpc.ServerConfig, authPtr *auth.Server) {
gsettings.IncludeMetadata = includeMeta

as := mock.NewMockServer(ctc.ctrl)
Expand Down
Loading