Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[tracegen] Add options to generate more spans and attributes #4535

Merged
merged 2 commits into from
Jun 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 40 additions & 13 deletions cmd/tracegen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package main

import (
"context"
"errors"
"flag"
"fmt"

Expand All @@ -29,6 +30,7 @@ import (
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/internal/jaegerclientenv2otel"
Expand All @@ -46,22 +48,47 @@ func main() {
otel.SetTextMapPropagator(propagation.TraceContext{})
jaegerclientenv2otel.MapJaegerToOtelEnvVars(logger)

exp, err := createOtelExporter(cfg.TraceExporter)
if err != nil {
logger.Sugar().Fatalf("cannot create trace exporter %s: %w", cfg.TraceExporter, err)
tracers, shutdown := createTracers(cfg)
defer shutdown(context.Background())

tracegen.Run(cfg, tracers, logger)
}

func createTracers(cfg *tracegen.Config) ([]trace.Tracer, func(context.Context) error) {
if cfg.Services < 1 {
cfg.Services = 1
}
logger.Sugar().Infof("using %s trace exporter", cfg.TraceExporter)
var shutdown []func(context.Context) error
var tracers []trace.Tracer
for s := 0; s < cfg.Services; s++ {
svc := cfg.Service
if cfg.Services > 1 {
svc = fmt.Sprintf("%s-%02d", svc, s)
}

tp := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exp),
sdktrace.WithResource(resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceNameKey.String(cfg.Service),
)),
)
defer tp.Shutdown(context.Background())
exp, err := createOtelExporter(cfg.TraceExporter)
if err != nil {
logger.Sugar().Fatalf("cannot create trace exporter %s: %w", cfg.TraceExporter, err)
}
logger.Sugar().Infof("using %s trace exporter for service %s", cfg.TraceExporter, svc)

tracegen.Run(cfg, tp.Tracer("tracegen"), logger)
tp := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exp),
sdktrace.WithResource(resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceNameKey.String(svc),
)),
)
tracers = append(tracers, tp.Tracer(cfg.Service))
shutdown = append(shutdown, tp.Shutdown)
}
return tracers, func(ctx context.Context) error {
var errs []error
for _, f := range shutdown {
errs = append(errs, f(ctx))
}
return errors.Join(errs...)
}
}

func createOtelExporter(exporterType string) (sdktrace.SpanExporter, error) {
Expand Down
35 changes: 19 additions & 16 deletions internal/tracegen/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@ import (
// Config describes the test scenario.
type Config struct {
Workers int
Services int
Traces int
ChildSpans int
Attributes int
AttrValues int
Marshal bool
Debug bool
Firehose bool
Expand All @@ -41,17 +45,21 @@ type Config struct {
// Flags registers config flags.
func (c *Config) Flags(fs *flag.FlagSet) {
fs.IntVar(&c.Workers, "workers", 1, "Number of workers (goroutines) to run")
fs.IntVar(&c.Traces, "traces", 1, "Number of traces to generate in each worker (ignored if duration is provided")
fs.IntVar(&c.Traces, "traces", 1, "Number of traces to generate in each worker (ignored if duration is provided)")
fs.IntVar(&c.ChildSpans, "spans", 1, "Number of child spans to generate for each trace")
fs.IntVar(&c.Attributes, "attrs", 1, "Number of attributes to generate for each child span")
fs.IntVar(&c.AttrValues, "attr-values", 5, "Number of distinct values to allow for each attribute")
fs.BoolVar(&c.Debug, "debug", false, "Whether to set DEBUG flag on the spans to force sampling")
fs.BoolVar(&c.Firehose, "firehose", false, "Whether to set FIREHOSE flag on the spans to skip indexing")
fs.DurationVar(&c.Pause, "pause", time.Microsecond, "How long to pause before finishing trace")
fs.DurationVar(&c.Duration, "duration", 0, "For how long to run the test")
fs.StringVar(&c.Service, "service", "tracegen", "Service name to use")
fs.DurationVar(&c.Pause, "pause", time.Microsecond, "How long to sleep before finishing each span. If set to 0s then a fake 123µs duration is used.")
fs.DurationVar(&c.Duration, "duration", 0, "For how long to run the test if greater than 0s (overrides -traces).")
fs.StringVar(&c.Service, "service", "tracegen", "Service name prefix to use")
fs.IntVar(&c.Services, "services", 1, "Number of unique suffixes to add to service name when generating traces, e.g. tracegen-01 (but only one service per trace)")
fs.StringVar(&c.TraceExporter, "trace-exporter", "jaeger", "Trace exporter (jaeger|otlp/otlp-http|otlp-grpc|stdout). Exporters can be additionally configured via environment variables, see https://github.com/jaegertracing/jaeger/blob/main/cmd/tracegen/README.md")
}

// Run executes the test scenario.
func Run(c *Config, tracer trace.Tracer, logger *zap.Logger) error {
func Run(c *Config, tracers []trace.Tracer, logger *zap.Logger) error {
if c.Duration > 0 {
c.Traces = 0
} else if c.Traces <= 0 {
Expand All @@ -63,17 +71,12 @@ func Run(c *Config, tracer trace.Tracer, logger *zap.Logger) error {
for i := 0; i < c.Workers; i++ {
wg.Add(1)
w := worker{
id: i,
tracer: tracer,
traces: c.Traces,
marshal: c.Marshal,
debug: c.Debug,
firehose: c.Firehose,
pause: c.Pause,
duration: c.Duration,
running: &running,
wg: &wg,
logger: logger.With(zap.Int("worker", i)),
id: i,
tracers: tracers,
Config: *c,
running: &running,
wg: &wg,
logger: logger.With(zap.Int("worker", i)),
}

go w.simulateTraces()
Expand Down
98 changes: 60 additions & 38 deletions internal/tracegen/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,76 +27,98 @@ import (
)

type worker struct {
tracer trace.Tracer
running *uint32 // pointer to shared flag that indicates it's time to stop the test
id int // worker id
traces int // how many traces the worker has to generate (only when duration==0)
marshal bool // whether the worker needs to marshal trace context via HTTP headers
debug bool // whether to set DEBUG flag on the spans
firehose bool // whether to set FIREHOSE flag on the spans
duration time.Duration // how long to run the test for (overrides `traces`)
pause time.Duration // how long to pause before finishing the trace
wg *sync.WaitGroup // notify when done
logger *zap.Logger
tracers []trace.Tracer
running *uint32 // pointer to shared flag that indicates it's time to stop the test
id int // worker id
Config
wg *sync.WaitGroup // notify when done
logger *zap.Logger

// internal counters
traceNo int
attrValNo int
}

const (
fakeSpanDuration = 123 * time.Microsecond
)

func (w worker) simulateTraces() {
var i int
func (w *worker) simulateTraces() {
for atomic.LoadUint32(w.running) == 1 {
w.simulateOneTrace()
i++
if w.traces != 0 {
if i >= w.traces {
svcNo := w.traceNo % len(w.tracers)
w.simulateOneTrace(w.tracers[svcNo])
w.traceNo++
if w.Traces != 0 {
if w.traceNo >= w.Traces {
break
}
}
}
w.logger.Info(fmt.Sprintf("Worker %d generated %d traces", w.id, i))
w.logger.Info(fmt.Sprintf("Worker %d generated %d traces", w.id, w.traceNo))
w.wg.Done()
}

func (w worker) simulateOneTrace() {
func (w *worker) simulateOneTrace(tracer trace.Tracer) {
ctx := context.Background()
attrs := []attribute.KeyValue{
attribute.String("peer.service", "tracegen-server"),
attribute.String("peer.host.ipv4", "1.1.1.1"),
}
if w.debug {
if w.Debug {
attrs = append(attrs, attribute.Bool("jaeger.debug", true))
}
if w.firehose {
if w.Firehose {
attrs = append(attrs, attribute.Bool("jaeger.firehose", true))
}
start := time.Now()
ctx, sp := w.tracer.Start(
ctx, parent := tracer.Start(
ctx,
"lets-go",
trace.WithSpanKind(trace.SpanKindClient),
trace.WithSpanKind(trace.SpanKindServer),
trace.WithAttributes(attrs...),
trace.WithTimestamp(start),
)
w.simulateChildSpans(ctx, start, tracer)

_, child := w.tracer.Start(
ctx,
"okey-dokey",
trace.WithSpanKind(trace.SpanKindServer),
)

time.Sleep(w.pause)

if w.pause != 0 {
child.End()
sp.End()
if w.Pause != 0 {
parent.End()
} else {
child.End(
trace.WithTimestamp(start.Add(fakeSpanDuration)),
totalDuration := time.Duration(w.ChildSpans) * fakeSpanDuration
parent.End(
trace.WithTimestamp(start.Add(totalDuration)),
)
sp.End(
trace.WithTimestamp(start.Add(fakeSpanDuration)),
}
}

func (w *worker) simulateChildSpans(ctx context.Context, start time.Time, tracer trace.Tracer) {
for c := 0; c < w.ChildSpans; c++ {
var attrs []attribute.KeyValue
for a := 0; a < w.Attributes; a++ {
key := fmt.Sprintf("attr_%02d", a)
val := fmt.Sprintf("val_%02d", w.attrValNo)
w.attrValNo = (w.attrValNo + 1) % w.AttrValues
attrs = append(attrs, attribute.String(key, val))
}
opts := []trace.SpanStartOption{
trace.WithSpanKind(trace.SpanKindClient),
trace.WithAttributes(attrs...),
}
childStart := start.Add(time.Duration(c) * fakeSpanDuration)
if w.Pause == 0 {
opts = append(opts, trace.WithTimestamp(childStart))
}
_, child := tracer.Start(
ctx,
fmt.Sprintf("child-span-%02d", c),
opts...,
)
if w.Pause != 0 {
time.Sleep(w.Pause)
child.End()
} else {
child.End(
trace.WithTimestamp(childStart.Add(fakeSpanDuration)),
)
}
}
}