Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/OpenTelemetry/BaseExporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,16 @@ public abstract class BaseExporter<T> : IDisposable
/// <returns>Result of the export operation.</returns>
public abstract ExportResult Export(in Batch<T> batch);

/// <summary>
/// Exports a batch of telemetry objects.
/// </summary>
/// <param name="batch">Batch of telemetry objects to export.</param>
/// <returns>Result of the export operation.</returns>
public virtual Task<ExportResult> ExportAsync(Batch<T> batch)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A related discussion on this topic we had in OTel Rust community : open-telemetry/opentelemetry-rust#2027
OTel Rust only has async API for the exporters. This is causing perf issues when the exporter does not benefit from async API (like etw exporters which has no async need as it don't talk to disk/network etc.)

We are also considering adding both options in OTel Rust, with one calling the other automatically!
Just shared as these are similar, though in diff. languages!

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The allocation overhead for calling ExportAsync instead of Export should just be the Task allocation. AFAIK, state machines are kept on the stack while the methods return synchronously. If a method needs to yield the execution because of a blocking operation (e.g. reading a socket), then the state machine will be moved to the heap.

In this short snippet we can see that the state machine is a struct and when the async method doesn't return synchronously (i.e. in the if (!awaiter.IsCompleted) branch), AwaitUnsafeOnCompleted is called which boxes the state machine.

So I'm not very concerned about the performance impact for synchronous exporters, especially that the method should only be called every X seconds.

One thing that could be done is to use ValueTask to avoid the Task allocation. It is used for example in Stream.ReadAsync(Memory). Though ValueTask has more constraints than Task so it can be a tough decision to make. That's why I always use Task and it never was a bottleneck. More reading from our savior Stephen Toub: https://devblogs.microsoft.com/dotnet/understanding-the-whys-whats-and-whens-of-valuetask.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I'm not very concerned about the performance impact for synchronous exporters, especially that the method should only be called every X seconds.

The export method is called for every Log, every Span when using sync exporters like ETW. (they don't do any batching)

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That could make sense to use ValueTask in that case. Could you give me a pointer to the ETW exporter. I can't seem to find it

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this exporter, it will continue to call the synchronous export so they won't be any performance change here.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we are going to do this (and that is a big IF), it MUST be:

public virtual Task<ExportResult> ExportAsync(Batch<T> batch, CancellationToken cancellationToken)

Because this is an opportunity to clean up a mistake 😄 That cancellationToken should be built from https://github.com/open-telemetry/opentelemetry-dotnet/blob/1c01770882bb5113ff308b2b4398347fab6e0404/src/OpenTelemetry/BatchExportProcessor.cs#L24C27-L24C54.

I would also like to see:

public virtual ValueTask<ExportResult> 

There's only 2 ExportResult values. We could have 2 cached instances of Task<ExportResult>. But I think ValueTask is better for implementations which may always execute in sync. Imagine an exporter writing to a Stream. Could be async when hooked up to like a NetworkStream. But could also always run synchronously when hooked up to a MemoryStream.

@verdie-g verdie-g Sep 18, 2024

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea for the CancellationToken!

Regarding the ValueTask, given the constraint of that type, I find it a hard decision to make. I've briefly discussed it here #5838 (comment). I think I'll be able to give more info once I implemented ExportAsync for most exporters.

{
return Task.FromResult(this.Export(batch));
}

/// <summary>
/// Flushes the exporter, blocks the current thread until flush
/// completed, shutdown signaled or timed out.
Expand Down
213 changes: 111 additions & 102 deletions src/OpenTelemetry/BatchExportProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,9 @@ public abstract class BatchExportProcessor<T> : BaseExportProcessor<T>
internal readonly int ExporterTimeoutMilliseconds;

private readonly CircularBuffer<T> circularBuffer;
private readonly Thread exporterThread;
private readonly AutoResetEvent exportTrigger = new(false);
private readonly ManualResetEvent dataExportedNotification = new(false);
private readonly ManualResetEvent shutdownTrigger = new(false);
private readonly CancellationTokenSource exporterTaskCancellation;
private readonly Task exporterTask;
private Task exportTask;
private long shutdownDrainTarget = long.MaxValue;
private long droppedCount;
private bool disposed;
Expand Down Expand Up @@ -57,12 +56,9 @@ protected BatchExportProcessor(
this.ScheduledDelayMilliseconds = scheduledDelayMilliseconds;
this.ExporterTimeoutMilliseconds = exporterTimeoutMilliseconds;
this.MaxExportBatchSize = maxExportBatchSize;
this.exporterThread = new Thread(this.ExporterProc)
{
IsBackground = true,
Name = $"OpenTelemetry-{nameof(BatchExportProcessor<T>)}-{exporter.GetType().Name}",
};
this.exporterThread.Start();
this.exportTask = Task.CompletedTask;
this.exporterTaskCancellation = new CancellationTokenSource();
this.exporterTask = Task.Run(this.ExporterProc);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we shouldn't change the default behavior here. If there are scenarios where a thread pool must be leveraged, then it can be a optional, opt-in behavior - either a modification for the BatchExportProcessor (UseDedicatedThread=false) or a new BatchExportProcessorThreadPool.

Having said that, I'll not have bandwidth to do a detailed review at the moment, but I believe the first step is to get some guidance from the maintainers about which direction should be taken:

  1. Provide a opt-in feature flag to BatchExportProcessor so it can do dedicated thread or threadpool.
  2. Make a new BatchExportProcessor to do the thread pool one - either in this repo OR in the contrib repo.
  3. Something else.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good idea. I'll do some experimentations.

}

/// <summary>
Expand All @@ -87,13 +83,7 @@ internal bool TryExport(T data)
{
if (this.circularBuffer.Count >= this.MaxExportBatchSize)
{
try
{
this.exportTrigger.Set();
}
catch (ObjectDisposedException)
{
}
_ = this.ExportAsync();
}

return true; // enqueue succeeded
Expand All @@ -113,6 +103,34 @@ protected override void OnExport(T data)

/// <inheritdoc/>
protected override bool OnForceFlush(int timeoutMilliseconds)
{
return this.FlushAsync(TimeSpan.FromMilliseconds(timeoutMilliseconds)).GetAwaiter().GetResult();
}

/// <inheritdoc/>
protected override bool OnShutdown(int timeoutMilliseconds)
{
return this.ShutdownAsync(TimeSpan.FromMilliseconds(timeoutMilliseconds)).GetAwaiter().GetResult();
}

/// <inheritdoc/>
protected override void Dispose(bool disposing)
{
if (!this.disposed)
{
if (disposing)
{
this.exporterTaskCancellation.Cancel();
this.exporterTaskCancellation.Dispose();
}

this.disposed = true;
}

base.Dispose(disposing);
}

private async Task<bool> FlushAsync(TimeSpan timeout)
{
var tail = this.circularBuffer.RemovedCount;
var head = this.circularBuffer.AddedCount;
Expand All @@ -122,82 +140,61 @@ protected override bool OnForceFlush(int timeoutMilliseconds)
return true; // nothing to flush
}

try
{
this.exportTrigger.Set();
}
catch (ObjectDisposedException)
_ = this.ExportAsync();

if (timeout == TimeSpan.Zero)
{
return false;
}

if (timeoutMilliseconds == 0)
CancellationTokenSource timeoutCancellation;
try
{
timeoutCancellation = CancellationTokenSource.CreateLinkedTokenSource(this.exporterTaskCancellation.Token);
}
catch (ObjectDisposedException)
{
return false;
}

var triggers = new WaitHandle[] { this.dataExportedNotification, this.shutdownTrigger };

var sw = timeoutMilliseconds == Timeout.Infinite
? null
: Stopwatch.StartNew();

// There is a chance that the export thread finished processing all the data from the queue,
// and signaled before we enter wait here, use polling to prevent being blocked indefinitely.
const int pollingMilliseconds = 1000;
var timeoutTask = Task.Delay(timeout, timeoutCancellation.Token);

while (true)
{
if (sw == null)
Task completedTask;
try
{
try
{
WaitHandle.WaitAny(triggers, pollingMilliseconds);
}
catch (ObjectDisposedException)
{
return false;
}
completedTask = await Task.WhenAny(timeoutTask, this.ExportAsync()).ConfigureAwait(false);
}
else
catch (ObjectDisposedException)
{
var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds;

if (timeout <= 0)
{
return this.circularBuffer.RemovedCount >= head;
}

try
{
WaitHandle.WaitAny(triggers, Math.Min((int)timeout, pollingMilliseconds));
}
catch (ObjectDisposedException)
{
return false;
}
return false;
}

if (this.circularBuffer.RemovedCount >= head)
{
return true;
}

if (completedTask == timeoutTask)
{
return false;
}

if (Volatile.Read(ref this.shutdownDrainTarget) != long.MaxValue)
{
return false;
}
}
}

/// <inheritdoc/>
protected override bool OnShutdown(int timeoutMilliseconds)
private async Task<bool> ShutdownAsync(TimeSpan timeout)
{
Volatile.Write(ref this.shutdownDrainTarget, this.circularBuffer.AddedCount);

try
{
this.shutdownTrigger.Set();
this.exporterTaskCancellation.Cancel();
}
catch (ObjectDisposedException)
{
Expand All @@ -206,53 +203,37 @@ protected override bool OnShutdown(int timeoutMilliseconds)

OpenTelemetrySdkEventSource.Log.DroppedExportProcessorItems(this.GetType().Name, this.exporter.GetType().Name, this.DroppedCount);

if (timeoutMilliseconds == Timeout.Infinite)
if (timeout == Timeout.InfiniteTimeSpan)
{
this.exporterThread.Join();
await this.exporterTask.ConfigureAwait(false);
return this.exporter.Shutdown();
}

if (timeoutMilliseconds == 0)
if (timeout == TimeSpan.Zero)
{
return this.exporter.Shutdown(0);
}

var sw = Stopwatch.StartNew();
this.exporterThread.Join(timeoutMilliseconds);
var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds;
return this.exporter.Shutdown((int)Math.Max(timeout, 0));
}

/// <inheritdoc/>
protected override void Dispose(bool disposing)
{
if (!this.disposed)
{
if (disposing)
{
this.exportTrigger.Dispose();
this.dataExportedNotification.Dispose();
this.shutdownTrigger.Dispose();
}

this.disposed = true;
}

base.Dispose(disposing);
await Task.WhenAny(this.exporterTask, Task.Delay(timeout)).ConfigureAwait(false);
var remainingTimeout = timeout.TotalMilliseconds - sw.ElapsedMilliseconds;
return this.exporter.Shutdown((int)Math.Max(remainingTimeout, 0));
}

private void ExporterProc()
private async Task ExporterProc()
{
var triggers = new WaitHandle[] { this.exportTrigger, this.shutdownTrigger };

while (true)
{
// only wait when the queue doesn't have enough items, otherwise keep busy and send data continuously
if (this.circularBuffer.Count < this.MaxExportBatchSize)
{
try
{
WaitHandle.WaitAny(triggers, this.ScheduledDelayMilliseconds);
await Task.Delay(this.ScheduledDelayMilliseconds, this.exporterTaskCancellation.Token).ConfigureAwait(false);
}
catch (TaskCanceledException)
{
// The delay was canceled for the exporter to shut down.
}
catch (ObjectDisposedException)
{
Expand All @@ -262,27 +243,55 @@ private void ExporterProc()
}

if (this.circularBuffer.Count > 0)
{
await this.ExportAsync().ConfigureAwait(false);
}

if (this.circularBuffer.RemovedCount >= Volatile.Read(ref this.shutdownDrainTarget))
{
return;
}
}
}

private Task ExportAsync()
{
var optimisticExportTask = this.exportTask;
if (!optimisticExportTask.IsCompleted)
{
// An export is currently being processed.
return optimisticExportTask;
}

TaskCompletionSource<object?> newCurrentExportTaskCompletion = new(TaskCreationOptions.RunContinuationsAsynchronously);
var localExportTask = Interlocked.CompareExchange(
ref this.exportTask,
newCurrentExportTaskCompletion.Task,
optimisticExportTask);
if (!localExportTask.IsCompleted)
{
// An export is currently being processed.
return localExportTask;
}

// Use Task.Run to yield the execution as soon as possible.
return Task.Run(CoreAsync);

async Task CoreAsync()
{
try
{
using (var batch = new Batch<T>(this.circularBuffer, this.MaxExportBatchSize))
{
this.exporter.Export(batch);
await this.exporter.ExportAsync(batch).ConfigureAwait(false);
}

try
{
this.dataExportedNotification.Set();
this.dataExportedNotification.Reset();
}
catch (ObjectDisposedException)
{
// the exporter is somehow disposed before the worker thread could finish its job
return;
}
newCurrentExportTaskCompletion.SetResult(null);
}

if (this.circularBuffer.RemovedCount >= Volatile.Read(ref this.shutdownDrainTarget))
catch (Exception e)
{
return;
newCurrentExportTaskCompletion.SetException(e);
throw;
}
}
}
Expand Down