Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"fmt"
"io"
"sync"
"testing"

arrowpb "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1"
Expand Down Expand Up @@ -37,6 +38,7 @@ type testChannel interface {
onRecv(context.Context) func() (*arrowpb.BatchStatus, error)
onSend(context.Context) func(*arrowpb.BatchArrowRecords) error
onConnect(context.Context) error
onCloseSend() func() error
}

type commonTestCase struct {
Expand Down Expand Up @@ -110,7 +112,7 @@ func (ctc *commonTestCase) newMockStream(ctx context.Context) *commonTestStream
gomock.Any(), // *arrowpb.BatchArrowRecords
).Times(0),
recvCall: client.EXPECT().Recv().Times(0),
closeSendCall: client.EXPECT().CloseSend().AnyTimes().Return(nil),
closeSendCall: client.EXPECT().CloseSend().Times(0),
}
return testStream
}
Expand All @@ -137,6 +139,7 @@ func (ctc *commonTestCase) returnNewStream(hs ...testChannel) func(context.Conte
str := ctc.newMockStream(ctx)
str.sendCall.AnyTimes().DoAndReturn(h.onSend(ctx))
str.recvCall.AnyTimes().DoAndReturn(h.onRecv(ctx))
str.closeSendCall.AnyTimes().DoAndReturn(h.onCloseSend())
return str.anyStreamClient, nil
}
}
Expand All @@ -158,12 +161,14 @@ func (ctc *commonTestCase) repeatedNewStream(nc func() testChannel) func(context
str := ctc.newMockStream(ctx)
str.sendCall.AnyTimes().DoAndReturn(h.onSend(ctx))
str.recvCall.AnyTimes().DoAndReturn(h.onRecv(ctx))
str.closeSendCall.AnyTimes().DoAndReturn(h.onCloseSend())
return str.anyStreamClient, nil
}
}

// healthyTestChannel accepts the connection and returns an OK status immediately.
type healthyTestChannel struct {
lock sync.Mutex

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This lock is needed to avoid a race because CloseSend() now closes the sent channel, which some former tests were doing manually. Now CloseSend() is always called and the closed channel serves to assist with some tests.

Comment thread
jmacd marked this conversation as resolved.
sent chan *arrowpb.BatchArrowRecords
recv chan *arrowpb.BatchStatus
}
Expand All @@ -175,14 +180,33 @@ func newHealthyTestChannel() *healthyTestChannel {
}
}

func (tc *healthyTestChannel) doClose() {
tc.lock.Lock()
defer tc.lock.Unlock()
if tc.sent != nil {
close(tc.sent)
tc.sent = nil
}
}

func (tc *healthyTestChannel) onConnect(_ context.Context) error {
return nil
}

func (tc *healthyTestChannel) onCloseSend() func() error {
return func() error {
tc.doClose()
return nil
}
}

func (tc *healthyTestChannel) onSend(ctx context.Context) func(*arrowpb.BatchArrowRecords) error {
return func(req *arrowpb.BatchArrowRecords) error {
tc.lock.Lock()
sent := tc.sent
tc.lock.Unlock()
select {
case tc.sent <- req:
case sent <- req:
return nil
case <-ctx.Done():
return ctx.Err()
Expand Down Expand Up @@ -221,6 +245,12 @@ func (tc *unresponsiveTestChannel) onConnect(_ context.Context) error {
return nil
}

func (tc *unresponsiveTestChannel) onCloseSend() func() error {
return func() error {
return nil
}
}

func (tc *unresponsiveTestChannel) onSend(ctx context.Context) func(*arrowpb.BatchArrowRecords) error {
return func(req *arrowpb.BatchArrowRecords) error {
select {
Expand Down Expand Up @@ -263,6 +293,12 @@ func (tc *arrowUnsupportedTestChannel) onConnect(_ context.Context) error {
return nil
}

func (tc *arrowUnsupportedTestChannel) onCloseSend() func() error {
return func() error {
return nil
}
}

func (tc *arrowUnsupportedTestChannel) onSend(ctx context.Context) func(*arrowpb.BatchArrowRecords) error {
return func(req *arrowpb.BatchArrowRecords) error {
<-ctx.Done()
Expand Down Expand Up @@ -290,6 +326,12 @@ func (tc *disconnectedTestChannel) onConnect(ctx context.Context) error {
return ctx.Err()
}

func (tc *disconnectedTestChannel) onCloseSend() func() error {
return func() error {
panic("unreachable")
}
}

func (tc *disconnectedTestChannel) onSend(_ context.Context) func(*arrowpb.BatchArrowRecords) error {
return func(req *arrowpb.BatchArrowRecords) error {
panic("unreachable")
Expand Down Expand Up @@ -317,6 +359,12 @@ func (tc *sendErrorTestChannel) onConnect(_ context.Context) error {
return nil
}

func (tc *sendErrorTestChannel) onCloseSend() func() error {
return func() error {
return nil
}
}

func (tc *sendErrorTestChannel) onSend(_ context.Context) func(*arrowpb.BatchArrowRecords) error {
return func(*arrowpb.BatchArrowRecords) error {
return io.EOF
Expand Down Expand Up @@ -346,6 +394,12 @@ func (tc *connectErrorTestChannel) onConnect(_ context.Context) error {
return fmt.Errorf("test connect error")
}

func (tc *connectErrorTestChannel) onCloseSend() func() error {
return func() error {
panic("unreachable")
}
}

func (tc *connectErrorTestChannel) onSend(_ context.Context) func(*arrowpb.BatchArrowRecords) error {
return func(*arrowpb.BatchArrowRecords) error {
panic("not reached")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,12 @@ type Exporter struct {
// Exporter, used for shutdown.
cancel context.CancelFunc

// wg counts one per active goroutine belonging to all strings
// shutdown is closed when Shutdown is called, which allows clients
// not to block indefinitely, if for some reason there are clients
// shutting down out-of-order.
shutdown chan struct{}

// wg counts one per active goroutine belonging to all streams
// of this exporter. The wait group has Add(1) called before
// starting goroutines so that they can be properly waited for
// in shutdown(), so the pattern is:
Expand Down Expand Up @@ -120,6 +125,7 @@ func NewExporter(
streamClient: streamClient,
perRPCCredentials: perRPCCredentials,
returning: make(chan *Stream, numStreams),
shutdown: make(chan struct{}),
netReporter: netReporter,
}
}
Expand Down Expand Up @@ -228,6 +234,8 @@ func (e *Exporter) SendAndWait(ctx context.Context, data any) (bool, error) {
select {
case <-ctx.Done():
err = ctx.Err()
case <-e.shutdown:
err = context.Canceled

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test flake was related to this code here. Shutdown does not cancel the request context, so we needed another signal to cleanly exit the exporter.

case stream = <-e.ready.readyChannel():
}

Expand All @@ -251,6 +259,7 @@ func (e *Exporter) SendAndWait(ctx context.Context, data any) (bool, error) {

// Shutdown returns when all Arrow-associated goroutines have returned.
func (e *Exporter) Shutdown(_ context.Context) error {
close(e.shutdown)
Comment thread
jmacd marked this conversation as resolved.
Outdated
e.cancel()
e.wg.Wait()
return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,20 @@ type exporterTestCase struct {
}

func newSingleStreamTestCase(t *testing.T) *exporterTestCase {
return newExporterTestCaseCommon(t, NotNoisy, 1, false, nil)
return newExporterTestCaseCommon(t, NotNoisy, defaultMaxStreamLifetime, 1, false, nil)
}

func newShortLifetimeStreamTestCase(t *testing.T) *exporterTestCase {
return newExporterTestCaseCommon(t, NotNoisy, time.Second/2, 1, false, nil)
}

func newSingleStreamDowngradeDisabledTestCase(t *testing.T) *exporterTestCase {
return newExporterTestCaseCommon(t, NotNoisy, 1, true, nil)
return newExporterTestCaseCommon(t, NotNoisy, defaultMaxStreamLifetime, 1, true, nil)
}

func newSingleStreamMetadataTestCase(t *testing.T) *exporterTestCase {
var count int
return newExporterTestCaseCommon(t, NotNoisy, 1, false, func(ctx context.Context) (map[string]string, error) {
return newExporterTestCaseCommon(t, NotNoisy, defaultMaxStreamLifetime, 1, false, func(ctx context.Context) (map[string]string, error) {
defer func() { count++ }()
if count%2 == 0 {
return nil, nil
Expand All @@ -82,7 +86,7 @@ func newSingleStreamMetadataTestCase(t *testing.T) *exporterTestCase {
}

func newExporterNoisyTestCase(t *testing.T, numStreams int) *exporterTestCase {
return newExporterTestCaseCommon(t, Noisy, numStreams, false, nil)
return newExporterTestCaseCommon(t, Noisy, defaultMaxStreamLifetime, numStreams, false, nil)
}

func copyBatch[T any](recordFunc func(T) (*arrowpb.BatchArrowRecords, error)) func(T) (*arrowpb.BatchArrowRecords, error) {
Expand Down Expand Up @@ -117,7 +121,7 @@ func copyBatch[T any](recordFunc func(T) (*arrowpb.BatchArrowRecords, error)) fu
}
}

func newExporterTestCaseCommon(t *testing.T, noisy noisyTest, numStreams int, disableDowngrade bool, metadataFunc func(ctx context.Context) (map[string]string, error)) *exporterTestCase {
func newExporterTestCaseCommon(t *testing.T, noisy noisyTest, maxLifetime time.Duration, numStreams int, disableDowngrade bool, metadataFunc func(ctx context.Context) (map[string]string, error)) *exporterTestCase {
ctc := newCommonTestCase(t, noisy)

if metadataFunc == nil {
Expand All @@ -128,7 +132,7 @@ func newExporterTestCaseCommon(t *testing.T, noisy noisyTest, numStreams int, di
})
}

exp := NewExporter(defaultMaxStreamLifetime, numStreams, disableDowngrade, ctc.telset, nil, func() arrowRecord.ProducerAPI {
exp := NewExporter(maxLifetime, numStreams, disableDowngrade, ctc.telset, nil, func() arrowRecord.ProducerAPI {
// Mock the close function, use a real producer for testing dataflow.
mock := arrowRecordMock.NewMockProducerAPI(ctc.ctrl)
prod := arrowRecord.NewProducer()
Expand Down Expand Up @@ -506,8 +510,8 @@ func TestArrowExporterStreaming(t *testing.T) {
expectOutput = append(expectOutput, input)
}
// Stop the test conduit started above. If the sender were
// still sending, it would panic on a closed channel.
close(channel.sent)
// still sending, the test would panic on a closed channel.
channel.doClose()
wg.Wait()

// As this equality check doesn't support out of order slices,
Expand Down Expand Up @@ -569,8 +573,8 @@ func TestArrowExporterHeaders(t *testing.T) {
require.True(t, sent)
}
// Stop the test conduit started above. If the sender were
// still sending, it would panic on a closed channel.
close(channel.sent)
// still sending, the test would panic on a closed channel.
channel.doClose()
wg.Wait()

require.Equal(t, expectOutput, actualOutput)
Expand Down Expand Up @@ -640,8 +644,8 @@ func TestArrowExporterIsTraced(t *testing.T) {
require.True(t, sent)
}
// Stop the test conduit started above. If the sender were
// still sending, it would panic on a closed channel.
close(channel.sent)
// still sending, the test would panic on a closed channel.
channel.doClose()
wg.Wait()

require.Equal(t, expectOutput, actualOutput)
Expand All @@ -658,3 +662,65 @@ func TestAddJitter(t *testing.T) {
require.Less(t, x, 20*time.Minute)
}
}

// TestArrowExporterStreamLifetimeAndShutdown exercises multiple
// stream lifetimes and then shuts down, inspects the logs for
// legibility.
func TestArrowExporterStreamLifetimeAndShutdown(t *testing.T) {
tc := newShortLifetimeStreamTestCase(t)

var wg sync.WaitGroup

var expectCount uint64
var actualCount uint64

tc.traceCall.AnyTimes().DoAndReturn(func(ctx context.Context, opts ...grpc.CallOption) (
arrowpb.ArrowTracesService_ArrowTracesClient,
error,
) {
wg.Add(1)
channel := newHealthyTestChannel()

go func() {
defer wg.Done()
testCon := arrowRecord.NewConsumer()

for data := range channel.sent {
traces, err := testCon.TracesFrom(data)
require.NoError(t, err)
require.Equal(t, 1, len(traces))
atomic.AddUint64(&actualCount, 1)
channel.recv <- statusOKFor(data.BatchId)
}

// @@@ is this???
close(channel.recv)
}()

return tc.returnNewStream(channel)(ctx, opts...)
})

bg := context.Background()
require.NoError(t, tc.exporter.Start(bg))

start := time.Now()
// This is 10 stream lifetimes using the "ShortLifetime" test.
for time.Since(start) < 5*time.Second {
input := testdata.GenerateTraces(2)
ctx := context.Background()

sent, err := tc.exporter.SendAndWait(ctx, input)
require.NoError(t, err)
require.True(t, sent)

expectCount++
}

require.NoError(t, tc.exporter.Shutdown(bg))

require.Equal(t, expectCount, actualCount)

wg.Wait()

require.Empty(t, tc.observedLogs.All())
}
Loading