diff --git a/cmd/tracegen/main.go b/cmd/tracegen/main.go index 3d04c26c194..6c7a07e8158 100644 --- a/cmd/tracegen/main.go +++ b/cmd/tracegen/main.go @@ -16,6 +16,7 @@ package main import ( "context" + "errors" "flag" "fmt" @@ -29,6 +30,7 @@ import ( "go.opentelemetry.io/otel/sdk/resource" sdktrace "go.opentelemetry.io/otel/sdk/trace" semconv "go.opentelemetry.io/otel/semconv/v1.4.0" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "github.com/jaegertracing/jaeger/internal/jaegerclientenv2otel" @@ -46,22 +48,47 @@ func main() { otel.SetTextMapPropagator(propagation.TraceContext{}) jaegerclientenv2otel.MapJaegerToOtelEnvVars(logger) - exp, err := createOtelExporter(cfg.TraceExporter) - if err != nil { - logger.Sugar().Fatalf("cannot create trace exporter %s: %w", cfg.TraceExporter, err) + tracers, shutdown := createTracers(cfg) + defer shutdown(context.Background()) + + tracegen.Run(cfg, tracers, logger) +} + +func createTracers(cfg *tracegen.Config) ([]trace.Tracer, func(context.Context) error) { + if cfg.Services < 1 { + cfg.Services = 1 } - logger.Sugar().Infof("using %s trace exporter", cfg.TraceExporter) + var shutdown []func(context.Context) error + var tracers []trace.Tracer + for s := 0; s < cfg.Services; s++ { + svc := cfg.Service + if cfg.Services > 1 { + svc = fmt.Sprintf("%s-%02d", svc, s) + } - tp := sdktrace.NewTracerProvider( - sdktrace.WithBatcher(exp), - sdktrace.WithResource(resource.NewWithAttributes( - semconv.SchemaURL, - semconv.ServiceNameKey.String(cfg.Service), - )), - ) - defer tp.Shutdown(context.Background()) + exp, err := createOtelExporter(cfg.TraceExporter) + if err != nil { + logger.Sugar().Fatalf("cannot create trace exporter %s: %w", cfg.TraceExporter, err) + } + logger.Sugar().Infof("using %s trace exporter for service %s", cfg.TraceExporter, svc) - tracegen.Run(cfg, tp.Tracer("tracegen"), logger) + tp := sdktrace.NewTracerProvider( + sdktrace.WithBatcher(exp), + sdktrace.WithResource(resource.NewWithAttributes( + semconv.SchemaURL, + semconv.ServiceNameKey.String(svc), + )), + ) + tracers = append(tracers, tp.Tracer(cfg.Service)) + shutdown = append(shutdown, tp.Shutdown) + } + return tracers, func(ctx context.Context) error { + var errs []error + for _, f := range shutdown { + errs = append(errs, f(ctx)) + } + return errors.Join(errs...) + } } func createOtelExporter(exporterType string) (sdktrace.SpanExporter, error) { diff --git a/internal/tracegen/config.go b/internal/tracegen/config.go index 40ae8d9521a..7ccb574dd2e 100644 --- a/internal/tracegen/config.go +++ b/internal/tracegen/config.go @@ -28,7 +28,11 @@ import ( // Config describes the test scenario. type Config struct { Workers int + Services int Traces int + ChildSpans int + Attributes int + AttrValues int Marshal bool Debug bool Firehose bool @@ -41,17 +45,21 @@ type Config struct { // Flags registers config flags. func (c *Config) Flags(fs *flag.FlagSet) { fs.IntVar(&c.Workers, "workers", 1, "Number of workers (goroutines) to run") - fs.IntVar(&c.Traces, "traces", 1, "Number of traces to generate in each worker (ignored if duration is provided") + fs.IntVar(&c.Traces, "traces", 1, "Number of traces to generate in each worker (ignored if duration is provided)") + fs.IntVar(&c.ChildSpans, "spans", 1, "Number of child spans to generate for each trace") + fs.IntVar(&c.Attributes, "attrs", 1, "Number of attributes to generate for each child span") + fs.IntVar(&c.AttrValues, "attr-values", 5, "Number of distinct values to allow for each attribute") fs.BoolVar(&c.Debug, "debug", false, "Whether to set DEBUG flag on the spans to force sampling") fs.BoolVar(&c.Firehose, "firehose", false, "Whether to set FIREHOSE flag on the spans to skip indexing") - fs.DurationVar(&c.Pause, "pause", time.Microsecond, "How long to pause before finishing trace") - fs.DurationVar(&c.Duration, "duration", 0, "For how long to run the test") - fs.StringVar(&c.Service, "service", "tracegen", "Service name to use") + fs.DurationVar(&c.Pause, "pause", time.Microsecond, "How long to sleep before finishing each span. If set to 0s then a fake 123µs duration is used.") + fs.DurationVar(&c.Duration, "duration", 0, "For how long to run the test if greater than 0s (overrides -traces).") + fs.StringVar(&c.Service, "service", "tracegen", "Service name prefix to use") + fs.IntVar(&c.Services, "services", 1, "Number of unique suffixes to add to service name when generating traces, e.g. tracegen-01 (but only one service per trace)") fs.StringVar(&c.TraceExporter, "trace-exporter", "jaeger", "Trace exporter (jaeger|otlp/otlp-http|otlp-grpc|stdout). Exporters can be additionally configured via environment variables, see https://github.com/jaegertracing/jaeger/blob/main/cmd/tracegen/README.md") } // Run executes the test scenario. -func Run(c *Config, tracer trace.Tracer, logger *zap.Logger) error { +func Run(c *Config, tracers []trace.Tracer, logger *zap.Logger) error { if c.Duration > 0 { c.Traces = 0 } else if c.Traces <= 0 { @@ -63,17 +71,12 @@ func Run(c *Config, tracer trace.Tracer, logger *zap.Logger) error { for i := 0; i < c.Workers; i++ { wg.Add(1) w := worker{ - id: i, - tracer: tracer, - traces: c.Traces, - marshal: c.Marshal, - debug: c.Debug, - firehose: c.Firehose, - pause: c.Pause, - duration: c.Duration, - running: &running, - wg: &wg, - logger: logger.With(zap.Int("worker", i)), + id: i, + tracers: tracers, + Config: *c, + running: &running, + wg: &wg, + logger: logger.With(zap.Int("worker", i)), } go w.simulateTraces() diff --git a/internal/tracegen/worker.go b/internal/tracegen/worker.go index 6125ed5cb18..8eeb5373263 100644 --- a/internal/tracegen/worker.go +++ b/internal/tracegen/worker.go @@ -27,76 +27,98 @@ import ( ) type worker struct { - tracer trace.Tracer - running *uint32 // pointer to shared flag that indicates it's time to stop the test - id int // worker id - traces int // how many traces the worker has to generate (only when duration==0) - marshal bool // whether the worker needs to marshal trace context via HTTP headers - debug bool // whether to set DEBUG flag on the spans - firehose bool // whether to set FIREHOSE flag on the spans - duration time.Duration // how long to run the test for (overrides `traces`) - pause time.Duration // how long to pause before finishing the trace - wg *sync.WaitGroup // notify when done - logger *zap.Logger + tracers []trace.Tracer + running *uint32 // pointer to shared flag that indicates it's time to stop the test + id int // worker id + Config + wg *sync.WaitGroup // notify when done + logger *zap.Logger + + // internal counters + traceNo int + attrValNo int } const ( fakeSpanDuration = 123 * time.Microsecond ) -func (w worker) simulateTraces() { - var i int +func (w *worker) simulateTraces() { for atomic.LoadUint32(w.running) == 1 { - w.simulateOneTrace() - i++ - if w.traces != 0 { - if i >= w.traces { + svcNo := w.traceNo % len(w.tracers) + w.simulateOneTrace(w.tracers[svcNo]) + w.traceNo++ + if w.Traces != 0 { + if w.traceNo >= w.Traces { break } } } - w.logger.Info(fmt.Sprintf("Worker %d generated %d traces", w.id, i)) + w.logger.Info(fmt.Sprintf("Worker %d generated %d traces", w.id, w.traceNo)) w.wg.Done() } -func (w worker) simulateOneTrace() { +func (w *worker) simulateOneTrace(tracer trace.Tracer) { ctx := context.Background() attrs := []attribute.KeyValue{ attribute.String("peer.service", "tracegen-server"), attribute.String("peer.host.ipv4", "1.1.1.1"), } - if w.debug { + if w.Debug { attrs = append(attrs, attribute.Bool("jaeger.debug", true)) } - if w.firehose { + if w.Firehose { attrs = append(attrs, attribute.Bool("jaeger.firehose", true)) } start := time.Now() - ctx, sp := w.tracer.Start( + ctx, parent := tracer.Start( ctx, "lets-go", - trace.WithSpanKind(trace.SpanKindClient), + trace.WithSpanKind(trace.SpanKindServer), trace.WithAttributes(attrs...), trace.WithTimestamp(start), ) + w.simulateChildSpans(ctx, start, tracer) - _, child := w.tracer.Start( - ctx, - "okey-dokey", - trace.WithSpanKind(trace.SpanKindServer), - ) - - time.Sleep(w.pause) - - if w.pause != 0 { - child.End() - sp.End() + if w.Pause != 0 { + parent.End() } else { - child.End( - trace.WithTimestamp(start.Add(fakeSpanDuration)), + totalDuration := time.Duration(w.ChildSpans) * fakeSpanDuration + parent.End( + trace.WithTimestamp(start.Add(totalDuration)), ) - sp.End( - trace.WithTimestamp(start.Add(fakeSpanDuration)), + } +} + +func (w *worker) simulateChildSpans(ctx context.Context, start time.Time, tracer trace.Tracer) { + for c := 0; c < w.ChildSpans; c++ { + var attrs []attribute.KeyValue + for a := 0; a < w.Attributes; a++ { + key := fmt.Sprintf("attr_%02d", a) + val := fmt.Sprintf("val_%02d", w.attrValNo) + w.attrValNo = (w.attrValNo + 1) % w.AttrValues + attrs = append(attrs, attribute.String(key, val)) + } + opts := []trace.SpanStartOption{ + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes(attrs...), + } + childStart := start.Add(time.Duration(c) * fakeSpanDuration) + if w.Pause == 0 { + opts = append(opts, trace.WithTimestamp(childStart)) + } + _, child := tracer.Start( + ctx, + fmt.Sprintf("child-span-%02d", c), + opts..., ) + if w.Pause != 0 { + time.Sleep(w.Pause) + child.End() + } else { + child.End( + trace.WithTimestamp(childStart.Add(fakeSpanDuration)), + ) + } } }