From 4de25ada5d28c215979e06bfa860da3ab8fff089 Mon Sep 17 00:00:00 2001 From: Razz4780 Date: Thu, 17 Apr 2025 13:22:23 +0200 Subject: [PATCH 1/5] ... --- Cargo.lock | 4 ---- 1 file changed, 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8977041c6df..1d8f0e8d8c4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4625,10 +4625,6 @@ dependencies = [ "syn 2.0.100", ] -[[package]] -name = "mirrord-mcp" -version = "3.139.0" - [[package]] name = "mirrord-operator" version = "3.139.0" From cc20b3f74717a0bd8f7c3c8c0a868eec07267af7 Mon Sep 17 00:00:00 2001 From: Razz4780 Date: Thu, 17 Apr 2025 13:22:31 +0200 Subject: [PATCH 2/5] Reapply "Introduced `RemoteRuntime` wrapper in the agent (#3227)" (#3261) This reverts commit 87987073f2ea48350118a11d343ff01027ea0858. --- Cargo.lock | 482 ++++++++++------------- changelog.d/3255.fixed.md | 1 - mirrord/agent/Cargo.toml | 1 - mirrord/agent/src/cli.rs | 2 - mirrord/agent/src/dns.rs | 92 +++-- mirrord/agent/src/entrypoint.rs | 316 ++++++--------- mirrord/agent/src/entrypoint/setup.rs | 137 +++++-- mirrord/agent/src/error.rs | 57 +-- mirrord/agent/src/main.rs | 16 +- mirrord/agent/src/namespace.rs | 47 ++- mirrord/agent/src/outgoing.rs | 49 +-- mirrord/agent/src/outgoing/udp.rs | 45 +-- mirrord/agent/src/sniffer.rs | 16 +- mirrord/agent/src/sniffer/api.rs | 17 +- mirrord/agent/src/steal/api.rs | 20 +- mirrord/agent/src/steal/connection.rs | 16 +- mirrord/agent/src/util.rs | 74 +--- mirrord/agent/src/util/error.rs | 27 ++ mirrord/agent/src/util/remote_runtime.rs | 313 +++++++++++++++ mirrord/agent/src/vpn.rs | 131 +++--- mirrord/agent/src/watched_task.rs | 128 ------ mirrord/agent/tests/blackbox.rs | 149 ------- 22 files changed, 988 insertions(+), 1148 deletions(-) delete mode 100644 changelog.d/3255.fixed.md create mode 100644 mirrord/agent/src/util/error.rs create mode 100644 mirrord/agent/src/util/remote_runtime.rs delete mode 100644 mirrord/agent/src/watched_task.rs delete mode 100644 mirrord/agent/tests/blackbox.rs diff --git a/Cargo.lock b/Cargo.lock index 1d8f0e8d8c4..efb767fe720 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -310,9 +310,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.98" +version = "1.0.97" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487" +checksum = "dcfed56ad506cb2c684a14971b8861fdc3baaaae314b9e5f9bb532cbe3ba7a4f" [[package]] name = "apple-bundles" @@ -347,7 +347,7 @@ dependencies = [ "dialoguer", "difference", "digest", - "dirs 5.0.1", + "dirs", "elliptic-curve", "env_logger", "figment", @@ -637,9 +637,9 @@ checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" [[package]] name = "aws-config" -version = "1.6.1" +version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c39646d1a6b51240a1a23bb57ea4eebede7e16fbc237fdc876980233dcecb4f" +checksum = "6a84fe2c5e9965fba0fbc2001db252f1d57527d82a905cca85127df227bca748" dependencies = [ "aws-credential-types", "aws-runtime", @@ -679,9 +679,9 @@ dependencies = [ [[package]] name = "aws-lc-rs" -version = "1.13.0" +version = "1.12.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19b756939cb2f8dc900aa6dcd505e6e2428e9cae7ff7b028c49e3946efa70878" +checksum = "dabb68eb3a7aa08b46fddfd59a3d55c978243557a90ab804769f7e20e67d2b01" dependencies = [ "aws-lc-sys", "zeroize", @@ -689,9 +689,9 @@ dependencies = [ [[package]] name = "aws-lc-sys" -version = "0.28.0" +version = "0.27.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9f7720b74ed28ca77f90769a71fd8c637a0137f6fae4ae947e1050229cff57f" +checksum = "77926887776171ced7d662120a75998e444d3750c951abfe07f90da130514b1f" dependencies = [ "bindgen", "cc", @@ -727,9 +727,9 @@ dependencies = [ [[package]] name = "aws-sdk-sqs" -version = "1.64.0" +version = "1.62.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "514d007ac4d5b156b408d8dd623a57b37ae77425810e0fedcffab57b0dabaded" +checksum = "194d4234d8042159069b905760af670eefaa80848db097399b50110ab4746d04" dependencies = [ "aws-credential-types", "aws-runtime", @@ -741,7 +741,6 @@ dependencies = [ "aws-smithy-types", "aws-types", "bytes", - "fastrand", "http 0.2.12", "once_cell", "regex-lite", @@ -750,9 +749,9 @@ dependencies = [ [[package]] name = "aws-sdk-sso" -version = "1.64.0" +version = "1.62.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02d4bdb0e5f80f0689e61c77ab678b2b9304af329616af38aef5b6b967b8e736" +checksum = "1d5330ad4e8a1ff49e9f26b738611caa72b105c41d41733801d1a36e8f9de936" dependencies = [ "aws-credential-types", "aws-runtime", @@ -764,7 +763,6 @@ dependencies = [ "aws-smithy-types", "aws-types", "bytes", - "fastrand", "http 0.2.12", "once_cell", "regex-lite", @@ -773,9 +771,9 @@ dependencies = [ [[package]] name = "aws-sdk-ssooidc" -version = "1.65.0" +version = "1.63.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "acbbb3ce8da257aedbccdcb1aadafbbb6a5fe9adf445db0e1ea897bdc7e22d08" +checksum = "7956b1a85d49082347a7d17daa2e32df191f3e23c03d47294b99f95413026a78" dependencies = [ "aws-credential-types", "aws-runtime", @@ -787,7 +785,6 @@ dependencies = [ "aws-smithy-types", "aws-types", "bytes", - "fastrand", "http 0.2.12", "once_cell", "regex-lite", @@ -796,9 +793,9 @@ dependencies = [ [[package]] name = "aws-sdk-sts" -version = "1.65.0" +version = "1.63.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96a78a8f50a1630db757b60f679c8226a8a70ee2ab5f5e6e51dc67f6c61c7cfd" +checksum = "065c533fbe6f84962af33fcf02b0350b7c1f79285baab5924615d2be3b232855" dependencies = [ "aws-credential-types", "aws-runtime", @@ -811,7 +808,6 @@ dependencies = [ "aws-smithy-types", "aws-smithy-xml", "aws-types", - "fastrand", "http 0.2.12", "once_cell", "regex-lite", @@ -875,14 +871,14 @@ dependencies = [ [[package]] name = "aws-smithy-http-client" -version = "1.0.1" +version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8aff1159006441d02e57204bf57a1b890ba68bedb6904ffd2873c1c4c11c546b" +checksum = "0497ef5d53065b7cd6a35e9c1654bd1fefeae5c52900d91d1b188b0af0f29324" dependencies = [ "aws-smithy-async", "aws-smithy-runtime-api", "aws-smithy-types", - "h2 0.4.9", + "h2 0.4.8", "http 0.2.12", "http 1.3.1", "http-body 0.4.6", @@ -893,7 +889,7 @@ dependencies = [ "hyper-util", "pin-project-lite", "rustls 0.21.12", - "rustls 0.23.26", + "rustls 0.23.25", "rustls-native-certs 0.8.1", "rustls-pki-types", "tokio", @@ -910,16 +906,6 @@ dependencies = [ "aws-smithy-types", ] -[[package]] -name = "aws-smithy-observability" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "445d065e76bc1ef54963db400319f1dd3ebb3e0a74af20f7f7630625b0cc7cc0" -dependencies = [ - "aws-smithy-runtime-api", - "once_cell", -] - [[package]] name = "aws-smithy-query" version = "0.60.7" @@ -932,14 +918,13 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.8.1" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0152749e17ce4d1b47c7747bdfec09dac1ccafdcbc741ebf9daa2a373356730f" +checksum = "f6328865e36c6fd970094ead6b05efd047d3a80ec5fc3be5e743910da9f2ebf8" dependencies = [ "aws-smithy-async", "aws-smithy-http", "aws-smithy-http-client", - "aws-smithy-observability", "aws-smithy-runtime-api", "aws-smithy-types", "bytes", @@ -1375,9 +1360,9 @@ dependencies = [ [[package]] name = "bstr" -version = "1.12.0" +version = "1.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "234113d19d0d7d613b40e86fb654acf958910802bcceab913a4f9e7cda03b1a4" +checksum = "531a9155a481e2ee699d4f98f43c0ca4ff8ee1bfd55c31e9e98fb29d2b176fe0" dependencies = [ "memchr", "serde", @@ -1430,9 +1415,9 @@ dependencies = [ [[package]] name = "bytesize" -version = "1.3.3" +version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e93abca9e28e0a1b9877922aacb20576e05d4679ffa78c3d6dc22a26a216659" +checksum = "2d2c12f985c78475a6b8d629afd0c360260ef34cfef52efccdcfd31972f81c2e" [[package]] name = "bytestring" @@ -1506,9 +1491,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.19" +version = "1.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e3a13707ac958681c13b39b458c073d0d9bc8a22cb1b2f4c8e55eb72c13f362" +checksum = "be714c154be609ec7f5dad223a33bf1482fff90472de28f7362806e6d4832b8c" dependencies = [ "jobserver", "libc", @@ -1596,9 +1581,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.36" +version = "4.5.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2df961d8c8a0d08aa9945718ccf584145eee3f3aa06cddbeac12933781102e04" +checksum = "6088f3ae8c3608d19260cd7445411865a485688711b78b5be70d78cd96136f83" dependencies = [ "clap_builder", "clap_derive", @@ -1606,9 +1591,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.36" +version = "4.5.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "132dbda40fb6753878316a489d5a1242a8ef2f0d9e47ba01c951ea8aa7d013a5" +checksum = "22a7ef7f676155edfb82daa97f99441f3ebf4a58d5e32f295a56259f1b6facc8" dependencies = [ "anstream", "anstyle", @@ -1618,9 +1603,9 @@ dependencies = [ [[package]] name = "clap_complete" -version = "4.5.47" +version = "4.5.46" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c06f5378ea264ad4f82bbc826628b5aad714a75abf6ece087e923010eb937fb6" +checksum = "f5c5508ea23c5366f77e53f5a0070e5a84e51687ec3ef9e0464c86dc8d13ce98" dependencies = [ "clap", ] @@ -1949,9 +1934,9 @@ dependencies = [ [[package]] name = "darling" -version = "0.20.11" +version = "0.20.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc7f46116c46ff9ab3eb1597a45688b6715c6e628b5c133e288e709a29bcb4ee" +checksum = "6f63b86c8a8826a49b8c21f08a2d07338eec8d900540f8630dc76284be802989" dependencies = [ "darling_core", "darling_macro", @@ -1959,9 +1944,9 @@ dependencies = [ [[package]] name = "darling_core" -version = "0.20.11" +version = "0.20.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d00b9596d185e565c2207a0b01f8bd1a135483d02d9b7b0a54b11da8d53412e" +checksum = "95133861a8032aaea082871032f5815eb9e98cef03fa916ab4500513994df9e5" dependencies = [ "fnv", "ident_case", @@ -1973,9 +1958,9 @@ dependencies = [ [[package]] name = "darling_macro" -version = "0.20.11" +version = "0.20.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead" +checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806" dependencies = [ "darling_core", "quote", @@ -1998,9 +1983,9 @@ dependencies = [ [[package]] name = "data-encoding" -version = "2.9.0" +version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a2330da5de22e8a3cb63252ce2abb30116bf5265e89c0e01bc17015ce30a476" +checksum = "575f75dfd25738df5b91b8e43e14d44bda14637a58fae779fd2b064f8bf3e010" [[package]] name = "der" @@ -2152,16 +2137,7 @@ version = "5.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "44c45a9d03d6676652bcb5e724c7e988de1acad23a711b5217ab9cbecbec2225" dependencies = [ - "dirs-sys 0.4.1", -] - -[[package]] -name = "dirs" -version = "6.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3e8aa94d75141228480295a7d0e7feb620b1a5ad9f12bc40be62411e38cce4e" -dependencies = [ - "dirs-sys 0.5.0", + "dirs-sys", ] [[package]] @@ -2182,22 +2158,10 @@ checksum = "520f05a5cbd335fae5a99ff7a6ab8627577660ee5cfd6a94a6a929b52ff0321c" dependencies = [ "libc", "option-ext", - "redox_users 0.4.6", + "redox_users", "windows-sys 0.48.0", ] -[[package]] -name = "dirs-sys" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e01a3366d27ee9890022452ee61b2b63a67e6f13f58900b651ff5665f0bb1fab" -dependencies = [ - "libc", - "option-ext", - "redox_users 0.5.0", - "windows-sys 0.59.0", -] - [[package]] name = "dirs-sys-next" version = "0.1.2" @@ -2205,7 +2169,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ebda144c4fe02d1f7ea1a7d9641b6fc6b580adcfa024ae48797ecdeb6825b4d" dependencies = [ "libc", - "redox_users 0.4.6", + "redox_users", "winapi", ] @@ -2384,9 +2348,9 @@ checksum = "c7f84e12ccf0a7ddc17a6c41c93326024c42920d7ee630d04950e6926645c0fe" [[package]] name = "env_logger" -version = "0.11.8" +version = "0.11.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13c863f0904021b108aa8b2f55046443e6b1ebde8fd4a15c399893aae4fa069f" +checksum = "c3716d7a920fb4fac5d84e9d4bce8ceb321e9414b4409da61b07b75c1e3d0697" dependencies = [ "anstream", "anstyle", @@ -2414,9 +2378,9 @@ dependencies = [ [[package]] name = "errno" -version = "0.3.11" +version = "0.3.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "976dd42dc7e85965fe702eb8164f21f450704bdde31faefd6471dba214cb594e" +checksum = "33d852cb9b869c2a9b3df2f71a3074817f01e1844f839a144f5fcef059a4eb5d" dependencies = [ "libc", "windows-sys 0.59.0", @@ -2445,9 +2409,9 @@ dependencies = [ [[package]] name = "event-listener-strategy" -version = "0.5.4" +version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93" +checksum = "3c3e4e0dd3673c1139bf041f3008816d9cf2946bbfac2945c09e523b8d7b05b2" dependencies = [ "event-listener", "pin-project-lite", @@ -2548,9 +2512,9 @@ checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99" [[package]] name = "flate2" -version = "1.1.1" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ced92e76e966ca2fd84c8f7aa01a4aea65b0eb6648d72f7c8f3e2764a67fece" +checksum = "11faaf5a5236997af9848be0bef4db95824b1d534ebc64d0f0c6cf3e67bd38dc" dependencies = [ "crc32fast", "miniz_oxide", @@ -2579,9 +2543,9 @@ dependencies = [ [[package]] name = "fragile" -version = "2.0.1" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28dd6caf6059519a65843af8fe2a3ae298b14b80179855aeb4adc2c1934ee619" +checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa" [[package]] name = "frida-build" @@ -2867,7 +2831,7 @@ dependencies = [ "futures-sink", "futures-util", "http 0.2.12", - "indexmap 2.9.0", + "indexmap 2.8.0", "slab", "tokio", "tokio-util", @@ -2876,9 +2840,9 @@ dependencies = [ [[package]] name = "h2" -version = "0.4.9" +version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75249d144030531f8dee69fe9cea04d3edf809a017ae445e2abdff6629e86633" +checksum = "5017294ff4bb30944501348f6f8e42e6ad28f42c8bbef7a74029aff064a4e3c2" dependencies = [ "atomic-waker", "bytes", @@ -2886,7 +2850,7 @@ dependencies = [ "futures-core", "futures-sink", "http 1.3.1", - "indexmap 2.9.0", + "indexmap 2.8.0", "slab", "tokio", "tokio-util", @@ -3042,13 +3006,13 @@ dependencies = [ [[package]] name = "hostname" -version = "0.4.1" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a56f203cd1c76362b69e3863fd987520ac36cf70a8c92627449b2f64a8cf7d65" +checksum = "f9c7c7c8ac16c798734b8a24560c1362120597c40d5e1459f09498f8f6c8f2ba" dependencies = [ "cfg-if", "libc", - "windows-link", + "windows", ] [[package]] @@ -3177,7 +3141,7 @@ dependencies = [ "bytes", "futures-channel", "futures-util", - "h2 0.4.9", + "h2 0.4.8", "http 1.3.1", "http-body 1.0.1", "httparse", @@ -3251,7 +3215,7 @@ dependencies = [ "hyper 1.6.0", "hyper-util", "log", - "rustls 0.23.26", + "rustls 0.23.25", "rustls-native-certs 0.8.1", "rustls-pki-types", "tokio", @@ -3290,9 +3254,9 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.11" +version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "497bbc33a26fdd4af9ed9c70d63f61cf56a938375fbb32df34db9b1cd6d643f2" +checksum = "df2dcfbe0677734ab2f3ffa7fa7bfd4706bfdc1ef393f2ee30184aed67e631b4" dependencies = [ "bytes", "futures-channel", @@ -3300,7 +3264,6 @@ dependencies = [ "http 1.3.1", "http-body 1.0.1", "hyper 1.6.0", - "libc", "pin-project-lite", "socket2", "tokio", @@ -3325,15 +3288,14 @@ dependencies = [ [[package]] name = "iana-time-zone" -version = "0.1.63" +version = "0.1.61" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b0c919e5debc312ad217002b8048a17b7d83f80703865bbfcfebb0458b0b27d8" +checksum = "235e081f3925a06703c2d0117ea8b91f042756fd6e7a6e5d901e8ca1a996b220" dependencies = [ "android_system_properties", "core-foundation-sys", "iana-time-zone-haiku", "js-sys", - "log", "wasm-bindgen", "windows-core", ] @@ -3388,9 +3350,9 @@ dependencies = [ [[package]] name = "icu_locid_transform_data" -version = "1.5.1" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7515e6d781098bf9f7205ab3fc7e9709d34554ae0b21ddbcb5febfa4bc7df11d" +checksum = "fdc8ff3388f852bede6b579ad4e978ab004f139284d7b28715f773507b946f6e" [[package]] name = "icu_normalizer" @@ -3412,9 +3374,9 @@ dependencies = [ [[package]] name = "icu_normalizer_data" -version = "1.5.1" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c5e8338228bdc8ab83303f16b797e177953730f601a96c25d10cb3ab0daa0cb7" +checksum = "f8cafbf7aa791e9b22bec55a167906f9e1215fd475cd22adfcf660e03e989516" [[package]] name = "icu_properties" @@ -3433,9 +3395,9 @@ dependencies = [ [[package]] name = "icu_properties_data" -version = "1.5.1" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85fb8799753b75aee8d2a21d7c14d9f38921b54b3dbda10f5a3c7a7b82dba5e2" +checksum = "67a8effbc3dd3e4ba1afa8ad918d5684b8868b3b26500753effea8d2eed19569" [[package]] name = "icu_provider" @@ -3527,9 +3489,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.9.0" +version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cea70ddb795996207ad57735b50c5982d8844f38ba9ee5f1aedcfb708a2aa11e" +checksum = "3954d50fe15b02142bf25d3b8bdadb634ec3948f103d04ffe3031bc8fe9d7058" dependencies = [ "equivalent", "hashbrown 0.15.2", @@ -3657,7 +3619,7 @@ dependencies = [ name = "issue1776" version = "3.139.0" dependencies = [ - "errno 0.3.11", + "errno 0.3.10", "libc", "socket2", ] @@ -3726,9 +3688,9 @@ checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" [[package]] name = "jiff" -version = "0.2.8" +version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5ad87c89110f55e4cd4dc2893a9790820206729eaf221555f742d540b0724a0" +checksum = "d699bc6dfc879fb1bf9bdff0d4c56f0884fc6f0d0eb0fba397a6d00cd9a6b85e" dependencies = [ "jiff-static", "log", @@ -3739,9 +3701,9 @@ dependencies = [ [[package]] name = "jiff-static" -version = "0.2.8" +version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d076d5b64a7e2fe6f0743f02c43ca4a6725c0f904203bfe276a5b3e793103605" +checksum = "8d16e75759ee0aa64c57a56acbf43916987b20c77373cb7e808979e02b93c9f9" dependencies = [ "proc-macro2", "quote", @@ -3750,11 +3712,10 @@ dependencies = [ [[package]] name = "jobserver" -version = "0.1.33" +version = "0.1.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38f262f097c174adebe41eb73d66ae9c06b2844fb0da69969647bbddd9b0538a" +checksum = "48d1dbcbbeb6a7fec7e059840aa538bd62aaccf972c7346c4d9d2059312853d0" dependencies = [ - "getrandom 0.3.2", "libc", ] @@ -3889,7 +3850,7 @@ dependencies = [ "kube-core", "pem", "rand 0.8.5", - "rustls 0.23.26", + "rustls 0.23.25", "rustls-pemfile 2.2.0", "secrecy", "serde", @@ -3987,9 +3948,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" [[package]] name = "libc" -version = "0.2.172" +version = "0.2.171" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa" +checksum = "c19937216e9d3aa9956d9bb8dfc0b0c8beb6058fc4f7a4dc4d850edf86a237d6" [[package]] name = "libloading" @@ -4032,9 +3993,9 @@ checksum = "d26c52dbd32dccf2d10cac7725f8eae5296885fb5703b261f7d0a0739ec807ab" [[package]] name = "linux-raw-sys" -version = "0.9.4" +version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd945864f07fe9f5371a27ad7b52a172b4b499999f1d97574c9fa68373937e12" +checksum = "fe7db12097d22ec582439daf8618b8fdd1a7bef6270e9af3b1ebcd30893cf413" [[package]] name = "listen_ports" @@ -4093,9 +4054,9 @@ checksum = "9374ef4228402d4b7e403e5838cb880d9ee663314b0a900d5a6aabf0c213552e" [[package]] name = "log" -version = "0.4.27" +version = "0.4.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94" +checksum = "30bde2b3dc3671ae49d8e2e9f044c7c005836e7a023ee57cffa25ab82764bb9e" [[package]] name = "lru-cache" @@ -4254,9 +4215,9 @@ checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" [[package]] name = "miniz_oxide" -version = "0.8.8" +version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3be647b768db090acb35d5ec5db2b0e1f1de11133ca123b9eacf5137868f892a" +checksum = "8e3e04debbb59698c15bacbb6d93584a8c0ca9cc3213cb423d31f760d8843ce5" dependencies = [ "adler2", ] @@ -4312,7 +4273,7 @@ dependencies = [ "regex", "reqwest", "rstest", - "rustls 0.23.26", + "rustls 0.23.25", "semver 1.0.26", "serde", "serde_json", @@ -4324,7 +4285,7 @@ dependencies = [ "tokio-util", "tracing", "tracing-subscriber", - "which 7.0.3", + "which 7.0.2", ] [[package]] @@ -4364,14 +4325,13 @@ dependencies = [ "rcgen", "reqwest", "rstest", - "rustls 0.23.26", + "rustls 0.23.25", "semver 1.0.26", "serde", "serde_json", "socket2", "streammap-ext", "tempfile", - "test_bin", "thiserror 2.0.12", "tokio", "tokio-rustls 0.26.2", @@ -4464,7 +4424,7 @@ dependencies = [ "nom", "rand 0.9.0", "rstest", - "rustls 0.23.26", + "rustls 0.23.25", "schemars", "serde", "serde_json", @@ -4520,7 +4480,7 @@ dependencies = [ "rand 0.9.0", "rcgen", "rstest", - "rustls 0.23.26", + "rustls 0.23.25", "semver 1.0.26", "serde", "thiserror 2.0.12", @@ -4704,7 +4664,7 @@ dependencies = [ "tempfile", "thiserror 2.0.12", "tracing", - "which 7.0.3", + "which 7.0.2", ] [[package]] @@ -4714,7 +4674,7 @@ dependencies = [ "http 1.3.1", "pem", "rcgen", - "rustls 0.23.26", + "rustls 0.23.25", "rustls-pemfile 2.2.0", "tempfile", "thiserror 2.0.12", @@ -4993,7 +4953,7 @@ dependencies = [ "crc32fast", "flate2", "hashbrown 0.15.2", - "indexmap 2.9.0", + "indexmap 2.8.0", "memchr", "ruzstd", ] @@ -5034,9 +4994,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.21.3" +version = "1.21.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" +checksum = "d75b0bedcc4fe52caa0e03d9f1151a323e4aa5e2d78ba3580400cd3c9e2bc4bc" [[package]] name = "openssl-probe" @@ -5203,9 +5163,9 @@ checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" [[package]] name = "pest" -version = "2.8.0" +version = "2.7.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "198db74531d58c70a361c42201efde7e2591e976d518caf7662a47dc5720e7b6" +checksum = "8b7cafe60d6cf8e62e1b9b2ea516a089c008945bb5a275416789e7db0bc199dc" dependencies = [ "memchr", "thiserror 2.0.12", @@ -5214,9 +5174,9 @@ dependencies = [ [[package]] name = "pest_derive" -version = "2.8.0" +version = "2.7.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d725d9cfd79e87dccc9341a2ef39d1b6f6353d68c4b33c177febbe1a402c97c5" +checksum = "816518421cfc6887a0d62bf441b6ffb4536fcc926395a69e1a85852d4363f57e" dependencies = [ "pest", "pest_generator", @@ -5224,9 +5184,9 @@ dependencies = [ [[package]] name = "pest_generator" -version = "2.8.0" +version = "2.7.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db7d01726be8ab66ab32f9df467ae8b1148906685bbe75c82d1e65d7f5b3f841" +checksum = "7d1396fd3a870fc7838768d171b4616d5c91f6cc25e377b673d714567d99377b" dependencies = [ "pest", "pest_meta", @@ -5237,9 +5197,9 @@ dependencies = [ [[package]] name = "pest_meta" -version = "2.8.0" +version = "2.7.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f9f832470494906d1fca5329f8ab5791cc60beb230c74815dff541cbd2b5ca0" +checksum = "e1e58089ea25d717bfd31fb534e4f3afcc2cc569c70de3e239778991ea3b7dea" dependencies = [ "once_cell", "pest", @@ -5253,7 +5213,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3672b37090dbd86368a4145bc067582552b29c27377cad4e0a306c97f9bd7772" dependencies = [ "fixedbitset", - "indexmap 2.9.0", + "indexmap 2.8.0", ] [[package]] @@ -5372,12 +5332,12 @@ checksum = "b4596b6d070b27117e987119b4dac604f3c58cfb0b191112e24771b2faeac1a6" [[package]] name = "plist" -version = "1.7.1" +version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eac26e981c03a6e53e0aee43c113e3202f5581d5360dae7bd2c70e800dd0451d" +checksum = "42cf17e9a1800f5f396bc67d193dc9411b59012a5876445ef450d449881e1016" dependencies = [ "base64 0.22.1", - "indexmap 2.9.0", + "indexmap 2.8.0", "quick-xml", "serde", "time", @@ -5501,7 +5461,7 @@ version = "0.2.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9" dependencies = [ - "zerocopy 0.8.24", + "zerocopy 0.8.23", ] [[package]] @@ -5532,9 +5492,9 @@ dependencies = [ [[package]] name = "prettyplease" -version = "0.2.32" +version = "0.2.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "664ec5419c51e34154eec046ebcba56312d5a2fc3b09a06da188e1ad21afadf6" +checksum = "5316f57387668042f561aae71480de936257848f9c43ce528e311d89a07cadeb" dependencies = [ "proc-macro2", "syn 2.0.100", @@ -5596,9 +5556,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.95" +version = "1.0.94" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02b3e5e68a3a1a02aad3ec490a98007cbc13c37cbe84a3cd7b8e406d76e7f778" +checksum = "a31971752e70b8b2686d7e46ec17fb38dad4051d94024c88df49b667caea9c84" dependencies = [ "unicode-ident", ] @@ -5760,7 +5720,7 @@ dependencies = [ "quinn-proto", "quinn-udp", "rustc-hash 2.1.1", - "rustls 0.23.26", + "rustls 0.23.25", "socket2", "thiserror 2.0.12", "tokio", @@ -5779,7 +5739,7 @@ dependencies = [ "rand 0.9.0", "ring", "rustc-hash 2.1.1", - "rustls 0.23.26", + "rustls 0.23.25", "rustls-pki-types", "slab", "thiserror 2.0.12", @@ -5790,9 +5750,9 @@ dependencies = [ [[package]] name = "quinn-udp" -version = "0.5.11" +version = "0.5.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "541d0f57c6ec747a90738a52741d3221f7960e8ac2f0ff4b1a63680e033b4ab5" +checksum = "e46f3055866785f6b92bc6164b76be02ca8f2eb4b002c0354b28cf4c119e5944" dependencies = [ "cfg_aliases", "libc", @@ -5842,7 +5802,7 @@ checksum = "3779b94aeb87e8bd4e834cee3650289ee9e0d5677f976ecdb6d219e5f4f6cd94" dependencies = [ "rand_chacha 0.9.0", "rand_core 0.9.3", - "zerocopy 0.8.24", + "zerocopy 0.8.23", ] [[package]] @@ -5990,9 +5950,9 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.5.11" +version = "0.5.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2f103c6d277498fbceb16e84d317e2a400f160f46904d5f5410848c829511a3" +checksum = "0b8c0c260b63a8219631167be35e6a988e9554dbd323f8bd08439c8ed1302bd1" dependencies = [ "bitflags 2.9.0", ] @@ -6008,17 +5968,6 @@ dependencies = [ "thiserror 1.0.69", ] -[[package]] -name = "redox_users" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd6f9d3d47bdd2ad6945c5015a226ec6155d0bcdfd8f7cd29f86b71f8de99d2b" -dependencies = [ - "getrandom 0.2.15", - "libredox", - "thiserror 2.0.12", -] - [[package]] name = "regex" version = "1.11.1" @@ -6086,7 +6035,7 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2 0.4.9", + "h2 0.4.8", "http 1.3.1", "http-body 1.0.1", "http-body-util", @@ -6101,7 +6050,7 @@ dependencies = [ "percent-encoding", "pin-project-lite", "quinn", - "rustls 0.23.26", + "rustls 0.23.25", "rustls-native-certs 0.8.1", "rustls-pemfile 2.2.0", "rustls-pki-types", @@ -6276,7 +6225,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fdb5bc1ae2baa591800df16c9ca78619bf65c0488b41b96ccec5d11220d8c154" dependencies = [ "bitflags 2.9.0", - "errno 0.3.11", + "errno 0.3.10", "libc", "linux-raw-sys 0.4.15", "windows-sys 0.59.0", @@ -6284,14 +6233,14 @@ dependencies = [ [[package]] name = "rustix" -version = "1.0.5" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d97817398dd4bb2e6da002002db259209759911da105da92bec29ccb12cf58bf" +checksum = "e56a18552996ac8d29ecc3b190b4fdbb2d91ca4ec396de7bbffaf43f3d637e96" dependencies = [ "bitflags 2.9.0", - "errno 0.3.11", + "errno 0.3.10", "libc", - "linux-raw-sys 0.9.4", + "linux-raw-sys 0.9.3", "windows-sys 0.59.0", ] @@ -6309,16 +6258,16 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.26" +version = "0.23.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df51b5869f3a441595eac5e8ff14d486ff285f7b8c0df8770e49c3b56351f0f0" +checksum = "822ee9188ac4ec04a2f0531e55d035fb2de73f18b41a63c70c2712503b6fb13c" dependencies = [ "aws-lc-rs", "log", "once_cell", "ring", "rustls-pki-types", - "rustls-webpki 0.103.1", + "rustls-webpki 0.103.0", "subtle", "zeroize", ] @@ -6399,9 +6348,9 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = "0.103.1" +version = "0.103.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fef8b8769aaccf73098557a87cd1816b4f9c7c16811c9c77142aa695c16f2c03" +checksum = "0aa4eeac2588ffff23e9d7a7e9b3f971c5fb5b7ebc9452745e0c232c64f83b2f" dependencies = [ "aws-lc-rs", "ring", @@ -6497,9 +6446,9 @@ dependencies = [ [[package]] name = "scroll_derive" -version = "0.12.1" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1783eabc414609e28a5ba76aee5ddd52199f7107a0b24c2e9746a1ecc34a683d" +checksum = "7f81c2fde025af7e69b1d1420531c8a8811ca898919db177141a85313b1cb932" dependencies = [ "proc-macro2", "quote", @@ -6717,7 +6666,7 @@ dependencies = [ "chrono", "hex", "indexmap 1.9.3", - "indexmap 2.9.0", + "indexmap 2.8.0", "serde", "serde_derive", "serde_json", @@ -6730,7 +6679,7 @@ version = "0.9.34+deprecated" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47" dependencies = [ - "indexmap 2.9.0", + "indexmap 2.8.0", "itoa", "ryu", "serde", @@ -6776,11 +6725,11 @@ checksum = "24188a676b6ae68c3b2cb3a01be17fbf7240ce009799bb56d5b1409051e78fde" [[package]] name = "shellexpand" -version = "3.1.1" +version = "3.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b1fdf65dd6331831494dd616b30351c38e96e45921a27745cf98490458b90bb" +checksum = "da03fa3b94cc19e3ebfc88c4229c49d8f08cdbd1228870a45f0ffdf84988e14b" dependencies = [ - "dirs 6.0.0", + "dirs", ] [[package]] @@ -6847,9 +6796,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.15.0" +version = "1.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8917285742e9f3e1683f0a9c4e6b57960b7314d0b08d30d1ecd426713ee2eee9" +checksum = "7fcf8323ef1faaee30a44a340193b1ac6814fd9b7b4e88e9d4519a3e4abe1cfd" [[package]] name = "snafu" @@ -6874,9 +6823,9 @@ dependencies = [ [[package]] name = "socket2" -version = "0.5.9" +version = "0.5.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f5fd57c80058a56cf5c777ab8a126398ece8e442983605d280a44ce79d0edef" +checksum = "c970269d99b64e60ec3bd6ad27270092a5394c4e309314b18ae3fe575695fbe8" dependencies = [ "libc", "windows-sys 0.52.0", @@ -7054,14 +7003,14 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.19.1" +version = "3.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7437ac7763b9b123ccf33c338a5cc1bac6f69b45a136c19bdd8a65e3916435bf" +checksum = "488960f40a3fd53d72c2a29a58722561dee8afdd175bd88e3db4677d7b2ba600" dependencies = [ "fastrand", "getrandom 0.3.2", "once_cell", - "rustix 1.0.5", + "rustix 1.0.3", "windows-sys 0.59.0", ] @@ -7104,7 +7053,7 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "45c6481c4829e4cc63825e62c49186a34538b7b2750b73b266581ffb612fb5ed" dependencies = [ - "rustix 1.0.5", + "rustix 1.0.3", "windows-sys 0.59.0", ] @@ -7126,12 +7075,6 @@ dependencies = [ "toml 0.5.11", ] -[[package]] -name = "test_bin" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e7a7de15468c6e65dd7db81cf3822c1ec94c71b2a3c1a976ea8e4696c91115c" - [[package]] name = "tests" version = "0.1.0" @@ -7163,7 +7106,7 @@ dependencies = [ "regex", "reqwest", "rstest", - "rustls 0.23.26", + "rustls 0.23.25", "serde", "serde_json", "tempfile", @@ -7234,9 +7177,9 @@ dependencies = [ [[package]] name = "time" -version = "0.3.41" +version = "0.3.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a7619e19bc266e0f9c5e6686659d394bc57973859340060a69221e57dbc0c40" +checksum = "9d9c75b47bdff86fa3334a3db91356b8d7d86a9b839dab7d0bdc5c3d3a077618" dependencies = [ "deranged", "itoa", @@ -7255,9 +7198,9 @@ checksum = "c9e9a38711f559d9e3ce1cdb06dd7c5b8ea546bc90052da6d06bb76da74bb07c" [[package]] name = "time-macros" -version = "0.2.22" +version = "0.2.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3526739392ec93fd8b359c8e98514cb3e8e021beb4e5f597b00a0221f8ed8a49" +checksum = "29aa485584182073ed57fd5004aa09c371f021325014694e432313345865fd04" dependencies = [ "num-conv", "time-core", @@ -7299,9 +7242,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.44.2" +version = "1.44.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6b88822cbe49de4185e3a4cbf8321dd487cf5fe0c5c65695fef6346371e9c48" +checksum = "f382da615b842244d4b8738c82ed1275e6c5dd90c459a30941cd07080b06c91a" dependencies = [ "backtrace", "bytes", @@ -7353,7 +7296,7 @@ version = "0.26.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e727b36a1a0e8b74c376ac2211e40c2c8af09fb4013c60d910495810f008e9b" dependencies = [ - "rustls 0.23.26", + "rustls 0.23.25", "tokio", ] @@ -7455,7 +7398,7 @@ version = "0.22.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "17b4795ff5edd201c7cd6dca065ae59972ce77d1b80fa0a84d94950ece7d1474" dependencies = [ - "indexmap 2.9.0", + "indexmap 2.8.0", "serde", "serde_spanned", "toml_datetime", @@ -7473,7 +7416,7 @@ dependencies = [ "axum", "base64 0.22.1", "bytes", - "h2 0.4.9", + "h2 0.4.8", "http 1.3.1", "http-body 1.0.1", "http-body-util", @@ -7688,7 +7631,7 @@ dependencies = [ "httparse", "log", "rand 0.8.5", - "rustls 0.23.26", + "rustls 0.23.25", "rustls-native-certs 0.7.3", "rustls-pki-types", "sha1", @@ -8100,21 +8043,21 @@ dependencies = [ [[package]] name = "which" -version = "7.0.3" +version = "7.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "24d643ce3fd3e5b54854602a080f34fb10ab75e0b813ee32d00ca2b44fa74762" +checksum = "2774c861e1f072b3aadc02f8ba886c26ad6321567ecc294c935434cad06f1283" dependencies = [ "either", "env_home", - "rustix 1.0.5", + "rustix 0.38.44", "winsafe", ] [[package]] name = "whoami" -version = "1.6.0" +version = "1.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6994d13118ab492c3c80c1f81928718159254c53c472bf9ce36f8dae4add02a7" +checksum = "372d5b87f58ec45c384ba03563b03544dc5fadc3983e434b286913f5b4a9bb6d" dependencies = [ "redox_syscall", "wasite", @@ -8165,38 +8108,22 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" [[package]] -name = "windows-core" -version = "0.61.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4763c1de310c86d75a878046489e2e5ba02c649d185f21c67d4cf8a56d098980" -dependencies = [ - "windows-implement", - "windows-interface", - "windows-link", - "windows-result", - "windows-strings 0.4.0", -] - -[[package]] -name = "windows-implement" -version = "0.60.0" +name = "windows" +version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a47fddd13af08290e67f4acabf4b459f647552718f683a7b415d290ac744a836" +checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be" dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.100", + "windows-core", + "windows-targets 0.52.6", ] [[package]] -name = "windows-interface" -version = "0.59.1" +name = "windows-core" +version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd9211b69f8dcdfa817bfd14bf1c97c9188afa36f4750130fcdf3f400eca9fa8" +checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.100", + "windows-targets 0.52.6", ] [[package]] @@ -8212,7 +8139,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4286ad90ddb45071efd1a66dfa43eb02dd0dfbae1545ad6cc3c51cf34d7e8ba3" dependencies = [ "windows-result", - "windows-strings 0.3.1", + "windows-strings", "windows-targets 0.53.0", ] @@ -8234,15 +8161,6 @@ dependencies = [ "windows-link", ] -[[package]] -name = "windows-strings" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a2ba9642430ee452d5a7aa78d72907ebe8cfda358e8cb7918a2050581322f97" -dependencies = [ - "windows-link", -] - [[package]] name = "windows-sys" version = "0.48.0" @@ -8457,9 +8375,9 @@ checksum = "271414315aff87387382ec3d271b52d7ae78726f5d44ac98b4f4030c91880486" [[package]] name = "winnow" -version = "0.7.6" +version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "63d3fcd9bba44b03821e7d699eeee959f3126dcc4aa8e4ae18ec617c2a5cea10" +checksum = "0e97b544156e9bebe1a0ffbc03484fc1ffe3100cbce3ffb17eac35f7cdd7ab36" dependencies = [ "memchr", ] @@ -8482,9 +8400,9 @@ checksum = "d135d17ab770252ad95e9a872d365cf3090e3be864a34ab46f48555993efc904" [[package]] name = "wintun-bindings" -version = "0.7.31" +version = "0.7.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "605f50b13e12e1f9f99dc5e93701d779dbe47282fec186cb8a079165368d3124" +checksum = "67a02981bed4592bcd271f9bfe154228ddbd2fd69e37a7d358da5d3a1251d696" dependencies = [ "blocking", "c2rust-bitfields", @@ -8584,14 +8502,14 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0d65cbf2f12c15564212d48f4e3dfb87923d25d611f2aed18f4cb23f0413d89e" dependencies = [ "libc", - "rustix 1.0.5", + "rustix 1.0.3", ] [[package]] name = "xml-rs" -version = "0.8.26" +version = "0.8.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a62ce76d9b56901b19a74f19431b0d8b3bc7ca4ad685a746dfd78ca8f4fc6bda" +checksum = "c5b940ebc25896e71dd073bad2dbaa2abfe97b0a391415e22ad1326d9c54e3c4" [[package]] name = "xmlparser" @@ -8667,11 +8585,11 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.8.24" +version = "0.8.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2586fea28e186957ef732a5f8b3be2da217d65c5969d4b1e17f973ebbe876879" +checksum = "fd97444d05a4328b90e75e503a34bad781f14e28a823ad3557f0750df1ebcbc6" dependencies = [ - "zerocopy-derive 0.8.24", + "zerocopy-derive 0.8.23", ] [[package]] @@ -8687,9 +8605,9 @@ dependencies = [ [[package]] name = "zerocopy-derive" -version = "0.8.24" +version = "0.8.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a996a8f63c5c4448cd959ac1bab0aaa3306ccfd060472f85943ee0750f0169be" +checksum = "6352c01d0edd5db859a63e2605f4ea3183ddbd15e2c4a9e7d32184df75e4f154" dependencies = [ "proc-macro2", "quote", @@ -8761,16 +8679,18 @@ dependencies = [ [[package]] name = "zip" -version = "2.6.1" +version = "2.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1dcb24d0152526ae49b9b96c1dcf71850ca1e0b882e4e28ed898a93c41334744" +checksum = "fabe6324e908f85a1c52063ce7aa26b68dcb7eb6dbc83a2d148403c9bc3eba50" dependencies = [ "arbitrary", "crc32fast", "crossbeam-utils", + "displaydoc", "flate2", - "indexmap 2.9.0", + "indexmap 2.8.0", "memchr", + "thiserror 2.0.12", "zopfli", ] @@ -8810,18 +8730,18 @@ dependencies = [ [[package]] name = "zstd-safe" -version = "7.2.4" +version = "7.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f49c4d5f0abb602a93fb8736af2a4f4dd9512e36f7f570d66e65ff867ed3b9d" +checksum = "f3051792fbdc2e1e143244dc28c60f73d8470e93f3f9cbd0ead44da5ed802722" dependencies = [ "zstd-sys", ] [[package]] name = "zstd-sys" -version = "2.0.15+zstd.1.5.7" +version = "2.0.14+zstd.1.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb81183ddd97d0c74cedf1d50d85c8d08c1b8b68ee863bdee9e706eedba1a237" +checksum = "8fb060d4926e4ac3a3ad15d864e99ceb5f343c6b34f5bd6d81ae6ed417311be5" dependencies = [ "cc", "pkg-config", diff --git a/changelog.d/3255.fixed.md b/changelog.d/3255.fixed.md deleted file mode 100644 index b83af4b799f..00000000000 --- a/changelog.d/3255.fixed.md +++ /dev/null @@ -1 +0,0 @@ -Rollback RemoteRuntime change fixing OOMKilled diff --git a/mirrord/agent/Cargo.toml b/mirrord/agent/Cargo.toml index 6bec1139e3b..91cb344de10 100644 --- a/mirrord/agent/Cargo.toml +++ b/mirrord/agent/Cargo.toml @@ -82,4 +82,3 @@ rcgen.workspace = true reqwest.workspace = true rstest.workspace = true tempfile.workspace = true -test_bin = "0.4" diff --git a/mirrord/agent/src/cli.rs b/mirrord/agent/src/cli.rs index c3626bf85d4..e77e494cf95 100644 --- a/mirrord/agent/src/cli.rs +++ b/mirrord/agent/src/cli.rs @@ -98,8 +98,6 @@ pub enum Mode { }, #[default] Targetless, - #[clap(hide = true)] - BlackboxTest, } impl Mode { diff --git a/mirrord/agent/src/dns.rs b/mirrord/agent/src/dns.rs index 6830b08dbf4..a541ea7e8be 100644 --- a/mirrord/agent/src/dns.rs +++ b/mirrord/agent/src/dns.rs @@ -1,4 +1,6 @@ -use std::{future, io, path::PathBuf, sync::atomic::Ordering, time::Duration}; +use std::{ + collections::HashMap, future, io, path::PathBuf, sync::atomic::Ordering, time::Duration, +}; use futures::{stream::FuturesOrdered, StreamExt}; use hickory_resolver::{ @@ -24,16 +26,12 @@ use tokio::{ mpsc::{Receiver, Sender}, oneshot, }, - task::JoinSet, + task::{Id, JoinSet}, }; use tokio_util::sync::CancellationToken; use tracing::{warn, Level}; -use crate::{ - error::{AgentError, AgentResult}, - metrics::DNS_REQUEST_COUNT, - watched_task::TaskStatus, -}; +use crate::{error::AgentResult, metrics::DNS_REQUEST_COUNT, util::remote_runtime::BgTaskStatus}; #[derive(Debug)] pub(crate) enum ClientGetAddrInfoRequest { @@ -54,7 +52,7 @@ impl ClientGetAddrInfoRequest { #[derive(Debug)] pub(crate) struct DnsCommand { request: ClientGetAddrInfoRequest, - response_tx: oneshot::Sender>, + response_tx: oneshot::Sender>, } /// Background task for resolving hostnames to IP addresses. @@ -80,12 +78,11 @@ pub(crate) struct DnsWorker { /// Background tasks that handle the DNS requests. /// /// Each of these builds a new [`TokioAsyncResolver`] and performs one lookup. - tasks: JoinSet<()>, + tasks: JoinSet>, + response_txs: HashMap>>, } impl DnsWorker { - pub const TASK_NAME: &'static str = "DNS worker"; - /// Creates a new instance of this worker. /// To run this worker, call [`Self::run`]. /// @@ -124,6 +121,7 @@ impl DnsWorker { attempts, support_ipv6, tasks: Default::default(), + response_txs: Default::default(), } } @@ -203,34 +201,51 @@ impl DnsWorker { let attempts = self.attempts; let support_ipv6 = self.support_ipv6; - let lookup_future = async move { - let result = Self::do_lookup( - etc_path, - message.request.into_v2(), - attempts, - timeout, - support_ipv6, - ) - .await; - - let _ = message.response_tx.send(result); - }; + let handle = self.tasks.spawn(Self::do_lookup( + etc_path, + message.request.into_v2(), + attempts, + timeout, + support_ipv6, + )); + self.response_txs.insert(handle.id(), message.response_tx); DNS_REQUEST_COUNT.fetch_add(1, Ordering::Relaxed); - self.tasks.spawn(lookup_future); } - pub(crate) async fn run(mut self, cancellation_token: CancellationToken) -> AgentResult<()> { + pub(crate) async fn run(mut self, cancellation_token: CancellationToken) { loop { tokio::select! { - _ = cancellation_token.cancelled() => break Ok(()), + _ = cancellation_token.cancelled() => break, - Some(..) = self.tasks.join_next() => { + Some(result) = self.tasks.join_next_with_id() => { DNS_REQUEST_COUNT.fetch_sub(1, Ordering::Relaxed); + let (id, result) = match result { + Ok((id, result)) => ( + id, + result.map_err(Into::into), + ), + Err(error) => { + ( + error.id(), + Err(ResolveErrorKindInternal::Message("DNS task panicked".into())) + ) + } + }; + + let response_tx = self.response_txs.remove(&id); + match response_tx { + Some(response_tx) => { + let _ = response_tx.send(result); + } + None => { + warn!(?id, "Received a DNS result with no matching response channel"); + } + } } message = self.request_rx.recv() => match message { - None => break Ok(()), + None => break, Some(message) => self.handle_message(message), }, } @@ -246,15 +261,15 @@ impl Drop for DnsWorker { } pub(crate) struct DnsApi { - task_status: TaskStatus, + task_status: BgTaskStatus, request_tx: Sender, /// [`DnsWorker`] processes all requests concurrently, so we use a combination of [`oneshot`] /// channels and [`FuturesOrdered`] to preserve order of responses. - responses: FuturesOrdered>>, + responses: FuturesOrdered>>, } impl DnsApi { - pub(crate) fn new(task_status: TaskStatus, task_sender: Sender) -> Self { + pub(crate) fn new(task_status: BgTaskStatus, task_sender: Sender) -> Self { Self { task_status, request_tx: task_sender, @@ -276,7 +291,7 @@ impl DnsApi { response_tx, }; if self.request_tx.send(command).await.is_err() { - return Err(self.task_status.unwrap_err().await); + return Err(self.task_status.wait_assert_running().await); } self.responses.push_back(response_rx); @@ -294,11 +309,14 @@ impl DnsApi { return future::pending().await; }; - let response = response - .map_err(|_| AgentError::DnsTaskPanic)? - .map_err(|error| ResponseError::DnsLookup(DnsLookupError { kind: error.into() })); - - Ok(GetAddrInfoResponse(response)) + match response { + Ok(response) => { + Ok(GetAddrInfoResponse(response.map_err(|kind| { + ResponseError::DnsLookup(DnsLookupError { kind }) + }))) + } + Err(..) => Err(self.task_status.wait_assert_running().await), + } } } diff --git a/mirrord/agent/src/entrypoint.rs b/mirrord/agent/src/entrypoint.rs index 4b206a18766..b2e2cd29b57 100644 --- a/mirrord/agent/src/entrypoint.rs +++ b/mirrord/agent/src/entrypoint.rs @@ -11,7 +11,7 @@ use std::{ }; use client_connection::AgentTlsConnector; -use dns::{ClientGetAddrInfoRequest, DnsCommand, DnsWorker}; +use dns::{ClientGetAddrInfoRequest, DnsCommand}; use futures::TryFutureExt; use metrics::{start_metrics, CLIENT_COUNT}; use mirrord_agent_env::envs; @@ -22,40 +22,45 @@ use mirrord_agent_iptables::{ IPTABLE_STANDARD_ENV, }; use mirrord_protocol::{ClientMessage, DaemonMessage, GetEnvVarsRequest, LogMessage}; -use sniffer::tcp_capture::RawSocketTcpCapture; use steal::StealerMessage; use tokio::{ net::{TcpListener, TcpStream}, process::Command, select, signal::unix::SignalKind, - sync::mpsc::{self, Sender}, + sync::mpsc::Sender, task::JoinSet, time::{timeout, Duration}, }; use tokio_util::sync::CancellationToken; -use tracing::{debug, error, info, trace, warn, Level}; +use tracing::{debug, error, trace, warn, Level}; use tracing_subscriber::{fmt::format::FmtSpan, prelude::*}; use crate::{ - cli::Args, - client_connection::ClientConnection, + cli::{self, Args}, + client_connection::{self, ClientConnection}, container_handle::ContainerHandle, - dns::DnsApi, + dns::{self, DnsApi}, + env, error::{AgentError, AgentResult}, file::FileManager, + metrics, + namespace::NamespaceType, outgoing::{TcpOutgoingApi, UdpOutgoingApi}, - runtime::get_container, - sniffer::{api::TcpSnifferApi, messages::SnifferCommand, TcpConnectionSniffer}, - steal::{StealTlsHandlerStore, StealerCommand, TcpConnectionStealer, TcpStealerApi}, - util::{path_resolver::InTargetPathResolver, run_thread_in_namespace, ClientId}, - watched_task::{TaskStatus, WatchedTask}, - *, + runtime::{self, get_container}, + sniffer::{api::TcpSnifferApi, messages::SnifferCommand}, + steal::{self, StealerCommand, TcpStealerApi}, + util::{ + remote_runtime::{BgTaskRuntime, BgTaskStatus, RemoteRuntime}, + ClientId, + }, + TOKIO_WORKER_THREADS, }; mod setup; -/// Size of [`mpsc`] channels connecting [`TcpStealerApi`] with the background task. +/// Size of [`mpsc`](tokio::sync::mpsc) channels connecting [`TcpStealerApi`]s with the background +/// task. const CHANNEL_SIZE: usize = 1024; /// Keeps track of next client id. @@ -73,6 +78,8 @@ struct State { ephemeral: bool, /// When present, it is used to secure incoming TCP connections. tls_connector: Option, + /// [`tokio::runtime`] that should be used for network operations. + network_runtime: BgTaskRuntime, } impl State { @@ -86,7 +93,7 @@ impl State { let mut env: HashMap = HashMap::new(); - let (ephemeral, container, pid) = match &args.mode { + let (ephemeral, container) = match &args.mode { cli::Mode::Targeted { container_id, container_runtime, @@ -95,11 +102,10 @@ impl State { let container = get_container(container_id.clone(), container_runtime).await?; let container_handle = ContainerHandle::new(container).await?; - let pid = container_handle.pid().to_string(); env.extend(container_handle.raw_env().clone()); - (false, Some(container_handle), pid) + (false, Some(container_handle)) } cli::Mode::Ephemeral { .. } => { let container_handle = ContainerHandle::new(runtime::Container::Ephemeral( @@ -113,17 +119,26 @@ impl State { )) .await?; - let pid = container_handle.pid().to_string(); env.extend(container_handle.raw_env().clone()); // If we are in an ephemeral container, we use pid 1. - (true, Some(container_handle), pid) + (true, Some(container_handle)) } - cli::Mode::Targetless | cli::Mode::BlackboxTest => (false, None, "self".to_string()), + cli::Mode::Targetless => (false, None), }; - let environ_path = PathBuf::from("/proc").join(pid).join("environ"); + let network_runtime = match container.as_ref().map(ContainerHandle::pid) { + Some(pid) if ephemeral.not() => BgTaskRuntime::Remote( + RemoteRuntime::new_in_namespace(pid, NamespaceType::Net).await?, + ), + None | Some(..) => BgTaskRuntime::Local, + }; + let env_pid = match container.as_ref().map(ContainerHandle::pid) { + Some(pid) => pid.to_string(), + None => "self".to_string(), + }; + let environ_path = PathBuf::from("/proc").join(env_pid).join("environ"); match env::get_proc_environ(environ_path).await { Ok(environ) => env.extend(environ.into_iter()), Err(err) => { @@ -137,6 +152,7 @@ impl State { env: Arc::new(env), ephemeral, tls_connector, + network_runtime, }) } @@ -178,7 +194,7 @@ impl State { } enum BackgroundTask { - Running(TaskStatus, Sender), + Running(BgTaskStatus, Sender), Disabled, } @@ -206,7 +222,9 @@ struct ClientConnectionHandler { /// Handles mirrord's file operations, see [`FileManager`]. file_manager: FileManager, connection: ClientConnection, + /// [`None`] when targetless. tcp_sniffer_api: Option, + /// [`None`] when targetless. tcp_stealer_api: Option, tcp_outgoing_api: TcpOutgoingApi, udp_outgoing_api: UdpOutgoingApi, @@ -240,8 +258,8 @@ impl ClientConnectionHandler { Self::create_stealer_api(id, bg_tasks.stealer, &mut connection).await?; let dns_api = Self::create_dns_api(bg_tasks.dns); - let tcp_outgoing_api = TcpOutgoingApi::new(pid); - let udp_outgoing_api = UdpOutgoingApi::new(pid); + let tcp_outgoing_api = TcpOutgoingApi::new(&state.network_runtime); + let udp_outgoing_api = UdpOutgoingApi::new(&state.network_runtime); let client_handler = Self { id, @@ -460,16 +478,21 @@ impl ClientConnectionHandler { if let Some(sniffer_api) = &mut self.tcp_sniffer_api { sniffer_api.handle_client_message(message).await? } else { - warn!("received tcp sniffer request while not available"); - Err(AgentError::SnifferNotRunning)? + self.respond(DaemonMessage::Close( + "component responsible for mirroring incoming traffic is not running, \ + which might be due to Kubernetes node kernel version <4.20. \ + Check agent logs for errors and please report a bug if kernel version >=4.20".into(), + )).await?; } } ClientMessage::TcpSteal(message) => { if let Some(tcp_stealer_api) = self.tcp_stealer_api.as_mut() { tcp_stealer_api.handle_client_message(message).await? } else { - warn!("received tcp steal request while not available"); - Err(AgentError::StealerNotRunning)? + self.respond(DaemonMessage::Close( + "incoming traffic stealing is not available in the targetless mode".into(), + )) + .await?; } } ClientMessage::Close => { @@ -498,8 +521,8 @@ impl ClientConnectionHandler { self.ready_for_logs = true; } ClientMessage::Vpn(_message) => { - unreachable!("VPN is not supported"); - // self.vpn_api.layer_message(message).await?; + self.respond(DaemonMessage::Close("VPN is not supported".into())) + .await?; } } @@ -535,13 +558,10 @@ async fn start_agent(args: Args) -> AgentResult<()> { ipv4_listener_result }?; - match listener.local_addr() { - Ok(addr) => debug!( - client_listener_address = addr.to_string(), - "Created listener." - ), - Err(err) => error!(%err, "listener local address error"), - } + debug!( + client_listener_address = %listener.local_addr()?, + "Created the client listener.", + ); let state = State::new(&args).await?; @@ -562,117 +582,28 @@ async fn start_agent(args: Args) -> AgentResult<()> { }); } - let (sniffer_command_tx, sniffer_command_rx) = mpsc::channel::(1000); - let (stealer_command_tx, stealer_command_rx) = mpsc::channel::(1000); - let (dns_command_tx, dns_command_rx) = mpsc::channel::(1000); - - let (sniffer_task, sniffer_status) = if args.mode.is_targetless() { - (None, None) + let sniffer = if state.container_pid().is_some() { + setup::start_sniffer(&args, &state.network_runtime, cancellation_token.clone()).await } else { - let cancellation_token = cancellation_token.clone(); - let is_mesh = args.is_mesh(); - // We're using this to avoid crashing on old kernels when initializing the - // `RawSocketTcpCapture`. failed task causes the agent to exit - // so we just check that initialization was successful - // then decide whether to store the task or drop it - // https://github.com/metalbear-co/mirrord/pull/2910 - let (sniffer_init_tx, sniffer_init_rx) = tokio::sync::oneshot::channel::(); - let watched_task = WatchedTask::new( - TcpConnectionSniffer::::TASK_NAME, - async move { - if let Ok(sniffer) = - TcpConnectionSniffer::new(sniffer_command_rx, args.network_interface, is_mesh) - .await - { - if let Err(error) = sniffer_init_tx.send(true) { - tracing::error!(%error, "Failed to send sniffer init result"); - }; - // will block from this point on - let res = sniffer.start(cancellation_token).await; - if let Err(err) = res { - error!(%err, "Sniffer failed"); - } - } else if let Err(error) = sniffer_init_tx.send(false) { - tracing::error!(%error, "Failed to send sniffer init result"); - } - - Ok(()) - }, - ); - let status = watched_task.status(); - let task = run_thread_in_namespace( - watched_task.start(), - TcpConnectionSniffer::::TASK_NAME.to_string(), - state.container_pid(), - "net", - ); - - match sniffer_init_rx.await { - Ok(true) => (Some(task), Some(status)), - Ok(false) => (None, None), - Err(error) => { - tracing::error!(%error, "unexpected error while waiting for sniffer init"); - (None, None) - } - } + BackgroundTask::Disabled }; - - let (stealer_task, stealer_status) = match state.container_pid() { - None => (None, None), + let stealer = match state.container_pid() { + None => BackgroundTask::Disabled, Some(pid) => { - let steal_handle = setup::start_traffic_redirector(pid).await?; - - let cancellation_token = cancellation_token.clone(); - let tls_steal_config = envs::STEAL_TLS_CONFIG.from_env_or_default(); - let tls_handler_store = tls_steal_config.is_empty().not().then(|| { - StealTlsHandlerStore::new(tls_steal_config, InTargetPathResolver::new(pid)) - }); - let watched_task = WatchedTask::new(TcpConnectionStealer::TASK_NAME, async move { - TcpConnectionStealer::new(stealer_command_rx, steal_handle, tls_handler_store) - .start(cancellation_token) - .await - .inspect_err(|error| { - error!(%error, "Stealer failed"); - }) - }); - let status = watched_task.status(); - let task = run_thread_in_namespace( - watched_task.start(), - TcpConnectionStealer::TASK_NAME.to_string(), - state.container_pid(), - "net", - ); - - (Some(task), Some(status)) + let steal_handle = setup::start_traffic_redirector(&state.network_runtime).await?; + setup::start_stealer( + &state.network_runtime, + pid, + steal_handle, + cancellation_token.clone(), + ) } }; - - let (dns_task, dns_status) = { - let cancellation_token = cancellation_token.clone(); - let watched_task = WatchedTask::new( - DnsWorker::TASK_NAME, - DnsWorker::new(state.container_pid(), dns_command_rx, args.ipv6) - .run(cancellation_token), - ); - let status = watched_task.status(); - let task = run_thread_in_namespace( - watched_task.start(), - DnsWorker::TASK_NAME.to_string(), - state.container_pid(), - "net", - ); - - (task, status) - }; - + let dns = setup::start_dns(&args, &state.network_runtime, cancellation_token.clone()); let bg_tasks = BackgroundTasks { - sniffer: sniffer_status - .map(|status| BackgroundTask::Running(status, sniffer_command_tx)) - .unwrap_or(BackgroundTask::Disabled), - stealer: stealer_status - .map(|status| BackgroundTask::Running(status, stealer_command_tx)) - .unwrap_or(BackgroundTask::Disabled), - dns: BackgroundTask::Running(dns_status, dns_command_tx), + sniffer, + stealer, + dns, }; // WARNING: `wait_for_agent_startup` in `mirrord/kube/src/api/container.rs` expects a line @@ -735,7 +666,6 @@ async fn start_agent(args: Args) -> AgentResult<()> { Some(Err(error)) => { error!(?error, "start_agent -> Failed to join client handler task"); - Err(error)? } None => { @@ -756,30 +686,29 @@ async fn start_agent(args: Args) -> AgentResult<()> { dns, } = bg_tasks; - if let (Some(sniffer_task), BackgroundTask::Running(mut sniffer_status, _)) = - (sniffer_task, sniffer) - { - sniffer_task.join().map_err(|_| AgentError::JoinTask)?; - if let Some(err) = sniffer_status.err().await { - error!("start_agent -> sniffer task failed with error: {}", err); - } - } - - if let (Some(stealer_task), BackgroundTask::Running(mut stealer_status, _)) = - (stealer_task, stealer) - { - stealer_task.join().map_err(|_| AgentError::JoinTask)?; - if let Some(err) = stealer_status.err().await { - error!("start_agent -> stealer task failed with error: {}", err); - } - } - - if let BackgroundTask::Running(mut dns_status, _) = dns { - dns_task.join().map_err(|_| AgentError::JoinTask)?; - if let Some(err) = dns_status.err().await { - error!("start_agent -> dns task failed with error: {}", err); - } - } + tokio::join!( + async move { + if let BackgroundTask::Running(status, _) = sniffer { + if let Err(error) = status.wait().await { + error!("start_agent -> {error}"); + } + } + }, + async move { + if let BackgroundTask::Running(status, _) = stealer { + if let Err(error) = status.wait().await { + error!("start_agent -> {error}"); + } + } + }, + async move { + if let BackgroundTask::Running(status, _) = dns { + if let Err(error) = status.wait().await { + error!("start_agent -> {error}"); + } + } + }, + ); trace!("start_agent -> Agent shutdown"); @@ -850,14 +779,22 @@ async fn start_iptable_guard(args: Args) -> AgentResult<()> { result = run_child_agent() => result, }; - let _ = run_thread_in_namespace( - clear_iptable_chain(), - "clear iptables".to_owned(), - pid, - "net", - ) - .join() - .map_err(|_| AgentError::JoinTask)?; + let Some(pid) = pid else { + return result; + }; + + let runtime = RemoteRuntime::new_in_namespace(pid, NamespaceType::Net).await?; + runtime + .spawn(clear_iptable_chain()) + .await + .map_err(|error| AgentError::BackgroundTaskFailed { + task: "IPTablesCleaner", + error: Arc::new(error), + })? + .map_err(|error| AgentError::BackgroundTaskFailed { + task: "IPTablesCleaner", + error: Arc::new(error), + })?; result } @@ -888,7 +825,7 @@ async fn start_iptable_guard(args: Args) -> AgentResult<()> { /// This weird flow is a safety measure - should the real agent OOM (which means instant process /// termination) or be killed with a signal, the parent will a chance to clean iptables. If we leave /// the iptables dirty, the whole target pod is broken, probably forever. -pub async fn main() -> AgentResult<()> { +pub fn main() -> AgentResult<()> { rustls::crypto::CryptoProvider::install_default(rustls::crypto::aws_lc_rs::default_provider()) .expect("Failed to install crypto provider"); @@ -922,27 +859,20 @@ pub async fn main() -> AgentResult<()> { let args = cli::parse_args(); - let agent_result = if args.mode.is_targetless() + if args.mode.is_targetless() || (std::env::var(IPTABLE_PREROUTING_ENV).is_ok() && std::env::var(IPTABLE_MESH_ENV).is_ok() && std::env::var(IPTABLE_STANDARD_ENV).is_ok()) { - start_agent(args).await + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .worker_threads(TOKIO_WORKER_THREADS) + .build()? + .block_on(start_agent(args)) } else { - start_iptable_guard(args).await - }; - - match agent_result { - Ok(_) => { - info!("main -> mirrord-agent `start` exiting successfully.") - } - Err(fail) => { - error!( - "main -> mirrord-agent `start` exiting with error {:#?}", - fail - ) - } + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build()? + .block_on(start_iptable_guard(args)) } - - Ok(()) } diff --git a/mirrord/agent/src/entrypoint/setup.rs b/mirrord/agent/src/entrypoint/setup.rs index 094e8685460..38ba7d44f29 100644 --- a/mirrord/agent/src/entrypoint/setup.rs +++ b/mirrord/agent/src/entrypoint/setup.rs @@ -1,53 +1,114 @@ +use std::ops::Not; + use mirrord_agent_env::envs; -use tokio::sync::oneshot; +use tokio::sync::mpsc; +use tokio_util::sync::CancellationToken; +use super::BackgroundTask; use crate::{ + dns::{DnsCommand, DnsWorker}, error::{AgentError, AgentResult}, incoming::{self, RedirectorTask, StealHandle}, - util::run_thread_in_namespace, + sniffer::{messages::SnifferCommand, TcpConnectionSniffer}, + steal::{StealTlsHandlerStore, StealerCommand, TcpConnectionStealer}, + util::{ + path_resolver::InTargetPathResolver, + remote_runtime::{BgTaskRuntime, IntoStatus}, + }, }; -/// Starts a [`RedirectorTask`] in the target's network namespace. +/// Starts a [`RedirectorTask`] on the given `runtime`. /// /// Returns the [`StealHandle`] that can be used to steal incoming traffic. -pub(crate) async fn start_traffic_redirector(target_pid: u64) -> AgentResult { +pub(super) async fn start_traffic_redirector(runtime: &BgTaskRuntime) -> AgentResult { let flush_connections = envs::STEALER_FLUSH_CONNECTIONS.from_env_or_default(); let pod_ips = envs::POD_IPS.from_env_or_default(); let support_ipv6 = envs::IPV6_SUPPORT.from_env_or_default(); - let (handle_tx, handle_rx) = oneshot::channel(); - - run_thread_in_namespace( - async move { - let redirector_result = - incoming::create_iptables_redirector(flush_connections, &pod_ips, support_ipv6) - .await; - - let redirector = match redirector_result { - Ok(redirector) => redirector, - Err(error) => { - let _ = handle_tx.send(Err(error)); - return; - } - }; - - let (task, handle) = RedirectorTask::new(redirector); - - if handle_tx.send(Ok(handle)).is_err() { - return; - } - - let _ = task.run().await.inspect_err(|error| { - tracing::error!(%error, "Incoming traffic redirector task failed"); - }); - }, - "IncomingTrafficRedirector".into(), - Some(target_pid), - "net", - ); - - match handle_rx.await { - Ok(result) => result.map_err(|error| AgentError::IPTablesSetupError(error.into())), - Err(..) => Err(AgentError::IPTablesSetupError("task panicked".into())), + let (task, handle) = runtime + .spawn(async move { + incoming::create_iptables_redirector(flush_connections, &pod_ips, support_ipv6) + .await + .map(RedirectorTask::new) + }) + .await + .map_err(|error| AgentError::IPTablesSetupError(error.into()))? + .map_err(|error| AgentError::IPTablesSetupError(error.into()))?; + + runtime.spawn(task.run()); + + Ok(handle) +} + +pub(super) async fn start_sniffer( + args: &super::Args, + runtime: &BgTaskRuntime, + cancellation_token: CancellationToken, +) -> BackgroundTask { + let (command_tx, command_rx) = mpsc::channel::(1000); + + let sniffer = runtime + .spawn(TcpConnectionSniffer::new( + command_rx, + args.network_interface.clone(), + args.is_mesh(), + )) + .await; + + match sniffer { + Ok(Ok(sniffer)) => { + let task_status = runtime + .spawn(sniffer.start(cancellation_token.clone())) + .into_status("TcpSnifferTask"); + + BackgroundTask::Running(task_status, command_tx) + } + Ok(Err(error)) => { + tracing::error!(%error, "Failed to create a TCP sniffer"); + BackgroundTask::Disabled + } + Err(error) => { + tracing::error!(%error, "Failed to create a TCP sniffer"); + BackgroundTask::Disabled + } } } + +pub(super) fn start_stealer( + runtime: &BgTaskRuntime, + target_pid: u64, + steal_handle: StealHandle, + cancellation_token: CancellationToken, +) -> BackgroundTask { + let (command_tx, command_rx) = mpsc::channel::(1000); + + let tls_steal_config = envs::STEAL_TLS_CONFIG.from_env_or_default(); + let tls_handler_store = tls_steal_config.is_empty().not().then(|| { + StealTlsHandlerStore::new(tls_steal_config, InTargetPathResolver::new(target_pid)) + }); + let task_status = runtime + .spawn( + TcpConnectionStealer::new(command_rx, steal_handle, tls_handler_store) + .start(cancellation_token), + ) + .into_status("TcpStealerTask"); + + BackgroundTask::Running(task_status, command_tx) +} + +pub(super) fn start_dns( + args: &super::Args, + runtime: &BgTaskRuntime, + cancellation_token: CancellationToken, +) -> BackgroundTask { + let (command_tx, command_rx) = mpsc::channel::(1000); + + let task_status = runtime + .spawn( + DnsWorker::new(runtime.target_pid(), command_rx, args.ipv6) + .run(cancellation_token.clone()), + ) + .into_status("DnsTask"); + + BackgroundTask::Running(task_status, command_tx) +} diff --git a/mirrord/agent/src/error.rs b/mirrord/agent/src/error.rs index abeb51d5cd9..b80c9b90eee 100644 --- a/mirrord/agent/src/error.rs +++ b/mirrord/agent/src/error.rs @@ -1,12 +1,10 @@ -use std::process::ExitStatus; +use std::{process::ExitStatus, sync::Arc}; -use mirrord_protocol::outgoing::udp::DaemonUdpOutgoing; use thiserror::Error; -use tokio::sync::mpsc::{self, error::SendError}; use crate::{ client_connection::TlsSetupError, incoming::RedirectorTaskError, namespace::NamespaceError, - runtime, sniffer::messages::SnifferCommand, steal::StealerCommand, + runtime, util::error::RemoteRuntimeError, }; #[derive(Debug, Error)] @@ -14,49 +12,22 @@ pub(crate) enum AgentError { #[error("io error: {0}")] IO(#[from] std::io::Error), - #[error("SnifferCommand sender failed with `{0}`")] - SendSnifferCommand(#[from] SendError), - - #[error("TCP stealer task is dead")] - TcpStealerTaskDead, - - #[error("UdpConnectRequest sender failed with `{0}`")] - SendUdpOutgoingTrafficResponse(#[from] SendError), - - #[error("task::Join failed with `{0}`")] - Join(#[from] tokio::task::JoinError), - #[error("Container runtime error: {0}")] ContainerRuntimeError(#[from] runtime::ContainerRuntimeError), #[error("Path failed with `{0}`")] StripPrefixError(#[from] std::path::StripPrefixError), - #[error("Join task failed")] - JoinTask, - - #[error("DNS request send failed with `{0}`")] - DnsRequestSendError(#[from] SendError), - - #[error("DNS background task panicked")] - DnsTaskPanic, - #[error(r#"Failed to set socket flag PACKET_IGNORE_OUTGOING, this might be due to kernel version before 4.20. Original error `{0}`"#)] PacketIgnoreOutgoing(#[source] std::io::Error), - #[error( - r#"Couldn't send message to sniffer (mirror) api, sniffer probably not running. - Possible reason can be node kernel version before 4.20 - Check agent logs for errors and please report a bug if kernel version >=4.20"# - )] - SnifferNotRunning, - - #[error("Couldn't send message to stealer (steal) api, stealer probably not running.")] - StealerNotRunning, - - #[error("Background task `{task}` failed with `{cause}`")] - BackgroundTaskFailed { task: &'static str, cause: String }, + #[error("Background task `{task}` failed: `{error}`")] + BackgroundTaskFailed { + task: &'static str, + #[source] + error: Arc, + }, #[error("Returning an error to test the agent's error cleanup. Should only ever be used when testing mirrord.")] TestError, @@ -77,22 +48,14 @@ pub(crate) enum AgentError { #[error("Timeout on accepting first client connection")] FirstConnectionTimeout, - #[allow(dead_code)] - /// Temporary error for vpn feature - #[error("Generic error in vpn: {0}")] - VpnError(String), - #[error("Incoming traffic redirector failed: {0}")] PortRedirectorError(#[from] RedirectorTaskError), #[error("IP tables setup failed: {0}")] IPTablesSetupError(#[source] Box), -} -impl From> for AgentError { - fn from(_: mpsc::error::SendError) -> Self { - Self::TcpStealerTaskDead - } + #[error("Failed to start a tokio runtime in the target's namespace: {0}")] + RemoteRuntimeError(#[from] RemoteRuntimeError), } pub(crate) type AgentResult = std::result::Result; diff --git a/mirrord/agent/src/main.rs b/mirrord/agent/src/main.rs index 03f8b3159e6..c70d930d9d0 100644 --- a/mirrord/agent/src/main.rs +++ b/mirrord/agent/src/main.rs @@ -7,12 +7,6 @@ #![warn(clippy::indexing_slicing)] #![deny(unused_crate_dependencies)] -/// Silences `deny(unused_crate_dependencies)`. -/// -/// This dependency is only used in integration tests. -#[cfg(test)] -use test_bin as _; - mod cli; mod client_connection; mod container_handle; @@ -31,10 +25,10 @@ mod sniffer; mod steal; mod util; mod vpn; -mod watched_task; -#[cfg(target_os = "linux")] -#[tokio::main(flavor = "current_thread")] -async fn main() -> crate::error::AgentResult<()> { - crate::entrypoint::main().await +/// Number of worker threads we use per [`tokio::runtime`]. +const TOKIO_WORKER_THREADS: usize = 8; + +fn main() -> crate::error::AgentResult<()> { + crate::entrypoint::main() } diff --git a/mirrord/agent/src/namespace.rs b/mirrord/agent/src/namespace.rs index d640134b7f1..2ff746abe43 100644 --- a/mirrord/agent/src/namespace.rs +++ b/mirrord/agent/src/namespace.rs @@ -1,27 +1,41 @@ -use std::fs::File; +use std::{fmt, fs::File}; use nix::sched::{setns, CloneFlags}; use thiserror::Error; +use tracing::Level; +/// Errors that can occur when entering a Linux namespace. #[derive(Debug, Error)] -pub(crate) enum NamespaceError { - #[error("Failed opening pid's namespace file: {0}")] +pub enum NamespaceError { + #[error("failed to open target's namespace file: {0}")] FailedNamespaceOpen(#[from] std::io::Error), - #[error("Failed to enter namespace: {0}")] + #[error("failed to enter target's namespace: {0}")] FailedNamespaceEnter(#[from] nix::Error), } -/// Non exhaustive namespace type enum. Add as needed -#[derive(Debug)] -pub(crate) enum NamespaceType { +/// Linux namespace types. +/// +/// Add more as needed. +#[derive(Debug, Clone, Copy)] +pub enum NamespaceType { Net, } impl NamespaceType { - #[tracing::instrument(level = "trace", ret)] - fn path_from_pid(&self, pid: u64) -> String { + /// Returns a path to the namespace file for the given target process ID. + /// + /// This path can be used with [`setns`] to enter the namespace. + fn path_for_target(self, target_pid: u64) -> String { match self { - NamespaceType::Net => format!("/proc/{}/ns/net", pid), + NamespaceType::Net => format!("/proc/{target_pid}/ns/net"), + } + } +} + +impl fmt::Display for NamespaceType { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Net => f.write_str("net"), } } } @@ -34,14 +48,11 @@ impl From for CloneFlags { } } -/// Set namespace by cloneflags and pid. -/// NOTE: don't make it async in the case we're in an multi-thread scheduler and we want it to -/// happen on the same thread always. -#[tracing::instrument(level = "trace")] -pub(crate) fn set_namespace(pid: u64, namespace_type: NamespaceType) -> Result<(), NamespaceError> { - let fd = File::open(namespace_type.path_from_pid(pid))?; +/// Reassociates the current thread with the target's namespace. +#[tracing::instrument(level = Level::TRACE, ret, err)] +pub fn set_namespace(target_pid: u64, namespace_type: NamespaceType) -> Result<(), NamespaceError> { + let file = File::open(namespace_type.path_for_target(target_pid))?; + setns(file, namespace_type.into())?; - // use as_raw_fd to get reference so it will drop after setns - setns(fd, namespace_type.into())?; Ok(()) } diff --git a/mirrord/agent/src/outgoing.rs b/mirrord/agent/src/outgoing.rs index dda3813be7b..e36dcc7838d 100644 --- a/mirrord/agent/src/outgoing.rs +++ b/mirrord/agent/src/outgoing.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, fmt, thread, time::Duration}; +use std::{collections::HashMap, fmt, time::Duration}; use bytes::Bytes; use mirrord_protocol::{ @@ -20,8 +20,7 @@ use tracing::Level; use crate::{ error::AgentResult, metrics::TCP_OUTGOING_CONNECTION, - util::run_thread_in_namespace, - watched_task::{TaskStatus, WatchedTask}, + util::remote_runtime::{BgTaskRuntime, BgTaskStatus, IntoStatus}, }; mod socket_stream; @@ -33,11 +32,7 @@ pub(crate) use udp::UdpOutgoingApi; /// Each agent client has their own independent instance (neither this wrapper nor the background /// task are shared). pub(crate) struct TcpOutgoingApi { - /// Holds the thread in which [`TcpOutgoingTask`] is running. - _task: thread::JoinHandle<()>, - - /// Status of the [`TcpOutgoingTask`]. - task_status: TaskStatus, + task_status: BgTaskStatus, /// Sends the layer messages to the [`TcpOutgoingTask`]. layer_tx: Sender, @@ -47,33 +42,22 @@ pub(crate) struct TcpOutgoingApi { } impl TcpOutgoingApi { - const TASK_NAME: &'static str = "TcpOutgoing"; - - /// Spawns a new background task for handling `outgoing` feature and creates a new instance of - /// this struct to serve as an interface. + /// Spawns a new background task for handling the `outgoing` feature and creates a new instance + /// of this struct to serve as an interface. /// /// # Params /// - /// * `pid` - process id of the agent's target container - #[tracing::instrument(level = Level::TRACE)] - pub(crate) fn new(pid: Option) -> Self { + /// * `runtime` - tokio runtime to spawn the background task on. + pub(crate) fn new(runtime: &BgTaskRuntime) -> Self { let (layer_tx, layer_rx) = mpsc::channel(1000); let (daemon_tx, daemon_rx) = mpsc::channel(1000); - let watched_task = WatchedTask::new( - Self::TASK_NAME, - TcpOutgoingTask::new(pid, layer_rx, daemon_tx).run(), - ); - let task_status = watched_task.status(); - let task = run_thread_in_namespace( - watched_task.start(), - Self::TASK_NAME.to_string(), - pid, - "net", - ); + let pid = runtime.target_pid(); + let task_status = runtime + .spawn(TcpOutgoingTask::new(pid, layer_rx, daemon_tx).run()) + .into_status("TcpOutgoingTask"); Self { - _task: task, task_status, layer_tx, daemon_rx, @@ -86,7 +70,7 @@ impl TcpOutgoingApi { if self.layer_tx.send(message).await.is_ok() { Ok(()) } else { - Err(self.task_status.unwrap_err().await) + Err(self.task_status.wait_assert_running().await) } } @@ -95,7 +79,7 @@ impl TcpOutgoingApi { pub(crate) async fn recv_from_task(&mut self) -> AgentResult { match self.daemon_rx.recv().await { Some(msg) => Ok(msg), - None => Err(self.task_status.unwrap_err().await), + None => Err(self.task_status.wait_assert_running().await), } } } @@ -157,10 +141,9 @@ impl TcpOutgoingTask { } } - /// Runs this task as long as the channels connecting it with [`TcpOutgoingApi`] are open. - /// This routine never fails and returns [`Result`] only due to [`WatchedTask`] constraints. + /// Runs this task as long as the channels connecting it with the [`TcpOutgoingApi`] are open. #[tracing::instrument(level = Level::TRACE, skip(self))] - async fn run(mut self) -> AgentResult<()> { + async fn run(mut self) { loop { let channel_closed = select! { biased; @@ -182,7 +165,7 @@ impl TcpOutgoingTask { if channel_closed { tracing::trace!("Client channel closed, exiting"); - break Ok(()); + break; } } } diff --git a/mirrord/agent/src/outgoing/udp.rs b/mirrord/agent/src/outgoing/udp.rs index d2a2128da96..8985815544d 100644 --- a/mirrord/agent/src/outgoing/udp.rs +++ b/mirrord/agent/src/outgoing/udp.rs @@ -2,7 +2,6 @@ use core::fmt; use std::{ collections::HashMap, net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, - thread, }; use bytes::{Bytes, BytesMut}; @@ -27,13 +26,12 @@ use tracing::Level; use crate::{ error::AgentResult, metrics::UDP_OUTGOING_CONNECTION, - util::run_thread_in_namespace, - watched_task::{TaskStatus, WatchedTask}, + util::remote_runtime::{BgTaskRuntime, BgTaskStatus, IntoStatus}, }; /// Task that handles [`LayerUdpOutgoing`] and [`DaemonUdpOutgoing`] messages. /// -/// We start these tasks from the [`UdpOutgoingApi`] as a [`WatchedTask`]. +/// We start these tasks from the [`UdpOutgoingApi`] on a [`BgTaskRuntime`]. struct UdpOutgoingTask { next_connection_id: ConnectionId, /// Writing halves of peer connections made on layer's requests. @@ -87,11 +85,9 @@ impl UdpOutgoingTask { } } - /// Runs this task as long as the channels connecting it with [`UdpOutgoingApi`] are open. - /// This routine never fails and returns [`AgentResult`] only due to [`WatchedTask`] - /// constraints. + /// Runs this task as long as the channels connecting it with the [`UdpOutgoingApi`] are open. #[tracing::instrument(level = Level::TRACE, skip(self))] - pub(super) async fn run(mut self) -> AgentResult<()> { + pub(super) async fn run(mut self) { loop { let channel_closed = select! { biased; @@ -113,7 +109,7 @@ impl UdpOutgoingTask { if channel_closed { tracing::trace!("Client channel closed, exiting"); - break Ok(()); + break; } } } @@ -276,12 +272,7 @@ impl UdpOutgoingTask { /// Handles (briefly) the `UdpOutgoingRequest` and `UdpOutgoingResponse` messages, mostly the /// passing of these messages to the `interceptor_task` thread. pub(crate) struct UdpOutgoingApi { - /// Holds the `interceptor_task`. - _task: thread::JoinHandle<()>, - - /// Status of the `interceptor_task`. - task_status: TaskStatus, - + task_status: BgTaskStatus, /// Sends the `Layer` message to the `interceptor_task`. layer_tx: Sender, @@ -312,27 +303,15 @@ async fn connect(remote_address: SocketAddress) -> Result) -> Self { + pub(crate) fn new(runtime: &BgTaskRuntime) -> Self { let (layer_tx, layer_rx) = mpsc::channel(1000); let (daemon_tx, daemon_rx) = mpsc::channel(1000); - let watched_task = WatchedTask::new( - Self::TASK_NAME, - UdpOutgoingTask::new(pid, layer_rx, daemon_tx).run(), - ); - - let task_status = watched_task.status(); - let task = run_thread_in_namespace( - watched_task.start(), - Self::TASK_NAME.to_string(), - pid, - "net", - ); + let task_status = runtime + .spawn(UdpOutgoingTask::new(runtime.target_pid(), layer_rx, daemon_tx).run()) + .into_status("UdpOutgoingTask"); Self { - _task: task, task_status, layer_tx, daemon_rx, @@ -345,7 +324,7 @@ impl UdpOutgoingApi { if self.layer_tx.send(message).await.is_ok() { Ok(()) } else { - Err(self.task_status.unwrap_err().await) + Err(self.task_status.wait_assert_running().await) } } @@ -353,7 +332,7 @@ impl UdpOutgoingApi { pub(crate) async fn recv_from_task(&mut self) -> AgentResult { match self.daemon_rx.recv().await { Some(msg) => Ok(msg), - None => Err(self.task_status.unwrap_err().await), + None => Err(self.task_status.wait_assert_running().await), } } } diff --git a/mirrord/agent/src/sniffer.rs b/mirrord/agent/src/sniffer.rs index 49d0e62b109..b1a6f1cb0e0 100644 --- a/mirrord/agent/src/sniffer.rs +++ b/mirrord/agent/src/sniffer.rs @@ -187,8 +187,6 @@ impl TcpConnectionSniffer where R: TcpCapture, { - pub const TASK_NAME: &'static str = "Sniffer"; - /// Capacity of [`broadcast`] channels used to distribute incoming TCP packets between clients. const CONNECTION_DATA_CHANNEL_CAPACITY: usize = 512; @@ -418,11 +416,11 @@ mod test { use tokio::sync::mpsc; use super::*; - use crate::watched_task::{TaskStatus, WatchedTask}; + use crate::util::remote_runtime::{BgTaskRuntime, BgTaskStatus, IntoStatus}; struct TestSnifferSetup { command_tx: Sender, - task_status: TaskStatus, + task_status: BgTaskStatus, packet_tx: Sender<(TcpSessionDirectionId, TcpPacketData)>, times_filter_changed: Arc, next_client_id: ClientId, @@ -458,12 +456,10 @@ mod test { client_txs: Default::default(), clients_closed: Default::default(), }; - let watched_task = WatchedTask::new( - TcpConnectionSniffer::::TASK_NAME, - sniffer.start(CancellationToken::new()), - ); - let task_status = watched_task.status(); - tokio::spawn(watched_task.start()); + + let task_status = BgTaskRuntime::Local + .spawn(sniffer.start(CancellationToken::new())) + .into_status("TcpSnifferTask"); Self { command_tx, diff --git a/mirrord/agent/src/sniffer/api.rs b/mirrord/agent/src/sniffer/api.rs index 08874e93124..44e19bc9eda 100644 --- a/mirrord/agent/src/sniffer/api.rs +++ b/mirrord/agent/src/sniffer/api.rs @@ -18,7 +18,10 @@ use super::{ messages::{SniffedConnection, SnifferCommand, SnifferCommandInner}, AgentResult, }; -use crate::{error::AgentError, util::ClientId, watched_task::TaskStatus}; +use crate::{ + error::AgentError, + util::{remote_runtime::BgTaskStatus, ClientId}, +}; /// Interface used by clients to interact with the /// [`TcpConnectionSniffer`](super::TcpConnectionSniffer). Multiple instances of this struct operate @@ -34,7 +37,7 @@ pub(crate) struct TcpSnifferApi { /// [`TcpConnectionSniffer`](super::TcpConnectionSniffer). receiver: Receiver, /// View on the sniffer task's status. - task_status: TaskStatus, + task_status: BgTaskStatus, /// Currently sniffed connections. connections: StreamMap>>>, /// Ids for sniffed connections. @@ -59,7 +62,7 @@ impl TcpSnifferApi { pub async fn new( client_id: ClientId, sniffer_sender: Sender, - mut task_status: TaskStatus, + task_status: BgTaskStatus, ) -> AgentResult { let (sender, receiver) = mpsc::channel(Self::CONNECTION_CHANNEL_SIZE); @@ -68,7 +71,7 @@ impl TcpSnifferApi { command: SnifferCommandInner::NewClient(sender), }; if sniffer_sender.send(command).await.is_err() { - return Err(task_status.unwrap_err().await); + return Err(task_status.wait_assert_running().await); } Ok(Self { @@ -93,7 +96,7 @@ impl TcpSnifferApi { if self.sender.send(command).await.is_ok() { Ok(()) } else { - Err(self.task_status.unwrap_err().await) + Err(self.task_status.wait_assert_running().await) } } @@ -120,7 +123,7 @@ impl TcpSnifferApi { }, None => { - Err(self.task_status.unwrap_err().await) + Err(self.task_status.wait_assert_running().await) }, }, @@ -157,7 +160,7 @@ impl TcpSnifferApi { Some(result) = self.subscriptions_in_progress.next() => match result { Ok(port) => Ok((DaemonTcp::SubscribeResult(Ok(port)), None)), Err(..) => { - Err(self.task_status.unwrap_err().await) + Err(self.task_status.wait_assert_running().await) } } } diff --git a/mirrord/agent/src/steal/api.rs b/mirrord/agent/src/steal/api.rs index fd7346bca44..8e651b3a724 100644 --- a/mirrord/agent/src/steal/api.rs +++ b/mirrord/agent/src/steal/api.rs @@ -12,8 +12,9 @@ use tracing::Level; use super::{http::ReceiverStreamBody, *}; use crate::{ - error::AgentResult, metrics::HTTP_REQUEST_IN_PROGRESS_COUNT, util::ClientId, - watched_task::TaskStatus, + error::AgentResult, + metrics::HTTP_REQUEST_IN_PROGRESS_COUNT, + util::{remote_runtime::BgTaskStatus, ClientId}, }; type ResponseBodyTx = Sender, Infallible>>; @@ -43,7 +44,7 @@ pub(crate) struct TcpStealerApi { daemon_rx: Receiver, /// View on the stealer task's status. - task_status: TaskStatus, + task_status: BgTaskStatus, /// [`Sender`]s that allow us to provide body [`Frame`]s of responses to filtered HTTP /// requests. @@ -63,17 +64,20 @@ impl TcpStealerApi { pub(crate) async fn new( client_id: ClientId, command_tx: Sender, - task_status: TaskStatus, + task_status: BgTaskStatus, channel_size: usize, ) -> AgentResult { let (daemon_tx, daemon_rx) = mpsc::channel(channel_size); - command_tx + let init_result = command_tx .send(StealerCommand { client_id, command: Command::NewClient(daemon_tx), }) - .await?; + .await; + if init_result.is_err() { + return Err(task_status.wait_assert_running().await); + } Ok(Self { client_id, @@ -94,7 +98,7 @@ impl TcpStealerApi { if self.command_tx.send(command).await.is_ok() { Ok(()) } else { - Err(self.task_status.unwrap_err().await) + Err(self.task_status.wait_assert_running().await) } } @@ -117,7 +121,7 @@ impl TcpStealerApi { } Ok(msg) } - None => Err(self.task_status.unwrap_err().await), + None => Err(self.task_status.wait_assert_running().await), } } diff --git a/mirrord/agent/src/steal/connection.rs b/mirrord/agent/src/steal/connection.rs index 8749291fab8..cf7d73774dc 100644 --- a/mirrord/agent/src/steal/connection.rs +++ b/mirrord/agent/src/steal/connection.rs @@ -392,8 +392,6 @@ pub(crate) struct TcpConnectionStealer { } impl TcpConnectionStealer { - pub const TASK_NAME: &'static str = "Stealer"; - /// Initializes a new [`TcpConnectionStealer`], but doesn't start the actual work. /// You need to call [`TcpConnectionStealer::start`] to do so. pub(crate) fn new( @@ -807,7 +805,7 @@ mod test { net::{TcpListener, TcpStream}, sync::{ mpsc::{self, Receiver, Sender}, - oneshot, watch, + oneshot, }, }; use tokio_stream::wrappers::ReceiverStream; @@ -820,7 +818,7 @@ mod test { connection::{Client, MatchedHttpRequest}, TcpConnectionStealer, TcpStealerApi, }, - watched_task::TaskStatus, + util::remote_runtime::{BgTaskRuntime, IntoStatus}, }; async fn prepare_dummy_service() -> ( @@ -1025,11 +1023,11 @@ mod test { let (task, handle) = RedirectorTask::new(redirector); tokio::spawn(task.run()); - let stealer = TcpConnectionStealer::new(command_rx, handle, None); - tokio::spawn(stealer.start(CancellationToken::new())); - - let (_dummy_tx, dummy_rx) = watch::channel(None); - let task_status = TaskStatus::dummy(TcpConnectionStealer::TASK_NAME, dummy_rx); + let task_status = BgTaskRuntime::Local + .spawn( + TcpConnectionStealer::new(command_rx, handle, None).start(CancellationToken::new()), + ) + .into_status("TcpStealerTask"); let mut api = TcpStealerApi::new(0, command_tx.clone(), task_status, 8) .await .unwrap(); diff --git a/mirrord/agent/src/util.rs b/mirrord/agent/src/util.rs index 24944066a42..c3ec87d45d9 100644 --- a/mirrord/agent/src/util.rs +++ b/mirrord/agent/src/util.rs @@ -5,19 +5,14 @@ use std::{ hash::Hash, pin::Pin, task::{Context, Poll}, - thread::JoinHandle, }; use futures::{future::BoxFuture, FutureExt}; use tokio::sync::mpsc; -use tracing::error; - -use crate::{ - error::AgentResult, - namespace::{set_namespace, NamespaceType}, -}; +pub mod error; pub mod path_resolver; +pub mod remote_runtime; /// Struct that helps you manage topic -> subscribers /// @@ -99,71 +94,6 @@ where } } -/// Helper that creates a new [`tokio::runtime::Runtime`], and immediately blocks on it. -/// -/// Used to start new tasks that would be too heavy for just [`tokio::task::spawn()`] in the -/// caller's runtime. -/// -/// These tasks will execute `on_start_fn` to change namespace (see [`enter_namespace`] for more -/// details). -#[tracing::instrument(level = "trace", skip_all)] -pub(crate) fn run_thread( - future: F, - thread_name: String, - on_start_fn: StartFn, -) -> JoinHandle -where - F: Future + Send + 'static, - F::Output: Send + 'static, - StartFn: Fn() + Send + Sync + 'static, -{ - std::thread::spawn(move || { - on_start_fn(); - - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .thread_name(thread_name) - .on_thread_start(on_start_fn) - .build() - .unwrap(); - rt.block_on(future) - }) -} - -/// Calls [`run_thread`] with `on_start_fn` always being [`enter_namespace`]. -#[tracing::instrument(level = "trace", skip_all)] -pub(crate) fn run_thread_in_namespace( - future: F, - thread_name: String, - pid: Option, - namespace: &str, -) -> JoinHandle -where - F: Future + Send + 'static, - F::Output: Send + 'static, -{ - let namespace = namespace.to_string(); - - run_thread(future, thread_name, move || { - enter_namespace(pid, &namespace).expect("Failed setting namespace!") - }) -} - -/// Used to enter a different (so far only used for "net") namespace for a task. -/// -/// Many of the agent's TCP/UDP connections require that they're made from the `pid`'s namespace to -/// work. -#[tracing::instrument(level = "trace")] -pub(crate) fn enter_namespace(pid: Option, namespace: &str) -> AgentResult<()> { - if let Some(pid) = pid { - Ok(set_namespace(pid, NamespaceType::Net).inspect_err(|fail| { - error!("Failed setting pid {pid:#?} namespace {namespace:#?} with {fail:#?}") - })?) - } else { - Ok(()) - } -} - /// [`Future`] that resolves to [`ClientId`] when the client drops their [`mpsc::Receiver`]. pub(crate) struct ChannelClosedFuture(BoxFuture<'static, ClientId>); diff --git a/mirrord/agent/src/util/error.rs b/mirrord/agent/src/util/error.rs new file mode 100644 index 00000000000..f482b7841c7 --- /dev/null +++ b/mirrord/agent/src/util/error.rs @@ -0,0 +1,27 @@ +use std::io; + +use thiserror::Error; + +use crate::namespace::NamespaceError; + +/// Errors that can occur when creating a [`RemoteRuntime`](super::remote_runtime::RemoteRuntime). +#[derive(Error, Debug)] +pub enum RemoteRuntimeError { + #[error("failed to spawn runtime thread: {0}")] + ThreadSpawnError(#[source] io::Error), + #[error(transparent)] + NamespaceError(#[from] NamespaceError), + #[error("failed to build tokio runtime: {0}")] + TokioRuntimeError(#[source] io::Error), + #[error("runtime thread panicked")] + Panicked, +} + +/// An error that occurs when polling a future spawned with +/// [`RemoteRuntime::spawn`](super::remote_runtime::RemoteRuntime::spawn) or +/// [`BgTaskRuntime::spawn`](super::remote_runtime::BgTaskRuntime::spawn). +/// +/// This error indicated that the future has panicked. +#[derive(Debug, Error)] +#[error("task panicked")] +pub struct BgTaskPanicked; diff --git a/mirrord/agent/src/util/remote_runtime.rs b/mirrord/agent/src/util/remote_runtime.rs new file mode 100644 index 00000000000..851c41d1484 --- /dev/null +++ b/mirrord/agent/src/util/remote_runtime.rs @@ -0,0 +1,313 @@ +//! Utilities for running async code in the agent target's namespace. +//! +//! Useful for running tasks that require access to the target's network namespace, +//! such as traffic stealing, traffic mirroring, DNS resolution, outgoing traffic. +//! +//! Provides: +//! 1. A [`RemoteRuntime`] struct, that can be used to run tasks in the target's namespace. +//! 2. A [`BgTaskRuntime`] enum, that don't necessarily require a target (DNS and outgoing traffic), +//! but should be run in the target's namespace if available. +//! 3. A [`BgTaskStatus`] struct, that can be used to poll for a spawned task's status. + +use std::{ + error::Error, + fmt, + future::Future, + ops::Not, + pin::Pin, + sync::Arc, + task::{Context, Poll}, + thread, +}; + +use futures::{ + future::{BoxFuture, Shared}, + FutureExt, +}; +use tokio::sync::{mpsc, oneshot}; + +use super::error::{BgTaskPanicked, RemoteRuntimeError}; +use crate::{ + error::AgentError, + namespace::{self, NamespaceType}, + TOKIO_WORKER_THREADS, +}; + +/// A cloneable handle to a remote [`tokio::runtime::Runtime`] that runs in its own thread. +/// +/// Can be used to spawn tasks with [`RemoteRuntime::spawn`]. +/// +/// The runtime will be aborted when all handles are dropped. +#[derive(Clone)] +pub struct RemoteRuntime { + target_pid: u64, + future_tx: mpsc::Sender>, +} + +impl RemoteRuntime { + /// Creates a new remote runtime. + /// + /// This runtime's thread will enter the specified namespace of the target. + pub async fn new_in_namespace( + target_pid: u64, + namespace_type: NamespaceType, + ) -> Result { + let (future_tx, mut future_rx) = mpsc::channel(16); + let (result_tx, result_rx) = oneshot::channel(); + let thread_name = format!("remote-{target_pid}-{namespace_type}-runtime-thread"); + let thread_logic = move || { + if let Err(error) = namespace::set_namespace(target_pid, namespace_type) { + let _ = result_tx.send(Err(error.into())); + return; + } + + let rt_result = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .worker_threads(TOKIO_WORKER_THREADS) + .thread_name(format!( + "remote-{target_pid}-{namespace_type}-runtime-worker" + )) + .build(); + let rt = match rt_result { + Ok(rt) => rt, + Err(error) => { + let _ = result_tx.send(Err(RemoteRuntimeError::TokioRuntimeError(error))); + return; + } + }; + + if result_tx.send(Ok(())).is_err() { + return; + } + + rt.block_on(async move { + while let Some(future) = future_rx.recv().await { + tokio::spawn(future); + } + }); + }; + + thread::Builder::new() + .name(thread_name) + .spawn(thread_logic) + .map_err(RemoteRuntimeError::ThreadSpawnError)?; + + match result_rx.await { + Ok(Ok(())) => Ok(Self { + target_pid, + future_tx, + }), + Ok(Err(error)) => Err(error), + Err(..) => Err(RemoteRuntimeError::Panicked), + } + } + + /// Spawns the given future on this remote runtime. + pub fn spawn(&self, future: F) -> BgTask + where + F: 'static + Future + Send, + F::Output: 'static + Send, + { + let (result_tx, result_rx) = oneshot::channel(); + + let future = async move { + let result = future.await; + let _ = result_tx.send(result); + } + .boxed(); + + let future_tx = self.future_tx.clone(); + tokio::spawn(async move { + let _ = future_tx.send(future).await; + }); + + BgTask { + future_result: result_rx, + } + } + + /// Returns the target's PID. + pub fn target_pid(&self) -> u64 { + self.target_pid + } +} + +impl fmt::Debug for RemoteRuntime { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("RemoteRuntime") + .field("running", &self.future_tx.is_closed().not()) + .finish() + } +} + +/// A future spawned with [`RemoteRuntime::spawn`] or +/// [`BgTaskRuntime::spawn`] +pub struct BgTask { + future_result: oneshot::Receiver, +} + +impl Future for BgTask { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + Pin::new(&mut this.future_result) + .poll(cx) + .map_err(|_| BgTaskPanicked) + } +} + +/// A cloneable status of a future spawned with [`RemoteRuntime::spawn`] or +/// [`BgTaskRuntime::spawn`]. +#[derive(Clone)] +pub struct BgTaskStatus { + task_name: &'static str, + result: Shared>>>, +} + +impl BgTaskStatus { + /// Waits for the future to finish and returns its result. + /// + /// Should the future fail or panic, this function will return + /// [`AgentError::BackgroundTaskFailed`]. + pub async fn wait(&self) -> Result<(), AgentError> { + match self.result.clone().await { + Ok(Ok(())) => Ok(()), + Ok(Err(error)) => Err(AgentError::BackgroundTaskFailed { + task: self.task_name, + error, + }), + Err(..) => Err(AgentError::BackgroundTaskFailed { + task: self.task_name, + error: Arc::new(BgTaskPanicked) as Arc, + }), + } + } + + /// Waits for the future to finish and returns its result. + /// + /// This function always returns [`AgentError::BackgroundTaskFailed`]. Use it when the task is + /// not expected to finish yet. + pub async fn wait_assert_running(&self) -> AgentError { + match self.result.clone().await { + Ok(Ok(())) => AgentError::BackgroundTaskFailed { + task: self.task_name, + error: Box::::from("task finished unexpectedly").into(), + }, + Ok(Err(error)) => AgentError::BackgroundTaskFailed { + task: self.task_name, + error, + }, + Err(..) => AgentError::BackgroundTaskFailed { + task: self.task_name, + error: Arc::new(BgTaskPanicked) as Arc, + }, + } + } +} + +impl fmt::Debug for BgTaskStatus { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("BgTaskStatus") + .field("task_name", &self.task_name) + .field("result", &self.result.clone().now_or_never()) + .finish() + } +} + +/// Convenience trait for transforming [`BgTask`] into [`BgTaskStatus`]. +pub trait IntoStatus { + fn into_status(self, task_name: &'static str) -> BgTaskStatus; +} + +impl IntoStatus for BgTask> +where + E: Error + Send + Sync + 'static, +{ + fn into_status(self, task_name: &'static str) -> BgTaskStatus { + let (result_tx, result_rx) = oneshot::channel(); + + tokio::spawn(async move { + let result = match self.future_result.await { + Ok(Ok(())) => Ok(()), + Ok(Err(e)) => Err(Arc::new(e) as Arc), + Err(..) => Err(Arc::new(BgTaskPanicked) as Arc), + }; + + let _ = result_tx.send(result); + }); + + BgTaskStatus { + task_name, + result: result_rx.shared(), + } + } +} + +impl IntoStatus for BgTask<()> { + fn into_status(self, task_name: &'static str) -> BgTaskStatus { + let (result_tx, result_rx) = oneshot::channel(); + + tokio::spawn(async move { + let result = match self.future_result.await { + Ok(()) => Ok(()), + Err(..) => Err(Arc::new(BgTaskPanicked) as Arc), + }; + + let _ = result_tx.send(result); + }); + + BgTaskStatus { + task_name, + result: result_rx.shared(), + } + } +} + +/// A runtime to spawn tasks on, either remote or local. +/// +/// This can be used to spawn tasks that can either run in the target's namespace or the agent's. +/// +/// If the agent has a target, you should use [`BgTaskRuntime::Remote`]. +/// If the agent does not have a target, you should fallback to [`BgTaskRuntime::Local`]. +#[derive(Clone)] +pub enum BgTaskRuntime { + /// Remote runtime, which runs in the target's namespace. + Remote(RemoteRuntime), + /// Local runtime ([`tokio::runtime::Handle::current`]). + Local, +} + +impl BgTaskRuntime { + /// Spawns the given future on this runtime. + pub fn spawn(&self, future: F) -> BgTask + where + F: 'static + Future + Send, + F::Output: 'static + Send, + { + match self { + Self::Remote(remote_runtime) => remote_runtime.spawn(future), + Self::Local => { + let (result_tx, result_rx) = oneshot::channel(); + + tokio::spawn(async move { + let result = future.await; + let _ = result_tx.send(result); + }); + + BgTask { + future_result: result_rx, + } + } + } + } + + /// If this is a remote runtime, returns the target's PID. + /// Otherwise, returns [`None`]. + pub fn target_pid(&self) -> Option { + match self { + Self::Remote(remote_runtime) => Some(remote_runtime.target_pid()), + Self::Local => None, + } + } +} diff --git a/mirrord/agent/src/vpn.rs b/mirrord/agent/src/vpn.rs index d7d30d5ca6f..276db95e558 100644 --- a/mirrord/agent/src/vpn.rs +++ b/mirrord/agent/src/vpn.rs @@ -1,9 +1,10 @@ +//! This code is not used anywhere. + #![allow(dead_code)] use std::{ fmt, - io::Read, + io::{self, Read}, net::{IpAddr, Ipv4Addr, SocketAddr}, - thread, }; use mirrord_protocol::vpn::{ClientVpn, NetworkConfiguration, ServerVpn}; @@ -17,21 +18,15 @@ use tokio::{ }; use crate::{ - error::{AgentError, AgentResult}, - util::run_thread_in_namespace, - watched_task::{TaskStatus, WatchedTask}, + error::AgentResult, + util::remote_runtime::{BgTaskRuntime, BgTaskStatus, IntoStatus}, }; /// An interface for a background task handling [`ClientVpn`] messages. /// Each agent client has their own independent instance (neither this wrapper nor the background /// task are shared). pub(crate) struct VpnApi { - /// Holds the thread in which [`VpnTask`] is running. - _task: thread::JoinHandle<()>, - - /// Status of the [`VpnTask`]. - task_status: TaskStatus, - + task_status: BgTaskStatus, /// Sends the layer messages to the [`VpnTask`]. layer_tx: Sender, @@ -40,33 +35,22 @@ pub(crate) struct VpnApi { } impl VpnApi { - const TASK_NAME: &'static str = "Vpn"; - - /// Spawns a new background task for handling `outgoing` feature and creates a new instance of + /// Spawns a new background task for handling the `vpn` feature and creates a new instance of /// this struct to serve as an interface. /// /// # Params /// - /// * `pid` - process id of the agent's target container - #[tracing::instrument(level = "trace")] - pub(crate) fn new(pid: Option) -> Self { + /// * `runtime` - tokio runtime to spawn the task on. + pub(crate) fn new(runtime: &BgTaskRuntime) -> Self { let (layer_tx, layer_rx) = mpsc::channel(1000); let (daemon_tx, daemon_rx) = mpsc::channel(1000); + let pid = runtime.target_pid(); - let watched_task = WatchedTask::new( - Self::TASK_NAME, - VpnTask::new(pid, layer_rx, daemon_tx).run(), - ); - let task_status = watched_task.status(); - let task = run_thread_in_namespace( - watched_task.start(), - Self::TASK_NAME.to_string(), - pid, - "net", - ); + let task_status = runtime + .spawn(VpnTask::new(pid, layer_rx, daemon_tx).run()) + .into_status("VpnTask"); Self { - _task: task, task_status, layer_tx, daemon_rx, @@ -74,12 +58,11 @@ impl VpnApi { } /// Sends the [`ClientVpn`] message to the background task. - #[tracing::instrument(level = "trace", skip(self))] pub(crate) async fn layer_message(&mut self, message: ClientVpn) -> AgentResult<()> { if self.layer_tx.send(message).await.is_ok() { Ok(()) } else { - Err(self.task_status.unwrap_err().await) + Err(self.task_status.wait_assert_running().await) } } @@ -87,7 +70,7 @@ impl VpnApi { pub(crate) async fn daemon_message(&mut self) -> AgentResult { match self.daemon_rx.recv().await { Some(msg) => Ok(msg), - None => Err(self.task_status.unwrap_err().await), + None => Err(self.task_status.wait_assert_running().await), } } } @@ -121,25 +104,27 @@ impl AsyncRawSocket { } } -async fn create_raw_socket() -> AgentResult { - let index = nix::net::if_::if_nametoindex("eth0") - .map_err(|err| AgentError::VpnError(err.to_string()))?; +async fn create_raw_socket() -> io::Result { + let index = nix::net::if_::if_nametoindex("eth0")?; let socket = Socket::new( Domain::PACKET, Type::DGRAM, Some(Protocol::from(libc::ETH_P_IP.to_be())), )?; - let sock_addr = interface_index_to_sock_addr( - i32::try_from(index).map_err(|err| AgentError::VpnError(err.to_string()))?, - )?; + let sock_addr = interface_index_to_sock_addr(i32::try_from(index).map_err(|_| { + io::Error::new( + io::ErrorKind::Other, + format!("invalid interface index {index}"), + ) + })?)?; socket.bind(&sock_addr)?; socket.set_nonblocking(true)?; - AsyncRawSocket::new(socket, sock_addr).map_err(From::from) + AsyncRawSocket::new(socket, sock_addr) } #[tracing::instrument(level = "debug", ret)] -async fn resolve_interface() -> AgentResult<(IpAddr, IpAddr, IpAddr)> { +async fn resolve_interface() -> io::Result<(IpAddr, IpAddr, IpAddr)> { // Connect to a remote address so we can later get the default network interface. let temporary_socket = UdpSocket::bind("0.0.0.0:0").await?; temporary_socket.connect("8.8.8.8:53").await?; @@ -152,36 +137,50 @@ async fn resolve_interface() -> AgentResult<(IpAddr, IpAddr, IpAddr)> { let raw_local_address = SockaddrStorage::from(local_address); // Try to find an interface that matches the local ip we have. - let usable_interface = nix::ifaddrs::getifaddrs() - .map_err(|err| AgentError::VpnError(err.to_string()))? + let usable_interface = nix::ifaddrs::getifaddrs()? .find(|iface| { iface .address .map(|addr| addr == raw_local_address) .unwrap_or(false) }) - .ok_or_else(|| AgentError::VpnError("usable_interface".to_owned()))?; + .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "no usable interface"))?; let ip = usable_interface .address - .ok_or_else(|| AgentError::VpnError("usable_interface.address".to_owned()))? - .as_sockaddr_in() - .ok_or_else(|| AgentError::VpnError("usable_interface.address.as_sockaddr_in".to_owned()))? + .as_ref() + .and_then(SockaddrStorage::as_sockaddr_in) + .ok_or_else(|| { + io::Error::new( + io::ErrorKind::Other, + "usable_interface.address.as_sockaddr_in", + ) + })? .ip() .into(); let net_mask = usable_interface .netmask - .ok_or_else(|| AgentError::VpnError("usable_interface.netmask".to_owned()))? - .as_sockaddr_in() - .ok_or_else(|| AgentError::VpnError("usable_interface.netmask.as_sockaddr_in".to_owned()))? + .as_ref() + .and_then(SockaddrStorage::as_sockaddr_in) + .ok_or_else(|| { + io::Error::new( + io::ErrorKind::Other, + "usable_interface.netmask.as_sockaddr_in", + ) + })? .ip() .into(); // extracting gateway is more difficult, ugly patch for now. let temp_gateway = usable_interface .address - .ok_or_else(|| AgentError::VpnError("usable_interface.address".to_owned()))? - .as_sockaddr_in() - .ok_or_else(|| AgentError::VpnError("usable_interface.address.as_sockaddr_in".to_owned()))? + .as_ref() + .and_then(SockaddrStorage::as_sockaddr_in) + .ok_or_else(|| { + io::Error::new( + io::ErrorKind::Other, + "usable_interface.address.as_sockaddr_in", + ) + })? .ip() .octets(); @@ -209,16 +208,16 @@ impl fmt::Debug for VpnTask { } } -fn interface_index_to_sock_addr(index: i32) -> AgentResult { +fn interface_index_to_sock_addr(index: i32) -> io::Result { let mut addr_storage: libc::sockaddr_storage = unsafe { std::mem::zeroed() }; let len = std::mem::size_of::() as libc::socklen_t; - let macs = procfs::net::arp().map_err(|err| AgentError::VpnError(err.to_string()))?; + let macs = procfs::net::arp().map_err(|error| io::Error::new(io::ErrorKind::Other, error))?; tracing::debug!(?macs, "arp entries"); let hw_addr = macs .into_iter() .find_map(|entry| entry.hw_address) - .ok_or_else(|| AgentError::VpnError("no entry with hw_address".to_owned()))?; + .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "no entry with hw address"))?; unsafe { let sock_addr = std::ptr::addr_of_mut!(addr_storage) as *mut libc::sockaddr_ll; @@ -245,7 +244,7 @@ impl VpnTask { } #[allow(clippy::indexing_slicing)] - async fn run(mut self) -> AgentResult<()> { + async fn run(mut self) -> io::Result<()> { // so host won't respond with RST to our packets. // TODO: need to do it for UDP as well to avoid ICMP unreachable. let output = std::process::Command::new("iptables") @@ -260,8 +259,7 @@ impl VpnTask { "-j", "DROP", ]) - .output() - .map_err(|err| AgentError::VpnError(err.to_string()))?; + .output()?; tracing::debug!(?output, "iptables output"); let (ip, net_mask, gateway) = resolve_interface().await?; @@ -298,7 +296,7 @@ impl VpnTask { self.daemon_tx .send(ServerVpn::Packet(packet)) .await - .map_err(|err| AgentError::VpnError(err.to_string()))?; + .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; buffer[..len].fill(0); } @@ -318,7 +316,7 @@ impl VpnTask { &mut self, message: ClientVpn, network_configuration: &NetworkConfiguration, - ) -> AgentResult<()> { + ) -> io::Result<()> { match message { // We make connection to the requested address, split the stream into halves with // `io::split`, and put them into respective maps. @@ -328,24 +326,17 @@ impl VpnTask { network_configuration.clone(), )) .await - .map_err(|err| AgentError::VpnError(err.to_string()))?; + .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; } ClientVpn::Packet(packet) => { if let Some(socket) = self.socket.as_mut() { - socket - .write(&packet) - .await - .map_err(|err| AgentError::VpnError(err.to_string()))?; + socket.write(&packet).await?; } else { tracing::error!(?packet, "unable to send packet"); } } ClientVpn::OpenSocket => { - self.socket.replace( - create_raw_socket() - .await - .map_err(|err| AgentError::VpnError(err.to_string()))?, - ); + self.socket.replace(create_raw_socket().await?); } } diff --git a/mirrord/agent/src/watched_task.rs b/mirrord/agent/src/watched_task.rs deleted file mode 100644 index 2e7370b262c..00000000000 --- a/mirrord/agent/src/watched_task.rs +++ /dev/null @@ -1,128 +0,0 @@ -use std::future::Future; - -use tokio::sync::watch::{self, Receiver, Sender}; - -use crate::error::{AgentError, AgentResult}; - -/// A shared clonable view on a background task's status. -#[derive(Debug, Clone)] -pub(crate) struct TaskStatus { - /// Name of the task. - task_name: &'static str, - /// Channel to receive the result of the task. - /// Initially, this channel contains [`None`]. - /// Only one value should ever be sent through this channel and it should be [`Some`]. - result_rx: Receiver>>, -} - -impl TaskStatus { - /// Wait for the task to complete and return the error. - /// Can be called multiple times and safely cancelled. - /// - /// # Panics - /// Panic if the task has not failed. - pub async fn unwrap_err(&mut self) -> AgentError { - self.err().await.expect("task did not fail") - } - - /// Wait for the task to complete. - /// If the task has failed, return the error. - /// Can be called multiple times and safely cancelled. - pub async fn err(&mut self) -> Option { - if self.result_rx.borrow().is_none() && self.result_rx.changed().await.is_err() { - return Some(AgentError::BackgroundTaskFailed { - task: self.task_name, - cause: "task panicked".into(), - }); - } - - self.result_rx - .borrow() - .as_ref() - .expect("WatchedTask set an empty status on exit") - .as_ref() - .err() - .map(|e| AgentError::BackgroundTaskFailed { - task: self.task_name, - cause: e.to_string(), - }) - } -} - -/// A wrapper around asynchronous task. -/// Captures the task's status and exposes it through [`TaskStatus`]. -pub(crate) struct WatchedTask { - /// Shared view on the task status. - status: TaskStatus, - /// The task to be executed. - task: F, - /// Channel to send the task result. - result_tx: Sender>>, -} - -impl WatchedTask { - /// Wrap the given task in a new instance of this struct. - pub(crate) fn new(task_name: &'static str, task: F) -> Self { - let (result_tx, result_rx) = watch::channel(None); - - Self { - status: TaskStatus { - task_name, - result_rx, - }, - task, - result_tx, - } - } - - /// Return a shared view over the inner [`TaskStatus`]. - pub(crate) fn status(&self) -> TaskStatus { - self.status.clone() - } -} - -impl WatchedTask -where - T: Future>, -{ - /// Execute the wrapped task. - /// Store its result in the inner [`TaskStatus`]. - pub(crate) async fn start(self) { - let result = self.task.await; - self.result_tx.send(Some(result)).ok(); // All receivers may be dropped. - } -} - -#[cfg(test)] -pub(crate) mod test { - use super::*; - - impl TaskStatus { - pub fn dummy( - task_name: &'static str, - result_rx: Receiver>>, - ) -> Self { - Self { - task_name, - result_rx, - } - } - } - - #[tokio::test] - async fn simple_successful() { - let task = WatchedTask::new("task", async move { Ok(()) }); - let mut status = task.status(); - task.start().await; - assert!(status.err().await.is_none()); - } - - #[tokio::test] - async fn simple_failing() { - let task = WatchedTask::new("task", async move { Err(AgentError::TestError) }); - let mut status = task.status(); - task.start().await; - assert!(status.err().await.is_some()); - status.unwrap_err().await; - } -} diff --git a/mirrord/agent/tests/blackbox.rs b/mirrord/agent/tests/blackbox.rs deleted file mode 100644 index b40d0ea9d2b..00000000000 --- a/mirrord/agent/tests/blackbox.rs +++ /dev/null @@ -1,149 +0,0 @@ -use std::{io::ErrorKind, net::IpAddr, sync::Arc}; - -use actix_codec::Framed; -use futures::SinkExt; -use mirrord_protocol::{ - tcp::{DaemonTcp, LayerTcp, NewTcpConnection, TcpClose, TcpData}, - ClientCodec, ClientMessage, DaemonMessage, -}; -use test_bin::get_test_bin; -use tokio::{ - io::AsyncWriteExt, - net::{TcpListener, TcpStream}, - select, - sync::Mutex, - time::{sleep, Duration}, -}; -use tokio_stream::StreamExt; - -/// This test requires root or CAP_NET_RAW to setup TCP sniffing. -#[tokio::test] -async fn sanity() { - let mut bin = get_test_bin("mirrord-agent"); - // we do wait, not sure what's happened - #[allow(clippy::zombie_processes)] - let child = bin - .arg("-t") - .arg("2") - .arg("-i") - .arg("lo") - .arg("blackbox-test") - .spawn() - .expect("mirrord-agent failed to start"); - // Wait for agent to listen - sleep(Duration::from_millis(2000)).await; - let stream = TcpStream::connect("127.0.0.1:61337") - .await - .expect("connection to agent failed"); - let mutex = Arc::new(Mutex::new(0)); - let task_mutex = Arc::clone(&mutex); - let guard = mutex.lock().await; - let task = tokio::spawn(async move { - let listener = TcpListener::bind("127.0.0.1:1337") - .await - .expect("couldn't bind socket"); - loop { - select! { - Ok((socket, _)) = listener.accept() => { - let mut buf = [0; 4096]; - loop { - match socket.try_read(&mut buf) { - Ok(0) => break, - Ok(_) => {} - Err(ref e) if e.kind() == ErrorKind::WouldBlock => { - sleep(Duration::from_millis(10)).await; - } - Err(e) => panic!("socket error {e:?}") - } - } - }, - _ = task_mutex.lock() => { - break - } - } - } - }); - - let mut codec = Framed::new(stream, ClientCodec::default()); - let subscription_port = 1337; - - codec - .send(ClientMessage::Tcp(LayerTcp::PortSubscribe( - subscription_port, - ))) - .await - .expect("port subscribe failed"); - assert!(matches!( - codec - .next() - .await - .expect("couldn't get next message") - .expect("got invalid message"), - DaemonMessage::Tcp(DaemonTcp::SubscribeResult(Ok(_))) - )); - let mut test_conn = TcpStream::connect("127.0.0.1:1337") - .await - .expect("connection to dummy failed"); - let port = test_conn.local_addr().unwrap().port(); - let test_data = [0, 3, 5]; - test_conn - .write_all(&test_data) - .await - .expect("couldn't write test data"); - drop(test_conn); - let new_conn_msg = codec - .next() - .await - .expect("couldn't get next message") - .expect("got invalid message"); - let data_msg = codec - .next() - .await - .expect("couldn't get next message") - .expect("got invalid message"); - let close_msg = codec - .next() - .await - .expect("couldn't get next message") - .expect("got invalid message"); - assert_eq!( - new_conn_msg, - DaemonMessage::Tcp(DaemonTcp::NewConnection(NewTcpConnection { - connection_id: 0, - remote_address: IpAddr::V4("127.0.0.1".parse().unwrap()), - local_address: IpAddr::V4("127.0.0.1".parse().unwrap()), - destination_port: 1337, - source_port: port - })) - ); - - assert_eq!( - data_msg, - DaemonMessage::Tcp(DaemonTcp::Data(TcpData { - connection_id: 0, - bytes: test_data.to_vec() - })) - ); - - assert_eq!( - close_msg, - DaemonMessage::Tcp(DaemonTcp::Close(TcpClose { connection_id: 0 })) - ); - - drop(codec); - drop(guard); - drop(mutex); - - task.await.unwrap(); - let result = child.wait_with_output().unwrap(); - assert!(result.status.success()); - - let stderr = String::from_utf8_lossy(&result.stderr); - println!("stderr: {stderr:?}"); - - let stdout = String::from_utf8_lossy(&result.stdout); - println!("stdout: {stdout:?}"); - - assert!(!stderr.to_ascii_lowercase().contains("error")); - assert!(!stdout.to_ascii_lowercase().contains("error")); -} From d713955a127281a19f753da9e3a9ec0bf60d00aa Mon Sep 17 00:00:00 2001 From: Razz4780 Date: Thu, 17 Apr 2025 15:58:42 +0200 Subject: [PATCH 3/5] Fixed passthrough address --- mirrord/agent/src/steal/connection.rs | 15 +-------------- mirrord/agent/src/steal/connections.rs | 18 ++++++++++++++++-- 2 files changed, 17 insertions(+), 16 deletions(-) diff --git a/mirrord/agent/src/steal/connection.rs b/mirrord/agent/src/steal/connection.rs index cf7d73774dc..788111e69f2 100644 --- a/mirrord/agent/src/steal/connection.rs +++ b/mirrord/agent/src/steal/connection.rs @@ -1,7 +1,4 @@ -use std::{ - collections::{HashMap, HashSet}, - net::{IpAddr, Ipv4Addr, Ipv6Addr}, -}; +use std::collections::{HashMap, HashSet}; use fancy_regex::Regex; use futures::{stream::FuturesUnordered, StreamExt}; @@ -459,16 +456,6 @@ impl TcpConnectionStealer { connection: RedirectedConnection, port_subscription: PortSubscription, ) -> AgentResult<()> { - let mut real_address = connection.destination(); - let localhost = if real_address.is_ipv6() { - IpAddr::V6(Ipv6Addr::LOCALHOST) - } else { - IpAddr::V4(Ipv4Addr::LOCALHOST) - }; - // If we use the original IP we would go through prerouting and hit a loop. - // localhost should always work. - real_address.set_ip(localhost); - let stolen_connection = StolenConnection { connection, port_subscription, diff --git a/mirrord/agent/src/steal/connections.rs b/mirrord/agent/src/steal/connections.rs index 3bcf0909953..69b7c61619d 100644 --- a/mirrord/agent/src/steal/connections.rs +++ b/mirrord/agent/src/steal/connections.rs @@ -1,7 +1,12 @@ //! Home for [`StolenConnections`] - manager for connections that were stolen based on active port //! subscriptions. -use std::{collections::HashMap, fmt, io, time::Duration}; +use std::{ + collections::HashMap, + fmt, io, + net::{IpAddr, Ipv4Addr, Ipv6Addr}, + time::Duration, +}; use hyper::{body::Incoming, Request, Response}; use mirrord_protocol::{ @@ -467,7 +472,16 @@ impl ConnectionTask { port_subscription, } = self.connection; - let destination = connection.destination(); + let mut destination = connection.destination(); + let localhost = if destination.is_ipv6() { + IpAddr::V6(Ipv6Addr::LOCALHOST) + } else { + IpAddr::V4(Ipv4Addr::LOCALHOST) + }; + // If we use the original IP we would go through prerouting and hit a loop. + // localhost should always work. + destination.set_ip(localhost); + let source = connection.source(); let filters = match port_subscription { From 8ac300a6658ddfa0a06fdeee8e744db692ae6be5 Mon Sep 17 00:00:00 2001 From: Razz4780 Date: Thu, 17 Apr 2025 16:00:39 +0200 Subject: [PATCH 4/5] Rollback multithread --- mirrord/agent/src/entrypoint.rs | 14 +++----------- mirrord/agent/src/main.rs | 8 +++----- mirrord/agent/src/util/remote_runtime.rs | 4 +--- 3 files changed, 7 insertions(+), 19 deletions(-) diff --git a/mirrord/agent/src/entrypoint.rs b/mirrord/agent/src/entrypoint.rs index b2e2cd29b57..967f2e01536 100644 --- a/mirrord/agent/src/entrypoint.rs +++ b/mirrord/agent/src/entrypoint.rs @@ -54,7 +54,6 @@ use crate::{ remote_runtime::{BgTaskRuntime, BgTaskStatus, RemoteRuntime}, ClientId, }, - TOKIO_WORKER_THREADS, }; mod setup; @@ -825,7 +824,7 @@ async fn start_iptable_guard(args: Args) -> AgentResult<()> { /// This weird flow is a safety measure - should the real agent OOM (which means instant process /// termination) or be killed with a signal, the parent will a chance to clean iptables. If we leave /// the iptables dirty, the whole target pod is broken, probably forever. -pub fn main() -> AgentResult<()> { +pub async fn main() -> AgentResult<()> { rustls::crypto::CryptoProvider::install_default(rustls::crypto::aws_lc_rs::default_provider()) .expect("Failed to install crypto provider"); @@ -864,15 +863,8 @@ pub fn main() -> AgentResult<()> { && std::env::var(IPTABLE_MESH_ENV).is_ok() && std::env::var(IPTABLE_STANDARD_ENV).is_ok()) { - tokio::runtime::Builder::new_multi_thread() - .enable_all() - .worker_threads(TOKIO_WORKER_THREADS) - .build()? - .block_on(start_agent(args)) + start_agent(args).await } else { - tokio::runtime::Builder::new_current_thread() - .enable_all() - .build()? - .block_on(start_iptable_guard(args)) + start_iptable_guard(args).await } } diff --git a/mirrord/agent/src/main.rs b/mirrord/agent/src/main.rs index c70d930d9d0..2f1cf43c42f 100644 --- a/mirrord/agent/src/main.rs +++ b/mirrord/agent/src/main.rs @@ -26,9 +26,7 @@ mod steal; mod util; mod vpn; -/// Number of worker threads we use per [`tokio::runtime`]. -const TOKIO_WORKER_THREADS: usize = 8; - -fn main() -> crate::error::AgentResult<()> { - crate::entrypoint::main() +#[tokio::main(flavor = "current_thread")] +async fn main() -> crate::error::AgentResult<()> { + crate::entrypoint::main().await } diff --git a/mirrord/agent/src/util/remote_runtime.rs b/mirrord/agent/src/util/remote_runtime.rs index 851c41d1484..48e82ce9e16 100644 --- a/mirrord/agent/src/util/remote_runtime.rs +++ b/mirrord/agent/src/util/remote_runtime.rs @@ -30,7 +30,6 @@ use super::error::{BgTaskPanicked, RemoteRuntimeError}; use crate::{ error::AgentError, namespace::{self, NamespaceType}, - TOKIO_WORKER_THREADS, }; /// A cloneable handle to a remote [`tokio::runtime::Runtime`] that runs in its own thread. @@ -61,9 +60,8 @@ impl RemoteRuntime { return; } - let rt_result = tokio::runtime::Builder::new_multi_thread() + let rt_result = tokio::runtime::Builder::new_current_thread() .enable_all() - .worker_threads(TOKIO_WORKER_THREADS) .thread_name(format!( "remote-{target_pid}-{namespace_type}-runtime-worker" )) From 67dca5b2340d9299c81ed4af6af64435bf1e5541 Mon Sep 17 00:00:00 2001 From: Razz4780 Date: Thu, 17 Apr 2025 16:01:24 +0200 Subject: [PATCH 5/5] Changelog --- changelog.d/+oom-issue.fixed.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/+oom-issue.fixed.md diff --git a/changelog.d/+oom-issue.fixed.md b/changelog.d/+oom-issue.fixed.md new file mode 100644 index 00000000000..114cd57bf6f --- /dev/null +++ b/changelog.d/+oom-issue.fixed.md @@ -0,0 +1 @@ +Fixed an issue where mirrord-agent was being OOM killed due to a traffic redirection loop.