Skip to content

Commit

Permalink
feat: force-delete (#173)
Browse files Browse the repository at this point in the history
* feat: force-delete option for when the cluster is inaccessible

* feat: test parsing the annotation

* feat: some logging

* fix: status handling

* chore: cleanup tests slightly
  • Loading branch information
zach-robinson-dev authored Nov 13, 2024
1 parent a34141b commit 68cb1bc
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 47 deletions.
144 changes: 97 additions & 47 deletions src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,28 +71,38 @@ async fn reconcile_deleted_resource(
Ok(Action::await_change())
}
Err(kube::Error::Api(err)) if err.code == 404 => {
resource_sync.stop_remote_watches_if_watching(ctx).await;
stop_watches_and_remove_resource_sync_finalizers(resource_sync, name, parent_api, ctx)
.await
}
Err(err) => Err(err.into()),
}
}

let patched_finalizers = resource_sync
.finalizers_clone_or_empty()
.with_item_removed(&FINALIZER.to_string());
async fn stop_watches_and_remove_resource_sync_finalizers(
resource_sync: Arc<ResourceSync>,
name: &str,
parent_api: &Api<ResourceSync>,
ctx: Arc<Context>,
) -> Result<Action> {
resource_sync.stop_remote_watches_if_watching(ctx).await;

// Target has been deleted, remove the finalizer from the ResourceSync
let patch = Merge(json!({
"metadata": {
"finalizers": patched_finalizers,
},
}));
let patched_finalizers = resource_sync
.finalizers_clone_or_empty()
.with_item_removed(&FINALIZER.to_string());

parent_api
.patch(name, &PatchParams::default(), &patch)
.await?;
// Target has been deleted, remove the finalizer from the ResourceSync
let patch = Merge(json!({
"metadata": {
"finalizers": patched_finalizers,
},
}));

// We have removed our finalizer, so nothing more needs to be done
Ok(Action::await_change())
}
Err(err) => Err(err.into()),
}
parent_api
.patch(name, &PatchParams::default(), &patch)
.await?;

// We have removed our finalizer, so nothing more needs to be done
Ok(Action::await_change())
}

async fn add_target_finalizer(
Expand Down Expand Up @@ -197,35 +207,13 @@ async fn reconcile(resource_sync: Arc<ResourceSync>, ctx: Arc<Context>) -> Resul
.ok_or(Error::NameRequired)?;
let parent_api = resource_sync.api(ctx.client.clone());

let result = {
let resource_sync = Arc::clone(&resource_sync);

info!(?name, "running reconciler");

debug!(?resource_sync.spec, "got");
let local_ns = resource_sync.namespace().ok_or(Error::NamespaceRequired)?;

let target_api = resource_sync
.spec
.target
.api_for(ctx.client.clone(), &local_ns)
.await?;
let source_api = resource_sync
.spec
.source
.api_for(ctx.client.clone(), &local_ns)
.await?;

match resource_sync {
resource_sync if resource_sync.has_been_deleted() => {
reconcile_deleted_resource(resource_sync, &name, target_api, &parent_api, ctx).await
}
resource_sync if !resource_sync.has_target_finalizer() => {
add_target_finalizer(resource_sync, &name, &parent_api).await
}
_ => reconcile_normally(resource_sync, &name, source_api, target_api, ctx).await,
}
};
let result = reconcile_helper(
Arc::clone(&resource_sync),
Arc::clone(&ctx),
&name,
&parent_api,
)
.await;

let status = match &result {
Err(err) => {
Expand Down Expand Up @@ -258,6 +246,68 @@ async fn reconcile(resource_sync: Arc<ResourceSync>, ctx: Arc<Context>) -> Resul
result
}

async fn reconcile_helper(
resource_sync: Arc<ResourceSync>,
ctx: Arc<Context>,
name: &String,
parent_api: &Api<ResourceSync>,
) -> Result<Action> {
let resource_sync = Arc::clone(&resource_sync);

info!(?name, "running reconciler");

debug!(?resource_sync.spec, "got");
let local_ns = resource_sync.namespace().ok_or(Error::NamespaceRequired)?;

let (source_api, target_api) =
match source_and_target_apis(&resource_sync, &ctx, local_ns).await {
Ok(apis) => apis,
Err(_)
if resource_sync.has_force_delete_option_enabled()
&& resource_sync.has_been_deleted() =>
{
debug!(?name, "force-deleting ResourceSync");
return stop_watches_and_remove_resource_sync_finalizers(
resource_sync,
name,
parent_api,
ctx,
)
.await;
}
Err(err) => return Err(err),
};

match resource_sync {
resource_sync if resource_sync.has_been_deleted() => {
reconcile_deleted_resource(resource_sync, name, target_api, parent_api, ctx).await
}
resource_sync if !resource_sync.has_target_finalizer() => {
add_target_finalizer(resource_sync, name, parent_api).await
}
_ => reconcile_normally(resource_sync, name, source_api, target_api, ctx).await,
}
}

async fn source_and_target_apis(
resource_sync: &Arc<ResourceSync>,
ctx: &Arc<Context>,
local_ns: String,
) -> Result<(NamespacedApi, NamespacedApi)> {
let target_api = resource_sync
.spec
.target
.api_for(ctx.client.clone(), &local_ns)
.await?;
let source_api = resource_sync
.spec
.source
.api_for(ctx.client.clone(), &local_ns)
.await?;

Ok((source_api, target_api))
}

fn sync_failing_transition_time(status: &Option<ResourceSyncStatus>) -> Time {
let now = Time(Utc::now());

Expand Down
48 changes: 48 additions & 0 deletions src/resources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,22 @@ use kube::{
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

static FORCE_DELETE_ANNOTATION: &str = "sinker.influxdata.io/force-delete";

impl ResourceSync {
pub fn has_force_delete_option_enabled(&self) -> bool {
self.metadata
.annotations
.as_ref()
.map(|annotations| annotations.get(FORCE_DELETE_ANNOTATION))
.unwrap_or_default()
.cloned()
.unwrap_or_default()
.parse()
.unwrap_or_default()
}
}

#[derive(CustomResource, Debug, Serialize, Deserialize, Default, Clone, JsonSchema)]
#[kube(
group = "sinker.influxdata.io",
Expand Down Expand Up @@ -142,3 +158,35 @@ impl SinkerContainer {
crd
}
}

#[cfg(test)]
mod tests {
use super::*;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use rstest::rstest;
use std::collections::BTreeMap;

#[rstest]
#[case::no_annotations(
ResourceSync{metadata: Default::default(),spec: Default::default(),status: None,}, false
)]
#[case::force_delete_annotation_not_present(
ResourceSync{metadata: ObjectMeta{annotations: Some(BTreeMap::new()), ..Default::default()},spec: Default::default(),status: None,}, false
)]
#[case::force_delete_annotation_is_false(
ResourceSync{metadata: ObjectMeta{annotations: Some(BTreeMap::from([(FORCE_DELETE_ANNOTATION.to_string(), false.to_string())])), ..Default::default()},spec: Default::default(),status: None,}, false
)]
#[case::force_delete_annotation_is_other(
ResourceSync{metadata: ObjectMeta{annotations: Some(BTreeMap::from([(FORCE_DELETE_ANNOTATION.to_string(), "other".to_string())])), ..Default::default()},spec: Default::default(),status: None,}, false
)]
#[case::force_delete_annotation_is_true(
ResourceSync{metadata: ObjectMeta{annotations: Some(BTreeMap::from([(FORCE_DELETE_ANNOTATION.to_string(), true.to_string())])), ..Default::default()},spec: Default::default(),status: None,}, true
)]
#[tokio::test]
async fn test_resource_sync_has_force_delete_option_enabled(
#[case] resource_sync: ResourceSync,
#[case] expected: bool,
) {
assert_eq!(resource_sync.has_force_delete_option_enabled(), expected);
}
}

0 comments on commit 68cb1bc

Please sign in to comment.