Skip to content
Merged
123 changes: 112 additions & 11 deletions src/identity/caclient.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::collections::BTreeMap;
use std::sync::Arc;

use async_trait::async_trait;
use prost_types::Struct;
Expand All @@ -21,15 +22,24 @@ use tonic::IntoRequest;
use tonic::metadata::{AsciiMetadataKey, AsciiMetadataValue};
use tracing::{debug, error, instrument, warn};

use crate::config::RootCert;
use crate::identity::Error;
use crate::identity::auth::AuthSource;
use crate::identity::manager::Identity;
use crate::tls::{self, TlsGrpcChannel};
use crate::tls::{self, RootCertManager, TlsGrpcChannel, control_plane_client_config};
use crate::xds::istio::ca::IstioCertificateRequest;
use crate::xds::istio::ca::istio_certificate_service_client::IstioCertificateServiceClient;

pub struct CaClient {
pub client: IstioCertificateServiceClient<TlsGrpcChannel>,
pub client: std::sync::RwLock<IstioCertificateServiceClient<TlsGrpcChannel>>,
/// gRPC endpoint of istiod (retained so the TLS channel can be rebuilt via `rebuild_channel`)
address: String,
/// alternate hostname for TLS SNI / certificate verification
alt_hostname: Option<String>,
/// Token source for authorization header on gRPC requests
auth: AuthSource,
/// Signals when CA root cert file on disk has changed and TLS channel needs to be rebuilt
root_cert_manager: Option<Arc<RootCertManager>>,
pub enable_impersonated_identity: bool,
pub secret_ttl: i64,
ca_headers: Vec<(AsciiMetadataKey, AsciiMetadataValue)>,
Expand All @@ -39,27 +49,93 @@ impl CaClient {
pub async fn new(
address: String,
alt_hostname: Option<String>,
cert_provider: Box<dyn tls::ControlPlaneClientCertProvider>,
root_cert: RootCert,
auth: AuthSource,
enable_impersonated_identity: bool,
secret_ttl: i64,
ca_headers: Vec<(AsciiMetadataKey, AsciiMetadataValue)>,
) -> Result<CaClient, Error> {
let svc =
tls::grpc_connector(address, auth, cert_provider.fetch_cert(alt_hostname).await?)?;
let client = IstioCertificateServiceClient::new(svc);
// Build the initial TLS ClientConfig from the current root cert
let client_config = control_plane_client_config(&root_cert, alt_hostname.clone()).await?;
let tls_grpc_channel = tls::grpc_connector(address.clone(), auth.clone(), client_config)?;
let initial_client = IstioCertificateServiceClient::new(tls_grpc_channel);

// Start the file watcher only for file-based certs.
let root_cert_manager = if let RootCert::File(_) = &root_cert {
Some(RootCertManager::new(root_cert.clone())?)
} else {
None
};

Ok(CaClient {
client,
client: std::sync::RwLock::new(initial_client),
address,
alt_hostname,
auth,
root_cert_manager,
enable_impersonated_identity,
secret_ttl,
ca_headers,
})
}

/// Recreate the CA gRPC client using the root cert currently on disk.
///
/// Re-reads the root certificate through the watcher-managed `RootCertManager`
/// and builds a fresh TLS channel from the address/auth/SNI settings stored in
/// `CaClient`.
///
/// # Errors
/// Returns an error when:
/// - the cert file is missing or malformed
/// - the gRPC connector cannot be created.
async fn rebuild_channel(
    &self,
) -> Result<IstioCertificateServiceClient<TlsGrpcChannel>, Error> {
    // Only callers that constructed the client with a file-based root cert
    // (and therefore a manager) may trigger a rebuild.
    let manager = self
        .root_cert_manager
        .as_ref()
        .expect("rebuild_channel requires root_cert_manager to be Some");
    let current_root = manager.root_cert();

    let tls_config =
        control_plane_client_config(&current_root, self.alt_hostname.clone()).await?;
    let channel = tls::grpc_connector(self.address.clone(), self.auth.clone(), tls_config)?;
    Ok(IstioCertificateServiceClient::new(channel))
}
}

impl CaClient {
#[instrument(skip_all)]
async fn fetch_certificate(&self, id: &Identity) -> Result<tls::WorkloadCertificate, Error> {
// Hot reload check
// If the cert has changed since last call, rebuild the channel before making the request.
// If rebuild fails:
// - log a warning
// - rearm the flag to try again on next call attempt
// - fail through the existing channel
if let Some(ref manager) = self.root_cert_manager
&& manager.take_dirty()
{
match self.rebuild_channel().await {
Ok(new_client) => {
// Replace current client
let t = std::time::Instant::now();
*self.client.write().unwrap() = new_client;
debug!(
write_lock_wait_ms = t.elapsed().as_millis(),
"TLS channel rebuild after CA root cert rotation"
);
}
Err(e) => {
warn!(error = %e, "failed to rebuild TLS channel after root cert rotation; retaining old channel, will retry on next fetch");
// rearm so the next fetch_certificate call tries again
manager.mark_dirty();
}
}
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't requesting a change per se. Sort of reasoning "out loud" about the locking here.

After we take_dirty() the state returns to clean immediately and we enter the await to attempt rebuild_channel. During this time nothing holds the write lock and the state is clean so we may try to use an old client. It might be able to compound as contentious readers may continue to delay being able to take the write, all the while in a "clean" state. I think the advantage is that we do not queue multiple write/rebuilds, which seems like a nice property. In the happier state, the old client is still valid and we keep on running nicely but eventually wind up with a fresh client. In the worse state, the client isn't usable and we produce errors while potentially blocking the resolution by causing read contention on the lock.

Assuming we held the rwlock before marking clean, a bunch of reads could await the write lock to resolve. On success, they get the newly minted client (probably ideal?). On fail, they get the old client and what happens happens. The downside is we could have multiple write-lock awaits simultaneously queued, since multiple calls to fetch could see the dirty state while we await any held read locks to clear. This might be kind of helpful, since at least write locks do not lead to further read contention, but we might need to check whether the dirty flag was resolved by someone else before actually rebuilding.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @ilrudie ,
Thanks for the comments!!!
Sorry for the late response

After we take_dirty() the state returns to clean immediately and we enter the await to attempt rebuild_channel. During this time nothing holds the write lock and the state is clean so we may try to use an old client. It might be able to compound as contentious readers may continue to delay being able to take the write, all the while in a "clean" state

Yeah, you are right, but the contention will be quite limited I think.
If I remember correctly, on Linux the RW lock uses a "write-preferring" policy by default, so once a writer asks for the lock it only has to wait for the readers that already hold it to unlock, but it has preference over any new reader. Given this, the contention is limited to just those readers that asked for the lock before the writer, and the read lock is only held while doing the clone, which is a very fast operation (~us), so the contention time should be really small

I think the advantage is that we do not queue multiple write/rebuilds, which seems like a nice property. In the happier state, the old client is still valid and we keep on running nicely but eventually wind up with a fresh client. In the worse state, the client isn't usable and we produce errors while potentially blocking the resolution by causing read contention on the lock.

You are right again. The old client will be used while the rebuild is done, and that will fail if the old cert is not valid just after rotation, but I was assuming a healthy time window between rotation and certificate expiry.

Assuming we held the rwlock before marking clean, a bunch of reads could await the write lock to resolve. On success, they get the newly minted client (probably ideal?). On fail, they get the old client and what happens happens. The downside is we could have multiple writelock awaits simultaneously queue since multiple calls to fetch could see the dirty state while we await any held read locks to clear. This might be kind of helpful, since at least write locks to not lead to further read contentions, but we might need to check if the dirty was resolved by someone else before actually rebuilding.

Yeah, the problem with that is we cannot hold an RwLock across an await point, so we would have to change std::sync::RwLock to tokio::sync::RwLock, which is not trivial, and as you have already said, we would have to use double-checked locking.

Working from this idea I was thinking ...
What if we use a "rebuild_lock" (tokio::sync::Mutex) to serialize channel builds, so we avoid queuing multiple builds, it will be something like:

if take_dirty() is true
    get rebuild_lock                                    // only one rebuild in queue
    if take_dirty() is true                             // double-check 
        new_client = rebuild_channel().await
        *self.client.write().unwrap() = new_client      // quick write lock
    endif
    unlock rebuild_lock
endif

This will:

  • "serialize" rebuilds, so we only have one at a time
  • avoid blocking readers while rebuilding the channel, as the write lock is only held for the client swap
  • no need to change std::sync::RWLock

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the discussion. I agree we should have time to rotate out the client in most reasonable cases so this is probably a suitable implementation with a good balance of complexity.

It might be nice to add how long we waited for the write lock to the debug message. This way if we suspect contention, we already have some information that can be made available to troubleshoot. I don't think we need to immediately introduce more complexity though.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea.
I have added the elapsed time in write contention to the debug log.


// Clone the client *before* releasing the RwLock so it is not carried across awaits
let client = self.client.read().unwrap().clone();

let cs = tls::csr::CsrOptions {
san: id.to_string(),
}
Expand Down Expand Up @@ -93,8 +169,7 @@ impl CaClient {
}
});

let resp = self
.client
let resp = client
.clone()
.create_certificate(req.into_request())
.await
Expand Down Expand Up @@ -256,13 +331,16 @@ pub mod mock {
#[cfg(test)]
mod tests {

use std::time::Duration;
use std::{io::Write, time::Duration};

use matches::assert_matches;
use tempfile::NamedTempFile;

use crate::{
config::RootCert,
identity::{Error, Identity},
test_helpers, tls,
test_helpers,
tls::{self, RootCertManager, mock::TEST_ROOT},
xds::istio::ca::IstioCertificateResponse,
};

Expand Down Expand Up @@ -315,4 +393,27 @@ mod tests {
.await;
assert_matches!(res, Ok(_));
}

#[test]
fn root_cert_manager_dirty_flag_api() {
    // Persist a known-good root cert so the manager can be constructed.
    let mut cert_file = NamedTempFile::new().unwrap();
    cert_file.write_all(TEST_ROOT).unwrap();

    let manager = RootCertManager::new(RootCert::File(cert_file.path().to_path_buf()))
        .expect("RootCertManager must be created");

    // A freshly constructed manager starts out clean.
    assert!(!manager.take_dirty(), "new manager should not be dirty");

    // Simulate the file watcher observing a cert rotation.
    manager.mark_dirty();

    // take_dirty is read-and-reset: the first call observes the flag,
    // the second sees it already cleared.
    assert!(
        manager.take_dirty(),
        "take_dirty must return true when dirty"
    );
    assert!(
        !manager.take_dirty(),
        "take_dirty must return false after reset"
    );
}
}
4 changes: 1 addition & 3 deletions src/identity/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -553,9 +553,7 @@ impl SecretManager {
.clone()
.expect("ca_address must be set to use CA"),
cfg.alt_ca_hostname.clone(),
Box::new(tls::ControlPlaneAuthentication::RootCert(
cfg.ca_root_cert.clone(),
)),
cfg.ca_root_cert.clone(),
cfg.auth.clone(),
cfg.proxy_mode == ProxyMode::Shared,
cfg.secret_ttl.as_secs().try_into().unwrap_or(60 * 60 * 24),
Expand Down
2 changes: 1 addition & 1 deletion src/test_helpers/ca.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl CaServer {
let client = CaClient::new(
"https://".to_string() + &server_addr.to_string(),
None,
Box::new(tls::ControlPlaneAuthentication::RootCert(root_cert)),
root_cert,
AuthSource::StaticToken(FAKE_JWT.to_string(), "Kubernetes".to_string()),
true,
60 * 60 * 24,
Expand Down
Loading