Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ const LOCAL_XDS_PATH: &str = "LOCAL_XDS_PATH";
const LOCAL_XDS: &str = "LOCAL_XDS";
const XDS_ON_DEMAND: &str = "XDS_ON_DEMAND";
const XDS_ADDRESS: &str = "XDS_ADDRESS";
const PREFERED_SERVICE_NAMESPACE: &str = "PREFERED_SERVICE_NAMESPACE";
const CA_ADDRESS: &str = "CA_ADDRESS";
const SECRET_TTL: &str = "SECRET_TTL";
const FAKE_CA: &str = "FAKE_CA";
Expand Down Expand Up @@ -242,6 +243,12 @@ pub struct Config {
// Allow custom alternative XDS hostname verification
pub alt_xds_hostname: Option<String>,

/// Prefered service namespace to use for service resolution.
/// If unset, local namespaces is preferred and other namespaces have equal priority.
/// If set, the local namespace is preferred, then the defined prefered_service_namespace
/// and finally other namespaces at an equal priority.
pub prefered_service_namespace: Option<String>,

/// TTL for CSR requests
pub secret_ttl: Duration,
/// YAML config for local XDS workloads
Expand Down Expand Up @@ -501,6 +508,14 @@ pub fn construct_config(pc: ProxyConfig) -> Result<Config, Error> {
.or_else(|| Some(default_istiod_address.clone())),
))?;

let prefered_service_namespace = match parse::<String>(PREFERED_SERVICE_NAMESPACE) {
Ok(ns) => ns,
Err(e) => {
warn!(err=?e, "failed to parse {PREFERED_SERVICE_NAMESPACE}, continuing with default behavior");
None
}
};

let istio_meta_cluster_id = ISTIO_META_PREFIX.to_owned() + CLUSTER_ID;
let cluster_id: String = match parse::<String>(&istio_meta_cluster_id)? {
Some(id) => id,
Expand Down Expand Up @@ -767,6 +782,7 @@ pub fn construct_config(pc: ProxyConfig) -> Result<Config, Error> {

xds_address,
xds_root_cert,
prefered_service_namespace,
ca_address,
ca_root_cert,
alt_xds_hostname: parse(ALT_XDS_HOSTNAME)?,
Expand Down
70 changes: 64 additions & 6 deletions src/dns/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use crate::drain::{DrainMode, DrainWatcher};
use crate::metrics::{DeferRecorder, IncrementRecorder, Recorder};
use crate::proxy::Error;
use crate::state::DemandProxyState;
use crate::state::service::IpFamily;
use crate::state::service::{IpFamily, Service};
use crate::state::workload::Workload;
use crate::state::workload::address::Address;
use crate::{config, dns};
Expand Down Expand Up @@ -85,6 +85,7 @@ impl Server {
drain: DrainWatcher,
socket_factory: &(dyn SocketFactory + Send + Sync),
local_workload_information: Arc<LocalWorkloadFetcher>,
prefered_service_namespace: Option<String>,
) -> Result<Self, Error> {
// if the address we got from config is supposed to be v6-enabled,
// actually check if the local pod context our socketfactory operates in supports V6.
Expand All @@ -102,6 +103,7 @@ impl Server {
forwarder,
metrics,
local_workload_information,
prefered_service_namespace,
);
let store = Arc::new(store);
let handler = dns::handler::Handler::new(store.clone());
Expand Down Expand Up @@ -191,6 +193,7 @@ struct Store {
svc_domain: Name,
metrics: Arc<Metrics>,
local_workload: Arc<LocalWorkloadFetcher>,
prefered_service_namespace: Option<String>,
}

impl Store {
Expand All @@ -200,6 +203,7 @@ impl Store {
forwarder: Arc<dyn Forwarder>,
metrics: Arc<Metrics>,
local_workload_information: Arc<LocalWorkloadFetcher>,
prefered_service_namespace: Option<String>,
) -> Self {
let domain = as_name(domain);
let svc_domain = append_name(as_name("svc"), &domain);
Expand All @@ -211,6 +215,7 @@ impl Store {
svc_domain,
metrics,
local_workload: local_workload_information,
prefered_service_namespace,
}
}

Expand Down Expand Up @@ -359,7 +364,7 @@ impl Store {
let search_name_str = search_name.to_string().into();
search_name.set_fqdn(true);

let service = state
let services: Vec<Arc<Service>> = state
.services
.get_by_host(&search_name_str)
.iter()
Expand All @@ -382,13 +387,30 @@ impl Store {
})
// Get the service matching the client namespace. If no match exists, just
// return the first service.
.find_or_first(|service| service.namespace == client.namespace)
.cloned();
// .find_or_first(|service| service.namespace == client.namespace)
.cloned()
.collect();

// TODO: ideally we'd sort these by creation time so that the oldest would be used if there are no namespace matches
// presently service doesn't have creation time in WDS, but we could add it
// TODO: if the local namespace doesn't define a service, kube service should be prioritized over se
let service = match services
.iter()
.find(|service| service.namespace == client.namespace)
{
Some(service) => Some(service),
None => match self.prefered_service_namespace.as_ref() {
Some(prefered_namespace) => services.iter().find_or_first(|service| {
service.namespace == prefered_namespace.as_str()
}),
None => services.first(),
},
};

// First, lookup the host as a service.
if let Some(service) = service {
return Some(ServerMatch {
server: Address::Service(service),
server: Address::Service(service.clone()),
name: search_name,
alias,
});
Expand Down Expand Up @@ -956,6 +978,7 @@ mod tests {

const NS1: &str = "ns1";
const NS2: &str = "ns2";
const PREFERRED: &str = "preferred-ns";
const NW1: Strng = strng::literal!("nw1");
const NW2: Strng = strng::literal!("nw2");

Expand Down Expand Up @@ -1063,6 +1086,7 @@ mod tests {
forwarder,
metrics: test_metrics(),
local_workload,
prefered_service_namespace: None,
};

let namespaced_domain = n(format!("{}.svc.cluster.local", c.client_namespace));
Expand Down Expand Up @@ -1378,6 +1402,18 @@ mod tests {
expect_code: ResponseCode::NXDomain,
..Default::default()
},
Case {
name: "success: preferred namespace is chosen if local namespace is not defined",
host: "preferred.io.",
expect_records: vec![a(n("preferred.io."), ipv4("10.10.10.211"))],
..Default::default()
},
Case {
name: "success: external service resolves to local namespace's address",
host: "everywhere.io.",
expect_records: vec![a(n("everywhere.io."), ipv4("10.10.10.112"))],
..Default::default()
},
];

// Create and start the proxy.
Expand All @@ -1395,6 +1431,7 @@ mod tests {
drain,
&factory,
local_workload,
Some(PREFERRED.to_string()),
)
.await
.unwrap();
Expand Down Expand Up @@ -1481,6 +1518,7 @@ mod tests {
drain,
&factory,
local_workload,
None,
)
.await
.unwrap();
Expand Down Expand Up @@ -1530,6 +1568,7 @@ mod tests {
}),
state.clone(),
),
prefered_service_namespace: None,
};

let ip4n6_client_ip = ip("::ffff:202:202");
Expand Down Expand Up @@ -1563,6 +1602,7 @@ mod tests {
drain,
&factory,
local_workload,
None,
)
.await
.unwrap();
Expand Down Expand Up @@ -1679,6 +1719,16 @@ mod tests {
xds_external_service("www.google.com", &[na(NW1, "1.1.1.1")]),
xds_service("productpage", NS1, &[na(NW1, "9.9.9.9")]),
xds_service("example", NS2, &[na(NW1, "10.10.10.10")]),
// Service with the same name in another namespace
// This should not be used if the preferred service namespace is set
xds_namespaced_external_service("everywhere.io", NS2, &[na(NW1, "10.10.10.110")]),
xds_namespaced_external_service("preferred.io", NS2, &[na(NW1, "10.10.10.210")]),
// Preferred service namespace
xds_namespaced_external_service("everywhere.io", PREFERRED, &[na(NW1, "10.10.10.111")]),
xds_namespaced_external_service("preferred.io", PREFERRED, &[na(NW1, "10.10.10.211")]),
// Service with the same name in the same namespace
// Client in NS1 should use this service
xds_namespaced_external_service("everywhere.io", NS1, &[na(NW1, "10.10.10.112")]),
with_fqdn(
"details.ns2.svc.cluster.remote",
xds_service(
Expand Down Expand Up @@ -1829,9 +1879,17 @@ mod tests {
}

fn xds_external_service<S: AsRef<str>>(hostname: S, addrs: &[NetworkAddress]) -> XdsService {
xds_namespaced_external_service(hostname, NS1, addrs)
}

fn xds_namespaced_external_service<S1: AsRef<str>, S2: AsRef<str>>(
hostname: S1,
ns: S2,
vips: &[NetworkAddress],
) -> XdsService {
with_fqdn(
hostname.as_ref(),
xds_service(hostname.as_ref(), NS1, addrs),
xds_service(hostname.as_ref(), ns.as_ref(), vips),
)
}

Expand Down
1 change: 1 addition & 0 deletions src/proxyfactory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ impl ProxyFactory {
drain.clone(),
socket_factory.as_ref(),
local_workload_information.as_fetcher(),
self.config.prefered_service_namespace.clone(),
)
.await?;
resolver = Some(server.resolver());
Expand Down
1 change: 1 addition & 0 deletions src/test_helpers/dns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ pub async fn run_dns(responses: HashMap<Name, Vec<IpAddr>>) -> anyhow::Result<Te
}),
state.clone(),
),
Some("prefered-namespace".to_string()),
)
.await?;

Expand Down