Skip to content

Commit 19b6807

Browse files
guanw authored and yurishkuro committed
Add a Downsampling writer that drops a percentage of spans (#1353)
* adding a wrapping writer that automatic drop spans before writing with predefined config Signed-off-by: Jude Wang <[email protected]> * updating comment Signed-off-by: Jude Wang <[email protected]> * hash traceID for downsampling and code refactoring Signed-off-by: Jude Wang <[email protected]> * adding DownSamplingOptions Signed-off-by: Jude Wang <[email protected]> * address yuri's comments Signed-off-by: Jude Wang <[email protected]> * committing benchmark file & adding metrics Signed-off-by: Jude Wang <[email protected]> * address comment from Won Signed-off-by: Jude Wang <[email protected]> * get rid of defer; benchmark sync.Pool for byte array; make sure 100% code coverage Signed-off-by: Jude Wang <[email protected]> * making sure sync.Pool benchmark with 0 allocations Signed-off-by: Jude Wang <[email protected]> * refactor to read ratio and hashsalt from CLI Signed-off-by: Jude Wang <[email protected]> * address yuri comments Signed-off-by: Jude Wang <[email protected]> * refinement Signed-off-by: Jude Wang <[email protected]> * fix merge conflict Signed-off-by: Jude Wang <[email protected]> * copy hashSalt slice inside bytePool initialization Signed-off-by: Jude Wang <[email protected]> * further comments from yuri Signed-off-by: Jude Wang <[email protected]> * omit error check for MarshalTo Signed-off-by: Jude Wang <[email protected]> * update naming and some logic Signed-off-by: Jude Wang <[email protected]> * remove pointer Signed-off-by: Jude Wang <[email protected]> * updating tests Signed-off-by: Jude Wang <[email protected]> * adding default salt Signed-off-by: Jude Wang <[email protected]>
1 parent 91d9f95 commit 19b6807

7 files changed

+423
-9
lines changed

Diff for: plugin/storage/factory.go

+51-7
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,22 @@ const (
3737
elasticsearchStorageType = "elasticsearch"
3838
memoryStorageType = "memory"
3939
kafkaStorageType = "kafka"
40+
downsamplingRatio = "downsampling.ratio"
41+
downsamplingHashSalt = "downsampling.hashsalt"
42+
43+
// defaultDownsamplingRatio is the default downsampling ratio.
44+
defaultDownsamplingRatio = 1.0
45+
// defaultDownsamplingHashSalt is the default downsampling hashsalt.
46+
defaultDownsamplingHashSalt = ""
4047
)
4148

4249
var allStorageTypes = []string{cassandraStorageType, elasticsearchStorageType, memoryStorageType, kafkaStorageType}
4350

4451
// Factory implements storage.Factory interface as a meta-factory for storage components.
4552
type Factory struct {
4653
FactoryConfig
47-
48-
factories map[string]storage.Factory
54+
metricsFactory metrics.Factory
55+
factories map[string]storage.Factory
4956
}
5057

5158
// NewFactory creates the meta-factory.
@@ -84,8 +91,9 @@ func (f *Factory) getFactoryOfType(factoryType string) (storage.Factory, error)
8491
}
8592
}
8693

87-
// Initialize implements storage.Factory
94+
// Initialize implements storage.Factory.
8895
func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error {
96+
f.metricsFactory = metricsFactory
8997
for _, factory := range f.factories {
9098
if err := factory.Initialize(metricsFactory, logger); err != nil {
9199
return err
@@ -94,7 +102,7 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
94102
return nil
95103
}
96104

97-
// CreateSpanReader implements storage.Factory
105+
// CreateSpanReader implements storage.Factory.
98106
func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
99107
factory, ok := f.factories[f.SpanReaderType]
100108
if !ok {
@@ -103,7 +111,7 @@ func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
103111
return factory.CreateSpanReader()
104112
}
105113

106-
// CreateSpanWriter implements storage.Factory
114+
// CreateSpanWriter implements storage.Factory.
107115
func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
108116
var writers []spanstore.Writer
109117
for _, storageType := range f.SpanWriterTypes {
@@ -117,10 +125,21 @@ func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
117125
}
118126
writers = append(writers, writer)
119127
}
128+
var spanWriter spanstore.Writer
120129
if len(f.SpanWriterTypes) == 1 {
121-
return writers[0], nil
130+
spanWriter = writers[0]
131+
} else {
132+
spanWriter = spanstore.NewCompositeWriter(writers...)
122133
}
123-
return spanstore.NewCompositeWriter(writers...), nil
134+
// Turn off DownsamplingWriter entirely if ratio == defaultDownsamplingRatio.
135+
if f.DownsamplingRatio == defaultDownsamplingRatio {
136+
return spanWriter, nil
137+
}
138+
return spanstore.NewDownsamplingWriter(spanWriter, spanstore.DownsamplingOptions{
139+
Ratio: f.DownsamplingRatio,
140+
HashSalt: f.DownsamplingHashSalt,
141+
MetricsFactory: f.metricsFactory.Namespace(metrics.NSOptions{Name: "downsampling_writer"}),
142+
}), nil
124143
}
125144

126145
// CreateDependencyReader implements storage.Factory
@@ -139,6 +158,21 @@ func (f *Factory) AddFlags(flagSet *flag.FlagSet) {
139158
conf.AddFlags(flagSet)
140159
}
141160
}
161+
addDownsamplingFlags(flagSet)
162+
}
163+
164+
// addDownsamplingFlags add flags for Downsampling params
165+
func addDownsamplingFlags(flagSet *flag.FlagSet) {
166+
flagSet.Float64(
167+
downsamplingRatio,
168+
defaultDownsamplingRatio,
169+
"Ratio of spans passed to storage after downsampling (between 0 and 1), e.g ratio = 0.3 means we are keeping 30% of spans and dropping 70% of spans; ratio = 1.0 disables downsampling.",
170+
)
171+
flagSet.String(
172+
downsamplingHashSalt,
173+
defaultDownsamplingHashSalt,
174+
"Salt used when hashing trace id for downsampling.",
175+
)
142176
}
143177

144178
// InitFromViper implements plugin.Configurable
@@ -148,6 +182,16 @@ func (f *Factory) InitFromViper(v *viper.Viper) {
148182
conf.InitFromViper(v)
149183
}
150184
}
185+
f.initDownsamplingFromViper(v)
186+
}
187+
188+
func (f *Factory) initDownsamplingFromViper(v *viper.Viper) {
189+
f.FactoryConfig.DownsamplingRatio = v.GetFloat64(downsamplingRatio)
190+
if f.FactoryConfig.DownsamplingRatio < 0 || f.FactoryConfig.DownsamplingRatio > 1 {
191+
// Values not in the range of 0 ~ 1.0 will be set to default.
192+
f.FactoryConfig.DownsamplingRatio = 1.0
193+
}
194+
f.FactoryConfig.DownsamplingHashSalt = v.GetString(downsamplingHashSalt)
151195
}
152196

153197
// CreateArchiveSpanReader implements storage.ArchiveFactory

Diff for: plugin/storage/factory_config.go

+2
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ type FactoryConfig struct {
3636
SpanWriterTypes []string
3737
SpanReaderType string
3838
DependenciesStorageType string
39+
DownsamplingRatio float64
40+
DownsamplingHashSalt string
3941
}
4042

4143
// FactoryConfigFromEnvAndCLI reads the desired types of storage backends from SPAN_STORAGE_TYPE and

Diff for: plugin/storage/factory_config_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ func TestFactoryConfigFromEnv(t *testing.T) {
3131
clearEnv()
3232
defer clearEnv()
3333

34-
f := FactoryConfigFromEnvAndCLI(nil, nil)
34+
f := FactoryConfigFromEnvAndCLI(nil, &bytes.Buffer{})
3535
assert.Equal(t, 1, len(f.SpanWriterTypes))
3636
assert.Equal(t, cassandraStorageType, f.SpanWriterTypes[0])
3737
assert.Equal(t, cassandraStorageType, f.SpanReaderType)
@@ -40,7 +40,7 @@ func TestFactoryConfigFromEnv(t *testing.T) {
4040
os.Setenv(SpanStorageTypeEnvVar, elasticsearchStorageType)
4141
os.Setenv(DependencyStorageTypeEnvVar, memoryStorageType)
4242

43-
f = FactoryConfigFromEnvAndCLI(nil, nil)
43+
f = FactoryConfigFromEnvAndCLI(nil, &bytes.Buffer{})
4444
assert.Equal(t, 1, len(f.SpanWriterTypes))
4545
assert.Equal(t, elasticsearchStorageType, f.SpanWriterTypes[0])
4646
assert.Equal(t, elasticsearchStorageType, f.SpanReaderType)

Diff for: plugin/storage/factory_test.go

+67
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ package storage
1717
import (
1818
"errors"
1919
"flag"
20+
"reflect"
21+
"strings"
2022
"testing"
2123

2224
"github.com/spf13/viper"
@@ -25,6 +27,7 @@ import (
2527
"github.com/uber/jaeger-lib/metrics"
2628
"go.uber.org/zap"
2729

30+
"github.com/jaegertracing/jaeger/pkg/config"
2831
"github.com/jaegertracing/jaeger/storage"
2932
depStoreMocks "github.com/jaegertracing/jaeger/storage/dependencystore/mocks"
3033
"github.com/jaegertracing/jaeger/storage/mocks"
@@ -40,6 +43,8 @@ func defaultCfg() FactoryConfig {
4043
SpanWriterTypes: []string{cassandraStorageType},
4144
SpanReaderType: cassandraStorageType,
4245
DependenciesStorageType: cassandraStorageType,
46+
DownsamplingRatio: 1.0,
47+
DownsamplingHashSalt: "",
4348
}
4449
}
4550

@@ -129,11 +134,49 @@ func TestCreate(t *testing.T) {
129134
assert.EqualError(t, err, "Archive storage not supported")
130135

131136
mock.On("CreateSpanWriter").Return(spanWriter, nil)
137+
m := metrics.NullFactory
138+
l := zap.NewNop()
139+
mock.On("Initialize", m, l).Return(nil)
140+
f.Initialize(m, l)
132141
w, err = f.CreateSpanWriter()
133142
assert.NoError(t, err)
134143
assert.Equal(t, spanWriter, w)
135144
}
136145

146+
func TestCreateDownsamplingWriter(t *testing.T) {
147+
f, err := NewFactory(defaultCfg())
148+
assert.NoError(t, err)
149+
assert.NotEmpty(t, f.factories[cassandraStorageType])
150+
mock := new(mocks.Factory)
151+
f.factories[cassandraStorageType] = mock
152+
spanWriter := new(spanStoreMocks.Writer)
153+
mock.On("CreateSpanWriter").Return(spanWriter, nil)
154+
155+
m := metrics.NullFactory
156+
l := zap.NewNop()
157+
mock.On("Initialize", m, l).Return(nil)
158+
159+
var testParams = []struct {
160+
ratio float64
161+
writerType string
162+
}{
163+
{0.5, "*spanstore.DownsamplingWriter"},
164+
{1.0, "*mocks.Writer"},
165+
}
166+
167+
for _, param := range testParams {
168+
t.Run(param.writerType, func(t *testing.T) {
169+
f.DownsamplingRatio = param.ratio
170+
f.Initialize(m, l)
171+
newWriter, err := f.CreateSpanWriter()
172+
assert.NoError(t, err)
173+
// Currently directly assertEqual doesn't work since DownsamplingWriter initializes with different
174+
// address for hashPool. The following workaround checks writer type instead
175+
assert.True(t, strings.HasPrefix(reflect.TypeOf(newWriter).String(), param.writerType))
176+
})
177+
}
178+
}
179+
137180
func TestCreateMulti(t *testing.T) {
138181
cfg := defaultCfg()
139182
cfg.SpanWriterTypes = append(cfg.SpanWriterTypes, elasticsearchStorageType)
@@ -156,6 +199,11 @@ func TestCreateMulti(t *testing.T) {
156199

157200
mock.On("CreateSpanWriter").Return(spanWriter, nil)
158201
mock2.On("CreateSpanWriter").Return(spanWriter2, nil)
202+
m := metrics.NullFactory
203+
l := zap.NewNop()
204+
mock.On("Initialize", m, l).Return(nil)
205+
mock2.On("Initialize", m, l).Return(nil)
206+
f.Initialize(m, l)
159207
w, err = f.CreateSpanWriter()
160208
assert.NoError(t, err)
161209
assert.Equal(t, spanstore.NewCompositeWriter(spanWriter, spanWriter2), w)
@@ -264,3 +312,22 @@ func TestConfigurable(t *testing.T) {
264312
assert.Equal(t, fs, mock.flagSet)
265313
assert.Equal(t, v, mock.viper)
266314
}
315+
316+
func TestParsingDownsamplingRatio(t *testing.T) {
317+
f := Factory{}
318+
v, command := config.Viperize(addDownsamplingFlags)
319+
err := command.ParseFlags([]string{
320+
"--downsampling.ratio=1.5",
321+
"--downsampling.hashsalt=jaeger"})
322+
assert.NoError(t, err)
323+
f.InitFromViper(v)
324+
325+
assert.Equal(t, f.FactoryConfig.DownsamplingRatio, 1.0)
326+
assert.Equal(t, f.FactoryConfig.DownsamplingHashSalt, "jaeger")
327+
328+
err = command.ParseFlags([]string{
329+
"--downsampling.ratio=0.5"})
330+
assert.NoError(t, err)
331+
f.InitFromViper(v)
332+
assert.Equal(t, f.FactoryConfig.DownsamplingRatio, 0.5)
333+
}

Diff for: storage/spanstore/downsampling_writer.go

+119
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
// Copyright (c) 2019 The Jaeger Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package spanstore
16+
17+
import (
18+
"hash"
19+
"hash/fnv"
20+
"math"
21+
"sync"
22+
23+
"github.com/uber/jaeger-lib/metrics"
24+
25+
"github.com/jaegertracing/jaeger/model"
26+
)
27+
28+
const defaultHashSalt = "downsampling-default-salt"
29+
30+
var (
31+
traceIDByteSize = (&model.TraceID{}).Size()
32+
)
33+
34+
// hasher includes data we want to put in sync.Pool.
35+
type hasher struct {
36+
hash hash.Hash64
37+
buffer []byte
38+
}
39+
40+
// downsamplingWriterMetrics keeps track of the total number of dropped spans and accepted spans.
41+
type downsamplingWriterMetrics struct {
42+
SpansDropped metrics.Counter `metric:"spans_dropped"`
43+
SpansAccepted metrics.Counter `metric:"spans_accepted"`
44+
}
45+
46+
// DownsamplingWriter is a span Writer that drops spans with a predefined downsamplingRatio.
47+
type DownsamplingWriter struct {
48+
spanWriter Writer
49+
threshold uint64
50+
lengthOfSalt int
51+
hasherPool *sync.Pool
52+
metrics downsamplingWriterMetrics
53+
}
54+
55+
// DownsamplingOptions contains the options for constructing a DownsamplingWriter.
56+
type DownsamplingOptions struct {
57+
Ratio float64
58+
HashSalt string
59+
MetricsFactory metrics.Factory
60+
}
61+
62+
// NewDownsamplingWriter creates a DownsamplingWriter.
63+
func NewDownsamplingWriter(spanWriter Writer, downsamplingOptions DownsamplingOptions) *DownsamplingWriter {
64+
threshold := uint64(downsamplingOptions.Ratio * float64(math.MaxUint64))
65+
writeMetrics := &downsamplingWriterMetrics{}
66+
metrics.Init(writeMetrics, downsamplingOptions.MetricsFactory, nil)
67+
salt := downsamplingOptions.HashSalt
68+
if salt == "" {
69+
salt = defaultHashSalt
70+
}
71+
hashSaltBytes := []byte(salt)
72+
pool := &sync.Pool{
73+
New: func() interface{} {
74+
buffer := make([]byte, len(hashSaltBytes)+traceIDByteSize)
75+
copy(buffer, hashSaltBytes)
76+
return &hasher{
77+
hash: fnv.New64a(),
78+
buffer: buffer,
79+
}
80+
},
81+
}
82+
83+
return &DownsamplingWriter{
84+
spanWriter: spanWriter,
85+
threshold: threshold,
86+
hasherPool: pool,
87+
metrics: *writeMetrics,
88+
lengthOfSalt: len(hashSaltBytes),
89+
}
90+
}
91+
92+
// WriteSpan calls WriteSpan on wrapped span writer.
93+
func (ds *DownsamplingWriter) WriteSpan(span *model.Span) error {
94+
if !ds.shouldDownsample(span) {
95+
// Drops spans when hashVal falls beyond computed threshold.
96+
ds.metrics.SpansDropped.Inc(1)
97+
return nil
98+
}
99+
ds.metrics.SpansAccepted.Inc(1)
100+
return ds.spanWriter.WriteSpan(span)
101+
}
102+
103+
func (ds *DownsamplingWriter) shouldDownsample(span *model.Span) bool {
104+
hasherInstance := ds.hasherPool.Get().(*hasher)
105+
// Currently MarshalTo will only return err if size of traceIDBytes is smaller than 16
106+
// Since we force traceIDBytes to be size of 16 metrics is not necessary here.
107+
_, _ = span.TraceID.MarshalTo(hasherInstance.buffer[ds.lengthOfSalt:])
108+
hashVal := hasherInstance.hashBytes()
109+
ds.hasherPool.Put(hasherInstance)
110+
return hashVal <= ds.threshold
111+
}
112+
113+
// hashBytes returns the uint64 hash value of byte slice.
114+
func (h *hasher) hashBytes() uint64 {
115+
h.hash.Reset()
116+
// Currently fnv.Write() implementation doesn't throw any error so metric is not necessary here.
117+
_, _ = h.hash.Write(h.buffer)
118+
return h.hash.Sum64()
119+
}

0 commit comments

Comments
 (0)