diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000000..1fcb1529f8 --- /dev/null +++ b/.dockerignore @@ -0,0 +1 @@ +out diff --git a/Cargo.lock b/Cargo.lock index 78a1a0c51e..ab40529024 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1229,6 +1229,19 @@ dependencies = [ "foldhash", ] +[[package]] +name = "hcn" +version = "0.1.0" +source = "git+https://github.com/keithmattix/hcn-rs.git#a4b3f6ee703509694b6eb178b07cdbb637771e80" +dependencies = [ + "anyhow", + "log", + "serde", + "serde_json", + "serde_repr", + "windows", +] + [[package]] name = "hdrhistogram" version = "7.5.4" @@ -1469,7 +1482,7 @@ dependencies = [ "hyper", "libc", "pin-project-lite", - "socket2", + "socket2 0.5.9", "tokio", "tower-service", "tracing", @@ -1663,7 +1676,7 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b58db92f96b720de98181bbbe63c831e87005ab460c1bf306eb2622b4707997f" dependencies = [ - "socket2", + "socket2 0.5.9", "widestring", "windows-sys 0.48.0", "winreg", @@ -3108,6 +3121,17 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_repr" +version = "0.1.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c64451ba24fc7a6a2d60fc75dd9c83c90903b19028d4eff35e88fc1e86564e9" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.100", +] + [[package]] name = "serde_yaml" version = "0.9.34+deprecated" @@ -3170,6 +3194,15 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "socket2" +version = "0.6.0" +source = "git+https://github.com/keithmattix/socket2.git?branch=add-tcp-retries-to-windows#3b6ea10cc0fde38a8819e05b338f9a3629c03ad6" +dependencies = [ + "libc", + "windows-sys 0.59.0", +] + [[package]] name = "spki" version = "0.7.3" @@ -3502,7 +3535,7 @@ dependencies = [ "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2", + "socket2 0.5.9", "tokio-macros", "windows-sys 0.52.0", ] @@ -4409,6 +4442,7 @@ dependencies = [ "boring-sys", "byteorder", 
"bytes", + "cfg-if", "chrono", "criterion", "ctor", @@ -4421,6 +4455,7 @@ dependencies = [ "futures-util", "h2", "hashbrown", + "hcn", "hickory-client", "hickory-proto", "hickory-resolver", @@ -4464,7 +4499,7 @@ dependencies = [ "serde", "serde_json", "serde_yaml", - "socket2", + "socket2 0.6.0", "split-iter", "test-case", "textnonce", @@ -4484,6 +4519,7 @@ dependencies = [ "tracing-log", "tracing-subscriber", "url", + "windows", "x509-parser", "ztunnel", ] diff --git a/Cargo.toml b/Cargo.toml index e950d7a705..2a9fa446a4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -86,7 +86,7 @@ rustls-pemfile = "2.2" serde = { version = "1.0", features = ["derive", "rc"] } serde_json = "1.0" serde_yaml = "0.9" -socket2 = { version = "0.5", features = ["all"] } +socket2 = { git = "https://github.com/keithmattix/socket2.git", branch="add-tcp-retries-to-windows", features = ["all"] } textnonce = { version = "1.0" } thiserror = "2.0" tls-listener = { version = "0.11" } @@ -117,11 +117,16 @@ educe = "0.6" netns-rs = "0.1" pprof = { version = "0.14", features = ["protobuf", "protobuf-codec", "criterion"] } +[target.'cfg(target_os = "windows")'.dependencies] +windows = { version = "0.58.0", features = ["Win32_System_HostCompute", "Win32_System_HostComputeNetwork", "Win32_System_HostComputeSystem", "Win32_NetworkManagement_IpHelper"] } +hcn = { git = "https://github.com/keithmattix/hcn-rs.git" } + [build-dependencies] tonic-build = { version = "0.13", default-features = false, features = ["prost"] } prost-build = "0.13" anyhow = "1.0" rustc_version = "0.4" +cfg-if = "1.0" [profile.release] opt-level = 3 diff --git a/Dockerfile.ztunnel-windows b/Dockerfile.ztunnel-windows new file mode 100644 index 0000000000..4643ec982d --- /dev/null +++ b/Dockerfile.ztunnel-windows @@ -0,0 +1,10 @@ +ARG WINBASE=mcr.microsoft.com/oss/kubernetes/windows-host-process-containers-base-image:v1.0.0 +FROM --platform=$BUILDPLATFORM rust AS build +WORKDIR /src +RUN apt-get update && apt-get install -y mingw-w64 
protobuf-compiler cmake nasm && rustup target add x86_64-pc-windows-gnu +COPY . . +RUN cargo build --target x86_64-pc-windows-gnu --release + +FROM ${WINBASE} +COPY --from=build /src/out/rust/x86_64-pc-windows-gnu/release/ztunnel.exe ztunnel.exe +ENTRYPOINT [ "ztunnel.exe" ] diff --git a/WINDOWS.md b/WINDOWS.md new file mode 100644 index 0000000000..aa8c335681 --- /dev/null +++ b/WINDOWS.md @@ -0,0 +1,46 @@ +# WIP: Windows Support + +Easiest way is probably to cross-compile? On Debian-based distros, install mingw: + +```bash +sudo apt-get install mingw-w64 +``` + +Then, add Rust cross-compile support with rustup: + +```bash +rustup target add x86_64-pc-windows-gnu +``` + +Test a build with: + +```bash +cargo build --target x86_64-pc-windows-gnu +``` + +Docker does support cross-building for Windows, but it is a bit of a pain. You can use the `docker buildx` command to build images for Windows. First, you need to create a new builder instance: + +```bash +docker buildx create --name windows-builder --platform=windows/amd64 # change to windows/arm64 if you want to build for arm64 +``` + +Then, build a docker image with: + +```bash +docker buildx build . -f Dockerfile.ztunnel-windows --platform=windows/amd64 --output type=registry -t localhost:5000/ztunnel-windows --builder windows-builder +``` + +## DNS + +HostProcess pods in Windows can't resolve cluster local DNS names. This is a known issue. In the meantime, you can use ALT_XDS_HOSTNAME and ALT_CA_HOSTNAME environment variables to set the expected certificate dns names for both XDS and CA clients. + +UPDATE: looks like there are some powershell commands we can run (perhaps as an init container?) to set the nameserver for a certain DNS namespace: + +```powershell +Add-DnsClientNrptRule -Namespace ".cluster.local" -NameServers "$env:KUBE_DNS_IP" +Clear-DnsClientCache # Clears the DNS client cache. 
Equivalent to `ipconfig /flushdns` +``` + +## REUSE_PORT + +Socket reuse is effectively not supported on Windows (despite the options existing, they're either insecure or ineffective for our purposes) diff --git a/benches/basic.rs b/benches/basic.rs index e495d443d2..7fe0a8f4ec 100644 --- a/benches/basic.rs +++ b/benches/basic.rs @@ -17,8 +17,10 @@ use std::sync::{Arc, RwLock}; use std::time::Duration; use bytes::Bytes; +#[cfg(target_os = "linux")] use criterion::{Criterion, Throughput, criterion_group, criterion_main}; use hickory_resolver::config::{ResolverConfig, ResolverOpts}; +#[cfg(target_os = "linux")] use pprof::criterion::{Output, PProfProfiler}; use prometheus_client::registry::Registry; use tokio::runtime::Runtime; @@ -33,6 +35,7 @@ use ztunnel::xds::istio::workload::Workload as XdsWorkload; use ztunnel::xds::istio::workload::load_balancing; use ztunnel::xds::istio::workload::{NetworkAddress as XdsNetworkAddress, PortList}; +#[cfg(target_os = "linux")] pub fn xds(c: &mut Criterion) { use ztunnel::xds::istio::workload::Port; use ztunnel::xds::istio::workload::Service as XdsService; @@ -87,6 +90,7 @@ pub fn xds(c: &mut Criterion) { }); } +#[cfg(target_os = "linux")] pub fn load_balance(c: &mut Criterion) { let mut c = c.benchmark_group("load_balance"); c.throughput(Throughput::Elements(1)); @@ -191,6 +195,7 @@ fn build_load_balancer( (rt, demand, src_wl, svc_addr) } +#[cfg(target_os = "linux")] criterion_group! { name = benches; config = Criterion::default() @@ -199,4 +204,10 @@ criterion_group! { targets = xds, load_balance } +#[cfg(target_os = "linux")] criterion_main!(benches); + +#[cfg(not(target_os = "linux"))] +fn main() { + println!("This benchmark is only supported on Linux"); +} diff --git a/benches/throughput.rs b/benches/throughput.rs index d3fa15764d..085c677a9f 100644 --- a/benches/throughput.rs +++ b/benches/throughput.rs @@ -11,7 +11,6 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
// See the License for the specific language governing permissions and // limitations under the License. - use std::cmp::Ordering::{Equal, Greater, Less}; use std::future::Future; use std::io::Error; @@ -27,7 +26,6 @@ use criterion::{ BenchmarkGroup, Criterion, SamplingMode, Throughput, criterion_group, criterion_main, }; use hickory_resolver::config::{ResolverConfig, ResolverOpts}; -use pprof::criterion::{Output, PProfProfiler}; use prometheus_client::registry::Registry; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpStream; @@ -38,11 +36,17 @@ use ztunnel::rbac::{Authorization, RbacMatch, StringMatch}; use ztunnel::state::workload::{InboundProtocol, Workload}; use ztunnel::state::{DemandProxyState, ProxyRbacContext, ProxyState}; use ztunnel::test_helpers::app::{DestinationAddr, TestApp}; +#[cfg(target_os = "linux")] use ztunnel::test_helpers::linux::{TestMode, WorkloadManager}; use ztunnel::test_helpers::tcp::Mode; use ztunnel::test_helpers::{helpers, tcp, test_default_workload}; use ztunnel::xds::LocalWorkload; -use ztunnel::{app, identity, metrics, proxy, rbac, setup_netns_test, strng, test_helpers}; +use ztunnel::{app, identity, metrics, proxy, rbac, strng, test_helpers}; + +#[cfg(target_os = "linux")] +use pprof::criterion::{Output, PProfProfiler}; +#[cfg(target_os = "linux")] +use ztunnel::setup_netns_test; const KB: usize = 1024; const MB: usize = 1024 * KB; @@ -55,6 +59,7 @@ const N_POLICIES: usize = 10_000; const DUMMY_NETWORK: &str = "198.51.100.0/24"; #[ctor::ctor] +#[cfg(target_os = "linux")] fn initialize_namespace_tests() { ztunnel::test_helpers::namespaced::initialize_namespace_tests(); } @@ -128,6 +133,8 @@ pub enum TestTrafficMode { } #[allow(clippy::type_complexity)] +#[cfg(target_os = "linux")] + fn initialize_environment( ztunnel_mode: WorkloadMode, traffic_mode: TestTrafficMode, @@ -200,6 +207,7 @@ fn initialize_environment( Ok((manager, tx, ack_rx)) } +#[cfg(target_os = "linux")] fn spawn_client( i: usize, manager: &mut 
WorkloadManager, @@ -266,6 +274,7 @@ struct TestClient { ack: Receiver>, } +#[cfg(target_os = "linux")] pub fn throughput(c: &mut Criterion) { const THROUGHPUT_SEND_SIZE: usize = GB; fn run_throughput( @@ -317,6 +326,7 @@ pub fn throughput(c: &mut Criterion) { } } +#[cfg(target_os = "linux")] pub fn latency(c: &mut Criterion) { const LATENCY_SEND_SIZE: usize = KB; fn run_latency(c: &mut BenchmarkGroup, name: &str, mode: WorkloadMode) { @@ -342,6 +352,7 @@ pub fn latency(c: &mut Criterion) { run_latency(&mut c, "hbone", WorkloadMode::HBONE); } +#[cfg(target_os = "linux")] pub fn connections(c: &mut Criterion) { fn run_connections(c: &mut BenchmarkGroup, name: &str, mode: WorkloadMode) { let (_manager, tx, ack) = @@ -365,6 +376,7 @@ pub fn connections(c: &mut Criterion) { run_connections(&mut c, "hbone", WorkloadMode::HBONE); } +#[cfg(target_os = "linux")] pub fn rbac(c: &mut Criterion) { let policies = create_test_policies(); let mut state = ProxyState::new(None); @@ -540,36 +552,37 @@ fn hbone_connections(c: &mut Criterion) { // Connections/second c.throughput(Throughput::Elements(1)); c.bench_function("connect_request_response", |b| { - b.to_async(&rt).iter(|| async { - let bench = async { - let mut addresses = addresses.lock().await; - let ta = ta.lock().await; - - // Get next address pair - *addresses = next_ip_pair(*addresses); - let source_addr = hbone_connection_ip(addresses.0); - let dest_addr = hbone_connection_ip(addresses.1); - - // Start HBONE connection - let mut hbone = ta - .socks5_connect(DestinationAddr::Ip(helpers::with_ip(echo_addr, dest_addr)), source_addr) - .await; - - // TCP ping - hbone.write_u8(42).await.ok(); - hbone.read_u8().await.ok(); - }; - - // If misconfigured, `socks5_connect` will silently fail causing subsequent commands - // to hang. Panic if too slow. - tokio::select! 
{ - _ = tokio::time::sleep(Duration::from_secs(1)) => panic!("Timeout: Test is hanging."), - _ = bench => () - }; - }) - }); + b.to_async(&rt).iter(|| async { + let bench = async { + let mut addresses = addresses.lock().await; + let ta = ta.lock().await; + + // Get next address pair + *addresses = next_ip_pair(*addresses); + let source_addr = hbone_connection_ip(addresses.0); + let dest_addr = hbone_connection_ip(addresses.1); + + // Start HBONE connection + let mut hbone = ta + .socks5_connect(DestinationAddr::Ip(helpers::with_ip(echo_addr, dest_addr)), source_addr) + .await; + + // TCP ping + hbone.write_u8(42).await.ok(); + hbone.read_u8().await.ok(); + }; + + // If misconfigured, `socks5_connect` will silently fail causing subsequent commands + // to hang. Panic if too slow. + tokio::select! { + _ = tokio::time::sleep(Duration::from_secs(1)) => panic!("Timeout: Test is hanging."), + _ = bench => () + }; + }) + }); } +#[cfg(target_os = "linux")] criterion_group! { name = benches; config = Criterion::default() @@ -578,4 +591,10 @@ criterion_group! { targets = hbone_connections } +#[cfg(target_os = "linux")] criterion_main!(benches); + +#[cfg(not(target_os = "linux"))] +fn main() { + println!("This benchmark is only supported on Linux"); +} diff --git a/build.rs b/build.rs index 5949ee74ad..b1ce305ae6 100644 --- a/build.rs +++ b/build.rs @@ -74,7 +74,19 @@ fn main() -> Result<(), anyhow::Error> { .nth_back(3) .unwrap(); - match Command::new("common/scripts/report_build_info.sh").output() { + + cfg_if::cfg_if! 
{ + if #[cfg(target_os = "windows")] { + let output = Command::new("powershell.exe") + .arg("common/scripts/report_build_info.ps1") + .output(); + } else { + let output: std::io::Result = Command::new("common/scripts/report_build_info.sh").output(); + + } + } + + match output { Ok(output) => { for line in String::from_utf8(output.stdout).unwrap().lines() { // Each line looks like `istio.io/pkg/version.buildGitRevision=abc` diff --git a/common/scripts/report_build_info.ps1 b/common/scripts/report_build_info.ps1 new file mode 100644 index 0000000000..9977748224 --- /dev/null +++ b/common/scripts/report_build_info.ps1 @@ -0,0 +1,34 @@ +# Get the current Git revision +$BUILD_GIT_REVISION = git rev-parse HEAD 2>$null +if ($BUILD_GIT_REVISION) { + # Check if there are uncommitted changes in the working directory + if (-not $env:IGNORE_DIRTY_TREE -and (git status --porcelain 2>$null)) { + $BUILD_GIT_REVISION = "$BUILD_GIT_REVISION-dirty" + } +} else { + $BUILD_GIT_REVISION = "unknown" +} + +# Check the status of the working tree +$tree_status = "Clean" +if (-not $env:IGNORE_DIRTY_TREE -and -not (git diff-index --quiet HEAD --)) { + $tree_status = "Modified" +} + +# Get the Git tag description +$GIT_DESCRIBE_TAG = git describe --tags --always + +# Set the Docker hub, default to "docker.io/istio" if HUB is not set +$HUB = if ($env:HUB) { $env:HUB } else { "docker.io/istio" } + +$BUILD_OS = if ($env:OS -eq "Windows_NT") { "windows" } else { "" } +$BUILD_ARCH = if ($env:PROCESSOR_ARCHITECTURE -eq "AMD64") { "amd64fre" } else { "" } + +# Output version information used by common/scripts/gobuild.sh +Write-Output "istio.io/istio/pkg/version.buildVersion=$($env:VERSION -or $BUILD_GIT_REVISION)" +Write-Output "istio.io/istio/pkg/version.buildGitRevision=$BUILD_GIT_REVISION" +Write-Output "istio.io/istio/pkg/version.buildStatus=$tree_status" +Write-Output "istio.io/istio/pkg/version.buildTag=$GIT_DESCRIBE_TAG" +Write-Output "istio.io/istio/pkg/version.buildHub=$HUB" +Write-Output 
"istio.io/istio/pkg/version.buildOS=$BUILD_OS" +Write-Output "istio.io/istio/pkg/version.buildArch=$BUILD_ARCH" \ No newline at end of file diff --git a/daemonset-windows.yaml b/daemonset-windows.yaml new file mode 100644 index 0000000000..78f700fc2a --- /dev/null +++ b/daemonset-windows.yaml @@ -0,0 +1,142 @@ +# Source: ztunnel/templates/daemonset.yaml +apiVersion: apps/v1 +kind: DaemonSet +metadata: + name: ztunnel-windows + namespace: istio-system + labels: + app.kubernetes.io/name: ztunnel-windows + app.kubernetes.io/managed-by: "Helm" + app.kubernetes.io/instance: "ztunnel" + app.kubernetes.io/part-of: "istio" + app.kubernetes.io/version: "1.0.0" + helm.sh/chart: ztunnel-windows-1.0.0 + annotations: +spec: + updateStrategy: + rollingUpdate: + maxSurge: 1 + maxUnavailable: 0 + selector: + matchLabels: + app: ztunnel-windows + template: + metadata: + labels: + sidecar.istio.io/inject: "false" + istio.io/dataplane-mode: none + app: ztunnel-windows + app.kubernetes.io/name: ztunnel-windows + app.kubernetes.io/managed-by: "Helm" + app.kubernetes.io/instance: "ztunnel-windows" + app.kubernetes.io/part-of: "istio" + app.kubernetes.io/version: "1.0.0" + helm.sh/chart: ztunnel-windows-1.0.0 + annotations: + sidecar.istio.io/inject: "false" + prometheus.io/port: "15020" + prometheus.io/scrape: "true" + spec: + nodeSelector: + kubernetes.io/os: windows + serviceAccountName: ztunnel + hostNetwork: true + tolerations: + - effect: NoSchedule + operator: Exists + - key: CriticalAddonsOnly + operator: Exists + - effect: NoExecute + operator: Exists + containers: + - name: istio-proxy + image: "istiolocaltesting.azurecr.io/ztunnel-windows:20250322-9" + resources: + requests: + cpu: 200m + memory: 512Mi + securityContext: + windowsOptions: + hostProcess: true + runAsUserName: "NT AUTHORITY\\SYSTEM" + readinessProbe: + httpGet: + port: 15021 + path: /healthz/ready + args: + - proxy + - ztunnel + env: + - name: ALT_XDS_HOSTNAME + value: "istiod-1-25-0.istio-system.svc" + - name: 
ALT_CA_HOSTNAME + value: "istiod-1-25-0.istio-system.svc" + - name: USE_ENV_FOR_DEFAULT_ISTIOD_ADDR + value: "true" + - name: REVISION + value: "1-25-0" + - name: RUST_LOG + value: "trace" + - name: RUST_BACKTRACE + value: "1" + - name: ISTIO_META_CLUSTER_ID + value: Kubernetes + - name: INPOD_ENABLED + value: "true" + - name: INPOD_UDS + value: \\.\pipe\istio-zds + - name: INPOD_PORT_REUSE + value: "false" + - name: TERMINATION_GRACE_PERIOD_SECONDS + value: "30" + - name: IPV6_ENABLED + value: "false" + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: POD_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: NODE_NAME + valueFrom: + fieldRef: + fieldPath: spec.nodeName + - name: INSTANCE_IP + valueFrom: + fieldRef: + fieldPath: status.podIP + - name: SERVICE_ACCOUNT + valueFrom: + fieldRef: + fieldPath: spec.serviceAccountName + volumeMounts: + - mountPath: /var/run/secrets/istio + name: istiod-ca-cert + - mountPath: /var/run/secrets/tokens + name: istio-token + - mountPath: /var/run/ztunnel + name: cni-ztunnel-sock-dir + - mountPath: /tmp + name: tmp + priorityClassName: system-node-critical + terminationGracePeriodSeconds: 30 + volumes: + - name: istio-token + projected: + sources: + - serviceAccountToken: + path: istio-token + expirationSeconds: 43200 + audience: istio-ca + - name: istiod-ca-cert + configMap: + name: istio-ca-root-cert + - name: cni-ztunnel-sock-dir + hostPath: + path: /var/run/ztunnel + type: DirectoryOrCreate # ideally this would be a socket, but ztunnel may not have started yet. + # pprof needs a writable /tmp, and we don't have that thanks to `readOnlyRootFilesystem: true`, so mount one + - name: tmp + emptyDir: {} diff --git a/examples/inpodserver.rs b/examples/inpodserver.rs index cf131ce976..6a6d4ababe 100644 --- a/examples/inpodserver.rs +++ b/examples/inpodserver.rs @@ -11,14 +11,13 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
// See the License for the specific language governing permissions and // limitations under the License. - +#[cfg(target_os = "linux")] use std::os::fd::AsRawFd; - -use ztunnel::test_helpers::inpod::StartZtunnelMessage; -use ztunnel::{ - inpod::istio::zds::WorkloadInfo, - test_helpers::inpod::{Message, start_ztunnel_server}, -}; +#[cfg(target_os = "linux")] +use ztunnel::test_helpers::inpod_linux::StartZtunnelMessage; +#[cfg(target_os = "linux")] +use ztunnel::test_helpers::inpod_linux::{start_ztunnel_server, Message}; +use ztunnel::inpod::istio::zds::WorkloadInfo; const PROXY_WORKLOAD_INFO: &str = "PROXY_WORKLOAD_INFO"; @@ -61,6 +60,7 @@ fn parse_proxy_workload_info() -> Result { name: "local".to_string(), namespace: "default".to_string(), service_account: "default".to_string(), + windows_namespace: None, }); } }; @@ -76,6 +76,7 @@ fn parse_proxy_workload_info() -> Result { name: name.to_string(), namespace: ns.to_string(), service_account: sa.to_string(), + windows_namespace: None, }) } diff --git a/examples/windows-deploy.yaml b/examples/windows-deploy.yaml new file mode 100644 index 0000000000..87a2cb8103 --- /dev/null +++ b/examples/windows-deploy.yaml @@ -0,0 +1,42 @@ +--- +apiVersion: v1 +kind: Service +metadata: + name: win-webserver + labels: + app: win-webserver +spec: + ports: + # the port that this service should serve on + - port: 80 + targetPort: 80 + selector: + app: win-webserver + type: NodePort +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + app: win-webserver + name: win-webserver +spec: + replicas: 2 + selector: + matchLabels: + app: win-webserver + template: + metadata: + labels: + app: win-webserver + name: win-webserver + spec: + containers: + - name: windowswebserver + image: mcr.microsoft.com/windows/servercore:ltsc2022 + command: + - powershell.exe + - -command + - "<#code used from https://gist.github.com/19WAS85/5424431#> ; $$listener = New-Object System.Net.HttpListener ; $$listener.Prefixes.Add('http://*:80/') ; 
$$listener.Start() ; $$callerCounts = @{} ; Write-Host('Listening at http://*:80/') ; while ($$listener.IsListening) { ;$$context = $$listener.GetContext() ;$$requestUrl = $$context.Request.Url ;$$clientIP = $$context.Request.RemoteEndPoint.Address ;$$response = $$context.Response ;Write-Host '' ;Write-Host('> {0}' -f $$requestUrl) ; ;$$count = 1 ;$$k=$$callerCounts.Get_Item($$clientIP) ;if ($$k -ne $$null) { $$count += $$k } ;$$callerCounts.Set_Item($$clientIP, $$count) ;$$ip=(Get-NetAdapter | Get-NetIpAddress); $$header='

Windows Container Web Server

' ;$$callerCountsString='' ;$$callerCounts.Keys | % { $$callerCountsString+='

IP {0} callerCount {1} ' -f $$ip[1].IPAddress,$$callerCounts.Item($$_) } ;$$footer='' ;$$content='{0}{1}{2}' -f $$header,$$callerCountsString,$$footer ;Write-Output $$content ;$$buffer = [System.Text.Encoding]::UTF8.GetBytes($$content) ;$$response.ContentLength64 = $$buffer.Length ;$$response.OutputStream.Write($$buffer, 0, $$buffer.Length) ;$$response.Close() ;$$responseStatus = $$response.StatusCode ;Write-Host('< {0}' -f $$responseStatus) } ; " + nodeSelector: + kubernetes.io/os: windows diff --git a/examples/windows-hostprocess.yaml b/examples/windows-hostprocess.yaml new file mode 100644 index 0000000000..8b5194c66d --- /dev/null +++ b/examples/windows-hostprocess.yaml @@ -0,0 +1,33 @@ +apiVersion: v1 +kind: Pod +metadata: + labels: + pod: hpc + name: hpc +spec: + securityContext: + windowsOptions: + hostProcess: true + runAsUserName: "NT AUTHORITY\\SYSTEM" + hostNetwork: true + containers: + - name: hpc + image: mcr.microsoft.com/windows/servercore:ltsc2022 # Use servercore:1809 for WS2019 + command: + - powershell.exe + - -Command + - "Start-Sleep 2147483" + imagePullPolicy: IfNotPresent + nodeSelector: + kubernetes.io/os: windows + kubernetes.io/hostname: akswin000000 + tolerations: + - effect: NoSchedule + key: node.kubernetes.io/unschedulable + operator: Exists + - effect: NoSchedule + key: node.kubernetes.io/network-unavailable + operator: Exists + - effect: NoExecute + key: node.kubernetes.io/unreachable + operator: Exists diff --git a/proto/zds.proto b/proto/zds.proto index b0bcac1815..695fd67401 100644 --- a/proto/zds.proto +++ b/proto/zds.proto @@ -1,3 +1,17 @@ +// Copyright Istio Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + syntax = "proto3"; // GRPC package - part of the URL. Service is added. @@ -21,6 +35,13 @@ message WorkloadInfo { string name = 1; string namespace = 2; string service_account = 3; + // The namespace for the workload. Windows only. + optional WindowsNamespace windows_namespace = 5; +} + +message WindowsNamespace { + uint32 id = 1; + string guid = 2; } // Add a workload to the ztunnel. this will be accompanied by ancillary data contianing diff --git a/src/admin.rs b/src/admin.rs index e96d15a777..2513ca4b7c 100644 --- a/src/admin.rs +++ b/src/admin.rs @@ -26,6 +26,8 @@ use bytes::Bytes; use http_body_util::Full; use hyper::body::Incoming; use hyper::{Request, Response, header::CONTENT_TYPE, header::HeaderValue}; +#[cfg(not(target_os = "windows"))] +use pprof::protos::Message; use std::borrow::Borrow; use std::collections::HashMap; @@ -239,7 +241,6 @@ async fn dump_certs(cert_manager: &SecretManager) -> Vec { #[cfg(target_os = "linux")] async fn handle_pprof(_req: Request) -> anyhow::Result>> { - use pprof::protos::Message; let guard = pprof::ProfilerGuardBuilder::default() .frequency(1000) // .blocklist(&["libc", "libgcc", "pthread", "vdso"]) @@ -257,6 +258,14 @@ async fn handle_pprof(_req: Request) -> anyhow::Result) -> anyhow::Result>> { + Ok(Response::builder() + .status(hyper::StatusCode::NOT_FOUND) + .body("pprof not supported on non-Linux platforms".into()) + .expect("builder with known status code should not fail")) +} + async fn handle_server_shutdown( shutdown_trigger: signal::ShutdownTrigger, _req: Request, diff --git a/src/app.rs 
b/src/app.rs index 320054f69c..73e1c0a991 100644 --- a/src/app.rs +++ b/src/app.rs @@ -324,7 +324,7 @@ fn mock_secret_manager() -> Arc { unimplemented!("fake_ca requires --features testing") } -#[cfg(not(target_os = "linux"))] +#[cfg(not(any(target_os = "linux", target_os = "windows")))] fn init_inpod_proxy_mgr( _registry: &mut Registry, _admin_server: &mut crate::admin::Service, @@ -336,6 +336,33 @@ fn init_inpod_proxy_mgr( anyhow::bail!("in-pod mode is not supported on non-linux platforms") } +#[cfg(target_os = "windows")] +fn init_inpod_proxy_mgr( + registry: &mut Registry, + admin_server: &mut crate::admin::Service, + config: &config::Config, + proxy_gen: ProxyFactory, + ready: readiness::Ready, + drain_rx: drain::DrainWatcher, +) -> anyhow::Result + Send + Sync>>> { + let metrics = Arc::new(crate::inpod::metrics::Metrics::new( + registry.sub_registry_with_prefix("workload_manager"), + )); + let proxy_mgr = + crate::inpod::windows::init_and_new(metrics, admin_server, config, proxy_gen, ready) + .map_err(|e| anyhow::anyhow!("failed to start workload proxy manager {:?}", e))?; + + Ok(Box::pin(async move { + match proxy_mgr.run(drain_rx).await { + Ok(()) => (), + Err(e) => { + tracing::error!("WorkloadProxyManager run error: {:?}", e); + std::process::exit(1); + } + } + })) +} + #[cfg(target_os = "linux")] fn init_inpod_proxy_mgr( registry: &mut Registry, @@ -348,8 +375,9 @@ fn init_inpod_proxy_mgr( let metrics = Arc::new(crate::inpod::metrics::Metrics::new( registry.sub_registry_with_prefix("workload_manager"), )); - let proxy_mgr = crate::inpod::init_and_new(metrics, admin_server, config, proxy_gen, ready) - .map_err(|e| anyhow::anyhow!("failed to start workload proxy manager {:?}", e))?; + let proxy_mgr = + crate::inpod::linux::init_and_new(metrics, admin_server, config, proxy_gen, ready) + .map_err(|e| anyhow::anyhow!("failed to start workload proxy manager {:?}", e))?; Ok(Box::pin(async move { match proxy_mgr.run(drain_rx).await { diff --git a/src/config.rs 
b/src/config.rs index c5b9492fe9..05235eff9e 100644 --- a/src/config.rs +++ b/src/config.rs @@ -36,6 +36,7 @@ use {crate::test_helpers::MpscAckReceiver, crate::xds::LocalConfig, tokio::sync: const ENABLE_PROXY: &str = "ENABLE_PROXY"; const KUBERNETES_SERVICE_HOST: &str = "KUBERNETES_SERVICE_HOST"; +const REVISION: &str = "REVISION"; const NETWORK: &str = "NETWORK"; const NODE_NAME: &str = "NODE_NAME"; const PROXY_MODE: &str = "PROXY_MODE"; @@ -57,6 +58,7 @@ const XDS_ADDRESS: &str = "XDS_ADDRESS"; const CA_ADDRESS: &str = "CA_ADDRESS"; const SECRET_TTL: &str = "SECRET_TTL"; const FAKE_CA: &str = "FAKE_CA"; +const USE_ENV_FOR_DEFAULT_ISTIOD_ADDR: &str = "USE_ENV_FOR_DEFAULT_ISTIOD_ADDR"; const ZTUNNEL_WORKER_THREADS: &str = "ZTUNNEL_WORKER_THREADS"; const POOL_MAX_STREAMS_PER_CONNECTION: &str = "POOL_MAX_STREAMS_PER_CONNECTION"; const POOL_UNUSED_RELEASE_TIMEOUT: &str = "POOL_UNUSED_RELEASE_TIMEOUT"; @@ -321,6 +323,7 @@ impl Default for SocketConfig { keepalive_retries: 9, keepalive_enabled: true, // Might be a good idea but for now we haven't proven this out enough. + // TODO: Compile out from windows if we turn this on user_timeout_enabled: false, } } @@ -435,12 +438,22 @@ pub fn construct_config(pc: ProxyConfig) -> Result { } else { IpAddr::V4(Ipv4Addr::UNSPECIFIED) }; + + let revision = env::var(REVISION); + let cluster_domain = parse_default(CLUSTER_DOMAIN, DEFAULT_CLUSTER_DOMAIN.to_string())?; let default_istiod_address = if env::var(KUBERNETES_SERVICE_HOST).is_ok() { - "https://istiod.istio-system.svc:15012".to_string() + if revision.as_ref().is_ok() { + format!( + "https://istiod-{}.istio-system.svc:15012", + revision.as_ref().ok().unwrap(), + ) + } else { + "https://istiod.istio-system.svc:15012".to_string() + } } else { "https://localhost:15012".to_string() }; - let xds_address = validate_uri(empty_to_none( + let mut xds_address = validate_uri(empty_to_none( parse(XDS_ADDRESS)? 
.or(pc.discovery_address) .or_else(|| Some(default_istiod_address.clone())), @@ -451,15 +464,33 @@ pub fn construct_config(pc: ProxyConfig) -> Result { Some(id) => id, None => parse_default::(CLUSTER_ID, DEFAULT_CLUSTER_ID.to_string())?, }; - let cluster_domain = parse_default(CLUSTER_DOMAIN, DEFAULT_CLUSTER_DOMAIN.to_string())?; let fake_ca = parse_default(FAKE_CA, false)?; - let ca_address = validate_uri(empty_to_none(if fake_ca { + let mut ca_address = validate_uri(empty_to_none(if fake_ca { None } else { Some(parse_default(CA_ADDRESS, default_istiod_address)?) }))?; + match parse::(USE_ENV_FOR_DEFAULT_ISTIOD_ADDR)? { + Some(true) => { + // If we are using the environment for XDS and CA addresses, we should override the + // values we just parsed with the environment variables. + // However, we need to keep the hosts around for SNI purposes. + let mut istiod_addr_env = "ISTIOD_SERVICE_HOST".to_string(); + if revision.as_ref().is_ok() { + istiod_addr_env = format!("ISTIOD_{}_SERVICE_HOST", revision.ok().unwrap().replace("-", "_")); + } + let istiod_host = env::var(istiod_addr_env.clone()).unwrap(); + let istiod_addr = format!("https://{}:15012", istiod_host); + let validated_addr = validate_uri(Some(istiod_addr))?; + xds_address = validated_addr.clone(); + ca_address = validated_addr.clone(); + } + Some(false) => {} + None => {} + } + let xds_root_cert_provider = parse_default(XDS_ROOT_CA_ENV, DEFAULT_ROOT_CERT_PROVIDER.to_string())?; let xds_root_cert = if Path::new(&xds_root_cert_provider).exists() { diff --git a/src/dns/handler.rs b/src/dns/handler.rs index 1091e0c585..e9027c5648 100644 --- a/src/dns/handler.rs +++ b/src/dns/handler.rs @@ -20,7 +20,7 @@ use hickory_resolver::ResolveErrorKind; use hickory_server::authority::{LookupError, MessageResponse, MessageResponseBuilder}; use hickory_server::server::{Request, RequestHandler, ResponseHandler, ResponseInfo}; use std::sync::Arc; -use tracing::{error, warn}; +use tracing::{error, info, warn}; /// A Trust-DNS 
[RequestHandler] that proxies all DNS requests. /// @@ -58,6 +58,7 @@ impl RequestHandler for Handler { request: &Request, response_handle: R, ) -> ResponseInfo { + info!("handling request: {:?}", request); match request.message_type() { MessageType::Query => match request.op_code() { OpCode::Query => self.lookup(request, response_handle).await, @@ -170,7 +171,6 @@ async fn send_response<'a, R: ResponseHandler>( mut response_handle: R, ) -> ResponseInfo { let result = response_handle.send_response(response).await; - match result { Err(e) => { error!("request error: {}", e); @@ -178,7 +178,10 @@ async fn send_response<'a, R: ResponseHandler>( header.set_response_code(ResponseCode::ServFail); header.into() } - Ok(info) => info, + Ok(info) => { + info!("response sent: {:?}", info.response_code()); + info + }, } } diff --git a/src/dns/server.rs b/src/dns/server.rs index 0f7c464495..c11d449beb 100644 --- a/src/dns/server.rs +++ b/src/dns/server.rs @@ -538,6 +538,7 @@ impl Resolver for Store { ), )] async fn lookup(&self, request: &Request) -> Result { + info!("looking up {request:?}"); let client = self.local_workload.get_workload().await.map_err(|_| { debug!("unknown source"); self.metrics.increment(&DnsRequest { diff --git a/src/identity/caclient.rs b/src/identity/caclient.rs index 921f286cdd..806a2438ed 100644 --- a/src/identity/caclient.rs +++ b/src/identity/caclient.rs @@ -112,6 +112,7 @@ impl CaClient { warn!("no chain certs for: {}", id); vec![] }; + debug!("received certificate for {:?}", id); let certs = tls::WorkloadCertificate::new(&private_key, leaf, chain)?; // Make the certificate actually matches the identity we requested. 
if self.enable_impersonated_identity && certs.identity().as_ref() != Some(id) { diff --git a/src/inpod.rs b/src/inpod.rs index 8ea1bc58eb..a7cabad492 100644 --- a/src/inpod.rs +++ b/src/inpod.rs @@ -1,38 +1,8 @@ -// Copyright Istio Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use crate::config as zconfig; -use crate::readiness; -use metrics::Metrics; -use std::sync::Arc; -use workloadmanager::WorkloadProxyManager; - -use crate::proxyfactory::ProxyFactory; - -use self::config::InPodConfig; - -pub mod admin; -mod config; +#[cfg(target_os = "linux")] +pub mod linux; pub mod metrics; -pub mod netns; -pub mod packet; -mod protocol; -mod statemanager; -mod workloadmanager; - -#[cfg(any(test, feature = "testing"))] -pub mod test_helpers; +#[cfg(target_os = "windows")] +pub mod windows; pub mod istio { pub mod zds { @@ -42,7 +12,7 @@ pub mod istio { #[derive(thiserror::Error, Debug)] pub enum Error { - #[error("error creating proxy {0}: {1}")] + #[error("error creating proxy {0}: {1}")] ProxyError(String, crate::proxy::Error), #[error("error receiving message: {0}")] ReceiveMessageError(String), @@ -67,45 +37,3 @@ impl WorkloadUid { self.0 } } - -#[derive(Debug)] -pub struct WorkloadData { - netns: std::os::fd::OwnedFd, - workload_uid: WorkloadUid, - workload_info: Option, -} - -#[derive(Debug)] -pub enum WorkloadMessage { - AddWorkload(WorkloadData), - KeepWorkload(WorkloadUid), - WorkloadSnapshotSent, - DelWorkload(WorkloadUid), -} - -pub fn 
init_and_new( - metrics: Arc, - admin_server: &mut crate::admin::Service, - cfg: &zconfig::Config, - proxy_gen: ProxyFactory, - ready: readiness::Ready, -) -> anyhow::Result { - // verify that we have the permissions for the syscalls we need - WorkloadProxyManager::verify_syscalls()?; - let admin_handler: Arc = Default::default(); - admin_server.add_handler(admin_handler.clone()); - let inpod_config = crate::inpod::InPodConfig::new(cfg)?; - - let state_mgr = statemanager::WorkloadProxyManagerState::new( - proxy_gen, - inpod_config, - metrics, - admin_handler, - ); - - Ok(WorkloadProxyManager::new( - cfg.inpod_uds.clone(), - state_mgr, - ready, - )?) -} diff --git a/src/inpod/linux.rs b/src/inpod/linux.rs new file mode 100644 index 0000000000..492ba4c97a --- /dev/null +++ b/src/inpod/linux.rs @@ -0,0 +1,78 @@ +// Copyright Istio Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use super::metrics::Metrics; +use crate::config as zconfig; +use crate::inpod::istio; +use crate::readiness; +use std::sync::Arc; +use workloadmanager::WorkloadProxyManager; +use super::WorkloadUid; + +use crate::proxyfactory::ProxyFactory; + +use self::config::InPodConfig; + +pub mod admin; +mod config; +pub mod netns; +pub mod packet; +mod protocol; +mod statemanager; +mod workloadmanager; + +#[cfg(any(test, feature = "testing"))] +pub mod test_helpers; + +#[derive(Debug)] +pub struct WorkloadData { + netns: std::os::fd::OwnedFd, + workload_uid: WorkloadUid, + workload_info: Option, +} + +#[derive(Debug)] +pub enum WorkloadMessage { + AddWorkload(WorkloadData), + KeepWorkload(WorkloadUid), + WorkloadSnapshotSent, + DelWorkload(WorkloadUid), +} + +pub fn init_and_new( + metrics: Arc, + admin_server: &mut crate::admin::Service, + cfg: &zconfig::Config, + proxy_gen: ProxyFactory, + ready: readiness::Ready, +) -> anyhow::Result { + // verify that we have the permissions for the syscalls we need + WorkloadProxyManager::verify_syscalls()?; + let admin_handler: Arc = Default::default(); + admin_server.add_handler(admin_handler.clone()); + let inpod_config = InPodConfig::new(cfg)?; + + let state_mgr = statemanager::WorkloadProxyManagerState::new( + proxy_gen, + inpod_config, + metrics, + admin_handler, + ); + + Ok(WorkloadProxyManager::new( + cfg.inpod_uds.clone(), + state_mgr, + ready, + )?) 
+} diff --git a/src/inpod/admin.rs b/src/inpod/linux/admin.rs similarity index 97% rename from src/inpod/admin.rs rename to src/inpod/linux/admin.rs index 86a97af23c..5ed5a9beb0 100644 --- a/src/inpod/admin.rs +++ b/src/inpod/linux/admin.rs @@ -61,7 +61,11 @@ pub struct WorkloadManagerAdminHandler { } impl WorkloadManagerAdminHandler { - pub fn proxy_pending(&self, uid: &crate::inpod::WorkloadUid, workload_info: &WorkloadInfo) { + pub fn proxy_pending( + &self, + uid: &crate::inpod::WorkloadUid, + workload_info: &WorkloadInfo, + ) { let mut state = self.state.write().unwrap(); // don't increment count here, as it is only for up and down. see comment in count. diff --git a/src/inpod/config.rs b/src/inpod/linux/config.rs similarity index 97% rename from src/inpod/config.rs rename to src/inpod/linux/config.rs index 94a51debb5..3d2e27c468 100644 --- a/src/inpod/config.rs +++ b/src/inpod/linux/config.rs @@ -198,7 +198,7 @@ impl crate::proxy::SocketFactory for InPodSocketPortReuseFactory { mod test { use super::*; - use crate::inpod::test_helpers::new_netns; + use crate::inpod::linux::test_helpers::new_netns; macro_rules! 
fixture { () => {{ @@ -228,7 +228,7 @@ mod test { let sf = inpod_cfg.socket_factory( InpodNetns::new( - Arc::new(crate::inpod::netns::InpodNetns::current().unwrap()), + Arc::new(crate::inpod::linux::netns::InpodNetns::current().unwrap()), new_netns(), ) .unwrap(), @@ -275,7 +275,7 @@ mod test { let sf = inpod_cfg.socket_factory( InpodNetns::new( - Arc::new(crate::inpod::netns::InpodNetns::current().unwrap()), + Arc::new(crate::inpod::linux::netns::InpodNetns::current().unwrap()), new_netns(), ) .unwrap(), @@ -317,7 +317,7 @@ mod test { let sf = inpod_cfg.socket_factory( InpodNetns::new( - Arc::new(crate::inpod::netns::InpodNetns::current().unwrap()), + Arc::new(crate::inpod::linux::netns::InpodNetns::current().unwrap()), new_netns(), ) .unwrap(), diff --git a/src/inpod/netns.rs b/src/inpod/linux/netns.rs similarity index 100% rename from src/inpod/netns.rs rename to src/inpod/linux/netns.rs diff --git a/src/inpod/packet.rs b/src/inpod/linux/packet.rs similarity index 100% rename from src/inpod/packet.rs rename to src/inpod/linux/packet.rs diff --git a/src/inpod/protocol.rs b/src/inpod/linux/protocol.rs similarity index 98% rename from src/inpod/protocol.rs rename to src/inpod/linux/protocol.rs index 28c9f8cd1a..25f12c906c 100644 --- a/src/inpod/protocol.rs +++ b/src/inpod/linux/protocol.rs @@ -24,7 +24,6 @@ use tokio::net::UnixStream; use tracing::{debug, info, warn}; use zds::workload_request::Payload; -// Not dead code, but automock confuses Rust otherwise when built with certain targets #[allow(dead_code)] pub struct WorkloadStreamProcessor { stream: UnixStream, @@ -254,7 +253,7 @@ mod tests { use super::super::istio; use super::*; - use crate::inpod::test_helpers::uid; + use crate::inpod::linux::test_helpers::uid; use nix::sys::socket::MsgFlags; // Helpers to test get_workload_data_from_parts @@ -288,6 +287,7 @@ mod tests { name: "test".to_string(), namespace: "default".to_string(), service_account: "defaultsvc".to_string(), + windows_namespace: None, }; let 
uid = uid(0); let data = prep_request(zds::workload_request::Payload::Add( diff --git a/src/inpod/statemanager.rs b/src/inpod/linux/statemanager.rs similarity index 98% rename from src/inpod/statemanager.rs rename to src/inpod/linux/statemanager.rs index c3eac2f768..2e39326f27 100644 --- a/src/inpod/statemanager.rs +++ b/src/inpod/linux/statemanager.rs @@ -14,11 +14,11 @@ use crate::drain; use crate::drain::DrainTrigger; +use crate::inpod::linux::WorkloadMessage; +use crate::inpod::{metrics::Metrics, Error}; use std::sync::Arc; use tracing::{Instrument, debug, info}; -use super::{Error, WorkloadMessage, metrics::Metrics}; - use crate::proxyfactory::ProxyFactory; use crate::state::WorkloadInfo; @@ -388,8 +388,8 @@ impl WorkloadProxyManagerState { #[cfg(test)] mod tests { use super::*; - use crate::inpod::WorkloadData; - use crate::inpod::test_helpers::{self, create_proxy_conflict, new_netns, uid}; + use crate::inpod::linux::test_helpers::{self, create_proxy_conflict, new_netns, uid}; + use crate::inpod::linux::WorkloadData; use crate::inpod::istio::zds; use matches::assert_matches; @@ -398,7 +398,7 @@ mod tests { struct Fixture { state: WorkloadProxyManagerState, - metrics: Arc, + metrics: Arc, } fn workload_info() -> Option { @@ -406,6 +406,7 @@ mod tests { name: "name".to_string(), namespace: "ns".to_string(), service_account: "sa".to_string(), + windows_namespace: None, }) } diff --git a/src/inpod/test_helpers.rs b/src/inpod/linux/test_helpers.rs similarity index 96% rename from src/inpod/test_helpers.rs rename to src/inpod/linux/test_helpers.rs index 621a2515d8..14bb3d1797 100644 --- a/src/inpod/test_helpers.rs +++ b/src/inpod/linux/test_helpers.rs @@ -43,7 +43,7 @@ pub fn uid(i: usize) -> crate::inpod::WorkloadUid { pub struct Fixture { pub proxy_factory: ProxyFactory, pub ipc: InPodConfig, - pub inpod_metrics: Arc, + pub inpod_metrics: Arc, pub drain_tx: DrainTrigger, pub drain_rx: DrainWatcher, } @@ -96,7 +96,7 @@ impl Default for Fixture { Fixture { 
proxy_factory: proxy_gen, ipc, - inpod_metrics: Arc::new(crate::inpod::Metrics::new(&mut registry)), + inpod_metrics: Arc::new(crate::inpod::metrics::Metrics::new(&mut registry)), drain_tx, drain_rx, } @@ -228,7 +228,7 @@ pub async fn send_workload_del(s: &mut UnixStream, uid: super::WorkloadUid) { pub fn create_proxy_conflict(ns: &std::os::fd::OwnedFd) -> std::os::fd::OwnedFd { let inpodns = InpodNetns::new( - Arc::new(crate::inpod::netns::InpodNetns::current().unwrap()), + Arc::new(crate::inpod::linux::netns::InpodNetns::current().unwrap()), ns.try_clone().unwrap(), ) .unwrap(); diff --git a/src/inpod/workloadmanager.rs b/src/inpod/linux/workloadmanager.rs similarity index 98% rename from src/inpod/workloadmanager.rs rename to src/inpod/linux/workloadmanager.rs index aac7357b5c..b2f89f29ef 100644 --- a/src/inpod/workloadmanager.rs +++ b/src/inpod/linux/workloadmanager.rs @@ -20,8 +20,8 @@ use std::time::Duration; use tokio::net::UnixStream; use tracing::{debug, error, info, warn}; -use super::Error; use super::statemanager::WorkloadProxyManagerState; +use crate::inpod::Error; use super::protocol::WorkloadStreamProcessor; @@ -258,7 +258,7 @@ impl<'a> WorkloadProxyManagerProcessor<'a> { async fn read_message_and_retry_proxies( &mut self, processor: &mut WorkloadStreamProcessor, - ) -> anyhow::Result> { + ) -> anyhow::Result> { let readmsg = processor.read_message(); // Note: readmsg future is NOT cancel safe, so we want to make sure this function doesn't exit // return without completing it. 
@@ -324,7 +324,7 @@ impl<'a> WorkloadProxyManagerProcessor<'a> { } }?; - debug!("received message: {:?}", msg); + info!("received message: {:?}", msg); // send ack: match self.state.process_msg(msg).await { @@ -386,7 +386,7 @@ pub(crate) mod tests { use super::*; - use crate::inpod::test_helpers::{ + use crate::inpod::linux::test_helpers::{ self, create_proxy_conflict, new_netns, read_hello, read_msg, send_snap_sent, send_workload_added, send_workload_del, uid, }; @@ -417,12 +417,13 @@ pub(crate) mod tests { name: "name".to_string(), namespace: "ns".to_string(), service_account: "sa".to_string(), + windows_namespace: None, }) } struct Fixture { state: WorkloadProxyManagerState, - inpod_metrics: Arc, + inpod_metrics: Arc, drain_rx: DrainWatcher, _drain_tx: DrainTrigger, } diff --git a/src/inpod/windows.rs b/src/inpod/windows.rs new file mode 100644 index 0000000000..91f110877c --- /dev/null +++ b/src/inpod/windows.rs @@ -0,0 +1,77 @@ +// Copyright Istio Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use super::metrics::Metrics; +use crate::config as zconfig; +use crate::inpod::istio::zds::WorkloadInfo; +use crate::readiness; +use std::sync::Arc; +use workloadmanager::WorkloadProxyManager; +use super::WorkloadUid; + +use crate::proxyfactory::ProxyFactory; + +use self::config::InPodConfig; + +pub mod admin; +mod config; +pub mod namespace; +mod protocol; +mod statemanager; +mod workloadmanager; + +#[cfg(any(test, feature = "testing"))] +pub mod test_helpers; + + +#[derive(Debug)] +pub struct WorkloadData { + workload_uid: WorkloadUid, + workload_info: Option, +} + +#[derive(Debug)] +pub enum WorkloadMessage { + AddWorkload(WorkloadData), + KeepWorkload(WorkloadUid), + WorkloadSnapshotSent, + DelWorkload(WorkloadUid), +} + +pub fn init_and_new( + metrics: Arc, + admin_server: &mut crate::admin::Service, + cfg: &zconfig::Config, + proxy_gen: ProxyFactory, + ready: readiness::Ready, +) -> anyhow::Result { + // verify that we have the permissions for the syscalls we need + WorkloadProxyManager::verify_syscalls()?; + let admin_handler: Arc = Default::default(); + admin_server.add_handler(admin_handler.clone()); + let inpod_config = InPodConfig::new(cfg)?; + + let state_mgr = statemanager::WorkloadProxyManagerState::new( + proxy_gen, + inpod_config, + metrics, + admin_handler, + ); + + Ok(WorkloadProxyManager::new( + cfg.inpod_uds.clone(), + state_mgr, + ready, + )?) +} diff --git a/src/inpod/windows/admin.rs b/src/inpod/windows/admin.rs new file mode 100644 index 0000000000..5744a724ea --- /dev/null +++ b/src/inpod/windows/admin.rs @@ -0,0 +1,185 @@ +// Copyright Istio Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use serde::{Deserialize, Serialize}; +use tracing::error; + +use crate::proxy::connection_manager::ConnectionManager; +use crate::state::WorkloadInfo; +use anyhow::anyhow; +use std::collections::HashMap; +use std::sync::RwLock; + +#[derive(Debug, Serialize, Deserialize, Clone, Copy)] +pub enum State { + Pending, + Up, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ProxyState { + pub state: State, + + #[serde( + skip_serializing_if = "Option::is_none", + deserialize_with = "always_none", + default + )] + pub connections: Option, + + pub info: WorkloadInfo, + + // using reference counts to account for possible race between the proxy task that notifies us + // that a proxy is down, and the proxy factory task that notifies us when it is up. + #[serde(skip)] + count: usize, +} + +fn always_none<'de, D>(_deserializer: D) -> Result, D::Error> +where + D: serde::Deserializer<'de>, +{ + serde::de::IgnoredAny::deserialize(_deserializer)?; + Ok(None) +} + +#[derive(Default)] +pub struct WorkloadManagerAdminHandler { + state: RwLock>, +} + +impl WorkloadManagerAdminHandler { + pub fn proxy_pending( + &self, + uid: &crate::inpod::WorkloadUid, + workload_info: &WorkloadInfo, + ) { + let mut state = self.state.write().unwrap(); + + // don't increment count here, as it is only for up and down. see comment in count. 
+ match state.get_mut(uid) { + Some(key) => { + key.state = State::Pending; + } + None => { + state.insert( + uid.clone(), + ProxyState { + state: State::Pending, + connections: None, + count: 0, + info: workload_info.clone(), + }, + ); + } + } + } + pub fn proxy_up( + &self, + uid: &crate::inpod::WorkloadUid, + workload_info: &WorkloadInfo, + cm: Option, + ) { + let mut state = self.state.write().unwrap(); + + match state.get_mut(uid) { + Some(key) => { + key.count += 1; + key.state = State::Up; + key.connections = cm; + key.info.clone_from(workload_info); + } + None => { + state.insert( + uid.clone(), + ProxyState { + state: State::Up, + connections: cm, + count: 1, + info: workload_info.clone(), + }, + ); + } + } + } + + pub fn proxy_down(&self, uid: &crate::inpod::WorkloadUid) { + let mut state = self.state.write().unwrap(); + + match state.get_mut(uid) { + Some(key) if key.count > 0 => { + key.count -= 1; + if key.count == 0 { + state.remove(uid); + } + } + _ => { + error!("proxy_down called where no proxy was created"); + debug_assert!(false, "proxy_down called where no proxy was created"); + } + } + } + + fn to_json(&self) -> anyhow::Result { + if let Ok(state) = self.state.read() { + Ok(serde_json::to_value(&*state)?) 
+ } else { + Err(anyhow!("Failed to read state")) + } + } +} + +impl crate::admin::AdminHandler for WorkloadManagerAdminHandler { + fn key(&self) -> &'static str { + "workloadState" + } + + fn handle(&self) -> anyhow::Result { + self.to_json() + } +} + +// #[cfg(test)] +// mod test { +// use super::*; + +// #[test] +// fn test_proxy_state() { +// let handler = WorkloadManagerAdminHandler::default(); +// let data = || serde_json::to_string(&handler.to_json().unwrap()).unwrap(); + +// let uid1 = crate::inpod::linux::WorkloadUid::new("uid1".to_string()); +// handler.proxy_pending(&uid1, &None); +// assert_eq!(data(), r#"{"uid1":{"state":"Pending"}}"#); +// handler.proxy_up( +// &uid1, +// &Some(crate::state::WorkloadInfo { +// name: "name".to_string(), +// namespace: "ns".to_string(), +// service_account: "sa".to_string(), +// }), +// None, +// ); +// assert_eq!( +// data(), +// r#"{"uid1":{"info":{"name":"name","namespace":"ns","serviceAccount":"sa"},"state":"Up"}}"# +// ); +// handler.proxy_down(&uid1); +// assert_eq!(data(), "{}"); + +// let state = handler.state.read().unwrap(); +// assert_eq!(state.len(), 0); +// } +// } diff --git a/src/inpod/windows/config.rs b/src/inpod/windows/config.rs new file mode 100644 index 0000000000..280f9ea538 --- /dev/null +++ b/src/inpod/windows/config.rs @@ -0,0 +1,269 @@ +// Copyright Istio Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ + + +use crate::inpod::windows::namespace::InpodNamespace; +use crate::proxy::DefaultSocketFactory; +use crate::{config, socket}; + +pub struct InPodConfig { + cur_namespace: u32, + reuse_port: bool, // TODO: Not supported in windows so always must be false + socket_config: config::SocketConfig, +} + +impl InPodConfig { + pub fn new(cfg: &config::Config) -> std::io::Result { + if cfg.inpod_port_reuse { + return Err(std::io::Error::other( + "SO_REUSEPORT is not supported in windows", + )); + } + let socket_config = config::SocketConfig { + user_timeout_enabled: false, // Not supported on windows + ..cfg.socket_config + }; + Ok(InPodConfig { + cur_namespace: InpodNamespace::current()?, + reuse_port: cfg.inpod_port_reuse, + socket_config, + }) + } + pub fn socket_factory( + &self, + netns: InpodNamespace, + ) -> Box { + let base = crate::proxy::DefaultSocketFactory(self.socket_config); + let sf = InPodSocketFactory::from_cfg(base, self, netns); + if self.reuse_port { + // We should never get here + unreachable!("SO_REUSEPORT is not supported in windows"); + } else { + Box::new(sf) + } + } + + pub fn cur_netns(&self) -> u32 { + self.cur_namespace + } +} + +struct InPodSocketFactory { + inner: DefaultSocketFactory, + netns: InpodNamespace, +} +impl InPodSocketFactory { + fn from_cfg(inner: DefaultSocketFactory, _: &InPodConfig, netns: InpodNamespace) -> Self { + Self::new(inner, netns) + } + fn new(inner: DefaultSocketFactory,netns: InpodNamespace) -> Self { + Self {inner, netns } + } + + fn run_in_ns std::io::Result>(&self, f: F) -> std::io::Result { + self.netns.run(f)? 
+ } + + fn configure std::io::Result>( + &self, + f: F, + ) -> std::io::Result { + let socket = self.netns.run(f)??; + + Ok(socket) + } +} + +impl crate::proxy::SocketFactory for InPodSocketFactory { + fn new_tcp_v4(&self) -> std::io::Result { + self.configure(|| self.inner.new_tcp_v4()) + } + + fn new_tcp_v6(&self) -> std::io::Result { + self.configure(|| self.inner.new_tcp_v6()) + } + + fn tcp_bind(&self, addr: std::net::SocketAddr) -> std::io::Result { + let std_sock = self.configure( || std::net::TcpListener::bind(addr))?; + std_sock.set_nonblocking(true)?; + tokio::net::TcpListener::from_std(std_sock).map(socket::Listener::new) + } + + fn udp_bind(&self, addr: std::net::SocketAddr) -> std::io::Result { + let std_sock = self.configure(|| std::net::UdpSocket::bind(addr))?; + std_sock.set_nonblocking(true)?; + tokio::net::UdpSocket::from_std(std_sock) + } + + fn ipv6_enabled_localhost(&self) -> std::io::Result { + self.run_in_ns(|| self.inner.ipv6_enabled_localhost()) + } +} + +// Same as socket factory, but sets SO_REUSEPORT +// struct InPodSocketPortReuseFactory { +// sf: InPodSocketFactory, +// } + +// impl InPodSocketPortReuseFactory { +// fn new(_: InPodSocketFactory) -> Self { +// panic!("SO_REUSEPORT is not supported in windows"); +// } +// } + +// #[cfg(test)] +// mod test { +// use super::*; + +// use crate::inpod::linux::test_helpers::new_netns; + +// macro_rules! 
fixture { +// () => {{ +// if !crate::test_helpers::can_run_privilged_test() { +// eprintln!("This test requires root; skipping"); +// return; +// } + +// crate::config::Config { +// packet_mark: Some(123), +// ..crate::config::parse_config().unwrap() +// } +// }}; +// } + +// #[tokio::test] +// async fn test_inpod_config_no_port_reuse() { +// let mut cfg = fixture!(); +// cfg.inpod_port_reuse = false; + +// let inpod_cfg = InPodConfig::new(&cfg).unwrap(); +// assert_eq!( +// inpod_cfg.mark(), +// Some(std::num::NonZeroU32::new(123).unwrap()) +// ); +// assert!(!inpod_cfg.reuse_port); + +// let sf = inpod_cfg.socket_factory( +// InpodNamespace::new( +// Arc::new(InpodNamespace::current().unwrap()), +// new_netns(), +// ) +// .unwrap(), +// ); + +// let sock_addr: std::net::SocketAddr = "127.0.0.1:8080".parse().unwrap(); +// { +// let s = sf.tcp_bind(sock_addr).unwrap().inner(); + +// // make sure mark nad port re-use are set +// let sock_ref = socket2::SockRef::from(&s); +// assert_eq!( +// sock_ref.local_addr().unwrap(), +// socket2::SockAddr::from(sock_addr) +// ); +// assert!(!sock_ref.reuse_port().unwrap()); +// assert_eq!(sock_ref.mark().unwrap(), 123); +// } + +// { +// let s = sf.udp_bind(sock_addr).unwrap(); + +// // make sure mark nad port re-use are set +// let sock_ref = socket2::SockRef::from(&s); +// assert_eq!( +// sock_ref.local_addr().unwrap(), +// socket2::SockAddr::from(sock_addr) +// ); +// assert!(!sock_ref.reuse_port().unwrap()); +// assert_eq!(sock_ref.mark().unwrap(), 123); +// } +// } + +// #[tokio::test] +// async fn test_inpod_config_port_reuse() { +// let cfg = fixture!(); + +// let inpod_cfg = InPodConfig::new(&cfg).unwrap(); +// assert_eq!( +// inpod_cfg.mark(), +// Some(std::num::NonZeroU32::new(123).unwrap()) +// ); +// assert!(inpod_cfg.reuse_port); + +// let sf = inpod_cfg.socket_factory( +// InpodNamespace::new( +// Arc::new(crate::inpod::linux::netns::InpodNetns::current().unwrap()), +// new_netns(), +// ) +// .unwrap(), +// ); + 
+// let sock_addr: std::net::SocketAddr = "127.0.0.1:8080".parse().unwrap(); +// { +// let s = sf.tcp_bind(sock_addr).unwrap().inner(); + +// // make sure mark nad port re-use are set +// let sock_ref = socket2::SockRef::from(&s); +// assert_eq!( +// sock_ref.local_addr().unwrap(), +// socket2::SockAddr::from(sock_addr) +// ); +// assert!(sock_ref.reuse_port().unwrap()); +// assert_eq!(sock_ref.mark().unwrap(), 123); +// } + +// { +// let s = sf.udp_bind(sock_addr).unwrap(); + +// // make sure mark nad port re-use are set +// let sock_ref = socket2::SockRef::from(&s); +// assert_eq!( +// sock_ref.local_addr().unwrap(), +// socket2::SockAddr::from(sock_addr) +// ); +// assert!(sock_ref.reuse_port().unwrap()); +// assert_eq!(sock_ref.mark().unwrap(), 123); +// } +// } + +// #[tokio::test] +// async fn test_inpod_config_outbound_sockets() { +// let cfg = fixture!(); + +// let inpod_cfg = InPodConfig::new(&cfg).unwrap(); + +// let sf = inpod_cfg.socket_factory( +// InpodNamespace::new( +// Arc::new(crate::inpod::linux::netns::InpodNetns::current().unwrap()), +// new_netns(), +// ) +// .unwrap(), +// ); + +// { +// let s = sf.new_tcp_v4().unwrap(); +// let sock_ref = socket2::SockRef::from(&s); +// assert!(!sock_ref.reuse_port().unwrap()); +// assert_eq!(sock_ref.mark().unwrap(), 123); +// } + +// { +// let s = sf.new_tcp_v6().unwrap(); +// let sock_ref = socket2::SockRef::from(&s); +// assert!(!sock_ref.reuse_port().unwrap()); +// assert_eq!(sock_ref.mark().unwrap(), 123); +// } +// } +// } diff --git a/src/inpod/windows/namespace.rs b/src/inpod/windows/namespace.rs new file mode 100644 index 0000000000..8fa5f321bb --- /dev/null +++ b/src/inpod/windows/namespace.rs @@ -0,0 +1,166 @@ +use std::sync::Arc; +use tracing::warn; +use windows::Win32::NetworkManagement::IpHelper::{ + GetCurrentThreadCompartmentId, SetCurrentThreadCompartmentId, +}; + +#[derive(Debug, Clone, Eq, Hash, PartialEq)] +pub struct Namespace { + pub id: u32, + pub guid: String, +} + +#[derive(Clone, 
Debug)] +pub struct InpodNamespace { + inner: Arc, +} + +#[derive(Debug, Eq, PartialEq)] +struct NetnsInner { + current_namespace: u32, + workload_namespace: Namespace, +} + +impl InpodNamespace { + pub fn current() -> std::io::Result { + let curr_namespace = unsafe { GetCurrentThreadCompartmentId() }; + if curr_namespace.0 == 0 { + warn!("GetCurrentThreadCompartmentId failed"); + return Err(std::io::Error::last_os_error()); + } + Ok(curr_namespace.0) + } + + pub fn capable() -> std::io::Result<()> { + // set the netns to our current netns. This is intended to be a no-op, + // and meant to be used as a test, so we can fail early if we can't set the netns + let curr_namespace = Self::current()?; + setns(curr_namespace) + } + + pub fn new(cur_namespace: u32, workload_namespace: String) -> std::io::Result { + let ns = hcn::get_namespace(&workload_namespace); + match ns { + Err(e) => { + warn!("Failed to get namespace: {}", e); + Err(std::io::Error::last_os_error()) + } + Ok(ns) => Ok(InpodNamespace { + inner: Arc::new(NetnsInner { + current_namespace: cur_namespace, + workload_namespace: Namespace { + id: ns + .namespace_id + .expect("There must always be a namespace id"), + guid: ns.id, + }, + }), + }), + } + } + + pub fn workload_namespace(&self) -> u32 { + self.inner.workload_namespace.id + } + + // Useful for logging / debugging + pub fn workload_namespace_guid(&self) -> String { + self.inner.workload_namespace.guid.clone() + } + + pub fn run(&self, f: F) -> std::io::Result + where + F: FnOnce() -> T, + { + setns(self.inner.workload_namespace.id)?; + let ret = f(); + setns(self.inner.current_namespace).expect("this must never fail"); + Ok(ret) + } +} + +// hop into a namespace +fn setns(namespace: u32) -> std::io::Result<()> { + let error = unsafe { SetCurrentThreadCompartmentId(namespace) }; + if error.0 != 0 { + return Err(std::io::Error::from_raw_os_error(error.0 as i32)); + } + Ok(()) +} + +#[cfg(test)] +mod tests { + use hcn::schema::HostComputeQuery; + use 
hcn::api; + use windows::core::GUID; + + use super::*; + + fn new_namespace() -> Namespace { + let api_namespace = hcn::schema::HostComputeNamespace::default(); + + let api_namespace = serde_json::to_string(&api_namespace).unwrap(); + let handle = hcn::api::create_namespace(&GUID::zeroed(), &api_namespace).unwrap(); + + // we don't get info back so need to query to get metadata about network + let query = HostComputeQuery::default(); + let query = serde_json::to_string(&query).unwrap(); + + let api_namespace = api::query_namespace_properties(handle, &query).unwrap(); + + let api_namespace: hcn::schema::HostComputeNamespace = + serde_json::from_str(&api_namespace).unwrap(); + + Namespace { + id: api_namespace.namespace_id.unwrap(), + guid: api_namespace.id, + } + } + + #[test] + fn test_run_works() { + if !crate::test_helpers::can_run_privilged_test() { + eprintln!("This test requires root; skipping"); + return; + } + + // TODO: Right now, creating a namespace doesn't automatically create a compartment + // (the actual network stack/context). Add these tests back when that capability is added + // in Windows Server 2025. 
+ // start with new netns to not impact the current netns + // unshare(CloneFlags::CLONE_NEWNET).unwrap(); + // let cur_netns = InpodNetns::current().unwrap(); + // helpers::run_command("ip link add name dummy1 type dummy").unwrap(); + + // let other_netns = new_netns(); + + // let sync_netns = + // netns_rs::get_from_path(format!("/proc/self/fd/{}", other_netns.as_raw_fd())).unwrap(); + // sync_netns + // .run(|_| helpers::run_command("ip link add name dummy2 type dummy")) + // .expect("netns run failed") + // .unwrap(); + + // // test with future netns + // let netns = InpodNetns::new(Arc::new(cur_netns), other_netns).unwrap(); + + // let output = netns + // .run(|| Command::new("ip").args(["link", "show"]).output()) + // .expect("netns run failed") + // .expect("tokio command failed"); + + // assert!(output.status.success()); + // let out_str = String::from_utf8_lossy(output.stdout.as_slice()); + // assert!(!out_str.contains("dummy1")); + // assert!(out_str.contains("dummy2")); + + // // make sure we returned to the original ns + + // let output = Command::new("ip").args(["link", "show"]).output().unwrap(); + + // assert!(output.status.success()); + // let out_str = String::from_utf8_lossy(output.stdout.as_slice()); + // assert!(out_str.contains("dummy1")); + // assert!(!out_str.contains("dummy2")); + } +} diff --git a/src/inpod/windows/packet.rs b/src/inpod/windows/packet.rs new file mode 100644 index 0000000000..8b13789179 --- /dev/null +++ b/src/inpod/windows/packet.rs @@ -0,0 +1 @@ + diff --git a/src/inpod/windows/protocol.rs b/src/inpod/windows/protocol.rs new file mode 100644 index 0000000000..a586d7abec --- /dev/null +++ b/src/inpod/windows/protocol.rs @@ -0,0 +1,128 @@ +use crate::drain::DrainWatcher; +use crate::inpod::windows::{WorkloadData, WorkloadMessage, WorkloadUid}; +use crate::inpod::istio::zds::{ + self, workload_request::Payload, Ack, Version, WorkloadRequest, WorkloadResponse, ZdsHello, +}; +use prost::Message; +use tracing::info; +use 
std::io::{IoSlice, IoSliceMut}; +use tokio::net::windows::named_pipe::*; + + +pub struct WorkloadStreamProcessor { + client: NamedPipeClient, + drain: DrainWatcher, +} + +impl WorkloadStreamProcessor { + pub fn new(client: NamedPipeClient, drain: DrainWatcher) -> Self { + WorkloadStreamProcessor { client, drain } + } + + pub async fn send_hello(&mut self) -> std::io::Result<()> { + let r = ZdsHello { + version: Version::V1 as i32, + }; + self.send_msg(r).await + } + + pub async fn send_ack(&mut self) -> std::io::Result<()> { + let r = WorkloadResponse { + payload: Some(zds::workload_response::Payload::Ack(Ack { + error: String::new(), + })), + }; + self.send_msg(r).await + } + + pub async fn send_nack(&mut self, e: anyhow::Error) -> std::io::Result<()> { + let r = WorkloadResponse { + payload: Some(zds::workload_response::Payload::Ack(Ack { + error: e.to_string(), + })), + }; + self.send_msg(r).await + } + + async fn send_msg(&mut self, r: T) -> std::io::Result<()> { + let mut buf = Vec::new(); + r.encode(&mut buf).unwrap(); + + let iov = [IoSlice::new(&buf)]; + loop { + self.client.writable().await?; + match self.client.try_write_vectored(&iov) { + Ok(n) => { + println!("Wrote {:?} bytes to pipe", n); + return Ok(()); + } + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { + println!("Received WouldBlock error, retrying"); + continue; + } + Err(e) => return Err(e), + }; + } + } + + pub async fn read_message(&self) -> anyhow::Result> { + // TODO: support messages for removing workload + let mut buffer: Vec = vec![0u8; 1024]; + let mut iov = [IoSliceMut::new(&mut buffer)]; + + let len = { + loop { + println!("Waiting for pipe to be readable"); + tokio::select! { + biased; // check drain first, so we don't read from the pipe if we are draining. 
+ _ = self.drain.clone().wait_for_drain() => { + info!("workload proxy manager: drain requested"); + return Ok(None); + } + res = self.client.readable() => res, + }?; + let res = self.client.try_read_vectored(&mut iov); + let ok_res = match res { + Ok(res) => { + if res == 0 { + println!("No data read from pipe. Probably a bug"); + return Ok(None); + } + res + } + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { + println!("Received WouldBlock error, retrying"); + continue; + } + Err(e) => { + return Err(e.into()); + } + }; + break ok_res; + } + }; + info!("Successfully read {:?} bytes from pipe", len); + get_workload_data(&buffer[..len]).map(Some) + } +} + +fn get_workload_data(data: &[u8]) -> anyhow::Result { + let req = get_info_from_data(data)?; + let payload = req.payload.ok_or(anyhow::anyhow!("no payload"))?; + match payload { + Payload::Add(a) => { + let uid = a.uid; + Ok(WorkloadMessage::AddWorkload(WorkloadData { + workload_uid: WorkloadUid::new(uid), + workload_info: a.workload_info, + })) + } + Payload::Keep(k) => Ok(WorkloadMessage::KeepWorkload(WorkloadUid::new(k.uid))), + Payload::Del(d) => Ok(WorkloadMessage::DelWorkload(WorkloadUid::new(d.uid))), + Payload::SnapshotSent(_) => Ok(WorkloadMessage::WorkloadSnapshotSent), + } +} + +fn get_info_from_data<'a>(data: impl bytes::Buf + 'a) -> anyhow::Result { + Ok(WorkloadRequest::decode(data)?) 
}
diff --git a/src/inpod/windows/statemanager.rs b/src/inpod/windows/statemanager.rs
new file mode 100644
index 0000000000..c098383514
--- /dev/null
+++ b/src/inpod/windows/statemanager.rs
@@ -0,0 +1,599 @@
//! Tracks per-workload proxy state for the Windows inpod workload manager:
//! starting proxies on Add, draining on Del, and reconciling against the node
//! agent's snapshot after a reconnect.

use crate::drain;
use crate::drain::DrainTrigger;
use crate::inpod::metrics::Metrics;
use crate::inpod::Error;
use std::sync::Arc;

use crate::proxyfactory::ProxyFactory;
use crate::state::WorkloadInfo;

use super::config::InPodConfig;
use tracing::{debug, info, Instrument};

use super::namespace::InpodNamespace;
use super::WorkloadMessage;
use super::WorkloadUid;

// Note: we can't drain on drop, as drain is async (it waits for the drain to finish).
pub(super) struct WorkloadState {
    drain: DrainTrigger,
    // Compartment id of the workload's network namespace.
    // NOTE(review): written but never read in this module -- confirm it is
    // needed, or document the external consumer.
    netns_id: u32,
    // HNS namespace GUID; used to detect a re-created pod sandbox.
    netns_guid: String,
}

/// Join handles for proxies that are shutting down, so a global drain can wait
/// for them.
#[derive(Default)]
struct DrainingTasks {
    // NOTE(review): the JoinHandle type parameter was garbled in the patch;
    // `start_drain_and_wait` resolves to `()`.
    draining: Vec<tokio::task::JoinHandle<()>>,
}

impl DrainingTasks {
    fn shutdown_workload(&mut self, workload_state: WorkloadState) {
        // Workload is gone, so no need to gracefully clean it up
        let handle = tokio::spawn(
            workload_state
                .drain
                .start_drain_and_wait(drain::DrainMode::Immediate),
        );
        // before we push to draining, try to clear done entries, so the vector doesn't grow too much
        self.draining.retain(|x| !x.is_finished());
        // add deleted pod to draining. we do this so we make sure to wait for it incase we
        // get the global drain signal.
        self.draining.push(handle);
    }

    async fn join(self) {
        futures::future::join_all(self.draining).await;
    }
}

pub struct WorkloadProxyManagerState {
    proxy_gen: ProxyFactory,
    metrics: Arc<Metrics>,
    // NOTE(review): inner type garbled in the patch; mirrors the Linux
    // statemanager -- confirm.
    admin_handler: Arc<super::admin::WorkloadManagerAdminHandler>,
    // use hashbrown for extract_if
    workload_states: hashbrown::HashMap<WorkloadUid, WorkloadState>,

    // workloads we wanted to start but couldn't because we had an error starting them.
    // This happened to use mainly in testing when we redeploy ztunnel, and the old pod was
    // not completely drained yet.
    pending_workloads: hashbrown::HashMap<WorkloadUid, (WorkloadInfo, InpodNamespace)>,
    draining: DrainingTasks,

    // new connection stuff
    snapshot_received: bool,
    snapshot_names: std::collections::HashSet<WorkloadUid>,

    inpod_config: InPodConfig,
}

// TODO: Audit usage of clone on InPodNamespace
impl WorkloadProxyManagerState {
    pub fn new(
        proxy_gen: ProxyFactory,
        inpod_config: InPodConfig,
        metrics: Arc<Metrics>,
        admin_handler: Arc<super::admin::WorkloadManagerAdminHandler>,
    ) -> Self {
        WorkloadProxyManagerState {
            proxy_gen,
            metrics,
            admin_handler,
            workload_states: Default::default(),
            pending_workloads: Default::default(),
            draining: Default::default(),

            snapshot_received: false,
            snapshot_names: Default::default(),
            inpod_config,
        }
    }

    #[cfg(test)] // only used in tests, so added this to avoid warning
    pub(super) fn workload_states(&self) -> &hashbrown::HashMap<WorkloadUid, WorkloadState> {
        &self.workload_states
    }

    // Call this on new connection
    pub fn reset_snapshot(&mut self) {
        self.snapshot_names.clear();
        self.pending_workloads.clear();
        self.snapshot_received = false;
    }

    /// Apply one protocol message to the local state, starting/keeping/draining
    /// proxies as requested by the node agent.
    pub async fn process_msg(&mut self, msg: WorkloadMessage) -> Result<(), Error> {
        match msg {
            WorkloadMessage::AddWorkload(poddata) => {
                info!(
                    uid = poddata.workload_uid.0,
                    name = poddata
                        .workload_info
                        .as_ref()
                        .map(|w| w.name.as_str())
                        .unwrap_or_default(),
                    namespace = poddata
                        .workload_info
                        .as_ref()
                        .map(|w| w.namespace.as_str())
                        .unwrap_or_default(),
                    "pod received, starting proxy",
                );
                let Some(wli) = poddata.workload_info else {
                    return Err(Error::ProtocolError(
                        "workload_info is required but not present".into(),
                    ));
                };
                if !self.snapshot_received {
                    self.snapshot_names.insert(poddata.workload_uid.clone());
                }
                // A missing namespace is a malformed peer message; report it as a
                // protocol error rather than panicking the whole process.
                let Some(ns) = wli.windows_namespace else {
                    return Err(Error::ProtocolError(
                        "windows_namespace is required but not present".into(),
                    ));
                };
                // TODO: this is currently failing because HNS doesn't have a network compartment
                // for the namespace.
                let netns = InpodNamespace::new(self.inpod_config.cur_netns(), ns.guid).map_err(
                    |e| Error::ProxyError(poddata.workload_uid.0.clone(), crate::proxy::Error::Io(e)),
                )?;
                let info = WorkloadInfo {
                    name: wli.name,
                    namespace: wli.namespace,
                    service_account: wli.service_account,
                };
                self.add_workload(&poddata.workload_uid, info, netns)
                    .await
                    .map_err(|e| Error::ProxyError(poddata.workload_uid.0, e))
            }
            WorkloadMessage::KeepWorkload(workload_uid) => {
                info!(
                    uid = workload_uid.0,
                    "pod keep received. will not delete it when snapshot is sent"
                );
                if self.snapshot_received {
                    // this can only happen before snapshot is received.
                    return Err(Error::ProtocolError(
                        "pod keep received after snapshot".into(),
                    ));
                }
                self.snapshot_names.insert(workload_uid);
                Ok(())
            }
            WorkloadMessage::DelWorkload(workload_uid) => {
                info!(
                    uid = workload_uid.0,
                    "pod delete request, shutting down proxy"
                );
                if !self.snapshot_received {
                    // TODO: consider if this is an error. if not, do this instead:
                    // self.snapshot_names.remove(&workload_uid)
                    // self.pending_workloads.remove(&workload_uid)
                    return Err(Error::ProtocolError(
                        "pod delete received before snapshot".into(),
                    ));
                }
                self.del_workload(&workload_uid);
                Ok(())
            }
            WorkloadMessage::WorkloadSnapshotSent => {
                info!("pod received snapshot sent");
                if self.snapshot_received {
                    return Err(Error::ProtocolError("pod snapshot received already".into()));
                }
                self.reconcile();
                // mark ready
                self.snapshot_received = true;
                Ok(())
            }
        }
    }

    // reconcile existing state to snapshot. drains any workloads not in the snapshot
    // this can happen if workloads were removed while we were disconnected.
    fn reconcile(&mut self) {
        for (_, workload_state) in self
            .workload_states
            .extract_if(|uid, _| !self.snapshot_names.contains(uid))
        {
            self.draining.shutdown_workload(workload_state);
        }
        self.snapshot_names.clear();
        self.update_proxy_count_metrics();
    }

    /// Gracefully drain every running proxy, then wait for the already
    /// shutting-down ones.
    pub async fn drain(self) {
        let drain_futures =
            self.workload_states.into_iter().map(|(_, v)| {
                v.drain.start_drain_and_wait(drain::DrainMode::Graceful)
            } /* do not .await here!!! */);
        // join these first, as we need to drive these to completion
        futures::future::join_all(drain_futures).await;
        // these are join handles that are driven by tokio, we just need to wait for them, so join these
        // last
        self.draining.join().await;
    }

    async fn add_workload(
        &mut self,
        workload_uid: &WorkloadUid,
        workload_info: WorkloadInfo,
        netns: InpodNamespace,
    ) -> Result<(), crate::proxy::Error> {
        match self
            .add_workload_inner(workload_uid, &workload_info, netns.clone())
            .await
        {
            Ok(()) => {
                self.update_proxy_count_metrics();
                Ok(())
            }
            Err(e) => {
                // Remember the workload so retry_pending() can try again later.
                self.pending_workloads
                    .insert(workload_uid.clone(), (workload_info, netns));
                self.update_proxy_count_metrics();
                Err(e)
            }
        }
    }

    async fn add_workload_inner(
        &mut self,
        workload_uid: &WorkloadUid,
        workload_info: &WorkloadInfo,
        netns: InpodNamespace,
    ) -> Result<(), crate::proxy::Error> {
        // check if we have a proxy already
        let maybe_existing = self.workload_states.get(workload_uid);
        if let Some(existing) = maybe_existing {
            if existing.netns_guid != netns.workload_namespace_guid() {
                // GUIDs are different, we have a new netns.
                // this can happen when there's a CNI failure (that's unrelated to us) which triggers
                // pod sandbox to be re-created with a fresh new netns.
                // drain the old proxy and add this one.
                self.del_workload(workload_uid);
            } else {
                // idempotency - no error if we already have a proxy for the workload
                // with the same namespace GUID.
                return Ok(());
            }
        }
        self.admin_handler
            .proxy_pending(workload_uid, workload_info);

        let workload_namespace_guid = netns.workload_namespace_guid();

        debug!(
            workload=?workload_uid,
            workload_info=?workload_info,
            netns_id=?workload_namespace_guid,
            "starting proxy",
        );

        // We create a per workload drain here. If the main loop in WorkloadProxyManager::run drains,
        // we drain all these per-workload drains before exiting the loop
        let (drain_tx, drain_rx) = drain::new();

        let proxies = self
            .proxy_gen
            .new_proxies_from_factory(
                Some(drain_rx),
                workload_info.clone(),
                Arc::from(self.inpod_config.socket_factory(netns.clone())),
            )
            .await?;

        let uid = workload_uid.clone();

        self.admin_handler
            .proxy_up(&uid, workload_info, proxies.connection_manager);

        let metrics = self.metrics.clone();
        let admin_handler = self.admin_handler.clone();

        metrics.proxies_started.inc();
        if let Some(proxy) = proxies.proxy {
            tokio::spawn(
                async move {
                    proxy.run().await;
                    debug!("proxy for workload {:?} exited", uid);
                    metrics.proxies_stopped.inc();
                    admin_handler.proxy_down(&uid);
                }
                .instrument(tracing::info_span!("proxy", wl=%format!("{}/{}", workload_info.namespace, workload_info.name))),
            );
        }
        if let Some(proxy) = proxies.dns_proxy {
            tokio::spawn(proxy.run().instrument(tracing::info_span!("dns_proxy", wl=%format!("{}/{}", workload_info.namespace, workload_info.name))));
        }

        let namespace_id = netns.clone().workload_namespace();
        self.workload_states.insert(
            workload_uid.clone(),
            WorkloadState {
                drain: drain_tx,
                netns_id: namespace_id,
                netns_guid: workload_namespace_guid,
            },
        );

        Ok(())
    }

    pub fn have_pending(&self) -> bool {
        !self.pending_workloads.is_empty()
    }

    pub fn ready(&self) -> bool {
        // We are ready after we received our first snapshot and don't have any proxies that failed
        // to start.
        self.snapshot_received && !self.have_pending()
    }

    /// Retry starting every workload that previously failed; failures are
    /// re-queued by add_workload.
    pub async fn retry_pending(&mut self) {
        let current_pending_workloads = std::mem::take(&mut self.pending_workloads);

        for (uid, (info, netns)) in current_pending_workloads {
            info!("retrying workload {:?}", uid);
            match self.add_workload(&uid, info, netns).await {
                Ok(()) => {}
                Err(e) => {
                    info!("retrying workload {:?} failed: {}", uid, e);
                }
            }
        }
    }

    fn del_workload(&mut self, workload_uid: &WorkloadUid) {
        // for idempotency, we ignore errors here (maybe just log / metric them)
        self.pending_workloads.remove(workload_uid);
        let Some(workload_state) = self.workload_states.remove(workload_uid) else {
            // TODO: add metrics
            return;
        };

        self.update_proxy_count_metrics();

        self.draining.shutdown_workload(workload_state);
    }

    fn update_proxy_count_metrics(&self) {
        self.metrics
            .active_proxy_count
            .set(self.workload_states.len().try_into().unwrap_or(-1));
        self.metrics
            .pending_proxy_count
            .set(self.pending_workloads.len().try_into().unwrap_or(-1));
    }
}

// TODO(windows): port the Linux statemanager tests (add_workload_starts_a_proxy,
// the idempotency cases, idemepotency_add_workload_fails[_and_then_deleted],
// add_delete_add_workload_starts_only_one_proxy,
// proxy_added_then_kept_with_new_snapshot, add_with_different_netns_keeps_latest_proxy)
// once windows test_helpers are implemented; the originals live in the Linux
// inpod statemanager module.
diff --git a/src/inpod/windows/test_helpers.rs
b/src/inpod/windows/test_helpers.rs new file mode 100644 index 0000000000..8b13789179 --- /dev/null +++ b/src/inpod/windows/test_helpers.rs @@ -0,0 +1 @@ + diff --git a/src/inpod/windows/workloadmanager.rs b/src/inpod/windows/workloadmanager.rs new file mode 100644 index 0000000000..b920763cd2 --- /dev/null +++ b/src/inpod/windows/workloadmanager.rs @@ -0,0 +1,649 @@ +// Copyright Istio Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::drain::DrainWatcher; +use crate::readiness; +use backoff::{backoff::Backoff, ExponentialBackoff}; +use std::path::PathBuf; +use std::time::Duration; +use tokio::time::sleep; +use tokio::net::windows::named_pipe::{ClientOptions, NamedPipeClient}; +use tracing::{debug, error, info, warn}; + +use super::statemanager::WorkloadProxyManagerState; +use crate::inpod::Error; + +use crate::inpod::windows::namespace::InpodNamespace; + +use super::protocol::WorkloadStreamProcessor; + +const RETRY_DURATION: Duration = Duration::from_secs(5); + +const CONNECTION_FAILURE_RETRY_DELAY_MAX_INTERVAL: Duration = Duration::from_secs(15); + +struct WorkloadProxyNetworkHandler { + uds: PathBuf, +} + +struct WorkloadProxyReadinessHandler { + ready: readiness::Ready, + // Manually drop as we don't want to mark ready if we are dropped. + // This can happen when the server drains. 
+ block_ready: Option>, + backoff: ExponentialBackoff, +} + +pub struct WorkloadProxyManager { + state: super::statemanager::WorkloadProxyManagerState, + networking: WorkloadProxyNetworkHandler, + // readiness - we are only ready when we are connected. if we get disconnected, we become not ready. + readiness: WorkloadProxyReadinessHandler, +} + +struct WorkloadProxyManagerProcessor<'a> { + state: &'a mut super::statemanager::WorkloadProxyManagerState, + readiness: &'a mut WorkloadProxyReadinessHandler, + + next_pending_retry: Option>>, +} + +impl WorkloadProxyReadinessHandler { + fn new(ready: readiness::Ready, reconnect_backoff: Option) -> Self { + let backoff = reconnect_backoff.unwrap_or(ExponentialBackoff { + initial_interval: Duration::from_millis(5), + max_interval: CONNECTION_FAILURE_RETRY_DELAY_MAX_INTERVAL, + multiplier: 2.0, + randomization_factor: 0.2, + ..Default::default() + }); + + let mut r = Self { + ready, + block_ready: None, + backoff, + }; + r.not_ready(); + r + } + fn mark_ready(&mut self) { + // take the block ready out of the ManuallyDrop, and drop it. 
+ if self.block_ready.is_some() { + debug!("workload proxy manager is ready"); + let block_ready: Option = self + .block_ready + .take() + .map(std::mem::ManuallyDrop::into_inner); + + std::mem::drop(block_ready); + } + + self.backoff.reset() + } + + fn not_ready(&mut self) { + debug!("workload proxy manager is NOT ready"); + if self.block_ready.is_none() { + self.block_ready = Some(std::mem::ManuallyDrop::new( + self.ready.register_task("workload proxy manager"), + )); + } + } +} + +impl WorkloadProxyNetworkHandler { + fn new(uds: PathBuf) -> std::io::Result { + Ok(Self { uds }) + } + + // OLD CODE USING UNIX STREAM + // async fn connect(&self) -> UnixStream { + // let mut backoff = Duration::from_millis(10); + + // debug!("connecting to server: {:?}", self.uds); + + // loop { + // match super::packet::connect(&self.uds).await { + // Err(e) => { + // backoff = + // std::cmp::min(CONNECTION_FAILURE_RETRY_DELAY_MAX_INTERVAL, backoff * 2); + // warn!( + // "failed to connect to the Istio CNI node agent over {:?}, is the node agent healthy? details: {:?}. retrying in {:?}", + // &self.uds, e, backoff + // ); + // tokio::time::sleep(backoff).await; + // continue; + // } + + // Ok(conn) => { + // return conn; + // } + // }; + // } + // } + + // TODO make this work without UnixStream + async fn connect(&self) -> Result { + let mut backoff = Duration::from_millis(10); + + debug!("connecting to server: {:?}", self.uds); + + loop { + let client_options = ClientOptions::new() + .pipe_mode(tokio::net::windows::named_pipe::PipeMode::Message) + .open(&self.uds); + match client_options { + Ok(client) => { + info!("connected to server: {:?}", self.uds); + return Ok(client); + } + Err(ref e) => { + error!("failed to connect to server: {:?}, error: {:?}", self.uds, e); + sleep(backoff).await; + backoff *= 2; + } + } + } + } +} + +impl WorkloadProxyManager { + pub fn verify_syscalls() -> anyhow::Result<()> { + // verify that we are capable, so we can fail early if not. 
+ InpodNamespace::capable().map_err(|e| anyhow::anyhow!("failed to set netns: {:?}", e)) + } + + pub fn new( + uds: PathBuf, + state: WorkloadProxyManagerState, + ready: readiness::Ready, + ) -> std::io::Result { + let networking = WorkloadProxyNetworkHandler::new(uds)?; + + let mgr = WorkloadProxyManager { + state, + networking, + readiness: WorkloadProxyReadinessHandler::new(ready, None), + }; + Ok(mgr) + } + + pub async fn run(mut self, drain: DrainWatcher) -> Result<(), anyhow::Error> { + self.run_internal(drain).await?; + + // We broke the loop, this can only happen when drain was signaled + // or we got a terminal protocol error. Drain our proxies. + debug!("workload proxy manager waiting for proxies to drain"); + self.state.drain().await; + debug!("workload proxy manager proxies drained"); + Ok(()) + } + + // This func will run and attempt to (re)connect to the node agent over uds, until + // - a drain is signaled + // - we have a ProtocolError (we have a serious version mismatch) + // We should never _have_ a protocol error as the gRPC proto should be forwards+backwards compatible, + // so this is mostly a safeguard + async fn run_internal(&mut self, drain: DrainWatcher) -> Result<(), anyhow::Error> { + // for now just drop block_ready, until we support knowing that our state is in sync. + debug!("workload proxy manager is running"); + // hold the release shutdown until we are done with `state.drain` below. + + let _rs = loop { + // Accept a connection + let stream = tokio::select! { + biased; // check the drain first + rs = drain.clone().wait_for_drain() => { + info!("drain requested"); + break rs; + } + res = self.networking.connect() => res?, + }; + + info!("handling new stream"); + + // TODO: add metrics? 
+ + let processor = WorkloadStreamProcessor::new(stream, drain.clone()); + let mut processor_helper = + WorkloadProxyManagerProcessor::new(&mut self.state, &mut self.readiness); + match processor_helper.process(processor).await { + Ok(()) => { + info!("process stream ended with eof"); + } + // If we successfully accepted a connection, but the first thing we try (announce) + // fails, it can mean 2 things: + // 1. The connection was killed because the node agent happened to restart at a bad time + // 2. The connection was killed because we have a protocol mismatch with the node agent. + // + // For case 1, we must keep retrying. For case 2 we shouldn't retry, as we will spam + // an incompatible server with messages and connections it can't understand. + // + // We also cannot easily tell these cases apart due to the simplistic protocol in use here, + // so a happy medium is to backoff if we get announce errors - they could be legit or + // non-legit disconnections, we can't tell. + Err(Error::AnnounceError(e)) => { + self.readiness.not_ready(); + // This will retry infinitely for as long as the socket doesn't EOF, but not immediately. 
+ let wait = self + .readiness + .backoff + .next_backoff() + .unwrap_or(CONNECTION_FAILURE_RETRY_DELAY_MAX_INTERVAL); + error!("node agent announcement failed ({e}), retrying in {wait:?}"); + tokio::time::sleep(wait).await; + continue; + } + Err(Error::ProtocolError(e)) => { + error!("protocol mismatch error while processing stream, shutting down"); + self.readiness.not_ready(); + return Err(anyhow::anyhow!("protocol error {:?}", e)); + } + Err(e) => { + // for other errors, just retry + warn!("process stream ended: {:?}", e); + } + }; + + self.readiness.not_ready(); + }; + + Ok(()) + } +} + +impl<'a> WorkloadProxyManagerProcessor<'a> { + fn new( + state: &'a mut super::statemanager::WorkloadProxyManagerState, + readiness: &'a mut WorkloadProxyReadinessHandler, + ) -> Self { + state.reset_snapshot(); + Self { + state, + readiness, + next_pending_retry: None, + } + } + + async fn read_message_and_retry_proxies( + &mut self, + processor: &mut WorkloadStreamProcessor, + ) -> anyhow::Result> { + let readmsg = processor.read_message(); + // Note: readmsg future is NOT cancel safe, so we want to make sure this function doesn't exit + // return without completing it. + futures::pin_mut!(readmsg); + loop { + match self.next_pending_retry.take() { + None => { + return readmsg.await; + } + Some(timer) => { + match futures_util::future::select(timer, readmsg).await { + futures_util::future::Either::Left((_, readmsg_fut)) => { + self.retry_proxies().await; + // we have an uncompleted future. It might be in the middle of recvmsg. + // to make sure we don't drop messages, we complete the original + // future and not start a new one. + readmsg = readmsg_fut; + } + futures_util::future::Either::Right((res, timer)) => { + // we have a message before the timer expired + // put the timer back so we will wait for the time remaining next time. 
+ self.next_pending_retry = Some(timer); + return res; + } + }; + } + } + } + } + + async fn retry_proxies(&mut self) { + self.state.retry_pending().await; + if self.state.have_pending() { + self.schedule_retry(); + } else { + self.next_pending_retry.take(); + self.check_ready(); + } + } + + pub async fn process(&mut self, mut processor: WorkloadStreamProcessor) -> Result<(), Error> { + processor + .send_hello() + .await + .map_err(|_| Error::AnnounceError("could not announce to node agent".into()))?; + + loop { + let msg = match self.read_message_and_retry_proxies(&mut processor).await { + Ok(Some(msg)) => Ok(msg), + Ok(None) => { + return Ok(()); + } + Err(e) => { + error!("failed to read message: {:?}", e); + + // TODO: make it clear that the error is from reading the message, and not from processing it. + // .map_err(|e| Error::ReceiveMessageError(e.to_string()))?; + processor + .send_nack(anyhow::anyhow!("failure to read message : {:?}", e)) + .await + .map_err(|e: std::io::Error| Error::SendNackError(e.to_string()))?; + Err(Error::ReceiveMessageError(e.to_string())) + } + }?; + + debug!("received message: {:?}", msg); + + // send ack: + match self.state.process_msg(msg).await { + Ok(()) => { + self.check_ready(); + processor + .send_ack() + .await + .map_err(|e| Error::SendAckError(e.to_string()))?; + } + Err(Error::ProxyError(uid, e)) => { + error!(%uid, "failed to start proxy: {:?}", e); + // setup the retry timer: + self.schedule_retry(); + // proxy error is a transient error, so report it but don't disconnect + // TODO: raise metrics + processor + .send_nack(anyhow::anyhow!("failure to start proxy : {:?}", e)) + .await + .map_err(|e| Error::SendNackError(e.to_string()))?; + } + Err(e) => { + // TODO: raise metrics + error!("failed to process message: {:?}", e); + processor + .send_nack(anyhow::anyhow!("failure to process message : {:?}", e)) + .await + .map_err(|e| Error::SendNackError(e.to_string()))?; + // other errors are not recoverable, so exit 
function the function to re-connect. + // also, these errors should never happen, so log/metrics/document them. + return Err(e); + } + }; + } + } + + fn schedule_retry(&mut self) { + if self.next_pending_retry.is_none() { + info!("scheduling retry"); + self.next_pending_retry = Some(Box::pin(tokio::time::sleep(RETRY_DURATION))); + } + } + fn check_ready(&mut self) { + if self.state.ready() { + self.readiness.mark_ready(); + } else { + self.readiness.not_ready(); + } + } +} + +// #[cfg(test)] +// pub(crate) mod tests { + +// use super::super::protocol::WorkloadStreamProcessor; + +// use tokio::io::AsyncWriteExt; + +// use super::*; + +// use crate::inpod::linux::test_helpers::{ +// self, create_proxy_confilct, new_netns, read_hello, read_msg, send_snap_sent, +// send_workload_added, send_workload_del, uid, +// }; + +// use crate::drain::DrainTrigger; +// use std::{collections::HashSet, sync::Arc}; + +// fn assert_end_stream(res: Result<(), Error>) { +// match res { +// Err(Error::ReceiveMessageError(e)) => { +// assert!(e.contains("EOF")); +// } +// Ok(()) => {} +// Err(e) => panic!("expected error due to EOF {:?}", e), +// } +// } + +// fn assert_announce_error(res: Result<(), Error>) { +// match res { +// Err(Error::AnnounceError(_)) => {} +// _ => panic!("expected announce error"), +// } +// } + +// struct Fixture { +// state: WorkloadProxyManagerState, +// inpod_metrics: Arc, +// drain_rx: DrainWatcher, +// _drain_tx: DrainTrigger, +// } + +// macro_rules! 
fixture { +// () => {{ +// if !crate::test_helpers::can_run_privilged_test() { +// eprintln!("This test requires root; skipping"); +// return; +// } +// let f = test_helpers::Fixture::default(); +// let state = WorkloadProxyManagerState::new( +// f.proxy_factory, +// f.ipc, +// f.inpod_metrics.clone(), +// Default::default(), +// ); +// Fixture { +// state, +// inpod_metrics: f.inpod_metrics, +// drain_rx: f.drain_rx, +// _drain_tx: f.drain_tx, +// } +// }}; +// } + +// #[tokio::test] +// async fn test_process_add() { +// let f = fixture!(); +// let (s1, mut s2) = UnixStream::pair().unwrap(); +// let processor = WorkloadStreamProcessor::new(s1, f.drain_rx.clone()); +// let mut state = f.state; + +// let server = tokio::spawn(async move { +// read_hello(&mut s2).await; +// send_workload_added(&mut s2, uid(0), new_netns()).await; +// read_msg(&mut s2).await; +// }); + +// let mut readiness = WorkloadProxyReadinessHandler::new(readiness::Ready::new(), None); +// let mut processor_helper = WorkloadProxyManagerProcessor::new(&mut state, &mut readiness); + +// let res = processor_helper.process(processor).await; +// // make sure that the error is due to eof: +// assert_end_stream(res); +// assert!(!readiness.ready.pending().is_empty()); +// state.drain().await; +// server.await.unwrap(); +// } + +// #[tokio::test] +// async fn test_process_failed_announce() { +// let f = fixture!(); +// let (s1, mut s2) = UnixStream::pair().unwrap(); +// let processor = WorkloadStreamProcessor::new(s1, f.drain_rx.clone()); +// let mut state = f.state; + +// // fake server that simply slams the socket shut and bails +// let server = tokio::spawn(async move { +// let _ = s2.shutdown().await; +// }); + +// let mut readiness = WorkloadProxyReadinessHandler::new(readiness::Ready::new(), None); +// let mut processor_helper = WorkloadProxyManagerProcessor::new(&mut state, &mut readiness); + +// let res = processor_helper.process(processor).await; +// // make sure that the error is due to 
announce fail: +// assert_announce_error(res); +// assert!(!readiness.ready.pending().is_empty()); +// state.drain().await; +// server.await.unwrap(); +// } + +// #[tokio::test] +// async fn test_process_failed() { +// let f = fixture!(); +// let (s1, mut s2) = UnixStream::pair().unwrap(); +// let processor: WorkloadStreamProcessor = +// WorkloadStreamProcessor::new(s1, f.drain_rx.clone()); +// let mut state = f.state; + +// let podns = new_netns(); +// let socket = create_proxy_confilct(&podns); + +// let server = tokio::spawn(async move { +// read_hello(&mut s2).await; +// send_workload_added(&mut s2, uid(0), podns).await; +// read_msg(&mut s2).await; +// send_snap_sent(&mut s2).await; +// read_msg(&mut s2).await; +// }); + +// let mut readiness = WorkloadProxyReadinessHandler::new(readiness::Ready::new(), None); +// let mut processor_helper = WorkloadProxyManagerProcessor::new(&mut state, &mut readiness); + +// let res = processor_helper.process(processor).await; +// assert_end_stream(res); +// std::mem::drop(socket); +// server.await.unwrap(); + +// // not ready as we have a failing proxy +// assert!(!processor_helper.readiness.ready.pending().is_empty()); +// assert!(processor_helper.next_pending_retry.is_some()); + +// // now make sure that re-trying works: +// // all should be ready: +// processor_helper.retry_proxies().await; +// assert!(processor_helper.readiness.ready.pending().is_empty()); +// assert!(processor_helper.next_pending_retry.is_none()); + +// state.drain().await; +// } + +// #[tokio::test] +// async fn test_process_add_and_del() { +// let f = fixture!(); +// let m = f.inpod_metrics; +// let mut state = f.state; +// let (s1, mut s2) = UnixStream::pair().unwrap(); +// let processor: WorkloadStreamProcessor = +// WorkloadStreamProcessor::new(s1, f.drain_rx.clone()); + +// let podns = new_netns(); +// let server = tokio::spawn(async move { +// read_hello(&mut s2).await; +// send_workload_added(&mut s2, uid(0), podns).await; +// read_msg(&mut 
s2).await; +// send_snap_sent(&mut s2).await; +// read_msg(&mut s2).await; +// send_workload_del(&mut s2, uid(0)).await; +// read_msg(&mut s2).await; +// }); + +// let mut readiness = WorkloadProxyReadinessHandler::new(readiness::Ready::new(), None); +// let mut processor_helper = WorkloadProxyManagerProcessor::new(&mut state, &mut readiness); + +// let res = processor_helper.process(processor).await; +// server.await.unwrap(); +// // make sure that the error is due to eof: +// assert_end_stream(res); + +// assert_eq!(state.workload_states().len(), 0); +// assert_eq!(m.active_proxy_count.get_or_create(&()).get(), 0); +// assert!(readiness.ready.pending().is_empty()); + +// state.drain().await; +// } + +// #[tokio::test] +// async fn test_process_snapshot_with_missing_workload() { +// let f = fixture!(); +// let m = f.inpod_metrics; +// let (s1, mut s2) = UnixStream::pair().unwrap(); +// let processor = WorkloadStreamProcessor::new(s1, f.drain_rx.clone()); +// let mut state = f.state; + +// let server = tokio::spawn(async move { +// read_hello(&mut s2).await; +// send_workload_added(&mut s2, uid(0), new_netns()).await; +// read_msg(&mut s2).await; +// send_workload_added(&mut s2, uid(1), new_netns()).await; +// read_msg(&mut s2).await; +// send_snap_sent(&mut s2).await; +// read_msg(&mut s2).await; +// }); + +// let mut readiness = WorkloadProxyReadinessHandler::new(readiness::Ready::new(), None); + +// let mut processor_helper = WorkloadProxyManagerProcessor::new(&mut state, &mut readiness); +// let res = processor_helper.process(processor).await; + +// assert_end_stream(res); +// server.await.unwrap(); +// assert!(readiness.ready.pending().is_empty()); + +// // first proxy should be here: +// assert_eq!(state.workload_states().len(), 2); +// let key_set: HashSet = +// state.workload_states().keys().cloned().collect(); +// let expected_key_set: HashSet = [0, 1] +// .into_iter() +// .map(uid) +// .map(crate::inpod::linux::WorkloadUid::from) +// .collect(); +// 
assert_eq!(key_set, expected_key_set); +// assert_eq!(m.active_proxy_count.get_or_create(&()).get(), 2); + +// // second connection - don't send the one of the proxies here, to see ztunnel reconciles and removes it: +// let (s1, mut s2) = UnixStream::pair().unwrap(); +// let processor = WorkloadStreamProcessor::new(s1, f.drain_rx.clone()); + +// let server = tokio::spawn(async move { +// read_hello(&mut s2).await; +// send_workload_added(&mut s2, uid(1), new_netns()).await; +// read_msg(&mut s2).await; +// send_snap_sent(&mut s2).await; +// read_msg(&mut s2).await; +// }); + +// let mut processor_helper = WorkloadProxyManagerProcessor::new(&mut state, &mut readiness); +// let res = processor_helper.process(processor).await; + +// assert_end_stream(res); +// server.await.unwrap(); + +// // only second workload should remain +// assert_eq!(state.workload_states().len(), 1); +// assert_eq!(state.workload_states().keys().next(), Some(&uid(1))); +// assert_eq!(m.active_proxy_count.get_or_create(&()).get(), 1); +// assert!(readiness.ready.pending().is_empty()); + +// state.drain().await; +// } +// } diff --git a/src/lib.rs b/src/lib.rs index 377edab514..3fcfab46b9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -23,7 +23,6 @@ pub mod dns; pub mod drain; pub mod hyper_util; pub mod identity; -#[cfg(target_os = "linux")] pub mod inpod; pub mod metrics; pub mod proxy; diff --git a/src/proxy.rs b/src/proxy.rs index 6c24ef2e46..338986ee90 100644 --- a/src/proxy.rs +++ b/src/proxy.rs @@ -121,18 +121,31 @@ impl DefaultSocketFactory { ); } if cfg.user_timeout_enabled { - // https://blog.cloudflare.com/when-tcp-sockets-refuse-to-die/ - // TCP_USER_TIMEOUT = TCP_KEEPIDLE + TCP_KEEPINTVL * TCP_KEEPCNT. 
- let ut = cfg.keepalive_time + cfg.keepalive_retries * cfg.keepalive_interval; - tracing::trace!( - "set user timeout: {:?}", - socket2::SockRef::from(&s).set_tcp_user_timeout(Some(ut)) - ); + self.set_tcp_user_timeout(s)?; } Ok(()) } + + #[cfg(target_os = "linux")] + fn set_tcp_user_timeout(&self, s: &TcpSocket) -> io::Result<()> { + let cfg = self.0; + // https://blog.cloudflare.com/when-tcp-sockets-refuse-to-die/ + // TCP_USER_TIMEOUT = TCP_KEEPIDLE + TCP_KEEPINTVL * TCP_KEEPCNT. + let ut = cfg.keepalive_time + cfg.keepalive_retries * cfg.keepalive_interval; + tracing::trace!( + "set user timeout: {:?}", + socket2::SockRef::from(&s).set_tcp_user_timeout(Some(ut)) + ); + Ok(()) + } + + #[cfg(target_os = "windows")] + fn set_tcp_user_timeout(&self, _s: &TcpSocket) -> io::Result<()> { + unreachable!("user_timeout not supported on windows") + } } + pub struct MarkSocketFactory { pub inner: DefaultSocketFactory, pub mark: u32, diff --git a/src/proxy/inbound_passthrough.rs b/src/proxy/inbound_passthrough.rs index 67d4e90657..5fabdab906 100644 --- a/src/proxy/inbound_passthrough.rs +++ b/src/proxy/inbound_passthrough.rs @@ -19,7 +19,8 @@ use std::time::Instant; use tokio::net::TcpStream; use tokio::sync::watch; -use tracing::{Instrument, debug, error, info, trace}; +use tracing::debug; +use tracing::{Instrument, error, info}; use crate::drain::DrainWatcher; use crate::drain::run_with_drain; @@ -117,6 +118,7 @@ impl InboundPassthrough { inbound_stream: TcpStream, enable_orig_src: bool, ) { + info!("proxying inbound plaintext connection"); let start = Instant::now(); let dest_addr = socket::orig_dst_addr_or_default(&inbound_stream); // Check if it is an illegal call to ourself, which could trampoline to illegal addresses or @@ -212,13 +214,13 @@ impl InboundPassthrough { }; let send = async { - trace!(%source_addr, %dest_addr, component="inbound plaintext", "connecting..."); + info!(%source_addr, %dest_addr, component="inbound plaintext", "connecting..."); let outbound 
= super::freebind_connect(orig_src, dest_addr, pi.socket_factory.as_ref()) .await .map_err(Error::ConnectionFailed)?; - trace!(%source_addr, destination=%dest_addr, component="inbound plaintext", "connected"); + info!(%source_addr, destination=%dest_addr, component="inbound plaintext", "connected"); copy::copy_bidirectional( copy::TcpStreamSplitter(inbound_stream), copy::TcpStreamSplitter(outbound), diff --git a/src/proxy/outbound.rs b/src/proxy/outbound.rs index 7b42ac324d..b093c34b79 100644 --- a/src/proxy/outbound.rs +++ b/src/proxy/outbound.rs @@ -278,7 +278,7 @@ impl OutboundConnection { req: &Request, connection_stats: &ConnectionResult, ) -> Result<(), Error> { - let upgraded = Box::pin(self.send_hbone_request(remote_addr, req)).await?; + let upgraded: H2Stream = Box::pin(self.send_hbone_request(remote_addr, req)).await?; copy::copy_bidirectional(copy::TcpStreamSplitter(stream), upgraded, connection_stats).await } diff --git a/src/proxy/pool.rs b/src/proxy/pool.rs index 4b55b9dd6a..460d13946e 100644 --- a/src/proxy/pool.rs +++ b/src/proxy/pool.rs @@ -389,11 +389,16 @@ impl WorkloadHBONEPool { let mut s = DefaultHasher::new(); workload_key.hash(&mut s); let hash_key = s.finish(); + let id = self + .state + .pool_global_conn_count + .fetch_add(1, Ordering::SeqCst); + #[cfg(target_os = "windows")] + let id = id.try_into().unwrap(); // id is usize on windows so try and convert + let pool_key = pingora_pool::ConnectionMeta::new( hash_key, - self.state - .pool_global_conn_count - .fetch_add(1, Ordering::SeqCst), + id, ); // First, see if we can naively take an inner lock for our specific key, and get a connection. // This should be the common case, except for the first establishment of a new connection/key. 
diff --git a/src/proxyfactory.rs b/src/proxyfactory.rs index 74625e3652..5f882fa7c2 100644 --- a/src/proxyfactory.rs +++ b/src/proxyfactory.rs @@ -16,7 +16,7 @@ use crate::config; use crate::identity::SecretManager; use crate::state::{DemandProxyState, WorkloadInfo}; use std::sync::Arc; -use tracing::error; +use tracing::{debug, error}; use crate::dns; use crate::drain::DrainWatcher; diff --git a/src/signal.rs b/src/signal.rs index e278a4a5b0..74489088c6 100644 --- a/src/signal.rs +++ b/src/signal.rs @@ -12,11 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -// #[async_trait::async_trait] -// pub trait Shutdown { -// async fn shutdown(); -// } - use tokio::sync::mpsc; pub struct Shutdown { diff --git a/src/socket.rs b/src/socket.rs index b3811d83b9..8464bb88a8 100644 --- a/src/socket.rs +++ b/src/socket.rs @@ -12,17 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::io::Error; use std::net::SocketAddr; use tokio::io; use tokio::net::TcpSocket; use tokio::net::{TcpListener, TcpStream}; +use std::io::Error; +use socket2::SockRef; + #[cfg(target_os = "linux")] use { - socket2::{Domain, SockRef}, + socket2::Domain, std::io::ErrorKind, tracing::warn, }; @@ -32,7 +34,7 @@ pub fn set_freebind_and_transparent(socket: &TcpSocket) -> io::Result<()> { let socket = SockRef::from(socket); match socket.domain()? 
{ Domain::IPV4 => { - socket.set_ip_transparent(true)?; + socket.set_ip_transparent_v4(true)?; socket.set_freebind(true)?; } Domain::IPV6 => { @@ -66,13 +68,13 @@ fn orig_dst_addr(stream: &tokio::net::TcpStream) -> io::Result { Err(e4) => match linux::original_dst_ipv6(&sock) { Ok(addr) => Ok(addr.as_socket().expect("failed to convert to SocketAddr")), Err(e6) => { - if !sock.ip_transparent().unwrap_or(false) { - // In TPROXY mode, this is normal, so don't bother logging - warn!( - peer=?stream.peer_addr().unwrap(), - local=?stream.local_addr().unwrap(), - "failed to read SO_ORIGINAL_DST: {e4:?}, {e6:?}" - ); + if !sock.ip_transparent_v4().unwrap_or(false) && !linux::ipv6_transparent(&sock).unwrap_or(false) { + // In TPROXY mode, this is normal, so don't bother logging + warn!( + peer=?stream.peer_addr().unwrap(), + local=?stream.local_addr().unwrap(), + "failed to read SO_ORIGINAL_DST: {e4:?}, {e6:?}" + ); } Err(e6) } @@ -80,9 +82,22 @@ fn orig_dst_addr(stream: &tokio::net::TcpStream) -> io::Result { } } -#[cfg(not(target_os = "linux"))] +#[cfg(target_os = "windows")] +fn orig_dst_addr(stream: &tokio::net::TcpStream) -> io::Result { + let sock = SockRef::from(stream); + // Dual-stack IPv4/IPv6 sockets require us to check both options. 
+ match windows::original_dst(&sock) { + Ok(addr) => Ok(addr.as_socket().expect("failed to convert to SocketAddr")), + Err(_e4) => match windows::original_dst_ipv6(&sock) { + Ok(addr) => Ok(addr.as_socket().expect("failed to convert to SocketAddr")), + Err(e6) => Err(e6), + }, + } +} + +#[cfg(not(any(target_os = "linux", target_os = "windows")))] fn orig_dst_addr(_: &tokio::net::TcpStream) -> io::Result { - Err(Error::new( + Err(io::Error::new( io::ErrorKind::Other, "SO_ORIGINAL_DST not supported on this operating system", )) @@ -113,7 +128,7 @@ pub fn set_mark(_socket: &TcpSocket, _mark: u32) -> io::Result<()> { #[cfg(target_os = "linux")] #[allow(unsafe_code)] mod linux { - use std::os::unix::io::AsRawFd; + use std::{mem::MaybeUninit, os::unix::io::AsRawFd}; use socket2::{SockAddr, SockRef}; use tokio::io; @@ -135,6 +150,25 @@ mod linux { Ok(()) } + pub fn ipv6_transparent(sock: &SockRef) -> io::Result { + let mut val: MaybeUninit = std::mem::MaybeUninit::uninit(); + let mut len = size_of::() as libc::socklen_t; + unsafe { + match libc::getsockopt( + sock.as_raw_fd(), + libc::IPPROTO_IPV6, + libc::IPV6_TRANSPARENT, + val.as_mut_ptr().cast(), + &mut len, + ) { + -1 => Err(std::io::Error::last_os_error()), + _ => { + Ok(val.assume_init()) + }, + } + } + } + pub fn original_dst(sock: &SockRef) -> io::Result { sock.original_dst() } @@ -143,7 +177,20 @@ mod linux { sock.original_dst_ipv6() } } +#[cfg(target_os = "windows")] +#[allow(unsafe_code)] +mod windows { + use socket2::{SockAddr, SockRef}; + use tokio::io; + + pub fn original_dst(sock: &SockRef) -> io::Result { + sock.original_dst() + } + pub fn original_dst_ipv6(sock: &SockRef) -> io::Result { + sock.original_dst_ipv6() + } +} /// Listener is a wrapper For TCPListener with sane defaults. Notably, setting NODELAY pub struct Listener(TcpListener); @@ -164,10 +211,23 @@ impl Listener { } } +// TODO: Apparently IP_TRANSPARENT doesn't work for ipv6, so probably want to add +// some checks here. 
#[cfg(target_os = "linux")] impl Listener { pub fn set_transparent(&self) -> io::Result<()> { - SockRef::from(&self.0).set_ip_transparent(true) + let socket = SockRef::from(&self.0); + match socket.domain()? { + Domain::IPV4 => { + socket.set_ip_transparent_v4(true)?; + Ok(()) + } + Domain::IPV6 => { + linux::set_ipv6_transparent(&socket)?; + Ok(()) + } + _ => Err(Error::new(ErrorKind::Unsupported, "unsupported domain")), + } } } diff --git a/src/test_helpers.rs b/src/test_helpers.rs index 7ce762f0f8..2a052373eb 100644 --- a/src/test_helpers.rs +++ b/src/test_helpers.rs @@ -52,7 +52,7 @@ pub mod ca; pub mod dns; pub mod helpers; #[cfg(target_os = "linux")] -pub mod inpod; +pub mod inpod_linux; pub mod tcp; pub mod xds; @@ -64,6 +64,7 @@ pub mod namespaced; #[cfg(target_os = "linux")] pub mod netns; +#[cfg(target_os = "linux")] pub fn can_run_privilged_test() -> bool { let is_root = unsafe { libc::getuid() } == 0; if !is_root && std::env::var("CI").is_ok() { @@ -72,6 +73,13 @@ pub fn can_run_privilged_test() -> bool { is_root } +#[cfg(target_os = "windows")] +// ztunnel is only supported as a HostProcessContainer, so assume privileged +// TOOD: I'm probably making this up; confirm later +pub fn can_run_privilged_test() -> bool { + true +} + pub fn test_config_with_waypoint(addr: IpAddr) -> config::Config { config::Config { local_xds_config: Some(ConfigSource::Static( diff --git a/src/test_helpers/app.rs b/src/test_helpers/app.rs index d8531446ed..c7ca0988cb 100644 --- a/src/test_helpers/app.rs +++ b/src/test_helpers/app.rs @@ -50,6 +50,7 @@ pub struct TestApp { pub udp_dns_proxy_address: Option, pub cert_manager: Arc, + #[cfg(target_os = "linux")] pub namespace: Option, pub shutdown: ShutdownTrigger, pub ztunnel_identity: Option, @@ -65,6 +66,7 @@ impl From<(&Bound, Arc)> for TestApp { tcp_dns_proxy_address: app.tcp_dns_proxy_address, udp_dns_proxy_address: app.udp_dns_proxy_address, cert_manager, + #[cfg(target_os = "linux")] namespace: None, shutdown: 
app.shutdown.trigger(), ztunnel_identity: None, @@ -113,6 +115,7 @@ impl TestApp { Ok(client.request(req).await?) }; + #[cfg(target_os = "linux")] match self.namespace { Some(ref _ns) => { // TODO: if this is needed, do something like admin_request_body. @@ -122,6 +125,11 @@ impl TestApp { } None => get_resp().await, } + + #[cfg(not(target_os = "linux"))] + { + get_resp().await + } } pub async fn admin_request_body(&self, path: &str) -> anyhow::Result { let port = self.admin_address.port(); @@ -139,10 +147,16 @@ impl TestApp { Ok::<_, anyhow::Error>(res.collect().await?.to_bytes()) }; + #[cfg(target_os = "linux")] match self.namespace { Some(ref ns) => ns.clone().run(get_resp)?.join().unwrap(), None => get_resp().await, } + + #[cfg(not(target_os = "linux"))] + { + get_resp().await + } } pub async fn metrics(&self) -> anyhow::Result { @@ -163,13 +177,15 @@ impl TestApp { } #[cfg(target_os = "linux")] - pub async fn inpod_state(&self) -> anyhow::Result> { + pub async fn inpod_state( + &self, + ) -> anyhow::Result> { let body = self.admin_request_body("config_dump").await?; let serde_json::Value::Object(mut v) = serde_json::from_slice(&body)? else { anyhow::bail!("not an object"); }; - let result: HashMap = + let result: HashMap = serde_json::from_value(v.remove("workloadState").unwrap())?; Ok(result) } diff --git a/src/test_helpers/inpod.rs b/src/test_helpers/inpod_linux.rs similarity index 96% rename from src/test_helpers/inpod.rs rename to src/test_helpers/inpod_linux.rs index 5e121e89d0..2d2b36925a 100644 --- a/src/test_helpers/inpod.rs +++ b/src/test_helpers/inpod_linux.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use crate::inpod::test_helpers::{ +use crate::inpod::linux::test_helpers::{ read_hello, read_msg, send_snap_sent, send_workload_added, send_workload_del, }; @@ -49,7 +49,7 @@ pub async fn start_ztunnel_server(bind_path: PathBuf) -> MpscAckSender info!("spawning server"); tokio::task::spawn(async move { - let listener = crate::inpod::packet::bind(&bind_path).expect("bind failed"); + let listener = crate::inpod::linux::packet::bind(&bind_path).expect("bind failed"); info!("waiting for connection from ztunnel server"); let (mut ztun_sock, _) = listener.accept().await.expect("accept failed"); info!("accepted connection from ztunnel server"); diff --git a/src/test_helpers/linux.rs b/src/test_helpers/linux.rs index 183e4d68ff..9b277500d0 100644 --- a/src/test_helpers/linux.rs +++ b/src/test_helpers/linux.rs @@ -25,7 +25,7 @@ use crate::{config, identity, proxy, strng}; use crate::inpod::istio::zds::WorkloadInfo; use crate::signal::ShutdownTrigger; -use crate::test_helpers::inpod::start_ztunnel_server; +use crate::test_helpers::inpod_linux::start_ztunnel_server; use crate::test_helpers::linux::TestMode::{Dedicated, Shared}; use arcstr::ArcStr; use itertools::Itertools; @@ -60,7 +60,7 @@ pub struct WorkloadManager { #[derive(Debug)] pub struct LocalZtunnel { - fd_sender: Option>, + fd_sender: Option>, config_sender: MpscAckSender, namespace: Namespace, } @@ -327,7 +327,7 @@ impl WorkloadManager { self.workloads = keep; for d in drop { if let Some(zt) = self.ztunnels.get_mut(&d.workload.node.to_string()).as_mut() { - let msg = inpod::Message::Stop(d.workload.uid.to_string()); + let msg = inpod_linux::Message::Stop(d.workload.uid.to_string()); zt.fd_sender .as_mut() .unwrap() @@ -628,6 +628,7 @@ impl<'a> TestWorkloadBuilder<'a> { name: self.w.workload.name.to_string(), namespace: self.w.workload.namespace.to_string(), service_account: self.w.workload.service_account.to_string(), + windows_namespace: None, }; self.manager.workloads.push(self.w); if self.captured { @@ -639,7 
+640,7 @@ impl<'a> TestWorkloadBuilder<'a> { .netns() .run(|_| helpers::run_command("scripts/ztunnel-redirect-inpod.sh"))??; let fd = network_namespace.netns().file().as_raw_fd(); - let msg = inpod::Message::Start(inpod::StartZtunnelMessage { + let msg = inpod_linux::Message::Start(inpod_linux::StartZtunnelMessage { uid: uid.to_string(), workload_info: Some(wli.clone()), fd, diff --git a/src/xds/client.rs b/src/xds/client.rs index 270b98da78..adb39f7eac 100644 --- a/src/xds/client.rs +++ b/src/xds/client.rs @@ -622,6 +622,7 @@ impl AdsClient { }; let addr = self.config.address.clone(); + let tls_grpc_channel = tls::grpc_connector( self.config.address.clone(), self.config.auth.clone(),