Skip to content

Commit

Permalink
core: Collapse DBIC into HostController
Browse files Browse the repository at this point in the history
Make it so `HostController` manages both the module host (wasm
machinery) and the database (`RelationalDB` / `DatabaseInstanceContext`)
of spacetime databases deployed to a server.

The `DatabaseInstanceContextController` (DBIC) is removed in the
process.

This allows to make database accesses panic-safe, in that uncaught
panics will cause all resouces to be released and the database to be
restarted on subsequent access. This is a prerequisite for #985.

It also allows to move towards storage of the module binary directly in
the database / commitlog. This patch, however, makes some contortions in
order to **not** introduce a breaking change just yet.
  • Loading branch information
kim committed May 2, 2024
1 parent 5cc05b1 commit 4018d12
Show file tree
Hide file tree
Showing 16 changed files with 748 additions and 763 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 2 additions & 25 deletions crates/client-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,10 @@ use http::StatusCode;
use spacetimedb::address::Address;
use spacetimedb::auth::identity::{DecodingKey, EncodingKey};
use spacetimedb::client::ClientActorIndex;
use spacetimedb::database_instance_context_controller::DatabaseInstanceContextController;
use spacetimedb::energy::{EnergyBalance, EnergyQuanta};
use spacetimedb::host::{HostController, UpdateDatabaseResult};
use spacetimedb::identity::Identity;
use spacetimedb::messages::control_db::{Database, DatabaseInstance, IdentityEmail, Node};
use spacetimedb::module_host_context::ModuleHostContext;
use spacetimedb::sendgrid_controller::SendGridController;
use spacetimedb_client_api_messages::name::{DomainName, InsertDomainResult, RegisterTldResult, Tld};
use spacetimedb_client_api_messages::recovery::RecoveryCode;
Expand All @@ -26,11 +24,9 @@ pub mod util;
///
/// Types returned here should be considered internal state and **never** be
/// surfaced to the API.
#[async_trait]
pub trait NodeDelegate: Send + Sync {
fn gather_metrics(&self) -> Vec<prometheus::proto::MetricFamily>;
fn database_instance_context_controller(&self) -> &DatabaseInstanceContextController;
fn host_controller(&self) -> &Arc<HostController>;
fn host_controller(&self) -> &HostController;
fn client_actor_index(&self) -> &ClientActorIndex;
fn sendgrid_controller(&self) -> Option<&SendGridController>;

Expand All @@ -44,16 +40,6 @@ pub trait NodeDelegate: Send + Sync {

/// Return a JWT encoding key for signing credentials.
fn private_key(&self) -> &EncodingKey;

/// Load the [`ModuleHostContext`] for instance `instance_id` of
/// [`Database`] `db`.
///
/// This method is defined as `async`, as that obliges the implementer to
/// ensure that any necessary blocking I/O is made async-safe. In other
/// words, it is the responsibility of the implementation to make use of
/// `spawn_blocking` or `block_in_place` as appropriate, while the
/// `client-api` assumes that `await`ing the method never blocks.
async fn load_module_host_context(&self, db: Database, instance_id: u64) -> anyhow::Result<ModuleHostContext>;
}

/// Parameters for publishing a database.
Expand Down Expand Up @@ -276,17 +262,12 @@ impl<T: ControlStateWriteAccess + ?Sized> ControlStateWriteAccess for Arc<T> {
}
}

#[async_trait]
impl<T: NodeDelegate + ?Sized> NodeDelegate for Arc<T> {
fn gather_metrics(&self) -> Vec<prometheus::proto::MetricFamily> {
(**self).gather_metrics()
}

fn database_instance_context_controller(&self) -> &DatabaseInstanceContextController {
(**self).database_instance_context_controller()
}

fn host_controller(&self) -> &Arc<HostController> {
fn host_controller(&self) -> &HostController {
(**self).host_controller()
}

Expand All @@ -309,10 +290,6 @@ impl<T: NodeDelegate + ?Sized> NodeDelegate for Arc<T> {
fn sendgrid_controller(&self) -> Option<&SendGridController> {
(**self).sendgrid_controller()
}

async fn load_module_host_context(&self, db: Database, instance_id: u64) -> anyhow::Result<ModuleHostContext> {
(**self).load_module_host_context(db, instance_id).await
}
}

pub fn log_and_500(e: impl std::fmt::Display) -> ErrorResponse {
Expand Down
101 changes: 30 additions & 71 deletions crates/client-api/src/routes/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,17 +87,10 @@ pub async fn call<S: ControlStateDelegate + NodeDelegate>(
))?;
let instance_id = database_instance.id;
let host = worker_ctx.host_controller();

let module = match host.get_module_host(instance_id) {
Ok(m) => m,
Err(_) => {
let dbic = worker_ctx
.load_module_host_context(database, instance_id)
.await
.map_err(log_and_500)?;
host.spawn_module_host(dbic).await.map_err(log_and_500)?
}
};
let module = host
.get_or_launch_module_host(database, instance_id)
.await
.map_err(log_and_500)?;

// HTTP callers always need an address to provide to connect/disconnect,
// so generate one if none was provided.
Expand Down Expand Up @@ -281,17 +274,11 @@ where
let call_info = extract_db_call_info(&worker_ctx, auth, &address).await?;

let instance_id = call_info.database_instance.id;
let host = worker_ctx.host_controller();
let module = match host.get_module_host(instance_id) {
Ok(m) => m,
Err(_) => {
let dbic = worker_ctx
.load_module_host_context(database, instance_id)
.await
.map_err(log_and_500)?;
host.spawn_module_host(dbic).await.map_err(log_and_500)?
}
};
let module = worker_ctx
.host_controller()
.get_or_launch_module_host(database, instance_id)
.await
.map_err(log_and_500)?;

let entity_type = entity_type.as_str().parse().map_err(|()| {
log::debug!("Request to describe unhandled entity type: {}", entity_type);
Expand Down Expand Up @@ -339,16 +326,10 @@ where

let instance_id = call_info.database_instance.id;
let host = worker_ctx.host_controller();
let module = match host.get_module_host(instance_id) {
Ok(m) => m,
Err(_) => {
let dbic = worker_ctx
.load_module_host_context(database, instance_id)
.await
.map_err(log_and_500)?;
host.spawn_module_host(dbic).await.map_err(log_and_500)?
}
};
let module = host
.get_or_launch_module_host(database, instance_id)
.await
.map_err(log_and_500)?;
let catalog = module.catalog();
let expand = expand.unwrap_or(false);
let response_catalog: HashMap<_, _> = catalog
Expand Down Expand Up @@ -459,16 +440,10 @@ where

let body = if follow {
let host = worker_ctx.host_controller();
let module = match host.get_module_host(instance_id) {
Ok(m) => m,
Err(_) => {
let dbic = worker_ctx
.load_module_host_context(database, instance_id)
.await
.map_err(log_and_500)?;
host.spawn_module_host(dbic).await.map_err(log_and_500)?
}
};
let module = host
.get_or_launch_module_host(database, instance_id)
.await
.map_err(log_and_500)?;
let log_rx = module.subscribe_to_logs().map_err(log_and_500)?;

let stream = tokio_stream::wrappers::BroadcastStream::new(log_rx).filter_map(move |x| {
Expand Down Expand Up @@ -547,35 +522,19 @@ where
let instance_id = database_instance.id;

let host = worker_ctx.host_controller();
match host.get_module_host(instance_id) {
Ok(_) => {}
Err(_) => {
let dbic = worker_ctx
.load_module_host_context(database, instance_id)
.await
.map_err(log_and_500)?;
host.spawn_module_host(dbic).await.map_err(log_and_500)?;
}
};

let results = match execute(
worker_ctx.database_instance_context_controller(),
instance_id,
body,
auth,
) {
Ok(results) => results,
Err(err) => {
log::warn!("{}", err);
return if let Some(auth_err) = err.get_auth_error() {
let err = format!("{auth_err}");
Err((StatusCode::UNAUTHORIZED, err).into())
} else {
let err = format!("{err}");
Err((StatusCode::BAD_REQUEST, err).into())
};
}
};
let results = host
.with_database(database, instance_id, move |db| {
execute(db, body, auth).map_err(|e| {
log::warn!("{}", e);
if let Some(auth_err) = e.get_auth_error() {
(StatusCode::UNAUTHORIZED, auth_err.to_string())
} else {
(StatusCode::BAD_REQUEST, e.to_string())
}
})
})
.await
.map_err(log_and_500)??;

let json = results
.into_iter()
Expand Down
16 changes: 4 additions & 12 deletions crates/client-api/src/routes/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,18 +94,10 @@ where
let identity_token = auth.creds.token().to_owned();

let host = ctx.host_controller();
let module = match host.get_module_host(instance_id) {
Ok(m) => m,
Err(_) => {
// TODO(kim): probably wrong -- check if instance node id matches ours
log::debug!("creating fresh module host");
let dbic = ctx
.load_module_host_context(database, instance_id)
.await
.map_err(log_and_500)?;
host.spawn_module_host(dbic).await.map_err(log_and_500)?
}
};
let module = host
.get_or_launch_module_host(database, instance_id)
.await
.map_err(log_and_500)?;

let client_id = ClientActorId {
identity: auth.identity,
Expand Down
1 change: 1 addition & 0 deletions crates/core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ pub trait SpacetimeDbFiles {
}

/// The location of paths for the database in a local OR temp folder.
#[derive(Debug)]
pub struct FilesLocal {
dir: PathBuf,
}
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/database_instance_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ impl Deref for DatabaseInstanceContext {

#[derive(Copy, Clone, Default)]
pub struct TotalDiskUsage {
durability: Option<u64>,
logs: Option<u64>,
pub durability: Option<u64>,
pub logs: Option<u64>,
}

impl TotalDiskUsage {
Expand Down
Loading

0 comments on commit 4018d12

Please sign in to comment.