Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions src/Elastic.Apm/Agent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Elastic.Apm.Api;
using Elastic.Apm.BackendComm.CentralConfig;
using Elastic.Apm.Config;
Expand Down Expand Up @@ -218,6 +220,26 @@ private static bool CheckAndAddFilter(Func<IPayloadSenderWithFilters, bool> acti
/// </returns>
public static IDisposable Subscribe(params IDiagnosticsSubscriber[] subscribers) => Instance.Subscribe(subscribers);

/// <summary>
/// Asks the singleton agent's payload sender to flush and returns a task that completes once
/// the sender reports that it is idle.
/// Intended for short-lived processes (Lambda functions, CLI tools, console apps) that could
/// otherwise exit before the background sender has transmitted everything.
/// </summary>
/// <param name="cancellationToken">
/// Lets the caller stop waiting. Events that are already being sent are not affected.
/// </param>
/// <returns>
/// The task returned by the sender's own flush implementation, or an already completed task
/// when the configured <see cref="IPayloadSender"/> does not implement the internal flush interface.
/// </returns>
/// <remarks>
/// "Idle" means the event queue is empty and any in-progress HTTP send has finished.
/// Completion only signals that the send attempt ended; it is not a guarantee that the
/// server accepted the data.
/// </remarks>
public static Task FlushAsync(CancellationToken cancellationToken = default)
{
	// Custom senders are not required to support flushing; only delegate when they do.
	if (Instance.PayloadSender is IFlushablePayloadSender sender)
		return sender.FlushAsync(cancellationToken);

	return Task.CompletedTask;
}

public static void Setup(AgentComponents agentComponents)
{
lock (InitializationLock)
Expand Down
16 changes: 16 additions & 0 deletions src/Elastic.Apm/ApmAgentExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Elastic.Apm.Api;
using Elastic.Apm.DiagnosticSource;
using Elastic.Apm.Instrumentations.SqlClient;
using Elastic.Apm.Logging;
using Elastic.Apm.Report;

namespace Elastic.Apm
{
Expand Down Expand Up @@ -76,6 +79,19 @@ internal static IDisposable SubscribeIncludingAllDefaults(this IApmAgent agent,
return agent.Subscribe(userProvidedAndDefaultSubs);
}

/// <summary>
/// Asks the payload sender of this agent instance to flush and returns a task that completes
/// once the sender reports that all queued APM events have been handed to APM Server.
/// </summary>
/// <param name="agent">The agent whose payload sender should be flushed.</param>
/// <param name="cancellationToken">Lets the caller stop waiting for the flush to finish.</param>
/// <returns>
/// The task returned by the sender's flush implementation, or an already completed task when
/// the agent's payload sender does not implement the internal flush interface.
/// </returns>
/// <remarks>
/// Instance-level counterpart of <see cref="Agent.FlushAsync"/>; the same completion semantics apply.
/// </remarks>
public static Task FlushAsync(this IApmAgent agent, CancellationToken cancellationToken = default) =>
	agent.PayloadSender switch
	{
		// Only senders that opt in to flushing are asked to drain.
		IFlushablePayloadSender sender => sender.FlushAsync(cancellationToken),
		// Anything else (including a missing sender) has nothing to wait for.
		_ => Task.CompletedTask
	};

internal static IExecutionSegment GetCurrentExecutionSegment(this IApmAgent agent) =>
agent.Tracer.CurrentSpan ?? (IExecutionSegment)agent.Tracer.CurrentTransaction;
}
Expand Down
21 changes: 21 additions & 0 deletions src/Elastic.Apm/Report/IPayloadSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Elastic.Apm.Api;

namespace Elastic.Apm.Report
Expand All @@ -25,4 +27,23 @@ public interface IPayloadSenderWithFilters
bool AddFilter(Func<ISpan, ISpan> spanFilter);
bool AddFilter(Func<IError, IError> errorFilter);
}

/// <summary>
/// Implemented by payload senders that support explicit flushing.
/// Internal only — not part of the public IPayloadSender contract because
/// netstandard2.0 and .NET Framework targets cannot use default interface members.
/// </summary>
/// <remarks>
/// Callers are expected to probe for this interface with a type check and to treat a sender
/// that does not implement it as having nothing to flush.
/// </remarks>
internal interface IFlushablePayloadSender
{
/// <summary>
/// Waits until the sender is idle — the event queue is empty and any in-progress HTTP
/// send has completed — then returns.
/// </summary>
/// <param name="cancellationToken">Token the caller can use to stop waiting.</param>
/// <returns>A task that completes once the sender is idle.</returns>
/// <remarks>
/// Completion indicates the send attempt finished; it does not guarantee the server
/// accepted the data. For short-lived processes, "idle" is equivalent to "all recorded
/// events transmitted" because no new events arrive after the call.
/// </remarks>
Task FlushAsync(CancellationToken cancellationToken = default);
}
}
155 changes: 150 additions & 5 deletions src/Elastic.Apm/Report/PayloadSenderV2.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// See the LICENSE file in the project root for more information

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
Expand Down Expand Up @@ -30,7 +31,7 @@ namespace Elastic.Apm.Report
/// Responsible for sending the data to APM server. Implements Intake V2.
/// Each instance creates its own thread to do the work. Therefore, instances should be reused if possible.
/// </summary>
internal class PayloadSenderV2 : BackendCommComponentBase, IPayloadSender, IPayloadSenderWithFilters
internal class PayloadSenderV2 : BackendCommComponentBase, IPayloadSender, IPayloadSenderWithFilters, IFlushablePayloadSender
{
private const string ThisClassName = nameof(PayloadSenderV2);
private readonly List<Func<IError, IError>> ErrorFilters = new();
Expand All @@ -56,6 +57,11 @@ internal class PayloadSenderV2 : BackendCommComponentBase, IPayloadSender, IPayl

private string _cachedMetadataJsonLine;
private long _eventQueueCount;
// Always 0 or 1: only the single work-loop thread increments/decrements this.
// Incremented in ReceiveBatch (before _eventQueueCount is decremented); caller
// must decrement in a finally block — see WorkLoopIteration.
private int _inFlightSends;
private readonly ConcurrentQueue<TaskCompletionSource<bool>> _flushWaiters = new();

private readonly ElasticVersion _brokenActivationMethodVersion;
private readonly string _cachedActivationMethod;
Expand Down Expand Up @@ -175,11 +181,78 @@ static PayloadSenderV2()

/// <summary>
/// Gives queued events a bounded chance to be sent, then completes the event queue and
/// disposes the base component.
/// </summary>
/// <param name="disposing">Forwarded unchanged to the base implementation.</param>
/// <remarks>
/// Never throws because of the drain step: a timeout or an unexpected flush failure is logged
/// and shutdown continues, so the queue is always completed and the base class always disposed.
/// </remarks>
protected override void Dispose(bool disposing)
{
	// Push whatever is buffered towards the work loop. FlushAsync triggers again on its own,
	// so a single call here is enough.
	_eventQueue?.TriggerBatch();

	if (_isEnabled)
	{
		// Upper bound for the drain so Dispose cannot block indefinitely.
		var drainTimeout = TimeSpan.FromSeconds(30);

		// Drain the queue before base.Dispose() cancels the CancellationTokenSource.
		// Without this, the token passed to HttpClient.PostAsync is cancelled mid-request
		// and the last batch is silently dropped.
		using var drainCts = new CancellationTokenSource(drainTimeout);
		try
		{
			// Dispose is synchronous, so the flush has to be waited on here.
			FlushAsync(drainCts.Token).GetAwaiter().GetResult();
		}
		catch (OperationCanceledException)
		{
			_logger?.Warning()?.Log("FlushAsync timed out during Dispose — some events may not have been sent.");
		}
		catch (Exception ex)
		{
			// Any other failure must not escape Dispose, otherwise the queue would never be
			// completed and the base component would never be disposed.
			_logger?.Error()?.LogException(ex, "FlushAsync failed during Dispose — some events may not have been sent.");
		}
	}

	_eventQueue?.Complete();
	base.Dispose(disposing);
}

/// <inheritdoc cref="IFlushablePayloadSender.FlushAsync"/>
/// <remarks>
/// Returns immediately when the sender is disabled or already idle. Otherwise a waiter is
/// registered in <c>_flushWaiters</c> and awaited until it is completed or
/// <paramref name="cancellationToken"/> is cancelled, in which case the returned task is cancelled.
/// </remarks>
public async Task FlushAsync(CancellationToken cancellationToken = default)
{
if (!_isEnabled)
return;

// Move any buffered items into the BatchBlock output so ReceiveBatch picks them up
// on the next iteration without waiting for FlushInterval to expire.
_eventQueue.TriggerBatch();

// Fast path: nothing in the queue and no HTTP send in progress.
// _eventQueueCount is decremented by ReceiveBatch *before* ProcessQueueItems runs,
// so we must also check _inFlightSends to avoid returning early while a batch is
// still being transmitted.
if (Volatile.Read(ref _eventQueueCount) == 0 && Volatile.Read(ref _inFlightSends) == 0)
return;

// RunContinuationsAsynchronously keeps the awaiting caller's continuation from running
// inline on whichever thread completes the waiter.
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
_flushWaiters.Enqueue(tcs);

// Trigger again after enqueuing: items may have arrived in the BatchBlock input between
// the first TriggerBatch call and now (e.g. via a concurrent EnqueueEventInternal Task.Run).
_eventQueue.TriggerBatch();

// Double-check after enqueuing: the queue may have fully drained between the first
// count read and the Enqueue above. Complete our own TCS directly — never dequeue a
// sibling, which would return the wrong task to a concurrent caller.
// TrySetResult is idempotent if the work loop also signals this TCS.
if (Volatile.Read(ref _eventQueueCount) == 0 && Volatile.Read(ref _inFlightSends) == 0)
tcs.TrySetResult(true);

// Cancellation only cancels this caller's wait; nothing here stops or aborts the sender.
// NOTE(review): a cancelled waiter is not removed from _flushWaiters by this method, so the
// queue stays non-empty (and enqueue paths keep calling TriggerBatch) until something else
// dequeues it — confirm that this is acceptable.
CancellationTokenRegistration registration = default;
if (cancellationToken.CanBeCanceled)
{
registration = cancellationToken.Register(
state => ((TaskCompletionSource<bool>)state).TrySetCanceled(),
tcs,
useSynchronizationContext: false);
}

try
{
await tcs.Task.ConfigureAwait(false);
}
finally
{
// Always release the registration so the token does not keep a reference to the waiter.
registration.Dispose();
}
}

internal async Task<bool> EnqueueEventInternal(object eventObj, string dbgEventKind)
{
if (!_isEnabled)
Expand Down Expand Up @@ -221,7 +294,7 @@ internal async Task<bool> EnqueueEventInternal(object eventObj, string dbgEventK
+ " " + dbgEventKind + ": {" + dbgEventKind + "}."
, newEventQueueCount, _maxQueueEventCount, eventObj);

if (_flushInterval == TimeSpan.Zero)
if (_flushInterval == TimeSpan.Zero || !_flushWaiters.IsEmpty)
_eventQueue.TriggerBatch();

return true;
Expand All @@ -231,7 +304,57 @@ internal void EnqueueEvent(object eventObj, string dbgEventKind)
{
if (!_isEnabled)
return;
Task.Run(async () => await EnqueueEventInternal(eventObj, dbgEventKind));

// Increment synchronously so FlushAsync/Dispose see this event the moment
// QueueTransaction/QueueSpan/etc. returns, before Task.Run has executed.
var newEventQueueCount = Interlocked.Increment(ref _eventQueueCount);
if (newEventQueueCount > _maxQueueEventCount)
{
_logger.Debug()
?.Log("Queue reached max capacity - " + dbgEventKind + " will be discarded."
+ " newEventQueueCount: {EventQueueCount}."
+ " MaxQueueEventCount: {MaxQueueEventCount}."
+ " " + dbgEventKind + ": {" + dbgEventKind + "}."
, newEventQueueCount, _maxQueueEventCount, eventObj);
Interlocked.Decrement(ref _eventQueueCount);
return;
}

Task.Run(async () =>
{
try
{
var enqueuedSuccessfully = await _eventQueue.SendAsync(eventObj);
if (!enqueuedSuccessfully)
{
_logger.Debug()
?.Log("Failed to enqueue " + dbgEventKind + "."
+ " newEventQueueCount: {EventQueueCount}."
+ " MaxQueueEventCount: {MaxQueueEventCount}."
+ " " + dbgEventKind + ": {" + dbgEventKind + "}."
, newEventQueueCount, _maxQueueEventCount, eventObj);
Interlocked.Decrement(ref _eventQueueCount);
return;
}

_logger.Debug()
?.Log("Enqueued " + dbgEventKind + "."
+ " newEventQueueCount: {EventQueueCount}."
+ " MaxQueueEventCount: {MaxQueueEventCount}."
+ " " + dbgEventKind + ": {" + dbgEventKind + "}."
, newEventQueueCount, _maxQueueEventCount, eventObj);

if (_flushInterval == TimeSpan.Zero || !_flushWaiters.IsEmpty)
_eventQueue.TriggerBatch();
}
catch (Exception ex)
{
// Unexpected exception (e.g. faulted BatchBlock): ensure the count is rolled
// back so FlushAsync does not wait forever for an event that was never queued.
_logger.Error()?.LogException(ex, "Unexpected exception while enqueuing " + dbgEventKind + ".");
Interlocked.Decrement(ref _eventQueueCount);
}
});
}

/// <summary>
Expand Down Expand Up @@ -262,7 +385,26 @@ protected override void WorkLoopIteration()
if (_allowFilterAdd && batch is { Length: > 0 })
_allowFilterAdd = false;
if (batch != null)
ProcessQueueItems(batch);
{
// _inFlightSends was incremented inside ReceiveBatch (before _eventQueueCount was
// decremented) so FlushAsync cannot observe both as zero mid-batch. Decrement here
// once the send is done (success, failure, or cancellation).
try
{
ProcessQueueItems(batch);
}
finally
{
Interlocked.Decrement(ref _inFlightSends);
}
}

// Signal flush waiters only after both the queue and the in-flight send are clear.
if (Volatile.Read(ref _eventQueueCount) == 0 && Volatile.Read(ref _inFlightSends) == 0 && !_flushWaiters.IsEmpty)
{
while (_flushWaiters.TryDequeue(out var tcs))
tcs.TrySetResult(true);
}
}

private void ResetActivationMethodIfKnownBrokenApmServer()
Expand Down Expand Up @@ -341,6 +483,9 @@ private object[] ReceiveBatch()
var eventBatchToSend = receivedItems;
if (eventBatchToSend != null)
{
// Increment _inFlightSends BEFORE decrementing _eventQueueCount so that FlushAsync
// never observes both as zero while the batch is in transit to ProcessQueueItems.
Interlocked.Increment(ref _inFlightSends);
var newEventQueueCount = Interlocked.Add(ref _eventQueueCount, -eventBatchToSend.Length);

_logger.Trace()
Expand Down
Loading
Loading