From 98a22d4d7c87c72c8116a9b1a649076bfa62de0e Mon Sep 17 00:00:00 2001 From: Manish Yadav Date: Thu, 7 Jul 2022 11:08:50 +0530 Subject: [PATCH 1/7] Added distributed tracing instrumentation --- .../Trigger/RabbitMQListener.cs | 34 +++++++++++++++++++ .../WebJobs.Extensions.RabbitMQ.csproj | 1 + 2 files changed, 35 insertions(+) diff --git a/extension/WebJobs.Extensions.RabbitMQ/Trigger/RabbitMQListener.cs b/extension/WebJobs.Extensions.RabbitMQ/Trigger/RabbitMQListener.cs index 1162f31..aaad665 100644 --- a/extension/WebJobs.Extensions.RabbitMQ/Trigger/RabbitMQListener.cs +++ b/extension/WebJobs.Extensions.RabbitMQ/Trigger/RabbitMQListener.cs @@ -6,6 +6,7 @@ using System.Diagnostics; using System.Globalization; using System.Linq; +using System.Text; using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.WebJobs.Host.Executors; @@ -20,6 +21,9 @@ namespace Microsoft.Azure.WebJobs.Extensions.RabbitMQ { internal sealed class RabbitMQListener : IListener, IScaleMonitor { +#pragma warning disable SA1000 + private static readonly ActivitySource Source = new("Microsoft.Azure.WebJobs.Extensions.RabbitMQ"); +#pragma warning restore SA1000 private readonly ITriggeredFunctionExecutor executor; private readonly string queueName; private readonly ushort prefetchCount; @@ -82,6 +86,8 @@ public Task StartAsync(CancellationToken cancellationToken) this.consumer.Received += async (model, ea) => { + Activity activity = StartActivity(ea); // create activity object + var input = new TriggeredFunctionData() { TriggerValue = ea }; FunctionResult result = await this.executor.TryExecuteAsync(input, cancellationToken).ConfigureAwait(false); @@ -100,6 +106,8 @@ public Task StartAsync(CancellationToken cancellationToken) this.RepublishMessages(ea); } } + + activity?.Stop(); }; this.consumerTag = this.rabbitMQModel.BasicConsume(queue: this.queueName, autoAck: false, consumer: this.consumer); @@ -152,6 +160,32 @@ public ScaleStatus GetScaleStatus(ScaleStatusContext con return this.GetScaleStatusCore(context.WorkerCount, context.Metrics?.ToArray()); } + internal static Activity StartActivity(BasicDeliverEventArgs ea) + { + Activity activity; + if (ea.BasicProperties.Headers != null && ea.BasicProperties.Headers.ContainsKey("traceparent")) + { + // check if traceId present in header + byte[] traceParentIdInBytes = ea.BasicProperties.Headers["traceparent"] as byte[]; + string traceparentId = Encoding.Default.GetString(traceParentIdInBytes); + activity = Source.StartActivity("RabbitMQ.function.trigger", ActivityKind.Consumer, traceparentId); + } + else + { + // create new traceId, StartActivity will create and start new activity + activity = Source.StartActivity("RabbitMQ.function.trigger", ActivityKind.Server); + if (ea.BasicProperties.Headers == null) + { + ea.BasicProperties.Headers = new Dictionary(); + } + + byte[] traceParentIdInBytes = Encoding.Default.GetBytes(activity?.Id); + ea.BasicProperties.Headers["traceparent"] = traceParentIdInBytes; // add trace-id to header + } + + return activity; + } + internal void CreateHeadersAndRepublish(BasicDeliverEventArgs ea) { if (ea.BasicProperties.Headers == null) diff --git a/extension/WebJobs.Extensions.RabbitMQ/WebJobs.Extensions.RabbitMQ.csproj b/extension/WebJobs.Extensions.RabbitMQ/WebJobs.Extensions.RabbitMQ.csproj index 1cd4b74..74fed4c 100644 --- a/extension/WebJobs.Extensions.RabbitMQ/WebJobs.Extensions.RabbitMQ.csproj +++ b/extension/WebJobs.Extensions.RabbitMQ/WebJobs.Extensions.RabbitMQ.csproj @@ -58,6 +58,7 @@ runtime; build; native; contentfiles; analyzers; buildtransitive all + From fd74b7dcfc55a8f9dac7ad89f9656ba923e2a016 Mon Sep 17 00:00:00 2001 From: Manish Yadav Date: Thu, 7 Jul 2022 15:00:36 +0530 Subject: [PATCH 2/7] changes based on code review feedback --- .../Trigger/RabbitMQListener.cs | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/extension/WebJobs.Extensions.RabbitMQ/Trigger/RabbitMQListener.cs b/extension/WebJobs.Extensions.RabbitMQ/Trigger/RabbitMQListener.cs index aaad665..b597638 100644 --- a/extension/WebJobs.Extensions.RabbitMQ/Trigger/RabbitMQListener.cs +++ b/extension/WebJobs.Extensions.RabbitMQ/Trigger/RabbitMQListener.cs @@ -22,7 +22,7 @@ namespace Microsoft.Azure.WebJobs.Extensions.RabbitMQ internal sealed class RabbitMQListener : IListener, IScaleMonitor { #pragma warning disable SA1000 - private static readonly ActivitySource Source = new("Microsoft.Azure.WebJobs.Extensions.RabbitMQ"); + private static readonly ActivitySource ActivitySource = new("Microsoft.Azure.WebJobs.Extensions.RabbitMQ"); #pragma warning restore SA1000 private readonly ITriggeredFunctionExecutor executor; private readonly string queueName; @@ -86,7 +86,7 @@ public Task StartAsync(CancellationToken cancellationToken) this.consumer.Received += async (model, ea) => { - Activity activity = StartActivity(ea); // create activity object + using Activity activity = StartActivity(ea); var input = new TriggeredFunctionData() { TriggerValue = ea }; FunctionResult result = await this.executor.TryExecuteAsync(input, cancellationToken).ConfigureAwait(false); @@ -106,8 +106,6 @@ public Task StartAsync(CancellationToken cancellationToken) this.RepublishMessages(ea); } } - - activity?.Stop(); }; this.consumerTag = this.rabbitMQModel.BasicConsume(queue: this.queueName, autoAck: false, consumer: this.consumer); @@ -165,22 +163,20 @@ internal static Activity StartActivity(BasicDeliverEventArgs ea) Activity activity; if (ea.BasicProperties.Headers != null && ea.BasicProperties.Headers.ContainsKey("traceparent")) { - // check if traceId present in header byte[] traceParentIdInBytes = ea.BasicProperties.Headers["traceparent"] as byte[]; string traceparentId = Encoding.Default.GetString(traceParentIdInBytes); - activity = Source.StartActivity("RabbitMQ.function.trigger", ActivityKind.Consumer, traceparentId); + activity = ActivitySource.StartActivity("Trigger", ActivityKind.Consumer, traceparentId); } else { - // create new traceId, StartActivity will create and start new activity - activity = Source.StartActivity("RabbitMQ.function.trigger", ActivityKind.Server); + activity = ActivitySource.StartActivity("Trigger", ActivityKind.Server); if (ea.BasicProperties.Headers == null) { ea.BasicProperties.Headers = new Dictionary(); } - byte[] traceParentIdInBytes = Encoding.Default.GetBytes(activity?.Id); - ea.BasicProperties.Headers["traceparent"] = traceParentIdInBytes; // add trace-id to header + byte[] traceParentIdInBytes = Encoding.Default.GetBytes(activity.Id); + ea.BasicProperties.Headers["traceparent"] = traceParentIdInBytes; } return activity; From 08669272be2910a73b65c2c0777a389c0749a4f9 Mon Sep 17 00:00:00 2001 From: t-manyadav <106390596+t-manyadav@users.noreply.github.com> Date: Thu, 7 Jul 2022 23:16:49 +0530 Subject: [PATCH 3/7] Update extension/WebJobs.Extensions.RabbitMQ/Trigger/RabbitMQListener.cs Co-authored-by: Jatin Sanghvi <20547963+JatinSanghvi@users.noreply.github.com> --- .../WebJobs.Extensions.RabbitMQ/Trigger/RabbitMQListener.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extension/WebJobs.Extensions.RabbitMQ/Trigger/RabbitMQListener.cs b/extension/WebJobs.Extensions.RabbitMQ/Trigger/RabbitMQListener.cs index b597638..74f4dde 100644 --- a/extension/WebJobs.Extensions.RabbitMQ/Trigger/RabbitMQListener.cs +++ b/extension/WebJobs.Extensions.RabbitMQ/Trigger/RabbitMQListener.cs @@ -170,7 +170,7 @@ internal static Activity StartActivity(BasicDeliverEventArgs ea) else { activity = ActivitySource.StartActivity("Trigger", ActivityKind.Server); - if (ea.BasicProperties.Headers == null) +ea.BasicProperties.Headers ??= new Dictionary(); { ea.BasicProperties.Headers = new Dictionary(); } From 194e0805b363b9e45a1d64b36f245c09a42aaa3d Mon Sep 17 00:00:00 2001 From: t-manyadav <106390596+t-manyadav@users.noreply.github.com> Date: Thu, 7 Jul 2022 23:33:56 +0530 Subject: [PATCH 4/7] Update RabbitMQListener.cs --- .../WebJobs.Extensions.RabbitMQ/Trigger/RabbitMQListener.cs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/extension/WebJobs.Extensions.RabbitMQ/Trigger/RabbitMQListener.cs b/extension/WebJobs.Extensions.RabbitMQ/Trigger/RabbitMQListener.cs index 74f4dde..4cfdc88 100644 --- a/extension/WebJobs.Extensions.RabbitMQ/Trigger/RabbitMQListener.cs +++ b/extension/WebJobs.Extensions.RabbitMQ/Trigger/RabbitMQListener.cs @@ -171,10 +171,6 @@ internal static Activity StartActivity(BasicDeliverEventArgs ea) { activity = ActivitySource.StartActivity("Trigger", ActivityKind.Server); ea.BasicProperties.Headers ??= new Dictionary(); - { - ea.BasicProperties.Headers = new Dictionary(); - } - byte[] traceParentIdInBytes = Encoding.Default.GetBytes(activity.Id); ea.BasicProperties.Headers["traceparent"] = traceParentIdInBytes; } From a61b35039caf18270e41309e2f4827384702b793 Mon Sep 17 00:00:00 2001 From: Manish Yadav Date: Thu, 7 Jul 2022 23:24:15 +0530 Subject: [PATCH 5/7] Update extension/WebJobs.Extensions.RabbitMQ/Trigger/RabbitMQListener.cs --- .../WebJobs.Extensions.RabbitMQ/Trigger/RabbitMQListener.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extension/WebJobs.Extensions.RabbitMQ/Trigger/RabbitMQListener.cs b/extension/WebJobs.Extensions.RabbitMQ/Trigger/RabbitMQListener.cs index 4cfdc88..7bf4082 100644 --- a/extension/WebJobs.Extensions.RabbitMQ/Trigger/RabbitMQListener.cs +++ b/extension/WebJobs.Extensions.RabbitMQ/Trigger/RabbitMQListener.cs @@ -170,7 +170,7 @@ internal static Activity StartActivity(BasicDeliverEventArgs ea) else { activity = ActivitySource.StartActivity("Trigger", ActivityKind.Server); -ea.BasicProperties.Headers ??= new Dictionary(); + ea.BasicProperties.Headers ??= new Dictionary(); byte[] traceParentIdInBytes = Encoding.Default.GetBytes(activity.Id); ea.BasicProperties.Headers["traceparent"] = traceParentIdInBytes; } From 93bf7b2f81da5886e3982c273c47271e20045e07 Mon Sep 17 00:00:00 2001 From: Manish Yadav Date: Fri, 8 Jul 2022 12:37:43 +0530 Subject: [PATCH 6/7] Update RabbitMQListener.cs based on review feedback --- .../WebJobs.Extensions.RabbitMQ/Trigger/RabbitMQListener.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/extension/WebJobs.Extensions.RabbitMQ/Trigger/RabbitMQListener.cs b/extension/WebJobs.Extensions.RabbitMQ/Trigger/RabbitMQListener.cs index 7bf4082..45917fd 100644 --- a/extension/WebJobs.Extensions.RabbitMQ/Trigger/RabbitMQListener.cs +++ b/extension/WebJobs.Extensions.RabbitMQ/Trigger/RabbitMQListener.cs @@ -164,14 +164,14 @@ internal static Activity StartActivity(BasicDeliverEventArgs ea) if (ea.BasicProperties.Headers != null && ea.BasicProperties.Headers.ContainsKey("traceparent")) { byte[] traceParentIdInBytes = ea.BasicProperties.Headers["traceparent"] as byte[]; - string traceparentId = Encoding.Default.GetString(traceParentIdInBytes); + string traceparentId = Encoding.UTF8.GetString(traceParentIdInBytes); activity = ActivitySource.StartActivity("Trigger", ActivityKind.Consumer, traceparentId); } else { activity = ActivitySource.StartActivity("Trigger", ActivityKind.Server); ea.BasicProperties.Headers ??= new Dictionary(); - byte[] traceParentIdInBytes = Encoding.Default.GetBytes(activity.Id); + byte[] traceParentIdInBytes = Encoding.UTF8.GetBytes(activity.Id); ea.BasicProperties.Headers["traceparent"] = traceParentIdInBytes; } From 0e54eed743d8fa5d6653969be03c19f949e3d265 Mon Sep 17 00:00:00 2001 From: Manish Yadav Date: Fri, 8 Jul 2022 16:44:09 +0530 Subject: [PATCH 7/7] Changed ActivityKind --- .../WebJobs.Extensions.RabbitMQ/Trigger/RabbitMQListener.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extension/WebJobs.Extensions.RabbitMQ/Trigger/RabbitMQListener.cs b/extension/WebJobs.Extensions.RabbitMQ/Trigger/RabbitMQListener.cs index 45917fd..211666d 100644 --- a/extension/WebJobs.Extensions.RabbitMQ/Trigger/RabbitMQListener.cs +++ b/extension/WebJobs.Extensions.RabbitMQ/Trigger/RabbitMQListener.cs @@ -169,7 +169,7 @@ internal static Activity StartActivity(BasicDeliverEventArgs ea) } else { - activity = ActivitySource.StartActivity("Trigger", ActivityKind.Server); + activity = ActivitySource.StartActivity("Trigger", ActivityKind.Consumer); ea.BasicProperties.Headers ??= new Dictionary(); byte[] traceParentIdInBytes = Encoding.UTF8.GetBytes(activity.Id); ea.BasicProperties.Headers["traceparent"] = traceParentIdInBytes;