Skip to content

Commit

Permalink
OTel-Arrow receiver timeout propagation (#34742)
Browse files Browse the repository at this point in the history
**Description:** Receiver side of
open-telemetry/otel-arrow#227. The exporter
side is
#34733.
 
**Link to tracking Issue:**
open-telemetry/otel-arrow#227

**Testing:** A new end-to-end integration test. ✅ 

**Documentation:** Since this is expected of gRPC receivers, no docs are
changed.

---------

Signed-off-by: Alex Boten <[email protected]>
Co-authored-by: Bogdan Drutu <[email protected]>
Co-authored-by: Alex Boten <[email protected]>
  • Loading branch information
3 people authored Sep 23, 2024
1 parent 41e26ab commit 23083a5
Show file tree
Hide file tree
Showing 5 changed files with 222 additions and 34 deletions.
27 changes: 27 additions & 0 deletions .chloggen/otelarrow-receiver-timeout.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: otelarrowreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add gRPC timeout propagation.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [34742]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
58 changes: 56 additions & 2 deletions internal/otelarrow/test/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,15 @@ import (
type testParams struct {
threadCount int
requestUntil func(*testConsumer) bool

// missingDeadline is configured so the zero value implies a deadline,
// which is the default.
missingDeadline bool
}

type testConsumer struct {
t *testing.T

sink consumertest.TracesSink
sentSpans atomic.Int64

Expand All @@ -62,6 +68,8 @@ type testConsumer struct {

recvSpans *tracetest.InMemoryExporter
expSpans *tracetest.InMemoryExporter

expectDeadline bool
}

var _ consumer.Traces = &testConsumer{}
Expand All @@ -80,6 +88,19 @@ func (*testConsumer) Capabilities() consumer.Capabilities {

func (tc *testConsumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
time.Sleep(time.Duration(float64(time.Millisecond) * (1 + rand.Float64())))

dead, hasDeadline := ctx.Deadline()
timeout := time.Until(dead)

require.Equal(tc.t, tc.expectDeadline, hasDeadline, "deadline set or not set: %v", timeout)
if tc.expectDeadline {
// expect allows 1/6 of the deadline to elapse in transit,
// so 1m becomes 50s.
expect := tc.expCfg.TimeoutSettings.Timeout * 5 / 6
require.Less(tc.t, expect, timeout)
require.Greater(tc.t, tc.expCfg.TimeoutSettings.Timeout, timeout)
}

return tc.sink.ConsumeTraces(ctx, td)
}

Expand All @@ -100,7 +121,7 @@ func testLoggerSettings(_ *testing.T) (component.TelemetrySettings, *observer.Ob
return tset, obslogs, exp
}

func basicTestConfig(t *testing.T, cfgF CfgFunc) (*testConsumer, exporter.Traces, receiver.Traces) {
func basicTestConfig(t *testing.T, tp testParams, cfgF CfgFunc) (*testConsumer, exporter.Traces, receiver.Traces) {
ctx := context.Background()

efact := otelarrowexporter.NewFactory()
Expand All @@ -115,6 +136,7 @@ func basicTestConfig(t *testing.T, cfgF CfgFunc) (*testConsumer, exporter.Traces
addr := testutil.GetAvailableLocalAddress(t)

receiverCfg.Protocols.GRPC.NetAddr.Endpoint = addr

exporterCfg.ClientConfig.Endpoint = addr
exporterCfg.ClientConfig.WaitForReady = true
exporterCfg.ClientConfig.TLSSetting.Insecure = true
Expand All @@ -123,6 +145,7 @@ func basicTestConfig(t *testing.T, cfgF CfgFunc) (*testConsumer, exporter.Traces
exporterCfg.RetryConfig.Enabled = true
exporterCfg.Arrow.NumStreams = 1
exporterCfg.Arrow.MaxStreamLifetime = 5 * time.Second
exporterCfg.Arrow.DisableDowngrade = true

if cfgF != nil {
cfgF(exporterCfg, receiverCfg)
Expand All @@ -132,6 +155,8 @@ func basicTestConfig(t *testing.T, cfgF CfgFunc) (*testConsumer, exporter.Traces
recvTset, recvLogs, recvSpans := testLoggerSettings(t)

testCon := &testConsumer{
t: t,

recvCfg: receiverCfg,
expCfg: exporterCfg,

Expand All @@ -140,6 +165,8 @@ func basicTestConfig(t *testing.T, cfgF CfgFunc) (*testConsumer, exporter.Traces

recvSpans: recvSpans,
expSpans: expSpans,

expectDeadline: !tp.missingDeadline,
}

receiver, err := rfact.CreateTracesReceiver(ctx, receiver.Settings{
Expand All @@ -161,7 +188,7 @@ func basicTestConfig(t *testing.T, cfgF CfgFunc) (*testConsumer, exporter.Traces
func testIntegrationTraces(ctx context.Context, t *testing.T, tp testParams, cfgf CfgFunc, mkgen MkGen, errf ConsumerErrFunc, endf EndFunc) {
host := componenttest.NewNopHost()

testCon, exporter, receiver := basicTestConfig(t, cfgf)
testCon, exporter, receiver := basicTestConfig(t, tp, cfgf)

var startWG sync.WaitGroup
var exporterShutdownWG sync.WaitGroup
Expand Down Expand Up @@ -426,6 +453,33 @@ func TestIntegrationTracesSimple(t *testing.T) {
}
}

func TestIntegrationDeadlinePropagation(t *testing.T) {
for _, hasDeadline := range []bool{false, true} {
t.Run(fmt.Sprint("deadline=", hasDeadline), func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Until at least one span is written.
var params = testParams{
threadCount: 1,
requestUntil: func(test *testConsumer) bool {
return test.sink.SpanCount() < 1
},
missingDeadline: !hasDeadline,
}

testIntegrationTraces(ctx, t, params, func(ecfg *ExpConfig, _ *RecvConfig) {
if !hasDeadline {
// 0 disables the exporthelper-set timeout.
ecfg.TimeoutSettings.Timeout = 0
} else {
ecfg.TimeoutSettings.Timeout = 37 * time.Minute
}
}, func() GenFunc { return makeTestTraces }, consumerSuccess, standardEnding)
})
}
}

func TestIntegrationMemoryLimited(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down
1 change: 1 addition & 0 deletions receiver/otelarrowreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelar
go 1.22.0

require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/grpcutil v0.109.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow v0.109.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.109.0
github.com/open-telemetry/otel-arrow v0.26.0
Expand Down
103 changes: 71 additions & 32 deletions receiver/otelarrowreceiver/internal/arrow/arrow.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/grpcutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/netstats"
internalmetadata "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/metadata"
Expand Down Expand Up @@ -173,7 +174,9 @@ func newHeaderReceiver(streamCtx context.Context, as auth.Server, includeMetadat
// client.Info with additional key:values associated with the arrow batch.
func (h *headerReceiver) combineHeaders(ctx context.Context, hdrsBytes []byte) (context.Context, map[string][]string, error) {
if len(hdrsBytes) == 0 && len(h.streamHdrs) == 0 {
return ctx, nil, nil
// Note: call newContext in this case to ensure that
// connInfo is added to the context, for Auth.
return h.newContext(ctx, nil), nil, nil
}

if len(hdrsBytes) == 0 {
Expand Down Expand Up @@ -420,8 +423,8 @@ func (r *Receiver) anyStream(serverStream anyStreamServer, method string) (retEr
}
}

func (r *receiverStream) newInFlightData(ctx context.Context, method string, batchID int64, pendingCh chan<- batchResp) (context.Context, *inFlightData) {
ctx, span := r.tracer.Start(ctx, "otel_arrow_stream_inflight")
func (r *receiverStream) newInFlightData(ctx context.Context, method string, batchID int64, pendingCh chan<- batchResp) *inFlightData {
_, span := r.tracer.Start(ctx, "otel_arrow_stream_inflight")

r.inFlightWG.Add(1)
r.telemetryBuilder.OtelArrowReceiverInFlightRequests.Add(ctx, 1)
Expand All @@ -433,7 +436,7 @@ func (r *receiverStream) newInFlightData(ctx context.Context, method string, bat
span: span,
}
id.refs.Add(1)
return ctx, id
return id
}

// inFlightData is responsible for storing the resources held by one request.
Expand Down Expand Up @@ -549,35 +552,43 @@ func (r *receiverStream) recvOne(streamCtx context.Context, serverStream anyStre

// Receive a batch corresponding with one ptrace.Traces, pmetric.Metrics,
// or plog.Logs item.
req, err := serverStream.Recv()
req, recvErr := serverStream.Recv()

// the incoming stream context is the parent of the in-flight context, which
// carries a span covering sequential stream-processing work. the context
// is severed at this point, with flight.span a contextless child that will be
// finished in recvDone().
flight := r.newInFlightData(streamCtx, method, req.GetBatchId(), pendingCh)

// inflightCtx is carried through into consumeAndProcess on the success path.
inflightCtx, flight := r.newInFlightData(streamCtx, method, req.GetBatchId(), pendingCh)
// this inherits the stream context so that its auth headers are present
// when the per-data Auth call is made.
inflightCtx := streamCtx
defer flight.recvDone(inflightCtx, &retErr)

if err != nil {
if errors.Is(err, io.EOF) {
return err
if recvErr != nil {
if errors.Is(recvErr, io.EOF) {
return recvErr

} else if errors.Is(err, context.Canceled) {
} else if errors.Is(recvErr, context.Canceled) {
// This is a special case to avoid introducing a span error
// for a canceled operation.
return io.EOF

} else if status, ok := status.FromError(err); ok && status.Code() == codes.Canceled {
} else if status, ok := status.FromError(recvErr); ok && status.Code() == codes.Canceled {
// This is a special case to avoid introducing a span error
// for a canceled operation.
return io.EOF
}
// Note: err is directly from gRPC, should already have status.
return err
return recvErr
}

// Check for optional headers and set the incoming context.
inflightCtx, authHdrs, err := hrcv.combineHeaders(inflightCtx, req.GetHeaders())
if err != nil {
inflightCtx, authHdrs, hdrErr := hrcv.combineHeaders(inflightCtx, req.GetHeaders())
if hdrErr != nil {
// Failing to parse the incoming headers breaks the stream.
return status.Errorf(codes.Internal, "arrow metadata error: %v", err)
return status.Errorf(codes.Internal, "arrow metadata error: %v", hdrErr)
}

// start this span after hrcv.combineHeaders returns extracted context. This will allow this span
Expand All @@ -601,9 +612,29 @@ func (r *receiverStream) recvOne(streamCtx context.Context, serverStream anyStre
// This is a compressed size so make sure to acquire the difference when request is decompressed.
prevAcquiredBytes = int64(proto.Size(req))
} else {
prevAcquiredBytes, err = strconv.ParseInt(uncompSizeHeaderStr[0], 10, 64)
if err != nil {
return status.Errorf(codes.Internal, "failed to convert string to request size: %v", err)
var parseErr error
prevAcquiredBytes, parseErr = strconv.ParseInt(uncompSizeHeaderStr[0], 10, 64)
if parseErr != nil {
return status.Errorf(codes.Internal, "failed to convert string to request size: %v", parseErr)
}
}

var callerCancel context.CancelFunc
if encodedTimeout, has := authHdrs["grpc-timeout"]; has && len(encodedTimeout) == 1 {
if timeout, decodeErr := grpcutil.DecodeTimeout(encodedTimeout[0]); decodeErr != nil {
r.telemetry.Logger.Debug("grpc-timeout parse error", zap.Error(decodeErr))
} else {
// timeout parsed successfully
inflightCtx, callerCancel = context.WithTimeout(inflightCtx, timeout)

// if we return before the new goroutine is started below
// cancel the context. callerCancel will be non-nil until
// the new goroutine is created at the end of this function.
defer func() {
if callerCancel != nil {
callerCancel()
}
}()
}
}

Expand All @@ -612,19 +643,19 @@ func (r *receiverStream) recvOne(streamCtx context.Context, serverStream anyStre
// immediately if there are too many waiters, or will
// otherwise block until timeout or enough memory becomes
// available.
err = r.boundedQueue.Acquire(inflightCtx, prevAcquiredBytes)
if err != nil {
return status.Errorf(codes.ResourceExhausted, "otel-arrow bounded queue: %v", err)
acquireErr := r.boundedQueue.Acquire(inflightCtx, prevAcquiredBytes)
if acquireErr != nil {
return status.Errorf(codes.ResourceExhausted, "otel-arrow bounded queue: %v", acquireErr)
}
flight.numAcquired = prevAcquiredBytes

data, numItems, uncompSize, err := r.consumeBatch(ac, req)
data, numItems, uncompSize, consumeErr := r.consumeBatch(ac, req)

if err != nil {
if errors.Is(err, arrowRecord.ErrConsumerMemoryLimit) {
return status.Errorf(codes.ResourceExhausted, "otel-arrow decode: %v", err)
if consumeErr != nil {
if errors.Is(consumeErr, arrowRecord.ErrConsumerMemoryLimit) {
return status.Errorf(codes.ResourceExhausted, "otel-arrow decode: %v", consumeErr)
}
return status.Errorf(codes.Internal, "otel-arrow decode: %v", err)
return status.Errorf(codes.Internal, "otel-arrow decode: %v", consumeErr)
}

flight.uncompSize = uncompSize
Expand All @@ -633,27 +664,35 @@ func (r *receiverStream) recvOne(streamCtx context.Context, serverStream anyStre
r.telemetryBuilder.OtelArrowReceiverInFlightBytes.Add(inflightCtx, uncompSize)
r.telemetryBuilder.OtelArrowReceiverInFlightItems.Add(inflightCtx, int64(numItems))

numAcquired, err := r.acquireAdditionalBytes(inflightCtx, prevAcquiredBytes, uncompSize, hrcv.connInfo.Addr, uncompSizeHeaderFound)
numAcquired, secondAcquireErr := r.acquireAdditionalBytes(inflightCtx, prevAcquiredBytes, uncompSize, hrcv.connInfo.Addr, uncompSizeHeaderFound)

flight.numAcquired = numAcquired
if err != nil {
return status.Errorf(codes.ResourceExhausted, "otel-arrow bounded queue re-acquire: %v", err)
if secondAcquireErr != nil {
return status.Errorf(codes.ResourceExhausted, "otel-arrow bounded queue re-acquire: %v", secondAcquireErr)
}

// Recognize that the request is still in-flight via consumeAndRespond()
flight.refs.Add(1)

// consumeAndRespond consumes the data and returns control to the sender loop.
go r.consumeAndRespond(inflightCtx, data, flight)
go func(callerCancel context.CancelFunc) {
if callerCancel != nil {
defer callerCancel()
}
r.consumeAndRespond(inflightCtx, streamCtx, data, flight)
}(callerCancel)

// Reset callerCancel so the deferred function above does not call it here.
callerCancel = nil

return nil
}

// consumeAndRespond finishes the span started in recvOne and logs the
// result after invoking the pipeline to consume the data.
func (r *Receiver) consumeAndRespond(ctx context.Context, data any, flight *inFlightData) {
func (r *Receiver) consumeAndRespond(ctx, streamCtx context.Context, data any, flight *inFlightData) {
var err error
defer flight.consumeDone(ctx, &err)
defer flight.consumeDone(streamCtx, &err)

// recoverErr is a special function because it recovers panics, so we
// keep it in a separate defer than the processing above, which will
Expand Down
Loading

0 comments on commit 23083a5

Please sign in to comment.