Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,7 @@ internal virtual Task RunPartitionProcessingAsync(string partitionId,
&& EventDataInstrumentation.TryExtractDiagnosticId(partitionEvent.Data, out string diagnosticId))
{
diagnosticScope.AddLink(diagnosticId);
diagnosticScope.AddAttribute(DiagnosticProperty.EnqueuedTimeAttribute, partitionEvent.Data.EnqueuedTime.ToUnixTimeSeconds());
}

diagnosticScope.Start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public async Task RunPartitionProcessingAsyncCreatesScopeForEventProcessing()
var mockStorage = new MockCheckPointStorage();
var mockConsumer = new Mock<EventHubConsumerClient>("cg", Mock.Of<EventHubConnection>(), default);
var mockProcessor = new Mock<EventProcessorClient>(mockStorage, "cg", fullyQualifiedNamespace, eventHubName, Mock.Of<Func<EventHubConnection>>(), default, default) { CallBase = true };
var enqueuedTime = DateTimeOffset.UtcNow;

using ClientDiagnosticListener listener = new ClientDiagnosticListener(DiagnosticSourceName);
var completionSource = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
Expand All @@ -99,8 +100,8 @@ async IAsyncEnumerable<PartitionEvent> mockPartitionEventEnumerable()
{
var context = new MockPartitionContext(partitionId);

yield return new PartitionEvent(context, new EventData(Array.Empty<byte>()) { Properties = { { "Diagnostic-Id", "id" } } });
yield return new PartitionEvent(context, new EventData(Array.Empty<byte>()) { Properties = { { "Diagnostic-Id", "id2" } } });
yield return new PartitionEvent(context, new MockEventData(Array.Empty<byte>(), enqueuedTime: enqueuedTime) { Properties = { { "Diagnostic-Id", "id" } } });
yield return new PartitionEvent(context, new MockEventData(Array.Empty<byte>(), enqueuedTime: enqueuedTime) { Properties = { { "Diagnostic-Id", "id2" } } });

while (!completionSource.Task.IsCompleted && !token.IsCancellationRequested)
{
Expand Down Expand Up @@ -160,7 +161,8 @@ async IAsyncEnumerable<PartitionEvent> mockPartitionEventEnumerable()
{
new KeyValuePair<string, string>(DiagnosticProperty.KindAttribute, DiagnosticProperty.ConsumerKind),
new KeyValuePair<string, string>(DiagnosticProperty.EventHubAttribute, eventHubName),
new KeyValuePair<string, string>(DiagnosticProperty.EndpointAttribute, fullyQualifiedNamespace)
new KeyValuePair<string, string>(DiagnosticProperty.EndpointAttribute, fullyQualifiedNamespace),
new KeyValuePair<string, string>(DiagnosticProperty.EnqueuedTimeAttribute, enqueuedTime.ToUnixTimeSeconds().ToString())
};

foreach (var scope in listener.Scopes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ internal static class DiagnosticProperty
/// <summary>The attribute which represents the fully-qualified endpoint address of the Event Hubs namespace to associate with diagnostics information.</summary>
public const string EndpointAttribute = "peer.address";

/// <summary>The attribute which represents the UNIX Epoch enqueued time of an event to associate with diagnostics information.</summary>
public const string EnqueuedTimeAttribute = "x-opt-enqueued-time";

/// <summary>The value which identifies the Event Hubs diagnostics context.</summary>
public const string EventHubsServiceContext = "eventhubs";

Expand Down