Skip to content

Commit

Permalink
captures/rescues 410 Gone inside chunk plus some refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael Ruoss committed May 13, 2022
1 parent 6a74b12 commit 6323a74
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 91 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

- Request BOOKMARK events and process them when watching resource collectons.
- Fix 410 Gone not rescued in `watch_and_stream/2`
- Request BOOKMARK events and process them when watching resource collections.

## [1.1.4] - 2022-03-15

Expand Down
127 changes: 69 additions & 58 deletions lib/k8s/client/runner/watch/stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -33,44 +33,12 @@ defmodule K8s.Client.Runner.Watch.Stream do
@spec resource(K8s.Conn.t(), K8s.Operation.t(), keyword()) :: Enumerable.t() | {:error, any()}
def resource(conn, operation, http_opts) do
Stream.resource(
fn -> {:start, %{conn: conn, operation: operation, http_opts: http_opts}} end,
fn -> {:start, %__MODULE__{conn: conn, operation: operation, http_opts: http_opts}} end,
&next_fun/1,
fn _state -> :ok end
)
end

@spec get_latest_rv_and_watch(K8s.Conn.t(), K8s.Operation.t(), keyword()) ::
{:ok, t()} | {:error, any()}
defp get_latest_rv_and_watch(conn, operation, http_opts) do
with {:ok, resource_version} <- Watch.get_resource_version(conn, operation) do
watch(conn, operation, resource_version, http_opts)
end
end

@spec watch(K8s.Conn.t(), K8s.Operation.t(), binary(), keyword()) ::
{:ok, t()} | {:error, any()}
defp watch(conn, operation, resource_version, http_opts) do
http_opts =
http_opts
|> Keyword.put_new(:params, [])
|> put_in([:params, :resourceVersion], resource_version)
|> put_in([:params, :allowWatchBookmarks], true)
|> put_in([:params, :watch], true)
|> Keyword.put(:stream_to, self())
|> Keyword.put(:async, :once)

with {:ok, ref} <- Base.run(conn, operation, http_opts) do
{:ok,
%__MODULE__{
resp: %HTTPoison.AsyncResponse{id: ref},
conn: conn,
operation: operation,
resource_version: resource_version,
http_opts: http_opts
}}
end
end

@docp """
Producing the next elements in the stream.
* If the accumulator is {:recv, state}, receives and processes events from the HTTPoison process
Expand All @@ -80,7 +48,7 @@ defmodule K8s.Client.Runner.Watch.Stream do
defp next_fun({:recv, %__MODULE__{} = state}) do
receive do
%HTTPoison.AsyncEnd{} ->
Logger.warn(
Logger.debug(
@log_prefix <> "AsyncEnd received - tryin to restart watcher",
library: :k8s
)
Expand All @@ -97,11 +65,12 @@ defmodule K8s.Client.Runner.Watch.Stream do

%HTTPoison.AsyncStatus{code: 410} ->
Logger.warn(
@log_prefix <> "410 Gone received from watcher - trying to restart",
@log_prefix <> "410 Gone received from watcher - resetting the resource version",
library: :k8s
)

{[], {:start, state}}
new_state = struct!(state, resource_version: nil)
{[], {:recv, new_state}}

%HTTPoison.AsyncStatus{code: _} = error ->
Logger.warn(
Expand All @@ -125,16 +94,7 @@ defmodule K8s.Client.Runner.Watch.Stream do
library: :k8s
)

%{
conn: conn,
operation: operation,
http_opts: http_opts,
resource_version: resource_version
} = state

new_http_opts = http_opts |> put_in([:params, :resourceVersion], resource_version)
{:ok, ref} = Base.run(conn, operation, new_http_opts)
{[], {:recv, %{state | resp: %HTTPoison.AsyncResponse{id: ref}}}}
{[], {:start, state}}

other ->
Logger.debug(
Expand All @@ -152,10 +112,41 @@ defmodule K8s.Client.Runner.Watch.Stream do
Tries to make new HTTPoison watcher request (self-healing)
"""
@spec next_fun({:start, map()}) :: {[map()], {:recv, t()}} | {:halt, nil}
defp next_fun({:start, %{conn: conn, operation: operation, http_opts: http_opts}}) do
case get_latest_rv_and_watch(conn, operation, http_opts) do
{:ok, state} ->
{[], {:recv, state}}
defp next_fun({:start, state}) do
%{conn: conn, operation: operation, http_opts: http_opts} = state

#  get latest resource version if it is missing in the state
resource_version =
case Map.get(state, :resource_version) do
nil ->
{:ok, resource_version} = Watch.get_resource_version(conn, operation)
resource_version

rv ->
rv
end

#  prepare http_opts
http_opts =
http_opts
|> Keyword.put_new(:params, [])
|> put_in([:params, :resourceVersion], resource_version)
|> put_in([:params, :allowWatchBookmarks], true)
|> put_in([:params, :watch], true)
|> Keyword.put(:stream_to, self())
|> Keyword.put(:async, :once)

#  start the watcher
case Base.run(conn, operation, http_opts) do
{:ok, ref} ->
new_state =
struct!(
state,
resp: %HTTPoison.AsyncResponse{id: ref},
resource_version: resource_version
)

{[], {:recv, new_state}}

error ->
Logger.error(
Expand Down Expand Up @@ -213,7 +204,35 @@ defmodule K8s.Client.Runner.Watch.Stream do

{events, acc}

{:ok, %{"type" => "ERROR", "object" => %{"message" => message, "code" => 410} = object}},
{events, {_, state}} ->
Logger.debug(
@log_prefix <> "#{message} - resetting the resource version",
library: :k8s,
object: object
)

new_state = struct!(state, resource_version: nil)
{events, {:recv, new_state}}

{:ok, %{"object" => %{"message" => message} = object}}, {events, {_, state}} ->
Logger.error(
@log_prefix <>
"Erronous event received from watcher: #{message} - resetting the resource version",
library: :k8s,
object: object
)

new_state = struct!(state, resource_version: nil)
{events, {:recv, new_state}}

{:ok, %{"type" => "BOOKMARK", "object" => object}}, {events, {:recv, state}} ->
Logger.debug(
@log_prefix <> "Bookmark received",
library: :k8s,
object: object
)

{events,
{:recv, %__MODULE__{state | resource_version: object["metadata"]["resourceVersion"]}}}

Expand All @@ -226,14 +245,6 @@ defmodule K8s.Client.Runner.Watch.Stream do
# new resource_version => append new event to the stream
{events ++ [new_event],
{:recv, %__MODULE__{state | resource_version: object["metadata"]["resourceVersion"]}}}

{:ok, %{"object" => %{"message" => message}}}, {events, {_, state}} ->
Logger.error(
@log_prefix <> "Erronous event received from watcher: #{message}",
library: :k8s
)

{events, {:start, state}}
end)
end
end
84 changes: 53 additions & 31 deletions test/k8s/client/watch/stream_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -51,35 +51,13 @@ defmodule K8s.Client.Runner.Watch.StreamTest do
end
end

def request(:get, "https://localhost:6443/api/v1/services", _body, _headers, opts) do
case get_in(opts, [:params, :watch]) do
true ->
assert "10" == get_in(opts, [:params, :resourceVersion])
pid = Keyword.fetch!(opts, :stream_to)
send(pid, %HTTPoison.AsyncEnd{})

send_object(pid, %{
"type" => "ADDED",
"object" => %{
"apiVersion" => "v1",
"kind" => "Service",
"metadata" => %{"resourceVersion" => "11"}
}
})

{:ok, %HTTPoison.AsyncResponse{id: make_ref()}}

nil ->
render(%{"metadata" => %{"resourceVersion" => "10"}})
end
end

def request(:get, "https://localhost:6443/api/v1/pods", _body, _headers, opts) do
case get_in(opts, [:params, :watch]) do
true ->
assert "10" == get_in(opts, [:params, :resourceVersion])
pid = Keyword.fetch!(opts, :stream_to)
send(pid, %HTTPoison.AsyncStatus{code: 410})
send(pid, %HTTPoison.AsyncEnd{})

send_object(pid, %{
"type" => "ADDED",
Expand Down Expand Up @@ -213,6 +191,8 @@ defmodule K8s.Client.Runner.Watch.StreamTest do
}
})

send(pid, %HTTPoison.AsyncEnd{})

send_object(pid, %{
"type" => "DELETED",
"object" => %{
Expand All @@ -229,6 +209,48 @@ defmodule K8s.Client.Runner.Watch.StreamTest do
end
end

def request(:get, "https://localhost:6443/api/v1/services", _body, _headers, opts) do
case get_in(opts, [:params, :watch]) do
true ->
case get_in(opts, [:params, :resourceVersion]) do
"10" ->
pid = Keyword.fetch!(opts, :stream_to)
send(pid, %HTTPoison.AsyncStatus{code: 200})

send_object(pid, %{
"type" => "BOOKMARK",
"object" => %{
"apiVersion" => "v1",
"kind" => "Service",
"metadata" => %{"resourceVersion" => "11"}
}
})

send(pid, %HTTPoison.AsyncEnd{})

{:ok, %HTTPoison.AsyncResponse{id: make_ref()}}

"11" ->
pid = Keyword.fetch!(opts, :stream_to)
send(pid, %HTTPoison.AsyncStatus{code: 200})

send_object(pid, %{
"type" => "ADDED",
"object" => %{
"apiVersion" => "v1",
"kind" => "Service",
"metadata" => %{"resourceVersion" => "12"}
}
})

{:ok, %HTTPoison.AsyncResponse{id: make_ref()}}
end

nil ->
render(%{"metadata" => %{"resourceVersion" => "10"}})
end
end

def request(_method, _url, _body, _headers, _opts) do
Logger.error("Call to #{__MODULE__}.request/5 not handled: #{inspect(binding())}")
{:error, %HTTPoison.Error{reason: "request not mocked"}}
Expand All @@ -255,9 +277,9 @@ defmodule K8s.Client.Runner.Watch.StreamTest do
end

@tag timeout: 1_000
test "Resumes the stream when AsyncEnd is sent", %{conn: conn} do
test "Resumes the stream when 410 Gone is sent", %{conn: conn} do
test = fn ->
operation = K8s.Client.list("v1", "Service")
operation = K8s.Client.list("v1", "Pod")

events =
MUT.resource(conn, operation, [])
Expand All @@ -268,24 +290,24 @@ defmodule K8s.Client.Runner.Watch.StreamTest do
assert ["ADDED", "ADDED", "ADDED", "ADDED"] == Enum.map(events, & &1["type"])
end

assert capture_log(test) =~ "AsyncEnd received"
assert capture_log(test) =~ "410 Gone received"
end

@tag timeout: 1_000
test "Resumes the stream when 410 Gone is sent", %{conn: conn} do
test "Updates the resource version when BOOKMARK event is received", %{conn: conn} do
test = fn ->
operation = K8s.Client.list("v1", "Pod")
operation = K8s.Client.list("v1", "Service")

events =
MUT.resource(conn, operation, [])
|> Stream.take(4)
|> Stream.take(1)
|> Enum.to_list()

#  goes on forever, but we only took 4
assert ["ADDED", "ADDED", "ADDED", "ADDED"] == Enum.map(events, & &1["type"])
assert ["ADDED"] == Enum.map(events, & &1["type"])
end

assert capture_log(test) =~ "410 Gone received"
assert capture_log(test) =~ "Bookmark received"
end

@tag timeout: 1_000
Expand Down
9 changes: 8 additions & 1 deletion test/k8s/discovery/driver/file_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,14 @@ defmodule K8s.Discovery.Driver.FileTest do

resource_names = Enum.map(resources, & &1["name"])
sorted_resource_names = Enum.sort(resource_names)
assert sorted_resource_names == ["namespaces", "pods", "pods/eviction", "services"]

assert sorted_resource_names == [
"configmaps",
"namespaces",
"pods",
"pods/eviction",
"services"
]
end
end

Expand Down
15 changes: 15 additions & 0 deletions test/support/discovery/example.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,21 @@
"namespaced": false,
"verbs": ["create", "delete", "get", "list", "patch", "update", "watch"]
},
{
"kind": "ConfigMap",
"name": "configmaps",
"namespaced": true,
"verbs": [
"create",
"delete",
"deletecollection",
"get",
"list",
"patch",
"update",
"watch"
]
},
{
"kind": "Service",
"name": "services",
Expand Down

0 comments on commit 6323a74

Please sign in to comment.