Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Record TCP connection local socket address in metadata #3286

Merged
merged 1 commit into from
Dec 6, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Record TCP connection local socket address in metadata
**Description**
 - Add a new optional `local_addr` field in the `ConnectionMetadata`
   struct.
 - Transfer the `local_addr` `SocketAddress` from the
   `hyper::HttpInfo` to the `ConnectionMetadata` field.
 - Add to the `trace-serialize` example program so that it will print
   out the capture connection values.
 - Add a new builder type to construct the `ConnectionMetadata` type

**Motivation**
I want to use this field to uniquely identify TCP connection based
on their `local_addr` + `remote_addr`.

See awslabs/aws-sdk-rust#990 for additional
motivation for this change.

Needed a new builder type so that adding new fields is backwards
compatible.

**Testing Done**
`cargo test` in `rust-runtime/aws-smithy-runtime-api` and
`aws-smithy-runtime`.

Also ran:
```
thedeck@c889f3b04fb0 examples % cargo run --example trace-serialize
    Finished dev [unoptimized + debuginfo] target(s) in 0.13s
     Running `/Users/thedeck/repos/github/declanvk/smithy-rs/target/debug/examples/trace-serialize`
2023-12-06T00:13:15.605555Z  INFO lazy_load_identity: aws_smithy_runtime::client::identity::cache::lazy: identity cache miss occurred; added new identity (took Ok(296µs))
2023-12-06T00:13:15.608344Z  INFO trace_serialize: Response received: response=Response { status: StatusCode(200), headers: Headers { headers: {"content-type": HeaderValue { _private: "application/json" }, "content-length": HeaderValue { _private: "17" }, "date": HeaderValue { _private: "Wed, 06 Dec 2023 00:13:15 GMT" }} }, body: SdkBody { inner: BoxBody, retryable: false }, extensions: Extensions }
2023-12-06T00:13:15.608388Z  INFO trace_serialize: Captured connection info remote_addr=Some(127.0.0.1:13734) local_addr=Some(127.0.0.1:50199)
2023-12-06T00:13:15.608511Z  INFO trace_serialize: Response received POKEMON_SERVICE_URL=http://localhost:13734 response=GetServerStatisticsOutput { calls_count: 0 }
```

You can see the log line with "Captured connection info" contains the
`remote_addr` and the `local_addr` fields.
Declan Kelly committed Dec 6, 2023
commit 5dc3f7cbac8a1a3f41ef75cb11eed36275ccff5c
6 changes: 6 additions & 0 deletions CHANGELOG.next.toml
Original file line number Diff line number Diff line change
@@ -73,3 +73,9 @@ message = "Fix documentation and examples on HyperConnector and HyperClientBuild
references = ["smithy-rs#3282"]
meta = { "breaking" = false, "tada" = false, "bug" = false, "target" = "client" }
author = "jdisanti"

[[smithy-rs]]
message = "Expose local socket address from ConnectionMetadata."
references = ["aws-sdk-rust#990"]
meta = { "breaking" = false, "tada" = false, "bug" = false, "target" = "client" }
author = "declanvk"
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/
use aws_smithy_runtime::client::http::connection_poisoning::CaptureSmithyConnection;
/// This example demonstrates how an interceptor can be written to trace what is being
/// serialized / deserialized on the wire.
///
@@ -59,7 +60,7 @@ impl Intercept for WireFormatInterceptor {
&self,
context: &BeforeDeserializationInterceptorContextRef<'_>,
_runtime_components: &RuntimeComponents,
_cfg: &mut ConfigBag,
cfg: &mut ConfigBag,
) -> Result<(), BoxError> {
// Get the response type from the context.
let response = context.response();
@@ -70,6 +71,18 @@ impl Intercept for WireFormatInterceptor {
tracing::error!(?response);
}

// Print the connection information
let captured_connection = cfg.load::<CaptureSmithyConnection>().cloned();
if let Some(captured_connection) = captured_connection.and_then(|conn| conn.get()) {
tracing::info!(
remote_addr = ?captured_connection.remote_addr(),
local_addr = ?captured_connection.local_addr(),
"Captured connection info"
);
} else {
tracing::warn!("Connection info is missing!");
}

Ok(())
}
}
204 changes: 204 additions & 0 deletions rust-runtime/aws-smithy-runtime-api/src/client/connection.rs
Original file line number Diff line number Diff line change
@@ -14,6 +14,7 @@ use std::sync::Arc;
pub struct ConnectionMetadata {
is_proxied: bool,
remote_addr: Option<SocketAddr>,
local_addr: Option<SocketAddr>,
poison_fn: Arc<dyn Fn() + Send + Sync>,
}

@@ -25,6 +26,10 @@ impl ConnectionMetadata {
}

/// Create a new [`ConnectionMetadata`].
#[deprecated(
since = "1.1.0",
note = "`ConnectionMetadata::new` is deprecated in favour of `ConnectionMetadata::builder`."
)]
pub fn new(
is_proxied: bool,
remote_addr: Option<SocketAddr>,
@@ -33,21 +38,220 @@ impl ConnectionMetadata {
Self {
is_proxied,
remote_addr,
// need to use builder to set this field
local_addr: None,
poison_fn: Arc::new(poison),
}
}

/// Builder for this connection metadata
pub fn builder() -> ConnectionMetadataBuilder {
ConnectionMetadataBuilder::new()
}

/// Get the remote address for this connection, if one is set.
pub fn remote_addr(&self) -> Option<SocketAddr> {
self.remote_addr
}

/// Get the local address for this connection, if one is set.
pub fn local_addr(&self) -> Option<SocketAddr> {
self.local_addr
}
}

impl Debug for ConnectionMetadata {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SmithyConnection")
.field("is_proxied", &self.is_proxied)
.field("remote_addr", &self.remote_addr)
.field("local_addr", &self.local_addr)
.finish()
}
}

/// Builder type that is used to construct a [`ConnectionMetadata`] value.
#[derive(Default)]
pub struct ConnectionMetadataBuilder {
is_proxied: Option<bool>,
remote_addr: Option<SocketAddr>,
local_addr: Option<SocketAddr>,
poison_fn: Option<Arc<dyn Fn() + Send + Sync>>,
}

impl Debug for ConnectionMetadataBuilder {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ConnectionMetadataBuilder")
.field("is_proxied", &self.is_proxied)
.field("remote_addr", &self.remote_addr)
.field("local_addr", &self.local_addr)
.finish()
}
}

impl ConnectionMetadataBuilder {
/// Creates a new builder.
pub fn new() -> Self {
Self::default()
}

/// Set whether or not the associated connection is to an HTTP proxy.
pub fn proxied(mut self, proxied: bool) -> Self {
self.set_proxied(Some(proxied));
self
}

/// Set whether or not the associated connection is to an HTTP proxy.
pub fn set_proxied(&mut self, proxied: Option<bool>) -> &mut Self {
self.is_proxied = proxied;
self
}

/// Set the remote address of the connection used.
pub fn remote_addr(mut self, remote_addr: SocketAddr) -> Self {
self.set_remote_addr(Some(remote_addr));
self
}

/// Set the remote address of the connection used.
pub fn set_remote_addr(&mut self, remote_addr: Option<SocketAddr>) -> &mut Self {
self.remote_addr = remote_addr;
self
}

/// Set the local address of the connection used.
pub fn local_addr(mut self, local_addr: SocketAddr) -> Self {
self.set_local_addr(Some(local_addr));
self
}

/// Set the local address of the connection used.
pub fn set_local_addr(&mut self, local_addr: Option<SocketAddr>) -> &mut Self {
self.local_addr = local_addr;
self
}

/// Set a closure which will poison the associated connection.
///
/// A poisoned connection will not be reused for subsequent requests by the pool
pub fn poison_fn(mut self, poison_fn: impl Fn() + Send + Sync + 'static) -> Self {
self.set_poison_fn(Some(poison_fn));
self
}

/// Set a closure which will poison the associated connection.
///
/// A poisoned connection will not be reused for subsequent requests by the pool
pub fn set_poison_fn(
&mut self,
poison_fn: Option<impl Fn() + Send + Sync + 'static>,
) -> &mut Self {
self.poison_fn =
poison_fn.map(|poison_fn| Arc::new(poison_fn) as Arc<dyn Fn() + Send + Sync>);
self
}

/// Build a [`ConnectionMetadata`] value.
///
/// # Panics
///
/// If either the `is_proxied` or `poison_fn` has not been set, then this method will panic
pub fn build(self) -> ConnectionMetadata {
ConnectionMetadata {
is_proxied: self
.is_proxied
.expect("is_proxied should be set for ConnectionMetadata"),
remote_addr: self.remote_addr,
local_addr: self.local_addr,
poison_fn: self
.poison_fn
.expect("poison_fn should be set for ConnectionMetadata"),
}
}
}

#[cfg(test)]
mod tests {
use std::{
net::{IpAddr, Ipv6Addr},
sync::Mutex,
};

use super::*;

const TEST_SOCKET_ADDR: SocketAddr = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 100);

#[test]
#[should_panic]
fn builder_panic_missing_proxied() {
ConnectionMetadataBuilder::new()
.poison_fn(|| {})
.local_addr(TEST_SOCKET_ADDR)
.remote_addr(TEST_SOCKET_ADDR)
.build();
}

#[test]
#[should_panic]
fn builder_panic_missing_poison_fn() {
ConnectionMetadataBuilder::new()
.proxied(true)
.local_addr(TEST_SOCKET_ADDR)
.remote_addr(TEST_SOCKET_ADDR)
.build();
}

#[test]
fn builder_all_fields_successful() {
let mutable_flag = Arc::new(Mutex::new(false));

let connection_metadata = ConnectionMetadataBuilder::new()
.proxied(true)
.local_addr(TEST_SOCKET_ADDR)
.remote_addr(TEST_SOCKET_ADDR)
.poison_fn({
let mutable_flag = Arc::clone(&mutable_flag);
move || {
let mut guard = mutable_flag.lock().unwrap();
*guard = !*guard;
}
})
.build();

assert_eq!(connection_metadata.is_proxied, true);
assert_eq!(connection_metadata.remote_addr(), Some(TEST_SOCKET_ADDR));
assert_eq!(connection_metadata.local_addr(), Some(TEST_SOCKET_ADDR));
assert_eq!(*mutable_flag.lock().unwrap(), false);
connection_metadata.poison();
assert_eq!(*mutable_flag.lock().unwrap(), true);
}

#[test]
fn builder_optional_fields_translate() {
let metadata1 = ConnectionMetadataBuilder::new()
.proxied(true)
.poison_fn(|| {})
.build();

assert_eq!(metadata1.local_addr(), None);
assert_eq!(metadata1.remote_addr(), None);

let metadata2 = ConnectionMetadataBuilder::new()
.proxied(true)
.poison_fn(|| {})
.local_addr(TEST_SOCKET_ADDR)
.build();

assert_eq!(metadata2.local_addr(), Some(TEST_SOCKET_ADDR));
assert_eq!(metadata2.remote_addr(), None);

let metadata3 = ConnectionMetadataBuilder::new()
.proxied(true)
.poison_fn(|| {})
.remote_addr(TEST_SOCKET_ADDR)
.build();

assert_eq!(metadata3.local_addr(), None);
assert_eq!(metadata3.remote_addr(), Some(TEST_SOCKET_ADDR));
}
}
Original file line number Diff line number Diff line change
@@ -100,7 +100,6 @@ impl Intercept for ConnectionPoisoningInterceptor {
type LoaderFn = dyn Fn() -> Option<ConnectionMetadata> + Send + Sync;

/// State for a middleware that will monitor and manage connections.
#[allow(missing_debug_implementations)]
#[derive(Clone, Default)]
pub struct CaptureSmithyConnection {
loader: Arc<Mutex<Option<Box<LoaderFn>>>>,
@@ -154,7 +153,14 @@ mod test {
let retriever = CaptureSmithyConnection::new();
let retriever_clone = retriever.clone();
assert!(retriever.get().is_none());
retriever.set_connection_retriever(|| Some(ConnectionMetadata::new(true, None, || {})));
retriever.set_connection_retriever(|| {
Some(
ConnectionMetadata::builder()
.proxied(true)
.poison_fn(|| {})
.build(),
)
});

assert!(retriever.get().is_some());
assert!(retriever_clone.get().is_some());
17 changes: 11 additions & 6 deletions rust-runtime/aws-smithy-runtime/src/client/http/hyper_014.rs
Original file line number Diff line number Diff line change
@@ -287,14 +287,19 @@ fn extract_smithy_connection(capture_conn: &CaptureConnection) -> Option<Connect
let mut extensions = Extensions::new();
conn.get_extras(&mut extensions);
let http_info = extensions.get::<HttpInfo>();
let smithy_connection = ConnectionMetadata::new(
conn.is_proxied(),
http_info.map(|info| info.remote_addr()),
move || match capture_conn.connection_metadata().as_ref() {
let mut builder = ConnectionMetadata::builder()
.proxied(conn.is_proxied())
.poison_fn(move || match capture_conn.connection_metadata().as_ref() {
Some(conn) => conn.poison(),
None => tracing::trace!("no connection existed to poison"),
},
);
});

builder
.set_local_addr(http_info.map(|info| info.local_addr()))
.set_remote_addr(http_info.map(|info| info.remote_addr()));

let smithy_connection = builder.build();

Some(smithy_connection)
} else {
None