Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ const PREFERED_SERVICE_NAMESPACE: &str = "PREFERED_SERVICE_NAMESPACE";
const CA_ADDRESS: &str = "CA_ADDRESS";
const SECRET_TTL: &str = "SECRET_TTL";
const FAKE_CA: &str = "FAKE_CA";
const CA_CERT_WATCHER: &str = "CA_CERT_WATCHER";
const ZTUNNEL_WORKER_THREADS: &str = "ZTUNNEL_WORKER_THREADS";
const ZTUNNEL_CPU_LIMIT: &str = "ZTUNNEL_CPU_LIMIT";
const POOL_MAX_STREAMS_PER_CONNECTION: &str = "POOL_MAX_STREAMS_PER_CONNECTION";
Expand Down Expand Up @@ -237,6 +238,10 @@ pub struct Config {
pub ca_address: Option<String>,
/// Root cert for CA TLS verification.
pub ca_root_cert: RootCert,
/// if true (default), a file watcher is started on the CA root cert folder
/// and the gRPC channel is rebuilt automatically on cert rotation.
/// Only effective when ca_root_cert is a file path.
pub ca_cert_watcher: bool,
// Allow custom alternative CA hostname verification
pub alt_ca_hostname: Option<String>,
/// XDS address to use. If unset, XDS will not be used.
Expand Down Expand Up @@ -560,6 +565,7 @@ pub fn construct_config(pc: ProxyConfig) -> Result<Config, Error> {
} else {
RootCert::Static(Bytes::from(ca_root_cert_provider))
};
let ca_cert_watcher = parse_default(CA_CERT_WATCHER, true)?;

let auth = match parse::<String>(TOKEN_PROVIDER_ENV)? {
None => {
Expand Down Expand Up @@ -794,6 +800,7 @@ pub fn construct_config(pc: ProxyConfig) -> Result<Config, Error> {
prefered_service_namespace,
ca_address,
ca_root_cert,
ca_cert_watcher,
alt_xds_hostname: parse(ALT_XDS_HOSTNAME)?,
alt_ca_hostname: parse(ALT_CA_HOSTNAME)?,

Expand Down
122 changes: 111 additions & 11 deletions src/identity/caclient.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::collections::BTreeMap;
use std::sync::Arc;

use async_trait::async_trait;
use prost_types::Struct;
Expand All @@ -21,45 +22,119 @@ use tonic::IntoRequest;
use tonic::metadata::{AsciiMetadataKey, AsciiMetadataValue};
use tracing::{debug, error, instrument, warn};

use crate::config::RootCert;
use crate::identity::Error;
use crate::identity::auth::AuthSource;
use crate::identity::manager::Identity;
use crate::tls::{self, TlsGrpcChannel};
use crate::tls::{self, RootCertManager, TlsGrpcChannel, control_plane_client_config};
use crate::xds::istio::ca::IstioCertificateRequest;
use crate::xds::istio::ca::istio_certificate_service_client::IstioCertificateServiceClient;

pub struct CaClient {
pub client: IstioCertificateServiceClient<TlsGrpcChannel>,
pub client: std::sync::RwLock<IstioCertificateServiceClient<TlsGrpcChannel>>,
/// gRPC endpoint of istiod ( retained to be able to "rebuild_channel")
address: String,
/// alternate hostname for TLS SNI / certificate verification
alt_hostname: Option<String>,
/// Token source for authorization header on gRPC requests
auth: AuthSource,
/// Signals when CA root cert file on disk has changed and TLS channel needs to be rebuilt
root_cert_manager: Option<Arc<RootCertManager>>,
pub enable_impersonated_identity: bool,
pub secret_ttl: i64,
ca_headers: Vec<(AsciiMetadataKey, AsciiMetadataValue)>,
}

impl CaClient {
#[allow(clippy::too_many_arguments)]
pub async fn new(
address: String,
alt_hostname: Option<String>,
cert_provider: Box<dyn tls::ControlPlaneClientCertProvider>,
root_cert: RootCert,
enable_ca_cert_watcher: bool,
auth: AuthSource,
enable_impersonated_identity: bool,
secret_ttl: i64,
ca_headers: Vec<(AsciiMetadataKey, AsciiMetadataValue)>,
) -> Result<CaClient, Error> {
let svc =
tls::grpc_connector(address, auth, cert_provider.fetch_cert(alt_hostname).await?)?;
let client = IstioCertificateServiceClient::new(svc);
//Build initial TLS ClientConfig from the current root cert
let client_config = control_plane_client_config(&root_cert, alt_hostname.clone()).await?;
let tls_grpc_channel = tls::grpc_connector(address.clone(), auth.clone(), client_config)?;
let initial_client = IstioCertificateServiceClient::new(tls_grpc_channel);

let root_cert_manager = (enable_ca_cert_watcher && matches!(root_cert, RootCert::File(_)))
.then(|| RootCertManager::new(root_cert.clone()))
.transpose()?;

Ok(CaClient {
client,
client: std::sync::RwLock::new(initial_client),
address,
alt_hostname,
auth,
root_cert_manager,
enable_impersonated_identity,
secret_ttl,
ca_headers,
})
}

/// Rebuilds gRPC client using the current root cert from disk.
///
/// Reads the cert file again and recreates gRPC channel using the new info stored in `CaClient`
///
/// Returns an error if:
/// - cert file is missing or malformed
/// - gRPC connector cannot be created.
async fn rebuild_channel(
&self,
) -> Result<IstioCertificateServiceClient<TlsGrpcChannel>, Error> {
let root_cert = self
.root_cert_manager
.as_ref()
.expect("rebuild_channel requires root_cert_manager to be Some")
.root_cert();

let client_config =
control_plane_client_config(&root_cert, self.alt_hostname.clone()).await?;
let tls_grpc_channel =
tls::grpc_connector(self.address.clone(), self.auth.clone(), client_config)?;
Ok(IstioCertificateServiceClient::new(tls_grpc_channel))
}
}

impl CaClient {
#[instrument(skip_all)]
async fn fetch_certificate(&self, id: &Identity) -> Result<tls::WorkloadCertificate, Error> {
// Hot reload check
// If the cert has changed since last call, rebuild the channel before making the request.
// If rebuild fails:
// - log a warning
// - rearm the flag to try again on next call attempt
// - fail through the existing channel
if let Some(ref manager) = self.root_cert_manager
&& manager.take_dirty()
{
match self.rebuild_channel().await {
Ok(new_client) => {
// Replace current client
let t = std::time::Instant::now();
*self.client.write().unwrap() = new_client;
debug!(
write_lock_wait_ms = t.elapsed().as_millis(),
"TLS channel rebuild after CA root cert rotation"
);
}
Err(e) => {
warn!(error = %e, "failed to rebuild TLS channel after root cert rotation; retaining old channel, will retry on next fetch");
// rearm so the next fetch_certificate call tries again
manager.mark_dirty();
}
}
}

// Clone the client *before* releasing the RwLock so it is not carried across awaits
let client = self.client.read().unwrap().clone();

let cs = tls::csr::CsrOptions {
san: id.to_string(),
}
Expand Down Expand Up @@ -93,8 +168,7 @@ impl CaClient {
}
});

let resp = self
.client
let resp = client
.clone()
.create_certificate(req.into_request())
.await
Expand Down Expand Up @@ -256,13 +330,16 @@ pub mod mock {
#[cfg(test)]
mod tests {

use std::time::Duration;
use std::{io::Write, time::Duration};

use matches::assert_matches;
use tempfile::NamedTempFile;

use crate::{
config::RootCert,
identity::{Error, Identity},
test_helpers, tls,
test_helpers,
tls::{self, RootCertManager, mock::TEST_ROOT},
xds::istio::ca::IstioCertificateResponse,
};

Expand Down Expand Up @@ -315,4 +392,27 @@ mod tests {
.await;
assert_matches!(res, Ok(_));
}

#[test]
fn root_cert_manager_dirty_flag_api() {
let mut root_file = NamedTempFile::new().unwrap();
root_file.write_all(TEST_ROOT).unwrap();

let manager = RootCertManager::new(RootCert::File(root_file.path().to_path_buf()))
.expect("RootCertManager must be created");

assert!(!manager.take_dirty(), "new manager should not be dirty");

// simulate file watcher trigger
manager.mark_dirty();

assert!(
manager.take_dirty(),
"take_dirty must return true when dirty"
);
assert!(
!manager.take_dirty(),
"take_dirty must return false after reset"
);
}
}
5 changes: 2 additions & 3 deletions src/identity/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -553,9 +553,8 @@ impl SecretManager {
.clone()
.expect("ca_address must be set to use CA"),
cfg.alt_ca_hostname.clone(),
Box::new(tls::ControlPlaneAuthentication::RootCert(
cfg.ca_root_cert.clone(),
)),
cfg.ca_root_cert.clone(),
cfg.ca_cert_watcher,
cfg.auth.clone(),
cfg.proxy_mode == ProxyMode::Shared,
cfg.secret_ttl.as_secs().try_into().unwrap_or(60 * 60 * 24),
Expand Down
3 changes: 2 additions & 1 deletion src/test_helpers/ca.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ impl CaServer {
let client = CaClient::new(
"https://".to_string() + &server_addr.to_string(),
None,
Box::new(tls::ControlPlaneAuthentication::RootCert(root_cert)),
root_cert,
false,
AuthSource::Token(
PathBuf::from(r"src/test_helpers/fake-jwt"),
"Kubernetes".to_string(),
Expand Down
Loading