feat: allow tracking kafka produce and consume offsets (#18)
This was the one remaining feature from the Go library that I hadn't
ported yet. Now it's done. It allows tracking Kafka offsets. Datadog
uses this to detect lag and consumers that are offline and not
reporting.

After every produce, we push the last written offset to Datadog. After
every consume, we push the last committed offset to Datadog. Datadog can
then calculate the lag between the two and alert us on issues.
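
As a rough sketch of the intended call sites (the topic, group, partition,
and offset values here are made up), a producer and a consumer would report
offsets roughly like this:

    alias Datadog.DataStreams.Integrations.Kafka, as: DataStreamsKafka

    # After a successful produce, report the offset the broker assigned to the record.
    :ok = DataStreamsKafka.track_produce("orders", 0, 42)

    # After committing in the consumer, report the last committed offset for the group.
    :ok = DataStreamsKafka.track_consume("order-processor", "orders", 0, 42)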

---------

Co-authored-by: Matt Sutkowski <[email protected]>
btkostner and msutkowski authored Jul 10, 2023
1 parent 37f997f commit ebab69b
Showing 7 changed files with 188 additions and 25 deletions.
21 changes: 20 additions & 1 deletion lib/datadog/data_streams/aggregator.ex
@@ -57,13 +57,19 @@ defmodule Datadog.DataStreams.Aggregator do
iex> :ok = Aggregator.add(%Aggregator.Point{})
iex> :ok = Aggregator.add(%Aggregator.Offset{})
"""
@spec add(Aggregator.Point.t()) :: :ok
@spec add(Aggregator.Point.t() | Aggregator.Offset.t()) :: :ok
def add(%Aggregator.Point{} = point) do
:telemetry.execute([:datadog, :datastreams, :aggregator, :payloads_in], %{count: 1})
GenServer.cast(__MODULE__, {:add, point})
end

def add(%Aggregator.Offset{} = offset) do
GenServer.cast(__MODULE__, {:add, offset})
end

@doc """
Sends all stored data to the Datadog agent.
@@ -122,6 +128,19 @@ defmodule Datadog.DataStreams.Aggregator do
}}
end

def handle_cast({:add, %Aggregator.Offset{} = offset}, state) do
new_ts_type_current_buckets =
Aggregator.Bucket.upsert(state.ts_type_current_buckets, offset.timestamp, fn bucket ->
type_key =
if offset.type == :commit, do: :latest_commit_offsets, else: :latest_produce_offsets

new_offsets = bucket |> Map.get(type_key, []) |> Aggregator.Offset.upsert(offset)
Map.put(bucket, type_key, new_offsets)
end)

{:noreply, %{state | ts_type_current_buckets: new_ts_type_current_buckets}}
end

@doc false
def handle_info(:send, state) do
Process.cancel_timer(state.send_timer)
13 changes: 4 additions & 9 deletions lib/datadog/data_streams/aggregator/bucket.ex
@@ -7,24 +7,19 @@ defmodule Datadog.DataStreams.Aggregator.Bucket do
@bucket_duration 10 * 1_000 * 1_000 * 1_000

defstruct groups: %{},
latest_commit_offsets: %{},
latest_produce_offsets: %{},
latest_commit_offsets: [],
latest_produce_offsets: [],
start: 0,
duration: @bucket_duration

@type t :: %__MODULE__{
groups: %{non_neg_integer() => Aggregator.Group.t()},
latest_commit_offsets: %{non_neg_integer() => non_neg_integer()},
latest_produce_offsets: %{partition_key() => non_neg_integer()},
latest_commit_offsets: [Aggregator.Offset.t()],
latest_produce_offsets: [Aggregator.Offset.t()],
start: non_neg_integer(),
duration: non_neg_integer()
}

@type partition_key :: %{
partition: non_neg_integer(),
topic: String.t()
}

@spec align_timestamp(non_neg_integer()) :: non_neg_integer()
defp align_timestamp(timestamp) do
timestamp - rem(timestamp, @bucket_duration)
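For context, `align_timestamp/1` above floors a nanosecond timestamp to the
start of its 10-second bucket, which is how offsets and stats points get
grouped. A quick check with the timestamp used in the aggregator test later
in this diff (arithmetic only):

    bucket_duration = 10 * 1_000 * 1_000 * 1_000
    timestamp = 1_687_986_447_538_450_340
    timestamp - rem(timestamp, bucket_duration)
    #=> 1_687_986_440_000_000_000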
48 changes: 48 additions & 0 deletions lib/datadog/data_streams/aggregator/offset.ex
@@ -0,0 +1,48 @@
defmodule Datadog.DataStreams.Aggregator.Offset do
@moduledoc false

defstruct offset: 0,
timestamp: 0,
type: :commit,
tags: %{}

@type type :: :commit | :produce

@type t :: %__MODULE__{
offset: integer(),
timestamp: non_neg_integer(),
type: type(),
tags: %{String.t() => any()}
}

@doc """
Creates a new offset struct with the given offset and options.
"""
@spec new(type(), integer(), non_neg_integer(), Keyword.t()) :: t()
def new(type, offset, timestamp, opts \\ []) do
%__MODULE__{
offset: offset,
timestamp: timestamp,
type: type,
tags: Map.new(opts)
}
end

@doc """
Updates an existing `#{__MODULE__}` whose tags match the given offset.
If no match is found, the new offset is appended to the list.
"""
@spec upsert([t()], t()) :: [t()]
def upsert(offsets, %{tags: upsert_tags} = upsert_offset) do
matching_index =
Enum.find_index(offsets, fn %{tags: tags} ->
match?(^tags, upsert_tags)
end)

if is_nil(matching_index) do
offsets ++ [upsert_offset]
else
List.replace_at(offsets, matching_index, upsert_offset)
end
end
end
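
A small sketch of how `new/4` and `upsert/2` compose. The values are made up;
the point is that `upsert/2` matches on tags, so an offset for the same
partition/topic pair replaces the previous entry rather than appending a
duplicate:

    alias Datadog.DataStreams.Aggregator.Offset

    first = Offset.new(:commit, 10, 1_000, partition: 0, topic: "test-topic")
    second = Offset.new(:commit, 11, 2_000, partition: 0, topic: "test-topic")
    other = Offset.new(:commit, 5, 2_000, partition: 1, topic: "test-topic")

    # Same tags as `first`, so the existing entry is replaced.
    [%Offset{offset: 11}] = Offset.upsert([first], second)

    # Different tags, so the new offset is appended.
    [%Offset{offset: 10}, %Offset{offset: 5}] = Offset.upsert([first], other)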
41 changes: 40 additions & 1 deletion lib/datadog/data_streams/integrations/kafka.ex
@@ -39,7 +39,7 @@ defmodule Datadog.DataStreams.Integrations.Kafka do

require OpenTelemetry.Tracer, as: Tracer

alias Datadog.DataStreams.{Context, Pathway, Propagator, Tags}
alias Datadog.DataStreams.{Aggregator, Context, Pathway, Propagator, Tags}

@otel_attribute "pathway.hash"

@@ -88,6 +88,25 @@ defmodule Datadog.DataStreams.Integrations.Kafka do
|> Map.merge(%{type: "kafka", direction: "out"})
end

@doc """
Tracks Kafka produce events via their offset. This is used by Datadog
to calculate lag without requiring the consumer to be online and
reading trace headers.
"""
@spec track_produce(String.t(), non_neg_integer(), integer()) :: :ok
def track_produce(topic, partition, offset) do
Aggregator.add(%Aggregator.Offset{
offset: offset,
timestamp: :erlang.system_time(:nanosecond),
type: :produce,
tags: %{
"partition" => partition,
"topic" => topic,
"type" => "kafka_produce"
}
})
end

@doc """
Traces a Kafka message being consumed. Requires the current Kafka
consumer group. Uses the pathway in the current
@@ -130,4 +149,24 @@ defmodule Datadog.DataStreams.Integrations.Kafka do
|> Map.take([:topic, :partition])
|> Map.merge(%{type: "kafka", direction: "in", group: consumer_group})
end

@doc """
Tracks Kafka consume events via their committed offset. Datadog uses
this, together with the tracked produce offsets, to calculate consumer
lag without relying on trace headers.
"""
@spec track_consume(String.t(), String.t(), non_neg_integer(), integer()) :: :ok
def track_consume(group, topic, partition, offset) do
Aggregator.add(%Aggregator.Offset{
offset: offset,
timestamp: :erlang.system_time(:nanosecond),
type: :commit,
tags: %{
"consumer_group" => group,
"partition" => partition,
"topic" => topic,
"type" => "kafka_commit"
}
})
end
end
24 changes: 21 additions & 3 deletions lib/datadog/data_streams/payload/backlog.ex
@@ -1,7 +1,7 @@
defmodule Datadog.DataStreams.Payload.Backlog do
@moduledoc false

alias Datadog.DataStreams.Tags
alias Datadog.DataStreams.{Aggregator, Tags}

defstruct tags: [],
value: 0
@@ -10,10 +10,28 @@ defmodule Datadog.DataStreams.Payload.Backlog do
tags: Tags.encoded(),
value: non_neg_integer()
}

@doc """
Creates a new backlog struct from an aggregator offset.
"""
@spec new(Aggregator.Offset.t()) :: t()
def new(%Aggregator.Offset{offset: offset, tags: tags}) do
%__MODULE__{
tags: tags |> Tags.parse() |> Tags.encode(),
value: offset
}
end
end

defimpl Msgpax.Packer, for: Datadog.DataStreams.Payload.Backlog do
def pack(_data) do
[]
def pack(data) do
[
# Tags
[0x82, 0xA4, 0x54, 0x61, 0x67, 0x73],
Msgpax.Packer.pack(data.tags),
# Value
[0xA5, 0x56, 0x61, 0x6C, 0x75, 0x65],
Msgpax.Packer.pack(data.value)
]
end
end
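
The raw byte prefixes hand-encode the MessagePack map header and field-name
keys (`0x82` is a two-entry fixmap, `0xA4 0x54 0x61 0x67 0x73` is the fixstr
"Tags", and `0xA5 0x56 0x61 0x6C 0x75 0x65` is the fixstr "Value"), presumably
to match the field names the Datadog agent expects from the Go library. A
quick round-trip sanity check, assuming Msgpax is available and using made-up
values:

    backlog = %Datadog.DataStreams.Payload.Backlog{
      tags: ["partition:0", "topic:test-topic", "type:kafka_commit"],
      value: 13
    }

    backlog |> Msgpax.pack!() |> IO.iodata_to_binary() |> Msgpax.unpack!()
    #=> %{"Tags" => ["partition:0", "topic:test-topic", "type:kafka_commit"], "Value" => 13}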
5 changes: 4 additions & 1 deletion lib/datadog/data_streams/payload/bucket.ex
@@ -26,7 +26,10 @@ defmodule Datadog.DataStreams.Payload.Bucket do
%__MODULE__{
start: bucket.start,
duration: bucket.duration,
stats: bucket.groups |> Map.values() |> Enum.map(&Payload.Point.new(&1, timestamp_type))
stats: bucket.groups |> Map.values() |> Enum.map(&Payload.Point.new(&1, timestamp_type)),
backlogs:
Enum.map(bucket.latest_produce_offsets, &Payload.Backlog.new/1) ++
Enum.map(bucket.latest_commit_offsets, &Payload.Backlog.new/1)
}
end
end
61 changes: 51 additions & 10 deletions test/datadog/data_streams/aggregator_test.exs
@@ -10,17 +10,29 @@ defmodule Datadog.DataStreams.AggregatorTest do
end

describe "add/1" do
test "sends AggregatorPoint to module when not started" do
test "sends Aggregator.Point to module when not started" do
refute Process.whereis(Aggregator)
assert :ok = Aggregator.add(%Aggregator.Point{})
end

@tag :capture_log
test "sends AggregatorPoint to module when registered" do
test "sends Aggregator.Point to module when registered" do
Application.put_env(:data_streams, :agent, enabled?: true)
start_supervised!(Aggregator)
assert :ok = Aggregator.add(%Aggregator.Point{})
end

test "sends Aggregator.Offset to module when not started" do
refute Process.whereis(Aggregator)
assert :ok = Aggregator.add(%Aggregator.Offset{})
end

@tag :capture_log
test "sends Aggregator.Offset to module when registered" do
Application.put_env(:data_streams, :agent, enabled?: true)
start_supervised!(Aggregator)
assert :ok = Aggregator.add(%Aggregator.Offset{})
end
end

describe "init/1" do
@@ -52,41 +64,70 @@ defmodule Datadog.DataStreams.AggregatorTest do
)

assert %{
1_678_471_420_000_000_000 => %Datadog.DataStreams.Aggregator.Bucket{
1_678_471_420_000_000_000 => %Aggregator.Bucket{
groups: %{
9_808_874_869_469_701_221 => %Datadog.DataStreams.Aggregator.Group{
9_808_874_869_469_701_221 => %Aggregator.Group{
edge_tags: ["type:test"],
hash: 9_808_874_869_469_701_221,
parent_hash: 17_210_443_572_488_294_574,
pathway_latency: _,
edge_latency: _
}
},
latest_commit_offsets: %{},
latest_produce_offsets: %{},
latest_commit_offsets: [],
latest_produce_offsets: [],
start: 1_678_471_420_000_000_000,
duration: 10_000_000_000
}
} = new_state.ts_type_current_buckets

assert %{
1_678_471_410_000_000_000 => %Datadog.DataStreams.Aggregator.Bucket{
1_678_471_410_000_000_000 => %Aggregator.Bucket{
groups: %{
9_808_874_869_469_701_221 => %Datadog.DataStreams.Aggregator.Group{
9_808_874_869_469_701_221 => %Aggregator.Group{
edge_tags: ["type:test"],
hash: 9_808_874_869_469_701_221,
parent_hash: 17_210_443_572_488_294_574,
pathway_latency: _,
edge_latency: _
}
},
latest_commit_offsets: %{},
latest_produce_offsets: %{},
latest_commit_offsets: [],
latest_produce_offsets: [],
start: 1_678_471_410_000_000_000,
duration: 10_000_000_000
}
} = new_state.ts_type_origin_buckets
end

test "adds aggregator offset to bucket", %{state: state} do
offset = %Aggregator.Offset{
offset: 13,
timestamp: 1_687_986_447_538_450_340,
type: :commit,
tags: %{
"consumer_group" => "test-group",
"partition" => 0,
"topic" => "test-topic",
"type" => "kafka_commit"
}
}

assert {:noreply, new_state} = Aggregator.handle_cast({:add, offset}, state)

assert %{
1_687_986_440_000_000_000 => %Aggregator.Bucket{
groups: %{},
latest_commit_offsets: [^offset],
latest_produce_offsets: [],
start: 1_687_986_440_000_000_000,
duration: 10_000_000_000
}
} = new_state.ts_type_current_buckets

# It should find and update the existing data, not add to the bucket.
assert {:noreply, ^new_state} = Aggregator.handle_cast({:add, offset}, new_state)
end
end

describe "handle_info/2 {task_ref, {:ok, count}}" do
