Skip to content

Commit

Permalink
handle empty reducer
Browse files Browse the repository at this point in the history
  • Loading branch information
Shubham8287 committed May 24, 2024
1 parent 43cc06c commit a5fcca4
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 61 deletions.
103 changes: 70 additions & 33 deletions crates/core/src/host/module_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use super::{ArgsTuple, InvalidReducerArguments, ReducerArgs, ReducerCallResult,
use crate::client::{ClientActorId, ClientConnectionSender};
use crate::database_instance_context::DatabaseInstanceContext;
use crate::database_logger::LogLevel;
use crate::db::datastore::locking_tx_datastore::MutTxId;
use crate::db::datastore::system_tables::{StClientsFields, StClientsRow, ST_CLIENTS_ID};
use crate::db::datastore::traits::TxData;
use crate::db::update::UpdateDatabaseError;
use crate::energy::EnergyQuanta;
Expand All @@ -21,13 +23,14 @@ use derive_more::{From, Into};
use futures::{Future, FutureExt};
use indexmap::IndexMap;
use itertools::{Either, Itertools};
use smallvec::SmallVec;
use spacetimedb_client_api_messages::client_api::table_row_operation::OperationType;
use spacetimedb_data_structures::map::{HashCollectionExt as _, HashMap, IntMap};
use spacetimedb_lib::bsatn::to_vec;
use spacetimedb_lib::identity::RequestId;
use spacetimedb_lib::{Address, ReducerDef, TableDesc};
use spacetimedb_primitives::TableId;
use spacetimedb_sats::{ProductValue, Typespace, WithTypespace};
use spacetimedb_primitives::{col_list, TableId};
use spacetimedb_sats::{algebraic_value, ProductValue, Typespace, WithTypespace};
use spacetimedb_vm::relation::{MemTable, RelValue};
use std::fmt;
use std::sync::{Arc, Weak};
Expand Down Expand Up @@ -370,7 +373,6 @@ pub trait Module: Send + Sync + 'static {
query: String,
) -> Result<Vec<spacetimedb_vm::relation::MemTable>, DBError>;
fn clear_table(&self, table_name: &str) -> Result<(), anyhow::Error>;
fn delete_st_clients(&self, caller_identity: Identity, caller_address: Address) -> Result<(), DBError>;
#[cfg(feature = "tracelogging")]
fn get_trace(&self) -> Option<bytes::Bytes>;
#[cfg(feature = "tracelogging")]
Expand Down Expand Up @@ -469,7 +471,6 @@ trait DynModuleHost: Send + Sync + 'static {
fn clear_table(&self, table_name: &str) -> Result<(), anyhow::Error>;
fn exit(&self) -> Closed<'_>;
fn exited(&self) -> Closed<'_>;
fn delete_st_clients(&self, caller_identity: Identity, caller_address: Address) -> Result<(), DBError>;
}

struct HostControllerActor<T: Module> {
Expand Down Expand Up @@ -553,10 +554,6 @@ impl<T: Module> DynModuleHost for HostControllerActor<T> {
fn exited(&self) -> Closed<'_> {
self.instance_pool.closed()
}

fn delete_st_clients(&self, caller_identity: Identity, caller_address: Address) -> Result<(), DBError> {
self.module.delete_st_clients(caller_identity, caller_address)
}
}

pub struct WeakModuleHost {
Expand Down Expand Up @@ -674,6 +671,18 @@ impl ModuleHost {
CLIENT_DISCONNECTED_DUNDER
};

let db = &self.inner.dbic().relational_db;
let ctx = &ExecutionContext::reducer(
db.address(),
ReducerContext {
name: reducer_name.to_owned(),
caller_identity,
caller_address,
timestamp: Timestamp::now(),
arg_bsatn: Bytes::new(),
},
);

let result = self
.call_reducer_inner(
caller_identity,
Expand All @@ -688,46 +697,74 @@ impl ModuleHost {
.map(drop)
.or_else(|e| match e {
// If the module doesn't define connected or disconnected, commit
// an empty transaction to ensure we always have those events
// a transaction to update `st_clients` and to ensure we always have those events
// paired in the commitlog.
//
// This is necessary to be able to disconnect clients after a server
// crash.
ReducerCallError::NoSuchReducer => {
let db = &self.inner.dbic().relational_db;
db.with_auto_commit(
&ExecutionContext::reducer(
db.address(),
ReducerContext {
name: reducer_name.to_owned(),
caller_identity,
caller_address,
timestamp: Timestamp::now(),
arg_bsatn: Bytes::new(),
},
),
|_| anyhow::Ok(()),
)
.ok();

Ok(())
}
ReducerCallError::NoSuchReducer => db
.with_auto_commit(ctx, |mut_tx| {
if connected {
self.update_st_clients(mut_tx, caller_identity, caller_address, connected)
} else {
Ok(())
}
})
.map_err(|err| {
InvalidReducerArguments {
err: err.into(),
reducer: reducer_name.into(),
}
.into()
}),
e => Err(e),
});

// We only perform deletion from `st_clients` here as we want to ensure it even if `__identity_disconnected__` fails.
// insertion to `st_clients` is handled by `wasm_common::module_host_actor::WasmModuleInstance::call_reducer_with_tx` for transactionality reasons.
//Deleting client from `st_clients`does not depend upon result of disconnect reducer hence done in a separate tx.
if !connected {
let _ = self
.inner
.delete_st_clients(caller_identity, caller_address)
let _ = db
.with_auto_commit(ctx, |mut_tx| {
self.update_st_clients(mut_tx, caller_identity, caller_address, connected)
})
.map_err(|e| {
log::error!("st_clients table update failed with params with error: {:?}", e);
});
}
result
}

fn update_st_clients(
&self,
mut_tx: &mut MutTxId,
caller_identity: Identity,
caller_address: Address,
connected: bool,
) -> Result<(), DBError> {
let db = &*self.inner.dbic().relational_db;
let ctx = &ExecutionContext::internal(db.address());
let row = &StClientsRow {
identity: caller_identity,
address: caller_address,
};

if connected {
db.insert(mut_tx, ST_CLIENTS_ID, row.into()).map(|_| ())
} else {
let row = db
.iter_by_col_eq_mut(
ctx,
mut_tx,
ST_CLIENTS_ID,
col_list![StClientsFields::Identity, StClientsFields::Address],
&algebraic_value::AlgebraicValue::product(row),
)?
.map(|row_ref| row_ref.pointer())
.collect::<SmallVec<[_; 1]>>();
db.delete(mut_tx, ST_CLIENTS_ID, row);
Ok::<(), DBError>(())
}
}

async fn call_reducer_inner(
&self,
caller_identity: Identity,
Expand Down
31 changes: 3 additions & 28 deletions crates/core/src/host/wasm_common/module_host_actor.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
use anyhow::{anyhow, Context};
use bytes::Bytes;
use smallvec::SmallVec;
use spacetimedb_primitives::col_list;
use spacetimedb_sats::algebraic_value;
use std::sync::Arc;
use std::time::Duration;

Expand All @@ -15,7 +12,7 @@ use super::instrumentation::CallTimes;
use crate::database_instance_context::DatabaseInstanceContext;
use crate::database_logger::{LogLevel, Record, SystemLogger};
use crate::db::datastore::locking_tx_datastore::MutTxId;
use crate::db::datastore::system_tables::{StClientsFields, StClientsRow, ST_CLIENTS_ID};
use crate::db::datastore::system_tables::{StClientsRow, ST_CLIENTS_ID};
use crate::db::datastore::traits::IsolationLevel;
use crate::energy::{EnergyMonitor, EnergyQuanta, ReducerBudget, ReducerFingerprint};
use crate::execution_context::{self, ExecutionContext, ReducerContext};
Expand Down Expand Up @@ -312,30 +309,6 @@ impl<T: WasmModule> Module for WasmModuleHostActor<T> {
Ok(())
})
}

fn delete_st_clients(&self, caller_identity: Identity, caller_address: Address) -> Result<(), DBError> {
let db = &*self.database_instance_context.relational_db;
let ctx = &ExecutionContext::internal(db.address());
let row = &StClientsRow {
identity: caller_identity,
address: caller_address,
};

db.with_auto_commit(ctx, |tx| {
let row = db
.iter_by_col_eq_mut(
ctx,
tx,
ST_CLIENTS_ID,
col_list![StClientsFields::Identity, StClientsFields::Address],
&algebraic_value::AlgebraicValue::product(row),
)?
.map(|row_ref| row_ref.pointer())
.collect::<SmallVec<[_; 1]>>();
db.delete(tx, ST_CLIENTS_ID, row);
Ok::<(), DBError>(())
})
}
}

pub struct WasmModuleInstance<T: WasmInstance> {
Expand Down Expand Up @@ -629,6 +602,8 @@ impl<T: WasmInstance> WasmModuleInstance<T> {
// we haven't actually comitted yet - `commit_and_broadcast_event` will commit
// for us and replace this with the actual database update.
Ok(Ok(())) => {
// Detecing a new client, and inserting it in `st_clients`
// Disconnect logic is written in module_host.rs, due to different transacationality requirements.
if reducer_name == CLIENT_CONNECTED_DUNDER {
match self.insert_st_client(&mut tx, caller_identity, caller_address) {
Ok(_) => EventStatus::Committed(DatabaseUpdate::default()),
Expand Down

0 comments on commit a5fcca4

Please sign in to comment.