Skip to content

Commit

Permalink
[chore] Simplify sized channel by not using a channel, remove corner …
Browse files Browse the repository at this point in the history
…case

Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Jan 10, 2025
1 parent da5b68a commit 02391f7
Show file tree
Hide file tree
Showing 5 changed files with 184 additions and 120 deletions.
10 changes: 5 additions & 5 deletions exporter/exporterqueue/bounded_memory_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
// the producer are dropped.
type boundedMemoryQueue[T any] struct {
component.StartFunc
*sizedChannel[memQueueEl[T]]
*sizedQueue[memQueueEl[T]]
}

// memoryQueueSettings defines internal parameters for boundedMemoryQueue creation.
Expand All @@ -29,17 +29,17 @@ type memoryQueueSettings[T any] struct {
// callback for dropped items (e.g. useful to emit metrics).
func newBoundedMemoryQueue[T any](set memoryQueueSettings[T]) Queue[T] {
return &boundedMemoryQueue[T]{
sizedChannel: newSizedChannel[memQueueEl[T]](set.capacity, memQueueElSizer[T]{sizer: set.sizer}),
sizedQueue: newSizedQueue[memQueueEl[T]](set.capacity, memQueueElSizer[T]{sizer: set.sizer}),
}
}

// Offer is used by the producer to submit new item to the queue. Calling this method on a stopped queue will panic.
func (q *boundedMemoryQueue[T]) Offer(ctx context.Context, req T) error {
return q.sizedChannel.push(memQueueEl[T]{ctx: ctx, req: req})
return q.sizedQueue.push(memQueueEl[T]{ctx: ctx, req: req})
}

func (q *boundedMemoryQueue[T]) Read(_ context.Context) (uint64, context.Context, T, bool) {
item, ok := q.sizedChannel.pop()
item, ok := q.sizedQueue.pop()
return 0, item.ctx, item.req, ok
}

Expand All @@ -50,7 +50,7 @@ func (q *boundedMemoryQueue[T]) OnProcessingFinished(uint64, error) {

// Shutdown closes the queue channel to initiate draining of the queue.
func (q *boundedMemoryQueue[T]) Shutdown(context.Context) error {
q.sizedChannel.shutdown()
q.sizedQueue.shutdown()
return nil
}

Expand Down
61 changes: 36 additions & 25 deletions exporter/exporterqueue/bounded_memory_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,24 +99,6 @@ func TestShutdownWhileNotEmpty(t *testing.T) {
}))
}

func Benchmark_QueueUsage_1000_requests(b *testing.B) {
benchmarkQueueUsage(b, &requestSizer[ptrace.Traces]{}, 1000)
}

func Benchmark_QueueUsage_100000_requests(b *testing.B) {
benchmarkQueueUsage(b, &requestSizer[ptrace.Traces]{}, 100000)
}

func Benchmark_QueueUsage_10000_items(b *testing.B) {
// each request has 10 items: 1000 requests = 10000 items
benchmarkQueueUsage(b, &itemsSizer{}, 1000)
}

func Benchmark_QueueUsage_1M_items(b *testing.B) {
// each request has 10 items: 100000 requests = 1M items
benchmarkQueueUsage(b, &itemsSizer{}, 100000)
}

func TestQueueUsage(t *testing.T) {
t.Run("requests_based", func(t *testing.T) {
queueUsage(t, &requestSizer[ptrace.Traces]{}, 10)
Expand All @@ -126,13 +108,6 @@ func TestQueueUsage(t *testing.T) {
})
}

func benchmarkQueueUsage(b *testing.B, sizer sizer[ptrace.Traces], requestsCount int) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
queueUsage(b, sizer, requestsCount)
}
}

func queueUsage(tb testing.TB, sizer sizer[ptrace.Traces], requestsCount int) {
q := newBoundedMemoryQueue[ptrace.Traces](memoryQueueSettings[ptrace.Traces]{sizer: sizer, capacity: int64(10 * requestsCount)})
consumed := &atomic.Int64{}
Expand Down Expand Up @@ -200,3 +175,39 @@ func (qc *asyncConsumer) Shutdown(_ context.Context) error {
qc.stopWG.Wait()
return nil
}

func BenchmarkOffer(b *testing.B) {
tests := []struct {
name string
sizer sizer[ptrace.Traces]
}{
{
name: "requests_based",
sizer: &requestSizer[ptrace.Traces]{},
},
{
name: "items_based",
sizer: &itemsSizer{},
},
}
for _, tt := range tests {
b.Run(tt.name, func(b *testing.B) {
q := newBoundedMemoryQueue[ptrace.Traces](memoryQueueSettings[ptrace.Traces]{sizer: &requestSizer[ptrace.Traces]{}, capacity: int64(10 * b.N)})
consumed := &atomic.Int64{}
require.NoError(b, q.Start(context.Background(), componenttest.NewNopHost()))
ac := newAsyncConsumer(q, 1, func(context.Context, ptrace.Traces) error {
consumed.Add(1)
return nil
})
td := testdata.GenerateTraces(10)
b.ResetTimer()
b.ReportAllocs()
for j := 0; j < b.N; j++ {
require.NoError(b, q.Offer(context.Background(), td))
}
assert.NoError(b, q.Shutdown(context.Background()))
assert.NoError(b, ac.Shutdown(context.Background()))
assert.Equal(b, int64(b.N), consumed.Load())
})
}
}
82 changes: 0 additions & 82 deletions exporter/exporterqueue/sized_channel.go

This file was deleted.

124 changes: 124 additions & 0 deletions exporter/exporterqueue/sized_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package exporterqueue // import "go.opentelemetry.io/collector/exporter/exporterqueue"

import (
"sync"
)

type node[T any] struct {
data T
size int64
next *node[T]
}

type linkedQueue[T any] struct {
head *node[T]
tail *node[T]
}

func (l *linkedQueue[T]) push(data T, size int64) {
n := &node[T]{data: data, size: size}
if l.tail == nil {
l.head = n
l.tail = n
return
}
l.tail.next = n
l.tail = n
}

func (l *linkedQueue[T]) pop() (T, int64) {
n := l.head
l.head = n.next
if l.head == nil {
l.tail = nil
}
return n.data, n.size
}

// sizedQueue is a channel wrapper for sized elements with a capacity set to a total size of all the elements.
// The channel will accept elements until the total size of the elements reaches the capacity.
type sizedQueue[T any] struct {
sizer sizer[T]
cap int64

mu sync.Mutex
hasElements *sync.Cond
items *linkedQueue[T]
size int64
stopped bool
}

// newSizedQueue creates a sized elements channel. Each element is assigned a size by the provided sizer.
// capacity is the capacity of the queue.
func newSizedQueue[T any](capacity int64, sizer sizer[T]) *sizedQueue[T] {
sq := &sizedQueue[T]{
sizer: sizer,
cap: capacity,
items: &linkedQueue[T]{},
}
sq.hasElements = sync.NewCond(&sq.mu)
return sq
}

// push puts the element into the queue with the given sized if there is enough capacity.
// Returns an error if the queue is full.
func (sq *sizedQueue[T]) push(el T) error {
elSize := sq.sizer.Sizeof(el)

sq.mu.Lock()
defer sq.mu.Unlock()

if sq.size+elSize > sq.cap {
return ErrQueueIsFull
}

sq.size += elSize
sq.items.push(el, elSize)
// Signal one consumer if any.
sq.hasElements.Signal()
return nil
}

// pop removes the element from the queue and returns it.
// The call blocks until there is an item available or the queue is stopped.
// The function returns true when an item is consumed or false if the queue is stopped and emptied.
func (sq *sizedQueue[T]) pop() (T, bool) {
sq.mu.Lock()
defer sq.mu.Unlock()

for {
if sq.size > 0 {
el, elSize := sq.items.pop()
sq.size -= elSize
return el, true
}

if sq.stopped {
var el T
return el, false
}

sq.hasElements.Wait()
}
}

// shutdown closes the queue channel to initiate draining of the queue.
func (sq *sizedQueue[T]) shutdown() {
sq.mu.Lock()
defer sq.mu.Unlock()
sq.stopped = true
sq.hasElements.Broadcast()
}

func (sq *sizedQueue[T]) Size() int {
sq.mu.Lock()
defer sq.mu.Unlock()
return int(sq.size)
}

func (sq *sizedQueue[T]) Capacity() int {
return int(sq.cap)
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ func (s sizerInt) Sizeof(el int) int64 {
return int64(el)
}

func TestSizedCapacityChannel(t *testing.T) {
q := newSizedChannel[int](7, sizerInt{})
func TestSizedQueue(t *testing.T) {
q := newSizedQueue[int](7, sizerInt{})
require.NoError(t, q.push(1))
assert.Equal(t, 1, q.Size())
assert.Equal(t, 7, q.Capacity())
Expand Down Expand Up @@ -45,12 +45,23 @@ func TestSizedCapacityChannel(t *testing.T) {
assert.Equal(t, 0, el)
}

func TestSizedCapacityChannel_Offer_sizedNotFullButChannelFull(t *testing.T) {
q := newSizedChannel[int](1, sizerInt{})
func TestSizedQueue_DrainAllElements(t *testing.T) {
q := newSizedQueue[int](7, sizerInt{})
require.NoError(t, q.push(1))
require.NoError(t, q.push(3))

el, ok := q.pop()
assert.Equal(t, 1, el)
assert.True(t, ok)
assert.Equal(t, 3, q.Size())

q.used.Store(0)
err := q.push(1)
require.Error(t, err)
assert.Equal(t, ErrQueueIsFull, err)
q.shutdown()
el, ok = q.pop()
assert.Equal(t, 3, el)
assert.True(t, ok)
assert.Equal(t, 0, q.Size())

el, ok = q.pop()
assert.False(t, ok)
assert.Equal(t, 0, el)
}

0 comments on commit 02391f7

Please sign in to comment.