Skip to content
Merged
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

### Enhancements

- Replace log batcher with telemetry processor ([#3448](https://github.com/getsentry/sentry-dart/pull/3448))

### Dependencies

- Bump Native SDK from v0.10.0 to v0.12.3 ([#3438](https://github.com/getsentry/sentry-dart/pull/3438))
Expand Down
12 changes: 0 additions & 12 deletions packages/dart/lib/src/noop_log_batcher.dart

This file was deleted.

2 changes: 2 additions & 0 deletions packages/dart/lib/src/sentry.dart
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import 'sentry_attachment/sentry_attachment.dart';
import 'sentry_client.dart';
import 'sentry_options.dart';
import 'sentry_run_zoned_guarded.dart';
import 'telemetry/processing/processor_integration.dart';
import 'tracing.dart';
import 'transport/data_category.dart';
import 'transport/task_queue.dart';
Expand Down Expand Up @@ -111,6 +112,7 @@ class Sentry {

options.addIntegration(FeatureFlagsIntegration());
options.addIntegration(LogsEnricherIntegration());
options.addIntegration(InMemoryTelemetryProcessorIntegration());

options.addEventProcessor(EnricherEventProcessor(options));
options.addEventProcessor(ExceptionEventProcessor(options));
Expand Down
8 changes: 2 additions & 6 deletions packages/dart/lib/src/sentry_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import 'type_check_hint.dart';
import 'utils/isolate_utils.dart';
import 'utils/regex_utils.dart';
import 'utils/stacktrace_utils.dart';
import 'sentry_log_batcher.dart';
import 'version.dart';

/// Default value for [SentryUser.ipAddress]. It gets set when an event does not have
Expand Down Expand Up @@ -75,9 +74,6 @@ class SentryClient {
if (enableFlutterSpotlight) {
options.transport = SpotlightHttpTransport(options, options.transport);
}
if (options.enableLogs) {
options.logBatcher = SentryLogBatcher(options);
}
return SentryClient._(options);
}

Expand Down Expand Up @@ -578,7 +574,7 @@ class SentryClient {
if (processedLog != null) {
await _options.lifecycleRegistry
.dispatchCallback(OnBeforeCaptureLog(processedLog));
_options.logBatcher.addLog(processedLog);
_options.telemetryProcessor.addLog(processedLog);
} else {
_options.recorder.recordLostEvent(
DiscardReason.beforeSend,
Expand All @@ -588,7 +584,7 @@ class SentryClient {
}

FutureOr<void> close() {
final flush = _options.logBatcher.flush();
final flush = _options.telemetryProcessor.flush();
if (flush is Future<void>) {
return flush.then((_) => _options.httpClient.close());
}
Expand Down
94 changes: 0 additions & 94 deletions packages/dart/lib/src/sentry_log_batcher.dart

This file was deleted.

5 changes: 2 additions & 3 deletions packages/dart/lib/src/sentry_options.dart
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@ import 'noop_client.dart';
import 'platform/platform.dart';
import 'sentry_exception_factory.dart';
import 'sentry_stack_trace_factory.dart';
import 'telemetry/processing/processor.dart';
import 'transport/noop_transport.dart';
import 'version.dart';
import 'sentry_log_batcher.dart';
import 'noop_log_batcher.dart';
import 'dart:developer' as developer;

// TODO: shutdownTimeout, flushTimeoutMillis
Expand Down Expand Up @@ -554,7 +553,7 @@ class SentryOptions {
late final SentryLogger logger = SentryLogger(clock);

@internal
SentryLogBatcher logBatcher = NoopLogBatcher();
TelemetryProcessor telemetryProcessor = NoOpTelemetryProcessor();

SentryOptions({String? dsn, Platform? platform, RuntimeChecker? checker}) {
this.dsn = dsn;
Expand Down
13 changes: 13 additions & 0 deletions packages/dart/lib/src/telemetry/processing/buffer.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import 'dart:async';

/// A buffer that batches telemetry items for efficient transmission to Sentry.
///
/// Collects items of type [T] and sends them in batches rather than
/// individually, reducing network overhead.
abstract class TelemetryBuffer<T> {
/// Adds an item to the buffer.
void add(T item);

/// When executed immediately sends all buffered items to Sentry and clears the buffer.
FutureOr<void> flush();
}
15 changes: 15 additions & 0 deletions packages/dart/lib/src/telemetry/processing/buffer_config.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
final class TelemetryBufferConfig {
final Duration flushTimeout;
final int maxBufferSizeBytes;
final int maxItemCount;

const TelemetryBufferConfig({
this.flushTimeout = defaultFlushTimeout,
this.maxBufferSizeBytes = defaultMaxBufferSizeBytes,
this.maxItemCount = defaultMaxItemCount,
});

static const Duration defaultFlushTimeout = Duration(seconds: 5);
static const int defaultMaxBufferSizeBytes = 1024 * 1024;
static const int defaultMaxItemCount = 100;
}
143 changes: 143 additions & 0 deletions packages/dart/lib/src/telemetry/processing/in_memory_buffer.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
import 'dart:async';

import '../../utils/internal_logger.dart';
import 'buffer.dart';
import 'buffer_config.dart';

/// Callback invoked when the buffer is flushed with the accumulated data.
typedef OnFlushCallback<T> = FutureOr<void> Function(T data);

/// Encodes an item of type [T] into bytes.
typedef ItemEncoder<T> = List<int> Function(T item);

/// Base class for in-memory telemetry buffers.
///
/// Buffers telemetry items in memory and flushes them when either the
/// configured size limit, item count limit, or flush timeout is reached.
abstract base class _BaseInMemoryTelemetryBuffer<T, S>
implements TelemetryBuffer<T> {
final TelemetryBufferConfig _config;
final ItemEncoder<T> _encoder;
final OnFlushCallback<S> _onFlush;

S _storage;
int _bufferSize = 0;
int _itemCount = 0;
Timer? _flushTimer;

_BaseInMemoryTelemetryBuffer({
required ItemEncoder<T> encoder,
required OnFlushCallback<S> onFlush,
required S initialStorage,
TelemetryBufferConfig config = const TelemetryBufferConfig(),
}) : _encoder = encoder,
_onFlush = onFlush,
_storage = initialStorage,
_config = config;

S _createEmptyStorage();
void _store(List<int> encoded, T item);
bool get _isEmpty;

bool get _isBufferFull =>
_bufferSize >= _config.maxBufferSizeBytes ||
_itemCount >= _config.maxItemCount;

@override
void add(T item) {
final List<int> encoded;
try {
encoded = _encoder(item);
} catch (exception, stackTrace) {
internalLogger.error(
'$runtimeType: Failed to encode item, dropping',
error: exception,
stackTrace: stackTrace,
);
return;
}

if (encoded.length > _config.maxBufferSizeBytes) {
internalLogger.warning(
'$runtimeType: Item size ${encoded.length} exceeds buffer limit ${_config.maxBufferSizeBytes}, dropping',
);
return;
}

_store(encoded, item);
_bufferSize += encoded.length;
_itemCount++;

if (_isBufferFull) {
internalLogger.debug(
'$runtimeType: Buffer full, flushing $_itemCount items',
);
flush();
} else {
_flushTimer ??= Timer(_config.flushTimeout, flush);
}
}

@override
FutureOr<void> flush() {
_flushTimer?.cancel();
_flushTimer = null;

if (_isEmpty) return null;

final toFlush = _storage;
final flushedCount = _itemCount;
final flushedSize = _bufferSize;
_storage = _createEmptyStorage();
_bufferSize = 0;
_itemCount = 0;

final successMessage =
'$runtimeType: Flushed $flushedCount items ($flushedSize bytes)';
final errorMessage =
'$runtimeType: Flush failed for $flushedCount items ($flushedSize bytes)';

try {
final result = _onFlush(toFlush);
if (result is Future) {
return result.then(
(_) => internalLogger.debug(successMessage),
onError: (exception, stackTrace) => internalLogger.warning(
errorMessage,
error: exception,
stackTrace: stackTrace,
),
);
}
internalLogger.debug(successMessage);
} catch (exception, stackTrace) {
internalLogger.warning(
errorMessage,
error: exception,
stackTrace: stackTrace,
);
}
}
}

/// In-memory buffer that collects telemetry items as a flat list.
///
/// Items are encoded and stored in insertion order. On flush, the entire
/// list of encoded items is passed to the [OnFlushCallback].
final class InMemoryTelemetryBuffer<T>
extends _BaseInMemoryTelemetryBuffer<T, List<List<int>>> {
InMemoryTelemetryBuffer({
required super.encoder,
required super.onFlush,
super.config,
}) : super(initialStorage: []);

@override
List<List<int>> _createEmptyStorage() => [];

@override
void _store(List<int> encoded, T item) => _storage.add(encoded);

@override
bool get _isEmpty => _storage.isEmpty;
}
Loading
Loading