Skip to content

Commit

Permalink
Merge pull request #201 from coryodaniel/mint-genserver
Browse files Browse the repository at this point in the history
A GenServer for Mint connections
  • Loading branch information
mruoss authored Dec 31, 2022
2 parents 5917729 + 5b846e5 commit 6d26091
Show file tree
Hide file tree
Showing 31 changed files with 1,298 additions and 490 deletions.
4 changes: 1 addition & 3 deletions .credo.exs
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,6 @@
{Credo.Check.Refactor.NegatedConditionsInUnless, []},
{Credo.Check.Refactor.NegatedConditionsWithElse, []},
{Credo.Check.Refactor.Nesting, []},
{Credo.Check.Refactor.PipeChainStart,
[excluded_argument_types: [:atom, :binary, :fn, :keyword], excluded_functions: []]},
{Credo.Check.Refactor.UnlessWithElse, []},

#
Expand Down Expand Up @@ -146,7 +144,7 @@
# Custom checks can be created using `mix credo.gen.check`.
#

#
#
# Deprecated after 1.10.4
#
{Credo.Check.Refactor.MapInto, false},
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
k8s_version: [latest]
k8s_version: [v1.26.0-k3s1]
elixir: [1.14.x]
otp: [25.x]
steps:
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/k8s_matrix.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
k8s_version: [v1.21.9-k3s1, v1.22.6-k3s1, latest]
otp: [25.x] # with 24.3.0 hackney returns :checkout_failure
k8s_version: [v1.24.9-k3s1, v1.25.5-k3s1, v1.26.0-k3s1]
otp: [25.x]
elixir: [1.14.x]
steps:
- uses: debianmaster/[email protected]
Expand Down
18 changes: 14 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [2.0.0-rc.2] -

### Added

- `K8s.Client.Mint.HTTPAdapter` - A GenServer handling `Mint.HTTP` connections.

## [2.0.0-rc.1] - 2022-12-19

### Fixed
Expand All @@ -26,6 +32,8 @@ projects to this version.
- `K8s.Selector.label_not/N`, `K8s.Selector.field/N` and `K8s.Selector.field_not/N` - Support for field selectors ([#117](https://github.com/coryodaniel/k8s/pull/117))
- `K8s.Client.Provider.stream/5` callback was added to the behaviour
- `K8s.Client.Runner.Base.stream/3`
- `K8s.Client.Provider.stream_to/6` callback was added to the behaviour
- `K8s.Client.Runner.Base.stream_to/4`
- `K8s.Client.MintHTTPProvider` - The mint client implementation
- `K8s.Client.HTTPTestHelper` - to be used in tests (resides in `lib/` so it can be used by dependents)
- Open `:connect` operations (connections) now accept messages to be sent to pods
Expand All @@ -40,14 +48,16 @@ projects to this version.
### Removed

- `K8s.Client.HTTPProvider` was removed in favor of `K8s.Client.MintHTTPProvider`
- The `:stream_to` in `http_opts` is not supported anymore.
- The `:stream_to` in `http_opts` was removed in favor of `K8s.Client.stream_to/N` and `K8s.Client.stream/N`.
- `K8s.Client.DynamicWebSocketProvider` was removed. Use `K8s.Client.DynamcHTTPProvider.websocket*` functions instead .

### Breaking changes

- Tests using the `DynamicHTTPProvider` which work with `watch_and_stream` are going to need to be changed. The HTTP mocks now need to implement the `stream/5` callback. (See `K8s.Client.Runner.Watch.StreamTest` on this branch for examples)
- The `:stream_to` in `http_opts` is not supported anymore. Instead, [Elixir Streams](https://hexdocs.pm/elixir/1.14/Stream.html) are returned. Map over the returned stream to send messages to other processes.
- Tests using the `DynamicHTTPProvider` which work with `watch_and_stream` are going to need to be changed. The HTTP mocks now need to implement the `stream/5` callback. (See `K8s.Client.Runner.Watch.StreamTest` on this branch for examples)d.
- `K8s.Client.DynamicWebSocketProvider` was removed in favor of `K8s.Client.DynamcHTTPProvider.websocket*` functions.
- The `:stream_to` in `http_opts` is not supported anymore. Use `K8s.Client.stream/N` and `K8s.Client.stream_to/N` instead.
- Errors are encapsulated in `K8s.Client.HTTPError`
- `headers/1` callback was removed from `K8s.Client.Provider` behaviour
- `headers/1` callback was removed from `K8s.Client.Provider` behaviour.
- `K8s.Client.HTTPProvider` (HTTPoison implementation) was removed.
- `K8s.Client.watch/N` now returns a `:watch` or `:watch_all_namespaces` operation to be passed to `K8s.Client.stream/N`

Expand Down
23 changes: 7 additions & 16 deletions guides/migrations.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,29 +71,20 @@ release, there is now a `:watch` operation. `K8s.Client.watch/N` now is used
to create a `:watch` operation which has to be passed to `K8s.Client.stream/N`.
Also, the `stream_to` option was removed. See below.

### `stream_to` was removed
### Streaming to another process - The `stream_to` option was removed

Before `2.0.0`, you could pass the `stream_to` option to stream packets to a
process. With the migration from HTTPoison to Mint, this option was removed.
It was very low-level and revealed implementation details.

To stream packest to a different process, use `K8s.Client.stream/N` to retrieve
an Elixir Stream. Map over that stream and send the packets to a process.
For `:connect` operations, `K8s.Client.stream_to/N` can be used as a replacement.
Other operations will have to be streamed using `K8s.Client.stream/N`.

#### Example

```elixir
parent_pid = self()
Task.async(fn ->
{:ok, stream} =
K8s.Client.watch("v1", "ConfigMap", namespace: "default")
|> K8s.Client.put_conn(conn)
|> K8s.Client.stream()

stream
|> Stream.map(&send(parent_pid, &1))
|> Stream.run()
end)
:ok =
K8s.Client.watch("v1", "ConfigMap", namespace: "default")
|> K8s.Client.put_conn(conn)
|> K8s.Client.stream_to(self())

receive do
event -> IO.inspect(event)
Expand Down
15 changes: 15 additions & 0 deletions lib/k8s/application.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
defmodule K8s.Application do
@moduledoc false

use Application
@impl true
def start(_type, _args) do
children = [
K8s.Client.Mint.ConnectionRegistry,
{DynamicSupervisor, name: K8s.Client.Mint.ConnectionSupervisor, strategy: :one_for_one}
]

opts = [strategy: :one_for_one, name: K8s.Supervisor]
Supervisor.start_link(children, opts)
end
end
11 changes: 11 additions & 0 deletions lib/k8s/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ defmodule K8s.Client do
force: true
}

alias K8s.Client.Runner.StreamTo
alias K8s.Operation
alias K8s.Client.Runner.{Async, Base, Stream, Wait}

Expand Down Expand Up @@ -56,6 +57,7 @@ defmodule K8s.Client do
@doc "alias of `K8s.Client.Runner.Wait.run/3`"
defdelegate wait_until(conn, operation, wait_opts), to: Wait, as: :run

@doc "alias of `K8s.Client.Runner.Stream.run/1`"
defdelegate stream(operation), to: Stream, as: :run

@doc "alias of `K8s.Client.Runner.Stream.run/2`"
Expand All @@ -64,6 +66,15 @@ defmodule K8s.Client do
@doc "alias of `K8s.Client.Runner.Stream.run/3`"
defdelegate stream(conn, operation, http_opts), to: Stream, as: :run

@doc "alias of `K8s.Client.Runner.StreamTo.run/2`"
defdelegate stream_to(operation, stream_to), to: StreamTo, as: :run

@doc "alias of `K8s.Client.Runner.StreamTo.run/3`"
defdelegate stream_to(conn, operation, stream_to), to: StreamTo, as: :run

@doc "alias of `K8s.Client.Runner.StreamTo.run/4`"
defdelegate stream_to(conn, operation, http_opts, stream_to), to: StreamTo, as: :run

@doc """
Returns a `PATCH` operation to server-side-apply the given resource.
Expand Down
16 changes: 16 additions & 0 deletions lib/k8s/client/dynamic_http_provider.ex
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ defmodule K8s.Client.DynamicHTTPProvider do
locate_and_apply(:stream, [method, url, body, headers, opts])
end

@doc """
Dispatch `stream_to/6` to the module registered in the current process or any ancestor.
"""
@impl true
def stream_to(method, url, body, headers, opts, stream_to) do
locate_and_apply(:stream_to, [method, url, body, headers, opts, stream_to])
end

@doc """
Dispatch `request/5` to the module registered in the current process or any ancestor.
"""
Expand All @@ -73,6 +81,14 @@ defmodule K8s.Client.DynamicHTTPProvider do
locate_and_apply(:websocket_stream, [url, headers, opts])
end

@doc """
Dispatch `request/5` to the module registered in the current process or any ancestor.
"""
@impl true
def websocket_stream_to(url, headers, opts, stream_to) do
locate_and_apply(:websocket_stream_to, [url, headers, opts, stream_to])
end

@spec locate_and_apply(atom(), list()) :: K8s.Client.Provider.response_t()
defp locate_and_apply(func, args) do
case locate(self()) do
Expand Down
5 changes: 5 additions & 0 deletions lib/k8s/client/http_error.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ defmodule K8s.Client.HTTPError do
@spec new(keyword()) :: t()
def new(fields), do: struct!(__MODULE__, fields)

@spec from_exception(Exception.t()) :: t()
def from_exception(exception) do
new(message: Exception.message(exception), adapter_specific_error: exception)
end

@spec message(__MODULE__.t()) :: String.t()
def message(%__MODULE__{message: message}), do: message
end
8 changes: 4 additions & 4 deletions lib/k8s/client/http_stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ defmodule K8s.Client.HTTPStream do

@type to_lines_t :: %{remainder: binary()}

@spec transform_to_lines(Enumerable.t(Provider.stream_chunk_t())) ::
Enumerable.t(Provider.stream_chunk_t() | {:line, binary()})
@spec transform_to_lines(Enumerable.t(Provider.http_chunk_t())) ::
Enumerable.t(Provider.http_chunk_t() | {:line, binary()})
def transform_to_lines(stream) do
stream
|> Stream.transform(%{remainder: ""}, fn
Expand Down Expand Up @@ -39,8 +39,8 @@ defmodule K8s.Client.HTTPStream do
{resp, Map.put(state, :remainder, remainder)}
end

@spec decode_json_objects(Enumerable.t(Provider.stream_chunk_t())) ::
Enumerable.t(Provider.stream_chunk_t() | {:object, binary()})
@spec decode_json_objects(Enumerable.t(Provider.http_chunk_t())) ::
Enumerable.t(Provider.http_chunk_t() | {:object, binary()})
def decode_json_objects(stream) do
stream
|> transform_to_lines()
Expand Down
63 changes: 63 additions & 0 deletions lib/k8s/client/mint/connection_registry.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
defmodule K8s.Client.Mint.ConnectionRegistry do
@moduledoc """
Opens `Mint.HTTP2` connections and registers them in the GenServer state.
"""

use GenServer

alias K8s.Client.HTTPError
alias K8s.Client.Mint.HTTPAdapter

@type uriopts :: {URI.t(), keyword()}

@doc """
Starts the registry.
"""
@spec start_link(any) :: GenServer.on_start()
def start_link(_) do
GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
end

@doc """
Ensures there is an adapter associated with the given `key`.
"""
@spec get(uriopts()) :: {:ok, pid()}
def get({uri, opts}) do
GenServer.call(__MODULE__, {:get_or_open, HTTPAdapter.connection_args(uri, opts)})
end

@impl true
def init(:ok) do
adapters = %{}
refs = %{}
{:ok, {adapters, refs}}
end

@impl true
def handle_call({:get_or_open, key}, _from, {adapters, refs}) do
if Map.has_key?(adapters, key) do
{:reply, {:ok, Map.get(adapters, key)}, {adapters, refs}}
else
case DynamicSupervisor.start_child(
K8s.Client.Mint.ConnectionSupervisor,
{HTTPAdapter, key}
) do
{:ok, adapter} ->
ref = Process.monitor(adapter)
refs = Map.put(refs, ref, adapter)
adapters = Map.put(adapters, key, adapter)
{:reply, {:ok, adapter}, {adapters, refs}}

{:error, error} ->
{:reply, {:error, HTTPError.from_exception(error)}, {adapters, refs}}
end
end
end

@impl true
def handle_info({:DOWN, ref, :process, _pid, _reason}, {adapters, refs}) do
{key, refs} = Map.pop(refs, ref)
adapters = Map.delete(adapters, key)
{:noreply, {adapters, refs}}
end
end
Loading

0 comments on commit 6d26091

Please sign in to comment.