Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 5 additions & 15 deletions linera-base/src/identifiers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -418,34 +418,24 @@ pub struct ChannelName(
/// A channel name together with its application ID.
pub struct ChannelFullName {
/// The application owning the channel.
pub application_id: GenericApplicationId,
pub application_id: ApplicationId,
/// The name of the channel.
pub name: ChannelName,
}

impl fmt::Display for ChannelFullName {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let name = hex::encode(&self.name);
match self.application_id {
GenericApplicationId::System => write!(f, "system channel {name}"),
GenericApplicationId::User(app_id) => write!(f, "user channel {name} for app {app_id}"),
}
let app_id = self.application_id;
write!(f, "user channel {name} for app {app_id}")
}
}

impl ChannelFullName {
/// Creates a full system channel name.
pub fn system(name: ChannelName) -> Self {
Self {
application_id: GenericApplicationId::System,
name,
}
}

/// Creates a full user channel name.
pub fn user(name: ChannelName, application_id: ApplicationId) -> Self {
pub fn new(name: ChannelName, application_id: ApplicationId) -> Self {
Self {
application_id: application_id.into(),
application_id,
name,
}
}
Expand Down
27 changes: 12 additions & 15 deletions linera-chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ use linera_base::{
},
ensure,
identifiers::{
AccountOwner, ApplicationId, BlobType, ChainId, ChannelFullName, Destination, MessageId,
AccountOwner, ApplicationId, BlobType, ChainId, ChannelFullName, Destination,
GenericApplicationId, MessageId,
},
ownership::ChainOwnership,
};
Expand Down Expand Up @@ -506,7 +507,7 @@ where
bundle: MessageBundle,
local_time: Timestamp,
add_to_received_log: bool,
) -> Result<bool, ChainError> {
) -> Result<(), ChainError> {
assert!(!bundle.messages.is_empty());
let chain_id = self.chain_id();
tracing::trace!(
Expand All @@ -517,8 +518,6 @@ where
chain_id: origin.sender,
height: bundle.height,
};
let mut subscribe_names_and_ids = Vec::new();
let mut unsubscribe_names_and_ids = Vec::new();

// Handle immediate messages.
for posted_message in &bundle.messages {
Expand All @@ -528,17 +527,8 @@ where
self.execute_init_message(message_id, config, bundle.timestamp, local_time)
.await?;
}
} else if let Some((id, subscription)) = posted_message.message.matches_subscribe() {
let name = ChannelFullName::system(subscription.name.clone());
subscribe_names_and_ids.push((name, *id));
}
if let Some((id, subscription)) = posted_message.message.matches_unsubscribe() {
let name = ChannelFullName::system(subscription.name.clone());
unsubscribe_names_and_ids.push((name, *id));
}
}
self.process_unsubscribes(unsubscribe_names_and_ids).await?;
let new_outbox_entries = self.process_subscribes(subscribe_names_and_ids).await?;

if bundle.goes_to_inbox() {
// Process the inbox bundle and update the inbox state.
Expand Down Expand Up @@ -569,7 +559,7 @@ where
if add_to_received_log {
self.received_log.push(chain_and_height);
}
Ok(new_outbox_entries)
Ok(())
}

/// Updates the `received_log` trackers.
Expand Down Expand Up @@ -1167,8 +1157,15 @@ where
message.grant == Amount::ZERO,
ChainError::GrantUseOnBroadcast
);
let GenericApplicationId::User(application_id) =
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is GenericApplicationId::System used anywhere else now?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, in events.

message.message.application_id()
else {
return Err(ChainError::InternalError(
"System messages cannot be sent to channels".to_string(),
));
};
channel_broadcasts.insert(ChannelFullName {
application_id: message.message.application_id(),
application_id,
name: name.clone(),
});
}
Expand Down
8 changes: 6 additions & 2 deletions linera-chain/src/data_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ use linera_base::{
hashed::Hashed,
hex_debug,
identifiers::{
Account, AccountOwner, BlobId, ChainId, ChannelFullName, Destination, MessageId,
Account, AccountOwner, BlobId, ChainId, ChannelFullName, Destination, GenericApplicationId,
MessageId,
},
};
use linera_execution::{
Expand Down Expand Up @@ -330,7 +331,10 @@ impl OutgoingMessageExt for OutgoingMessage {
application_id,
name,
}),
) => *application_id == self.message.application_id() && name == dest_name,
) => {
GenericApplicationId::User(*application_id) == self.message.application_id()
&& name == dest_name
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion linera-core/src/chain_worker/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ where
origin: Origin,
bundles: Vec<(Epoch, MessageBundle)>,
#[debug(skip)]
callback: oneshot::Sender<Result<Option<(BlockHeight, NetworkActions)>, WorkerError>>,
callback: oneshot::Sender<Result<Option<BlockHeight>, WorkerError>>,
},

/// Handle cross-chain request to confirm that the recipient was updated.
Expand Down
19 changes: 4 additions & 15 deletions linera-core/src/chain_worker/state/attempted_changes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ where
&mut self,
origin: Origin,
bundles: Vec<(Epoch, MessageBundle)>,
) -> Result<Option<(BlockHeight, NetworkActions)>, WorkerError> {
) -> Result<Option<BlockHeight>, WorkerError> {
// Only process certificates with relevant heights and epochs.
let next_height_to_receive = self
.state
Expand All @@ -467,19 +467,14 @@ where
// Process the received messages in certificates.
let local_time = self.state.storage.clock().current_time();
let mut previous_height = None;
let mut new_outbox_entries = false;
for bundle in bundles {
let add_to_received_log = previous_height != Some(bundle.height);
previous_height = Some(bundle.height);
// Update the staged chain state with the received block.
if self
.state
self.state
.chain
.receive_message_bundle(&origin, bundle, local_time, add_to_received_log)
.await?
{
new_outbox_entries = true;
}
.await?;
}
if !self.state.config.allow_inactive_chains && !self.state.chain.is_active() {
// Refuse to create a chain state if the chain is still inactive by
Expand All @@ -491,15 +486,9 @@ where
);
return Ok(None);
}
let actions = if new_outbox_entries {
self.state.create_network_actions().await?
} else {
// Don't create network actions, so that old entries don't cause retry loops.
NetworkActions::default()
};
// Save the chain.
self.save().await?;
Ok(Some((last_updated_height, actions)))
Ok(Some(last_updated_height))
}

/// Handles the cross-chain request confirming that the recipient was updated.
Expand Down
2 changes: 1 addition & 1 deletion linera-core/src/chain_worker/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ where
&mut self,
origin: Origin,
bundles: Vec<(Epoch, MessageBundle)>,
) -> Result<Option<(BlockHeight, NetworkActions)>, WorkerError> {
) -> Result<Option<BlockHeight>, WorkerError> {
ChainWorkerStateWithAttemptedChanges::new(self)
.await
.process_cross_chain_update(origin, bundles)
Expand Down
20 changes: 3 additions & 17 deletions linera-core/src/chain_worker/state/temporary_changes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,16 @@ use std::collections::HashMap;
use linera_base::{
data_types::{ApplicationDescription, ArithmeticError, Blob, Timestamp},
ensure,
identifiers::{AccountOwner, ApplicationId, ChannelFullName, GenericApplicationId},
identifiers::{AccountOwner, ApplicationId},
};
use linera_chain::{
data_types::{
BlockExecutionOutcome, BlockProposal, ExecutedBlock, IncomingBundle, Medium, MessageAction,
BlockExecutionOutcome, BlockProposal, ExecutedBlock, IncomingBundle, MessageAction,
ProposalContent, ProposedBlock,
},
manager,
};
use linera_execution::{ChannelSubscription, Query, QueryOutcome};
use linera_execution::{Query, QueryOutcome};
use linera_storage::{Clock as _, Storage};
use linera_views::views::{View, ViewError};
#[cfg(with_testing)]
Expand Down Expand Up @@ -305,21 +305,7 @@ where
} else {
MessageAction::Accept
};
let subscriptions = &chain.execution_state.system.subscriptions;
for (origin, inbox) in pairs {
if let Medium::Channel(ChannelFullName {
application_id: GenericApplicationId::System,
name,
}) = &origin.medium
{
let subscription = ChannelSubscription {
chain_id: origin.sender,
name: name.clone(),
};
if !subscriptions.contains(&subscription).await? {
continue; // We are not subscribed to this channel.
}
}
for bundle in inbox.added_bundles.elements().await? {
messages.push(IncomingBundle {
origin: origin.clone(),
Expand Down
5 changes: 2 additions & 3 deletions linera-core/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ where
origin: Origin,
recipient: ChainId,
bundles: Vec<(Epoch, MessageBundle)>,
) -> Result<Option<(BlockHeight, NetworkActions)>, WorkerError> {
) -> Result<Option<BlockHeight>, WorkerError> {
self.query_chain_worker(recipient, move |callback| {
ChainWorkerRequest::ProcessCrossChainUpdate {
origin,
Expand Down Expand Up @@ -988,11 +988,10 @@ where
let mut actions = NetworkActions::default();
for (medium, bundles) in bundle_vecs {
let origin = Origin { sender, medium };
if let Some((height, new_actions)) = self
if let Some(height) = self
.process_cross_chain_update(origin.clone(), recipient, bundles)
.await?
{
actions.extend(new_actions);
height_by_origin.push((origin, height));
}
}
Expand Down
8 changes: 1 addition & 7 deletions linera-execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use linera_base::{
};
use linera_views::{batch::Batch, views::ViewError};
use serde::{Deserialize, Serialize};
use system::{AdminOperation, OpenChainConfig, SystemChannel};
use system::{AdminOperation, OpenChainConfig};
use thiserror::Error;

#[cfg(with_revm)]
Expand Down Expand Up @@ -314,12 +314,6 @@ pub enum ExecutionError {
InvalidCommitteeEpoch { expected: Epoch, provided: Epoch },
#[error("Failed to remove committee")]
InvalidCommitteeRemoval,
#[error("Cannot subscribe to a channel ({1}) on the same chain ({0})")]
SelfSubscription(ChainId, SystemChannel),
#[error("Chain {0} tried to subscribe to channel {1} but it is already subscribed")]
AlreadySubscribedToChannel(ChainId, SystemChannel),
#[error("Invalid unsubscription request to channel {1} on chain {0}")]
InvalidUnsubscription(ChainId, SystemChannel),
#[error("Amount overflow")]
AmountOverflow,
#[error("Amount underflow")]
Expand Down
4 changes: 2 additions & 2 deletions linera-execution/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1200,7 +1200,7 @@ impl ContractRuntime for ContractSyncRuntimeHandle {
fn subscribe(&mut self, chain: ChainId, name: ChannelName) -> Result<(), ExecutionError> {
let mut this = self.inner();
let application_id = this.current_application().id;
let full_name = ChannelFullName::user(name, application_id);
let full_name = ChannelFullName::new(name, application_id);
this.transaction_tracker.subscribe(full_name, chain);

Ok(())
Expand All @@ -1209,7 +1209,7 @@ impl ContractRuntime for ContractSyncRuntimeHandle {
fn unsubscribe(&mut self, chain: ChainId, name: ChannelName) -> Result<(), ExecutionError> {
let mut this = self.inner();
let application_id = this.current_application().id;
let full_name = ChannelFullName::user(name, application_id);
let full_name = ChannelFullName::new(name, application_id);
this.transaction_tracker.unsubscribe(full_name, chain);

Ok(())
Expand Down
Loading