Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sdk/log: Add FilterProcessor and EnabledParameters #6317

Merged
merged 34 commits into from
Feb 18, 2025
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
96a79ee
mv sdk/log/internal/x sdk/xlog
pellared Feb 5, 2025
6282e71
Introduce xlog module with FilterProcessor and EnabledParameters
pellared Feb 5, 2025
69ff66d
Update changelog
pellared Feb 5, 2025
bf61d44
Add trace context fields
pellared Feb 5, 2025
5b3c8ca
Test FilterProcessor functionality
pellared Feb 5, 2025
f24fc64
Update changelog
pellared Feb 5, 2025
fffc077
go mod tidy
pellared Feb 5, 2025
55adcc4
Merge branch 'main' into sdk/xlogs
pellared Feb 5, 2025
008cf4d
Merge branch 'main' into sdk/xlogs
pellared Feb 6, 2025
4df6669
Remove span context fields from EnabledParameters
pellared Feb 6, 2025
d31f329
Simplify doc comments
pellared Feb 6, 2025
fe5ae63
go mod tidy
pellared Feb 6, 2025
4e54827
Move sdk/xlog to sdk/log/xlog
pellared Feb 6, 2025
abe7833
Fix hyperlink
pellared Feb 6, 2025
d5817c6
Update README.md
pellared Feb 6, 2025
c23714b
Merge branch 'main' into sdk/xlogs
pellared Feb 6, 2025
cb91d27
Merge branch 'main' into sdk/xlogs
pellared Feb 7, 2025
71376d1
Update sdk/log/example_test.go
pellared Feb 7, 2025
e6b34ae
Update sdk/log/logger.go
pellared Feb 7, 2025
4ff41e1
Format comment
pellared Feb 7, 2025
628f916
Add missing period
pellared Feb 7, 2025
98d6811
Do not release xlog
pellared Feb 7, 2025
5cf1eaf
Improve Go doc comments
pellared Feb 7, 2025
6378ea7
Merge branch 'main' into sdk/xlogs
pellared Feb 13, 2025
46ba89f
Update CHANGELOG.md
pellared Feb 13, 2025
c8deb2f
Promote FilterProcessor and EnabledParameters to sdk/log
pellared Feb 13, 2025
c1a5ad7
Update PR number
pellared Feb 13, 2025
a7604e1
Merge branch 'main' into filterproc
pellared Feb 13, 2025
daf7441
Update example_test.go
pellared Feb 13, 2025
2ec602c
Update example_test.go
pellared Feb 13, 2025
1cdb279
Merge branch 'main' into filterproc
pellared Feb 14, 2025
af3bc56
Merge branch 'main' into filterproc
pellared Feb 18, 2025
40478df
Merge branch 'main' into filterproc
pellared Feb 18, 2025
34bae5a
Merge branch 'main' into filterproc
pellared Feb 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ The next release will require at least [Go 1.23].
- Document the pitfalls of using `Resource` as a comparable type.
`Resource.Equal` and `Resource.Equivalent` should be used instead. (#6272)
- Support [Go 1.24]. (#6304)
- Add `FilterProcessor` and `EnabledParameters` in `go.opentelemetry.io/otel/sdk/log`.
It replaces `go.opentelemetry.io/otel/sdk/log/internal/x.FilterProcessor`.
Compared to previous version it additionally gives the possibility to filter by resource and instrumentation scope. (#6317)

### Changed

Expand Down
12 changes: 8 additions & 4 deletions log/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,13 @@ type Logger interface {
// Enabled returns whether the Logger emits for the given context and
// param.
//
// The passed param is likely to be a partial record with only the
// bridge-relevant information being provided (e.g a param with only the
// Severity set). If a Logger needs more information than is provided, it
// This is useful for users that want to know if a [Record]
// will be processed or dropped before they perform complex operations to
// construct the [Record].
//
// The passed param is likely to be a partial record information being
// provided (e.g a param with only the Severity set).
// If a Logger needs more information than is provided, it
// is said to be in an indeterminate state (see below).
//
// The returned value will be true when the Logger will emit for the
Expand All @@ -46,7 +50,7 @@ type Logger interface {
// exist (e.g. performance, correctness).
//
// The param should not be held by the implementation. A copy should be
// made if the record needs to be held after the call returns.
// made if the param needs to be held after the call returns.
//
// Implementations of this method need to be safe for a user to call
// concurrently.
Expand Down
3 changes: 0 additions & 3 deletions sdk/log/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,5 @@ at a single endpoint their origin is decipherable.

See [go.opentelemetry.io/otel/log] for more information about
the OpenTelemetry Logs Bridge API.

See [go.opentelemetry.io/otel/sdk/log/internal/x] for information about the
experimental features.
*/
package log // import "go.opentelemetry.io/otel/sdk/log"
18 changes: 8 additions & 10 deletions sdk/log/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func Example() {
}

// Use a processor that filters out records based on the provided context.
func ExampleProcessor_filtering() {
func ExampleFilterProcessor() {
// Existing processor that emits telemetry.
var processor log.Processor = log.NewBatchProcessor(nil)

Expand Down Expand Up @@ -84,14 +84,12 @@ type ContextFilterProcessor struct {
log.Processor

lazyFilter sync.Once
// Use the experimental FilterProcessor interface
// (go.opentelemetry.io/otel/sdk/log/internal/x).
filter filter
// Support the FilterProcessor interface for the embedded processor.
filter log.FilterProcessor
}

type filter interface {
Enabled(ctx context.Context, param logapi.EnabledParameters) bool
}
// Compile time check.
var _ log.FilterProcessor = (*ContextFilterProcessor)(nil)

func (p *ContextFilterProcessor) OnEmit(ctx context.Context, record *log.Record) error {
if ignoreLogs(ctx) {
Expand All @@ -100,9 +98,9 @@ func (p *ContextFilterProcessor) OnEmit(ctx context.Context, record *log.Record)
return p.Processor.OnEmit(ctx, record)
}

func (p *ContextFilterProcessor) Enabled(ctx context.Context, param logapi.EnabledParameters) bool {
func (p *ContextFilterProcessor) Enabled(ctx context.Context, param log.EnabledParameters) bool {
p.lazyFilter.Do(func() {
if f, ok := p.Processor.(filter); ok {
if f, ok := p.Processor.(log.FilterProcessor); ok {
p.filter = f
}
})
Expand All @@ -115,7 +113,7 @@ func ignoreLogs(ctx context.Context) bool {
}

// Use a processor which redacts sensitive data from some attributes.
func ExampleProcessor_redact() {
func ExampleProcessor() {
// Existing processor that emits telemetry.
var processor log.Processor = log.NewBatchProcessor(nil)

Expand Down
62 changes: 62 additions & 0 deletions sdk/log/filter_processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package log // import "go.opentelemetry.io/otel/sdk/log"

import (
"context"

"go.opentelemetry.io/otel/log"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/resource"
)

// FilterProcessor is a [Processor] that knows, and can identify, what [Record]
// it will process or drop when it is passed to [Processor.OnEmit].
//
// This is useful for users that want to know if a [log.Record]
// will be processed or dropped before they perform complex operations to
// construct the [log.Record].
//
// The SDK's Logger.Enabled returns false
// if all the registered Processors implement FilterProcessor
// and they all return false.
//
// Processor implementations that choose to support this by satisfying this
// interface are expected to re-evaluate the [Record] passed to [Processor.OnEmit],
// it is not expected that the caller to OnEmit will use the functionality
// from this interface prior to calling OnEmit.
//
// See the [go.opentelemetry.io/contrib/processors/minsev] for an example use-case.
// It provides a Processor used to filter out [Record]
// that has a [log.Severity] below a threshold.
type FilterProcessor interface {
// Enabled returns whether the Processor will process for the given context
// and param.
//
// The passed param is likely to be a partial record information being
// provided (e.g a param with only the Severity set).
// If a Processor needs more information than is provided, it
// is said to be in an indeterminate state (see below).
//
// The returned value will be true when the Processor will process for the
// provided context and param, and will be false if the Logger will not
// emit. The returned value may be true or false in an indeterminate state.
// An implementation should default to returning true for an indeterminate
// state, but may return false if valid reasons in particular circumstances
// exist (e.g. performance, correctness).
//
// The param should not be held by the implementation. A copy should be
// made if the param needs to be held after the call returns.
//
// Implementations of this method need to be safe for a user to call
// concurrently.
Enabled(ctx context.Context, param EnabledParameters) bool
}

// EnabledParameters represents payload for [FilterProcessor]'s Enabled method.
type EnabledParameters struct {
Resource resource.Resource
InstrumentationScope instrumentation.Scope
Severity log.Severity
}
35 changes: 0 additions & 35 deletions sdk/log/internal/x/README.md

This file was deleted.

47 changes: 0 additions & 47 deletions sdk/log/internal/x/x.go

This file was deleted.

17 changes: 12 additions & 5 deletions sdk/log/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"go.opentelemetry.io/otel/log"
"go.opentelemetry.io/otel/log/embedded"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/log/internal/x"
"go.opentelemetry.io/otel/trace"
)

Expand Down Expand Up @@ -50,14 +49,22 @@ func (l *logger) Emit(ctx context.Context, r log.Record) {
// processed, true will be returned by default. A value of false will only be
// returned if it can be positively verified that no Processor will process.
func (l *logger) Enabled(ctx context.Context, param log.EnabledParameters) bool {
// If there are more Processors than FilterProcessors we cannot be sure
// that all Processors will drop the record. Therefore, return true.
p := EnabledParameters{
Resource: *l.provider.resource,
InstrumentationScope: l.instrumentationScope,
Severity: param.Severity,
}

// If there are more Processors than FilterProcessors,
// which means not all Processors are FilterProcessors,
// we cannot be sure that all Processors will drop the record.
// Therefore, return true.
//
// If all Processors are FilterProcessors, check if any is enabled.
return len(l.provider.processors) > len(l.provider.fltrProcessors) || anyEnabled(ctx, param, l.provider.fltrProcessors)
return len(l.provider.processors) > len(l.provider.fltrProcessors) || anyEnabled(ctx, p, l.provider.fltrProcessors)
}

func anyEnabled(ctx context.Context, param log.EnabledParameters, fltrs []x.FilterProcessor) bool {
func anyEnabled(ctx context.Context, param EnabledParameters, fltrs []FilterProcessor) bool {
for _, f := range fltrs {
if f.Enabled(ctx, param) {
// At least one Processor will process the Record.
Expand Down
50 changes: 39 additions & 11 deletions sdk/log/logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,11 +224,17 @@ func TestLoggerEnabled(t *testing.T) {
p1 := newFltrProcessor("1", true)
p2WithDisabled := newFltrProcessor("2", false)

emptyResource := resource.Empty()
res := resource.NewSchemaless(attribute.String("key", "value"))

testCases := []struct {
name string
logger *logger
ctx context.Context
expected bool
name string
logger *logger
ctx context.Context
expected bool
expectedP0Params []EnabledParameters
expectedP1Params []EnabledParameters
expectedP2Params []EnabledParameters
}{
{
name: "NoProcessors",
Expand All @@ -241,41 +247,63 @@ func TestLoggerEnabled(t *testing.T) {
logger: newLogger(NewLoggerProvider(
WithProcessor(p0),
WithProcessor(p1),
), instrumentation.Scope{}),
WithResource(res),
), instrumentation.Scope{Name: "scope"}),
ctx: context.Background(),
expected: true,
expectedP0Params: []EnabledParameters{{
Resource: *res,
InstrumentationScope: instrumentation.Scope{Name: "scope"},
}},
expectedP1Params: nil,
},
{
name: "WithDisabledProcessors",
logger: newLogger(NewLoggerProvider(
WithProcessor(p2WithDisabled),
WithResource(emptyResource),
), instrumentation.Scope{}),
ctx: context.Background(),
expected: false,
ctx: context.Background(),
expected: false,
expectedP2Params: []EnabledParameters{{}},
},
{
name: "ContainsDisabledProcessor",
logger: newLogger(NewLoggerProvider(
WithProcessor(p2WithDisabled),
WithProcessor(p0),
WithResource(emptyResource),
), instrumentation.Scope{}),
ctx: context.Background(),
expected: true,
ctx: context.Background(),
expected: true,
expectedP2Params: []EnabledParameters{{}},
expectedP0Params: []EnabledParameters{{}},
},
{
name: "WithNilContext",
logger: newLogger(NewLoggerProvider(
WithProcessor(p0),
WithProcessor(p1),
WithResource(emptyResource),
), instrumentation.Scope{}),
ctx: nil,
expected: true,
ctx: nil,
expected: true,
expectedP0Params: []EnabledParameters{{}},
expectedP1Params: nil,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Clean up the records before the test.
p0.params = nil
p1.params = nil
p2WithDisabled.params = nil

assert.Equal(t, tc.expected, tc.logger.Enabled(tc.ctx, log.EnabledParameters{}))
assert.Equal(t, tc.expectedP0Params, p0.params)
assert.Equal(t, tc.expectedP1Params, p1.params)
assert.Equal(t, tc.expectedP2Params, p2WithDisabled.params)
})
}
}
9 changes: 4 additions & 5 deletions sdk/log/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ import (
// or with other methods. It is the responsibility of the Processor to manage
// this concurrency.
//
// See [go.opentelemetry.io/otel/sdk/log/internal/x] for information about how
// a Processor can be extended to support experimental features.
// See [FilterProcessor] for information about how a Processor can support filtering.
type Processor interface {
// OnEmit is called when a Record is emitted.
//
Expand All @@ -30,11 +29,11 @@ type Processor interface {
// Handler.
//
// The SDK invokes the processors sequentially in the same order as
// they were registered using [WithProcessor].
// they were registered using WithProcessor.
// Implementations may synchronously modify the record so that the changes
// are visible in the next registered processor.
// Notice that [Record] is not concurrent safe. Therefore, asynchronous
// processing may cause race conditions. Use [Record.Clone]
// Notice that Record is not concurrent safe. Therefore, asynchronous
// processing may cause race conditions. Use Record.Clone
// to create a copy that shares no state with the original.
OnEmit(ctx context.Context, record *Record) error

Expand Down
Loading
Loading