Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change queue to embed the consumers #12242

Merged
merged 1 commit into from
Feb 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions .chloggen/consumer-queue.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Change queue to embed the async consumers.

# One or more tracking issues or pull requests related to the change
issues: [12242]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
7 changes: 1 addition & 6 deletions exporter/exporterhelper/internal/obs_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ import (
)

type fakeQueue[T any] struct {
component.StartFunc
component.ShutdownFunc
exporterqueue.Queue[T]
offerErr error
size int64
capacity int64
Expand All @@ -39,10 +38,6 @@ func (fq *fakeQueue[T]) Capacity() int64 {
return fq.capacity
}

func (fq *fakeQueue[T]) Read(context.Context) (context.Context, T, exporterqueue.DoneCallback, bool) {
panic("implement me")
}

func (fq *fakeQueue[T]) Offer(context.Context, T) error {
return fq.offerErr
}
Expand Down
57 changes: 31 additions & 26 deletions exporter/exporterhelper/internal/queue_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,8 @@
}

type QueueSender struct {
queue exporterqueue.Queue[internal.Request]
batcher queue.Batcher
consumers *queue.Consumers[internal.Request]
queue exporterqueue.Queue[internal.Request]
batcher component.Component
}

func NewQueueSender(
Expand All @@ -79,15 +78,6 @@
exportFailureMessage string,
next Sender[internal.Request],
) (*QueueSender, error) {
q, err := newObsQueue(qSet, qf(context.Background(), qSet, qCfg))
if err != nil {
return nil, err
}

qs := &QueueSender{
queue: q,
}

exportFunc := func(ctx context.Context, req internal.Request) error {
err := next.Send(ctx, req)
if err != nil {
Expand All @@ -96,12 +86,30 @@
}
return err
}
if usePullingBasedExporterQueueBatcher.IsEnabled() {
qs.batcher, _ = queue.NewBatcher(bCfg, q, exportFunc, qCfg.NumConsumers)
} else {
qs.consumers = queue.NewQueueConsumers[internal.Request](q, qCfg.NumConsumers, exportFunc)
if !usePullingBasedExporterQueueBatcher.IsEnabled() {
q, err := newObsQueue(qSet, qf(context.Background(), qSet, qCfg, func(ctx context.Context, req internal.Request, done exporterqueue.DoneCallback) {
done(exportFunc(ctx, req))
}))
if err != nil {
return nil, err
}

Check warning on line 95 in exporter/exporterhelper/internal/queue_sender.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/internal/queue_sender.go#L94-L95

Added lines #L94 - L95 were not covered by tests
return &QueueSender{queue: q}, nil
}
return qs, nil

b, err := queue.NewBatcher(bCfg, exportFunc, qCfg.NumConsumers)
if err != nil {
return nil, err
}

Check warning on line 102 in exporter/exporterhelper/internal/queue_sender.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/internal/queue_sender.go#L101-L102

Added lines #L101 - L102 were not covered by tests
// TODO: https://github.com/open-telemetry/opentelemetry-collector/issues/12244
if bCfg.Enabled {
qCfg.NumConsumers = 1
}
q, err := newObsQueue(qSet, qf(context.Background(), qSet, qCfg, b.Consume))
if err != nil {
return nil, err
}

Check warning on line 110 in exporter/exporterhelper/internal/queue_sender.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/internal/queue_sender.go#L109-L110

Added lines #L109 - L110 were not covered by tests

return &QueueSender{queue: q, batcher: b}, nil
}

// Start is invoked during service startup.
Expand All @@ -113,25 +121,22 @@
if usePullingBasedExporterQueueBatcher.IsEnabled() {
return qs.batcher.Start(ctx, host)
}
return qs.consumers.Start(ctx, host)

return nil
}

// Shutdown is invoked during service shutdown.
func (qs *QueueSender) Shutdown(ctx context.Context) error {
// Stop the queue and consumers, this will drain the queue and will call the retry (which is stopped) that will only
// Stop the queue and batcher, this will drain the queue and will call the retry (which is stopped) that will only
// try once every request.

if err := qs.queue.Shutdown(ctx); err != nil {
return err
}
err := qs.queue.Shutdown(ctx)
if usePullingBasedExporterQueueBatcher.IsEnabled() {
return qs.batcher.Shutdown(ctx)
return errors.Join(err, qs.batcher.Shutdown(ctx))
}
err := qs.consumers.Shutdown(ctx)
return err
}

// send implements the requestSender interface. It puts the request in the queue.
// Send implements the requestSender interface. It puts the request in the queue.
func (qs *QueueSender) Send(ctx context.Context, req internal.Request) error {
// Prevent cancellation and deadline to propagate to the context stored in the queue.
// The grpc/http based receivers will cancel the request context after this function returns.
Expand Down
2 changes: 1 addition & 1 deletion exporter/exporterqueue/bounded_memory_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type memoryQueueSettings[T any] struct {

// newBoundedMemoryQueue constructs the new queue of specified capacity, and with an optional
// callback for dropped items (e.g. useful to emit metrics).
func newBoundedMemoryQueue[T any](set memoryQueueSettings[T]) Queue[T] {
func newBoundedMemoryQueue[T any](set memoryQueueSettings[T]) readableQueue[T] {
return &boundedMemoryQueue[T]{
sizedQueue: newSizedQueue[T](set.capacity, set.sizer, set.blocking),
}
Expand Down
66 changes: 16 additions & 50 deletions exporter/exporterqueue/bounded_memory_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,11 @@ func TestBoundedQueue(t *testing.T) {
func TestShutdownWhileNotEmpty(t *testing.T) {
q := newBoundedMemoryQueue[string](memoryQueueSettings[string]{sizer: &requestSizer[string]{}, capacity: 1000})

assert.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
for i := 0; i < 10; i++ {
require.NoError(t, q.Offer(context.Background(), strconv.FormatInt(int64(i), 10)))
}
assert.NoError(t, q.Shutdown(context.Background()))
require.NoError(t, q.Shutdown(context.Background()))

assert.Equal(t, int64(10), q.Size())
numConsumed := 0
Expand Down Expand Up @@ -115,16 +115,15 @@ func TestQueueUsage(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
q := newBoundedMemoryQueue[uint64](memoryQueueSettings[uint64]{sizer: tt.sizer, capacity: int64(100)})
consumed := &atomic.Int64{}
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
ac := newAsyncConsumer(q, 1, func(context.Context, uint64) error {
ac := newConsumerQueue(q, 1, func(_ context.Context, _ uint64, done DoneCallback) {
consumed.Add(1)
return nil
done(nil)
})
require.NoError(t, ac.Start(context.Background(), componenttest.NewNopHost()))
for j := 0; j < 10; j++ {
require.NoError(t, q.Offer(context.Background(), uint64(10)))
}
assert.NoError(t, q.Shutdown(context.Background()))
assert.NoError(t, ac.Shutdown(context.Background()))
require.NoError(t, ac.Shutdown(context.Background()))
assert.Equal(t, int64(10), consumed.Load())
})
}
Expand All @@ -148,11 +147,11 @@ func TestBlockingQueueUsage(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
q := newBoundedMemoryQueue[uint64](memoryQueueSettings[uint64]{sizer: tt.sizer, capacity: int64(100), blocking: true})
consumed := &atomic.Int64{}
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
ac := newAsyncConsumer(q, 10, func(context.Context, uint64) error {
ac := newConsumerQueue(q, 10, func(_ context.Context, _ uint64, done DoneCallback) {
consumed.Add(1)
return nil
done(nil)
})
require.NoError(t, ac.Start(context.Background(), componenttest.NewNopHost()))
wg := &sync.WaitGroup{}
for i := 0; i < 10; i++ {
wg.Add(1)
Expand All @@ -164,25 +163,21 @@ func TestBlockingQueueUsage(t *testing.T) {
}()
}
wg.Wait()
assert.NoError(t, q.Shutdown(context.Background()))
assert.NoError(t, ac.Shutdown(context.Background()))
require.NoError(t, ac.Shutdown(context.Background()))
assert.Equal(t, int64(1_000_000), consumed.Load())
})
}
}

func TestZeroSizeNoConsumers(t *testing.T) {
q := newBoundedMemoryQueue[string](memoryQueueSettings[string]{sizer: &requestSizer[string]{}, capacity: 0})

err := q.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err)

require.ErrorIs(t, q.Offer(context.Background(), "a"), ErrQueueIsFull) // in process

assert.NoError(t, q.Shutdown(context.Background()))
}

func consume[T any](q Queue[T], consumeFunc func(context.Context, T) error) bool {
func consume[T any](q readableQueue[T], consumeFunc func(context.Context, T) error) bool {
ctx, req, done, ok := q.Read(context.Background())
if !ok {
return false
Expand All @@ -191,35 +186,6 @@ func consume[T any](q Queue[T], consumeFunc func(context.Context, T) error) bool
return true
}

type asyncConsumer struct {
stopWG sync.WaitGroup
}

func newAsyncConsumer[T any](q Queue[T], numConsumers int, consumeFunc func(context.Context, T) error) *asyncConsumer {
ac := &asyncConsumer{}

ac.stopWG.Add(numConsumers)
for i := 0; i < numConsumers; i++ {
go func() {
defer ac.stopWG.Done()
for {
ctx, req, done, ok := q.Read(context.Background())
if !ok {
return
}
done(consumeFunc(ctx, req))
}
}()
}
return ac
}

// Shutdown ensures that queue and all consumers are stopped.
func (qc *asyncConsumer) Shutdown(_ context.Context) error {
qc.stopWG.Wait()
return nil
}

func BenchmarkOffer(b *testing.B) {
tests := []struct {
name string
Expand All @@ -236,20 +202,20 @@ func BenchmarkOffer(b *testing.B) {
}
for _, tt := range tests {
b.Run(tt.name, func(b *testing.B) {
q := newBoundedMemoryQueue[uint64](memoryQueueSettings[uint64]{sizer: &requestSizer[uint64]{}, capacity: int64(10 * b.N)})
q := newBoundedMemoryQueue[uint64](memoryQueueSettings[uint64]{sizer: tt.sizer, capacity: int64(10 * b.N)})
consumed := &atomic.Int64{}
require.NoError(b, q.Start(context.Background(), componenttest.NewNopHost()))
ac := newAsyncConsumer(q, 1, func(context.Context, uint64) error {
ac := newConsumerQueue(q, 1, func(_ context.Context, _ uint64, done DoneCallback) {
consumed.Add(1)
return nil
done(nil)
})
require.NoError(b, ac.Start(context.Background(), componenttest.NewNopHost()))
b.ResetTimer()
b.ReportAllocs()
for j := 0; j < b.N; j++ {
require.NoError(b, q.Offer(context.Background(), uint64(10)))
}
assert.NoError(b, q.Shutdown(context.Background()))
assert.NoError(b, ac.Shutdown(context.Background()))
require.NoError(b, ac.Shutdown(context.Background()))
assert.Equal(b, int64(b.N), consumed.Load())
})
}
Expand Down
59 changes: 59 additions & 0 deletions exporter/exporterqueue/consumers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package exporterqueue // import "go.opentelemetry.io/collector/exporter/exporterqueue"

import (
"context"
"sync"

"go.opentelemetry.io/collector/component"
)

type consumerQueue[T any] struct {
readableQueue[T]
numConsumers int
consumeFunc ConsumeFunc[T]
stopWG sync.WaitGroup
}

func newConsumerQueue[T any](q readableQueue[T], numConsumers int, consumeFunc ConsumeFunc[T]) *consumerQueue[T] {
return &consumerQueue[T]{
readableQueue: q,
numConsumers: numConsumers,
consumeFunc: consumeFunc,
}
}

// Start ensures that queue and all consumers are started.
func (qc *consumerQueue[T]) Start(ctx context.Context, host component.Host) error {
if err := qc.readableQueue.Start(ctx, host); err != nil {
return err
}
var startWG sync.WaitGroup
for i := 0; i < qc.numConsumers; i++ {
qc.stopWG.Add(1)
startWG.Add(1)
go func() {
startWG.Done()
defer qc.stopWG.Done()
for {
ctx, req, done, ok := qc.readableQueue.Read(context.Background())
if !ok {
return
}
qc.consumeFunc(ctx, req, done)
}
}()
}
startWG.Wait()

return nil
}

// Shutdown ensures that queue and all consumers are stopped.
func (qc *consumerQueue[T]) Shutdown(ctx context.Context) error {
err := qc.readableQueue.Shutdown(ctx)
qc.stopWG.Wait()
return err
}
2 changes: 1 addition & 1 deletion exporter/exporterqueue/persistent_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ type persistentQueue[T any] struct {
}

// newPersistentQueue creates a new queue backed by file storage; name and signal must be a unique combination that identifies the queue storage
func newPersistentQueue[T any](set persistentQueueSettings[T]) Queue[T] {
func newPersistentQueue[T any](set persistentQueueSettings[T]) readableQueue[T] {
_, isRequestSized := set.sizer.(*requestSizer[T])
pq := &persistentQueue[T]{
set: set,
Expand Down
Loading
Loading