Storage changes subscription #464
Conversation
substrate/rpc/src/chain/mod.rs
Outdated
```diff
 #[pubsub(name = "chain_newHead")] {
 	/// New head subscription
-	#[rpc(name = "subscribe_newHead")]
+	#[rpc(name = "subscribe_newHead", alias = ["chain_subscribeNewHead", ])]
```
Great. Maybe it is better to have the "old" as the alias and the "new" one as the name?
(e.g. chain_subscribeNewHead is actually probably the preferred one to match since it aligns with other RPCs, my gut tells me the preferred one should be the "default")
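Swapping the two might look roughly like this in the macro attribute (a sketch only; the exact attribute syntax depends on the jsonrpc macro version in use):

```rust
#[pubsub(name = "chain_newHead")] {
	/// New head subscription
	#[rpc(name = "chain_subscribeNewHead", alias = ["subscribe_newHead", ])]
```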
dvdplm
left a comment
Good stuff. I can't say I understand it all, but overall very readable and interesting.
substrate/client/src/client.rs
Outdated
```rust
/// Get storage changes event stream.
///
/// Passing `None` as keys subscribes to all possible keys
```
typo: …as keys should be …as key
Rephrased the whole sentence, hope it's clearer now.
substrate/client/src/client.rs
Outdated
```diff
-	if let Some(storage_update) = storage_update {
+	if let Some((storage_update, changes)) = storage_update {
 		transaction.update_storage(storage_update)?;
 		// TODO [ToDr] How to handle re-orgs? Should we re-emit all storage changes?
```
Couldn't we emit a re-org event with some metadata about what changed and let interested subscribers re-fetch?
Yes, that's possible, although it's not easy to get storage changes for blocks that are already imported/executed. Re-fetching the changes would mean re-executing the blocks, but I suppose based on the `filter_keys` we could just return all the storage values in the re-orged blocks.
```rust
		subscribers.extend(listeners.iter());
	}

	if has_wildcard || listeners.is_some() {
```
Couldn't you check for `!subscribers.is_empty()` here? Or is it faster to do it this way?
I'm actually interested only in subscribers for that particular key. So if there is a set of changes:

```rust
[(1, Some(2)), (2, None), (3, Some(4))]
```

but we have no `wildcard_listeners` and only a listener for `key=1`, the changes vector will only contain `[(StorageKey(1), Some(StorageData(2)))]`.
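The filtering described above can be sketched with plain `std` collections. This is a simplified, hypothetical model: `u64` keys and values stand in for the real `StorageKey`/`StorageData`, and `filter_changes` stands in for the inline logic in the notifier.

```rust
use std::collections::{HashMap, HashSet};

// Hypothetical simplified types standing in for StorageKey/StorageData.
type Key = u64;
type Value = u64;
type SubscriberId = u64;

/// Keep only the changes that at least one subscriber listens to.
/// A wildcard listener keeps every change.
fn filter_changes(
    changes: &[(Key, Option<Value>)],
    listeners: &HashMap<Key, HashSet<SubscriberId>>,
    has_wildcard: bool,
) -> Vec<(Key, Option<Value>)> {
    changes
        .iter()
        .filter(|(key, _)| has_wildcard || listeners.contains_key(key))
        .cloned()
        .collect()
}

fn main() {
    let mut listeners = HashMap::new();
    // Subscriber 7 listens to key 1 only.
    listeners.insert(1, HashSet::from([7]));

    let changes = vec![(1, Some(2)), (2, None), (3, Some(4))];
    // Only key=1 has a listener, so only its change survives.
    assert_eq!(filter_changes(&changes, &listeners, false), vec![(1, Some(2))]);
    // A wildcard listener keeps everything.
    assert_eq!(filter_changes(&changes, &listeners, true).len(), 3);
    println!("filtered: {:?}", filter_changes(&changes, &listeners, false));
}
```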
```rust
			filter: filter.clone(),
		})).is_err()
	},
	None => false,
```
If I read this right, if the subscriber is gone when we get here, then we assume they've been removed properly already, so we're returning false to avoid calling remove_subscriber() again for them? It's a bit unclear to me how they can still be in the subscribers collection though, can you elaborate on that?
Indeed, I could actually `.expect()` here, since if the structure is consistent the subscribers should always be in `self.sinks`. The check here is superfluous.

I could refactor to:

```rust
let &(ref sink, ref filter) = self.sinks.get(&subscriber)
	.expect("subscribers returned from self.listeners are always in self.sinks; qed");
let result = sink.unbounded_send((hash.clone(), StorageChangeSet {
	changes: changes.clone(),
	filter: filter.clone(),
}));
if result.is_err() {
	self.remove_subscriber(subscriber);
}
```

or

```rust
if let Some(&(ref sink, ref filter)) = self.sinks.get(&subscriber) {
	let result = sink.unbounded_send((hash.clone(), StorageChangeSet {
		changes: changes.clone(),
		filter: filter.clone(),
	}));
	if result.is_err() {
		self.remove_subscriber(subscriber);
	}
}
```

Which one do you prefer?
Oh, actually I can't, since `.get()` borrows immutably, so `remove_subscriber` has to be called outside of that scope.
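The borrow-checker constraint mentioned here can be illustrated with a minimal, self-contained sketch (a hypothetical `Notifier` type; a `String` stands in for the real sink, and a `send_ok` flag stands in for the result of `unbounded_send`). The immutable borrow produced by `get` must end before `remove_subscriber`, which needs `&mut self`, can be called:

```rust
use std::collections::HashMap;

struct Notifier {
    // Hypothetical: subscriber id -> sink.
    sinks: HashMap<u64, String>,
}

impl Notifier {
    fn remove_subscriber(&mut self, id: u64) {
        self.sinks.remove(&id);
    }

    /// Try to notify one subscriber; drop it if sending fails.
    fn notify(&mut self, id: u64, send_ok: bool) {
        // The immutable borrow from `get` lives only inside this block,
        // so the mutable borrow needed by `remove_subscriber` is legal after it.
        let failed = {
            let _sink = self.sinks.get(&id).expect(
                "subscribers returned from self.listeners are always in self.sinks; qed",
            );
            // Stand-in for `sink.unbounded_send(..).is_err()`.
            !send_ok
        };
        if failed {
            self.remove_subscriber(id);
        }
    }
}

fn main() {
    let mut n = Notifier { sinks: HashMap::from([(1, "sink".to_string())]) };
    n.notify(1, true);
    assert!(n.sinks.contains_key(&1)); // successful send keeps the subscriber
    n.notify(1, false);
    assert!(!n.sinks.contains_key(&1)); // failed send removes it
    println!("subscriber removed after failed send");
}
```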
```rust
	assert_eq!(notifications.listeners.len(), 2);
	assert_eq!(notifications.wildcard_listeners.len(), 1);
}
```
The channels are closed here, correct?
Yes, and since the receiving end is dropped, sending to such a channel will trigger an error.
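The same behaviour can be demonstrated with the standard library's `mpsc` channel standing in for the futures unbounded channel used in the code:

```rust
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel::<u32>();
    tx.send(1).expect("receiver alive, send succeeds");

    // Dropping the receiving end closes the channel...
    drop(rx);

    // ...so further sends error out, which is exactly how a
    // disconnected subscriber is detected.
    assert!(tx.send(2).is_err());
    println!("send after drop failed as expected");
}
```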
```rust
sink
	.sink_map_err(|e| warn!("Error sending notifications: {:?}", e))
	.send_all(stream)
	// we ignore the resulting Stream (if the first stream is over we are unsubscribed)
```
…is over…? Do you mean …is closed?
"Is over" as in "is finished"/"is done", which means that the stream will not emit any more items.
```rust
/// Drain committed changes to an iterator.
///
/// Panics:
/// Will panic if there are any uncommitted prospective changes.
```
Is "prospective" sort of like "pending"?
Yes, those are changes that can still be easily discarded. You can see an example of usage inside the block builder:
- We run a transaction.
- It produces a set of prospective changes.
- If we detect that it's somehow invalid, we discard the prospective changes.
- If we accept the transaction, we commit the prospective changes.
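The steps above can be sketched as a minimal prospective/committed overlay. All names here are hypothetical, with `u64` keys and optional `u64` values standing in for the real storage types; the panic in `into_committed` mirrors the documented one:

```rust
use std::collections::HashMap;

/// Simplified sketch of an overlay with two layers of changes.
#[derive(Default)]
struct Overlay {
    prospective: HashMap<u64, Option<u64>>,
    committed: HashMap<u64, Option<u64>>,
}

impl Overlay {
    /// Record a pending change (None means deletion).
    fn set(&mut self, key: u64, value: Option<u64>) {
        self.prospective.insert(key, value);
    }
    /// Accept the pending changes (e.g. the transaction was valid).
    fn commit_prospective(&mut self) {
        self.committed.extend(self.prospective.drain());
    }
    /// Throw the pending changes away (e.g. the transaction was invalid).
    fn discard_prospective(&mut self) {
        self.prospective.clear();
    }
    /// Drain committed changes to an iterator; panics if any
    /// uncommitted prospective changes remain.
    fn into_committed(self) -> impl Iterator<Item = (u64, Option<u64>)> {
        assert!(self.prospective.is_empty(), "uncommitted prospective changes");
        self.committed.into_iter()
    }
}

fn main() {
    let mut overlay = Overlay::default();
    overlay.set(1, Some(2));
    overlay.discard_prospective(); // transaction rejected: change is dropped
    overlay.set(3, Some(4));
    overlay.commit_prospective(); // transaction accepted: change is kept
    let committed: Vec<_> = overlay.into_committed().collect();
    assert_eq!(committed, vec![(3, Some(4))]);
    println!("committed: {:?}", committed);
}
```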
substrate/client/src/client.rs
Outdated
```diff
 if let Some((storage_update, changes)) = storage_update {
 	transaction.update_storage(storage_update)?;
 	// TODO [ToDr] How to handle re-orgs? Should we re-emit all storage changes?
+	self.storage_notifications.lock()
```
Are you sure that it should be called before the transaction is committed?
Good point. Moved the notification after the commit and also guarded it by the same `if` as the block import notification.
gavofyork
left a comment
Aside from the minor comment
```diff
@@ -0,0 +1,267 @@
+// Copyright 2017 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
```
Substrate, not Polkadot :)
Fixed.
Force-pushed from 3d6c0d9 to e5e7c4c.
Commits in this PR:
* Initial implementation of storage events.
* Attaching storage events.
* Expose storage modification stream over RPC.
* Use FNV for hashing small keys.
* Fix and add tests.
* Swap alias and RPC name.
* Fix demo.
* Addressing review grumbles.
* Fix comment.