Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use the IMeterFactory approach #7087

Closed
wants to merge 81 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
81 commits
Select commit Hold shift + click to select a range
e69aaf7
Provide an API allowing users to control the start of a new trace whe…
lailabougria Jun 25, 2024
e321845
Robust and faster handling of AssemblyQualifiedNames in MessageMetada…
danielmarbach Jun 26, 2024
9e21a28
Add specific meters for recoverability actions with message type and …
SzymonPobiega Jun 26, 2024
3a7c74c
AssemblyScanner doesn't scan message assemblies that reference Messag…
danielmarbach Jul 1, 2024
22b4103
Add more restrictive where clause to type forwarding test (#7090)
danielmarbach Jul 1, 2024
0144baf
Register handling time metrics.
saratry May 30, 2024
c87fbb8
The API has been updated, and accordingly the approved txt. This impl…
saratry May 31, 2024
2882157
Add missing documentation for return value.
saratry May 31, 2024
9c88d39
Newly introduced classes and interfaces should not be public.
saratry May 31, 2024
a52abb8
For tags, replace the complete Type descriptor with the FullName, tha…
saratry May 31, 2024
6822589
Exclusion of generated internal struct from the Namespace validation …
saratry May 31, 2024
63df3f8
Tests massage handling time metrics.
saratry Jun 3, 2024
c88b47e
Introduce new unit test for metrics' name validation.
saratry Jun 3, 2024
2611b84
Fix metrics unit test.
saratry Jun 3, 2024
8b26cb4
Tests improvements.
saratry Jun 4, 2024
a7a886f
Add discriminator Tag to handling metrics.
saratry Jun 4, 2024
b8bfc04
Refactor the code to avoid auto-generated structure.
saratry Jun 5, 2024
ab733a6
Tests improvement.
saratry Jun 5, 2024
e38cfb5
Fix typo.
saratry Jun 5, 2024
2169aa9
Make tags field readonly.
saratry Jun 5, 2024
51de44d
Fix note description.
saratry Jun 5, 2024
2ddf602
Use var instead of explicit type.
saratry Jun 5, 2024
ffc739b
Rename HandlingMetrics... types to MessageHandlingMetrics... to be mo…
saratry Jun 5, 2024
19f5c8b
Rename HandlingMetrics... types to MessageHandlingMetrics... to be mo…
saratry Jun 5, 2024
a19728f
Make the TagList not readonly to allow to add the FailureType tag.
saratry Jun 5, 2024
1d3b2a0
Rename HandlingTime to MessageHandlerTime for clarity.
saratry Jun 6, 2024
b1b4e49
API test improvement to check the metric name and type mapping.
saratry Jun 6, 2024
648c5ed
Rename the static method that provides the common metric tags for cla…
saratry Jun 6, 2024
e914775
Test improvement to remove the field name from API verification.
saratry Jun 6, 2024
f2cedb8
Validate also the metric unit since they are part of the API.
saratry Jun 7, 2024
950d202
Use seconds instead of milliseconds to record message handling time (…
mauroservienti Jun 8, 2024
6b7914c
Pull request remarks.
saratry Jun 11, 2024
b26f0f6
Add missing usings
mauroservienti Jun 20, 2024
52b68f1
Use collection expressions
mauroservienti Jun 20, 2024
b898467
Revert "Use collection expressions"
mauroservienti Jun 20, 2024
d93a2b8
Prefer being explicit. Do not use a common way for creating a TagList…
mauroservienti Jun 20, 2024
e682bdf
add comment
mauroservienti Jun 21, 2024
acd155f
merge tags
mauroservienti Jun 21, 2024
2f23044
Introduce a static NoOpMessageHandlingMetrics instance
mauroservienti Jun 21, 2024
00588c8
Add the new tag to the approved tags list
mauroservienti Jun 25, 2024
9819a40
Do not use a factory
mauroservienti Jun 25, 2024
75c9c47
Propagate metric tags settings throughout the pipeline
mauroservienti Jun 25, 2024
6d8a041
Register handling time metrics.
saratry May 30, 2024
cefabcf
Refactor the code to avoid auto-generated structure.
saratry Jun 5, 2024
5dfb11d
Rename the static method that provides the common metric tags for cla…
saratry Jun 6, 2024
dbaa300
Pull request remarks.
saratry Jun 11, 2024
e3cd8e5
Tests massage handling time metrics.
saratry Jun 3, 2024
a692796
Introduce critical time metrics.
saratry Jun 6, 2024
0fb6a59
Fixes after merge.
saratry Jun 10, 2024
ed80e4b
Make CancellationToken optional.
saratry Jun 11, 2024
a283297
refer being explicit. Do not use a common way for creating a TagList …
mauroservienti Jun 20, 2024
48d3d5a
Missing using statement
mauroservienti Jun 20, 2024
5db3bf1
Fix Approved API
mauroservienti Jun 20, 2024
5c3e6af
Fix failing test
mauroservienti Jun 20, 2024
dfb6ab6
Fix test
mauroservienti Jun 20, 2024
3b7e41c
Use seconds as specified in the meter setup
mauroservienti Jun 20, 2024
65bcafd
Update src/NServiceBus.Core/OpenTelemetry/Metrics/MetricsExtensions.cs
mauroservienti Jun 21, 2024
dfa4d1a
Fix formatting...
mauroservienti Jun 21, 2024
5c650c5
Update code after rebase
mauroservienti Jun 25, 2024
5829bdf
Use the new IncomingPipelineMetricTags
mauroservienti Jun 25, 2024
5e02706
Add a test ensuring critical time is not recorded on failures
mauroservienti Jun 25, 2024
d0d984e
Use the IMeterFactory approach
mauroservienti Jun 25, 2024
3c33b53
Use a provider style approach and get rid of the RecordMessageHandlin…
mauroservienti Jun 26, 2024
2a0edfa
Another TODO/comment
mauroservienti Jun 26, 2024
ed402d3
Make PipelineMetrics a first class citizen in Core and follow a divid…
mauroservienti Jun 27, 2024
385c60a
Remove TODOs solved by implementing a test IMeterFactory
mauroservienti Jun 27, 2024
814e896
Attempt to work around the ugly need to pass in queue name and discri…
mauroservienti Jun 27, 2024
e4e6339
Make PipelineMetrics depend on IReadOnlySettings to access queue and …
mauroservienti Jun 27, 2024
218cc09
Make metrics specific to the incoming pipeline
mauroservienti Jun 27, 2024
2caf40b
Fix rebase
SzymonPobiega Jul 3, 2024
5c21be4
Fix the acceptance tests
SzymonPobiega Jul 4, 2024
e5b94d6
Define IIncomingPipelineMetrics interface to allow the usage of NoOp …
saratry Jul 4, 2024
ee5a050
Fix MainPipelineExecutorTests using No Op implementation for incoming…
saratry Jul 4, 2024
764481f
Change NoOpIncomingPipelineMetrics namespace to NServiceBus
saratry Jul 4, 2024
c63cdfb
Mark IncomingPipelineMetrics as a required Singleton component in DI.
saratry Jul 5, 2024
5b011a1
Simplify the constructor of InvokeHandlerTerminator by replacing the …
saratry Jul 5, 2024
aa4230b
Fix existing tests.
saratry Jul 5, 2024
abfa21f
Fix the build
SzymonPobiega Jul 6, 2024
81ede72
Fix inspection
SzymonPobiega Jul 7, 2024
e3e158b
Fix inspection
SzymonPobiega Jul 7, 2024
c73f9e0
Replace failure type tag with error type tag.
saratry Jul 8, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ NServiceBus.Pipeline.LogicalMessageFactory - Singleton
NServiceBus.Settings.IReadOnlySettings - Singleton
NServiceBus.Transport.ISubscriptionManager - Singleton
----------- Private registrations used by Core-----------
NServiceBus.IncomingPipelineMetrics - Singleton
NServiceBus.InferredMessageTypeEnricherBehavior - Transient
NServiceBus.SubscriptionReceiverBehavior - Transient
NServiceBus.SubscriptionRouter - Singleton
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,21 @@ public TestingMetricListener(string sourceName)
ReportedMeters.AddOrUpdate(instrument.Name, measurement, (_, val) => val + measurement);
Tags.AddOrUpdate(instrument.Name, _ => tags, (_, _) => tags);
});
meterListener.SetMeasurementEventCallback((Instrument instrument,
double measurement,
ReadOnlySpan<KeyValuePair<string, object>> t,
object _) =>
{
TestContext.WriteLine($"{instrument.Meter.Name}\\{instrument.Name}:{measurement}");
var tags = t.ToArray();
ReportedMeters.AddOrUpdate(instrument.Name, 1, (_, val) => val + 1);
Tags.AddOrUpdate(instrument.Name, _ => tags, (_, _) => tags);
});
meterListener.Start();
}

public static TestingMetricListener SetupNServiceBusMetricsListener() =>
SetupMetricsListener("NServiceBus.Core");
SetupMetricsListener("NServiceBus.Core.Pipeline.Incoming");

public static TestingMetricListener SetupMetricsListener(string sourceName)
{
Expand Down Expand Up @@ -82,4 +92,13 @@ public object AssertTagKeyExists(string metricName, string tagKey)

return meterTag.Value;
}

public void AssertTags(string metricName, Dictionary<string, object> expectedTags)
{
foreach (var kvp in expectedTags)
{
var actualTagValue = AssertTagKeyExists(metricName, kvp.Key);
Assert.AreEqual(kvp.Value, actualTagValue);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
namespace NServiceBus.AcceptanceTests.Core.OpenTelemetry.Metrics;

using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using NServiceBus;
Expand All @@ -15,7 +16,7 @@ public async Task Should_report_successful_message_metric()
using var metricsListener = TestingMetricListener.SetupNServiceBusMetricsListener();

_ = await Scenario.Define<Context>()
.WithEndpoint<EndpointWithMetrics>(b => b
.WithEndpoint<EndpointWithMetrics>(b => b.CustomConfig(x => x.MakeInstanceUniquelyAddressable("disc"))
.When(async (session, ctx) =>
{
for (var x = 0; x < 5; x++)
Expand All @@ -30,13 +31,20 @@ public async Task Should_report_successful_message_metric()
metricsListener.AssertMetric("nservicebus.messaging.fetches", 5);
metricsListener.AssertMetric("nservicebus.messaging.failures", 0);

var successEndpoint = metricsListener.AssertTagKeyExists("nservicebus.messaging.successes", "nservicebus.queue");
var successType = metricsListener.AssertTagKeyExists("nservicebus.messaging.successes", "nservicebus.message_type");
var fetchedEndpoint = metricsListener.AssertTagKeyExists("nservicebus.messaging.fetches", "nservicebus.queue");
metricsListener.AssertTags("nservicebus.messaging.fetches",
new Dictionary<string, object>
{
["nservicebus.queue"] = Conventions.EndpointNamingConvention(typeof(EndpointWithMetrics)),
["nservicebus.discriminator"] = "disc",
});

Assert.AreEqual(Conventions.EndpointNamingConvention(typeof(EndpointWithMetrics)), successEndpoint);
Assert.AreEqual(Conventions.EndpointNamingConvention(typeof(EndpointWithMetrics)), fetchedEndpoint);
Assert.AreEqual(typeof(OutgoingMessage).FullName, successType);
metricsListener.AssertTags("nservicebus.messaging.successes",
new Dictionary<string, object>
{
["nservicebus.queue"] = Conventions.EndpointNamingConvention(typeof(EndpointWithMetrics)),
["nservicebus.discriminator"] = "disc",
["nservicebus.message_type"] = typeof(OutgoingMessage).FullName,
});
}

[Test]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
namespace NServiceBus.AcceptanceTests.Core.OpenTelemetry.Metrics;

using System.Collections.Generic;
using System.Threading.Tasks;
using AcceptanceTesting;
using NUnit.Framework;
using AcceptanceTesting.Customization;

public class When_message_processing_fails : OpenTelemetryAcceptanceTest
{
Expand All @@ -13,13 +15,22 @@ public async Task Should_report_failing_message_metrics()
_ = await Scenario.Define<Context>()
.WithEndpoint<FailingEndpoint>(e => e
.DoNotFailOnErrorMessages()
.CustomConfig(x => x.MakeInstanceUniquelyAddressable("disc"))
.When(s => s.SendLocal(new FailingMessage())))
.Done(c => c.HandlerInvoked)
.Run();

metricsListener.AssertMetric("nservicebus.messaging.fetches", 1);
metricsListener.AssertMetric("nservicebus.messaging.failures", 1);
metricsListener.AssertMetric("nservicebus.messaging.successes", 0);

metricsListener.AssertTags("nservicebus.messaging.failures",
new Dictionary<string, object>
{
["nservicebus.queue"] = Conventions.EndpointNamingConvention(typeof(FailingEndpoint)),
["nservicebus.discriminator"] = "disc",
["error.type"] = typeof(SimulatedException).FullName,
});
}

class Context : ScenarioContext
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
namespace NServiceBus.AcceptanceTests.Core.OpenTelemetry.Metrics;

using System;
using System.Threading.Tasks;
using AcceptanceTesting;
using NUnit.Framework;

public class When_retrying_messages : OpenTelemetryAcceptanceTest
{
[Test]
public async Task Should_increment_immediate_meter()
{
using var metricsListener = TestingMetricListener.SetupNServiceBusMetricsListener();

await Scenario.Define<Context>()
.WithEndpoint<RetryingEndpoint>(e => e
.CustomConfig(c => c.Recoverability().Immediate(i => i.NumberOfRetries(1)))
.DoNotFailOnErrorMessages()
.When(s => s.SendLocal(new FailingMessage())))
.Done(c => c.InvocationCounter == 2)
.Run();

metricsListener.AssertMetric("nservicebus.recoverability.immediate", 1);
metricsListener.AssertMetric("nservicebus.recoverability.delayed", 0);
metricsListener.AssertMetric("nservicebus.recoverability.error", 0);
}

[Test]
public async Task Should_increment_delayed_meter()
{
//Requires.DelayedDelivery();

using var metricsListener = TestingMetricListener.SetupNServiceBusMetricsListener();

await Scenario.Define<Context>()
.WithEndpoint<RetryingEndpoint>(e => e
.CustomConfig(c =>
{
c.Recoverability().Immediate(i => i.NumberOfRetries(0));
c.Recoverability().Delayed(i => i.NumberOfRetries(1).TimeIncrease(TimeSpan.FromMilliseconds(1)));
})
.DoNotFailOnErrorMessages()
.When(s => s.SendLocal(new FailingMessage())))
.Done(c => c.InvocationCounter == 2)
.Run();

metricsListener.AssertMetric("nservicebus.recoverability.immediate", 0);
metricsListener.AssertMetric("nservicebus.recoverability.delayed", 1);
metricsListener.AssertMetric("nservicebus.recoverability.error", 0);
}

[Test]
public async Task Should_increment_error_meter()
{
using var metricsListener = TestingMetricListener.SetupNServiceBusMetricsListener();

await Scenario.Define<Context>()
.WithEndpoint<RetryingEndpoint>(e => e
.CustomConfig(c =>
{
c.Recoverability().Immediate(i => i.NumberOfRetries(0));
c.Recoverability().Delayed(i => i.NumberOfRetries(0));
})
.DoNotFailOnErrorMessages()
.When(s => s.SendLocal(new FailingMessage())))
.Done(c => c.FailedMessages.Count == 1)
.Run();

metricsListener.AssertMetric("nservicebus.recoverability.immediate", 0);
metricsListener.AssertMetric("nservicebus.recoverability.delayed", 0);
metricsListener.AssertMetric("nservicebus.recoverability.error", 1);
}

class Context : ScenarioContext
{
public int InvocationCounter { get; set; }
}

class RetryingEndpoint : EndpointConfigurationBuilder
{
public RetryingEndpoint()
{
var template = new OpenTelemetryEnabledEndpoint
{
TransportConfiguration = new ConfigureEndpointAcceptanceTestingTransport(false, true)
};
EndpointSetup(template, (endpointConfiguration, descriptor) => { });
}

class Handler : IHandleMessages<FailingMessage>
{
Context testContext;

public Handler(Context testContext)
{
this.testContext = testContext;
}

public Task Handle(FailingMessage message, IMessageHandlerContext context)
{
testContext.InvocationCounter++;

if (testContext.InvocationCounter == 1)
{
throw new SimulatedException("first attempt fails");
}

return Task.CompletedTask;
}
}
}

public class FailingMessage : IMessage
{
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
namespace NServiceBus.AcceptanceTests.Core.OpenTelemetry.Traces;

using System.Collections.Immutable;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using NServiceBus.AcceptanceTesting;
using NServiceBus.AcceptanceTesting.Customization;
using AcceptanceTesting;
using AcceptanceTesting.Customization;
using NUnit.Framework;

public class When_publishing_messages : OpenTelemetryAcceptanceTest
Expand Down Expand Up @@ -36,15 +37,89 @@ public async Task Should_create_outgoing_event_span()
Assert.IsNull(publishedMessage.ParentId, "publishes without ambient span should start a new trace");

var sentMessageTags = publishedMessage.Tags.ToImmutableDictionary();
sentMessageTags.VerifyTag("nservicebus.message_id", context.SentMessageId);
sentMessageTags.VerifyTag("nservicebus.message_id", context.PublishedMessageId);

Assert.IsNotNull(context.TraceParentHeader, "tracing header should be set on the published event");
}

[Test]
public async Task Should_create_child_on_receive_when_requested_via_options()
{
var context = await Scenario.Define<Context>()
.WithEndpoint<Publisher>(b => b
.When(ctx => ctx.SomeEventSubscribed, s =>
{
var publishOptions = new PublishOptions();
publishOptions.ContinueExistingTraceOnReceive();
return s.Publish(new ThisIsAnEvent(), publishOptions);
}))
.WithEndpoint<Subscriber>(b => b.When((session, ctx) =>
{
if (ctx.HasNativePubSubSupport)
{
ctx.SomeEventSubscribed = true;
}

return Task.CompletedTask;
}))
.Done(c => c.OutgoingEventReceived)
.Run();

var publishMessageActivities = NServicebusActivityListener.CompletedActivities.GetPublishEventActivities();
var receiveMessageActivities = NServicebusActivityListener.CompletedActivities.GetReceiveMessageActivities();
Assert.AreEqual(1, publishMessageActivities.Count, "1 message is published as part of this test");
Assert.AreEqual(1, receiveMessageActivities.Count, "1 message is received as part of this test");

var publishRequest = publishMessageActivities[0];
var receiveRequest = receiveMessageActivities[0];

Assert.AreEqual(publishRequest.RootId, receiveRequest.RootId, "publish and receive operations are part the same root activity");
Assert.IsNotNull(receiveRequest.ParentId, "incoming message does have a parent");

CollectionAssert.IsEmpty(receiveRequest.Links, "receive does not have links");
}

[Test]
public async Task Should_create_new_linked_trace_on_receive_by_default()
{
var context = await Scenario.Define<Context>()
.WithEndpoint<Publisher>(b => b
.When(ctx => ctx.SomeEventSubscribed, s =>
{
return s.Publish(new ThisIsAnEvent());
}))
.WithEndpoint<Subscriber>(b => b.When((session, ctx) =>
{
if (ctx.HasNativePubSubSupport)
{
ctx.SomeEventSubscribed = true;
}

return Task.CompletedTask;
}))
.Done(c => c.OutgoingEventReceived)
.Run();

var publishMessageActivities = NServicebusActivityListener.CompletedActivities.GetPublishEventActivities();
var receiveMessageActivities = NServicebusActivityListener.CompletedActivities.GetReceiveMessageActivities();
Assert.AreEqual(1, publishMessageActivities.Count, "1 message is published as part of this test");
Assert.AreEqual(1, receiveMessageActivities.Count, "1 message is received as part of this test");

var publishRequest = publishMessageActivities[0];
var receiveRequest = receiveMessageActivities[0];

Assert.AreNotEqual(publishRequest.RootId, receiveRequest.RootId, "publish and receive operations are part of different root activities");
Assert.IsNull(receiveRequest.ParentId, "incoming message does not have a parent, it's a root");

ActivityLink link = receiveRequest.Links.FirstOrDefault();
Assert.IsNotNull(link, "Receive has a link");
Assert.AreEqual(publishRequest.TraceId, link.Context.TraceId, "receive is linked to publish operation");
}

public class Context : ScenarioContext
{
public bool OutgoingEventReceived { get; set; }
public string SentMessageId { get; set; }
public string PublishedMessageId { get; set; }
public string TraceParentHeader { get; set; }
public bool SomeEventSubscribed { get; set; }
}
Expand All @@ -71,33 +146,33 @@ public class Subscriber : EndpointConfigurationBuilder
{
public Subscriber() =>
EndpointSetup<OpenTelemetryEnabledEndpoint>(c =>
{
},
{
},
metadata =>
{
metadata.RegisterPublisherFor<ThisIsAnEvent>(typeof(Publisher));
});

public class ThisHandlesSomethingHandler : IHandleMessages<ThisIsAnEvent>
{
public ThisHandlesSomethingHandler(Context testContext)
public ThisHandlesSomethingHandler(Context testPublishContext)
{
this.testContext = testContext;
this.testPublishContext = testPublishContext;
}

public Task Handle(ThisIsAnEvent @event, IMessageHandlerContext context)
{
if (context.MessageHeaders.TryGetValue(Headers.DiagnosticsTraceParent, out var traceParentHeader))
{
testContext.TraceParentHeader = traceParentHeader;
testPublishContext.TraceParentHeader = traceParentHeader;
}

testContext.SentMessageId = context.MessageId;
testContext.OutgoingEventReceived = true;
testPublishContext.PublishedMessageId = context.MessageId;
testPublishContext.OutgoingEventReceived = true;
return Task.CompletedTask;
}

Context testContext;
Context testPublishContext;
}
}

Expand Down
Loading