Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/Confluent.Kafka/Confluent.Kafka.csproj
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
<PackageReference Include="librdkafka.redist" Version="1.8.2">
<PrivateAssets Condition="'$(TargetFrameworkIdentifier)' == '.NETFramework'">None</PrivateAssets>
</PackageReference>
<PackageReference Include="System.Memory" Version="4.5.0" />
<PackageReference Include="System.Diagnostics.DiagnosticSource" Version="5.0.1" />
<PackageReference Include="System.Memory" Version="4.5.5" />
</ItemGroup>

<ItemGroup Condition=" '$(TargetFramework)' == 'netstandard1.3' ">
Expand Down
75 changes: 75 additions & 0 deletions src/Confluent.Kafka/Diagnostics.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright 2022 Confluent Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Refer to LICENSE for more information.

using System.Diagnostics;
using System.Text;

namespace Confluent.Kafka
{
/// <summary>
/// Implements Activity objects with OpenTelemetry messaging tags for instrumentation
/// </summary>
internal static class Diagnostics
{
private const string ActivitySourceName = "Confluent.Kafka";
public static ActivitySource ActivitySource { get; } = new ActivitySource(ActivitySourceName);

/// <summary>
/// Provides an Activity object for the Producer with OpenTelemetry messaging tags for instrumentation
/// </summary>
internal static class Producer
{
private const string ActivityName = ActivitySourceName + ".MessageProduced";
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ActivitySourceName should not be prefixed for ActivityName.


internal static Activity Start<TKey, TValue>(TopicPartition topicPartition, Message<TKey, TValue> message)
{
Activity activity = ActivitySource.StartActivity(ActivityName);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A bit of an unknown here. StartActivity is when sampling happens. The OpenTelemetry specification states...

The API documentation MUST state that adding attributes at span creation is preferred to calling SetAttribute later, as samplers can only consider information already present during span creation.

In this code there are no attributes/tags on start so users won't be able to make interesting sampling decisions based on details about the spans/activity instances. This is kind of an ongoing discussion, not sure if there will eventually be a list of attributes that will be required or it will always be optional. Just sharing the state of things 😄

/cc Cijo Thomas (@cijothomas)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For things which are known already, I'd suggest passing it to StartActivityCall.

One example for sql, where the db system is known (and static), and is passed at startActivity time itself.
https://github.com/open-telemetry/opentelemetry-dotnet/blob/main/src/OpenTelemetry.Instrumentation.SqlClient/Implementation/SqlClientDiagnosticListener.cs#L64


if (activity == null)
return null;

using (activity)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why have this using? Won't that cause a stopped Activity to be returned?

{
activity?.AddDefaultOpenTelemetryTags(topicPartition, message);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't need the null-conditional here (activity?) because line 41 already verified it is non-null.

}

return activity;
}
}

private static Activity AddDefaultOpenTelemetryTags<TKey, TValue>(
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is another area with some unknowns. The semantic conventions are still marked as Experimental. Meaning: They are likely to change. There is this whole concept of a schema url that should be attached to instrumentation before it can be declared stable. One option could be we create the Activity here and then provide a callback instrumentation (such as OpenTelemetry) or users can use in order to add the tags that make sense for their domain. The nice thing about that approach would be Confluent Kafka doesn't need to worry about a spec that is changing 😄 That approach (more or less) is what we have done with dotnet itself. The runtime starts things but OpenTelemetry uses hooks in its instrumentation to add the tags according to the spec.

this Activity activity,
TopicPartition topicPartition,
Message<TKey, TValue> message)
{
activity?.AddTag(OpenTelemetryMessaging.SYSTEM, "kafka");
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you need any of the null-conditionals in here because this is only ever called for non-null activity.

activity?.AddTag(OpenTelemetryMessaging.DESTINATION, topicPartition.Topic);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally we use Activity.SetTag as opposed to AddTag. SetTag implements the OTel spec logic for null behavior, de-dupe, etc.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

open-telemetry/opentelemetry-dotnet#5173 Based on this, we may be able to suggest AddTag is certain scenarios as well to save some perf.

activity?.AddTag(OpenTelemetryMessaging.DESTINATION_KIND, "topic");
activity?.AddTag(OpenTelemetryMessaging.KAFKA_PARTITION, topicPartition.Partition.Value.ToString());

if (message.Key != null)
activity?.AddTag(OpenTelemetryMessaging.KAFKA_MESSAGE_KEY, message.Key);

if (message.Value != null)
{
int messagePayloadBytes = Encoding.UTF8.GetByteCount(message.Value.ToString());
activity?.AddTag(OpenTelemetryMessaging.MESSAGE_PAYLOAD_SIZE_BYTES, messagePayloadBytes.ToString());
Comment on lines +68 to +69
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Is message.Value.ToString() going to be very expensive? Ideally getting the size would either be cheap or it would be an opt-in thing for users who need it enough to pay for it.
  • messagePayloadBytes.ToString() why not just push the numeric value? Basically you can add an int as a tag it doesn't need to be forced into a string.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

at the very least, check activity.IsAllDataRequested before doing anything which is non-trivial in terms of cost. This ensures that the cost is paid only if the activity has a chance of being exported to some telemetry destination.

}

return activity;
}
}
}
55 changes: 55 additions & 0 deletions src/Confluent.Kafka/OpenTelemetryMessaging.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright 2022 Confluent Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Refer to LICENSE for more information.

namespace Confluent.Kafka
{
/// <summary>
/// Provides the OpenTelemetry messaging attributes.
/// The complete list of messaging attributes specification is available here: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md#messaging-attributes
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the conventions in OTel are still work-in-progress. I'd suggest to mark this overall feature as experimental/non-stable, from the perspective that the name of the tags could change in the future, until the otel conventions become stable.

/// </summary>
public static class OpenTelemetryMessaging
{
/// <summary>
/// Message system. For Kafka, attribute value must be "kafka".
/// </summary>
public const string SYSTEM = "messaging.system";

/// <summary>
/// Message destination. For Kafka, attribute value must be a Kafka topic.
/// </summary>
public const string DESTINATION = "messaging.destination";

/// <summary>
/// Destination kind. For Kafka, attribute value must be "topic".
/// </summary>
public const string DESTINATION_KIND = "messaging.destination_kind";

/// <summary>
/// Kafka partition number.
/// </summary>
public const string KAFKA_PARTITION = "messaging.kafka.partition";

/// <summary>
/// Kafka message key.
/// </summary>
public const string KAFKA_MESSAGE_KEY = "messaging.kafka.message_key";

/// <summary>
/// Kafka message payload size (bytes).
/// </summary>
public const string MESSAGE_PAYLOAD_SIZE_BYTES = "messaging.message_payload_size_bytes";
}
}
15 changes: 15 additions & 0 deletions src/Confluent.Kafka/Producer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Reflection;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -784,6 +787,8 @@ public async Task<DeliveryResult<TKey, TValue>> ProduceAsync(
ex);
}

Activity activity = Diagnostics.Producer.Start(topicPartition, message);

try
{
if (enableDeliveryReports)
Expand Down Expand Up @@ -836,6 +841,10 @@ public async Task<DeliveryResult<TKey, TValue>> ProduceAsync(
TopicPartitionOffset = new TopicPartitionOffset(topicPartition, Offset.Unset)
});
}
finally
{
activity?.Stop();
}
}


Expand Down Expand Up @@ -907,6 +916,8 @@ public void Produce(
ex);
}

Activity activity = Diagnostics.Producer.Start(topicPartition, message);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use using and remove activity.stop in finally?


try
{
ProduceImpl(
Expand All @@ -933,6 +944,10 @@ public void Produce(
TopicPartitionOffset = new TopicPartitionOffset(topicPartition, Offset.Unset)
});
}
finally
{
activity?.Stop();
}
}

private class TypedTaskDeliveryHandlerShim : TaskCompletionSource<DeliveryResult<TKey, TValue>>, IDeliveryHandler
Expand Down
58 changes: 58 additions & 0 deletions test/Confluent.Kafka.IntegrationTests/ActivityEventsRecorder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2022 Confluent Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Refer to LICENSE for more information.

using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;

namespace Confluent.Kafka.IntegrationTests
{
internal class ActivityEventsRecorder
{
internal ConcurrentQueue<KeyValuePair<string, IEnumerable<KeyValuePair<string, string>>>> Events = new();
private readonly string activityName;

internal ActivityEventsRecorder(string activityName)
{
this.activityName = activityName;
}

/// <summary>
/// Builds an ActivityListener with callbacks to store start and stop events to a concurrent queue.
/// </summary>
/// <returns></returns>
internal ActivityListener BuildActivityListener()
{
using var listener = new ActivityListener
{
ShouldListenTo = _ => true,
Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllData,
ActivityStarted = activity =>
{
if (activity.DisplayName == activityName)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

string.equals for string comparison

Events.Enqueue(new KeyValuePair<string, IEnumerable<KeyValuePair<string, string>>>(activity.Id, activity.Tags));
},
ActivityStopped = activity =>
{
if (activity.DisplayName == activityName)
Events.Enqueue(new KeyValuePair<string, IEnumerable<KeyValuePair<string, string>>>(activity.Id, activity.Tags));
}
};

return listener;
}
}
}
63 changes: 61 additions & 2 deletions test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
#pragma warning disable xUnit1026

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using Xunit;

Expand All @@ -28,6 +32,63 @@ namespace Confluent.Kafka.IntegrationTests
/// </summary>
public partial class Tests
{
[Theory]
[MemberData(nameof(KafkaParameters))]
public void Producer_Produce_WithOpenTelemetryInstrumentation(string bootstrapServers)
{
LogToFile("start Producer_Produce_WithOpenTelemetryInstrumentation");

// Prepare the activity events listener
string activityName = "Confluent.Kafka.MessageProduced";
ActivityEventsRecorder eventsRecorder = new(activityName);
ActivityListener listener = eventsRecorder.BuildActivityListener();
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ActivitySource.AddActivityListener(listener);

// Produce messages
var producerConfig = new ProducerConfig
{
BootstrapServers = bootstrapServers,
EnableIdempotence = true,
LingerMs = 1.5
};

using var producer = new ProducerBuilder<string, string>(producerConfig).Build();

producer.Produce(
new TopicPartition(singlePartitionTopic, 0),
new Message<string, string> { Key = "test key 0", Value = "test val 0" });

producer.Produce(
new TopicPartition(singlePartitionTopic, 0),
new Message<string, string> { Key = "test key 1", Value = "test val 1" });

producer.Flush(TimeSpan.FromSeconds(10));

// Capture start/stop events
int actualStartEventsCount = eventsRecorder.Events.Count(x => x.Value.Any());
int actualStopEventsCount = eventsRecorder.Events.Count(x => !x.Value.Any());

var startEventsTags = eventsRecorder.Events
.Select(activityEvent => activityEvent.Value)
.Where(eventTags => eventTags.Any());

// Check the number of start/stop events generated
Assert.Equal(4, eventsRecorder.Events.Count);
Assert.Equal(2, actualStartEventsCount);
Assert.Equal(2, actualStopEventsCount);

// Check if default OpenTelemetry attributes were created on start events
foreach (IEnumerable<KeyValuePair<string, string>> startEventTags in startEventsTags)
{
Assert.Contains(startEventTags, tag => tag.Key == OpenTelemetryMessaging.SYSTEM && tag.Value == "kafka");
Assert.Contains(startEventTags, tag => tag.Key == OpenTelemetryMessaging.DESTINATION_KIND && tag.Value == "topic");
Assert.Contains(startEventTags, tag => tag.Key == OpenTelemetryMessaging.DESTINATION && tag.Value == singlePartitionTopic);
Assert.Contains(startEventTags, tag => tag.Key == OpenTelemetryMessaging.KAFKA_PARTITION && tag.Value == "0");
Assert.Contains(startEventTags, tag => tag.Key == OpenTelemetryMessaging.KAFKA_MESSAGE_KEY && tag.Value.Contains("test key"));
Assert.Contains(startEventTags, tag => tag.Key == OpenTelemetryMessaging.MESSAGE_PAYLOAD_SIZE_BYTES && tag.Value == "10");
}
}

[Theory, MemberData(nameof(KafkaParameters))]
public void Producer_Produce(string bootstrapServers)
{
Expand All @@ -40,7 +101,6 @@ public void Producer_Produce(string bootstrapServers)
LingerMs = 1.5
};


// serializer case.

int count = 0;
Expand Down Expand Up @@ -73,7 +133,6 @@ public void Producer_Produce(string bootstrapServers)

Assert.Equal(2, count);


// byte[] case.

count = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

<ItemGroup>
<PackageReference Include="Apache.Avro" Version="1.11.0" />
<PackageReference Include="System.Diagnostics.DiagnosticSource" Version="4.3.0" />
<PackageReference Include="System.Diagnostics.DiagnosticSource" Version="5.0.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.5.0" />
<PackageReference Include="xunit" Version="2.3.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.3.1" />
Expand Down