-
Notifications
You must be signed in to change notification settings - Fork 29
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
947d047
commit ef00e39
Showing
16 changed files
with
508 additions
and
469 deletions.
There are no files selected for viewing
71 changes: 42 additions & 29 deletions
71
src/Serilog.Sinks.Async/LoggerConfigurationAsyncExtensions.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,38 +1,51 @@ | ||
using System; | ||
// Copyright © Serilog Contributors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
using System; | ||
using Serilog.Configuration; | ||
using Serilog.Sinks.Async; | ||
|
||
namespace Serilog | ||
namespace Serilog; | ||
|
||
/// <summary> | ||
/// Extends <see cref="LoggerConfiguration"/> with methods for configuring asynchronous logging. | ||
/// </summary> | ||
public static class LoggerConfigurationAsyncExtensions | ||
{ | ||
/// <summary> | ||
/// Extends <see cref="LoggerConfiguration"/> with methods for configuring asynchronous logging. | ||
/// Configure a sink to be invoked asynchronously, on a background worker thread. | ||
/// Accepts a reference to a <paramref name="monitor"/> that will be supplied the internal state interface for health monitoring purposes. | ||
/// </summary> | ||
public static class LoggerConfigurationAsyncExtensions | ||
/// <param name="loggerSinkConfiguration">The <see cref="LoggerSinkConfiguration"/> being configured.</param> | ||
/// <param name="configure">An action that configures the wrapped sink.</param> | ||
/// <param name="bufferSize">The size of the concurrent queue used to feed the background worker thread. If | ||
/// the thread is unable to process events quickly enough and the queue is filled, depending on | ||
/// <paramref name="blockWhenFull"/> the queue will block or subsequent events will be dropped until | ||
/// room is made in the queue.</param> | ||
/// <param name="blockWhenFull">Block when the queue is full, instead of dropping events.</param> | ||
/// <param name="monitor">Monitor to supply buffer information to.</param> | ||
/// <returns>A <see cref="LoggerConfiguration"/> allowing configuration to continue.</returns> | ||
public static LoggerConfiguration Async( | ||
this LoggerSinkConfiguration loggerSinkConfiguration, | ||
Action<LoggerSinkConfiguration> configure, | ||
int bufferSize = 10000, | ||
bool blockWhenFull = false, | ||
IAsyncLogEventSinkMonitor? monitor = null) | ||
{ | ||
/// <summary> | ||
/// Configure a sink to be invoked asynchronously, on a background worker thread. | ||
/// Accepts a reference to a <paramref name="monitor"/> that will be supplied the internal state interface for health monitoring purposes. | ||
/// </summary> | ||
/// <param name="loggerSinkConfiguration">The <see cref="LoggerSinkConfiguration"/> being configured.</param> | ||
/// <param name="configure">An action that configures the wrapped sink.</param> | ||
/// <param name="bufferSize">The size of the concurrent queue used to feed the background worker thread. If | ||
/// the thread is unable to process events quickly enough and the queue is filled, depending on | ||
/// <paramref name="blockWhenFull"/> the queue will block or subsequent events will be dropped until | ||
/// room is made in the queue.</param> | ||
/// <param name="blockWhenFull">Block when the queue is full, instead of dropping events.</param> | ||
/// <param name="monitor">Monitor to supply buffer information to.</param> | ||
/// <returns>A <see cref="LoggerConfiguration"/> allowing configuration to continue.</returns> | ||
public static LoggerConfiguration Async( | ||
this LoggerSinkConfiguration loggerSinkConfiguration, | ||
Action<LoggerSinkConfiguration> configure, | ||
int bufferSize = 10000, | ||
bool blockWhenFull = false, | ||
IAsyncLogEventSinkMonitor? monitor = null) | ||
{ | ||
var wrapper = LoggerSinkConfiguration.Wrap( | ||
wrappedSink => new BackgroundWorkerSink(wrappedSink, bufferSize, blockWhenFull, monitor), | ||
configure); | ||
return loggerSinkConfiguration.Sink(wrapper); | ||
} | ||
var wrapper = LoggerSinkConfiguration.Wrap( | ||
wrappedSink => new BackgroundWorkerSink(wrappedSink, bufferSize, blockWhenFull, monitor), | ||
configure); | ||
return loggerSinkConfiguration.Sink(wrapper); | ||
} | ||
} |
149 changes: 81 additions & 68 deletions
149
src/Serilog.Sinks.Async/Sinks/Async/BackgroundWorkerSink.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,100 +1,113 @@ | ||
using System; | ||
// Copyright © Serilog Contributors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
using System; | ||
using System.Collections.Concurrent; | ||
using System.Threading; | ||
using System.Threading.Tasks; | ||
using Serilog.Core; | ||
using Serilog.Debugging; | ||
using Serilog.Events; | ||
|
||
namespace Serilog.Sinks.Async | ||
namespace Serilog.Sinks.Async; | ||
|
||
sealed class BackgroundWorkerSink : ILogEventSink, IAsyncLogEventSinkInspector, IDisposable | ||
{ | ||
sealed class BackgroundWorkerSink : ILogEventSink, IAsyncLogEventSinkInspector, IDisposable | ||
{ | ||
readonly ILogEventSink _wrappedSink; | ||
readonly bool _blockWhenFull; | ||
readonly BlockingCollection<LogEvent> _queue; | ||
readonly Task _worker; | ||
readonly IAsyncLogEventSinkMonitor? _monitor; | ||
readonly ILogEventSink _wrappedSink; | ||
readonly bool _blockWhenFull; | ||
readonly BlockingCollection<LogEvent> _queue; | ||
readonly Task _worker; | ||
readonly IAsyncLogEventSinkMonitor? _monitor; | ||
|
||
long _droppedMessages; | ||
long _droppedMessages; | ||
|
||
public BackgroundWorkerSink(ILogEventSink wrappedSink, int bufferCapacity, bool blockWhenFull, IAsyncLogEventSinkMonitor? monitor = null) | ||
{ | ||
if (bufferCapacity <= 0) throw new ArgumentOutOfRangeException(nameof(bufferCapacity)); | ||
_wrappedSink = wrappedSink ?? throw new ArgumentNullException(nameof(wrappedSink)); | ||
_blockWhenFull = blockWhenFull; | ||
_queue = new BlockingCollection<LogEvent>(bufferCapacity); | ||
_worker = Task.Factory.StartNew(Pump, CancellationToken.None, TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach, TaskScheduler.Default); | ||
_monitor = monitor; | ||
monitor?.StartMonitoring(this); | ||
} | ||
public BackgroundWorkerSink(ILogEventSink wrappedSink, int bufferCapacity, bool blockWhenFull, IAsyncLogEventSinkMonitor? monitor = null) | ||
{ | ||
if (bufferCapacity <= 0) throw new ArgumentOutOfRangeException(nameof(bufferCapacity)); | ||
_wrappedSink = wrappedSink ?? throw new ArgumentNullException(nameof(wrappedSink)); | ||
_blockWhenFull = blockWhenFull; | ||
_queue = new BlockingCollection<LogEvent>(bufferCapacity); | ||
_worker = Task.Factory.StartNew(Pump, CancellationToken.None, TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach, TaskScheduler.Default); | ||
_monitor = monitor; | ||
monitor?.StartMonitoring(this); | ||
} | ||
|
||
public void Emit(LogEvent logEvent) | ||
{ | ||
if (_queue.IsAddingCompleted) | ||
return; | ||
public void Emit(LogEvent logEvent) | ||
{ | ||
if (_queue.IsAddingCompleted) | ||
return; | ||
|
||
try | ||
try | ||
{ | ||
if (_blockWhenFull) | ||
{ | ||
if (_blockWhenFull) | ||
{ | ||
_queue.Add(logEvent); | ||
} | ||
else | ||
{ | ||
if (!_queue.TryAdd(logEvent)) | ||
{ | ||
Interlocked.Increment(ref _droppedMessages); | ||
SelfLog.WriteLine("{0} unable to enqueue, capacity {1}", typeof(BackgroundWorkerSink), _queue.BoundedCapacity); | ||
} | ||
} | ||
_queue.Add(logEvent); | ||
} | ||
catch (InvalidOperationException) | ||
else | ||
{ | ||
// Thrown in the event of a race condition when we try to add another event after | ||
// CompleteAdding has been called | ||
if (!_queue.TryAdd(logEvent)) | ||
{ | ||
Interlocked.Increment(ref _droppedMessages); | ||
SelfLog.WriteLine("{0} unable to enqueue, capacity {1}", typeof(BackgroundWorkerSink), _queue.BoundedCapacity); | ||
} | ||
} | ||
} | ||
|
||
public void Dispose() | ||
catch (InvalidOperationException) | ||
{ | ||
// Prevent any more events from being added | ||
_queue.CompleteAdding(); | ||
// Thrown in the event of a race condition when we try to add another event after | ||
// CompleteAdding has been called | ||
} | ||
} | ||
|
||
public void Dispose() | ||
{ | ||
// Prevent any more events from being added | ||
_queue.CompleteAdding(); | ||
|
||
// Allow queued events to be flushed | ||
_worker.Wait(); | ||
// Allow queued events to be flushed | ||
_worker.Wait(); | ||
|
||
(_wrappedSink as IDisposable)?.Dispose(); | ||
(_wrappedSink as IDisposable)?.Dispose(); | ||
|
||
_monitor?.StopMonitoring(this); | ||
} | ||
_monitor?.StopMonitoring(this); | ||
} | ||
|
||
void Pump() | ||
void Pump() | ||
{ | ||
try | ||
{ | ||
try | ||
foreach (var next in _queue.GetConsumingEnumerable()) | ||
{ | ||
foreach (var next in _queue.GetConsumingEnumerable()) | ||
try | ||
{ | ||
try | ||
{ | ||
_wrappedSink.Emit(next); | ||
} | ||
catch (Exception ex) | ||
{ | ||
SelfLog.WriteLine("{0} failed to emit event to wrapped sink: {1}", typeof(BackgroundWorkerSink), ex); | ||
} | ||
_wrappedSink.Emit(next); | ||
} | ||
catch (Exception ex) | ||
{ | ||
SelfLog.WriteLine("{0} failed to emit event to wrapped sink: {1}", typeof(BackgroundWorkerSink), ex); | ||
} | ||
} | ||
catch (Exception fatal) | ||
{ | ||
SelfLog.WriteLine("{0} fatal error in worker thread: {1}", typeof(BackgroundWorkerSink), fatal); | ||
} | ||
} | ||
catch (Exception fatal) | ||
{ | ||
SelfLog.WriteLine("{0} fatal error in worker thread: {1}", typeof(BackgroundWorkerSink), fatal); | ||
} | ||
} | ||
|
||
int IAsyncLogEventSinkInspector.BufferSize => _queue.BoundedCapacity; | ||
int IAsyncLogEventSinkInspector.BufferSize => _queue.BoundedCapacity; | ||
|
||
int IAsyncLogEventSinkInspector.Count => _queue.Count; | ||
int IAsyncLogEventSinkInspector.Count => _queue.Count; | ||
|
||
long IAsyncLogEventSinkInspector.DroppedMessagesCount => _droppedMessages; | ||
} | ||
long IAsyncLogEventSinkInspector.DroppedMessagesCount => _droppedMessages; | ||
} |
51 changes: 32 additions & 19 deletions
51
src/Serilog.Sinks.Async/Sinks/Async/IAsyncLogEventSinkInspector.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,25 +1,38 @@ | ||
namespace Serilog.Sinks.Async | ||
// Copyright © Serilog Contributors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
namespace Serilog.Sinks.Async; | ||
|
||
/// <summary> | ||
/// Provides a way to inspect the state of Async wrapper's ingestion queue. | ||
/// </summary> | ||
public interface IAsyncLogEventSinkInspector | ||
{ | ||
/// <summary> | ||
/// Provides a way to inspect the state of Async wrapper's ingestion queue. | ||
/// Configured maximum number of items permitted to be held in the buffer awaiting ingestion. | ||
/// </summary> | ||
public interface IAsyncLogEventSinkInspector | ||
{ | ||
/// <summary> | ||
/// Configured maximum number of items permitted to be held in the buffer awaiting ingestion. | ||
/// </summary> | ||
/// <exception cref="T:System.ObjectDisposedException">The Sink has been disposed.</exception> | ||
int BufferSize { get; } | ||
/// <exception cref="T:System.ObjectDisposedException">The Sink has been disposed.</exception> | ||
int BufferSize { get; } | ||
|
||
/// <summary> | ||
/// Current moment-in-time Count of items currently awaiting ingestion. | ||
/// </summary> | ||
/// <exception cref="T:System.ObjectDisposedException">The Sink has been disposed.</exception> | ||
int Count { get; } | ||
/// <summary> | ||
/// Current moment-in-time Count of items currently awaiting ingestion. | ||
/// </summary> | ||
/// <exception cref="T:System.ObjectDisposedException">The Sink has been disposed.</exception> | ||
int Count { get; } | ||
|
||
/// <summary> | ||
/// Accumulated number of messages dropped due to breaches of <see cref="BufferSize"/> limit. | ||
/// </summary> | ||
long DroppedMessagesCount { get; } | ||
} | ||
/// <summary> | ||
/// Accumulated number of messages dropped due to breaches of <see cref="BufferSize"/> limit. | ||
/// </summary> | ||
long DroppedMessagesCount { get; } | ||
} |
42 changes: 27 additions & 15 deletions
42
src/Serilog.Sinks.Async/Sinks/Async/IAsyncLogEventSinkMonitor.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,20 +1,32 @@ | ||
namespace Serilog.Sinks.Async | ||
namespace Serilog.Sinks.Async; | ||
// Copyright © Serilog Contributors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
/// <summary> | ||
/// Defines a mechanism for the Async Sink to afford Health Checks a buffer metadata inspection mechanism. | ||
/// </summary> | ||
public interface IAsyncLogEventSinkMonitor | ||
{ | ||
/// <summary> | ||
/// Defines a mechanism for the Async Sink to afford Health Checks a buffer metadata inspection mechanism. | ||
/// Invoked by Sink to supply the inspector to the monitor. | ||
/// </summary> | ||
public interface IAsyncLogEventSinkMonitor | ||
{ | ||
/// <summary> | ||
/// Invoked by Sink to supply the inspector to the monitor. | ||
/// </summary> | ||
/// <param name="inspector">The Async Sink's inspector.</param> | ||
void StartMonitoring(IAsyncLogEventSinkInspector inspector); | ||
/// <param name="inspector">The Async Sink's inspector.</param> | ||
void StartMonitoring(IAsyncLogEventSinkInspector inspector); | ||
|
||
/// <summary> | ||
/// Invoked by Sink to indicate that it is being Disposed. | ||
/// </summary> | ||
/// <param name="inspector">The Async Sink's inspector.</param> | ||
void StopMonitoring(IAsyncLogEventSinkInspector inspector); | ||
} | ||
/// <summary> | ||
/// Invoked by Sink to indicate that it is being Disposed. | ||
/// </summary> | ||
/// <param name="inspector">The Async Sink's inspector.</param> | ||
void StopMonitoring(IAsyncLogEventSinkInspector inspector); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,15 +1,14 @@ | ||
using BenchmarkDotNet.Running; | ||
using Xunit; | ||
|
||
namespace Serilog.Sinks.Async.PerformanceTests | ||
namespace Serilog.Sinks.Async.PerformanceTests; | ||
|
||
public class Benchmarks | ||
{ | ||
public class Benchmarks | ||
[Fact] | ||
public void Benchmark() | ||
{ | ||
[Fact] | ||
public void Benchmark() | ||
{ | ||
BenchmarkRunner.Run<ThroughputBenchmark>(); | ||
BenchmarkRunner.Run<LatencyBenchmark>(); | ||
} | ||
BenchmarkRunner.Run<ThroughputBenchmark>(); | ||
BenchmarkRunner.Run<LatencyBenchmark>(); | ||
} | ||
} |
Oops, something went wrong.