Skip to content

Commit

Permalink
Merge branch 'main' into add-link
Browse files Browse the repository at this point in the history
  • Loading branch information
XSAM authored Mar 28, 2024
2 parents 9f0364c + 321219b commit 8e429fd
Show file tree
Hide file tree
Showing 4 changed files with 248 additions and 17 deletions.
8 changes: 0 additions & 8 deletions log/DESIGN.md
Original file line number Diff line number Diff line change
Expand Up @@ -605,14 +605,6 @@ However, in this approach we would need have factory functions for both types.
It would make the API surface unnecessarily big,
and we may even have problems naming the functions.
## Open issues
The Logs Bridge API MUST NOT be released as stable
before all issues below are closed:
- [Clarify attributes parameter type of Get a Logger operation](https://github.com/open-telemetry/opentelemetry-specification/issues/3841)
- [Add an Enabled method to Logger](https://github.com/open-telemetry/opentelemetry-specification/issues/3917)
[^1]: [Handle structured body and attributes](https://github.com/pellared/opentelemetry-go/pull/7)
[^2]: Jonathan Amsterdam, [The Go Blog: Structured Logging with slog](https://go.dev/blog/slog)
[^3]: Jonathan Amsterdam, [GopherCon Europe 2023: A Fast Structured Logging Package](https://www.youtube.com/watch?v=tC4Jt3i62ns)
Expand Down
9 changes: 0 additions & 9 deletions sdk/log/DESIGN.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,6 @@ provided via API.
Moreover it is safer to have these abstraction decoupled.
E.g. there can be a need for some fields that can be set via API and cannot be modified by the processors.

## Open issues

The Logs SDK MUST NOT be released as stable before all issues below are closed:

- [Clarify that ReadableLogRecord and ReadWriteLogRecord can be represented using a single type](https://github.com/open-telemetry/opentelemetry-specification/pull/3898)
- [Fix what can be modified via ReadWriteLogRecord](https://github.com/open-telemetry/opentelemetry-specification/pull/3907)
- [logs: Allow duplicate keys](https://github.com/open-telemetry/opentelemetry-specification/issues/3931)
- [Add an Enabled method to Logger](https://github.com/open-telemetry/opentelemetry-specification/issues/3917)

[^1]: [OpenTelemetry Logging](https://opentelemetry.io/docs/specs/otel/logs)
[^2]: [Conversation on representing LogRecordProcessor and LogRecordExporter via a single Expoter interface](https://github.com/open-telemetry/opentelemetry-go/pull/4954#discussion_r1515050480)
[^3]: [Introduce Processor](https://github.com/pellared/opentelemetry-go/pull/9)
50 changes: 50 additions & 0 deletions sdk/log/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ package log // import "go.opentelemetry.io/otel/sdk/log"

import (
"context"

"go.opentelemetry.io/otel"
)

// Exporter handles the delivery of log records to external receivers.
Expand Down Expand Up @@ -50,3 +52,51 @@ func (noopExporter) Export(context.Context, []Record) error { return nil }
func (noopExporter) Shutdown(context.Context) error { return nil }

func (noopExporter) ForceFlush(context.Context) error { return nil }

// exportSync exports all data from input using exporter in a spawned
// goroutine. The returned chan will be closed when the spawned goroutine
// completes.
func exportSync(input <-chan exportData, exporter Exporter) (done chan struct{}) {
done = make(chan struct{})
go func() {
defer close(done)
for data := range input {
data.DoExport(exporter.Export)
}
}()
return done
}

// exportData is data related to an export.
type exportData struct {
ctx context.Context
records []Record

// respCh is the channel any error returned from the export will be sent
// on. If this is nil, and the export error is non-nil, the error will
// passed to the OTel error handler.
respCh chan<- error
}

// DoExport calls exportFn with the data contained in e. The error response
// will be returned on e's respCh if not nil. The error will be handled by the
// default OTel error handle if it is not nil and respCh is nil or full.
func (e exportData) DoExport(exportFn func(context.Context, []Record) error) {
if len(e.records) == 0 {
e.respond(nil)
return
}

e.respond(exportFn(e.ctx, e.records))
}

func (e exportData) respond(err error) {
select {
case e.respCh <- err:
default:
// e.respCh is nil or busy, default to otel.Handler.
if err != nil {
otel.Handle(err)
}
}
}
198 changes: 198 additions & 0 deletions sdk/log/exporter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package log

import (
"context"
"slices"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/log"
)

type instruction struct {
Record *[]Record
Flush chan [][]Record
}

type testExporter struct {
// Err is the error returned by all methods of the testExporter.
Err error

// Counts of method calls.
exportN, shutdownN, forceFlushN *int32

input chan instruction
done chan struct{}
}

func newTestExporter(err error) *testExporter {
e := &testExporter{
Err: err,
exportN: new(int32),
shutdownN: new(int32),
forceFlushN: new(int32),
input: make(chan instruction),
}
e.done = run(e.input)

return e
}

func run(input chan instruction) chan struct{} {
done := make(chan struct{})
go func() {
defer close(done)

var records [][]Record
for in := range input {
if in.Record != nil {
records = append(records, *in.Record)
}
if in.Flush != nil {
cp := slices.Clone(records)
records = records[:0]
in.Flush <- cp
}
}
}()
return done
}

func (e *testExporter) Records() [][]Record {
out := make(chan [][]Record, 1)
e.input <- instruction{Flush: out}
return <-out
}

func (e *testExporter) Export(ctx context.Context, r []Record) error {
atomic.AddInt32(e.exportN, 1)
e.input <- instruction{Record: &r}
return e.Err
}

func (e *testExporter) ExportN() int {
return int(atomic.LoadInt32(e.exportN))
}

func (e *testExporter) Stop() {
close(e.input)
<-e.done
}

func (e *testExporter) Shutdown(ctx context.Context) error {
atomic.AddInt32(e.shutdownN, 1)
return e.Err
}

func (e *testExporter) ShutdownN() int {
return int(atomic.LoadInt32(e.shutdownN))
}

func (e *testExporter) ForceFlush(ctx context.Context) error {
atomic.AddInt32(e.forceFlushN, 1)
return e.Err
}

func (e *testExporter) ForceFlushN() int {
return int(atomic.LoadInt32(e.forceFlushN))
}

func TestExportSync(t *testing.T) {
eventuallyDone := func(t *testing.T, done chan struct{}) {
assert.Eventually(t, func() bool {
select {
case <-done:
return true
default:
return false
}
}, 2*time.Second, time.Microsecond)
}

t.Run("ErrorHandler", func(t *testing.T) {
var got error
handler := otel.ErrorHandlerFunc(func(err error) { got = err })
otel.SetErrorHandler(handler)

in := make(chan exportData, 1)
exp := newTestExporter(assert.AnError)
t.Cleanup(exp.Stop)
done := exportSync(in, exp)

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()

in <- exportData{
ctx: context.Background(),
records: make([]Record, 1),
}
}()

wg.Wait()
close(in)
eventuallyDone(t, done)

assert.ErrorIs(t, got, assert.AnError, "error not passed to ErrorHandler")
})

t.Run("ConcurrentSafe", func(t *testing.T) {
in := make(chan exportData, 1)
exp := newTestExporter(assert.AnError)
t.Cleanup(exp.Stop)
done := exportSync(in, exp)

const goRoutines = 10
var wg sync.WaitGroup
wg.Add(goRoutines)
for i := 0; i < goRoutines; i++ {
go func(n int) {
defer wg.Done()

var r Record
r.SetBody(log.IntValue(n))

resp := make(chan error, 1)
in <- exportData{
ctx: context.Background(),
records: []Record{r},
respCh: resp,
}

assert.ErrorIs(t, <-resp, assert.AnError)
}(i)
}

// Empty records should be ignored.
in <- exportData{ctx: context.Background()}

wg.Wait()

close(in)
eventuallyDone(t, done)

assert.Equal(t, goRoutines, exp.ExportN(), "Export calls")

want := make([]log.Value, goRoutines)
for i := range want {
want[i] = log.IntValue(i)
}
records := exp.Records()
got := make([]log.Value, len(records))
for i := range got {
if assert.Len(t, records[i], 1, "number of records exported") {
got[i] = records[i][0].Body()
}
}
assert.ElementsMatch(t, want, got, "record bodies")
})
}

0 comments on commit 8e429fd

Please sign in to comment.