Skip to content

Commit

Permalink
Initial commit.
Browse files Browse the repository at this point in the history
  • Loading branch information
thruflo committed Jun 1, 2022
0 parents commit e2e930f
Show file tree
Hide file tree
Showing 23 changed files with 1,466 additions and 0 deletions.
4 changes: 4 additions & 0 deletions .formatter.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Used by "mix format"
[
inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"]
]
26 changes: 26 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# The directory Mix will write compiled artifacts to.
/_build/

# If you run "mix test --cover", coverage assets end up here.
/cover/

# The directory Mix downloads your dependencies sources to.
/deps/

# Where third-party dependencies like ExDoc output generated docs.
/doc/

# Ignore .fetch files in case you like to edit your project deps locally.
/.fetch

# If the VM crashes, it generates a dump, let's ignore it too.
erl_crash.dump

# Also ignore archive artifacts (built via "mix archive.build").
*.ez

# Ignore package tarball (built via "mix hex.build").
electric-*.tar

# Temporary files, for example, from tests.
/tmp/
87 changes: 87 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@

# Electric

This is a POC of Postgres active-active replication using Vaxine.

## Pre-reqs

Docker and Elixir 1.13.

## Run databases

```sh
docker-compose -f databases.yaml up
```

## Run app

```sh
mix run --no-halt
```

## Generate workload

For now, manually issue some SQL statements, e.g.:

```
psql -h localhost -p 54321 -U electric -d electric
...
electric=# INSERT INTO entries (content) VALUES ('a');
electric=# select * from entries;
electric=# update entries set content = 'b';
```

Then view the app logs, should look a bit like:

```
{:message,
%Broadway.Message{
acknowledger: {Electric.Replication, :ack_id, {#PID<0.218.0>, {0, 24336352}}},
batch_key: :default,
batch_mode: :bulk,
batcher: :default,
data: %Electric.Replication.Changes.Transaction{
changes: [
%Electric.Replication.Changes.NewRecord{
record: %{
"content" => "a",
"id" => "9be3b616-17e9-4264-9f33-5bdb36c48443"
},
relation: {"public", "entries"}
}
],
commit_timestamp: ~U[2022-06-01 14:07:56Z]
},
metadata: %{},
status: :ok
}}
{:ack, {0, 24336352}}
{:message,
%Broadway.Message{
acknowledger: {Electric.Replication, :ack_id, {#PID<0.218.0>, {0, 24336568}}},
batch_key: :default,
batch_mode: :bulk,
batcher: :default,
data: %Electric.Replication.Changes.Transaction{
changes: [
%Electric.Replication.Changes.UpdatedRecord{
old_record: %{
"content" => "a",
"id" => "9be3b616-17e9-4264-9f33-5bdb36c48443"
},
record: %{
"content" => "b",
"id" => "9be3b616-17e9-4264-9f33-5bdb36c48443"
},
relation: {"public", "entries"}
}
],
commit_timestamp: ~U[2022-06-01 14:08:39Z]
},
metadata: %{},
status: :ok
}}
```

Note the `old_record` as well as the `new_record`.

26 changes: 26 additions & 0 deletions TODO.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Todo

Step 1:

- [ ] two Postgres database nodes (same single table schema for now)
- [ ] setup logical replication between them

Step 2:

- [ ] consume and decode logical replication stream from Postgres A
- [ ] implement enough of the backend server logical replication protocol in order to replicate through Elixir
- [ ] encode and produce logical replication stream for Postgres B

Step 3:

- [ ] write changes into Antidote
- [ ] somehow handle `LSN` polling / pull from Postgres B
- [ ] query relevant materialised values from Antidote
- [ ] construct into encodable stream

Step 4:

- [ ] add a third Postgres
- [ ] pair each Postgres with an Antidote
- [ ] replicate between Antidotes
- [ ] demonstrate Postgres replication working with TCC+
17 changes: 17 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# This file is responsible for configuring your application
# and its dependencies with the aid of the Mix.Config module.
#
# This configuration file is loaded before any dependency and
# is restricted to this project.

# General application configuration
import Config

# Configures Elixir's Logger
config :logger, :console,
format: "$time $metadata[$level] $message\n",
metadata: [:request_id]

# Import environment specific config. This must remain at the bottom
# of this file so it overrides the configuration defined above.
import_config "#{Mix.env()}.exs"
19 changes: 19 additions & 0 deletions config/dev.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import Config

# Configure your database
config :electric, Electric.Replication,
epgsql: %{
host: 'localhost',
port: 54321,
database: 'electric',
username: 'electric',
password: 'password',
replication: 'database',
ssl: false
},
producer: Electric.Replication.Producer,
publication: "all_tables",
slot: "all_changes"

# Do not include metadata nor timestamps in development logs
config :logger, :console, format: "[$level] $message\n"
4 changes: 4 additions & 0 deletions config/prod.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import Config

# Do not print debug messages in production
config :logger, level: :info
12 changes: 12 additions & 0 deletions config/runtime.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# This file is executed after the code compilation on all environments
# (dev, test, and prod) - for both Mix and releases.
#
# We use it for runtime configuration of releases in production --
# because that allows us to read environment variables at runtime
# rather than compile time.

import Config

if config_env() == :prod do
throw(:NotImplemented)
end
6 changes: 6 additions & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import Config

config :electric, Electric.Replication, producer: Broadway.DummyProducer

# Print only warnings and errors during test
config :logger, level: :warn
37 changes: 37 additions & 0 deletions databases.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Run using `docker-compose -f databases.yaml up`.
version: '3.1'

services:
db_a:
image: postgres
restart: always
environment:
POSTGRES_DB: electric
POSTGRES_USER: electric
POSTGRES_PASSWORD: password
ports:
- "54321:5432"
volumes:
- ./init-user-db.sh:/docker-entrypoint-initdb.d/init-user-db.sh:ro
- ./postgres.conf:/etc/postgresql.conf:ro
entrypoint:
- docker-entrypoint.sh
- -c
- config_file=/etc/postgresql.conf

db_b:
image: postgres
restart: always
environment:
POSTGRES_DB: electric
POSTGRES_USER: electric
POSTGRES_PASSWORD: password
ports:
- "54322:5432"
volumes:
- ./init-user-db.sh:/docker-entrypoint-initdb.d/init-user-db.sh:ro
- ./postgres.conf:/etc/postgresql.conf:ro
entrypoint:
- docker-entrypoint.sh
- -c
- config_file=/etc/postgresql.conf
16 changes: 16 additions & 0 deletions init-user-db.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#!/bin/bash
set -e

psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" "dbname=$POSTGRES_DB replication=database" <<-EOSQL
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE TABLE entries (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
content VARCHAR(64) NOT NULL
);
ALTER TABLE entries REPLICA IDENTITY FULL;
CREATE PUBLICATION all_tables FOR ALL TABLES;
CREATE_REPLICATION_SLOT all_changes LOGICAL pgoutput NOEXPORT_SNAPSHOT;
EOSQL
3 changes: 3 additions & 0 deletions lib/electric.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
defmodule Electric do
@moduledoc false
end
14 changes: 14 additions & 0 deletions lib/electric/application.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
defmodule Electric.Application do
@moduledoc false

use Application

def start(_type, _args) do
children = [
Electric.Replication
]

opts = [strategy: :one_for_one, name: Electric.Supervisor]
Supervisor.start_link(children, opts)
end
end
68 changes: 68 additions & 0 deletions lib/electric/replication.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
defmodule Electric.Replication do
use Broadway

alias Broadway.Message
alias __MODULE__

alias Replication.Config
alias Replication.Changes.Transaction

def start_link(_opts) do
Broadway.start_link(
Replication,
name: Replication,
producer: [
module: {Config.producer(), []},
transformer: {Replication, :transform, []},
concurrency: 1
],
processors: [
default: [concurrency: 1]
]
)
end

def transform({txn, end_lsn, conn}, _opts) do
%Message{
data: txn,
acknowledger: {__MODULE__, :ack_id, {conn, end_lsn}}
}
end

@impl true
def handle_message(_, %Message{data: %Transaction{changes: changes}} = message, _) do
IO.inspect({:message, message})

errors =
changes
|> Enum.reduce([], &handle_change/2)

message =
case errors do
[] ->
message

reason ->
Message.failed(message, reason)
end

message
end

def handle_change(_, acc), do: acc

def ack(:ack_id, [], []), do: nil
def ack(:ack_id, _, [_head | _tail]), do: throw("XXX ack failure handling not yet implemented")

def ack(:ack_id, successful, []) do
last_message =
successful
|> Enum.reverse()
|> Enum.at(0)

%{acknowledger: {_, _, {conn, end_lsn}}} = last_message
IO.inspect({:ack, end_lsn})

Replication.Client.acknowledge_lsn(conn, end_lsn)
end
end
7 changes: 7 additions & 0 deletions lib/electric/replication/changes.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
defmodule Electric.Replication.Changes do
defmodule(Transaction, do: defstruct([:changes, :commit_timestamp]))
defmodule(NewRecord, do: defstruct([:relation, :record]))
defmodule(UpdatedRecord, do: defstruct([:relation, :old_record, :record]))
defmodule(DeletedRecord, do: defstruct([:relation, :old_record]))
defmodule(TruncatedRelation, do: defstruct([:relation]))
end
37 changes: 37 additions & 0 deletions lib/electric/replication/client.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
defmodule Electric.Replication.Client do
@moduledoc """
Database replication client.
Uses `:epgsql` for it's `start_replication` function. Note that epgsql
doesn't support connecting via a unix socket.
"""

def connect(%{} = config) do
:epgsql.connect(config)
end

@doc """
Start consuming logical replication feed using a given `publication` and `slot`.
The handler can be a pid or a module implementing the `handle_x_log_data` callback.
Returns `:ok` on success.
"""
def start_replication(conn, publication, slot, handler) do
opts = 'proto_version \'1\', publication_names \'#{publication}\''

conn
|> :epgsql.start_replication(slot, handler, [], '0/0', opts)
end

@doc """
Confirm successful processing of a WAL segment.
Returns `:ok` on success.
"""
def acknowledge_lsn(conn, {xlog, offset} = _lsn_tup) do
<<decimal_lsn::integer-64>> = <<xlog::integer-32, offset::integer-32>>

:epgsql.standby_status_update(conn, decimal_lsn, decimal_lsn)
end
end
Loading

0 comments on commit e2e930f

Please sign in to comment.