diff --git a/src/Confluent.Kafka/Confluent.Kafka.csproj b/src/Confluent.Kafka/Confluent.Kafka.csproj
old mode 100755
new mode 100644
index 1cc4ae90d..449dd7dbe
--- a/src/Confluent.Kafka/Confluent.Kafka.csproj
+++ b/src/Confluent.Kafka/Confluent.Kafka.csproj
@@ -24,7 +24,8 @@
None
-
+
+
diff --git a/src/Confluent.Kafka/Diagnostics.cs b/src/Confluent.Kafka/Diagnostics.cs
new file mode 100644
index 000000000..081158beb
--- /dev/null
+++ b/src/Confluent.Kafka/Diagnostics.cs
@@ -0,0 +1,75 @@
+// Copyright 2022 Confluent Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// Refer to LICENSE for more information.
+
+using System.Diagnostics;
+using System.Text;
+
+namespace Confluent.Kafka
+{
+ ///
+ /// Implements Activity objects with OpenTelemetry messaging tags for instrumentation
+ ///
+ internal static class Diagnostics
+ {
+ private const string ActivitySourceName = "Confluent.Kafka";
+ public static ActivitySource ActivitySource { get; } = new ActivitySource(ActivitySourceName);
+
+ ///
+ /// Provides an Activity object for the Producer with OpenTelemetry messaging tags for instrumentation
+ ///
+ internal static class Producer
+ {
+ private const string ActivityName = ActivitySourceName + ".MessageProduced";
+
+ internal static Activity Start(TopicPartition topicPartition, Message message)
+ {
+ Activity activity = ActivitySource.StartActivity(ActivityName);
+
+ if (activity == null)
+ return null;
+
+ using (activity)
+ {
+ activity?.AddDefaultOpenTelemetryTags(topicPartition, message);
+ }
+
+ return activity;
+ }
+ }
+
+ private static Activity AddDefaultOpenTelemetryTags(
+ this Activity activity,
+ TopicPartition topicPartition,
+ Message message)
+ {
+ activity?.AddTag(OpenTelemetryMessaging.SYSTEM, "kafka");
+ activity?.AddTag(OpenTelemetryMessaging.DESTINATION, topicPartition.Topic);
+ activity?.AddTag(OpenTelemetryMessaging.DESTINATION_KIND, "topic");
+ activity?.AddTag(OpenTelemetryMessaging.KAFKA_PARTITION, topicPartition.Partition.Value.ToString());
+
+ if (message.Key != null)
+ activity?.AddTag(OpenTelemetryMessaging.KAFKA_MESSAGE_KEY, message.Key);
+
+ if (message.Value != null)
+ {
+ int messagePayloadBytes = Encoding.UTF8.GetByteCount(message.Value.ToString());
+ activity?.AddTag(OpenTelemetryMessaging.MESSAGE_PAYLOAD_SIZE_BYTES, messagePayloadBytes.ToString());
+ }
+
+ return activity;
+ }
+ }
+}
diff --git a/src/Confluent.Kafka/OpenTelemetryMessaging.cs b/src/Confluent.Kafka/OpenTelemetryMessaging.cs
new file mode 100644
index 000000000..2ffc15c37
--- /dev/null
+++ b/src/Confluent.Kafka/OpenTelemetryMessaging.cs
@@ -0,0 +1,55 @@
+// Copyright 2022 Confluent Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// Refer to LICENSE for more information.
+
+namespace Confluent.Kafka
+{
+ ///
+ /// Provides the OpenTelemetry messaging attributes.
+ /// The complete list of messaging attributes specification is available here: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md#messaging-attributes
+ ///
+ public static class OpenTelemetryMessaging
+ {
+ ///
+ /// Message system. For Kafka, attribute value must be "kafka".
+ ///
+ public const string SYSTEM = "messaging.system";
+
+ ///
+ /// Message destination. For Kafka, attribute value must be a Kafka topic.
+ ///
+ public const string DESTINATION = "messaging.destination";
+
+ ///
+ /// Destination kind. For Kafka, attribute value must be "topic".
+ ///
+ public const string DESTINATION_KIND = "messaging.destination_kind";
+
+ ///
+ /// Kafka partition number.
+ ///
+ public const string KAFKA_PARTITION = "messaging.kafka.partition";
+
+ ///
+ /// Kafka message key.
+ ///
+ public const string KAFKA_MESSAGE_KEY = "messaging.kafka.message_key";
+
+ ///
+ /// Kafka message payload size (bytes).
+ ///
+ public const string MESSAGE_PAYLOAD_SIZE_BYTES = "messaging.message_payload_size_bytes";
+ }
+}
diff --git a/src/Confluent.Kafka/Producer.cs b/src/Confluent.Kafka/Producer.cs
index 0fa5970a8..3215b7037 100644
--- a/src/Confluent.Kafka/Producer.cs
+++ b/src/Confluent.Kafka/Producer.cs
@@ -16,7 +16,10 @@
using System;
using System.Collections.Generic;
+using System.Diagnostics;
+using System.IO;
using System.Linq;
+using System.Reflection;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
@@ -784,6 +787,8 @@ public async Task> ProduceAsync(
ex);
}
+ Activity activity = Diagnostics.Producer.Start(topicPartition, message);
+
try
{
if (enableDeliveryReports)
@@ -836,6 +841,10 @@ public async Task> ProduceAsync(
TopicPartitionOffset = new TopicPartitionOffset(topicPartition, Offset.Unset)
});
}
+ finally
+ {
+ activity?.Stop();
+ }
}
@@ -907,6 +916,8 @@ public void Produce(
ex);
}
+ Activity activity = Diagnostics.Producer.Start(topicPartition, message);
+
try
{
ProduceImpl(
@@ -933,6 +944,10 @@ public void Produce(
TopicPartitionOffset = new TopicPartitionOffset(topicPartition, Offset.Unset)
});
}
+ finally
+ {
+ activity?.Stop();
+ }
}
private class TypedTaskDeliveryHandlerShim : TaskCompletionSource>, IDeliveryHandler
diff --git a/test/Confluent.Kafka.IntegrationTests/ActivityEventsRecorder.cs b/test/Confluent.Kafka.IntegrationTests/ActivityEventsRecorder.cs
new file mode 100644
index 000000000..7f64c3a5a
--- /dev/null
+++ b/test/Confluent.Kafka.IntegrationTests/ActivityEventsRecorder.cs
@@ -0,0 +1,58 @@
+// Copyright 2022 Confluent Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// Refer to LICENSE for more information.
+
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Diagnostics;
+
+namespace Confluent.Kafka.IntegrationTests
+{
+ internal class ActivityEventsRecorder
+ {
+ internal ConcurrentQueue>>> Events = new();
+ private readonly string activityName;
+
+ internal ActivityEventsRecorder(string activityName)
+ {
+ this.activityName = activityName;
+ }
+
+ ///
+ /// Builds an ActivityListener with callbacks to store start and stop events to a concurrent queue.
+ ///
+ ///
+ internal ActivityListener BuildActivityListener()
+ {
+ using var listener = new ActivityListener
+ {
+ ShouldListenTo = _ => true,
+ Sample = (ref ActivityCreationOptions _) => ActivitySamplingResult.AllData,
+ ActivityStarted = activity =>
+ {
+ if (activity.DisplayName == activityName)
+ Events.Enqueue(new KeyValuePair>>(activity.Id, activity.Tags));
+ },
+ ActivityStopped = activity =>
+ {
+ if (activity.DisplayName == activityName)
+ Events.Enqueue(new KeyValuePair>>(activity.Id, activity.Tags));
+ }
+ };
+
+ return listener;
+ }
+ }
+}
diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce.cs b/test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce.cs
index 035055c6e..02a4cf266 100644
--- a/test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce.cs
+++ b/test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce.cs
@@ -17,6 +17,10 @@
#pragma warning disable xUnit1026
using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Linq;
using System.Text;
using Xunit;
@@ -28,6 +32,63 @@ namespace Confluent.Kafka.IntegrationTests
///
public partial class Tests
{
+ [Theory]
+ [MemberData(nameof(KafkaParameters))]
+ public void Producer_Produce_WithOpenTelemetryInstrumentation(string bootstrapServers)
+ {
+ LogToFile("start Producer_Produce_WithOpenTelemetryInstrumentation");
+
+ // Prepare the activity events listener
+ string activityName = "Confluent.Kafka.MessageProduced";
+ ActivityEventsRecorder eventsRecorder = new(activityName);
+ ActivityListener listener = eventsRecorder.BuildActivityListener();
+ ActivitySource.AddActivityListener(listener);
+
+ // Produce messages
+ var producerConfig = new ProducerConfig
+ {
+ BootstrapServers = bootstrapServers,
+ EnableIdempotence = true,
+ LingerMs = 1.5
+ };
+
+ using var producer = new ProducerBuilder(producerConfig).Build();
+
+ producer.Produce(
+ new TopicPartition(singlePartitionTopic, 0),
+ new Message { Key = "test key 0", Value = "test val 0" });
+
+ producer.Produce(
+ new TopicPartition(singlePartitionTopic, 0),
+ new Message { Key = "test key 1", Value = "test val 1" });
+
+ producer.Flush(TimeSpan.FromSeconds(10));
+
+ // Capture start/stop events
+ int actualStartEventsCount = eventsRecorder.Events.Count(x => x.Value.Any());
+ int actualStopEventsCount = eventsRecorder.Events.Count(x => !x.Value.Any());
+
+ var startEventsTags = eventsRecorder.Events
+ .Select(activityEvent => activityEvent.Value)
+ .Where(eventTags => eventTags.Any());
+
+ // Check the number of start/stop events generated
+ Assert.Equal(4, eventsRecorder.Events.Count);
+ Assert.Equal(2, actualStartEventsCount);
+ Assert.Equal(2, actualStopEventsCount);
+
+ // Check if default OpenTelemetry attributes were created on start events
+ foreach (IEnumerable> startEventTags in startEventsTags)
+ {
+ Assert.Contains(startEventTags, tag => tag.Key == OpenTelemetryMessaging.SYSTEM && tag.Value == "kafka");
+ Assert.Contains(startEventTags, tag => tag.Key == OpenTelemetryMessaging.DESTINATION_KIND && tag.Value == "topic");
+ Assert.Contains(startEventTags, tag => tag.Key == OpenTelemetryMessaging.DESTINATION && tag.Value == singlePartitionTopic);
+ Assert.Contains(startEventTags, tag => tag.Key == OpenTelemetryMessaging.KAFKA_PARTITION && tag.Value == "0");
+ Assert.Contains(startEventTags, tag => tag.Key == OpenTelemetryMessaging.KAFKA_MESSAGE_KEY && tag.Value.Contains("test key"));
+ Assert.Contains(startEventTags, tag => tag.Key == OpenTelemetryMessaging.MESSAGE_PAYLOAD_SIZE_BYTES && tag.Value == "10");
+ }
+ }
+
[Theory, MemberData(nameof(KafkaParameters))]
public void Producer_Produce(string bootstrapServers)
{
@@ -40,7 +101,6 @@ public void Producer_Produce(string bootstrapServers)
LingerMs = 1.5
};
-
// serializer case.
int count = 0;
@@ -73,7 +133,6 @@ public void Producer_Produce(string bootstrapServers)
Assert.Equal(2, count);
-
// byte[] case.
count = 0;
diff --git a/test/Confluent.SchemaRegistry.Serdes.IntegrationTests/Confluent.SchemaRegistry.Serdes.IntegrationTests.csproj b/test/Confluent.SchemaRegistry.Serdes.IntegrationTests/Confluent.SchemaRegistry.Serdes.IntegrationTests.csproj
index 5acd623d0..0c77aad28 100644
--- a/test/Confluent.SchemaRegistry.Serdes.IntegrationTests/Confluent.SchemaRegistry.Serdes.IntegrationTests.csproj
+++ b/test/Confluent.SchemaRegistry.Serdes.IntegrationTests/Confluent.SchemaRegistry.Serdes.IntegrationTests.csproj
@@ -27,7 +27,7 @@
-
+