Skip to content

Commit

Permalink
sdk/log: SimpleProcessor synchronizes OnEmit calls (#5666)
Browse files Browse the repository at this point in the history
From
https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/logs/sdk.md#export:

> `Export` will never be called concurrently for the same exporter
instance.

From our SDK perspective this change will make
https://pkg.go.dev/go.opentelemetry.io/otel/exporters/stdout/stdoutlog
concurrent-safe when used with the simple processor. Before the change,
there can be multiple goroutines calling `Write` in parallel (via
`json.Encoder.Encode`).

### Remarks

[Maybe we should simply state that "whole" exporter implementation does
not need to be
concurrent-safe?](open-telemetry/opentelemetry-specification#4173 (comment))
However:
1. we would need to make complex changes in `BatchExporter` to
synchronize the `Export`, `Shutdown`, `ForceFlush` calls
2. we would need to update all exporters (remove synchronization) and
simple exporter (add locks to `Shutdown`, `ForceFlush`)
3. I am 100% not sure if this would be compliant with the specification
- I think it would be complaint because we would simply give stronger
safety-measures


We should probably discuss it separately, but I wanted to highlight my
though process.
Even if we decide that simple and batch processors to synchronize all
calls then I would prefer to address it in a separate PR.

Related spec clarification PR:
-
open-telemetry/opentelemetry-specification#4173
  • Loading branch information
pellared authored Aug 9, 2024
1 parent a5d1ec0 commit 69e2358
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 5 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

- `Processor.OnEmit` in `go.opentelemetry.io/otel/sdk/log` now accepts a pointer to `Record` instead of a value so that the record modifications done in a processor are propagated to subsequent registered processors. (#5636)
- `SimpleProcessor.Enabled` in `go.opentelemetry.io/otel/sdk/log` now returns `false` if the exporter is `nil`. (#5665)
- Update the concurrency requirements of `Exporter` in `go.opentelemetry.io/otel/sdk/log`. (#5666)
- `SimpleProcessor` in `go.opentelemetry.io/otel/sdk/log` synchronizes `OnEmit` calls. (#5666)

### Fixed

Expand Down
11 changes: 7 additions & 4 deletions sdk/log/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@ import (
)

// Exporter handles the delivery of log records to external receivers.
//
// Any of the Exporter's methods may be called concurrently with itself
// or with other methods. It is the responsibility of the Exporter to manage
// this concurrency.
type Exporter interface {
// Export transmits log records to a receiver.
//
Expand All @@ -34,6 +30,9 @@ type Exporter interface {
//
// Before modifying a Record, the implementation must use Record.Clone
// to create a copy that shares no state with the original.
//
// Export should never be called concurrently with other Export calls.
// However, it may be called concurrently with other methods.
Export(ctx context.Context, records []Record) error

// Shutdown is called when the SDK shuts down. Any cleanup or release of
Expand All @@ -44,13 +43,17 @@ type Exporter interface {
//
// After Shutdown is called, calls to Export, Shutdown, or ForceFlush
// should perform no operation and return nil error.
//
// Shutdown may be called concurrently with itself or with other methods.
Shutdown(ctx context.Context) error

// ForceFlush exports log records to the configured Exporter that have not yet
// been exported.
//
// The deadline or cancellation of the passed context must be honored. An
// appropriate error should be returned in these situations.
//
// ForceFlush may be called concurrently with itself or with other methods.
ForceFlush(ctx context.Context) error
}

Expand Down
4 changes: 4 additions & 0 deletions sdk/log/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ var _ Processor = (*SimpleProcessor)(nil)
//
// Use [NewSimpleProcessor] to create a SimpleProcessor.
type SimpleProcessor struct {
mu sync.Mutex
exporter Exporter
}

Expand Down Expand Up @@ -43,6 +44,9 @@ func (s *SimpleProcessor) OnEmit(ctx context.Context, r *Record) error {
return nil
}

s.mu.Lock()
defer s.mu.Unlock()

records := simpleProcRecordsPool.Get().(*[]Record)
(*records)[0] = *r
defer func() {
Expand Down
24 changes: 23 additions & 1 deletion sdk/log/simple_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ package log_test

import (
"context"
"io"
"strings"
"sync"
"testing"

Expand Down Expand Up @@ -70,6 +72,25 @@ func TestSimpleProcessorForceFlush(t *testing.T) {
require.True(t, e.forceFlushCalled, "exporter ForceFlush not called")
}

type writerExporter struct {
io.Writer
}

func (e *writerExporter) Export(_ context.Context, records []log.Record) error {
for _, r := range records {
_, _ = io.WriteString(e.Writer, r.Body().String())
}
return nil
}

func (e *writerExporter) Shutdown(context.Context) error {
return nil
}

func (e *writerExporter) ForceFlush(context.Context) error {
return nil
}

func TestSimpleProcessorEmpty(t *testing.T) {
assert.NotPanics(t, func() {
var s log.SimpleProcessor
Expand All @@ -91,7 +112,8 @@ func TestSimpleProcessorConcurrentSafe(t *testing.T) {
r := new(log.Record)
r.SetSeverityText("test")
ctx := context.Background()
s := log.NewSimpleProcessor(nil)
e := &writerExporter{new(strings.Builder)}
s := log.NewSimpleProcessor(e)
for i := 0; i < goRoutineN; i++ {
go func() {
defer wg.Done()
Expand Down

0 comments on commit 69e2358

Please sign in to comment.