From 218b7d44dde5d0bbeae7046c3952abce5f7cd03d Mon Sep 17 00:00:00 2001 From: Ilia Borovitinov Date: Thu, 12 Dec 2024 14:53:48 +0300 Subject: [PATCH] fix: truncates no longer crash replication (#2156) We had an issue where a truncate caused the `Consumer` process to `GenServer.call` `ShapeCache`, which in turn `GenServer.call`-ed the `Consumer` and then both crashed with a timeout and weren't cleaned up correctly. This addresses that and makes the shape process after the Truncate clean up and stop gracefully, test included. --- .changeset/gorgeous-bottles-mate.md | 5 ++ .../sync-service/lib/electric/shape_cache.ex | 18 ----- .../lib/electric/shapes/consumer.ex | 4 +- packages/sync-service/mix.lock | 2 +- .../test/electric/plug/router_test.exs | 76 +++++++++++++++++++ .../test/electric/shape_cache_test.exs | 55 -------------- .../test/electric/shapes/consumer_test.exs | 8 +- 7 files changed, 87 insertions(+), 81 deletions(-) create mode 100644 .changeset/gorgeous-bottles-mate.md diff --git a/.changeset/gorgeous-bottles-mate.md b/.changeset/gorgeous-bottles-mate.md new file mode 100644 index 0000000000..48e567ce3d --- /dev/null +++ b/.changeset/gorgeous-bottles-mate.md @@ -0,0 +1,5 @@ +--- +"@core/sync-service": patch +--- + +fix: truncates no longer cause a stop to an incoming replication stream diff --git a/packages/sync-service/lib/electric/shape_cache.ex b/packages/sync-service/lib/electric/shape_cache.ex index 44d08bf816..88c4febde1 100644 --- a/packages/sync-service/lib/electric/shape_cache.ex +++ b/packages/sync-service/lib/electric/shape_cache.ex @@ -18,7 +18,6 @@ defmodule Electric.ShapeCacheBehaviour do {shape_handle(), current_snapshot_offset :: LogOffset.t()} @callback list_shapes(keyword() | map()) :: [{shape_handle(), Shape.t()}] @callback await_snapshot_start(shape_handle(), opts :: keyword()) :: :started | {:error, term()} - @callback handle_truncate(shape_handle(), keyword()) :: :ok @callback clean_shape(shape_handle(), keyword()) :: :ok @callback clean_all_shapes(keyword()) :: :ok @callback has_shape?(shape_handle(), keyword()) :: boolean() @@ -154,13 +153,6 @@ defmodule Electric.ShapeCache do GenServer.call(server, {:clean_all}) end - @impl Electric.ShapeCacheBehaviour - @spec handle_truncate(shape_handle(), keyword()) :: :ok - def handle_truncate(shape_handle, opts \\ []) do - server = Access.get(opts, :server, name(opts)) - GenStage.call(server, {:truncate, shape_handle}) - end - @impl Electric.ShapeCacheBehaviour @spec await_snapshot_start(shape_handle(), keyword()) :: :started | {:error, term()} def await_snapshot_start(shape_handle, opts \\ []) when is_binary(shape_handle) do @@ -276,16 +268,6 @@ defmodule Electric.ShapeCache do state} end - def handle_call({:truncate, shape_handle}, _from, state) do - with :ok <- clean_up_shape(state, shape_handle) do - Logger.info( - "Truncating and rotating shape handle, previous shape handle #{shape_handle} cleaned up" - ) - end - - {:reply, :ok, state} - end - def handle_call({:clean, shape_handle}, _from, state) do # ignore errors when cleaning up non-existant shape id with :ok <- clean_up_shape(state, shape_handle) do diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index 8a4efc5263..be9c352fa2 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -283,9 +283,7 @@ defmodule Electric.Shapes.Consumer do "Truncate operation encountered while processing txn #{txn.xid} for #{shape_handle}" ) - :ok = shape_cache.handle_truncate(shape_handle, shape_cache_opts) - - :ok = ShapeCache.Storage.cleanup!(storage) + cleanup(state) {:halt, {:truncate, notify(txn, %{state | log_state: @initial_log_state})}} diff --git a/packages/sync-service/mix.lock b/packages/sync-service/mix.lock index 6217650168..6c8b70a500 100644 --- a/packages/sync-service/mix.lock +++ b/packages/sync-service/mix.lock @@ -28,7 +28,7 @@ "makeup_erlang": {:hex, :makeup_erlang, "1.0.1", "c7f58c120b2b5aa5fd80d540a89fdf866ed42f1f3994e4fe189abebeab610839", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "8a89a1eeccc2d798d6ea15496a6e4870b75e014d1af514b1b71fa33134f57814"}, "mime": {:hex, :mime, "2.0.6", "8f18486773d9b15f95f4f4f1e39b710045fa1de891fada4516559967276e4dc2", [:mix], [], "hexpm", "c9945363a6b26d747389aac3643f8e0e09d30499a138ad64fe8fd1d13d9b153e"}, "mint": {:hex, :mint, "1.6.2", "af6d97a4051eee4f05b5500671d47c3a67dac7386045d87a904126fd4bbcea2e", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0 or ~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "5ee441dffc1892f1ae59127f74afe8fd82fda6587794278d924e4d90ea3d63f9"}, - "mox": {:hex, :mox, "1.1.0", "0f5e399649ce9ab7602f72e718305c0f9cdc351190f72844599545e4996af73c", [:mix], [], "hexpm", "d44474c50be02d5b72131070281a5d3895c0e7a95c780e90bc0cfe712f633a13"}, + "mox": {:hex, :mox, "1.2.0", "a2cd96b4b80a3883e3100a221e8adc1b98e4c3a332a8fc434c39526babafd5b3", [:mix], [{:nimble_ownership, "~> 1.0", [hex: :nimble_ownership, repo: "hexpm", optional: false]}], "hexpm", "c7b92b3cc69ee24a7eeeaf944cd7be22013c52fcb580c1f33f50845ec821089a"}, "nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"}, "nimble_ownership": {:hex, :nimble_ownership, "1.0.0", "3f87744d42c21b2042a0aa1d48c83c77e6dd9dd357e425a038dd4b49ba8b79a1", [:mix], [], "hexpm", "7c16cc74f4e952464220a73055b557a273e8b1b7ace8489ec9d86e9ad56cb2cc"}, "nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"}, diff --git a/packages/sync-service/test/electric/plug/router_test.exs b/packages/sync-service/test/electric/plug/router_test.exs index f21ccaaeb6..4b0589aafc 100644 --- a/packages/sync-service/test/electric/plug/router_test.exs +++ b/packages/sync-service/test/electric/plug/router_test.exs @@ -832,6 +832,82 @@ defmodule Electric.Plug.RouterTest do [^shape_handle] = Plug.Conn.get_resp_header(conn, "electric-handle") end + @tag with_sql: [ + "INSERT INTO items VALUES (gen_random_uuid(), 'test value 1')" + ] + test "GET returns a 409 on a truncate and can follow a new shape afterwards", %{ + opts: opts, + db_conn: db_conn + } do + conn = Router.call(conn("GET", "/v1/shape?table=items&offset=-1"), opts) + + assert %{status: 200} = conn + handle = get_resp_shape_handle(conn) + offset = get_resp_last_offset(conn) + assert [%{"value" => %{"value" => "test value 1"}}] = Jason.decode!(conn.resp_body) + + task = + Task.async(fn -> + Router.call( + conn("GET", "/v1/shape?table=items&offset=#{offset}&handle=#{handle}&live"), + opts + ) + end) + + Postgrex.query!(db_conn, "INSERT INTO items VALUES (gen_random_uuid(), 'test value 2')", []) + + conn = Task.await(task) + + assert %{status: 200} = conn + assert ^handle = get_resp_shape_handle(conn) + offset = get_resp_last_offset(conn) + assert [%{"value" => %{"value" => "test value 2"}}, _] = Jason.decode!(conn.resp_body) + + task = + Task.async(fn -> + Router.call( + conn("GET", "/v1/shape?table=items&offset=#{offset}&handle=#{handle}&live"), + opts + ) + end) + + Postgrex.query!(db_conn, "TRUNCATE TABLE items", []) + assert %{status: 204} = Task.await(task) + + conn = + Router.call(conn("GET", "/v1/shape?table=items&offset=#{offset}&handle=#{handle}"), opts) + + assert %{status: 409} = conn + assert [%{"headers" => %{"control" => "must-refetch"}}] = Jason.decode!(conn.resp_body) + + conn = + Router.call(conn("GET", "/v1/shape?table=items&offset=-1"), opts) + + assert %{status: 200} = conn + new_handle = get_resp_shape_handle(conn) + refute new_handle == handle + offset = get_resp_last_offset(conn) + assert [] = Jason.decode!(conn.resp_body) + + task = + Task.async(fn -> + Router.call( + conn("GET", "/v1/shape?table=items&offset=#{offset}&handle=#{new_handle}&live"), + opts + ) + end) + + Postgrex.query!(db_conn, "INSERT INTO items VALUES (gen_random_uuid(), 'test value 3')", []) + + conn = Task.await(task) + + assert %{status: 200} = conn + assert ^new_handle = get_resp_shape_handle(conn) + # offset = get_resp_last_offset(conn) + assert [%{"value" => %{"value" => "test value 3"}}, @up_to_date] = + Jason.decode!(conn.resp_body) + end + @tag with_sql: [ "INSERT INTO items VALUES (gen_random_uuid(), 'test value 1')" ] diff --git a/packages/sync-service/test/electric/shape_cache_test.exs b/packages/sync-service/test/electric/shape_cache_test.exs index 2e6b9411cc..9a6ed196a9 100644 --- a/packages/sync-service/test/electric/shape_cache_test.exs +++ b/packages/sync-service/test/electric/shape_cache_test.exs @@ -679,61 +679,6 @@ defmodule Electric.ShapeCacheTest do end end - describe "handle_truncate/2" do - setup [ - :with_stack_id_from_test, - :with_in_memory_storage, - :with_log_chunking, - :with_registry, - :with_shape_log_collector - ] - - test "cleans up shape data and rotates the shape handle", ctx do - %{shape_cache_opts: opts} = - with_shape_cache(Map.merge(ctx, %{pool: nil, inspector: @stub_inspector}), - run_with_conn_fn: &run_with_conn_noop/2, - prepare_tables_fn: @prepare_tables_noop, - create_snapshot_fn: fn parent, shape_handle, _shape, _, storage, _, _ -> - GenServer.cast(parent, {:snapshot_xmin_known, shape_handle, 10}) - Storage.make_new_snapshot!([["test"]], storage) - GenServer.cast(parent, {:snapshot_started, shape_handle}) - end - ) - - {shape_handle, _} = ShapeCache.get_or_create_shape_handle(@shape, opts) - Process.sleep(50) - assert :started = ShapeCache.await_snapshot_start(shape_handle, opts) - - storage = Storage.for_shape(shape_handle, ctx.storage) - - Storage.append_to_log!( - changes_to_log_items([ - %Electric.Replication.Changes.NewRecord{ - relation: {"public", "items"}, - record: %{"id" => "1", "value" => "Alice"}, - log_offset: LogOffset.new(Electric.Postgres.Lsn.from_integer(1000), 0) - } - ]), - storage - ) - - assert Storage.snapshot_started?(storage) - assert Enum.count(Storage.get_log_stream(@zero_offset, storage)) == 1 - - ref = - Shapes.Consumer.whereis(ctx.stack_id, shape_handle) - |> Process.monitor() - - log = capture_log(fn -> ShapeCache.handle_truncate(shape_handle, opts) end) - assert log =~ "Truncating and rotating shape handle" - - assert_receive {:DOWN, ^ref, :process, _pid, _} - # Wait a bit for the async cleanup to complete - - refute Storage.snapshot_started?(storage) - end - end - describe "clean_shape/2" do setup [ :with_stack_id_from_test, diff --git a/packages/sync-service/test/electric/shapes/consumer_test.exs b/packages/sync-service/test/electric/shapes/consumer_test.exs index c044679812..abade9c501 100644 --- a/packages/sync-service/test/electric/shapes/consumer_test.exs +++ b/packages/sync-service/test/electric/shapes/consumer_test.exs @@ -313,8 +313,8 @@ defmodule Electric.Shapes.ConsumerTest do lsn = Lsn.from_string("0/10") last_log_offset = LogOffset.new(lsn, 0) - Mock.ShapeCache - |> expect(:handle_truncate, fn @shape_handle1, _ -> :ok end) + Mock.ShapeStatus + |> expect(:remove_shape, fn _, @shape_handle1 -> :ok end) |> allow( self(), Shapes.Consumer.name(ctx.stack_id, @shape_handle1) @@ -366,8 +366,8 @@ defmodule Electric.Shapes.ConsumerTest do lsn = Lsn.from_string("0/10") last_log_offset = LogOffset.new(lsn, 0) - Mock.ShapeCache - |> expect(:handle_truncate, fn @shape_handle1, _ -> :ok end) + Mock.ShapeStatus + |> expect(:remove_shape, fn _, @shape_handle1 -> :ok end) |> allow( self(), Shapes.Consumer.name(ctx.stack_id, @shape_handle1)