Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve client tracing spans #2044

Merged
merged 13 commits into from
Dec 7, 2022
12 changes: 12 additions & 0 deletions CHANGELOG.next.toml
Original file line number Diff line number Diff line change
Expand Up @@ -677,3 +677,15 @@ The Unit type for a Union member is no longer rendered. The serializers and pars
references = ["smithy-rs#1989"]
meta = { "breaking" = true, "tada" = false, "bug" = false, "target" = "all" }
author = "ysaito1001"

[[aws-sdk-rust]]
message = "Fixed and improved the request `tracing` span hierarchy to improve log messages, profiling, and debuggability."
references = ["smithy-rs#2044", "smithy-rs#371"]
meta = { "breaking" = false, "tada" = true, "bug" = false }
author = "jdisanti"

[[smithy-rs]]
message = "Fixed and improved the request `tracing` span hierarchy to improve log messages, profiling, and debuggability."
references = ["smithy-rs#2044", "smithy-rs#371"]
meta = { "breaking" = false, "tada" = true, "bug" = false, "target" = "client"}
author = "jdisanti"
4 changes: 4 additions & 0 deletions aws/rust-runtime/aws-config/src/imds/client/token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ impl AsyncMapRequest for TokenMiddleware {
type Error = ImdsError;
type Future = Pin<Box<dyn Future<Output = Result<Request, Self::Error>> + Send + 'static>>;

/// Static identifier for this middleware.
///
/// NOTE(review): the async map-request service in `aws-smithy-http-tower`
/// (`map_request.rs`) records `mapper.name()` as the `name` field of its
/// `async_map_request` debug span; presumably this value surfaces there.
fn name(&self) -> &'static str {
"attach_imds_token"
}

fn apply(&self, request: Request) -> Self::Future {
let this = self.clone();
Box::pin(async move { this.add_token(request).await })
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,10 @@ impl ProvideCredentials for LazyCachingCredentialsProvider {
// There may be other threads also loading simultaneously, but this is OK
// since the futures are not eagerly executed, and the cache will only run one
// of them.
let span = trace_span!("lazy_load_credentials");
let future = Timeout::new(loader.provide_credentials(), timeout_future);
cache
.get_or_load(|| {
let span = trace_span!("lazy_load_credentials");
async move {
let credentials = future.await.map_err(|_err| {
CredentialsError::provider_timed_out(load_timeout)
Expand Down
4 changes: 4 additions & 0 deletions aws/rust-runtime/aws-endpoint/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ impl From<AwsAuthStageErrorKind> for AwsAuthStageError {
impl MapRequest for AwsAuthStage {
type Error = AwsAuthStageError;

/// Static identifier for this request-mapping stage.
///
/// NOTE(review): `MapRequestService` in `aws-smithy-http-tower` records
/// `mapper.name()` as the `name` field of its `map_request` debug span, so
/// this is presumably what shows up in traces. `SmithyEndpointStage` returns
/// the same string; confirm that sharing one span name is intended.
fn name(&self) -> &'static str {
"resolve_endpoint"
}

fn apply(&self, request: Request) -> Result<Request, Self::Error> {
request.augment(|http_req, props| {
let endpoint = props
Expand Down
4 changes: 4 additions & 0 deletions aws/rust-runtime/aws-http/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ impl AsyncMapRequest for CredentialsStage {
type Error = CredentialsStageError;
type Future = Pin<Box<dyn Future<Output = Result<Request, Self::Error>> + Send + 'static>>;

/// Static identifier for this asynchronous request-mapping stage.
///
/// NOTE(review): the async map-request service in `aws-smithy-http-tower`
/// (`map_request.rs`) records `mapper.name()` as the `name` field of its
/// `async_map_request` debug span; presumably this value surfaces there.
fn name(&self) -> &'static str {
"retrieve_credentials"
}

/// Hands `request` to `Self::load_creds` and returns the resulting future,
/// boxed and pinned.
fn apply(&self, request: Request) -> BoxFuture<Result<Request, Self::Error>> {
    let load = Self::load_creds(request);
    Box::pin(load)
}
Expand Down
4 changes: 4 additions & 0 deletions aws/rust-runtime/aws-http/src/recursion_detection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ impl RecursionDetectionStage {
impl MapRequest for RecursionDetectionStage {
type Error = std::convert::Infallible;

/// Static identifier for this request-mapping stage.
///
/// NOTE(review): `MapRequestService` in `aws-smithy-http-tower` records
/// `mapper.name()` as the `name` field of its `map_request` debug span;
/// presumably this value surfaces there.
fn name(&self) -> &'static str {
"recursion_detection"
}

fn apply(&self, request: Request) -> Result<Request, Self::Error> {
request.augment(|mut req, _conf| {
augument_request(&mut req, &self.env);
Expand Down
4 changes: 4 additions & 0 deletions aws/rust-runtime/aws-http/src/user_agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,10 @@ lazy_static::lazy_static! {
impl MapRequest for UserAgentStage {
type Error = UserAgentStageError;

/// Static identifier for this request-mapping stage.
///
/// NOTE(review): `MapRequestService` in `aws-smithy-http-tower` records
/// `mapper.name()` as the `name` field of its `map_request` debug span;
/// presumably this value surfaces there.
fn name(&self) -> &'static str {
"generate_user_agent"
}

fn apply(&self, request: Request) -> Result<Request, Self::Error> {
request.augment(|mut req, conf| {
let ua = conf
Expand Down
4 changes: 4 additions & 0 deletions aws/rust-runtime/aws-sig-auth/src/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,10 @@ fn signing_config(
impl MapRequest for SigV4SigningStage {
type Error = SigningStageError;

/// Static identifier for this request-mapping stage.
///
/// NOTE(review): `MapRequestService` in `aws-smithy-http-tower` records
/// `mapper.name()` as the `name` field of its `map_request` debug span;
/// presumably this value surfaces there.
fn name(&self) -> &'static str {
"sigv4_sign_request"
}

fn apply(&self, req: Request) -> Result<Request, Self::Error> {
req.augment(|mut req, config| {
let operation_config = config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ private fun renderCustomizableOperationSendMethod(
/// Sends this operation's request
pub async fn send<T, E>(self) -> Result<T, SdkError<E>>
where
E: std::error::Error + 'static,
E: std::error::Error + Send + Sync + 'static,
O: #{ParseHttpResponse}<Output = Result<T, E>> + Send + Sync + Clone + 'static,
Retry: #{ClassifyRetry}<#{SdkSuccess}<T>, SdkError<E>> + Send + Sync + Clone,
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ class CustomizableOperationGenerator(
/// Sends this operation's request
pub async fn send<T, E>(self) -> Result<T, SdkError<E>>
where
E: std::error::Error + 'static,
E: std::error::Error + Send + Sync + 'static,
jdisanti marked this conversation as resolved.
Show resolved Hide resolved
O: #{ParseHttpResponse}<Output = Result<T, E>> + Send + Sync + Clone + 'static,
Retry: Send + Sync + Clone,
<R as #{NewRequestPolicy}>::Policy: #{SmithyRetryPolicy}<O, T, E, Retry> + Clone,
Expand Down
54 changes: 48 additions & 6 deletions rust-runtime/aws-smithy-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,14 @@ pub use aws_smithy_http::result::{SdkError, SdkSuccess};
use aws_smithy_http::retry::ClassifyRetry;
use aws_smithy_http_tower::dispatch::DispatchLayer;
use aws_smithy_http_tower::parse_response::ParseResponseLayer;
use aws_smithy_types::error::display::DisplayErrorContext;
use aws_smithy_types::retry::ProvideErrorKind;
use aws_smithy_types::timeout::OperationTimeoutConfig;
use std::error::Error;
use std::sync::Arc;
use timeout::ClientTimeoutParams;
use tower::{Layer, Service, ServiceBuilder, ServiceExt};
use tracing::{debug_span, field, field::display, Instrument};

/// Smithy service client.
///
Expand Down Expand Up @@ -173,16 +175,16 @@ where
///
/// For ergonomics, this does not include the raw response for successful responses. To
/// access the raw response use `call_raw`.
pub async fn call<O, T, E, Retry>(&self, input: Operation<O, Retry>) -> Result<T, SdkError<E>>
pub async fn call<O, T, E, Retry>(&self, op: Operation<O, Retry>) -> Result<T, SdkError<E>>
where
O: Send + Sync,
E: 'static,
E: std::error::Error + Send + Sync + 'static,
Retry: Send + Sync,
R::Policy: bounds::SmithyRetryPolicy<O, T, E, Retry>,
bounds::Parsed<<M as bounds::SmithyMiddleware<C>>::Service, O, Retry>:
Service<Operation<O, Retry>, Response = SdkSuccess<T>, Error = SdkError<E>> + Clone,
{
self.call_raw(input).await.map(|res| res.parsed)
self.call_raw(op).await.map(|res| res.parsed)
}

/// Dispatch this request to the network
Expand All @@ -191,11 +193,11 @@ where
/// implementing unsupported features.
pub async fn call_raw<O, T, E, Retry>(
&self,
input: Operation<O, Retry>,
op: Operation<O, Retry>,
) -> Result<SdkSuccess<T>, SdkError<E>>
where
O: Send + Sync,
E: 'static,
E: std::error::Error + Send + Sync + 'static,
Retry: Send + Sync,
R::Policy: bounds::SmithyRetryPolicy<O, T, E, Retry>,
// This bound is not _technically_ inferred by all the previous bounds, but in practice it
Expand Down Expand Up @@ -226,7 +228,47 @@ where
.layer(DispatchLayer::new())
.service(connector);

check_send_sync(svc).ready().await?.call(input).await
// send_operation records the full request-response lifecycle.
// NOTE: For operations that stream output, only the setup is captured in this span.
Velfi marked this conversation as resolved.
Show resolved Hide resolved
let span = debug_span!(
"send_operation",
operation = field::Empty,
service = field::Empty,
status = field::Empty,
message = field::Empty
);
let (mut req, parts) = op.into_request_response();
if let Some(metadata) = &parts.metadata {
span.record("operation", &metadata.name());
span.record("service", &metadata.service());
// This will clone two `Cow::<'static, str>::Borrowed`s in the vast majority of cases
req.properties_mut().insert(metadata.clone());
}
jdisanti marked this conversation as resolved.
Show resolved Hide resolved
let op = Operation::from_parts(req, parts);

let result = async move { check_send_sync(svc).ready().await?.call(op).await }
.instrument(span.clone())
.await;
Comment on lines +249 to +251
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand why this is the way it is but I still think it looks a bit odd.

match &result {
Ok(_) => {
span.record("status", &"ok");
}
Err(err) => {
span.record(
"status",
&match err {
SdkError::ConstructionFailure(_) => "construction_failure",
SdkError::DispatchFailure(_) => "dispatch_failure",
SdkError::ResponseError(_) => "response_error",
SdkError::ServiceError(_) => "service_error",
SdkError::TimeoutError(_) => "timeout_error",
_ => "error",
},
)
.record("message", &display(DisplayErrorContext(err)));
}
}
result
}

/// Statically check the validity of a `Client` without a request to send.
Expand Down
5 changes: 3 additions & 2 deletions rust-runtime/aws-smithy-http-tower/src/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tower::{Layer, Service};
use tracing::trace;
use tracing::{debug_span, trace, Instrument};

/// Connects Operation driven middleware to an HTTP implementation.
///
Expand Down Expand Up @@ -50,7 +50,8 @@ where
.await
.map(|resp| operation::Response::from_parts(resp, property_bag))
.map_err(|e| SendOperationError::RequestDispatchError(e.into()))
};
}
.instrument(debug_span!("dispatch"));
Box::pin(future)
}
}
Expand Down
5 changes: 5 additions & 0 deletions rust-runtime/aws-smithy-http-tower/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ mod tests {
struct AddHeader;
impl MapRequest for AddHeader {
type Error = Infallible;

/// Static identifier for this test-only mapper.
fn name(&self) -> &'static str {
"add_header"
}

fn apply(&self, request: Request) -> Result<Request, Self::Error> {
request.augment(|mut req, _| {
req.headers_mut()
Expand Down
13 changes: 9 additions & 4 deletions rust-runtime/aws-smithy-http-tower/src/map_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tower::{Layer, Service};
use tracing::{debug_span, Instrument};

#[derive(Debug)]
pub struct AsyncMapRequestLayer<M> {
Expand Down Expand Up @@ -61,10 +62,13 @@ where
}

fn call(&mut self, req: operation::Request) -> Self::Future {
let mapper_name = self.mapper.name();
let mut inner = self.inner.clone();
let future = self.mapper.apply(req);
Box::pin(async move {
let span = debug_span!("async_map_request", name = mapper_name);
let mapped_request = future
.instrument(span)
.await
.map_err(|e| SendOperationError::RequestConstructionError(e.into()))?;
inner.call(mapped_request).await
Expand Down Expand Up @@ -122,8 +126,8 @@ where
}
}

#[derive(Clone)]
/// Tower service for [`MapRequest`](aws_smithy_http::middleware::MapRequest)
#[derive(Clone)]
pub struct MapRequestService<S, M> {
inner: S,
mapper: M,
Expand All @@ -143,9 +147,10 @@ where
}

fn call(&mut self, req: operation::Request) -> Self::Future {
match self
.mapper
.apply(req)
let span = debug_span!("map_request", name = self.mapper.name());
let mapper = &self.mapper;
match span
.in_scope(|| mapper.apply(req))
.map_err(|e| SendOperationError::RequestConstructionError(e.into()))
{
Err(e) => MapRequestFuture::Ready { inner: Some(e) },
Expand Down
61 changes: 12 additions & 49 deletions rust-runtime/aws-smithy-http-tower/src/parse_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,13 @@ use aws_smithy_http::middleware::load_response;
use aws_smithy_http::operation;
use aws_smithy_http::operation::Operation;
use aws_smithy_http::response::ParseHttpResponse;
use aws_smithy_http::result::SdkError;
use aws_smithy_types::error::display::DisplayErrorContext;
use aws_smithy_http::result::{SdkError, SdkSuccess};
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use tower::{Layer, Service};
use tracing::field::display;
use tracing::{debug_span, field, Instrument};
use tracing::{debug_span, Instrument};

/// `ParseResponseService` dispatches [`Operation`](aws_smithy_http::operation::Operation)s and parses them.
///
Expand Down Expand Up @@ -68,7 +66,7 @@ type BoxedResultFuture<T, E> = Pin<Box<dyn Future<Output = Result<T, E>> + Send>
/// `E`: The error path return of the response parser
/// `R`: The type of the retry policy
impl<InnerService, ResponseHandler, SuccessResponse, FailureResponse, RetryPolicy>
tower::Service<operation::Operation<ResponseHandler, RetryPolicy>>
Service<Operation<ResponseHandler, RetryPolicy>>
for ParseResponseService<InnerService, ResponseHandler, RetryPolicy>
where
InnerService:
Expand All @@ -80,63 +78,28 @@ where
+ 'static,
FailureResponse: std::error::Error + 'static,
{
type Response = aws_smithy_http::result::SdkSuccess<SuccessResponse>;
type Error = aws_smithy_http::result::SdkError<FailureResponse>;
type Response = SdkSuccess<SuccessResponse>;
type Error = SdkError<FailureResponse>;
type Future = BoxedResultFuture<Self::Response, Self::Error>;

/// Forwards the readiness check to the wrapped service, converting any
/// error it reports into `Self::Error`.
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
    let inner_ready = self.inner.poll_ready(cx);
    inner_ready.map_err(Into::into)
}

fn call(&mut self, req: Operation<ResponseHandler, RetryPolicy>) -> Self::Future {
let (mut req, parts) = req.into_request_response();
let (req, parts) = req.into_request_response();
let handler = parts.response_handler;
// send_operation records the full request-response lifecycle.
// NOTE: For operations that stream output, only the setup is captured in this span.
let span = debug_span!(
"send_operation",
operation = field::Empty,
service = field::Empty,
status = field::Empty,
message = field::Empty
);
let inner_span = span.clone();
if let Some(metadata) = parts.metadata {
span.record("operation", &metadata.name());
span.record("service", &metadata.service());
req.properties_mut().insert(metadata);
}
let resp = self.inner.call(req);
let fut = async move {
let resp = match resp.await {
Box::pin(async move {
match resp.await {
Err(e) => Err(e.into()),
Ok(resp) => {
// load_response contains reading the body as far as is required & parsing the response
let response_span = debug_span!("load_response");
load_response(resp, &handler)
.instrument(response_span)
// load_response contains reading the body as far as is required & parsing the response
.instrument(debug_span!("load_response"))
.await
}
};
match &resp {
Ok(_) => inner_span.record("status", &"ok"),
Err(err) => inner_span
.record(
"status",
&match err {
SdkError::ConstructionFailure(_) => "construction_failure",
SdkError::DispatchFailure(_) => "dispatch_failure",
SdkError::ResponseError(_) => "response_error",
SdkError::ServiceError(_) => "service_error",
SdkError::TimeoutError(_) => "timeout_error",
_ => "error",
},
)
.record("message", &display(DisplayErrorContext(err))),
};
resp
}
.instrument(span);
Box::pin(fut)
}
})
}
}
4 changes: 4 additions & 0 deletions rust-runtime/aws-smithy-http/src/endpoint/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ impl SmithyEndpointStage {
impl MapRequest for SmithyEndpointStage {
type Error = ResolveEndpointError;

/// Static identifier for this request-mapping stage.
///
/// NOTE(review): `MapRequestService` in `aws-smithy-http-tower` records
/// `mapper.name()` as the `name` field of its `map_request` debug span, so
/// this is presumably what shows up in traces. `AwsAuthStage` in
/// `aws-endpoint` returns the same string; confirm that sharing one span
/// name is intended.
fn name(&self) -> &'static str {
"resolve_endpoint"
}

fn apply(&self, request: Request) -> Result<Request, Self::Error> {
request.augment(|mut http_req, props| {
// we need to do a little dance so that this works with retries.
Expand Down
Loading