Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,51 +60,37 @@ func newFilterLogsProcessor(logger *zap.Logger, cfg *Config) (*filterLogProcesso
}, nil
}

func (flp *filterLogProcessor) ProcessLogs(ctx context.Context, logs plog.Logs) (plog.Logs, error) {
func (flp *filterLogProcessor) ProcessLogs(_ context.Context, logs plog.Logs) (plog.Logs, error) {
rLogs := logs.ResourceLogs()

// Filter out logs
flp.filterLogRecords(rLogs)

if rLogs.Len() == 0 {
return logs, processorhelper.ErrSkipProcessingData
}

return logs, nil
}

func (flp *filterLogProcessor) filterLogRecords(rLogs plog.ResourceLogsSlice) {
for i := 0; i < rLogs.Len(); i++ {
rLog := rLogs.At(i)
resource := rLog.Resource()
scopes := rLog.ScopeLogs()

for j := 0; j < scopes.Len(); j++ {
scope := scopes.At(j)
instrumentationScope := scope.Scope()
lrs := scope.LogRecords()
rLogs.RemoveIf(func(rl plog.ResourceLogs) bool {
resource := rl.Resource()
rl.ScopeLogs().RemoveIf(func(sl plog.ScopeLogs) bool {
scope := sl.Scope()
lrs := sl.LogRecords()

if flp.includeMatcher != nil {
// If includeMatcher exists, remove all records that do not match the filter.
lrs.RemoveIf(func(lr plog.LogRecord) bool {
return !flp.includeMatcher.MatchLogRecord(lr, resource, instrumentationScope)
return !flp.includeMatcher.MatchLogRecord(lr, resource, scope)
})
}

if flp.excludeMatcher != nil {
// If excludeMatcher exists, remove all records that match the filter.
lrs.RemoveIf(func(lr plog.LogRecord) bool {
return flp.excludeMatcher.MatchLogRecord(lr, resource, instrumentationScope)
return flp.excludeMatcher.MatchLogRecord(lr, resource, scope)
})
}
}

scopes.RemoveIf(func(sl plog.ScopeLogs) bool {
return sl.LogRecords().Len() == 0
})
}

rLogs.RemoveIf(func(rl plog.ResourceLogs) bool {
return rl.ScopeLogs().Len() == 0
})

if rLogs.Len() == 0 {
return logs, processorhelper.ErrSkipProcessingData
}

return logs, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package filterprocessor // import "github.com/open-telemetry/opentelemetry-colle
import (
"context"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/processor/processorhelper"
"go.uber.org/zap"
Expand Down Expand Up @@ -87,41 +86,31 @@ func createSpanMatcher(cfg *Config) (filterspan.Matcher, filterspan.Matcher, err

// processTraces filters the given spans of a traces based off the filterSpanProcessor's filters.
func (fsp *filterSpanProcessor) processTraces(_ context.Context, pdt ptrace.Traces) (ptrace.Traces, error) {
for i := 0; i < pdt.ResourceSpans().Len(); i++ {
resSpan := pdt.ResourceSpans().At(i)
for x := 0; x < resSpan.ScopeSpans().Len(); x++ {
ils := resSpan.ScopeSpans().At(x)
ils.Spans().RemoveIf(func(span ptrace.Span) bool {
return fsp.shouldRemoveSpan(span, resSpan.Resource(), ils.Scope())
pdt.ResourceSpans().RemoveIf(func(rs ptrace.ResourceSpans) bool {
resource := rs.Resource()
rs.ScopeSpans().RemoveIf(func(ss ptrace.ScopeSpans) bool {
scope := ss.Scope()
ss.Spans().RemoveIf(func(span ptrace.Span) bool {
if fsp.include != nil {
if !fsp.include.MatchSpan(span, resource, scope) {
return true
}
}

if fsp.exclude != nil {
if fsp.exclude.MatchSpan(span, resource, scope) {
return true
}
}

return false
})
}
// Remove empty elements, that way if we delete everything we can tell
// the pipeline to stop processing completely (ErrSkipProcessingData)
resSpan.ScopeSpans().RemoveIf(func(ilsSpans ptrace.ScopeSpans) bool {
return ilsSpans.Spans().Len() == 0
return ss.Spans().Len() == 0
})
}
pdt.ResourceSpans().RemoveIf(func(res ptrace.ResourceSpans) bool {
return res.ScopeSpans().Len() == 0
return rs.ScopeSpans().Len() == 0
})
if pdt.ResourceSpans().Len() == 0 {
return pdt, processorhelper.ErrSkipProcessingData
}
return pdt, nil
}

func (fsp *filterSpanProcessor) shouldRemoveSpan(span ptrace.Span, resource pcommon.Resource, library pcommon.InstrumentationScope) bool {
if fsp.include != nil {
if !fsp.include.MatchSpan(span, resource, library) {
return true
}
}

if fsp.exclude != nil {
if fsp.exclude.MatchSpan(span, resource, library) {
return true
}
}

return false
}