Skip to content

Commit

Permalink
Refactor electric connection configuration (#121)
Browse files Browse the repository at this point in the history
  • Loading branch information
magnetised authored Apr 19, 2023
1 parent 5d56e80 commit 43741b7
Show file tree
Hide file tree
Showing 10 changed files with 174 additions and 132 deletions.
37 changes: 23 additions & 14 deletions lib/electric/postgres/postgres_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,22 @@ defmodule Electric.Replication.PostgresConnectorMng do
alias Electric.Postgres.SchemaRegistry
alias Electric.Replication.Postgres.Client
alias Electric.Replication.PostgresConnector
alias Electric.Replication.Connectors

@behaviour GenServer
require Logger

@type origin :: PostgresConnector.origin()

@update_migration "INSERT INTO electric.migrations (version, hash) VALUES ($1, $2);"
@select_migration "SELECT (version, hash) FROM electric.migrations WHERE version = $1;"

defmodule State do
defstruct [:state, :conn_config, :repl_config, :backoff, :origin]
defstruct [:state, :conn_config, :repl_config, :backoff, :origin, :config]

@type t() :: %__MODULE__{
config: Connectors.config(),
backoff: term,
conn_config: %{},
origin: PostgresConnector.origin(),
origin: Connectors.origin(),
repl_config: %{
publication: String.t(),
slot: String.t(),
Expand All @@ -30,12 +30,19 @@ defmodule Electric.Replication.PostgresConnectorMng do
}
end

@spec start_link(origin()) :: {:ok, pid} | :ignore | {:error, term}
def start_link(origin) do
GenServer.start_link(__MODULE__, origin, [])
@spec start_link(Connectors.config()) :: {:ok, pid} | :ignore | {:error, term}
def start_link(conn_config) do
GenServer.start_link(__MODULE__, conn_config, [])
end

@spec name(Connectors.config()) :: Electric.reg_name()
def name(config) when is_list(config) do
config
|> Connectors.origin()
|> name()
end

@spec name(origin()) :: Electric.reg_name()
@spec name(Connectors.origin()) :: Electric.reg_name()
def name(origin) do
Electric.name(__MODULE__, origin)
end
Expand All @@ -46,28 +53,30 @@ defmodule Electric.Replication.PostgresConnectorMng do
Take into consideration that vsn validation is outside of the scope
of this function
"""
@spec migrate(origin(), String.t()) :: :ok | {:error, term}
@spec migrate(Connectors.origin(), String.t()) :: :ok | {:error, term}
def migrate(origin, vsn) do
GenServer.call(name(origin), {:migrate, vsn}, :infinity)
end

@spec status(origin()) :: :init | :subscription | :ready | :migration
@spec status(Connectors.origin()) :: :init | :subscription | :ready | :migration
def status(origin) do
GenServer.call(name(origin), {:status})
end

@impl GenServer
def init(origin) do
def init(conn_config) do
origin = Connectors.origin(conn_config)
Electric.reg(name(origin))
Logger.metadata(origin: origin)
Process.flag(:trap_exit, true)

{:ok,
%State{
config: conn_config,
backoff: {:backoff.init(1000, 10_000), nil},
conn_config: PostgresConnector.get_connection_opts(origin),
conn_config: Connectors.get_connection_opts(conn_config),
origin: origin,
repl_config: PostgresConnector.get_replication_opts(origin),
repl_config: Connectors.get_replication_opts(conn_config),
state: :init
}, {:continue, :init}}
end
Expand All @@ -77,7 +86,7 @@ defmodule Electric.Replication.PostgresConnectorMng do
when init == :init or init == :reinit do
case initialize_postgres(state) do
{:ok, state1} ->
:ok = PostgresConnector.start_children(state.origin)
:ok = PostgresConnector.start_children(state.config)
Logger.info("successfully initialized connector #{inspect(origin)}")
SchemaRegistry.mark_origin_ready(origin)

Expand Down
86 changes: 86 additions & 0 deletions lib/electric/replication/connectors.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,54 @@
defmodule Electric.Replication.Connectors do
use DynamicSupervisor

@type origin() :: binary()

@type connection_config() :: :epgsql.connect_opts()
@type electric_connection_opt() ::
{:host, binary()} | {:port, pos_integer()} | {:dbname, binary()}
@type electric_connection_opts() :: [electric_connection_opt()]
@type replication_config_opt() ::
{:slot, binary()} | {:electric_connection, electric_connection_opts()}
@type replication_config() :: [replication_config_opt(), ...]
@type downstream_config_producer_opt() ::
{:vaxine_hostname, binary()}
| {:vaxine_port, pos_integer()}
| {:vaxine_connection_timeout, pos_integer()}
@type downstream_producer_opts() :: [downstream_config_producer_opt()]
@type downstream_config_opt() ::
{:producer, module()} | {:producer_opts, downstream_producer_opts()}
@type downstream_config() :: [downstream_config_opt(), ...]

@type config_opt() ::
{:connection, connection_config()}
| {:replication, replication_config()}
| {:downstream, downstream_config()}
| {:origin, origin()}

@type config() :: [config_opt(), ...]

@type replication_opts() :: %{
publication: String.t(),
slot: String.t(),
subscription: String.t(),
publication_tables: :all | [binary] | binary,
electric_connection: %{host: String.t(), port: pos_integer, dbname: String.t()},
opts: Keyword.t()
}
@type connection_opts() :: %{
host: charlist(),
port: pos_integer(),
database: charlist(),
username: charlist(),
password: charlist(),
replication: charlist(),
ssl: boolean()
}
@type downstream_opts() :: %{
producer: module(),
producer_opts: downstream_producer_opts()
}

def start_link(extra_args) do
DynamicSupervisor.start_link(__MODULE__, extra_args, name: __MODULE__)
end
Expand Down Expand Up @@ -34,4 +82,42 @@ defmodule Electric.Replication.Connectors do
|> DynamicSupervisor.which_children()
|> Enum.map(map_fun)
end

@spec origin(config()) :: origin()
def origin(args) do
Keyword.fetch!(args, :origin)
end

@spec get_replication_opts(config()) :: replication_opts()
def get_replication_opts(config) do
origin = origin(config)

config
|> Keyword.fetch!(:replication)
|> Map.new()
|> Map.put_new(:slot, "electric_replication")
|> Map.put_new(:publication_tables, :all)
|> Map.put_new(:subscription, to_string(origin))
end

@spec get_connection_opts(config()) :: connection_opts()
def get_connection_opts(config) do
config
|> Keyword.fetch!(:connection)
|> new_map_with_charlists()
end

@spec get_downstream_opts(config()) :: downstream_opts()
def get_downstream_opts(config) do
config
|> Keyword.fetch!(:downstream)
|> Map.new()
end

defp new_map_with_charlists(list) do
Map.new(list, fn
{k, v} when is_binary(v) -> {k, String.to_charlist(v)}
{k, v} -> {k, v}
end)
end
end
3 changes: 1 addition & 2 deletions lib/electric/replication/downstream_producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ defmodule Electric.Replication.DownstreamProducer do
@typedoc "The events produced follow this typespec"
@type event :: {Electric.Replication.Changes.Transaction.t(), offset_state}

@callback start_link(name :: String.t(), opts :: keyword()) ::
{:ok, pid()} | {:error, term()}
@callback start_link(name :: String.t(), opts :: keyword()) :: {:ok, pid()} | {:error, term()}
@callback start_replication(producer :: pid(), offset_state) :: :ok
@callback connected?(producer :: pid()) :: boolean()

Expand Down
31 changes: 16 additions & 15 deletions lib/electric/replication/postgres/logical_replication_producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ defmodule Electric.Replication.Postgres.LogicalReplicationProducer do
alias Electric.Postgres.LogicalReplication
alias Electric.Postgres.LogicalReplication.Messages
alias Electric.Replication.Postgres.Client
alias Electric.Replication.PostgresConnector
alias Electric.Replication.Connectors

alias Electric.Postgres.LogicalReplication.Messages.{
Begin,
Expand Down Expand Up @@ -49,19 +49,19 @@ defmodule Electric.Replication.Postgres.LogicalReplicationProducer do
relations: %{Messages.relation_id() => %Relation{}},
transaction: {Electric.Postgres.Lsn.t(), %Transaction{}},
publication: String.t(),
origin: PostgresConnector.origin(),
origin: Connectors.origin(),
drop_current_transaction?: boolean(),
types: %{},
ignore_relations: [term()]
}
end

@spec start_link(PostgresConnector.origin()) :: :ignore | {:error, any} | {:ok, pid}
def start_link(origin) do
GenStage.start_link(__MODULE__, [origin])
@spec start_link(Connectors.config()) :: :ignore | {:error, any} | {:ok, pid}
def start_link(conn_config) do
GenStage.start_link(__MODULE__, conn_config)
end

@spec get_name(PostgresConnector.origin()) :: Electric.reg_name()
@spec get_name(Connectors.origin()) :: Electric.reg_name()
def get_name(name) do
{:via, :gproc, name(name)}
end
Expand All @@ -71,20 +71,21 @@ defmodule Electric.Replication.Postgres.LogicalReplicationProducer do
end

@impl true
def init([origin]) do
:gproc.reg(name(origin))
def init(conn_config) do
origin = Connectors.origin(conn_config)
conn_opts = Connectors.get_connection_opts(conn_config)
repl_opts = Connectors.get_replication_opts(conn_config)

conn_config = PostgresConnector.get_connection_opts(origin)
repl_config = PostgresConnector.get_replication_opts(origin)
:gproc.reg(name(origin))

publication = repl_config.publication
slot = repl_config.slot
publication = repl_opts.publication
slot = repl_opts.slot

with {:ok, conn} <- Client.connect(conn_config),
with {:ok, conn} <- Client.connect(conn_opts),
:ok <- Client.start_replication(conn, publication, slot, self()) do
Logger.metadata(pg_producer: origin)
Logger.info("Starting replication from #{origin}")
Logger.info("Connection settings: #{inspect(conn_config)}")
Logger.info("Connection settings: #{inspect(conn_opts)}")

{:producer,
%State{
Expand Down Expand Up @@ -313,7 +314,7 @@ defmodule Electric.Replication.Postgres.LogicalReplicationProducer do
}
end

@spec ack(pid(), PostgresConnector.origin(), Electric.Postgres.Lsn.t()) :: :ok
@spec ack(pid(), Connectors.origin(), Electric.Postgres.Lsn.t()) :: :ok
def ack(conn, origin, lsn) do
Logger.debug("Acknowledging #{lsn}", origin: origin)
Client.acknowledge_lsn(conn, lsn)
Expand Down
27 changes: 13 additions & 14 deletions lib/electric/replication/postgres/slot_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ defmodule Electric.Replication.Postgres.SlotServer do
alias Electric.Postgres.LogicalReplication.Messages, as: ReplicationMessages
alias Electric.Postgres.Messaging
alias Electric.Postgres.SchemaRegistry
alias Electric.Replication.Connectors
alias Electric.Replication.Changes
alias Electric.Replication.OffsetStorage

alias Electric.Replication.DownstreamProducer

defmodule State do
defstruct current_lsn: %Lsn{segment: 0, offset: 1},
config: nil,
origin: nil,
send_fn: nil,
slot_name: nil,
Expand All @@ -49,16 +51,9 @@ defmodule Electric.Replication.Postgres.SlotServer do

# Public interface

@spec start_link(
String.t(),
%{
replication: %{subscription: binary()},
downstream: %{producer: module()}
},
Electric.reg_name()
) :: GenServer.on_start()
def start_link(reg_name, init_args, producer) do
GenStage.start_link(__MODULE__, [reg_name, init_args, producer])
@spec start_link(Connectors.config(), Electric.reg_name()) :: GenServer.on_start()
def start_link(conn_config, producer) do
GenStage.start_link(__MODULE__, [conn_config, producer])
end

@spec get_name(String.t()) :: Electric.reg_name()
Expand Down Expand Up @@ -120,8 +115,11 @@ defmodule Electric.Replication.Postgres.SlotServer do
# Server callbacks

@impl true
def init([origin, args, {:via, :gproc, producer}]) do
slot = args.replication.subscription
def init([conn_config, {:via, :gproc, producer}]) do
origin = Connectors.origin(conn_config)
replication_opts = Connectors.get_replication_opts(conn_config)
downstream_opts = Connectors.get_downstream_opts(conn_config)
slot = replication_opts.subscription

:gproc.nb_wait(producer)
:gproc.reg(name(origin))
Expand All @@ -132,11 +130,12 @@ defmodule Electric.Replication.Postgres.SlotServer do

{:consumer,
%State{
config: conn_config,
slot_name: slot,
origin: origin,
producer_name: producer,
producer: args.downstream.producer,
opts: Map.get(args.replication, :opts, [])
producer: downstream_opts.producer,
opts: Map.get(replication_opts, :opts, [])
}}
end

Expand Down
Loading

0 comments on commit 43741b7

Please sign in to comment.