diff --git a/Cargo.lock b/Cargo.lock index c559f0b64..6b84f4154 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -150,9 +150,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.80" +version = "1.0.81" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ad32ce52e4161730f7098c077cd2ed6229b5804ccf99e5366be1ab72a98b4e1" +checksum = "0952808a6c2afd1aa8947271f3a60f1a6763c7b912d210184c5149b5cf147247" [[package]] name = "arc-swap" @@ -247,7 +247,7 @@ dependencies = [ "fastrand", "hex", "http 0.2.12", - "hyper", + "hyper 0.14.28", "ring", "time", "tokio", @@ -283,7 +283,7 @@ dependencies = [ "bytes", "fastrand", "http 0.2.12", - "http-body", + "http-body 0.4.6", "percent-encoding", "pin-project-lite", "tracing", @@ -404,9 +404,9 @@ dependencies = [ [[package]] name = "aws-smithy-async" -version = "1.1.7" +version = "1.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fcf7f09a27286d84315dfb9346208abb3b0973a692454ae6d0bc8d803fcce3b4" +checksum = "d26ea8fa03025b2face2b3038a63525a10891e3d8829901d502e5384a0d8cd46" dependencies = [ "futures-util", "pin-project-lite", @@ -415,9 +415,9 @@ dependencies = [ [[package]] name = "aws-smithy-http" -version = "0.60.6" +version = "0.60.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6ca214a6a26f1b7ebd63aa8d4f5e2194095643023f9608edf99a58247b9d80d" +checksum = "3f10fa66956f01540051b0aa7ad54574640f748f9839e843442d99b970d3aff9" dependencies = [ "aws-smithy-runtime-api", "aws-smithy-types", @@ -425,7 +425,7 @@ dependencies = [ "bytes-utils", "futures-core", "http 0.2.12", - "http-body", + "http-body 0.4.6", "once_cell", "percent-encoding", "pin-project-lite", @@ -435,18 +435,18 @@ dependencies = [ [[package]] name = "aws-smithy-json" -version = "0.60.6" +version = "0.60.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1af80ecf3057fb25fe38d1687e94c4601a7817c6a1e87c1b0635f7ecb644ace5" +checksum = "4683df9469ef09468dad3473d129960119a0d3593617542b7d52086c8486f2d6" dependencies = [ "aws-smithy-types", ] [[package]] name = "aws-smithy-query" -version = "0.60.6" +version = "0.60.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb27084f72ea5fc20033efe180618677ff4a2f474b53d84695cfe310a6526cbc" +checksum = "f2fbd61ceb3fe8a1cb7352e42689cec5335833cd9f94103a61e98f9bb61c64bb" dependencies = [ "aws-smithy-types", "urlencoding", @@ -454,9 +454,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.1.7" +version = "1.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fbb5fca54a532a36ff927fbd7407a7c8eb9c3b4faf72792ba2965ea2cad8ed55" +checksum = "ec81002d883e5a7fd2bb063d6fb51c4999eb55d404f4fff3dd878bf4733b9f01" dependencies = [ "aws-smithy-async", "aws-smithy-http", @@ -464,10 +464,10 @@ dependencies = [ "aws-smithy-types", "bytes", "fastrand", - "h2", + "h2 0.3.24", "http 0.2.12", - "http-body", - "hyper", + "http-body 0.4.6", + "hyper 0.14.28", "hyper-rustls", "once_cell", "pin-project-lite", @@ -479,9 +479,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime-api" -version = "1.1.7" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22389cb6f7cac64f266fb9f137745a9349ced7b47e0d2ba503e9e40ede4f7060" +checksum = "9acb931e0adaf5132de878f1398d83f8677f90ba70f01f65ff87f6d7244be1c5" dependencies = [ "aws-smithy-async", "aws-smithy-types", @@ -496,16 +496,16 @@ dependencies = [ [[package]] name = "aws-smithy-types" -version = "1.1.7" +version = "1.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f081da5481210523d44ffd83d9f0740320050054006c719eae0232d411f024d3" +checksum = "abe14dceea1e70101d38fbf2a99e6a34159477c0fb95e68e05c66bd7ae4c3729" dependencies = [ "base64-simd", "bytes", "bytes-utils", "futures-core", "http 0.2.12", - "http-body", + "http-body 0.4.6", "itoa", "num-integer", "pin-project-lite", @@ -519,9 +519,9 @@ dependencies = [ [[package]] name = "aws-smithy-xml" -version = "0.60.6" +version = "0.60.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fccd8f595d0ca839f9f2548e66b99514a85f92feb4c01cf2868d93eb4888a42" +checksum = "872c68cf019c0e4afc5de7753c4f7288ce4b71663212771bf5e4542eb9346ca9" dependencies = [ "xmlparser", ] @@ -561,6 +561,58 @@ dependencies = [ "tracing", ] +[[package]] +name = "axum" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1236b4b292f6c4d6dc34604bb5120d85c3fe1d1aa596bd5cc52ca054d13e7b9e" +dependencies = [ + "async-trait", + "axum-core", + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", + "hyper 1.2.0", + "hyper-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "sync_wrapper", + "tokio", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "axum-core" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a15c63fd72d41492dc4f497196f5da1fb04fb7529e631d73630d1b491e47a2e3" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "backtrace" version = "0.3.69" @@ -716,9 +768,9 @@ dependencies = [ [[package]] name = "bytemuck_derive" -version = "1.5.0" +version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "965ab7eb5f8f97d2a083c799f3a1b994fc397b2fe2da5d1da1626ce15a39f2b1" +checksum = "4da9a32f3fed317401fa3c862968128267c3106685286e15d5aaa3d7389c2f60" dependencies = [ "proc-macro2", "quote", @@ -1737,9 +1789,9 @@ checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" [[package]] name = "erased-serde" -version = "0.4.3" +version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "388979d208a049ffdfb22fa33b9c81942215b940910bccfe258caeb25d125cb3" +checksum = "2b73807008a3c7f171cc40312f37d95ef0396e048b5848d775f54b1a4dd4a0d3" dependencies = [ "serde", ] @@ -2104,6 +2156,25 @@ dependencies = [ "tracing", ] +[[package]] +name = "h2" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31d030e59af851932b72ceebadf4a2b5986dba4c3b99dd2493f8273a0f151943" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http 1.1.0", + "indexmap 2.2.5", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "half" version = "2.4.0" @@ -2229,6 +2300,29 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "http-body" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" +dependencies = [ + "bytes", + "http 1.1.0", +] + +[[package]] +name = "http-body-util" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0475f8b2ac86659c21b64320d5d653f9efe42acd2a4e560073ec61a155a34f1d" +dependencies = [ + "bytes", + "futures-core", + "http 1.1.0", + "http-body 1.0.0", + "pin-project-lite", +] + [[package]] name = "httparse" version = "1.8.0" @@ -2251,9 +2345,9 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2", + "h2 0.3.24", "http 0.2.12", - "http-body", + "http-body 0.4.6", "httparse", "httpdate", "itoa", @@ -2265,6 +2359,26 @@ dependencies = [ "want", ] +[[package]] +name = "hyper" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "186548d73ac615b32a73aafe38fb4f56c0d340e110e5a200bcadbaf2e199263a" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "h2 0.4.2", + "http 1.1.0", + "http-body 1.0.0", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "smallvec", + "tokio", +] + [[package]] name = "hyper-rustls" version = "0.24.2" @@ -2273,7 +2387,7 @@ checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" dependencies = [ "futures-util", "http 0.2.12", - "hyper", + "hyper 0.14.28", "log", "rustls 0.21.10", "rustls-native-certs 0.6.3", @@ -2288,12 +2402,28 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" dependencies = [ "bytes", - "hyper", + "hyper 0.14.28", "native-tls", "tokio", "tokio-native-tls", ] +[[package]] +name = "hyper-util" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca38ef113da30126bbff9cd1705f9273e15d45498615d138b0c20279ac7a76aa" +dependencies = [ + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", + "hyper 1.2.0", + "pin-project-lite", + "socket2 0.5.6", + "tokio", +] + [[package]] name = "iana-time-zone" version = "0.1.60" @@ -2627,6 +2757,12 @@ dependencies = [ "regex-automata 0.1.10", ] +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "md5" version = "0.7.0" @@ -3210,6 +3346,26 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +[[package]] +name = "pin-project" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6bf43b791c5b9e34c3d182969b4abb522f9343702850a2e57f460d00d09b4b3" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.52", +] + [[package]] name = "pin-project-lite" version = "0.2.13" @@ -3351,9 +3507,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.78" +version = "1.0.79" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2422ad645d89c99f8f3e6b88a9fdeca7fabeac836b1002371c4367c8f984aae" +checksum = "e835ff2298f5721608eb1a980ecaee1aef2c132bf95ecc026a11b7bf3c01c02e" dependencies = [ "unicode-ident", ] @@ -3683,9 +3839,9 @@ checksum = "e898588f33fdd5b9420719948f9f2a32c922a246964576f71ba7f24f80610fbc" [[package]] name = "reqwest" -version = "0.11.24" +version = "0.11.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6920094eb85afde5e4a138be3f2de8bbdf28000f0029e72c45025a56b042251" +checksum = "78bf93c4af7a8bb7d879d51cebe797356ff10ae8516ace542b5182d9dcac10b2" dependencies = [ "async-compression", "base64 0.21.7", @@ -3693,10 +3849,10 @@ dependencies = [ "encoding_rs", "futures-core", "futures-util", - "h2", + "h2 0.3.24", "http 0.2.12", - "http-body", - "hyper", + "http-body 0.4.6", + "hyper 0.14.28", "hyper-tls", "ipnet", "js-sys", @@ -4241,9 +4397,9 @@ dependencies = [ [[package]] name = "serde_with" -version = "3.6.1" +version = "3.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15d167997bd841ec232f5b2b8e0e26606df2e7caa4c31b95ea9ca52b200bd270" +checksum = "ee80b0e361bbf88fd2f6e242ccd19cfda072cb0faa6ae694ecee08199938569a" dependencies = [ "base64 0.21.7", "chrono", @@ -4259,9 +4415,9 @@ dependencies = [ [[package]] name = "serde_with_macros" -version = "3.6.1" +version = "3.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "865f9743393e638991566a8b7a479043c2c8da94a33e0a31f18214c9cae0a64d" +checksum = "6561dc161a9224638a31d876ccdfefbc1df91d3f3a8342eddb35f055d48c7655" dependencies = [ "darling 0.20.8", "proc-macro2", @@ -4349,6 +4505,7 @@ dependencies = [ "atomic_enum", "aws-config", "aws-sdk-kms", + "axum", "backtrace", "backtrace-ext", "base64 0.22.0", @@ -4375,7 +4532,6 @@ dependencies = [ "hex-literal", "http 1.1.0", "httparse", - "hyper", "itertools 0.12.1", "kafka-protocol", "lz4_flex", @@ -4580,9 +4736,9 @@ dependencies = [ [[package]] name = "ssh-key" -version = "0.6.4" +version = "0.6.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01f8f4ea73476c0aa5d5e6a75ce1e8634e2c3f82005ef3bbed21547ac57f2bf7" +checksum = "3b71299a724c8d84956caaf8fc3b3ea57c3587fe2d0b800cd0dc1f3599905d7e" dependencies = [ "ed25519-dalek", "p256", @@ -4634,11 +4790,11 @@ checksum = "cae14b91c7d11c9a851d3fbc80a963198998c2a64eec840477fa92d8ce9b70bb" [[package]] name = "strum" -version = "0.26.1" +version = "0.26.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "723b93e8addf9aa965ebe2d11da6d7540fa2283fcea14b3371ff055f7ba13f5f" +checksum = "5d8cec3501a5194c432b2b7976db6b7d10ec95c253208b45f83f7136aa985e29" dependencies = [ - "strum_macros 0.26.1", + "strum_macros 0.26.2", ] [[package]] @@ -4656,9 +4812,9 @@ dependencies = [ [[package]] name = "strum_macros" -version = "0.26.1" +version = "0.26.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a3417fc93d76740d974a01654a09777cb500428cc874ca9f45edfe0c4d4cd18" +checksum = "c6cf59daf282c0a494ba14fd21610a0325f9f90ec9d1231dea26bcb1d696c946" dependencies = [ "heck 0.4.1", "proc-macro2", @@ -4775,18 +4931,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.57" +version = "1.0.58" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e45bcbe8ed29775f228095caf2cd67af7a4ccf756ebff23a306bf3e8b47b24b" +checksum = "03468839009160513471e86a034bb2c5c0e4baae3b43f79ffc55c4a5427b3297" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.57" +version = "1.0.58" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a953cb265bef375dae3de6663da4d3804eee9682ea80d8e2542529b73c531c81" +checksum = "c61f3ba182994efc43764a46c018c347bc492c79f024e705f46567b418f6d4f7" dependencies = [ "proc-macro2", "quote", @@ -5030,6 +5186,27 @@ dependencies = [ "winnow", ] +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "pin-project", + "pin-project-lite", + "tokio", + "tower-layer", + "tower-service", +] + +[[package]] +name = "tower-layer" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" + [[package]] name = "tower-service" version = "0.3.2" @@ -5649,7 +5826,7 @@ dependencies = [ "console", "copy_dir", "serde", - "strum 0.26.1", + "strum 0.26.2", "time", "tokio", ] diff --git a/Cargo.toml b/Cargo.toml index 8f60f193d..5a7e9d9c2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,5 +56,4 @@ typetag = "0.2.5" aws-throwaway = { version = "0.6.0", default-features = false } tokio-bin-process = "0.5.0" ordered-float = { version = "4.0.0", features = ["serde"] } -hyper = { version = "0.14.14", features = ["server", "tcp", "http1"] } shell-quote = { default-features = false, version = "0.5.0" } diff --git a/shotover/Cargo.toml b/shotover/Cargo.toml index a5c295c48..f3be169de 100644 --- a/shotover/Cargo.toml +++ b/shotover/Cargo.toml @@ -49,6 +49,7 @@ default = ["cassandra", "redis", "kafka", "opensearch"] [dependencies] atomic_enum = "0.3.0" +axum = { version = "0.7", default-features = false, features = ["tokio", "tracing", "http1"] } pretty-hex = "0.4.0" tokio-stream = "0.1.2" bytes-utils = { version = "0.1.1", optional = true } @@ -97,7 +98,6 @@ metrics-exporter-prometheus = { version = "0.13.0", default-features = false } tracing.workspace = true tracing-subscriber.workspace = true tracing-appender.workspace = true -hyper.workspace = true halfbrown = { version = "0.2.1", optional = true } # Transform dependencies diff --git a/shotover/src/http.rs b/shotover/src/http.rs new file mode 100644 index 000000000..f637a8b8b --- /dev/null +++ b/shotover/src/http.rs @@ -0,0 +1,27 @@ +use axum::http::StatusCode; +use axum::response::{IntoResponse, Response}; + +// Make our own error that wraps `anyhow::Error`. +pub(crate) struct HttpServerError(pub anyhow::Error); + +// Tell axum how to convert `AppError` into a response. +impl IntoResponse for HttpServerError { + fn into_response(self) -> Response { + ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("HTTP 500 error: {}", self.0), + ) + .into_response() + } +} + +// This enables using `?` on functions that return `Result<_, anyhow::Error>` to turn them into +// `Result<_, AppError>`. That way you don't need to do that manually. +impl From for HttpServerError +where + E: Into, +{ + fn from(err: E) -> Self { + Self(err.into()) + } +} diff --git a/shotover/src/lib.rs b/shotover/src/lib.rs index 44180d51c..a80b70749 100644 --- a/shotover/src/lib.rs +++ b/shotover/src/lib.rs @@ -55,6 +55,7 @@ pub mod codec; pub mod config; mod connection_span; pub mod frame; +mod http; pub mod message; mod observability; pub mod runner; diff --git a/shotover/src/observability/mod.rs b/shotover/src/observability/mod.rs index dd96acfe8..4728089e8 100644 --- a/shotover/src/observability/mod.rs +++ b/shotover/src/observability/mod.rs @@ -1,12 +1,8 @@ +use crate::http::HttpServerError; use crate::runner::ReloadHandle; use anyhow::{anyhow, Context, Result}; -use bytes::Bytes; -use hyper::{ - service::{make_service_fn, service_fn}, - Method, Request, StatusCode, {Body, Response, Server}, -}; +use axum::{extract::State, response::Html, Router}; use metrics_exporter_prometheus::PrometheusHandle; -use std::convert::Infallible; use std::str; use std::{net::SocketAddr, sync::Arc}; use tracing::{error, trace}; @@ -18,21 +14,6 @@ pub(crate) struct LogFilterHttpExporter { tracing_handle: ReloadHandle, } -/// Sets the `tracing_suscriber` filter level to the value of `bytes` on `handle` -fn set_filter(bytes: Bytes, handle: &ReloadHandle) -> Result<()> { - let body = str::from_utf8(bytes.as_ref())?; - trace!(request.body = ?body); - let new_filter = body.parse::()?; - handle.reload(new_filter) -} - -fn rsp(status: StatusCode, body: impl Into) -> Response { - Response::builder() - .status(status) - .body(body.into()) - .expect("builder with known status code must not fail") -} - impl LogFilterHttpExporter { /// Creates a new [`LogFilterHttpExporter`] that listens on the given `address`. /// @@ -58,55 +39,46 @@ impl LogFilterHttpExporter { } async fn async_run_inner(self) -> Result<()> { - let recorder_handle = Arc::new(self.recorder_handle); - let tracing_handle = Arc::new(self.tracing_handle); + let state = AppState { + recorder_handle: Arc::new(self.recorder_handle), + tracing_handle: Arc::new(self.tracing_handle), + }; - let make_svc = make_service_fn(move |_| { - let recorder_handle = recorder_handle.clone(); - let tracing_handle = tracing_handle.clone(); - - async move { - Ok::<_, Infallible>(service_fn(move |req: Request| { - let recorder_handle = recorder_handle.clone(); - let tracing_handle = tracing_handle.clone(); - - async move { - let response = match (req.method(), req.uri().path()) { - (&Method::GET, "/metrics") => { - Response::new(Body::from(recorder_handle.as_ref().render())) - } - (&Method::PUT, "/filter") => { - trace!("setting filter"); - match hyper::body::to_bytes(req).await { - Ok(body) => match set_filter(body, &tracing_handle) { - Err(error) => { - error!(?error, "setting filter failed!"); - rsp( - StatusCode::INTERNAL_SERVER_ERROR, - format!("{:?}", error), - ) - } - Ok(()) => rsp(StatusCode::NO_CONTENT, Body::empty()), - }, - Err(error) => { - error!(%error, "setting filter failed - Couldn't read bytes"); - rsp(StatusCode::INTERNAL_SERVER_ERROR, format!("{error:?}")) - } - } - } - _ => rsp(StatusCode::NOT_FOUND, "try '/filter' or `/metrics`"), - }; - Ok::<_, Infallible>(response) - } - })) - } - }); + let app = Router::new() + .route("/", axum::routing::get(root)) + .route("/metrics", axum::routing::get(serve_metrics)) + .route("/filter", axum::routing::put(put_filter)) + .with_state(state); let address = self.address; - Server::try_bind(&address) - .with_context(|| format!("Failed to bind to {}", address))? - .serve(make_svc) + let listener = tokio::net::TcpListener::bind(address) .await - .map_err(|e| anyhow!(e)) + .with_context(|| format!("Failed to bind to {}", address))?; + axum::serve(listener, app).await.map_err(|e| anyhow!(e)) } } + +async fn root() -> Html<&'static str> { + Html("try /filter or /metrics") +} + +async fn serve_metrics(State(state): State) -> Html { + Html(state.recorder_handle.as_ref().render()) +} + +async fn put_filter( + State(state): State, + new_filter_string: String, +) -> Result, HttpServerError> { + trace!("setting filter to: {new_filter_string}"); + let new_filter = new_filter_string.parse::()?; + state.tracing_handle.reload(new_filter)?; + tracing::info!("filter set to: {new_filter_string}"); + Ok(Html("Filter set")) +} + +#[derive(Clone)] +struct AppState { + tracing_handle: Arc, + recorder_handle: Arc, +} diff --git a/shotover/src/transforms/tee.rs b/shotover/src/transforms/tee.rs index 97fff9f85..adb9b538e 100644 --- a/shotover/src/transforms/tee.rs +++ b/shotover/src/transforms/tee.rs @@ -1,22 +1,21 @@ use super::{TransformContextBuilder, TransformContextConfig}; use crate::config::chain::TransformChainConfig; +use crate::http::HttpServerError; use crate::message::{Message, MessageIdMap, Messages}; use crate::transforms::chain::{BufferedChain, TransformChainBuilder}; use crate::transforms::{Transform, TransformBuilder, TransformConfig, Wrapper}; use anyhow::{anyhow, Context, Result}; use async_trait::async_trait; use atomic_enum::atomic_enum; -use bytes::Bytes; -use hyper::{ - service::{make_service_fn, service_fn}, - Method, Request, StatusCode, {Body, Response, Server}, -}; +use axum::extract::State; +use axum::response::Html; +use axum::Router; use metrics::{counter, Counter}; use serde::{Deserialize, Serialize}; use std::collections::VecDeque; use std::fmt; use std::sync::atomic::Ordering; -use std::{convert::Infallible, net::SocketAddr, str, sync::Arc}; +use std::{net::SocketAddr, str, sync::Arc}; use tracing::{debug, error, trace, warn}; pub struct TeeBuilder { @@ -515,36 +514,6 @@ impl ChainSwitchListener { Self { address } } - fn rsp(status: StatusCode, body: impl Into) -> Response { - Response::builder() - .status(status) - .body(body.into()) - .expect("builder with known status code must not fail") - } - - async fn set_result_source_chain( - body: Bytes, - result_source: Arc, - ) -> Result<()> { - let new_result_source = str::from_utf8(body.as_ref())?; - - let new_value = match new_result_source { - "tee-chain" => ResultSource::TeeChain, - "regular-chain" => ResultSource::RegularChain, - _ => { - return Err(anyhow!( - r"Invalid value for result source: {}, should be 'tee-chain' or 'regular-chain'", - new_result_source - )) - } - }; - - debug!("Setting result source to {}", new_value); - - result_source.store(new_value, Ordering::Relaxed); - Ok(()) - } - async fn async_run(self, result_source: Arc) { if let Err(err) = self.async_run_inner(result_source).await { error!("Error in ChainSwitchListener: {}", err); @@ -552,67 +521,61 @@ impl ChainSwitchListener { } async fn async_run_inner(self, result_source: Arc) -> Result<()> { - let make_svc = make_service_fn(move |_| { - let result_source = result_source.clone(); - async move { - Ok::<_, Infallible>(service_fn(move |req: Request| { - let result_source = result_source.clone(); - async move { - let response = match (req.method(), req.uri().path()) { - (&Method::GET, "/transform/tee/result-source") => { - let result_source: ResultSource = - result_source.load(Ordering::Relaxed); - Self::rsp(StatusCode::OK, result_source.to_string()) - } - (&Method::PUT, "/transform/tee/result-source") => { - match hyper::body::to_bytes(req.into_body()).await { - Ok(body) => { - match Self::set_result_source_chain( - body, - result_source.clone(), - ) - .await - { - Err(error) => { - error!(?error, "setting result source failed"); - Self::rsp( - StatusCode::BAD_REQUEST, - format!( - "setting result source failed: {error}" - ), - ) - } - Ok(()) => Self::rsp(StatusCode::OK, Body::empty()), - } - } - Err(error) => { - error!(%error, "setting result source failed - Couldn't read bytes"); - Self::rsp( - StatusCode::INTERNAL_SERVER_ERROR, - format!("{error:?}"), - ) - } - } - } - _ => { - Self::rsp(StatusCode::NOT_FOUND, "try /tranform/tee/result-source") - } - }; - Ok::<_, Infallible>(response) - } - })) - } - }); + let app = Router::new() + .route("/", axum::routing::get(root)) + .route( + "/transform/tee/result-source", + axum::routing::get(get_result_source), + ) + .route( + "/transform/tee/result-source", + axum::routing::put(put_result_source), + ) + .with_state(AppState { result_source }); let address = self.address; - Server::try_bind(&address) - .with_context(|| format!("Failed to bind to {}", address))? - .serve(make_svc) + let listener = tokio::net::TcpListener::bind(address) .await - .map_err(|e| anyhow!(e)) + .with_context(|| format!("Failed to bind to {}", address))?; + axum::serve(listener, app).await.map_err(|e| anyhow!(e)) } } +async fn root() -> Html<&'static str> { + Html("try /transform/tee/result-source") +} + +async fn get_result_source(State(state): State) -> Html { + let result_source: ResultSource = state.result_source.load(Ordering::Relaxed); + Html(result_source.to_string()) +} + +async fn put_result_source( + State(state): State, + new_result_source: String, +) -> Result<(), HttpServerError> { + let new_value = match new_result_source.as_str() { + "tee-chain" => ResultSource::TeeChain, + "regular-chain" => ResultSource::RegularChain, + _ => { + return Err(HttpServerError(anyhow!( + r"Invalid value for result source: {:?}, should be 'tee-chain' or 'regular-chain'", + new_result_source + ))); + } + }; + + state.result_source.store(new_value, Ordering::Relaxed); + tracing::info!("result source set to {new_value}"); + + Ok(()) +} + +#[derive(Clone)] +struct AppState { + result_source: Arc, +} + #[cfg(all(test, feature = "redis"))] mod tests { use super::*;