Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions core/rpc/src/author/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ mod tests;
use std::{sync::Arc, convert::TryInto};

use client::{self, Client};
use crate::rpc::futures::{Sink, Stream, Future};
use crate::rpc::futures::{Sink, Future};
use crate::subscriptions::Subscriptions;
use futures03::{StreamExt as _, compat::Compat};
use jsonrpc_derive::rpc;
use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId};
use log::warn;
Expand Down Expand Up @@ -249,7 +250,7 @@ impl<B, E, P, RA> AuthorApi<ExHash<P>, BlockHash<P>> for Author<B, E, P, RA> whe
self.subscriptions.add(subscriber, move |sink| {
sink
.sink_map_err(|e| warn!("Error sending notifications: {:?}", e))
.send_all(watcher.into_stream().map(Ok))
.send_all(Compat::new(watcher.into_stream().map(|v| Ok::<_, ()>(Ok(v)))))
.map(|_| ())
})
}
Expand Down
1 change: 1 addition & 0 deletions core/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ impl<Components: components::Components> Service<Components> {
let network = Arc::downgrade(&network);
let transaction_pool_ = transaction_pool.clone();
let events = transaction_pool.import_notification_stream()
.map(|v| Ok::<_, ()>(v)).compat()
.for_each(move |_| {
if let Some(network) = network.upgrade() {
network.trigger_repropagate();
Expand Down
1 change: 0 additions & 1 deletion core/transaction-pool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ edition = "2018"

[dependencies]
derive_more = "0.14.0"
futures = "0.1"
log = "0.4"
codec = { package = "parity-scale-codec", version = "1.0.0" }
parking_lot = "0.9.0"
Expand Down
2 changes: 1 addition & 1 deletion core/transaction-pool/graph/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ edition = "2018"

[dependencies]
derive_more = "0.14.0"
futures = "0.1"
futures-preview = "=0.3.0-alpha.17"
log = "0.4"
parking_lot = "0.9.0"
serde = { version = "1.0", features = ["derive"] }
Expand Down
2 changes: 1 addition & 1 deletion core/transaction-pool/graph/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::watcher::Watcher;
use serde::Serialize;
use log::debug;

use futures::sync::mpsc;
use futures::channel::mpsc;
use parking_lot::{Mutex, RwLock};
use sr_primitives::{
generic::BlockId,
Expand Down
7 changes: 3 additions & 4 deletions core/transaction-pool/graph/src/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

use futures::{
Stream,
sync::mpsc,
channel::mpsc,
};
use serde::{Serialize, Deserialize};

Expand Down Expand Up @@ -60,9 +60,8 @@ impl<H, H2> Watcher<H, H2> {
/// Pipe the notifications to given sink.
///
/// Make sure to drive the future to completion.
pub fn into_stream(self) -> impl Stream<Item=Status<H, H2>, Error=()> {
// we can safely ignore the error here, `UnboundedReceiver` never fails.
self.receiver.map_err(|_| ())
pub fn into_stream(self) -> impl Stream<Item=Status<H, H2>> {
self.receiver
}
}

Expand Down