Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
---
source: apollo-router/src/configuration/tests.rs
expression: "&schema"
snapshot_kind: text
---
{
"$schema": "http://json-schema.org/draft-07/schema#",
Expand Down Expand Up @@ -2800,7 +2801,7 @@ expression: "&schema"
},
"type": "object"
},
"EventLevel": {
"EventLevelConfig": {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whole change is caused by my crating separate enum type just for config

"enum": [
"info",
"warn",
Expand Down Expand Up @@ -2854,8 +2855,8 @@ expression: "&schema"
"description": "#/definitions/Condition_for_ConnectorSelector"
},
"level": {
"$ref": "#/definitions/EventLevel",
"description": "#/definitions/EventLevel"
"$ref": "#/definitions/EventLevelConfig",
"description": "#/definitions/EventLevelConfig"
},
"message": {
"description": "The event message.",
Expand Down Expand Up @@ -2885,8 +2886,8 @@ expression: "&schema"
"description": "#/definitions/Condition_for_RouterSelector"
},
"level": {
"$ref": "#/definitions/EventLevel",
"description": "#/definitions/EventLevel"
"$ref": "#/definitions/EventLevelConfig",
"description": "#/definitions/EventLevelConfig"
},
"message": {
"description": "The event message.",
Expand Down Expand Up @@ -2916,8 +2917,8 @@ expression: "&schema"
"description": "#/definitions/Condition_for_SubgraphSelector"
},
"level": {
"$ref": "#/definitions/EventLevel",
"description": "#/definitions/EventLevel"
"$ref": "#/definitions/EventLevelConfig",
"description": "#/definitions/EventLevelConfig"
},
"message": {
"description": "The event message.",
Expand Down Expand Up @@ -2947,8 +2948,8 @@ expression: "&schema"
"description": "#/definitions/Condition_for_SupergraphSelector"
},
"level": {
"$ref": "#/definitions/EventLevel",
"description": "#/definitions/EventLevel"
"$ref": "#/definitions/EventLevelConfig",
"description": "#/definitions/EventLevelConfig"
},
"message": {
"description": "The event message.",
Expand Down Expand Up @@ -5971,8 +5972,8 @@ expression: "&schema"
"StandardEventConfig_for_ConnectorSelector": {
"anyOf": [
{
"$ref": "#/definitions/EventLevel",
"description": "#/definitions/EventLevel"
"$ref": "#/definitions/EventLevelConfig",
"description": "#/definitions/EventLevelConfig"
},
{
"properties": {
Expand All @@ -5981,8 +5982,8 @@ expression: "&schema"
"description": "#/definitions/Condition_for_ConnectorSelector"
},
"level": {
"$ref": "#/definitions/EventLevel",
"description": "#/definitions/EventLevel"
"$ref": "#/definitions/EventLevelConfig",
"description": "#/definitions/EventLevelConfig"
}
},
"required": [
Expand All @@ -5996,8 +5997,8 @@ expression: "&schema"
"StandardEventConfig_for_RouterSelector": {
"anyOf": [
{
"$ref": "#/definitions/EventLevel",
"description": "#/definitions/EventLevel"
"$ref": "#/definitions/EventLevelConfig",
"description": "#/definitions/EventLevelConfig"
},
{
"properties": {
Expand All @@ -6006,8 +6007,8 @@ expression: "&schema"
"description": "#/definitions/Condition_for_RouterSelector"
},
"level": {
"$ref": "#/definitions/EventLevel",
"description": "#/definitions/EventLevel"
"$ref": "#/definitions/EventLevelConfig",
"description": "#/definitions/EventLevelConfig"
}
},
"required": [
Expand All @@ -6021,8 +6022,8 @@ expression: "&schema"
"StandardEventConfig_for_SubgraphSelector": {
"anyOf": [
{
"$ref": "#/definitions/EventLevel",
"description": "#/definitions/EventLevel"
"$ref": "#/definitions/EventLevelConfig",
"description": "#/definitions/EventLevelConfig"
},
{
"properties": {
Expand All @@ -6031,8 +6032,8 @@ expression: "&schema"
"description": "#/definitions/Condition_for_SubgraphSelector"
},
"level": {
"$ref": "#/definitions/EventLevel",
"description": "#/definitions/EventLevel"
"$ref": "#/definitions/EventLevelConfig",
"description": "#/definitions/EventLevelConfig"
}
},
"required": [
Expand All @@ -6046,8 +6047,8 @@ expression: "&schema"
"StandardEventConfig_for_SupergraphSelector": {
"anyOf": [
{
"$ref": "#/definitions/EventLevel",
"description": "#/definitions/EventLevel"
"$ref": "#/definitions/EventLevelConfig",
"description": "#/definitions/EventLevelConfig"
},
{
"properties": {
Expand All @@ -6056,8 +6057,8 @@ expression: "&schema"
"description": "#/definitions/Condition_for_SupergraphSelector"
},
"level": {
"$ref": "#/definitions/EventLevel",
"description": "#/definitions/EventLevel"
"$ref": "#/definitions/EventLevelConfig",
"description": "#/definitions/EventLevelConfig"
}
},
"required": [
Expand Down
57 changes: 27 additions & 30 deletions apollo-router/src/plugins/connectors/handle_responses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -507,37 +507,34 @@ async fn deserialize_response<T: HttpBody>(
let log_response_level = context
.extensions()
.with_lock(|lock| lock.get::<ConnectorEventResponse>().cloned())
.and_then(|event| match event.0.condition() {
Some(condition) => {
// Create a temporary response here so we can evaluate the condition. This response
// is missing any information about the mapped response, because we don't have that
// yet. This means that we cannot correctly evaluate any condition that relies on
// the mapped response data or mapping problems. But we can't wait until we do have
// that information, because this is the only place we have the body bytes (without
// making an expensive clone of the body). So we either need to not expose any
// selector which can be used as a condition that requires mapping information, or
// we must document that such selectors cannot be used as conditions on standard
// connectors events.

let response = connector::request_service::Response {
context: context.clone(),
connector: connector.clone(),
transport_result: Ok(TransportResponse::Http(HttpResponse {
inner: parts.clone(),
})),
mapped_response: MappedResponse::Data {
data: Value::Null,
key: response_key.clone(),
problems: vec![],
},
};
if condition.lock().evaluate_response(&response) {
Some(event.0.level())
} else {
None
}
.and_then(|event| {
// Create a temporary response here so we can evaluate the condition. This response
// is missing any information about the mapped response, because we don't have that
// yet. This means that we cannot correctly evaluate any condition that relies on
// the mapped response data or mapping problems. But we can't wait until we do have
// that information, because this is the only place we have the body bytes (without
// making an expensive clone of the body). So we either need to not expose any
// selector which can be used as a condition that requires mapping information, or
// we must document that such selectors cannot be used as conditions on standard
// connectors events.

let response = connector::request_service::Response {
context: context.clone(),
connector: connector.clone(),
transport_result: Ok(TransportResponse::Http(HttpResponse {
inner: parts.clone(),
})),
mapped_response: MappedResponse::Data {
data: Value::Null,
key: response_key.clone(),
problems: vec![],
},
};
if event.condition.evaluate_response(&response) {
Some(event.level)
} else {
None
}
None => Some(event.0.level()),
});

if let Some(level) = log_response_level {
Expand Down
117 changes: 52 additions & 65 deletions apollo-router/src/plugins/telemetry/config_new/connector/events.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use opentelemetry::Key;
use opentelemetry::KeyValue;
use parking_lot::Mutex;
Expand All @@ -6,25 +8,35 @@ use serde::Deserialize;
use tower::BoxError;

use crate::Context;
use crate::plugins::telemetry::config_new::conditions::Condition;
use crate::plugins::telemetry::config_new::connector::ConnectorRequest;
use crate::plugins::telemetry::config_new::connector::ConnectorResponse;
use crate::plugins::telemetry::config_new::connector::attributes::ConnectorAttributes;
use crate::plugins::telemetry::config_new::connector::selectors::ConnectorSelector;
use crate::plugins::telemetry::config_new::events::CustomEvent;
use crate::plugins::telemetry::config_new::events::CustomEventInner;
use crate::plugins::telemetry::config_new::events::CustomEvents;
use crate::plugins::telemetry::config_new::events::Event;
use crate::plugins::telemetry::config_new::events::EventLevel;
use crate::plugins::telemetry::config_new::events::StandardEvent;
use crate::plugins::telemetry::config_new::events::StandardEventConfig;
use crate::plugins::telemetry::config_new::events::log_event;
use crate::plugins::telemetry::config_new::extendable::Extendable;
use crate::plugins::telemetry::config_new::instruments::Instrumented;

#[derive(Clone)]
pub(crate) struct ConnectorEventRequest(pub(crate) StandardEvent<ConnectorSelector>);
pub(crate) struct ConnectorEventRequest {
// XXX(@IvanGoncharov): As part of removing Mutex from StandardEvent I moved it here
// I think it's not nessary here but can't verify it right now, so in future can just wrap StandardEvent
pub(crate) level: EventLevel,
pub(crate) condition: Arc<Mutex<Condition<ConnectorSelector>>>,
}

#[derive(Clone)]
pub(crate) struct ConnectorEventResponse(pub(crate) StandardEvent<ConnectorSelector>);
pub(crate) struct ConnectorEventResponse {
// XXX(@IvanGoncharov): As part of removing Arc from StandardEvent I moved it here
// I think it's not nessary here but can't verify it right now, so in future can just wrap StandardEvent
pub(crate) level: EventLevel,
pub(crate) condition: Arc<Condition<ConnectorSelector>>,
}

#[derive(Clone, Deserialize, JsonSchema, Debug, Default)]
#[serde(deny_unknown_fields, default)]
Expand All @@ -46,92 +58,67 @@ pub(crate) fn new_connector_events(
let custom_events = config
.custom
.iter()
.filter_map(|(event_name, event_cfg)| match &event_cfg.level {
EventLevel::Off => None,
_ => Some(CustomEvent {
inner: Mutex::new(CustomEventInner {
name: event_name.clone(),
level: event_cfg.level,
event_on: event_cfg.on,
message: event_cfg.message.clone(),
selectors: event_cfg.attributes.clone().into(),
condition: event_cfg.condition.clone(),
attributes: Vec::new(),
_phantom: Default::default(),
}),
}),
})
.filter_map(|(name, config)| CustomEvent::from_config(name, config))
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These functions are very repetitive, so I just moved the code to from_config methods. We can't use From or TryFrom trait here since this method returns options.
I would appreciate suggestions on how to name it better.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we could still use TryFrom and convert it into an option using .ok()

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TryFrom returns Result: https://doc.rust-lang.org/std/convert/trait.TryFrom.html
So it will require us to define some dummy error type.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

basically we return None if event is Off
So it's not an error

.collect();

ConnectorEvents {
request: config.attributes.request.clone().into(),
response: config.attributes.response.clone().into(),
error: config.attributes.error.clone().into(),
request: StandardEvent::from_config(&config.attributes.request),
response: StandardEvent::from_config(&config.attributes.response),
error: StandardEvent::from_config(&config.attributes.error),
custom: custom_events,
}
}

impl Instrumented
for CustomEvents<
ConnectorRequest,
ConnectorResponse,
(),
ConnectorAttributes,
ConnectorSelector,
>
{
type Request = ConnectorRequest;
type Response = ConnectorResponse;
type EventResponse = ();

fn on_request(&self, request: &Self::Request) {
impl CustomEvents<ConnectorRequest, ConnectorResponse, (), ConnectorAttributes, ConnectorSelector> {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed the Instrumented trait from here, and it worked without any other code modifications.
Doing this allowed me to change on_request so it now gets a mutable reference.

pub(crate) fn on_request(&mut self, request: &ConnectorRequest) {
// Any condition on the request is NOT evaluated here. It must be evaluated later when
// getting the ConnectorEventRequest from the context. The request context is shared
// between all connector requests, so any request could find this ConnectorEventRequest in
// the context. Its presence on the context cannot be conditional on an individual request.
if self.request.level() != EventLevel::Off {
request
.context
.extensions()
.with_lock(|lock| lock.insert(ConnectorEventRequest(self.request.clone())));
if let Some(request_event) = self.request.take() {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I use Option::take so event is basically moved into context and we don't need to call clone here.

request.context.extensions().with_lock(|lock| {
lock.insert(ConnectorEventRequest {
level: request_event.level,
condition: Arc::new(Mutex::new(request_event.condition)),
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As noted earlier I don't think we really need this wrapping, but I don't have time to check so I move it here just to be 100% sure I'm not introducing regression here.

})
});
}

if self.response.level() != EventLevel::Off {
request
.context
.extensions()
.with_lock(|lock| lock.insert(ConnectorEventResponse(self.response.clone())));
if let Some(response_event) = self.response.take() {
request.context.extensions().with_lock(|lock| {
lock.insert(ConnectorEventResponse {
level: response_event.level,
condition: Arc::new(response_event.condition),
})
});
}

for custom_event in &self.custom {
for custom_event in &mut self.custom {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Everything is mutable now because I removed the internal wrapping of Arc + Mutex around the condition.

custom_event.on_request(request);
}
}

fn on_response(&self, response: &Self::Response) {
for custom_event in &self.custom {
pub(crate) fn on_response(&mut self, response: &ConnectorResponse) {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to make it public since I removed trait

for custom_event in &mut self.custom {
custom_event.on_response(response);
}
}

fn on_error(&self, error: &BoxError, ctx: &Context) {
if self.error.level() != EventLevel::Off {
if let Some(condition) = self.error.condition() {
if !condition.lock().evaluate_error(error, ctx) {
return;
}
pub(crate) fn on_error(&mut self, error: &BoxError, ctx: &Context) {
if let Some(error_event) = &mut self.error {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't we call .take() here like in request ? Just a question I'm not sure it's needed.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically, we can, if we sure it's the last call (which it probably is).
But I use take more as a hack to move event outside of this plugin.
In this case error event is handled inside so &mut is enough.

if error_event.condition.evaluate_error(error, ctx) {
log_event(
error_event.level,
"connector.http.error",
vec![KeyValue::new(
Key::from_static_str("error"),
opentelemetry::Value::String(error.to_string().into()),
)],
"",
);
}
log_event(
self.error.level(),
"connector.http.error",
vec![KeyValue::new(
Key::from_static_str("error"),
opentelemetry::Value::String(error.to_string().into()),
)],
"",
);
}
for custom_event in &self.custom {
for custom_event in &mut self.custom {
custom_event.on_error(error, ctx);
}
}
Expand Down
Loading