Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[coord] New secret guarantees #12334

Merged
merged 10 commits into from
May 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions bin/cluster-dev
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ spec:
selector:
app: materialized
---
apiVersion: v1
kind: Secret
metadata:
name: user-managed-secrets
labels:
app: materialized
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
Expand Down Expand Up @@ -79,13 +86,27 @@ spec:
- --orchestrator-service-label=materialize.cloud/example1=label1
- --orchestrator-service-label=materialize.cloud/example2=label2
- --kubernetes-image-pull-policy=never
- --user-defined-secret=user-managed-secrets
- --user-defined-secret-mount-path=/secrets
- --experimental
ports:
- containerPort: 6875
name: sql
volumeMounts:
- name: data
mountPath: /data
- mountPath: /secrets
name: secrets-mount
env:
- name: MZ_POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
volumes:
- name: secrets-mount
secret:
defaultMode: 400
secretName: user-managed-secrets
volumeClaimTemplates:
- metadata:
name: data
Expand Down
1 change: 1 addition & 0 deletions bin/cluster-reset
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@

kubectl delete all --context=minikube --all
kubectl delete pvc --all
kubectl delete secret --context=minikube --all
18 changes: 18 additions & 0 deletions src/materialized/src/bin/materialized/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,20 @@ pub struct Args {
/// production cluster that happens to be the active Kubernetes context.)
#[structopt(long, hide = true, default_value = "minikube")]
kubernetes_context: String,
/// The name of this pod
#[clap(
long,
hide = true,
env = "MZ_POD_NAME",
required_if_eq("orchestrator", "kubernetes")
)]
pod_name: Option<String>,
/// The name of the Kubernetes secret object to use for storing user secrets
#[structopt(long, hide = true, required_if_eq("orchestrator", "kubernetes"))]
user_defined_secret: Option<String>,
/// The mount location of the Kubernetes secret object to use for storing user secrets
#[structopt(long, hide = true, required_if_eq("orchestrator", "kubernetes"))]
user_defined_secret_mount_path: Option<String>,
/// The storaged image reference to use.
#[structopt(
long,
Expand Down Expand Up @@ -605,6 +619,7 @@ fn run(args: Args) -> Result<(), anyhow::Error> {
.collect(),
service_account: args.kubernetes_service_account,
image_pull_policy: args.kubernetes_image_pull_policy,
user_defined_secret: args.user_defined_secret.clone().unwrap_or_default(),
})
}
Orchestrator::Process => {
Expand Down Expand Up @@ -743,6 +758,9 @@ max log level: {max_log_level}",
let secrets_controller = match args.orchestrator {
Orchestrator::Kubernetes => SecretsControllerConfig::Kubernetes {
context: args.kubernetes_context,
user_defined_secret: args.user_defined_secret.unwrap_or_default(),
user_defined_secret_mount_path: args.user_defined_secret_mount_path.unwrap_or_default(),
refresh_pod_name: args.pod_name.unwrap_or_default(),
},
Orchestrator::Process => SecretsControllerConfig::LocalFileSystem,
};
Expand Down
25 changes: 20 additions & 5 deletions src/materialized/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use mz_ore::task;
use mz_pid_file::PidFile;
use mz_secrets::SecretsController;
use mz_secrets_filesystem::FilesystemSecretsController;
use mz_secrets_kubernetes::KubernetesSecretsController;
use mz_secrets_kubernetes::{KubernetesSecretsController, KubernetesSecretsControllerConfig};

use crate::mux::Mux;

Expand Down Expand Up @@ -193,6 +193,9 @@ pub enum SecretsControllerConfig {
/// The name of a Kubernetes context to use, if the Kubernetes configuration
/// is loaded from the local kubeconfig.
context: String,
user_defined_secret: String,
user_defined_secret_mount_path: String,
refresh_pod_name: String,
},
}

Expand Down Expand Up @@ -352,10 +355,22 @@ async fn serve_stash<S: mz_stash::Append + 'static>(
fs::set_permissions(secrets_storage.clone(), permissions)?;
Box::new(FilesystemSecretsController::new(secrets_storage))
}
Some(SecretsControllerConfig::Kubernetes { context }) => Box::new(
KubernetesSecretsController::new(context)
.await
.context("connecting to kubernetes")?,
Some(SecretsControllerConfig::Kubernetes {
context,
user_defined_secret,
user_defined_secret_mount_path,
refresh_pod_name,
}) => Box::new(
KubernetesSecretsController::new(
context,
KubernetesSecretsControllerConfig {
user_defined_secret,
user_defined_secret_mount_path,
refresh_pod_name,
},
)
.await
.context("connecting to kubernetes")?,
),
};

Expand Down
5 changes: 3 additions & 2 deletions src/orchestrator-kubernetes/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use kube::ResourceExt;
use sha2::{Digest, Sha256};

use mz_orchestrator::{NamespacedOrchestrator, Orchestrator, Service, ServiceConfig};
use mz_secrets_kubernetes::SECRET_NAME;

const FIELD_MANAGER: &str = "materialized";

Expand All @@ -47,6 +46,8 @@ pub struct KubernetesOrchestratorConfig {
pub service_account: Option<String>,
/// The image pull policy to set for services created by the orchestrator.
pub image_pull_policy: KubernetesImagePullPolicy,
/// The name of the secret used to store user defined secrets.
pub user_defined_secret: String,
}

/// Specifies whether Kubernetes should pull Docker images when creating pods.
Expand Down Expand Up @@ -229,7 +230,7 @@ impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {
let secrets_volume = Volume {
name: volume_name.clone(),
secret: Some(SecretVolumeSource {
secret_name: Some(SECRET_NAME.to_string()),
secret_name: Some(self.config.user_defined_secret.clone()),
..Default::default()
}),
..Default::default()
Expand Down
3 changes: 3 additions & 0 deletions src/secrets-kubernetes/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,6 @@ mz-secrets = { path = "../secrets" }
k8s-openapi = { version = "0.14.0", features = ["v1_22"] }
kube = { version = "0.71.0", features = ["ws"] }
tracing = "0.1.34"
tokio = { version = "1.17.0" }
rand = "0.8.5"
mz-ore = { path = "../ore", default-features = false }
135 changes: 100 additions & 35 deletions src/secrets-kubernetes/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,37 +7,43 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use anyhow::{bail, Error};
use anyhow::{anyhow, bail, Error};
use async_trait::async_trait;
use k8s_openapi::api::core::v1::Secret;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use k8s_openapi::api::core::v1::{Pod, Secret};
use k8s_openapi::ByteString;
use kube::api::{Patch, PatchParams, PostParams};
use kube::api::{Patch, PatchParams};
use kube::config::KubeConfigOptions;
use kube::{Api, Client, Config};
use kube::{Api, Client, Config, ResourceExt};
use mz_ore::retry::Retry;
use mz_secrets::{SecretOp, SecretsController};
use rand::random;
use std::collections::BTreeMap;
use tracing::{error, info};
use std::path::{Path, PathBuf};
use std::time::Duration;
use tokio::io;
use tracing::info;

const FIELD_MANAGER: &str = "materialized";
pub const SECRET_NAME: &str = "user-managed-secrets";
const POD_ANNOTATION: &str = "materialized.materialize.cloud/secret-refresh";
const POLL_TIMEOUT: u64 = 120;

pub struct KubernetesSecretsControllerConfig {
pub user_defined_secret: String,
pub user_defined_secret_mount_path: String,
pub refresh_pod_name: String,
}

pub struct KubernetesSecretsController {
secret_api: Api<Secret>,
pod_api: Api<Pod>,
config: KubernetesSecretsControllerConfig,
}

impl KubernetesSecretsController {
fn make_secret_with_name(name: String) -> Secret {
Secret {
metadata: ObjectMeta {
name: Some(name),
..Default::default()
},
..Default::default()
}
}

pub async fn new(context: String) -> Result<KubernetesSecretsController, anyhow::Error> {
pub async fn new(
context: String,
config: KubernetesSecretsControllerConfig,
) -> Result<KubernetesSecretsController, anyhow::Error> {
let kubeconfig_options = KubeConfigOptions {
context: Some(context),
..Default::default()
Expand All @@ -53,29 +59,60 @@ impl KubernetesSecretsController {
},
};
let client = Client::try_from(kubeconfig)?;
let secret_api: Api<Secret> = Api::default_namespaced(client);

let secret = KubernetesSecretsController::make_secret_with_name(SECRET_NAME.to_string());
match secret_api.create(&PostParams::default(), &secret).await {
Ok(_) => Ok(()),
Err(kube::Error::Api(e)) if e.code == 409 => {
info!("Secret {} already exists", SECRET_NAME.to_string());
Ok(())
}
Err(e) => {
error!("creating secret failed: {}", e);
Err(e)
}
}?;
let secret_api: Api<Secret> = Api::default_namespaced(client.clone());
let pod_api: Api<Pod> = Api::default_namespaced(client);

// ensure that the secret has been created in this environment
secret_api.get(&*config.user_defined_secret).await?;

if !Path::new(&config.user_defined_secret_mount_path).is_dir() {
bail!(
"Configured secrets location could not be found on filesystem: ({})",
config.user_defined_secret_mount_path
);
}

Ok(KubernetesSecretsController {
secret_api,
pod_api,
config,
})
}

async fn trigger_resync(&mut self) -> Result<(), Error> {
let mut pod = Pod::default();
pod.annotations_mut().insert(
String::from(POD_ANNOTATION),
format!("{:x}", random::<u64>()),
);

self.pod_api
.patch(
&self.config.refresh_pod_name,
&PatchParams::apply(FIELD_MANAGER).force(),
&Patch::Apply(pod),
)
.await?;

Ok(KubernetesSecretsController { secret_api })
return Ok(());
}

async fn try_exists(path: PathBuf) -> Result<bool, Error> {
match tokio::fs::metadata(path).await {
Ok(_) => Ok(true),
Err(x) if x.kind() == io::ErrorKind::NotFound => Ok(false),
Err(x) => Err(Error::from(x)),
}
}
}

#[async_trait]
impl SecretsController for KubernetesSecretsController {
async fn apply(&mut self, ops: Vec<SecretOp>) -> Result<(), Error> {
let mut secret: Secret = self.secret_api.get(&*SECRET_NAME.to_string()).await?;
let mut secret: Secret = self
.secret_api
.get(&*self.config.user_defined_secret)
.await?;

let mut data = secret.data.map_or_else(BTreeMap::new, |m| m);

Expand All @@ -100,12 +137,40 @@ impl SecretsController for KubernetesSecretsController {

self.secret_api
.patch(
&SECRET_NAME,
&self.config.user_defined_secret,
&PatchParams::apply(FIELD_MANAGER).force(),
&Patch::Apply(secret),
)
.await?;

self.trigger_resync().await?;

// guarantee that all new secrets are reflected on our local filesystem
let secrets_storage_path =
PathBuf::from(self.config.user_defined_secret_mount_path.clone());
for op in ops.iter() {
match op {
SecretOp::Ensure { id, .. } => {
Retry::default()
.max_duration(Duration::from_secs(POLL_TIMEOUT))
.retry_async(|_| async {
let file_path = secrets_storage_path.join(format!("{}", id));
match KubernetesSecretsController::try_exists(file_path).await {
Ok(result) => {
if result {
Ok(())
} else {
Err(anyhow!("Secret write operation has timed out. Secret with id {} could not be written", id))
}
}
Err(e) => Err(e)
}
}).await?
}
_ => {}
}
}

return Ok(());
}
}