Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move log.Processor.Enabled to independent FilterProcessor interfaced type #5692

Merged
merged 25 commits into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
3521db9
Add FilterProcessor
MrAlias Aug 7, 2024
8727522
Update the LoggerProvider and Logger
MrAlias Aug 7, 2024
38abe56
Update the ContextFilterProcessor example
MrAlias Aug 7, 2024
f0ba53f
Add changelog entry
MrAlias Aug 7, 2024
4031e03
Add BenchmarkLoggerEnabled
MrAlias Aug 7, 2024
53876b0
Update provider and logger tests
MrAlias Aug 7, 2024
2a3fa43
Mention FilterProcessor from the Processor docs
MrAlias Aug 8, 2024
b3f073f
Remove Enabled method from BatchProcessor
MrAlias Aug 8, 2024
6c9ba9b
Remove Enabled method from SimpleProcessor
MrAlias Aug 8, 2024
6fc46a0
Add sdk/log/internal/x pacakge
MrAlias Aug 8, 2024
9555d9f
Doc experimental features in sdk/log pkg
MrAlias Aug 8, 2024
2271040
Replace FilterProcessor with x.FilterProcessor
MrAlias Aug 8, 2024
d4e9e0f
Fix import comment for sdk/log/internal/x
MrAlias Aug 8, 2024
a8ee3b4
Update changelog
MrAlias Aug 8, 2024
51ad24f
Update changelog with Enabled method removals
MrAlias Aug 8, 2024
0535da8
Add minsev example to x README
MrAlias Aug 8, 2024
ba537c4
Apply suggestions from code review
MrAlias Aug 9, 2024
ecd21f8
Clarify the SDK Logger.Enabled behavior
MrAlias Aug 9, 2024
7079a4d
Merge branch 'main' into FilterProcessor
MrAlias Aug 9, 2024
98978ad
Merge branch 'main' into FilterProcessor
XSAM Aug 15, 2024
67deb25
Merge branch 'main' into FilterProcessor
XSAM Aug 15, 2024
3faedf8
Merge branch 'main' into FilterProcessor
MrAlias Aug 19, 2024
e308557
Merge branch 'main' into FilterProcessor
MrAlias Aug 19, 2024
844fe0c
Merge branch 'main' into FilterProcessor
MrAlias Aug 20, 2024
ec06ff8
Merge branch 'main' into FilterProcessor
MrAlias Aug 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ The next release will require at least [Go 1.22].
- Zero value of `SimpleProcessor` in `go.opentelemetry.io/otel/sdk/log` no longer panics. (#5665)
- Add `Walk` function to `TraceState` in `go.opentelemetry.io/otel/trace` to iterate all the key-value pairs. (#5651)
- Bridge the trace state in `go.opentelemetry.io/otel/bridge/opencensus`. (#5651)
- The `FilterProcessor` interface type is added in `go.opentelemetry.io/otel/sdk/log/internal/x`.
This is an optional and experimental interface that log `Processor`s can implement to instruct the `Logger` if a `Record` will be processed or not.
It replaces the existing `Enabled` method that is removed from the `Processor` interface itself.
It does not fall within the scope of the OpenTelemetry Go versioning and stability [policy](./VERSIONING.md) and it may be changed in backwards incompatible ways or removed in feature releases. (#5692)
- Support [Go 1.23]. (#5720)

### Changed
Expand All @@ -30,6 +34,8 @@ The next release will require at least [Go 1.22].
- `SimpleProcessor.Enabled` in `go.opentelemetry.io/otel/sdk/log` now returns `false` if the exporter is `nil`. (#5665)
- Update the concurrency requirements of `Exporter` in `go.opentelemetry.io/otel/sdk/log`. (#5666)
- `SimpleProcessor` in `go.opentelemetry.io/otel/sdk/log` synchronizes `OnEmit` calls. (#5666)
- The `Processor` interface in `go.opentelemetry.io/otel/sdk/log` no longer includes the `Enabled` method.
See the `FilterProcessor` interface type added in `go.opentelemetry.io/otel/sdk/log/internal/x` to continue providing this functionality. (#5692)
- The `SimpleProcessor` type in `go.opentelemetry.io/otel/sdk/log` is no longer comparable. (#5693)
- The `BatchProcessor` type in `go.opentelemetry.io/otel/sdk/log` is no longer comparable. (#5693)
- `NewMemberRaw`, `NewKeyProperty` and `NewKeyValuePropertyRaw` in `go.opentelemetry.io/otel/baggage` allow UTF-8 string in key. (#5132)
Expand All @@ -50,6 +56,11 @@ The next release will require at least [Go 1.22].
- Correct comments for the priority of the `WithEndpoint` and `WithEndpointURL` options and their corresponding environment variables in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc` and `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp`. (#5641)
- Correct comments for the priority of the `WithEndpoint` and `WithEndpointURL` options and their corresponding environment variables in `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp`. (#5650)

### Removed

- The `Enabled` method of the `SimpleProcessor` in `go.opentelemetry.io/otel/sdk/log` is removed. (#5692)
- The `Enabled` method of the `BatchProcessor` in `go.opentelemetry.io/otel/sdk/log` is removed. (#5692)

<!-- Released section -->
<!-- Don't change this section unless doing release -->

Expand Down
5 changes: 0 additions & 5 deletions sdk/log/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,11 +196,6 @@ func (b *BatchProcessor) OnEmit(_ context.Context, r *Record) error {
return nil
}

// Enabled returns if b is enabled.
func (b *BatchProcessor) Enabled(context.Context, Record) bool {
return !b.stopped.Load() && b.q != nil
}

// Shutdown flushes queued log records and shuts down the decorated exporter.
func (b *BatchProcessor) Shutdown(ctx context.Context) error {
if b.stopped.Swap(true) || b.q == nil {
Expand Down
9 changes: 0 additions & 9 deletions sdk/log/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ func TestEmptyBatchConfig(t *testing.T) {
ctx := context.Background()
record := new(Record)
assert.NoError(t, bp.OnEmit(ctx, record), "OnEmit")
assert.False(t, bp.Enabled(ctx, *record), "Enabled")
assert.NoError(t, bp.ForceFlush(ctx), "ForceFlush")
assert.NoError(t, bp.Shutdown(ctx), "Shutdown")
})
Expand Down Expand Up @@ -270,14 +269,6 @@ func TestBatchProcessor(t *testing.T) {
assert.Equal(t, 3, e.ExportN())
})

t.Run("Enabled", func(t *testing.T) {
b := NewBatchProcessor(defaultNoopExporter)
assert.True(t, b.Enabled(ctx, Record{}))

_ = b.Shutdown(ctx)
assert.False(t, b.Enabled(ctx, Record{}))
})

t.Run("Shutdown", func(t *testing.T) {
t.Run("Error", func(t *testing.T) {
e := newTestExporter(assert.AnError)
Expand Down
3 changes: 3 additions & 0 deletions sdk/log/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,8 @@ at a single endpoint their origin is decipherable.

See [go.opentelemetry.io/otel/log] for more information about
the OpenTelemetry Logs Bridge API.

See [go.opentelemetry.io/otel/sdk/log/internal/x] for information about the
experimental features.
*/
package log // import "go.opentelemetry.io/otel/sdk/log"
19 changes: 17 additions & 2 deletions sdk/log/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"fmt"
"strings"
"sync"

logapi "go.opentelemetry.io/otel/log"
"go.opentelemetry.io/otel/log/global"
Expand Down Expand Up @@ -58,7 +59,7 @@ func ExampleProcessor_filtering() {
// Wrap the processor so that it ignores processing log records
// when a context deriving from WithIgnoreLogs is passed
// to the logging methods.
processor = &ContextFilterProcessor{processor}
processor = &ContextFilterProcessor{Processor: processor}

// The created processor can then be registered with
// the OpenTelemetry Logs SDK using the WithProcessor option.
Expand All @@ -81,6 +82,15 @@ func WithIgnoreLogs(ctx context.Context) context.Context {
// [WithIgnoreLogs] is passed to its methods.
type ContextFilterProcessor struct {
log.Processor

lazyFilter sync.Once
// Use the experimental FilterProcessor interface
// (go.opentelemetry.io/otel/sdk/log/internal/x).
filter filter
}

type filter interface {
Enabled(ctx context.Context, record log.Record) bool
}

func (p *ContextFilterProcessor) OnEmit(ctx context.Context, record *log.Record) error {
Expand All @@ -91,7 +101,12 @@ func (p *ContextFilterProcessor) OnEmit(ctx context.Context, record *log.Record)
}

func (p *ContextFilterProcessor) Enabled(ctx context.Context, record log.Record) bool {
return !ignoreLogs(ctx) && p.Processor.Enabled(ctx, record)
p.lazyFilter.Do(func() {
if f, ok := p.Processor.(filter); ok {
p.filter = f
}
})
return !ignoreLogs(ctx) && (p.filter == nil || p.filter.Enabled(ctx, record))
}

func ignoreLogs(ctx context.Context) bool {
Expand Down
35 changes: 35 additions & 0 deletions sdk/log/internal/x/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Experimental Features

The Logs SDK contains features that have not yet stabilized.
These features are added to the OpenTelemetry Go Logs SDK prior to stabilization so that users can start experimenting with them and provide feedback.

These feature may change in backwards incompatible ways as feedback is applied.
See the [Compatibility and Stability](#compatibility-and-stability) section for more information.

## Features

- [Filter Processors](#filter-processor)

### Filter Processor

Users of logging libraries often want to know if a log `Record` will be processed or dropped before they perform complex operations to construct the `Record`.
The [`Logger`] in the Logs Bridge API provides the `Enabled` method for just this use-case.
In order for the Logs Bridge SDK to effectively implement this API, it needs to be known if the registered [`Processor`]s are enabled for the `Record` within a context.
A [`Processor`] that knows, and can identify, what `Record` it will process or drop when it is passed to `OnEmit` can communicate this to the SDK `Logger` by implementing the `FilterProcessor`.
pellared marked this conversation as resolved.
Show resolved Hide resolved

By default, the SDK `Logger.Enabled` will return true when called.
Only if all the registered [`Processor`]s implement `FilterProcessor` and they all return `false` will `Logger.Enabled` return `false`.

See the [`minsev`] [`Processor`] for an example use-case.
It is used to filter `Record`s out that a have a `Severity` below a threshold.

[`Logger`]: https://pkg.go.dev/go.opentelemetry.io/otel/log#Logger
[`Processor`]: https://pkg.go.dev/go.opentelemetry.io/otel/sdk/log#Processor
[`minsev`]: https://pkg.go.dev/go.opentelemetry.io/contrib/processors/minsev

## Compatibility and Stability

Experimental features do not fall within the scope of the OpenTelemetry Go versioning and stability [policy](../../../../VERSIONING.md).
These features may be removed or modified in successive version releases, including patch versions.

When an experimental feature is promoted to a stable feature, a migration path will be included in the changelog entry of the release.
46 changes: 46 additions & 0 deletions sdk/log/internal/x/x.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// Package x contains support for Logs SDK experimental features.
package x // import "go.opentelemetry.io/otel/sdk/log/internal/x"
MrAlias marked this conversation as resolved.
Show resolved Hide resolved

import (
"context"

"go.opentelemetry.io/otel/log"
)

// FilterProcessor is a [Processor] that knows, and can identify, what
// [log.Record] it will process or drop when it is passed to OnEmit.
//
// This is useful for users of logging libraries that want to know if a [log.Record]
// will be processed or dropped before they perform complex operations to
// construct the [log.Record].
//
// [Processor] implementations that choose to support this by satisfying this
// interface are expected to re-evaluate the [log.Record]s passed to OnEmit, it is
// not expected that the caller to OnEmit will use the functionality from this
// interface prior to calling OnEmit.
//
// This should only be implemented for [Processor]s that can make reliable
// enough determination of this prior to processing a [log.Record] and where
// the result is dynamic.
//
// [Processor]: https://pkg.go.dev/go.opentelemetry.io/otel/sdk/log#Processor
type FilterProcessor interface {
// Enabled returns whether the Processor will process for the given context
// and record.
//
// The passed record is likely to be a partial record with only the
// bridge-relevant information being provided (e.g a record with only the
// Severity set). If a Logger needs more information than is provided, it
// is said to be in an indeterminate state (see below).
//
// The returned value will be true when the Processor will process for the
// provided context and record, and will be false if the Processor will not
// process. An implementation should default to returning true for an
// indeterminate state.
//
// Implementations should not modify the record.
Enabled(ctx context.Context, record log.Record) bool
}
26 changes: 22 additions & 4 deletions sdk/log/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"go.opentelemetry.io/otel/log"
"go.opentelemetry.io/otel/log/embedded"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/log/internal/x"
"go.opentelemetry.io/otel/trace"
)

Expand Down Expand Up @@ -42,13 +43,30 @@ func (l *logger) Emit(ctx context.Context, r log.Record) {
}
}

func (l *logger) Enabled(ctx context.Context, r log.Record) bool {
newRecord := l.newRecord(ctx, r)
for _, p := range l.provider.processors {
if enabled := p.Enabled(ctx, newRecord); enabled {
// Enabled returns true if at least one Processor held by the LoggerProvider
dashpole marked this conversation as resolved.
Show resolved Hide resolved
// that created the logger will process the record for the provided context.
//
// If it is not possible to definitively determine the record will be
// processed, true will be returned by default. A value of false will only be
// returned if it can be positively verified that no Processor will process the
// record.
MrAlias marked this conversation as resolved.
Show resolved Hide resolved
func (l *logger) Enabled(ctx context.Context, record log.Record) bool {
fltrs := l.provider.filterProcessors()
// If there are more Processors than FilterProcessors we cannot be sure
// that all Processors will drop the record. Therefore, return true.
//
// If all Processors are FilterProcessors, check if any is enabled.
return len(l.provider.processors) > len(fltrs) || anyEnabled(ctx, record, fltrs)
}

func anyEnabled(ctx context.Context, r log.Record, fltrs []x.FilterProcessor) bool {
for _, f := range fltrs {
if f.Enabled(ctx, r) {
// At least one Processor will process the Record.
return true
}
}
// No Processor will process the record
return false
}

Expand Down
26 changes: 24 additions & 2 deletions sdk/log/logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,9 @@ func TestLoggerEmit(t *testing.T) {
}

func TestLoggerEnabled(t *testing.T) {
p0, p1, p2WithDisabled := newProcessor("0"), newProcessor("1"), newProcessor("2")
p2WithDisabled.enabled = false
p0 := newFltrProcessor("0", true)
p1 := newFltrProcessor("1", true)
p2WithDisabled := newFltrProcessor("2", false)

testCases := []struct {
name string
Expand Down Expand Up @@ -273,3 +274,24 @@ func TestLoggerEnabled(t *testing.T) {
})
}
}

func BenchmarkLoggerEnabled(b *testing.B) {
provider := NewLoggerProvider(
WithProcessor(newFltrProcessor("0", false)),
WithProcessor(newFltrProcessor("1", true)),
)
logger := provider.Logger("BenchmarkLoggerEnabled")
ctx, r := context.Background(), log.Record{}
r.SetSeverityText("test")

var enabled bool

b.ReportAllocs()
b.ResetTimer()

for n := 0; n < b.N; n++ {
enabled = logger.Enabled(ctx, r)
}

_ = enabled
}
24 changes: 3 additions & 21 deletions sdk/log/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ import (
// Any of the Processor's methods may be called concurrently with itself
// or with other methods. It is the responsibility of the Processor to manage
// this concurrency.
//
// See [go.opentelemetry.io/otel/sdk/log/internal/x] for information about how
// a Processor can be extended to support experimental features.
type Processor interface {
// OnEmit is called when a Record is emitted.
//
Expand All @@ -35,27 +38,6 @@ type Processor interface {
// to create a copy that shares no state with the original.
OnEmit(ctx context.Context, record *Record) error

// Enabled returns whether the Processor will process for the given context
// and record.
//
// The passed record is likely to be a partial record with only the
// bridge-relevant information being provided (e.g a record with only the
// Severity set). If a Logger needs more information than is provided, it
// is said to be in an indeterminate state (see below).
//
// The returned value will be true when the Processor will process for the
// provided context and record, and will be false if the Processor will not
// process. The returned value may be true or false in an indeterminate
// state. An implementation should default to returning true for an
// indeterminate state, but may return false if valid reasons in particular
// circumstances exist (e.g. performance, correctness).
//
// The SDK invokes the processors sequentially in the same order as
// they were registered using [WithProcessor] until any processor returns true.
//
// Implementations should not modify the record.
Enabled(ctx context.Context, record Record) bool

// Shutdown is called when the SDK shuts down. Any cleanup or release of
// resources held by the exporter should be done in this call.
//
Expand Down
15 changes: 15 additions & 0 deletions sdk/log/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"go.opentelemetry.io/otel/log/embedded"
"go.opentelemetry.io/otel/log/noop"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/log/internal/x"
"go.opentelemetry.io/otel/sdk/resource"
)

Expand Down Expand Up @@ -65,6 +66,9 @@ type LoggerProvider struct {
attributeCountLimit int
attributeValueLengthLimit int

fltrProcessorsOnce sync.Once
fltrProcessors []x.FilterProcessor

loggersMu sync.Mutex
loggers map[instrumentation.Scope]*logger

Expand Down Expand Up @@ -92,6 +96,17 @@ func NewLoggerProvider(opts ...LoggerProviderOption) *LoggerProvider {
}
}

func (p *LoggerProvider) filterProcessors() []x.FilterProcessor {
p.fltrProcessorsOnce.Do(func() {
for _, proc := range p.processors {
if f, ok := proc.(x.FilterProcessor); ok {
p.fltrProcessors = append(p.fltrProcessors, f)
}
}
})
return p.fltrProcessors
}

// Logger returns a new [log.Logger] with the provided name and configuration.
//
// If p is shut down, a [noop.Logger] instance is returned.
Expand Down
Loading
Loading