Skip to content

Commit 5afa31e

Browse files
danrspencerclux
andauthored
Runtime: Add WatchStreamExt::subscribe (#1131)
* Added stream_subscribe which allows watch streams to have additional event subscribers Signed-off-by: Dan Spencer <[email protected]> * StreamSubscribe now wraps items in arcs so that: 1. Remove expensive cloning of Kubernetes objects 2. Allow propogation of err events Signed-off-by: Dan Spencer <[email protected]> * Renamed watch_ext subscribable to subscribe Co-authored-by: Eirik A <[email protected]> Signed-off-by: Dan Spencer <[email protected]> * StreamSubscribe now allows subscribers how to handle lagging Signed-off-by: Dan Spencer <[email protected]> * Fixed clippy errors in StreamSubscribe Signed-off-by: Dan Spencer <[email protected]> * Fixed grammar in StreamSubscribe docs Signed-off-by: Dan Spencer <[email protected]> * Fixed rustfmt errors in StreamSubscribe Signed-off-by: Dan Spencer <[email protected]> * Improved the documentation for WatchStreamExt::stream_subscribe method. Also renamed WatchStreamExt::subscribe to WatchStreamExt::stream_subscribe. The compiler was unable to tell if we were trying to call WatchStreamExt::subscribe or StreamSubscribe::subscribe when they were named the same. e.g. this code would not compile: let stream_subscribe = stream.subscribe(); let subscription = stream_subscribe.subscribe(); Signed-off-by: Dan Spencer <[email protected]> * Put StreamSubscribe behind a feature flag unstable_runtime_subscribe Signed-off-by: Dan Spencer <[email protected]> * Update kube-runtime/src/utils/mod.rs Co-authored-by: Eirik A <[email protected]> Signed-off-by: Dan Spencer <[email protected]> * Fixed rustfmt error in kube-runtime utils mod.rs Signed-off-by: Dan Spencer <[email protected]> * Fixed incorrect feature flag usage for the unstable-runtime-subscribe feature Signed-off-by: Dan Spencer <[email protected]> * Made substream_subscribe pub so that its error can be accessed / matched on by consumers Signed-off-by: Dan Spencer <[email protected]> * Fixed issue with doctest for stream_subscribe Signed-off-by: Dan Spencer <[email protected]> --------- Signed-off-by: Dan Spencer <[email protected]> Co-authored-by: Eirik A <[email protected]>
1 parent e23b076 commit 5afa31e

File tree

5 files changed

+318
-2
lines changed

5 files changed

+318
-2
lines changed

kube-runtime/Cargo.toml

+5-1
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,12 @@ categories = ["web-programming::http-client", "caching", "network-programming"]
1414
rust-version = "1.63.0"
1515
edition = "2021"
1616

17+
[features]
18+
unstable-runtime = ["unstable-runtime-subscribe"]
19+
unstable-runtime-subscribe = []
20+
1721
[package.metadata.docs.rs]
18-
features = ["k8s-openapi/v1_26"]
22+
features = ["k8s-openapi/v1_26", "unstable-runtime"]
1923
# Define the configuration attribute `docsrs`. Used to enable `doc_cfg` feature.
2024
rustdoc-args = ["--cfg", "docsrs"]
2125

kube-runtime/src/utils/mod.rs

+3
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,14 @@
33
mod backoff_reset_timer;
44
mod event_flatten;
55
mod stream_backoff;
6+
#[cfg(feature = "unstable-runtime-subscribe")] pub mod stream_subscribe;
67
mod watch_ext;
78

89
pub use backoff_reset_timer::ResetTimerBackoff;
910
pub use event_flatten::EventFlatten;
1011
pub use stream_backoff::StreamBackoff;
12+
#[cfg(feature = "unstable-runtime-subscribe")]
13+
pub use stream_subscribe::StreamSubscribe;
1114
pub use watch_ext::WatchStreamExt;
1215

1316
use futures::{
+243
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,243 @@
1+
use core::{
2+
pin::Pin,
3+
task::{Context, Poll},
4+
};
5+
use futures::{stream, Stream};
6+
use pin_project::pin_project;
7+
use std::{fmt, sync::Arc};
8+
use tokio::sync::{broadcast, broadcast::error::RecvError};
9+
10+
const CHANNEL_CAPACITY: usize = 128;
11+
12+
/// Exposes the [`StreamSubscribe::subscribe()`] method which allows additional
13+
/// consumers of events from a stream without consuming the stream itself.
14+
///
15+
/// If a subscriber begins to lag behind the stream, it will receive an [`Error::Lagged`]
16+
/// error. The subscriber can then decide to abort its task or tolerate the lost events.
17+
///
18+
/// If the [`Stream`] is dropped or ends, any [`StreamSubscribe::subscribe()`] streams
19+
/// will also end.
20+
///
21+
/// ## Warning
22+
///
23+
/// If the primary [`Stream`] is not polled, the [`StreamSubscribe::subscribe()`] streams
24+
/// will never receive any events.
25+
#[pin_project]
26+
#[must_use = "subscribers will not get events unless this stream is polled"]
27+
pub struct StreamSubscribe<S>
28+
where
29+
S: Stream,
30+
{
31+
#[pin]
32+
stream: S,
33+
sender: broadcast::Sender<Option<Arc<S::Item>>>,
34+
}
35+
36+
impl<S: Stream> StreamSubscribe<S> {
37+
pub fn new(stream: S) -> Self {
38+
let (sender, _) = broadcast::channel(CHANNEL_CAPACITY);
39+
40+
Self { stream, sender }
41+
}
42+
43+
/// Subscribe to events from this stream
44+
#[must_use = "streams do nothing unless polled"]
45+
pub fn subscribe(&self) -> impl Stream<Item = Result<Arc<S::Item>, Error>> {
46+
stream::unfold(self.sender.subscribe(), |mut rx| async {
47+
match rx.recv().await {
48+
Ok(Some(obj)) => Some((Ok(obj), rx)),
49+
Err(RecvError::Lagged(amt)) => Some((Err(Error::Lagged(amt)), rx)),
50+
_ => None,
51+
}
52+
})
53+
}
54+
}
55+
56+
impl<S: Stream> Stream for StreamSubscribe<S> {
57+
type Item = Arc<S::Item>;
58+
59+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
60+
let this = self.project();
61+
let item = this.stream.poll_next(cx);
62+
63+
match item {
64+
Poll::Ready(Some(item)) => {
65+
let item = Arc::new(item);
66+
this.sender.send(Some(item.clone())).ok();
67+
Poll::Ready(Some(item))
68+
}
69+
Poll::Ready(None) => {
70+
this.sender.send(None).ok();
71+
Poll::Ready(None)
72+
}
73+
Poll::Pending => Poll::Pending,
74+
}
75+
}
76+
}
77+
78+
/// An error returned from the inner stream of a [`StreamSubscribe`].
79+
#[derive(Debug, PartialEq, Eq, Clone)]
80+
pub enum Error {
81+
/// The subscriber lagged too far behind. Polling again will return
82+
/// the oldest event still retained.
83+
///
84+
/// Includes the number of skipped events.
85+
Lagged(u64),
86+
}
87+
88+
impl fmt::Display for Error {
89+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
90+
match self {
91+
Error::Lagged(amt) => write!(f, "subscriber lagged by {amt}"),
92+
}
93+
}
94+
}
95+
96+
impl std::error::Error for Error {}
97+
98+
#[cfg(test)]
99+
mod tests {
100+
use super::*;
101+
use futures::{pin_mut, poll, stream, StreamExt};
102+
103+
#[tokio::test]
104+
async fn stream_subscribe_continues_to_propagate_values() {
105+
let rx = stream::iter([Ok(0), Ok(1), Err(2), Ok(3), Ok(4)]);
106+
let rx = StreamSubscribe::new(rx);
107+
108+
pin_mut!(rx);
109+
assert_eq!(poll!(rx.next()), Poll::Ready(Some(Arc::new(Ok(0)))));
110+
assert_eq!(poll!(rx.next()), Poll::Ready(Some(Arc::new(Ok(1)))));
111+
assert_eq!(poll!(rx.next()), Poll::Ready(Some(Arc::new(Err(2)))));
112+
assert_eq!(poll!(rx.next()), Poll::Ready(Some(Arc::new(Ok(3)))));
113+
assert_eq!(poll!(rx.next()), Poll::Ready(Some(Arc::new(Ok(4)))));
114+
assert_eq!(poll!(rx.next()), Poll::Ready(None));
115+
}
116+
117+
#[tokio::test]
118+
async fn all_subscribers_get_events() {
119+
let events = [Ok(0), Ok(1), Err(2), Ok(3), Ok(4)];
120+
let rx = stream::iter(events);
121+
let rx = StreamSubscribe::new(rx);
122+
123+
let rx_s1 = rx.subscribe();
124+
let rx_s2 = rx.subscribe();
125+
126+
pin_mut!(rx);
127+
pin_mut!(rx_s1);
128+
pin_mut!(rx_s2);
129+
130+
// Subscribers are pending until we start consuming the stream
131+
assert_eq!(poll!(rx_s1.next()), Poll::Pending, "rx_s1");
132+
assert_eq!(poll!(rx_s2.next()), Poll::Pending, "rx_s2");
133+
134+
for item in events {
135+
assert_eq!(poll!(rx.next()), Poll::Ready(Some(Arc::new(item))), "rx");
136+
let expected = Poll::Ready(Some(Ok(Arc::new(item))));
137+
assert_eq!(poll!(rx_s1.next()), expected, "rx_s1");
138+
assert_eq!(poll!(rx_s2.next()), expected, "rx_s2");
139+
}
140+
141+
// Ensure that if the stream is closed, all subscribers are closed
142+
assert_eq!(poll!(rx.next()), Poll::Ready(None), "rx");
143+
assert_eq!(poll!(rx_s1.next()), Poll::Ready(None), "rx_s1");
144+
assert_eq!(poll!(rx_s2.next()), Poll::Ready(None), "rx_s2");
145+
}
146+
147+
#[tokio::test]
148+
async fn subscribers_can_catch_up_to_the_main_stream() {
149+
let events = (0..CHANNEL_CAPACITY).map(Ok::<_, ()>).collect::<Vec<_>>();
150+
let rx = stream::iter(events.clone());
151+
let rx = StreamSubscribe::new(rx);
152+
153+
let rx_s1 = rx.subscribe();
154+
155+
pin_mut!(rx);
156+
pin_mut!(rx_s1);
157+
158+
for item in events.clone() {
159+
assert_eq!(poll!(rx.next()), Poll::Ready(Some(Arc::new(item))), "rx",);
160+
}
161+
162+
for item in events {
163+
assert_eq!(
164+
poll!(rx_s1.next()),
165+
Poll::Ready(Some(Ok(Arc::new(item)))),
166+
"rx_s1"
167+
);
168+
}
169+
}
170+
171+
#[tokio::test]
172+
async fn if_the_subscribers_lag_they_get_a_lagged_error_as_the_next_event() {
173+
// The broadcast channel rounds the capacity up to the next power of two.
174+
let max_capacity = CHANNEL_CAPACITY.next_power_of_two();
175+
let overflow = 5;
176+
let events = (0..max_capacity + overflow).collect::<Vec<_>>();
177+
let rx = stream::iter(events.clone());
178+
let rx = StreamSubscribe::new(rx);
179+
180+
let rx_s1 = rx.subscribe();
181+
182+
pin_mut!(rx);
183+
pin_mut!(rx_s1);
184+
185+
// Consume the entire stream, overflowing the inner channel
186+
for _ in events {
187+
rx.next().await;
188+
}
189+
190+
assert_eq!(
191+
poll!(rx_s1.next()),
192+
Poll::Ready(Some(Err(Error::Lagged(overflow as u64)))),
193+
);
194+
195+
let expected_next_event = overflow;
196+
assert_eq!(
197+
poll!(rx_s1.next()),
198+
Poll::Ready(Some(Ok(Arc::new(expected_next_event)))),
199+
);
200+
}
201+
202+
#[tokio::test]
203+
async fn a_lagging_subscriber_does_not_impact_a_well_behaved_subscriber() {
204+
// The broadcast channel rounds the capacity up to the next power of two.
205+
let max_capacity = CHANNEL_CAPACITY.next_power_of_two();
206+
let overflow = 5;
207+
let events = (0..max_capacity + overflow).collect::<Vec<_>>();
208+
let rx = stream::iter(events.clone());
209+
let rx = StreamSubscribe::new(rx);
210+
211+
let rx_s1 = rx.subscribe();
212+
let rx_s2 = rx.subscribe();
213+
214+
pin_mut!(rx);
215+
pin_mut!(rx_s1);
216+
pin_mut!(rx_s2);
217+
218+
for event in events {
219+
assert_eq!(poll!(rx_s1.next()), Poll::Pending, "rx_s1");
220+
221+
rx.next().await;
222+
223+
assert_eq!(
224+
poll!(rx_s1.next()),
225+
Poll::Ready(Some(Ok(Arc::new(event)))),
226+
"rx_s1"
227+
);
228+
}
229+
230+
assert_eq!(
231+
poll!(rx_s2.next()),
232+
Poll::Ready(Some(Err(Error::Lagged(overflow as u64)))),
233+
"rx_s2"
234+
);
235+
236+
let expected_next_event = overflow;
237+
assert_eq!(
238+
poll!(rx_s2.next()),
239+
Poll::Ready(Some(Ok(Arc::new(expected_next_event)))),
240+
"rx_s2"
241+
);
242+
}
243+
}

kube-runtime/src/utils/watch_ext.rs

+66-1
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1+
#[cfg(feature = "unstable-runtime-subscribe")]
2+
use crate::utils::stream_subscribe::StreamSubscribe;
13
use crate::{
24
utils::{event_flatten::EventFlatten, stream_backoff::StreamBackoff},
35
watcher,
46
};
57
use backoff::backoff::Backoff;
6-
78
use futures::{Stream, TryStream};
89

910
/// Extension trait for streams returned by [`watcher`](watcher()) or [`reflector`](crate::reflector::reflector)
@@ -36,5 +37,69 @@ pub trait WatchStreamExt: Stream {
3637
{
3738
EventFlatten::new(self, true)
3839
}
40+
41+
/// Create a [`StreamSubscribe`] from a [`watcher()`] stream.
42+
///
43+
/// The [`StreamSubscribe::subscribe()`] method which allows additional consumers
44+
/// of events from a stream without consuming the stream itself.
45+
///
46+
/// If a subscriber begins to lag behind the stream, it will receive an [`Error::Lagged`]
47+
/// error. The subscriber can then decide to abort its task or tolerate the lost events.
48+
///
49+
/// If the [`Stream`] is dropped or ends, any [`StreamSubscribe::subscribe()`] streams
50+
/// will also end.
51+
///
52+
/// ## Warning
53+
///
54+
/// If the primary [`Stream`] is not polled, the [`StreamSubscribe::subscribe()`] streams
55+
/// will never receive any events.
56+
///
57+
/// # Usage
58+
///
59+
/// ```
60+
/// use futures::{Stream, StreamExt};
61+
/// use std::{fmt::Debug, sync::Arc};
62+
/// use kube_runtime::{watcher, WatchStreamExt};
63+
///
64+
/// fn explain_events<K, S>(
65+
/// stream: S,
66+
/// ) -> (
67+
/// impl Stream<Item = Arc<Result<watcher::Event<K>, watcher::Error>>> + Send + Sized + 'static,
68+
/// impl Stream<Item = String> + Send + Sized + 'static,
69+
/// )
70+
/// where
71+
/// K: Debug + Send + Sync + 'static,
72+
/// S: Stream<Item = Result<watcher::Event<K>, watcher::Error>> + Send + Sized + 'static,
73+
/// {
74+
/// // Create a stream that can be subscribed to
75+
/// let stream_subscribe = stream.stream_subscribe();
76+
/// // Create a subscription to that stream
77+
/// let subscription = stream_subscribe.subscribe();
78+
///
79+
/// // Create a stream of descriptions of the events
80+
/// let explain_stream = subscription.filter_map(|event| async move {
81+
/// // We don't care about lagged events so we can throw that error away
82+
/// match event.ok()?.as_ref() {
83+
/// Ok(watcher::Event::Applied(event)) => {
84+
/// Some(format!("An object was added or modified: {event:?}"))
85+
/// }
86+
/// Ok(_) => todo!("explain other events"),
87+
/// // We don't care about watcher errors either
88+
/// Err(_) => None,
89+
/// }
90+
/// });
91+
///
92+
/// // We now still have the original stream, and a secondary stream of explanations
93+
/// (stream_subscribe, explain_stream)
94+
/// }
95+
/// ```
96+
#[cfg(feature = "unstable-runtime-subscribe")]
97+
fn stream_subscribe<K>(self) -> StreamSubscribe<Self>
98+
where
99+
Self: Stream<Item = Result<watcher::Event<K>, watcher::Error>> + Send + Sized + 'static,
100+
{
101+
StreamSubscribe::new(self)
102+
}
39103
}
104+
40105
impl<St: ?Sized> WatchStreamExt for St where St: Stream {}

kube/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ admission = ["kube-core/admission"]
2828
derive = ["kube-derive", "kube-core/schema"]
2929
config = ["kube-client/config"]
3030
runtime = ["kube-runtime"]
31+
unstable-runtime = ["kube-runtime/unstable-runtime"]
3132

3233
[package.metadata.docs.rs]
3334
features = ["client", "rustls-tls", "openssl-tls", "derive", "ws", "oauth", "jsonpatch", "admission", "runtime", "k8s-openapi/v1_26"]

0 commit comments

Comments
 (0)