diff --git a/.changesets/feat_feat_fleet_detector_add_schema_metrics.md b/.changesets/feat_feat_fleet_detector_add_schema_metrics.md new file mode 100644 index 0000000000..472682897f --- /dev/null +++ b/.changesets/feat_feat_fleet_detector_add_schema_metrics.md @@ -0,0 +1,6 @@ +### Adds Fleet Awareness Schema Metrics + +Adds an additional metric to the `fleet_detector` plugin in the form of `apollo.router.schema` with 2 attributes: +`schema_hash` and `launch_id`. + +By [@loshz](https://github.com/loshz) and [@nmoutschen](https://github.com/nmoutschen) in https://github.com/apollographql/router/pull/6283 diff --git a/apollo-router/src/plugin/mod.rs b/apollo-router/src/plugin/mod.rs index bfdaa631e3..9479e7f91a 100644 --- a/apollo-router/src/plugin/mod.rs +++ b/apollo-router/src/plugin/mod.rs @@ -77,6 +77,9 @@ pub struct PluginInit { /// The parsed subgraph schemas from the query planner, keyed by subgraph name pub(crate) subgraph_schemas: Arc, + /// Launch ID + pub(crate) launch_id: Option>, + pub(crate) notify: Notify, } @@ -137,6 +140,7 @@ where .supergraph_schema_id(crate::spec::Schema::schema_id(&supergraph_sdl).into()) .supergraph_sdl(supergraph_sdl) .supergraph_schema(supergraph_schema) + .launch_id(Arc::new("launch_id".to_string())) .notify(Notify::for_tests()) .build() } @@ -173,6 +177,7 @@ where supergraph_schema_id: Arc, supergraph_schema: Arc>, subgraph_schemas: Option>, + launch_id: Option>>, notify: Notify, ) -> Self { PluginInit { @@ -181,6 +186,7 @@ where supergraph_schema_id, supergraph_schema, subgraph_schemas: subgraph_schemas.unwrap_or_default(), + launch_id: launch_id.flatten(), notify, } } @@ -196,6 +202,7 @@ where supergraph_schema_id: Arc, supergraph_schema: Arc>, subgraph_schemas: Option>, + launch_id: Option>, notify: Notify, ) -> Result { let config: T = serde_json::from_value(config)?; @@ -205,6 +212,7 @@ where supergraph_schema, supergraph_schema_id, subgraph_schemas: subgraph_schemas.unwrap_or_default(), + launch_id, notify, }) } @@ -217,6 +225,7 @@ where supergraph_schema_id: Option>, supergraph_schema: Option>>, subgraph_schemas: Option>, + launch_id: Option>, notify: Option>, ) -> Self { PluginInit { @@ -226,6 +235,7 @@ where supergraph_schema: supergraph_schema .unwrap_or_else(|| Arc::new(Valid::assume_valid(Schema::new()))), subgraph_schemas: subgraph_schemas.unwrap_or_default(), + launch_id, notify: notify.unwrap_or_else(Notify::for_tests), } } diff --git a/apollo-router/src/plugins/fleet_detector.rs b/apollo-router/src/plugins/fleet_detector.rs index 7b2882770b..5190c49756 100644 --- a/apollo-router/src/plugins/fleet_detector.rs +++ b/apollo-router/src/plugins/fleet_detector.rs @@ -45,14 +45,12 @@ impl SystemGetter { } fn get_system(&mut self) -> &System { - if self.start.elapsed() < REFRESH_INTERVAL { - &self.system - } else { + if self.start.elapsed() >= REFRESH_INTERVAL { self.start = Instant::now(); self.system.refresh_cpu_all(); self.system.refresh_memory(); - &self.system } + &self.system } } @@ -65,7 +63,7 @@ enum GaugeStore { } impl GaugeStore { - fn active() -> GaugeStore { + fn active(opts: &GaugeOptions) -> GaugeStore { let system_getter = Arc::new(Mutex::new(SystemGetter::new())); let meter = meter_provider().meter("apollo/router"); @@ -112,14 +110,14 @@ impl GaugeStore { "The CPU frequency of the underlying instance the router is deployed to", ) .with_unit(Unit::new("Mhz")) - .with_callback(move |i| { + .with_callback(move |gauge| { let local_system_getter = system_getter.clone(); let mut system_getter = local_system_getter.lock().unwrap(); let system = system_getter.get_system(); let cpus = system.cpus(); let cpu_freq = cpus.iter().map(|cpu| cpu.frequency()).sum::() / cpus.len() as u64; - i.observe(cpu_freq, &[]) + gauge.observe(cpu_freq, &[]) }) .init(), ); @@ -133,12 +131,12 @@ impl GaugeStore { .with_description( "The number of CPUs reported by the instance the router is running on", ) - .with_callback(move |i| { + .with_callback(move |gauge| { let local_system_getter = system_getter.clone(); let mut system_getter = local_system_getter.lock().unwrap(); let system = system_getter.get_system(); let cpu_count = detect_cpu_count(system); - i.observe(cpu_count, &[KeyValue::new("host.arch", get_otel_arch())]) + gauge.observe(cpu_count, &[KeyValue::new("host.arch", get_otel_arch())]) }) .init(), ); @@ -152,11 +150,11 @@ impl GaugeStore { .with_description( "The amount of memory reported by the instance the router is running on", ) - .with_callback(move |i| { + .with_callback(move |gauge| { let local_system_getter = system_getter.clone(); let mut system_getter = local_system_getter.lock().unwrap(); let system = system_getter.get_system(); - i.observe( + gauge.observe( system.total_memory(), &[KeyValue::new("host.arch", get_otel_arch())], ) @@ -165,19 +163,50 @@ impl GaugeStore { .init(), ); } + { + let opts = opts.clone(); + gauges.push( + meter + .u64_observable_gauge("apollo.router.instance.schema") + .with_description("Details about the current in-use schema") + .with_callback(move |gauge| { + // NOTE: this is a fixed gauge. We only care about observing the included + // attributes. + let mut attributes: Vec = vec![KeyValue::new( + "schema_hash", + opts.supergraph_schema_hash.clone(), + )]; + if let Some(launch_id) = opts.launch_id.as_ref() { + attributes.push(KeyValue::new("launch_id", launch_id.to_string())); + } + gauge.observe(1, attributes.as_slice()) + }) + .init(), + ) + } GaugeStore::Active(gauges) } } +#[derive(Clone, Default)] +struct GaugeOptions { + supergraph_schema_hash: String, + launch_id: Option, +} + #[derive(Default)] struct FleetDetector { gauge_store: Mutex, + + // Options passed to the gauge_store during activation. + gauge_options: GaugeOptions, } + #[async_trait::async_trait] impl PluginPrivate for FleetDetector { type Config = Conf; - async fn new(_: PluginInit) -> Result { + async fn new(plugin: PluginInit) -> Result { debug!("initialising fleet detection plugin"); if let Ok(val) = env::var(APOLLO_TELEMETRY_DISABLED) { if val == "true" { @@ -186,15 +215,21 @@ impl PluginPrivate for FleetDetector { } } + let gauge_options = GaugeOptions { + supergraph_schema_hash: plugin.supergraph_schema_id.to_string(), + launch_id: plugin.launch_id.map(|s| s.to_string()), + }; + Ok(FleetDetector { gauge_store: Mutex::new(GaugeStore::Pending), + gauge_options, }) } fn activate(&self) { let mut store = self.gauge_store.lock().expect("lock poisoned"); if matches!(*store, GaugeStore::Pending) { - *store = GaugeStore::active(); + *store = GaugeStore::active(&self.gauge_options); } } } diff --git a/apollo-router/src/router/event/mod.rs b/apollo-router/src/router/event/mod.rs index 2645ad4755..aa98558b6b 100644 --- a/apollo-router/src/router/event/mod.rs +++ b/apollo-router/src/router/event/mod.rs @@ -22,6 +22,7 @@ use self::Event::UpdateConfiguration; use self::Event::UpdateLicense; use self::Event::UpdateSchema; use crate::uplink::license_enforcement::LicenseState; +use crate::uplink::schema::SchemaState; use crate::Configuration; /// Messages that are broadcast across the app. @@ -33,7 +34,7 @@ pub(crate) enum Event { NoMoreConfiguration, /// The schema was updated. - UpdateSchema(String), + UpdateSchema(SchemaState), /// There are no more updates to the schema NoMoreSchema, diff --git a/apollo-router/src/router/event/schema.rs b/apollo-router/src/router/event/schema.rs index 229992fa68..f43e3dea4e 100644 --- a/apollo-router/src/router/event/schema.rs +++ b/apollo-router/src/router/event/schema.rs @@ -11,6 +11,7 @@ use url::Url; use crate::router::Event; use crate::router::Event::NoMoreSchema; use crate::router::Event::UpdateSchema; +use crate::uplink::schema::SchemaState; use crate::uplink::schema_stream::SupergraphSdlQuery; use crate::uplink::stream_from_uplink; use crate::uplink::UplinkConfig; @@ -74,9 +75,20 @@ impl SchemaSource { pub(crate) fn into_stream(self) -> impl Stream { match self { SchemaSource::Static { schema_sdl: schema } => { - stream::once(future::ready(UpdateSchema(schema))).boxed() + let update_schema = UpdateSchema(SchemaState { + sdl: schema, + launch_id: None, + }); + stream::once(future::ready(update_schema)).boxed() } - SchemaSource::Stream(stream) => stream.map(UpdateSchema).boxed(), + SchemaSource::Stream(stream) => stream + .map(|sdl| { + UpdateSchema(SchemaState { + sdl, + launch_id: None, + }) + }) + .boxed(), #[allow(deprecated)] SchemaSource::File { path, @@ -100,7 +112,13 @@ impl SchemaSource { let path = path.clone(); async move { match tokio::fs::read_to_string(&path).await { - Ok(schema) => Some(UpdateSchema(schema)), + Ok(schema) => { + let update_schema = UpdateSchema(SchemaState { + sdl: schema, + launch_id: None, + }); + Some(update_schema) + } Err(err) => { tracing::error!(reason = %err, "failed to read supergraph schema"); None @@ -110,7 +128,11 @@ impl SchemaSource { }) .boxed() } else { - stream::once(future::ready(UpdateSchema(schema))).boxed() + let update_schema = UpdateSchema(SchemaState { + sdl: schema, + launch_id: None, + }); + stream::once(future::ready(update_schema)).boxed() } } Err(err) => { @@ -121,10 +143,13 @@ impl SchemaSource { } } SchemaSource::Registry(uplink_config) => { - stream_from_uplink::(uplink_config) + stream_from_uplink::(uplink_config) .filter_map(|res| { future::ready(match res { - Ok(schema) => Some(UpdateSchema(schema)), + Ok(schema) => { + let update_schema = UpdateSchema(schema); + Some(update_schema) + } Err(e) => { tracing::error!("{}", e); None @@ -222,7 +247,13 @@ impl Fetcher { .await { Ok(res) if res.status().is_success() => match res.text().await { - Ok(schema) => return Some(UpdateSchema(schema)), + Ok(schema) => { + let update_schema = UpdateSchema(SchemaState { + sdl: schema, + launch_id: None, + }); + return Some(update_schema); + } Err(err) => { tracing::warn!( url.full = %url, @@ -346,10 +377,10 @@ mod tests { .into_stream(); assert!( - matches!(stream.next().await.unwrap(), UpdateSchema(schema) if schema == SCHEMA_1) + matches!(stream.next().await.unwrap(), UpdateSchema(schema) if schema.sdl == SCHEMA_1) ); assert!( - matches!(stream.next().await.unwrap(), UpdateSchema(schema) if schema == SCHEMA_1) + matches!(stream.next().await.unwrap(), UpdateSchema(schema) if schema.sdl == SCHEMA_1) ); } .with_subscriber(assert_snapshot_subscriber!()) @@ -382,10 +413,10 @@ mod tests { .into_stream(); assert!( - matches!(stream.next().await.unwrap(), UpdateSchema(schema) if schema == SCHEMA_2) + matches!(stream.next().await.unwrap(), UpdateSchema(schema) if schema.sdl == SCHEMA_2) ); assert!( - matches!(stream.next().await.unwrap(), UpdateSchema(schema) if schema == SCHEMA_2) + matches!(stream.next().await.unwrap(), UpdateSchema(schema) if schema.sdl == SCHEMA_2) ); } .with_subscriber(assert_snapshot_subscriber!({ @@ -448,7 +479,7 @@ mod tests { .await; assert!( - matches!(stream.next().await.unwrap(), UpdateSchema(schema) if schema == SCHEMA_1) + matches!(stream.next().await.unwrap(), UpdateSchema(schema) if schema.sdl == SCHEMA_1) ); drop(success); @@ -468,7 +499,7 @@ mod tests { .await; assert!( - matches!(stream.next().await.unwrap(), UpdateSchema(schema) if schema == SCHEMA_1) + matches!(stream.next().await.unwrap(), UpdateSchema(schema) if schema.sdl == SCHEMA_1) ); } .with_subscriber(assert_snapshot_subscriber!({ @@ -497,7 +528,7 @@ mod tests { .into_stream(); assert!( - matches!(stream.next().await.unwrap(), UpdateSchema(schema) if schema == SCHEMA_1) + matches!(stream.next().await.unwrap(), UpdateSchema(schema) if schema.sdl == SCHEMA_1) ); assert!(matches!(stream.next().await.unwrap(), NoMoreSchema)); } diff --git a/apollo-router/src/router/mod.rs b/apollo-router/src/router/mod.rs index bd5461a046..030e9797eb 100644 --- a/apollo-router/src/router/mod.rs +++ b/apollo-router/src/router/mod.rs @@ -354,6 +354,7 @@ mod tests { use crate::router::Event::UpdateLicense; use crate::router::Event::UpdateSchema; use crate::uplink::license_enforcement::LicenseState; + use crate::uplink::schema::SchemaState; use crate::Configuration; fn init_with_server() -> RouterHttpServer { @@ -417,7 +418,10 @@ mod tests { .await .unwrap(); router_handle - .send_event(UpdateSchema(schema.to_string())) + .send_event(UpdateSchema(SchemaState { + sdl: schema.to_string(), + launch_id: None, + })) .await .unwrap(); router_handle @@ -460,9 +464,10 @@ mod tests { .await .unwrap(); router_handle - .send_event(UpdateSchema( - include_str!("../testdata/supergraph_missing_name.graphql").to_string(), - )) + .send_event(UpdateSchema(SchemaState { + sdl: include_str!("../testdata/supergraph_missing_name.graphql").to_string(), + launch_id: None, + })) .await .unwrap(); router_handle @@ -502,9 +507,10 @@ mod tests { // let's update the schema to add the field router_handle - .send_event(UpdateSchema( - include_str!("../testdata/supergraph.graphql").to_string(), - )) + .send_event(UpdateSchema(SchemaState { + sdl: include_str!("../testdata/supergraph.graphql").to_string(), + launch_id: None, + })) .await .unwrap(); @@ -528,9 +534,10 @@ mod tests { // let's go back and remove the field router_handle - .send_event(UpdateSchema( - include_str!("../testdata/supergraph_missing_name.graphql").to_string(), - )) + .send_event(UpdateSchema(SchemaState { + sdl: include_str!("../testdata/supergraph_missing_name.graphql").to_string(), + launch_id: None, + })) .await .unwrap(); diff --git a/apollo-router/src/router_factory.rs b/apollo-router/src/router_factory.rs index b3a4bb27b6..d34eae8b45 100644 --- a/apollo-router/src/router_factory.rs +++ b/apollo-router/src/router_factory.rs @@ -516,6 +516,7 @@ pub(crate) async fn add_plugin( schema_id: Arc, supergraph_schema: Arc>, subgraph_schemas: Arc>>>, + launch_id: Option>, notify: &crate::notification::Notify, plugin_instances: &mut Plugins, errors: &mut Vec, @@ -528,6 +529,7 @@ pub(crate) async fn add_plugin( .supergraph_schema_id(schema_id) .supergraph_schema(supergraph_schema) .subgraph_schemas(subgraph_schemas) + .launch_id(launch_id) .notify(notify.clone()) .build(), ) @@ -585,6 +587,7 @@ pub(crate) async fn create_plugins( supergraph_schema_id.clone(), supergraph_schema.clone(), subgraph_schemas.clone(), + schema.launch_id.clone(), &configuration.notify.clone(), &mut plugin_instances, &mut errors, diff --git a/apollo-router/src/spec/schema.rs b/apollo-router/src/spec/schema.rs index 2208a5863e..8bfda05e64 100644 --- a/apollo-router/src/spec/schema.rs +++ b/apollo-router/src/spec/schema.rs @@ -20,6 +20,7 @@ use sha2::Sha256; use crate::error::ParseErrors; use crate::error::SchemaError; use crate::query_planner::OperationKind; +use crate::uplink::schema::SchemaState; use crate::Configuration; /// A GraphQL schema. @@ -30,6 +31,7 @@ pub(crate) struct Schema { pub(crate) implementers_map: apollo_compiler::collections::HashMap, api_schema: ApiSchema, pub(crate) schema_id: Arc, + pub(crate) launch_id: Option>, } /// Wrapper type to distinguish from `Schema::definitions` for the supergraph schema @@ -38,16 +40,16 @@ pub(crate) struct ApiSchema(pub(crate) ValidFederationSchema); impl Schema { pub(crate) fn parse(raw_sdl: &str, config: &Configuration) -> Result { - Self::parse_arc(raw_sdl.to_owned().into(), config) + Self::parse_arc(raw_sdl.parse::().unwrap().into(), config) } pub(crate) fn parse_arc( - raw_sdl: Arc, + raw_sdl: Arc, config: &Configuration, ) -> Result { let start = Instant::now(); let mut parser = apollo_compiler::parser::Parser::new(); - let result = parser.parse_ast(raw_sdl.as_ref(), "schema.graphql"); + let result = parser.parse_ast(&raw_sdl.sdl, "schema.graphql"); // Trace log recursion limit data let recursion_limit = parser.recursion_reached(); @@ -110,7 +112,7 @@ impl Schema { let implementers_map = definitions.implementers_map(); let supergraph = Supergraph::from_schema(definitions)?; - let schema_id = Arc::new(Schema::schema_id(&raw_sdl)); + let schema_id = Arc::new(Schema::schema_id(&raw_sdl.sdl)); let api_schema = supergraph .to_api_schema(ApiSchemaOptions { @@ -124,7 +126,12 @@ impl Schema { })?; Ok(Schema { - raw_sdl, + launch_id: raw_sdl + .launch_id + .as_ref() + .map(ToString::to_string) + .map(Arc::new), + raw_sdl: Arc::new(raw_sdl.sdl.to_string()), supergraph, subgraphs, implementers_map, @@ -336,7 +343,8 @@ impl std::fmt::Debug for Schema { subgraphs, implementers_map, api_schema: _, // skip - schema_id: _, + schema_id: _, // skip + launch_id: _, // skip } = self; f.debug_struct("Schema") .field("raw_sdl", raw_sdl) diff --git a/apollo-router/src/state_machine.rs b/apollo-router/src/state_machine.rs index ed5765d4e3..669a70d9ef 100644 --- a/apollo-router/src/state_machine.rs +++ b/apollo-router/src/state_machine.rs @@ -39,6 +39,7 @@ use crate::spec::Schema; use crate::uplink::license_enforcement::LicenseEnforcementReport; use crate::uplink::license_enforcement::LicenseState; use crate::uplink::license_enforcement::LICENSE_EXPIRED_URL; +use crate::uplink::schema::SchemaState; use crate::ApolloRouterError::NoLicense; const STATE_CHANGE: &str = "state change"; @@ -54,14 +55,14 @@ pub(crate) struct ListenAddresses { enum State { Startup { configuration: Option>, - schema: Option>, + schema: Option>, license: Option, listen_addresses_guard: OwnedRwLockWriteGuard, }, Running { configuration: Arc, _metrics: Option, - schema: Arc, + schema: Arc, license: LicenseState, server_handle: Option, router_service_factory: FA::RouterFactory, @@ -118,7 +119,7 @@ impl State { async fn update_inputs( mut self, state_machine: &mut StateMachine, - new_schema: Option>, + new_schema: Option>, new_configuration: Option>, new_license: Option, ) -> Self @@ -308,7 +309,7 @@ impl State { server_handle: &mut Option, previous_router_service_factory: Option<&FA::RouterFactory>, configuration: Arc, - sdl: Arc, + schema_state: Arc, license: LicenseState, listen_addresses_guard: &mut OwnedRwLockWriteGuard, mut all_connections_stopped_signals: Vec>, @@ -318,7 +319,7 @@ impl State { FA: RouterSuperServiceFactory, { let schema = Arc::new( - Schema::parse_arc(sdl.clone(), &configuration) + Schema::parse_arc(schema_state.clone(), &configuration) .map_err(|e| ServiceCreationError(e.to_string().into()))?, ); // Check the license @@ -422,7 +423,7 @@ impl State { Ok(Running { configuration, _metrics: metrics, - schema: sdl, + schema: schema_state, license, server_handle: Some(server_handle), router_service_factory, @@ -619,11 +620,15 @@ mod tests { use crate::services::new_service::ServiceFactory; use crate::services::router; use crate::services::RouterRequest; + use crate::uplink::schema::SchemaState; type SharedOneShotReceiver = Arc>>>; - fn example_schema() -> String { - include_str!("testdata/supergraph.graphql").to_owned() + fn example_schema() -> SchemaState { + SchemaState { + sdl: include_str!("testdata/supergraph.graphql").to_owned(), + launch_id: None, + } } macro_rules! assert_matches { @@ -877,7 +882,10 @@ mod tests { router_factory, stream::iter(vec![ UpdateConfiguration(Configuration::builder().build().unwrap()), - UpdateSchema(minimal_schema.to_owned()), + UpdateSchema(SchemaState { + sdl: minimal_schema.to_owned(), + launch_id: None + }), UpdateLicense(LicenseState::default()), UpdateSchema(example_schema()), Shutdown @@ -900,9 +908,15 @@ mod tests { router_factory, stream::iter(vec![ UpdateConfiguration(Configuration::builder().build().unwrap()), - UpdateSchema(minimal_schema.to_owned()), + UpdateSchema(SchemaState { + sdl: minimal_schema.to_owned(), + launch_id: None + }), UpdateLicense(LicenseState::default()), - UpdateSchema(minimal_schema.to_owned()), + UpdateSchema(SchemaState { + sdl: minimal_schema.to_owned(), + launch_id: None + }), Shutdown ]) ) @@ -923,7 +937,10 @@ mod tests { router_factory, stream::iter(vec![ UpdateConfiguration(Configuration::builder().build().unwrap()), - UpdateSchema(minimal_schema.to_owned()), + UpdateSchema(SchemaState { + sdl: minimal_schema.to_owned(), + launch_id: None + }), UpdateLicense(LicenseState::default()), UpdateLicense(LicenseState::Licensed), Shutdown @@ -1046,7 +1063,10 @@ mod tests { UpdateConfiguration(Configuration::builder().build().unwrap()), UpdateSchema(example_schema()), UpdateLicense(LicenseState::default()), - UpdateSchema(minimal_schema.to_owned()), + UpdateSchema(SchemaState { + sdl: minimal_schema.to_owned(), + launch_id: None + }), Shutdown ]) ) @@ -1104,7 +1124,10 @@ mod tests { .build() .unwrap() ), - UpdateSchema(minimal_schema.to_owned()), + UpdateSchema(SchemaState { + sdl: minimal_schema.to_owned(), + launch_id: None + }), Shutdown ]), ) diff --git a/apollo-router/src/uplink/mod.rs b/apollo-router/src/uplink/mod.rs index 2ea483daf4..6bc9508efe 100644 --- a/apollo-router/src/uplink/mod.rs +++ b/apollo-router/src/uplink/mod.rs @@ -17,6 +17,7 @@ use url::Url; pub(crate) mod license_enforcement; pub(crate) mod license_stream; pub(crate) mod persisted_queries_manifest_stream; +pub(crate) mod schema; pub(crate) mod schema_stream; const GCP_URL: &str = "https://uplink.api.apollographql.com"; diff --git a/apollo-router/src/uplink/schema.rs b/apollo-router/src/uplink/schema.rs new file mode 100644 index 0000000000..57cce6ba0c --- /dev/null +++ b/apollo-router/src/uplink/schema.rs @@ -0,0 +1,20 @@ +use std::convert::Infallible; +use std::str::FromStr; + +/// Represents the new state of a schema after an update. +#[derive(Eq, PartialEq, Debug)] +pub(crate) struct SchemaState { + pub(crate) sdl: String, + pub(crate) launch_id: Option, +} + +impl FromStr for SchemaState { + type Err = Infallible; + + fn from_str(s: &str) -> Result { + Ok(Self { + sdl: s.to_string(), + launch_id: None, + }) + } +} diff --git a/apollo-router/src/uplink/schema_stream.rs b/apollo-router/src/uplink/schema_stream.rs index ee1dcbda27..376f600e74 100644 --- a/apollo-router/src/uplink/schema_stream.rs +++ b/apollo-router/src/uplink/schema_stream.rs @@ -5,6 +5,7 @@ use graphql_client::GraphQLQuery; +use super::schema::SchemaState; use crate::uplink::schema_stream::supergraph_sdl_query::FetchErrorCode; use crate::uplink::schema_stream::supergraph_sdl_query::SupergraphSdlQueryRouterConfig; use crate::uplink::UplinkRequest; @@ -63,6 +64,41 @@ impl From for UplinkResponse { } } +impl From for UplinkResponse { + fn from(response: supergraph_sdl_query::ResponseData) -> Self { + match response.router_config { + SupergraphSdlQueryRouterConfig::RouterConfigResult(result) => UplinkResponse::New { + response: SchemaState { + sdl: result.supergraph_sdl, + launch_id: Some(result.id.clone()), + }, + id: result.id, + // this will truncate the number of seconds to under u64::MAX, which should be + // a large enough delay anyway + delay: result.min_delay_seconds as u64, + }, + SupergraphSdlQueryRouterConfig::Unchanged(response) => UplinkResponse::Unchanged { + id: Some(response.id), + delay: Some(response.min_delay_seconds as u64), + }, + SupergraphSdlQueryRouterConfig::FetchError(err) => UplinkResponse::Error { + retry_later: err.code == FetchErrorCode::RETRY_LATER, + code: match err.code { + FetchErrorCode::AUTHENTICATION_FAILED => "AUTHENTICATION_FAILED".to_string(), + FetchErrorCode::ACCESS_DENIED => "ACCESS_DENIED".to_string(), + FetchErrorCode::UNKNOWN_REF => "UNKNOWN_REF".to_string(), + FetchErrorCode::RETRY_LATER => "RETRY_LATER".to_string(), + FetchErrorCode::NOT_IMPLEMENTED_ON_THIS_INSTANCE => { + "NOT_IMPLEMENTED_ON_THIS_INSTANCE".to_string() + } + FetchErrorCode::Other(other) => other, + }, + message: err.message, + }, + } + } +} + #[cfg(test)] mod test { use std::str::FromStr;