Skip to content

Commit

Permalink
core: Collapse DBIC into HostController
Browse files Browse the repository at this point in the history
Make it so `HostController` manages both the module host (wasm
machinery) and the database (`RelationalDB` / `DatabaseInstanceContext`)
of spacetime databases deployed to a server.

The `DatabaseInstanceContextController` (DBIC) is removed in the
process.

This makes it possible to make database accesses panic-safe, in that uncaught
panics will cause all resources to be released and the database to be
restarted on subsequent access. This is a prerequisite for #985.

It also allows us to move towards storing the module binary directly in
the database / commitlog. This patch, however, makes some contortions in
order to **not** introduce a breaking change just yet.
  • Loading branch information
kim committed May 14, 2024
1 parent 6810821 commit 1b7e5d2
Show file tree
Hide file tree
Showing 16 changed files with 892 additions and 788 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ tempdir = "0.3.7"
tempfile = "3.8"
termcolor = "1.2.0"
thiserror = "1.0.37"
tokio = { version = "1.34", features = ["full"] }
tokio = { version = "1.37", features = ["full"] }
tokio-postgres = { version = "0.7.8", features = ["with-chrono-0_4"] }
tokio-tungstenite = { version = "0.21", features = ["native-tls"] }
tokio-util = { version = "0.7.4", features = ["time"] }
Expand Down
27 changes: 2 additions & 25 deletions crates/client-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,10 @@ use http::StatusCode;
use spacetimedb::address::Address;
use spacetimedb::auth::identity::{DecodingKey, EncodingKey};
use spacetimedb::client::ClientActorIndex;
use spacetimedb::database_instance_context_controller::DatabaseInstanceContextController;
use spacetimedb::energy::{EnergyBalance, EnergyQuanta};
use spacetimedb::host::{HostController, UpdateDatabaseResult};
use spacetimedb::identity::Identity;
use spacetimedb::messages::control_db::{Database, DatabaseInstance, IdentityEmail, Node};
use spacetimedb::module_host_context::ModuleHostContext;
use spacetimedb::sendgrid_controller::SendGridController;
use spacetimedb_client_api_messages::name::{DomainName, InsertDomainResult, RegisterTldResult, Tld};
use spacetimedb_client_api_messages::recovery::RecoveryCode;
Expand All @@ -26,11 +24,9 @@ pub mod util;
///
/// Types returned here should be considered internal state and **never** be
/// surfaced to the API.
#[async_trait]
pub trait NodeDelegate: Send + Sync {
fn gather_metrics(&self) -> Vec<prometheus::proto::MetricFamily>;
fn database_instance_context_controller(&self) -> &DatabaseInstanceContextController;
fn host_controller(&self) -> &Arc<HostController>;
fn host_controller(&self) -> &HostController;
fn client_actor_index(&self) -> &ClientActorIndex;
fn sendgrid_controller(&self) -> Option<&SendGridController>;

Expand All @@ -44,16 +40,6 @@ pub trait NodeDelegate: Send + Sync {

/// Return a JWT encoding key for signing credentials.
fn private_key(&self) -> &EncodingKey;

/// Load the [`ModuleHostContext`] for instance `instance_id` of
/// [`Database`] `db`.
///
/// This method is defined as `async`, as that obliges the implementer to
/// ensure that any necessary blocking I/O is made async-safe. In other
/// words, it is the responsibility of the implementation to make use of
/// `spawn_blocking` or `block_in_place` as appropriate, while the
/// `client-api` assumes that `await`ing the method never blocks.
async fn load_module_host_context(&self, db: Database, instance_id: u64) -> anyhow::Result<ModuleHostContext>;
}

/// Parameters for publishing a database.
Expand Down Expand Up @@ -276,17 +262,12 @@ impl<T: ControlStateWriteAccess + ?Sized> ControlStateWriteAccess for Arc<T> {
}
}

#[async_trait]
impl<T: NodeDelegate + ?Sized> NodeDelegate for Arc<T> {
fn gather_metrics(&self) -> Vec<prometheus::proto::MetricFamily> {
(**self).gather_metrics()
}

fn database_instance_context_controller(&self) -> &DatabaseInstanceContextController {
(**self).database_instance_context_controller()
}

fn host_controller(&self) -> &Arc<HostController> {
fn host_controller(&self) -> &HostController {
(**self).host_controller()
}

Expand All @@ -309,10 +290,6 @@ impl<T: NodeDelegate + ?Sized> NodeDelegate for Arc<T> {
fn sendgrid_controller(&self) -> Option<&SendGridController> {
(**self).sendgrid_controller()
}

async fn load_module_host_context(&self, db: Database, instance_id: u64) -> anyhow::Result<ModuleHostContext> {
(**self).load_module_host_context(db, instance_id).await
}
}

pub fn log_and_500(e: impl std::fmt::Display) -> ErrorResponse {
Expand Down
155 changes: 58 additions & 97 deletions crates/client-api/src/routes/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use serde_json::{json, Value};
use spacetimedb::address::Address;
use spacetimedb::auth::identity::encode_token;
use spacetimedb::database_logger::DatabaseLogger;
use spacetimedb::error::DatabaseError;
use spacetimedb::host::DescribedEntityType;
use spacetimedb::host::EntityDef;
use spacetimedb::host::ReducerArgs;
Expand Down Expand Up @@ -89,17 +88,10 @@ pub async fn call<S: ControlStateDelegate + NodeDelegate>(
))?;
let instance_id = database_instance.id;
let host = worker_ctx.host_controller();

let module = match host.get_module_host(instance_id) {
Ok(m) => m,
Err(_) => {
let dbic = worker_ctx
.load_module_host_context(database, instance_id)
.await
.map_err(log_and_500)?;
host.spawn_module_host(dbic).await.map_err(log_and_500)?
}
};
let module = host
.get_or_launch_module_host(database, instance_id)
.await
.map_err(log_and_500)?;

// HTTP callers always need an address to provide to connect/disconnect,
// so generate one if none was provided.
Expand Down Expand Up @@ -283,17 +275,11 @@ where
let call_info = extract_db_call_info(&worker_ctx, auth, &address).await?;

let instance_id = call_info.database_instance.id;
let host = worker_ctx.host_controller();
let module = match host.get_module_host(instance_id) {
Ok(m) => m,
Err(_) => {
let dbic = worker_ctx
.load_module_host_context(database, instance_id)
.await
.map_err(log_and_500)?;
host.spawn_module_host(dbic).await.map_err(log_and_500)?
}
};
let module = worker_ctx
.host_controller()
.get_or_launch_module_host(database, instance_id)
.await
.map_err(log_and_500)?;

let entity_type = entity_type.as_str().parse().map_err(|()| {
log::debug!("Request to describe unhandled entity type: {}", entity_type);
Expand Down Expand Up @@ -341,16 +327,10 @@ where

let instance_id = call_info.database_instance.id;
let host = worker_ctx.host_controller();
let module = match host.get_module_host(instance_id) {
Ok(m) => m,
Err(_) => {
let dbic = worker_ctx
.load_module_host_context(database, instance_id)
.await
.map_err(log_and_500)?;
host.spawn_module_host(dbic).await.map_err(log_and_500)?
}
};
let module = host
.get_or_launch_module_host(database, instance_id)
.await
.map_err(log_and_500)?;
let catalog = module.catalog();
let expand = expand.unwrap_or(false);
let response_catalog: HashMap<_, _> = catalog
Expand Down Expand Up @@ -461,16 +441,10 @@ where

let body = if follow {
let host = worker_ctx.host_controller();
let module = match host.get_module_host(instance_id) {
Ok(m) => m,
Err(_) => {
let dbic = worker_ctx
.load_module_host_context(database, instance_id)
.await
.map_err(log_and_500)?;
host.spawn_module_host(dbic).await.map_err(log_and_500)?
}
};
let module = host
.get_or_launch_module_host(database, instance_id)
.await
.map_err(log_and_500)?;
let log_rx = module.subscribe_to_logs().map_err(log_and_500)?;

let stream = tokio_stream::wrappers::BroadcastStream::new(log_rx).filter_map(move |x| {
Expand Down Expand Up @@ -548,60 +522,47 @@ where
))?;
let instance_id = database_instance.id;

let host = worker_ctx.host_controller();
match host.get_module_host(instance_id) {
Ok(_) => {}
Err(_) => {
let dbic = worker_ctx
.load_module_host_context(database, instance_id)
.await
.map_err(log_and_500)?;
host.spawn_module_host(dbic).await.map_err(log_and_500)?;
}
};

let dbicc = worker_ctx.database_instance_context_controller();

tracing::info!(sql = body);
let Some((dbic, _)) = dbicc.get(instance_id) else {
let err = format!("{}", DatabaseError::NotFound(instance_id));
return Err((StatusCode::BAD_REQUEST, err).into());
};

let stdb = &dbic.relational_db;
let results = match sql::execute::run(stdb, &body, auth) {
Ok(results) => results,
Err(err) => {
log::warn!("{}", err);
return if let Some(auth_err) = err.get_auth_error() {
let err = format!("{auth_err}");
Err((StatusCode::UNAUTHORIZED, err).into())
} else {
let err = format!("{err}");
Err((StatusCode::BAD_REQUEST, err).into())
};
}
};

let json = stdb.with_read_only(&ctx_sql(stdb), |tx| {
results
.into_iter()
.map(|result| {
let rows = result.data;
let schema = result
.head
.fields
.iter()
.map(|x| {
let ty = x.algebraic_type.clone();
let name = translate_col(tx, x.field);
ProductTypeElement::new(ty, name)
})
.collect();
StmtResultJson { schema, rows }
})
.collect::<Vec<_>>()
});
let json = worker_ctx
.host_controller()
.using_database(
database,
instance_id,
move |db| -> axum::response::Result<_, (StatusCode, String)> {
tracing::info!(sql = body);
let results = sql::execute::run(db, &body, auth).map_err(|e| {
log::warn!("{}", e);
if let Some(auth_err) = e.get_auth_error() {
(StatusCode::UNAUTHORIZED, auth_err.to_string())
} else {
(StatusCode::BAD_REQUEST, e.to_string())
}
})?;

let json = db.with_read_only(&ctx_sql(db), |tx| {
results
.into_iter()
.map(|result| {
let rows = result.data;
let schema = result
.head
.fields
.iter()
.map(|x| {
let ty = x.algebraic_type.clone();
let name = translate_col(tx, x.field);
ProductTypeElement::new(ty, name)
})
.collect();
StmtResultJson { schema, rows }
})
.collect::<Vec<_>>()
});

Ok(json)
},
)
.await
.map_err(log_and_500)??;

Ok((StatusCode::OK, axum::Json(json)))
}
Expand Down
16 changes: 4 additions & 12 deletions crates/client-api/src/routes/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,18 +94,10 @@ where
let identity_token = auth.creds.token().to_owned();

let host = ctx.host_controller();
let module = match host.get_module_host(instance_id) {
Ok(m) => m,
Err(_) => {
// TODO(kim): probably wrong -- check if instance node id matches ours
log::debug!("creating fresh module host");
let dbic = ctx
.load_module_host_context(database, instance_id)
.await
.map_err(log_and_500)?;
host.spawn_module_host(dbic).await.map_err(log_and_500)?
}
};
let module = host
.get_or_launch_module_host(database, instance_id)
.await
.map_err(log_and_500)?;

let client_id = ClientActorId {
identity: auth.identity,
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/database_instance_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ impl Deref for DatabaseInstanceContext {

#[derive(Copy, Clone, Default)]
pub struct TotalDiskUsage {
durability: Option<u64>,
logs: Option<u64>,
pub durability: Option<u64>,
pub logs: Option<u64>,
}

impl TotalDiskUsage {
Expand Down
Loading

0 comments on commit 1b7e5d2

Please sign in to comment.