Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CLI.md
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,11 @@ Run a GraphQL service that exposes a faucet where users can claim tokens. This g
Default value: `8080`
* `--amount <AMOUNT>` — The number of tokens to send to each new chain
* `--limit-rate-until <LIMIT_RATE_UNTIL>` — The end timestamp: The faucet will rate-limit the token supply so it runs out of money no earlier than this
* `--max-claims-per-chain <MAX_CLAIMS_PER_CHAIN>` — The maximum number of claims per faucet chain, after which a new one is created.

A lower number improves performance for clients but creates overhead in the faucet.

Default value: `100`
* `--listener-skip-process-inbox` — Do not create blocks automatically to receive incoming messages. Instead, wait for an explicit mutation `processInbox`
* `--listener-delay-before-ms <DELAY_BEFORE_MS>` — Wait before processing any notification (useful for testing)

Expand Down
22 changes: 11 additions & 11 deletions linera-base/src/data_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ macro_rules! impl_wrapped_number {
}

/// Saturating addition.
pub fn saturating_add(self, other: Self) -> Self {
pub const fn saturating_add(self, other: Self) -> Self {
let val = self.0.saturating_add(other.0);
Self(val)
}
Expand All @@ -392,7 +392,7 @@ macro_rules! impl_wrapped_number {
}

/// Saturating subtraction.
pub fn saturating_sub(self, other: Self) -> Self {
pub const fn saturating_sub(self, other: Self) -> Self {
let val = self.0.saturating_sub(other.0);
Self(val)
}
Expand All @@ -413,7 +413,7 @@ macro_rules! impl_wrapped_number {
}

/// Saturating in-place addition.
pub fn saturating_add_assign(&mut self, other: Self) {
pub const fn saturating_add_assign(&mut self, other: Self) {
self.0 = self.0.saturating_add(other.0);
}

Expand All @@ -427,7 +427,7 @@ macro_rules! impl_wrapped_number {
}

/// Saturating multiplication.
pub fn saturating_mul(&self, other: $wrapped) -> Self {
pub const fn saturating_mul(&self, other: $wrapped) -> Self {
Self(self.0.saturating_mul(other))
}

Expand Down Expand Up @@ -662,37 +662,37 @@ impl Amount {
pub const ONE: Amount = Amount(10u128.pow(Amount::DECIMAL_PLACES as u32));

/// Returns an `Amount` corresponding to that many tokens, or `Amount::MAX` if saturated.
pub fn from_tokens(tokens: u128) -> Amount {
pub const fn from_tokens(tokens: u128) -> Amount {
Self::ONE.saturating_mul(tokens)
}

/// Returns an `Amount` corresponding to that many millitokens, or `Amount::MAX` if saturated.
pub fn from_millis(millitokens: u128) -> Amount {
pub const fn from_millis(millitokens: u128) -> Amount {
Amount(10u128.pow(Amount::DECIMAL_PLACES as u32 - 3)).saturating_mul(millitokens)
}

/// Returns an `Amount` corresponding to that many microtokens, or `Amount::MAX` if saturated.
pub fn from_micros(microtokens: u128) -> Amount {
pub const fn from_micros(microtokens: u128) -> Amount {
Amount(10u128.pow(Amount::DECIMAL_PLACES as u32 - 6)).saturating_mul(microtokens)
}

/// Returns an `Amount` corresponding to that many nanotokens, or `Amount::MAX` if saturated.
pub fn from_nanos(nanotokens: u128) -> Amount {
pub const fn from_nanos(nanotokens: u128) -> Amount {
Amount(10u128.pow(Amount::DECIMAL_PLACES as u32 - 9)).saturating_mul(nanotokens)
}

/// Returns an `Amount` corresponding to that many attotokens.
pub fn from_attos(attotokens: u128) -> Amount {
pub const fn from_attos(attotokens: u128) -> Amount {
Amount(attotokens)
}

/// Helper function to obtain the 64 most significant bits of the balance.
pub fn upper_half(self) -> u64 {
pub const fn upper_half(self) -> u64 {
(self.0 >> 64) as u64
}

/// Helper function to obtain the 64 least significant bits of the balance.
pub fn lower_half(self) -> u64 {
pub const fn lower_half(self) -> u64 {
self.0 as u64
}

Expand Down
2 changes: 2 additions & 0 deletions linera-client/src/chain_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ pub trait ClientContext: 'static {
}
Ok(clients)
}

async fn forget_chain(&mut self, chain_id: &ChainId) -> Result<(), Error>;
}

/// A `ChainListener` is a process that listens to notifications from validators and reacts
Expand Down
8 changes: 8 additions & 0 deletions linera-client/src/client_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,14 @@ where
async fn update_wallet(&mut self, client: &ChainClient<NodeProvider, S>) -> Result<(), Error> {
self.update_wallet_from_client(client).await
}

async fn forget_chain(&mut self, chain_id: &ChainId) -> Result<(), Error> {
self.wallet
.mutate(|w| w.forget_chain(chain_id))
.await
.map_err(|e| error::Inner::Persistence(Box::new(e)))??;
Ok(())
}
}

impl<S, W> ClientContext<S, W>
Expand Down
5 changes: 5 additions & 0 deletions linera-client/src/unit_tests/chain_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ impl chain_listener::ClientContext for ClientContext {
self.wallet.update_from_state(client).await;
Ok(())
}

async fn forget_chain(&mut self, chain_id: &ChainId) -> Result<(), Error> {
self.wallet.forget_chain(chain_id)?;
Ok(())
}
}

/// Tests that the chain listener, if there is a message in the inbox, will continue requesting
Expand Down
147 changes: 125 additions & 22 deletions linera-faucet/server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ use serde::Deserialize;
use tower_http::cors::CorsLayer;
use tracing::info;

/// A rough estimate of the maximum fee for a block.
const MAX_FEE: Amount = Amount::from_millis(100);

/// Returns an HTML response constructing the GraphiQL web page for the given URI.
pub(crate) async fn graphiql(uri: axum::http::Uri) -> impl axum::response::IntoResponse {
axum::response::Html(
Expand All @@ -47,9 +50,13 @@ pub struct QueryRoot<C> {

/// The root GraphQL mutation type.
pub struct MutationRoot<C> {
chain_id: ChainId,
/// The chain from which new faucet chains are opened.
main_chain_id: ChainId,
/// The chain that is currently used to open requested chains for clients.
faucet_chain_id: Arc<Mutex<Option<ChainId>>>,
context: Arc<Mutex<C>>,
amount: Amount,
faucet_init_balance: Amount,
end_timestamp: Timestamp,
start_timestamp: Timestamp,
start_balance: Amount,
Expand Down Expand Up @@ -118,17 +125,31 @@ where
C: ClientContext,
{
async fn do_claim(&self, owner: AccountOwner) -> Result<ClaimOutcome, Error> {
let client = self.context.lock().await.make_chain_client(self.chain_id)?;
let maybe_faucet_chain_id = *self.faucet_chain_id.lock().await;
let faucet_chain_id = match maybe_faucet_chain_id {
Some(faucet_chain_id) => faucet_chain_id,
None => self.open_new_faucet_chain().await?,
};
let (faucet_client, main_client) = {
let guard = self.context.lock().await;
(
guard.make_chain_client(faucet_chain_id)?,
guard.make_chain_client(self.main_chain_id)?,
)
};

if self.start_timestamp < self.end_timestamp {
let local_time = client.storage_client().clock().current_time();
let local_time = faucet_client.storage_client().clock().current_time();
if local_time < self.end_timestamp {
let full_duration = self
.end_timestamp
.delta_since(self.start_timestamp)
.as_micros();
let remaining_duration = self.end_timestamp.delta_since(local_time).as_micros();
let balance = client.local_balance().await?;
let balance = faucet_client
.local_balance()
.await?
.try_add(main_client.local_balance().await?)?;
let Ok(remaining_balance) = balance.try_sub(self.amount) else {
return Err(Error::new("The faucet is empty."));
};
Expand All @@ -145,27 +166,69 @@ where
}

let ownership = ChainOwnership::single(owner);
let result = client
let (message_id, certificate) = faucet_client
.open_chain(ownership, ApplicationPermissions::default(), self.amount)
.await;
self.context.lock().await.update_wallet(&client).await?;
let (message_id, certificate) = match result? {
ClientOutcome::Committed(result) => result,
ClientOutcome::WaitForTimeout(timeout) => {
return Err(Error::new(format!(
"This faucet is using a multi-owner chain and is not the leader right now. \
Try again at {}",
timeout.timestamp,
)));
.await?
.try_unwrap()?;
self.context
.lock()
.await
.update_wallet(&faucet_client)
.await?;

// Only keep using this chain if there will still be enough balance to close it.
if faucet_client.local_balance().await? < self.amount.try_add(MAX_FEE.try_mul(2)?)? {
// TODO(#1795): Move the remaining tokens back to the main chain.
match faucet_client.close_chain().await {
Ok(outcome) => {
outcome.try_unwrap()?;
}
Err(err) => tracing::warn!("Failed to close the temporary faucet chain: {err:?}"),
}
};
self.context
.lock()
.await
.forget_chain(&faucet_chain_id)
.await?;
self.faucet_chain_id.lock().await.take();
}

let chain_id = ChainId::child(message_id);
Ok(ClaimOutcome {
message_id,
chain_id,
certificate_hash: certificate.hash(),
})
}

async fn open_new_faucet_chain(&self) -> Result<ChainId, Error> {
let main_client = self
.context
.lock()
.await
.make_chain_client(self.main_chain_id)?;
let main_balance = main_client.local_balance().await?;
let key_pair = main_client.key_pair().await?;
let balance = self.faucet_init_balance.min(main_balance.try_sub(MAX_FEE)?);
let ownership = main_client.chain_state_view().await?.ownership().clone();
let (message_id, certificate) = main_client
.open_chain(ownership, ApplicationPermissions::default(), balance)
.await?
.try_unwrap()?;
let chain_id = ChainId::child(message_id);
info!("Switching to a new faucet chain {chain_id:8}");
self.context
.lock()
.await
.update_wallet_for_new_chain(
chain_id,
Some(key_pair),
certificate.block().header.timestamp,
)
.await?;
*self.faucet_chain_id.lock().await = Some(chain_id);
Ok(chain_id)
}
}

impl<C> MutationRoot<C> {
Expand All @@ -185,14 +248,16 @@ pub struct FaucetService<C>
where
C: ClientContext,
{
chain_id: ChainId,
main_chain_id: ChainId,
faucet_chain_id: Arc<Mutex<Option<ChainId>>>,
context: Arc<Mutex<C>>,
genesis_config: Arc<GenesisConfig>,
config: ChainListenerConfig,
storage: C::Storage,
port: NonZeroU16,
amount: Amount,
end_timestamp: Timestamp,
faucet_init_balance: Amount,
start_timestamp: Timestamp,
start_balance: Amount,
}
Expand All @@ -203,13 +268,15 @@ where
{
fn clone(&self) -> Self {
Self {
chain_id: self.chain_id,
main_chain_id: self.main_chain_id,
faucet_chain_id: Arc::clone(&self.faucet_chain_id),
context: Arc::clone(&self.context),
genesis_config: Arc::clone(&self.genesis_config),
config: self.config.clone(),
storage: self.storage.clone(),
port: self.port,
amount: self.amount,
faucet_init_balance: self.faucet_init_balance,
end_timestamp: self.end_timestamp,
start_timestamp: self.start_timestamp,
start_balance: self.start_balance,
Expand All @@ -228,24 +295,31 @@ where
chain_id: ChainId,
context: C,
amount: Amount,
max_claims_per_chain: u32,
end_timestamp: Timestamp,
genesis_config: Arc<GenesisConfig>,
config: ChainListenerConfig,
storage: C::Storage,
) -> anyhow::Result<Self> {
let faucet_init_balance = amount
.try_add(MAX_FEE)?
.try_mul(u128::from(max_claims_per_chain))?
.try_add(MAX_FEE)?; // One more block fee for closing the chain.
let client = context.make_chain_client(chain_id)?;
let context = Arc::new(Mutex::new(context));
let start_timestamp = client.storage_client().clock().current_time();
client.process_inbox().await?;
let start_balance = client.local_balance().await?;
Ok(Self {
chain_id,
main_chain_id: chain_id,
faucet_chain_id: Arc::new(Mutex::new(None)),
context,
genesis_config,
config,
storage,
port,
amount,
faucet_init_balance,
end_timestamp,
start_timestamp,
start_balance,
Expand All @@ -254,23 +328,28 @@ where

pub fn schema(&self) -> Schema<QueryRoot<C>, MutationRoot<C>, EmptySubscription> {
let mutation_root = MutationRoot {
chain_id: self.chain_id,
main_chain_id: self.main_chain_id,
faucet_chain_id: Arc::clone(&self.faucet_chain_id),
context: Arc::clone(&self.context),
amount: self.amount,
faucet_init_balance: self.faucet_init_balance,
end_timestamp: self.end_timestamp,
start_timestamp: self.start_timestamp,
start_balance: self.start_balance,
};
let query_root = QueryRoot {
genesis_config: Arc::clone(&self.genesis_config),
context: Arc::clone(&self.context),
chain_id: self.chain_id,
chain_id: self.main_chain_id,
};
Schema::build(query_root, mutation_root, EmptySubscription).finish()
}

/// Runs the faucet.
#[tracing::instrument(name = "FaucetService::run", skip_all, fields(port = self.port, chain_id = ?self.chain_id))]
#[tracing::instrument(
name = "FaucetService::run",
skip_all, fields(port = self.port, chain_id = ?self.main_chain_id))
]
pub async fn run(self) -> anyhow::Result<()> {
let port = self.port.get();
let index_handler = axum::routing::get(graphiql).post(Self::index_handler);
Expand Down Expand Up @@ -303,3 +382,27 @@ where
schema.execute(request.into_inner()).await.into()
}
}

trait ClientOutcomeExt {
type Output;

/// Returns the committed result or an error if we are not the leader.
///
/// It is recommended to use single-owner chains for the faucet to avoid this error.
fn try_unwrap(self) -> Result<Self::Output, Error>;
}

impl<T> ClientOutcomeExt for ClientOutcome<T> {
type Output = T;

fn try_unwrap(self) -> Result<Self::Output, Error> {
match self {
ClientOutcome::Committed(result) => Ok(result),
ClientOutcome::WaitForTimeout(timeout) => Err(Error::new(format!(
"This faucet is using a multi-owner chain and is not the leader right now. \
Try again at {}",
timeout.timestamp,
))),
}
}
}
Loading
Loading