Skip to content

Commit

Permalink
add sampling support to span processors
Browse files Browse the repository at this point in the history
  • Loading branch information
blakeroberts-wk committed Jan 7, 2025
1 parent 4b6145a commit b4dd92b
Show file tree
Hide file tree
Showing 13 changed files with 358 additions and 149 deletions.
91 changes: 91 additions & 0 deletions example/sampling.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2021-2022 Workiva.
// Licensed under the Apache License, Version 2.0. Please see https://github.com/Workiva/opentelemetry-dart/blob/master/LICENSE for more information

import 'package:collection/collection.dart';
import 'package:opentelemetry/api.dart'
show
Attribute,
Context,
SpanKind,
SpanLink,
TraceId,
registerGlobalTracerProvider,
spanContextFromContext;
import 'package:opentelemetry/sdk.dart'
show
ConsoleExporter,
Decision,
ParentBasedSampler,
ReadOnlySpan,
ReadWriteSpan,
Sampler,
SamplingResult,
SimpleSpanProcessor,
TracerProviderBase;

final Attribute samplingOffAttribute =
Attribute.fromInt('sampling.priority', 0);

class SpanSamplingPrioritySampler implements Sampler {
@override
SamplingResult shouldSample(
Context parentContext,
TraceId traceId,
String name,
SpanKind spanKind,
List<Attribute> attributes,
List<SpanLink> links) {
final decision = attributes.firstWhereOrNull((element) =>
element.key == 'sampling.priority' && element.value == 0) !=
null
? Decision.recordOnly
: Decision.recordAndSample;

return SamplingResult(
decision, attributes, spanContextFromContext(parentContext).traceState);
}

@override
String get description => 'SpanSamplingPrioritySampler';
}

class PrintingSpanProcessor extends SimpleSpanProcessor {
PrintingSpanProcessor(super.exporter);

@override
void onStart(ReadWriteSpan span, Context parentContext) {
print('Span started: ${span.name}');
super.onStart(span, parentContext);
}

@override
void onEnd(ReadOnlySpan span) {
print('Span ended: ${span.name}');
super.onEnd(span);
}

@override
void shutdown() {
print('Shutting down');
super.shutdown();
}

@override
void forceFlush() {}
}

void main(List<String> args) async {
final sampler = ParentBasedSampler(SpanSamplingPrioritySampler());
final tp = TracerProviderBase(
processors: [PrintingSpanProcessor(ConsoleExporter())], sampler: sampler);
registerGlobalTracerProvider(tp);

final tracer = tp.getTracer('instrumentation-name');

tracer.startSpan('span-not-sampled', attributes: [
samplingOffAttribute,
]).end();
tracer.startSpan('span-sampled').end();

tp.shutdown();
}
2 changes: 1 addition & 1 deletion lib/src/api/span_processors/span_processor.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import '../../../api.dart' as api;

@Deprecated(
'This class will be moved to the SDK package in 0.19.0. Use [SpanExporter] from SDK instead.')
'This class will be moved to the SDK package in 0.19.0. Use [SpanProcessor] from SDK instead.')
abstract class SpanProcessor {
void onStart(api.Span span, api.Context parentContext);

Expand Down
42 changes: 16 additions & 26 deletions lib/src/api/trace/span_context.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,39 +5,29 @@ import '../../../api.dart' as api;

/// Representation of the context of an individual span.
class SpanContext {
final api.SpanId _spanId;
final api.TraceId _traceId;
final int _traceFlags;
final api.TraceState _traceState;
final bool _isRemote;

api.TraceId get traceId => _traceId;

api.SpanId get spanId => _spanId;

int get traceFlags => _traceFlags;

api.TraceState get traceState => _traceState;
final api.TraceId traceId;
final api.SpanId spanId;
final int traceFlags;
final api.TraceState traceState;
final bool isRemote;

bool get isValid => spanId.isValid && traceId.isValid;

/// Construct a [SpanContext].
SpanContext(this._traceId, this._spanId, this._traceFlags, this._traceState)
: _isRemote = false;
SpanContext(this.traceId, this.spanId, this.traceFlags, this.traceState)
: isRemote = false;

/// Construct a [SpanContext] representing an operation which originated
/// from a remote source.
/// Construct a [SpanContext] representing an operation which originated from
/// a remote source.
SpanContext.remote(
this._traceId, this._spanId, this._traceFlags, this._traceState)
: _isRemote = true;
this.traceId, this.spanId, this.traceFlags, this.traceState)
: isRemote = true;

/// Construct an invalid [SpanContext].
SpanContext.invalid()
: _spanId = api.SpanId.invalid(),
_traceId = api.TraceId.invalid(),
_traceFlags = api.TraceFlags.none,
_traceState = api.TraceState.empty(),
_isRemote = false;

bool get isRemote => _isRemote;
: spanId = api.SpanId.invalid(),
traceId = api.TraceId.invalid(),
traceFlags = api.TraceFlags.none,
traceState = api.TraceState.empty(),
isRemote = false;
}
2 changes: 2 additions & 0 deletions lib/src/sdk/trace/exporters/collector_exporter.dart
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,8 @@ class CollectorExporter implements sdk.SpanExporter {
return pb_common.AnyValue();
}

@Deprecated(
'This method will be removed in 0.19.0. Use [SpanProcessor] instead.')
@override
void forceFlush() {
return;
Expand Down
2 changes: 2 additions & 0 deletions lib/src/sdk/trace/exporters/console_exporter.dart
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ class ConsoleExporter implements sdk.SpanExporter {
_printSpans(spans);
}

@Deprecated(
'This method will be removed in 0.19.0. Use [SpanProcessor] instead.')
@override
void forceFlush() {
return;
Expand Down
6 changes: 4 additions & 2 deletions lib/src/sdk/trace/exporters/span_exporter.dart
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
// Copyright 2021-2022 Workiva.
// Licensed under the Apache License, Version 2.0. Please see https://github.com/Workiva/opentelemetry-dart/blob/master/LICENSE for more information

import '../../../../sdk.dart' as sdk;
import '../read_only_span.dart';

abstract class SpanExporter {
void export(List<sdk.ReadOnlySpan> spans);
void export(List<ReadOnlySpan> spans);

@Deprecated(
'This method will be removed in 0.19.0. Use [SpanProcessor] instead.')
void forceFlush();

void shutdown();
Expand Down
29 changes: 18 additions & 11 deletions lib/src/sdk/trace/span_processors/batch_processor.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,23 @@ import 'dart:math';

import 'package:logging/logging.dart';

import '../../../../api.dart' as api;
import '../../../../sdk.dart' as sdk;

class BatchSpanProcessor implements sdk.SpanProcessor {
import '../../../api/context/context.dart';
import '../../../api/trace/trace_flags.dart';
import '../exporters/span_exporter.dart';
import '../read_only_span.dart';
import '../read_write_span.dart';
import 'span_processor.dart';

class BatchSpanProcessor implements SpanProcessor {
static const int _DEFAULT_MAXIMUM_BATCH_SIZE = 512;
static const int _DEFAULT_MAXIMUM_QUEUE_SIZE = 2048;
static const int _DEFAULT_EXPORT_DELAY = 5000;

final sdk.SpanExporter _exporter;
final SpanExporter _exporter;
final Logger _log = Logger('opentelemetry.BatchSpanProcessor');
final int _maxExportBatchSize;
final int _maxQueueSize;
final List<sdk.ReadOnlySpan> _spanBuffer = [];
final List<ReadOnlySpan> _spanBuffer = [];

late final Timer _timer;

Expand All @@ -41,19 +45,18 @@ class BatchSpanProcessor implements sdk.SpanProcessor {
while (_spanBuffer.isNotEmpty) {
_exportBatch(_timer);
}
_exporter.forceFlush();
}

@override
void onEnd(sdk.ReadOnlySpan span) {
void onEnd(ReadOnlySpan span) {
if (_isShutdown) {
return;
}
_addToBuffer(span);
}

@override
void onStart(sdk.ReadWriteSpan span, api.Context parentContext) {}
void onStart(ReadWriteSpan span, Context parentContext) {}

@override
void shutdown() {
Expand All @@ -63,15 +66,19 @@ class BatchSpanProcessor implements sdk.SpanProcessor {
_exporter.shutdown();
}

void _addToBuffer(sdk.ReadOnlySpan span) {
void _addToBuffer(ReadOnlySpan span) {
if (_spanBuffer.length >= _maxQueueSize) {
// Buffer is full, drop span.
_log.warning(
'Max queue size exceeded. Dropping ${_spanBuffer.length} spans.');
return;
}

_spanBuffer.add(span);
final isSampled =
span.spanContext.traceFlags & TraceFlags.sampled == TraceFlags.sampled;
if (isSampled) {
_spanBuffer.add(span);
}
}

void _exportBatch(Timer timer) {
Expand Down
29 changes: 17 additions & 12 deletions lib/src/sdk/trace/span_processors/simple_processor.dart
Original file line number Diff line number Diff line change
@@ -1,35 +1,40 @@
// Copyright 2021-2022 Workiva.
// Licensed under the Apache License, Version 2.0. Please see https://github.com/Workiva/opentelemetry-dart/blob/master/LICENSE for more information

import '../../../../api.dart' as api;
import '../../../../sdk.dart' as sdk;

class SimpleSpanProcessor implements sdk.SpanProcessor {
final sdk.SpanExporter _exporter;
import '../../../api/context/context.dart';
import '../../../api/trace/trace_flags.dart';
import '../exporters/span_exporter.dart';
import '../read_only_span.dart';
import '../read_write_span.dart';
import 'span_processor.dart';

class SimpleSpanProcessor implements SpanProcessor {
final SpanExporter _exporter;
bool _isShutdown = false;

SimpleSpanProcessor(this._exporter);

@override
void forceFlush() {
_exporter.forceFlush();
}
void forceFlush() {}

@override
void onEnd(sdk.ReadOnlySpan span) {
void onEnd(ReadOnlySpan span) {
if (_isShutdown) {
return;
}

_exporter.export([span]);
final isSampled =
span.spanContext.traceFlags & TraceFlags.sampled == TraceFlags.sampled;
if (isSampled) {
_exporter.export([span]);
}
}

@override
void onStart(sdk.ReadWriteSpan span, api.Context parentContext) {}
void onStart(ReadWriteSpan span, Context parentContext) {}

@override
void shutdown() {
forceFlush();
_isShutdown = true;
_exporter.shutdown();
}
Expand Down
Loading

0 comments on commit b4dd92b

Please sign in to comment.