Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ public void OnNext(KeyValuePair<string, object> value)
{
Name = name,
Activity = Activity.Current,
Links = links.Select(a => a.ParentId).ToList()
Links = links.Select(a => a.ParentId).ToList(),
LinkedActivities = links.ToList()
};

Scopes.Add(scope);
Expand Down Expand Up @@ -194,6 +195,7 @@ public class ProducedDiagnosticScope
public bool IsFailed => Exception != null;
public Exception Exception { get; set; }
public List<string> Links { get; set; } = new List<string>();
public List<Activity> LinkedActivities { get; set; } = new List<Activity>();

public override string ToString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -726,8 +726,12 @@ internal virtual Task RunPartitionProcessingAsync(string partitionId,
&& partitionEvent.Data != null
&& EventDataInstrumentation.TryExtractDiagnosticId(partitionEvent.Data, out string diagnosticId))
{
diagnosticScope.AddLink(diagnosticId);
diagnosticScope.AddAttribute(DiagnosticProperty.EnqueuedTimeAttribute, partitionEvent.Data.EnqueuedTime.ToUnixTimeSeconds());
var attributes = new Dictionary<string, string>()
{
{ DiagnosticProperty.EnqueuedTimeAttribute, partitionEvent.Data.EnqueuedTime.ToUnixTimeSeconds().ToString() }
};

diagnosticScope.AddLink(diagnosticId, attributes);
}

diagnosticScope.Start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ public async Task RunPartitionProcessingAsyncCreatesScopeForEventProcessing()
var mockStorage = new MockCheckPointStorage();
var mockConsumer = new Mock<EventHubConsumerClient>("cg", Mock.Of<EventHubConnection>(), default);
var mockProcessor = new Mock<EventProcessorClient>(mockStorage, "cg", fullyQualifiedNamespace, eventHubName, Mock.Of<Func<EventHubConnection>>(), default, default) { CallBase = true };
var enqueuedTime = DateTimeOffset.UtcNow;

using ClientDiagnosticListener listener = new ClientDiagnosticListener(DiagnosticSourceName);
var completionSource = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
Expand All @@ -100,8 +99,8 @@ async IAsyncEnumerable<PartitionEvent> mockPartitionEventEnumerable()
{
var context = new MockPartitionContext(partitionId);

yield return new PartitionEvent(context, new MockEventData(Array.Empty<byte>(), enqueuedTime: enqueuedTime) { Properties = { { "Diagnostic-Id", "id" } } });
yield return new PartitionEvent(context, new MockEventData(Array.Empty<byte>(), enqueuedTime: enqueuedTime) { Properties = { { "Diagnostic-Id", "id2" } } });
yield return new PartitionEvent(context, new EventData(Array.Empty<byte>()) { Properties = { { DiagnosticProperty.DiagnosticIdAttribute, "id" } } });
yield return new PartitionEvent(context, new EventData(Array.Empty<byte>()) { Properties = { { DiagnosticProperty.DiagnosticIdAttribute, "id2" } } });

while (!completionSource.Task.IsCompleted && !token.IsCancellationRequested)
{
Expand Down Expand Up @@ -161,8 +160,7 @@ async IAsyncEnumerable<PartitionEvent> mockPartitionEventEnumerable()
{
new KeyValuePair<string, string>(DiagnosticProperty.KindAttribute, DiagnosticProperty.ConsumerKind),
new KeyValuePair<string, string>(DiagnosticProperty.EventHubAttribute, eventHubName),
new KeyValuePair<string, string>(DiagnosticProperty.EndpointAttribute, fullyQualifiedNamespace),
new KeyValuePair<string, string>(DiagnosticProperty.EnqueuedTimeAttribute, enqueuedTime.ToUnixTimeSeconds().ToString())
new KeyValuePair<string, string>(DiagnosticProperty.EndpointAttribute, fullyQualifiedNamespace)
};

foreach (var scope in listener.Scopes)
Expand All @@ -171,6 +169,102 @@ async IAsyncEnumerable<PartitionEvent> mockPartitionEventEnumerable()
}
}

/// <summary>
/// Verifies diagnostics functionality of the <see cref="EventProcessorClient.RunPartitionProcessingAsync" />
/// method.
/// </summary>
///
[Test]
public async Task RunPartitionProcessingAsyncAddsAttributesToLinkedActivities()
{
string fullyQualifiedNamespace = "namespace";
string eventHubName = "eventHub";

var mockStorage = new MockCheckPointStorage();
var mockConsumer = new Mock<EventHubConsumerClient>("cg", Mock.Of<EventHubConnection>(), default);
var mockProcessor = new Mock<EventProcessorClient>(mockStorage, "cg", fullyQualifiedNamespace, eventHubName, Mock.Of<Func<EventHubConnection>>(), default, default) { CallBase = true };
var enqueuedTime = DateTimeOffset.UtcNow;

using ClientDiagnosticListener listener = new ClientDiagnosticListener(DiagnosticSourceName);
var completionSource = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
var processEventCalled = false;

mockConsumer
.Setup(consumer => consumer.ReadEventsFromPartitionAsync(
It.IsAny<string>(),
It.IsAny<EventPosition>(),
It.IsAny<ReadEventOptions>(),
It.IsAny<CancellationToken>()))
.Returns<string, EventPosition, ReadEventOptions, CancellationToken>((partitionId, position, options, token) =>
{
async IAsyncEnumerable<PartitionEvent> mockPartitionEventEnumerable()
{
var context = new MockPartitionContext(partitionId);

yield return new PartitionEvent(context, new MockEventData(Array.Empty<byte>(), enqueuedTime: enqueuedTime) { Properties = { { DiagnosticProperty.DiagnosticIdAttribute, "id" } } });

while (!completionSource.Task.IsCompleted && !token.IsCancellationRequested)
{
await Task.Delay(TimeSpan.FromSeconds(1));
yield return new PartitionEvent();
}

yield break;
};

return mockPartitionEventEnumerable();
});

mockProcessor
.Setup(processor => processor.CreateConsumer(
It.IsAny<string>(),
It.IsAny<EventHubConnection>(),
It.IsAny<EventHubConsumerClientOptions>()))
.Returns(mockConsumer.Object);

mockProcessor.Object.ProcessEventAsync += eventArgs =>
{
if (!processEventCalled)
{
processEventCalled = true;
completionSource.SetResult(null);
}

return Task.CompletedTask;
};

// RunPartitionProcessingAsync does not invoke the error handler, but we are setting it here in case
// this fact changes in the future.

mockProcessor.Object.ProcessErrorAsync += eventArgs => Task.CompletedTask;

// Start processing and wait for the consumer to be invoked. Set a cancellation for backup to ensure
// that the test completes deterministically.

using var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(15));

using var partitionProcessingTask = Task.Run(() => mockProcessor.Object.RunPartitionProcessingAsync("pid", default, cancellationSource.Token));
await Task.WhenAny(Task.Delay(-1, cancellationSource.Token), completionSource.Task);
await Task.WhenAny(Task.Delay(-1, cancellationSource.Token), partitionProcessingTask);

// Validate that cancellation did not take place.

Assert.That(cancellationSource.IsCancellationRequested, Is.False, "The processor should have stopped without cancellation.");

// Validate diagnostics functionality.

var processingScope = listener.Scopes.Single(s => s.Name == DiagnosticProperty.EventProcessorProcessingActivityName);
var linkedActivity = processingScope.LinkedActivities.Single(a => a.ParentId == "id");

var expectedTags = new List<KeyValuePair<string, string>>()
{
new KeyValuePair<string, string>(DiagnosticProperty.EnqueuedTimeAttribute, enqueuedTime.ToUnixTimeSeconds().ToString())
};

Assert.That(linkedActivity.Tags, Is.EquivalentTo(expectedTags));
}

private class MockConnection : EventHubConnection
{
private const string MockConnectionString = "Endpoint=value.com;SharedAccessKeyName=[value];SharedAccessKey=[value];";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ internal static class DiagnosticProperty
public const string EndpointAttribute = "peer.address";

/// <summary>The attribute which represents the UNIX Epoch enqueued time of an event to associate with diagnostics information.</summary>
public const string EnqueuedTimeAttribute = "x-opt-enqueued-time";
public const string EnqueuedTimeAttribute = "enqueuedTime";

/// <summary>The value which identifies the Event Hubs diagnostics context.</summary>
public const string EventHubsServiceContext = "eventhubs";
Expand Down