diff --git a/linera-base/src/data_types.rs b/linera-base/src/data_types.rs index 1cad22fecf3d..5c28fc834f0b 100644 --- a/linera-base/src/data_types.rs +++ b/linera-base/src/data_types.rs @@ -34,8 +34,8 @@ use crate::{ crypto::BcsHashable, doc_scalar, hex_debug, identifiers::{ - ApplicationId, BlobId, BlobType, BytecodeId, Destination, GenericApplicationId, MessageId, - UserApplicationId, + ApplicationId, BlobId, BlobType, BytecodeId, Destination, EventId, GenericApplicationId, + MessageId, UserApplicationId, }, time::{Duration, SystemTime}, }; @@ -740,6 +740,8 @@ pub enum OracleResponse { Post(Vec), /// A successful read or write of a blob. Blob(BlobId), + /// A successful read of an event. + Event(EventId, Vec), /// An assertion oracle that passed. Assert, } @@ -759,6 +761,12 @@ impl Display for OracleResponse { } OracleResponse::Post(bytes) => write!(f, "Post:{}", STANDARD_NO_PAD.encode(bytes))?, OracleResponse::Blob(blob_id) => write!(f, "Blob:{}", blob_id)?, + OracleResponse::Event(event_id, event_value) => write!( + f, + "Event:{}:{}", + event_id, + STANDARD_NO_PAD.encode(event_value) + )?, OracleResponse::Assert => write!(f, "Assert")?, }; diff --git a/linera-base/src/identifiers.rs b/linera-base/src/identifiers.rs index e01a2312f360..ae40542cb172 100644 --- a/linera-base/src/identifiers.rs +++ b/linera-base/src/identifiers.rs @@ -4,7 +4,7 @@ //! Core identifiers used by the Linera protocol. use std::{ - fmt::{self, Debug, Display}, + fmt::{self, Debug, Display, Formatter}, hash::{Hash, Hasher}, marker::PhantomData, str::FromStr, @@ -12,6 +12,7 @@ use std::{ use anyhow::{anyhow, Context}; use async_graphql::SimpleObject; +use base64::engine::{general_purpose::STANDARD_NO_PAD, Engine as _}; use linera_witty::{WitLoad, WitStore, WitType}; use serde::{Deserialize, Serialize}; @@ -337,6 +338,15 @@ impl GenericApplicationId { } } +impl Display for GenericApplicationId { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + GenericApplicationId::System => write!(f, "System"), + GenericApplicationId::User(app_id) => Display::fmt(app_id, f), + } + } +} + impl From for GenericApplicationId { fn from(user_application_id: ApplicationId) -> Self { GenericApplicationId::User(user_application_id) @@ -412,6 +422,52 @@ pub struct StreamId { pub stream_name: StreamName, } +impl Display for StreamId { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}:{}", + self.application_id, + STANDARD_NO_PAD.encode(&self.stream_name.0) + ) + } +} + +/// An event identifier. +#[derive( + Debug, + PartialEq, + Eq, + Hash, + Clone, + Serialize, + Deserialize, + WitLoad, + WitStore, + WitType, + SimpleObject, +)] +pub struct EventId { + /// The ID of the chain that generated this event. + pub chain_id: ChainId, + /// The ID of the stream this event belongs to. + pub stream_id: StreamId, + /// The event key. + pub key: Vec, +} + +impl Display for EventId { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}:{}:{}", + self.chain_id, + self.stream_id, + STANDARD_NO_PAD.encode(&self.key) + ) + } +} + /// The destination of a message, relative to a particular application. #[derive( Clone, diff --git a/linera-core/src/chain_worker/state/attempted_changes.rs b/linera-core/src/chain_worker/state/attempted_changes.rs index 0d98e39502d1..c22c74a772f7 100644 --- a/linera-core/src/chain_worker/state/attempted_changes.rs +++ b/linera-core/src/chain_worker/state/attempted_changes.rs @@ -9,7 +9,7 @@ use futures::future::try_join_all; use linera_base::{ data_types::{Blob, BlockHeight, Timestamp}, ensure, - identifiers::{ChainId, MessageId}, + identifiers::{ChainId, EventId, MessageId}, }; use linera_chain::{ data_types::{ @@ -300,11 +300,23 @@ where let blobs_in_block = self.state.get_blobs(required_blob_ids.clone()).await?; let certificate_hash = certificate.hash(); + let events_iter = executed_block.outcome.events.iter().flatten(); + let events = events_iter + .map(|event| { + let event_id = EventId { + chain_id: block.chain_id, + stream_id: event.stream_id.clone(), + key: event.key.clone(), + }; + (event_id, &event.value[..]) + }) + .collect::>(); self.state .storage .write_blobs_and_certificate(&blobs_in_block, &certificate) .await?; + self.state.storage.write_events(&events).await?; // Update the blob state with last used certificate hash. try_join_all(required_blob_ids.into_iter().map(|blob_id| { diff --git a/linera-execution/src/execution_state_actor.rs b/linera-execution/src/execution_state_actor.rs index d11aba653fe8..7718c307e2da 100644 --- a/linera-execution/src/execution_state_actor.rs +++ b/linera-execution/src/execution_state_actor.rs @@ -12,7 +12,7 @@ use futures::channel::mpsc; use linera_base::prometheus_util::{self, MeasureLatency as _}; use linera_base::{ data_types::{Amount, ApplicationPermissions, BlobContent, Timestamp}, - identifiers::{Account, BlobId, MessageId, Owner}, + identifiers::{Account, BlobId, EventId, MessageId, Owner}, ownership::ChainOwnership, }; use linera_views::{batch::Batch, context::Context, views::View}; @@ -318,6 +318,16 @@ where self.system.assert_blob_exists(blob_id).await?; callback.respond(()) } + + ReadEvent { event_id, callback } => { + let blob = self + .context() + .extra() + .get_event(&event_id) + .await + .map_err(|_| ExecutionError::EventNotFound(Box::new(event_id)))?; + callback.respond(blob); + } } Ok(()) @@ -453,6 +463,11 @@ pub enum ExecutionRequest { blob_id: BlobId, callback: Sender<()>, }, + + ReadEvent { + event_id: EventId, + callback: oneshot::Sender>, + }, } impl Debug for ExecutionRequest { @@ -591,6 +606,11 @@ impl Debug for ExecutionRequest { .field("blob_id", blob_id) .finish_non_exhaustive(), + ExecutionRequest::ReadEvent { event_id, .. } => formatter + .debug_struct("ExecutionRequest::ReadEvent") + .field("event_id", event_id) + .finish_non_exhaustive(), + ExecutionRequest::AssertBlobExists { blob_id, .. } => formatter .debug_struct("ExecutionRequest::AssertBlobExists") .field("blob_id", blob_id) diff --git a/linera-execution/src/lib.rs b/linera-execution/src/lib.rs index bb6b0aa4e104..220aebbb0681 100644 --- a/linera-execution/src/lib.rs +++ b/linera-execution/src/lib.rs @@ -38,7 +38,7 @@ use linera_base::{ }, doc_scalar, hex_debug, identifiers::{ - Account, ApplicationId, BlobId, BytecodeId, ChainId, ChannelName, Destination, + Account, ApplicationId, BlobId, BytecodeId, ChainId, ChannelName, Destination, EventId, GenericApplicationId, MessageId, Owner, StreamName, UserApplicationId, }, ownership::ChainOwnership, @@ -201,6 +201,8 @@ pub enum ExecutionError { EventKeyTooLong, #[error("Stream names can be at most {MAX_STREAM_NAME_LEN} bytes.")] StreamNameTooLong, + #[error("Event not found: {0}")] + EventNotFound(Box), // TODO(#2127): Remove this error and the unstable-oracles feature once there are fees // and enforced limits for all oracles. #[error("Unstable oracles are disabled on this network.")] @@ -290,6 +292,8 @@ pub trait ExecutionRuntimeContext { async fn get_blob(&self, blob_id: BlobId) -> Result; async fn contains_blob(&self, blob_id: BlobId) -> Result; + + async fn get_event(&self, event_id: &EventId) -> Result, ExecutionError>; } #[derive(Clone, Copy, Debug)] @@ -524,6 +528,9 @@ pub trait BaseRuntime { /// Asserts the existence of a data blob with the given hash. fn assert_data_blob_exists(&mut self, hash: &CryptoHash) -> Result<(), ExecutionError>; + + /// Reads an event. + fn read_event(&mut self, event_id: EventId) -> Result, ExecutionError>; } pub trait ServiceRuntime: BaseRuntime { @@ -890,6 +897,7 @@ pub struct TestExecutionRuntimeContext { user_contracts: Arc>, user_services: Arc>, blobs: Arc>, + events: Arc>>, } #[cfg(with_testing)] @@ -901,6 +909,7 @@ impl TestExecutionRuntimeContext { user_contracts: Arc::default(), user_services: Arc::default(), blobs: Arc::default(), + events: Arc::default(), } } @@ -967,6 +976,14 @@ impl ExecutionRuntimeContext for TestExecutionRuntimeContext { async fn contains_blob(&self, blob_id: BlobId) -> Result { Ok(self.blobs.contains_key(&blob_id)) } + + async fn get_event(&self, event_id: &EventId) -> Result, ExecutionError> { + Ok(self + .events + .get(event_id) + .ok_or_else(|| ExecutionError::EventNotFound(Box::new(event_id.clone())))? + .clone()) + } } impl From for Operation { diff --git a/linera-execution/src/runtime.rs b/linera-execution/src/runtime.rs index ace33f9ce1c6..899b2c085dfd 100644 --- a/linera-execution/src/runtime.rs +++ b/linera-execution/src/runtime.rs @@ -17,7 +17,7 @@ use linera_base::{ }, ensure, identifiers::{ - Account, ApplicationId, BlobId, BlobType, ChainId, ChannelName, MessageId, Owner, + Account, ApplicationId, BlobId, BlobType, ChainId, ChannelName, EventId, MessageId, Owner, StreamName, }, ownership::ChainOwnership, @@ -694,6 +694,10 @@ impl BaseRuntime for SyncRuntimeHandle { fn assert_data_blob_exists(&mut self, hash: &CryptoHash) -> Result<(), ExecutionError> { self.inner().assert_data_blob_exists(hash) } + + fn read_event(&mut self, event_id: EventId) -> Result, ExecutionError> { + self.inner().read_event(event_id) + } } impl BaseRuntime for SyncRuntimeInternal { @@ -1021,6 +1025,26 @@ impl BaseRuntime for SyncRuntimeInternal { .recv_response()?; Ok(()) } + + fn read_event(&mut self, event_id: EventId) -> Result, ExecutionError> { + let event_value = + if let Some(response) = self.transaction_tracker.next_replayed_oracle_response()? { + match response { + OracleResponse::Event(id, bytes) if id == event_id => bytes, + _ => return Err(ExecutionError::OracleResponseMismatch), + } + } else { + self.execution_state_sender + .send_request(|callback| ExecutionRequest::ReadEvent { + event_id: event_id.clone(), + callback, + })? + .recv_response()? + }; + self.transaction_tracker + .add_oracle_response(OracleResponse::Event(event_id, event_value.clone())); + Ok(event_value) + } } impl Clone for SyncRuntimeHandle { diff --git a/linera-execution/src/wasm/system_api.rs b/linera-execution/src/wasm/system_api.rs index 76596807c716..e939ce1dd2a4 100644 --- a/linera-execution/src/wasm/system_api.rs +++ b/linera-execution/src/wasm/system_api.rs @@ -6,7 +6,9 @@ use std::{any::Any, collections::HashMap, marker::PhantomData}; use linera_base::{ crypto::CryptoHash, data_types::{Amount, ApplicationPermissions, BlockHeight, SendMessageRequest, Timestamp}, - identifiers::{Account, ApplicationId, ChainId, ChannelName, MessageId, Owner, StreamName}, + identifiers::{ + Account, ApplicationId, ChainId, ChannelName, EventId, MessageId, Owner, StreamName, + }, ownership::{ChainOwnership, CloseChainError}, }; use linera_views::batch::{Batch, WriteOperation}; @@ -380,6 +382,15 @@ where .map_err(|error| RuntimeError::Custom(error.into())) } + /// Reads an event from storage. + fn read_event(caller: &mut Caller, event_id: EventId) -> Result, RuntimeError> { + caller + .user_data_mut() + .runtime + .read_event(event_id) + .map_err(|error| RuntimeError::Custom(error.into())) + } + /// Logs a `message` with the provided information `level`. fn log(_caller: &mut Caller, message: String, level: log::Level) -> Result<(), RuntimeError> { match level { @@ -572,6 +583,15 @@ where .map_err(|error| RuntimeError::Custom(error.into())) } + /// Reads an event from storage. + fn read_event(caller: &mut Caller, event_id: EventId) -> Result, RuntimeError> { + caller + .user_data_mut() + .runtime + .read_event(event_id) + .map_err(|error| RuntimeError::Custom(error.into())) + } + /// Aborts the query if the current time at block validation is `>= timestamp`. Note that block /// validation happens at or after the block timestamp, but isn't necessarily the same. fn assert_before(caller: &mut Caller, timestamp: Timestamp) -> Result<(), RuntimeError> { diff --git a/linera-rpc/tests/snapshots/format__format.yaml.snap b/linera-rpc/tests/snapshots/format__format.yaml.snap index 84fa57d04323..ce9099652304 100644 --- a/linera-rpc/tests/snapshots/format__format.yaml.snap +++ b/linera-rpc/tests/snapshots/format__format.yaml.snap @@ -393,6 +393,14 @@ Destination: TYPENAME: ChannelName Epoch: NEWTYPESTRUCT: U32 +EventId: + STRUCT: + - chain_id: + TYPENAME: ChainId + - stream_id: + TYPENAME: StreamId + - key: + SEQ: U8 EventRecord: STRUCT: - stream_id: @@ -649,6 +657,11 @@ OracleResponse: NEWTYPE: TYPENAME: BlobId 3: + Event: + TUPLE: + - TYPENAME: EventId + - SEQ: U8 + 4: Assert: UNIT Origin: STRUCT: diff --git a/linera-sdk/src/contract/conversions_to_wit.rs b/linera-sdk/src/contract/conversions_to_wit.rs index 738040834c1b..ca7d36c86b68 100644 --- a/linera-sdk/src/contract/conversions_to_wit.rs +++ b/linera-sdk/src/contract/conversions_to_wit.rs @@ -10,8 +10,8 @@ use linera_base::{ Timestamp, }, identifiers::{ - Account, ApplicationId, BytecodeId, ChainId, ChannelName, Destination, MessageId, Owner, - StreamName, + Account, ApplicationId, BytecodeId, ChainId, ChannelName, Destination, EventId, + GenericApplicationId, MessageId, Owner, StreamId, StreamName, }, ownership::{ChainOwnership, TimeoutConfig}, }; @@ -147,14 +147,6 @@ impl From for wit_system_api::ChannelName { } } -impl From for wit_system_api::StreamName { - fn from(name: StreamName) -> Self { - wit_system_api::StreamName { - inner0: name.into_bytes(), - } - } -} - impl From for wit_system_api::Resources { fn from(resources: Resources) -> Self { wit_system_api::Resources { @@ -258,3 +250,50 @@ impl From for wit_system_api::ChainOwnership { } } } + +impl From for wit_system_api::EventId { + fn from(event_id: EventId) -> Self { + let EventId { + chain_id, + stream_id, + key, + } = event_id; + Self { + chain_id: chain_id.into(), + stream_id: stream_id.into(), + key, + } + } +} + +impl From for wit_system_api::StreamId { + fn from(stream_id: StreamId) -> Self { + let StreamId { + application_id, + stream_name, + } = stream_id; + Self { + application_id: application_id.into(), + stream_name: stream_name.into(), + } + } +} + +impl From for wit_system_api::StreamName { + fn from(name: StreamName) -> Self { + wit_system_api::StreamName { + inner0: name.into_bytes(), + } + } +} + +impl From for wit_system_api::GenericApplicationId { + fn from(app_id: GenericApplicationId) -> Self { + match app_id { + GenericApplicationId::System => wit_system_api::GenericApplicationId::System, + GenericApplicationId::User(app_id) => { + wit_system_api::GenericApplicationId::User(app_id.into()) + } + } + } +} diff --git a/linera-sdk/src/contract/runtime.rs b/linera-sdk/src/contract/runtime.rs index 5d4137cb0610..9c3311408d30 100644 --- a/linera-sdk/src/contract/runtime.rs +++ b/linera-sdk/src/contract/runtime.rs @@ -9,7 +9,8 @@ use linera_base::{ Amount, ApplicationPermissions, BlockHeight, Resources, SendMessageRequest, Timestamp, }, identifiers::{ - Account, ApplicationId, ChainId, ChannelName, Destination, MessageId, Owner, StreamName, + Account, ApplicationId, ChainId, ChannelName, Destination, EventId, MessageId, Owner, + StreamName, }, ownership::{ChainOwnership, CloseChainError}, }; @@ -292,6 +293,11 @@ where pub fn assert_data_blob_exists(&mut self, hash: DataBlobHash) { wit::assert_data_blob_exists(hash.0.into()) } + + /// Reads an event with the given ID from storage. + pub fn read_event(event_id: EventId) -> Vec { + wit::read_event(&event_id.into()) + } } /// A helper type that uses the builder pattern to configure how a message is sent, and then diff --git a/linera-sdk/src/contract/test_runtime.rs b/linera-sdk/src/contract/test_runtime.rs index 5aecb22fd78e..608d50008fbb 100644 --- a/linera-sdk/src/contract/test_runtime.rs +++ b/linera-sdk/src/contract/test_runtime.rs @@ -14,7 +14,8 @@ use linera_base::{ Amount, ApplicationPermissions, BlockHeight, Resources, SendMessageRequest, Timestamp, }, identifiers::{ - Account, ApplicationId, ChainId, ChannelName, Destination, MessageId, Owner, StreamName, + Account, ApplicationId, ChainId, ChannelName, Destination, EventId, MessageId, Owner, + StreamName, }, ownership::{ChainOwnership, CloseChainError}, }; @@ -54,6 +55,7 @@ where expected_assert_data_blob_exists_requests: VecDeque<(DataBlobHash, Option<()>)>, expected_open_chain_calls: VecDeque<(ChainOwnership, ApplicationPermissions, Amount, MessageId)>, + expected_read_event_requests: VecDeque<(EventId, Vec)>, key_value_store: KeyValueStore, } @@ -99,6 +101,7 @@ where expected_read_data_blob_requests: VecDeque::new(), expected_assert_data_blob_exists_requests: VecDeque::new(), expected_open_chain_calls: VecDeque::new(), + expected_read_event_requests: VecDeque::new(), key_value_store: KeyValueStore::mock().to_mut(), } } @@ -682,7 +685,7 @@ where } /// Adds an expected `read_data_blob` call, and the response it should return in the test. - pub fn add_expected_read_data_blob_requests(&mut self, hash: DataBlobHash, response: Vec) { + pub fn add_expected_read_data_blob_request(&mut self, hash: DataBlobHash, response: Vec) { self.expected_read_data_blob_requests .push_back((hash, response)); } @@ -697,6 +700,12 @@ where .push_back((hash, response)); } + /// Adds an expected `read_event` call, and the response it should return in the test. + pub fn add_expected_read_event_request(&mut self, event_id: EventId, event_value: Vec) { + self.expected_read_event_requests + .push_back((event_id, event_value)); + } + /// Queries our application service as an oracle and returns the response. /// /// Should only be used with queries where it is very likely that all validators will compute @@ -759,6 +768,14 @@ where assert_eq!(hash, expected_blob_hash); response.expect("Blob does not exist!"); } + + /// Reads an event with the given ID from storage. + pub fn read_event(&mut self, event_id: EventId) -> Vec { + let maybe_event = self.expected_read_event_requests.pop_front(); + let (expected_event_id, event_value) = maybe_event.expect("Unexpected read_event request"); + assert_eq!(event_id, expected_event_id); + event_value + } } /// A type alias for the handler for cross-application calls. diff --git a/linera-sdk/src/service/conversions_to_wit.rs b/linera-sdk/src/service/conversions_to_wit.rs index 8319f2af0f0e..ecdde5f90dac 100644 --- a/linera-sdk/src/service/conversions_to_wit.rs +++ b/linera-sdk/src/service/conversions_to_wit.rs @@ -6,7 +6,10 @@ use linera_base::{ crypto::CryptoHash, data_types::BlockHeight, - identifiers::{ApplicationId, BytecodeId, ChainId, MessageId, Owner}, + identifiers::{ + ApplicationId, BytecodeId, ChainId, EventId, GenericApplicationId, MessageId, Owner, + StreamId, StreamName, + }, }; use super::wit::service_system_api as wit_system_api; @@ -87,3 +90,50 @@ impl From for wit_system_api::MessageId { } } } + +impl From for wit_system_api::EventId { + fn from(event_id: EventId) -> Self { + let EventId { + chain_id, + stream_id, + key, + } = event_id; + Self { + chain_id: chain_id.into(), + stream_id: stream_id.into(), + key, + } + } +} + +impl From for wit_system_api::StreamId { + fn from(stream_id: StreamId) -> Self { + let StreamId { + application_id, + stream_name, + } = stream_id; + Self { + application_id: application_id.into(), + stream_name: stream_name.into(), + } + } +} + +impl From for wit_system_api::StreamName { + fn from(name: StreamName) -> Self { + wit_system_api::StreamName { + inner0: name.into_bytes(), + } + } +} + +impl From for wit_system_api::GenericApplicationId { + fn from(app_id: GenericApplicationId) -> Self { + match app_id { + GenericApplicationId::System => wit_system_api::GenericApplicationId::System, + GenericApplicationId::User(app_id) => { + wit_system_api::GenericApplicationId::User(app_id.into()) + } + } + } +} diff --git a/linera-sdk/src/service/runtime.rs b/linera-sdk/src/service/runtime.rs index a1eb73610118..857b27ba34f4 100644 --- a/linera-sdk/src/service/runtime.rs +++ b/linera-sdk/src/service/runtime.rs @@ -8,7 +8,7 @@ use std::cell::Cell; use linera_base::{ abi::ServiceAbi, data_types::{Amount, BlockHeight, Timestamp}, - identifiers::{ApplicationId, ChainId, Owner}, + identifiers::{ApplicationId, ChainId, EventId, Owner}, }; use super::wit::service_system_api as wit; @@ -159,4 +159,9 @@ where pub fn assert_data_blob_exists(&mut self, hash: DataBlobHash) { wit::assert_data_blob_exists(hash.0.into()) } + + /// Reads an event with the given ID from storage. + pub fn read_event(event_id: EventId) -> Vec { + wit::read_event(&event_id.into()) + } } diff --git a/linera-sdk/src/service/test_runtime.rs b/linera-sdk/src/service/test_runtime.rs index 0c9939135a9f..39fff1c2217c 100644 --- a/linera-sdk/src/service/test_runtime.rs +++ b/linera-sdk/src/service/test_runtime.rs @@ -11,7 +11,7 @@ use std::{ use linera_base::{ abi::ServiceAbi, data_types::{Amount, BlockHeight, Timestamp}, - identifiers::{ApplicationId, ChainId, Owner}, + identifiers::{ApplicationId, ChainId, EventId, Owner}, }; use crate::{DataBlobHash, KeyValueStore, Service, ViewStorageContext}; @@ -428,6 +428,11 @@ where cell.set(Some(value.clone())); value } + + /// Reads an event with the given ID from storage. + pub fn read_event(_event_id: EventId) -> Vec { + panic!("Unexpected read_event call"); + } } /// A type alias for the handler for application queries. diff --git a/linera-sdk/wit/contract-system-api.wit b/linera-sdk/wit/contract-system-api.wit index 814d7bfdcfab..4303ed7b9a03 100644 --- a/linera-sdk/wit/contract-system-api.wit +++ b/linera-sdk/wit/contract-system-api.wit @@ -28,6 +28,7 @@ interface contract-system-api { assert-before: func(timestamp: timestamp); read-data-blob: func(hash: crypto-hash) -> list; assert-data-blob-exists: func(hash: crypto-hash); + read-event: func(event-id: event-id) -> list; log: func(message: string, level: log-level); consume-fuel: func(fuel: u64); @@ -91,6 +92,17 @@ interface contract-system-api { subscribers(channel-name), } + record event-id { + chain-id: chain-id, + stream-id: stream-id, + key: list, + } + + variant generic-application-id { + system, + user(application-id), + } + enum log-level { error, warn, @@ -135,6 +147,11 @@ interface contract-system-api { message: list, } + record stream-id { + application-id: generic-application-id, + stream-name: stream-name, + } + record stream-name { inner0: list, } diff --git a/linera-sdk/wit/service-system-api.wit b/linera-sdk/wit/service-system-api.wit index d6d0639d602a..4f55b0c0ff72 100644 --- a/linera-sdk/wit/service-system-api.wit +++ b/linera-sdk/wit/service-system-api.wit @@ -17,6 +17,7 @@ interface service-system-api { http-post: func(query: string, content-type: string, payload: list) -> list; read-data-blob: func(hash: crypto-hash) -> list; assert-data-blob-exists: func(hash: crypto-hash); + read-event: func(event-id: event-id) -> list; assert-before: func(timestamp: timestamp); log: func(message: string, level: log-level); @@ -49,6 +50,17 @@ interface service-system-api { part4: u64, } + record event-id { + chain-id: chain-id, + stream-id: stream-id, + key: list, + } + + variant generic-application-id { + system, + user(application-id), + } + enum log-level { error, warn, @@ -67,6 +79,15 @@ interface service-system-api { inner0: crypto-hash, } + record stream-id { + application-id: generic-application-id, + stream-name: stream-name, + } + + record stream-name { + inner0: list, + } + record timestamp { inner0: u64, } diff --git a/linera-storage/src/db_storage.rs b/linera-storage/src/db_storage.rs index 62ae2ae16b8a..cc5287960853 100644 --- a/linera-storage/src/db_storage.rs +++ b/linera-storage/src/db_storage.rs @@ -3,14 +3,14 @@ #[cfg(with_metrics)] use std::sync::LazyLock; -use std::{fmt::Debug, sync::Arc}; +use std::{borrow::Cow, fmt::Debug, sync::Arc}; use async_trait::async_trait; use dashmap::DashMap; use linera_base::{ crypto::CryptoHash, data_types::{Blob, TimeDelta, Timestamp}, - identifiers::{BlobId, ChainId, UserApplicationId}, + identifiers::{BlobId, ChainId, EventId, UserApplicationId}, }; use linera_chain::{ data_types::{Certificate, CertificateValue, HashedCertificateValue, LiteCertificate}, @@ -192,6 +192,30 @@ pub static WRITE_CERTIFICATE_COUNTER: LazyLock = LazyLock::new(|| .expect("Counter creation should not fail") }); +/// The metric counting how often an event is read from storage. +#[cfg(with_metrics)] +#[doc(hidden)] +pub static READ_EVENT_COUNTER: LazyLock = LazyLock::new(|| { + prometheus_util::register_int_counter_vec( + "read_event", + "The metric counting how often an event is read from storage", + &[], + ) + .expect("Counter creation should not fail") +}); + +/// The metric counting how often an event is written to storage. +#[cfg(with_metrics)] +#[doc(hidden)] +pub static WRITE_EVENT_COUNTER: LazyLock = LazyLock::new(|| { + prometheus_util::register_int_counter_vec( + "write_event", + "The metric counting how often an event is written to storage", + &[], + ) + .expect("Counter creation should not fail") +}); + /// The latency to load a chain state. #[cfg(with_metrics)] #[doc(hidden)] @@ -219,12 +243,13 @@ pub struct DbStorage { } #[derive(Debug, Serialize, Deserialize)] -enum BaseKey { +enum BaseKey<'a> { ChainState(ChainId), Certificate(CryptoHash), CertificateValue(CryptoHash), Blob(BlobId), BlobState(BlobId), + Event(Cow<'a, EventId>), } /// An implementation of [`DualStoreRootKeyAssignment`] that stores the @@ -651,6 +676,22 @@ where self.write_batch(batch).await } + async fn read_event(&self, event_id: &EventId) -> Result, ViewError> { + let event_key = bcs::to_bytes(&BaseKey::Event(Cow::Borrowed(event_id)))?; + let maybe_value = self.store.read_value::>(&event_key).await?; + #[cfg(with_metrics)] + READ_EVENT_COUNTER.with_label_values(&[]).inc(); + maybe_value.ok_or_else(|| ViewError::not_found("value for event ID", event_id)) + } + + async fn write_events(&self, events: &[(EventId, &[u8])]) -> Result<(), ViewError> { + let mut batch = Batch::new(); + for (event_id, value) in events { + Self::add_event_to_batch(event_id, value, &mut batch)?; + } + self.write_batch(batch).await + } + fn wasm_runtime(&self) -> Option { self.wasm_runtime } @@ -707,6 +748,18 @@ where Ok(()) } + fn add_event_to_batch( + event_id: &EventId, + value: &[u8], + batch: &mut Batch, + ) -> Result<(), ViewError> { + #[cfg(with_metrics)] + WRITE_EVENT_COUNTER.with_label_values(&[]).inc(); + let event_key = bcs::to_bytes(&BaseKey::Event(Cow::Borrowed(event_id)))?; + batch.put_key_value_bytes(event_key.to_vec(), value.to_vec()); + Ok(()) + } + async fn write_batch(&self, batch: Batch) -> Result<(), ViewError> { self.store.write_batch(batch).await?; Ok(()) diff --git a/linera-storage/src/lib.rs b/linera-storage/src/lib.rs index 927b982c15c4..3a6ce27ed7cf 100644 --- a/linera-storage/src/lib.rs +++ b/linera-storage/src/lib.rs @@ -15,7 +15,9 @@ use futures::future; use linera_base::{ crypto::{CryptoHash, PublicKey}, data_types::{Amount, Blob, BlockHeight, TimeDelta, Timestamp, UserApplicationDescription}, - identifiers::{BlobId, ChainDescription, ChainId, GenericApplicationId, UserApplicationId}, + identifiers::{ + BlobId, ChainDescription, ChainId, EventId, GenericApplicationId, UserApplicationId, + }, ownership::ChainOwnership, }; use linera_chain::{ @@ -163,6 +165,12 @@ pub trait Storage: Sized { /// Writes a vector of certificates. async fn write_certificates(&self, certificate: &[Certificate]) -> Result<(), ViewError>; + /// Reads the event with the given ID. + async fn read_event(&self, event_id: &EventId) -> Result, ViewError>; + + /// Writes a vector of events. + async fn write_events(&self, events: &[(EventId, &[u8])]) -> Result<(), ViewError>; + /// Loads the view of a chain state and checks that it is active. /// /// # Notes @@ -424,6 +432,10 @@ where async fn contains_blob(&self, blob_id: BlobId) -> Result { self.storage.contains_blob(blob_id).await } + + async fn get_event(&self, event_id: &EventId) -> Result, ExecutionError> { + Ok(self.storage.read_event(event_id).await?) + } } /// A clock that can be used to get the current `Timestamp`. diff --git a/linera-views/src/backends/dual.rs b/linera-views/src/backends/dual.rs index 15c8337bfc0a..e4a2d2aed192 100644 --- a/linera-views/src/backends/dual.rs +++ b/linera-views/src/backends/dual.rs @@ -413,7 +413,11 @@ where K1: KeyIterable, K2: KeyIterable, { - type Iterator<'a> = DualStoreKeyIterator, K2::Iterator<'a>> where K1: 'a, K2: 'a; + type Iterator<'a> + = DualStoreKeyIterator, K2::Iterator<'a>> + where + K1: 'a, + K2: 'a; fn iterator(&self) -> Self::Iterator<'_> { match self { @@ -447,7 +451,11 @@ where K1: KeyValueIterable, K2: KeyValueIterable, { - type Iterator<'a> = DualStoreKeyValueIterator, K2::Iterator<'a>> where K1: 'a, K2: 'a; + type Iterator<'a> + = DualStoreKeyValueIterator, K2::Iterator<'a>> + where + K1: 'a, + K2: 'a; type IteratorOwned = DualStoreKeyValueIteratorOwned; fn iterator(&self) -> Self::Iterator<'_> { diff --git a/linera-views/src/backends/dynamo_db.rs b/linera-views/src/backends/dynamo_db.rs index 70f3588c01db..cfa1336a6fa0 100644 --- a/linera-views/src/backends/dynamo_db.rs +++ b/linera-views/src/backends/dynamo_db.rs @@ -748,7 +748,10 @@ pub struct DynamoDbKeys { } impl KeyIterable for DynamoDbKeys { - type Iterator<'a> = DynamoDbKeyBlockIterator<'a> where Self: 'a; + type Iterator<'a> + = DynamoDbKeyBlockIterator<'a> + where + Self: 'a; fn iterator(&self) -> Self::Iterator<'_> { let pos = 0; @@ -835,7 +838,10 @@ impl Iterator for DynamoDbKeyValueIteratorOwned { } impl KeyValueIterable for DynamoDbKeyValues { - type Iterator<'a> = DynamoDbKeyValueIterator<'a> where Self: 'a; + type Iterator<'a> + = DynamoDbKeyValueIterator<'a> + where + Self: 'a; type IteratorOwned = DynamoDbKeyValueIteratorOwned; fn iterator(&self) -> Self::Iterator<'_> { diff --git a/linera-witty/src/exported_function_interface/result_storage.rs b/linera-witty/src/exported_function_interface/result_storage.rs index 3f1c93b64a4c..a66bd2df4e61 100644 --- a/linera-witty/src/exported_function_interface/result_storage.rs +++ b/linera-witty/src/exported_function_interface/result_storage.rs @@ -49,7 +49,8 @@ pub trait ResultStorage { } impl ResultStorage for () { - type OutputFor = ::Flat + type OutputFor + = ::Flat where HostResults: WitStore; @@ -68,7 +69,8 @@ impl ResultStorage for () { } impl ResultStorage for GuestPointer { - type OutputFor = HNil + type OutputFor + = HNil where HostResults: WitStore; diff --git a/linera-witty/src/runtime/borrowed_instance.rs b/linera-witty/src/runtime/borrowed_instance.rs index c944c18fac0d..ff6a0d345be9 100644 --- a/linera-witty/src/runtime/borrowed_instance.rs +++ b/linera-witty/src/runtime/borrowed_instance.rs @@ -20,11 +20,13 @@ where { type Runtime = I::Runtime; type UserData = I::UserData; - type UserDataReference<'a> = I::UserDataReference<'a> + type UserDataReference<'a> + = I::UserDataReference<'a> where Self::UserData: 'a, Self: 'a; - type UserDataMutReference<'a> = I::UserDataMutReference<'a> + type UserDataMutReference<'a> + = I::UserDataMutReference<'a> where Self::UserData: 'a, Self: 'a; diff --git a/linera-witty/src/runtime/test.rs b/linera-witty/src/runtime/test.rs index 42eef478c435..67cce31cd33c 100644 --- a/linera-witty/src/runtime/test.rs +++ b/linera-witty/src/runtime/test.rs @@ -180,11 +180,13 @@ impl MockInstance { impl Instance for MockInstance { type Runtime = MockRuntime; type UserData = UserData; - type UserDataReference<'a> = MutexGuard<'a, UserData> + type UserDataReference<'a> + = MutexGuard<'a, UserData> where Self::UserData: 'a, Self: 'a; - type UserDataMutReference<'a> = MutexGuard<'a, UserData> + type UserDataMutReference<'a> + = MutexGuard<'a, UserData> where Self::UserData: 'a, Self: 'a; diff --git a/linera-witty/src/runtime/wasmer/mod.rs b/linera-witty/src/runtime/wasmer/mod.rs index cfb572592481..d3c5106f18f1 100644 --- a/linera-witty/src/runtime/wasmer/mod.rs +++ b/linera-witty/src/runtime/wasmer/mod.rs @@ -131,11 +131,13 @@ impl EntrypointInstance { impl Instance for EntrypointInstance { type Runtime = Wasmer; type UserData = UserData; - type UserDataReference<'a> = MutexGuard<'a, UserData> + type UserDataReference<'a> + = MutexGuard<'a, UserData> where Self::UserData: 'a, Self: 'a; - type UserDataMutReference<'a> = MutexGuard<'a, UserData> + type UserDataMutReference<'a> + = MutexGuard<'a, UserData> where Self::UserData: 'a, Self: 'a; @@ -160,11 +162,13 @@ pub type ReentrantInstance<'a, UserData> = FunctionEnvMut<'a, Environment Instance for ReentrantInstance<'_, UserData> { type Runtime = Wasmer; type UserData = UserData; - type UserDataReference<'a> = MutexGuard<'a, UserData> + type UserDataReference<'a> + = MutexGuard<'a, UserData> where Self::UserData: 'a, Self: 'a; - type UserDataMutReference<'a> = MutexGuard<'a, UserData> + type UserDataMutReference<'a> + = MutexGuard<'a, UserData> where Self::UserData: 'a, Self: 'a; diff --git a/linera-witty/src/runtime/wasmtime/mod.rs b/linera-witty/src/runtime/wasmtime/mod.rs index 20c95af972c9..aa2c4c5901a2 100644 --- a/linera-witty/src/runtime/wasmtime/mod.rs +++ b/linera-witty/src/runtime/wasmtime/mod.rs @@ -55,11 +55,13 @@ impl AsContextMut for EntrypointInstance { impl Instance for EntrypointInstance { type Runtime = Wasmtime; type UserData = UserData; - type UserDataReference<'a> = &'a UserData + type UserDataReference<'a> + = &'a UserData where Self: 'a, UserData: 'a; - type UserDataMutReference<'a> = &'a mut UserData + type UserDataMutReference<'a> + = &'a mut UserData where Self: 'a, UserData: 'a; @@ -84,11 +86,13 @@ pub type ReentrantInstance<'a, UserData> = Caller<'a, UserData>; impl Instance for Caller<'_, UserData> { type Runtime = Wasmtime; type UserData = UserData; - type UserDataReference<'a> = &'a UserData + type UserDataReference<'a> + = &'a UserData where Self: 'a, UserData: 'a; - type UserDataMutReference<'a> = &'a mut UserData + type UserDataMutReference<'a> + = &'a mut UserData where Self: 'a, UserData: 'a; diff --git a/linera-witty/src/wit_generation/stub_instance.rs b/linera-witty/src/wit_generation/stub_instance.rs index cb64ed69d5b1..5d4b8bae4b2f 100644 --- a/linera-witty/src/wit_generation/stub_instance.rs +++ b/linera-witty/src/wit_generation/stub_instance.rs @@ -30,11 +30,13 @@ impl Default for StubInstance { impl Instance for StubInstance { type Runtime = StubRuntime; type UserData = UserData; - type UserDataReference<'a> = &'a UserData + type UserDataReference<'a> + = &'a UserData where Self::UserData: 'a, Self: 'a; - type UserDataMutReference<'a> = &'a mut UserData + type UserDataMutReference<'a> + = &'a mut UserData where Self::UserData: 'a, Self: 'a;