Skip to content

Commit

Permalink
Ack the WAL Keep-Alives (#1905)
Browse files Browse the repository at this point in the history
Acknowledge the WAL on keep-alive messages to keep advancing it.

Relevant [Discord
thread](https://discord.com/channels/933657521581858818/1300498640233693364)

---------

Co-authored-by: msfstef <msfstef@gmail.com>
robacourt and msfstef authored Oct 31, 2024
1 parent b40c425 commit b110ed9
Showing 3 changed files with 16 additions and 16 deletions.
5 changes: 5 additions & 0 deletions .changeset/eight-pugs-guess.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

Update acknowledged WAL on keep alive messages
Original file line number Diff line number Diff line change
@@ -192,13 +192,14 @@ defmodule Electric.Postgres.ReplicationClient do
"Primary Keepalive: wal_end=#{wal_end} (#{Lsn.from_integer(wal_end)}) reply=#{reply}"
end)

messages =
case reply do
1 -> [encode_standby_status_update(state)]
0 -> []
end
case reply do
1 ->
state = update_applied_wal(state, wal_end)
{:noreply, [encode_standby_status_update(state)], state}

{:noreply, messages, state}
0 ->
{:noreply, [], state}
end
end

defp process_x_log_data(data, wal_end, %State{} = state) do
@@ -289,6 +290,6 @@ defmodule Electric.Postgres.ReplicationClient do
@epoch DateTime.to_unix(~U[2000-01-01 00:00:00Z], :microsecond)
defp current_time(), do: System.os_time(:microsecond) - @epoch

defp update_applied_wal(state, wal) when wal > state.applied_wal,
defp update_applied_wal(state, wal) when wal >= state.applied_wal,
do: %{state | applied_wal: wal}
end
Original file line number Diff line number Diff line change
@@ -317,8 +317,6 @@ defmodule Electric.Postgres.ReplicationClientTest do
end

test "correctly responds to a status update request message from PG", ctx do
pg_wal = lsn_to_wal("0/10")

state =
ReplicationClient.State.new(
transaction_received: nil,
@@ -329,17 +327,13 @@ defmodule Electric.Postgres.ReplicationClientTest do
connection_manager: ctx.dummy_pid
)

# All offsets are 0+1 until we've processed a transaction and bumped `state.applied_wal`.
assert {:noreply, [<<?r, 1::64, 1::64, 1::64, _time::64, 0::8>>], state} =
ReplicationClient.handle_data(<<?k, pg_wal::64, 0::64, 1::8>>, state)

###

state = %{state | applied_wal: lsn_to_wal("0/10")}
state = %{state | applied_wal: lsn_to_wal("0/0")}
pg_wal = lsn_to_wal("0/10")

assert {:noreply, [<<?r, app_wal::64, app_wal::64, app_wal::64, _time::64, 0::8>>], state} =
ReplicationClient.handle_data(<<?k, pg_wal::64, 0::64, 1::8>>, state)

assert state.applied_wal == pg_wal
assert app_wal == state.applied_wal + 1
end

0 comments on commit b110ed9

Please sign in to comment.