Skip to content

Commit 20a4b36

Browse files
committed
Implemented disabled queue
1 parent a3c7d95 commit 20a4b36

File tree

8 files changed

+173
-9
lines changed

8 files changed

+173
-9
lines changed

.chloggen/disabled_queue.yaml

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
2+
change_type: enhancement
3+
4+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
5+
component: exporterhelper
6+
7+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
8+
note: Adds "disabled" queue which is used when the user sets up batching but not queuing.
9+
10+
# One or more tracking issues or pull requests related to the change
11+
issues: [8122, 10368]
12+
13+
# Optional: The change log or logs in which this entry should be included.
14+
# e.g. '[user]' or '[user, api]'
15+
# Include 'user' if the change is relevant to end users.
16+
# Include 'api' if there is a change to a library API.
17+
# Default: '[user]'
18+
change_logs: [api]

exporter/exporterhelper/internal/base_exporter.go

+8-4
Original file line numberDiff line numberDiff line change
@@ -90,19 +90,23 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe
9090
return nil, err
9191
}
9292

93-
if be.queueCfg.Enabled {
93+
if usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled && !be.queueCfg.Enabled {
94+
be.queueFactory = exporterqueue.NewDisabledQueueFactory[internal.Request]()
95+
be.queueCfg.QueueSize = be.BatcherCfg.MinSizeItems
96+
}
97+
if be.queueFactory != nil {
9498
q := be.queueFactory(
9599
context.Background(),
96100
exporterqueue.Settings{
97101
Signal: signal,
98102
ExporterSettings: be.Set,
99103
},
100104
be.queueCfg)
101-
be.QueueSender = NewQueueSender(q, be.Set, be.queueCfg.NumConsumers, be.ExportFailureMessage, be.Obsrep, be.BatcherCfg)
105+
be.QueueSender = NewQueueSender(
106+
q, be.Set, be.queueCfg.NumConsumers, be.ExportFailureMessage, be.Obsrep, be.BatcherCfg, be.queueCfg.Enabled)
102107
}
103108

104-
if !usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled ||
105-
usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled && !be.queueCfg.Enabled {
109+
if !usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled {
106110
bs := NewBatchSender(be.BatcherCfg, be.Set)
107111
be.BatchSender = bs
108112
}

exporter/exporterhelper/internal/batch_sender_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,7 @@ func TestBatchSender_PostShutdown(t *testing.T) {
326326
assert.Equal(t, int64(8), sink.itemsCount.Load())
327327
})
328328
}
329-
runTest("enable_queue_batcher", true)
329+
// We don't expect the same behavior when disable_queue_batcher is true
330330
runTest("disable_queue_batcher", false)
331331
}
332332

exporter/exporterhelper/internal/queue_sender.go

+9-3
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@ type QueueSender struct {
7676
batcher queue.Batcher
7777
consumers *queue.Consumers[internal.Request]
7878

79+
enabled bool
80+
7981
obsrep *ObsReport
8082
exporterID component.ID
8183
logger *zap.Logger
@@ -89,6 +91,7 @@ func NewQueueSender(
8991
exportFailureMessage string,
9092
obsrep *ObsReport,
9193
batcherCfg exporterbatcher.Config,
94+
enabled bool,
9295
) *QueueSender {
9396
qs := &QueueSender{
9497
queue: q,
@@ -97,6 +100,7 @@ func NewQueueSender(
97100
obsrep: obsrep,
98101
exporterID: set.ID,
99102
logger: set.Logger,
103+
enabled: enabled,
100104
}
101105

102106
exportFunc := func(ctx context.Context, req internal.Request) error {
@@ -180,10 +184,12 @@ func (qs *QueueSender) Shutdown(ctx context.Context) error {
180184
func (qs *QueueSender) Send(ctx context.Context, req internal.Request) error {
181185
// Prevent cancellation and deadline to propagate to the context stored in the queue.
182186
// The grpc/http based receivers will cancel the request context after this function returns.
183-
c := context.WithoutCancel(ctx)
187+
if qs.enabled {
188+
ctx = context.WithoutCancel(ctx)
189+
}
184190

185-
span := trace.SpanFromContext(c)
186-
if err := qs.queue.Offer(c, req); err != nil {
191+
span := trace.SpanFromContext(ctx)
192+
if err := qs.queue.Offer(ctx, req); err != nil {
187193
span.AddEvent("Failed to enqueue item.", trace.WithAttributes(qs.traceAttribute))
188194
return err
189195
}

exporter/exporterhelper/internal/queue_sender_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -553,7 +553,7 @@ func TestQueueSenderNoStartShutdown(t *testing.T) {
553553
ExporterCreateSettings: set,
554554
})
555555
require.NoError(t, err)
556-
qs := NewQueueSender(queue, set, 1, "", obsrep, exporterbatcher.NewDefaultConfig())
556+
qs := NewQueueSender(queue, set, 1, "", obsrep, exporterbatcher.NewDefaultConfig(), true)
557557
assert.NoError(t, qs.Shutdown(context.Background()))
558558
})
559559
}
+87
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package exporterqueue // import "go.opentelemetry.io/collector/exporter/exporterqueue"
5+
6+
import (
7+
"context"
8+
"sync"
9+
10+
"go.opentelemetry.io/collector/component"
11+
)
12+
13+
// boundedQueue blocks insert until the batch containing the request is sent out.
14+
type disabledQueue[T any] struct {
15+
component.StartFunc
16+
*sizedQueue[disabledMemQueueEl[T]]
17+
18+
mu sync.Mutex
19+
nextIndex uint64
20+
doneCh map[uint64](chan error)
21+
}
22+
23+
type disabledMemQueueEl[T any] struct {
24+
index uint64
25+
req T
26+
}
27+
28+
// QueueSettings defines internal parameters for boundedQueue creation.
29+
type disabledQueueSettings[T any] struct {
30+
sizer sizer[T]
31+
capacity int64
32+
}
33+
34+
type disabledQueueSizer[T any] struct {
35+
sizer sizer[T]
36+
}
37+
38+
func (s disabledQueueSizer[T]) Sizeof(item disabledMemQueueEl[T]) int64 {
39+
return s.sizer.Sizeof(item.req)
40+
}
41+
42+
// NewBoundedQueue constructs the new queue of specified capacity, and with an optional
43+
// callback for dropped items (e.g. useful to emit metrics).
44+
func NewDisabledQueue[T any](set disabledQueueSettings[T]) Queue[T] {
45+
return &disabledQueue[T]{
46+
sizedQueue: newSizedQueue[disabledMemQueueEl[T]](
47+
set.capacity,
48+
disabledQueueSizer[T]{sizer: set.sizer},
49+
true /*blocking*/),
50+
doneCh: make(map[uint64](chan error)),
51+
}
52+
}
53+
54+
// Offer is used by the producer to submit new item to the queue. It will block until OnProcessingFinished is called
55+
// on the request.
56+
func (q *disabledQueue[T]) Offer(ctx context.Context, req T) error {
57+
q.mu.Lock()
58+
index := q.nextIndex
59+
q.nextIndex++
60+
done := make(chan error)
61+
q.doneCh[index] = done
62+
63+
if err := q.sizedQueue.Offer(
64+
ctx,
65+
disabledMemQueueEl[T]{req: req, index: index}); err != nil {
66+
delete(q.doneCh, index)
67+
q.mu.Unlock()
68+
return err
69+
}
70+
q.mu.Unlock()
71+
err := <-done
72+
return err
73+
}
74+
75+
func (q *disabledQueue[T]) Read(_ context.Context) (uint64, context.Context, T, bool) {
76+
ctx, item, ok := q.sizedQueue.pop()
77+
return item.index, ctx, item.req, ok
78+
}
79+
80+
// OnProcessingFinished unblocks unblocks offer.
81+
func (q *disabledQueue[T]) OnProcessingFinished(index uint64, err error) {
82+
q.mu.Lock()
83+
defer q.mu.Unlock()
84+
85+
q.doneCh[index] <- err
86+
delete(q.doneCh, index)
87+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package exporterqueue
5+
6+
import (
7+
"context"
8+
"errors"
9+
"sync"
10+
"testing"
11+
12+
"github.com/stretchr/testify/assert"
13+
"github.com/stretchr/testify/require"
14+
)
15+
16+
func TestBlockingMemoryQueue(t *testing.T) {
17+
var wg sync.WaitGroup
18+
q := NewDisabledQueue[string](disabledQueueSettings[string]{sizer: &requestSizer[string]{}, capacity: 1})
19+
20+
err := errors.New("This is an error")
21+
wg.Add(1)
22+
go func() {
23+
assert.EqualError(t, q.Offer(context.Background(), "a"), err.Error()) // Blocks until OnProcessingFinished is called
24+
wg.Done()
25+
}()
26+
27+
index, ctx, req, ok := q.Read(context.Background())
28+
for !ok {
29+
index, ctx, req, ok = q.Read(context.Background())
30+
}
31+
32+
require.Equal(t, uint64(0), index)
33+
require.Equal(t, context.Background(), ctx)
34+
require.Equal(t, "a", req)
35+
q.OnProcessingFinished(index, err)
36+
wg.Wait()
37+
}

exporter/exporterqueue/queue.go

+12
Original file line numberDiff line numberDiff line change
@@ -105,3 +105,15 @@ func NewPersistentQueueFactory[T any](storageID *component.ID, factorySettings P
105105
})
106106
}
107107
}
108+
109+
// NewDisabledQueueFactory returns a factory to create a new disabled queue.
110+
// Experimental: This API is at the early stage of development and may change without backward compatibility
111+
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
112+
func NewDisabledQueueFactory[T any]() Factory[T] {
113+
return func(_ context.Context, _ Settings, cfg Config) Queue[T] {
114+
return NewDisabledQueue[T](disabledQueueSettings[T]{
115+
sizer: &requestSizer[T]{},
116+
capacity: int64(cfg.QueueSize),
117+
})
118+
}
119+
}

0 commit comments

Comments
 (0)