Skip to content

Run migrations in order across repos #4466

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 21 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/elixir.yml
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,12 @@ jobs:

- run: make minio
if: env.MIX_ENV == 'test'
- run: mix test --include slow --include minio --max-failures 1 --warnings-as-errors
- run: mix test --include slow --include minio --include migrations --max-failures 1 --warnings-as-errors
if: env.MIX_ENV == 'test'
env:
MINIO_HOST_FOR_CLICKHOUSE: "172.17.0.1"

- run: mix test --include slow --max-failures 1 --warnings-as-errors
- run: mix test --include slow --include migrations --max-failures 1 --warnings-as-errors
if: env.MIX_ENV == 'ce_test'

static:
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ All notable changes to this project will be documented in this file.
- Don't include imports when showing time series hourly interval. Previously imported data was shown each midnight
- Fix property filter suggestions 500 error when property hasn't been selected
- Bamboo.Mua: add Date and Message-ID headers if missing plausible/analytics#4474
- Fix migration order across `plausible_db` and `plausible_events_db` databases plausible/analytics#4466

## v2.1.1 - 2024-06-06

Expand Down
142 changes: 108 additions & 34 deletions lib/plausible_release.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,120 @@ defmodule Plausible.Release do
end
end

def migrate do
@doc """
`interweave_migrate/0` is a migration function that:

- Lists all pending migrations across multiple repositories.
- Sorts these migrations into a single list.
- Groups consecutive migrations by repository into "streaks".
- Executes the migrations in the correct order by processing each streak sequentially.

### Why Use This Approach?

This function resolves dependencies between migrations that span across different repositories.
The default `migrate/0` function migrates each repository independently, which may result in
migrations running in the wrong order when there are cross-repository dependencies.

Consider the following example (adapted from reality, not 100% accurate):

- **Migration 1**: The PostgreSQL (PG) repository creates a table named `site_imports`.
- **Migration 2**: The ClickHouse (CH) repository creates `import_id` columns in `imported_*` tables.
- **Migration 3**: The PG repository runs a data migration that utilizes both PG and CH databases,
reading from the `import_id` column in `imported_*` tables.

The default `migrate/0` would execute these migrations by repository, resulting in the following order:

1. Migration 1 (PG)
2. Migration 3 (PG)
3. Migration 2 (CH)

This sequence would fail at Migration 3, as the `import_id` columns in the CH repository have not been created yet.

`interweave_migrate/0` addresses this issue by consolidating all pending migrations into a single, ordered queue:

1. Migration 1 (PG)
2. Migration 2 (CH)
3. Migration 3 (PG)

This ensures all dependencies are resolved in the correct order.
"""
def interweave_migrate(repos \\ repos()) do
prepare()
Enum.each(repos(), &run_migrations_for/1)
IO.puts("Migrations successful!")

pending = all_pending_migrations(repos)
streaks = migration_streaks(pending)

Enum.each(streaks, fn {repo, up_to_version} ->
{:ok, _, _} = Ecto.Migrator.with_repo(repo, &Ecto.Migrator.run(&1, :up, to: up_to_version))
end)
end

defp migration_streaks(pending_migrations) do
sorted_migrations =
pending_migrations
|> Enum.map(fn {repo, version, _name} -> {repo, version} end)
|> Enum.sort_by(fn {_repo, version} -> version end, :asc)

streaks_reversed =
Enum.reduce(sorted_migrations, [], fn {repo, _version} = latest_migration, streaks_acc ->
case streaks_acc do
# start the streak for repo
[] -> [latest_migration]
# extend the streak
[{^repo, _prev_version} | rest] -> [latest_migration | rest]
# end the streak for prev_repo, start the streak for repo
[{_prev_repo, _prev_version} | _rest] -> [latest_migration | streaks_acc]
end
end)

:lists.reverse(streaks_reversed)
end

@spec all_pending_migrations([Ecto.Repo.t()]) :: [{Ecto.Repo.t(), integer, String.t()}]
defp all_pending_migrations(repos) do
Enum.flat_map(repos, fn repo ->
# credo:disable-for-lines:6 Credo.Check.Refactor.Nesting
{:ok, pending, _started} =
Ecto.Migrator.with_repo(repo, fn repo ->
Ecto.Migrator.migrations(repo)
|> Enum.filter(fn {status, _version, _name} -> status == :down end)
|> Enum.map(fn {_status, version, name} -> {repo, version, name} end)
end)

pending
end)
end

def pending_migrations do
def pending_streaks(repos \\ repos()) do
prepare()
IO.puts("Pending migrations")
IO.puts("")
Enum.each(repos(), &list_pending_migrations_for/1)
IO.puts("Collecting pending migrations..")

pending = all_pending_migrations(repos)

if pending == [] do
IO.puts("No pending migrations!")
else
streaks = migration_streaks(pending)
print_migration_streaks(streaks, pending)
end
end

defp print_migration_streaks([{repo, up_to_version} | streaks], pending) do
{streak, pending} =
Enum.split_with(pending, fn {pending_repo, version, _name} ->
pending_repo == repo and version <= up_to_version
end)

IO.puts(
"\n#{inspect(repo)} [#{Path.relative_to_cwd(Ecto.Migrator.migrations_path(repo))}] streak up to version #{up_to_version}:"
)

Enum.each(streak, fn {_repo, version, name} -> IO.puts(" * #{version}_#{name}") end)
print_migration_streaks(streaks, pending)
end

defp print_migration_streaks([], []), do: :ok

def seed do
prepare()
# Run seed script
Expand Down Expand Up @@ -123,33 +224,6 @@ defmodule Plausible.Release do
end
end

defp run_migrations_for(repo) do
IO.puts("Running migrations for #{repo}")
{:ok, _, _} = Ecto.Migrator.with_repo(repo, &Ecto.Migrator.run(&1, :up, all: true))
end

defp list_pending_migrations_for(repo) do
IO.puts("Listing pending migrations for #{repo}")
IO.puts("")

migration_directory = Ecto.Migrator.migrations_path(repo)

pending =
repo
|> Ecto.Migrator.migrations([migration_directory])
|> Enum.filter(fn {status, _version, _migration} -> status == :down end)

if pending == [] do
IO.puts("No pending migrations")
else
Enum.each(pending, fn {_, version, migration} ->
IO.puts("* #{version}_#{migration}")
end)
end

IO.puts("")
end

defp ensure_repo_created(repo) do
IO.puts("create #{inspect(repo)} database if it doesn't exist")

Expand Down
2 changes: 1 addition & 1 deletion rel/overlays/migrate.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@

BIN_DIR=$(dirname "$0")

"${BIN_DIR}"/bin/plausible eval Plausible.Release.migrate
"${BIN_DIR}"/bin/plausible eval Plausible.Release.interweave_migrate
2 changes: 1 addition & 1 deletion rel/overlays/pending-migrations.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@

BIN_DIR=$(dirname "$0")

"${BIN_DIR}"/bin/plausible eval Plausible.Release.pending_migrations
"${BIN_DIR}"/bin/plausible eval Plausible.Release.pending_streaks
Loading