Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions linera-base/src/identifiers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,29 @@ pub struct StreamId {
pub stream_name: StreamName,
}

/// An event identifier.
#[derive(
Debug,
PartialEq,
Eq,
Hash,
Clone,
Serialize,
Deserialize,
WitLoad,
WitStore,
WitType,
SimpleObject,
)]
pub struct EventId {
/// The ID of the chain that generated this event.
pub chain_id: ChainId,
/// The ID of the stream this event belongs to.
pub stream_id: StreamId,
/// The event key.
pub key: Vec<u8>,
}

/// The destination of a message, relative to a particular application.
#[derive(
Clone,
Expand Down
15 changes: 13 additions & 2 deletions linera-chain/src/data_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use linera_base::{
hashed::Hashed,
hex_debug,
identifiers::{
Account, BlobId, BlobType, ChainId, ChannelName, Destination, GenericApplicationId,
MessageId, Owner, StreamId,
Account, BlobId, BlobType, ChainId, ChannelName, Destination, EventId,
GenericApplicationId, MessageId, Owner, StreamId,
},
};
use linera_execution::{
Expand Down Expand Up @@ -431,6 +431,17 @@ pub struct EventRecord {
pub value: Vec<u8>,
}

impl EventRecord {
/// Returns the ID of this event record, given the publisher chain ID.
pub fn id(&self, chain_id: ChainId) -> EventId {
EventId {
chain_id,
stream_id: self.stream_id.clone(),
key: self.key.clone(),
}
}
}

impl<'de> BcsHashable<'de> for EventRecord {}

/// The hash and chain ID of a `CertificateValue`.
Expand Down
7 changes: 7 additions & 0 deletions linera-core/src/chain_worker/state/attempted_changes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,13 @@ where
.storage
.write_blobs_and_certificate(blobs, &certificate)
.await?;
let events = executed_block
.outcome
.events
.iter()
.flatten()
.map(|event| (event.id(chain_id), &event.value[..]));
self.state.storage.write_events(events).await?;
}

// Update the blob state with last used certificate hash.
Expand Down
54 changes: 53 additions & 1 deletion linera-storage/src/db_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use linera_base::{
crypto::CryptoHash,
data_types::{Blob, TimeDelta, Timestamp},
hashed::Hashed,
identifiers::{BlobId, ChainId, UserApplicationId},
identifiers::{BlobId, ChainId, EventId, UserApplicationId},
};
use linera_chain::{
types::{ConfirmedBlock, ConfirmedBlockCertificate, LiteCertificate},
Expand Down Expand Up @@ -185,13 +185,37 @@ pub static LOAD_CHAIN_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
)
});

/// The metric counting how often an event is read from storage.
#[cfg(with_metrics)]
#[doc(hidden)]
pub static READ_EVENT_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
register_int_counter_vec(
"read_event",
"The metric counting how often an event is read from storage",
&[],
)
});

/// The metric counting how often an event is written to storage.
#[cfg(with_metrics)]
#[doc(hidden)]
pub static WRITE_EVENT_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
register_int_counter_vec(
"write_event",
"The metric counting how often an event is written to storage",
&[],
)
});

trait BatchExt {
fn add_blob(&mut self, blob: &Blob) -> Result<(), ViewError>;

fn add_blob_state(&mut self, blob_id: BlobId, blob_state: &BlobState) -> Result<(), ViewError>;

fn add_certificate(&mut self, certificate: &ConfirmedBlockCertificate)
-> Result<(), ViewError>;

fn add_event(&mut self, event_id: EventId, value: &[u8]) -> Result<(), ViewError>;
}

impl BatchExt for Batch {
Expand Down Expand Up @@ -222,6 +246,14 @@ impl BatchExt for Batch {
self.put_key_value(value_key.to_vec(), certificate.value())?;
Ok(())
}

fn add_event(&mut self, event_id: EventId, value: &[u8]) -> Result<(), ViewError> {
#[cfg(with_metrics)]
WRITE_EVENT_COUNTER.with_label_values(&[]).inc();
let event_key = bcs::to_bytes(&BaseKey::Event(event_id))?;
self.put_key_value_bytes(event_key.to_vec(), value.to_vec());
Ok(())
}
}

/// Main implementation of the [`Storage`] trait.
Expand All @@ -242,6 +274,7 @@ enum BaseKey {
ConfirmedBlock(CryptoHash),
Blob(BlobId),
BlobState(BlobId),
Event(EventId),
}

const INDEX_BLOB: u8 = 3;
Expand Down Expand Up @@ -741,6 +774,25 @@ where
Ok(certificates)
}

async fn read_event(&self, event_id: EventId) -> Result<Vec<u8>, ViewError> {
let event_key = bcs::to_bytes(&BaseKey::Event(event_id.clone()))?;
let maybe_value = self.store.read_value::<Vec<u8>>(&event_key).await?;
#[cfg(with_metrics)]
READ_EVENT_COUNTER.with_label_values(&[]).inc();
maybe_value.ok_or_else(|| ViewError::not_found("value for event ID", event_id))
}

async fn write_events(
&self,
events: impl IntoIterator<Item = (EventId, &[u8])> + Send,
) -> Result<(), ViewError> {
let mut batch = Batch::new();
for (event_id, value) in events {
batch.add_event(event_id, value)?;
}
self.write_batch(batch).await
}

fn wasm_runtime(&self) -> Option<WasmRuntime> {
self.wasm_runtime
}
Expand Down
11 changes: 10 additions & 1 deletion linera-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use linera_base::{
data_types::{Amount, Blob, BlockHeight, TimeDelta, Timestamp, UserApplicationDescription},
hashed::Hashed,
identifiers::{
BlobId, ChainDescription, ChainId, GenericApplicationId, Owner, UserApplicationId,
BlobId, ChainDescription, ChainId, EventId, GenericApplicationId, Owner, UserApplicationId,
},
ownership::ChainOwnership,
};
Expand Down Expand Up @@ -163,6 +163,15 @@ pub trait Storage: Sized {
hashes: I,
) -> Result<Vec<ConfirmedBlockCertificate>, ViewError>;

/// Reads the event with the given ID.
async fn read_event(&self, id: EventId) -> Result<Vec<u8>, ViewError>;

/// Writes a vector of events.
async fn write_events(
&self,
events: impl IntoIterator<Item = (EventId, &[u8])> + Send,
) -> Result<(), ViewError>;

/// Loads the view of a chain state and checks that it is active.
///
/// # Notes
Expand Down