diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index ad75d79f..06805bcd 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -138,6 +138,28 @@ jobs:
         toolchain: ${{ matrix.rust }}
     - run: cd refinery && cargo test --features mysql_async --test mysql_async -- --test-threads 1
 
+  test-clickhouse:
+    name: Test clickhouse
+    runs-on: ubuntu-latest
+    strategy:
+      matrix:
+        rust: [stable, nightly, 1.56.0]
+    services:
+      clickhouse:
+        image: bitnami/clickhouse:22.9.4
+        ports:
+          - 9000:9000
+        env:
+          CLICKHOUSE_ADMIN_USER: default
+          ALLOW_EMPTY_PASSWORD: yes
+
+    steps:
+    - uses: actions/checkout@v2
+    - uses: actions-rs/toolchain@v1
+      with:
+        toolchain: ${{ matrix.rust }}
+    - run: cd refinery && cargo test --features clickhouse --test clickhouse -- --test-threads 1
+
   test-tiberius:
     name: Test tiberius
     runs-on: ubuntu-latest
diff --git a/refinery/Cargo.toml b/refinery/Cargo.toml
index eba439d9..889c4a1c 100644
--- a/refinery/Cargo.toml
+++ b/refinery/Cargo.toml
@@ -13,13 +13,14 @@ categories = ["database"]
 edition = "2018"
 
 [features]
-default = []
+default = ["clickhouse"]
 rusqlite-bundled = ["refinery-core/rusqlite-bundled"]
 rusqlite = ["refinery-core/rusqlite"]
 postgres = ["refinery-core/postgres"]
 mysql = ["refinery-core/mysql", "refinery-core/flate2"]
 tokio-postgres = ["refinery-core/tokio-postgres"]
 mysql_async = ["refinery-core/mysql_async"]
+clickhouse = ["refinery-core/clickhouse"]
 tiberius = ["refinery-core/tiberius"]
 tiberius-config = ["refinery-core/tiberius", "refinery-core/tiberius-config"]
diff --git a/refinery/tests/clickhouse.rs b/refinery/tests/clickhouse.rs
new file mode 100644
index 00000000..1e632f37
--- /dev/null
+++ b/refinery/tests/clickhouse.rs
@@ -0,0 +1,657 @@
+#[cfg(feature = "clickhouse")]
+mod clickhouse {
+    use futures::FutureExt;
+    use refinery::{
+        config::{Config, ConfigDbType},
+        embed_migrations,
+        error::Kind,
+        AsyncMigrate, Migration, Runner, Target,
+    };
+    use refinery_core::klickhouse::{Client, ClientOptions, RawRow, UnitValue};
+    use std::panic::AssertUnwindSafe;
+    use time::OffsetDateTime;
+
+    const DEFAULT_TABLE_NAME: &str = "refinery_schema_history";
+
+    fn get_migrations() -> Vec<Migration> {
+        embed_migrations!("./tests/migrations_clickhouse");
+
+        let migration1 = Migration::unapplied(
+            "V1__initial",
+            include_str!("./migrations_clickhouse/V1-2/V1__initial.sql"),
+        )
+        .unwrap();
+
+        let migration2 = Migration::unapplied(
+            "V2__add_cars_and_motos_table",
+            include_str!("./migrations_clickhouse/V1-2/V2__add_cars_and_motos_table.sql"),
+        )
+        .unwrap();
+
+        let migration3 = Migration::unapplied(
+            "V3__add_brand_to_cars_table",
+            include_str!("./migrations_clickhouse/V3/V3__add_brand_to_cars_table.sql"),
+        )
+        .unwrap();
+
+        let migration4 = Migration::unapplied(
+            "V4__add_year_to_motos_table",
+            include_str!("./migrations_clickhouse/V4__add_year_to_motos_table.sql"),
+        )
+        .unwrap();
+
+        let migration5 = Migration::unapplied(
+            "V5__add_year_field_to_cars",
+            "ALTER TABLE cars ADD COLUMN year Int32;",
+        )
+        .unwrap();
+
+        vec![migration1, migration2, migration3, migration4, migration5]
+    }
+
+    mod embedded {
+        use refinery::embed_migrations;
+        embed_migrations!("./tests/migrations_clickhouse");
+    }
+
+    mod broken {
+        use refinery::embed_migrations;
+        embed_migrations!("./tests/migrations_broken");
+    }
+
+    mod missing {
+        use refinery::embed_migrations;
+        embed_migrations!("./tests/migrations_missing_clickhouse");
+    }
+
+    async fn run_test<T: std::future::Future<Output = ()>>(t: T) {
+        let result = AssertUnwindSafe(t).catch_unwind().await;
+        assert!(result.is_ok());
+    }
+
+    async fn connect() -> Client {
+        let client = Client::connect(
+            "localhost:9000",
+            ClientOptions {
+                username: "default".to_string(),
+                password: "".to_string(),
+                default_database: "".to_string(),
+            },
+        )
+        .await
+        .unwrap();
+
+        client
+            .execute("DROP DATABASE IF EXISTS refinery_test")
+            .await
+            .unwrap();
+        client
+            .execute("CREATE DATABASE refinery_test")
+            .await
+            .unwrap();
+        drop(client);
+        let client = Client::connect(
+            "localhost:9000",
+            ClientOptions {
+                username: "default".to_string(),
+                password: "".to_string(),
+                default_database: "refinery_test".to_string(),
+            },
+        )
+        .await
+        .unwrap();
+        client
+    }
+
+    #[tokio::test]
+    async fn creates_migration_table() {
+        run_test(async {
+            let mut client = connect().await;
+
+            embedded::migrations::runner()
+                .run_async(&mut client)
+                .await
+                .unwrap();
+
+            client
+                .query_collect::<UnitValue<String>>(&format!(
+                    "SELECT table_name FROM information_schema.tables WHERE table_name='{}'",
+                    DEFAULT_TABLE_NAME
+                ))
+                .await
+                .unwrap()
+                .into_iter()
+                .for_each(|name| {
+                    assert_eq!(DEFAULT_TABLE_NAME, name.0);
+                });
+        })
+        .await;
+    }
+
+    #[tokio::test]
+    async fn report_contains_applied_migrations() {
+        run_test(async {
+            let mut client = connect().await;
+
+            let report = embedded::migrations::runner()
+                .run_async(&mut client)
+                .await
+                .unwrap();
+
+            let migrations = get_migrations();
+            let applied_migrations = report.applied_migrations();
+
+            assert_eq!(4, applied_migrations.len());
+
+            assert_eq!(migrations[0].version(), applied_migrations[0].version());
+            assert_eq!(migrations[1].version(), applied_migrations[1].version());
+            assert_eq!(migrations[2].version(), applied_migrations[2].version());
+            assert_eq!(migrations[3].version(), applied_migrations[3].version());
+
+            assert_eq!(migrations[0].name(), applied_migrations[0].name());
+            assert_eq!(migrations[1].name(), applied_migrations[1].name());
+            assert_eq!(migrations[2].name(), applied_migrations[2].name());
+            assert_eq!(migrations[3].name(), applied_migrations[3].name());
+
+            assert_eq!(migrations[0].checksum(), applied_migrations[0].checksum());
+            assert_eq!(migrations[1].checksum(), applied_migrations[1].checksum());
+            assert_eq!(migrations[2].checksum(), applied_migrations[2].checksum());
+            assert_eq!(migrations[3].checksum(), applied_migrations[3].checksum());
+        })
+        .await;
+    }
+
+    #[tokio::test]
+    async fn applies_migration() {
+        run_test(async {
+            let mut client = connect().await;
+
+            embedded::migrations::runner()
+                .run_async(&mut client)
+                .await
+                .unwrap();
+
+            client
+                .execute("INSERT INTO persons (name, city) VALUES ('John Legend', 'New York')")
+                .await
+                .unwrap();
+
+            let mut rows: Vec<RawRow> = client
+                .query_collect("SELECT name, city FROM persons")
+                .await
+                .unwrap();
+
+            {
+                assert_eq!("John Legend", rows[0].get::<_, String>(0));
+                assert_eq!("New York", rows[0].get::<_, String>(1));
+            }
+        })
+        .await
+    }
+
+    #[tokio::test]
+    async fn updates_schema_history() {
+        run_test(async {
+            let mut client = connect().await;
+
+            embedded::migrations::runner()
+                .run_async(&mut client)
+                .await
+                .unwrap();
+
+            let current = client
+                .get_last_applied_migration(DEFAULT_TABLE_NAME)
+                .await
+                .unwrap()
+                .unwrap();
+
+            assert_eq!(4, current.version());
+            assert_eq!(
+                OffsetDateTime::now_utc().date(),
+                current.applied_on().unwrap().date()
+            );
+        })
+        .await
+    }
+
+    #[tokio::test]
+    async fn gets_applied_migrations() {
+        run_test(async {
+            let mut client = connect().await;
+
+            embedded::migrations::runner()
+                .run_async(&mut client)
+                .await
+                .unwrap();
+
+            let applied_migrations
= client + .get_applied_migrations(DEFAULT_TABLE_NAME) + .await + .unwrap(); + let migrations = get_migrations(); + assert_eq!(4, applied_migrations.len()); + + assert_eq!(migrations[0].version(), applied_migrations[0].version()); + assert_eq!(migrations[1].version(), applied_migrations[1].version()); + assert_eq!(migrations[2].version(), applied_migrations[2].version()); + assert_eq!(migrations[3].version(), applied_migrations[3].version()); + + assert_eq!(migrations[0].name(), migrations[0].name()); + assert_eq!(migrations[1].name(), applied_migrations[1].name()); + assert_eq!(migrations[2].name(), applied_migrations[2].name()); + assert_eq!(migrations[3].name(), applied_migrations[3].name()); + + assert_eq!(migrations[0].checksum(), applied_migrations[0].checksum()); + assert_eq!(migrations[1].checksum(), applied_migrations[1].checksum()); + assert_eq!(migrations[2].checksum(), applied_migrations[2].checksum()); + assert_eq!(migrations[3].checksum(), applied_migrations[3].checksum()); + }) + .await; + } + + #[tokio::test] + async fn applies_new_migration() { + run_test(async { + let mut client = connect().await; + + embedded::migrations::runner() + .run_async(&mut client) + .await + .unwrap(); + + let migrations = get_migrations(); + + let mchecksum = migrations[4].checksum(); + client + .migrate( + &migrations, + true, + true, + false, + Target::Latest, + DEFAULT_TABLE_NAME, + ) + .await + .unwrap(); + + let current = client + .get_last_applied_migration(DEFAULT_TABLE_NAME) + .await + .unwrap() + .unwrap(); + + assert_eq!(5, current.version()); + assert_eq!(mchecksum, current.checksum()); + }) + .await; + } + + #[tokio::test] + async fn migrates_to_target_migration() { + run_test(async { + let mut client = connect().await; + + let report = embedded::migrations::runner() + .set_target(Target::Version(3)) + .run_async(&mut client) + .await + .unwrap(); + + let current = client + .get_last_applied_migration(DEFAULT_TABLE_NAME) + .await + .unwrap() + .unwrap(); + let applied_migrations = report.applied_migrations(); + let migrations = get_migrations(); + + assert_eq!(3, current.version()); + + assert_eq!(3, applied_migrations.len()); + + assert_eq!(migrations[0].version(), applied_migrations[0].version()); + assert_eq!(migrations[1].version(), applied_migrations[1].version()); + assert_eq!(migrations[2].version(), applied_migrations[2].version()); + + assert_eq!(migrations[0].name(), migrations[0].name()); + assert_eq!(migrations[1].name(), applied_migrations[1].name()); + assert_eq!(migrations[2].name(), applied_migrations[2].name()); + + assert_eq!(migrations[0].checksum(), applied_migrations[0].checksum()); + assert_eq!(migrations[1].checksum(), applied_migrations[1].checksum()); + assert_eq!(migrations[2].checksum(), applied_migrations[2].checksum()); + }) + .await; + } + + #[tokio::test] + async fn aborts_on_missing_migration_on_filesystem() { + run_test(async { + let mut client = connect().await; + + embedded::migrations::runner() + .run_async(&mut client) + .await + .unwrap(); + + let migration = Migration::unapplied( + "V4__add_year_field_to_cars", + "ALTER TABLE cars ADD year INTEGER;", + ) + .unwrap(); + + let err = client + .migrate( + &[migration.clone()], + true, + true, + false, + Target::Latest, + DEFAULT_TABLE_NAME, + ) + .await + .unwrap_err(); + + match err.kind() { + Kind::MissingVersion(missing) => { + assert_eq!(1, missing.version()); + assert_eq!("initial", missing.name()); + } + _ => panic!("failed test"), + } + }) + .await; + } + + #[tokio::test] + async fn 
aborts_on_divergent_migration() { + run_test(async { + let mut client = connect().await; + + embedded::migrations::runner() + .run_async(&mut client) + .await + .unwrap(); + + embedded::migrations::runner() + .run_async(&mut client) + .await + .unwrap(); + + let migration = Migration::unapplied( + "V2__add_year_field_to_cars", + "ALTER TABLE cars ADD COLUMN year INTEGER;", + ) + .unwrap(); + + let err = client + .migrate( + &[migration.clone()], + true, + false, + false, + Target::Latest, + DEFAULT_TABLE_NAME, + ) + .await + .unwrap_err(); + + match err.kind() { + Kind::DivergentVersion(applied, divergent) => { + assert_eq!(&migration, divergent); + assert_eq!(2, applied.version()); + assert_eq!("add_cars_and_motos_table", applied.name()); + } + _ => panic!("failed test"), + }; + }) + .await; + } + + #[tokio::test] + async fn aborts_on_missing_migration_on_database() { + run_test(async { + let mut client = connect().await; + + missing::migrations::runner() + .run_async(&mut client) + .await + .unwrap(); + + let migration1 = Migration::unapplied( + "V1__initial", + r#" + CREATE TABLE persons ( + id Int32, + name String, + city String + ) + Engine=MergeTree() ORDER BY id; + "#, + ) + .unwrap(); + + let migration2 = Migration::unapplied( + "V2__add_cars_table", + include_str!("./migrations_missing_clickhouse/V2__add_cars_table.sql"), + ) + .unwrap(); + let err = client + .migrate( + &[migration1, migration2], + true, + true, + false, + Target::Latest, + DEFAULT_TABLE_NAME, + ) + .await + .unwrap_err(); + + match err.kind() { + Kind::MissingVersion(missing) => { + assert_eq!(1, missing.version()); + assert_eq!("initial", missing.name()); + } + _ => panic!("failed test"), + } + }) + .await; + } + + #[tokio::test] + async fn migrates_from_config() { + run_test(async { + // clear database + connect().await; + + let mut config = Config::new(ConfigDbType::Clickhouse) + .set_db_name("refinery_test") + .set_db_host("localhost") + .set_db_port("9000"); + + let migrations = get_migrations(); + let runner = Runner::new(&migrations) + .set_grouped(false) + .set_abort_divergent(true) + .set_abort_missing(true); + + runner.run_async(&mut config).await.unwrap(); + + let applied_migrations = runner + .get_applied_migrations_async(&mut config) + .await + .unwrap(); + assert_eq!(5, applied_migrations.len()); + + assert_eq!(migrations[0].version(), applied_migrations[0].version()); + assert_eq!(migrations[1].version(), applied_migrations[1].version()); + assert_eq!(migrations[2].version(), applied_migrations[2].version()); + assert_eq!(migrations[3].version(), applied_migrations[3].version()); + assert_eq!(migrations[4].version(), applied_migrations[4].version()); + + assert_eq!(migrations[0].name(), migrations[0].name()); + assert_eq!(migrations[1].name(), applied_migrations[1].name()); + assert_eq!(migrations[2].name(), applied_migrations[2].name()); + assert_eq!(migrations[3].name(), applied_migrations[3].name()); + assert_eq!(migrations[4].name(), applied_migrations[4].name()); + + assert_eq!(migrations[0].checksum(), applied_migrations[0].checksum()); + assert_eq!(migrations[1].checksum(), applied_migrations[1].checksum()); + assert_eq!(migrations[2].checksum(), applied_migrations[2].checksum()); + assert_eq!(migrations[3].checksum(), applied_migrations[3].checksum()); + assert_eq!(migrations[4].checksum(), applied_migrations[4].checksum()); + }) + .await; + } + + #[tokio::test] + async fn migrate_from_config_report_contains_migrations() { + run_test(async { + // clear database + connect().await; + + let 
mut config = Config::new(ConfigDbType::Clickhouse) + .set_db_name("refinery_test") + .set_db_host("localhost") + .set_db_port("9000"); + + let migrations = get_migrations(); + let runner = Runner::new(&migrations) + .set_grouped(false) + .set_abort_divergent(true) + .set_abort_missing(true); + + let report = runner.run_async(&mut config).await.unwrap(); + + let applied_migrations = report.applied_migrations(); + assert_eq!(5, applied_migrations.len()); + + assert_eq!(migrations[0].version(), applied_migrations[0].version()); + assert_eq!(migrations[1].version(), applied_migrations[1].version()); + assert_eq!(migrations[2].version(), applied_migrations[2].version()); + assert_eq!(migrations[3].version(), applied_migrations[3].version()); + assert_eq!(migrations[4].version(), applied_migrations[4].version()); + + assert_eq!(migrations[0].name(), migrations[0].name()); + assert_eq!(migrations[1].name(), applied_migrations[1].name()); + assert_eq!(migrations[2].name(), applied_migrations[2].name()); + assert_eq!(migrations[3].name(), applied_migrations[3].name()); + assert_eq!(migrations[4].name(), applied_migrations[4].name()); + + assert_eq!(migrations[0].checksum(), applied_migrations[0].checksum()); + assert_eq!(migrations[1].checksum(), applied_migrations[1].checksum()); + assert_eq!(migrations[2].checksum(), applied_migrations[2].checksum()); + assert_eq!(migrations[3].checksum(), applied_migrations[3].checksum()); + assert_eq!(migrations[4].checksum(), applied_migrations[4].checksum()); + }) + .await; + } + + #[tokio::test] + async fn migrate_from_config_report_returns_last_applied_migration() { + run_test(async { + // clear database + connect().await; + + let mut config = Config::new(ConfigDbType::Clickhouse) + .set_db_name("refinery_test") + .set_db_host("localhost") + .set_db_port("9000"); + + let migrations = get_migrations(); + let runner = Runner::new(&migrations) + .set_grouped(false) + .set_abort_divergent(true) + .set_abort_missing(true); + + runner.run_async(&mut config).await.unwrap(); + + let applied_migration = runner + .get_last_applied_migration_async(&mut config) + .await + .unwrap() + .unwrap(); + assert_eq!(5, applied_migration.version()); + + assert_eq!(migrations[4].version(), applied_migration.version()); + assert_eq!(migrations[4].name(), applied_migration.name()); + assert_eq!(migrations[4].checksum(), applied_migration.checksum()); + }) + .await; + } + + #[tokio::test] + async fn doesnt_run_migrations_if_fake() { + run_test(async { + let mut client = connect().await; + + let report = embedded::migrations::runner() + .set_target(Target::Fake) + .run_async(&mut client) + .await + .unwrap(); + + let applied_migrations = report.applied_migrations(); + assert!(applied_migrations.is_empty()); + + let current = client + .get_last_applied_migration(DEFAULT_TABLE_NAME) + .await + .unwrap() + .unwrap(); + let migrations = get_migrations(); + let mchecksum = migrations[3].checksum(); + + assert_eq!(4, current.version()); + assert_eq!(mchecksum, current.checksum()); + + let row = client + .query_collect::>( + "SELECT table_name FROM information_schema.tables WHERE table_name='persons'", + ) + .await + .unwrap(); + + assert!(row.is_empty()); + }) + .await; + } + + #[tokio::test] + async fn doesnt_run_migrations_if_fake_version() { + run_test(async { + let mut client = connect().await; + + let report = embedded::migrations::runner() + .set_target(Target::FakeVersion(2)) + .run_async(&mut client) + .await + .unwrap(); + + let applied_migrations = report.applied_migrations(); + 
assert!(applied_migrations.is_empty()); + + let current = client + .get_last_applied_migration(DEFAULT_TABLE_NAME) + .await + .unwrap() + .unwrap(); + let migrations = get_migrations(); + let mchecksum = migrations[1].checksum(); + + assert_eq!(2, current.version()); + assert_eq!(mchecksum, current.checksum()); + + let row = client + .query_collect::>( + "SELECT table_name FROM information_schema.tables WHERE table_name='persons'", + ) + .await + .unwrap(); + + assert!(row.is_empty()); + }) + .await; + } +} diff --git a/refinery/tests/migrations_clickhouse/V1-2/V1__initial.sql b/refinery/tests/migrations_clickhouse/V1-2/V1__initial.sql new file mode 100644 index 00000000..56ff3634 --- /dev/null +++ b/refinery/tests/migrations_clickhouse/V1-2/V1__initial.sql @@ -0,0 +1,7 @@ + +CREATE TABLE persons ( + id UUID, + name String, + city String +) +Engine=MergeTree() ORDER BY id; \ No newline at end of file diff --git a/refinery/tests/migrations_clickhouse/V1-2/V2__add_cars_and_motos_table.sql b/refinery/tests/migrations_clickhouse/V1-2/V2__add_cars_and_motos_table.sql new file mode 100644 index 00000000..b78c5c09 --- /dev/null +++ b/refinery/tests/migrations_clickhouse/V1-2/V2__add_cars_and_motos_table.sql @@ -0,0 +1,10 @@ +CREATE TABLE cars ( + id UUID, + name String +) +Engine=MergeTree() ORDER BY id; +CREATE TABLE motos ( + id UUID, + name String +) +Engine=MergeTree() ORDER BY id; diff --git a/refinery/tests/migrations_clickhouse/V3/V3__add_brand_to_cars_table.sql b/refinery/tests/migrations_clickhouse/V3/V3__add_brand_to_cars_table.sql new file mode 100644 index 00000000..782adafc --- /dev/null +++ b/refinery/tests/migrations_clickhouse/V3/V3__add_brand_to_cars_table.sql @@ -0,0 +1 @@ +ALTER TABLE cars ADD COLUMN brand String; diff --git a/refinery/tests/migrations_clickhouse/V4__add_year_to_motos_table.sql b/refinery/tests/migrations_clickhouse/V4__add_year_to_motos_table.sql new file mode 100644 index 00000000..74c2f4e1 --- /dev/null +++ b/refinery/tests/migrations_clickhouse/V4__add_year_to_motos_table.sql @@ -0,0 +1 @@ +ALTER TABLE motos ADD COLUMN year Int32; diff --git a/refinery/tests/migrations_missing_clickhouse/V2__add_cars_table.sql b/refinery/tests/migrations_missing_clickhouse/V2__add_cars_table.sql new file mode 100644 index 00000000..c8d5b0f2 --- /dev/null +++ b/refinery/tests/migrations_missing_clickhouse/V2__add_cars_table.sql @@ -0,0 +1,5 @@ +CREATE TABLE cars ( + id Int32, + name String +) +Engine=MergeTree() ORDER BY id; \ No newline at end of file diff --git a/refinery_core/Cargo.toml b/refinery_core/Cargo.toml index 11b46917..b8c1a37b 100644 --- a/refinery_core/Cargo.toml +++ b/refinery_core/Cargo.toml @@ -15,6 +15,7 @@ tiberius = ["dep:tiberius", "futures"] tiberius-config = ["tiberius", "tokio", "tokio-util"] tokio-postgres = ["dep:tokio-postgres", "tokio"] mysql_async = ["dep:mysql_async", "tokio"] +clickhouse = ["dep:klickhouse", "tokio"] [dependencies] async-trait = "0.1" @@ -40,6 +41,7 @@ tiberius = { version = "0.7", optional = true } futures = { version = "0.3.16", optional = true } tokio-util = { version = "0.6.7", features = ["compat"], optional = true } time = { version = "0.3.5", features = ["parsing", "formatting"] } +klickhouse = { version = "0.8.3", optional = true } # Flate2 needs to be included for mysql to work flate2 = { version = "1.0", default-features = false, features = [ "zlib" ], optional = true} diff --git a/refinery_core/src/config.rs b/refinery_core/src/config.rs index c1fac221..c19c28b7 100644 --- a/refinery_core/src/config.rs +++ 
b/refinery_core/src/config.rs
@@ -20,6 +20,7 @@ pub enum ConfigDbType {
     Postgres,
     Sqlite,
     Mssql,
+    Clickhouse,
 }
 
 impl Config {
@@ -138,6 +139,18 @@ impl Config {
         self.main.db_port.as_deref()
     }
 
+    pub fn db_name(&self) -> Option<&str> {
+        self.main.db_name.as_deref()
+    }
+
+    pub fn db_user(&self) -> Option<&str> {
+        self.main.db_user.as_deref()
+    }
+
+    pub fn db_pass(&self) -> Option<&str> {
+        self.main.db_pass.as_deref()
+    }
+
     pub fn set_db_user(self, db_user: &str) -> Config {
         Config {
             main: Main {
diff --git a/refinery_core/src/drivers/clickhouse.rs b/refinery_core/src/drivers/clickhouse.rs
new file mode 100644
index 00000000..c619bf36
--- /dev/null
+++ b/refinery_core/src/drivers/clickhouse.rs
@@ -0,0 +1,57 @@
+use async_trait::async_trait;
+use crate::traits::r#async::{AsyncMigrate, AsyncQuery, AsyncTransaction};
+use crate::Migration;
+use time::format_description::well_known::Rfc3339;
+use time::OffsetDateTime;
+
+use klickhouse::{Client, KlickhouseError, Result, Row, query_parser};
+
+#[derive(Row)]
+struct MigrationRow {
+    version: i32,
+    name: String,
+    applied_on: String,
+    checksum: u64,
+}
+
+#[async_trait]
+impl AsyncTransaction for Client {
+    type Error = KlickhouseError;
+
+    async fn execute(&mut self, queries: &[&str]) -> Result<usize> {
+        for query in queries {
+            for query in query_parser::split_query_statements(query).into_iter().filter(|x| !x.trim().is_empty()) {
+                Client::execute(self, query).await?;
+            }
+        }
+        Ok(queries.len())
+    }
+}
+
+#[async_trait]
+impl AsyncQuery<Vec<Migration>> for Client {
+    async fn query(
+        &mut self,
+        query: &str,
+    ) -> Result<Vec<Migration>, <Self as AsyncTransaction>::Error> {
+        assert!(!query.is_empty());
+        self.query_collect::<MigrationRow>(query).await?
+            .into_iter()
+            .map(|row| Ok(Migration::applied(row.version, row.name, OffsetDateTime::parse(&*row.applied_on, &Rfc3339).map_err(|e| {
+                KlickhouseError::DeserializeError(format!("failed to parse time: {:?}", e))
+            })?, row.checksum)))
+            .collect()
+    }
+}
+
+impl AsyncMigrate for Client {
+    fn assert_migrations_table_query(migration_table_name: &str) -> String {
+        format!(
+            "CREATE TABLE IF NOT EXISTS {migration_table_name}(
+            version INT,
+            name VARCHAR(255),
+            applied_on VARCHAR(255),
+            checksum UInt64) Engine=MergeTree() ORDER BY version;"
+        )
+    }
+}
diff --git a/refinery_core/src/drivers/config.rs b/refinery_core/src/drivers/config.rs
index 92a00582..dd067a1e 100644
--- a/refinery_core/src/drivers/config.rs
+++ b/refinery_core/src/drivers/config.rs
@@ -90,6 +90,9 @@ macro_rules! with_connection {
             ConfigDbType::Mssql => {
                 panic!("tried to synchronously migrate from config for a mssql database, but tiberius is an async driver");
             }
+            ConfigDbType::Clickhouse => {
+                panic!("tried to synchronously migrate from config for a clickhouse database, but klickhouse is an async driver");
+            }
         }
     }
 }
@@ -97,6 +100,7 @@
 #[cfg(any(
     feature = "tokio-postgres",
     feature = "mysql_async",
+    feature = "clickhouse",
     feature = "tiberius-config"
 ))]
 macro_rules! with_connection_async {
@@ -154,6 +158,26 @@ macro_rules! with_connection_async {
                 }
             }
         }
+            ConfigDbType::Clickhouse => {
+                cfg_if::cfg_if!
{ + if #[cfg(feature = "clickhouse")] { + let host = $config.db_host().unwrap_or("127.0.0.1"); + let port = $config.db_port().unwrap_or("9000"); + let default_database = $config.db_name().unwrap_or_default().to_string(); + let username = $config.db_user().unwrap_or("default").to_string(); + let password = $config.db_pass().unwrap_or("").to_string(); + let client = klickhouse::Client::connect(format!("{}:{}", host, port), klickhouse::ClientOptions { + default_database, + username, + password, + }).await.migration_err("could not connect to the database", None)?; + + $op(client).await + } else { + panic!("tried to migrate async from config for a clickhouse database, but feature clickhouse not enabled!"); + } + } + } } } } @@ -219,6 +243,7 @@ impl crate::Migrate for Config { #[cfg(any( feature = "mysql_async", feature = "tokio-postgres", + feature = "clickhouse", feature = "tiberius-config" ))] #[async_trait] diff --git a/refinery_core/src/drivers/mod.rs b/refinery_core/src/drivers/mod.rs index 867d4c4d..b778611c 100644 --- a/refinery_core/src/drivers/mod.rs +++ b/refinery_core/src/drivers/mod.rs @@ -16,4 +16,7 @@ pub mod mysql; #[cfg(feature = "tiberius")] pub mod tiberius; +#[cfg(feature = "clickhouse")] +pub mod clickhouse; + mod config; diff --git a/refinery_core/src/lib.rs b/refinery_core/src/lib.rs index 35d02a47..407dc7c9 100644 --- a/refinery_core/src/lib.rs +++ b/refinery_core/src/lib.rs @@ -26,5 +26,8 @@ pub use tokio_postgres; #[cfg(feature = "mysql_async")] pub use mysql_async; +#[cfg(feature = "clickhouse")] +pub use klickhouse; + #[cfg(feature = "tiberius")] pub use tiberius;
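
A minimal usage sketch of the feature this diff adds, following the same calls the test suite above exercises (embed_migrations!, klickhouse::Client::connect, run_async). It assumes refinery is built with the new `clickhouse` feature enabled; the "my_db" database name and the "./migrations" directory are placeholders rather than values taken from this diff.

    use refinery::embed_migrations;
    use refinery_core::klickhouse::{Client, ClientOptions};

    // embed the SQL files at compile time; this generates a `migrations` module
    embed_migrations!("./migrations");

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        // connect with the re-exported klickhouse client, exactly as the tests do
        let mut client = Client::connect(
            "localhost:9000",
            ClientOptions {
                username: "default".to_string(),
                password: "".to_string(),
                default_database: "my_db".to_string(),
            },
        )
        .await?;

        // apply any pending migrations; the refinery_schema_history table is created on first run
        let report = migrations::runner().run_async(&mut client).await?;
        println!("applied {} migrations", report.applied_migrations().len());
        Ok(())
    }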