Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate snapshotting into core #1344

Merged
merged 17 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ spacetimedb-primitives.workspace = true
spacetimedb-sats = { workspace = true, features = ["serde"] }
spacetimedb-table.workspace = true
spacetimedb-vm.workspace = true
spacetimedb-snapshot.workspace = true

anyhow = { workspace = true, features = ["backtrace"] }
arrayvec.workspace = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ use std::collections::BTreeMap;
use std::sync::Arc;

#[derive(Default)]
pub(super) struct CommittedState {
pub(super) next_tx_offset: u64,
pub(super) tables: IntMap<TableId, Table>,
pub(super) blob_store: HashMapBlobStore,
pub(crate) struct CommittedState {
pub(crate) next_tx_offset: u64,
pub(crate) tables: IntMap<TableId, Table>,
pub(crate) blob_store: HashMapBlobStore,
}

impl StateView for CommittedState {
Expand Down Expand Up @@ -97,7 +97,7 @@ fn ignore_duplicate_insert_error<T>(res: std::result::Result<T, InsertError>) ->
}

impl CommittedState {
pub fn bootstrap_system_tables(&mut self, database_address: Address) -> Result<()> {
pub(super) fn bootstrap_system_tables(&mut self, database_address: Address) -> Result<()> {
// NOTE: the `rdb_num_table_rows` metric is used by the query optimizer,
// and therefore has performance implications and must not be disabled.
let with_label_values = |table_id: TableId, table_name: &str| {
Expand Down Expand Up @@ -235,20 +235,30 @@ impl CommittedState {
with_label_values(ST_SEQUENCES_ID, ST_SEQUENCES_NAME).inc();
}

self.reset_system_table_schemas(database_address)?;

Ok(())
}

/// Compute the system table schemas from the system tables,
/// and store those schemas in the in-memory [`Table`] structures.
///
/// Necessary during bootstrap because system tables include autoinc IDs
/// for objects like indexes and constraints
/// which are computed at insert-time,
/// and therefore not included in the hardcoded schemas.
pub(super) fn reset_system_table_schemas(&mut self, database_address: Address) -> Result<()> {
// Re-read the schema with the correct ids...
let ctx = ExecutionContext::internal(database_address);
for table_id in ref_schemas.map(|s| s.table_id) {
let schema = self.schema_for_table_raw(&ctx, table_id)?;
self.tables
.get_mut(&table_id)
.unwrap()
.with_mut_schema(|ts| *ts = schema);
for schema in system_tables() {
self.tables.get_mut(&schema.table_id).unwrap().schema =
Arc::new(self.schema_for_table_raw(&ctx, schema.table_id)?);
Centril marked this conversation as resolved.
Show resolved Hide resolved
}

Ok(())
}

pub fn replay_delete_by_rel(&mut self, table_id: TableId, rel: &ProductValue) -> Result<()> {
pub(super) fn replay_delete_by_rel(&mut self, table_id: TableId, rel: &ProductValue) -> Result<()> {
let table = self
.tables
.get_mut(&table_id)
Expand All @@ -262,13 +272,18 @@ impl CommittedState {
Ok(())
}

pub fn replay_insert(&mut self, table_id: TableId, schema: &Arc<TableSchema>, row: &ProductValue) -> Result<()> {
pub(super) fn replay_insert(
&mut self,
table_id: TableId,
schema: &Arc<TableSchema>,
row: &ProductValue,
) -> Result<()> {
let (table, blob_store) = self.get_table_and_blob_store_or_create(table_id, schema);
table.insert_internal(blob_store, row).map_err(TableError::Insert)?;
Ok(())
}

pub fn build_sequence_state(&mut self, sequence_state: &mut SequencesState) -> Result<()> {
pub(super) fn build_sequence_state(&mut self, sequence_state: &mut SequencesState) -> Result<()> {
let st_sequences = self.tables.get(&ST_SEQUENCES_ID).unwrap();
for row_ref in st_sequences.scan_rows(&self.blob_store) {
let sequence = StSequenceRow::try_from(row_ref)?;
Expand All @@ -290,7 +305,7 @@ impl CommittedState {
Ok(())
}

pub fn build_indexes(&mut self) -> Result<()> {
pub(super) fn build_indexes(&mut self) -> Result<()> {
let st_indexes = self.tables.get(&ST_INDEXES_ID).unwrap();
let rows = st_indexes
.scan_rows(&self.blob_store)
Expand All @@ -309,7 +324,7 @@ impl CommittedState {
/// After replaying all old transactions, tables which have rows will
/// have been created in memory, but tables with no rows will not have
/// been created. This function ensures that they are created.
pub fn build_missing_tables(&mut self) -> Result<()> {
pub(super) fn build_missing_tables(&mut self) -> Result<()> {
// Find all ids of tables that are in `st_tables` but haven't been built.
let table_ids = self
.get_table(ST_TABLES_ID)
Expand All @@ -327,7 +342,7 @@ impl CommittedState {
Ok(())
}

pub fn index_seek<'a>(
pub(super) fn index_seek<'a>(
&'a self,
table_id: TableId,
cols: &ColList,
Expand All @@ -349,7 +364,7 @@ impl CommittedState {
// with `table_id`
// within the current transaction (i.e. without an intervening call to self.merge)
// is sufficient to demonstrate that a call to `self.get` is safe.
pub fn get(&self, table_id: TableId, row_ptr: RowPointer) -> RowRef<'_> {
pub(super) fn get(&self, table_id: TableId, row_ptr: RowPointer) -> RowRef<'_> {
debug_assert!(
row_ptr.squashed_offset().is_committed_state(),
"Cannot get TX_STATE RowPointer from CommittedState.",
Expand Down Expand Up @@ -389,7 +404,7 @@ impl CommittedState {
)
}

pub fn merge(&mut self, tx_state: TxState, ctx: &ExecutionContext) -> TxData {
pub(super) fn merge(&mut self, tx_state: TxState, ctx: &ExecutionContext) -> TxData {
let mut tx_data = TxData::default();

// First, apply deletes. This will free up space in the committed tables.
Expand Down Expand Up @@ -534,15 +549,15 @@ impl CommittedState {
}
}

pub fn get_table(&self, table_id: TableId) -> Option<&Table> {
pub(super) fn get_table(&self, table_id: TableId) -> Option<&Table> {
self.tables.get(&table_id)
}

pub fn get_table_mut(&mut self, table_id: TableId) -> Option<&mut Table> {
pub(super) fn get_table_mut(&mut self, table_id: TableId) -> Option<&mut Table> {
self.tables.get_mut(&table_id)
}

pub fn get_table_and_blob_store(&mut self, table_id: TableId) -> Option<(&mut Table, &mut dyn BlobStore)> {
pub(super) fn get_table_and_blob_store(&mut self, table_id: TableId) -> Option<(&mut Table, &mut dyn BlobStore)> {
self.tables
.get_mut(&table_id)
.map(|tbl| (tbl, &mut self.blob_store as &mut dyn BlobStore))
Expand All @@ -556,7 +571,7 @@ impl CommittedState {
self.tables.insert(table_id, Self::make_table(schema));
}

pub fn get_table_and_blob_store_or_create<'this>(
pub(super) fn get_table_and_blob_store_or_create<'this>(
&'this mut self,
table_id: TableId,
schema: &Arc<TableSchema>,
Expand Down
85 changes: 82 additions & 3 deletions crates/core/src/db/datastore/locking_tx_datastore/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use crate::{
db::{
datastore::{
system_tables::{
read_st_module_bytes_col, ModuleKind, StModuleFields, StModuleRow, StTableFields, ST_MODULE_ID,
ST_TABLES_ID,
read_st_module_bytes_col, system_table_schema, ModuleKind, StModuleFields, StModuleRow, StTableFields,
ST_MODULE_ID, ST_TABLES_ID,
},
traits::{
DataRow, IsolationLevel, Metadata, MutTx, MutTxDatastore, RowTypeForTable, Tx, TxData, TxDatastore,
Expand All @@ -33,6 +33,7 @@ use spacetimedb_sats::db::{
def::{IndexDef, SequenceDef, TableDef, TableSchema},
};
use spacetimedb_sats::{bsatn, buffer::BufReader, hash::Hash, AlgebraicValue, ProductValue};
use spacetimedb_snapshot::ReconstructedSnapshot;
use spacetimedb_table::{indexes::RowPointer, table::RowRef};
use std::time::{Duration, Instant};
use std::{borrow::Cow, collections::HashSet, sync::Arc};
Expand All @@ -53,7 +54,7 @@ pub type Result<T> = std::result::Result<T, DBError>;
#[derive(Clone)]
pub struct Locking {
/// The state of the database up to the point of the last committed transaction.
committed_state: Arc<RwLock<CommittedState>>,
pub(crate) committed_state: Arc<RwLock<CommittedState>>,
/// The state of sequence generation in this database.
sequence_state: Arc<Mutex<SequencesState>>,
/// The address of this database.
Expand Down Expand Up @@ -129,6 +130,80 @@ impl Locking {
}
}

/// Construct a new [`Locking`] datastore containing the state stored in `snapshot`.
///
/// - Construct all the tables referenced by `snapshot`, computing their schemas
/// either from known system table schemas or from `st_table` and friends.
/// - Populate those tables with all rows in `snapshot`.
/// - Construct a [`HashMapBlobStore`] containing all the large blobs referenced by `snapshot`,
/// with reference counts specified in `snapshot`.
/// - Do [`CommittedState::reset_system_table_schemas`] to fix-up autoinc IDs in the system tables,
/// to ensure those schemas match what [`Self::bootstrap`] would install.
/// - Notably, **do not** construct indexes or sequences.
/// This should be done by [`Self::rebuild_state_after_replay`],
/// after replaying the suffix of the commitlog.
pub fn restore_from_snapshot(snapshot: ReconstructedSnapshot) -> Result<Self> {
gefjon marked this conversation as resolved.
Show resolved Hide resolved
let ReconstructedSnapshot {
database_address,
tx_offset,
blob_store,
tables,
..
} = snapshot;

let datastore = Self::new(database_address);
let mut committed_state = datastore.committed_state.write_arc();
committed_state.blob_store = blob_store;

let ctx = ExecutionContext::internal(datastore.database_address);

// Note that `tables` is a `BTreeMap`, and so iterates in increasing order.
// This means that we will instantiate and populate the system tables before any user tables.
for (table_id, pages) in tables {
let schema = match system_table_schema(table_id) {
gefjon marked this conversation as resolved.
Show resolved Hide resolved
Some(schema) => Arc::new(schema),
// In this case, `schema_for_table` will never see a cached schema,
// as the committed state is newly constructed and we have not accessed this schema yet.
// As such, this call will compute and save the schema from `st_table` and friends.
None => committed_state.schema_for_table(&ctx, table_id)?,
};
let (table, blob_store) = committed_state.get_table_and_blob_store_or_create(table_id, &schema);
unsafe {
// Safety:
// - The snapshot is uncorrupted because reconstructing it verified its hashes.
// - The schema in `table` is either derived from the st_table and st_column,
// which were restored from the snapshot,
// or it is a known schema for a system table.
// - We trust that the snapshot was consistent when created,
// so the layout used in the `pages` must be consistent with the schema.
table.set_pages(pages, blob_store);
}
Centril marked this conversation as resolved.
Show resolved Hide resolved

// Set the `rdb_num_table_rows` metric for the table.
// NOTE: the `rdb_num_table_rows` metric is used by the query optimizer,
// and therefore has performance implications and must not be disabled.
DB_METRICS
.rdb_num_table_rows
.with_label_values(&database_address, &table_id.0, &schema.table_name)
.set(table.row_count as i64);

// Also set the `rdb_table_size` metric for the table.
let table_size = table.bytes_occupied_overestimate();
DB_METRICS
.rdb_table_size
.with_label_values(&database_address, &table_id.into(), &schema.table_name)
.set(table_size as i64);
}

// Fix up autoinc IDs in the cached system table schemas.
committed_state.reset_system_table_schemas(database_address)?;

// The next TX offset after restoring from a snapshot is one greater than the snapshotted offset.
committed_state.next_tx_offset = tx_offset + 1;

Ok(datastore)
}

pub(crate) fn alter_table_access_mut_tx(&self, tx: &mut MutTxId, name: Box<str>, access: StAccess) -> Result<()> {
let table_id = self
.table_id_from_name_mut_tx(tx, &name)?
Expand Down Expand Up @@ -617,6 +692,10 @@ impl<F> Replay<F> {
};
f(&mut visitor)
}

/// Returns the current `next_tx_offset` recorded in the committed state.
pub(crate) fn next_tx_offset(&self) -> u64 {
    let committed_state = self.committed_state.read_arc();
    committed_state.next_tx_offset
}
}

impl<F: FnMut(u64)> spacetimedb_commitlog::Decoder for Replay<F> {
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/db/datastore/locking_tx_datastore/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![forbid(unsafe_op_in_unsafe_fn)]

mod committed_state;
pub(crate) mod committed_state;
pub mod datastore;
mod mut_tx;
pub use mut_tx::MutTxId;
Expand Down
31 changes: 31 additions & 0 deletions crates/core/src/db/datastore/system_tables.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
//! Schema definitions and accesses to the system tables,
//! which store metadata about a SpacetimeDB database.
//!
//! When defining a new system table, remember to:
//! - Define constants for its ID and name.
//! - Add it to [`system_tables`], and define a constant for its index there.
//! - Use [`st_fields_enum`] to define its column enum.
//! - Define a function that returns its schema.
//! - Add its schema to [`system_table_schema`].
//! - Define a Rust struct which holds its rows, and implement `TryFrom<RowRef<'_>>` for that struct.

use crate::db::relational_db::RelationalDB;
use crate::error::{DBError, TableError};
use crate::execution_context::ExecutionContext;
Expand Down Expand Up @@ -392,6 +403,26 @@ pub fn st_var_schema() -> TableSchema {
.into_schema(ST_VAR_ID)
}

/// Returns the hardcoded schema for the system table named by `table_id`,
/// or `None` if `table_id` is not a known system table.
///
/// Used when restoring from a snapshot; system tables are reinstantiated with this schema,
/// whereas user tables are reinstantiated with a schema computed from the snapshotted system tables.
///
/// This must be kept in sync with the set of system tables.
pub(crate) fn system_table_schema(table_id: TableId) -> Option<TableSchema> {
    let schema = match table_id {
        ST_TABLES_ID => st_table_schema(),
        ST_COLUMNS_ID => st_columns_schema(),
        ST_SEQUENCES_ID => st_sequences_schema(),
        ST_INDEXES_ID => st_indexes_schema(),
        ST_CONSTRAINTS_ID => st_constraints_schema(),
        ST_MODULE_ID => st_module_schema(),
        ST_CLIENTS_ID => st_clients_schema(),
        ST_VAR_ID => st_var_schema(),
        _ => return None,
    };
    Some(schema)
}

/// Returns whether `table_name` names a system table,
/// i.e. whether it begins with the reserved `st_` prefix.
pub(crate) fn table_name_is_system(table_name: &str) -> bool {
    table_name.strip_prefix("st_").is_some()
}
Expand Down
Loading
Loading