Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ syn2mas = { path = "./crates/syn2mas", version = "=0.14.1" }
version = "0.14.1"
features = ["axum", "axum-extra", "axum-json", "axum-query", "macros"]

[workspace.dependencies.arc-swap]
version = "1.7.1"

# GraphQL server
[workspace.dependencies.async-graphql]
version = "7.0.15"
Expand Down
85 changes: 80 additions & 5 deletions crates/cli/src/commands/syn2mas.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::HashMap, process::ExitCode};
use std::{collections::HashMap, process::ExitCode, sync::atomic::Ordering, time::Duration};

use anyhow::Context;
use camino::Utf8PathBuf;
Expand All @@ -10,12 +10,18 @@ use mas_config::{
};
use mas_storage::SystemClock;
use mas_storage_pg::MIGRATOR;
use opentelemetry::KeyValue;
use rand::thread_rng;
use sqlx::{Connection, Either, PgConnection, postgres::PgConnectOptions, types::Uuid};
use syn2mas::{LockedMasDatabase, MasWriter, SynapseReader, synapse_config};
use tracing::{Instrument, error, info_span, warn};
use syn2mas::{
LockedMasDatabase, MasWriter, Progress, ProgressStage, SynapseReader, synapse_config,
};
use tracing::{Instrument, error, info, info_span, warn};

use crate::util::{DatabaseConnectOptions, database_connection_from_config_with_options};
use crate::{
telemetry::METER,
util::{DatabaseConnectOptions, database_connection_from_config_with_options},
};

/// The exit code used by `syn2mas check` and `syn2mas migrate` when there are
/// errors preventing migration.
Expand Down Expand Up @@ -248,7 +254,12 @@ impl Options {
#[allow(clippy::disallowed_methods)]
let mut rng = thread_rng();

// TODO progress reporting
let progress = Progress::default();

let occasional_progress_logger_task =
tokio::spawn(occasional_progress_logger(progress.clone()));
let progress_telemetry_task = tokio::spawn(progress_telemetry(progress.clone()));

let mas_matrix = MatrixConfig::extract(figment)?;
eprintln!("\n\n");
syn2mas::migrate(
Expand All @@ -258,11 +269,75 @@ impl Options {
&clock,
&mut rng,
provider_id_mappings,
&progress,
)
.await?;

occasional_progress_logger_task.abort();
progress_telemetry_task.abort();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally this would be done through a CancellationToken; I don't particularly like aborting tasks. But since we haven't set that up in the rest of the migration process, I'm happy to keep it like this for now.


Ok(ExitCode::SUCCESS)
}
}
}
}

/// Logs progress every 30 seconds, as a lightweight alternative to a progress
/// bar. For most deployments, the migration will not take 30 seconds so this
/// will not be relevant. In other cases, this will give the operator an idea of
/// what's going on.
///
/// This future never completes on its own: it loops forever and is expected to
/// be stopped from the outside (e.g. by aborting the task it was spawned on).
///
/// The first log line is only emitted after the first 30-second sleep has
/// elapsed, so short migrations produce no progress output at all.
async fn occasional_progress_logger(progress: Progress) {
    loop {
        tokio::time::sleep(Duration::from_secs(30)).await;
        match &**progress.get_current_stage() {
            ProgressStage::SettingUp => {
                info!(name: "progress", "still setting up");
            }
            ProgressStage::MigratingData {
                entity,
                migrated,
                approx_count,
            } => {
                // Relaxed is enough: the value is only used for display.
                let migrated = migrated.load(Ordering::Relaxed);
                if *approx_count == 0 {
                    // The count is only an approximation and may be zero.
                    // Dividing by it would print `NaN%` or `inf%`, so leave
                    // the percentage out instead.
                    info!(name: "progress", "migrating {entity}: {migrated}/~{approx_count}");
                } else {
                    #[allow(clippy::cast_precision_loss)]
                    let percent = (f64::from(migrated) / *approx_count as f64) * 100.0;
                    info!(name: "progress", "migrating {entity}: {migrated}/~{approx_count} (~{percent:.1}%)");
                }
            }
            ProgressStage::RebuildIndex { index_name } => {
                info!(name: "progress", "still waiting for rebuild of index {index_name}");
            }
            ProgressStage::RebuildConstraint { constraint_name } => {
                info!(name: "progress", "still waiting for rebuild of constraint {constraint_name}");
            }
        }
    }
}

/// Reports migration progress as OpenTelemetry metrics
async fn progress_telemetry(progress: Progress) {
let migrated_data_counter = METER
.u64_gauge("migrated_data")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use a prefix, plus a common namespace, e.g. syn2mas.data.migrated and syn2mas.data.total. Note that when translated to Prometheus, dots will be replaced by underscores. Feel free to come up with something else; this is just a suggestion.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

syn2mas.entity... maybe. Don't like how vague these words are but something like 'migratee' is no better haha. I guess it doesn't exactly matter as long as it makes some sense.

.with_description("How many entities have been migrated so far")
.build();
let max_data_counter = METER
.u64_gauge("max_data")
.with_description("How many entities of the given type exist (approximate)")
.build();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incrementing counters directly is probably performant enough? I would make those counters static, with a std::sync::LazyLock, and just record every time we insert.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't make them static but maybe this is OK?


loop {
tokio::time::sleep(Duration::from_secs(10)).await;
if let ProgressStage::MigratingData {
entity,
migrated,
approx_count,
} = &**progress.get_current_stage()
{
let metrics_kv = [KeyValue::new("entity", *entity)];
let migrated = migrated.load(Ordering::Relaxed);
migrated_data_counter.record(u64::from(migrated), &metrics_kv);
max_data_counter.record(*approx_count, &metrics_kv);
} else {
// not sure how to map other stages
}
}
}
1 change: 1 addition & 0 deletions crates/syn2mas/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ repository.workspace = true

[dependencies]
anyhow.workspace = true
arc-swap.workspace = true
bitflags.workspace = true
camino.workspace = true
figment.workspace = true
Expand Down
2 changes: 2 additions & 0 deletions crates/syn2mas/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ mod mas_writer;
mod synapse_reader;

mod migration;
mod progress;

type RandomState = rustc_hash::FxBuildHasher;
type HashMap<K, V> = rustc_hash::FxHashMap<K, V>;

pub use self::{
mas_writer::{MasWriter, checks::mas_pre_migration_checks, locking::LockedMasDatabase},
migration::migrate,
progress::{Progress, ProgressStage},
synapse_reader::{
SynapseReader,
checks::{
Expand Down
85 changes: 78 additions & 7 deletions crates/syn2mas/src/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,14 @@
//! This module does not implement any of the safety checks that should be run
//! *before* the migration.

use std::{pin::pin, time::Instant};
use std::{
pin::pin,
sync::{
Arc,
atomic::{AtomicU32, Ordering},
},
time::Instant,
};

use chrono::{DateTime, Utc};
use compact_str::CompactString;
Expand All @@ -32,6 +39,7 @@ use crate::{
MasNewEmailThreepid, MasNewUnsupportedThreepid, MasNewUpstreamOauthLink, MasNewUser,
MasNewUserPassword, MasWriteBuffer, MasWriter,
},
progress::{Progress, ProgressStage},
synapse_reader::{
self, ExtractLocalpartError, FullUserId, SynapseAccessToken, SynapseDevice,
SynapseExternalId, SynapseRefreshableTokenPair, SynapseThreepid, SynapseUser,
Expand Down Expand Up @@ -147,6 +155,7 @@ pub async fn migrate(
clock: &dyn Clock,
rng: &mut impl RngCore,
provider_id_mapping: std::collections::HashMap<String, Uuid>,
progress: &Progress,
) -> Result<(), Error> {
let counts = synapse.count_rows().await.into_synapse("counting users")?;

Expand All @@ -162,14 +171,58 @@ pub async fn migrate(
provider_id_mapping,
};

let (mas, state) = migrate_users(&mut synapse, mas, state, rng).await?;
let (mas, state) = migrate_threepids(&mut synapse, mas, rng, state).await?;
let (mas, state) = migrate_external_ids(&mut synapse, mas, rng, state).await?;
let migrated_counter = Arc::new(AtomicU32::new(0));
progress.set_current_stage(ProgressStage::MigratingData {
entity: "users",
migrated: migrated_counter.clone(),
approx_count: counts.users as u64,
});
let (mas, state) = migrate_users(&mut synapse, mas, state, rng, migrated_counter).await?;

let migrated_counter = Arc::new(AtomicU32::new(0));
progress.set_current_stage(ProgressStage::MigratingData {
entity: "threepids",
migrated: migrated_counter.clone(),
approx_count: counts.users as u64,
});
let (mas, state) = migrate_threepids(&mut synapse, mas, rng, state, &migrated_counter).await?;

let migrated_counter = Arc::new(AtomicU32::new(0));
progress.set_current_stage(ProgressStage::MigratingData {
entity: "external_ids",
migrated: migrated_counter.clone(),
approx_count: counts.users as u64,
});
let (mas, state) =
migrate_external_ids(&mut synapse, mas, rng, state, &migrated_counter).await?;

let migrated_counter = Arc::new(AtomicU32::new(0));
progress.set_current_stage(ProgressStage::MigratingData {
entity: "unrefreshable_access_tokens",
migrated: migrated_counter.clone(),
approx_count: counts.users as u64,
});
let (mas, state) =
migrate_unrefreshable_access_tokens(&mut synapse, mas, clock, rng, state).await?;
migrate_unrefreshable_access_tokens(&mut synapse, mas, clock, rng, state, migrated_counter)
.await?;

let migrated_counter = Arc::new(AtomicU32::new(0));
progress.set_current_stage(ProgressStage::MigratingData {
entity: "refreshable_token_pairs",
migrated: migrated_counter.clone(),
approx_count: counts.users as u64,
});
let (mas, state) =
migrate_refreshable_token_pairs(&mut synapse, mas, clock, rng, state).await?;
let (mas, _state) = migrate_devices(&mut synapse, mas, rng, state).await?;
migrate_refreshable_token_pairs(&mut synapse, mas, clock, rng, state, &migrated_counter)
.await?;

let migrated_counter = Arc::new(AtomicU32::new(0));
progress.set_current_stage(ProgressStage::MigratingData {
entity: "devices",
migrated: migrated_counter.clone(),
approx_count: counts.users as u64,
});
let (mas, _state) = migrate_devices(&mut synapse, mas, rng, state, migrated_counter).await?;

synapse
.finish()
Expand All @@ -189,6 +242,7 @@ async fn migrate_users(
mut mas: MasWriter,
mut state: MigrationState,
rng: &mut impl RngCore,
progress_counter: Arc<AtomicU32>,
) -> Result<(MasWriter, MigrationState), Error> {
let start = Instant::now();

Expand Down Expand Up @@ -261,6 +315,8 @@ async fn migrate_users(
.await
.into_mas("writing password")?;
}

progress_counter.fetch_add(1, Ordering::Relaxed);
}

user_buffer
Expand Down Expand Up @@ -304,6 +360,7 @@ async fn migrate_threepids(
mut mas: MasWriter,
rng: &mut impl RngCore,
state: MigrationState,
progress_counter: &AtomicU32,
) -> Result<(MasWriter, MigrationState), Error> {
let start = Instant::now();

Expand Down Expand Up @@ -365,6 +422,8 @@ async fn migrate_threepids(
.await
.into_mas("writing unsupported threepid")?;
}

progress_counter.fetch_add(1, Ordering::Relaxed);
}

email_buffer
Expand Down Expand Up @@ -394,6 +453,7 @@ async fn migrate_external_ids(
mut mas: MasWriter,
rng: &mut impl RngCore,
state: MigrationState,
progress_counter: &AtomicU32,
) -> Result<(MasWriter, MigrationState), Error> {
let start = Instant::now();

Expand Down Expand Up @@ -447,6 +507,8 @@ async fn migrate_external_ids(
)
.await
.into_mas("failed to write upstream link")?;

progress_counter.fetch_add(1, Ordering::Relaxed);
}

write_buffer
Expand Down Expand Up @@ -476,6 +538,7 @@ async fn migrate_devices(
mut mas: MasWriter,
rng: &mut impl RngCore,
mut state: MigrationState,
progress_counter: Arc<AtomicU32>,
) -> Result<(MasWriter, MigrationState), Error> {
let start = Instant::now();

Expand Down Expand Up @@ -563,6 +626,8 @@ async fn migrate_devices(
)
.await
.into_mas("writing compat sessions")?;

progress_counter.fetch_add(1, Ordering::Relaxed);
}

write_buffer
Expand Down Expand Up @@ -605,6 +670,7 @@ async fn migrate_unrefreshable_access_tokens(
clock: &dyn Clock,
rng: &mut impl RngCore,
mut state: MigrationState,
progress_counter: Arc<AtomicU32>,
) -> Result<(MasWriter, MigrationState), Error> {
let start = Instant::now();

Expand Down Expand Up @@ -704,6 +770,8 @@ async fn migrate_unrefreshable_access_tokens(
)
.await
.into_mas("writing compat access tokens")?;

progress_counter.fetch_add(1, Ordering::Relaxed);
}
write_buffer
.finish(&mut mas)
Expand Down Expand Up @@ -749,6 +817,7 @@ async fn migrate_refreshable_token_pairs(
clock: &dyn Clock,
rng: &mut impl RngCore,
mut state: MigrationState,
progress_counter: &AtomicU32,
) -> Result<(MasWriter, MigrationState), Error> {
let start = Instant::now();

Expand Down Expand Up @@ -830,6 +899,8 @@ async fn migrate_refreshable_token_pairs(
)
.await
.into_mas("writing compat refresh tokens")?;

progress_counter.fetch_add(1, Ordering::Relaxed);
}

access_token_write_buffer
Expand Down
Loading
Loading