diff --git a/query-engine/connector-test-kit-rs/query-engine-tests/tests/new/interactive_tx.rs b/query-engine/connector-test-kit-rs/query-engine-tests/tests/new/interactive_tx.rs index da0db0a0e70a..b8827a3bf009 100644 --- a/query-engine/connector-test-kit-rs/query-engine-tests/tests/new/interactive_tx.rs +++ b/query-engine/connector-test-kit-rs/query-engine-tests/tests/new/interactive_tx.rs @@ -213,7 +213,7 @@ mod interactive_tx { Ok(()) } - #[connector_test(exclude(Vitess("planetscale.js.wasm"), Sqlite("cfd1")))] + #[connector_test(exclude(Sqlite("cfd1")))] async fn batch_queries_failure(mut runner: Runner) -> TestResult<()> { // Tx expires after five second. let tx_id = runner.start_tx(5000, 5000, None).await?; @@ -256,7 +256,7 @@ mod interactive_tx { Ok(()) } - #[connector_test(exclude(Vitess("planetscale.js.wasm")))] + #[connector_test] async fn tx_expiration_failure_cycle(mut runner: Runner) -> TestResult<()> { // Tx expires after one seconds. let tx_id = runner.start_tx(5000, 1000, None).await?; @@ -573,10 +573,7 @@ mod itx_isolation { use query_engine_tests::*; // All (SQL) connectors support serializable. - // However, there's a bug in the PlanetScale driver adapter: - // "Transaction characteristics can't be changed while a transaction is in progress - // (errno 1568) (sqlstate 25001) during query: SET TRANSACTION ISOLATION LEVEL SERIALIZABLE" - #[connector_test(exclude(MongoDb, Vitess("planetscale.js", "planetscale.js.wasm"), Sqlite("cfd1")))] + #[connector_test(exclude(MongoDb, Sqlite("cfd1")))] async fn basic_serializable(mut runner: Runner) -> TestResult<()> { let tx_id = runner.start_tx(5000, 5000, Some("Serializable".to_owned())).await?; runner.set_active_tx(tx_id.clone()); @@ -598,9 +595,7 @@ mod itx_isolation { Ok(()) } - // On PlanetScale, this fails with: - // `InteractiveTransactionError("Error in connector: Error querying the database: Server error: `ERROR 25001 (1568): Transaction characteristics can't be changed while a transaction is in progress'")` - #[connector_test(exclude(MongoDb, Vitess("planetscale.js", "planetscale.js.wasm"), Sqlite("cfd1")))] + #[connector_test(exclude(MongoDb, Sqlite("cfd1")))] async fn casing_doesnt_matter(mut runner: Runner) -> TestResult<()> { let tx_id = runner.start_tx(5000, 5000, Some("sErIaLiZaBlE".to_owned())).await?; runner.set_active_tx(tx_id.clone()); diff --git a/query-engine/connector-test-kit-rs/query-engine-tests/tests/queries/batching/transactional_batch.rs b/query-engine/connector-test-kit-rs/query-engine-tests/tests/queries/batching/transactional_batch.rs index 022f8f9e96ab..cf0a769d3543 100644 --- a/query-engine/connector-test-kit-rs/query-engine-tests/tests/queries/batching/transactional_batch.rs +++ b/query-engine/connector-test-kit-rs/query-engine-tests/tests/queries/batching/transactional_batch.rs @@ -145,9 +145,7 @@ mod transactional { Ok(()) } - // On PlanetScale, this fails with: - // "Error in connector: Error querying the database: Server error: `ERROR 25001 (1568): Transaction characteristics can't be changed while a transaction is in progress'"" - #[connector_test(exclude(MongoDb, Vitess("planetscale.js", "planetscale.js.wasm")))] + #[connector_test(exclude(MongoDb))] async fn valid_isolation_level(runner: Runner) -> TestResult<()> { let queries = vec![r#"mutation { createOneModelB(data: { id: 1 }) { id }}"#.to_string()]; diff --git a/query-engine/driver-adapters/executor/src/recording.ts b/query-engine/driver-adapters/executor/src/recording.ts index 88b9d369bc23..5ac0f52b4cb7 100644 --- a/query-engine/driver-adapters/executor/src/recording.ts +++ b/query-engine/driver-adapters/executor/src/recording.ts @@ -21,7 +21,7 @@ function recorder(adapter: DriverAdapter, recordings: Recordings) { return { provider: adapter.provider, adapterName: adapter.adapterName, - startTransaction: () => { + transactionContext: () => { throw new Error("Not implemented"); }, getConnectionInfo: () => { @@ -43,7 +43,7 @@ function replayer(adapter: DriverAdapter, recordings: Recordings) { provider: adapter.provider, adapterName: adapter.adapterName, recordings: recordings, - startTransaction: () => { + transactionContext: () => { throw new Error("Not implemented"); }, getConnectionInfo: () => { diff --git a/query-engine/driver-adapters/src/proxy.rs b/query-engine/driver-adapters/src/proxy.rs index 8e1d39138cb6..1c2a5a68240b 100644 --- a/query-engine/driver-adapters/src/proxy.rs +++ b/query-engine/driver-adapters/src/proxy.rs @@ -1,10 +1,10 @@ -use crate::send_future::UnsafeFuture; use crate::types::JsConnectionInfo; pub use crate::types::{JSResultSet, Query, TransactionOptions}; use crate::{ from_js_value, get_named_property, get_optional_named_property, to_rust_str, AdapterMethod, JsObject, JsResult, JsString, JsTransaction, }; +use crate::{send_future::UnsafeFuture, transaction::JsTransactionContext}; use futures::Future; use metrics::increment_gauge; @@ -28,8 +28,19 @@ pub(crate) struct CommonProxy { /// This is a JS proxy for accessing the methods specific to top level /// JS driver objects pub(crate) struct DriverProxy { - start_transaction: AdapterMethod<(), JsTransaction>, + /// Retrieve driver-specific info, such as the maximum number of query parameters get_connection_info: Option>, + + /// Provide a transaction context, in which raw commands are guaranteed to be executed in + /// the same scope as a future transaction, which can be spawned by via + /// [`driver_adapters::transaction::JsTransactionContext::start_transaction`]. + /// This was first introduced for supporting Isolation Levels in PlanetScale. + transaction_context: AdapterMethod<(), JsTransactionContext>, +} + +/// This is a JS proxy for accessing the methods specific to JS transaction contexts. +pub(crate) struct TransactionContextProxy { + start_transaction: AdapterMethod<(), JsTransaction>, } /// This a JS proxy for accessing the methods, specific @@ -48,6 +59,7 @@ pub(crate) struct TransactionProxy { closed: AtomicBool, } +// TypeScript: Queryable impl CommonProxy { pub fn new(object: &JsObject) -> JsResult { let provider: JsString = get_named_property(object, "provider")?; @@ -68,11 +80,12 @@ impl CommonProxy { } } +// TypeScript: DriverAdapter impl DriverProxy { pub fn new(object: &JsObject) -> JsResult { Ok(Self { - start_transaction: get_named_property(object, "startTransaction")?, get_connection_info: get_optional_named_property(object, "getConnectionInfo")?, + transaction_context: get_named_property(object, "transactionContext")?, }) } @@ -87,6 +100,20 @@ impl DriverProxy { .await } + pub async fn transaction_context(&self) -> quaint::Result { + let ctx = self.transaction_context.call_as_async(()).await?; + + Ok(ctx) + } +} + +impl TransactionContextProxy { + pub fn new(object: &JsObject) -> JsResult { + let start_transaction = get_named_property(object, "startTransaction")?; + + Ok(Self { start_transaction }) + } + async fn start_transaction_inner(&self) -> quaint::Result> { let tx = self.start_transaction.call_as_async(()).await?; @@ -98,7 +125,7 @@ impl DriverProxy { Ok(Box::new(tx)) } - pub fn start_transaction(&self) -> UnsafeFuture>> + '_> { + pub fn start_transaction(&self) -> impl Future>> + '_ { UnsafeFuture(self.start_transaction_inner()) } } @@ -184,6 +211,8 @@ macro_rules! impl_send_sync_on_wasm { // Assume the proxy object will not be sent to service workers, we can unsafe impl Send + Sync. impl_send_sync_on_wasm!(TransactionProxy); +impl_send_sync_on_wasm!(JsTransaction); +impl_send_sync_on_wasm!(TransactionContextProxy); +impl_send_sync_on_wasm!(JsTransactionContext); impl_send_sync_on_wasm!(DriverProxy); impl_send_sync_on_wasm!(CommonProxy); -impl_send_sync_on_wasm!(JsTransaction); diff --git a/query-engine/driver-adapters/src/queryable.rs b/query-engine/driver-adapters/src/queryable.rs index 4e47e9c51631..dde42c5fb420 100644 --- a/query-engine/driver-adapters/src/queryable.rs +++ b/query-engine/driver-adapters/src/queryable.rs @@ -301,25 +301,32 @@ impl QuaintQueryable for JsQueryable { } } -#[async_trait] -impl TransactionCapable for JsQueryable { - async fn start_transaction<'a>( +impl JsQueryable { + async fn start_transaction_inner<'a>( &'a self, isolation: Option, ) -> quaint::Result> { - let tx = self.driver_proxy.start_transaction().await?; + // 1. Obtain a transaction context from the driver. + // Any command run on this context is guaranteed to be part of the same session + // as the transaction spawned from it. + let tx_ctx = self.driver_proxy.transaction_context().await?; - let isolation_first = tx.requires_isolation_first(); + let requires_isolation_first = tx_ctx.requires_isolation_first(); - if isolation_first { + // 2. Set the isolation level (if specified) if the provider requires it to be set before + // creating the transaction. + if requires_isolation_first { if let Some(isolation) = isolation { - tx.set_tx_isolation_level(isolation).await?; + tx_ctx.set_tx_isolation_level(isolation).await?; } } - let begin_stmt = tx.begin_statement(); + // 3. Spawn a transaction from the context. + let tx = tx_ctx.start_transaction().await?; + let begin_stmt = tx.begin_statement(); let tx_opts = tx.options(); + if tx_opts.use_phantom_query { let begin_stmt = JsBaseQueryable::phantom_query_message(begin_stmt); tx.raw_phantom_cmd(begin_stmt.as_str()).await?; @@ -327,7 +334,8 @@ impl TransactionCapable for JsQueryable { tx.raw_cmd(begin_stmt).await?; } - if !isolation_first { + // 4. Set the isolation level (if specified) if we didn't do it before. + if !requires_isolation_first { if let Some(isolation) = isolation { tx.set_tx_isolation_level(isolation).await?; } @@ -339,6 +347,16 @@ impl TransactionCapable for JsQueryable { } } +#[async_trait] +impl TransactionCapable for JsQueryable { + async fn start_transaction<'a>( + &'a self, + isolation: Option, + ) -> quaint::Result> { + UnsafeFuture(self.start_transaction_inner(isolation)).await + } +} + pub fn from_js(driver: JsObject) -> JsQueryable { let common = CommonProxy::new(&driver).unwrap(); let driver_proxy = DriverProxy::new(&driver).unwrap(); diff --git a/query-engine/driver-adapters/src/transaction.rs b/query-engine/driver-adapters/src/transaction.rs index b3dd64630899..8d124bd4da0a 100644 --- a/query-engine/driver-adapters/src/transaction.rs +++ b/query-engine/driver-adapters/src/transaction.rs @@ -1,3 +1,5 @@ +use std::future::Future; + use async_trait::async_trait; use metrics::decrement_gauge; use quaint::{ @@ -6,10 +8,78 @@ use quaint::{ Value, }; -use crate::proxy::{TransactionOptions, TransactionProxy}; +use crate::proxy::{TransactionContextProxy, TransactionOptions, TransactionProxy}; use crate::{proxy::CommonProxy, queryable::JsBaseQueryable, send_future::UnsafeFuture}; use crate::{JsObject, JsResult}; +pub(crate) struct JsTransactionContext { + tx_ctx_proxy: TransactionContextProxy, + inner: JsBaseQueryable, +} + +// Wrapper around JS transaction context objects that implements Queryable. Can be used in place of quaint transaction, +// context, but delegates most operations to JS +impl JsTransactionContext { + pub(crate) fn new(inner: JsBaseQueryable, tx_ctx_proxy: TransactionContextProxy) -> Self { + Self { inner, tx_ctx_proxy } + } + + pub fn start_transaction(&self) -> impl Future>> + '_ { + UnsafeFuture(self.tx_ctx_proxy.start_transaction()) + } +} + +#[async_trait] +impl Queryable for JsTransactionContext { + async fn query(&self, q: QuaintQuery<'_>) -> quaint::Result { + self.inner.query(q).await + } + + async fn query_raw(&self, sql: &str, params: &[Value<'_>]) -> quaint::Result { + self.inner.query_raw(sql, params).await + } + + async fn query_raw_typed(&self, sql: &str, params: &[Value<'_>]) -> quaint::Result { + self.inner.query_raw_typed(sql, params).await + } + + async fn execute(&self, q: QuaintQuery<'_>) -> quaint::Result { + self.inner.execute(q).await + } + + async fn execute_raw(&self, sql: &str, params: &[Value<'_>]) -> quaint::Result { + self.inner.execute_raw(sql, params).await + } + + async fn execute_raw_typed(&self, sql: &str, params: &[Value<'_>]) -> quaint::Result { + self.inner.execute_raw_typed(sql, params).await + } + + async fn raw_cmd(&self, cmd: &str) -> quaint::Result<()> { + self.inner.raw_cmd(cmd).await + } + + async fn version(&self) -> quaint::Result> { + self.inner.version().await + } + + async fn describe_query(&self, sql: &str) -> quaint::Result { + self.inner.describe_query(sql).await + } + + fn is_healthy(&self) -> bool { + self.inner.is_healthy() + } + + async fn set_tx_isolation_level(&self, isolation_level: IsolationLevel) -> quaint::Result<()> { + self.inner.set_tx_isolation_level(isolation_level).await + } + + fn requires_isolation_first(&self) -> bool { + self.inner.requires_isolation_first() + } +} + // Wrapper around JS transaction objects that implements Queryable // and quaint::Transaction. Can be used in place of quaint transaction, // but delegates most operations to JS @@ -149,3 +219,30 @@ impl ::napi::bindgen_prelude::FromNapiValue for JsTransaction { Ok(Self::new(JsBaseQueryable::new(common_proxy), tx_proxy)) } } + +#[cfg(target_arch = "wasm32")] +impl super::wasm::FromJsValue for JsTransactionContext { + fn from_js_value(value: wasm_bindgen::prelude::JsValue) -> JsResult { + use wasm_bindgen::JsCast; + + let object = value.dyn_into::()?; + let common_proxy = CommonProxy::new(&object)?; + let base = JsBaseQueryable::new(common_proxy); + let tx_ctx_proxy = TransactionContextProxy::new(&object)?; + + Ok(Self::new(base, tx_ctx_proxy)) + } +} + +/// Implementing unsafe `from_napi_value` allows retrieving a threadsafe `JsTransactionContext` in `DriverProxy` +/// while keeping derived futures `Send`. +#[cfg(not(target_arch = "wasm32"))] +impl ::napi::bindgen_prelude::FromNapiValue for JsTransactionContext { + unsafe fn from_napi_value(env: napi::sys::napi_env, napi_val: napi::sys::napi_value) -> JsResult { + let object = JsObject::from_napi_value(env, napi_val)?; + let common_proxy = CommonProxy::new(&object)?; + let tx_ctx_proxy = TransactionContextProxy::new(&object)?; + + Ok(Self::new(JsBaseQueryable::new(common_proxy), tx_ctx_proxy)) + } +}