Skip to content
This repository was archived by the owner on Jan 16, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 56 additions & 45 deletions crates/supervisor/storage/src/chaindb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ impl ChainDb {
impl DerivationStorageReader for ChainDb {
fn derived_to_source(&self, derived_block_id: BlockNumHash) -> Result<BlockInfo, StorageError> {
self.observe_call("derived_to_source", || {
self.env.view(|tx| DerivationProvider::new(tx).derived_to_source(derived_block_id))
self.env.view(|tx| {
DerivationProvider::new(tx, self.chain_id).derived_to_source(derived_block_id)
})
})?
}

Expand All @@ -83,14 +85,15 @@ impl DerivationStorageReader for ChainDb {
) -> Result<BlockInfo, StorageError> {
self.observe_call("latest_derived_block_at_source", || {
self.env.view(|tx| {
DerivationProvider::new(tx).latest_derived_block_at_source(source_block_id)
DerivationProvider::new(tx, self.chain_id)
.latest_derived_block_at_source(source_block_id)
})
})?
}

fn latest_derivation_state(&self) -> Result<DerivedRefPair, StorageError> {
self.observe_call("latest_derivation_state", || {
self.env.view(|tx| DerivationProvider::new(tx).latest_derivation_state())
self.env.view(|tx| DerivationProvider::new(tx, self.chain_id).latest_derivation_state())
})?
}
}
Expand All @@ -102,10 +105,10 @@ impl DerivationStorageWriter for ChainDb {
) -> Result<(), StorageError> {
self.observe_call("initialise_derivation_storage", || {
self.env.update(|ctx| {
DerivationProvider::new(ctx).initialise(incoming_pair)?;
SafetyHeadRefProvider::new(ctx)
DerivationProvider::new(ctx, self.chain_id).initialise(incoming_pair)?;
SafetyHeadRefProvider::new(ctx, self.chain_id)
.update_safety_head_ref(SafetyLevel::LocalSafe, &incoming_pair.derived)?;
SafetyHeadRefProvider::new(ctx)
SafetyHeadRefProvider::new(ctx, self.chain_id)
.update_safety_head_ref(SafetyLevel::CrossSafe, &incoming_pair.derived)
})
})?
Expand All @@ -115,39 +118,43 @@ impl DerivationStorageWriter for ChainDb {
self.observe_call("save_derived_block", || {
self.env.update(|ctx| {
let derived_block = incoming_pair.derived;
let block = LogProvider::new(ctx).get_block(derived_block.number).map_err(
|err| match err {
let block = LogProvider::new(ctx, self.chain_id)
.get_block(derived_block.number)
.map_err(|err| match err {
StorageError::EntryNotFound(_) => {
error!(
target: "supervisor_storage",
chain_id = %self.chain_id,
incoming_block = %derived_block,
"Derived block not found in log storage: {derived_block:?}"
);
StorageError::ConflictError
}
other => other, // propagate other errors as-is
},
)?;
})?;

if block != derived_block {
error!(
target: "supervisor_storage",
chain_id = %self.chain_id,
incoming_block = %derived_block,
stored_log_block = %block,
"Derived block does not match the stored log block"
);
return Err(StorageError::ConflictError);
}
DerivationProvider::new(ctx).save_derived_block(incoming_pair)?;
SafetyHeadRefProvider::new(ctx)
DerivationProvider::new(ctx, self.chain_id).save_derived_block(incoming_pair)?;
SafetyHeadRefProvider::new(ctx, self.chain_id)
.update_safety_head_ref(SafetyLevel::LocalSafe, &incoming_pair.derived)
})
})?
}

fn save_source_block(&self, incoming_source: BlockInfo) -> Result<(), StorageError> {
self.observe_call("save_source_block", || {
self.env.update(|ctx| DerivationProvider::new(ctx).save_source_block(incoming_source))
self.env.update(|ctx| {
DerivationProvider::new(ctx, self.chain_id).save_source_block(incoming_source)
})
})?
}
}
Expand All @@ -156,25 +163,25 @@ impl DerivationStorageWriter for ChainDb {
impl LogStorageReader for ChainDb {
fn get_latest_block(&self) -> Result<BlockInfo, StorageError> {
self.observe_call("get_latest_block", || {
self.env.view(|tx| LogProvider::new(tx).get_latest_block())
self.env.view(|tx| LogProvider::new(tx, self.chain_id).get_latest_block())
})?
}

fn get_block(&self, block_number: u64) -> Result<BlockInfo, StorageError> {
self.observe_call("get_block", || {
self.env.view(|tx| LogProvider::new(tx).get_block(block_number))
self.env.view(|tx| LogProvider::new(tx, self.chain_id).get_block(block_number))
})?
}

fn get_log(&self, block_number: u64, log_index: u32) -> Result<Log, StorageError> {
self.observe_call("get_log", || {
self.env.view(|tx| LogProvider::new(tx).get_log(block_number, log_index))
self.env.view(|tx| LogProvider::new(tx, self.chain_id).get_log(block_number, log_index))
})?
}

fn get_logs(&self, block_number: u64) -> Result<Vec<Log>, StorageError> {
self.observe_call("get_logs", || {
self.env.view(|tx| LogProvider::new(tx).get_logs(block_number))
self.env.view(|tx| LogProvider::new(tx, self.chain_id).get_logs(block_number))
})?
}
}
Expand All @@ -183,10 +190,10 @@ impl LogStorageWriter for ChainDb {
fn initialise_log_storage(&self, block: BlockInfo) -> Result<(), StorageError> {
self.observe_call("initialise_log_storage", || {
self.env.update(|ctx| {
LogProvider::new(ctx).initialise(block)?;
SafetyHeadRefProvider::new(ctx)
LogProvider::new(ctx, self.chain_id).initialise(block)?;
SafetyHeadRefProvider::new(ctx, self.chain_id)
.update_safety_head_ref(SafetyLevel::LocalUnsafe, &block)?;
SafetyHeadRefProvider::new(ctx)
SafetyHeadRefProvider::new(ctx, self.chain_id)
.update_safety_head_ref(SafetyLevel::CrossUnsafe, &block)
})
})?
Expand All @@ -195,9 +202,9 @@ impl LogStorageWriter for ChainDb {
fn store_block_logs(&self, block: &BlockInfo, logs: Vec<Log>) -> Result<(), StorageError> {
self.observe_call("store_block_logs", || {
self.env.update(|ctx| {
LogProvider::new(ctx).store_block_logs(block, logs)?;
LogProvider::new(ctx, self.chain_id).store_block_logs(block, logs)?;

SafetyHeadRefProvider::new(ctx)
SafetyHeadRefProvider::new(ctx, self.chain_id)
.update_safety_head_ref(SafetyLevel::LocalUnsafe, block)
})
})?
Expand All @@ -207,15 +214,17 @@ impl LogStorageWriter for ChainDb {
impl HeadRefStorageReader for ChainDb {
fn get_safety_head_ref(&self, safety_level: SafetyLevel) -> Result<BlockInfo, StorageError> {
self.observe_call("get_safety_head_ref", || {
self.env.view(|tx| SafetyHeadRefProvider::new(tx).get_safety_head_ref(safety_level))
self.env.view(|tx| {
SafetyHeadRefProvider::new(tx, self.chain_id).get_safety_head_ref(safety_level)
})
})?
}

/// Fetches all safety heads and current L1 state
fn get_super_head(&self) -> Result<SuperHead, StorageError> {
self.observe_call("get_super_head", || {
self.env.view(|tx| {
let sp = SafetyHeadRefProvider::new(tx);
let sp = SafetyHeadRefProvider::new(tx, self.chain_id);
let local_unsafe =
sp.get_safety_head_ref(SafetyLevel::LocalUnsafe).map_err(|err| {
if matches!(err, StorageError::FutureData) {
Expand Down Expand Up @@ -249,11 +258,12 @@ impl HeadRefStorageReader for ChainDb {
Err(err) => return Err(err),
};

let l1_source = match DerivationProvider::new(tx).latest_derivation_state() {
Ok(pair) => Some(pair.source),
Err(StorageError::DatabaseNotInitialised) => None,
Err(err) => return Err(err),
};
let l1_source =
match DerivationProvider::new(tx, self.chain_id).latest_derivation_state() {
Ok(pair) => Some(pair.source),
Err(StorageError::DatabaseNotInitialised) => None,
Err(err) => return Err(err),
};

Ok(SuperHead {
l1_source,
Expand All @@ -275,16 +285,17 @@ impl HeadRefStorageWriter for ChainDb {
) -> Result<BlockInfo, StorageError> {
self.observe_call("update_finalized_using_source", || {
self.env.update(|tx| {
let sp = SafetyHeadRefProvider::new(tx);
let sp = SafetyHeadRefProvider::new(tx, self.chain_id);
let safe = sp.get_safety_head_ref(SafetyLevel::CrossSafe)?;

let dp = DerivationProvider::new(tx);
let dp = DerivationProvider::new(tx, self.chain_id);
let safe_block_pair = dp.get_derived_block_pair(safe.id())?;

if finalized_source_block.number >= safe_block_pair.source.number {
// this could happen during initial sync
warn!(
target: "supervisor_storage",
chain_id = %self.chain_id,
l1_finalized_block_number = finalized_source_block.number,
safe_source_block_number = safe_block_pair.source.number,
"L1 finalized block is greater than safe block",
Expand All @@ -304,14 +315,15 @@ impl HeadRefStorageWriter for ChainDb {
fn update_current_cross_unsafe(&self, block: &BlockInfo) -> Result<(), StorageError> {
self.observe_call("update_current_cross_unsafe", || {
self.env.update(|tx| {
let lp = LogProvider::new(tx);
let sp = SafetyHeadRefProvider::new(tx);
let lp = LogProvider::new(tx, self.chain_id);
let sp = SafetyHeadRefProvider::new(tx, self.chain_id);

// Check parent-child relationship with current CrossUnsafe head, if it exists.
let parent = sp.get_safety_head_ref(SafetyLevel::CrossUnsafe)?;
if !parent.is_parent_of(block) {
error!(
target: "supervisor_storage",
chain_id = %self.chain_id,
incoming_block = %block,
latest_block = %parent,
"Incoming block is not the child of the current cross-unsafe head",
Expand All @@ -324,17 +336,15 @@ impl HeadRefStorageWriter for ChainDb {
if stored_block.hash != block.hash {
warn!(
target: "supervisor_storage",
chain_id = %self.chain_id,
incoming_block_hash = %block.hash,
stored_block_hash = %stored_block.hash,
"Hash mismatch while updating CrossUnsafe head",
);
return Err(StorageError::EntryNotFound(
"block hash does not match".to_string(),
));
return Err(StorageError::ConflictError);
}

sp.update_safety_head_ref(SafetyLevel::CrossUnsafe, block)?;

Ok(())
})?
})
Expand All @@ -343,14 +353,15 @@ impl HeadRefStorageWriter for ChainDb {
fn update_current_cross_safe(&self, block: &BlockInfo) -> Result<DerivedRefPair, StorageError> {
self.observe_call("update_current_cross_safe", || {
self.env.update(|tx| {
let dp = DerivationProvider::new(tx);
let sp = SafetyHeadRefProvider::new(tx);
let dp = DerivationProvider::new(tx, self.chain_id);
let sp = SafetyHeadRefProvider::new(tx, self.chain_id);

// Check parent-child relationship with current CrossUnsafe head, if it exists.
let parent = sp.get_safety_head_ref(SafetyLevel::CrossSafe)?;
if !parent.is_parent_of(block) {
error!(
target: "supervisor_storage",
chain_id = %self.chain_id,
incoming_block = %block,
latest_block = %parent,
"Incoming block is not the child of the current cross-safe head",
Expand All @@ -373,8 +384,8 @@ impl Rewinder for ChainDb {
fn rewind_log_storage(&self, to: &BlockNumHash) -> Result<(), StorageError> {
self.observe_call("rewind_log_storage", || {
self.env.update(|tx| {
let lp = LogProvider::new(tx);
let hp = SafetyHeadRefProvider::new(tx);
let lp = LogProvider::new(tx, self.chain_id);
let hp = SafetyHeadRefProvider::new(tx, self.chain_id);

lp.rewind_to(to)?;

Expand All @@ -390,9 +401,9 @@ impl Rewinder for ChainDb {
fn rewind(&self, to: &BlockNumHash) -> Result<(), StorageError> {
self.observe_call("rewind", || {
self.env.update(|tx| {
let lp = LogProvider::new(tx);
let dp = DerivationProvider::new(tx);
let hp = SafetyHeadRefProvider::new(tx);
let lp = LogProvider::new(tx, self.chain_id);
let dp = DerivationProvider::new(tx, self.chain_id);
let hp = SafetyHeadRefProvider::new(tx, self.chain_id);

lp.rewind_to(to)?;
dp.rewind_to(to)?;
Expand Down Expand Up @@ -579,7 +590,7 @@ mod tests {
let _ = db
.env
.update(|ctx| {
let sp = SafetyHeadRefProvider::new(ctx);
let sp = SafetyHeadRefProvider::new(ctx, 1);
sp.update_safety_head_ref(SafetyLevel::Finalized, &block)
})
.unwrap();
Expand Down
Loading