Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 21 additions & 24 deletions linera-chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -532,30 +532,27 @@ where
}
}
}

if bundle.goes_to_inbox() {
// Process the inbox bundle and update the inbox state.
let mut inbox = self.inboxes.try_load_entry_mut(origin).await?;
#[cfg(with_metrics)]
NUM_INBOXES
.with_label_values(&[])
.observe(self.inboxes.count().await? as f64);
let entry = BundleInInbox::new(origin.clone(), &bundle);
let skippable = bundle.is_skippable();
let newly_added = inbox
.add_bundle(bundle)
.await
.map_err(|error| match error {
InboxError::ViewError(error) => ChainError::ViewError(error),
error => ChainError::InternalError(format!(
"while processing messages in certified block: {error}"
)),
})?;
if newly_added && !skippable {
let seen = local_time;
self.unskippable_bundles
.push_back(TimestampedBundleInInbox { entry, seen });
}
// Process the inbox bundle and update the inbox state.
let mut inbox = self.inboxes.try_load_entry_mut(origin).await?;
#[cfg(with_metrics)]
NUM_INBOXES
.with_label_values(&[])
.observe(self.inboxes.count().await? as f64);
let entry = BundleInInbox::new(origin.clone(), &bundle);
let skippable = bundle.is_skippable();
let newly_added = inbox
.add_bundle(bundle)
.await
.map_err(|error| match error {
InboxError::ViewError(error) => ChainError::ViewError(error),
error => ChainError::InternalError(format!(
"while processing messages in certified block: {error}"
)),
})?;
if newly_added && !skippable {
let seen = local_time;
self.unskippable_bundles
.push_back(TimestampedBundleInInbox { entry, seen });
}

// Remember the certificate for future validator/client synchronizations.
Expand Down
9 changes: 0 additions & 9 deletions linera-chain/src/data_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -558,15 +558,6 @@ impl MessageBundle {
pub fn is_protected(&self) -> bool {
self.messages.iter().any(PostedMessage::is_protected)
}

/// Returns whether this bundle must be added to the inbox.
///
/// If this is `false`, it gets handled immediately and should never be received in a block.
pub fn goes_to_inbox(&self) -> bool {
self.messages
.iter()
.any(|posted_message| posted_message.message.goes_to_inbox())
}
}

impl PostedMessage {
Expand Down
28 changes: 2 additions & 26 deletions linera-core/src/chain_worker/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,22 +444,10 @@ where
let mut heights_by_recipient = BTreeMap::<_, BTreeMap<_, _>>::new();
let mut targets = self.chain.outboxes.indices().await?;
if let Some(tracked_chains) = self.tracked_chains.as_ref() {
let publishers = self
.chain
.execution_state
.system
.subscriptions
.indices()
.await?
.iter()
.map(|subscription| subscription.chain_id)
.collect::<HashSet<_>>();
let tracked_chains = tracked_chains
.read()
.expect("Panics should not happen while holding a lock to `tracked_chains`");
targets.retain(|target| {
tracked_chains.contains(&target.recipient) || publishers.contains(&target.recipient)
});
targets.retain(|target| tracked_chains.contains(&target.recipient));
}
let outboxes = self.chain.outboxes.try_load_entries(&targets).await?;
for (target, outbox) in targets.into_iter().zip(outboxes) {
Expand Down Expand Up @@ -544,20 +532,8 @@ where
};
let mut targets = self.chain.outboxes.indices().await?;
{
let publishers = self
.chain
.execution_state
.system
.subscriptions
.indices()
.await?
.iter()
.map(|subscription| subscription.chain_id)
.collect::<HashSet<_>>();
let tracked_chains = tracked_chains.read().unwrap();
targets.retain(|target| {
tracked_chains.contains(&target.recipient) || publishers.contains(&target.recipient)
});
targets.retain(|target| tracked_chains.contains(&target.recipient));
}
let outboxes = self.chain.outboxes.try_load_entries(&targets).await?;
for outbox in outboxes {
Expand Down
20 changes: 0 additions & 20 deletions linera-core/src/unit_tests/worker_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2510,16 +2510,6 @@ where
*user_chain.execution_state.system.admin_id.get(),
Some(admin_id)
);
assert_eq!(
user_chain
.execution_state
.system
.subscriptions
.indices()
.await?
.len(),
0
);
user_chain.validate_incoming_bundles().await?;
matches!(
&user_chain
Expand Down Expand Up @@ -2629,16 +2619,6 @@ where
*user_chain.execution_state.system.admin_id.get(),
Some(admin_id)
);
assert_eq!(
user_chain
.execution_state
.system
.subscriptions
.indices()
.await?
.len(),
0
);
assert_eq!(user_chain.execution_state.system.committees.get().len(), 2);
user_chain.validate_incoming_bundles().await?;
Ok(())
Expand Down
3 changes: 1 addition & 2 deletions linera-execution/src/execution_state_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,7 @@ where
if !app_permissions.can_close_chain(&application_id) {
callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
} else {
let chain_id = self.context().extra().chain_id();
self.system.close_chain(chain_id).await?;
self.system.close_chain().await?;
callback.respond(Ok(()));
}
}
Expand Down
7 changes: 1 addition & 6 deletions linera-execution/src/graphql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use linera_views::{context::Context, map_view::MapView};
use crate::{
committee::{Committee, Epoch, ValidatorState},
system::{Recipient, UserData},
ChannelSubscription, ExecutionStateView, SystemExecutionStateView,
ExecutionStateView, SystemExecutionStateView,
};

doc_scalar!(
Expand Down Expand Up @@ -73,11 +73,6 @@ impl<C: Send + Sync + Context> SystemExecutionStateView<C> {
self.admin_id.get()
}

#[graphql(derived(name = "subscription"))]
async fn _subscriptions(&self) -> Result<Vec<ChannelSubscription>, async_graphql::Error> {
Ok(self.subscriptions.indices().await?)
}

#[graphql(derived(name = "committees"))]
async fn _committees(&self) -> &BTreeMap<Epoch, Committee> {
self.committees.get()
Expand Down
37 changes: 0 additions & 37 deletions linera-execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -976,17 +976,6 @@ impl OutgoingMessage {
}
}

/// The identifier of a channel, relative to a particular application.
#[derive(
Eq, PartialEq, Ord, PartialOrd, Debug, Clone, Hash, Serialize, Deserialize, SimpleObject,
)]
pub struct ChannelSubscription {
/// The chain ID broadcasting on this channel.
pub chain_id: ChainId,
/// The name of the channel.
pub name: ChannelName,
}

impl OperationContext {
/// Returns an account for the refund.
/// Returns `None` if there is no authenticated signer of the [`OperationContext`].
Expand Down Expand Up @@ -1223,32 +1212,6 @@ impl Message {
}
}

/// Returns whether this message must be added to the inbox.
pub fn goes_to_inbox(&self) -> bool {
!matches!(
self,
Message::System(SystemMessage::Subscribe { .. } | SystemMessage::Unsubscribe { .. })
)
}

pub fn matches_subscribe(&self) -> Option<(&ChainId, &ChannelSubscription)> {
match self {
Message::System(SystemMessage::Subscribe { id, subscription }) => {
Some((id, subscription))
}
_ => None,
}
}

pub fn matches_unsubscribe(&self) -> Option<(&ChainId, &ChannelSubscription)> {
match self {
Message::System(SystemMessage::Unsubscribe { id, subscription }) => {
Some((id, subscription))
}
_ => None,
}
}

pub fn matches_open_chain(&self) -> Option<&OpenChainConfig> {
match self {
Message::System(SystemMessage::OpenChain(config)) => Some(config),
Expand Down
47 changes: 7 additions & 40 deletions linera-execution/src/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ use {linera_base::prometheus_util::register_int_counter_vec, prometheus::IntCoun
use crate::test_utils::SystemExecutionState;
use crate::{
committee::{Committee, Epoch},
ApplicationDescription, ApplicationId, ChannelSubscription, ExecutionError,
ExecutionRuntimeContext, MessageContext, MessageKind, OperationContext, OutgoingMessage,
QueryContext, QueryOutcome, ResourceController, TransactionTracker,
ApplicationDescription, ApplicationId, ExecutionError, ExecutionRuntimeContext, MessageContext,
MessageKind, OperationContext, OutgoingMessage, QueryContext, QueryOutcome, ResourceController,
TransactionTracker,
};

/// The relative index of the `OpenChain` message created by the `OpenChain` operation.
Expand Down Expand Up @@ -72,8 +72,6 @@ pub struct SystemExecutionStateView<C> {
pub epoch: HashedRegisterView<C, Option<Epoch>>,
/// The admin of the chain.
pub admin_id: HashedRegisterView<C, Option<ChainId>>,
/// Track the channels that we have subscribed to.
pub subscriptions: HashedSetView<C, ChannelSubscription>,
/// The committees that we trust, indexed by epoch number.
// Not using a `MapView` because the set active of committees is supposed to be
// small. Plus, currently, we would create the `BTreeMap` anyway in various places
Expand Down Expand Up @@ -211,16 +209,6 @@ pub enum SystemMessage {
},
/// Creates (or activates) a new chain.
OpenChain(OpenChainConfig),
/// Subscribes to a channel.
Subscribe {
id: ChainId,
subscription: ChannelSubscription,
},
/// Unsubscribes from a channel.
Unsubscribe {
id: ChainId,
subscription: ChannelSubscription,
},
/// Notifies that a new application was created.
ApplicationCreated,
}
Expand Down Expand Up @@ -353,10 +341,7 @@ where
ChangeApplicationPermissions(application_permissions) => {
self.application_permissions.set(application_permissions);
}
CloseChain => {
let messages = self.close_chain(context.chain_id).await?;
txn_tracker.add_outgoing_messages(messages)?;
}
CloseChain => self.close_chain().await?,
Transfer {
owner,
amount,
Expand Down Expand Up @@ -682,7 +667,7 @@ where
}
}
// These messages are executed immediately when cross-chain requests are received.
Subscribe { .. } | Unsubscribe { .. } | OpenChain(_) => {}
OpenChain(_) => {}
// This message is only a placeholder: Its ID is part of the application ID.
ApplicationCreated => {}
}
Expand Down Expand Up @@ -762,27 +747,9 @@ where
Ok(OutgoingMessage::new(child_id, message).with_kind(MessageKind::Protected))
}

pub async fn close_chain(
&mut self,
id: ChainId,
) -> Result<Vec<OutgoingMessage>, ExecutionError> {
let mut messages = Vec::new();
// Unsubscribe from all channels.
self.subscriptions
.for_each_index(|subscription| {
messages.push(
OutgoingMessage::new(
subscription.chain_id,
SystemMessage::Unsubscribe { id, subscription },
)
.with_kind(MessageKind::Protected),
);
Ok(())
})
.await?;
self.subscriptions.clear();
pub async fn close_chain(&mut self) -> Result<(), ExecutionError> {
self.closed.set(true);
Ok(messages)
Ok(())
}

pub async fn create_application(
Expand Down
14 changes: 3 additions & 11 deletions linera-execution/src/test_utils/system_execution_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ use super::{MockApplication, RegisterMockApplication};
use crate::{
committee::{Committee, Epoch},
execution::UserAction,
ApplicationDescription, ChannelSubscription, ExecutionError, ExecutionRuntimeConfig,
ExecutionRuntimeContext, ExecutionStateView, OperationContext, ResourceControlPolicy,
ResourceController, ResourceTracker, TestExecutionRuntimeContext, UserContractCode,
ApplicationDescription, ExecutionError, ExecutionRuntimeConfig, ExecutionRuntimeContext,
ExecutionStateView, OperationContext, ResourceControlPolicy, ResourceController,
ResourceTracker, TestExecutionRuntimeContext, UserContractCode,
};

/// A system execution state, not represented as a view but as a simple struct.
Expand All @@ -35,7 +35,6 @@ pub struct SystemExecutionState {
pub description: Option<ChainDescription>,
pub epoch: Option<Epoch>,
pub admin_id: Option<ChainId>,
pub subscriptions: BTreeSet<ChannelSubscription>,
pub committees: BTreeMap<Epoch, Committee>,
pub ownership: ChainOwnership,
pub balance: Amount,
Expand Down Expand Up @@ -88,7 +87,6 @@ impl SystemExecutionState {
description,
epoch,
admin_id,
subscriptions,
committees,
ownership,
balance,
Expand Down Expand Up @@ -120,12 +118,6 @@ impl SystemExecutionState {
view.system.description.set(description);
view.system.epoch.set(epoch);
view.system.admin_id.set(admin_id);
for subscription in subscriptions {
view.system
.subscriptions
.insert(&subscription)
.expect("serialization of subscription should not fail");
}
view.system.committees.set(committees);
view.system.ownership.set(ownership);
view.system.balance.set(balance);
Expand Down
20 changes: 0 additions & 20 deletions linera-rpc/tests/snapshots/format__format.yaml.snap
Original file line number Diff line number Diff line change
Expand Up @@ -381,12 +381,6 @@ ChannelFullName:
TYPENAME: ChannelName
ChannelName:
NEWTYPESTRUCT: BYTES
ChannelSubscription:
STRUCT:
- chain_id:
TYPENAME: ChainId
- name:
TYPENAME: ChannelName
Committee:
STRUCT:
- validators:
Expand Down Expand Up @@ -1085,20 +1079,6 @@ SystemMessage:
NEWTYPE:
TYPENAME: OpenChainConfig
3:
Subscribe:
STRUCT:
- id:
TYPENAME: ChainId
- subscription:
TYPENAME: ChannelSubscription
4:
Unsubscribe:
STRUCT:
- id:
TYPENAME: ChainId
- subscription:
TYPENAME: ChannelSubscription
5:
ApplicationCreated: UNIT
SystemOperation:
ENUM:
Expand Down
Loading