Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using System.Collections.Generic;
using System.IO;
using System.Text;
using BenchmarkDotNet.Attributes;
using Elastic.Apm.Api;
using Elastic.Apm.Logging;
using Elastic.Apm.Metrics;
using Elastic.Apm.Model;
using Elastic.Apm.Report.Serialization;
using Elastic.Apm.Tests.Utilities;

namespace Elastic.Apm.Benchmarks;

/// <summary>
/// Measures the allocation cost of the serialization buffer strategy used by
/// <see cref="Elastic.Apm.Report.PayloadSenderV2"/> when flushing a batch of APM events.
/// <para>
/// <b>Old approach</b>: <c>new MemoryStream(1024)</c> allocated on every batch.
/// As the stream grows it doubles its internal <c>byte[]</c>, producing a chain of
/// short-lived arrays. For batches larger than 85 KB the final array lands in the
/// Large Object Heap (LOH), which is not compacted during ordinary GC cycles.
/// Over a long-running deployment this causes LOH fragmentation and sustained
/// process-level memory growth.
/// </para>
/// <para>
/// <b>New approach</b>: a single <c>MemoryStream</c> is allocated once and reset with
/// <c>SetLength(0)</c> before each batch. The internal buffer is reused, so no new
/// heap objects are created per batch.
/// </para>
/// <para>
/// Key metrics to watch in the results:
/// <list type="bullet">
/// <item><b>Allocated</b> – bytes per operation.
/// OldApproach ≈ serialised-payload-size (plus MemoryStream overhead).
/// NewApproach ≈ 0 (no buffer allocation).</item>
/// <item><b>Gen0 / Gen1 / Gen2</b> – GC pressure.
/// For <c>SpanCount = 50</c> the old approach generates Gen2 collections
/// (LOH objects); the new approach should show none.</item>
/// </list>
/// </para>
/// </summary>
[MemoryDiagnoser]
public class PayloadSenderSerializationBenchmarks
{
private static readonly UTF8Encoding Utf8Encoding = new(encoderShouldEmitUTF8Identifier: false);

private PayloadItemSerializer _serializer;
private string _cachedMetadataLine;
private object[] _batch;
private MemoryStream _reusableBuffer;

/// <summary>
/// Number of spans captured per transaction.
/// <list type="bullet">
/// <item><b>5</b> – small batch, well under the 85 KB LOH threshold.</item>
/// <item><b>20</b> – medium batch, may approach the threshold.</item>
/// <item><b>50</b> – large batch that comfortably exceeds the threshold;
/// simulates high-throughput production load.</item>
/// </list>
/// </summary>
[Params(5, 20, 50)]
public int SpanCount { get; set; }

[GlobalSetup]
public void Setup()
{
_serializer = new PayloadItemSerializer();
_reusableBuffer = new MemoryStream(4096);

var logger = new NoopLogger();
var config = new MockConfiguration(logger);
var service = Service.GetDefaultService(config, logger);
var process = ProcessInformation.Create();
var metadata = new Metadata { Service = service, System = new Api.System(), Process = process };
_cachedMetadataLine = _serializer.Serialize(metadata);

// Capture real agent objects so the serialization exercises a fully-populated
// payload — the same data that PayloadSenderV2.ProcessQueueItems would handle.
var mockSender = new MockPayloadSender();
using var agent = new ApmAgent(new AgentComponents(
payloadSender: mockSender,
configurationReader: config,
logger: logger));

agent.Tracer.CaptureTransaction("BenchmarkTransaction", "benchmark", t =>
{
for (var i = 0; i < SpanCount; i++)
{
t.CaptureSpan($"SELECT * FROM BenchTable{i}", ApiConstants.TypeDb,
() => { }, ApiConstants.SubtypeMssql);
}
});

// MockPayloadSender stores items synchronously, so all items are ready here.
var items = new List<object>();
if (mockSender.FirstTransaction != null)
items.Add(mockSender.FirstTransaction);
foreach (var span in mockSender.Spans)
items.Add(span);

_batch = items.ToArray();
}

[GlobalCleanup]
public void Cleanup() => _reusableBuffer.Dispose();

/// <summary>
/// Original implementation: allocates a new <see cref="MemoryStream"/> for every batch.
/// </summary>
[Benchmark(Baseline = true, Description = "Old: new MemoryStream(1024) per batch")]
public long OldApproach()
{
using var stream = new MemoryStream(1024);
SerializeBatch(stream);
return stream.Length; // consumed to prevent dead-code elimination
}

/// <summary>
/// Optimized implementation: reuses a single <see cref="MemoryStream"/>.
/// <c>SetLength(0)</c> resets both the logical length and the write cursor
/// without releasing the underlying buffer.
/// </summary>
[Benchmark(Description = "New: reuse MemoryStream via SetLength(0)")]
public long NewApproach()
{
_reusableBuffer.SetLength(0);
SerializeBatch(_reusableBuffer);
return _reusableBuffer.Length;
}

private void SerializeBatch(MemoryStream stream)
{
// Mirror the exact structure written by PayloadSenderV2.ProcessQueueItems.
using var writer = new StreamWriter(stream, Utf8Encoding, bufferSize: 1024, leaveOpen: true);

writer.Write("{\"metadata\":");
writer.Write(_cachedMetadataLine);
writer.Write("}\n");

foreach (var item in _batch)
{
var eventType = item switch
{
Transaction => "transaction",
Span => "span",
Error => "error",
MetricSet => "metricset",
_ => null
};

if (eventType is null)
continue;

writer.Write("{\"");
writer.Write(eventType);
writer.Write("\":");
writer.Write(_serializer.Serialize(item));
writer.Write("}\n");
}
}
}
18 changes: 13 additions & 5 deletions src/Elastic.Apm/Report/PayloadSenderV2.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ internal class PayloadSenderV2 : BackendCommComponentBase, IPayloadSender, IPayl
private int _inFlightSends;
private readonly ConcurrentQueue<TaskCompletionSource<bool>> _flushWaiters = new();

// Reused across batches by the single work-loop thread — avoids per-batch heap allocations
// that could otherwise exceed the 85 KB LOH threshold and cause long-lived fragmentation.
private readonly MemoryStream _serializationBuffer = new(4096);

private readonly ElasticVersion _brokenActivationMethodVersion;
private readonly string _cachedActivationMethod;
private readonly bool _isEnabled;
Expand Down Expand Up @@ -200,7 +204,11 @@ protected override void Dispose(bool disposing)
}

_eventQueue?.Complete();
// base.Dispose cancels the token and joins the work-loop thread, so
// _serializationBuffer is guaranteed idle before we dispose it.
base.Dispose(disposing);
if (disposing)
_serializationBuffer.Dispose();
}

/// <inheritdoc cref="IFlushablePayloadSender"/>
Expand Down Expand Up @@ -502,14 +510,13 @@ private object[] ReceiveBatch()

private void ProcessQueueItems(object[] queueItems)
{
// can reuse underlying buffers from a pool in future.
using var stream = new MemoryStream(1024);
_serializationBuffer.SetLength(0);

try
{
_cachedMetadataJsonLine ??= _payloadItemSerializer.Serialize(_metadata);

using (var writer = new StreamWriter(stream, Utf8Encoding, 1024, true))
using (var writer = new StreamWriter(_serializationBuffer, Utf8Encoding, 1024, leaveOpen: true))
{
writer.Write("{\"metadata\":");
writer.Write(_cachedMetadataJsonLine);
Expand Down Expand Up @@ -538,8 +545,9 @@ private void ProcessQueueItems(object[] queueItems)
}
}

stream.Position = 0;
using (var content = new StreamContent(stream))
// Wrap in a non-owning view so StreamContent.Dispose() doesn't close _serializationBuffer.
var sendStream = new MemoryStream(_serializationBuffer.GetBuffer(), 0, (int)_serializationBuffer.Length, writable: false);
using (var content = new StreamContent(sendStream))
{
content.Headers.ContentType = MediaTypeHeaderValue;

Expand Down
65 changes: 65 additions & 0 deletions test/Elastic.Apm.Tests/BackendCommTests/PayloadSenderTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,71 @@ await payloadSender.EnqueueEventInternal(
sendCount.Should().Be(1, "FlushAsync must return only after the HTTP POST completes");
}

/// <summary>
/// Regression guard for the serialization-buffer reuse optimization.
/// <para>
/// Before the fix, <see cref="PayloadSenderV2"/> allocated a new
/// <c>MemoryStream</c> for every outgoing batch, which caused LOH pressure
/// under sustained load. After the fix a single buffer is reused via
/// <c>SetLength(0)</c>.
/// </para>
/// <para>
/// This test verifies that the reset is complete: data written in batch A
/// must not bleed into the body received by the server for batch B.
/// </para>
/// </summary>
[Fact]
public async Task SequentialBatches_SerializationBufferIsIsolated()
{
var receivedBodies = new List<string>();
var handler = new MockHttpMessageHandler((request, _) =>
{
var body = request.Content.ReadAsStringAsync().GetAwaiter().GetResult();
lock (receivedBodies)
receivedBodies.Add(body);
return Task.FromResult(new HttpResponseMessage(HttpStatusCode.Accepted));
});

// flushInterval: "0" forces the work loop to send immediately after each item.
var config = new MockConfiguration(_logger, flushInterval: "0", maxBatchEventCount: "3");
var service = Service.GetDefaultService(config, _logger);
using var payloadSender = new PayloadSenderV2(_logger, config, service, new Api.System(),
MockApmServerInfo.Version710, handler, TestDisplayName);
using var agent = new ApmAgent(new TestAgentComponents(_logger, payloadSender: payloadSender));

// Batch A: three transactions with a distinguishable name prefix.
for (var i = 0; i < 3; i++)
await payloadSender.EnqueueEventInternal(
new Transaction(agent, $"BatchA-Tx{i}", "test"), "Transaction");

await agent.FlushAsync(new CancellationTokenSource(10.Seconds()).Token);

// Batch B: three transactions with a different name prefix.
for (var i = 0; i < 3; i++)
await payloadSender.EnqueueEventInternal(
new Transaction(agent, $"BatchB-Tx{i}", "test"), "Transaction");

await agent.FlushAsync(new CancellationTokenSource(10.Seconds()).Token);

string batchABody, batchBBody;
lock (receivedBodies)
{
receivedBodies.Should().HaveCountGreaterOrEqualTo(2,
"two distinct flushes must produce at least two separate HTTP POSTs");

batchABody = receivedBodies[0];
batchBBody = receivedBodies[receivedBodies.Count - 1];
}

// Batch A's body must contain only A-prefixed names.
batchABody.Should().Contain("BatchA-Tx", "first batch must include BatchA transactions");
batchABody.Should().NotContain("BatchB-Tx", "BatchB data must not bleed into BatchA's HTTP body");

// Batch B's body must contain only B-prefixed names.
batchBBody.Should().Contain("BatchB-Tx", "second batch must include BatchB transactions");
batchBBody.Should().NotContain("BatchA-Tx", "BatchA data must not bleed into BatchB's HTTP body");
}

internal class TestArgs
{
internal int ArgsIndex { get; set; }
Expand Down
Loading