diff --git a/packages/sync-service/lib/electric/connection/manager.ex b/packages/sync-service/lib/electric/connection/manager.ex index fea8383247..613e81f322 100644 --- a/packages/sync-service/lib/electric/connection/manager.ex +++ b/packages/sync-service/lib/electric/connection/manager.ex @@ -150,6 +150,8 @@ defmodule Electric.Connection.Manager do # implement our custom error handling logic. Process.flag(:trap_exit, true) + Process.set_label({:connection_manager, opts[:stack_id]}) + connection_opts = opts |> Keyword.fetch!(:connection_opts) diff --git a/packages/sync-service/lib/electric/connection/supervisor.ex b/packages/sync-service/lib/electric/connection/supervisor.ex index a32f583db1..080e6adeeb 100644 --- a/packages/sync-service/lib/electric/connection/supervisor.ex +++ b/packages/sync-service/lib/electric/connection/supervisor.ex @@ -29,6 +29,7 @@ defmodule Electric.Connection.Supervisor do end def init(opts) do + Process.set_label({:connection_supervisor, opts[:stack_id]}) Supervisor.init([{Electric.Connection.Manager, opts}], strategy: :rest_for_one) end diff --git a/packages/sync-service/lib/electric/postgres/inspector/ets_inspector.ex b/packages/sync-service/lib/electric/postgres/inspector/ets_inspector.ex index 2f8bdf9688..9580e3dc80 100644 --- a/packages/sync-service/lib/electric/postgres/inspector/ets_inspector.ex +++ b/packages/sync-service/lib/electric/postgres/inspector/ets_inspector.ex @@ -84,6 +84,8 @@ defmodule Electric.Postgres.Inspector.EtsInspector do # when the parent process sends an exit signal Process.flag(:trap_exit, true) + Process.set_label({:ets_inspector, opts.stack_id}) + # Name needs to be an atom but we don't want to dynamically create atoms. # Instead, we will use the reference to the table that is returned by `:ets.new` pg_info_table = :ets.new(opts.pg_info_table, [:named_table, :public, :set]) diff --git a/packages/sync-service/lib/electric/postgres/replication_client.ex b/packages/sync-service/lib/electric/postgres/replication_client.ex index de095c90ab..fcf55a5258 100644 --- a/packages/sync-service/lib/electric/postgres/replication_client.ex +++ b/packages/sync-service/lib/electric/postgres/replication_client.ex @@ -134,7 +134,7 @@ defmodule Electric.Postgres.ReplicationClient do # TODO(alco): this needs additional info about :noreply and :query return tuples. @impl true def init(replication_opts) do - Process.set_label(__MODULE__) + Process.set_label(:replication_client) {:ok, State.new(replication_opts)} end diff --git a/packages/sync-service/lib/electric/replication/shape_log_collector.ex b/packages/sync-service/lib/electric/replication/shape_log_collector.ex index ff9fccc27b..75a985ec40 100644 --- a/packages/sync-service/lib/electric/replication/shape_log_collector.ex +++ b/packages/sync-service/lib/electric/replication/shape_log_collector.ex @@ -56,6 +56,7 @@ defmodule Electric.Replication.ShapeLogCollector do end def init(opts) do + Process.set_label({:shape_log_collector, opts.stack_id}) state = Map.merge(opts, %{producer: nil, subscriptions: {0, MapSet.new()}}) # start in demand: :accumulate mode so that the ShapeCache is able to start # all active consumers before we start sending transactions diff --git a/packages/sync-service/lib/electric/replication/supervisor.ex b/packages/sync-service/lib/electric/replication/supervisor.ex index ddc1d11f11..0e38da4f56 100644 --- a/packages/sync-service/lib/electric/replication/supervisor.ex +++ b/packages/sync-service/lib/electric/replication/supervisor.ex @@ -16,6 +16,7 @@ defmodule Electric.Replication.Supervisor do @impl Supervisor def init(opts) do + Process.set_label({:replication_supervisor, opts[:stack_id]}) Logger.info("Starting shape replication pipeline") # TODO: weird to have these without defaults but `consumer_supervisor` with a default diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index 94150fdd8a..b5649cab9d 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -51,6 +51,8 @@ defmodule Electric.Shapes.Consumer do end def init(config) do + Process.set_label({:consumer, config.shape_handle}) + %{log_producer: producer, storage: storage, shape_status: {shape_status, shape_status_state}} = config diff --git a/packages/sync-service/lib/electric/shapes/consumer/snapshotter.ex b/packages/sync-service/lib/electric/shapes/consumer/snapshotter.ex index 0189fdf92a..740f6905f0 100644 --- a/packages/sync-service/lib/electric/shapes/consumer/snapshotter.ex +++ b/packages/sync-service/lib/electric/shapes/consumer/snapshotter.ex @@ -26,7 +26,7 @@ defmodule Electric.Shapes.Consumer.Snapshotter do end def init(%{stack_id: stack_id} = config) do - Process.set_label({:snapshotter, stack_id}) + Process.set_label({:snapshotter, config.shape_handle}) Logger.metadata(stack_id: stack_id) {:ok, config, {:continue, :start_snapshot}} diff --git a/packages/sync-service/lib/electric/shapes/consumer_supervisor.ex b/packages/sync-service/lib/electric/shapes/consumer_supervisor.ex index 5dfb5eba87..bb19413fd2 100644 --- a/packages/sync-service/lib/electric/shapes/consumer_supervisor.ex +++ b/packages/sync-service/lib/electric/shapes/consumer_supervisor.ex @@ -61,6 +61,8 @@ defmodule Electric.Shapes.ConsumerSupervisor do %{shape_handle: shape_handle, storage: {_, _} = storage} = config + Process.set_label({:consumer_supervisor, shape_handle}) + shape_storage = Electric.ShapeCache.Storage.for_shape(shape_handle, storage) shape_config = %{config | storage: shape_storage} diff --git a/packages/sync-service/lib/electric/shapes/dynamic_consumer_supervisor.ex b/packages/sync-service/lib/electric/shapes/dynamic_consumer_supervisor.ex index 7d461d6f04..af9008527f 100644 --- a/packages/sync-service/lib/electric/shapes/dynamic_consumer_supervisor.ex +++ b/packages/sync-service/lib/electric/shapes/dynamic_consumer_supervisor.ex @@ -13,8 +13,10 @@ defmodule Electric.Shapes.DynamicConsumerSupervisor do end def start_link(opts) do - DynamicSupervisor.start_link(__MODULE__, [], - name: Keyword.get(opts, :name, name(Keyword.fetch!(opts, :stack_id))) + stack_id = Keyword.fetch!(opts, :stack_id) + + DynamicSupervisor.start_link(__MODULE__, [stack_id: stack_id], + name: Keyword.get(opts, :name, name(stack_id)) ) end @@ -49,7 +51,8 @@ defmodule Electric.Shapes.DynamicConsumerSupervisor do end @impl true - def init(_opts) do + def init(stack_id: stack_id) do + Process.set_label({:dynamic_consumer_supervisor, stack_id}) Logger.debug(fn -> "Starting #{__MODULE__}" end) DynamicSupervisor.init(strategy: :one_for_one) end diff --git a/packages/sync-service/lib/electric/stack_supervisor.ex b/packages/sync-service/lib/electric/stack_supervisor.ex index c2cd4f82b4..1510b344b9 100644 --- a/packages/sync-service/lib/electric/stack_supervisor.ex +++ b/packages/sync-service/lib/electric/stack_supervisor.ex @@ -125,6 +125,8 @@ defmodule Electric.StackSupervisor do @impl true def init(%{stack_id: stack_id} = config) do + Process.set_label({:stack_supervisor, stack_id}) + inspector = Access.get( config,