Skip to content

Commit 724f58d

Browse files
conorbrostaiki-e
authored andcommitted
Add push_front and push_back to FuturesOrdered (#2591)
1 parent eb223d3 commit 724f58d

File tree

4 files changed

+96
-4
lines changed

4 files changed

+96
-4
lines changed

futures-util/src/stream/futures_ordered.rs

+30-2
Original file line numberDiff line numberDiff line change
@@ -135,11 +135,39 @@ impl<Fut: Future> FuturesOrdered<Fut> {
135135
/// This function will not call `poll` on the submitted future. The caller
136136
/// must ensure that `FuturesOrdered::poll` is called in order to receive
137137
/// task notifications.
138+
#[deprecated(note = "use `push_back` instead")]
138139
pub fn push(&mut self, future: Fut) {
140+
self.push_back(future);
141+
}
142+
143+
/// Pushes a future to the back of the queue.
144+
///
145+
/// This function submits the given future to the internal set for managing.
146+
/// This function will not call `poll` on the submitted future. The caller
147+
/// must ensure that `FuturesOrdered::poll` is called in order to receive
148+
/// task notifications.
149+
pub fn push_back(&mut self, future: Fut) {
139150
let wrapped = OrderWrapper { data: future, index: self.next_incoming_index };
140151
self.next_incoming_index += 1;
141152
self.in_progress_queue.push(wrapped);
142153
}
154+
155+
/// Pushes a future to the front of the queue.
156+
///
157+
/// This function submits the given future to the internal set for managing.
158+
/// This function will not call `poll` on the submitted future. The caller
159+
/// must ensure that `FuturesOrdered::poll` is called in order to receive
160+
/// task notifications. This future will be the next future to be returned
161+
/// complete.
162+
pub fn push_front(&mut self, future: Fut) {
163+
if self.next_outgoing_index == 0 {
164+
self.push_back(future)
165+
} else {
166+
let wrapped = OrderWrapper { data: future, index: self.next_outgoing_index - 1 };
167+
self.next_outgoing_index -= 1;
168+
self.in_progress_queue.push(wrapped);
169+
}
170+
}
143171
}
144172

145173
impl<Fut: Future> Default for FuturesOrdered<Fut> {
@@ -196,7 +224,7 @@ impl<Fut: Future> FromIterator<Fut> for FuturesOrdered<Fut> {
196224
{
197225
let acc = Self::new();
198226
iter.into_iter().fold(acc, |mut acc, item| {
199-
acc.push(item);
227+
acc.push_back(item);
200228
acc
201229
})
202230
}
@@ -214,7 +242,7 @@ impl<Fut: Future> Extend<Fut> for FuturesOrdered<Fut> {
214242
I: IntoIterator<Item = Fut>,
215243
{
216244
for item in iter {
217-
self.push(item);
245+
self.push_back(item);
218246
}
219247
}
220248
}

futures-util/src/stream/stream/buffered.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ where
6464
// our queue of futures.
6565
while this.in_progress_queue.len() < *this.max {
6666
match this.stream.as_mut().poll_next(cx) {
67-
Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut),
67+
Poll::Ready(Some(fut)) => this.in_progress_queue.push_back(fut),
6868
Poll::Ready(None) | Poll::Pending => break,
6969
}
7070
}

futures-util/src/stream/try_stream/try_buffered.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ where
5454
// our queue of futures. Propagate errors from the stream immediately.
5555
while this.in_progress_queue.len() < *this.max {
5656
match this.stream.as_mut().poll_next(cx)? {
57-
Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut.into_future()),
57+
Poll::Ready(Some(fut)) => this.in_progress_queue.push_back(fut.into_future()),
5858
Poll::Ready(None) | Poll::Pending => break,
5959
}
6060
}

futures/tests/stream_futures_ordered.rs

+64
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use futures::channel::oneshot;
22
use futures::executor::{block_on, block_on_stream};
33
use futures::future::{self, join, Future, FutureExt, TryFutureExt};
44
use futures::stream::{FuturesOrdered, StreamExt};
5+
use futures::task::Poll;
56
use futures_test::task::noop_context;
67
use std::any::Any;
78

@@ -45,6 +46,69 @@ fn works_2() {
4546
assert!(stream.poll_next_unpin(&mut cx).is_ready());
4647
}
4748

49+
#[test]
50+
fn test_push_front() {
51+
let (a_tx, a_rx) = oneshot::channel::<i32>();
52+
let (b_tx, b_rx) = oneshot::channel::<i32>();
53+
let (c_tx, c_rx) = oneshot::channel::<i32>();
54+
let (d_tx, d_rx) = oneshot::channel::<i32>();
55+
56+
let mut stream = FuturesOrdered::new();
57+
58+
let mut cx = noop_context();
59+
60+
stream.push_back(a_rx);
61+
stream.push_back(b_rx);
62+
stream.push_back(c_rx);
63+
64+
a_tx.send(1).unwrap();
65+
b_tx.send(2).unwrap();
66+
c_tx.send(3).unwrap();
67+
68+
// 1 and 2 should be received in order
69+
assert_eq!(Poll::Ready(Some(Ok(1))), stream.poll_next_unpin(&mut cx));
70+
assert_eq!(Poll::Ready(Some(Ok(2))), stream.poll_next_unpin(&mut cx));
71+
72+
stream.push_front(d_rx);
73+
d_tx.send(4).unwrap();
74+
75+
// we pushed `d_rx` to the front and sent 4, so we should recieve 4 next
76+
// and then 3 after it
77+
assert_eq!(Poll::Ready(Some(Ok(4))), stream.poll_next_unpin(&mut cx));
78+
assert_eq!(Poll::Ready(Some(Ok(3))), stream.poll_next_unpin(&mut cx));
79+
}
80+
81+
#[test]
82+
fn test_push_back() {
83+
let (a_tx, a_rx) = oneshot::channel::<i32>();
84+
let (b_tx, b_rx) = oneshot::channel::<i32>();
85+
let (c_tx, c_rx) = oneshot::channel::<i32>();
86+
let (d_tx, d_rx) = oneshot::channel::<i32>();
87+
88+
let mut stream = FuturesOrdered::new();
89+
90+
let mut cx = noop_context();
91+
92+
stream.push_back(a_rx);
93+
stream.push_back(b_rx);
94+
stream.push_back(c_rx);
95+
96+
a_tx.send(1).unwrap();
97+
b_tx.send(2).unwrap();
98+
c_tx.send(3).unwrap();
99+
100+
// All results should be received in order
101+
102+
assert_eq!(Poll::Ready(Some(Ok(1))), stream.poll_next_unpin(&mut cx));
103+
assert_eq!(Poll::Ready(Some(Ok(2))), stream.poll_next_unpin(&mut cx));
104+
105+
stream.push_back(d_rx);
106+
d_tx.send(4).unwrap();
107+
108+
assert_eq!(Poll::Ready(Some(Ok(3))), stream.poll_next_unpin(&mut cx));
109+
assert_eq!(Poll::Ready(Some(Ok(4))), stream.poll_next_unpin(&mut cx));
110+
}
111+
48112
#[test]
49113
fn from_iterator() {
50114
let stream = vec![future::ready::<i32>(1), future::ready::<i32>(2), future::ready::<i32>(3)]

0 commit comments

Comments
 (0)