feat: add basic kafka tracking support with data streams (#3)
This adds the basic data streams code and Kafka integration functions.
It tracks pathways between services. It _does not_ yet have the
functions for tracking Kafka offsets.

TODO:
- [x] add more tests
- [x] fill in usage documentation
btkostner authored Mar 10, 2023
1 parent 64ef99f commit bfc6a0b
Showing 27 changed files with 1,883 additions and 6 deletions.
3 changes: 2 additions & 1 deletion .doctor.exs
@@ -1,7 +1,8 @@
 %Doctor.Config{
   ignore_modules: [
     ~r/Enumerable/,
-    ~r/Datadog.Sketch.Protobuf/
+    ~r/Datadog.Sketch.Protobuf/,
+    ~r/Msgpax/
   ],
   ignore_paths: [],
   min_module_doc_coverage: 40,
6 changes: 3 additions & 3 deletions README.md
@@ -62,8 +62,8 @@ config :dd_data_streams, :metadata,

 We recommend you keep these tags matching all your other instrumentation, like OpenTelemetry and `:telemetry`, to ensure Datadog can aggregate data accurately.

-### Kafka
+### Integrations

-To instrument your data stream services that use Kafka queues, you can use the `Datadog.DataStreams.Integrations.Kafka` module.
+This library contains integration modules for instrumenting various async data pipelines. See each module for usage details:

-> **NOTE**: TODO
+- `Datadog.DataStreams.Integrations.Kafka`
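For reference, the metadata configuration the hunk header above points at might look like the following sketch. Only the `:dd_data_streams` app name and `:metadata` key come from this diff; the `service` and `env` tag names are illustrative assumptions, not confirmed by this commit.

```elixir
# config/config.exs — a hedged sketch; the tag keys below are assumed,
# not confirmed by this commit.
import Config

config :dd_data_streams, :metadata,
  service: "checkout-service",
  env: "production"
```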
240 changes: 240 additions & 0 deletions lib/datadog/data_streams/aggregator.ex
@@ -0,0 +1,240 @@
defmodule Datadog.DataStreams.Aggregator do
@moduledoc """
A `GenServer` instance responsible for aggregating many points of data
together into 10 second buckets, and then sending them to the Datadog
agent. It holds many structs in its memory, looking something like this:
```mermaid
graph TD
aggregator[Datadog.DataStreams.Aggregator]
aggregator --> bucket[Datadog.DataStreams.Aggregator.Bucket]
bucket --> group[Datadog.DataStreams.Aggregator.Group]
```
When adding data, the calling code will create a new
`Datadog.DataStreams.Aggregator.Point` which contains all of the needed
data. It will then call `#{__MODULE__}.add/1` to add that point of data to the
aggregator, where the aggregator will find (or create) a bucket that matches
the 10 second window for the point. It will then find (or create) a group in
that bucket based on the point's `hash`. Once the group is found, the
`pathway_latency` and `edge_latency` `Datadog.Sketch` will be updated with
the new latency.
Every 10 seconds the aggregator will convert all non-active buckets (those
outside the 10 second window) to a `Datadog.DataStreams.Payload`, encode it, and send it to
the Datadog agent. If there is an error sending the payload, the old payloads
are still removed from memory, but the
`datadog.datastreams.aggregator.flush_errors.count` telemetry metric is
incremented.
"""

use GenServer

alias Datadog.DataStreams.{Aggregator, Config, Payload, Transport}

require Logger

@send_interval 10_000

@doc """
Starts a new `#{__MODULE__}` instance. This takes no options as it
uses the global `Datadog.DataStreams.Config` module. It is also started
by the `Datadog.DataStreams.Application` and should not need to be started
manually.
"""
@spec start_link(Keyword.t()) :: GenServer.on_start()
def start_link(_opts) do
opts = [enabled?: Config.agent_enabled?()]
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end

@doc """
Adds new metrics to the aggregator.
Note: this function still returns `:ok` if the aggregator is disabled.
## Examples
iex> :ok = Aggregator.add(%Aggregator.Point{})
"""
@spec add(Aggregator.Point.t()) :: :ok
def add(%Aggregator.Point{} = point) do
:telemetry.execute([:datadog, :datastreams, :aggregator, :payloads_in], %{count: 1})
GenServer.cast(__MODULE__, {:add, point})
end

@doc """
Sends all stored data to the Datadog agent.
## Examples
iex> :ok = Aggregator.flush()
"""
@spec flush() :: :ok
def flush() do
Process.send(__MODULE__, :send, [])
end

@doc false
def init([{:enabled?, false} | _rest]), do: :ignore

def init(_opts) do
Process.flag(:trap_exit, true)

{:ok,
%{
send_timer: Process.send_after(self(), :send, @send_interval),
ts_type_current_buckets: %{},
ts_type_origin_buckets: %{}
}}
end

@doc false
def handle_cast({:add, %Aggregator.Point{} = point}, state) do
new_ts_type_current_buckets =
Aggregator.Bucket.upsert(state.ts_type_current_buckets, point.timestamp, fn bucket ->
new_groups =
Aggregator.Group.upsert(bucket.groups, point, fn group ->
Aggregator.Group.add(group, point)
end)

%{bucket | groups: new_groups}
end)

origin_timestamp = point.timestamp - point.pathway_latency

new_ts_type_origin_buckets =
Aggregator.Bucket.upsert(state.ts_type_origin_buckets, origin_timestamp, fn bucket ->
new_groups =
Aggregator.Group.upsert(bucket.groups, point, fn group ->
Aggregator.Group.add(group, point)
end)

%{bucket | groups: new_groups}
end)

{:noreply,
%{
state
| ts_type_current_buckets: new_ts_type_current_buckets,
ts_type_origin_buckets: new_ts_type_origin_buckets
}}
end

@doc false
def handle_info(:send, state) do
Process.cancel_timer(state.send_timer)

now = :erlang.system_time(:nanosecond)

{active_ts_type_current_buckets, past_ts_type_current_buckets} =
split_with(state.ts_type_current_buckets, fn {_k, v} ->
Aggregator.Bucket.current?(v, now)
end)

{active_ts_type_origin_buckets, past_ts_type_origin_buckets} =
split_with(state.ts_type_origin_buckets, fn {_k, v} ->
Aggregator.Bucket.current?(v, now)
end)

payload =
Payload.new()
|> Payload.add_buckets(past_ts_type_current_buckets, :current)
|> Payload.add_buckets(past_ts_type_origin_buckets, :origin)

unless Payload.stats_count(payload) == 0 do
Task.async(fn ->
with {:ok, encoded_payload} <- Payload.encode(payload),
:ok <- Transport.send_pipeline_stats(encoded_payload) do
{:ok, Payload.stats_count(payload)}
else
{:error, reason} -> {:error, reason}
something -> {:error, something}
end
end)
end

{:noreply,
%{
state
| send_timer: Process.send_after(self(), :send, @send_interval),
ts_type_current_buckets: active_ts_type_current_buckets,
ts_type_origin_buckets: active_ts_type_origin_buckets
}}
end

@doc false
def handle_info({task_ref, {:ok, count}}, state) when is_reference(task_ref) do
Logger.debug("Successfully sent metrics to Datadog")
:telemetry.execute([:datadog, :datastreams, :aggregator, :flushed_payloads], %{count: 1})
:telemetry.execute([:datadog, :datastreams, :aggregator, :flushed_buckets], %{count: count})
{:noreply, state}
end

@doc false
def handle_info({task_ref, {:error, error}}, state) when is_reference(task_ref) do
Logger.error("Error sending metrics to Datadog", error: error)
:telemetry.execute([:datadog, :datastreams, :aggregator, :flush_errors], %{count: 1})
{:noreply, state}
end

@doc false
def handle_info(_, state), do: {:noreply, state}

@doc false
# Note: a bare `%{}` pattern matches *any* map, so we guard on `map_size/1`
# to only match a genuinely empty state.
def terminate(_reason, %{ts_type_current_buckets: current, ts_type_origin_buckets: origin})
    when map_size(current) == 0 and map_size(origin) == 0 do
Logger.debug("Stopping #{__MODULE__} with an empty state")
end

@doc false
def terminate(_reason, state) do
payload =
Payload.new()
|> Payload.add_buckets(state.ts_type_current_buckets, :current)
|> Payload.add_buckets(state.ts_type_origin_buckets, :origin)

with {:ok, encoded_payload} <- Payload.encode(payload),
:ok <- Transport.send_pipeline_stats(encoded_payload) do
Logger.debug("Successfully sent metrics to Datadog before termination")
:telemetry.execute([:datadog, :datastreams, :aggregator, :flushed_payloads], %{count: 1})

:telemetry.execute([:datadog, :datastreams, :aggregator, :flushed_buckets], %{
count: Payload.stats_count(payload)
})
else
error ->
Logger.error("Error sending metrics to Datadog before termination", error: error)
:telemetry.execute([:datadog, :datastreams, :aggregator, :flush_errors], %{count: 1})
end
rescue
error ->
Logger.error("Error attempting to sending metrics to Datadog before termination",
error: error
)

:telemetry.execute([:datadog, :datastreams, :aggregator, :flush_errors], %{count: 1})
end

# Splits the `map` into two maps according to the given function `fun`.
# This function was taken from Elixir 1.15 for backwards support with older
# versions.
defp split_with(map, fun) when is_map(map) and is_function(fun, 1) do
iter = :maps.iterator(map)
next = :maps.next(iter)

do_split_with(next, [], [], fun)
end

defp do_split_with(:none, while_true, while_false, _fun) do
{:maps.from_list(while_true), :maps.from_list(while_false)}
end

defp do_split_with({key, value, iter}, while_true, while_false, fun) do
if fun.({key, value}) do
do_split_with(:maps.next(iter), [{key, value} | while_true], while_false, fun)
else
do_split_with(:maps.next(iter), while_true, [{key, value} | while_false], fun)
end
end
end
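For readers skimming the new module above, here is a minimal usage sketch. The `Aggregator.Point` field names (`edge_tags`, `hash`, `parent_hash`, `timestamp`, `pathway_latency`, `edge_latency`) are the ones this file reads; the tag values and hashes are illustrative, and the struct itself is defined in a file not shown in this excerpt.

```elixir
# A minimal sketch of feeding the aggregator. All timestamps and latencies
# are in nanoseconds, matching how this module consumes them.
alias Datadog.DataStreams.Aggregator

point = %Aggregator.Point{
  edge_tags: ["type:kafka", "topic:orders", "direction:in"],
  hash: 9_808_874_869_469_701_221,
  parent_hash: 0,
  timestamp: :erlang.system_time(:nanosecond),
  pathway_latency: 150_000_000,
  edge_latency: 25_000_000
}

:ok = Aggregator.add(point)

# Force an early flush instead of waiting for the 10 second timer:
Aggregator.flush()
```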
67 changes: 67 additions & 0 deletions lib/datadog/data_streams/aggregator/bucket.ex
@@ -0,0 +1,67 @@
defmodule Datadog.DataStreams.Aggregator.Bucket do
@moduledoc false

alias Datadog.DataStreams.Aggregator

# 10 seconds in nanoseconds
@bucket_duration 10 * 1_000 * 1_000 * 1_000

defstruct groups: %{},
latest_commit_offsets: %{},
latest_produce_offsets: %{},
start: 0,
duration: @bucket_duration

@type t :: %__MODULE__{
groups: %{non_neg_integer() => Aggregator.Group.t()},
latest_commit_offsets: %{non_neg_integer() => non_neg_integer()},
latest_produce_offsets: %{partition_key() => non_neg_integer()},
start: non_neg_integer(),
duration: non_neg_integer()
}

@type partition_key :: %{
partition: non_neg_integer(),
topic: String.t()
}

@spec align_timestamp(non_neg_integer()) :: non_neg_integer()
defp align_timestamp(timestamp) do
timestamp - rem(timestamp, @bucket_duration)
end

@doc """
Creates a new aggregator bucket based on the aligned start timestamp.
"""
@spec new(non_neg_integer()) :: t()
def new(timestamp) do
%__MODULE__{start: align_timestamp(timestamp)}
end

@doc """
Checks if the bucket is currently within its active duration.
"""
@spec current?(t(), non_neg_integer()) :: boolean()
def current?(%{start: start}, now) do
# The bucket is current while `now` is still inside its 10 second window.
now < start + @bucket_duration
end

@doc """
Updates an existing `#{__MODULE__}` that matches the given timestamp,
or creates a new bucket for that timestamp if one cannot be found.
"""
@spec upsert(%{required(non_neg_integer()) => t()}, non_neg_integer(), (t() -> t())) :: %{
required(non_neg_integer()) => t()
}
def upsert(buckets, timestamp, fun) do
timestamp = align_timestamp(timestamp)

new_bucket =
buckets
|> Map.get(timestamp, new(timestamp))
|> fun.()

Map.put(buckets, timestamp, new_bucket)
end
end
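As a quick illustration of the alignment arithmetic above: `align_timestamp/1` truncates a nanosecond timestamp down to the nearest 10 second boundary, so every point inside the same window resolves to the same bucket key. A small sketch:

```elixir
# @bucket_duration is 10 seconds in nanoseconds.
bucket_duration = 10 * 1_000 * 1_000 * 1_000

# Two points 3 seconds apart within the same window...
t1 = 1_678_000_004_000_000_000
t2 = t1 + 3 * 1_000_000_000

align = fn ts -> ts - rem(ts, bucket_duration) end

# ...truncate to the same aligned start, so Bucket.upsert/3 reuses one bucket.
align.(t1) == align.(t2)
#=> true
```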
64 changes: 64 additions & 0 deletions lib/datadog/data_streams/aggregator/group.ex
@@ -0,0 +1,64 @@
defmodule Datadog.DataStreams.Aggregator.Group do
@moduledoc false

alias Datadog.Sketch
alias Datadog.DataStreams.Aggregator

defstruct edge_tags: [],
hash: 0,
parent_hash: 0,
pathway_latency: Sketch.new_default(),
edge_latency: Sketch.new_default()

@type t :: %__MODULE__{
edge_tags: [String.t()],
hash: non_neg_integer(),
parent_hash: non_neg_integer(),
pathway_latency: map(),
edge_latency: map()
}

@doc """
Creates a new aggregate stats group based on the information from a given
point.
"""
@spec new(Aggregator.Point.t()) :: t()
def new(%Aggregator.Point{edge_tags: edge_tags, parent_hash: parent_hash, hash: hash}) do
%__MODULE__{
edge_tags: edge_tags,
parent_hash: parent_hash,
hash: hash
}
end

@doc """
Adds latency metrics to a group from a given point.
"""
@spec add(t(), Aggregator.Point.t()) :: t()
def add(group, %Aggregator.Point{pathway_latency: pathway_latency, edge_latency: edge_latency}) do
normalized_pathway_latency = max(pathway_latency / 1_000_000_000, 0)
normalized_edge_latency = max(edge_latency / 1_000_000_000, 0)

%{
group
| pathway_latency: Sketch.add(group.pathway_latency, normalized_pathway_latency),
edge_latency: Sketch.add(group.edge_latency, normalized_edge_latency)
}
end

@doc """
Updates an existing `#{__MODULE__}` with latency data, or creates a new
`#{__MODULE__}` if one cannot be found.
"""
@spec upsert(%{required(non_neg_integer()) => t()}, Aggregator.Point.t(), (t() -> t())) :: %{
required(non_neg_integer()) => t()
}
def upsert(groups, point, fun) do
new_group =
groups
|> Map.get(point.hash, new(point))
|> fun.()

Map.put(groups, point.hash, new_group)
end
end
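A small sketch of how `upsert/3` and `add/2` combine, mirroring the call site in `Datadog.DataStreams.Aggregator.handle_cast/2`. The point's field values are illustrative only.

```elixir
alias Datadog.DataStreams.Aggregator

# Hypothetical point; latencies are nanoseconds, normalized to seconds
# inside add/2 before being recorded in the sketches.
point = %Aggregator.Point{
  edge_tags: ["type:kafka", "topic:orders"],
  hash: 42,
  parent_hash: 0,
  timestamp: :erlang.system_time(:nanosecond),
  pathway_latency: 150_000_000,
  edge_latency: 25_000_000
}

# Insert-or-update keyed by the point's hash, exactly as handle_cast/2 does.
groups = Aggregator.Group.upsert(%{}, point, &Aggregator.Group.add(&1, point))

Map.has_key?(groups, 42)
#=> true
```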
