diff --git a/sdk/log/batch.go b/sdk/log/batch.go index b91741d5882..92161b841eb 100644 --- a/sdk/log/batch.go +++ b/sdk/log/batch.go @@ -6,11 +6,11 @@ package log // import "go.opentelemetry.io/otel/sdk/log" import ( "context" "errors" - "slices" "sync" "sync/atomic" "time" + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/internal/global" ) @@ -30,6 +30,40 @@ const ( // Compile-time check BatchProcessor implements Processor. var _ Processor = (*BatchProcessor)(nil) +type recordsBufferPool struct { + pool sync.Pool +} + +func newRecordsBufferPool(bufferSize int) *recordsBufferPool { + return &recordsBufferPool{ + pool: sync.Pool{ + New: func() any { + slice := make([]Record, bufferSize) + return &slice + }, + }, + } +} + +func (p *recordsBufferPool) Get() *[]Record { + return p.pool.Get().(*[]Record) +} + +func (p *recordsBufferPool) Put(recordsBuffer *[]Record) { + p.pool.Put(recordsBuffer) +} + +type recordsBatch struct { + ctx context.Context + buf *[]Record + records int + + respCh chan<- error +} + +// Compile-time check BatchProcessor implements Processor. +var _ Processor = (*BatchProcessor)(nil) + // BatchProcessor is a processor that exports batches of log records. // // Use [NewBatchProcessor] to create a BatchProcessor. An empty BatchProcessor @@ -71,11 +105,10 @@ type BatchProcessor struct { // cannot be flushed to the export buffer, the records will remain in the // queue. - // exporter is the bufferedExporter all batches are exported with. - exporter *bufferExporter + exporter Exporter - // q is the active queue of records that have not yet been exported. - q *queue + // queue is the active queue of records that have not yet been exported. + queue *queue // batchSize is the minimum number of records needed before an export is // triggered (unless the interval expires). batchSize int @@ -98,6 +131,13 @@ type BatchProcessor struct { // stopped holds the stopped state of the BatchProcessor. stopped atomic.Bool + recordsBatches chan recordsBatch + recordsBatchesClosed bool + recordsBatchesMu sync.Mutex + recordsBatchesDone chan struct{} + + recordsBufPool *recordsBufferPool + noCmp [0]func() //nolint: unused // This is indeed used. } @@ -117,17 +157,21 @@ func NewBatchProcessor(exporter Exporter, opts ...BatchProcessorOption) *BatchPr exporter = newTimeoutExporter(exporter, cfg.expTimeout.Value) // Use a chunkExporter to ensure ForceFlush and Shutdown calls are batched // appropriately on export. - exporter = newChunkExporter(exporter, cfg.expMaxBatchSize.Value) + // exporter = newChunkExporter(exporter, cfg.expMaxBatchSize.Value) b := &BatchProcessor{ - exporter: newBufferExporter(exporter, cfg.expBufferSize.Value), + exporter: exporter, - q: newQueue(cfg.maxQSize.Value), + queue: newQueue(cfg.maxQSize.Value), batchSize: cfg.expMaxBatchSize.Value, pollTrigger: make(chan struct{}, 1), pollKill: make(chan struct{}), + + recordsBatches: make(chan recordsBatch, cfg.expBufferSize.Value), + recordsBufPool: newRecordsBufferPool(cfg.expMaxBatchSize.Value), } b.pollDone = b.poll(cfg.expInterval.Value) + b.recordsBatchesDone = b.processRecordsBatches() return b } @@ -137,8 +181,6 @@ func (b *BatchProcessor) poll(interval time.Duration) (done chan struct{}) { done = make(chan struct{}) ticker := time.NewTicker(interval) - // TODO: investigate using a sync.Pool instead of cloning. - buf := make([]Record, b.batchSize) go func() { defer close(done) defer ticker.Stop() @@ -152,25 +194,12 @@ func (b *BatchProcessor) poll(interval time.Duration) (done chan struct{}) { return } - if d := b.q.Dropped(); d > 0 { + if d := b.queue.Dropped(); d > 0 { global.Warn("dropped log records", "dropped", d) } - var qLen int - // Don't copy data from queue unless exporter can accept more, it is very expensive. - if b.exporter.Ready() { - qLen = b.q.TryDequeue(buf, func(r []Record) bool { - ok := b.exporter.EnqueueExport(r) - if ok { - buf = slices.Clone(buf) - } - return ok - }) - } else { - qLen = b.q.Len() - } - - if qLen >= b.batchSize { + ok, recordsInQueue := b.tryDequeue(nil) + if !ok || recordsInQueue >= b.batchSize { // There is another full batch ready. Immediately trigger // another export attempt. select { @@ -181,17 +210,82 @@ func (b *BatchProcessor) poll(interval time.Duration) (done chan struct{}) { } } }() + + return done +} + +func (b *BatchProcessor) processRecordsBatches() (done chan struct{}) { + done = make(chan struct{}) + + go func() { + defer close(done) + for chunk := range b.recordsBatches { + err := b.exporter.Export(chunk.ctx, (*chunk.buf)[:chunk.records]) + + b.recordsBufPool.Put(chunk.buf) + + select { + case chunk.respCh <- err: + default: + // e.respCh is nil or busy, default to otel.Handler. + if err != nil { + otel.Handle(err) + } + } + } + }() + return done } +// Tries to write records batch from the queue to records batches channel. +// If success, ok is true and queueLen is number of records remaining in the records queue. +// If failure, ok is false and queueLen value does not have any meaning. +func (b *BatchProcessor) tryDequeue(respCh chan<- error) (ok bool, queueLen int) { + b.recordsBatchesMu.Lock() + defer b.recordsBatchesMu.Unlock() + + if b.recordsBatchesClosed { + if respCh != nil { + respCh <- nil + } + return true, 0 + } + + if len(b.recordsBatches) == cap(b.recordsBatches) { + if respCh != nil { + respCh <- nil + } + return false, 0 + } + + buf := b.recordsBufPool.Get() + + queueLen, n := b.queue.Dequeue(*buf) + if n == 0 { + b.recordsBufPool.Put(buf) + if respCh != nil { + respCh <- nil + } + return true, 0 + } + + data := recordsBatch{ctx: context.Background(), respCh: respCh, buf: buf, records: n} + + // push in sync as available space is guaranteed by len check and mutex + b.recordsBatches <- data + + return true, queueLen +} + // OnEmit batches provided log record. func (b *BatchProcessor) OnEmit(_ context.Context, r *Record) error { - if b.stopped.Load() || b.q == nil { + if b.stopped.Load() || b.queue == nil { return nil } // The record is cloned so that changes done by subsequent processors // are not going to lead to a data race. - if n := b.q.Enqueue(r.Clone()); n >= b.batchSize { + if n := b.queue.Enqueue(r.Clone()); n >= b.batchSize { select { case b.pollTrigger <- struct{}{}: default: @@ -205,7 +299,7 @@ func (b *BatchProcessor) OnEmit(_ context.Context, r *Record) error { // Shutdown flushes queued log records and shuts down the decorated exporter. func (b *BatchProcessor) Shutdown(ctx context.Context) error { - if b.stopped.Swap(true) || b.q == nil { + if b.stopped.Swap(true) || b.queue == nil { return nil } @@ -213,14 +307,29 @@ func (b *BatchProcessor) Shutdown(ctx context.Context) error { close(b.pollKill) select { case <-b.pollDone: + case <-ctx.Done(): + // Out of time. + return errors.Join(ctx.Err(), b.shutdownExporter(ctx)) + } + + err := b.flush(ctx) + + return errors.Join(err, b.shutdownExporter(ctx)) +} + +func (b *BatchProcessor) shutdownExporter(ctx context.Context) error { + b.recordsBatchesMu.Lock() + defer b.recordsBatchesMu.Unlock() + b.recordsBatchesClosed = true + close(b.recordsBatches) + select { + case <-b.recordsBatchesDone: case <-ctx.Done(): // Out of time. return errors.Join(ctx.Err(), b.exporter.Shutdown(ctx)) } - // Flush remaining queued before exporter shutdown. - err := b.exporter.Export(ctx, b.q.Flush()) - return errors.Join(err, b.exporter.Shutdown(ctx)) + return b.exporter.Shutdown(ctx) } var errPartialFlush = errors.New("partial flush: export buffer full") @@ -232,130 +341,40 @@ var ctxErr = func(ctx context.Context) error { // ForceFlush flushes queued log records and flushes the decorated exporter. func (b *BatchProcessor) ForceFlush(ctx context.Context) error { - if b.stopped.Load() || b.q == nil { + if b.stopped.Load() || b.queue == nil { return nil } - buf := make([]Record, b.q.cap) - notFlushed := func() bool { - var flushed bool - _ = b.q.TryDequeue(buf, func(r []Record) bool { - flushed = b.exporter.EnqueueExport(r) - return flushed - }) - return !flushed - } - var err error - // For as long as ctx allows, try to make a single flush of the queue. - for notFlushed() { - // Use ctxErr instead of calling ctx.Err directly so we can test - // the partial error return. - if e := ctxErr(ctx); e != nil { - err = errors.Join(e, errPartialFlush) - break - } - } - return errors.Join(err, b.exporter.ForceFlush(ctx)) -} - -// queue holds a queue of logging records. -// -// When the queue becomes full, the oldest records in the queue are -// overwritten. -type queue struct { - sync.Mutex - - dropped atomic.Uint64 - cap, len int - read, write *ring -} - -func newQueue(size int) *queue { - r := newRing(size) - return &queue{ - cap: size, - read: r, - write: r, - } -} - -func (q *queue) Len() int { - q.Lock() - defer q.Unlock() - - return q.len -} - -// Dropped returns the number of Records dropped during enqueueing since the -// last time Dropped was called. -func (q *queue) Dropped() uint64 { - return q.dropped.Swap(0) -} + err := b.flush(ctx) -// Enqueue adds r to the queue. The queue size, including the addition of r, is -// returned. -// -// If enqueueing r will exceed the capacity of q, the oldest Record held in q -// will be dropped and r retained. -func (q *queue) Enqueue(r Record) int { - q.Lock() - defer q.Unlock() - - q.write.Value = r - q.write = q.write.Next() - - q.len++ - if q.len > q.cap { - // Overflow. Advance read to be the new "oldest". - q.len = q.cap - q.read = q.read.Next() - q.dropped.Add(1) - } - return q.len + return errors.Join(err, b.exporter.ForceFlush(ctx)) } -// TryDequeue attempts to dequeue up to len(buf) Records. The available Records -// will be assigned into buf and passed to write. If write fails, returning -// false, the Records will not be removed from the queue. If write succeeds, -// returning true, the dequeued Records are removed from the queue. The number -// of Records remaining in the queue are returned. -// -// When write is called the lock of q is held. The write function must not call -// other methods of this q that acquire the lock. -func (q *queue) TryDequeue(buf []Record, write func([]Record) bool) int { - q.Lock() - defer q.Unlock() - - origRead := q.read - - n := min(len(buf), q.len) - for i := 0; i < n; i++ { - buf[i] = q.read.Value - q.read = q.read.Next() - } +func (b *BatchProcessor) flush(ctx context.Context) error { + var err error + for { + respCh := make(chan error, 1) + ok, queueLen := b.tryDequeue(respCh) - if write(buf[:n]) { - q.len -= n - } else { - q.read = origRead - } - return q.len -} + select { + case respErr := <-respCh: + if respErr != nil { + err = errors.Join(respErr, errPartialFlush) + } + case <-ctx.Done(): + err = errors.Join(ctxErr(ctx), errPartialFlush) + } -// Flush returns all the Records held in the queue and resets it to be -// empty. -func (q *queue) Flush() []Record { - q.Lock() - defer q.Unlock() + if err != nil { + break + } - out := make([]Record, q.len) - for i := range out { - out[i] = q.read.Value - q.read = q.read.Next() + if ok && queueLen == 0 { + break + } } - q.len = 0 - return out + return err } type batchConfig struct { diff --git a/sdk/log/batch_test.go b/sdk/log/batch_test.go index b2e993a5bfa..3c700160e06 100644 --- a/sdk/log/batch_test.go +++ b/sdk/log/batch_test.go @@ -7,7 +7,6 @@ import ( "bytes" "context" stdlog "log" - "slices" "strconv" "strings" "sync" @@ -386,7 +385,7 @@ func TestBatchProcessor(t *testing.T) { require.NoError(t, b.OnEmit(ctx, new(Record))) } assert.Eventually(t, func() bool { - return e.ExportN() > 0 && len(b.exporter.input) == cap(b.exporter.input) + return e.ExportN() > 0 && len(b.recordsBatches) == cap(b.recordsBatches) }, 2*time.Second, time.Microsecond) // 1 export being performed, 1 export in buffer chan, >1 batch // still in queue that an attempt to flush will be made on. @@ -465,7 +464,7 @@ func TestBatchProcessor(t *testing.T) { // Second record will be written to export queue assert.NoError(t, b.OnEmit(ctx, r), "export queue record") require.Eventually(t, func() bool { - return len(b.exporter.input) == cap(b.exporter.input) + return len(b.recordsBatches) == cap(b.recordsBatches) }, 2*time.Second, time.Microsecond, "blocked queue read not attempted") // Third record will be written to BatchProcessor.q @@ -522,127 +521,6 @@ func TestBatchProcessor(t *testing.T) { }) } -func TestQueue(t *testing.T) { - var r Record - r.SetBody(log.BoolValue(true)) - - t.Run("newQueue", func(t *testing.T) { - const size = 1 - q := newQueue(size) - assert.Equal(t, 0, q.len) - assert.Equal(t, size, q.cap, "capacity") - assert.Equal(t, size, q.read.Len(), "read ring") - assert.Same(t, q.read, q.write, "different rings") - }) - - t.Run("Enqueue", func(t *testing.T) { - const size = 2 - q := newQueue(size) - - var notR Record - notR.SetBody(log.IntValue(10)) - - assert.Equal(t, 1, q.Enqueue(notR), "incomplete batch") - assert.Equal(t, 1, q.len, "length") - assert.Equal(t, size, q.cap, "capacity") - - assert.Equal(t, 2, q.Enqueue(r), "complete batch") - assert.Equal(t, 2, q.len, "length") - assert.Equal(t, size, q.cap, "capacity") - - assert.Equal(t, 2, q.Enqueue(r), "overflow batch") - assert.Equal(t, 2, q.len, "length") - assert.Equal(t, size, q.cap, "capacity") - - assert.Equal(t, []Record{r, r}, q.Flush(), "flushed Records") - }) - - t.Run("Dropped", func(t *testing.T) { - q := newQueue(1) - - _ = q.Enqueue(r) - _ = q.Enqueue(r) - assert.Equal(t, uint64(1), q.Dropped(), "fist") - - _ = q.Enqueue(r) - _ = q.Enqueue(r) - assert.Equal(t, uint64(2), q.Dropped(), "second") - }) - - t.Run("Flush", func(t *testing.T) { - const size = 2 - q := newQueue(size) - q.write.Value = r - q.write = q.write.Next() - q.len = 1 - - assert.Equal(t, []Record{r}, q.Flush(), "flushed") - }) - - t.Run("TryFlush", func(t *testing.T) { - const size = 3 - q := newQueue(size) - for i := 0; i < size-1; i++ { - q.write.Value = r - q.write = q.write.Next() - q.len++ - } - - buf := make([]Record, 1) - f := func([]Record) bool { return false } - assert.Equal(t, size-1, q.TryDequeue(buf, f), "not flushed") - require.Equal(t, size-1, q.len, "length") - require.NotSame(t, q.read, q.write, "read ring advanced") - - var flushed []Record - f = func(r []Record) bool { - flushed = append(flushed, r...) - return true - } - if assert.Equal(t, size-2, q.TryDequeue(buf, f), "did not flush len(buf)") { - assert.Equal(t, []Record{r}, flushed, "Records") - } - - buf = slices.Grow(buf, size) - flushed = flushed[:0] - if assert.Equal(t, 0, q.TryDequeue(buf, f), "did not flush len(queue)") { - assert.Equal(t, []Record{r}, flushed, "Records") - } - }) - - t.Run("ConcurrentSafe", func(t *testing.T) { - const goRoutines = 10 - - flushed := make(chan []Record, goRoutines) - out := make([]Record, 0, goRoutines) - done := make(chan struct{}) - go func() { - defer close(done) - for recs := range flushed { - out = append(out, recs...) - } - }() - - var wg sync.WaitGroup - wg.Add(goRoutines) - - b := newQueue(goRoutines) - for i := 0; i < goRoutines; i++ { - go func() { - defer wg.Done() - b.Enqueue(Record{}) - flushed <- b.Flush() - }() - } - - wg.Wait() - close(flushed) - <-done - - assert.Len(t, out, goRoutines, "flushed Records") - }) -} - func BenchmarkBatchProcessorOnEmit(b *testing.B) { r := new(Record) body := log.BoolValue(true) diff --git a/sdk/log/bench_test.go b/sdk/log/bench_test.go index 74689d7f387..2a11f899e33 100644 --- a/sdk/log/bench_test.go +++ b/sdk/log/bench_test.go @@ -35,6 +35,12 @@ func BenchmarkProcessor(b *testing.B) { return []LoggerProviderOption{WithProcessor(NewSimpleProcessor(noopExporter{}))} }, }, + { + name: "SimpleDelayExporter", + f: func() []LoggerProviderOption { + return []LoggerProviderOption{WithProcessor(NewSimpleProcessor(mockDelayExporter{}))} + }, + }, { name: "Batch", f: func() []LoggerProviderOption { @@ -42,7 +48,7 @@ func BenchmarkProcessor(b *testing.B) { }, }, { - name: "BatchSimulateExport", + name: "BatchDelayExporter", f: func() []LoggerProviderOption { return []LoggerProviderOption{WithProcessor(NewBatchProcessor(mockDelayExporter{}))} }, diff --git a/sdk/log/exporter.go b/sdk/log/exporter.go index 8cef5dde6b5..fbffac639c8 100644 --- a/sdk/log/exporter.go +++ b/sdk/log/exporter.go @@ -5,13 +5,7 @@ package log // import "go.opentelemetry.io/otel/sdk/log" import ( "context" - "errors" - "fmt" - "sync" - "sync/atomic" "time" - - "go.opentelemetry.io/otel" ) // Exporter handles the delivery of log records to external receivers. @@ -67,37 +61,6 @@ func (noopExporter) Shutdown(context.Context) error { return nil } func (noopExporter) ForceFlush(context.Context) error { return nil } -// chunkExporter wraps an Exporter's Export method so it is called with -// appropriately sized export payloads. Any payload larger than a defined size -// is chunked into smaller payloads and exported sequentially. -type chunkExporter struct { - Exporter - - // size is the maximum batch size exported. - size int -} - -// newChunkExporter wraps exporter. Calls to the Export will have their records -// payload chunked so they do not exceed size. If size is less than or equal -// to 0, exporter is returned directly. -func newChunkExporter(exporter Exporter, size int) Exporter { - if size <= 0 { - return exporter - } - return &chunkExporter{Exporter: exporter, size: size} -} - -// Export exports records in chunks no larger than c.size. -func (c chunkExporter) Export(ctx context.Context, records []Record) error { - n := len(records) - for i, j := 0, min(c.size, n); i < n; i, j = i+c.size, min(j+c.size, n) { - if err := c.Exporter.Export(ctx, records[i:j]); err != nil { - return err - } - } - return nil -} - // timeoutExporter wraps an Exporter and ensures any call to Export will have a // timeout for the context. type timeoutExporter struct { @@ -123,202 +86,3 @@ func (e *timeoutExporter) Export(ctx context.Context, records []Record) error { defer cancel() return e.Exporter.Export(ctx, records) } - -// exportSync exports all data from input using exporter in a spawned -// goroutine. The returned chan will be closed when the spawned goroutine -// completes. -func exportSync(input <-chan exportData, exporter Exporter) (done chan struct{}) { - done = make(chan struct{}) - go func() { - defer close(done) - for data := range input { - data.DoExport(exporter.Export) - } - }() - return done -} - -// exportData is data related to an export. -type exportData struct { - ctx context.Context - records []Record - - // respCh is the channel any error returned from the export will be sent - // on. If this is nil, and the export error is non-nil, the error will - // passed to the OTel error handler. - respCh chan<- error -} - -// DoExport calls exportFn with the data contained in e. The error response -// will be returned on e's respCh if not nil. The error will be handled by the -// default OTel error handle if it is not nil and respCh is nil or full. -func (e exportData) DoExport(exportFn func(context.Context, []Record) error) { - if len(e.records) == 0 { - e.respond(nil) - return - } - - e.respond(exportFn(e.ctx, e.records)) -} - -func (e exportData) respond(err error) { - select { - case e.respCh <- err: - default: - // e.respCh is nil or busy, default to otel.Handler. - if err != nil { - otel.Handle(err) - } - } -} - -// bufferExporter provides asynchronous and synchronous export functionality by -// buffering export requests. -type bufferExporter struct { - Exporter - - input chan exportData - inputMu sync.Mutex - - done chan struct{} - stopped atomic.Bool -} - -// newBufferExporter returns a new bufferExporter that wraps exporter. The -// returned bufferExporter will buffer at most size number of export requests. -// If size is less than 1, 1 will be used. -func newBufferExporter(exporter Exporter, size int) *bufferExporter { - if size < 1 { - size = 1 - } - input := make(chan exportData, size) - return &bufferExporter{ - Exporter: exporter, - - input: input, - done: exportSync(input, exporter), - } -} - -func (e *bufferExporter) Ready() bool { - return len(e.input) != cap(e.input) -} - -var errStopped = errors.New("exporter stopped") - -func (e *bufferExporter) enqueue(ctx context.Context, records []Record, rCh chan<- error) error { - data := exportData{ctx, records, rCh} - - e.inputMu.Lock() - defer e.inputMu.Unlock() - - // Check stopped before enqueueing now that e.inputMu is held. This - // prevents sends on a closed chan when Shutdown is called concurrently. - if e.stopped.Load() { - return errStopped - } - - select { - case e.input <- data: - case <-ctx.Done(): - return ctx.Err() - } - return nil -} - -// EnqueueExport enqueues an export of records in the context of ctx to be -// performed asynchronously. This will return true if the records are -// successfully enqueued (or the bufferExporter is shut down), false otherwise. -// -// The passed records are held after this call returns. -func (e *bufferExporter) EnqueueExport(records []Record) bool { - if len(records) == 0 { - // Nothing to enqueue, do not waste input space. - return true - } - - data := exportData{ctx: context.Background(), records: records} - - e.inputMu.Lock() - defer e.inputMu.Unlock() - - // Check stopped before enqueueing now that e.inputMu is held. This - // prevents sends on a closed chan when Shutdown is called concurrently. - if e.stopped.Load() { - return true - } - - select { - case e.input <- data: - return true - default: - return false - } -} - -// Export synchronously exports records in the context of ctx. This will not -// return until the export has been completed. -func (e *bufferExporter) Export(ctx context.Context, records []Record) error { - if len(records) == 0 { - return nil - } - - resp := make(chan error, 1) - err := e.enqueue(ctx, records, resp) - if err != nil { - if errors.Is(err, errStopped) { - return nil - } - return fmt.Errorf("%w: dropping %d records", err, len(records)) - } - - select { - case err := <-resp: - return err - case <-ctx.Done(): - return ctx.Err() - } -} - -// ForceFlush flushes buffered exports. Any existing exports that is buffered -// is flushed before this returns. -func (e *bufferExporter) ForceFlush(ctx context.Context) error { - resp := make(chan error, 1) - err := e.enqueue(ctx, nil, resp) - if err != nil { - if errors.Is(err, errStopped) { - return nil - } - return err - } - - select { - case <-resp: - case <-ctx.Done(): - return ctx.Err() - } - return e.Exporter.ForceFlush(ctx) -} - -// Shutdown shuts down e. -// -// Any buffered exports are flushed before this returns. -// -// All calls to EnqueueExport or Exporter will return nil without any export -// after this is called. -func (e *bufferExporter) Shutdown(ctx context.Context) error { - if e.stopped.Swap(true) { - return nil - } - e.inputMu.Lock() - defer e.inputMu.Unlock() - - // No more sends will be made. - close(e.input) - select { - case <-e.done: - case <-ctx.Done(): - return errors.Join(ctx.Err(), e.Exporter.Shutdown(ctx)) - } - return e.Exporter.Shutdown(ctx) -} diff --git a/sdk/log/exporter_test.go b/sdk/log/exporter_test.go index 25f05832087..987ee43235b 100644 --- a/sdk/log/exporter_test.go +++ b/sdk/log/exporter_test.go @@ -5,8 +5,6 @@ package log import ( "context" - "io" - stdlog "log" "slices" "sync" "sync/atomic" @@ -14,10 +12,6 @@ import ( "time" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/log" ) type instruction struct { @@ -130,164 +124,6 @@ func (e *testExporter) ForceFlushN() int { return int(atomic.LoadInt32(e.forceFlushN)) } -func TestChunker(t *testing.T) { - t.Run("ZeroSize", func(t *testing.T) { - exp := newTestExporter(nil) - t.Cleanup(exp.Stop) - c := newChunkExporter(exp, 0) - const size = 100 - _ = c.Export(context.Background(), make([]Record, size)) - - assert.Equal(t, 1, exp.ExportN()) - records := exp.Records() - assert.Len(t, records, 1) - assert.Len(t, records[0], size) - }) - - t.Run("ForceFlush", func(t *testing.T) { - exp := newTestExporter(nil) - t.Cleanup(exp.Stop) - c := newChunkExporter(exp, 0) - _ = c.ForceFlush(context.Background()) - assert.Equal(t, 1, exp.ForceFlushN(), "ForceFlush not passed through") - }) - - t.Run("Shutdown", func(t *testing.T) { - exp := newTestExporter(nil) - t.Cleanup(exp.Stop) - c := newChunkExporter(exp, 0) - _ = c.Shutdown(context.Background()) - assert.Equal(t, 1, exp.ShutdownN(), "Shutdown not passed through") - }) - - t.Run("Chunk", func(t *testing.T) { - exp := newTestExporter(nil) - t.Cleanup(exp.Stop) - c := newChunkExporter(exp, 10) - assert.NoError(t, c.Export(context.Background(), make([]Record, 5))) - assert.NoError(t, c.Export(context.Background(), make([]Record, 25))) - - wantLens := []int{5, 10, 10, 5} - records := exp.Records() - require.Len(t, records, len(wantLens), "chunks") - for i, n := range wantLens { - assert.Lenf(t, records[i], n, "chunk %d", i) - } - }) - - t.Run("ExportError", func(t *testing.T) { - exp := newTestExporter(assert.AnError) - t.Cleanup(exp.Stop) - c := newChunkExporter(exp, 0) - ctx := context.Background() - records := make([]Record, 25) - err := c.Export(ctx, records) - assert.ErrorIs(t, err, assert.AnError, "no chunking") - - c = newChunkExporter(exp, 10) - err = c.Export(ctx, records) - assert.ErrorIs(t, err, assert.AnError, "with chunking") - }) -} - -func TestExportSync(t *testing.T) { - eventuallyDone := func(t *testing.T, done chan struct{}) { - assert.Eventually(t, func() bool { - select { - case <-done: - return true - default: - return false - } - }, 2*time.Second, time.Microsecond) - } - - t.Run("ErrorHandler", func(t *testing.T) { - var got error - handler := otel.ErrorHandlerFunc(func(err error) { got = err }) - otel.SetErrorHandler(handler) - t.Cleanup(func() { - l := stdlog.New(io.Discard, "", stdlog.LstdFlags) - otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) { - l.Print(err) - })) - }) - - in := make(chan exportData, 1) - exp := newTestExporter(assert.AnError) - t.Cleanup(exp.Stop) - done := exportSync(in, exp) - - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - - in <- exportData{ - ctx: context.Background(), - records: make([]Record, 1), - } - }() - - wg.Wait() - close(in) - eventuallyDone(t, done) - - assert.ErrorIs(t, got, assert.AnError, "error not passed to ErrorHandler") - }) - - t.Run("ConcurrentSafe", func(t *testing.T) { - in := make(chan exportData, 1) - exp := newTestExporter(assert.AnError) - t.Cleanup(exp.Stop) - done := exportSync(in, exp) - - const goRoutines = 10 - var wg sync.WaitGroup - wg.Add(goRoutines) - for i := 0; i < goRoutines; i++ { - go func(n int) { - defer wg.Done() - - var r Record - r.SetBody(log.IntValue(n)) - - resp := make(chan error, 1) - in <- exportData{ - ctx: context.Background(), - records: []Record{r}, - respCh: resp, - } - - assert.ErrorIs(t, <-resp, assert.AnError) - }(i) - } - - // Empty records should be ignored. - in <- exportData{ctx: context.Background()} - - wg.Wait() - - close(in) - eventuallyDone(t, done) - - assert.Equal(t, goRoutines, exp.ExportN(), "Export calls") - - want := make([]log.Value, goRoutines) - for i := range want { - want[i] = log.IntValue(i) - } - records := exp.Records() - got := make([]log.Value, len(records)) - for i := range got { - if assert.Len(t, records[i], 1, "number of records exported") { - got[i] = records[i][0].Body() - } - } - assert.ElementsMatch(t, want, got, "record bodies") - }) -} - func TestTimeoutExporter(t *testing.T) { t.Run("ZeroTimeout", func(t *testing.T) { exp := newTestExporter(nil) @@ -324,276 +160,3 @@ func TestTimeoutExporter(t *testing.T) { close(out) }) } - -func TestBufferExporter(t *testing.T) { - t.Run("ConcurrentSafe", func(t *testing.T) { - const goRoutines = 10 - - exp := newTestExporter(nil) - t.Cleanup(exp.Stop) - e := newBufferExporter(exp, goRoutines) - - ctx := context.Background() - records := make([]Record, 10) - - stop := make(chan struct{}) - var wg sync.WaitGroup - for i := 0; i < goRoutines; i++ { - wg.Add(1) - go func() { - defer wg.Done() - for { - select { - case <-stop: - return - default: - _ = e.EnqueueExport(records) - _ = e.Export(ctx, records) - _ = e.ForceFlush(ctx) - } - } - }() - } - - assert.Eventually(t, func() bool { - return exp.ExportN() > 0 - }, 2*time.Second, time.Microsecond) - - assert.NoError(t, e.Shutdown(ctx)) - close(stop) - wg.Wait() - }) - - t.Run("Shutdown", func(t *testing.T) { - t.Run("Multiple", func(t *testing.T) { - exp := newTestExporter(nil) - t.Cleanup(exp.Stop) - e := newBufferExporter(exp, 1) - - assert.NoError(t, e.Shutdown(context.Background())) - assert.Equal(t, 1, exp.ShutdownN(), "first Shutdown") - - assert.NoError(t, e.Shutdown(context.Background())) - assert.Equal(t, 1, exp.ShutdownN(), "second Shutdown") - }) - - t.Run("ContextCancelled", func(t *testing.T) { - // Discard error logs. - defer func(orig otel.ErrorHandler) { - otel.SetErrorHandler(orig) - }(otel.GetErrorHandler()) - handler := otel.ErrorHandlerFunc(func(err error) {}) - otel.SetErrorHandler(handler) - - exp := newTestExporter(assert.AnError) - t.Cleanup(exp.Stop) - - trigger := make(chan struct{}) - exp.ExportTrigger = trigger - t.Cleanup(func() { close(trigger) }) - e := newBufferExporter(exp, 1) - - // Make sure there is something to flush. - require.True(t, e.EnqueueExport(make([]Record, 1))) - - ctx, cancel := context.WithCancel(context.Background()) - cancel() - - err := e.Shutdown(ctx) - assert.ErrorIs(t, err, context.Canceled) - assert.ErrorIs(t, err, assert.AnError) - }) - - t.Run("Error", func(t *testing.T) { - exp := newTestExporter(assert.AnError) - t.Cleanup(exp.Stop) - - e := newBufferExporter(exp, 1) - assert.ErrorIs(t, e.Shutdown(context.Background()), assert.AnError) - }) - }) - - t.Run("ForceFlush", func(t *testing.T) { - t.Run("Multiple", func(t *testing.T) { - exp := newTestExporter(nil) - t.Cleanup(exp.Stop) - e := newBufferExporter(exp, 2) - - ctx := context.Background() - records := make([]Record, 1) - require.NoError(t, e.enqueue(ctx, records, nil), "enqueue") - - assert.NoError(t, e.ForceFlush(ctx), "ForceFlush records") - assert.Equal(t, 1, exp.ExportN(), "Export number incremented") - assert.Len(t, exp.Records(), 1, "exported Record batches") - - // Nothing to flush. - assert.NoError(t, e.ForceFlush(ctx), "ForceFlush empty") - assert.Equal(t, 1, exp.ExportN(), "Export number changed") - assert.Empty(t, exp.Records(), "exported non-zero Records") - }) - - t.Run("ContextCancelled", func(t *testing.T) { - exp := newTestExporter(nil) - t.Cleanup(exp.Stop) - - trigger := make(chan struct{}) - exp.ExportTrigger = trigger - t.Cleanup(func() { close(trigger) }) - e := newBufferExporter(exp, 1) - - ctx, cancel := context.WithCancel(context.Background()) - require.True(t, e.EnqueueExport(make([]Record, 1))) - - got := make(chan error, 1) - go func() { got <- e.ForceFlush(ctx) }() - require.Eventually(t, func() bool { - return exp.ExportN() > 0 - }, 2*time.Second, time.Microsecond) - cancel() // Canceled before export response. - err := <-got - assert.ErrorIs(t, err, context.Canceled, "enqueued") - _ = e.Shutdown(ctx) - - // Zero length buffer - e = newBufferExporter(exp, 0) - assert.ErrorIs(t, e.ForceFlush(ctx), context.Canceled, "not enqueued") - }) - - t.Run("Error", func(t *testing.T) { - exp := newTestExporter(assert.AnError) - t.Cleanup(exp.Stop) - - e := newBufferExporter(exp, 1) - assert.ErrorIs(t, e.ForceFlush(context.Background()), assert.AnError) - }) - - t.Run("Stopped", func(t *testing.T) { - exp := newTestExporter(nil) - t.Cleanup(exp.Stop) - - e := newBufferExporter(exp, 1) - - ctx := context.Background() - _ = e.Shutdown(ctx) - assert.NoError(t, e.ForceFlush(ctx)) - }) - }) - - t.Run("Export", func(t *testing.T) { - t.Run("ZeroRecords", func(t *testing.T) { - exp := newTestExporter(nil) - t.Cleanup(exp.Stop) - e := newBufferExporter(exp, 1) - - assert.NoError(t, e.Export(context.Background(), nil)) - assert.Equal(t, 0, exp.ExportN()) - }) - - t.Run("Multiple", func(t *testing.T) { - exp := newTestExporter(nil) - t.Cleanup(exp.Stop) - e := newBufferExporter(exp, 1) - - ctx := context.Background() - records := make([]Record, 1) - records[0].SetBody(log.BoolValue(true)) - - assert.NoError(t, e.Export(ctx, records)) - - n := exp.ExportN() - assert.Equal(t, 1, n, "first Export number") - assert.Equal(t, [][]Record{records}, exp.Records()) - - assert.NoError(t, e.Export(ctx, records)) - assert.Equal(t, n+1, exp.ExportN(), "second Export number") - assert.Equal(t, [][]Record{records}, exp.Records()) - }) - - t.Run("ContextCancelled", func(t *testing.T) { - exp := newTestExporter(nil) - t.Cleanup(exp.Stop) - - trigger := make(chan struct{}) - exp.ExportTrigger = trigger - t.Cleanup(func() { close(trigger) }) - e := newBufferExporter(exp, 1) - - records := make([]Record, 1) - ctx, cancel := context.WithCancel(context.Background()) - - got := make(chan error, 1) - go func() { got <- e.Export(ctx, records) }() - require.Eventually(t, func() bool { - return exp.ExportN() > 0 - }, 2*time.Second, time.Microsecond) - cancel() // Canceled before export response. - err := <-got - assert.ErrorIs(t, err, context.Canceled, "enqueued") - _ = e.Shutdown(ctx) - - // Zero length buffer - e = newBufferExporter(exp, 0) - assert.ErrorIs(t, e.Export(ctx, records), context.Canceled, "not enqueued") - }) - - t.Run("Error", func(t *testing.T) { - exp := newTestExporter(assert.AnError) - t.Cleanup(exp.Stop) - - e := newBufferExporter(exp, 1) - ctx, records := context.Background(), make([]Record, 1) - assert.ErrorIs(t, e.Export(ctx, records), assert.AnError) - }) - - t.Run("Stopped", func(t *testing.T) { - exp := newTestExporter(nil) - t.Cleanup(exp.Stop) - - e := newBufferExporter(exp, 1) - - ctx := context.Background() - _ = e.Shutdown(ctx) - assert.NoError(t, e.Export(ctx, make([]Record, 1))) - assert.Equal(t, 0, exp.ExportN(), "Export called") - }) - }) - - t.Run("EnqueueExport", func(t *testing.T) { - t.Run("ZeroRecords", func(t *testing.T) { - exp := newTestExporter(nil) - t.Cleanup(exp.Stop) - e := newBufferExporter(exp, 1) - - assert.True(t, e.EnqueueExport(nil)) - e.ForceFlush(context.Background()) - assert.Equal(t, 0, exp.ExportN(), "empty batch enqueued") - }) - - t.Run("Multiple", func(t *testing.T) { - exp := newTestExporter(nil) - t.Cleanup(exp.Stop) - e := newBufferExporter(exp, 2) - - records := make([]Record, 1) - records[0].SetBody(log.BoolValue(true)) - - assert.True(t, e.EnqueueExport(records)) - assert.True(t, e.EnqueueExport(records)) - e.ForceFlush(context.Background()) - - n := exp.ExportN() - assert.Equal(t, 2, n, "Export number") - assert.Equal(t, [][]Record{records, records}, exp.Records()) - }) - - t.Run("Stopped", func(t *testing.T) { - exp := newTestExporter(nil) - t.Cleanup(exp.Stop) - e := newBufferExporter(exp, 1) - - _ = e.Shutdown(context.Background()) - assert.True(t, e.EnqueueExport(make([]Record, 1))) - }) - }) -} diff --git a/sdk/log/queue.go b/sdk/log/queue.go new file mode 100644 index 00000000000..16ce9d836c6 --- /dev/null +++ b/sdk/log/queue.go @@ -0,0 +1,115 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package log // import "go.opentelemetry.io/otel/sdk/log" + +import ( + "sync" + "sync/atomic" +) + +// queue holds a queue of logging records. +// When the queue becomes full, the oldest records in the queue are +// overwritten. +type queue struct { + sync.Mutex + + dropped atomic.Uint64 + cap, len int + read, write *ring +} + +func newQueue(size int) *queue { + r := newRing(size) + return &queue{ + cap: size, + read: r, + write: r, + } +} + +// Dropped returns the number of Records dropped during enqueueing since the +// last time Dropped was called. +func (q *queue) Dropped() uint64 { + return q.dropped.Swap(0) +} + +// Enqueue adds r to the queue. The queue size, including the addition of r, is +// returned. +// +// If enqueueing r will exceed the capacity of q, the oldest Record held in q +// will be dropped and r retained. +func (q *queue) Enqueue(r Record) int { + q.Lock() + defer q.Unlock() + + q.write.Value = r + q.write = q.write.Next() + + q.len++ + if q.len > q.cap { + // Overflow. Advance read to be the new "oldest". + q.len = q.cap + q.read = q.read.Next() + q.dropped.Add(1) + } + return q.len +} + +// TryDequeue attempts to dequeue up to len(buf) Records. The available Records +// will be assigned into buf and passed to write. If write fails, returning +// false, the Records will not be removed from the queue. If write succeeds, +// returning true, the dequeued Records are removed from the queue. The number +// of Records remaining in the queue are returned. +// +// When write is called the lock of q is held. The write function must not call +// other methods of this q that acquire the lock. +func (q *queue) TryDequeue(buf []Record, write func([]Record) bool) int { + q.Lock() + defer q.Unlock() + + origRead := q.read + + n := min(len(buf), q.len) + for i := 0; i < n; i++ { + buf[i] = q.read.Value + q.read = q.read.Next() + } + + if write(buf[:n]) { + q.len -= n + } else { + q.read = origRead + } + return q.len +} + +func (q *queue) Dequeue(buf []Record) (queueLen, written int) { + q.Lock() + defer q.Unlock() + + n := min(len(buf), q.len) + for i := 0; i < n; i++ { + buf[i] = q.read.Value + q.read = q.read.Next() + } + + q.len -= n + return q.len, n +} + +// Flush returns all the Records held in the queue and resets it to be +// empty. +func (q *queue) Flush() []Record { + q.Lock() + defer q.Unlock() + + out := make([]Record, q.len) + for i := range out { + out[i] = q.read.Value + q.read = q.read.Next() + } + q.len = 0 + + return out +} diff --git a/sdk/log/queue_test.go b/sdk/log/queue_test.go new file mode 100644 index 00000000000..7552353eb42 --- /dev/null +++ b/sdk/log/queue_test.go @@ -0,0 +1,136 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package log // import "go.opentelemetry.io/otel/sdk/log" + +import ( + "slices" + "sync" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/otel/log" +) + +func TestQueue(t *testing.T) { + var r Record + r.SetBody(log.BoolValue(true)) + + t.Run("newQueue", func(t *testing.T) { + const size = 1 + q := newQueue(size) + assert.Equal(t, 0, q.len) + assert.Equal(t, size, q.cap, "capacity") + assert.Equal(t, size, q.read.Len(), "read ring") + assert.Same(t, q.read, q.write, "different rings") + }) + + t.Run("Enqueue", func(t *testing.T) { + const size = 2 + q := newQueue(size) + + var notR Record + notR.SetBody(log.IntValue(10)) + + assert.Equal(t, 1, q.Enqueue(notR), "incomplete batch") + assert.Equal(t, 1, q.len, "length") + assert.Equal(t, size, q.cap, "capacity") + + assert.Equal(t, 2, q.Enqueue(r), "complete batch") + assert.Equal(t, 2, q.len, "length") + assert.Equal(t, size, q.cap, "capacity") + + assert.Equal(t, 2, q.Enqueue(r), "overflow batch") + assert.Equal(t, 2, q.len, "length") + assert.Equal(t, size, q.cap, "capacity") + + assert.Equal(t, []Record{r, r}, q.Flush(), "flushed Records") + }) + + t.Run("Dropped", func(t *testing.T) { + q := newQueue(1) + + _ = q.Enqueue(r) + _ = q.Enqueue(r) + assert.Equal(t, uint64(1), q.Dropped(), "fist") + + _ = q.Enqueue(r) + _ = q.Enqueue(r) + assert.Equal(t, uint64(2), q.Dropped(), "second") + }) + + t.Run("Flush", func(t *testing.T) { + const size = 2 + q := newQueue(size) + q.write.Value = r + q.write = q.write.Next() + q.len = 1 + + assert.Equal(t, []Record{r}, q.Flush(), "flushed") + }) + + t.Run("TryFlush", func(t *testing.T) { + const size = 3 + q := newQueue(size) + for i := 0; i < size-1; i++ { + q.write.Value = r + q.write = q.write.Next() + q.len++ + } + + buf := make([]Record, 1) + f := func([]Record) bool { return false } + assert.Equal(t, size-1, q.TryDequeue(buf, f), "not flushed") + require.Equal(t, size-1, q.len, "length") + require.NotSame(t, q.read, q.write, "read ring advanced") + + var flushed []Record + f = func(r []Record) bool { + flushed = append(flushed, r...) + return true + } + if assert.Equal(t, size-2, q.TryDequeue(buf, f), "did not flush len(buf)") { + assert.Equal(t, []Record{r}, flushed, "Records") + } + + buf = slices.Grow(buf, size) + flushed = flushed[:0] + if assert.Equal(t, 0, q.TryDequeue(buf, f), "did not flush len(queue)") { + assert.Equal(t, []Record{r}, flushed, "Records") + } + }) + + t.Run("ConcurrentSafe", func(t *testing.T) { + const goRoutines = 10 + + flushed := make(chan []Record, goRoutines) + out := make([]Record, 0, goRoutines) + done := make(chan struct{}) + go func() { + defer close(done) + for recs := range flushed { + out = append(out, recs...) + } + }() + + var wg sync.WaitGroup + wg.Add(goRoutines) + + b := newQueue(goRoutines) + for i := 0; i < goRoutines; i++ { + go func() { + defer wg.Done() + b.Enqueue(Record{}) + flushed <- b.Flush() + }() + } + + wg.Wait() + close(flushed) + <-done + + assert.Len(t, out, goRoutines, "flushed Records") + }) +}