Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: make appflowy-cloud migrate use feature flags #627

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

This file was deleted.

This file was deleted.

1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ tracing = { version = "0.1.40", features = ["log"] }
tracing-subscriber = { version = "0.3.18", features = ["registry", "env-filter", "ansi", "json", "tracing-log"] }
tracing-bunyan-formatter = "0.3.9"
sqlx = { workspace = true, default-features = false, features = ["runtime-tokio-rustls", "macros", "postgres", "uuid", "chrono", "migrate"] }
sqlx-core = { version = "0.7.4", features = ["migrate"] }
async-trait.workspace = true
prometheus-client.workspace = true
itertools = "0.11"
Expand Down Expand Up @@ -234,4 +235,6 @@ collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev =
collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "870cd70" }

[features]
default = ["ai"]
history = []
ai = ["database/ai"]
3 changes: 2 additions & 1 deletion libs/database/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ serde_json.workspace = true
tonic-proto.workspace = true

sqlx = { version = "0.7", default-features = false, features = ["postgres", "chrono", "uuid", "macros", "runtime-tokio-rustls", "rust_decimal"] }
pgvector = { workspace = true, features = ["sqlx"] }
pgvector = { workspace = true, features = ["sqlx"], optional = true }
tracing = { version = "0.1.40" }
uuid = { version = "1.6.1", features = ["serde", "v4"] }
chrono = { version = "0.4", features = ["serde"] }
Expand All @@ -36,3 +36,4 @@ bincode.workspace = true
[features]
default = ["s3"]
s3 = ["rust-s3"]
ai = ["pgvector"]
33 changes: 15 additions & 18 deletions libs/database/src/index/collab_embeddings_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::ops::DerefMut;

use collab_entity::CollabType;
use pgvector::Vector;
use sqlx::{Executor, Postgres, Transaction};
use sqlx::{Executor, Postgres, Row, Transaction};
use uuid::Uuid;

use database_entity::dto::AFCollabEmbeddingParams;
Expand All @@ -11,7 +11,7 @@ pub async fn get_index_status(
tx: &mut Transaction<'_, sqlx::Postgres>,
oid: &str,
) -> Result<Option<bool>, sqlx::Error> {
let result = sqlx::query!(
let result = sqlx::query(
r#"
SELECT
w.settings['disable_indexing']::boolean as disable_indexing,
Expand All @@ -23,15 +23,14 @@ SELECT
END as has_index
FROM af_collab c
JOIN af_workspace w ON c.workspace_id = w.workspace_id
WHERE c.oid = $1"#,
oid
)
WHERE c.oid = $1"#
).bind(oid)
.fetch_one(tx.deref_mut())
.await?;
if result.disable_indexing.unwrap_or(false) {
if result.get::<Option<bool>, _>(0).unwrap_or(false) {
return Ok(None);
}
Ok(Some(result.has_index.unwrap_or(false)))
Ok(Some(result.get::<Option<bool>, _>(1).unwrap_or(false)))
}

pub async fn upsert_collab_embeddings(
Expand Down Expand Up @@ -72,12 +71,10 @@ pub async fn remove_collab_embeddings(
tx: &mut Transaction<'_, sqlx::Postgres>,
ids: &[String],
) -> Result<(), sqlx::Error> {
sqlx::query!(
"DELETE FROM af_collab_embeddings WHERE fragment_id IN (SELECT unnest($1::text[]))",
ids
)
.execute(tx.deref_mut())
.await?;
sqlx::query("DELETE FROM af_collab_embeddings WHERE fragment_id IN (SELECT unnest($1::text[]))")
.bind(ids)
.execute(tx.deref_mut())
.await?;
Ok(())
}

Expand All @@ -87,24 +84,24 @@ pub async fn get_collabs_without_embeddings<'a, E>(
where
E: Executor<'a, Database = Postgres>,
{
let oids = sqlx::query!(
let oids: Vec<(Uuid, String, i32)> = sqlx::query_as(
r#"
select c.workspace_id, c.oid, c.partition_key
from af_collab c
where not exists (
select 1
from af_collab_embeddings em
where em.oid = c.oid and em.partition_key = 0)"# // atm. get only documents
where em.oid = c.oid and em.partition_key = 0)"#, // atm. get only documents
)
.fetch_all(executor)
.await?;
Ok(
oids
.into_iter()
.map(|r| CollabId {
collab_type: CollabType::from(r.partition_key),
workspace_id: r.workspace_id,
object_id: r.oid,
collab_type: CollabType::from(r.2),
workspace_id: r.0,
object_id: r.1,
})
.collect(),
)
Expand Down
4 changes: 3 additions & 1 deletion libs/database/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ pub mod chat;
pub mod collab;
pub mod file;
pub mod history;
pub mod index;
pub mod listener;
pub mod pg_row;
pub mod resource_usage;
pub mod user;
pub mod workspace;

#[cfg(feature = "ai")]
pub mod index;
24 changes: 0 additions & 24 deletions migrations/20240614171931_collab_embeddings.sql

This file was deleted.

17 changes: 17 additions & 0 deletions migrations/ai/20240614171931_collab_embeddings.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
-- Add migration script here
-- NOTE(review): this file lives under migrations/ai/ and is only applied when the
-- "ai" cargo feature is enabled (see FeatureMigrationSource). Editing an already
-- deployed migration changes its sqlx checksum -- confirm before touching.
CREATE EXTENSION IF NOT EXISTS vector;

-- create table to store collab embeddings
CREATE TABLE IF NOT EXISTS af_collab_embeddings
(
fragment_id TEXT NOT NULL PRIMARY KEY,
oid TEXT NOT NULL,
partition_key INTEGER NOT NULL,
content_type INTEGER NOT NULL,
indexed_at TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT (NOW()),
content TEXT,
-- 1536 dimensions matches OpenAI text-embedding models -- presumably; verify
-- against the embedding producer before changing.
embedding VECTOR(1536),
-- Cascade so embeddings vanish with their parent collab row.
FOREIGN KEY (oid, partition_key) REFERENCES af_collab (oid, partition_key) ON DELETE CASCADE
);

-- HNSW index for approximate nearest-neighbour search with cosine distance.
CREATE INDEX IF NOT EXISTS af_collab_embeddings_similarity_idx ON af_collab_embeddings USING hnsw (embedding vector_cosine_ops);
4 changes: 3 additions & 1 deletion src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ pub mod file_storage;

pub mod history;
pub mod metrics;
pub mod search;
pub mod user;
pub mod util;
pub mod workspace;
pub mod ws;

#[cfg(feature = "ai")]
pub mod search;
19 changes: 14 additions & 5 deletions src/application.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,15 @@ use openssl::x509::X509;
use secrecy::{ExposeSecret, Secret};
use snowflake::Snowflake;
use sqlx::{postgres::PgPoolOptions, PgPool};
use sqlx_core::migrate::Migrator;
use std::net::TcpListener;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{Mutex, RwLock};
use tonic_proto::history::history_client::HistoryClient;

use crate::api::ai::ai_completion_scope;
use crate::api::search::search_scope;
use crate::middleware::feature_migration::FeatureMigrationSource;
use tracing::{info, warn};
use workspace_access::WorkspaceAccessControlImpl;

Expand Down Expand Up @@ -128,7 +129,7 @@ pub async fn run_actix_server(

let realtime_server_actor = Supervisor::start(|_| RealtimeServerActor(realtime_server));
let mut server = HttpServer::new(move || {
App::new()
let app = App::new()
.wrap(NormalizePath::trim())
// Middleware is registered for each App, scope, or Resource and executed in opposite order as registration
.wrap(MetricsMiddleware)
Expand All @@ -150,15 +151,21 @@ pub async fn run_actix_server(
.service(ai_completion_scope())
.service(history_scope())
.service(metrics_scope())
.service(search_scope())
.app_data(Data::new(state.metrics.registry.clone()))
.app_data(Data::new(state.metrics.request_metrics.clone()))
.app_data(Data::new(state.metrics.realtime_metrics.clone()))
.app_data(Data::new(state.metrics.access_control_metrics.clone()))
.app_data(Data::new(realtime_server_actor.clone()))
.app_data(Data::new(state.config.gotrue.jwt_secret.clone()))
.app_data(Data::new(state.clone()))
.app_data(Data::new(storage.clone()))
.app_data(Data::new(storage.clone()));

#[cfg(feature = "ai")]
{
return app.service(crate::api::search::search_scope());
}

app
});

server = match pair {
Expand Down Expand Up @@ -435,7 +442,9 @@ async fn get_connection_pool(setting: &DatabaseSetting) -> Result<PgPool, Error>
}

async fn migrate(pool: &PgPool) -> Result<(), Error> {
sqlx::migrate!("./migrations")
Migrator::new(FeatureMigrationSource::new("./migrations"))
.await
.unwrap()
.set_ignore_missing(true)
.run(pool)
.await
Expand Down
4 changes: 3 additions & 1 deletion src/biz/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
pub mod chat;
pub mod collab;
pub mod pg_listener;
pub mod search;
pub mod user;
pub mod utils;
pub mod workspace;

#[cfg(feature = "ai")]
pub mod search;
58 changes: 58 additions & 0 deletions src/middleware/feature_migration.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use std::path::{Path, PathBuf};

use futures_util::future::BoxFuture;
use sqlx::error::BoxDynError;
use sqlx::migrate::{Migration, MigrationSource};

/// A sqlx [`MigrationSource`] that resolves the default migrations directory
/// plus one subdirectory per enabled cargo feature, so feature-gated schema
/// (e.g. the "ai" embeddings tables) is only migrated when compiled in.
#[derive(Debug)]
pub struct FeatureMigrationSource {
// Base migrations directory (e.g. "./migrations"); holds the default set.
root: PathBuf,
// Feature names to resolve. FEATURE_DEFAULT maps to `root` itself;
// every other entry maps to `root/<feature>`.
features: Vec<&'static str>,
}

impl FeatureMigrationSource {
pub const FEATURE_DEFAULT: &'static str = "default";

pub fn new<P: AsRef<Path>>(root: P) -> Self {
let features = vec![
Self::FEATURE_DEFAULT,
#[cfg(feature = "ai")]
"ai",
];
Self::with_features(root, features)
}

pub fn with_features<P: AsRef<Path>>(root: P, features: Vec<&'static str>) -> Self {
FeatureMigrationSource {
root: root.as_ref().to_path_buf(),
features,
}
}
}

impl<'s> MigrationSource<'s> for FeatureMigrationSource {
    /// Resolves migrations from the root directory and from each enabled
    /// feature's subdirectory, merged into a single version-sorted list.
    ///
    /// # Errors
    /// Fails when the root path is missing, when a directory cannot be read,
    /// or when two feature directories declare the same migration version
    /// (sqlx keys migrations by version, so a collision would be ambiguous).
    fn resolve(self) -> BoxFuture<'s, Result<Vec<Migration>, BoxDynError>> {
        Box::pin(async move {
            if !self.root.exists() {
                return Err(format!("Migration path does not exist: {:?}", self.root).into());
            }
            let mut migrations = Vec::new();
            for feature in self.features {
                // The "default" pseudo-feature maps to the root directory;
                // every real feature maps to a subdirectory named after it.
                let path = match feature {
                    Self::FEATURE_DEFAULT => self.root.clone(),
                    _ => self.root.join(feature),
                };
                // A missing directory is fine: a feature may ship no migrations.
                // (`is_dir()` already implies existence.)
                if path.is_dir() {
                    // NOTE(review): resolve_blocking does synchronous filesystem
                    // I/O inside this async fn; acceptable at startup, but confirm
                    // this never runs on a latency-sensitive task.
                    migrations.extend(
                        sqlx_core::migrate::resolve_blocking(path)?
                            .into_iter()
                            .map(|(migration, _)| migration),
                    );
                }
            }

            // Merge order across directories is by version, matching sqlx's
            // single-directory behavior. Fail fast on colliding versions.
            migrations.sort_by_key(|m| m.version);
            if let Some(pair) = migrations.windows(2).find(|w| w[0].version == w[1].version) {
                return Err(
                    format!(
                        "Duplicate migration version across feature directories: {}",
                        pair[0].version
                    )
                    .into(),
                );
            }
            Ok(migrations)
        })
    }
}
Loading
Loading