From 48a696815b254631754e7e5a3ea8fd5873156903 Mon Sep 17 00:00:00 2001 From: Fernando de Oliveira <5161098+fedeoliv@users.noreply.github.com> Date: Fri, 10 Jun 2022 10:41:05 -0300 Subject: [PATCH 01/12] feature: Producer activities with tags targeting OpenTelemetry messaging attributes --- src/Confluent.Kafka/Confluent.Kafka.csproj | 5 +- src/Confluent.Kafka/Diagnostics.cs | 78 ++++++++++++++++++++++ 2 files changed, 81 insertions(+), 2 deletions(-) mode change 100755 => 100644 src/Confluent.Kafka/Confluent.Kafka.csproj create mode 100644 src/Confluent.Kafka/Diagnostics.cs diff --git a/src/Confluent.Kafka/Confluent.Kafka.csproj b/src/Confluent.Kafka/Confluent.Kafka.csproj old mode 100755 new mode 100644 index 1cc4ae90d..353510345 --- a/src/Confluent.Kafka/Confluent.Kafka.csproj +++ b/src/Confluent.Kafka/Confluent.Kafka.csproj @@ -13,7 +13,7 @@ Confluent.Kafka Confluent.Kafka 1.8.2 - net462;net5.0;netstandard1.3;netstandard2.0 + net462;net6.0;netstandard1.3;netstandard2.0 true true true @@ -24,7 +24,8 @@ None - + + diff --git a/src/Confluent.Kafka/Diagnostics.cs b/src/Confluent.Kafka/Diagnostics.cs new file mode 100644 index 000000000..c1c99bbb8 --- /dev/null +++ b/src/Confluent.Kafka/Diagnostics.cs @@ -0,0 +1,78 @@ +// Copyright 2022 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + +using System.Diagnostics; +using System.Runtime.InteropServices; + +namespace Confluent.Kafka +{ + internal static class Diagnostics + { + public const string ActivitySourceName = "Confluent.Kafka.Activity"; + public static ActivitySource ActivitySource { get; } = new ActivitySource(ActivitySourceName); + + public static class Producer + { + public const string ActivityName = ActivitySourceName + ".MessageProduced"; + + public static Activity Start (string topic, Message message) + { + Activity activity = ActivitySource.StartActivity(ActivityName); + + if (activity == null) + return null; + + using (activity) + { + activity?.AddDefaultOpenTelemetryTags(topic, message); + } + + return activity; + } + + public static Activity Start(TopicPartition topicPartition, Message message) + { + Activity activity = ActivitySource.StartActivity(ActivityName); + + if (activity == null) + return null; + + using (activity) + { + activity?.AddDefaultOpenTelemetryTags(topicPartition.Topic, message); + activity?.AddTag("messaging.kafka.partition", topicPartition.Partition.Value); + } + + return activity; + } + } + + private static Activity AddDefaultOpenTelemetryTags(this Activity activity, string topic, Message message) + { + activity?.AddTag("messaging.system", "kafka"); + activity?.AddTag("messaging.destination", topic); + activity?.AddTag("messaging.destination_kind", "topic"); + + if (message.Key != null) + activity?.AddTag("messaging.kafka.message_key", message.Key); + + if (message.Value != null) + activity?.AddTag("messaging.message_payload_size_bytes", Marshal.SizeOf(message.Value)); + + return activity; + } + } +} From 8dcca151fdffbff2c4848019a6205e633879181f Mon Sep 17 00:00:00 2001 From: Fernando de Oliveira <5161098+fedeoliv@users.noreply.github.com> Date: Sat, 11 Jun 2022 18:35:05 -0300 Subject: [PATCH 02/12] refactor: unused activity method removed --- src/Confluent.Kafka/Diagnostics.cs | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/src/Confluent.Kafka/Diagnostics.cs b/src/Confluent.Kafka/Diagnostics.cs index c1c99bbb8..f905eab52 100644 --- a/src/Confluent.Kafka/Diagnostics.cs +++ b/src/Confluent.Kafka/Diagnostics.cs @@ -42,22 +42,6 @@ public static class Producer return activity; } - - public static Activity Start(TopicPartition topicPartition, Message message) - { - Activity activity = ActivitySource.StartActivity(ActivityName); - - if (activity == null) - return null; - - using (activity) - { - activity?.AddDefaultOpenTelemetryTags(topicPartition.Topic, message); - activity?.AddTag("messaging.kafka.partition", topicPartition.Partition.Value); - } - - return activity; - } } private static Activity AddDefaultOpenTelemetryTags(this Activity activity, string topic, Message message) From af3cfa3c1a041f2a0c69dc6eb1599125d5156e41 Mon Sep 17 00:00:00 2001 From: Fernando de Oliveira <5161098+fedeoliv@users.noreply.github.com> Date: Sat, 11 Jun 2022 19:19:41 -0300 Subject: [PATCH 03/12] feature: OpenTelemetry activity added on Produce and ProduceAsync methods --- src/Confluent.Kafka/Diagnostics.cs | 15 ++++++++++----- src/Confluent.Kafka/Producer.cs | 19 ++++++++++++++++++- 2 files changed, 28 insertions(+), 6 deletions(-) diff --git a/src/Confluent.Kafka/Diagnostics.cs b/src/Confluent.Kafka/Diagnostics.cs index f905eab52..7f7124b7c 100644 --- a/src/Confluent.Kafka/Diagnostics.cs +++ b/src/Confluent.Kafka/Diagnostics.cs @@ -21,14 +21,14 @@ namespace Confluent.Kafka { internal static class Diagnostics { - public const string ActivitySourceName = "Confluent.Kafka.Activity"; + public const string ActivitySourceName = "Confluent.Kafka"; public static ActivitySource ActivitySource { get; } = new ActivitySource(ActivitySourceName); public static class Producer { public const string ActivityName = ActivitySourceName + ".MessageProduced"; - public static Activity Start (string topic, Message message) + public static Activity Start(TopicPartition topicPartition, Message message) { Activity activity = ActivitySource.StartActivity(ActivityName); @@ -37,18 +37,23 @@ public static class Producer using (activity) { - activity?.AddDefaultOpenTelemetryTags(topic, message); + activity?.AddDefaultOpenTelemetryTags(topicPartition, message); + activity?.AddTag("messaging.kafka.partition", topicPartition.Partition.Value); } return activity; } } - private static Activity AddDefaultOpenTelemetryTags(this Activity activity, string topic, Message message) + private static Activity AddDefaultOpenTelemetryTags( + this Activity activity, + TopicPartition topicPartition, + Message message) { activity?.AddTag("messaging.system", "kafka"); - activity?.AddTag("messaging.destination", topic); + activity?.AddTag("messaging.destination", topicPartition.Topic); activity?.AddTag("messaging.destination_kind", "topic"); + activity?.AddTag("messaging.kafka.partition", topicPartition.Partition.Value); if (message.Key != null) activity?.AddTag("messaging.kafka.message_key", message.Key); diff --git a/src/Confluent.Kafka/Producer.cs b/src/Confluent.Kafka/Producer.cs index 0fa5970a8..01644d3df 100644 --- a/src/Confluent.Kafka/Producer.cs +++ b/src/Confluent.Kafka/Producer.cs @@ -16,7 +16,10 @@ using System; using System.Collections.Generic; +using System.Diagnostics; +using System.IO; using System.Linq; +using System.Reflection; using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; @@ -581,7 +584,9 @@ internal Producer(ProducerBuilder builder) this.DeliveryReportCallback = DeliveryReportCallbackImpl; - Librdkafka.Initialize(null); + string pathToLibrd = Path.Combine(Path.GetDirectoryName(Path.GetDirectoryName(Directory.GetCurrentDirectory())), "Debug\\net6.0\\librdkafka\\x64\\librdkafka.dll"); + + Librdkafka.Initialize(pathToLibrd); var modifiedConfig = Library.NameAndVersionConfig .Concat(config @@ -784,6 +789,8 @@ public async Task> ProduceAsync( ex); } + Activity activity = Diagnostics.Producer.Start(topicPartition, message); + try { if (enableDeliveryReports) @@ -836,6 +843,10 @@ public async Task> ProduceAsync( TopicPartitionOffset = new TopicPartitionOffset(topicPartition, Offset.Unset) }); } + finally + { + activity?.Stop(); + } } @@ -907,6 +918,8 @@ public void Produce( ex); } + Activity activity = Diagnostics.Producer.Start(topicPartition, message); + try { ProduceImpl( @@ -933,6 +946,10 @@ public void Produce( TopicPartitionOffset = new TopicPartitionOffset(topicPartition, Offset.Unset) }); } + finally + { + activity?.Stop(); + } } private class TypedTaskDeliveryHandlerShim : TaskCompletionSource>, IDeliveryHandler From e629b6d6c3965d5eaa2324de04a862291081d2e3 Mon Sep 17 00:00:00 2001 From: Fernando de Oliveira <5161098+fedeoliv@users.noreply.github.com> Date: Mon, 13 Jun 2022 11:26:26 -0300 Subject: [PATCH 04/12] fix: Kafka partition tag value converted to string --- src/Confluent.Kafka/Diagnostics.cs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/Confluent.Kafka/Diagnostics.cs b/src/Confluent.Kafka/Diagnostics.cs index 7f7124b7c..82e1fa2d1 100644 --- a/src/Confluent.Kafka/Diagnostics.cs +++ b/src/Confluent.Kafka/Diagnostics.cs @@ -21,7 +21,7 @@ namespace Confluent.Kafka { internal static class Diagnostics { - public const string ActivitySourceName = "Confluent.Kafka"; + private const string ActivitySourceName = "Confluent.Kafka"; public static ActivitySource ActivitySource { get; } = new ActivitySource(ActivitySourceName); public static class Producer @@ -38,7 +38,6 @@ public static Activity Start(TopicPartition topicPartition, Messag using (activity) { activity?.AddDefaultOpenTelemetryTags(topicPartition, message); - activity?.AddTag("messaging.kafka.partition", topicPartition.Partition.Value); } return activity; @@ -53,14 +52,11 @@ private static Activity AddDefaultOpenTelemetryTags( activity?.AddTag("messaging.system", "kafka"); activity?.AddTag("messaging.destination", topicPartition.Topic); activity?.AddTag("messaging.destination_kind", "topic"); - activity?.AddTag("messaging.kafka.partition", topicPartition.Partition.Value); + activity?.AddTag("messaging.kafka.partition", topicPartition.Partition.Value.ToString()); if (message.Key != null) activity?.AddTag("messaging.kafka.message_key", message.Key); - if (message.Value != null) - activity?.AddTag("messaging.message_payload_size_bytes", Marshal.SizeOf(message.Value)); - return activity; } } From e85c38f3092f003188b57ecec6878925ab561adf Mon Sep 17 00:00:00 2001 From: Fernando de Oliveira <5161098+fedeoliv@users.noreply.github.com> Date: Mon, 13 Jun 2022 17:00:11 -0300 Subject: [PATCH 05/12] test: Integration test for the producer instrumentation --- .../ActivityEventsRecorder.cs | 54 ++++++++++++++++ .../Tests/Producer_Produce.cs | 62 ++++++++++++++++++- 2 files changed, 114 insertions(+), 2 deletions(-) create mode 100644 test/Confluent.Kafka.IntegrationTests/ActivityEventsRecorder.cs diff --git a/test/Confluent.Kafka.IntegrationTests/ActivityEventsRecorder.cs b/test/Confluent.Kafka.IntegrationTests/ActivityEventsRecorder.cs new file mode 100644 index 000000000..0c58db53d --- /dev/null +++ b/test/Confluent.Kafka.IntegrationTests/ActivityEventsRecorder.cs @@ -0,0 +1,54 @@ +// Copyright 2022 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics; + +namespace Confluent.Kafka.IntegrationTests +{ + internal class ActivityEventsRecorder + { + internal ConcurrentQueue>>> Events = new(); + private readonly string activityName; + + internal ActivityEventsRecorder(string activityName) + { + this.activityName = activityName; + } + + internal ActivityListener BuildActivityListener() + { + using var listener = new ActivityListener + { + ShouldListenTo = _ => true, + Sample = (ref ActivityCreationOptions _) => ActivitySamplingResult.AllData, + ActivityStarted = activity => + { + if (activity.DisplayName == activityName) + Events.Enqueue(new KeyValuePair>>(activity.Id, activity.Tags)); + }, + ActivityStopped = activity => + { + if (activity.DisplayName == activityName) + Events.Enqueue(new KeyValuePair>>(activity.Id, activity.Tags)); + } + }; + + return listener; + } + } +} diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce.cs b/test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce.cs index 035055c6e..84e56ba2d 100644 --- a/test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce.cs +++ b/test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce.cs @@ -17,6 +17,10 @@ #pragma warning disable xUnit1026 using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; using System.Text; using Xunit; @@ -28,6 +32,62 @@ namespace Confluent.Kafka.IntegrationTests /// public partial class Tests { + [Theory] + [MemberData(nameof(KafkaParameters))] + public void Producer_Produce_WithOpenTelemetryInstrumentation(string bootstrapServers) + { + LogToFile("start Producer_Produce_WithOpenTelemetryInstrumentation"); + + // Prepare the activity events listener + string activityName = "Confluent.Kafka.MessageProduced"; + ActivityEventsRecorder eventsRecorder = new(activityName); + ActivityListener listener = eventsRecorder.BuildActivityListener(); + ActivitySource.AddActivityListener(listener); + + // Produce messages + var producerConfig = new ProducerConfig + { + BootstrapServers = bootstrapServers, + EnableIdempotence = true, + LingerMs = 1.5 + }; + + using var producer = new ProducerBuilder(producerConfig).Build(); + + producer.Produce( + new TopicPartition(singlePartitionTopic, 0), + new Message { Key = "test key 0", Value = "test val 0" }); + + producer.Produce( + new TopicPartition(singlePartitionTopic, 0), + new Message { Key = "test key 1", Value = "test val 1" }); + + producer.Flush(TimeSpan.FromSeconds(10)); + + // Capture start/stop events + int actualStartEventsCount = eventsRecorder.Events.Count(x => x.Value.Any()); + int actualStopEventsCount = eventsRecorder.Events.Count(x => !x.Value.Any()); + + var startEventsTags = eventsRecorder.Events + .Select(activityEvent => activityEvent.Value) + .Where(eventTags => eventTags.Any()); + + // Check the number of start/stop events generated + Assert.Equal(4, eventsRecorder.Events.Count); + Assert.Equal(2, actualStartEventsCount); + Assert.Equal(2, actualStopEventsCount); + + // Check if default OpenTelemetry attributes were created on start events + foreach (IEnumerable> startEventTags in startEventsTags) + { + Assert.Contains(startEventTags, tag => tag.Key == "messaging.system" && tag.Value == "kafka"); + Assert.Contains(startEventTags, tag => tag.Key == "messaging.destination_kind" && tag.Value == "topic"); + Assert.Contains(startEventTags, tag => tag.Key == "messaging.destination" && tag.Value == singlePartitionTopic); + Assert.Contains(startEventTags, tag => tag.Key == "messaging.kafka.partition" && tag.Value == "0"); + Assert.Contains(startEventTags, tag => tag.Key == "messaging.kafka.message_key" && tag.Value.Contains("test key")); + } + } + [Theory, MemberData(nameof(KafkaParameters))] public void Producer_Produce(string bootstrapServers) { @@ -40,7 +100,6 @@ public void Producer_Produce(string bootstrapServers) LingerMs = 1.5 }; - // serializer case. int count = 0; @@ -73,7 +132,6 @@ public void Producer_Produce(string bootstrapServers) Assert.Equal(2, count); - // byte[] case. count = 0; From 0a71ede329cf7170e1fe6b4c860aa9678c1952d2 Mon Sep 17 00:00:00 2001 From: Fernando de Oliveira <5161098+fedeoliv@users.noreply.github.com> Date: Mon, 13 Jun 2022 18:00:56 -0300 Subject: [PATCH 06/12] refactor: OpenTelemetry messaging attributes as reusable static variables --- src/Confluent.Kafka/Diagnostics.cs | 11 ++-- src/Confluent.Kafka/OpenTelemetryMessaging.cs | 50 +++++++++++++++++++ .../Tests/Producer_Produce.cs | 10 ++-- 3 files changed, 60 insertions(+), 11 deletions(-) create mode 100644 src/Confluent.Kafka/OpenTelemetryMessaging.cs diff --git a/src/Confluent.Kafka/Diagnostics.cs b/src/Confluent.Kafka/Diagnostics.cs index 82e1fa2d1..3134032a0 100644 --- a/src/Confluent.Kafka/Diagnostics.cs +++ b/src/Confluent.Kafka/Diagnostics.cs @@ -15,7 +15,6 @@ // Refer to LICENSE for more information. using System.Diagnostics; -using System.Runtime.InteropServices; namespace Confluent.Kafka { @@ -49,13 +48,13 @@ private static Activity AddDefaultOpenTelemetryTags( TopicPartition topicPartition, Message message) { - activity?.AddTag("messaging.system", "kafka"); - activity?.AddTag("messaging.destination", topicPartition.Topic); - activity?.AddTag("messaging.destination_kind", "topic"); - activity?.AddTag("messaging.kafka.partition", topicPartition.Partition.Value.ToString()); + activity?.AddTag(OpenTelemetryMessaging.SYSTEM, "kafka"); + activity?.AddTag(OpenTelemetryMessaging.DESTINATION, topicPartition.Topic); + activity?.AddTag(OpenTelemetryMessaging.DESTINATION_KIND, "topic"); + activity?.AddTag(OpenTelemetryMessaging.KAFKA_PARTITION, topicPartition.Partition.Value.ToString()); if (message.Key != null) - activity?.AddTag("messaging.kafka.message_key", message.Key); + activity?.AddTag(OpenTelemetryMessaging.KAFKA_MESSAGE_KEY, message.Key); return activity; } diff --git a/src/Confluent.Kafka/OpenTelemetryMessaging.cs b/src/Confluent.Kafka/OpenTelemetryMessaging.cs new file mode 100644 index 000000000..be6bf7363 --- /dev/null +++ b/src/Confluent.Kafka/OpenTelemetryMessaging.cs @@ -0,0 +1,50 @@ +// Copyright 2022 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + +namespace Confluent.Kafka +{ + /// + /// Provides the OpenTelemetry messaging attributes. + /// The complete list of messaging attributes specification is available here: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md#messaging-attributes + /// + public static class OpenTelemetryMessaging + { + /// + /// Message system. For Kafka, attribute value must be "kafka". + /// + public const string SYSTEM = "messaging.system"; + + /// + /// Message destination. For Kafka, attribute value must be a Kafka topic. + /// + public const string DESTINATION = "messaging.destination"; + + /// + /// Destination kind. For Kafka, attribute value must be "topic". + /// + public const string DESTINATION_KIND = "messaging.destination_kind"; + + /// + /// Kafka partition number. + /// + public const string KAFKA_PARTITION = "messaging.kafka.partition"; + + /// + /// Kafka message key. + /// + public const string KAFKA_MESSAGE_KEY = "messaging.kafka.message_key"; + } +} diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce.cs b/test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce.cs index 84e56ba2d..c30a475e1 100644 --- a/test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce.cs +++ b/test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce.cs @@ -80,11 +80,11 @@ public void Producer_Produce_WithOpenTelemetryInstrumentation(string bootstrapSe // Check if default OpenTelemetry attributes were created on start events foreach (IEnumerable> startEventTags in startEventsTags) { - Assert.Contains(startEventTags, tag => tag.Key == "messaging.system" && tag.Value == "kafka"); - Assert.Contains(startEventTags, tag => tag.Key == "messaging.destination_kind" && tag.Value == "topic"); - Assert.Contains(startEventTags, tag => tag.Key == "messaging.destination" && tag.Value == singlePartitionTopic); - Assert.Contains(startEventTags, tag => tag.Key == "messaging.kafka.partition" && tag.Value == "0"); - Assert.Contains(startEventTags, tag => tag.Key == "messaging.kafka.message_key" && tag.Value.Contains("test key")); + Assert.Contains(startEventTags, tag => tag.Key == OpenTelemetryMessaging.SYSTEM && tag.Value == "kafka"); + Assert.Contains(startEventTags, tag => tag.Key == OpenTelemetryMessaging.DESTINATION_KIND && tag.Value == "topic"); + Assert.Contains(startEventTags, tag => tag.Key == OpenTelemetryMessaging.DESTINATION && tag.Value == singlePartitionTopic); + Assert.Contains(startEventTags, tag => tag.Key == OpenTelemetryMessaging.KAFKA_PARTITION && tag.Value == "0"); + Assert.Contains(startEventTags, tag => tag.Key == OpenTelemetryMessaging.KAFKA_MESSAGE_KEY && tag.Value.Contains("test key")); } } From a70edf4c8dfe4b6f2c9092ff29a2d0d00b8c806c Mon Sep 17 00:00:00 2001 From: Fernando de Oliveira <5161098+fedeoliv@users.noreply.github.com> Date: Tue, 14 Jun 2022 09:47:46 -0300 Subject: [PATCH 07/12] docs: summary added to Diagnostics and ActivityEventsRecorder methods --- src/Confluent.Kafka/Diagnostics.cs | 12 +++++++++--- .../ActivityEventsRecorder.cs | 4 ++++ 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/src/Confluent.Kafka/Diagnostics.cs b/src/Confluent.Kafka/Diagnostics.cs index 3134032a0..1e63c8f8b 100644 --- a/src/Confluent.Kafka/Diagnostics.cs +++ b/src/Confluent.Kafka/Diagnostics.cs @@ -18,16 +18,22 @@ namespace Confluent.Kafka { + /// + /// Implements Activity objects with OpenTelemetry messaging tags for instrumentation + /// internal static class Diagnostics { private const string ActivitySourceName = "Confluent.Kafka"; public static ActivitySource ActivitySource { get; } = new ActivitySource(ActivitySourceName); - public static class Producer + /// + /// Provides an Activity object for the Producer with OpenTelemetry messaging tags for instrumentation + /// + internal static class Producer { - public const string ActivityName = ActivitySourceName + ".MessageProduced"; + private const string ActivityName = ActivitySourceName + ".MessageProduced"; - public static Activity Start(TopicPartition topicPartition, Message message) + internal static Activity Start(TopicPartition topicPartition, Message message) { Activity activity = ActivitySource.StartActivity(ActivityName); diff --git a/test/Confluent.Kafka.IntegrationTests/ActivityEventsRecorder.cs b/test/Confluent.Kafka.IntegrationTests/ActivityEventsRecorder.cs index 0c58db53d..7f64c3a5a 100644 --- a/test/Confluent.Kafka.IntegrationTests/ActivityEventsRecorder.cs +++ b/test/Confluent.Kafka.IntegrationTests/ActivityEventsRecorder.cs @@ -30,6 +30,10 @@ internal ActivityEventsRecorder(string activityName) this.activityName = activityName; } + /// + /// Builds an ActivityListener with callbacks to store start and stop events to a concurrent queue. + /// + /// internal ActivityListener BuildActivityListener() { using var listener = new ActivityListener From 8a8fc71b2d899869e4a51008cbb4bdf3b82ed4c6 Mon Sep 17 00:00:00 2001 From: Fernando de Oliveira <5161098+fedeoliv@users.noreply.github.com> Date: Tue, 14 Jun 2022 10:23:40 -0300 Subject: [PATCH 08/12] feature: OpenTelemetry message payload size attribute added to Producer activity --- src/Confluent.Kafka/Diagnostics.cs | 4 ++++ src/Confluent.Kafka/OpenTelemetryMessaging.cs | 5 +++++ .../Tests/Producer_Produce.cs | 1 + 3 files changed, 10 insertions(+) diff --git a/src/Confluent.Kafka/Diagnostics.cs b/src/Confluent.Kafka/Diagnostics.cs index 1e63c8f8b..5f4f649b8 100644 --- a/src/Confluent.Kafka/Diagnostics.cs +++ b/src/Confluent.Kafka/Diagnostics.cs @@ -15,6 +15,7 @@ // Refer to LICENSE for more information. using System.Diagnostics; +using System.Text; namespace Confluent.Kafka { @@ -54,10 +55,13 @@ private static Activity AddDefaultOpenTelemetryTags( TopicPartition topicPartition, Message message) { + int messagePayloadBytes = Encoding.UTF8.GetByteCount(message.Value.ToString()); + activity?.AddTag(OpenTelemetryMessaging.SYSTEM, "kafka"); activity?.AddTag(OpenTelemetryMessaging.DESTINATION, topicPartition.Topic); activity?.AddTag(OpenTelemetryMessaging.DESTINATION_KIND, "topic"); activity?.AddTag(OpenTelemetryMessaging.KAFKA_PARTITION, topicPartition.Partition.Value.ToString()); + activity?.AddTag(OpenTelemetryMessaging.MESSAGE_PAYLOAD_SIZE_BYTES, messagePayloadBytes.ToString()); if (message.Key != null) activity?.AddTag(OpenTelemetryMessaging.KAFKA_MESSAGE_KEY, message.Key); diff --git a/src/Confluent.Kafka/OpenTelemetryMessaging.cs b/src/Confluent.Kafka/OpenTelemetryMessaging.cs index be6bf7363..2ffc15c37 100644 --- a/src/Confluent.Kafka/OpenTelemetryMessaging.cs +++ b/src/Confluent.Kafka/OpenTelemetryMessaging.cs @@ -46,5 +46,10 @@ public static class OpenTelemetryMessaging /// Kafka message key. /// public const string KAFKA_MESSAGE_KEY = "messaging.kafka.message_key"; + + /// + /// Kafka message payload size (bytes). + /// + public const string MESSAGE_PAYLOAD_SIZE_BYTES = "messaging.message_payload_size_bytes"; } } diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce.cs b/test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce.cs index c30a475e1..02a4cf266 100644 --- a/test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce.cs +++ b/test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce.cs @@ -85,6 +85,7 @@ public void Producer_Produce_WithOpenTelemetryInstrumentation(string bootstrapSe Assert.Contains(startEventTags, tag => tag.Key == OpenTelemetryMessaging.DESTINATION && tag.Value == singlePartitionTopic); Assert.Contains(startEventTags, tag => tag.Key == OpenTelemetryMessaging.KAFKA_PARTITION && tag.Value == "0"); Assert.Contains(startEventTags, tag => tag.Key == OpenTelemetryMessaging.KAFKA_MESSAGE_KEY && tag.Value.Contains("test key")); + Assert.Contains(startEventTags, tag => tag.Key == OpenTelemetryMessaging.MESSAGE_PAYLOAD_SIZE_BYTES && tag.Value == "10"); } } From 55e4745492a298968235726ee67b1aab662394a8 Mon Sep 17 00:00:00 2001 From: Fernando de Oliveira <5161098+fedeoliv@users.noreply.github.com> Date: Tue, 14 Jun 2022 10:28:15 -0300 Subject: [PATCH 09/12] fix: librdkafka path removed from library initialization --- src/Confluent.Kafka/Producer.cs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/Confluent.Kafka/Producer.cs b/src/Confluent.Kafka/Producer.cs index 01644d3df..3215b7037 100644 --- a/src/Confluent.Kafka/Producer.cs +++ b/src/Confluent.Kafka/Producer.cs @@ -584,9 +584,7 @@ internal Producer(ProducerBuilder builder) this.DeliveryReportCallback = DeliveryReportCallbackImpl; - string pathToLibrd = Path.Combine(Path.GetDirectoryName(Path.GetDirectoryName(Directory.GetCurrentDirectory())), "Debug\\net6.0\\librdkafka\\x64\\librdkafka.dll"); - - Librdkafka.Initialize(pathToLibrd); + Librdkafka.Initialize(null); var modifiedConfig = Library.NameAndVersionConfig .Concat(config From d34ba4e1778123e0a55574e6d08fbdc9e3228ace Mon Sep 17 00:00:00 2001 From: Fernando de Oliveira <5161098+fedeoliv@users.noreply.github.com> Date: Tue, 14 Jun 2022 10:38:51 -0300 Subject: [PATCH 10/12] fix: librdkafka dll inclusion on project binaries removed --- src/Confluent.Kafka/Confluent.Kafka.csproj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Confluent.Kafka/Confluent.Kafka.csproj b/src/Confluent.Kafka/Confluent.Kafka.csproj index 353510345..449dd7dbe 100644 --- a/src/Confluent.Kafka/Confluent.Kafka.csproj +++ b/src/Confluent.Kafka/Confluent.Kafka.csproj @@ -13,7 +13,7 @@ Confluent.Kafka Confluent.Kafka 1.8.2 - net462;net6.0;netstandard1.3;netstandard2.0 + net462;net5.0;netstandard1.3;netstandard2.0 true true true From 001596d76e3037da308c0107d7e75a78c5ec14f4 Mon Sep 17 00:00:00 2001 From: Fernando de Oliveira <5161098+fedeoliv@users.noreply.github.com> Date: Tue, 14 Jun 2022 13:18:31 -0300 Subject: [PATCH 11/12] fix: System.Diagnostics.DiagnosticSource package upgrade to 5.0.1 --- .../Confluent.SchemaRegistry.Serdes.IntegrationTests.csproj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/Confluent.SchemaRegistry.Serdes.IntegrationTests/Confluent.SchemaRegistry.Serdes.IntegrationTests.csproj b/test/Confluent.SchemaRegistry.Serdes.IntegrationTests/Confluent.SchemaRegistry.Serdes.IntegrationTests.csproj index 5acd623d0..0c77aad28 100644 --- a/test/Confluent.SchemaRegistry.Serdes.IntegrationTests/Confluent.SchemaRegistry.Serdes.IntegrationTests.csproj +++ b/test/Confluent.SchemaRegistry.Serdes.IntegrationTests/Confluent.SchemaRegistry.Serdes.IntegrationTests.csproj @@ -27,7 +27,7 @@ - + From 6fa9ed11beee192790f7f5a8a0dcb586a2371503 Mon Sep 17 00:00:00 2001 From: Fernando de Oliveira <5161098+fedeoliv@users.noreply.github.com> Date: Tue, 14 Jun 2022 13:25:44 -0300 Subject: [PATCH 12/12] fix: message value null validation on Producer activity --- src/Confluent.Kafka/Diagnostics.cs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/Confluent.Kafka/Diagnostics.cs b/src/Confluent.Kafka/Diagnostics.cs index 5f4f649b8..081158beb 100644 --- a/src/Confluent.Kafka/Diagnostics.cs +++ b/src/Confluent.Kafka/Diagnostics.cs @@ -55,17 +55,20 @@ private static Activity AddDefaultOpenTelemetryTags( TopicPartition topicPartition, Message message) { - int messagePayloadBytes = Encoding.UTF8.GetByteCount(message.Value.ToString()); - activity?.AddTag(OpenTelemetryMessaging.SYSTEM, "kafka"); activity?.AddTag(OpenTelemetryMessaging.DESTINATION, topicPartition.Topic); activity?.AddTag(OpenTelemetryMessaging.DESTINATION_KIND, "topic"); activity?.AddTag(OpenTelemetryMessaging.KAFKA_PARTITION, topicPartition.Partition.Value.ToString()); - activity?.AddTag(OpenTelemetryMessaging.MESSAGE_PAYLOAD_SIZE_BYTES, messagePayloadBytes.ToString()); - + if (message.Key != null) activity?.AddTag(OpenTelemetryMessaging.KAFKA_MESSAGE_KEY, message.Key); + if (message.Value != null) + { + int messagePayloadBytes = Encoding.UTF8.GetByteCount(message.Value.ToString()); + activity?.AddTag(OpenTelemetryMessaging.MESSAGE_PAYLOAD_SIZE_BYTES, messagePayloadBytes.ToString()); + } + return activity; } }