feat: introduce initial snapshot chunking (#2119)
This PR changes the way we write snapshots to disk, the storage API to
request them, and some logic around offsets.

1. The `Storage.get_snapshot/1` function is gone. It's now unified under
`Storage.get_log_stream/3`, with some caveats (see the first sketch after this list).
2. The offsets within the snapshot rows are not updated, so the latest
item in the served log may have an `offset` field that doesn't match the
`electric-offset` header of the same response. All our clients respect
the header, and that's the only correct behaviour going forward (see the
second sketch after this list). We've already decided to remove `offset`
from our items entirely, but that's a separate PR.
3. While we're making the snapshot, we cannot know how many chunks there
will be until they have all been written. We therefore allow clients to
request chunks at any offset up to `LogOffset.new(0, :infinity)`, and treat
all offsets at txn 0 as virtual pointers.
4. `get_log_stream/3` for the snapshot section bends the contract a
little: you cannot request the stream up to an arbitrary point; it always
reads up to the chunk boundary, even when a smaller bound is specified.
Since we've moved to "pure" chunking behaviour, this should be enshrined
in the `Storage` interface so that reading functions can act optimally
and honestly. I'm hoping to address that in a separate PR.
5. If the instance is started with the old snapshot already created, we
update the naming and metadata so that shapes don't have to be recreated
where possible.
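
For illustration, a minimal sketch of reading the snapshot portion through the unified API (the first sketch referenced above). The `{storage_module, storage_opts}` shape and the module name are assumptions for the example, not code from this PR; the `get_log_stream(offset, max_offset, opts)` arity matches the storage delegates in the diff below.

```elixir
defmodule SnapshotReadSketch do
  alias Electric.Replication.LogOffset

  # Hypothetical helper: stream every snapshot chunk through the unified log API.
  # `storage` is assumed to be a `{module, opts}` pair implementing the Storage
  # behaviour; the real consumers wire this up differently.
  def stream_snapshot({storage_module, storage_opts}) do
    storage_module.get_log_stream(
      # start before all offsets in the log...
      LogOffset.before_all(),
      # ...and read up to the last "virtual" offset: every snapshot chunk lives
      # at tx_offset 0, so this bound covers however many chunks were produced
      LogOffset.new(0, :infinity),
      storage_opts
    )
  end
end
```

Per point 4, a smaller upper bound inside the snapshot region would still be rounded up to the next chunk boundary by the current implementation.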
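A second sketch, for point 2: resume from the `electric-offset` response header, never from the `offset` field of the last item. The response shape here (`resp.headers` as a map of header name to list of values) is an assumption for illustration, not the elixir-client's actual internals.

```elixir
defmodule OffsetHeaderSketch do
  alias Electric.Client.Offset

  # Hypothetical helper: pick the offset to resume from out of an HTTP response.
  def resume_offset(resp) do
    case Map.get(resp.headers, "electric-offset") do
      [value | _] ->
        # The header is authoritative; per-item `offset` fields in a chunked
        # snapshot response may lag behind it.
        Offset.from_string(value)

      nil ->
        {:error, "missing electric-offset header"}
    end
  end
end
```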
icehaunter authored Dec 10, 2024
1 parent 4b1680a commit 6ca47df
Showing 27 changed files with 720 additions and 443 deletions.
5 changes: 5 additions & 0 deletions .changeset/polite-frogs-yell.md
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

feat: introduce chunked snapshot generation
10 changes: 8 additions & 2 deletions packages/elixir-client/lib/electric/client/offset.ex
@@ -67,6 +67,9 @@ defmodule Electric.Client.Offset do
iex> from_string("1378734_3")
{:ok, %#{__MODULE__}{tx: 1378734, op: 3}}
iex> from_string("0_inf")
{:ok, %#{__MODULE__}{tx: 0, op: :infinity}}
iex> from_string("not a real offset")
{:error, "has invalid format"}
@@ -78,14 +81,17 @@ defmodule Electric.Client.Offset do
else
with [tx_offset_str, op_offset_str] <- :binary.split(str, "_"),
{tx_offset, ""} <- Integer.parse(tx_offset_str),
{op_offset, ""} <- Integer.parse(op_offset_str) do
{op_offset, ""} <- parse_int_or_inf(op_offset_str) do
{:ok, %__MODULE__{tx: tx_offset, op: op_offset}}
else
_ -> {:error, "has invalid format"}
end
end
end

defp parse_int_or_inf("inf"), do: {:infinity, ""}
defp parse_int_or_inf(int), do: Integer.parse(int)

@doc """
Create a new #{__MODULE__} struct from the given LSN and operation
offsets.
@@ -115,7 +121,7 @@ defmodule Electric.Client.Offset do
end

def to_string(%__MODULE__{tx: tx, op: op}) do
"#{Integer.to_string(tx)}_#{Integer.to_string(op)}"
"#{Integer.to_string(tx)}_#{if op == :infinity, do: "inf", else: Integer.to_string(op)}"
end

@spec to_tuple(t()) :: {tx_offset(), op_offset()}
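
Taken together, the two changes above let the client round-trip the new infinite op offset. An illustrative IEx session, mirroring the doctests:

```elixir
iex> {:ok, offset} = Electric.Client.Offset.from_string("0_inf")
{:ok, %Electric.Client.Offset{tx: 0, op: :infinity}}
iex> Electric.Client.Offset.to_string(offset)
"0_inf"
```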
4 changes: 2 additions & 2 deletions packages/elixir-client/test/electric/client/mock_test.exs
@@ -42,7 +42,7 @@ defmodule Electric.Client.MockTest do
Client.Mock.response(client,
status: 200,
schema: %{id: %{type: "int8"}},
last_offset: Offset.new(0, 0),
last_offset: Offset.new(0, 1),
shape_handle: "my-shape",
body: [
Client.Mock.change(value: %{id: "4444"}),
@@ -60,7 +60,7 @@ defmodule Electric.Client.MockTest do
%ChangeMessage{value: %{"id" => 2222}},
%ChangeMessage{value: %{"id" => 3333}},
%ChangeMessage{value: %{"id" => 4444}},
up_to_date0()
up_to_date()
] = events
end
end
4 changes: 2 additions & 2 deletions packages/elixir-client/test/support/client_helpers.ex
@@ -4,7 +4,7 @@ defmodule Support.ClientHelpers do

defmacro offset(tx, op), do: quote(do: %Offset{tx: unquote(tx), op: unquote(op)})

defmacro offset0, do: quote(do: offset(0, 0))
defmacro offset0, do: quote(do: offset(0, :infinity))

defmacro up_to_date() do
quote(do: %ControlMessage{control: :up_to_date, offset: %Offset{tx: _, op: _}})
@@ -19,5 +19,5 @@
)
end

defmacro up_to_date0(), do: quote(do: up_to_date(0, 0))
defmacro up_to_date0(), do: quote(do: up_to_date(0, :infinity))
end
20 changes: 13 additions & 7 deletions packages/react-hooks/test/react-hooks.test.tsx
@@ -3,7 +3,7 @@ import { describe, expect, inject, it as bareIt } from 'vitest'
import { setTimeout as sleep } from 'node:timers/promises'
import { testWithIssuesTable as it } from './support/test-context'
import { useShape, sortedOptionsHash, UseShapeResult } from '../src/react-hooks'
import { Shape, Message } from '@electric-sql/client'
import { Shape, ShapeStream } from '@electric-sql/client'

const BASE_URL = inject(`baseUrl`)

@@ -351,14 +351,20 @@ describe(`useShape`, () => {
unmount()

// Add another row to shape
const [newId] = await insertIssues({ title: `other row` })
const [_] = await insertIssues({ title: `other row` })

const parallelWaiterStream = new ShapeStream({
url: `${BASE_URL}/v1/shape`,
params: {
table: issuesTableUrl,
},
signal: aborter.signal,
subscribe: true,
})

// And wait until it's definitely seen
await waitFor(async () => {
const res = await fetch(
`${BASE_URL}/v1/shape?table=${issuesTableUrl}&offset=-1`
)
const body = (await res.json()) as Message[]
expect(body).toMatchObject([{}, { value: { id: newId } }])
return parallelWaiterStream.isUpToDate || (await sleep(50))
})

await sleep(50)
80 changes: 17 additions & 63 deletions packages/sync-service/lib/electric/plug/serve_shape_plug.ex
@@ -173,7 +173,7 @@ defmodule Electric.Plug.ServeShapePlug do
plug :put_resp_cache_headers
plug :generate_etag
plug :validate_and_put_etag
plug :serve_log_or_snapshot
plug :serve_shape_log

# end_telemetry_span needs to always be the last plug here.
plug :end_telemetry_span
@@ -337,7 +337,8 @@ defmodule Electric.Plug.ServeShapePlug do
# The log can't be up to date if the last_offset is not the actual end.
# Also if client is requesting the start of the log, we don't set `up-to-date`
# here either as we want to set a long max-age on the cache-control.
if LogOffset.compare(chunk_end_offset, last_offset) == :lt or offset == @before_all_offset do
if LogOffset.compare(chunk_end_offset, last_offset) == :lt or
offset == @before_all_offset do
conn
|> assign(:up_to_date, [])
# header might have been added on first pass but no longer valid
@@ -420,66 +421,13 @@ defmodule Electric.Plug.ServeShapePlug do
"public, max-age=#{config[:max_age]}, stale-while-revalidate=#{config[:stale_age]}"
)

# If offset is -1, we're serving a snapshot
defp serve_log_or_snapshot(
%Conn{assigns: %{offset: @before_all_offset, config: config}} = conn,
_
) do
OpenTelemetry.with_span("shape_get.plug.serve_snapshot", [], config[:stack_id], fn ->
serve_snapshot(conn)
end)
end

# Otherwise, serve log since that offset
defp serve_log_or_snapshot(%Conn{assigns: %{config: config}} = conn, _) do
defp serve_shape_log(%Conn{assigns: %{config: config}} = conn, _) do
OpenTelemetry.with_span("shape_get.plug.serve_shape_log", [], config[:stack_id], fn ->
serve_shape_log(conn)
do_serve_shape_log(conn)
end)
end

defp serve_snapshot(
%Conn{
assigns: %{
chunk_end_offset: chunk_end_offset,
active_shape_handle: shape_handle,
up_to_date: maybe_up_to_date
}
} = conn
) do
case Shapes.get_snapshot(conn.assigns.config, shape_handle) do
{:ok, {offset, snapshot}} ->
log =
Shapes.get_log_stream(conn.assigns.config, shape_handle,
since: offset,
up_to: chunk_end_offset
)

[snapshot, log, maybe_up_to_date]
|> Stream.concat()
|> to_json_stream()
|> Stream.chunk_every(500)
|> send_stream(conn, 200)

{:error, reason} ->
error_msg = "Could not serve a snapshot because of #{inspect(reason)}"

Logger.warning(error_msg)
OpenTelemetry.record_exception(error_msg)

{status_code, message} =
if match?(%DBConnection.ConnectionError{reason: :queue_timeout}, reason),
do: {429, "Could not establish connection to database - try again later"},
else: {500, "Failed creating or fetching the snapshot"}

send_resp(
conn,
status_code,
Jason.encode_to_iodata!(%{error: message})
)
end
end

defp serve_shape_log(
defp do_serve_shape_log(
%Conn{
assigns: %{
offset: offset,
@@ -490,7 +438,7 @@
} = conn
) do
log =
Shapes.get_log_stream(conn.assigns.config, shape_handle,
Shapes.get_merged_log_stream(conn.assigns.config, shape_handle,
since: offset,
up_to: chunk_end_offset
)
@@ -558,7 +506,10 @@ defmodule Electric.Plug.ServeShapePlug do

defp listen_for_new_changes(%Conn{assigns: assigns} = conn, _) do
# Only start listening when we know there is a possibility that nothing is going to be returned
if LogOffset.compare(assigns.offset, assigns.last_offset) != :lt do
# There is an edge case in that the snapshot is served in chunks but `last_offset` is not updated
# by that process. In that case, we'll start listening for changes but not receive any updates.
if LogOffset.compare(assigns.offset, assigns.last_offset) != :lt or
assigns.last_offset == LogOffset.last_before_real_offsets() do
shape_handle = assigns.handle

ref = make_ref()
@@ -587,7 +538,7 @@ defmodule Electric.Plug.ServeShapePlug do
# update last offset header
|> put_resp_header("electric-offset", "#{latest_log_offset}")
|> determine_up_to_date([])
|> serve_shape_log()
|> do_serve_shape_log()

{^ref, :shape_rotation} ->
# We may want to notify the client better that the shape handle had changed, but just closing the response
@@ -612,7 +563,9 @@

maybe_up_to_date = if up_to_date = assigns[:up_to_date], do: up_to_date != []

Electric.Telemetry.OpenTelemetry.get_stack_span_attrs(assigns.config[:stack_id])
Electric.Telemetry.OpenTelemetry.get_stack_span_attrs(
get_in(conn.assigns, [:config, :stack_id])
)
|> Map.merge(Electric.Plug.Utils.common_open_telemetry_attrs(conn))
|> Map.merge(%{
"shape.handle" => shape_handle,
@@ -668,7 +621,7 @@
conn.query_params["handle"] || assigns[:active_shape_handle] || assigns[:handle],
client_ip: conn.remote_ip,
status: conn.status,
stack_id: assigns.config[:stack_id]
stack_id: get_in(conn.assigns, [:config, :stack_id])
}
)

@@ -699,6 +652,7 @@
error_str = Exception.format(error.kind, error.reason)

conn
|> fetch_query_params()
|> assign(:error_str, error_str)
|> end_telemetry_span()

33 changes: 31 additions & 2 deletions packages/sync-service/lib/electric/replication/log_offset.ex
@@ -47,7 +47,8 @@ defmodule Electric.Replication.LogOffset do
** (FunctionClauseError) no function clause matching in Electric.Replication.LogOffset.new/2
"""
def new(tx_offset, op_offset)
when is_integer(tx_offset) and tx_offset >= 0 and is_integer(op_offset) and op_offset >= 0 do
when is_integer(tx_offset) and tx_offset >= 0 and is_integer(op_offset) and op_offset >= 0
when is_integer(tx_offset) and tx_offset >= 0 and op_offset == :infinity do
%LogOffset{tx_offset: tx_offset, op_offset: op_offset}
end

@@ -99,6 +100,10 @@
(offset1.tx_offset == offset2.tx_offset and
offset1.op_offset < offset2.op_offset)

defguard is_min_offset(offset) when offset.tx_offset == -1

defguard is_virtual_offset(offset) when offset.tx_offset == 0

@doc """
An offset that is smaller than all offsets in the log.
@@ -130,6 +135,12 @@
@spec last() :: t
def last(), do: %LogOffset{tx_offset: 0xFFFFFFFFFFFFFFFF, op_offset: :infinity}

@doc """
The last possible offset for the "virtual" part of the log - i.e. snapshots.
"""
@spec last_before_real_offsets() :: t()
def last_before_real_offsets(), do: %LogOffset{tx_offset: 0, op_offset: :infinity}

@doc """
Increments the offset of the change inside the transaction.
@@ -184,6 +195,10 @@
[Integer.to_string(-1)]
end

def to_iolist(%LogOffset{tx_offset: tx_offset, op_offset: :infinity}) do
[Integer.to_string(tx_offset), ?_, "inf"]
end

def to_iolist(%LogOffset{tx_offset: tx_offset, op_offset: op_offset}) do
[Integer.to_string(tx_offset), ?_, Integer.to_string(op_offset)]
end
@@ -205,6 +220,9 @@
iex> from_string("0_02")
{:ok, %LogOffset{tx_offset: 0, op_offset: 2}}
iex> from_string("0_inf")
{:ok, %LogOffset{tx_offset: 0, op_offset: :infinity}}
iex> from_string("1_2_3")
{:error, "has invalid format"}
@@ -224,7 +242,7 @@
else
with [tx_offset_str, op_offset_str] <- String.split(str, "_"),
{tx_offset, ""} <- Integer.parse(tx_offset_str),
{op_offset, ""} <- Integer.parse(op_offset_str),
{op_offset, ""} <- parse_int_or_inf(op_offset_str),
offset <- new(tx_offset, op_offset) do
{:ok, offset}
else
@@ -233,11 +251,22 @@
end
end

defp parse_int_or_inf("inf"), do: {:infinity, ""}
defp parse_int_or_inf(int), do: Integer.parse(int)

defimpl Inspect do
def inspect(%LogOffset{tx_offset: -1, op_offset: 0}, _opts) do
"LogOffset.before_all()"
end

def inspect(%LogOffset{tx_offset: 0xFFFFFFFFFFFFFFFF, op_offset: :infinity}, _opts) do
"LogOffset.last()"
end

def inspect(%LogOffset{tx_offset: 0, op_offset: :infinity}, _opts) do
"LogOffset.last_before_real_offsets()"
end

def inspect(%LogOffset{tx_offset: tx, op_offset: op}, _opts) do
"LogOffset.new(#{tx}, #{op})"
end
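
The new guards and `last_before_real_offsets/0` make the virtual snapshot region explicit. A hypothetical caller (not code from this PR) might branch on them like this:

```elixir
defmodule OffsetRegionSketch do
  alias Electric.Replication.LogOffset
  import Electric.Replication.LogOffset, only: [is_min_offset: 1, is_virtual_offset: 1]

  # Classify an offset: before the whole log, inside the chunked snapshot
  # (tx_offset 0), or in the real replicated log.
  def region(%LogOffset{} = offset) when is_min_offset(offset), do: :before_all
  def region(%LogOffset{} = offset) when is_virtual_offset(offset), do: :snapshot
  def region(%LogOffset{} = _offset), do: :log

  # The snapshot boundary is now a named constructor:
  # LogOffset.last_before_real_offsets() == LogOffset.new(0, :infinity)
  def snapshot_upper_bound, do: LogOffset.last_before_real_offsets()
end
```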
4 changes: 2 additions & 2 deletions packages/sync-service/lib/electric/shape_cache.ex
@@ -60,8 +60,8 @@ defmodule Electric.ShapeCache do
],
prepare_tables_fn: [type: {:or, [:mfa, {:fun, 2}]}, required: true],
create_snapshot_fn: [
type: {:fun, 6},
default: &Shapes.Consumer.Snapshotter.query_in_readonly_txn/6
type: {:fun, 7},
default: &Shapes.Consumer.Snapshotter.query_in_readonly_txn/7
],
purge_all_shapes?: [type: :boolean, required: false]
)
@@ -17,7 +17,6 @@ defmodule Electric.ShapeCache.CrashingFileStorage do
defdelegate get_current_position(opts), to: FileStorage
defdelegate set_snapshot_xmin(xmin, opts), to: FileStorage
defdelegate snapshot_started?(opts), to: FileStorage
defdelegate get_snapshot(opts), to: FileStorage
defdelegate make_new_snapshot!(data_stream, opts), to: FileStorage
defdelegate mark_snapshot_as_started(opts), to: FileStorage
defdelegate get_log_stream(offset, max_offset, opts), to: FileStorage