Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions core/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ fnv = { version = "1.0", optional = true }
log = { version = "0.4", optional = true }
parking_lot = { version = "0.9.0", optional = true }
hex = { package = "hex-literal", version = "0.2", optional = true }
futures-preview = { version = "=0.3.0-alpha.17", optional = true }
futures = { version = "0.1", optional = true }
futures03 = { package = "futures-preview", version = "=0.3.0-alpha.17", features = ["compat"], optional = true }
consensus = { package = "substrate-consensus-common", path = "../consensus/common", optional = true }
executor = { package = "substrate-executor", path = "../executor", optional = true }
state-machine = { package = "substrate-state-machine", path = "../state-machine", optional = true }
Expand Down Expand Up @@ -49,7 +50,8 @@ std = [
"fnv",
"log",
"hex",
"futures-preview",
"futures",
"futures03",
"executor",
"state-machine",
"keyring",
Expand Down
5 changes: 5 additions & 0 deletions core/client/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@

//! Substrate Client data backend

use std::sync::Arc;
use std::collections::HashMap;
use crate::error;
use crate::light::blockchain::RemoteBlockchain;
use primitives::ChangesTrieConfiguration;
use sr_primitives::{generic::BlockId, Justification, StorageOverlay, ChildrenStorageOverlay};
use sr_primitives::traits::{Block as BlockT, NumberFor};
Expand Down Expand Up @@ -303,4 +305,7 @@ where
{
/// Returns true if the state for given block is available locally.
fn is_local_state_available(&self, block: &BlockId<Block>) -> bool;
/// Returns reference to blockchain backend that either resolves blockchain data
/// locally, or prepares request to fetch that data from remote node.
fn remote_blockchain(&self) -> Arc<dyn RemoteBlockchain<Block>>;
}
2 changes: 1 addition & 1 deletion core/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::{
panic::UnwindSafe, result, cell::RefCell, rc::Rc,
};
use log::{info, trace, warn};
use futures::channel::mpsc;
use futures03::channel::mpsc;
use parking_lot::{Mutex, RwLock};
use codec::{Encode, Decode};
use hash_db::{Hasher, Prefix};
Expand Down
4 changes: 4 additions & 0 deletions core/client/src/in_mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,10 @@ where
.map(|num| num.is_zero())
.unwrap_or(false)
}

fn remote_blockchain(&self) -> Arc<dyn crate::light::blockchain::RemoteBlockchain<Block>> {
unimplemented!()
}
}

/// Prunable in-memory changes trie storage.
Expand Down
10 changes: 7 additions & 3 deletions core/client/src/light/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,8 @@ impl<S, F, Block, H> ClientBackend<Block, H> for Backend<S, F, H> where
impl<S, F, Block, H> RemoteBackend<Block, H> for Backend<S, F, H>
where
Block: BlockT,
S: BlockchainStorage<Block>,
F: Fetcher<Block>,
S: BlockchainStorage<Block> + 'static,
F: Fetcher<Block> + 'static,
H: Hasher<Out=Block::Hash>,
H::Out: Ord,
{
Expand All @@ -242,6 +242,10 @@ where
.map(|num| num.is_zero())
.unwrap_or(false)
}

fn remote_blockchain(&self) -> Arc<dyn crate::light::blockchain::RemoteBlockchain<Block>> {
self.blockchain.clone()
}
}

impl<S, F, Block, H> BlockImportOperation<Block, H> for ImportOperation<Block, S, F, H>
Expand Down Expand Up @@ -358,7 +362,7 @@ where
*self.cached_header.write() = Some(cached_header);
}

futures::executor::block_on(
futures03::executor::block_on(
self.fetcher.upgrade().ok_or(ClientError::NotAvailableOnLightClient)?
.remote_read(RemoteReadRequest {
block: self.block,
Expand Down
112 changes: 84 additions & 28 deletions core/client/src/light/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
//! Light client blockchain backend. Only stores headers and justifications of recent
//! blocks. CHT roots are stored for headers of ancient blocks.

use std::future::Future;
use std::{sync::{Weak, Arc}, collections::HashMap};
use parking_lot::Mutex;

Expand Down Expand Up @@ -72,6 +73,27 @@ pub trait Storage<Block: BlockT>: AuxStore + BlockchainHeaderBackend<Block> {
fn cache(&self) -> Option<Arc<dyn BlockchainCache<Block>>>;
}

/// Remote header.
#[derive(Debug)]
pub enum LocalOrRemote<Data, Request> {
/// When data is available locally, it is returned.
Local(Data),
/// When data is unavailable locally, the request to fetch it from remote node is returned.
Remote(Request),
/// When data is unknown.
Unknown,
}

/// Futures-based blockchain backend that either resolves blockchain data
/// locally, or fetches required data from remote node.
pub trait RemoteBlockchain<Block: BlockT>: Send + Sync {
/// Get block header.
fn header(&self, id: BlockId<Block>) -> ClientResult<LocalOrRemote<
Block::Header,
RemoteHeaderRequest<Block::Header>,
>>;
}

/// Light client blockchain.
pub struct Blockchain<S, F> {
fetcher: Mutex<Weak<F>>,
Expand Down Expand Up @@ -105,32 +127,10 @@ impl<S, F> Blockchain<S, F> {

impl<S, F, Block> BlockchainHeaderBackend<Block> for Blockchain<S, F> where Block: BlockT, S: Storage<Block>, F: Fetcher<Block> {
fn header(&self, id: BlockId<Block>) -> ClientResult<Option<Block::Header>> {
match self.storage.header(id)? {
Some(header) => Ok(Some(header)),
None => {
let number = match id {
BlockId::Hash(hash) => match self.storage.number(hash)? {
Some(number) => number,
None => return Ok(None),
},
BlockId::Number(number) => number,
};

// if the header is from future or genesis (we never prune genesis) => return
if number.is_zero() || self.storage.status(BlockId::Number(number))? == BlockStatus::Unknown {
return Ok(None);
}

futures::executor::block_on(
self.fetcher().upgrade()
.ok_or(ClientError::NotAvailableOnLightClient)?
.remote_header(RemoteHeaderRequest {
cht_root: self.storage.header_cht_root(cht::size(), number)?,
block: number,
retry_count: None,
})
).map(Some)
}
match RemoteBlockchain::header(self, id)? {
LocalOrRemote::Local(header) => Ok(Some(header)),
LocalOrRemote::Remote(_) => Err(ClientError::NotAvailableOnLightClient),
LocalOrRemote::Unknown => Ok(None),
}
}

Expand All @@ -153,12 +153,12 @@ impl<S, F, Block> BlockchainHeaderBackend<Block> for Blockchain<S, F> where Bloc

impl<S, F, Block> BlockchainBackend<Block> for Blockchain<S, F> where Block: BlockT, S: Storage<Block>, F: Fetcher<Block> {
fn body(&self, id: BlockId<Block>) -> ClientResult<Option<Vec<Block::Extrinsic>>> {
let header = match self.header(id)? {
let header = match BlockchainHeaderBackend::header(self, id)? {
Some(header) => header,
None => return Ok(None),
};

futures::executor::block_on(
futures03::executor::block_on(
self.fetcher().upgrade().ok_or(ClientError::NotAvailableOnLightClient)?
.remote_body(RemoteBodyRequest {
header,
Expand Down Expand Up @@ -194,6 +194,62 @@ impl<S: Storage<Block>, F, Block: BlockT> ProvideCache<Block> for Blockchain<S,
}
}

impl<S, F, Block: BlockT> RemoteBlockchain<Block> for Blockchain<S, F>
where
S: Storage<Block>,
F: Fetcher<Block> + Send + Sync,
{
fn header(&self, id: BlockId<Block>) -> ClientResult<LocalOrRemote<
Block::Header,
RemoteHeaderRequest<Block::Header>,
>> {
// first, try to read header from local storage
if let Some(local_header) = self.storage.header(id)? {
return Ok(LocalOrRemote::Local(local_header));
}

// we need to know block number to check if it's a part of CHT
let number = match id {
BlockId::Hash(hash) => match self.storage.number(hash)? {
Some(number) => number,
None => return Ok(LocalOrRemote::Unknown),
},
BlockId::Number(number) => number,
};

// if the header is genesis (never pruned), non-canonical, or from future => return
if number.is_zero() || self.storage.status(BlockId::Number(number))? == BlockStatus::Unknown {
return Ok(LocalOrRemote::Unknown);
}

Ok(LocalOrRemote::Remote(RemoteHeaderRequest {
cht_root: self.storage.header_cht_root(cht::size(), number)?,
block: number,
retry_count: None,
}))
}
}

/// Returns future that resolves header either locally, or remotely.
pub fn future_header<Block: BlockT, F: Fetcher<Block>>(
blockchain: &dyn RemoteBlockchain<Block>,
fetcher: &F,
id: BlockId<Block>,
) -> impl Future<Output = Result<Option<Block::Header>, ClientError>> {
use futures03::future::{ready, Either, FutureExt};

match blockchain.header(id) {
Ok(LocalOrRemote::Remote(request)) => Either::Left(
fetcher
.remote_header(request)
.then(|header| ready(header.map(Some)))
),
Ok(LocalOrRemote::Unknown) => Either::Right(ready(Ok(None))),
Ok(LocalOrRemote::Local(local_header)) => Either::Right(ready(Ok(Some(local_header)))),
Err(err) => Either::Right(ready(Err(err))),
}
}

#[cfg(test)]
pub mod tests {
use std::collections::HashMap;
Expand Down
2 changes: 1 addition & 1 deletion core/client/src/light/call_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ where
let block_hash = self.blockchain.expect_block_hash_from_id(id)?;
let block_header = self.blockchain.expect_header(id.clone())?;

futures::executor::block_on(self.fetcher.remote_call(RemoteCallRequest {
futures03::executor::block_on(self.fetcher.remote_call(RemoteCallRequest {
block: block_hash,
header: block_header,
method: method.into(),
Expand Down
16 changes: 8 additions & 8 deletions core/client/src/light/fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,15 +144,15 @@ pub struct RemoteBodyRequest<Header: HeaderT> {
/// is correct (see FetchedDataChecker) and return already checked data.
pub trait Fetcher<Block: BlockT>: Send + Sync {
/// Remote header future.
type RemoteHeaderResult: Future<Output = Result<Block::Header, ClientError>>;
type RemoteHeaderResult: Future<Output = Result<Block::Header, ClientError>> + Send + 'static;
/// Remote storage read future.
type RemoteReadResult: Future<Output = Result<Option<Vec<u8>>, ClientError>>;
type RemoteReadResult: Future<Output = Result<Option<Vec<u8>>, ClientError>> + Send + 'static;
/// Remote call result future.
type RemoteCallResult: Future<Output = Result<Vec<u8>, ClientError>>;
type RemoteCallResult: Future<Output = Result<Vec<u8>, ClientError>> + Send + 'static;
/// Remote changes result future.
type RemoteChangesResult: Future<Output = Result<Vec<(NumberFor<Block>, u32)>, ClientError>>;
type RemoteChangesResult: Future<Output = Result<Vec<(NumberFor<Block>, u32)>, ClientError>> + Send + 'static;
/// Remote block body result future.
type RemoteBodyResult: Future<Output = Result<Vec<Block::Extrinsic>, ClientError>>;
type RemoteBodyResult: Future<Output = Result<Vec<Block::Extrinsic>, ClientError>> + Send + 'static;

/// Fetch remote header.
fn remote_header(&self, request: RemoteHeaderRequest<Block::Header>) -> Self::RemoteHeaderResult;
Expand Down Expand Up @@ -493,7 +493,7 @@ impl<'a, H, Number, Hash> ChangesTrieRootsStorage<H, Number> for RootsStorage<'a

#[cfg(test)]
pub mod tests {
use futures::future::Ready;
use futures03::future::Ready;
use parking_lot::Mutex;
use codec::Decode;
use crate::client::tests::prepare_client_with_key_changes;
Expand Down Expand Up @@ -521,7 +521,7 @@ pub mod tests {
where
E: std::convert::From<&'static str>,
{
futures::future::ready(Err("Not implemented on test node".into()))
futures03::future::ready(Err("Not implemented on test node".into()))
}

impl Fetcher<Block> for OkCallFetcher {
Expand All @@ -544,7 +544,7 @@ pub mod tests {
}

fn remote_call(&self, _request: RemoteCallRequest<Header>) -> Self::RemoteCallResult {
futures::future::ready(Ok((*self.lock()).clone()))
futures03::future::ready(Ok((*self.lock()).clone()))
}

fn remote_changes(&self, _request: RemoteChangesRequest<Header>) -> Self::RemoteChangesResult {
Expand Down
4 changes: 2 additions & 2 deletions core/client/src/light/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ pub fn new_light<B, S, F, GS, RA, E>(
>, B, RA>>
where
B: BlockT<Hash=H256>,
S: BlockchainStorage<B>,
F: Fetcher<B>,
S: BlockchainStorage<B> + 'static,
F: Fetcher<B> + 'static,
GS: BuildStorage,
E: CodeExecutor<Blake2Hasher> + RuntimeInfo,
{
Expand Down
20 changes: 10 additions & 10 deletions core/client/src/notifications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::{
};

use fnv::{FnvHashSet, FnvHashMap};
use futures::channel::mpsc;
use futures03::channel::mpsc;
use primitives::storage::{StorageKey, StorageData};
use sr_primitives::traits::Block as BlockT;

Expand Down Expand Up @@ -347,7 +347,7 @@ mod tests {
// given
let mut notifications = StorageNotifications::<Block>::default();
let child_filter = [(StorageKey(vec![4]), None)];
let mut recv = futures::executor::block_on_stream(
let mut recv = futures03::executor::block_on_stream(
notifications.listen(None, Some(&child_filter[..]))
);

Expand Down Expand Up @@ -382,13 +382,13 @@ mod tests {
// given
let mut notifications = StorageNotifications::<Block>::default();
let child_filter = [(StorageKey(vec![4]), Some(vec![StorageKey(vec![5])]))];
let mut recv1 = futures::executor::block_on_stream(
let mut recv1 = futures03::executor::block_on_stream(
notifications.listen(Some(&[StorageKey(vec![1])]), None)
);
let mut recv2 = futures::executor::block_on_stream(
let mut recv2 = futures03::executor::block_on_stream(
notifications.listen(Some(&[StorageKey(vec![2])]), None)
);
let mut recv3 = futures::executor::block_on_stream(
let mut recv3 = futures03::executor::block_on_stream(
notifications.listen(Some(&[]), Some(&child_filter))
);

Expand Down Expand Up @@ -429,16 +429,16 @@ mod tests {
let mut notifications = StorageNotifications::<Block>::default();
{
let child_filter = [(StorageKey(vec![4]), Some(vec![StorageKey(vec![5])]))];
let _recv1 = futures::executor::block_on_stream(
let _recv1 = futures03::executor::block_on_stream(
notifications.listen(Some(&[StorageKey(vec![1])]), None)
);
let _recv2 = futures::executor::block_on_stream(
let _recv2 = futures03::executor::block_on_stream(
notifications.listen(Some(&[StorageKey(vec![2])]), None)
);
let _recv3 = futures::executor::block_on_stream(
let _recv3 = futures03::executor::block_on_stream(
notifications.listen(None, None)
);
let _recv4 = futures::executor::block_on_stream(
let _recv4 = futures03::executor::block_on_stream(
notifications.listen(None, Some(&child_filter))
);
assert_eq!(notifications.listeners.len(), 2);
Expand All @@ -465,7 +465,7 @@ mod tests {
// given
let mut recv = {
let mut notifications = StorageNotifications::<Block>::default();
let recv = futures::executor::block_on_stream(notifications.listen(None, None));
let recv = futures03::executor::block_on_stream(notifications.listen(None, None));

// when
let changeset = vec![];
Expand Down
1 change: 1 addition & 0 deletions core/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ state_machine = { package = "substrate-state-machine", path = "../state-machine"
substrate-executor = { path = "../executor" }
substrate-keystore = { path = "../keystore" }
transaction_pool = { package = "substrate-transaction-pool", path = "../transaction-pool" }
hash-db = { version = "0.15.0", default-features = false }

[dev-dependencies]
assert_matches = "1.1"
Expand Down
Loading