From 2e854fb4f09c59c1ab4c7f6fdaeb88846e6727cc Mon Sep 17 00:00:00 2001 From: Bartosz Sypytkowski Date: Mon, 17 Jun 2024 12:41:14 +0200 Subject: [PATCH] feat: make appflowy-cloud migrate use feature flags --- ...adfe4049234258ff2c8429db85206a96c53c1.json | 14 ----- ...6f06877f5a8c49830a4126ee61f7c2e9db03a.json | 28 --------- ...37ea61f37167a21cefb4b8480bd511ce2a878.json | 32 ---------- Cargo.lock | 1 + Cargo.toml | 3 + libs/database/Cargo.toml | 3 +- .../src/index/collab_embeddings_ops.rs | 33 +++++----- libs/database/src/lib.rs | 4 +- .../20240614171931_collab_embeddings.sql | 24 -------- .../ai/20240614171931_collab_embeddings.sql | 17 ++++++ src/api/mod.rs | 4 +- src/application.rs | 19 ++++-- src/biz/mod.rs | 4 +- src/middleware/feature_migration.rs | 58 ++++++++++++++++++ src/middleware/mod.rs | 1 + tests/sql_test/migrator_test.rs | 61 +++++++++++++++++++ tests/sql_test/mod.rs | 1 + tests/sql_test/util.rs | 6 +- 18 files changed, 187 insertions(+), 126 deletions(-) delete mode 100644 .sqlx/query-6a1722f63a88debb617c20f91d2adfe4049234258ff2c8429db85206a96c53c1.json delete mode 100644 .sqlx/query-e6105ace4f5b9b71a7edc30f80c6f06877f5a8c49830a4126ee61f7c2e9db03a.json delete mode 100644 .sqlx/query-ff77d2a038e3130bf57d055e26b37ea61f37167a21cefb4b8480bd511ce2a878.json delete mode 100644 migrations/20240614171931_collab_embeddings.sql create mode 100644 migrations/ai/20240614171931_collab_embeddings.sql create mode 100644 src/middleware/feature_migration.rs create mode 100644 tests/sql_test/migrator_test.rs diff --git a/.sqlx/query-6a1722f63a88debb617c20f91d2adfe4049234258ff2c8429db85206a96c53c1.json b/.sqlx/query-6a1722f63a88debb617c20f91d2adfe4049234258ff2c8429db85206a96c53c1.json deleted file mode 100644 index 7c848d9aa..000000000 --- a/.sqlx/query-6a1722f63a88debb617c20f91d2adfe4049234258ff2c8429db85206a96c53c1.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "DELETE FROM af_collab_embeddings WHERE fragment_id IN (SELECT unnest($1::text[]))", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "TextArray" - ] - }, - "nullable": [] - }, - "hash": "6a1722f63a88debb617c20f91d2adfe4049234258ff2c8429db85206a96c53c1" -} diff --git a/.sqlx/query-e6105ace4f5b9b71a7edc30f80c6f06877f5a8c49830a4126ee61f7c2e9db03a.json b/.sqlx/query-e6105ace4f5b9b71a7edc30f80c6f06877f5a8c49830a4126ee61f7c2e9db03a.json deleted file mode 100644 index 1b1653991..000000000 --- a/.sqlx/query-e6105ace4f5b9b71a7edc30f80c6f06877f5a8c49830a4126ee61f7c2e9db03a.json +++ /dev/null @@ -1,28 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\nSELECT\n w.settings['disable_indexing']::boolean as disable_indexing,\n CASE\n WHEN w.settings['disable_indexing']::boolean THEN\n FALSE\n ELSE\n EXISTS (SELECT 1 FROM af_collab_embeddings m WHERE m.partition_key = c.partition_key AND m.oid = c.oid)\n END as has_index\nFROM af_collab c\nJOIN af_workspace w ON c.workspace_id = w.workspace_id\nWHERE c.oid = $1", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "disable_indexing", - "type_info": "Bool" - }, - { - "ordinal": 1, - "name": "has_index", - "type_info": "Bool" - } - ], - "parameters": { - "Left": [ - "Text" - ] - }, - "nullable": [ - null, - null - ] - }, - "hash": "e6105ace4f5b9b71a7edc30f80c6f06877f5a8c49830a4126ee61f7c2e9db03a" -} diff --git a/.sqlx/query-ff77d2a038e3130bf57d055e26b37ea61f37167a21cefb4b8480bd511ce2a878.json b/.sqlx/query-ff77d2a038e3130bf57d055e26b37ea61f37167a21cefb4b8480bd511ce2a878.json deleted file mode 100644 index 63a9857f7..000000000 --- a/.sqlx/query-ff77d2a038e3130bf57d055e26b37ea61f37167a21cefb4b8480bd511ce2a878.json +++ /dev/null @@ -1,32 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n select c.workspace_id, c.oid, c.partition_key\n from af_collab c\n where not exists (\n select 1\n from af_collab_embeddings em\n where em.oid = c.oid and em.partition_key = 0)", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "workspace_id", - "type_info": "Uuid" - }, - { - "ordinal": 1, - "name": "oid", - "type_info": "Text" - }, - { - "ordinal": 2, - "name": "partition_key", - "type_info": "Int4" - } - ], - "parameters": { - "Left": [] - }, - "nullable": [ - false, - false, - false - ] - }, - "hash": "ff77d2a038e3130bf57d055e26b37ea61f37167a21cefb4b8480bd511ce2a878" -} diff --git a/Cargo.lock b/Cargo.lock index b7b76e910..7a8deaa2f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -656,6 +656,7 @@ dependencies = [ "shared-entity", "snowflake", "sqlx", + "sqlx-core", "tempfile", "thiserror", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 537892c43..a358b2739 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -53,6 +53,7 @@ tracing = { version = "0.1.40", features = ["log"] } tracing-subscriber = { version = "0.3.18", features = ["registry", "env-filter", "ansi", "json", "tracing-log"] } tracing-bunyan-formatter = "0.3.9" sqlx = { workspace = true, default-features = false, features = ["runtime-tokio-rustls", "macros", "postgres", "uuid", "chrono", "migrate"] } +sqlx-core = { version = "0.7.4", features = ["migrate"] } async-trait.workspace = true prometheus-client.workspace = true itertools = "0.11" @@ -234,4 +235,6 @@ collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "870cd70" } [features] +default = ["ai"] history = [] +ai = ["database/ai"] \ No newline at end of file diff --git a/libs/database/Cargo.toml b/libs/database/Cargo.toml index b071a1aae..a4fdd01e0 100644 --- a/libs/database/Cargo.toml +++ b/libs/database/Cargo.toml @@ -20,7 +20,7 @@ serde_json.workspace = true tonic-proto.workspace = true sqlx = { version = "0.7", default-features = false, features = ["postgres", "chrono", "uuid", "macros", "runtime-tokio-rustls", "rust_decimal"] } -pgvector = { workspace = true, features = ["sqlx"] } +pgvector = { workspace = true, features = ["sqlx"], optional = true } tracing = { version = "0.1.40" } uuid = { version = "1.6.1", features = ["serde", "v4"] } chrono = { version = "0.4", features = ["serde"] } @@ -36,3 +36,4 @@ bincode.workspace = true [features] default = ["s3"] s3 = ["rust-s3"] +ai = ["pgvector"] diff --git a/libs/database/src/index/collab_embeddings_ops.rs b/libs/database/src/index/collab_embeddings_ops.rs index 146ac90ce..23d6a02f0 100644 --- a/libs/database/src/index/collab_embeddings_ops.rs +++ b/libs/database/src/index/collab_embeddings_ops.rs @@ -2,7 +2,7 @@ use std::ops::DerefMut; use collab_entity::CollabType; use pgvector::Vector; -use sqlx::{Executor, Postgres, Transaction}; +use sqlx::{Executor, Postgres, Row, Transaction}; use uuid::Uuid; use database_entity::dto::AFCollabEmbeddingParams; @@ -11,7 +11,7 @@ pub async fn get_index_status( tx: &mut Transaction<'_, sqlx::Postgres>, oid: &str, ) -> Result, sqlx::Error> { - let result = sqlx::query!( + let result = sqlx::query( r#" SELECT w.settings['disable_indexing']::boolean as disable_indexing, @@ -23,15 +23,14 @@ SELECT END as has_index FROM af_collab c JOIN af_workspace w ON c.workspace_id = w.workspace_id -WHERE c.oid = $1"#, - oid - ) +WHERE c.oid = $1"# + ).bind(oid) .fetch_one(tx.deref_mut()) .await?; - if result.disable_indexing.unwrap_or(false) { + if result.get::, _>(0).unwrap_or(false) { return Ok(None); } - Ok(Some(result.has_index.unwrap_or(false))) + Ok(Some(result.get::, _>(1).unwrap_or(false))) } pub async fn upsert_collab_embeddings( @@ -72,12 +71,10 @@ pub async fn remove_collab_embeddings( tx: &mut Transaction<'_, sqlx::Postgres>, ids: &[String], ) -> Result<(), sqlx::Error> { - sqlx::query!( - "DELETE FROM af_collab_embeddings WHERE fragment_id IN (SELECT unnest($1::text[]))", - ids - ) - .execute(tx.deref_mut()) - .await?; + sqlx::query("DELETE FROM af_collab_embeddings WHERE fragment_id IN (SELECT unnest($1::text[]))") + .bind(ids) + .execute(tx.deref_mut()) + .await?; Ok(()) } @@ -87,14 +84,14 @@ pub async fn get_collabs_without_embeddings<'a, E>( where E: Executor<'a, Database = Postgres>, { - let oids = sqlx::query!( + let oids: Vec<(Uuid, String, i32)> = sqlx::query_as( r#" select c.workspace_id, c.oid, c.partition_key from af_collab c where not exists ( select 1 from af_collab_embeddings em - where em.oid = c.oid and em.partition_key = 0)"# // atm. get only documents + where em.oid = c.oid and em.partition_key = 0)"#, // atm. get only documents ) .fetch_all(executor) .await?; @@ -102,9 +99,9 @@ where oids .into_iter() .map(|r| CollabId { - collab_type: CollabType::from(r.partition_key), - workspace_id: r.workspace_id, - object_id: r.oid, + collab_type: CollabType::from(r.2), + workspace_id: r.0, + object_id: r.1, }) .collect(), ) diff --git a/libs/database/src/lib.rs b/libs/database/src/lib.rs index 323ac4e18..587a9a136 100644 --- a/libs/database/src/lib.rs +++ b/libs/database/src/lib.rs @@ -2,9 +2,11 @@ pub mod chat; pub mod collab; pub mod file; pub mod history; -pub mod index; pub mod listener; pub mod pg_row; pub mod resource_usage; pub mod user; pub mod workspace; + +#[cfg(feature = "ai")] +pub mod index; diff --git a/migrations/20240614171931_collab_embeddings.sql b/migrations/20240614171931_collab_embeddings.sql deleted file mode 100644 index 7377990fb..000000000 --- a/migrations/20240614171931_collab_embeddings.sql +++ /dev/null @@ -1,24 +0,0 @@ -DO $$ -BEGIN - -- Add migration script here - CREATE EXTENSION IF NOT EXISTS vector; - - -- create table to store collab embeddings - CREATE TABLE IF NOT EXISTS af_collab_embeddings - ( - fragment_id TEXT NOT NULL PRIMARY KEY, - oid TEXT NOT NULL, - partition_key INTEGER NOT NULL, - content_type INTEGER NOT NULL, - indexed_at TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT (NOW()), - content TEXT, - embedding VECTOR(1536), - FOREIGN KEY (oid, partition_key) REFERENCES af_collab (oid, partition_key) ON DELETE CASCADE - ); - - CREATE INDEX IF NOT EXISTS af_collab_embeddings_similarity_idx ON af_collab_embeddings USING hnsw (embedding vector_cosine_ops); - -EXCEPTION WHEN OTHERS THEN - RAISE NOTICE 'could not create "vector" extension, ignoring this migration'; -END; -$$ LANGUAGE plpgsql; \ No newline at end of file diff --git a/migrations/ai/20240614171931_collab_embeddings.sql b/migrations/ai/20240614171931_collab_embeddings.sql new file mode 100644 index 000000000..9c6727726 --- /dev/null +++ b/migrations/ai/20240614171931_collab_embeddings.sql @@ -0,0 +1,17 @@ +-- Add migration script here +CREATE EXTENSION IF NOT EXISTS vector; + +-- create table to store collab embeddings +CREATE TABLE IF NOT EXISTS af_collab_embeddings +( + fragment_id TEXT NOT NULL PRIMARY KEY, + oid TEXT NOT NULL, + partition_key INTEGER NOT NULL, + content_type INTEGER NOT NULL, + indexed_at TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT (NOW()), + content TEXT, + embedding VECTOR(1536), + FOREIGN KEY (oid, partition_key) REFERENCES af_collab (oid, partition_key) ON DELETE CASCADE +); + +CREATE INDEX IF NOT EXISTS af_collab_embeddings_similarity_idx ON af_collab_embeddings USING hnsw (embedding vector_cosine_ops); diff --git a/src/api/mod.rs b/src/api/mod.rs index 4e5b667e3..cf5412181 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -4,8 +4,10 @@ pub mod file_storage; pub mod history; pub mod metrics; -pub mod search; pub mod user; pub mod util; pub mod workspace; pub mod ws; + +#[cfg(feature = "ai")] +pub mod search; diff --git a/src/application.rs b/src/application.rs index 28941c0ec..195b16e35 100644 --- a/src/application.rs +++ b/src/application.rs @@ -43,6 +43,7 @@ use openssl::x509::X509; use secrecy::{ExposeSecret, Secret}; use snowflake::Snowflake; use sqlx::{postgres::PgPoolOptions, PgPool}; +use sqlx_core::migrate::Migrator; use std::net::TcpListener; use std::sync::Arc; use std::time::Duration; @@ -50,7 +51,7 @@ use tokio::sync::{Mutex, RwLock}; use tonic_proto::history::history_client::HistoryClient; use crate::api::ai::ai_completion_scope; -use crate::api::search::search_scope; +use crate::middleware::feature_migration::FeatureMigrationSource; use tracing::{info, warn}; use workspace_access::WorkspaceAccessControlImpl; @@ -128,7 +129,7 @@ pub async fn run_actix_server( let realtime_server_actor = Supervisor::start(|_| RealtimeServerActor(realtime_server)); let mut server = HttpServer::new(move || { - App::new() + let app = App::new() .wrap(NormalizePath::trim()) // Middleware is registered for each App, scope, or Resource and executed in opposite order as registration .wrap(MetricsMiddleware) @@ -150,7 +151,6 @@ pub async fn run_actix_server( .service(ai_completion_scope()) .service(history_scope()) .service(metrics_scope()) - .service(search_scope()) .app_data(Data::new(state.metrics.registry.clone())) .app_data(Data::new(state.metrics.request_metrics.clone())) .app_data(Data::new(state.metrics.realtime_metrics.clone())) @@ -158,7 +158,14 @@ pub async fn run_actix_server( .app_data(Data::new(realtime_server_actor.clone())) .app_data(Data::new(state.config.gotrue.jwt_secret.clone())) .app_data(Data::new(state.clone())) - .app_data(Data::new(storage.clone())) + .app_data(Data::new(storage.clone())); + + #[cfg(feature = "ai")] + { + return app.service(crate::api::search::search_scope()); + } + + app }); server = match pair { @@ -435,7 +442,9 @@ async fn get_connection_pool(setting: &DatabaseSetting) -> Result } async fn migrate(pool: &PgPool) -> Result<(), Error> { - sqlx::migrate!("./migrations") + Migrator::new(FeatureMigrationSource::new("./migrations")) + .await + .unwrap() .set_ignore_missing(true) .run(pool) .await diff --git a/src/biz/mod.rs b/src/biz/mod.rs index 2cdd1b8a2..b384dcf70 100644 --- a/src/biz/mod.rs +++ b/src/biz/mod.rs @@ -1,7 +1,9 @@ pub mod chat; pub mod collab; pub mod pg_listener; -pub mod search; pub mod user; pub mod utils; pub mod workspace; + +#[cfg(feature = "ai")] +pub mod search; diff --git a/src/middleware/feature_migration.rs b/src/middleware/feature_migration.rs new file mode 100644 index 000000000..726cb750a --- /dev/null +++ b/src/middleware/feature_migration.rs @@ -0,0 +1,58 @@ +use std::path::{Path, PathBuf}; + +use futures_util::future::BoxFuture; +use sqlx::error::BoxDynError; +use sqlx::migrate::{Migration, MigrationSource}; + +#[derive(Debug)] +pub struct FeatureMigrationSource { + root: PathBuf, + features: Vec<&'static str>, +} + +impl FeatureMigrationSource { + pub const FEATURE_DEFAULT: &'static str = "default"; + + pub fn new>(root: P) -> Self { + let features = vec![ + Self::FEATURE_DEFAULT, + #[cfg(feature = "ai")] + "ai", + ]; + Self::with_features(root, features) + } + + pub fn with_features>(root: P, features: Vec<&'static str>) -> Self { + FeatureMigrationSource { + root: root.as_ref().to_path_buf(), + features, + } + } +} + +impl<'s> MigrationSource<'s> for FeatureMigrationSource { + fn resolve(self) -> BoxFuture<'s, Result, BoxDynError>> { + Box::pin(async move { + if !self.root.exists() { + return Err(format!("Migration path does not exist: {:?}", self.root).into()); + } + let mut migrations = Vec::new(); + for feature in self.features { + let path = match feature { + Self::FEATURE_DEFAULT => self.root.clone(), + _ => self.root.join(feature), + }; + if path.exists() && path.is_dir() { + let feature_migrations: Vec<_> = sqlx_core::migrate::resolve_blocking(path)? + .into_iter() + .map(|(migration, _)| migration) + .collect(); + migrations.extend(feature_migrations); + } + } + + migrations.sort_by_key(|m| m.version); + Ok(migrations) + }) + } +} diff --git a/src/middleware/mod.rs b/src/middleware/mod.rs index 6b1181f0c..8d6bf4b9b 100644 --- a/src/middleware/mod.rs +++ b/src/middleware/mod.rs @@ -1,5 +1,6 @@ pub mod access_control_mw; // pub mod cors_mw; pub mod encrypt_mw; +pub mod feature_migration; pub mod metrics_mw; pub mod request_id; diff --git a/tests/sql_test/migrator_test.rs b/tests/sql_test/migrator_test.rs new file mode 100644 index 000000000..1569ea33c --- /dev/null +++ b/tests/sql_test/migrator_test.rs @@ -0,0 +1,61 @@ +use appflowy_cloud::middleware::feature_migration::FeatureMigrationSource; +use itertools::Itertools; +use sqlx_core::migrate::{Migration, MigrationSource}; + +fn has_migration(migrations: &[Migration], name: &str) -> bool { + migrations.iter().any(|m| m.description == name) +} + +#[tokio::test] +async fn migrator_with_feature_test() { + let source = FeatureMigrationSource::with_features( + "./migrations", + vec![FeatureMigrationSource::FEATURE_DEFAULT, "ai"], + ); + let migrations = source.resolve().await.unwrap(); + assert!( + has_migration(&migrations, "collab embeddings"), + "ai migrations should be included" + ); + assert!( + has_migration(&migrations, "user"), + "default migrations should be included" + ); + let sorted_by_date: Vec<_> = migrations.iter().map(|m| m.version).sorted().collect(); + let versions: Vec<_> = migrations.into_iter().map(|m| m.version).collect(); + assert_eq!( + sorted_by_date, versions, + "migrations should be sorted by date" + ); +} + +#[tokio::test] +async fn migrator_without_feature_test() { + let source = FeatureMigrationSource::with_features( + "./migrations", + vec![FeatureMigrationSource::FEATURE_DEFAULT], + ); + let migrations = source.resolve().await.unwrap(); + assert!( + !has_migration(&migrations, "collab embeddings"), + "ai migrations should NOT be included" + ); + assert!( + has_migration(&migrations, "user"), + "default migrations should be included" + ); +} + +#[tokio::test] +async fn migrator_no_default_test() { + let source = FeatureMigrationSource::with_features("./migrations", vec!["ai"]); + let migrations = source.resolve().await.unwrap(); + assert!( + has_migration(&migrations, "collab embeddings"), + "ai migrations should be included" + ); + assert!( + !has_migration(&migrations, "user"), + "default migrations should NOT be included" + ); +} diff --git a/tests/sql_test/mod.rs b/tests/sql_test/mod.rs index f04949068..17dafe708 100644 --- a/tests/sql_test/mod.rs +++ b/tests/sql_test/mod.rs @@ -1,4 +1,5 @@ mod chat_test; mod history_test; +mod migrator_test; pub(crate) mod util; mod workspace_test; diff --git a/tests/sql_test/util.rs b/tests/sql_test/util.rs index 0a3b55d61..859ff8128 100644 --- a/tests/sql_test/util.rs +++ b/tests/sql_test/util.rs @@ -1,8 +1,10 @@ +use appflowy_cloud::middleware::feature_migration::FeatureMigrationSource; use lazy_static::lazy_static; use rand::distributions::Alphanumeric; use rand::{thread_rng, Rng}; use snowflake::Snowflake; use sqlx::PgPool; +use sqlx_core::migrate::Migrator; use tokio::sync::RwLock; use uuid::Uuid; @@ -22,7 +24,9 @@ pub async fn setup_db(pool: &PgPool) -> anyhow::Result<()> { .execute(pool) .await?; - sqlx::migrate!("./migrations") + Migrator::new(FeatureMigrationSource::new("./migrations")) + .await + .unwrap() .set_ignore_missing(true) .run(pool) .await