Skip to content

Commit

Permalink
Sanitize spans with null process or empty service name (#3631)
Browse files Browse the repository at this point in the history
* Sanitize spans with null process or empty service name

Signed-off-by: Yuri Shkuro <[email protected]>

* cleanup

Signed-off-by: Yuri Shkuro <[email protected]>

* review comments

Signed-off-by: Yuri Shkuro <[email protected]>

* wait longer

Signed-off-by: Yuri Shkuro <[email protected]>

* add tests

Signed-off-by: Yuri Shkuro <[email protected]>
  • Loading branch information
yurishkuro authored May 10, 2022
1 parent 998c9d4 commit 7ad154e
Show file tree
Hide file tree
Showing 12 changed files with 175 additions and 21 deletions.
2 changes: 1 addition & 1 deletion cmd/agent/app/reporter/grpc/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func NewReporter(conn *grpc.ClientConn, agentTags map[string]string, logger *zap
collector: api_v2.NewCollectorServiceClient(conn),
agentTags: makeModelKeyValue(agentTags),
logger: logger,
sanitizer: zipkin2.NewChainedSanitizer(zipkin2.StandardSanitizers...),
sanitizer: zipkin2.NewChainedSanitizer(zipkin2.NewStandardSanitizers()...),
}
}

Expand Down
6 changes: 4 additions & 2 deletions cmd/collector/app/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type options struct {
logger *zap.Logger
serviceMetrics metrics.Factory
hostMetrics metrics.Factory
preProcessSpans ProcessSpans
preProcessSpans ProcessSpans // see docs in PreProcessSpans option.
sanitizer sanitizer.SanitizeSpan
preSave ProcessSpan
spanFilter FilterSpan
Expand Down Expand Up @@ -78,7 +78,9 @@ func (options) HostMetrics(hostMetrics metrics.Factory) Option {
}
}

// PreProcessSpans creates an Option that initializes the preProcessSpans function
// PreProcessSpans creates an Option that initializes the preProcessSpans function.
// This function can implement non-standard pre-processing of the spans when extending
// the collector from source. Jaeger itself does not define any pre-processing.
func (options) PreProcessSpans(preProcessSpans ProcessSpans) Option {
return func(b *options) {
b.preProcessSpans = preProcessSpans
Expand Down
41 changes: 41 additions & 0 deletions cmd/collector/app/sanitizer/empty_service_name_sanitizer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright (c) 2022 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sanitizer

import (
"github.com/jaegertracing/jaeger/model"
)

const (
serviceNameReplacement = "empty-service-name"
nullProcessServiceName = "null-process-and-service-name"
)

// NewEmptyServiceNameSanitizer returns a function that replaces empty service name
// with a string "empty-service-name".
// If the whole span.Process is null, it creates one with "null-process-and-service-name".
func NewEmptyServiceNameSanitizer() SanitizeSpan {
return sanitizeEmptyServiceName
}

// Sanitize sanitizes the service names in the span annotations.
func sanitizeEmptyServiceName(span *model.Span) *model.Span {
if span.Process == nil {
span.Process = &model.Process{ServiceName: nullProcessServiceName}
} else if span.Process.ServiceName == "" {
span.Process.ServiceName = serviceNameReplacement
}
return span
}
32 changes: 32 additions & 0 deletions cmd/collector/app/sanitizer/empty_service_name_sanitizer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright (c) 2022 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sanitizer

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/jaegertracing/jaeger/model"
)

func TestEmptyServiceNameSanitizer(t *testing.T) {
s := NewEmptyServiceNameSanitizer()
s1 := s(&model.Span{})
assert.NotNil(t, s1.Process)
assert.Equal(t, nullProcessServiceName, s1.Process.ServiceName)
s2 := s(&model.Span{Process: &model.Process{}})
assert.Equal(t, serviceNameReplacement, s2.Process.ServiceName)
}
13 changes: 12 additions & 1 deletion cmd/collector/app/sanitizer/sanitizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,19 @@ import (
// span should implement this interface.
type SanitizeSpan func(span *model.Span) *model.Span

// NewChainedSanitizer creates a Sanitizer from the variadic list of passed Sanitizers
// NewStandardSanitizers are automatically applied by SpanProcessor.
func NewStandardSanitizers() []SanitizeSpan {
return []SanitizeSpan{
NewEmptyServiceNameSanitizer(),
}
}

// NewChainedSanitizer creates a Sanitizer from the variadic list of passed Sanitizers.
// If the list only has one element, it is returned directly to minimize indirection.
func NewChainedSanitizer(sanitizers ...SanitizeSpan) SanitizeSpan {
if len(sanitizers) == 1 {
return sanitizers[0]
}
return func(span *model.Span) *model.Span {
for _, s := range sanitizers {
span = s(span)
Expand Down
44 changes: 44 additions & 0 deletions cmd/collector/app/sanitizer/sanitizer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright (c) 2022 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sanitizer

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/jaegertracing/jaeger/model"
)

func TestNewStandardSanitizers(t *testing.T) {
NewStandardSanitizers()
}

func TestChainedSanitizer(t *testing.T) {
var s1 SanitizeSpan = func(span *model.Span) *model.Span {
span.Process = &model.Process{ServiceName: "s1"}
return span
}
var s2 SanitizeSpan = func(span *model.Span) *model.Span {
span.Process = &model.Process{ServiceName: "s2"}
return span
}
c1 := NewChainedSanitizer(s1)
sp1 := c1(&model.Span{})
assert.Equal(t, "s1", sp1.Process.ServiceName)
c2 := NewChainedSanitizer(s1, s2)
sp2 := c2(&model.Span{})
assert.Equal(t, "s2", sp2.Process.ServiceName)
}
16 changes: 11 additions & 5 deletions cmd/collector/app/sanitizer/zipkin/span_sanitizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,17 @@ const (
zeroParentIDTag = "errZeroParentID"
)

var (
defaultDuration = int64(1)
// StandardSanitizers is a list of standard zipkin sanitizers.
StandardSanitizers = []Sanitizer{NewSpanStartTimeSanitizer(), NewSpanDurationSanitizer(), NewParentIDSanitizer(), NewErrorTagSanitizer()}
)
var defaultDuration = int64(1) // not a const because we take its address

// NewStandardSanitizers is a list of standard zipkin sanitizers.
func NewStandardSanitizers() []Sanitizer {
return []Sanitizer{
NewSpanStartTimeSanitizer(),
NewSpanDurationSanitizer(),
NewParentIDSanitizer(),
NewErrorTagSanitizer(),
}
}

// Sanitizer interface for sanitizing spans. Any business logic that needs to be applied to normalize the contents of a
// span should implement this interface.
Expand Down
4 changes: 4 additions & 0 deletions cmd/collector/app/sanitizer/zipkin/span_sanitizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ var (
positiveDuration = int64(1)
)

func TestNewStandardSanitizers(t *testing.T) {
NewStandardSanitizers()
}

func TestChainedSanitizer(t *testing.T) {
sanitizer := NewChainedSanitizer(NewSpanDurationSanitizer())

Expand Down
6 changes: 5 additions & 1 deletion cmd/collector/app/span_handler_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,11 @@ func (b *SpanHandlerBuilder) BuildSpanProcessor(additional ...ProcessSpan) proce
// BuildHandlers builds span handlers (Zipkin, Jaeger)
func (b *SpanHandlerBuilder) BuildHandlers(spanProcessor processor.SpanProcessor) *SpanHandlers {
return &SpanHandlers{
handler.NewZipkinSpanHandler(b.Logger, spanProcessor, zs.NewChainedSanitizer(zs.StandardSanitizers...)),
handler.NewZipkinSpanHandler(
b.Logger,
spanProcessor,
zs.NewChainedSanitizer(zs.NewStandardSanitizers()...),
),
handler.NewJaegerSpanHandler(b.Logger, spanProcessor),
handler.NewGRPCHandler(b.Logger, spanProcessor),
}
Expand Down
9 changes: 7 additions & 2 deletions cmd/collector/app/span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type queueItem struct {
span *model.Span
}

// NewSpanProcessor returns a SpanProcessor that preProcesses, filters, queues, sanitizes, and processes spans
// NewSpanProcessor returns a SpanProcessor that preProcesses, filters, queues, sanitizes, and processes spans.
func NewSpanProcessor(
spanWriter spanstore.Writer,
additional []ProcessSpan,
Expand Down Expand Up @@ -96,13 +96,18 @@ func newSpanProcessor(spanWriter spanstore.Writer, additional []ProcessSpan, opt
}
boundedQueue := queue.NewBoundedQueue(options.queueSize, droppedItemHandler)

sanitizers := sanitizer.NewStandardSanitizers()
if options.sanitizer != nil {
sanitizers = append(sanitizers, options.sanitizer)
}

sp := spanProcessor{
queue: boundedQueue,
metrics: handlerMetrics,
logger: options.logger,
preProcessSpans: options.preProcessSpans,
filterSpan: options.spanFilter,
sanitizer: options.sanitizer,
sanitizer: sanitizer.NewChainedSanitizer(sanitizers...),
reportBusy: options.reportBusy,
numWorkers: options.numWorkers,
spanWriter: spanWriter,
Expand Down
20 changes: 12 additions & 8 deletions cmd/collector/app/span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,15 @@ func isSpanAllowed(span *model.Span) bool {
}

type fakeSpanWriter struct {
err error
spansLock sync.Mutex
spans []*model.Span
err error
}

func (n *fakeSpanWriter) WriteSpan(ctx context.Context, span *model.Span) error {
n.spansLock.Lock()
defer n.spansLock.Unlock()
n.spans = append(n.spans, span)
return n.err
}

Expand Down Expand Up @@ -222,16 +227,15 @@ func TestSpanProcessor(t *testing.T) {
w := &fakeSpanWriter{}
p := NewSpanProcessor(w, nil, Options.QueueSize(1)).(*spanProcessor)

res, err := p.ProcessSpans([]*model.Span{
{
Process: &model.Process{
ServiceName: "x",
},
},
}, processor.SpansOptions{SpanFormat: processor.JaegerSpanFormat})
res, err := p.ProcessSpans(
[]*model.Span{{}}, // empty span should be enriched by sanitizers
processor.SpansOptions{SpanFormat: processor.JaegerSpanFormat})
assert.NoError(t, err)
assert.Equal(t, []bool{true}, res)
assert.NoError(t, p.Close())
assert.Len(t, w.spans, 1)
assert.NotNil(t, w.spans[0].Process)
assert.NotEmpty(t, w.spans[0].Process.ServiceName)
}

func TestSpanProcessorErrors(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion scripts/es-integration-test.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#!/bin/bash

PS4='T$(date "+%H:%M:%S") '
set -euxf -o pipefail

usage() {
Expand Down Expand Up @@ -68,7 +69,7 @@ wait_for_it() {
''%{http_code}''
)
local counter=0
while [[ "$(curl ${params[@]} ${url})" != "200" && ${counter} -le 30 ]]; do
while [[ "$(curl ${params[@]} ${url})" != "200" && ${counter} -le 60 ]]; do
sleep 2
counter=$((counter+1))
echo "waiting for ${url} to be up..."
Expand Down

0 comments on commit 7ad154e

Please sign in to comment.