diff --git a/src/Elastic.Apm/Agent.cs b/src/Elastic.Apm/Agent.cs
index 3468560f9..20b287a99 100644
--- a/src/Elastic.Apm/Agent.cs
+++ b/src/Elastic.Apm/Agent.cs
@@ -5,6 +5,8 @@
 using System;
 using System.Collections.Generic;
 using System.Diagnostics;
+using System.Threading;
+using System.Threading.Tasks;
 using Elastic.Apm.Api;
 using Elastic.Apm.BackendComm.CentralConfig;
 using Elastic.Apm.Config;
@@ -218,6 +220,25 @@ private static bool CheckAndAddFilter(Func acti
 		///
 		public static IDisposable Subscribe(params IDiagnosticsSubscriber[] subscribers) => Instance.Subscribe(subscribers);
 
+		/// <summary>
+		/// Flushes all queued APM events, waiting until they have been sent to APM Server.
+		/// Useful for short-lived processes (Lambda functions, CLI tools, console apps) where the
+		/// process may exit before the agent's background sender has transmitted all events.
+		/// </summary>
+		/// <param name="cancellationToken">
+		/// Cancels the wait. The events that were already in-flight are not affected.
+		/// </param>
+		/// <remarks>
+		/// Waits until the sender is idle: the event queue is empty and any in-progress HTTP send
+		/// has completed. Completion indicates the send attempt finished; it does not guarantee the
+		/// server accepted the data.
+		/// If the agent uses a custom <see cref="IPayloadSender"/> that does not implement the
+		/// internal flush interface, this method returns a completed task immediately.
+		/// </remarks>
+		/// <returns>A task that completes once the sender is idle or the wait is cancelled.</returns>
+		public static Task FlushAsync(CancellationToken cancellationToken = default) =>
+			Instance.FlushAsync(cancellationToken);
+
 		public static void Setup(AgentComponents agentComponents)
 		{
 			lock (InitializationLock)
diff --git a/src/Elastic.Apm/ApmAgentExtensions.cs b/src/Elastic.Apm/ApmAgentExtensions.cs
index 417d36a27..c86e8e3e9 100644
--- a/src/Elastic.Apm/ApmAgentExtensions.cs
+++ b/src/Elastic.Apm/ApmAgentExtensions.cs
@@ -5,10 +5,13 @@
 using System;
 using System.Collections.Generic;
 using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
 using Elastic.Apm.Api;
 using Elastic.Apm.DiagnosticSource;
 using Elastic.Apm.Instrumentations.SqlClient;
 using Elastic.Apm.Logging;
+using Elastic.Apm.Report;
 
 namespace Elastic.Apm
 {
@@ -76,6 +79,19 @@ internal static IDisposable SubscribeIncludingAllDefaults(this IApmAgent agent,
 			return agent.Subscribe(userProvidedAndDefaultSubs);
 		}
 
+		/// <summary>
+		/// Flushes all queued APM events for this agent instance, waiting until they have been
+		/// sent to APM Server.
+		/// </summary>
+		/// <remarks>
+		/// Completes once the sender is idle; this does not guarantee that the server accepted the data.
+		/// Returns an already-completed task when the payload sender does not support flushing.
+		/// </remarks>
+		public static Task FlushAsync(this IApmAgent agent, CancellationToken cancellationToken = default) =>
+			agent.PayloadSender is IFlushablePayloadSender flushable
+				? flushable.FlushAsync(cancellationToken)
+				: Task.CompletedTask;
+
 		internal static IExecutionSegment GetCurrentExecutionSegment(this IApmAgent agent) =>
 			agent.Tracer.CurrentSpan ?? (IExecutionSegment)agent.Tracer.CurrentTransaction;
 	}
diff --git a/src/Elastic.Apm/Report/IPayloadSender.cs b/src/Elastic.Apm/Report/IPayloadSender.cs
index 8533cebde..c08d9fa51 100644
--- a/src/Elastic.Apm/Report/IPayloadSender.cs
+++ b/src/Elastic.Apm/Report/IPayloadSender.cs
@@ -4,6 +4,8 @@
 
 using System;
 using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
 using Elastic.Apm.Api;
 
 namespace Elastic.Apm.Report
@@ -25,4 +27,23 @@ public interface IPayloadSenderWithFilters
 		bool AddFilter(Func<ISpan, ISpan> spanFilter);
 		bool AddFilter(Func<IError, IError> errorFilter);
 	}
+
+	/// <summary>
+	/// Implemented by payload senders that support explicit flushing.
+	/// Internal only — not part of the public IPayloadSender contract because
+	/// netstandard2.0 and .NET Framework targets cannot use default interface members.
+	/// </summary>
+	internal interface IFlushablePayloadSender
+	{
+		/// <summary>
+		/// Waits until the sender is idle — the event queue is empty and any in-progress HTTP
+		/// send has completed — then returns.
+		/// </summary>
+		/// <remarks>
+		/// Completion indicates the send attempt finished; it does not guarantee the server
+		/// accepted the data. For short-lived processes, "idle" is equivalent to "all recorded
+		/// events transmitted" because no new events arrive after the call.
+		/// </remarks>
+		Task FlushAsync(CancellationToken cancellationToken = default);
+	}
 }
diff --git a/src/Elastic.Apm/Report/PayloadSenderV2.cs b/src/Elastic.Apm/Report/PayloadSenderV2.cs
index 945e9f930..9f01d19be 100644
--- a/src/Elastic.Apm/Report/PayloadSenderV2.cs
+++ b/src/Elastic.Apm/Report/PayloadSenderV2.cs
@@ -3,6 +3,7 @@
 // See the LICENSE file in the project root for more information
 
 using System;
+using System.Collections.Concurrent;
 using System.Collections.Generic;
 using System.IO;
 using System.Linq;
@@ -30,7 +31,7 @@ namespace Elastic.Apm.Report
 	/// Responsible for sending the data to APM server. Implements Intake V2.
 	/// Each instance creates its own thread to do the work.
Therefore, instances should be reused if possible.
 	///
-	internal class PayloadSenderV2 : BackendCommComponentBase, IPayloadSender, IPayloadSenderWithFilters
+	internal class PayloadSenderV2 : BackendCommComponentBase, IPayloadSender, IPayloadSenderWithFilters, IFlushablePayloadSender
 	{
 		private const string ThisClassName = nameof(PayloadSenderV2);
 		private readonly List<Func<IError, IError>> ErrorFilters = new();
@@ -56,6 +57,11 @@ internal class PayloadSenderV2 : BackendCommComponentBase, IPayloadSender, IPayl
 
 		private string _cachedMetadataJsonLine;
 		private long _eventQueueCount;
+		// Always 0 or 1: only the single work-loop thread increments/decrements this.
+		// Incremented in ReceiveBatch (before _eventQueueCount is decremented); caller
+		// must decrement in a finally block — see WorkLoopIteration.
+		private int _inFlightSends;
+		private readonly ConcurrentQueue<TaskCompletionSource<bool>> _flushWaiters = new();
 
 		private readonly ElasticVersion _brokenActivationMethodVersion;
 		private readonly string _cachedActivationMethod;
@@ -175,11 +181,78 @@ static PayloadSenderV2()
 
 		protected override void Dispose(bool disposing)
 		{
-			_eventQueue?.TriggerBatch();
+			if (_isEnabled)
+			{
+				_eventQueue?.TriggerBatch();
+
+				// Drain the queue before base.Dispose() cancels the CancellationTokenSource.
+				// Without this, the token passed to HttpClient.PostAsync is cancelled mid-request
+				// and the last batch is silently dropped.
+				using var drainCts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
+				try
+				{
+					FlushAsync(drainCts.Token).GetAwaiter().GetResult();
+				}
+				catch (OperationCanceledException)
+				{
+					_logger?.Warning()?.Log("FlushAsync timed out during Dispose — some events may not have been sent.");
+				}
+			}
+
 			_eventQueue?.Complete();
 			base.Dispose(disposing);
 		}
 
+		/// <inheritdoc />
+		public async Task FlushAsync(CancellationToken cancellationToken = default)
+		{
+			if (!_isEnabled)
+				return;
+
+			// Move any buffered items into the BatchBlock output so ReceiveBatch picks them up
+			// on the next iteration without waiting for FlushInterval to expire.
+			_eventQueue.TriggerBatch();
+
+			// Fast path: nothing in the queue and no HTTP send in progress.
+			// _eventQueueCount is decremented by ReceiveBatch *before* ProcessQueueItems runs,
+			// so we must also check _inFlightSends to avoid returning early while a batch is
+			// still being transmitted.
+			if (Volatile.Read(ref _eventQueueCount) == 0 && Volatile.Read(ref _inFlightSends) == 0)
+				return;
+
+			var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
+			_flushWaiters.Enqueue(tcs);
+
+			// Trigger again after enqueuing: items may have arrived in the BatchBlock input between
+			// the first TriggerBatch call and now (e.g. via a concurrent EnqueueEventInternal Task.Run).
+			_eventQueue.TriggerBatch();
+
+			// Double-check after enqueuing: the queue may have fully drained between the first
+			// count read and the Enqueue above. Complete our own TCS directly — never dequeue a
+			// sibling, which would return the wrong task to a concurrent caller.
+			// TrySetResult is idempotent if the work loop also signals this TCS.
+			if (Volatile.Read(ref _eventQueueCount) == 0 && Volatile.Read(ref _inFlightSends) == 0)
+				tcs.TrySetResult(true);
+
+			CancellationTokenRegistration registration = default;
+			if (cancellationToken.CanBeCanceled)
+			{
+				// Pass the caller's token so the resulting OperationCanceledException identifies it.
+				registration = cancellationToken.Register(
+					() => tcs.TrySetCanceled(cancellationToken),
+					useSynchronizationContext: false);
+			}
+
+			try
+			{
+				await tcs.Task.ConfigureAwait(false);
+			}
+			finally
+			{
+				registration.Dispose();
+			}
+		}
+
 		internal async Task<bool> EnqueueEventInternal(object eventObj, string dbgEventKind)
 		{
 			if (!_isEnabled)
@@ -221,7 +294,7 @@ internal async Task EnqueueEventInternal(object eventObj, string dbgEventK
 					+ " " + dbgEventKind + ": {" + dbgEventKind + "}."
 				, newEventQueueCount, _maxQueueEventCount, eventObj);
 
-			if (_flushInterval == TimeSpan.Zero)
+			if (_flushInterval == TimeSpan.Zero || !_flushWaiters.IsEmpty)
 				_eventQueue.TriggerBatch();
 
 			return true;
@@ -231,7 +304,57 @@ internal void EnqueueEvent(object eventObj, string dbgEventKind)
 		{
 			if (!_isEnabled)
 				return;
-			Task.Run(async () => await EnqueueEventInternal(eventObj, dbgEventKind));
+
+			// Increment synchronously so FlushAsync/Dispose see this event the moment
+			// QueueTransaction/QueueSpan/etc. returns, before Task.Run has executed.
+			var newEventQueueCount = Interlocked.Increment(ref _eventQueueCount);
+			if (newEventQueueCount > _maxQueueEventCount)
+			{
+				_logger.Debug()
+					?.Log("Queue reached max capacity - " + dbgEventKind + " will be discarded."
+						+ " newEventQueueCount: {EventQueueCount}."
+						+ " MaxQueueEventCount: {MaxQueueEventCount}."
+						+ " " + dbgEventKind + ": {" + dbgEventKind + "}."
+						, newEventQueueCount, _maxQueueEventCount, eventObj);
+				Interlocked.Decrement(ref _eventQueueCount);
+				return;
+			}
+
+			Task.Run(async () =>
+			{
+				try
+				{
+					var enqueuedSuccessfully = await _eventQueue.SendAsync(eventObj);
+					if (!enqueuedSuccessfully)
+					{
+						_logger.Debug()
+							?.Log("Failed to enqueue " + dbgEventKind + "."
+								+ " newEventQueueCount: {EventQueueCount}."
+								+ " MaxQueueEventCount: {MaxQueueEventCount}."
+								+ " " + dbgEventKind + ": {" + dbgEventKind + "}."
+								, newEventQueueCount, _maxQueueEventCount, eventObj);
+						Interlocked.Decrement(ref _eventQueueCount);
+						return;
+					}
+
+					_logger.Debug()
+						?.Log("Enqueued " + dbgEventKind + "."
+							+ " newEventQueueCount: {EventQueueCount}."
+							+ " MaxQueueEventCount: {MaxQueueEventCount}."
+							+ " " + dbgEventKind + ": {" + dbgEventKind + "}."
+							, newEventQueueCount, _maxQueueEventCount, eventObj);
+
+					if (_flushInterval == TimeSpan.Zero || !_flushWaiters.IsEmpty)
+						_eventQueue.TriggerBatch();
+				}
+				catch (Exception ex)
+				{
+					// Unexpected exception (e.g. faulted BatchBlock): ensure the count is rolled
+					// back so FlushAsync does not wait forever for an event that was never queued.
+					_logger.Error()?.LogException(ex, "Unexpected exception while enqueuing " + dbgEventKind + ".");
+					Interlocked.Decrement(ref _eventQueueCount);
+				}
+			});
 		}
 
 		///
@@ -262,7 +385,26 @@ protected override void WorkLoopIteration()
 			if (_allowFilterAdd && batch is { Length: > 0 })
 				_allowFilterAdd = false;
 			if (batch != null)
-				ProcessQueueItems(batch);
+			{
+				// _inFlightSends was incremented inside ReceiveBatch (before _eventQueueCount was
+				// decremented) so FlushAsync cannot observe both as zero mid-batch. Decrement here
+				// once the send is done (success, failure, or cancellation).
+				try
+				{
+					ProcessQueueItems(batch);
+				}
+				finally
+				{
+					Interlocked.Decrement(ref _inFlightSends);
+				}
+			}
+
+			// Signal flush waiters only after both the queue and the in-flight send are clear.
+			if (Volatile.Read(ref _eventQueueCount) == 0 && Volatile.Read(ref _inFlightSends) == 0 && !_flushWaiters.IsEmpty)
+			{
+				while (_flushWaiters.TryDequeue(out var tcs))
+					tcs.TrySetResult(true);
+			}
 		}
 
 		private void ResetActivationMethodIfKnownBrokenApmServer()
@@ -341,6 +483,9 @@ private object[] ReceiveBatch()
 			var eventBatchToSend = receivedItems;
 			if (eventBatchToSend != null)
 			{
+				// Increment _inFlightSends BEFORE decrementing _eventQueueCount so that FlushAsync
+				// never observes both as zero while the batch is in transit to ProcessQueueItems.
+				Interlocked.Increment(ref _inFlightSends);
 				var newEventQueueCount = Interlocked.Add(ref _eventQueueCount, -eventBatchToSend.Length);
 
 				_logger.Trace()
diff --git a/test/Elastic.Apm.Tests/BackendCommTests/PayloadSenderTests.cs b/test/Elastic.Apm.Tests/BackendCommTests/PayloadSenderTests.cs
index 4bd44a4a2..65ff62e7d 100644
--- a/test/Elastic.Apm.Tests/BackendCommTests/PayloadSenderTests.cs
+++ b/test/Elastic.Apm.Tests/BackendCommTests/PayloadSenderTests.cs
@@ -396,6 +396,168 @@ private void CreateSutEnvAndTest(Action doAction)
 			payloadSender.IsRunning.Should().BeFalse();
 		}
 
+		/// <summary>
+		/// Regression test for https://github.com/elastic/apm-agent-dotnet/issues/288.
+		/// Events queued just before Dispose must not be silently dropped.
+		/// </summary>
+		[Fact]
+		public async Task Dispose_sends_queued_events_before_stopping()
+		{
+			var received = new List<string>();
+			var handler = new MockHttpMessageHandler(async (request, _) =>
+			{
+				var body = await request.Content.ReadAsStringAsync();
+				lock (received)
+					received.Add(body);
+				return new HttpResponseMessage(HttpStatusCode.Accepted);
+			});
+
+			var config = new MockConfiguration(_logger, flushInterval: "3600s");
+			var service = Service.GetDefaultService(config, _logger);
+			var payloadSender = new PayloadSenderV2(_logger, config, service, new Api.System(),
+				MockApmServerInfo.Version710, handler, TestDisplayName);
+
+			using var agent = new ApmAgent(new TestAgentComponents(_logger, payloadSender: payloadSender));
+
+			// Use EnqueueEventInternal directly (awaitable) so the item is guaranteed to be
+			// in the BatchBlock before Dispose is called. EnqueueEvent (fire-and-forget) has
+			// a race with Dispose where the item may not be counted yet.
+			await payloadSender.EnqueueEventInternal(
+				new Transaction(agent, "TestTransaction", "TestType"), "Transaction");
+
+			agent.Dispose();
+
+			lock (received)
+			{
+				received.Should().NotBeEmpty("Dispose must flush queued events before cancelling");
+				received.Any(r => r.Contains("\"transaction\"")).Should().BeTrue();
+			}
+		}
+
+		/// <summary>
+		/// Agent.FlushAsync() and the IApmAgent extension must complete without deadlock and
+		/// signal only after all queued events have been sent.
+		/// </summary>
+		[Fact]
+		public async Task FlushAsync_waits_for_events_to_be_sent()
+		{
+			var sendCount = 0;
+			var handler = new MockHttpMessageHandler((_, _) =>
+			{
+				Interlocked.Increment(ref sendCount);
+				return Task.FromResult(new HttpResponseMessage(HttpStatusCode.Accepted));
+			});
+
+			var config = new MockConfiguration(_logger, flushInterval: "3600s");
+			var service = Service.GetDefaultService(config, _logger);
+			using var payloadSender = new PayloadSenderV2(_logger, config, service, new Api.System(),
+				MockApmServerInfo.Version710, handler, TestDisplayName);
+
+			using var agent = new ApmAgent(new TestAgentComponents(_logger, payloadSender: payloadSender));
+
+			// Use EnqueueEventInternal directly so the item is in the BatchBlock before FlushAsync,
+			// avoiding a race with the fire-and-forget Task.Run in EnqueueEvent.
+			await payloadSender.EnqueueEventInternal(
+				new Transaction(agent, "TestTransaction", "TestType"), "Transaction");
+			using var flushTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(10));
+			await agent.FlushAsync(flushTimeout.Token);
+
+			sendCount.Should().Be(1, "FlushAsync must return only after the HTTP POST completes");
+		}
+
+		/// <summary>
+		/// FlushAsync on an empty queue must return immediately without deadlock.
+		/// </summary>
+		[Fact]
+		public async Task FlushAsync_on_empty_queue_returns_immediately()
+		{
+			var config = new MockConfiguration(_logger);
+			var service = Service.GetDefaultService(config, _logger);
+			using var payloadSender = new PayloadSenderV2(_logger, config, service, new Api.System(),
+				MockApmServerInfo.Version710, dbgName: TestDisplayName);
+
+			using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
+			await payloadSender.FlushAsync(cts.Token);
+		}
+
+		/// <summary>
+		/// FlushAsync via the public QueueTransaction path (EnqueueEvent → Task.Run) must wait
+		/// for the HTTP send to complete. Regression for the synchronous count-increment fix:
+		/// _eventQueueCount is incremented before Task.Run so FlushAsync cannot miss the event.
+		/// </summary>
+		[Fact]
+		public async Task FlushAsync_via_public_QueueTransaction_waits_for_send()
+		{
+			var sendCount = 0;
+			var handler = new MockHttpMessageHandler((_, _) =>
+			{
+				Interlocked.Increment(ref sendCount);
+				return Task.FromResult(new HttpResponseMessage(HttpStatusCode.Accepted));
+			});
+
+			// Long flushInterval: the periodic flush is not expected to fire during this test.
+			var config = new MockConfiguration(_logger, flushInterval: "3600s", maxBatchEventCount: "1");
+			var service = Service.GetDefaultService(config, _logger);
+			using var payloadSender = new PayloadSenderV2(_logger, config, service, new Api.System(),
+				MockApmServerInfo.Version710, handler, TestDisplayName);
+			using var agent = new ApmAgent(new TestAgentComponents(_logger, payloadSender: payloadSender));
+
+			// Public path: QueueTransaction calls EnqueueEvent which now increments _eventQueueCount
+			// synchronously, so FlushAsync will see the event even before Task.Run executes.
+			agent.PayloadSender.QueueTransaction(new Transaction(agent, "TestTransaction", "TestType"));
+			using var flushTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(10));
+			await agent.FlushAsync(flushTimeout.Token);
+
+			sendCount.Should().Be(1, "FlushAsync must not return before the HTTP POST completes");
+		}
+
+		/// <summary>
+		/// FlushAsync called while a batch has already been received by the work loop (so
+		/// _eventQueueCount is already 0) but ProcessQueueItems has not yet finished must still
+		/// wait for the HTTP send to complete. Regression for the _inFlightSends-in-ReceiveBatch fix.
+		/// </summary>
+		[Fact]
+		public async Task FlushAsync_waits_when_called_while_batch_is_in_flight()
+		{
+			var sendStarted = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
+			var sendCanComplete = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
+			var sendCount = 0;
+
+			var handler = new MockHttpMessageHandler(async (_, _) =>
+			{
+				sendStarted.TrySetResult(true);
+				await sendCanComplete.Task;
+				Interlocked.Increment(ref sendCount);
+				return new HttpResponseMessage(HttpStatusCode.Accepted);
+			});
+
+			var config = new MockConfiguration(_logger, flushInterval: "100ms", maxBatchEventCount: "1");
+			var service = Service.GetDefaultService(config, _logger);
+			using var payloadSender = new PayloadSenderV2(_logger, config, service, new Api.System(),
+				MockApmServerInfo.Version710, handler, TestDisplayName);
+			using var agent = new ApmAgent(new TestAgentComponents(_logger, payloadSender: payloadSender));
+
+			await payloadSender.EnqueueEventInternal(
+				new Transaction(agent, "TestTransaction", "TestType"), "Transaction");
+
+			// Wait until the HTTP handler has started: at this point _eventQueueCount is already 0
+			// (decremented by ReceiveBatch) but _inFlightSends is 1 (incremented in ReceiveBatch
+			// before the decrement), so FlushAsync must not take the fast path.
+			await sendStarted.Task;
+			using var flushTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(10));
+			var flushTask = agent.FlushAsync(flushTimeout.Token);
+
+			// Let FlushAsync evaluate the counters (500 ms is ample even on loaded CI runners), then record
+			// its state and release the handler before asserting so a failed assertion cannot stall Dispose.
+			await Task.Delay(500);
+			var completedBeforeRelease = flushTask.IsCompleted;
+			sendCanComplete.TrySetResult(true);
+			await flushTask;
+
+			completedBeforeRelease.Should().BeFalse("FlushAsync must wait while the HTTP send is still in progress");
+			sendCount.Should().Be(1, "FlushAsync must return only after the HTTP POST completes");
+		}
+
 		internal class TestArgs
 		{
 			internal int ArgsIndex { get; set; }