Skip to content

Commit

Permalink
Implemented disabled queue
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-sili committed Jan 18, 2025
1 parent a3c7d95 commit 1b1ad11
Show file tree
Hide file tree
Showing 8 changed files with 173 additions and 9 deletions.
18 changes: 18 additions & 0 deletions .chloggen/disabled_queue.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Adds "disabled" queue which is used when the user sets up batching but not queuing.

# One or more tracking issues or pull requests related to the change
issues: [8122, 10368]

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
12 changes: 8 additions & 4 deletions exporter/exporterhelper/internal/base_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,19 +90,23 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe
return nil, err
}

if be.queueCfg.Enabled {
if usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled && !be.queueCfg.Enabled {
be.queueFactory = exporterqueue.NewDisabledQueueFactory[internal.Request]()
be.queueCfg.QueueSize = be.BatcherCfg.MinSizeItems
}
if be.queueFactory != nil {
q := be.queueFactory(
context.Background(),
exporterqueue.Settings{
Signal: signal,
ExporterSettings: be.Set,
},
be.queueCfg)
be.QueueSender = NewQueueSender(q, be.Set, be.queueCfg.NumConsumers, be.ExportFailureMessage, be.Obsrep, be.BatcherCfg)
be.QueueSender = NewQueueSender(
q, be.Set, be.queueCfg.NumConsumers, be.ExportFailureMessage, be.Obsrep, be.BatcherCfg, be.queueCfg.Enabled)
}

if !usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled ||
usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled && !be.queueCfg.Enabled {
if !usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled {
bs := NewBatchSender(be.BatcherCfg, be.Set)
be.BatchSender = bs
}
Expand Down
2 changes: 1 addition & 1 deletion exporter/exporterhelper/internal/batch_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func TestBatchSender_PostShutdown(t *testing.T) {
assert.Equal(t, int64(8), sink.itemsCount.Load())
})
}
runTest("enable_queue_batcher", true)
// We don't expect the same behavior when enable the new queue_batcher
runTest("disable_queue_batcher", false)
}

Expand Down
12 changes: 9 additions & 3 deletions exporter/exporterhelper/internal/queue_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ type QueueSender struct {
batcher queue.Batcher
consumers *queue.Consumers[internal.Request]

enabled bool

obsrep *ObsReport
exporterID component.ID
logger *zap.Logger
Expand All @@ -89,6 +91,7 @@ func NewQueueSender(
exportFailureMessage string,
obsrep *ObsReport,
batcherCfg exporterbatcher.Config,
enabled bool,
) *QueueSender {
qs := &QueueSender{
queue: q,
Expand All @@ -97,6 +100,7 @@ func NewQueueSender(
obsrep: obsrep,
exporterID: set.ID,
logger: set.Logger,
enabled: enabled,
}

exportFunc := func(ctx context.Context, req internal.Request) error {
Expand Down Expand Up @@ -180,10 +184,12 @@ func (qs *QueueSender) Shutdown(ctx context.Context) error {
func (qs *QueueSender) Send(ctx context.Context, req internal.Request) error {
// Prevent cancellation and deadline to propagate to the context stored in the queue.
// The grpc/http based receivers will cancel the request context after this function returns.
c := context.WithoutCancel(ctx)
if qs.enabled {
ctx = context.WithoutCancel(ctx)
}

span := trace.SpanFromContext(c)
if err := qs.queue.Offer(c, req); err != nil {
span := trace.SpanFromContext(ctx)
if err := qs.queue.Offer(ctx, req); err != nil {
span.AddEvent("Failed to enqueue item.", trace.WithAttributes(qs.traceAttribute))
return err
}
Expand Down
2 changes: 1 addition & 1 deletion exporter/exporterhelper/internal/queue_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,7 @@ func TestQueueSenderNoStartShutdown(t *testing.T) {
ExporterCreateSettings: set,
})
require.NoError(t, err)
qs := NewQueueSender(queue, set, 1, "", obsrep, exporterbatcher.NewDefaultConfig())
qs := NewQueueSender(queue, set, 1, "", obsrep, exporterbatcher.NewDefaultConfig(), true)
assert.NoError(t, qs.Shutdown(context.Background()))
})
}
Expand Down
87 changes: 87 additions & 0 deletions exporter/exporterqueue/disabled_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package exporterqueue // import "go.opentelemetry.io/collector/exporter/exporterqueue"

import (
"context"
"sync"

"go.opentelemetry.io/collector/component"
)

// boundedQueue blocks insert until the batch containing the request is sent out.
type disabledQueue[T any] struct {
component.StartFunc
*sizedQueue[disabledMemQueueEl[T]]

mu sync.Mutex
nextIndex uint64
doneCh map[uint64](chan error)
}

type disabledMemQueueEl[T any] struct {
index uint64
req T
}

// QueueSettings defines internal parameters for boundedQueue creation.
type disabledQueueSettings[T any] struct {
sizer sizer[T]
capacity int64
}

type disabledQueueSizer[T any] struct {
sizer sizer[T]
}

func (s disabledQueueSizer[T]) Sizeof(item disabledMemQueueEl[T]) int64 {
return s.sizer.Sizeof(item.req)
}

// NewBoundedQueue constructs the new queue of specified capacity, and with an optional
// callback for dropped items (e.g. useful to emit metrics).
func NewDisabledQueue[T any](set disabledQueueSettings[T]) Queue[T] {
return &disabledQueue[T]{
sizedQueue: newSizedQueue[disabledMemQueueEl[T]](
set.capacity,
disabledQueueSizer[T]{sizer: set.sizer},
true /*blocking*/),
doneCh: make(map[uint64](chan error)),
}
}

// Offer is used by the producer to submit new item to the queue. It will block until OnProcessingFinished is called
// on the request.
func (q *disabledQueue[T]) Offer(ctx context.Context, req T) error {
q.mu.Lock()
index := q.nextIndex
q.nextIndex++
done := make(chan error)
q.doneCh[index] = done

if err := q.sizedQueue.Offer(
ctx,
disabledMemQueueEl[T]{req: req, index: index}); err != nil {
delete(q.doneCh, index)
q.mu.Unlock()
return err
}

Check warning on line 69 in exporter/exporterqueue/disabled_queue.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterqueue/disabled_queue.go#L66-L69

Added lines #L66 - L69 were not covered by tests
q.mu.Unlock()
err := <-done
return err
}

func (q *disabledQueue[T]) Read(_ context.Context) (uint64, context.Context, T, bool) {
ctx, item, ok := q.sizedQueue.pop()
return item.index, ctx, item.req, ok
}

// OnProcessingFinished unblocks unblocks offer.
func (q *disabledQueue[T]) OnProcessingFinished(index uint64, err error) {
q.mu.Lock()
defer q.mu.Unlock()

q.doneCh[index] <- err
delete(q.doneCh, index)
}
37 changes: 37 additions & 0 deletions exporter/exporterqueue/disabled_queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package exporterqueue

import (
"context"
"errors"
"sync"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestBlockingMemoryQueue(t *testing.T) {
var wg sync.WaitGroup
q := NewDisabledQueue[string](disabledQueueSettings[string]{sizer: &requestSizer[string]{}, capacity: 1})

err := errors.New("This is an error")
wg.Add(1)
go func() {
assert.EqualError(t, q.Offer(context.Background(), "a"), err.Error()) // Blocks until OnProcessingFinished is called
wg.Done()
}()

index, ctx, req, ok := q.Read(context.Background())
for !ok {
index, ctx, req, ok = q.Read(context.Background())
}

require.Equal(t, uint64(0), index)
require.Equal(t, context.Background(), ctx)
require.Equal(t, "a", req)
q.OnProcessingFinished(index, err)
wg.Wait()
}
12 changes: 12 additions & 0 deletions exporter/exporterqueue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,15 @@ func NewPersistentQueueFactory[T any](storageID *component.ID, factorySettings P
})
}
}

// NewDisabledQueueFactory returns a factory to create a new disabled queue.
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func NewDisabledQueueFactory[T any]() Factory[T] {
return func(_ context.Context, _ Settings, cfg Config) Queue[T] {
return NewDisabledQueue[T](disabledQueueSettings[T]{
sizer: &requestSizer[T]{},
capacity: int64(cfg.QueueSize),
})
}
}

0 comments on commit 1b1ad11

Please sign in to comment.