Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 35 additions & 34 deletions apollo-router/src/layers/async_checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ where
fn layer(&self, service: S) -> Self::Service {
AsyncCheckpointService {
checkpoint_fn: Arc::clone(&self.checkpoint_fn),
inner: service,
service,
}
}
}
Expand Down Expand Up @@ -153,7 +153,7 @@ where
<S as Service<Request>>::Future: Send + 'static,
Fut: Future<Output = Result<ControlFlow<<S as Service<Request>>::Response, Request>, BoxError>>,
{
inner: S,
service: S,
checkpoint_fn: Arc<Pin<Box<dyn Fn(Request) -> Fut + Send + Sync + 'static>>>,
}

Expand All @@ -172,7 +172,7 @@ where
{
Self {
checkpoint_fn: Arc::new(Box::pin(checkpoint_fn)),
inner: service,
service,
}
}
}
Expand All @@ -197,16 +197,18 @@ where
&mut self,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
self.service.poll_ready(cx)
}

fn call(&mut self, req: Request) -> Self::Future {
let checkpoint_fn = Arc::clone(&self.checkpoint_fn);
let inner = self.inner.clone();
let service = self.service.clone();
let mut inner = std::mem::replace(&mut self.service, service);

Box::pin(async move {
match (checkpoint_fn)(req).await {
Ok(ControlFlow::Break(response)) => Ok(response),
Ok(ControlFlow::Continue(request)) => inner.oneshot(request).await,
Ok(ControlFlow::Continue(request)) => inner.call(request).await,
Err(error) => Err(error),
}
})
Expand Down Expand Up @@ -277,21 +279,21 @@ mod async_checkpoint_tests {
let expected_label = "from_mock_service";

let mut execution_service = MockExecutionService::new();
execution_service.expect_clone().return_once(move || {
let mut execution_service = MockExecutionService::new();
execution_service
.expect_call()
.times(1)
.returning(move |req: ExecutionRequest| {
Ok(ExecutionResponse::fake_builder()
.label(expected_label.to_string())
.context(req.context)
.build()
.unwrap())
});

execution_service
});

execution_service
.expect_clone()
.return_once(MockExecutionService::new);

execution_service
.expect_call()
.times(1)
.returning(move |req| {
Ok(ExecutionResponse::fake_builder()
.label(expected_label.to_string())
.context(req.context)
.build()
.unwrap())
});

let service_stack = ServiceBuilder::new()
.checkpoint_async(|req: ExecutionRequest| async { Ok(ControlFlow::Continue(req)) })
Expand All @@ -317,20 +319,19 @@ mod async_checkpoint_tests {
let expected_label = "from_mock_service";
let mut router_service = MockExecutionService::new();

router_service.expect_clone().return_once(move || {
let mut router_service = MockExecutionService::new();
router_service
.expect_call()
.times(1)
.returning(move |_req| {
Ok(ExecutionResponse::fake_builder()
.label(expected_label.to_string())
.build()
.unwrap())
});
router_service
});
router_service
.expect_clone()
.return_once(MockExecutionService::new);

router_service
.expect_call()
.times(1)
.returning(move |_req| {
Ok(ExecutionResponse::fake_builder()
.label(expected_label.to_string())
.build()
.unwrap())
});
let service_stack =
AsyncCheckpointLayer::new(|req| async { Ok(ControlFlow::Continue(req)) })
.layer(router_service);
Expand Down
5 changes: 3 additions & 2 deletions apollo-router/src/plugins/coprocessor/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use tower_service::Service;

use super::*;
use crate::graphql;
use crate::layers::async_checkpoint::OneShotAsyncCheckpointLayer;
use crate::layers::async_checkpoint::AsyncCheckpointLayer;
use crate::layers::ServiceBuilderExt;
use crate::plugins::coprocessor::EXTERNAL_SPAN_NAME;
use crate::services::execution;
Expand Down Expand Up @@ -85,7 +85,7 @@ impl ExecutionStage {
let http_client = http_client.clone();
let sdl = sdl.clone();

OneShotAsyncCheckpointLayer::new(move |request: execution::Request| {
AsyncCheckpointLayer::new(move |request: execution::Request| {
let request_config = request_config.clone();
let coprocessor_url = coprocessor_url.clone();
let http_client = http_client.clone();
Expand Down Expand Up @@ -176,6 +176,7 @@ impl ExecutionStage {
.instrument(external_service_span())
.option_layer(request_layer)
.option_layer(response_layer)
.buffer(50_000)
.service(service)
.boxed()
}
Expand Down
8 changes: 5 additions & 3 deletions apollo-router/src/plugins/coprocessor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use tower::ServiceExt;
use crate::configuration::shared::Client;
use crate::error::Error;
use crate::graphql;
use crate::layers::async_checkpoint::OneShotAsyncCheckpointLayer;
use crate::layers::async_checkpoint::AsyncCheckpointLayer;
use crate::layers::ServiceBuilderExt;
use crate::plugin::Plugin;
use crate::plugin::PluginInit;
Expand Down Expand Up @@ -397,7 +397,7 @@ impl RouterStage {
let http_client = http_client.clone();
let sdl = sdl.clone();

OneShotAsyncCheckpointLayer::new(move |request: router::Request| {
AsyncCheckpointLayer::new(move |request: router::Request| {
let request_config = request_config.clone();
let coprocessor_url = coprocessor_url.clone();
let http_client = http_client.clone();
Expand Down Expand Up @@ -485,6 +485,7 @@ impl RouterStage {
.instrument(external_service_span())
.option_layer(request_layer)
.option_layer(response_layer)
.buffer(50_000)
.service(service)
.boxed()
}
Expand Down Expand Up @@ -534,7 +535,7 @@ impl SubgraphStage {
let http_client = http_client.clone();
let coprocessor_url = coprocessor_url.clone();
let service_name = service_name.clone();
OneShotAsyncCheckpointLayer::new(move |request: subgraph::Request| {
AsyncCheckpointLayer::new(move |request: subgraph::Request| {
let http_client = http_client.clone();
let coprocessor_url = coprocessor_url.clone();
let service_name = service_name.clone();
Expand Down Expand Up @@ -623,6 +624,7 @@ impl SubgraphStage {
.instrument(external_service_span())
.option_layer(request_layer)
.option_layer(response_layer)
.buffer(50_000)
.service(service)
.boxed()
}
Expand Down
5 changes: 3 additions & 2 deletions apollo-router/src/plugins/coprocessor/supergraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use tower_service::Service;

use super::*;
use crate::graphql;
use crate::layers::async_checkpoint::OneShotAsyncCheckpointLayer;
use crate::layers::async_checkpoint::AsyncCheckpointLayer;
use crate::layers::ServiceBuilderExt;
use crate::plugins::coprocessor::EXTERNAL_SPAN_NAME;
use crate::plugins::telemetry::config_new::conditions::Condition;
Expand Down Expand Up @@ -91,7 +91,7 @@ impl SupergraphStage {
let http_client = http_client.clone();
let sdl = sdl.clone();

OneShotAsyncCheckpointLayer::new(move |request: supergraph::Request| {
AsyncCheckpointLayer::new(move |request: supergraph::Request| {
let request_config = request_config.clone();
let coprocessor_url = coprocessor_url.clone();
let http_client = http_client.clone();
Expand Down Expand Up @@ -180,6 +180,7 @@ impl SupergraphStage {
.instrument(external_service_span())
.option_layer(request_layer)
.option_layer(response_layer)
.buffer(50_000)
.service(service)
.boxed()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use tower::ServiceBuilder;
use super::query_analysis::ParsedDocument;
use crate::graphql::Error;
use crate::json_ext::Object;
use crate::layers::async_checkpoint::OneShotAsyncCheckpointService;
use crate::layers::async_checkpoint::AsyncCheckpointService;
use crate::layers::ServiceBuilderExt;
use crate::services::SupergraphRequest;
use crate::services::SupergraphResponse;
Expand All @@ -34,15 +34,15 @@ where
+ 'static,
<S as Service<SupergraphRequest>>::Future: Send + 'static,
{
type Service = OneShotAsyncCheckpointService<
type Service = AsyncCheckpointService<
S,
BoxFuture<'static, Result<ControlFlow<SupergraphResponse, SupergraphRequest>, BoxError>>,
SupergraphRequest,
>;

fn layer(&self, service: S) -> Self::Service {
ServiceBuilder::new()
.oneshot_checkpoint_async(|req: SupergraphRequest| {
.checkpoint_async(|req: SupergraphRequest| {
Box::pin(async {
if req.supergraph_request.method() == Method::POST {
return Ok(ControlFlow::Continue(req));
Expand Down Expand Up @@ -143,6 +143,10 @@ mod forbid_http_get_mutations_tests {
async fn it_lets_http_post_queries_pass_through() {
let mut mock_service = MockSupergraphService::new();

mock_service
.expect_clone()
.returning(MockSupergraphService::new);

mock_service
.expect_call()
.times(1)
Expand All @@ -166,6 +170,10 @@ mod forbid_http_get_mutations_tests {
async fn it_lets_http_post_mutations_pass_through() {
let mut mock_service = MockSupergraphService::new();

mock_service
.expect_clone()
.returning(MockSupergraphService::new);

mock_service
.expect_call()
.times(1)
Expand All @@ -189,6 +197,10 @@ mod forbid_http_get_mutations_tests {
async fn it_lets_http_get_queries_pass_through() {
let mut mock_service = MockSupergraphService::new();

mock_service
.expect_clone()
.returning(MockSupergraphService::new);

mock_service
.expect_call()
.times(1)
Expand Down Expand Up @@ -238,7 +250,12 @@ mod forbid_http_get_mutations_tests {
.map(|method| create_request(method, OperationKind::Mutation));

for request in forbidden_requests {
let mock_service = MockSupergraphService::new();
let mut mock_service = MockSupergraphService::new();

mock_service
.expect_clone()
.returning(MockSupergraphService::new);

let mut service_stack = AllowOnlyHttpPostMutationsLayer::default().layer(mock_service);
let services = service_stack.ready().await.unwrap();

Expand Down
58 changes: 31 additions & 27 deletions apollo-router/src/services/supergraph/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -898,12 +898,40 @@ impl PluggableSupergraphServiceBuilder {
)),
));

let supergraph_service = SupergraphService::builder()
.query_planner_service(query_planner_service.clone())
.execution_service_factory(ExecutionServiceFactory {
schema: schema.clone(),
subgraph_schemas: query_planner_service.subgraph_schemas(),
plugins: self.plugins.clone(),
fetch_service_factory,
})
.schema(schema.clone())
.notify(configuration.notify.clone())
.build();

let supergraph_service =
AllowOnlyHttpPostMutationsLayer::default().layer(supergraph_service);

let sb = ServiceBuilder::new()
.buffer(50_000)
.layer(content_negotiation::SupergraphLayer::default())
.service(
self.plugins
.iter()
.rev()
.fold(supergraph_service.boxed(), |acc, (_, e)| {
e.supergraph_service(acc)
}),
)
.boxed_clone();

Ok(SupergraphCreator {
query_planner_service,
fetch_service_factory,
schema,
plugins: self.plugins,
config: configuration,
sb: Arc::new(parking_lot::Mutex::new(sb)),
})
}
}
Expand All @@ -912,10 +940,10 @@ impl PluggableSupergraphServiceBuilder {
#[derive(Clone)]
pub(crate) struct SupergraphCreator {
query_planner_service: CachingQueryPlanner<QueryPlannerService>,
fetch_service_factory: Arc<FetchServiceFactory>,
schema: Arc<Schema>,
config: Arc<Configuration>,
plugins: Arc<Plugins>,
sb: Arc<parking_lot::Mutex<supergraph::BoxCloneService>>,
}

pub(crate) trait HasPlugins {
Expand Down Expand Up @@ -964,31 +992,7 @@ impl SupergraphCreator {
Error = BoxError,
Future = BoxFuture<'static, supergraph::ServiceResult>,
> + Send {
let supergraph_service = SupergraphService::builder()
.query_planner_service(self.query_planner_service.clone())
.execution_service_factory(ExecutionServiceFactory {
schema: self.schema.clone(),
subgraph_schemas: self.query_planner_service.subgraph_schemas(),
plugins: self.plugins.clone(),
fetch_service_factory: self.fetch_service_factory.clone(),
})
.schema(self.schema.clone())
.notify(self.config.notify.clone())
.build();

let supergraph_service =
AllowOnlyHttpPostMutationsLayer::default().layer(supergraph_service);

ServiceBuilder::new()
.layer(content_negotiation::SupergraphLayer::default())
.service(
self.plugins
.iter()
.rev()
.fold(supergraph_service.boxed(), |acc, (_, e)| {
e.supergraph_service(acc)
}),
)
self.sb.lock().clone()
}

pub(crate) fn previous_cache(&self) -> InMemoryCachePlanner {
Expand Down