Skip to content

Commit

Permalink
fix: set span attribute to pathway hash (#14)
Browse files Browse the repository at this point in the history
I realized I was setting the otel context instead of the span attribute.
This should link Open Telemetry Kafka processing and producing APM
traces with data stream pathways. It follows a closed PR in the
data-streams-go library, but was confirmed on a call with Datadog.
  • Loading branch information
btkostner authored Jun 5, 2023
1 parent 6af61ac commit 94ca3fd
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 12 deletions.
2 changes: 0 additions & 2 deletions lib/datadog/data_streams/context.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ defmodule Datadog.DataStreams.Context do
"""

@context_key "dd-datastreams"
@hash "pathway.hash"

alias Datadog.DataStreams.{Pathway, Tags}

Expand All @@ -36,7 +35,6 @@ defmodule Datadog.DataStreams.Context do
@spec set(Pathway.t()) :: Pathway.t()
def set(%Pathway{} = pathway) do
OpenTelemetry.Ctx.set_value(@context_key, pathway)
OpenTelemetry.Ctx.set_value(@hash, pathway.hash)
pathway
end

Expand Down
35 changes: 25 additions & 10 deletions lib/datadog/data_streams/integrations/kafka.ex
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,12 @@ defmodule Datadog.DataStreams.Integrations.Kafka do
"""

require OpenTelemetry.Tracer, as: Tracer

alias Datadog.DataStreams.{Context, Pathway, Propagator, Tags}

@otel_attribute "pathway.hash"

@typedoc """
A general map that contains the topic, partition, and headers atoms. This
matches the format of `Elsa.elsa_message` by default
Expand Down Expand Up @@ -66,10 +70,15 @@ defmodule Datadog.DataStreams.Integrations.Kafka do
@spec trace_produce_with_pathway(Pathway.t() | nil, msg) :: {msg, Pathway.t()}
when msg: message()
def trace_produce_with_pathway(pathway, message) do
edge_tags = produce_edge_tags(message)
new_pathway = Pathway.set_checkpoint(pathway, edge_tags)
new_headers = Propagator.encode_header(message.headers, new_pathway)
{%{message | headers: new_headers}, new_pathway}
Tracer.with_span "dsm.trace_kafka_produce" do
edge_tags = produce_edge_tags(message)
new_pathway = Pathway.set_checkpoint(pathway, edge_tags)

Tracer.set_attribute(@otel_attribute, to_string(new_pathway.hash))

new_headers = Propagator.encode_header(message.headers, new_pathway)
{%{message | headers: new_headers}, new_pathway}
end
end

@spec produce_edge_tags(message()) :: Tags.input()
Expand Down Expand Up @@ -98,14 +107,20 @@ defmodule Datadog.DataStreams.Integrations.Kafka do
"""
@spec trace_consume_with_pathway(Pathway.t() | nil, message(), String.t()) :: Pathway.t()
def trace_consume_with_pathway(pathway, message, consumer_group) do
edge_tags = consume_edge_tags(message, consumer_group)
Tracer.with_span "dsm.trace_kafka_consume" do
edge_tags = consume_edge_tags(message, consumer_group)

new_pathway =
case Propagator.decode_header(message.headers) do
nil ->
Pathway.set_checkpoint(pathway, edge_tags)

case Propagator.decode_header(message.headers) do
nil ->
Pathway.set_checkpoint(pathway, edge_tags)
decoded_pathway ->
Pathway.set_checkpoint(decoded_pathway, edge_tags)
end

decoded_pathway ->
Pathway.set_checkpoint(decoded_pathway, edge_tags)
Tracer.set_attribute(@otel_attribute, to_string(new_pathway.hash))
new_pathway
end
end

Expand Down

0 comments on commit 94ca3fd

Please sign in to comment.