Merged
Changes from 7 commits
15 changes: 8 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion demo/cli/src/lib.rs
@@ -173,9 +173,10 @@ pub fn run<I, T>(args: I) -> error::Result<()> where
let mut runtime = Runtime::new()?;
let _rpc_servers = {
let handler = || {
let state = rpc::apis::chain::State::new(client.clone(), runtime.executor());
let chain = rpc::apis::chain::Chain::new(client.clone(), runtime.executor());
let author = rpc::apis::author::Author::new(client.clone(), Arc::new(DummyPool), runtime.executor());
rpc::rpc_handler::<Block, _, _, _, _>(client.clone(), chain, author, DummySystem)
rpc::rpc_handler::<Block, _, _, _, _>(state, chain, author, DummySystem)
};
let http_address = "127.0.0.1:9933".parse().unwrap();
let ws_address = "127.0.0.1:9944".parse().unwrap();
1 change: 1 addition & 0 deletions substrate/client/Cargo.toml
@@ -5,6 +5,7 @@ authors = ["Parity Technologies <[email protected]>"]

[dependencies]
error-chain = "0.12"
fnv = "1.0"
log = "0.3"
parking_lot = "0.4"
triehash = "0.1"
2 changes: 1 addition & 1 deletion substrate/client/db/src/lib.rs
@@ -453,7 +453,7 @@ impl<Block: BlockT> client::backend::Backend<Block> for Backend<Block> {
}
}

impl<Block: BlockT> client::backend::LocalBackend<Block> for Backend<Block>
impl<Block: BlockT> client::backend::LocalBackend<Block> for Backend<Block>
{}

#[cfg(test)]
4 changes: 2 additions & 2 deletions substrate/client/src/backend.rs
@@ -16,12 +16,12 @@

//! Polkadot Client data backend

use state_machine::backend::Backend as StateBackend;
use error;
use primitives::AuthorityId;
use runtime_primitives::bft::Justification;
use runtime_primitives::traits::{Block as BlockT, NumberFor};
use runtime_primitives::generic::BlockId;
use runtime_primitives::traits::{Block as BlockT, NumberFor};
use state_machine::backend::Backend as StateBackend;

/// Block insertion operation. Keeps hold of the inserted block state and data.
pub trait BlockImportOperation<Block: BlockT> {
32 changes: 24 additions & 8 deletions substrate/client/src/client.rs
@@ -31,6 +31,7 @@ use backend::{self, BlockImportOperation};
use blockchain::{self, Info as ChainInfo, Backend as ChainBackend, HeaderBackend as ChainHeaderBackend};
use call_executor::{CallExecutor, LocalCallExecutor};
use executor::{RuntimeVersion, RuntimeInfo};
use notifications::{StorageNotifications, StorageEventStream};
use {error, in_mem, block_builder, runtime_io, bft, genesis};

/// Type that implements `futures::Stream` of block import events.
@@ -40,6 +41,7 @@ pub type BlockchainEventStream<Block> = mpsc::UnboundedReceiver<BlockImportNotif
pub struct Client<B, E, Block> where Block: BlockT {
backend: Arc<B>,
executor: E,
storage_notifications: Mutex<StorageNotifications<Block>>,
import_notification_sinks: Mutex<Vec<mpsc::UnboundedSender<BlockImportNotification<Block>>>>,
import_lock: Mutex<()>,
importing_block: RwLock<Option<Block::Hash>>, // holds the block hash currently being imported. TODO: replace this with block queue
@@ -49,7 +51,12 @@ pub struct Client<B, E, Block> where Block: BlockT {
/// A source of blockchain events.
pub trait BlockchainEvents<Block: BlockT> {
/// Get block import event stream.
fn import_notification_stream(&self) -> mpsc::UnboundedReceiver<BlockImportNotification<Block>>;
fn import_notification_stream(&self) -> BlockchainEventStream<Block>;

/// Get storage changes event stream.
///
/// Passing `None` as keys subscribes to all possible keys
Contributor: typo: …as keys should be …as key

Contributor Author: Rephrased the whole sentence, hope it's clearer now.

fn storage_changes_notification_stream(&self, filter_keys: Option<&[StorageKey]>) -> error::Result<StorageEventStream<Block::Hash>>;
}

/// Chain head information.
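To make the subscription API introduced above concrete, here is a hedged usage sketch: a caller subscribes with an optional key filter and reacts to each change set. The stream's item shape, the `StorageChangeSet::iter` accessor, and the helper name `watch_keys` are assumptions for illustration, not taken from this diff.

```rust
// Hypothetical usage sketch (not part of this PR). `BlockchainEvents`,
// `StorageKey`, `BlockT`, and `error::Result` come from the surrounding
// crates; the stream item shape and `StorageChangeSet::iter` are assumptions.
use futures::{Future, Stream};

fn watch_keys<C, Block>(client: &C, keys: &[StorageKey]) -> error::Result<()>
where
    C: BlockchainEvents<Block>,
    Block: BlockT,
{
    // `Some(keys)` restricts notifications to the given keys;
    // `None` would subscribe to changes of every storage key.
    let stream = client.storage_changes_notification_stream(Some(keys))?;
    let work = stream.for_each(|(_block, changes)| {
        // React to the change set, e.g. forward it to an RPC subscriber.
        for (_key, _value) in changes.iter() {
            // ...
        }
        Ok(())
    });
    // Normally this future would be spawned on an executor;
    // blocking here keeps the sketch short.
    let _ = work.wait();
    Ok(())
}
```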
@@ -182,9 +189,10 @@ impl<B, E, Block> Client<B, E, Block> where
Ok(Client {
backend,
executor,
import_notification_sinks: Mutex::new(Vec::new()),
import_lock: Mutex::new(()),
importing_block: RwLock::new(None),
storage_notifications: Default::default(),
import_notification_sinks: Default::default(),
import_lock: Default::default(),
importing_block: Default::default(),
execution_strategy,
})
}
@@ -359,7 +367,8 @@ impl<B, E, Block> Client<B, E, Block> where
},
);
let (_, storage_update) = r?;
Some(storage_update)
overlay.commit_prospective();
Some((storage_update, overlay.into_committed()))
},
None => None,
};
@@ -369,8 +378,11 @@ impl<B, E, Block> Client<B, E, Block> where
let unchecked: bft::UncheckedJustification<_> = justification.uncheck().into();
transaction.set_block_data(header.clone(), body, Some(unchecked.into()), is_new_best)?;
transaction.update_authorities(authorities);
if let Some(storage_update) = storage_update {
if let Some((storage_update, changes)) = storage_update {
transaction.update_storage(storage_update)?;
// TODO [ToDr] How to handle re-orgs? Should we re-emit all storage changes?
Contributor: Couldn't we emit a re-org event with some metadata about what changed and let interested subscribers re-fetch?

Contributor Author: Yes, that's possible, although it's not easy to get storage changes for blocks that are already imported/executed. Re-fetching changes would mean re-executing the blocks, but I suppose based on the filter_keys we could just return all the storage values in the re-orged blocks.

self.storage_notifications.lock()
.trigger(&hash, changes);

Contributor: Are you sure that it should be called before the transaction is committed?

Contributor Author: Good point. Moved the notification after commit and also guarded it by the same `if` as the block-import notification.
}
self.backend.commit_operation(transaction)?;
if origin == BlockOrigin::NetworkBroadcast || origin == BlockOrigin::Own || origin == BlockOrigin::ConsensusBroadcast {
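Following the review exchange above, a minimal sketch of the agreed ordering might look like the fragment below: stash the committed changes while the transaction is built, commit the backend transaction, then notify subscribers under the same origin guard as the block-import notification. The local `notify_changes` binding and the exact guard placement are assumptions; the merged code is not shown in this diff.

```rust
// Sketch of the assumed final shape (fragment of the import path, not the
// literal merged code). `notify_changes: Option<_>` is a hypothetical local
// introduced for illustration.
if let Some((storage_update, changes)) = storage_update {
    transaction.update_storage(storage_update)?;
    notify_changes = Some(changes);
}
self.backend.commit_operation(transaction)?;

if origin == BlockOrigin::NetworkBroadcast
    || origin == BlockOrigin::Own
    || origin == BlockOrigin::ConsensusBroadcast
{
    // Only notify once the block data is durably committed.
    if let Some(changes) = notify_changes {
        self.storage_notifications.lock().trigger(&hash, changes);
    }
    // ...existing block-import notification follows here.
}
```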
@@ -516,16 +528,20 @@ impl<B, E, Block> bft::Authorities<Block> for Client<B, E, Block>

impl<B, E, Block> BlockchainEvents<Block> for Client<B, E, Block>
where
B: backend::Backend<Block>,
E: CallExecutor<Block>,
Block: BlockT,
{
/// Get block import event stream.
fn import_notification_stream(&self) -> mpsc::UnboundedReceiver<BlockImportNotification<Block>> {
fn import_notification_stream(&self) -> BlockchainEventStream<Block> {
let (sink, stream) = mpsc::unbounded();
self.import_notification_sinks.lock().push(sink);
stream
}

/// Get storage changes event stream.
fn storage_changes_notification_stream(&self, filter_keys: Option<&[StorageKey]>) -> error::Result<StorageEventStream<Block::Hash>> {
Ok(self.storage_notifications.lock().listen(filter_keys))
}
}
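The `notifications` module itself (presumably `substrate/client/src/notifications.rs`, declared in `lib.rs` below) is not rendered in this diff. Given the new `fnv` dependency, one plausible sketch of `StorageNotifications::listen`/`trigger` is an `FnvHashMap` of subscriber ids to unbounded senders plus an optional key filter; everything below, including the filtering behaviour, is an assumption rather than the merged implementation.

```rust
// Hypothetical sketch of the new notifications module (the actual file is not
// shown in this diff); names, generics, and filtering details are assumptions.
use std::collections::HashSet;
use std::hash::Hash as StdHash;

use fnv::FnvHashMap;
use futures::sync::mpsc;

pub type StorageEventStream<H, C> = mpsc::UnboundedReceiver<(H, C)>;

pub struct StorageNotifications<H, K, C> {
    next_id: u64,
    // Each subscriber keeps an optional key filter; `None` means "all keys".
    sinks: FnvHashMap<u64, (mpsc::UnboundedSender<(H, C)>, Option<HashSet<K>>)>,
}

impl<H, K, C> Default for StorageNotifications<H, K, C> {
    fn default() -> Self {
        StorageNotifications { next_id: 0, sinks: Default::default() }
    }
}

impl<H: Clone, K: Clone + Eq + StdHash, C: Clone> StorageNotifications<H, K, C> {
    pub fn listen(&mut self, filter_keys: Option<&[K]>) -> StorageEventStream<H, C> {
        let (tx, rx) = mpsc::unbounded();
        let filter = filter_keys.map(|keys| keys.iter().cloned().collect());
        let id = self.next_id;
        self.next_id += 1;
        self.sinks.insert(id, (tx, filter));
        rx
    }

    pub fn trigger(&mut self, hash: &H, changes: C) {
        // A real implementation would restrict the emitted change set to each
        // subscriber's filter keys; this sketch broadcasts everything and only
        // drops subscribers whose receiving end has been closed.
        self.sinks.retain(|_, sink| {
            sink.0.unbounded_send((hash.clone(), changes.clone())).is_ok()
        });
    }
}
```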

impl<B, E, Block> ChainHead<Block> for Client<B, E, Block>
7 changes: 5 additions & 2 deletions substrate/client/src/lib.rs
@@ -32,6 +32,7 @@ extern crate substrate_state_machine as state_machine;
#[macro_use] extern crate slog; // needed until we can reexport `slog_info` from `substrate_telemetry`

extern crate ed25519;
extern crate fnv;
extern crate futures;
extern crate parking_lot;
extern crate triehash;
@@ -50,13 +51,15 @@ pub mod block_builder;
pub mod light;
mod call_executor;
mod client;
mod notifications;

pub use blockchain::Info as ChainInfo;
pub use call_executor::{CallResult, CallExecutor, LocalCallExecutor};
pub use client::{
new_in_mem,
BlockStatus, BlockOrigin, BlockchainEventStream, BlockchainEvents,
Client, ClientInfo, ChainHead,
ImportResult, JustifiedHeader,
};
pub use blockchain::Info as ChainInfo;
pub use call_executor::{CallResult, CallExecutor, LocalCallExecutor};
pub use notifications::{StorageEventStream, StorageChangeSet};
pub use state_machine::ExecutionStrategy;
6 changes: 5 additions & 1 deletion substrate/client/src/light/backend.rs
@@ -62,7 +62,11 @@ impl<S, F> Backend<S, F> {
}
}

impl<S, F, Block> ClientBackend<Block> for Backend<S, F> where Block: BlockT, S: BlockchainStorage<Block>, F: Fetcher<Block> {
impl<S, F, Block> ClientBackend<Block> for Backend<S, F> where
Block: BlockT,
S: BlockchainStorage<Block>,
F: Fetcher<Block>,
{
type BlockImportOperation = ImportOperation<Block, F>;
type Blockchain = Blockchain<S, F>;
type State = OnDemandState<Block, F>;