Skip to content

Commit

Permalink
Move with_conn to pg client, catch exit message if trap_exit in set t…
Browse files Browse the repository at this point in the history
…o true (#62)

* Antidote sync mode is set to "true", that was previously causing issues for
  exunit tests.

* Make vaxine test more explicit to detect missconfiguration when concurent txs
  are aborted

* Add compose-mounts docker-compose file to ease dealing with Vaxine volume
  • Loading branch information
define-null authored Nov 8, 2022
1 parent cb40577 commit c1393ce
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 53 deletions.
7 changes: 7 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,15 @@ DC_CONFIG=compose.yaml
start_dev_env:
docker-compose -f ${DC_CONFIG} up -d

export UID=$(shell id -u)
export GID=$(shell id -g)
start_dev_env_mounted:
mkdir -p _tmp_vaxine_data
docker-compose -f compose-mounts.yaml up -d

stop_dev_env:
docker-compose -f ${DC_CONFIG} down
rm -rf _tmp_vaxine_data

DOCKER_PREFIX:=$(shell basename $(CURDIR))
docker-pgsql-%:
Expand Down
27 changes: 27 additions & 0 deletions compose-mounts.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Run using `docker-compose -f databases.yaml up`.
version: '3.1'

services:
vaxine:
extends:
file: ./compose.yaml
service: vaxine
environment:
NODE_NAME: "vaxine@vaxine_1"
UID: ${UID}
GID: ${GID}
volumes:
- /etc/group:/etc/group:ro
- /etc/passwd:/etc/passwd:ro
- /etc/shadow:/etc/shadow:ro
- ./_tmp_vaxine_data:/vaxine-data

db_a:
extends:
file: ./compose.yaml
service: db_a

db_b:
extends:
file: ./compose.yaml
service: db_b
3 changes: 2 additions & 1 deletion compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ services:
ANTIDOTE_TXN_PROT: "clocksi"
ANTIDOTE_RECOVER_FROM_LOG: "true"
ANTIDOTE_META_DATA_ON_START: "true"
ANTIDOTE_SYNC_LOG: "false"
ANTIDOTE_SYNC_LOG: "true"
ANTIDOTE_ENABLE_LOGGING: "true"
ANTIDOTE_AUTO_START_READ_SERVERS: "true"
COOKIE: "secret"
NODE_NAME: "antidote@dc1n1"
DEBUG_LOGGER_LEVEL: "debug"
ports:
- "8087:8087"
- "8088:8088"
Expand Down
20 changes: 4 additions & 16 deletions integration_tests/single_dc/bidirectional_multiple_electric.lux
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[doc Test one direction replication from pg_1 to pg_2]

[global fail_pattern=[Ee][Rr][Rr][Oo][Rr]]
[global psql=electric]
[global psql=electric=#]

[include shared.luxinc]
[invoke setup]
Expand All @@ -22,9 +22,7 @@
?.* origin=postgres_3 .* \[debug\] Sending \d messages to the subscriber

[shell pg_3]
!SELECT * FROM entries;
?Hello from a
?(1 row)
[invoke wait-for "SELECT * FROM entries;" "Hello from a" 10 $psql]

[shell pg_3]
!INSERT INTO entries (content) VALUES ('Hello from c');
Expand All @@ -36,20 +34,10 @@
[timeout 10]

[shell pg_1]
[loop iter 1..10]
@Hello from c
!select * from entries;
?electric=
[sleep 1]
[endloop]
[invoke wait-for "SELECT * FROM entries;" "Hello from c" 10 $psql]

[shell pg_2]
[loop iter 1..10]
@Hello from c
!select * from entries;
?electric=
[sleep 1]
[endloop]
[invoke wait-for "SELECT * FROM entries;" "Hello from c" 10 $psql]

[cleanup]
[invoke teardown]
Expand Down
28 changes: 4 additions & 24 deletions lib/electric/postgres/postgres_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,6 @@ defmodule Electric.Replication.PostgresConnectorMng do
handle_continue(:subscribe, state)
end

def handle_info({:EXIT, _, :econnrefused}, %State{} = state) do
{:noreply, state}
end

def handle_info(msg, %State{} = state) do
Logger.error("unhandled info msg: #{inspect(msg)}")
{:noreply, state}
Expand All @@ -162,7 +158,7 @@ defmodule Electric.Replication.PostgresConnectorMng do
md5_hash <- Base.encode16(:erlang.md5(migration_file)) do
Logger.notice("ready to migrate to version: #{vsn}")

case with_conn(
case Client.with_conn(
Map.delete(conn_config, :replication),
fn conn ->
:epgsql.with_transaction(
Expand Down Expand Up @@ -203,7 +199,7 @@ defmodule Electric.Replication.PostgresConnectorMng do
end

defp start_subscription(%State{conn_config: conn_config, repl_config: rep_conf} = state) do
case with_conn(
case Client.with_conn(
conn_config,
fn conn ->
Client.start_subscription(conn, rep_conf.subscription)
Expand All @@ -220,7 +216,7 @@ defmodule Electric.Replication.PostgresConnectorMng do
end

defp stop_subscription(%State{conn_config: conn_config, repl_config: rep_conf} = state) do
case with_conn(
case Client.with_conn(
conn_config,
fn conn ->
Client.stop_subscription(conn, rep_conf.subscription)
Expand All @@ -246,7 +242,7 @@ defmodule Electric.Replication.PostgresConnectorMng do

Logger.debug("attempting to initialize #{origin}")

with_conn(conn_config, fn conn ->
Client.with_conn(conn_config, fn conn ->
with {:ok, _system_id} <- Client.get_system_id(conn),
{:ok, publication} <-
Client.create_publication(conn, publication_name, publication_tables),
Expand Down Expand Up @@ -274,20 +270,4 @@ defmodule Electric.Replication.PostgresConnectorMng do
end
end)
end

def with_conn(conn_config, func) do
Logger.debug("connect: #{inspect(conn_config)}")

case Client.connect(conn_config) do
{:ok, conn} ->
try do
func.(conn)
after
Client.close(conn)
end

error ->
error
end
end
end
35 changes: 35 additions & 0 deletions lib/electric/replication/postgres/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ defmodule Electric.Replication.Postgres.Client do
Uses `:epgsql` for it's `start_replication` function. Note that epgsql
doesn't support connecting via a unix socket.
"""
require Logger

alias Electric.Postgres.OidDatabase

Expand Down Expand Up @@ -67,6 +68,40 @@ defmodule Electric.Replication.Postgres.Client do
:epgsql.connect(config)
end

@spec with_conn(:epgsql.connect_opts(), fun()) :: term() | {:error, term()}
def with_conn(%{host: host, username: username, password: password} = config, fun) do
# Best effort capture exit message, expect trap_exit to be set
wait_exit = fn conn, res ->
receive do
{:EXIT, ^conn, _} -> res
after
500 -> res
end
end

Logger.info("connect: #{inspect(Map.drop(config, [:password]))}")

{:ok, conn} = :epgsql_sock.start_link()

case :epgsql.connect(conn, host, username, password, config) do
{:ok, ^conn} ->
try do
fun.(conn)
rescue
e ->
Logger.error(Exception.format(:error, e, __STACKTRACE__))
{:error, e}
after
close(conn)
wait_exit.(conn, :ok)
end

error ->
close(conn)
wait_exit.(conn, error)
end
end

def close(conn) do
:epgsql.close(conn)
end
Expand Down
26 changes: 14 additions & 12 deletions test/electric/to_vaxine_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -180,25 +180,27 @@ defmodule Electric.Replication.VaxineTest do
spawn(fn ->
assert_receive {:start, p2}

VaxRepo.transaction(fn ->
operation_set_1.()
send(p2, :continue)
assert_receive :commit
end)
:commit =
VaxRepo.transaction(fn ->
operation_set_1.()
send(p2, :continue)
assert_receive :commit
end)

send(parent, :commited_1)
send(p2, :commit)
end)

_p2 =
spawn(fn ->
VaxRepo.transaction(fn ->
send(p1, {:start, self()})
assert_receive :continue
operation_set_2.()
send(p1, :commit)
assert_receive :commit
end)
:commit =
VaxRepo.transaction(fn ->
send(p1, {:start, self()})
assert_receive :continue
operation_set_2.()
send(p1, :commit)
assert_receive :commit
end)

send(parent, :commited_2)
end)
Expand Down

0 comments on commit c1393ce

Please sign in to comment.