diff --git a/Cargo.lock b/Cargo.lock index ecc00436d1ab7..e35f329797074 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3903,7 +3903,10 @@ dependencies = [ "async-trait", "k8s-openapi", "kube", + "mz-ore", "mz-secrets", + "rand", + "tokio", "tracing", ] diff --git a/bin/cluster-dev b/bin/cluster-dev index 976b875a8feb3..d5c2337693f7e 100755 --- a/bin/cluster-dev +++ b/bin/cluster-dev @@ -52,6 +52,13 @@ spec: selector: app: materialized --- +apiVersion: v1 +kind: Secret +metadata: + name: user-managed-secrets + labels: + app: materialized +--- apiVersion: apps/v1 kind: StatefulSet metadata: @@ -79,6 +86,8 @@ spec: - --orchestrator-service-label=materialize.cloud/example1=label1 - --orchestrator-service-label=materialize.cloud/example2=label2 - --kubernetes-image-pull-policy=never + - --user-defined-secret=user-managed-secrets + - --user-defined-secret-mount-path=/secrets - --experimental ports: - containerPort: 6875 @@ -86,6 +95,18 @@ spec: volumeMounts: - name: data mountPath: /data + - mountPath: /secrets + name: secrets-mount + env: + - name: MZ_POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + volumes: + - name: secrets-mount + secret: + defaultMode: 400 + secretName: user-managed-secrets volumeClaimTemplates: - metadata: name: data diff --git a/bin/cluster-reset b/bin/cluster-reset index 0d6f4f8387023..825ce8ab9dfa3 100755 --- a/bin/cluster-reset +++ b/bin/cluster-reset @@ -13,3 +13,4 @@ kubectl delete all --context=minikube --all kubectl delete pvc --all +kubectl delete secret --context=minikube --all diff --git a/src/materialized/src/bin/materialized/main.rs b/src/materialized/src/bin/materialized/main.rs index 449dcd50946af..3e9eef23e95c8 100644 --- a/src/materialized/src/bin/materialized/main.rs +++ b/src/materialized/src/bin/materialized/main.rs @@ -141,6 +141,20 @@ pub struct Args { /// production cluster that happens to be the active Kubernetes context.) #[structopt(long, hide = true, default_value = "minikube")] kubernetes_context: String, + /// The name of this pod + #[clap( + long, + hide = true, + env = "MZ_POD_NAME", + required_if_eq("orchestrator", "kubernetes") + )] + pod_name: Option, + /// The name of the Kubernetes secret object to use for storing user secrets + #[structopt(long, hide = true, required_if_eq("orchestrator", "kubernetes"))] + user_defined_secret: Option, + /// The mount location of the Kubernetes secret object to use for storing user secrets + #[structopt(long, hide = true, required_if_eq("orchestrator", "kubernetes"))] + user_defined_secret_mount_path: Option, /// The storaged image reference to use. #[structopt( long, @@ -605,6 +619,7 @@ fn run(args: Args) -> Result<(), anyhow::Error> { .collect(), service_account: args.kubernetes_service_account, image_pull_policy: args.kubernetes_image_pull_policy, + user_defined_secret: args.user_defined_secret.clone().unwrap_or_default(), }) } Orchestrator::Process => { @@ -743,6 +758,9 @@ max log level: {max_log_level}", let secrets_controller = match args.orchestrator { Orchestrator::Kubernetes => SecretsControllerConfig::Kubernetes { context: args.kubernetes_context, + user_defined_secret: args.user_defined_secret.unwrap_or_default(), + user_defined_secret_mount_path: args.user_defined_secret_mount_path.unwrap_or_default(), + refresh_pod_name: args.pod_name.unwrap_or_default(), }, Orchestrator::Process => SecretsControllerConfig::LocalFileSystem, }; diff --git a/src/materialized/src/lib.rs b/src/materialized/src/lib.rs index 2b507fd97cb59..7fadc0f9216b6 100644 --- a/src/materialized/src/lib.rs +++ b/src/materialized/src/lib.rs @@ -48,7 +48,7 @@ use mz_ore::task; use mz_pid_file::PidFile; use mz_secrets::SecretsController; use mz_secrets_filesystem::FilesystemSecretsController; -use mz_secrets_kubernetes::KubernetesSecretsController; +use mz_secrets_kubernetes::{KubernetesSecretsController, KubernetesSecretsControllerConfig}; use crate::mux::Mux; @@ -193,6 +193,9 @@ pub enum SecretsControllerConfig { /// The name of a Kubernetes context to use, if the Kubernetes configuration /// is loaded from the local kubeconfig. context: String, + user_defined_secret: String, + user_defined_secret_mount_path: String, + refresh_pod_name: String, }, } @@ -352,10 +355,22 @@ async fn serve_stash( fs::set_permissions(secrets_storage.clone(), permissions)?; Box::new(FilesystemSecretsController::new(secrets_storage)) } - Some(SecretsControllerConfig::Kubernetes { context }) => Box::new( - KubernetesSecretsController::new(context) - .await - .context("connecting to kubernetes")?, + Some(SecretsControllerConfig::Kubernetes { + context, + user_defined_secret, + user_defined_secret_mount_path, + refresh_pod_name, + }) => Box::new( + KubernetesSecretsController::new( + context, + KubernetesSecretsControllerConfig { + user_defined_secret, + user_defined_secret_mount_path, + refresh_pod_name, + }, + ) + .await + .context("connecting to kubernetes")?, ), }; diff --git a/src/orchestrator-kubernetes/src/lib.rs b/src/orchestrator-kubernetes/src/lib.rs index 917b858aff72b..30237050f21cb 100644 --- a/src/orchestrator-kubernetes/src/lib.rs +++ b/src/orchestrator-kubernetes/src/lib.rs @@ -29,7 +29,6 @@ use kube::ResourceExt; use sha2::{Digest, Sha256}; use mz_orchestrator::{NamespacedOrchestrator, Orchestrator, Service, ServiceConfig}; -use mz_secrets_kubernetes::SECRET_NAME; const FIELD_MANAGER: &str = "materialized"; @@ -47,6 +46,8 @@ pub struct KubernetesOrchestratorConfig { pub service_account: Option, /// The image pull policy to set for services created by the orchestrator. pub image_pull_policy: KubernetesImagePullPolicy, + /// The name of the secret used to store user defined secrets. + pub user_defined_secret: String, } /// Specifies whether Kubernetes should pull Docker images when creating pods. @@ -229,7 +230,7 @@ impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator { let secrets_volume = Volume { name: volume_name.clone(), secret: Some(SecretVolumeSource { - secret_name: Some(SECRET_NAME.to_string()), + secret_name: Some(self.config.user_defined_secret.clone()), ..Default::default() }), ..Default::default() diff --git a/src/secrets-kubernetes/Cargo.toml b/src/secrets-kubernetes/Cargo.toml index bac7e2ef8ad7d..9c6f58bf694c4 100644 --- a/src/secrets-kubernetes/Cargo.toml +++ b/src/secrets-kubernetes/Cargo.toml @@ -13,3 +13,6 @@ mz-secrets = { path = "../secrets" } k8s-openapi = { version = "0.14.0", features = ["v1_22"] } kube = { version = "0.71.0", features = ["ws"] } tracing = "0.1.34" +tokio = { version = "1.17.0" } +rand = "0.8.5" +mz-ore = { path = "../ore", default-features = false } diff --git a/src/secrets-kubernetes/src/lib.rs b/src/secrets-kubernetes/src/lib.rs index f740a2b36457a..456e0867d1bf2 100644 --- a/src/secrets-kubernetes/src/lib.rs +++ b/src/secrets-kubernetes/src/lib.rs @@ -7,37 +7,43 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use anyhow::{bail, Error}; +use anyhow::{anyhow, bail, Error}; use async_trait::async_trait; -use k8s_openapi::api::core::v1::Secret; -use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta; +use k8s_openapi::api::core::v1::{Pod, Secret}; use k8s_openapi::ByteString; -use kube::api::{Patch, PatchParams, PostParams}; +use kube::api::{Patch, PatchParams}; use kube::config::KubeConfigOptions; -use kube::{Api, Client, Config}; +use kube::{Api, Client, Config, ResourceExt}; +use mz_ore::retry::Retry; use mz_secrets::{SecretOp, SecretsController}; +use rand::random; use std::collections::BTreeMap; -use tracing::{error, info}; +use std::path::{Path, PathBuf}; +use std::time::Duration; +use tokio::io; +use tracing::info; const FIELD_MANAGER: &str = "materialized"; -pub const SECRET_NAME: &str = "user-managed-secrets"; +const POD_ANNOTATION: &str = "materialized.materialize.cloud/secret-refresh"; +const POLL_TIMEOUT: u64 = 120; + +pub struct KubernetesSecretsControllerConfig { + pub user_defined_secret: String, + pub user_defined_secret_mount_path: String, + pub refresh_pod_name: String, +} pub struct KubernetesSecretsController { secret_api: Api, + pod_api: Api, + config: KubernetesSecretsControllerConfig, } impl KubernetesSecretsController { - fn make_secret_with_name(name: String) -> Secret { - Secret { - metadata: ObjectMeta { - name: Some(name), - ..Default::default() - }, - ..Default::default() - } - } - - pub async fn new(context: String) -> Result { + pub async fn new( + context: String, + config: KubernetesSecretsControllerConfig, + ) -> Result { let kubeconfig_options = KubeConfigOptions { context: Some(context), ..Default::default() @@ -53,29 +59,60 @@ impl KubernetesSecretsController { }, }; let client = Client::try_from(kubeconfig)?; - let secret_api: Api = Api::default_namespaced(client); - - let secret = KubernetesSecretsController::make_secret_with_name(SECRET_NAME.to_string()); - match secret_api.create(&PostParams::default(), &secret).await { - Ok(_) => Ok(()), - Err(kube::Error::Api(e)) if e.code == 409 => { - info!("Secret {} already exists", SECRET_NAME.to_string()); - Ok(()) - } - Err(e) => { - error!("creating secret failed: {}", e); - Err(e) - } - }?; + let secret_api: Api = Api::default_namespaced(client.clone()); + let pod_api: Api = Api::default_namespaced(client); + + // ensure that the secret has been created in this environment + secret_api.get(&*config.user_defined_secret).await?; + + if !Path::new(&config.user_defined_secret_mount_path).is_dir() { + bail!( + "Configured secrets location could not be found on filesystem: ({})", + config.user_defined_secret_mount_path + ); + } + + Ok(KubernetesSecretsController { + secret_api, + pod_api, + config, + }) + } + + async fn trigger_resync(&mut self) -> Result<(), Error> { + let mut pod = Pod::default(); + pod.annotations_mut().insert( + String::from(POD_ANNOTATION), + format!("{:x}", random::()), + ); + + self.pod_api + .patch( + &self.config.refresh_pod_name, + &PatchParams::apply(FIELD_MANAGER).force(), + &Patch::Apply(pod), + ) + .await?; - Ok(KubernetesSecretsController { secret_api }) + return Ok(()); + } + + async fn try_exists(path: PathBuf) -> Result { + match tokio::fs::metadata(path).await { + Ok(_) => Ok(true), + Err(x) if x.kind() == io::ErrorKind::NotFound => Ok(false), + Err(x) => Err(Error::from(x)), + } } } #[async_trait] impl SecretsController for KubernetesSecretsController { async fn apply(&mut self, ops: Vec) -> Result<(), Error> { - let mut secret: Secret = self.secret_api.get(&*SECRET_NAME.to_string()).await?; + let mut secret: Secret = self + .secret_api + .get(&*self.config.user_defined_secret) + .await?; let mut data = secret.data.map_or_else(BTreeMap::new, |m| m); @@ -100,12 +137,40 @@ impl SecretsController for KubernetesSecretsController { self.secret_api .patch( - &SECRET_NAME, + &self.config.user_defined_secret, &PatchParams::apply(FIELD_MANAGER).force(), &Patch::Apply(secret), ) .await?; + self.trigger_resync().await?; + + // guarantee that all new secrets are reflected on our local filesystem + let secrets_storage_path = + PathBuf::from(self.config.user_defined_secret_mount_path.clone()); + for op in ops.iter() { + match op { + SecretOp::Ensure { id, .. } => { + Retry::default() + .max_duration(Duration::from_secs(POLL_TIMEOUT)) + .retry_async(|_| async { + let file_path = secrets_storage_path.join(format!("{}", id)); + match KubernetesSecretsController::try_exists(file_path).await { + Ok(result) => { + if result { + Ok(()) + } else { + Err(anyhow!("Secret write operation has timed out. Secret with id {} could not be written", id)) + } + } + Err(e) => Err(e) + } + }).await? + } + _ => {} + } + } + return Ok(()); } }