diff --git a/sdk/core/Azure.Core/tests/TestFramework/ClientDiagnosticListener.cs b/sdk/core/Azure.Core/tests/TestFramework/ClientDiagnosticListener.cs index 9af9c3f9b298..26e008c410cd 100755 --- a/sdk/core/Azure.Core/tests/TestFramework/ClientDiagnosticListener.cs +++ b/sdk/core/Azure.Core/tests/TestFramework/ClientDiagnosticListener.cs @@ -55,7 +55,8 @@ public void OnNext(KeyValuePair value) { Name = name, Activity = Activity.Current, - Links = links.Select(a => a.ParentId).ToList() + Links = links.Select(a => a.ParentId).ToList(), + LinkedActivities = links.ToList() }; Scopes.Add(scope); @@ -194,6 +195,7 @@ public class ProducedDiagnosticScope public bool IsFailed => Exception != null; public Exception Exception { get; set; } public List Links { get; set; } = new List(); + public List LinkedActivities { get; set; } = new List(); public override string ToString() { diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/EventProcessorClient.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/EventProcessorClient.cs index 1d896bbc5a7f..b21b83035f15 100755 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/EventProcessorClient.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/EventProcessorClient.cs @@ -726,8 +726,12 @@ internal virtual Task RunPartitionProcessingAsync(string partitionId, && partitionEvent.Data != null && EventDataInstrumentation.TryExtractDiagnosticId(partitionEvent.Data, out string diagnosticId)) { - diagnosticScope.AddLink(diagnosticId); - diagnosticScope.AddAttribute(DiagnosticProperty.EnqueuedTimeAttribute, partitionEvent.Data.EnqueuedTime.ToUnixTimeSeconds()); + var attributes = new Dictionary() + { + { DiagnosticProperty.EnqueuedTimeAttribute, partitionEvent.Data.EnqueuedTime.ToUnixTimeSeconds().ToString() } + }; + + diagnosticScope.AddLink(diagnosticId, attributes); } diagnosticScope.Start(); diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Diagnostics/DiagnosticsTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Diagnostics/DiagnosticsTests.cs index fa4eadb137c1..4920ed3c565a 100755 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Diagnostics/DiagnosticsTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Diagnostics/DiagnosticsTests.cs @@ -82,7 +82,6 @@ public async Task RunPartitionProcessingAsyncCreatesScopeForEventProcessing() var mockStorage = new MockCheckPointStorage(); var mockConsumer = new Mock("cg", Mock.Of(), default); var mockProcessor = new Mock(mockStorage, "cg", fullyQualifiedNamespace, eventHubName, Mock.Of>(), default, default) { CallBase = true }; - var enqueuedTime = DateTimeOffset.UtcNow; using ClientDiagnosticListener listener = new ClientDiagnosticListener(DiagnosticSourceName); var completionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); @@ -100,8 +99,8 @@ async IAsyncEnumerable mockPartitionEventEnumerable() { var context = new MockPartitionContext(partitionId); - yield return new PartitionEvent(context, new MockEventData(Array.Empty(), enqueuedTime: enqueuedTime) { Properties = { { "Diagnostic-Id", "id" } } }); - yield return new PartitionEvent(context, new MockEventData(Array.Empty(), enqueuedTime: enqueuedTime) { Properties = { { "Diagnostic-Id", "id2" } } }); + yield return new PartitionEvent(context, new EventData(Array.Empty()) { Properties = { { DiagnosticProperty.DiagnosticIdAttribute, "id" } } }); + yield return new PartitionEvent(context, new EventData(Array.Empty()) { Properties = { { DiagnosticProperty.DiagnosticIdAttribute, "id2" } } }); while (!completionSource.Task.IsCompleted && !token.IsCancellationRequested) { @@ -161,8 +160,7 @@ async IAsyncEnumerable mockPartitionEventEnumerable() { new KeyValuePair(DiagnosticProperty.KindAttribute, DiagnosticProperty.ConsumerKind), new KeyValuePair(DiagnosticProperty.EventHubAttribute, eventHubName), - new KeyValuePair(DiagnosticProperty.EndpointAttribute, fullyQualifiedNamespace), - new KeyValuePair(DiagnosticProperty.EnqueuedTimeAttribute, enqueuedTime.ToUnixTimeSeconds().ToString()) + new KeyValuePair(DiagnosticProperty.EndpointAttribute, fullyQualifiedNamespace) }; foreach (var scope in listener.Scopes) @@ -171,6 +169,102 @@ async IAsyncEnumerable mockPartitionEventEnumerable() } } + /// + /// Verifies diagnostics functionality of the + /// method. + /// + /// + [Test] + public async Task RunPartitionProcessingAsyncAddsAttributesToLinkedActivities() + { + string fullyQualifiedNamespace = "namespace"; + string eventHubName = "eventHub"; + + var mockStorage = new MockCheckPointStorage(); + var mockConsumer = new Mock("cg", Mock.Of(), default); + var mockProcessor = new Mock(mockStorage, "cg", fullyQualifiedNamespace, eventHubName, Mock.Of>(), default, default) { CallBase = true }; + var enqueuedTime = DateTimeOffset.UtcNow; + + using ClientDiagnosticListener listener = new ClientDiagnosticListener(DiagnosticSourceName); + var completionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var processEventCalled = false; + + mockConsumer + .Setup(consumer => consumer.ReadEventsFromPartitionAsync( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny())) + .Returns((partitionId, position, options, token) => + { + async IAsyncEnumerable mockPartitionEventEnumerable() + { + var context = new MockPartitionContext(partitionId); + + yield return new PartitionEvent(context, new MockEventData(Array.Empty(), enqueuedTime: enqueuedTime) { Properties = { { DiagnosticProperty.DiagnosticIdAttribute, "id" } } }); + + while (!completionSource.Task.IsCompleted && !token.IsCancellationRequested) + { + await Task.Delay(TimeSpan.FromSeconds(1)); + yield return new PartitionEvent(); + } + + yield break; + }; + + return mockPartitionEventEnumerable(); + }); + + mockProcessor + .Setup(processor => processor.CreateConsumer( + It.IsAny(), + It.IsAny(), + It.IsAny())) + .Returns(mockConsumer.Object); + + mockProcessor.Object.ProcessEventAsync += eventArgs => + { + if (!processEventCalled) + { + processEventCalled = true; + completionSource.SetResult(null); + } + + return Task.CompletedTask; + }; + + // RunPartitionProcessingAsync does not invoke the error handler, but we are setting it here in case + // this fact changes in the future. + + mockProcessor.Object.ProcessErrorAsync += eventArgs => Task.CompletedTask; + + // Start processing and wait for the consumer to be invoked. Set a cancellation for backup to ensure + // that the test completes deterministically. + + using var cancellationSource = new CancellationTokenSource(); + cancellationSource.CancelAfter(TimeSpan.FromSeconds(15)); + + using var partitionProcessingTask = Task.Run(() => mockProcessor.Object.RunPartitionProcessingAsync("pid", default, cancellationSource.Token)); + await Task.WhenAny(Task.Delay(-1, cancellationSource.Token), completionSource.Task); + await Task.WhenAny(Task.Delay(-1, cancellationSource.Token), partitionProcessingTask); + + // Validate that cancellation did not take place. + + Assert.That(cancellationSource.IsCancellationRequested, Is.False, "The processor should have stopped without cancellation."); + + // Validate diagnostics functionality. + + var processingScope = listener.Scopes.Single(s => s.Name == DiagnosticProperty.EventProcessorProcessingActivityName); + var linkedActivity = processingScope.LinkedActivities.Single(a => a.ParentId == "id"); + + var expectedTags = new List>() + { + new KeyValuePair(DiagnosticProperty.EnqueuedTimeAttribute, enqueuedTime.ToUnixTimeSeconds().ToString()) + }; + + Assert.That(linkedActivity.Tags, Is.EquivalentTo(expectedTags)); + } + private class MockConnection : EventHubConnection { private const string MockConnectionString = "Endpoint=value.com;SharedAccessKeyName=[value];SharedAccessKey=[value];"; diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Diagnostics/DiagnosticProperty.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Diagnostics/DiagnosticProperty.cs index a3fa0f38b79c..cfc9246794a9 100755 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Diagnostics/DiagnosticProperty.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Diagnostics/DiagnosticProperty.cs @@ -22,7 +22,7 @@ internal static class DiagnosticProperty public const string EndpointAttribute = "peer.address"; /// The attribute which represents the UNIX Epoch enqueued time of an event to associate with diagnostics information. - public const string EnqueuedTimeAttribute = "x-opt-enqueued-time"; + public const string EnqueuedTimeAttribute = "enqueuedTime"; /// The value which identifies the Event Hubs diagnostics context. public const string EventHubsServiceContext = "eventhubs";