Skip to content

Commit

Permalink
fix(driver-adapters): PlanetScale transactions (#4967)
Browse files Browse the repository at this point in the history
* fix(driver-adapters): fix transaction ordering for ISOLATION LEVEL, impacting PlanetScale

* chore(driver-adapters): uncomment fixed PlanetScale tests

* chore: retrigger CI

* feat(driver-adapters): add support for TransactionContext

* test(connector-test-kit-rs): uncomment succeeding "basic_serializable" test

* chore(driver-adapters): fix types in executor

* DRIVER_ADAPTERS_BRANCH=feat-driver-adapters-transaction-context retrigger CI/CD

* feat(driver-adapters): impl Send on JsTransactionContext on wasm32

* DRIVER_ADAPTERS_BRANCH=feat-driver-adapters-transaction-context retrigger CI/CD

* feat(driver-adapters): impl FromJsValue for JsTransactionContext on wasm32

* DRIVER_ADAPTERS_BRANCH=feat-driver-adapters-transaction-context retrigger CI/CD

* feat(driver-adapters): impl Send for TransactionContextProxy on wasm32

* DRIVER_ADAPTERS_BRANCH=feat-driver-adapters-transaction-context retrigger CI/CD

* chore: attempt Send/Sync-compatibility for JS transactions

* chore: add "parse_raw_query" to "JsTransactionContext"

* fix(driver-adapters): enable wasm32 compilation of "JsQueryable::start_transaction"

* chore: remove commented-out method

* test(connector-test-kit-rs): uncomment "interactive_tx" tests

* DRIVER_ADAPTERS_BRANCH=feat/driver-adapters-transaction-context chore: retrigger CI/CD

* tmp(connector-test-kit-rs): add (failing) concurrent_create_select regression test

* Revert "tmp(connector-test-kit-rs): add (failing) concurrent_create_select regression test"

This reverts commit cc0baab.

* DRIVER_ADAPTERS_BRANCH=feat/driver-adapters-transaction-context chore: retrigger CI/CD

* feat(driver-adapters): rename "parse_raw_query" to "describe_query" after #4979

* DRIVER_ADAPTERS_BRANCH=feat/driver-adapters-transaction-context chore: retrigger CI/CD

* chore: don't leak concrete UnsafeFuture types

* chore(driver-adapters): nit, replace "::napi" with "napi"

* Revert "chore(driver-adapters): nit, replace "::napi" with "napi""

This reverts commit 808fdbe.

* DRIVER_ADAPTERS_BRANCH=feat/driver-adapters-transaction-context chore: retrigger CI/CD

* chore: unbox JsTransactionContext

* DRIVER_ADAPTERS_BRANCH=feat/driver-adapters-transaction-context chore: retrigger CI/CD
  • Loading branch information
jkomyno authored Sep 5, 2024
1 parent f2561ec commit 190a98f
Show file tree
Hide file tree
Showing 6 changed files with 166 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ mod interactive_tx {
Ok(())
}

#[connector_test(exclude(Vitess("planetscale.js.wasm"), Sqlite("cfd1")))]
#[connector_test(exclude(Sqlite("cfd1")))]
async fn batch_queries_failure(mut runner: Runner) -> TestResult<()> {
// Tx expires after five second.
let tx_id = runner.start_tx(5000, 5000, None).await?;
Expand Down Expand Up @@ -256,7 +256,7 @@ mod interactive_tx {
Ok(())
}

#[connector_test(exclude(Vitess("planetscale.js.wasm")))]
#[connector_test]
async fn tx_expiration_failure_cycle(mut runner: Runner) -> TestResult<()> {
// Tx expires after one seconds.
let tx_id = runner.start_tx(5000, 1000, None).await?;
Expand Down Expand Up @@ -573,10 +573,7 @@ mod itx_isolation {
use query_engine_tests::*;

// All (SQL) connectors support serializable.
// However, there's a bug in the PlanetScale driver adapter:
// "Transaction characteristics can't be changed while a transaction is in progress
// (errno 1568) (sqlstate 25001) during query: SET TRANSACTION ISOLATION LEVEL SERIALIZABLE"
#[connector_test(exclude(MongoDb, Vitess("planetscale.js", "planetscale.js.wasm"), Sqlite("cfd1")))]
#[connector_test(exclude(MongoDb, Sqlite("cfd1")))]
async fn basic_serializable(mut runner: Runner) -> TestResult<()> {
let tx_id = runner.start_tx(5000, 5000, Some("Serializable".to_owned())).await?;
runner.set_active_tx(tx_id.clone());
Expand All @@ -598,9 +595,7 @@ mod itx_isolation {
Ok(())
}

// On PlanetScale, this fails with:
// `InteractiveTransactionError("Error in connector: Error querying the database: Server error: `ERROR 25001 (1568): Transaction characteristics can't be changed while a transaction is in progress'")`
#[connector_test(exclude(MongoDb, Vitess("planetscale.js", "planetscale.js.wasm"), Sqlite("cfd1")))]
#[connector_test(exclude(MongoDb, Sqlite("cfd1")))]
async fn casing_doesnt_matter(mut runner: Runner) -> TestResult<()> {
let tx_id = runner.start_tx(5000, 5000, Some("sErIaLiZaBlE".to_owned())).await?;
runner.set_active_tx(tx_id.clone());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,7 @@ mod transactional {
Ok(())
}

// On PlanetScale, this fails with:
// "Error in connector: Error querying the database: Server error: `ERROR 25001 (1568): Transaction characteristics can't be changed while a transaction is in progress'""
#[connector_test(exclude(MongoDb, Vitess("planetscale.js", "planetscale.js.wasm")))]
#[connector_test(exclude(MongoDb))]
async fn valid_isolation_level(runner: Runner) -> TestResult<()> {
let queries = vec![r#"mutation { createOneModelB(data: { id: 1 }) { id }}"#.to_string()];

Expand Down
4 changes: 2 additions & 2 deletions query-engine/driver-adapters/executor/src/recording.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ function recorder(adapter: DriverAdapter, recordings: Recordings) {
return {
provider: adapter.provider,
adapterName: adapter.adapterName,
startTransaction: () => {
transactionContext: () => {
throw new Error("Not implemented");
},
getConnectionInfo: () => {
Expand All @@ -43,7 +43,7 @@ function replayer(adapter: DriverAdapter, recordings: Recordings) {
provider: adapter.provider,
adapterName: adapter.adapterName,
recordings: recordings,
startTransaction: () => {
transactionContext: () => {
throw new Error("Not implemented");
},
getConnectionInfo: () => {
Expand Down
39 changes: 34 additions & 5 deletions query-engine/driver-adapters/src/proxy.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use crate::send_future::UnsafeFuture;
use crate::types::JsConnectionInfo;
pub use crate::types::{JSResultSet, Query, TransactionOptions};
use crate::{
from_js_value, get_named_property, get_optional_named_property, to_rust_str, AdapterMethod, JsObject, JsResult,
JsString, JsTransaction,
};
use crate::{send_future::UnsafeFuture, transaction::JsTransactionContext};

use futures::Future;
use metrics::increment_gauge;
Expand All @@ -28,8 +28,19 @@ pub(crate) struct CommonProxy {
/// This is a JS proxy for accessing the methods specific to top level
/// JS driver objects
pub(crate) struct DriverProxy {
start_transaction: AdapterMethod<(), JsTransaction>,
/// Retrieve driver-specific info, such as the maximum number of query parameters
get_connection_info: Option<AdapterMethod<(), JsConnectionInfo>>,

/// Provide a transaction context, in which raw commands are guaranteed to be executed in
/// the same scope as a future transaction, which can be spawned by via
/// [`driver_adapters::transaction::JsTransactionContext::start_transaction`].
/// This was first introduced for supporting Isolation Levels in PlanetScale.
transaction_context: AdapterMethod<(), JsTransactionContext>,
}

/// This is a JS proxy for accessing the methods specific to JS transaction contexts.
pub(crate) struct TransactionContextProxy {
start_transaction: AdapterMethod<(), JsTransaction>,
}

/// This a JS proxy for accessing the methods, specific
Expand All @@ -48,6 +59,7 @@ pub(crate) struct TransactionProxy {
closed: AtomicBool,
}

// TypeScript: Queryable
impl CommonProxy {
pub fn new(object: &JsObject) -> JsResult<Self> {
let provider: JsString = get_named_property(object, "provider")?;
Expand All @@ -68,11 +80,12 @@ impl CommonProxy {
}
}

// TypeScript: DriverAdapter
impl DriverProxy {
pub fn new(object: &JsObject) -> JsResult<Self> {
Ok(Self {
start_transaction: get_named_property(object, "startTransaction")?,
get_connection_info: get_optional_named_property(object, "getConnectionInfo")?,
transaction_context: get_named_property(object, "transactionContext")?,
})
}

Expand All @@ -87,6 +100,20 @@ impl DriverProxy {
.await
}

pub async fn transaction_context(&self) -> quaint::Result<JsTransactionContext> {
let ctx = self.transaction_context.call_as_async(()).await?;

Ok(ctx)
}
}

impl TransactionContextProxy {
pub fn new(object: &JsObject) -> JsResult<Self> {
let start_transaction = get_named_property(object, "startTransaction")?;

Ok(Self { start_transaction })
}

async fn start_transaction_inner(&self) -> quaint::Result<Box<JsTransaction>> {
let tx = self.start_transaction.call_as_async(()).await?;

Expand All @@ -98,7 +125,7 @@ impl DriverProxy {
Ok(Box::new(tx))
}

pub fn start_transaction(&self) -> UnsafeFuture<impl Future<Output = quaint::Result<Box<JsTransaction>>> + '_> {
pub fn start_transaction(&self) -> impl Future<Output = quaint::Result<Box<JsTransaction>>> + '_ {
UnsafeFuture(self.start_transaction_inner())
}
}
Expand Down Expand Up @@ -184,6 +211,8 @@ macro_rules! impl_send_sync_on_wasm {

// Assume the proxy object will not be sent to service workers, we can unsafe impl Send + Sync.
impl_send_sync_on_wasm!(TransactionProxy);
impl_send_sync_on_wasm!(JsTransaction);
impl_send_sync_on_wasm!(TransactionContextProxy);
impl_send_sync_on_wasm!(JsTransactionContext);
impl_send_sync_on_wasm!(DriverProxy);
impl_send_sync_on_wasm!(CommonProxy);
impl_send_sync_on_wasm!(JsTransaction);
36 changes: 27 additions & 9 deletions query-engine/driver-adapters/src/queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,33 +301,41 @@ impl QuaintQueryable for JsQueryable {
}
}

#[async_trait]
impl TransactionCapable for JsQueryable {
async fn start_transaction<'a>(
impl JsQueryable {
async fn start_transaction_inner<'a>(
&'a self,
isolation: Option<IsolationLevel>,
) -> quaint::Result<Box<dyn Transaction + 'a>> {
let tx = self.driver_proxy.start_transaction().await?;
// 1. Obtain a transaction context from the driver.
// Any command run on this context is guaranteed to be part of the same session
// as the transaction spawned from it.
let tx_ctx = self.driver_proxy.transaction_context().await?;

let isolation_first = tx.requires_isolation_first();
let requires_isolation_first = tx_ctx.requires_isolation_first();

if isolation_first {
// 2. Set the isolation level (if specified) if the provider requires it to be set before
// creating the transaction.
if requires_isolation_first {
if let Some(isolation) = isolation {
tx.set_tx_isolation_level(isolation).await?;
tx_ctx.set_tx_isolation_level(isolation).await?;
}
}

let begin_stmt = tx.begin_statement();
// 3. Spawn a transaction from the context.
let tx = tx_ctx.start_transaction().await?;

let begin_stmt = tx.begin_statement();
let tx_opts = tx.options();

if tx_opts.use_phantom_query {
let begin_stmt = JsBaseQueryable::phantom_query_message(begin_stmt);
tx.raw_phantom_cmd(begin_stmt.as_str()).await?;
} else {
tx.raw_cmd(begin_stmt).await?;
}

if !isolation_first {
// 4. Set the isolation level (if specified) if we didn't do it before.
if !requires_isolation_first {
if let Some(isolation) = isolation {
tx.set_tx_isolation_level(isolation).await?;
}
Expand All @@ -339,6 +347,16 @@ impl TransactionCapable for JsQueryable {
}
}

#[async_trait]
impl TransactionCapable for JsQueryable {
async fn start_transaction<'a>(
&'a self,
isolation: Option<IsolationLevel>,
) -> quaint::Result<Box<dyn Transaction + 'a>> {
UnsafeFuture(self.start_transaction_inner(isolation)).await
}
}

pub fn from_js(driver: JsObject) -> JsQueryable {
let common = CommonProxy::new(&driver).unwrap();
let driver_proxy = DriverProxy::new(&driver).unwrap();
Expand Down
99 changes: 98 additions & 1 deletion query-engine/driver-adapters/src/transaction.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::future::Future;

use async_trait::async_trait;
use metrics::decrement_gauge;
use quaint::{
Expand All @@ -6,10 +8,78 @@ use quaint::{
Value,
};

use crate::proxy::{TransactionOptions, TransactionProxy};
use crate::proxy::{TransactionContextProxy, TransactionOptions, TransactionProxy};
use crate::{proxy::CommonProxy, queryable::JsBaseQueryable, send_future::UnsafeFuture};
use crate::{JsObject, JsResult};

pub(crate) struct JsTransactionContext {
tx_ctx_proxy: TransactionContextProxy,
inner: JsBaseQueryable,
}

// Wrapper around JS transaction context objects that implements Queryable. Can be used in place of quaint transaction,
// context, but delegates most operations to JS
impl JsTransactionContext {
pub(crate) fn new(inner: JsBaseQueryable, tx_ctx_proxy: TransactionContextProxy) -> Self {
Self { inner, tx_ctx_proxy }
}

pub fn start_transaction(&self) -> impl Future<Output = quaint::Result<Box<JsTransaction>>> + '_ {
UnsafeFuture(self.tx_ctx_proxy.start_transaction())
}
}

#[async_trait]
impl Queryable for JsTransactionContext {
async fn query(&self, q: QuaintQuery<'_>) -> quaint::Result<ResultSet> {
self.inner.query(q).await
}

async fn query_raw(&self, sql: &str, params: &[Value<'_>]) -> quaint::Result<ResultSet> {
self.inner.query_raw(sql, params).await
}

async fn query_raw_typed(&self, sql: &str, params: &[Value<'_>]) -> quaint::Result<ResultSet> {
self.inner.query_raw_typed(sql, params).await
}

async fn execute(&self, q: QuaintQuery<'_>) -> quaint::Result<u64> {
self.inner.execute(q).await
}

async fn execute_raw(&self, sql: &str, params: &[Value<'_>]) -> quaint::Result<u64> {
self.inner.execute_raw(sql, params).await
}

async fn execute_raw_typed(&self, sql: &str, params: &[Value<'_>]) -> quaint::Result<u64> {
self.inner.execute_raw_typed(sql, params).await
}

async fn raw_cmd(&self, cmd: &str) -> quaint::Result<()> {
self.inner.raw_cmd(cmd).await
}

async fn version(&self) -> quaint::Result<Option<String>> {
self.inner.version().await
}

async fn describe_query(&self, sql: &str) -> quaint::Result<DescribedQuery> {
self.inner.describe_query(sql).await
}

fn is_healthy(&self) -> bool {
self.inner.is_healthy()
}

async fn set_tx_isolation_level(&self, isolation_level: IsolationLevel) -> quaint::Result<()> {
self.inner.set_tx_isolation_level(isolation_level).await
}

fn requires_isolation_first(&self) -> bool {
self.inner.requires_isolation_first()
}
}

// Wrapper around JS transaction objects that implements Queryable
// and quaint::Transaction. Can be used in place of quaint transaction,
// but delegates most operations to JS
Expand Down Expand Up @@ -149,3 +219,30 @@ impl ::napi::bindgen_prelude::FromNapiValue for JsTransaction {
Ok(Self::new(JsBaseQueryable::new(common_proxy), tx_proxy))
}
}

#[cfg(target_arch = "wasm32")]
impl super::wasm::FromJsValue for JsTransactionContext {
fn from_js_value(value: wasm_bindgen::prelude::JsValue) -> JsResult<Self> {
use wasm_bindgen::JsCast;

let object = value.dyn_into::<JsObject>()?;
let common_proxy = CommonProxy::new(&object)?;
let base = JsBaseQueryable::new(common_proxy);
let tx_ctx_proxy = TransactionContextProxy::new(&object)?;

Ok(Self::new(base, tx_ctx_proxy))
}
}

/// Implementing unsafe `from_napi_value` allows retrieving a threadsafe `JsTransactionContext` in `DriverProxy`
/// while keeping derived futures `Send`.
#[cfg(not(target_arch = "wasm32"))]
impl ::napi::bindgen_prelude::FromNapiValue for JsTransactionContext {
unsafe fn from_napi_value(env: napi::sys::napi_env, napi_val: napi::sys::napi_value) -> JsResult<Self> {
let object = JsObject::from_napi_value(env, napi_val)?;
let common_proxy = CommonProxy::new(&object)?;
let tx_ctx_proxy = TransactionContextProxy::new(&object)?;

Ok(Self::new(JsBaseQueryable::new(common_proxy), tx_ctx_proxy))
}
}

0 comments on commit 190a98f

Please sign in to comment.