Skip to content

Distributed Tracing: Producer OpenTelemetry instrumentation#1838

Open
Fernando de Oliveira (fedeoliv) wants to merge 12 commits into
confluentinc:masterfrom
fedeoliv:feature/distributed-tracing-producer
Open

Distributed Tracing: Producer OpenTelemetry instrumentation#1838
Fernando de Oliveira (fedeoliv) wants to merge 12 commits into
confluentinc:masterfrom
fedeoliv:feature/distributed-tracing-producer

Conversation

@fedeoliv
Copy link
Copy Markdown

@fedeoliv Fernando de Oliveira (fedeoliv) commented Jun 14, 2022

This PR introduces a distributed tracing instrumentation for the Producer by leveraging the use of Activity objects with tags mapping the OpenTelemetry messaging attributes specification.

The implementation is inspired by the great discussion on issue #1269 and the Mikel Blanchard (@CodeBlanch)'s proposal on the PR #1278.

@CLAassistant
Copy link
Copy Markdown

CLAassistant commented Jun 14, 2022

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.
You have signed the CLA already but the status is still pending? Let us recheck it.

@fedeoliv
Copy link
Copy Markdown
Author

Matt Howlett (@mhowlett) I'd love your feedback on this implementation proposal 😊

@mhowlett
Copy link
Copy Markdown
Contributor

i'd love to have time to give it to you.. i'll ping product about this feature.

if (activity == null)
return null;

using (activity)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why have this using? Won't that cause a stopped Activity to be returned?


using (activity)
{
activity?.AddDefaultOpenTelemetryTags(topicPartition, message);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't need the null-conditional here (activity?) because line 41 already verified it is non-null.

TopicPartition topicPartition,
Message<TKey, TValue> message)
{
activity?.AddTag(OpenTelemetryMessaging.SYSTEM, "kafka");
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you need any of the null-conditionals in here because this is only ever called for non-null activity.

Message<TKey, TValue> message)
{
activity?.AddTag(OpenTelemetryMessaging.SYSTEM, "kafka");
activity?.AddTag(OpenTelemetryMessaging.DESTINATION, topicPartition.Topic);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally we use Activity.SetTag as opposed to AddTag. SetTag implements the OTel spec logic for null behavior, de-dupe, etc.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

open-telemetry/opentelemetry-dotnet#5173 Based on this, we may be able to suggest AddTag is certain scenarios as well to save some perf.

Comment on lines +68 to +69
int messagePayloadBytes = Encoding.UTF8.GetByteCount(message.Value.ToString());
activity?.AddTag(OpenTelemetryMessaging.MESSAGE_PAYLOAD_SIZE_BYTES, messagePayloadBytes.ToString());
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Is message.Value.ToString() going to be very expensive? Ideally getting the size would either be cheap or it would be an opt-in thing for users who need it enough to pay for it.
  • messagePayloadBytes.ToString() why not just push the numeric value? Basically you can add an int as a tag it doesn't need to be forced into a string.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

at the very least, check activity.IsAllDataRequested before doing anything which is non-trivial in terms of cost. This ensures that the cost is paid only if the activity has a chance of being exported to some telemetry destination.


internal static Activity Start<TKey, TValue>(TopicPartition topicPartition, Message<TKey, TValue> message)
{
Activity activity = ActivitySource.StartActivity(ActivityName);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A bit of an unknown here. StartActivity is when sampling happens. The OpenTelemetry specification states...

The API documentation MUST state that adding attributes at span creation is preferred to calling SetAttribute later, as samplers can only consider information already present during span creation.

In this code there are no attributes/tags on start so users won't be able to make interesting sampling decisions based on details about the spans/activity instances. This is kind of an ongoing discussion, not sure if there will eventually be a list of attributes that will be required or it will always be optional. Just sharing the state of things 😄

/cc Cijo Thomas (@cijothomas)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For things which are known already, I'd suggest passing it to StartActivityCall.

One example for sql, where the db system is known (and static), and is passed at startActivity time itself.
https://github.com/open-telemetry/opentelemetry-dotnet/blob/main/src/OpenTelemetry.Instrumentation.SqlClient/Implementation/SqlClientDiagnosticListener.cs#L64

/// </summary>
internal static class Producer
{
private const string ActivityName = ActivitySourceName + ".MessageProduced";
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ActivitySourceName should not be prefixed for ActivityName.

}
}

private static Activity AddDefaultOpenTelemetryTags<TKey, TValue>(
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is another area with some unknowns. The semantic conventions are still marked as Experimental. Meaning: They are likely to change. There is this whole concept of a schema url that should be attached to instrumentation before it can be declared stable. One option could be we create the Activity here and then provide a callback instrumentation (such as OpenTelemetry) or users can use in order to add the tags that make sense for their domain. The nice thing about that approach would be Confluent Kafka doesn't need to worry about a spec that is changing 😄 That approach (more or less) is what we have done with dotnet itself. The runtime starts things but OpenTelemetry uses hooks in its instrumentation to add the tags according to the spec.

{
/// <summary>
/// Provides the OpenTelemetry messaging attributes.
/// The complete list of messaging attributes specification is available here: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md#messaging-attributes
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the conventions in OTel are still work-in-progress. I'd suggest to mark this overall feature as experimental/non-stable, from the perspective that the name of the tags could change in the future, until the otel conventions become stable.

// Prepare the activity events listener
string activityName = "Confluent.Kafka.MessageProduced";
ActivityEventsRecorder eventsRecorder = new(activityName);
ActivityListener listener = eventsRecorder.BuildActivityListener();
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@CodeBlanch
Copy link
Copy Markdown

Thanks for this Fernando de Oliveira (@fedeoliv)! I did a quick review and left some comments about where we are with the spec, etc. Happy to help out however I can.

ex);
}

Activity activity = Diagnostics.Producer.Start(topicPartition, message);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use using and remove activity.stop in finally?

Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllData,
ActivityStarted = activity =>
{
if (activity.DisplayName == activityName)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

string.equals for string comparison

@chertby
Copy link
Copy Markdown

Does it look like development has stopped?
If there's time pressure, I could try to continue if Fernando de Oliveira (@fedeoliv) don't mind?

@Juandavi1
Copy link
Copy Markdown

Matt Howlett (@mhowlett) It's been 5 months, is there any update on whether and how Kafka is going to support distributed tracing?

@sookeke
Copy link
Copy Markdown

Folks, any update on this? it's been months Matt Howlett (@mhowlett)

@iiwaasnet
Copy link
Copy Markdown

Is context propagation implemented as well? Maybe I missed it, but I can't find any code using propagator.Inject()/propagator.ExtractTraceIdAndState()...

@GiorgioG
Copy link
Copy Markdown

Can we get an update?

@smurariu
Copy link
Copy Markdown

Anchit Jain (@anchitj) any news on this?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.