Skip to content

Commit 96053fd

Browse files
authored
feat(runtime): support for owned shared streams (#1483)
We added support for shared streams in the controller via a reflector. There are only two public interfaces available to thread a stream through a controller, `for_shared_stream` and `for_shared_stream_with`. For non-shared streams, the controller can compose with other types of streams that will trigger reconciliation, e.g. streams of owned objects, or custom streams that take mappers out-of-band. For parity, the controller should expose public interfaces to allow the same for shared streams. This change adds four new interfaces: * `owns_shared_stream` * `owns_shared_stream_with` * `watches_shared_stream` * `watches_shared_stream_with` The contract is the same as `for_shared_stream`. Accept a stream of `Arc<K>` and turn it into a reconciliation trigger. --------- Signed-off-by: Matei David <[email protected]>
1 parent 687506f commit 96053fd

File tree

2 files changed

+296
-111
lines changed

2 files changed

+296
-111
lines changed

examples/shared_stream_controllers.rs

+31-110
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,9 @@
11
use std::{sync::Arc, time::Duration};
22

33
use futures::StreamExt;
4-
use k8s_openapi::api::core::v1::{Pod, PodCondition};
4+
use k8s_openapi::api::{apps::v1::Deployment, core::v1::Pod};
55
use kube::{
6-
api::{Patch, PatchParams},
7-
runtime::{
8-
controller::Action,
9-
reflector::{self},
10-
watcher, Config, Controller, WatchStreamExt,
11-
},
6+
runtime::{controller::Action, reflector, watcher, Config, Controller, WatchStreamExt},
127
Api, Client, ResourceExt,
138
};
149
use tracing::{debug, error, info, warn};
@@ -24,107 +19,31 @@ pub mod condition {
2419
const SUBSCRIBE_BUFFER_SIZE: usize = 256;
2520

2621
#[derive(Debug, Error)]
27-
enum Error {
28-
#[error("Failed to patch pod: {0}")]
29-
WriteFailed(#[source] kube::Error),
30-
31-
#[error("Missing po field: {0}")]
32-
MissingField(&'static str),
33-
}
34-
35-
#[derive(Clone)]
36-
struct Data {
37-
client: Client,
38-
}
39-
40-
/// A simple reconciliation function that will copy a pod's labels into the annotations.
41-
async fn reconcile_metadata(pod: Arc<Pod>, ctx: Arc<Data>) -> Result<Action, Error> {
42-
let namespace = &pod.namespace().unwrap_or_default();
43-
if namespace == "kube-system" {
44-
return Ok(Action::await_change());
45-
}
46-
47-
let mut pod = (*pod).clone();
48-
pod.metadata.managed_fields = None;
49-
// combine labels and annotations into a new map
50-
let labels = pod.labels().clone().into_iter();
51-
pod.annotations_mut().extend(labels);
52-
53-
let pod_api = Api::<Pod>::namespaced(
54-
ctx.client.clone(),
55-
pod.metadata
56-
.namespace
57-
.as_ref()
58-
.ok_or_else(|| Error::MissingField(".metadata.name"))?,
59-
);
60-
61-
pod_api
62-
.patch(
63-
&pod.name_any(),
64-
&PatchParams::apply("controller-1"),
65-
&Patch::Apply(&pod),
66-
)
67-
.await
68-
.map_err(Error::WriteFailed)?;
69-
70-
Ok(Action::requeue(Duration::from_secs(300)))
22+
enum Infallible {}
23+
24+
// A generic reconciler that can be used with any object whose type is known at
25+
// compile time. Will simply log its kind on reconciliation.
26+
async fn reconcile<K>(_obj: Arc<K>, _ctx: Arc<()>) -> Result<Action, Infallible>
27+
where
28+
K: ResourceExt<DynamicType = ()>,
29+
{
30+
let kind = K::kind(&());
31+
info!("Reconciled {kind}");
32+
Ok(Action::await_change())
7133
}
7234

73-
/// Another reconiliation function that will add an 'UndocumentedPort' condition to pods that do
74-
/// do not have any ports declared across all containers.
75-
async fn reconcile_status(pod: Arc<Pod>, ctx: Arc<Data>) -> Result<Action, Error> {
76-
for container in pod.spec.clone().unwrap_or_default().containers.iter() {
77-
if container.ports.clone().unwrap_or_default().len() != 0 {
78-
debug!(name = %pod.name_any(), "Skipped updating pod with documented ports");
79-
return Ok(Action::await_change());
80-
}
81-
}
82-
83-
let pod_api = Api::<Pod>::namespaced(
84-
ctx.client.clone(),
85-
pod.metadata
86-
.namespace
87-
.as_ref()
88-
.ok_or_else(|| Error::MissingField(".metadata.name"))?,
89-
);
90-
91-
let undocumented_condition = PodCondition {
92-
type_: condition::UNDOCUMENTED_TYPE.into(),
93-
status: condition::STATUS_TRUE.into(),
94-
..Default::default()
95-
};
96-
let value = serde_json::json!({
97-
"status": {
98-
"name": pod.name_any(),
99-
"kind": "Pod",
100-
"conditions": vec![undocumented_condition]
101-
}
102-
});
103-
pod_api
104-
.patch_status(
105-
&pod.name_any(),
106-
&PatchParams::apply("controller-2"),
107-
&Patch::Strategic(value),
108-
)
109-
.await
110-
.map_err(Error::WriteFailed)?;
111-
112-
Ok(Action::requeue(Duration::from_secs(300)))
113-
}
114-
115-
fn error_policy(obj: Arc<Pod>, error: &Error, _ctx: Arc<Data>) -> Action {
35+
fn error_policy<K: ResourceExt>(obj: Arc<K>, error: &Infallible, _ctx: Arc<()>) -> Action {
11636
error!(%error, name = %obj.name_any(), "Failed reconciliation");
11737
Action::requeue(Duration::from_secs(10))
11838
}
11939

12040
#[tokio::main]
12141
async fn main() -> anyhow::Result<()> {
12242
tracing_subscriber::fmt::init();
123-
12443
let client = Client::try_default().await?;
125-
let pods = Api::<Pod>::namespaced(client.clone(), "default");
44+
45+
let pods = Api::<Pod>::all(client.clone());
12646
let config = Config::default().concurrency(2);
127-
let ctx = Arc::new(Data { client });
12847

12948
// Create a shared store with a predefined buffer that will be shared between subscribers.
13049
let (reader, writer) = reflector::store_shared(SUBSCRIBE_BUFFER_SIZE);
@@ -146,30 +65,32 @@ async fn main() -> anyhow::Result<()> {
14665
}
14766
});
14867

149-
// Create the first controller using the reconcile_metadata function. Controllers accept
150-
// subscribers through a dedicated interface.
151-
let metadata_controller = Controller::for_shared_stream(subscriber.clone(), reader)
68+
// Create the first controller; the controller will log whenever it
69+
// reconciles a pod. The reconcile is a no-op.
70+
// Controllers accept subscribers through a dedicated interface.
71+
let pod_controller = Controller::for_shared_stream(subscriber.clone(), reader)
15272
.with_config(config.clone())
15373
.shutdown_on_signal()
154-
.run(reconcile_metadata, error_policy, ctx.clone())
74+
.run(reconcile, error_policy, Arc::new(()))
15575
.for_each(|res| async move {
15676
match res {
157-
Ok(v) => info!("Reconciled metadata {v:?}"),
77+
Ok(v) => debug!("Reconciled pod {v:?}"),
15878
Err(error) => warn!(%error, "Failed to reconcile metadata"),
15979
}
16080
});
16181

162-
// Subscribers can be used to get a read handle on the store, if the initial handle has been
163-
// moved or dropped.
164-
let reader = subscriber.reader();
165-
// Create the second controller using the reconcile_status function.
166-
let status_controller = Controller::for_shared_stream(subscriber, reader)
82+
// Create the second controller; the controller will log whenever it
83+
// reconciles a deployment. Any changes to a pod will trigger a
84+
// reconciliation to the owner (a deployment). Reconciliations are no-op.
85+
let deploys = Api::<Deployment>::all(client.clone());
86+
let deploy_controller = Controller::new(deploys, Default::default())
16787
.with_config(config)
88+
.owns_shared_stream(subscriber)
16889
.shutdown_on_signal()
169-
.run(reconcile_status, error_policy, ctx)
90+
.run(reconcile, error_policy, Arc::new(()))
17091
.for_each(|res| async move {
17192
match res {
172-
Ok(v) => info!("Reconciled status {v:?}"),
93+
Ok(v) => debug!("Reconciled deployment {v:?}"),
17394
Err(error) => warn!(%error, "Failed to reconcile status"),
17495
}
17596
});
@@ -179,7 +100,7 @@ async fn main() -> anyhow::Result<()> {
179100
//
180101
// Both controllers will operate on shared objects.
181102
tokio::select! {
182-
_ = futures::future::join(metadata_controller, status_controller) => {},
103+
_ = futures::future::join(pod_controller, deploy_controller) => {},
183104
_ = pod_watch => {}
184105
}
185106

0 commit comments

Comments
 (0)