Skip to content

Commit c52e885

Browse files
committed
Implemented disabled queue
1 parent a3c7d95 commit c52e885

File tree

7 files changed

+188
-6
lines changed

7 files changed

+188
-6
lines changed

.chloggen/disabled_queue.yaml

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
2+
change_type: enhancement
3+
4+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
5+
component: exporterhelper
6+
7+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
8+
note: Adds "disabled" queue which is used when the user sets up batching but not queuing.
9+
10+
# One or more tracking issues or pull requests related to the change
11+
issues: [8122, 10368]
12+
13+
# Optional: The change log or logs in which this entry should be included.
14+
# e.g. '[user]' or '[user, api]'
15+
# Include 'user' if the change is relevant to end users.
16+
# Include 'api' if there is a change to a library API.
17+
# Default: '[user]'
18+
change_logs: [api]

exporter/exporterhelper/internal/base_exporter.go

+18-2
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,24 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe
101101
be.QueueSender = NewQueueSender(q, be.Set, be.queueCfg.NumConsumers, be.ExportFailureMessage, be.Obsrep, be.BatcherCfg)
102102
}
103103

104-
if !usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled ||
105-
usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled && !be.queueCfg.Enabled {
104+
if usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled && !be.queueCfg.Enabled {
105+
be.queueFactory = exporterqueue.NewDisabledQueueFactory[internal.Request]()
106+
be.queueCfg.QueueSize = be.BatcherCfg.MinSizeItems
107+
q := be.queueFactory(
108+
context.Background(),
109+
exporterqueue.Settings{
110+
Signal: signal,
111+
ExporterSettings: be.Set,
112+
},
113+
be.queueCfg)
114+
be.QueueSender = NewQueueSender(
115+
q, be.Set, be.queueCfg.NumConsumers, be.ExportFailureMessage, be.Obsrep, be.BatcherCfg).MarkAsDisabled()
116+
for _, op := range options {
117+
err = multierr.Append(err, op(be))
118+
}
119+
}
120+
121+
if !usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled {
106122
bs := NewBatchSender(be.BatcherCfg, be.Set)
107123
be.BatchSender = bs
108124
}

exporter/exporterhelper/internal/batch_sender_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,7 @@ func TestBatchSender_PostShutdown(t *testing.T) {
326326
assert.Equal(t, int64(8), sink.itemsCount.Load())
327327
})
328328
}
329-
runTest("enable_queue_batcher", true)
329+
// We don't expect the same behavior when disable_queue_batcher is true
330330
runTest("disable_queue_batcher", false)
331331
}
332332

exporter/exporterhelper/internal/queue_sender.go

+15-3
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@ type QueueSender struct {
7676
batcher queue.Batcher
7777
consumers *queue.Consumers[internal.Request]
7878

79+
enabled bool
80+
7981
obsrep *ObsReport
8082
exporterID component.ID
8183
logger *zap.Logger
@@ -97,6 +99,7 @@ func NewQueueSender(
9799
obsrep: obsrep,
98100
exporterID: set.ID,
99101
logger: set.Logger,
102+
enabled: true,
100103
}
101104

102105
exportFunc := func(ctx context.Context, req internal.Request) error {
@@ -115,6 +118,13 @@ func NewQueueSender(
115118
return qs
116119
}
117120

121+
// MarkAsDisabled marks queue sender as disabled and returns the queue sender.
122+
// If queue sender is disabled, then requests are blocked until they are successfully sent out.
123+
func (qs *QueueSender) MarkAsDisabled() *QueueSender {
124+
qs.enabled = false
125+
return qs
126+
}
127+
118128
// Start is invoked during service startup.
119129
func (qs *QueueSender) Start(ctx context.Context, host component.Host) error {
120130
if err := qs.queue.Start(ctx, host); err != nil {
@@ -180,10 +190,12 @@ func (qs *QueueSender) Shutdown(ctx context.Context) error {
180190
func (qs *QueueSender) Send(ctx context.Context, req internal.Request) error {
181191
// Prevent cancellation and deadline to propagate to the context stored in the queue.
182192
// The grpc/http based receivers will cancel the request context after this function returns.
183-
c := context.WithoutCancel(ctx)
193+
if qs.enabled {
194+
ctx = context.WithoutCancel(ctx)
195+
}
184196

185-
span := trace.SpanFromContext(c)
186-
if err := qs.queue.Offer(c, req); err != nil {
197+
span := trace.SpanFromContext(ctx)
198+
if err := qs.queue.Offer(ctx, req); err != nil {
187199
span.AddEvent("Failed to enqueue item.", trace.WithAttributes(qs.traceAttribute))
188200
return err
189201
}
+87
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package exporterqueue // import "go.opentelemetry.io/collector/exporter/exporterqueue"
5+
6+
import (
7+
"context"
8+
"sync"
9+
10+
"go.opentelemetry.io/collector/component"
11+
)
12+
13+
// boundedQueue blocks insert until the batch containing the request is sent out.
14+
type disabledQueue[T any] struct {
15+
component.StartFunc
16+
*sizedQueue[disabledMemQueueEl[T]]
17+
18+
mu sync.Mutex
19+
nextIndex uint64
20+
doneCh map[uint64](chan error)
21+
}
22+
23+
type disabledMemQueueEl[T any] struct {
24+
index uint64
25+
req T
26+
}
27+
28+
// QueueSettings defines internal parameters for boundedQueue creation.
29+
type disabledQueueSettings[T any] struct {
30+
sizer sizer[T]
31+
capacity int64
32+
}
33+
34+
type disabledQueueSizer[T any] struct {
35+
sizer sizer[T]
36+
}
37+
38+
func (s disabledQueueSizer[T]) Sizeof(item disabledMemQueueEl[T]) int64 {
39+
return s.sizer.Sizeof(item.req)
40+
}
41+
42+
// NewBoundedQueue constructs the new queue of specified capacity, and with an optional
43+
// callback for dropped items (e.g. useful to emit metrics).
44+
func NewDisabledQueue[T any](set disabledQueueSettings[T]) Queue[T] {
45+
return &disabledQueue[T]{
46+
sizedQueue: newSizedQueue[disabledMemQueueEl[T]](
47+
set.capacity,
48+
disabledQueueSizer[T]{sizer: set.sizer},
49+
true /*blocking*/),
50+
doneCh: make(map[uint64](chan error)),
51+
}
52+
}
53+
54+
// Offer is used by the producer to submit new item to the queue. It will block until OnProcessingFinished is called
55+
// on the request.
56+
func (q *disabledQueue[T]) Offer(ctx context.Context, req T) error {
57+
q.mu.Lock()
58+
index := q.nextIndex
59+
q.nextIndex++
60+
done := make(chan error)
61+
q.doneCh[index] = done
62+
63+
if err := q.sizedQueue.Offer(
64+
ctx,
65+
disabledMemQueueEl[T]{req: req, index: index}); err != nil {
66+
delete(q.doneCh, index)
67+
q.mu.Unlock()
68+
return err
69+
}
70+
q.mu.Unlock()
71+
err := <-done
72+
return err
73+
}
74+
75+
func (q *disabledQueue[T]) Read(_ context.Context) (uint64, context.Context, T, bool) {
76+
ctx, item, ok := q.sizedQueue.pop()
77+
return item.index, ctx, item.req, ok
78+
}
79+
80+
// OnProcessingFinished unblocks unblocks offer.
81+
func (q *disabledQueue[T]) OnProcessingFinished(index uint64, err error) {
82+
q.mu.Lock()
83+
defer q.mu.Unlock()
84+
85+
q.doneCh[index] <- err
86+
delete(q.doneCh, index)
87+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package exporterqueue
5+
6+
import (
7+
"context"
8+
"errors"
9+
"sync"
10+
"testing"
11+
12+
"github.com/stretchr/testify/assert"
13+
"github.com/stretchr/testify/require"
14+
)
15+
16+
func TestBlockingMemoryQueue(t *testing.T) {
17+
var wg sync.WaitGroup
18+
q := NewDisabledQueue[string](disabledQueueSettings[string]{sizer: &requestSizer[string]{}, capacity: 1})
19+
20+
err := errors.New("This is an error")
21+
wg.Add(1)
22+
go func() {
23+
assert.EqualError(t, q.Offer(context.Background(), "a"), err.Error()) // Blocks until OnProcessingFinished is called
24+
wg.Done()
25+
}()
26+
27+
index, ctx, req, ok := q.Read(context.Background())
28+
for !ok {
29+
index, ctx, req, ok = q.Read(context.Background())
30+
}
31+
32+
require.Equal(t, uint64(0), index)
33+
require.Equal(t, context.Background(), ctx)
34+
require.Equal(t, "a", req)
35+
q.OnProcessingFinished(index, err)
36+
wg.Wait()
37+
}

exporter/exporterqueue/queue.go

+12
Original file line numberDiff line numberDiff line change
@@ -105,3 +105,15 @@ func NewPersistentQueueFactory[T any](storageID *component.ID, factorySettings P
105105
})
106106
}
107107
}
108+
109+
// NewDisabledQueueFactory returns a factory to create a new disabled queue.
110+
// Experimental: This API is at the early stage of development and may change without backward compatibility
111+
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
112+
func NewDisabledQueueFactory[T any]() Factory[T] {
113+
return func(_ context.Context, _ Settings, cfg Config) Queue[T] {
114+
return NewDisabledQueue[T](disabledQueueSettings[T]{
115+
sizer: &requestSizer[T]{},
116+
capacity: int64(cfg.QueueSize),
117+
})
118+
}
119+
}

0 commit comments

Comments
 (0)