Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions linera-base/src/data_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ use crate::{
crypto::BcsHashable,
doc_scalar, hex_debug,
identifiers::{
ApplicationId, BlobId, BlobType, BytecodeId, Destination, GenericApplicationId, MessageId,
UserApplicationId,
ApplicationId, BlobId, BlobType, BytecodeId, Destination, EventId, GenericApplicationId,
MessageId, UserApplicationId,
},
time::{Duration, SystemTime},
};
Expand Down Expand Up @@ -740,6 +740,8 @@ pub enum OracleResponse {
Post(Vec<u8>),
/// A successful read or write of a blob.
Blob(BlobId),
/// A successful read of an event.
Event(EventId, Vec<u8>),
/// An assertion oracle that passed.
Assert,
}
Expand All @@ -759,6 +761,12 @@ impl Display for OracleResponse {
}
OracleResponse::Post(bytes) => write!(f, "Post:{}", STANDARD_NO_PAD.encode(bytes))?,
OracleResponse::Blob(blob_id) => write!(f, "Blob:{}", blob_id)?,
OracleResponse::Event(event_id, event_value) => write!(
f,
"Event:{}:{}",
event_id,
STANDARD_NO_PAD.encode(event_value)
)?,
OracleResponse::Assert => write!(f, "Assert")?,
};

Expand Down
58 changes: 57 additions & 1 deletion linera-base/src/identifiers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
//! Core identifiers used by the Linera protocol.

use std::{
fmt::{self, Debug, Display},
fmt::{self, Debug, Display, Formatter},
hash::{Hash, Hasher},
marker::PhantomData,
str::FromStr,
};

use anyhow::{anyhow, Context};
use async_graphql::SimpleObject;
use base64::engine::{general_purpose::STANDARD_NO_PAD, Engine as _};
use linera_witty::{WitLoad, WitStore, WitType};
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -337,6 +338,15 @@ impl GenericApplicationId {
}
}

impl Display for GenericApplicationId {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
GenericApplicationId::System => write!(f, "System"),
GenericApplicationId::User(app_id) => Display::fmt(app_id, f),
}
}
}

impl From<ApplicationId> for GenericApplicationId {
fn from(user_application_id: ApplicationId) -> Self {
GenericApplicationId::User(user_application_id)
Expand Down Expand Up @@ -412,6 +422,52 @@ pub struct StreamId {
pub stream_name: StreamName,
}

impl Display for StreamId {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}:{}",
self.application_id,
STANDARD_NO_PAD.encode(&self.stream_name.0)
)
}
}

/// An event identifier.
#[derive(
Debug,
PartialEq,
Eq,
Hash,
Clone,
Serialize,
Deserialize,
WitLoad,
WitStore,
WitType,
SimpleObject,
)]
pub struct EventId {
/// The ID of the chain that generated this event.
pub chain_id: ChainId,
/// The ID of the stream this event belongs to.
pub stream_id: StreamId,
/// The event key.
pub key: Vec<u8>,
Comment thread
deuszx marked this conversation as resolved.
}

impl Display for EventId {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}:{}:{}",
self.chain_id,
self.stream_id,
STANDARD_NO_PAD.encode(&self.key)
)
}
}

/// The destination of a message, relative to a particular application.
#[derive(
Clone,
Expand Down
14 changes: 13 additions & 1 deletion linera-core/src/chain_worker/state/attempted_changes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use futures::future::try_join_all;
use linera_base::{
data_types::{Blob, BlockHeight, Timestamp},
ensure,
identifiers::{ChainId, MessageId},
identifiers::{ChainId, EventId, MessageId},
};
use linera_chain::{
data_types::{
Expand Down Expand Up @@ -300,11 +300,23 @@ where

let blobs_in_block = self.state.get_blobs(required_blob_ids.clone()).await?;
let certificate_hash = certificate.hash();
let events_iter = executed_block.outcome.events.iter().flatten();
let events = events_iter
.map(|event| {
let event_id = EventId {
chain_id: block.chain_id,
stream_id: event.stream_id.clone(),
key: event.key.clone(),
};
(event_id, &event.value[..])
})
.collect::<Vec<_>>();

self.state
.storage
.write_blobs_and_certificate(&blobs_in_block, &certificate)
.await?;
self.state.storage.write_events(&events).await?;

// Update the blob state with last used certificate hash.
try_join_all(required_blob_ids.into_iter().map(|blob_id| {
Expand Down
22 changes: 21 additions & 1 deletion linera-execution/src/execution_state_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use futures::channel::mpsc;
use linera_base::prometheus_util::{self, MeasureLatency as _};
use linera_base::{
data_types::{Amount, ApplicationPermissions, BlobContent, Timestamp},
identifiers::{Account, BlobId, MessageId, Owner},
identifiers::{Account, BlobId, EventId, MessageId, Owner},
ownership::ChainOwnership,
};
use linera_views::{batch::Batch, context::Context, views::View};
Expand Down Expand Up @@ -318,6 +318,16 @@ where
self.system.assert_blob_exists(blob_id).await?;
callback.respond(())
}

ReadEvent { event_id, callback } => {
let blob = self
.context()
.extra()
.get_event(&event_id)
.await
.map_err(|_| ExecutionError::EventNotFound(Box::new(event_id)))?;
callback.respond(blob);
}
}

Ok(())
Expand Down Expand Up @@ -453,6 +463,11 @@ pub enum ExecutionRequest {
blob_id: BlobId,
callback: Sender<()>,
},

ReadEvent {
event_id: EventId,
callback: oneshot::Sender<Vec<u8>>,
},
}

impl Debug for ExecutionRequest {
Expand Down Expand Up @@ -591,6 +606,11 @@ impl Debug for ExecutionRequest {
.field("blob_id", blob_id)
.finish_non_exhaustive(),

ExecutionRequest::ReadEvent { event_id, .. } => formatter
.debug_struct("ExecutionRequest::ReadEvent")
.field("event_id", event_id)
.finish_non_exhaustive(),

ExecutionRequest::AssertBlobExists { blob_id, .. } => formatter
.debug_struct("ExecutionRequest::AssertBlobExists")
.field("blob_id", blob_id)
Expand Down
19 changes: 18 additions & 1 deletion linera-execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use linera_base::{
},
doc_scalar, hex_debug,
identifiers::{
Account, ApplicationId, BlobId, BytecodeId, ChainId, ChannelName, Destination,
Account, ApplicationId, BlobId, BytecodeId, ChainId, ChannelName, Destination, EventId,
GenericApplicationId, MessageId, Owner, StreamName, UserApplicationId,
},
ownership::ChainOwnership,
Expand Down Expand Up @@ -201,6 +201,8 @@ pub enum ExecutionError {
EventKeyTooLong,
#[error("Stream names can be at most {MAX_STREAM_NAME_LEN} bytes.")]
StreamNameTooLong,
#[error("Event not found: {0}")]
EventNotFound(Box<EventId>),
// TODO(#2127): Remove this error and the unstable-oracles feature once there are fees
// and enforced limits for all oracles.
#[error("Unstable oracles are disabled on this network.")]
Expand Down Expand Up @@ -290,6 +292,8 @@ pub trait ExecutionRuntimeContext {
async fn get_blob(&self, blob_id: BlobId) -> Result<Blob, ExecutionError>;

async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError>;

async fn get_event(&self, event_id: &EventId) -> Result<Vec<u8>, ExecutionError>;
}

#[derive(Clone, Copy, Debug)]
Expand Down Expand Up @@ -524,6 +528,9 @@ pub trait BaseRuntime {

/// Asserts the existence of a data blob with the given hash.
fn assert_data_blob_exists(&mut self, hash: &CryptoHash) -> Result<(), ExecutionError>;

/// Reads an event.
fn read_event(&mut self, event_id: EventId) -> Result<Vec<u8>, ExecutionError>;
}

pub trait ServiceRuntime: BaseRuntime {
Expand Down Expand Up @@ -890,6 +897,7 @@ pub struct TestExecutionRuntimeContext {
user_contracts: Arc<DashMap<UserApplicationId, UserContractCode>>,
user_services: Arc<DashMap<UserApplicationId, UserServiceCode>>,
blobs: Arc<DashMap<BlobId, Blob>>,
events: Arc<DashMap<EventId, Vec<u8>>>,
}

#[cfg(with_testing)]
Expand All @@ -901,6 +909,7 @@ impl TestExecutionRuntimeContext {
user_contracts: Arc::default(),
user_services: Arc::default(),
blobs: Arc::default(),
events: Arc::default(),
}
}

Expand Down Expand Up @@ -967,6 +976,14 @@ impl ExecutionRuntimeContext for TestExecutionRuntimeContext {
async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError> {
Ok(self.blobs.contains_key(&blob_id))
}

async fn get_event(&self, event_id: &EventId) -> Result<Vec<u8>, ExecutionError> {
Ok(self
.events
.get(event_id)
.ok_or_else(|| ExecutionError::EventNotFound(Box::new(event_id.clone())))?
.clone())
}
}

impl From<SystemOperation> for Operation {
Expand Down
26 changes: 25 additions & 1 deletion linera-execution/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use linera_base::{
},
ensure,
identifiers::{
Account, ApplicationId, BlobId, BlobType, ChainId, ChannelName, MessageId, Owner,
Account, ApplicationId, BlobId, BlobType, ChainId, ChannelName, EventId, MessageId, Owner,
StreamName,
},
ownership::ChainOwnership,
Expand Down Expand Up @@ -694,6 +694,10 @@ impl<UserInstance> BaseRuntime for SyncRuntimeHandle<UserInstance> {
fn assert_data_blob_exists(&mut self, hash: &CryptoHash) -> Result<(), ExecutionError> {
self.inner().assert_data_blob_exists(hash)
}

fn read_event(&mut self, event_id: EventId) -> Result<Vec<u8>, ExecutionError> {
self.inner().read_event(event_id)
}
}

impl<UserInstance> BaseRuntime for SyncRuntimeInternal<UserInstance> {
Expand Down Expand Up @@ -1021,6 +1025,26 @@ impl<UserInstance> BaseRuntime for SyncRuntimeInternal<UserInstance> {
.recv_response()?;
Ok(())
}

fn read_event(&mut self, event_id: EventId) -> Result<Vec<u8>, ExecutionError> {
let event_value =
if let Some(response) = self.transaction_tracker.next_replayed_oracle_response()? {
match response {
OracleResponse::Event(id, bytes) if id == event_id => bytes,
_ => return Err(ExecutionError::OracleResponseMismatch),
}
} else {
self.execution_state_sender
.send_request(|callback| ExecutionRequest::ReadEvent {
event_id: event_id.clone(),
callback,
})?
.recv_response()?
};
self.transaction_tracker
.add_oracle_response(OracleResponse::Event(event_id, event_value.clone()));
Ok(event_value)
}
}

impl<UserInstance> Clone for SyncRuntimeHandle<UserInstance> {
Expand Down
22 changes: 21 additions & 1 deletion linera-execution/src/wasm/system_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ use std::{any::Any, collections::HashMap, marker::PhantomData};
use linera_base::{
crypto::CryptoHash,
data_types::{Amount, ApplicationPermissions, BlockHeight, SendMessageRequest, Timestamp},
identifiers::{Account, ApplicationId, ChainId, ChannelName, MessageId, Owner, StreamName},
identifiers::{
Account, ApplicationId, ChainId, ChannelName, EventId, MessageId, Owner, StreamName,
},
ownership::{ChainOwnership, CloseChainError},
};
use linera_views::batch::{Batch, WriteOperation};
Expand Down Expand Up @@ -380,6 +382,15 @@ where
.map_err(|error| RuntimeError::Custom(error.into()))
}

/// Reads an event from storage.
fn read_event(caller: &mut Caller, event_id: EventId) -> Result<Vec<u8>, RuntimeError> {
caller
.user_data_mut()
.runtime
.read_event(event_id)
.map_err(|error| RuntimeError::Custom(error.into()))
}

/// Logs a `message` with the provided information `level`.
fn log(_caller: &mut Caller, message: String, level: log::Level) -> Result<(), RuntimeError> {
match level {
Expand Down Expand Up @@ -572,6 +583,15 @@ where
.map_err(|error| RuntimeError::Custom(error.into()))
}

/// Reads an event from storage.
fn read_event(caller: &mut Caller, event_id: EventId) -> Result<Vec<u8>, RuntimeError> {
caller
.user_data_mut()
.runtime
.read_event(event_id)
.map_err(|error| RuntimeError::Custom(error.into()))
}

/// Aborts the query if the current time at block validation is `>= timestamp`. Note that block
/// validation happens at or after the block timestamp, but isn't necessarily the same.
fn assert_before(caller: &mut Caller, timestamp: Timestamp) -> Result<(), RuntimeError> {
Expand Down
13 changes: 13 additions & 0 deletions linera-rpc/tests/snapshots/format__format.yaml.snap
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,14 @@ Destination:
TYPENAME: ChannelName
Epoch:
NEWTYPESTRUCT: U32
EventId:
STRUCT:
- chain_id:
TYPENAME: ChainId
- stream_id:
TYPENAME: StreamId
- key:
SEQ: U8
EventRecord:
STRUCT:
- stream_id:
Expand Down Expand Up @@ -649,6 +657,11 @@ OracleResponse:
NEWTYPE:
TYPENAME: BlobId
3:
Event:
TUPLE:
- TYPENAME: EventId
- SEQ: U8
4:
Assert: UNIT
Origin:
STRUCT:
Expand Down
Loading