From b74a384a93b41a5e66da8b42c7284ebc1fa75793 Mon Sep 17 00:00:00 2001 From: Michael Ruoss Date: Thu, 2 Mar 2023 12:04:15 +0100 Subject: [PATCH 1/2] make sure objects with message fields are processed --- lib/k8s/client/runner/stream/watch.ex | 36 +++++++++++--------- test/k8s/client/runner/stream/watch_test.exs | 13 +++++-- 2 files changed, 30 insertions(+), 19 deletions(-) diff --git a/lib/k8s/client/runner/stream/watch.ex b/lib/k8s/client/runner/stream/watch.ex index e6db7d9c..c6d211bd 100644 --- a/lib/k8s/client/runner/stream/watch.ex +++ b/lib/k8s/client/runner/stream/watch.ex @@ -167,19 +167,6 @@ defmodule K8s.Client.Runner.Stream.Watch do {[], new_state} end - defp process_object(%{"object" => %{"message" => message} = object}, state) do - Logger.error( - log_prefix( - "Erronous event received from watcher: #{message} - resetting the resource version" - ), - library: :k8s, - object: object - ) - - new_state = struct!(state, resource_version: nil) - {[], new_state} - end - defp process_object(%{"type" => "BOOKMARK", "object" => object}, state) do Logger.debug( log_prefix("Bookmark received"), @@ -192,17 +179,32 @@ defmodule K8s.Client.Runner.Stream.Watch do defp process_object( %{"object" => %{"metadata" => %{"resourceVersion" => resource_version}}}, - state - ) - when resource_version == state.resource_version do + %{resource_version: resource_version} = state + ) do + # resource version already obeserved. {[], state} end - defp process_object(%{"object" => object} = new_event, state) do + defp process_object(%{"object" => %{"kind" => _} = object} = new_event, state) do + # Emit new event new_state = struct!(state, resource_version: object["metadata"]["resourceVersion"]) {[new_event], new_state} end + defp process_object(%{"object" => %{"message" => message} = object}, state) do + # Objects with only the "message" field but no "kind" are cosidered errors. + Logger.error( + log_prefix( + "Erronous event received from watcher: #{message} - resetting the resource version" + ), + library: :k8s, + object: object + ) + + new_state = struct!(state, resource_version: nil) + {[], new_state} + end + @spec get_resource_version(K8s.Conn.t(), K8s.Operation.t()) :: {:ok, binary} | Base.error_t() defp get_resource_version(%K8s.Conn{} = conn, %K8s.Operation{} = operation) do with {:ok, payload} <- Base.run(conn, operation) do diff --git a/test/k8s/client/runner/stream/watch_test.exs b/test/k8s/client/runner/stream/watch_test.exs index 67ebf5d7..2f295787 100644 --- a/test/k8s/client/runner/stream/watch_test.exs +++ b/test/k8s/client/runner/stream/watch_test.exs @@ -292,6 +292,14 @@ defmodule K8s.Client.Runner.Stream.WatchTest do "metadata" => %{"resourceVersion" => "11"} } }), + HTTPTestHelper.stream_object(%{ + "type" => "MODIFIED", + "object" => %{ + "apiVersion" => "apps/v1", + "kind" => "ReplicaSet", + "message" => "some message" + } + }), HTTPTestHelper.stream_object(%{ "type" => "ERROR", "object" => %{ @@ -535,10 +543,11 @@ defmodule K8s.Client.Runner.Stream.WatchTest do events = stream - |> Stream.take(4) + |> Stream.take(6) |> Enum.to_list() - assert ["ADDED", "DELETED", "ADDED", "DELETED"] == Enum.map(events, & &1["type"]) + assert ["ADDED", "MODIFIED", "DELETED", "ADDED", "MODIFIED", "DELETED"] == + Enum.map(events, & &1["type"]) end assert capture_log(test) =~ "Erronous event received" From d78558fa3801ff2b6fafd51b8067cb8a293df67c Mon Sep 17 00:00:00 2001 From: Michael Ruoss Date: Thu, 2 Mar 2023 12:06:27 +0100 Subject: [PATCH 2/2] add changelog --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index a7a0999d..c214d610 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 +### Fixed + +- Watcher reset resource version for objects with a "message" field. - [#232](https://github.com/coryodaniel/k8s/issues/232), [#231](https://github.com/coryodaniel/k8s/issues/231) + ## [2.1.0] - 2023-02-25