Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/docs/users/reference/env_variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ process.
| `FOREST_ETH_V1_DISABLE_F3_FINALITY_RESOLUTION` | 1 or true | empty | 1 | Whether or not to disable F3 finality resolution in Eth `v1` RPC methods |
| `FOREST_GENESIS_NETWORK_VERSION` | non-negative integer | empty | 25 | Override the genesis network version (devnet only) |
| `FOREST_TIPSET_CACHE_DISABLED` | 1 or true | empty | 1 | Disable the tipset cache. Used internally by development and tool subcommands |
| `FOREST_MAX_CONCURRENT_CHAIN_EXCHANGE_REQUESTS` | positive integer | 3 | 3 | number of max concurrent requests to send over chain exchange protocol |

### `FOREST_F3_SIDECAR_FFI_BUILD_OPT_OUT`

Expand Down
36 changes: 22 additions & 14 deletions src/chain_sync/network_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use std::{
convert::TryFrom,
num::NonZeroU64,
num::{NonZeroU64, NonZeroUsize},
sync::{
Arc, LazyLock,
atomic::{AtomicU64, Ordering},
Expand Down Expand Up @@ -43,7 +43,15 @@ static CHAIN_EXCHANGE_TIMEOUT_MILLIS: LazyLock<ExponentialAdaptiveValueProvider<

/// Maximum number of concurrent chain exchange request being sent to the
/// network.
const MAX_CONCURRENT_CHAIN_EXCHANGE_REQUESTS: usize = 2;
static MAX_CONCURRENT_CHAIN_EXCHANGE_REQUESTS: LazyLock<NonZeroUsize> = LazyLock::new(|| {
std::env::var("FOREST_MAX_CONCURRENT_CHAIN_EXCHANGE_REQUESTS")
.ok()
.and_then(|i| {
i.parse().ok().inspect(|i| {
tracing::info!("max concurrent chain exchange requests set to {i} from `FOREST_MAX_CONCURRENT_CHAIN_EXCHANGE_REQUESTS`");
})
}).unwrap_or(nonzero!(3_usize))
});

/// Context used in chain sync to handle network requests.
/// This contains the peer manager, P2P service interface, and [`Blockstore`]
Expand Down Expand Up @@ -79,10 +87,10 @@ impl<T> RaceBatch<T>
where
T: Send + 'static,
{
pub fn new(max_concurrent_jobs: usize) -> Self {
pub fn new(max_concurrent_jobs: NonZeroUsize) -> Self {
RaceBatch {
tasks: JoinSet::new(),
semaphore: Arc::new(Semaphore::new(max_concurrent_jobs)),
semaphore: Arc::new(Semaphore::new(max_concurrent_jobs.get())),
}
}

Expand Down Expand Up @@ -211,7 +219,7 @@ where

/// Helper function to handle the peer retrieval if no peer supplied as well
/// as the logging and updating of the peer info in the `PeerManager`.
async fn handle_chain_exchange_request<T, F>(
pub async fn handle_chain_exchange_request<T, F>(
&self,
peer_id: Option<PeerId>,
tsk: &TipsetKey,
Expand Down Expand Up @@ -253,7 +261,7 @@ where
);

let n_peers = peers.len();
let mut batch = RaceBatch::new(MAX_CONCURRENT_CHAIN_EXCHANGE_REQUESTS);
let mut batch = RaceBatch::new(*MAX_CONCURRENT_CHAIN_EXCHANGE_REQUESTS);
let success_time_cost_millis_stats = Arc::new(Mutex::new(Stats::new()));
for peer_id in peers.into_iter() {
let peer_manager = self.peer_manager.clone();
Expand Down Expand Up @@ -485,7 +493,7 @@ mod tests {

#[tokio::test]
async fn race_batch_ok() {
let mut batch = RaceBatch::new(3);
let mut batch = RaceBatch::new(nonzero!(3_usize));
batch.add(async move { Ok(1) });
batch.add(async move { anyhow::bail!("kaboom") });

Expand All @@ -494,7 +502,7 @@ mod tests {

#[tokio::test]
async fn race_batch_ok_faster() {
let mut batch = RaceBatch::new(3);
let mut batch = RaceBatch::new(nonzero!(3_usize));
batch.add(async move {
tokio::time::sleep(Duration::from_secs(100)).await;
Ok(1)
Expand All @@ -507,7 +515,7 @@ mod tests {

#[tokio::test]
async fn race_batch_none() {
let mut batch: RaceBatch<i32> = RaceBatch::new(3);
let mut batch: RaceBatch<i32> = RaceBatch::new(nonzero!(3_usize));
batch.add(async move { anyhow::bail!("kaboom") });
batch.add(async move { anyhow::bail!("banana") });

Expand All @@ -516,7 +524,7 @@ mod tests {

#[tokio::test]
async fn race_batch_semaphore() {
const MAX_JOBS: usize = 30;
const MAX_JOBS: NonZeroUsize = nonzero!(30_usize);
let counter = Arc::new(AtomicUsize::new(0));
let exceeded = Arc::new(AtomicBool::new(false));

Expand All @@ -526,7 +534,7 @@ mod tests {
let e = exceeded.clone();
batch.add(async move {
let prev = c.fetch_add(1, Ordering::Relaxed);
if prev >= MAX_JOBS {
if prev >= MAX_JOBS.get() {
e.fetch_or(true, Ordering::Relaxed);
}

Expand All @@ -543,18 +551,18 @@ mod tests {

#[tokio::test]
async fn race_batch_semaphore_exceeded() {
const MAX_JOBS: usize = 30;
const MAX_JOBS: NonZeroUsize = nonzero!(30_usize);
let counter = Arc::new(AtomicUsize::new(0));
let exceeded = Arc::new(AtomicBool::new(false));

// We add one more job to exceed the limit
let mut batch: RaceBatch<i32> = RaceBatch::new(MAX_JOBS + 1);
let mut batch: RaceBatch<i32> = RaceBatch::new(MAX_JOBS.checked_add(1).unwrap());
for _ in 0..10000 {
let c = counter.clone();
let e = exceeded.clone();
batch.add(async move {
let prev = c.fetch_add(1, Ordering::Relaxed);
if prev >= MAX_JOBS {
if prev >= MAX_JOBS.get() {
e.fetch_or(true, Ordering::Relaxed);
}

Expand Down
40 changes: 39 additions & 1 deletion src/rpc/methods/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,22 @@
// SPDX-License-Identifier: Apache-2.0, MIT

mod types;
use itertools::Itertools;
pub use types::*;

use std::any::Any;
use std::num::NonZeroU64;
use std::str::FromStr;
use std::time::Instant;

use crate::libp2p::chain_exchange::TipsetBundle;
use crate::libp2p::{NetRPCMethods, NetworkMessage, PeerId};
use crate::rpc::types::ApiTipsetKey;
use crate::rpc::{ApiPaths, Ctx, Permission, RpcMethod, ServerError};
use anyhow::{Context as _, Result};
use cid::multibase;
use enumflags2::BitFlags;
use fvm_ipld_blockstore::Blockstore;
use itertools::Itertools as _;

pub enum NetAddrsListen {}
impl RpcMethod<0> for NetAddrsListen {
Expand Down Expand Up @@ -385,3 +389,37 @@ impl RpcMethod<1> for NetProtectRemove {
Ok(())
}
}

pub enum NetChainExchange {}
impl RpcMethod<3> for NetChainExchange {
const NAME: &'static str = "Forest.NetChainExchange";
const PARAM_NAMES: [&'static str; 3] = ["startTipsetKey", "len", "options"];
const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all_with_v2();
const PERMISSION: Permission = Permission::Admin;
const DESCRIPTION: Option<&'static str> = Some("Internal API for debugging chain exchange.");

type Params = (ApiTipsetKey, u64, u64);
type Ok = String;

async fn handle(
ctx: Ctx<impl Blockstore>,
(tsk, request_len, options): Self::Params,
_: &http::Extensions,
) -> Result<Self::Ok, ServerError> {
let request_len =
NonZeroU64::new(request_len).context("request length must be greater than 0")?;
let tsk = tsk
.0
.unwrap_or_else(|| ctx.chain_store().heaviest_tipset().key().clone());
let timer = Instant::now();
let result: Vec<TipsetBundle> = ctx
.sync_network_context
.handle_chain_exchange_request(None, &tsk, request_len, options, |_| true)
.await?;
Ok(format!(
"fetched {} tipsets, took {}",
result.len(),
humantime::format_duration(timer.elapsed())
))
}
}
1 change: 1 addition & 0 deletions src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ macro_rules! for_each_rpc_method {
$callback!($crate::rpc::net::NetProtectList);
$callback!($crate::rpc::net::NetProtectRemove);
$callback!($crate::rpc::net::NetVersion);
$callback!($crate::rpc::net::NetChainExchange);

// node vertical
$callback!($crate::rpc::node::NodeStatus);
Expand Down
Loading