Skip to content
19 changes: 16 additions & 3 deletions src/OpenTelemetry/Batch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,11 @@ public void Dispose()
T item = this.circularBuffer.Read();
if (typeof(T) == typeof(LogRecord))
{
LogRecordSharedPool.Current.Return((LogRecord)(object)item);
var logRecord = (LogRecord)(object)item;
if (logRecord.Source == LogRecord.LogRecordSource.FromSharedPool)
{
LogRecordSharedPool.Current.Return(logRecord);
}
}
}
}
Expand Down Expand Up @@ -134,7 +138,11 @@ public struct Enumerator : IEnumerator<T>

if (currentItem != null)
{
LogRecordSharedPool.Current.Return((LogRecord)(object)currentItem);
var logRecord = (LogRecord)(object)currentItem;
if (logRecord.Source == LogRecord.LogRecordSource.FromSharedPool)
{
LogRecordSharedPool.Current.Return(logRecord);
}
}

if (circularBuffer!.RemovedCount < enumerator.targetCount)
Expand Down Expand Up @@ -215,7 +223,12 @@ public void Dispose()
var currentItem = this.current;
if (currentItem != null)
{
LogRecordSharedPool.Current.Return((LogRecord)(object)currentItem);
var logRecord = (LogRecord)(object)currentItem;
if (logRecord.Source == LogRecord.LogRecordSource.FromSharedPool)
{
LogRecordSharedPool.Current.Return(logRecord);
}

this.current = null;
}
}
Expand Down
6 changes: 6 additions & 0 deletions src/OpenTelemetry/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@
state for cumulative temporality.
[#5230](https://github.com/open-telemetry/opentelemetry-dotnet/pull/5230)

* Fixed an issue causing `LogRecord`s to be incorrectly reused when wrapping an
instance of `BatchLogRecordExportProcessor` inside another
`BaseProcessor<LogRecord>` which leads to missing or incorrect data during
export.
[#5255](https://github.com/open-telemetry/opentelemetry-dotnet/pull/5255)

## 1.7.0

Released 2023-Dec-08
Expand Down
24 changes: 19 additions & 5 deletions src/OpenTelemetry/Logs/BatchLogRecordExportProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,27 @@ public override void OnEnd(LogRecord data)
// happen here.
Debug.Assert(data != null, "LogRecord was null.");

data!.Buffer();
switch (data!.Source)
{
case LogRecord.LogRecordSource.FromSharedPool:
data.Buffer();
data.AddReference();
if (!this.TryExport(data))
{
LogRecordSharedPool.Current.Return(data);
}

data.AddReference();
break;
case LogRecord.LogRecordSource.CreatedManually:
data.Buffer();
this.TryExport(data);
break;
default:
Debug.Assert(data.Source == LogRecord.LogRecordSource.FromThreadStaticPool, "LogRecord source was something unexpected");

if (!this.TryExport(data))
{
LogRecordSharedPool.Current.Return(data);
// Note: If we are using ThreadStatic pool we make a copy of the record.
this.TryExport(data.Copy());
break;
}
}
}
19 changes: 19 additions & 0 deletions src/OpenTelemetry/Logs/LogRecord.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public sealed class LogRecord
internal IReadOnlyList<KeyValuePair<string, object?>>? AttributeData;
internal List<KeyValuePair<string, object?>>? AttributeStorage;
internal List<object?>? ScopeStorage;
internal LogRecordSource Source = LogRecordSource.CreatedManually;
internal int PoolReferenceCount = int.MaxValue;

private static readonly Action<object?, List<object?>> AddScopeToBufferedList = (object? scope, List<object?> state) =>
Expand Down Expand Up @@ -80,6 +81,24 @@ internal LogRecord(
}
}

internal enum LogRecordSource
{
/// <summary>
/// A <see cref="LogRecord"/> created manually.
/// </summary>
CreatedManually,

/// <summary>
/// A <see cref="LogRecord"/> rented from the <see cref="LogRecordThreadStaticPool"/>.
/// </summary>
FromThreadStaticPool,

/// <summary>
/// A <see cref="LogRecord"/> rented from the <see cref="LogRecordSharedPool"/>.
/// </summary>
FromSharedPool,
}

/// <summary>
/// Gets or sets the log timestamp.
/// </summary>
Expand Down
11 changes: 9 additions & 2 deletions src/OpenTelemetry/Logs/Pool/LogRecordSharedPool.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using OpenTelemetry.Internal;

Expand All @@ -17,7 +18,7 @@ internal sealed class LogRecordSharedPool : ILogRecordPool
private long rentIndex;
private long returnIndex;

public LogRecordSharedPool(int capacity)
private LogRecordSharedPool(int capacity)
{
this.Capacity = capacity;
this.pool = new LogRecord?[capacity];
Expand Down Expand Up @@ -54,18 +55,24 @@ public LogRecord Rent()
continue;
}

Debug.Assert(logRecord.Source == LogRecord.LogRecordSource.FromSharedPool, "logRecord.Source was not FromSharedPool");
logRecord.ResetReferenceCount();
return logRecord;
}
}

var newLogRecord = new LogRecord();
var newLogRecord = new LogRecord()
{
Source = LogRecord.LogRecordSource.FromSharedPool,
};
newLogRecord.ResetReferenceCount();
return newLogRecord;
}

public void Return(LogRecord logRecord)
{
Debug.Assert(logRecord.Source == LogRecord.LogRecordSource.FromSharedPool, "logRecord.Source was not FromSharedPool");

if (logRecord.RemoveReference() != 0)
{
return;
Expand Down
14 changes: 12 additions & 2 deletions src/OpenTelemetry/Logs/Pool/LogRecordThreadStaticPool.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

using System.Diagnostics;

namespace OpenTelemetry.Logs;

internal sealed class LogRecordThreadStaticPool : ILogRecordPool
Expand All @@ -19,15 +21,23 @@ public LogRecord Rent()
var logRecord = Storage;
if (logRecord != null)
{
Debug.Assert(logRecord.Source == LogRecord.LogRecordSource.FromThreadStaticPool, "logRecord.Source was not FromThreadStaticPool");
Storage = null;
return logRecord;
}
else
{
logRecord = new()
{
Source = LogRecord.LogRecordSource.FromThreadStaticPool,
};
}

return new();
return logRecord;
}

public void Return(LogRecord logRecord)
{
Debug.Assert(logRecord.Source == LogRecord.LogRecordSource.FromThreadStaticPool, "logRecord.Source was not FromThreadStaticPool");
if (Storage == null)
{
LogRecordPoolHelper.Clear(logRecord);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ public void StateValuesAndScopeBufferingTest()

using var scope = scopeProvider.Push(exportedItems);

var logRecord = new LogRecord();
var pool = LogRecordSharedPool.Current;

var logRecord = pool.Rent();

var state = new LogRecordTest.DisposingState("Hello world");

Expand Down Expand Up @@ -60,6 +62,7 @@ public void StateValuesAndScopeBufferingTest()
processor.Shutdown();

Assert.Single(exportedItems);
Assert.Same(logRecord, exportedItems[0]);
}

[Fact]
Expand All @@ -74,14 +77,19 @@ public void StateBufferingTest()
using var processor = new BatchLogRecordExportProcessor(
new InMemoryExporter<LogRecord>(exportedItems));

var logRecord = new LogRecord();
var pool = LogRecordSharedPool.Current;

var logRecord = pool.Rent();

var state = new LogRecordTest.DisposingState("Hello world");
logRecord.State = state;

processor.OnEnd(logRecord);
processor.Shutdown();

Assert.Single(exportedItems);
Assert.Same(logRecord, exportedItems[0]);

state.Dispose();

Assert.Throws<ObjectDisposedException>(() =>
Expand All @@ -93,5 +101,41 @@ public void StateBufferingTest()
}
});
}

[Fact]
public void CopyMadeWhenLogRecordIsFromThreadStaticPoolTest()
{
List<LogRecord> exportedItems = new();

using var processor = new BatchLogRecordExportProcessor(
new InMemoryExporter<LogRecord>(exportedItems));

var pool = LogRecordThreadStaticPool.Instance;

var logRecord = pool.Rent();

processor.OnEnd(logRecord);
processor.Shutdown();

Assert.Single(exportedItems);
Assert.NotSame(logRecord, exportedItems[0]);
}

[Fact]
public void LogRecordAddedToBatchIfNotFromAnyPoolTest()
{
List<LogRecord> exportedItems = new();

using var processor = new BatchLogRecordExportProcessor(
new InMemoryExporter<LogRecord>(exportedItems));

var logRecord = new LogRecord();

processor.OnEnd(logRecord);
processor.Shutdown();

Assert.Single(exportedItems);
Assert.Same(logRecord, exportedItems[0]);
}
}
#endif
21 changes: 12 additions & 9 deletions test/OpenTelemetry.Tests/Logs/LogRecordSharedPoolTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,23 @@ public void RentReturnTests()

Assert.Equal(1, pool.Count);

// Note: This is ignored because logRecord manually created has PoolReferenceCount = int.MaxValue.
LogRecord manualRecord = new();
Assert.Equal(int.MaxValue, manualRecord.PoolReferenceCount);
pool.Return(manualRecord);
var logRecordWithReferencesAdded = pool.Rent();

Assert.Equal(1, pool.Count);
// Note: This record won't be returned to the pool because we add a reference to it.
logRecordWithReferencesAdded.AddReference();

Assert.Equal(2, logRecordWithReferencesAdded.PoolReferenceCount);
pool.Return(logRecordWithReferencesAdded);

Assert.Equal(0, pool.Count);

pool.Return(logRecord2);

Assert.Equal(2, pool.Count);
Assert.Equal(1, pool.Count);

logRecord1 = pool.Rent();
Assert.NotNull(logRecord1);
Assert.Equal(1, pool.Count);
Assert.Equal(0, pool.Count);

logRecord2 = pool.Rent();
Assert.NotNull(logRecord2);
Expand All @@ -70,7 +73,7 @@ public void RentReturnTests()

pool.Return(logRecord1);
pool.Return(logRecord2);
pool.Return(logRecord3);
pool.Return(logRecord3); // <- Discarded due to pool size of 2
pool.Return(logRecord4); // <- Discarded due to pool size of 2

Assert.Equal(2, pool.Count);
Expand Down Expand Up @@ -163,7 +166,7 @@ public async Task ExportTest(bool warmup)
{
for (int i = 0; i < LogRecordSharedPool.DefaultMaxPoolSize; i++)
{
pool.Return(new LogRecord { PoolReferenceCount = 1 });
pool.Return(new LogRecord { Source = LogRecord.LogRecordSource.FromSharedPool, PoolReferenceCount = 1 });
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,17 @@ public void RentReturnTests()
Assert.NotNull(LogRecordThreadStaticPool.Storage);
Assert.Equal(logRecord, LogRecordThreadStaticPool.Storage);

LogRecordThreadStaticPool.Instance.Return(new());
// Note: This record will be ignored because there is already something in the ThreadStatic storage.
LogRecordThreadStaticPool.Instance.Return(new() { Source = LogRecord.LogRecordSource.FromThreadStaticPool });
Assert.NotNull(LogRecordThreadStaticPool.Storage);
Assert.Equal(logRecord, LogRecordThreadStaticPool.Storage);

LogRecordThreadStaticPool.Storage = null;

var manual = new LogRecord();
LogRecordThreadStaticPool.Instance.Return(manual);
var newLogRecord = new LogRecord() { Source = LogRecord.LogRecordSource.FromThreadStaticPool };
LogRecordThreadStaticPool.Instance.Return(newLogRecord);
Assert.NotNull(LogRecordThreadStaticPool.Storage);
Assert.Equal(manual, LogRecordThreadStaticPool.Storage);
Assert.Equal(newLogRecord, LogRecordThreadStaticPool.Storage);
}

[Fact]
Expand Down