Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions collector/receiver/otelarrowreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ When the limit is reached, the receiver will return RESOURCE_EXHAUSTED
error codes to the exporter, which are [conditionally retryable, see
exporter retry configuration](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md).

- `admission_limit_mib` (default: 64): limits the number of requests that are received by the stream based on request size information available. This should not be confused with `memory_limit_mib`, which limits allocations made by the consumer when translating arrow records into pdata objects. In other words, request size is used to control how much traffic is admitted, but it does not control how much memory is used during request processing.

- `waiter_limit` (default: 1000): limits the number of requests waiting on admission once `admission_limit_mib` is reached. This is another dimension of memory limiting that ensures waiters are not holding onto a significant amount of memory while waiting to be processed.

`admission_limit_mib` and `waiter_limit` are arguments supplied to [admission.BoundedQueue](https://github.com/open-telemetry/otel-arrow/tree/main/collector/admission). This custom semaphore is meant to be used within receivers to help limit memory within the collector pipeline.

### Compression Configuration

In the `arrow` configuration block, `zstd` sub-section applies to all
Expand Down
3 changes: 3 additions & 0 deletions collector/receiver/otelarrowreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ type ArrowConfig struct {
// passing through, they will see ResourceExhausted errors.
MemoryLimitMiB uint64 `mapstructure:"memory_limit_mib"`

// AdmissionLimitMiB limits the number of requests that are received by the stream based on
// request size information available. Request size is used to control how much traffic we admit
// for processing, but does not control how much memory is used during request processing.
AdmissionLimitMiB uint64 `mapstructure:"admission_limit_mib"`

// WaiterLimit is the limit on the number of waiters waiting to be processed and consumed.
Expand Down
2 changes: 2 additions & 0 deletions collector/receiver/otelarrowreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func TestUnmarshalConfig(t *testing.T) {
Arrow: ArrowConfig{
MemoryLimitMiB: 123,
AdmissionLimitMiB: 80,
WaiterLimit: 100,
},
},
}, cfg)
Expand All @@ -104,6 +105,7 @@ func TestUnmarshalConfigUnix(t *testing.T) {
Arrow: ArrowConfig{
MemoryLimitMiB: defaultMemoryLimitMiB,
AdmissionLimitMiB: defaultAdmissionLimitMiB,
WaiterLimit: defaultWaiterLimit,
},
},
}, cfg)
Expand Down
2 changes: 2 additions & 0 deletions collector/receiver/otelarrowreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const (

defaultMemoryLimitMiB = 128
defaultAdmissionLimitMiB = defaultMemoryLimitMiB / 2
defaultWaiterLimit = 1000
)

// NewFactory creates a new OTLP receiver factory.
Expand Down Expand Up @@ -48,6 +49,7 @@ func createDefaultConfig() component.Config {
Arrow: ArrowConfig{
MemoryLimitMiB: defaultMemoryLimitMiB,
AdmissionLimitMiB: defaultAdmissionLimitMiB,
WaiterLimit: defaultWaiterLimit,
},
},
}
Expand Down
25 changes: 17 additions & 8 deletions collector/receiver/otelarrowreceiver/internal/arrow/arrow.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ type Receiver struct {
recvInFlightRequests metric.Int64UpDownCounter
boundedQueue *admission.BoundedQueue
inFlightWG sync.WaitGroup
pendingCh chan batchResp
}

// New creates a new Receiver reference.
Expand Down Expand Up @@ -369,6 +368,7 @@ func (r *Receiver) anyStream(serverStream anyStreamServer, method string) (retEr
}()

doneCtx, doneCancel := context.WithCancel(streamCtx)
defer doneCancel()
streamErrCh := make(chan error, 2)
pendingCh := make(chan batchResp, runtime.NumCPU())

Expand Down Expand Up @@ -423,7 +423,7 @@ func (r *Receiver) recvOne(ctx context.Context, serverStream anyStreamServer, hr
thisCtx, authHdrs, err := hrcv.combineHeaders(ctx, req.GetHeaders())
if err != nil {
// Failing to parse the incoming headers breaks the stream.
return fmt.Errorf("arrow metadata error: %v", err)
return fmt.Errorf("arrow metadata error: %w", err)
}
var prevAcquiredBytes int
uncompSizeStr, sizeHeaderFound := authHdrs["otlp-pdata-size"]
Expand All @@ -433,15 +433,15 @@ func (r *Receiver) recvOne(ctx context.Context, serverStream anyStreamServer, hr
} else {
prevAcquiredBytes, err = strconv.Atoi(uncompSizeStr[0])
if err != nil {
return fmt.Errorf("failed to convert string to request size: %v", err)
return fmt.Errorf("failed to convert string to request size: %w", err)
}
}
// Use the bounded queue to limit memory based on the incoming uncompressed request size and the number of waiters.
// Acquire will fail immediately if there are too many waiters,
// or will otherwise block until timeout or enough memory becomes available.
err = r.boundedQueue.Acquire(ctx, int64(prevAcquiredBytes))
if err != nil {
return fmt.Errorf("breaking stream: %v", err)
return fmt.Errorf("breaking stream: %w", err)
}

resp := batchResp{
Expand All @@ -463,7 +463,7 @@ func (r *Receiver) recvOne(ctx context.Context, serverStream anyStreamServer, hr
// processAndConsume will process and send an error to the sender loop
go func() {
defer r.inFlightWG.Done() // done processing
err = r.processAndConsume(thisCtx, method, ac, req, serverStream, authErr, resp, sizeHeaderFound)
err = r.processAndConsume(thisCtx, method, ac, req, authErr, resp, sizeHeaderFound)
resp.err = err
pendingCh <- resp
}()
Expand Down Expand Up @@ -514,7 +514,12 @@ func (r *Receiver) sendOne(serverStream anyStreamServer, resp batchResp) error {
r.logStreamError(err)
return err
}
r.boundedQueue.Release(resp.bytesToRelease)

err = r.boundedQueue.Release(resp.bytesToRelease)
if err != nil {
r.logStreamError(err)
return err
}

return nil
}
Expand Down Expand Up @@ -554,7 +559,7 @@ func (r *Receiver) srvSendLoop(ctx context.Context, serverStream anyStreamServer
}
}

func (r *Receiver) processAndConsume(ctx context.Context, method string, arrowConsumer arrowRecord.ConsumerAPI, req *arrowpb.BatchArrowRecords, serverStream anyStreamServer, authErr error, response batchResp, sizeHeaderFound bool) error {
func (r *Receiver) processAndConsume(ctx context.Context, method string, arrowConsumer arrowRecord.ConsumerAPI, req *arrowpb.BatchArrowRecords, authErr error, response batchResp, sizeHeaderFound bool) error {
var err error

ctx, span := r.tracer.Start(ctx, "otel_arrow_stream_recv")
Expand Down Expand Up @@ -758,7 +763,11 @@ func (r *Receiver) acquireAdditionalBytes(ctx context.Context, uncompSize int64,
// acquired bytes to prevent deadlock and reacquire the uncompressed size we just calculated.
// Note: No need to release and reacquire bytes if diff < 0 because this has less impact and no reason to potentially block
// a request that is in flight by reacquiring the correct size.
r.boundedQueue.Release(response.bytesToRelease)
err = r.boundedQueue.Release(response.bytesToRelease)
if err != nil {
return err
}

err = r.boundedQueue.Acquire(ctx, uncompSize)
if err != nil {
response.bytesToRelease = int64(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ func TestBoundedQueueWithPdataHeaders(t *testing.T) {
if tt.includePdataHeader {
var hpb bytes.Buffer
hpe := hpack.NewEncoder(&hpb)
err := hpe.WriteField(hpack.HeaderField{
err = hpe.WriteField(hpack.HeaderField{
Name: "otlp-pdata-size",
Value: tt.pdataSize,
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,4 @@ protocols:
arrow:
memory_limit_mib: 123
admission_limit_mib: 80
waiter_limit: 100