Skip to content

Commit

Permalink
Add parameter to enable processed span metrics
Browse files Browse the repository at this point in the history
Signed-off-by: Yosuke Matsuda <[email protected]>
  • Loading branch information
ymtdzzz committed Jun 26, 2022
1 parent 9b4bbf4 commit 1b22e9e
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 37 deletions.
13 changes: 9 additions & 4 deletions cmd/collector/app/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ import (
)

const (
flagDynQueueSizeMemory = "collector.queue-size-memory"
flagNumWorkers = "collector.num-workers"
flagQueueSize = "collector.queue-size"
flagCollectorTags = "collector.tags"
flagDynQueueSizeMemory = "collector.queue-size-memory"
flagNumWorkers = "collector.num-workers"
flagQueueSize = "collector.queue-size"
flagCollectorTags = "collector.tags"
flagProcessedSpanMetricsEnabled = "collector.enable-processed-span-metrics"

flagSuffixHostPort = "host-port"

Expand Down Expand Up @@ -124,6 +125,8 @@ type CollectorOptions struct {
}
// CollectorTags is the string representing collector tags to append to each and every span
CollectorTags map[string]string
// ProcessedSpanMetricsEnabled determines whether to enable processed span metrics
ProcessedSpanMetricsEnabled bool
}

type serverFlagsConfig struct {
Expand Down Expand Up @@ -163,6 +166,7 @@ func AddFlags(flags *flag.FlagSet) {
flags.Int(flagQueueSize, DefaultQueueSize, "The queue size of the collector")
flags.Uint(flagDynQueueSizeMemory, 0, "(experimental) The max memory size in MiB to use for the dynamic queue.")
flags.String(flagCollectorTags, "", "One or more tags to be added to the Process tags of all spans passing through this collector. Ex: key1=value1,key2=${envVar:defaultValue}")
flags.Bool(flagProcessedSpanMetricsEnabled, false, "Enables processed span metrics.")

addHTTPFlags(flags, httpServerFlagsCfg, ports.PortToHostPort(ports.CollectorHTTP))
addGRPCFlags(flags, grpcServerFlagsCfg, ports.PortToHostPort(ports.CollectorGRPC))
Expand Down Expand Up @@ -239,6 +243,7 @@ func (cOpts *CollectorOptions) InitFromViper(v *viper.Viper, logger *zap.Logger)
cOpts.NumWorkers = v.GetInt(flagNumWorkers)
cOpts.QueueSize = v.GetInt(flagQueueSize)
cOpts.DynQueueSizeMemory = v.GetUint(flagDynQueueSizeMemory) * 1024 * 1024 // we receive in MiB and store in bytes
cOpts.ProcessedSpanMetricsEnabled = v.GetBool(flagProcessedSpanMetricsEnabled)

if err := cOpts.HTTP.initFromViper(v, logger, httpServerFlagsCfg); err != nil {
return cOpts, fmt.Errorf("failed to parse HTTP server options: %w", err)
Expand Down
38 changes: 23 additions & 15 deletions cmd/collector/app/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,22 @@ import (
)

type options struct {
logger *zap.Logger
serviceMetrics metrics.Factory
hostMetrics metrics.Factory
preProcessSpans ProcessSpans // see docs in PreProcessSpans option.
sanitizer sanitizer.SanitizeSpan
preSave ProcessSpan
spanFilter FilterSpan
numWorkers int
blockingSubmit bool
queueSize int
dynQueueSizeWarmup uint
dynQueueSizeMemory uint
reportBusy bool
extraFormatTypes []processor.SpanFormat
collectorTags map[string]string
logger *zap.Logger
serviceMetrics metrics.Factory
hostMetrics metrics.Factory
preProcessSpans ProcessSpans // see docs in PreProcessSpans option.
sanitizer sanitizer.SanitizeSpan
preSave ProcessSpan
spanFilter FilterSpan
numWorkers int
blockingSubmit bool
queueSize int
dynQueueSizeWarmup uint
dynQueueSizeMemory uint
reportBusy bool
extraFormatTypes []processor.SpanFormat
collectorTags map[string]string
processedSpanMetricsEnabled bool
}

// Option is a function that sets some option on StorageBuilder.
Expand Down Expand Up @@ -156,6 +157,13 @@ func (options) CollectorTags(extraTags map[string]string) Option {
}
}

// ProcessedSpanMetricsEnabled creates an Option that initializes the processedSpanMetrics boolean
func (options) ProcessedSpanMetricsEnabled(processedSpanMetrics bool) Option {
return func(b *options) {
b.processedSpanMetricsEnabled = processedSpanMetrics
}
}

func (o options) apply(opts ...Option) options {
ret := options{}
for _, opt := range opts {
Expand Down
3 changes: 3 additions & 0 deletions cmd/collector/app/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,14 @@ func TestAllOptionSet(t *testing.T) {
Options.DynQueueSizeMemory(1024),
Options.PreSave(func(span *model.Span, tenant string) {}),
Options.CollectorTags(map[string]string{"extra": "tags"}),
Options.ProcessedSpanMetricsEnabled(true),
)
assert.EqualValues(t, 5, opts.numWorkers)
assert.EqualValues(t, 10, opts.queueSize)
assert.EqualValues(t, map[string]string{"extra": "tags"}, opts.collectorTags)
assert.EqualValues(t, 1000, opts.dynQueueSizeWarmup)
assert.EqualValues(t, 1024, opts.dynQueueSizeMemory)
assert.True(t, opts.processedSpanMetricsEnabled)
}

func TestNoOptionsSet(t *testing.T) {
Expand All @@ -66,4 +68,5 @@ func TestNoOptionsSet(t *testing.T) {
span := model.Span{}
assert.EqualValues(t, &span, opts.sanitizer(&span))
assert.EqualValues(t, 0, opts.dynQueueSizeWarmup)
assert.False(t, opts.processedSpanMetricsEnabled)
}
1 change: 1 addition & 0 deletions cmd/collector/app/span_handler_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func (b *SpanHandlerBuilder) BuildSpanProcessor(additional ...ProcessSpan) proce
Options.CollectorTags(b.CollectorOpts.CollectorTags),
Options.DynQueueSizeWarmup(uint(b.CollectorOpts.QueueSize)), // same as queue size for now
Options.DynQueueSizeMemory(b.CollectorOpts.DynQueueSizeMemory),
Options.ProcessedSpanMetricsEnabled(b.CollectorOpts.ProcessedSpanMetricsEnabled),
)
}

Expand Down
4 changes: 3 additions & 1 deletion cmd/collector/app/span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,12 @@ func newSpanProcessor(spanWriter spanstore.Writer, additional []ProcessSpan, opt

processSpanFuncs := []ProcessSpan{options.preSave, sp.saveSpan}
if options.dynQueueSizeMemory > 0 {
// add to processSpanFuncs
options.logger.Info("Dynamically adjusting the queue size at runtime.",
zap.Uint("memory-mib", options.dynQueueSizeMemory/1024/1024),
zap.Uint("queue-size-warmup", options.dynQueueSizeWarmup))
}
if options.dynQueueSizeMemory > 0 || options.processedSpanMetricsEnabled {
// add to processSpanFuncs
processSpanFuncs = append(processSpanFuncs, sp.countSpan)
}

Expand Down
81 changes: 64 additions & 17 deletions cmd/collector/app/span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,27 +400,74 @@ func TestSpanProcessorWithCollectorTags(t *testing.T) {
}

func TestSpanProcessorCountSpan(t *testing.T) {
mb := metricstest.NewFactory(time.Hour)
m := mb.Namespace(metrics.NSOptions{})
tests := []struct {
name string
enableDynQueueSizeMem bool
enableSpanMetrics bool
expectedUpdateGauge bool
}{
{
name: "enable-dyn-queue-size-enable-metrics",
enableDynQueueSizeMem: true,
enableSpanMetrics: true,
expectedUpdateGauge: true,
},
{
name: "enable-dyn-queue-size-disable-metrics",
enableDynQueueSizeMem: true,
enableSpanMetrics: false,
expectedUpdateGauge: true,
},
{
name: "disable-dyn-queue-size-enable-metrics",
enableDynQueueSizeMem: false,
enableSpanMetrics: true,
expectedUpdateGauge: true,
},
{
name: "disable-dyn-queue-size-disable-metrics",
enableDynQueueSizeMem: false,
enableSpanMetrics: false,
expectedUpdateGauge: false,
},
}

w := &fakeSpanWriter{}
p := NewSpanProcessor(w, nil, Options.HostMetrics(m), Options.DynQueueSizeMemory(1000)).(*spanProcessor)
p.background(10*time.Millisecond, p.updateGauges)
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mb := metricstest.NewFactory(time.Hour)
m := mb.Namespace(metrics.NSOptions{})

p.processSpan(&model.Span{}, "")
assert.NotEqual(t, uint64(0), p.bytesProcessed)
w := &fakeSpanWriter{}
opts := []Option{Options.HostMetrics(m), Options.ProcessedSpanMetricsEnabled(tt.enableSpanMetrics)}
if tt.enableDynQueueSizeMem {
opts = append(opts, Options.DynQueueSizeMemory(1000))
} else {
opts = append(opts, Options.DynQueueSizeMemory(0))
}
p := NewSpanProcessor(w, nil, opts...).(*spanProcessor)
p.background(10*time.Millisecond, p.updateGauges)

p.processSpan(&model.Span{}, "")
assert.NotEqual(t, uint64(0), p.bytesProcessed)

for i := 0; i < 15; i++ {
_, g := mb.Snapshot()
if b := g["spans.bytes"]; b > 0 {
if !tt.expectedUpdateGauge {
assert.Fail(t, "gauge has been updated unexpectedly")
}
assert.Equal(t, p.bytesProcessed.Load(), uint64(g["spans.bytes"]))
return
}
time.Sleep(time.Millisecond)
}

for i := 0; i < 15; i++ {
_, g := mb.Snapshot()
if b := g["spans.bytes"]; b > 0 {
assert.Equal(t, p.bytesProcessed.Load(), uint64(g["spans.bytes"]))
return
}
time.Sleep(time.Millisecond)
if tt.expectedUpdateGauge {
assert.Fail(t, "gauge hasn't been updated within a reasonable amount of time")
}
assert.NoError(t, p.Close())
})
}

assert.Fail(t, "gauge hasn't been updated within a reasonable amount of time")
assert.NoError(t, p.Close())
}

func TestUpdateDynQueueSize(t *testing.T) {
Expand Down

0 comments on commit 1b22e9e

Please sign in to comment.