Skip to content

Commit

Permalink
Add elixir client request pool behaviour (#2138)
Browse files Browse the repository at this point in the history
Allows for a persistent connection per client rather than the default
request coalescing behaviour.

Also expand replication stream messages with the timestamp of the actual
HTTP request and the handle of the shape.

Basically enhancements for the load generation client to enable stats
collection and improve the resume/reconnect behaviour.
  • Loading branch information
magnetised authored Dec 11, 2024
1 parent 3e53a45 commit 71b8ab2
Show file tree
Hide file tree
Showing 10 changed files with 209 additions and 94 deletions.
5 changes: 5 additions & 0 deletions .changeset/fresh-beers-allow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/elixir-client": patch
---

Add pool behaviour for the Elixir client to allow for per-client persistent connections. Add request timestamp and shape handle to replication stream messages.
10 changes: 8 additions & 2 deletions packages/elixir-client/lib/electric/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ defmodule Electric.Client do
:endpoint,
:database_id,
:fetch,
:authenticator
:authenticator,
:pool
]

@api_endpoint_path "/v1/shape"
Expand All @@ -178,6 +179,11 @@ defmodule Electric.Client do
type: :mod_arg,
default: {Client.Authenticator.Unauthenticated, []},
doc: false
],
pool: [
type: :mod_arg,
default: {Electric.Client.Fetch.Pool, []},
doc: false
]
)

Expand Down Expand Up @@ -396,7 +402,7 @@ defmodule Electric.Client do
"""
def delete_shape(%Client{} = client, %ShapeDefinition{} = shape) do
request = request(client, method: :delete, shape: shape)
Electric.Client.Fetch.Request.request(client, request)
Electric.Client.Fetch.request(client, request, [])
end

defp validate_queryable!(queryable) when is_atom(queryable) do
Expand Down
11 changes: 11 additions & 0 deletions packages/elixir-client/lib/electric/client/fetch.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,18 @@
defmodule Electric.Client.Fetch do
alias Electric.Client.Fetch.{Request, Response}
alias Electric.Client

@callback fetch(Request.t(), Keyword.t()) ::
{:ok, Response.t()}
| {:error, Response.t() | term()}

@behaviour Electric.Client.Fetch.Pool

def request(client, request, opts \\ [])

@impl Electric.Client.Fetch.Pool
def request(%Client{} = client, %Request{} = request, _opts) do
%{pool: {module, opts}} = client
apply(module, :request, [client, request, opts])
end
end
9 changes: 5 additions & 4 deletions packages/elixir-client/lib/electric/client/fetch/http.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,16 @@ defmodule Electric.Client.Fetch.HTTP do
end

defp request(request) do
request |> Req.request() |> wrap_resp()
now = DateTime.utc_now()
request |> Req.request() |> wrap_resp(now)
end

defp wrap_resp({:ok, %Req.Response{} = resp}) do
defp wrap_resp({:ok, %Req.Response{} = resp}, timestamp) do
%{status: status, headers: headers, body: body} = resp
{:ok, Fetch.Response.decode!(status, headers, body)}
{:ok, Fetch.Response.decode!(status, headers, body, timestamp)}
end

defp wrap_resp({:error, _} = error) do
defp wrap_resp({:error, _} = error, _timestamp) do
error
end

Expand Down
70 changes: 70 additions & 0 deletions packages/elixir-client/lib/electric/client/fetch/pool.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
defmodule Electric.Client.Fetch.Pool do
@moduledoc """
Coaleses requests so that multiple client instances making the same
(potentially long-polling) request will all use the same request process.
"""

alias Electric.Client
alias Electric.Client.Fetch

require Logger

@callback request(Client.t(), Fetch.Request.t(), opts :: Keyword.t()) ::
Fetch.Response.t() | {:error, Fetch.Response.t() | term()}

@behaviour __MODULE__

@impl Electric.Client.Fetch.Pool
def request(%Client{} = client, %Fetch.Request{} = request, opts) do
request_id = request_id(client, request)

# register this pid before making the request to avoid race conditions for
# very fast responses
{:ok, monitor_pid} = start_monitor(request_id)

try do
ref = Fetch.Monitor.register(monitor_pid, self())

{:ok, _request_pid} = start_request(request_id, request, client, monitor_pid)

Fetch.Monitor.wait(ref)
catch
:exit, {reason, _} ->
Logger.debug(fn ->
"Request process ended with reason #{inspect(reason)} before we could register. Re-attempting."
end)

request(client, request, opts)
end
end

defp start_request(request_id, request, client, monitor_pid) do
DynamicSupervisor.start_child(
Electric.Client.RequestSupervisor,
{Fetch.Request, {request_id, request, client, monitor_pid}}
)
|> return_existing()
end

defp start_monitor(request_id) do
DynamicSupervisor.start_child(
Electric.Client.RequestSupervisor,
{Electric.Client.Fetch.Monitor, request_id}
)
|> return_existing()
end

defp return_existing({:ok, pid}), do: {:ok, pid}
defp return_existing({:error, {:already_started, pid}}), do: {:ok, pid}
defp return_existing(error), do: error

defp request_id(%Client{fetch: {fetch_impl, _}}, %Fetch.Request{shape_handle: nil} = request) do
%{endpoint: endpoint, shape: shape_definition} = request
{fetch_impl, URI.to_string(endpoint), shape_definition}
end

defp request_id(%Client{fetch: {fetch_impl, _}}, %Fetch.Request{} = request) do
%{endpoint: endpoint, offset: offset, live: live, shape_handle: shape_handle} = request
{fetch_impl, URI.to_string(endpoint), shape_handle, Client.Offset.to_tuple(offset), live}
end
end
70 changes: 14 additions & 56 deletions packages/elixir-client/lib/electric/client/fetch/request.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ defmodule Electric.Client.Fetch.Request do
require Logger

defstruct [
:stream_id,
:endpoint,
:database_id,
:shape_handle,
Expand All @@ -28,6 +29,7 @@ defmodule Electric.Client.Fetch.Request do
@type headers :: %{String.t() => [String.t()] | String.t()}

fields = [
stream_id: quote(do: term()),
method: quote(do: :get | :head | :delete),
endpoint: quote(do: URI.t()),
offset: quote(do: Electric.Client.Offset.t()),
Expand Down Expand Up @@ -62,21 +64,21 @@ defmodule Electric.Client.Fetch.Request do
{:via, Registry, {Electric.Client.Registry, {__MODULE__, request_id}}}
end

defp request_id(%Client{fetch: {fetch_impl, _}}, %__MODULE__{shape_handle: nil} = request) do
%{endpoint: endpoint, shape: shape_definition} = request
{fetch_impl, URI.to_string(endpoint), shape_definition}
end

defp request_id(%Client{fetch: {fetch_impl, _}}, %__MODULE__{} = request) do
%{endpoint: endpoint, offset: offset, live: live, shape_handle: shape_handle} = request
{fetch_impl, URI.to_string(endpoint), shape_handle, Offset.to_tuple(offset), live}
end

@doc """
Returns the URL for the Request.
"""
@spec url(t()) :: binary()
def url(%__MODULE__{} = request, opts \\ []) do
request
|> uri(opts)
|> URI.to_string()
end

@doc """
Returns the %URI{} for the Request.
"""
@spec uri(t()) :: URI.t()
def uri(%__MODULE__{} = request, opts \\ []) do
%{endpoint: endpoint} = request

if Keyword.get(opts, :query, true) do
Expand All @@ -89,9 +91,9 @@ defmodule Electric.Client.Fetch.Request do
|> List.keysort(0)
|> URI.encode_query(:rfc3986)

URI.to_string(%{endpoint | query: query})
%{endpoint | query: query}
else
URI.to_string(endpoint)
endpoint
end
end

Expand Down Expand Up @@ -119,50 +121,6 @@ defmodule Electric.Client.Fetch.Request do
|> Util.map_put_if("database_id", database_id, !is_nil(database_id))
end

@doc false
def request(%Client{} = client, %__MODULE__{} = request) do
request_id = request_id(client, request)

# register this pid before making the request to avoid race conditions for
# very fast responses
{:ok, monitor_pid} = start_monitor(request_id)

try do
ref = Fetch.Monitor.register(monitor_pid, self())

{:ok, _request_pid} = start_request(request_id, request, client, monitor_pid)

Fetch.Monitor.wait(ref)
catch
:exit, {reason, _} ->
Logger.debug(fn ->
"Request process ended with reason #{inspect(reason)} before we could register. Re-attempting."
end)

request(client, request)
end
end

defp start_request(request_id, request, client, monitor_pid) do
DynamicSupervisor.start_child(
Electric.Client.RequestSupervisor,
{__MODULE__, {request_id, request, client, monitor_pid}}
)
|> return_existing()
end

defp start_monitor(request_id) do
DynamicSupervisor.start_child(
Electric.Client.RequestSupervisor,
{Electric.Client.Fetch.Monitor, request_id}
)
|> return_existing()
end

defp return_existing({:ok, pid}), do: {:ok, pid}
defp return_existing({:error, {:already_started, pid}}), do: {:ok, pid}
defp return_existing(error), do: error

@doc false
def child_spec({request_id, _request, _client, _monitor_pid} = args) do
%{
Expand Down
21 changes: 18 additions & 3 deletions packages/elixir-client/lib/electric/client/fetch/response.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ defmodule Electric.Client.Fetch.Response do
:shape_handle,
:schema,
:next_cursor,
:request_timestamp,
body: [],
headers: %{}
]
Expand All @@ -18,19 +19,33 @@ defmodule Electric.Client.Fetch.Response do
last_offset: nil | Client.Offset.t(),
shape_handle: nil | Client.shape_handle(),
schema: nil | Client.schema(),
next_cursor: nil | Client.cursor()
next_cursor: nil | Client.cursor(),
request_timestamp: DateTime.t()
}

@doc false
def decode!(status, headers, body) when is_integer(status) and is_map(headers) do
@spec decode!(t()) :: t()
def decode!(%__MODULE__{headers: headers} = resp) do
resp
|> Map.put(:shape_handle, decode_shape_handle(headers))
|> Map.put(:last_offset, decode_offset(headers))
|> Map.put(:schema, decode_schema(headers))
|> Map.put(:next_cursor, decode_next_cursor(headers))
end

@doc false
@spec decode!(pos_integer(), %{optional(binary()) => binary()}, [term()], DateTime.t()) :: t()
def decode!(status, headers, body, timestamp \\ DateTime.utc_now())
when is_integer(status) and is_map(headers) do
%__MODULE__{
status: status,
headers: decode_headers(headers),
body: body,
shape_handle: decode_shape_handle(headers),
last_offset: decode_offset(headers),
schema: decode_schema(headers),
next_cursor: decode_next_cursor(headers)
next_cursor: decode_next_cursor(headers),
request_timestamp: timestamp
}
end

Expand Down
Loading

0 comments on commit 71b8ab2

Please sign in to comment.