diff --git a/kube-runtime/src/controller/future_hash_map.rs b/kube-runtime/src/controller/future_hash_map.rs index 92dc7fced..c2d62b064 100644 --- a/kube-runtime/src/controller/future_hash_map.rs +++ b/kube-runtime/src/controller/future_hash_map.rs @@ -6,8 +6,9 @@ use std::{ task::{Context, Poll}, }; -/// Variant of [`tokio::stream::StreamMap`] that only runs [`Future`]s, and uses a [`HashMap`] as -/// the backing store, giving O(1) insertion and membership checks. +/// Variant of [`tokio_stream::StreamMap`](https://docs.rs/tokio-stream/0.1.3/tokio_stream/struct.StreamMap.html) +/// that only runs [`Future`]s, and uses a [`HashMap`] as the backing store, giving (amortized) O(1) insertion +/// and membership checks. /// /// Just like for `StreamMap`'s `S`, `F` must be [`Unpin`], since [`HashMap`] is free to move /// entries as it pleases (for example: resizing the backing array). diff --git a/kube-runtime/src/controller/mod.rs b/kube-runtime/src/controller/mod.rs index 18deff895..0c129f885 100644 --- a/kube-runtime/src/controller/mod.rs +++ b/kube-runtime/src/controller/mod.rs @@ -1,3 +1,5 @@ +//! Runs a user-supplied reconciler function on objects when they (or related objects) are updated + use self::runner::Runner; use crate::{ reflector::{ @@ -55,7 +57,7 @@ pub struct ReconcilerAction { pub requeue_after: Option, } -/// Helper for building custom trigger filters, see [`trigger_self`] and [`trigger_owners`] for some examples. +/// Helper for building custom trigger filters, see the implementations of [`trigger_self`] and [`trigger_owners`] for some examples. pub fn trigger_with( stream: S, mapper: impl Fn(T) -> I, @@ -127,14 +129,15 @@ impl Context { /// Apply a reconciler to an input stream, with a given retry policy /// -/// Takes a `store` parameter for the main object which should be updated by a `reflector`. +/// Takes a `store` parameter for the core objects, which should usually be updated by a [`reflector`]. 
/// -/// The `queue` is a source of external events that trigger the reconciler, -/// usually taken from a `reflector` and then passed through a trigger function such as -/// [`trigger_self`]. +/// The `queue` indicates which objects should be reconciled. For the core objects this will usually be +/// the [`reflector`] (piped through [`trigger_self`]). If your core objects own any subobjects then you +/// can also make them trigger reconciliations by [merging](`futures::stream::select`) the [`reflector`] +/// with a [`watcher`](watcher()) or [`reflector`](reflector()) for the subobject. /// /// This is the "hard-mode" version of [`Controller`], which allows you some more customization -/// (such as triggering from arbitrary `Stream`s), at the cost of some more verbosity. +/// (such as triggering from arbitrary [`Stream`]s), at the cost of being a bit more verbose. pub fn applier<K, QueueStream, ReconcilerFut, T>( mut reconciler: impl FnMut(K, Context<T>) -> ReconcilerFut, mut error_policy: impl FnMut(&ReconcilerFut::Error, Context<T>) -> ReconcilerAction, @@ -290,6 +293,7 @@ where /// /// Configure `ListParams` and `Api` so you only get reconcile events /// for the correct `Api` scope (cluster/all/namespaced), or `ListParams` subset + #[must_use] pub fn new(owned_api: Api<K>, lp: ListParams) -> Self { let writer = Writer::<K>::default(); let reader = writer.as_reader(); diff --git a/kube-runtime/src/controller/runner.rs b/kube-runtime/src/controller/runner.rs index 9b29ab725..25617afe2 100644 --- a/kube-runtime/src/controller/runner.rs +++ b/kube-runtime/src/controller/runner.rs @@ -8,8 +8,12 @@ use std::{ task::{Context, Poll}, }; -/// Pulls messages from a [`Scheduler`], and runs an action for each message in parallel, -/// while making sure to not run the same message multiple times at once. +/// Pulls items from a [`Scheduler`], and runs an action for each item in parallel, +/// while making sure to not process [equal](`Eq`) items multiple times at once. 
+/// +/// If an item is to be emitted from the [`Scheduler`] while an equal item is +/// already being processed then it will be held pending until the current item +/// is finished. #[pin_project] pub struct Runner<T, R, F> { #[pin] diff --git a/kube-runtime/src/lib.rs b/kube-runtime/src/lib.rs index 3bf189ff3..c21607ff3 100644 --- a/kube-runtime/src/lib.rs +++ b/kube-runtime/src/lib.rs @@ -1,11 +1,12 @@ -//! Crate with kubernetes runtime components +//! Common components for building Kubernetes operators //! //! This crate contains the core building blocks to allow users to build //! controllers/operators/watchers that need to synchronize/reconcile kubernetes //! state. //! -//! Newcomers should generally get started with the [`Controller`] builder, which manages -//! all state internals for you. +//! Newcomers are recommended to start with the [`Controller`] builder, which gives an +//! opinionated starting point that should be appropriate for simple operators, but all +//! components are designed to be usable à la carte if your operator doesn't quite fit that mold. #![deny(unsafe_code)] #![deny(clippy::all)] diff --git a/kube-runtime/src/reflector/mod.rs b/kube-runtime/src/reflector/mod.rs index a2ec05ff2..e433ad4db 100644 --- a/kube-runtime/src/reflector/mod.rs +++ b/kube-runtime/src/reflector/mod.rs @@ -1,3 +1,5 @@ +//! Caches objects in memory + mod object_ref; pub mod store; diff --git a/kube-runtime/src/scheduler.rs b/kube-runtime/src/scheduler.rs index 64a999e43..ad7798e54 100644 --- a/kube-runtime/src/scheduler.rs +++ b/kube-runtime/src/scheduler.rs @@ -1,3 +1,5 @@ +//! 
Delays and deduplicates [`Stream`] items + use futures::{ stream::{Fuse, FusedStream}, Stream, StreamExt, @@ -113,9 +115,8 @@ impl<'a, T: Hash + Eq + Clone, R> SchedulerProj<'a, T, R> { ); if can_take_message(&msg) { break Poll::Ready(Some(Ok(msg))); - } else { - self.pending.insert(msg); } + self.pending.insert(msg); } Poll::Ready(Some(Err(err))) => break Poll::Ready(Some(Err(err))), Poll::Ready(None) => { @@ -184,7 +185,7 @@ where /// no messages will be lost, even if it is reconstructed on each call to [`poll_next`](Self::poll_next). /// In fact, this is often desirable, to avoid long-lived borrows in `can_take_message`'s closure. /// - /// NOTE: `can_take_message` should be considered fairly performance-sensitive, since + /// NOTE: `can_take_message` should be considered to be fairly performance-sensitive, since /// it will generally be executed for each pending message, for each [`poll_next`](Self::poll_next). pub fn hold_unless bool>(self: Pin<&mut Self>, can_take_message: C) -> HoldUnless { HoldUnless { @@ -212,11 +213,14 @@ where } } -/// Stream transformer that takes a message and `Instant` (in the form of a `ScheduleRequest`), and emits -/// the message at the specified `Instant`. +/// Stream transformer that delays and deduplicates [`Stream`] items. +/// +/// Items are deduplicated: if an item is submitted multiple times before being emitted then it will only be +/// emitted at the earliest `Instant`. /// -/// Objects are de-duplicated: if a message is submitted twice before being emitted then it will only be -/// emitted at the earlier of the two `Instant`s. +/// Items can be "held pending" if the item doesn't match some predicate. Items trying to schedule an item +/// that is already pending will be discarded (since it is already going to be emitted as soon as the consumer +/// is ready for it). 
pub fn scheduler<T: Eq + Hash + Clone, S: Stream<Item = ScheduleRequest<T>>>(requests: S) -> Scheduler<T, S> { Scheduler::new(requests) } diff --git a/kube-runtime/src/watcher.rs b/kube-runtime/src/watcher.rs index 7e976659b..44a6cc631 100644 --- a/kube-runtime/src/watcher.rs +++ b/kube-runtime/src/watcher.rs @@ -1,3 +1,5 @@ +//! Watches a Kubernetes Resource for changes, with error recovery + use derivative::Derivative; use futures::{stream::BoxStream, Stream, StreamExt}; use kube::{ @@ -35,11 +37,11 @@ pub enum Error { pub type Result<T, E = Error> = std::result::Result<T, E>; #[derive(Debug, Clone)] -/// Watch events returned from the `Watcher` +/// Watch events returned from the [`watcher`] pub enum Event<K> { - /// A resource was added or modified + /// An object was added or modified Applied(K), - /// A resource was deleted + /// An object was deleted /// /// NOTE: This should not be used for managing persistent state elsewhere, since /// events may be lost if the watcher is unavailable. Use Finalizers instead. Deleted(K), /// The watch stream was restarted, so `Deleted` events may have been missed /// /// Should be used as a signal to replace the store contents atomically. + /// + /// Any objects that were previously [`Applied`](Event::Applied) but are not listed in this event + /// should be assumed to have been [`Deleted`](Event::Deleted). Restarted(Vec<K>), } @@ -80,9 +85,9 @@ impl<K> Event<K> { #[derive(Derivative)] #[derivative(Debug)] -/// The internal finite state machine driving the [`Watcher`](struct.Watcher.html) +/// The internal finite state machine driving the [`watcher`] enum State { - /// The Watcher is empty, and the next poll() will start the initial LIST to get all existing objects + /// The Watcher is empty, and the next [`poll`](Stream::poll_next) will start the initial LIST to get all existing objects Empty, /// The initial LIST was successful, so we should move on to starting the actual watch. 
InitListed { resource_version: String }, @@ -183,11 +188,13 @@ async fn step( /// Watches a Kubernetes Resource for changes continuously /// -/// Creates an indefinite read stream through continual [`Api::watch`] calls, and keeping track -/// of [returned resource versions](https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes). -/// It tries to recover (by reconnecting and resyncing as required) if polled again after an error. -/// However, keep in mind that most terminal `TryStream` combinators (such as `TryFutureExt::try_for_each` -/// and `TryFutureExt::try_concat` will terminate eagerly if an `Error` reaches them. +/// Compared to [`Api::watch`], this automatically tries to recover the stream upon errors. +/// +/// Errors from the underlying watch are propagated, after which the stream will go into recovery mode on the next poll. +/// You can apply your own backoff by not polling the stream for a duration after errors. +/// Keep in mind that some [`TryStream`](futures::TryStream) combinators (such as +/// [`try_for_each`](futures::TryStreamExt::try_for_each) and [`try_concat`](futures::TryStreamExt::try_concat)) +/// will terminate eagerly as soon as they receive an [`Err`]. /// /// This is intended to provide a safe and atomic input interface for a state store like a [`reflector`], /// direct users may want to flatten composite events with [`try_flatten_applied`]: @@ -217,9 +224,20 @@ async fn step( /// /// # Migration from `kube::runtime` /// -/// This is similar to the legacy `kube::runtime::Informer`, or the watching half of client-go's `Reflector`. +/// This is similar to the legacy [`kube::runtime::Informer`], or the watching half of client-go's `Reflector`. /// Renamed to avoid confusion with client-go's `Informer` (which watches a `Reflector` for updates, rather /// the Kubernetes API). 
+/// +/// # Recovery +/// +/// (The details of recovery are considered an implementation detail and should not be relied on to be stable, but are +/// documented here for posterity.) +/// +/// If the watch connection is interrupted then we attempt to restart the watch using the last +/// [resource versions](https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes) +/// that we have seen on the stream. If this is successful then the stream is simply resumed from where it left off. +/// If this fails because the resource version is no longer valid then we start over with a new stream, starting with +/// an [`Event::Restarted`]. pub fn watcher( api: Api<K>, list_params: ListParams,