Skip to content

Commit e72c335

Browse files
authored
Preparations for epoch change events. (#3431)
## Motivation To make epoch changes scalable, we want to use events instead of messages (#365). This will be done in an upcoming PR. To make that easier to review, I separated some of the changes I need to make as a preparation. The follow-up PR will add `ProcessNewEpoch` and `ProcessRemovedEpoch` system operations and an `Oracle::Response::Event` variant. Clients will be _notified_ about new epochs because they will always follow/synchronize the admin chain and thus store the epoch change events locally. They will then run `process_inbox` which, in addition to incoming messages, will look for epoch change events, and include the new operations in the proposal accordingly. Processing the operations will check for the event, add the oracle response, and update the chain's epoch and committees. ## Proposal * Add an `Event` oracle response variant, to record read event values in the block. * Don't fail in `make_chain_client` if the admin chain is not in the wallet. Clients will always need to sync/follow the admin chain to learn about epoch change events. * Return early from `try_synchronize_chain_state_from` if the remote node has no new blocks for us. * Always initialize the root chain in the `linera-core` tests, even if it is not mentioned in the test case. * Add events methods to the execution runtime context: We will need these in `system.rs` to load the epoch change events. * Add more assertions to `test_end_to_end_reconfiguration`. This was useful for debugging. * Other minor cleanups. ## Test Plan No new behavior was added. CI will catch regressions. ## Release Plan - Nothing to do / These changes follow the usual release cycle. ## Links - Part of #365. - [reviewer checklist](https://github.com/linera-io/linera-protocol/blob/main/CONTRIBUTING.md#reviewer-checklist)
1 parent 358bc63 commit e72c335

File tree

13 files changed

+138
-49
lines changed

13 files changed

+138
-49
lines changed

linera-client/src/client_context.rs

+10-11
Original file line numberDiff line numberDiff line change
@@ -238,23 +238,22 @@ where
238238
}
239239

240240
fn make_chain_client(&self, chain_id: ChainId) -> Result<ChainClient<NodeProvider, S>, Error> {
241-
let chain = self
242-
.wallet
243-
.get(chain_id)
244-
.ok_or_else(|| error::Inner::NonexistentChain(chain_id))?;
245-
let known_key_pairs = chain
246-
.key_pair
247-
.as_ref()
248-
.map(|kp| kp.copy())
249-
.into_iter()
250-
.collect();
241+
// We only create clients for chains we have in the wallet, or for the admin chain.
242+
let chain = match self.wallet.get(chain_id) {
243+
Some(chain) => chain.clone(),
244+
None if chain_id == self.wallet.genesis_admin_chain() => {
245+
UserChain::make_other(self.wallet.genesis_admin_chain(), Timestamp::from(0))
246+
}
247+
None => return Err(error::Inner::NonexistentChain(chain_id).into()),
248+
};
249+
let known_key_pairs = chain.key_pair.into_iter().collect();
251250
Ok(self.make_chain_client_internal(
252251
chain_id,
253252
known_key_pairs,
254253
chain.block_hash,
255254
chain.timestamp,
256255
chain.next_block_height,
257-
chain.pending_proposal.clone(),
256+
chain.pending_proposal,
258257
))
259258
}
260259

linera-client/src/wallet.rs

+13
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,19 @@ pub struct UserChain {
213213
pub pending_proposal: Option<PendingProposal>,
214214
}
215215

216+
impl Clone for UserChain {
217+
fn clone(&self) -> Self {
218+
Self {
219+
chain_id: self.chain_id,
220+
key_pair: self.key_pair.as_ref().map(AccountSecretKey::copy),
221+
block_hash: self.block_hash,
222+
timestamp: self.timestamp,
223+
next_block_height: self.next_block_height,
224+
pending_proposal: self.pending_proposal.clone(),
225+
}
226+
}
227+
}
228+
216229
impl UserChain {
217230
/// Create a user chain that we own.
218231
pub fn make_initial<R: CryptoRng>(

linera-core/src/chain_worker/state/attempted_changes.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -353,7 +353,7 @@ where
353353
.events
354354
.iter()
355355
.flatten()
356-
.map(|event| (event.id(chain_id), &event.value[..]));
356+
.map(|event| (event.id(chain_id), event.value.clone()));
357357
self.state.storage.write_events(events).await?;
358358
}
359359

linera-core/src/client/mod.rs

+12-9
Original file line numberDiff line numberDiff line change
@@ -1011,8 +1011,7 @@ where
10111011
// For chains with any owner other than ourselves, we could be missing recent
10121012
// certificates created by other owners. Further synchronize blocks from the network.
10131013
// This is a best-effort that depends on network conditions.
1014-
let nodes = self.validator_nodes().await?;
1015-
info = self.synchronize_chain_state(&nodes, self.chain_id).await?;
1014+
info = self.synchronize_chain_state(self.chain_id).await?;
10161015
}
10171016

10181017
let result = self
@@ -1700,17 +1699,20 @@ where
17001699

17011700
/// Downloads and processes any certificates we are missing for the given chain.
17021701
#[instrument(level = "trace", skip_all)]
1703-
pub async fn synchronize_chain_state(
1702+
async fn synchronize_chain_state(
17041703
&self,
1705-
validators: &[RemoteNode<P::Node>],
17061704
chain_id: ChainId,
17071705
) -> Result<Box<ChainInfo>, ChainClientError> {
17081706
#[cfg(with_metrics)]
17091707
let _latency = metrics::SYNCHRONIZE_CHAIN_STATE_LATENCY.measure_latency();
17101708

1711-
let committee = self.local_committee().await?;
1709+
let (epoch, mut committees) = self.epoch_and_committees(chain_id).await?;
1710+
let committee = committees
1711+
.remove(&epoch.ok_or(LocalNodeError::InvalidChainInfoResponse)?)
1712+
.ok_or(LocalNodeError::InvalidChainInfoResponse)?;
1713+
let validators = self.make_nodes(&committee)?;
17121714
communicate_with_quorum(
1713-
validators,
1715+
&validators,
17141716
&committee,
17151717
|_: &()| (),
17161718
|remote_node| {
@@ -1749,6 +1751,9 @@ where
17491751
.with_sent_certificate_hashes_in_range(range)
17501752
.with_manager_values();
17511753
let info = remote_node.handle_chain_info_query(query).await?;
1754+
if info.next_block_height < local_info.next_block_height {
1755+
return Ok(());
1756+
}
17521757

17531758
let certificates: Vec<ConfirmedBlockCertificate> = remote_node
17541759
.download_certificates(info.requested_sent_certificate_hashes)
@@ -2442,9 +2447,7 @@ where
24422447
pub async fn synchronize_from_validators(&self) -> Result<Box<ChainInfo>, ChainClientError> {
24432448
if self.chain_id != self.admin_id {
24442449
// Synchronize the state of the admin chain from the network.
2445-
let local_committee = self.local_committee().await?;
2446-
let nodes = self.make_nodes(&local_committee)?;
2447-
self.synchronize_chain_state(&nodes, self.admin_id).await?;
2450+
self.synchronize_chain_state(self.admin_id).await?;
24482451
}
24492452
let info = self.prepare_chain().await?;
24502453
self.find_received_certificates().await?;

linera-core/src/unit_tests/test_utils.rs

+7
Original file line numberDiff line numberDiff line change
@@ -797,11 +797,18 @@ where
797797
}
798798

799799
/// Creates the root chain with the given `index`, and returns a client for it.
800+
///
801+
/// Root chain 0 is the admin chain and needs to be initialized first, otherwise its balance
802+
/// is automatically set to zero.
800803
pub async fn add_root_chain(
801804
&mut self,
802805
index: u32,
803806
balance: Amount,
804807
) -> Result<ChainClient<NodeProvider<B::Storage>, B::Storage>, anyhow::Error> {
808+
// Make sure the admin chain is initialized.
809+
if self.genesis_storage_builder.accounts.is_empty() && index != 0 {
810+
Box::pin(self.add_root_chain(0, Amount::ZERO)).await?;
811+
}
805812
let description = ChainDescription::Root(index);
806813
let key_pair = AccountSecretKey::generate();
807814
let public_key = key_pair.public();

linera-execution/src/lib.rs

+32-1
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ use linera_base::{
4444
doc_scalar, hex_debug, http,
4545
identifiers::{
4646
Account, AccountOwner, ApplicationId, BlobId, BytecodeId, ChainId, ChannelName,
47-
Destination, GenericApplicationId, MessageId, Owner, StreamName, UserApplicationId,
47+
Destination, EventId, GenericApplicationId, MessageId, Owner, StreamName,
48+
UserApplicationId,
4849
},
4950
ownership::ChainOwnership,
5051
task,
@@ -392,13 +393,21 @@ pub trait ExecutionRuntimeContext {
392393

393394
async fn get_blob(&self, blob_id: BlobId) -> Result<Blob, ViewError>;
394395

396+
async fn get_event(&self, event_id: EventId) -> Result<Vec<u8>, ViewError>;
397+
395398
async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError>;
396399

397400
#[cfg(with_testing)]
398401
async fn add_blobs(
399402
&self,
400403
blobs: impl IntoIterator<Item = Blob> + Send,
401404
) -> Result<(), ViewError>;
405+
406+
#[cfg(with_testing)]
407+
async fn add_events(
408+
&self,
409+
events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
410+
) -> Result<(), ViewError>;
402411
}
403412

404413
#[derive(Clone, Copy, Debug)]
@@ -1052,6 +1061,7 @@ pub struct TestExecutionRuntimeContext {
10521061
user_contracts: Arc<DashMap<UserApplicationId, UserContractCode>>,
10531062
user_services: Arc<DashMap<UserApplicationId, UserServiceCode>>,
10541063
blobs: Arc<DashMap<BlobId, Blob>>,
1064+
events: Arc<DashMap<EventId, Vec<u8>>>,
10551065
}
10561066

10571067
#[cfg(with_testing)]
@@ -1063,6 +1073,7 @@ impl TestExecutionRuntimeContext {
10631073
user_contracts: Arc::default(),
10641074
user_services: Arc::default(),
10651075
blobs: Arc::default(),
1076+
events: Arc::default(),
10661077
}
10671078
}
10681079
}
@@ -1123,6 +1134,14 @@ impl ExecutionRuntimeContext for TestExecutionRuntimeContext {
11231134
.clone())
11241135
}
11251136

1137+
async fn get_event(&self, event_id: EventId) -> Result<Vec<u8>, ViewError> {
1138+
Ok(self
1139+
.events
1140+
.get(&event_id)
1141+
.ok_or_else(|| ViewError::EventsNotFound(vec![event_id]))?
1142+
.clone())
1143+
}
1144+
11261145
async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError> {
11271146
Ok(self.blobs.contains_key(&blob_id))
11281147
}
@@ -1138,6 +1157,18 @@ impl ExecutionRuntimeContext for TestExecutionRuntimeContext {
11381157

11391158
Ok(())
11401159
}
1160+
1161+
#[cfg(with_testing)]
1162+
async fn add_events(
1163+
&self,
1164+
events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
1165+
) -> Result<(), ViewError> {
1166+
for (event_id, bytes) in events {
1167+
self.events.insert(event_id, bytes);
1168+
}
1169+
1170+
Ok(())
1171+
}
11411172
}
11421173

11431174
impl From<SystemOperation> for Operation {

linera-sdk/Cargo.toml

+3-3
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@ test = [
4848
[dependencies]
4949
async-graphql.workspace = true
5050
async-trait = { workspace = true, optional = true }
51+
52+
# TODO(#3421): Remove the pinned version once the `linera-*` crates move to Rust Edition 2024
53+
base64ct = "=1.6.0"
5154
bcs.workspace = true
5255
futures.workspace = true
5356
linera-base.workspace = true
@@ -60,9 +63,6 @@ serde_json.workspace = true
6063
thiserror.workspace = true
6164
wit-bindgen.workspace = true
6265

63-
# TODO(#3421): Remove the pinned version once the `linera-*` crates move to Rust Edition 2024
64-
base64ct = "=1.6.0"
65-
6666
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
6767
async-graphql.workspace = true
6868
anyhow.workspace = true

linera-service/src/cli_wrappers/wallet.rs

+17-2
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
use std::{
55
borrow::Cow,
6-
collections::HashMap,
6+
collections::{BTreeMap, HashMap},
77
env,
88
marker::PhantomData,
99
mem,
@@ -26,7 +26,11 @@ use linera_base::{
2626
};
2727
use linera_client::wallet::Wallet;
2828
use linera_core::worker::Notification;
29-
use linera_execution::{system::SystemChannel, ResourceControlPolicy};
29+
use linera_execution::{
30+
committee::{Committee, Epoch},
31+
system::SystemChannel,
32+
ResourceControlPolicy,
33+
};
3034
use linera_faucet::ClaimOutcome;
3135
use linera_faucet_client::Faucet;
3236
use serde::{de::DeserializeOwned, ser::Serialize};
@@ -1239,6 +1243,17 @@ impl NodeService {
12391243
Ok(bytecode_id.with_abi())
12401244
}
12411245

1246+
pub async fn query_committees(&self, chain_id: &ChainId) -> Result<BTreeMap<Epoch, Committee>> {
1247+
let query = format!(
1248+
"query {{ chain(chainId:\"{chain_id}\") {{
1249+
executionState {{ system {{ committees }} }}
1250+
}} }}"
1251+
);
1252+
let mut response = self.query_node(query).await?;
1253+
let committees = response["chain"]["executionState"]["system"]["committees"].take();
1254+
Ok(serde_json::from_value(committees)?)
1255+
}
1256+
12421257
pub async fn query_node(&self, query: impl AsRef<str>) -> Result<Value> {
12431258
let n_try = 5;
12441259
let query = query.as_ref();

linera-service/src/proxy/grpc.rs

+1
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,7 @@ where
352352
}
353353
ViewError::NotFound(_)
354354
| ViewError::BlobsNotFound(_)
355+
| ViewError::EventsNotFound(_)
355356
| ViewError::CannotAcquireCollectionEntry
356357
| ViewError::MissingEntries => Status::not_found(err.to_string()),
357358
};

linera-service/tests/local_net_tests.rs

+14-14
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use linera_base::{
2020
identifiers::{Account, AccountOwner, ChainId},
2121
};
2222
use linera_core::{data_types::ChainInfoQuery, node::ValidatorNode};
23+
use linera_execution::committee::Epoch;
2324
use linera_faucet::ClaimOutcome;
2425
use linera_sdk::base::AccountSecretKey;
2526
use linera_service::{
@@ -148,6 +149,9 @@ async fn test_end_to_end_reconfiguration(config: LocalNetConfig) -> Result<()> {
148149
client.query_validators(Some(chain_1)).await?;
149150
if let Some(service) = &node_service_2 {
150151
service.process_inbox(&chain_2).await?;
152+
let committees = service.query_committees(&chain_2).await?;
153+
let epochs = committees.into_keys().collect::<Vec<_>>();
154+
assert_eq!(&epochs, &[Epoch(3)]);
151155
} else {
152156
client_2.process_inbox(chain_2).await?;
153157
}
@@ -159,6 +163,9 @@ async fn test_end_to_end_reconfiguration(config: LocalNetConfig) -> Result<()> {
159163
client.finalize_committee().await?;
160164
if let Some(service) = &node_service_2 {
161165
service.process_inbox(&chain_2).await?;
166+
let committees = service.query_committees(&chain_2).await?;
167+
let epochs = committees.into_keys().collect::<Vec<_>>();
168+
assert_eq!(&epochs, &[Epoch(4 + i as u32)]);
162169
} else {
163170
client_2.process_inbox(chain_2).await?;
164171
}
@@ -174,30 +181,23 @@ async fn test_end_to_end_reconfiguration(config: LocalNetConfig) -> Result<()> {
174181
)
175182
.await?;
176183

177-
if let Some(mut node_service_2) = node_service_2 {
178-
node_service_2.process_inbox(&chain_2).await?;
184+
if let Some(mut service) = node_service_2 {
185+
service.process_inbox(&chain_2).await?;
179186
let query = format!(
180187
"query {{ chain(chainId:\"{chain_2}\") {{
181188
executionState {{ system {{ balances {{
182189
entry(key:\"{recipient}\") {{ value }}
183190
}} }} }}
184191
}} }}"
185192
);
186-
let response = node_service_2.query_node(query.clone()).await?;
193+
let response = service.query_node(query).await?;
187194
let balances = &response["chain"]["executionState"]["system"]["balances"];
188195
assert_eq!(balances["entry"]["value"].as_str(), Some("5."));
196+
let committees = service.query_committees(&chain_2).await?;
197+
let epochs = committees.into_keys().collect::<Vec<_>>();
198+
assert_eq!(&epochs, &[Epoch(7)]);
189199

190-
let query = format!(
191-
"query {{ chain(chainId:\"{chain_2}\") {{
192-
executionState {{ system {{ committees }} }}
193-
}} }}"
194-
);
195-
let response = node_service_2.query_node(query.clone()).await?;
196-
let committees = &response["chain"]["executionState"]["system"]["committees"];
197-
let epochs = committees.as_object().unwrap().keys().collect::<Vec<_>>();
198-
assert_eq!(&epochs, &["7"]);
199-
200-
node_service_2.ensure_is_running()?;
200+
service.ensure_is_running()?;
201201
} else {
202202
client_2.sync(chain_2).await?;
203203
client_2.process_inbox(chain_2).await?;

linera-storage/src/db_storage.rs

+6-6
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ trait BatchExt {
216216
fn add_certificate(&mut self, certificate: &ConfirmedBlockCertificate)
217217
-> Result<(), ViewError>;
218218

219-
fn add_event(&mut self, event_id: EventId, value: &[u8]) -> Result<(), ViewError>;
219+
fn add_event(&mut self, event_id: EventId, value: Vec<u8>) -> Result<(), ViewError>;
220220
}
221221

222222
impl BatchExt for Batch {
@@ -248,11 +248,11 @@ impl BatchExt for Batch {
248248
Ok(())
249249
}
250250

251-
fn add_event(&mut self, event_id: EventId, value: &[u8]) -> Result<(), ViewError> {
251+
fn add_event(&mut self, event_id: EventId, value: Vec<u8>) -> Result<(), ViewError> {
252252
#[cfg(with_metrics)]
253253
WRITE_EVENT_COUNTER.with_label_values(&[]).inc();
254254
let event_key = bcs::to_bytes(&BaseKey::Event(event_id))?;
255-
self.put_key_value_bytes(event_key.to_vec(), value.to_vec());
255+
self.put_key_value_bytes(event_key.to_vec(), value);
256256
Ok(())
257257
}
258258
}
@@ -809,15 +809,15 @@ where
809809

810810
async fn read_event(&self, event_id: EventId) -> Result<Vec<u8>, ViewError> {
811811
let event_key = bcs::to_bytes(&BaseKey::Event(event_id.clone()))?;
812-
let maybe_value = self.store.read_value::<Vec<u8>>(&event_key).await?;
812+
let maybe_value = self.store.read_value_bytes(&event_key).await?;
813813
#[cfg(with_metrics)]
814814
READ_EVENT_COUNTER.with_label_values(&[]).inc();
815-
maybe_value.ok_or_else(|| ViewError::not_found("value for event ID", event_id))
815+
maybe_value.ok_or_else(|| ViewError::EventsNotFound(vec![event_id]))
816816
}
817817

818818
async fn write_events(
819819
&self,
820-
events: impl IntoIterator<Item = (EventId, &[u8])> + Send,
820+
events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
821821
) -> Result<(), ViewError> {
822822
let mut batch = Batch::new();
823823
for (event_id, value) in events {

0 commit comments

Comments
 (0)