Skip to content

Commit

Permalink
core: Determine dangling clients from st_clients
Browse files Browse the repository at this point in the history
When opening a `RelationalDB`, determine the set of dangling clients
(`ConnectedClients`) by scanning the `st_clients` table instead of
tracking unmatched `__identity_connected__` calls during history replay.

We left the replay tracking in place in #1288, treating the commitlog as
the sole source of truth. With #1344 (snapshotting), this is no longer
correct: the snapshot may contain rows in `st_clients`, but leave no
history suffix for replay.
  • Loading branch information
kim committed Jun 11, 2024
1 parent 0bcd9bb commit 91bbaa1
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 57 deletions.
56 changes: 16 additions & 40 deletions crates/core/src/db/datastore/locking_tx_datastore/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use crate::{
db::{
datastore::{
system_tables::{
read_st_module_bytes_col, system_table_schema, ModuleKind, StModuleFields, StModuleRow, StTableFields,
ST_MODULE_ID, ST_TABLES_ID,
read_st_module_bytes_col, system_table_schema, ModuleKind, StClientsRow, StModuleFields, StModuleRow,
StTableFields, ST_CLIENTS_ID, ST_MODULE_ID, ST_TABLES_ID,
},
traits::{
DataRow, IsolationLevel, Metadata, MutTx, MutTxDatastore, RowTypeForTable, Tx, TxData, TxDatastore,
Expand All @@ -36,7 +36,7 @@ use spacetimedb_sats::{bsatn, buffer::BufReader, hash::Hash, AlgebraicValue, Pro
use spacetimedb_snapshot::ReconstructedSnapshot;
use spacetimedb_table::{indexes::RowPointer, table::RowRef};
use std::time::{Duration, Instant};
use std::{borrow::Cow, collections::HashSet, sync::Arc};
use std::{borrow::Cow, sync::Arc};
use thiserror::Error;

pub type Result<T> = std::result::Result<T, DBError>;
Expand Down Expand Up @@ -126,7 +126,6 @@ impl Locking {
database_address: self.database_address,
committed_state: self.committed_state.clone(),
progress: RefCell::new(progress),
connected_clients: RefCell::new(HashSet::new()),
}
}

Expand Down Expand Up @@ -204,6 +203,19 @@ impl Locking {
Ok(datastore)
}

pub fn connected_clients<'a>(
&'a self,
ctx: &'a ExecutionContext,
tx: &'a TxId,
) -> Result<impl Iterator<Item = Result<(Identity, Address)>> + 'a> {
let iter = self.iter_tx(ctx, tx, ST_CLIENTS_ID)?.map(|row_ref| {
let row = StClientsRow::try_from(row_ref)?;
Ok((row.identity, row.address))
});

Ok(iter)
}

pub(crate) fn alter_table_access_mut_tx(&self, tx: &mut MutTxId, name: Box<str>, access: StAccess) -> Result<()> {
let table_id = self
.table_id_from_name_mut_tx(tx, &name)?
Expand Down Expand Up @@ -668,17 +680,6 @@ pub struct Replay<F> {
database_address: Address,
committed_state: Arc<RwLock<CommittedState>>,
progress: RefCell<F>,
/// Tracks the connect / disconnect calls recorded in the transaction history.
///
/// If non-empty after a replay, the remaining entries were not gracefully
/// disconnected. A disconnect call should be performed for each.
connected_clients: RefCell<HashSet<(Identity, Address)>>,
}

impl<F> Replay<F> {
pub fn into_connected_clients(self) -> HashSet<(Identity, Address)> {
self.connected_clients.into_inner()
}
}

impl<F> Replay<F> {
Expand All @@ -688,7 +689,6 @@ impl<F> Replay<F> {
database_address: &self.database_address,
committed_state: &mut committed_state,
progress: &mut *self.progress.borrow_mut(),
connected_clients: &mut self.connected_clients.borrow_mut(),
};
f(&mut visitor)
}
Expand Down Expand Up @@ -794,7 +794,6 @@ struct ReplayVisitor<'a, F> {
database_address: &'a Address,
committed_state: &'a mut CommittedState,
progress: &'a mut F,
connected_clients: &'a mut HashSet<(Identity, Address)>,
}

impl<F: FnMut(u64)> spacetimedb_commitlog::payload::txdata::Visitor for ReplayVisitor<'_, F> {
Expand Down Expand Up @@ -905,29 +904,6 @@ impl<F: FnMut(u64)> spacetimedb_commitlog::payload::txdata::Visitor for ReplayVi

Ok(())
}

fn visit_inputs(&mut self, inputs: &txdata::Inputs) -> std::result::Result<(), Self::Error> {
let decode_caller = || {
let buf = &mut inputs.reducer_args.as_ref();
let caller_identity: Identity = bsatn::from_reader(buf).context("Could not decode caller identity")?;
let caller_address: Address = bsatn::from_reader(buf).context("Could not decode caller address")?;
anyhow::Ok((caller_identity, caller_address))
};
if let Some(action) = inputs.reducer_name.strip_prefix("__identity_") {
let caller = decode_caller()?;
match action {
"connected__" => {
self.connected_clients.insert(caller);
}
"disconnected__" => {
self.connected_clients.remove(&caller);
}
_ => {}
}
}

Ok(())
}
}

/// Construct a [`Metadata`] from the given [`RowRef`],
Expand Down
21 changes: 21 additions & 0 deletions crates/core/src/db/datastore/system_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -800,6 +800,27 @@ impl From<&StClientsRow> for ProductValue {
}
}

impl TryFrom<RowRef<'_>> for StClientsRow {
type Error = DBError;

fn try_from(row: RowRef<'_>) -> Result<Self, Self::Error> {
fn read_col<T: TryFrom<AlgebraicValue>>(row: RowRef<'_>, col: StClientsFields) -> Result<T, DBError> {
row.read_col::<AlgebraicValue>(col.col_id())?.try_into().map_err(|_| {
InvalidFieldError {
name: Some(col.name()),
col_pos: col.col_id(),
}
.into()
})
}

let identity = read_col(row, StClientsFields::Identity)?;
let address = read_col(row, StClientsFields::Address)?;

Ok(Self { identity, address })
}
}

/// A handle for reading system variables from `st_var`
pub struct StVarTable;

Expand Down
46 changes: 29 additions & 17 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,15 @@ pub type DiskSizeFn = Arc<dyn Fn() -> io::Result<u64> + Send + Sync>;

pub type Txdata = commitlog::payload::Txdata<ProductValue>;

/// Clients for which a connect reducer call was found in the [`History`], but
/// no corresponding disconnect.
/// The set of clients considered connected to the database.
///
/// A client is considered connected if there exists a corresponding row in the
/// `st_clients` system table.
///
/// If rows exist in `st_clients` upon [`RelationalDB::open`], the database was
/// not shut down gracefully. Such "dangling" clients should be removed by
/// calling [`crate::host::ModuleHost::call_identity_connected_disconnected`]
/// for each entry in [`ConnectedClients`].
pub type ConnectedClients = HashSet<(Identity, Address)>;

#[derive(Clone)]
Expand Down Expand Up @@ -261,9 +268,11 @@ impl RelationalDB {
///
/// # Return values
///
/// Alongside `Self`, the set of clients who were connected as of the most
/// recent transaction in the `history` is returned as a [`ConnectedClients`].
/// `__disconnect__` should be called for each entry.
/// Alongside `Self`, [`ConnectedClients`] is returned, which is the set of
/// clients considered connected at the given snapshot and `history`.
///
/// If [`ConnectedClients`] is non-empty, the database did not shut down
/// gracefully. The caller is responsible for disconnecting the clients.
///
/// [ModuleHost]: crate::host::module_host::ModuleHost
pub fn open(
Expand All @@ -290,7 +299,7 @@ impl RelationalDB {
log::info!("[{address}] DATABASE: durable_tx_offset is {durable_tx_offset:?}");
let inner = Self::restore_from_snapshot_or_bootstrap(address, snapshot_repo.as_deref(), durable_tx_offset)?;

let connected_clients = apply_history(&inner, address, history)?;
apply_history(&inner, address, history)?;
let db = Self::new(lock, address, owner_identity, inner, durability, snapshot_repo);

if let Some(meta) = db.metadata()? {
Expand All @@ -306,6 +315,7 @@ impl RelationalDB {
.into());
}
};
let connected_clients = db.connected_clients()?;

Ok((db, connected_clients))
}
Expand Down Expand Up @@ -373,6 +383,12 @@ impl RelationalDB {
self.with_read_only(&ctx, |tx| self.inner.program_bytes(&ctx, tx))
}

/// Read the set of clients currently connected to the database.
pub fn connected_clients(&self) -> Result<ConnectedClients, DBError> {
let ctx = ExecutionContext::internal(self.address);
self.with_read_only(&ctx, |tx| self.inner.connected_clients(&ctx, tx)?.collect())
}

/// Update the module associated with this database.
///
/// The caller must ensure that:
Expand Down Expand Up @@ -438,22 +454,18 @@ impl RelationalDB {
Locking::bootstrap(address)
}

/// Replay ("fold") the provided [`spacetimedb_durability::History`] onto
/// the database state.
/// Apply the provided [`spacetimedb_durability::History`] onto the database
/// state.
///
/// Consumes `self` in order to ensure exclusive access, and to prevent use
/// of the database in case of an incomplete replay.
/// This restriction may be lifted in the future to allow for "live" followers.
///
/// Alongside `Self`, the set of clients who were connected as of the most
/// recent transaction is returned as a [`ConnectedClients`].
/// `__disconnect__` should be called for each entry.
pub fn apply<T>(self, history: T) -> Result<(Self, ConnectedClients), DBError>
pub fn apply<T>(self, history: T) -> Result<Self, DBError>
where
T: durability::History<TxData = Txdata>,
{
let connected_clients = apply_history(&self.inner, self.address, history)?;
Ok((self, connected_clients))
apply_history(&self.inner, self.address, history)?;
Ok(self)
}

/// Returns an approximate row count for a particular table.
Expand Down Expand Up @@ -1135,7 +1147,7 @@ impl fmt::Debug for LockFile {
}
}

fn apply_history<H>(datastore: &Locking, address: Address, history: H) -> Result<ConnectedClients, DBError>
fn apply_history<H>(datastore: &Locking, address: Address, history: H) -> Result<(), DBError>
where
H: durability::History<TxData = Txdata>,
{
Expand Down Expand Up @@ -1171,7 +1183,7 @@ where
datastore.rebuild_state_after_replay()?;
log::info!("[{}] DATABASE: rebuilt state after replay", address);

Ok(replay.into_connected_clients())
Ok(())
}

/// Initialize local durability with the default parameters.
Expand Down
15 changes: 15 additions & 0 deletions crates/lib/src/address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,21 @@ impl From<Address> for AlgebraicValue {
}
}

impl TryFrom<AlgebraicValue> for Address {
type Error = AlgebraicValue;

fn try_from(av: AlgebraicValue) -> Result<Self, Self::Error> {
match &av {
AlgebraicValue::Product(pv) => pv
.field_as_bytes(0, Some("__address_bytes"))
.map(Self::from_slice)
.map_err(|_| av),

_ => Err(av),
}
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct AddressForUrl(u128);

Expand Down
14 changes: 14 additions & 0 deletions crates/lib/src/identity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,20 @@ impl From<Identity> for AlgebraicValue {
}
}

impl TryFrom<AlgebraicValue> for Identity {
type Error = AlgebraicValue;

fn try_from(av: AlgebraicValue) -> Result<Self, Self::Error> {
match &av {
AlgebraicValue::Product(pv) => pv
.field_as_bytes(0, Some("__identity_bytes"))
.map(Self::from_slice)
.map_err(|_| av),
_ => Err(av),
}
}
}

#[cfg(feature = "serde")]
impl serde::Serialize for Identity {
fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
Expand Down

0 comments on commit 91bbaa1

Please sign in to comment.