Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion linera-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ web-default = ["web", "wasmer", "indexed-db"]

[dependencies]
anyhow = { workspace = true, optional = true }
async-trait.workspace = true
bcs.workspace = true
cfg-if.workspace = true
clap.workspace = true
Expand Down
6 changes: 3 additions & 3 deletions linera-client/src/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use linera_chain::{
data_types::{BlockProposal, ProposedBlock},
types::ConfirmedBlock,
};
use linera_core::{client::ChainClient, local_node::LocalNodeClient};
use linera_core::{client::ChainClient, local_node::LocalNodeClient, node::ValidatorNodeProvider};
use linera_execution::{
committee::Committee,
system::{Recipient, SystemOperation},
Expand Down Expand Up @@ -583,8 +583,8 @@ where
}

/// Closes the chain that was created for the benchmark.
pub async fn close_benchmark_chain(
chain_client: &ChainClient<NodeProvider, S>,
pub async fn close_benchmark_chain<P: ValidatorNodeProvider>(
chain_client: &ChainClient<P, S>,
) -> Result<(), BenchmarkError> {
let start = Instant::now();
chain_client
Expand Down
75 changes: 28 additions & 47 deletions linera-client/src/chain_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,13 @@ use std::{
time::Duration,
};

use async_trait::async_trait;
use futures::{
future::{join_all, select_all},
lock::Mutex,
FutureExt as _, StreamExt,
};
use linera_base::{
crypto::{AccountSecretKey, CryptoHash},
crypto::CryptoHash,
data_types::Timestamp,
identifiers::{ChainId, Destination},
task::NonBlockingFuture,
Expand All @@ -30,7 +29,7 @@ use linera_storage::{Clock as _, Storage};
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, instrument, warn, Instrument as _};

use crate::{wallet::Wallet, Error};
use crate::{client_context::ClientContext, persistent::Persist, wallet::Wallet, Error};

#[derive(Debug, Default, Clone, clap::Args)]
pub struct ChainListenerConfig {
Expand Down Expand Up @@ -59,45 +58,14 @@ pub struct ChainListenerConfig {
pub delay_after_ms: u64,
}

type ContextChainClient<C> =
ChainClient<<C as ClientContext>::ValidatorNodeProvider, <C as ClientContext>::Storage>;

#[cfg_attr(not(web), async_trait, trait_variant::make(Send))]
#[cfg_attr(web, async_trait(?Send))]
pub trait ClientContext: 'static {
type ValidatorNodeProvider: ValidatorNodeProvider + Sync;
type Storage: Storage + Clone + Send + Sync + 'static;

fn wallet(&self) -> &Wallet;

fn make_chain_client(&self, chain_id: ChainId) -> Result<ContextChainClient<Self>, Error>;

async fn update_wallet_for_new_chain(
&mut self,
chain_id: ChainId,
key_pair: Option<AccountSecretKey>,
timestamp: Timestamp,
) -> Result<(), Error>;

async fn update_wallet(&mut self, client: &ContextChainClient<Self>) -> Result<(), Error>;

fn clients(&self) -> Result<Vec<ContextChainClient<Self>>, Error> {
let mut clients = vec![];
for chain_id in &self.wallet().chain_ids() {
clients.push(self.make_chain_client(*chain_id)?);
}
Ok(clients)
}
}

/// A chain client together with the stream of notifications from the local node.
///
/// A background task listens to the validators and updates the local node, so any updates to
/// this chain will trigger a notification. The background task is terminated when this gets
/// dropped.
struct ListeningClient<C: ClientContext> {
struct ListeningClient<P, S: Storage> {
/// The chain client.
client: ContextChainClient<C>,
client: ChainClient<P, S>,
/// The abort handle for the task that listens to the validators.
abort_handle: AbortOnDrop,
/// The listening task's join handle.
Expand All @@ -108,9 +76,9 @@ struct ListeningClient<C: ClientContext> {
timeout: Timestamp,
}

impl<C: ClientContext> ListeningClient<C> {
impl<P, S: Storage> ListeningClient<P, S> {
fn new(
client: ContextChainClient<C>,
client: ChainClient<P, S>,
abort_handle: AbortOnDrop,
join_handle: NonBlockingFuture<()>,
notification_stream: NotificationStream,
Expand All @@ -135,20 +103,25 @@ impl<C: ClientContext> ListeningClient<C> {

/// A `ChainListener` is a process that listens to notifications from validators and reacts
/// appropriately.
pub struct ChainListener<C: ClientContext> {
context: Arc<Mutex<C>>,
storage: C::Storage,
pub struct ChainListener<P: ValidatorNodeProvider, S: Storage, W> {
context: Arc<Mutex<ClientContext<P, S, W>>>,
storage: S,
config: Arc<ChainListenerConfig>,
listening: BTreeMap<ChainId, ListeningClient<C>>,
listening: BTreeMap<ChainId, ListeningClient<P, S>>,
cancellation_token: CancellationToken,
}

impl<C: ClientContext> ChainListener<C> {
impl<P, S, W> ChainListener<P, S, W>
where
S: linera_storage::Storage + Send + Sync + Clone + 'static,
P: ValidatorNodeProvider,
W: Persist<Target = Wallet>,
{
/// Creates a new chain listener given client chains.
pub fn new(
config: ChainListenerConfig,
context: Arc<Mutex<C>>,
storage: C::Storage,
context: Arc<Mutex<ClientContext<P, S, W>>>,
storage: S,
cancellation_token: CancellationToken,
) -> Self {
Self {
Expand All @@ -165,8 +138,8 @@ impl<C: ClientContext> ChainListener<C> {
pub async fn run(mut self) -> Result<(), Error> {
let chain_ids = {
let guard = self.context.lock().await;
let chain_ids = guard.wallet().chain_ids().into_iter();
chain_ids.chain(iter::once(guard.wallet().genesis_admin_chain()))
let chain_ids = guard.wallet.chain_ids().into_iter();
chain_ids.chain(iter::once(guard.wallet.genesis_admin_chain()))
};
for chain_id in chain_ids {
self.listen(chain_id).await?;
Expand Down Expand Up @@ -394,6 +367,14 @@ impl<C: ClientContext> ChainListener<C> {
}
}

impl<P, S, W> ChainListener<P, S, W>
where
S: linera_storage::Storage + Clone + Send + Sync,
P: ValidatorNodeProvider,
W: Persist<Target = Wallet>,
{
}

enum Action {
ProcessInbox(ChainId),
Notification(Notification),
Expand Down
Loading
Loading