From a5fcca407615fb162f70f3af347d885fc22512b9 Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Fri, 24 May 2024 15:59:57 +0530 Subject: [PATCH] handle empty reducer --- crates/core/src/host/module_host.rs | 103 ++++++++++++------ .../src/host/wasm_common/module_host_actor.rs | 31 +----- 2 files changed, 73 insertions(+), 61 deletions(-) diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 565d99fea1b..6f0bd207272 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -3,6 +3,8 @@ use super::{ArgsTuple, InvalidReducerArguments, ReducerArgs, ReducerCallResult, use crate::client::{ClientActorId, ClientConnectionSender}; use crate::database_instance_context::DatabaseInstanceContext; use crate::database_logger::LogLevel; +use crate::db::datastore::locking_tx_datastore::MutTxId; +use crate::db::datastore::system_tables::{StClientsFields, StClientsRow, ST_CLIENTS_ID}; use crate::db::datastore::traits::TxData; use crate::db::update::UpdateDatabaseError; use crate::energy::EnergyQuanta; @@ -21,13 +23,14 @@ use derive_more::{From, Into}; use futures::{Future, FutureExt}; use indexmap::IndexMap; use itertools::{Either, Itertools}; +use smallvec::SmallVec; use spacetimedb_client_api_messages::client_api::table_row_operation::OperationType; use spacetimedb_data_structures::map::{HashCollectionExt as _, HashMap, IntMap}; use spacetimedb_lib::bsatn::to_vec; use spacetimedb_lib::identity::RequestId; use spacetimedb_lib::{Address, ReducerDef, TableDesc}; -use spacetimedb_primitives::TableId; -use spacetimedb_sats::{ProductValue, Typespace, WithTypespace}; +use spacetimedb_primitives::{col_list, TableId}; +use spacetimedb_sats::{algebraic_value, ProductValue, Typespace, WithTypespace}; use spacetimedb_vm::relation::{MemTable, RelValue}; use std::fmt; use std::sync::{Arc, Weak}; @@ -370,7 +373,6 @@ pub trait Module: Send + Sync + 'static { query: String, ) -> Result, DBError>; fn clear_table(&self, table_name: &str) -> Result<(), anyhow::Error>; - fn delete_st_clients(&self, caller_identity: Identity, caller_address: Address) -> Result<(), DBError>; #[cfg(feature = "tracelogging")] fn get_trace(&self) -> Option; #[cfg(feature = "tracelogging")] @@ -469,7 +471,6 @@ trait DynModuleHost: Send + Sync + 'static { fn clear_table(&self, table_name: &str) -> Result<(), anyhow::Error>; fn exit(&self) -> Closed<'_>; fn exited(&self) -> Closed<'_>; - fn delete_st_clients(&self, caller_identity: Identity, caller_address: Address) -> Result<(), DBError>; } struct HostControllerActor { @@ -553,10 +554,6 @@ impl DynModuleHost for HostControllerActor { fn exited(&self) -> Closed<'_> { self.instance_pool.closed() } - - fn delete_st_clients(&self, caller_identity: Identity, caller_address: Address) -> Result<(), DBError> { - self.module.delete_st_clients(caller_identity, caller_address) - } } pub struct WeakModuleHost { @@ -674,6 +671,18 @@ impl ModuleHost { CLIENT_DISCONNECTED_DUNDER }; + let db = &self.inner.dbic().relational_db; + let ctx = &ExecutionContext::reducer( + db.address(), + ReducerContext { + name: reducer_name.to_owned(), + caller_identity, + caller_address, + timestamp: Timestamp::now(), + arg_bsatn: Bytes::new(), + }, + ); + let result = self .call_reducer_inner( caller_identity, @@ -688,39 +697,35 @@ impl ModuleHost { .map(drop) .or_else(|e| match e { // If the module doesn't define connected or disconnected, commit - // an empty transaction to ensure we always have those events + // a transaction to update `st_clients` and to ensure we always have those events // paired in the commitlog. // // This is necessary to be able to disconnect clients after a server // crash. - ReducerCallError::NoSuchReducer => { - let db = &self.inner.dbic().relational_db; - db.with_auto_commit( - &ExecutionContext::reducer( - db.address(), - ReducerContext { - name: reducer_name.to_owned(), - caller_identity, - caller_address, - timestamp: Timestamp::now(), - arg_bsatn: Bytes::new(), - }, - ), - |_| anyhow::Ok(()), - ) - .ok(); - - Ok(()) - } + ReducerCallError::NoSuchReducer => db + .with_auto_commit(ctx, |mut_tx| { + if connected { + self.update_st_clients(mut_tx, caller_identity, caller_address, connected) + } else { + Ok(()) + } + }) + .map_err(|err| { + InvalidReducerArguments { + err: err.into(), + reducer: reducer_name.into(), + } + .into() + }), e => Err(e), }); - // We only perform deletion from `st_clients` here as we want to ensure it even if `__identity_disconnected__` fails. - // insertion to `st_clients` is handled by `wasm_common::module_host_actor::WasmModuleInstance::call_reducer_with_tx` for transactionality reasons. + //Deleting client from `st_clients`does not depend upon result of disconnect reducer hence done in a separate tx. if !connected { - let _ = self - .inner - .delete_st_clients(caller_identity, caller_address) + let _ = db + .with_auto_commit(ctx, |mut_tx| { + self.update_st_clients(mut_tx, caller_identity, caller_address, connected) + }) .map_err(|e| { log::error!("st_clients table update failed with params with error: {:?}", e); }); @@ -728,6 +733,38 @@ impl ModuleHost { result } + fn update_st_clients( + &self, + mut_tx: &mut MutTxId, + caller_identity: Identity, + caller_address: Address, + connected: bool, + ) -> Result<(), DBError> { + let db = &*self.inner.dbic().relational_db; + let ctx = &ExecutionContext::internal(db.address()); + let row = &StClientsRow { + identity: caller_identity, + address: caller_address, + }; + + if connected { + db.insert(mut_tx, ST_CLIENTS_ID, row.into()).map(|_| ()) + } else { + let row = db + .iter_by_col_eq_mut( + ctx, + mut_tx, + ST_CLIENTS_ID, + col_list![StClientsFields::Identity, StClientsFields::Address], + &algebraic_value::AlgebraicValue::product(row), + )? + .map(|row_ref| row_ref.pointer()) + .collect::>(); + db.delete(mut_tx, ST_CLIENTS_ID, row); + Ok::<(), DBError>(()) + } + } + async fn call_reducer_inner( &self, caller_identity: Identity, diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index d49e0ac755d..3b029d5db59 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -1,8 +1,5 @@ use anyhow::{anyhow, Context}; use bytes::Bytes; -use smallvec::SmallVec; -use spacetimedb_primitives::col_list; -use spacetimedb_sats::algebraic_value; use std::sync::Arc; use std::time::Duration; @@ -15,7 +12,7 @@ use super::instrumentation::CallTimes; use crate::database_instance_context::DatabaseInstanceContext; use crate::database_logger::{LogLevel, Record, SystemLogger}; use crate::db::datastore::locking_tx_datastore::MutTxId; -use crate::db::datastore::system_tables::{StClientsFields, StClientsRow, ST_CLIENTS_ID}; +use crate::db::datastore::system_tables::{StClientsRow, ST_CLIENTS_ID}; use crate::db::datastore::traits::IsolationLevel; use crate::energy::{EnergyMonitor, EnergyQuanta, ReducerBudget, ReducerFingerprint}; use crate::execution_context::{self, ExecutionContext, ReducerContext}; @@ -312,30 +309,6 @@ impl Module for WasmModuleHostActor { Ok(()) }) } - - fn delete_st_clients(&self, caller_identity: Identity, caller_address: Address) -> Result<(), DBError> { - let db = &*self.database_instance_context.relational_db; - let ctx = &ExecutionContext::internal(db.address()); - let row = &StClientsRow { - identity: caller_identity, - address: caller_address, - }; - - db.with_auto_commit(ctx, |tx| { - let row = db - .iter_by_col_eq_mut( - ctx, - tx, - ST_CLIENTS_ID, - col_list![StClientsFields::Identity, StClientsFields::Address], - &algebraic_value::AlgebraicValue::product(row), - )? - .map(|row_ref| row_ref.pointer()) - .collect::>(); - db.delete(tx, ST_CLIENTS_ID, row); - Ok::<(), DBError>(()) - }) - } } pub struct WasmModuleInstance { @@ -629,6 +602,8 @@ impl WasmModuleInstance { // we haven't actually comitted yet - `commit_and_broadcast_event` will commit // for us and replace this with the actual database update. Ok(Ok(())) => { + // Detecing a new client, and inserting it in `st_clients` + // Disconnect logic is written in module_host.rs, due to different transacationality requirements. if reducer_name == CLIENT_CONNECTED_DUNDER { match self.insert_st_client(&mut tx, caller_identity, caller_address) { Ok(_) => EventStatus::Committed(DatabaseUpdate::default()),