Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion demo/cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,10 @@ pub fn run<I, T>(args: I) -> error::Result<()> where
let mut runtime = Runtime::new()?;
let _rpc_servers = {
let handler = || {
let state = rpc::apis::state::State::new(client.clone(), runtime.executor());
let chain = rpc::apis::chain::Chain::new(client.clone(), runtime.executor());
let author = rpc::apis::author::Author::new(client.clone(), Arc::new(DummyPool), runtime.executor());
rpc::rpc_handler::<Block, _, _, _, _>(client.clone(), chain, author, DummySystem)
rpc::rpc_handler::<Block, _, _, _, _>(state, chain, author, DummySystem)
};
let http_address = "127.0.0.1:9933".parse().unwrap();
let ws_address = "127.0.0.1:9944".parse().unwrap();
Expand Down
1 change: 1 addition & 0 deletions substrate/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ authors = ["Parity Technologies <[email protected]>"]

[dependencies]
error-chain = "0.12"
fnv = "1.0"
log = "0.3"
parking_lot = "0.4"
triehash = "0.1"
Expand Down
2 changes: 1 addition & 1 deletion substrate/client/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ impl<Block: BlockT> client::backend::Backend<Block> for Backend<Block> {
}
}

impl<Block: BlockT> client::backend::LocalBackend<Block> for Backend<Block>
impl<Block: BlockT> client::backend::LocalBackend<Block> for Backend<Block>
{}

#[cfg(test)]
Expand Down
4 changes: 2 additions & 2 deletions substrate/client/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@

//! Polkadot Client data backend

use state_machine::backend::Backend as StateBackend;
use error;
use primitives::AuthorityId;
use runtime_primitives::bft::Justification;
use runtime_primitives::traits::{Block as BlockT, NumberFor};
use runtime_primitives::generic::BlockId;
use runtime_primitives::traits::{Block as BlockT, NumberFor};
use state_machine::backend::Backend as StateBackend;

/// Block insertion operation. Keeps hold if the inserted block state and data.
pub trait BlockImportOperation<Block: BlockT> {
Expand Down
39 changes: 30 additions & 9 deletions substrate/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use backend::{self, BlockImportOperation};
use blockchain::{self, Info as ChainInfo, Backend as ChainBackend, HeaderBackend as ChainHeaderBackend};
use call_executor::{CallExecutor, LocalCallExecutor};
use executor::{RuntimeVersion, RuntimeInfo};
use notifications::{StorageNotifications, StorageEventStream};
use {error, in_mem, block_builder, runtime_io, bft, genesis};

/// Type that implements `futures::Stream` of block import events.
Expand All @@ -40,6 +41,7 @@ pub type BlockchainEventStream<Block> = mpsc::UnboundedReceiver<BlockImportNotif
pub struct Client<B, E, Block> where Block: BlockT {
backend: Arc<B>,
executor: E,
storage_notifications: Mutex<StorageNotifications<Block>>,
import_notification_sinks: Mutex<Vec<mpsc::UnboundedSender<BlockImportNotification<Block>>>>,
import_lock: Mutex<()>,
importing_block: RwLock<Option<Block::Hash>>, // holds the block hash currently being imported. TODO: replace this with block queue
Expand All @@ -49,7 +51,12 @@ pub struct Client<B, E, Block> where Block: BlockT {
/// A source of blockchain evenets.
pub trait BlockchainEvents<Block: BlockT> {
/// Get block import event stream.
fn import_notification_stream(&self) -> mpsc::UnboundedReceiver<BlockImportNotification<Block>>;
fn import_notification_stream(&self) -> BlockchainEventStream<Block>;

/// Get storage changes event stream.
///
/// Passing `None` as `filter_keys` subscribes to all storage changes.
fn storage_changes_notification_stream(&self, filter_keys: Option<&[StorageKey]>) -> error::Result<StorageEventStream<Block::Hash>>;
}

/// Chain head information.
Expand Down Expand Up @@ -182,9 +189,10 @@ impl<B, E, Block> Client<B, E, Block> where
Ok(Client {
backend,
executor,
import_notification_sinks: Mutex::new(Vec::new()),
import_lock: Mutex::new(()),
importing_block: RwLock::new(None),
storage_notifications: Default::default(),
import_notification_sinks: Default::default(),
import_lock: Default::default(),
importing_block: Default::default(),
execution_strategy,
})
}
Expand Down Expand Up @@ -332,7 +340,7 @@ impl<B, E, Block> Client<B, E, Block> where
}

let mut transaction = self.backend.begin_operation(BlockId::Hash(parent_hash))?;
let storage_update = match transaction.state()? {
let (storage_update, storage_changes) = match transaction.state()? {
Some(transaction_state) => {
let mut overlay = Default::default();
let mut r = self.executor.call_at_state(
Expand All @@ -359,9 +367,10 @@ impl<B, E, Block> Client<B, E, Block> where
},
);
let (_, storage_update) = r?;
Some(storage_update)
overlay.commit_prospective();
(Some(storage_update), Some(overlay.into_committed()))
},
None => None,
None => (None, None)
};

let is_new_best = header.number() == &(self.backend.blockchain().info()?.best_number + One::one());
Expand All @@ -373,7 +382,15 @@ impl<B, E, Block> Client<B, E, Block> where
transaction.update_storage(storage_update)?;
}
self.backend.commit_operation(transaction)?;

if origin == BlockOrigin::NetworkBroadcast || origin == BlockOrigin::Own || origin == BlockOrigin::ConsensusBroadcast {

if let Some(storage_changes) = storage_changes {
// TODO [ToDr] How to handle re-orgs? Should we re-emit all storage changes?
self.storage_notifications.lock()
.trigger(&hash, storage_changes);
}

let notification = BlockImportNotification::<Block> {
hash: hash,
origin: origin,
Expand Down Expand Up @@ -516,16 +533,20 @@ impl<B, E, Block> bft::Authorities<Block> for Client<B, E, Block>

impl<B, E, Block> BlockchainEvents<Block> for Client<B, E, Block>
where
B: backend::Backend<Block>,
E: CallExecutor<Block>,
Block: BlockT,
{
/// Get block import event stream.
fn import_notification_stream(&self) -> mpsc::UnboundedReceiver<BlockImportNotification<Block>> {
fn import_notification_stream(&self) -> BlockchainEventStream<Block> {
let (sink, stream) = mpsc::unbounded();
self.import_notification_sinks.lock().push(sink);
stream
}

/// Get storage changes event stream.
fn storage_changes_notification_stream(&self, filter_keys: Option<&[StorageKey]>) -> error::Result<StorageEventStream<Block::Hash>> {
Ok(self.storage_notifications.lock().listen(filter_keys))
}
}

impl<B, E, Block> ChainHead<Block> for Client<B, E, Block>
Expand Down
7 changes: 5 additions & 2 deletions substrate/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ extern crate substrate_state_machine as state_machine;
#[macro_use] extern crate slog; // needed until we can reexport `slog_info` from `substrate_telemetry`

extern crate ed25519;
extern crate fnv;
extern crate futures;
extern crate parking_lot;
extern crate triehash;
Expand All @@ -50,13 +51,15 @@ pub mod block_builder;
pub mod light;
mod call_executor;
mod client;
mod notifications;

pub use blockchain::Info as ChainInfo;
pub use call_executor::{CallResult, CallExecutor, LocalCallExecutor};
pub use client::{
new_in_mem,
BlockStatus, BlockOrigin, BlockchainEventStream, BlockchainEvents,
Client, ClientInfo, ChainHead,
ImportResult, JustifiedHeader,
};
pub use blockchain::Info as ChainInfo;
pub use call_executor::{CallResult, CallExecutor, LocalCallExecutor};
pub use notifications::{StorageEventStream, StorageChangeSet};
pub use state_machine::ExecutionStrategy;
6 changes: 5 additions & 1 deletion substrate/client/src/light/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,11 @@ impl<S, F> Backend<S, F> {
}
}

impl<S, F, Block> ClientBackend<Block> for Backend<S, F> where Block: BlockT, S: BlockchainStorage<Block>, F: Fetcher<Block> {
impl<S, F, Block> ClientBackend<Block> for Backend<S, F> where
Block: BlockT,
S: BlockchainStorage<Block>,
F: Fetcher<Block>,
{
type BlockImportOperation = ImportOperation<Block, F>;
type Blockchain = Blockchain<S, F>;
type State = OnDemandState<Block, F>;
Expand Down
Loading