Skip to content

Commit

Permalink
feat: add container id to transport headers (#12)
Browse files Browse the repository at this point in the history
Matches the upstream data-streams-go library implementation here:
DataDog/data-streams-go#29
  • Loading branch information
btkostner authored May 23, 2023
1 parent 849b46a commit 4d58826
Show file tree
Hide file tree
Showing 4 changed files with 256 additions and 2 deletions.
1 change: 1 addition & 0 deletions lib/datadog/data_streams/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ defmodule Datadog.DataStreams.Application do
def start(_type, _args) do
children = [
{Finch, name: Datadog.Finch},
Datadog.DataStreams.Container,
Datadog.DataStreams.Aggregator
]

Expand Down
76 changes: 76 additions & 0 deletions lib/datadog/data_streams/container.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
defmodule Datadog.DataStreams.Container do
@moduledoc """
Logic for linking current running container id to data stream traces.
"""

use Agent

# the path to the cgroup file where we can find the container id if one exists.
@cgroup_path "/proc/self/cgroup"

@uuid_source Regex.source(
~r/[0-9a-f]{8}[-_][0-9a-f]{4}[-_][0-9a-f]{4}[-_][0-9a-f]{4}[-_][0-9a-f]{12}|[0-9a-f]{8}(?:-[0-9a-f]{4}){4}$/
)
@container_source Regex.source(~r/[0-9a-f]{64}/)
@task_source Regex.source(~r/[0-9a-f]{32}-\d+/)

@exp_line ~r/^\d+:[^:]*:(.+)$/
@exp_container_id Regex.compile!(
"(#{@uuid_source}|#{@container_source}|#{@task_source})(?:.scope)?$"
)

@doc """
Starts the agent and stores the current container id in memory.
"""
def start_link(_opts) do
Agent.start_link(&read_container_id/0, name: __MODULE__)
end

@doc """
Returns the current container id.
"""
@spec get() :: String.t() | nil
def get() do
Agent.get(__MODULE__, & &1)
end

@doc """
Attempts to return the container id from the cgroup path (`#{@cgroup_path}`). Empty on failure.
"""
@spec read_container_id() :: String.t() | nil
def read_container_id(),
do: read_container_id(@cgroup_path)

@doc """
Attempts to return the container id from the provided file path. Empty on failure.
"""
@spec read_container_id(String.t()) :: String.t() | nil
def read_container_id(file) do
file
|> File.stream!()
|> parse_container_id()
catch
_ -> nil
end

@doc """
Attempts to return the container id from the provided file stream. Empty on failure.
"""
def parse_container_id(stream) do
stream
|> Stream.map(&parse_line/1)
|> Stream.filter(fn value -> not is_nil(value) end)
|> Enum.at(0)
catch
_ -> nil
end

defp parse_line(line) do
with [_part_one, part_two] <- Regex.run(@exp_line, line),
[_part_one, container_id] <- Regex.run(@exp_container_id, part_two) do
container_id
else
_ -> nil
end
end
end
16 changes: 14 additions & 2 deletions lib/datadog/data_streams/transport.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Datadog.DataStreams.Transport do
library for requests.
"""

alias Datadog.DataStreams.Config
alias Datadog.DataStreams.{Config, Container}

@headers [
{"Content-Type", "application/msgpack"},
Expand All @@ -27,7 +27,12 @@ defmodule Datadog.DataStreams.Transport do
@spec send_pipeline_stats(binary) :: :ok | {:error, any()}
def send_pipeline_stats(stats) do
request =
Finch.build(:post, Config.agent_url("/v0.1/pipeline_stats"), @headers, :zlib.gzip(stats))
Finch.build(
:post,
Config.agent_url("/v0.1/pipeline_stats"),
request_headers(),
:zlib.gzip(stats)
)

case request |> Finch.request(Datadog.Finch) |> handle_response() do
{:ok, %Finch.Response{status: 202, body: %{"acknowledged" => true}}} -> :ok
Expand All @@ -39,6 +44,13 @@ defmodule Datadog.DataStreams.Transport do
end
end

defp request_headers() do
case Container.get() do
nil -> @headers
container_id -> [{"Datadog-Container-ID", container_id}] ++ @headers
end
end

defp handle_response({:error, error}),
do: {:error, error}

Expand Down
165 changes: 165 additions & 0 deletions test/datadog/data_streams/container_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
defmodule Datadog.DataStreams.ContainerTest do
# Test samples were taken from the original data-streams-go PR
# https://github.com/DataDog/data-streams-go/pull/29/files#diff-024e3d4ff20badf054922b05099e1f8c8bfb9b562ce2ccf9bfdb5c5378432b19

use ExUnit.Case, async: true

alias Datadog.DataStreams.Container

test "parse_container_id/1 can parse a stream (example 1)" do
file = ~s"""
other_line
10:hugetlb:/kubepods/burstable/podfd52ef25-a87d-11e9-9423-0800271a638e/8c046cb0b72cd4c99f51b5591cd5b095967f58ee003710a45280c28ee1a9c7fa
9:cpuset:/kubepods/burstable/podfd52ef25-a87d-11e9-9423-0800271a638e/8c046cb0b72cd4c99f51b5591cd5b095967f58ee003710a45280c28ee1a9c7fa
8:pids:/kubepods/burstable/podfd52ef25-a87d-11e9-9423-0800271a638e/8c046cb0b72cd4c99f51b5591cd5b095967f58ee003710a45280c28ee1a9c7fa
7:freezer:/kubepods/burstable/podfd52ef25-a87d-11e9-9423-0800271a638e/8c046cb0b72cd4c99f51b5591cd5b095967f58ee003710a45280c28ee1a9c7fa
6:cpu,cpuacct:/kubepods/burstable/podfd52ef25-a87d-11e9-9423-0800271a638e/8c046cb0b72cd4c99f51b5591cd5b095967f58ee003710a45280c28ee1a9c7fa
5:perf_event:/kubepods/burstable/podfd52ef25-a87d-11e9-9423-0800271a638e/8c046cb0b72cd4c99f51b5591cd5b095967f58ee003710a45280c28ee1a9c7fa
4:blkio:/kubepods/burstable/podfd52ef25-a87d-11e9-9423-0800271a638e/8c046cb0b72cd4c99f51b5591cd5b095967f58ee003710a45280c28ee1a9c7fa
3:devices:/kubepods/burstable/podfd52ef25-a87d-11e9-9423-0800271a638e/8c046cb0b72cd4c99f51b5591cd5b095967f58ee003710a45280c28ee1a9c7fa
2:net_cls,net_prio:/kubepods/burstable/podfd52ef25-a87d-11e9-9423-0800271a638e/8c046cb0b72cd4c99f51b5591cd5b095967f58ee003710a45280c28ee1a9c7fa
"""

{:ok, stream} = StringIO.open(file)

assert Container.parse_container_id(IO.binstream(stream, :line)) ===
"8c046cb0b72cd4c99f51b5591cd5b095967f58ee003710a45280c28ee1a9c7fa"
end

test "parse_container_id/1 can parse a stream (example 2)" do
file = ~s"""
10:hugetlb:/kubepods/burstable/podfd52ef25-a87d-11e9-9423-0800271a638e/8c046cb0b72cd4c99f51b5591cd5b095967f58ee003710a45280c28ee1a9c7fa
"""

{:ok, stream} = StringIO.open(file)

assert Container.parse_container_id(IO.binstream(stream, :line)) ===
"8c046cb0b72cd4c99f51b5591cd5b095967f58ee003710a45280c28ee1a9c7fa"
end

test "parse_container_id/1 can parse a stream (example 3)" do
file = ~s"""
10:hugetlb:/kubepods
"""

{:ok, stream} = StringIO.open(file)

assert Container.parse_container_id(IO.binstream(stream, :line)) === nil
end

test "parse_container_id/1 can parse a stream (example 4)" do
file = ~s"""
11:hugetlb:/ecs/55091c13-b8cf-4801-b527-f4601742204d/432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da
"""

{:ok, stream} = StringIO.open(file)

assert Container.parse_container_id(IO.binstream(stream, :line)) ===
"432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da"
end

test "parse_container_id/1 can parse a stream (example 5)" do
file = ~s"""
1:name=systemd:/docker/34dc0b5e626f2c5c4c5170e34b10e7654ce36f0fcd532739f4445baabea03376
"""

{:ok, stream} = StringIO.open(file)

assert Container.parse_container_id(IO.binstream(stream, :line)) ===
"34dc0b5e626f2c5c4c5170e34b10e7654ce36f0fcd532739f4445baabea03376"
end

test "parse_container_id/1 can parse a stream (example 6)" do
file = ~s"""
1:name=systemd:/uuid/34dc0b5e-626f-2c5c-4c51-70e34b10e765
"""

{:ok, stream} = StringIO.open(file)

assert Container.parse_container_id(IO.binstream(stream, :line)) ===
"34dc0b5e-626f-2c5c-4c51-70e34b10e765"
end

test "parse_container_id/1 can parse a stream (example 7)" do
file = ~s"""
1:name=systemd:/ecs/34dc0b5e626f2c5c4c5170e34b10e765-1234567890
"""

{:ok, stream} = StringIO.open(file)

assert Container.parse_container_id(IO.binstream(stream, :line)) ===
"34dc0b5e626f2c5c4c5170e34b10e765-1234567890"
end

test "parse_container_id/1 can parse a stream (example 8)" do
file = ~s"""
1:name=systemd:/docker/34dc0b5e626f2c5c4c5170e34b10e7654ce36f0fcd532739f4445baabea03376.scope
"""

{:ok, stream} = StringIO.open(file)

assert Container.parse_container_id(IO.binstream(stream, :line)) ===
"34dc0b5e626f2c5c4c5170e34b10e7654ce36f0fcd532739f4445baabea03376"
end

test "parse_container_id/1 can parse a stream (example 9)" do
file = ~s"""
1:name=systemd:/nope
2:pids:/docker/34dc0b5e626f2c5c4c5170e34b10e7654ce36f0fcd532739f4445baabea03376
3:cpu:/invalid
"""

{:ok, stream} = StringIO.open(file)

assert Container.parse_container_id(IO.binstream(stream, :line)) ===
"34dc0b5e626f2c5c4c5170e34b10e7654ce36f0fcd532739f4445baabea03376"
end

test "parse_container_id/1 can parse a stream (example 10)" do
file = ~s"""
other_line
12:memory:/system.slice/garden.service/garden/6f265890-5165-7fab-6b52-18d1
11:rdma:/
10:freezer:/garden/6f265890-5165-7fab-6b52-18d1
9:hugetlb:/garden/6f265890-5165-7fab-6b52-18d1
8:pids:/system.slice/garden.service/garden/6f265890-5165-7fab-6b52-18d1
7:perf_event:/garden/6f265890-5165-7fab-6b52-18d1
6:cpu,cpuacct:/system.slice/garden.service/garden/6f265890-5165-7fab-6b52-18d1
5:net_cls,net_prio:/garden/6f265890-5165-7fab-6b52-18d1
4:cpuset:/garden/6f265890-5165-7fab-6b52-18d1
3:blkio:/system.slice/garden.service/garden/6f265890-5165-7fab-6b52-18d1
2:devices:/system.slice/garden.service/garden/6f265890-5165-7fab-6b52-18d1
1:name=systemd:/system.slice/garden.service/garden/6f265890-5165-7fab-6b52-18d1
"""

{:ok, stream} = StringIO.open(file)

assert Container.parse_container_id(IO.binstream(stream, :line)) ===
"6f265890-5165-7fab-6b52-18d1"
end

test "parse_container_id/1 can parse a stream (example 11)" do
file = ~s"""
1:name=systemd:/system.slice/garden.service/garden/6f265890-5165-7fab-6b52-18d1
"""

{:ok, stream} = StringIO.open(file)

assert Container.parse_container_id(IO.binstream(stream, :line)) ===
"6f265890-5165-7fab-6b52-18d1"
end

@tag :tmp_dir
test "read_container_id/1 can parse a file", %{tmp_dir: tmp_dir} do
cid = "8c046cb0b72cd4c99f51b5591cd5b095967f58ee003710a45280c28ee1a9c7fa"

cgroup_contents =
"10:hugetlb:/kubepods/burstable/podfd52ef25-a87d-11e9-9423-0800271a638e/" <> cid

file_path = Path.join(tmp_dir, "fake-cgroup")

File.write!(file_path, cgroup_contents)

assert ^cid = Container.read_container_id(file_path)
end
end

0 comments on commit 4d58826

Please sign in to comment.