Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
533aae7
plugins/fleet_detector: add schema and persisted_queries gauges
loshz Nov 19, 2024
c3f75ce
add comments
loshz Nov 19, 2024
64adb53
add gauge store options
loshz Nov 21, 2024
78a3cfc
fix gauge attributes list
loshz Nov 22, 2024
7be0703
fix lint issues
loshz Nov 22, 2024
9a8c734
add SchemaState
loshz Nov 22, 2024
5c5a604
fix stream map closure
loshz Nov 22, 2024
894e429
fix deref addrof
loshz Nov 22, 2024
00640a8
plugin: add launch_id
loshz Nov 25, 2024
9842d32
fix type cast
loshz Nov 25, 2024
1d469a1
fix plugin types
loshz Nov 25, 2024
207f3b6
add launch_id to schema
loshz Nov 25, 2024
5b1f7e2
compile, please
loshz Nov 26, 2024
953ff07
remove launch_id from PluginInit serde
loshz Nov 26, 2024
89da9dc
Merge branch 'dev' into loshz/FLEET-22
loshz Dec 4, 2024
430c4e4
add changeset
loshz Dec 4, 2024
f8c7e3c
feat: connect launch ID
nmoutschen Dec 4, 2024
21f46ea
Merge branch 'loshz/FLEET-22' into nicolas/FLEET-22
nmoutschen Dec 4, 2024
aa6317e
feat: connect launch ID (#6394)
nmoutschen Dec 4, 2024
3c78759
Merge branch 'loshz/FLEET-22' of github.com:apollographql/router into…
loshz Dec 4, 2024
188e8de
update changeset
loshz Dec 4, 2024
c5e43ea
fix from type
loshz Dec 4, 2024
ec62b5c
fix schema source from
loshz Dec 4, 2024
a792192
Merge branch 'dev' into loshz/FLEET-22
loshz Dec 4, 2024
47f7528
rename metric
loshz Dec 5, 2024
770d790
Merge branch 'dev' into loshz/FLEET-22
loshz Dec 5, 2024
0906863
Merge branch 'dev' into loshz/FLEET-22
loshz Dec 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changesets/feat_feat_fleet_detector_add_schema_metrics.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
### Adds Fleet Awareness Schema Metrics

Adds a new gauge metric, `apollo.router.instance.schema`, to the `fleet_detector` plugin. The metric carries two attributes:
`schema_hash` and `launch_id` (the latter is only included when a launch ID is available).

By [@loshz](https://github.com/loshz) and [@nmoutschen](https://github.com/nmoutschen) in https://github.com/apollographql/router/pull/6283
10 changes: 10 additions & 0 deletions apollo-router/src/plugin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ pub struct PluginInit<T> {
/// The parsed subgraph schemas from the query planner, keyed by subgraph name
pub(crate) subgraph_schemas: Arc<SubgraphSchemas>,

/// Launch ID
pub(crate) launch_id: Option<Arc<String>>,

pub(crate) notify: Notify<String, graphql::Response>,
}

Expand Down Expand Up @@ -137,6 +140,7 @@ where
.supergraph_schema_id(crate::spec::Schema::schema_id(&supergraph_sdl).into())
.supergraph_sdl(supergraph_sdl)
.supergraph_schema(supergraph_schema)
.launch_id(Arc::new("launch_id".to_string()))
.notify(Notify::for_tests())
.build()
}
Expand Down Expand Up @@ -173,6 +177,7 @@ where
supergraph_schema_id: Arc<String>,
supergraph_schema: Arc<Valid<Schema>>,
subgraph_schemas: Option<Arc<SubgraphSchemas>>,
launch_id: Option<Option<Arc<String>>>,
notify: Notify<String, graphql::Response>,
) -> Self {
PluginInit {
Expand All @@ -181,6 +186,7 @@ where
supergraph_schema_id,
supergraph_schema,
subgraph_schemas: subgraph_schemas.unwrap_or_default(),
launch_id: launch_id.flatten(),
notify,
}
}
Expand All @@ -196,6 +202,7 @@ where
supergraph_schema_id: Arc<String>,
supergraph_schema: Arc<Valid<Schema>>,
subgraph_schemas: Option<Arc<SubgraphSchemas>>,
launch_id: Option<Arc<String>>,
notify: Notify<String, graphql::Response>,
) -> Result<Self, BoxError> {
let config: T = serde_json::from_value(config)?;
Expand All @@ -205,6 +212,7 @@ where
supergraph_schema,
supergraph_schema_id,
subgraph_schemas: subgraph_schemas.unwrap_or_default(),
launch_id,
notify,
})
}
Expand All @@ -217,6 +225,7 @@ where
supergraph_schema_id: Option<Arc<String>>,
supergraph_schema: Option<Arc<Valid<Schema>>>,
subgraph_schemas: Option<Arc<SubgraphSchemas>>,
launch_id: Option<Arc<String>>,
notify: Option<Notify<String, graphql::Response>>,
) -> Self {
PluginInit {
Expand All @@ -226,6 +235,7 @@ where
supergraph_schema: supergraph_schema
.unwrap_or_else(|| Arc::new(Valid::assume_valid(Schema::new()))),
subgraph_schemas: subgraph_schemas.unwrap_or_default(),
launch_id,
notify: notify.unwrap_or_else(Notify::for_tests),
}
}
Expand Down
61 changes: 48 additions & 13 deletions apollo-router/src/plugins/fleet_detector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,12 @@ impl SystemGetter {
}

fn get_system(&mut self) -> &System {
if self.start.elapsed() < REFRESH_INTERVAL {
&self.system
} else {
if self.start.elapsed() >= REFRESH_INTERVAL {
self.start = Instant::now();
self.system.refresh_cpu_all();
self.system.refresh_memory();
&self.system
}
&self.system
}
}

Expand All @@ -65,7 +63,7 @@ enum GaugeStore {
}

impl GaugeStore {
fn active() -> GaugeStore {
fn active(opts: &GaugeOptions) -> GaugeStore {
let system_getter = Arc::new(Mutex::new(SystemGetter::new()));
let meter = meter_provider().meter("apollo/router");

Expand Down Expand Up @@ -112,14 +110,14 @@ impl GaugeStore {
"The CPU frequency of the underlying instance the router is deployed to",
)
.with_unit(Unit::new("Mhz"))
.with_callback(move |i| {
.with_callback(move |gauge| {
let local_system_getter = system_getter.clone();
let mut system_getter = local_system_getter.lock().unwrap();
let system = system_getter.get_system();
let cpus = system.cpus();
let cpu_freq =
cpus.iter().map(|cpu| cpu.frequency()).sum::<u64>() / cpus.len() as u64;
i.observe(cpu_freq, &[])
gauge.observe(cpu_freq, &[])
})
.init(),
);
Expand All @@ -133,12 +131,12 @@ impl GaugeStore {
.with_description(
"The number of CPUs reported by the instance the router is running on",
)
.with_callback(move |i| {
.with_callback(move |gauge| {
let local_system_getter = system_getter.clone();
let mut system_getter = local_system_getter.lock().unwrap();
let system = system_getter.get_system();
let cpu_count = detect_cpu_count(system);
i.observe(cpu_count, &[KeyValue::new("host.arch", get_otel_arch())])
gauge.observe(cpu_count, &[KeyValue::new("host.arch", get_otel_arch())])
})
.init(),
);
Expand All @@ -152,11 +150,11 @@ impl GaugeStore {
.with_description(
"The amount of memory reported by the instance the router is running on",
)
.with_callback(move |i| {
.with_callback(move |gauge| {
let local_system_getter = system_getter.clone();
let mut system_getter = local_system_getter.lock().unwrap();
let system = system_getter.get_system();
i.observe(
gauge.observe(
system.total_memory(),
&[KeyValue::new("host.arch", get_otel_arch())],
)
Expand All @@ -165,19 +163,50 @@ impl GaugeStore {
.init(),
);
}
{
let opts = opts.clone();
gauges.push(
meter
.u64_observable_gauge("apollo.router.instance.schema")
.with_description("Details about the current in-use schema")
.with_callback(move |gauge| {
// NOTE: this is a fixed gauge. We only care about observing the included
// attributes.
let mut attributes: Vec<KeyValue> = vec![KeyValue::new(
"schema_hash",
opts.supergraph_schema_hash.clone(),
)];
if let Some(launch_id) = opts.launch_id.as_ref() {
attributes.push(KeyValue::new("launch_id", launch_id.to_string()));
}
gauge.observe(1, attributes.as_slice())
})
.init(),
)
}
GaugeStore::Active(gauges)
}
}

/// Values the schema gauge reports as attributes.
///
/// Built once in `FleetDetector::new` from the plugin init data and passed by
/// reference to `GaugeStore::active`, which clones it into the gauge callback.
#[derive(Clone, Default)]
struct GaugeOptions {
// Reported as the `schema_hash` attribute; populated from `supergraph_schema_id`.
supergraph_schema_hash: String,
// Reported as the `launch_id` attribute; the attribute is omitted when this is `None`.
launch_id: Option<String>,
}

/// State held by the fleet detection plugin: the gauge store and the options
/// used to build its gauges.
#[derive(Default)]
struct FleetDetector {
// Set to `GaugeStore::Pending` on the normal construction path and swapped for
// `GaugeStore::active(..)` in `activate`; the mutex lets that swap happen through `&self`.
gauge_store: Mutex<GaugeStore>,

// Options passed to the gauge_store during activation.
gauge_options: GaugeOptions,
}

#[async_trait::async_trait]
impl PluginPrivate for FleetDetector {
type Config = Conf;

async fn new(_: PluginInit<Self::Config>) -> Result<Self, BoxError> {
async fn new(plugin: PluginInit<Self::Config>) -> Result<Self, BoxError> {
debug!("initialising fleet detection plugin");
if let Ok(val) = env::var(APOLLO_TELEMETRY_DISABLED) {
if val == "true" {
Expand All @@ -186,15 +215,21 @@ impl PluginPrivate for FleetDetector {
}
}

let gauge_options = GaugeOptions {
supergraph_schema_hash: plugin.supergraph_schema_id.to_string(),
launch_id: plugin.launch_id.map(|s| s.to_string()),
};

Ok(FleetDetector {
gauge_store: Mutex::new(GaugeStore::Pending),
gauge_options,
})
}

fn activate(&self) {
let mut store = self.gauge_store.lock().expect("lock poisoned");
if matches!(*store, GaugeStore::Pending) {
*store = GaugeStore::active();
*store = GaugeStore::active(&self.gauge_options);
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion apollo-router/src/router/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use self::Event::UpdateConfiguration;
use self::Event::UpdateLicense;
use self::Event::UpdateSchema;
use crate::uplink::license_enforcement::LicenseState;
use crate::uplink::schema::SchemaState;
use crate::Configuration;

/// Messages that are broadcast across the app.
Expand All @@ -33,7 +34,7 @@ pub(crate) enum Event {
NoMoreConfiguration,

/// The schema was updated.
UpdateSchema(String),
UpdateSchema(SchemaState),

/// There are no more updates to the schema
NoMoreSchema,
Expand Down
59 changes: 45 additions & 14 deletions apollo-router/src/router/event/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use url::Url;
use crate::router::Event;
use crate::router::Event::NoMoreSchema;
use crate::router::Event::UpdateSchema;
use crate::uplink::schema::SchemaState;
use crate::uplink::schema_stream::SupergraphSdlQuery;
use crate::uplink::stream_from_uplink;
use crate::uplink::UplinkConfig;
Expand Down Expand Up @@ -74,9 +75,20 @@ impl SchemaSource {
pub(crate) fn into_stream(self) -> impl Stream<Item = Event> {
match self {
SchemaSource::Static { schema_sdl: schema } => {
stream::once(future::ready(UpdateSchema(schema))).boxed()
let update_schema = UpdateSchema(SchemaState {
sdl: schema,
launch_id: None,
});
stream::once(future::ready(update_schema)).boxed()
}
SchemaSource::Stream(stream) => stream.map(UpdateSchema).boxed(),
SchemaSource::Stream(stream) => stream
.map(|sdl| {
UpdateSchema(SchemaState {
sdl,
launch_id: None,
})
})
.boxed(),
#[allow(deprecated)]
SchemaSource::File {
path,
Expand All @@ -100,7 +112,13 @@ impl SchemaSource {
let path = path.clone();
async move {
match tokio::fs::read_to_string(&path).await {
Ok(schema) => Some(UpdateSchema(schema)),
Ok(schema) => {
let update_schema = UpdateSchema(SchemaState {
sdl: schema,
launch_id: None,
});
Some(update_schema)
}
Err(err) => {
tracing::error!(reason = %err, "failed to read supergraph schema");
None
Expand All @@ -110,7 +128,11 @@ impl SchemaSource {
})
.boxed()
} else {
stream::once(future::ready(UpdateSchema(schema))).boxed()
let update_schema = UpdateSchema(SchemaState {
sdl: schema,
launch_id: None,
});
stream::once(future::ready(update_schema)).boxed()
}
}
Err(err) => {
Expand All @@ -121,10 +143,13 @@ impl SchemaSource {
}
}
SchemaSource::Registry(uplink_config) => {
stream_from_uplink::<SupergraphSdlQuery, String>(uplink_config)
stream_from_uplink::<SupergraphSdlQuery, SchemaState>(uplink_config)
.filter_map(|res| {
future::ready(match res {
Ok(schema) => Some(UpdateSchema(schema)),
Ok(schema) => {
let update_schema = UpdateSchema(schema);
Some(update_schema)
}
Err(e) => {
tracing::error!("{}", e);
None
Expand Down Expand Up @@ -222,7 +247,13 @@ impl Fetcher {
.await
{
Ok(res) if res.status().is_success() => match res.text().await {
Ok(schema) => return Some(UpdateSchema(schema)),
Ok(schema) => {
let update_schema = UpdateSchema(SchemaState {
sdl: schema,
launch_id: None,
});
return Some(update_schema);
}
Err(err) => {
tracing::warn!(
url.full = %url,
Expand Down Expand Up @@ -346,10 +377,10 @@ mod tests {
.into_stream();

assert!(
matches!(stream.next().await.unwrap(), UpdateSchema(schema) if schema == SCHEMA_1)
matches!(stream.next().await.unwrap(), UpdateSchema(schema) if schema.sdl == SCHEMA_1)
);
assert!(
matches!(stream.next().await.unwrap(), UpdateSchema(schema) if schema == SCHEMA_1)
matches!(stream.next().await.unwrap(), UpdateSchema(schema) if schema.sdl == SCHEMA_1)
);
}
.with_subscriber(assert_snapshot_subscriber!())
Expand Down Expand Up @@ -382,10 +413,10 @@ mod tests {
.into_stream();

assert!(
matches!(stream.next().await.unwrap(), UpdateSchema(schema) if schema == SCHEMA_2)
matches!(stream.next().await.unwrap(), UpdateSchema(schema) if schema.sdl == SCHEMA_2)
);
assert!(
matches!(stream.next().await.unwrap(), UpdateSchema(schema) if schema == SCHEMA_2)
matches!(stream.next().await.unwrap(), UpdateSchema(schema) if schema.sdl == SCHEMA_2)
);
}
.with_subscriber(assert_snapshot_subscriber!({
Expand Down Expand Up @@ -448,7 +479,7 @@ mod tests {
.await;

assert!(
matches!(stream.next().await.unwrap(), UpdateSchema(schema) if schema == SCHEMA_1)
matches!(stream.next().await.unwrap(), UpdateSchema(schema) if schema.sdl == SCHEMA_1)
);

drop(success);
Expand All @@ -468,7 +499,7 @@ mod tests {
.await;

assert!(
matches!(stream.next().await.unwrap(), UpdateSchema(schema) if schema == SCHEMA_1)
matches!(stream.next().await.unwrap(), UpdateSchema(schema) if schema.sdl == SCHEMA_1)
);
}
.with_subscriber(assert_snapshot_subscriber!({
Expand Down Expand Up @@ -497,7 +528,7 @@ mod tests {
.into_stream();

assert!(
matches!(stream.next().await.unwrap(), UpdateSchema(schema) if schema == SCHEMA_1)
matches!(stream.next().await.unwrap(), UpdateSchema(schema) if schema.sdl == SCHEMA_1)
);
assert!(matches!(stream.next().await.unwrap(), NoMoreSchema));
}
Expand Down
Loading