Merged
69 commits
bef5367
First draft of a backpressure enabled pipeline
Dec 18, 2024
6d80932
Obey the linter...
Dec 18, 2024
7491e1e
Merge branch 'next' into garypen/next-backpressure
Dec 18, 2024
718a870
Merge branch 'next' into garypen/next-backpressure
Jan 6, 2025
4c93a96
Merge branch 'next' into garypen/next-backpressure
Jan 7, 2025
716f7e5
Fix the rhai integration test
Jan 7, 2025
8b7653b
fix lint complaint
Jan 7, 2025
746d2c0
Add the new rhai testng config file
Jan 7, 2025
330f969
temporarily comment out one test
Jan 8, 2025
1b724d5
still experimenting to see how far away this approach is
Jan 8, 2025
f932806
Move limits to traffic shaping
Jan 9, 2025
3b8ed61
Rename http_server to router
Jan 9, 2025
81faed0
Fix formatting errors reported by lint
Jan 9, 2025
fc2e822
Fix the rate limiting test so that it works with new rate limiting
Jan 9, 2025
3155440
Rename some stuff to minimise change from 1.x
Jan 10, 2025
9bd0386
Try to restore the existing behaviour for reporting errors
Jan 10, 2025
5627dce
Clean up some of the failing tests
Jan 10, 2025
7432306
Fix lint complaints
Jan 10, 2025
d05619f
Remove 1/2 implemented little loadshedder
Jan 13, 2025
69e36f1
Merge branch 'next' into garypen/next-backpressure
Jan 13, 2025
b85ca4f
POC: Make supergraph creator clone a BoxCloneService
Jan 13, 2025
d5718f2
Fix AsyncCheckpoint and update tests for correct behaviour
Jan 14, 2025
5c72744
Fix the xtask lint complaints
Jan 14, 2025
a8a8950
POC: Make supergraph creator clone a BoxCloneService (#6540)
Jan 14, 2025
21dee20
Modify subgraph rate-limiting test to pass for now
Jan 14, 2025
00c1689
Merge branch 'next' into garypen/next-backpressure
Jan 14, 2025
6c239aa
Small tidying up to use `buffered`
Jan 15, 2025
f1cd40a
Document tower layers in router and supergraph services (#6549)
goto-bus-stop Jan 16, 2025
6788915
List the plugin tower layers
goto-bus-stop Jan 16, 2025
949ea23
Enforce backpressure between the router and qp services
Jan 16, 2025
9511057
traffic_shaping: remove subgraph_service_internal method
IvanGoncharov Jan 16, 2025
e573024
Merge branch 'garypen/next-backpressure' into i1g/next-traffic_shaping
Jan 17, 2025
f2303d6
traffic_shaping: remove subgraph_service_internal method (#6565)
Jan 17, 2025
0c3b6b0
Fix the batching test snapshots
Jan 17, 2025
98391f7
Bring subgraph rate-limiting to life
Jan 17, 2025
fadae7b
Fix remaining broken tests
Jan 18, 2025
83fd57c
xtask lint
Jan 18, 2025
6caaa56
Fix mock expectations in affected examples tests
Jan 20, 2025
ecf3aa9
Remove OneShotAsyncCheckpoint and associated functionality
Jan 20, 2025
2f5014f
Revert change to axum_http_server_factory.rs
Jan 20, 2025
77b2a59
Revert earlier changes to body limit layers and re-enable test
Jan 20, 2025
3ffe632
A compromise solution for Body Limits
Jan 20, 2025
11d19b2
Merge branch 'next' into garypen/next-backpressure
Jan 20, 2025
ee3bd5a
Merge branch 'dev' into garypen/next-backpressure
Jan 21, 2025
26364f6
Prepare PR for review.
Jan 21, 2025
034686f
Merge branch 'dev' into garypen/next-backpressure
Jan 21, 2025
57e8531
Add a helpful commit to explain the Mutex on make()
Jan 21, 2025
d2156ed
add a changeset
Jan 21, 2025
aac1492
Merge branch 'dev' into garypen/next-backpressure
Jan 21, 2025
2107729
Fix some readying issues in tests and deduplication
Jan 21, 2025
0bddcde
Found another inner service not following tower advice
Jan 23, 2025
2b3bed2
Make subgraph_name mandatory on Request and Response
Jan 23, 2025
f6525a8
Merge branch 'dev' into garypen/next-backpressure
Jan 23, 2025
f08f21d
xtask lint
Jan 23, 2025
339c502
Remove the comment because the name is no longer Option
Jan 23, 2025
f3944d1
Code review comments.
Jan 24, 2025
3b88d45
Spotted this dbg! in code review and should remove it
Jan 24, 2025
017b0f1
Fix mistakes made during code review changes.
Jan 24, 2025
9b75423
Replace our use of Mutex with a Buffer
Jan 25, 2025
ddad68d
Add to the migration guide and the router documentation.
Jan 27, 2025
c629fca
Merge branch 'dev' into garypen/next-backpressure
Jan 27, 2025
b2c236f
Remember the name of the concurrency limit
Jan 27, 2025
f5d3bdb
Fix the body limit layer.
Jan 27, 2025
95e22d8
Add a link to the appropriate PR for the migration changes
Jan 27, 2025
f748a93
wrap up `mock_subgraph_service_withf_panics_should_be_reported_as_ser…
goto-bus-stop Jan 27, 2025
49fd09b
Fixup limits plugin tests
Jan 27, 2025
8ea3123
Fixup telemetry tests
Jan 27, 2025
bde4fb4
Merge branch 'dev' into garypen/next-backpressure
Jan 27, 2025
2a5a012
Revert this change from 8ea31233935e9b5d717d61e07dddf0b191cec2ef
Jan 27, 2025
9 changes: 9 additions & 0 deletions .changesets/feat_garypen_next_backpressure.md
@@ -0,0 +1,9 @@
### Enabling back-pressure in the request processing pipeline ([PR #6486](https://github.com/apollographql/router/pull/6486))

In Router 1.x, back-pressure was not maintained. Requests would always be accepted by the router, which could cause issues for routers handling high levels of traffic.

We are now improving the handling of back-pressure so that traffic shaping measures are more effective and integration with telemetry is improved. In particular, this means that telemetry events will not be lost due to traffic shaping and that traffic shaping now works more precisely. This will make the behaviour of the router more predictable.

For more details about how these improvements affect the router, please refer to the [migrating from 1.x guide](reference/migration/from-router-v1.mdx).

By [@garypen](https://github.com/garypen) in https://github.com/apollographql/router/pull/6486
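
As a rough illustration of what "maintaining back-pressure" means in the changeset above, the following std-only sketch uses a bounded queue: once the queue is full, new requests are refused instead of being accepted unconditionally. This is not the router's implementation (which is built from tower services); `Admission`, `offer`, and the capacity of 2 are hypothetical names and values chosen for the example.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Outcome of offering a request to the (hypothetical) pipeline.
#[derive(Debug, PartialEq)]
enum Admission {
    Accepted,
    /// The pipeline is full, so the caller observes back-pressure.
    Rejected,
}

/// Hypothetical front door: forwards a request only if the bounded queue has room.
fn offer(tx: &SyncSender<String>, request: &str) -> Admission {
    match tx.try_send(request.to_string()) {
        Ok(()) => Admission::Accepted,
        // A full (or closed) queue is reported to the caller instead of being hidden.
        Err(TrySendError::Full(_)) | Err(TrySendError::Disconnected(_)) => Admission::Rejected,
    }
}

fn main() {
    // Capacity 2 stands in for "how much work the pipeline can hold" (assumed value).
    let (tx, rx): (SyncSender<String>, Receiver<String>) = sync_channel(2);

    assert_eq!(offer(&tx, "a"), Admission::Accepted);
    assert_eq!(offer(&tx, "b"), Admission::Accepted);
    // The queue is full: the third request is refused rather than accepted blindly.
    assert_eq!(offer(&tx, "c"), Admission::Rejected);

    // Once downstream work completes, capacity is available again.
    let _done = rx.recv().unwrap();
    assert_eq!(offer(&tx, "d"), Admission::Accepted);
    println!("ok");
}
```

Without the bound, every request would be queued no matter how far behind the consumer was, which is the 1.x behaviour the changeset describes.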
122 changes: 0 additions & 122 deletions apollo-router/src/axum_factory/tests.rs
@@ -1,4 +1,3 @@
- use std::collections::HashMap;
use std::io;
use std::net::SocketAddr;
use std::pin::Pin;
@@ -67,28 +66,18 @@ use crate::graphql;
use crate::http_server_factory::HttpServerFactory;
use crate::http_server_factory::HttpServerHandle;
use crate::json_ext::Path;
- use crate::plugin::test::MockSubgraph;
- use crate::query_planner::QueryPlannerService;
- use crate::router_factory::create_plugins;
use crate::router_factory::Endpoint;
use crate::router_factory::RouterFactory;
- use crate::services::execution;
- use crate::services::layers::persisted_queries::PersistedQueryLayer;
- use crate::services::layers::query_analysis::QueryAnalysisLayer;
use crate::services::layers::static_page::home_page_content;
use crate::services::layers::static_page::sandbox_page_content;
use crate::services::new_service::ServiceFactory;
use crate::services::router;
- use crate::services::router::service::RouterCreator;
use crate::services::supergraph;
- use crate::services::HasSchema;
- use crate::services::PluggableSupergraphServiceBuilder;
use crate::services::RouterRequest;
use crate::services::RouterResponse;
use crate::services::SupergraphResponse;
use crate::services::MULTIPART_DEFER_ACCEPT;
use crate::services::MULTIPART_DEFER_CONTENT_TYPE;
- use crate::spec::Schema;
use crate::test_harness::http_client;
use crate::test_harness::http_client::MaybeMultipart;
use crate::uplink::license_enforcement::LicenseState;
@@ -2406,114 +2395,3 @@ async fn test_supergraph_and_health_check_same_port_different_listener() {
error.to_string()
);
}
-
- #[tokio::test]
- async fn test_supergraph_timeout() {
- let config = serde_json::json!({
- "supergraph": {
- "listen": "127.0.0.1:0",
- "defer_support": false,
- },
- "traffic_shaping": {
- "router": {
- "timeout": "1ns"
- }
- },
- });
-
- let conf: Arc<Configuration> = Arc::new(serde_json::from_value(config).unwrap());
-
- let schema = include_str!("..//testdata/minimal_supergraph.graphql");
- let schema = Arc::new(Schema::parse(schema, &conf).unwrap());
- let planner = QueryPlannerService::new(schema.clone(), conf.clone())
- .await
- .unwrap();
-
- // we do the entire supergraph rebuilding instead of using `from_supergraph_mock_callback_and_configuration`
- // because we need the plugins to apply on the supergraph
- let subgraph_schemas = Arc::new(
- planner
- .subgraph_schemas()
- .iter()
- .map(|(k, v)| (k.clone(), v.schema.clone()))
- .collect(),
- );
- let mut plugins = create_plugins(&conf, &schema, subgraph_schemas, None, None)
- .await
- .unwrap();
-
- plugins.insert("delay".into(), Box::new(Delay));
-
- struct Delay;
-
- #[async_trait::async_trait]
- impl crate::plugin::Plugin for Delay {
- type Config = ();
-
- async fn new(_: crate::plugin::PluginInit<()>) -> Result<Self, BoxError> {
- Ok(Self)
- }
-
- fn execution_service(&self, service: execution::BoxService) -> execution::BoxService {
- service
- .map_future(|fut| async {
- tokio::time::sleep(Duration::from_millis(10)).await;
- fut.await
- })
- .boxed()
- }
- }
-
- let builder = PluggableSupergraphServiceBuilder::new(planner)
- .with_configuration(conf.clone())
- .with_subgraph_service("accounts", MockSubgraph::new(HashMap::new()));
-
- let supergraph_creator = builder
- .with_plugins(Arc::new(plugins))
- .build()
- .await
- .unwrap();
-
- let service = RouterCreator::new(
- QueryAnalysisLayer::new(supergraph_creator.schema(), Arc::clone(&conf)).await,
- Arc::new(PersistedQueryLayer::new(&conf).await.unwrap()),
- Arc::new(supergraph_creator),
- conf.clone(),
- )
- .await
- .unwrap()
- .make();
-
- // keep the server handle around otherwise it will immediately shutdown
- let (server, client) = init_with_config(service, conf.clone(), MultiMap::new())
- .await
- .unwrap();
- let url = server
- .graphql_listen_address()
- .as_ref()
- .unwrap()
- .to_string();
-
- let response = client
- .post(url)
- .body(r#"{ "query": "{ me }" }"#)
- .send()
- .await
- .unwrap();
-
- assert_eq!(response.status(), StatusCode::GATEWAY_TIMEOUT);
-
- let body = response.bytes().await.unwrap();
- let body: serde_json::Value = serde_json::from_slice(&body).unwrap();
- assert_eq!(
- body,
- json!({
- "errors": [{
- "message": "Request timed out",
- "extensions": {
- "code": "REQUEST_TIMEOUT"
- }
- }]
- })
- );
- }
6 changes: 3 additions & 3 deletions apollo-router/src/batching.rs
@@ -281,7 +281,7 @@ impl Batch {
request, sender, ..
} in cancelled_requests
{
- let subgraph_name = request.subgraph_name.ok_or(SubgraphBatchingError::MissingSubgraphName)?;
+ let subgraph_name = request.subgraph_name;
if let Err(log_error) = sender.send(Err(Box::new(FetchError::SubrequestBatchingError {
service: subgraph_name.clone(),
reason: format!("request cancelled: {reason}"),
@@ -365,7 +365,7 @@ impl Batch {
sender: tx,
} in all_in_one
{
- let subgraph_name = sg_request.subgraph_name.clone().ok_or(SubgraphBatchingError::MissingSubgraphName)?;
+ let subgraph_name = sg_request.subgraph_name.clone();
let value = svc_map
.entry(
subgraph_name,
@@ -583,7 +583,7 @@ mod tests {
.body(graphql::Response::builder().data(data.clone()).build())
.unwrap(),
context: Context::new(),
- subgraph_name: None,
+ subgraph_name: String::default(),
id: SubgraphRequestId(String::new()),
};
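
The batching hunks above replace an `Option` lookup plus error path with a plain field read, because `subgraph_name` is now mandatory. A minimal sketch of that pattern follows. `OldRequest`, `NewRequest`, `BatchingError`, `old_name`, and `new_name` are hypothetical, simplified stand-ins and not the router's actual types.

```rust
/// Simplified stand-in for the request before the change: the name may be absent.
struct OldRequest {
    subgraph_name: Option<String>,
}

/// Simplified stand-in for the request after the change: the name is always present.
struct NewRequest {
    subgraph_name: String,
}

/// Hypothetical error type; only needed while the name is optional.
#[derive(Debug, PartialEq)]
enum BatchingError {
    MissingSubgraphName,
}

/// Before: every caller has to handle the "no name" case.
fn old_name(request: OldRequest) -> Result<String, BatchingError> {
    request
        .subgraph_name
        .ok_or(BatchingError::MissingSubgraphName)
}

/// After: the type guarantees a name, so the error path disappears.
fn new_name(request: NewRequest) -> String {
    request.subgraph_name
}

fn main() {
    let missing = OldRequest { subgraph_name: None };
    assert_eq!(old_name(missing), Err(BatchingError::MissingSubgraphName));

    let present = NewRequest {
        subgraph_name: "accounts".to_string(),
    };
    assert_eq!(new_name(present), "accounts");
    println!("ok");
}
```

Moving the requirement into the type is also what makes an error variant such as `MissingSubgraphName` unnecessary.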

@@ -5380,6 +5380,13 @@ expression: "&schema"
"RouterShaping": {
"additionalProperties": false,
"properties": {
+ "concurrency_limit": {
+ "description": "The global concurrency limit",
+ "format": "uint",
+ "minimum": 0.0,
+ "nullable": true,
+ "type": "integer"
+ },
"global_rate_limit": {
"$ref": "#/definitions/RateLimitConf",
"description": "#/definitions/RateLimitConf",
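
The schema above describes `concurrency_limit` as a nullable unsigned integer, which maps naturally to an `Option<usize>`. The sketch below shows one way such a limit could be enforced with an in-flight counter. It is an assumption-laden illustration: `Limiter`, `Permit`, and `try_acquire` are hypothetical names, and this version refuses work over the limit, whereas the diff shown here does not say whether the router rejects or waits.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical limiter: `None` means "no limit configured" (the nullable case).
struct Limiter {
    limit: Option<usize>,
    in_flight: Arc<AtomicUsize>,
}

/// Held while a request is being processed; releases its slot when dropped.
struct Permit {
    in_flight: Arc<AtomicUsize>,
}

impl Drop for Permit {
    fn drop(&mut self) {
        self.in_flight.fetch_sub(1, Ordering::SeqCst);
    }
}

impl Limiter {
    fn new(limit: Option<usize>) -> Self {
        Limiter {
            limit,
            in_flight: Arc::new(AtomicUsize::new(0)),
        }
    }

    /// Returns a permit if the request fits under the limit, otherwise `None`.
    fn try_acquire(&self) -> Option<Permit> {
        // Reserve a slot first, then give it back if that exceeded the limit.
        let previous = self.in_flight.fetch_add(1, Ordering::SeqCst);
        if let Some(limit) = self.limit {
            if previous >= limit {
                self.in_flight.fetch_sub(1, Ordering::SeqCst);
                return None;
            }
        }
        Some(Permit {
            in_flight: Arc::clone(&self.in_flight),
        })
    }
}

fn main() {
    let limiter = Limiter::new(Some(2));
    let first = limiter.try_acquire();
    let second = limiter.try_acquire();
    assert!(first.is_some() && second.is_some());
    // A third concurrent request exceeds the limit.
    assert!(limiter.try_acquire().is_none());

    // Finishing a request frees its slot.
    drop(first);
    assert!(limiter.try_acquire().is_some());
    println!("ok");
}
```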
2 changes: 0 additions & 2 deletions apollo-router/src/error.rs
@@ -630,8 +630,6 @@ impl std::fmt::Display for ValidationErrors {
pub(crate) enum SubgraphBatchingError {
/// Sender unavailable
SenderUnavailable,
- /// Request does not have a subgraph name
- MissingSubgraphName,
/// Requests is empty
RequestsIsEmpty,
/// Batch processing failed: {0}