Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support !Sync (and maybe !Send) Context #840

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions examples/warp_subscriptions/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use std::{env, pin::Pin, sync::Arc, time::Duration};

use futures::{FutureExt as _, Stream};
use futures::{FutureExt as _, Stream, TryFutureExt};
use juniper::{
graphql_object, graphql_subscription, DefaultScalarValue, EmptyMutation, FieldError,
GraphQLEnum, RootNode,
Expand Down Expand Up @@ -162,14 +162,17 @@ async fn main() {
.and(warp::ws())
.map(move |ws: warp::ws::Ws| {
let root_node = root_node.clone();
ws.on_upgrade(move |websocket| async move {
serve_graphql_ws(websocket, root_node, ConnectionConfig::new(Context {}))
.map(|r| {
if let Err(e) = r {
println!("Websocket error: {}", e);
}
})
.await
ws.on_upgrade(move |websocket| {
tokio::task::spawn_local(async move {
serve_graphql_ws(websocket, root_node, ConnectionConfig::new(Context {}))
.map(|r| {
if let Err(e) = r {
println!("Websocket error: {}", e);
}
})
.await
})
.unwrap_or_else(|_| panic!("Failed to join handler"))
})
}))
.map(|reply| {
Expand Down
11 changes: 3 additions & 8 deletions juniper/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ pub type ExecutionResult<S = DefaultScalarValue> = Result<Value<S>, FieldError<S

/// Boxed `Stream` yielding `Result<Value<S>, ExecutionError<S>>`
pub type ValuesStream<'a, S = DefaultScalarValue> =
std::pin::Pin<Box<dyn Stream<Item = Result<Value<S>, ExecutionError<S>>> + Send + 'a>>;
std::pin::Pin<Box<dyn Stream<Item = Result<Value<S>, ExecutionError<S>>> + 'a>>;

/// The map of variables used for substitution during query execution
pub type Variables<S = DefaultScalarValue> = HashMap<String, InputValue<S>>;
Expand Down Expand Up @@ -412,7 +412,6 @@ where
'a: 'res,
T: GraphQLSubscriptionValue<S, Context = CtxT> + ?Sized,
T::TypeInfo: Sync,
CtxT: Sync,
S: Send + Sync,
{
self.subscribe(info, value).await.unwrap_or_else(|e| {
Expand All @@ -433,7 +432,6 @@ where
'a: 'res,
T: GraphQLSubscriptionValue<S, Context = CtxT> + ?Sized,
T::TypeInfo: Sync,
CtxT: Sync,
S: Send + Sync,
{
value.resolve_into_stream(info, self).await
Expand Down Expand Up @@ -462,7 +460,6 @@ where
where
T: GraphQLValueAsync<S, Context = CtxT> + ?Sized,
T::TypeInfo: Sync,
CtxT: Sync,
S: Send + Sync,
{
value
Expand All @@ -479,7 +476,7 @@ where
where
T: GraphQLValueAsync<S, Context = NewCtxT> + ?Sized,
T::TypeInfo: Sync,
NewCtxT: FromContext<CtxT> + Sync,
NewCtxT: FromContext<CtxT>,
S: Send + Sync,
{
let e = self.replaced_context(<NewCtxT as FromContext<CtxT>>::from(self.context));
Expand All @@ -506,7 +503,6 @@ where
where
T: GraphQLValueAsync<S, Context = CtxT> + ?Sized,
T::TypeInfo: Sync,
CtxT: Sync,
S: Send + Sync,
{
self.resolve_async(info, value).await.unwrap_or_else(|e| {
Expand Down Expand Up @@ -887,7 +883,6 @@ pub async fn execute_validated_query_async<'a, 'b, QueryT, MutationT, Subscripti
where
QueryT: GraphQLTypeAsync<S>,
QueryT::TypeInfo: Sync,
QueryT::Context: Sync,
MutationT: GraphQLTypeAsync<S, Context = QueryT::Context>,
MutationT::TypeInfo: Sync,
SubscriptionT: GraphQLType<S, Context = QueryT::Context> + Sync,
Expand Down Expand Up @@ -1033,7 +1028,7 @@ where
'op: 'd,
QueryT: GraphQLTypeAsync<S>,
QueryT::TypeInfo: Sync,
QueryT::Context: Sync + 'r,
QueryT::Context: 'r,
MutationT: GraphQLTypeAsync<S, Context = QueryT::Context>,
MutationT::TypeInfo: Sync,
SubscriptionT: GraphQLSubscriptionType<S, Context = QueryT::Context>,
Expand Down
3 changes: 0 additions & 3 deletions juniper/src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ where
where
QueryT: GraphQLTypeAsync<S>,
QueryT::TypeInfo: Sync,
QueryT::Context: Sync,
MutationT: GraphQLTypeAsync<S, Context = QueryT::Context>,
MutationT::TypeInfo: Sync,
SubscriptionT: GraphQLType<S, Context = QueryT::Context> + Sync,
Expand Down Expand Up @@ -136,7 +135,6 @@ where
'ctx: 'a,
QueryT: GraphQLTypeAsync<S>,
QueryT::TypeInfo: Sync,
QueryT::Context: Sync,
MutationT: GraphQLTypeAsync<S, Context = QueryT::Context>,
MutationT::TypeInfo: Sync,
SubscriptionT: GraphQLSubscriptionType<S, Context = QueryT::Context>,
Expand Down Expand Up @@ -291,7 +289,6 @@ where
where
QueryT: GraphQLTypeAsync<S>,
QueryT::TypeInfo: Sync,
QueryT::Context: Sync,
MutationT: GraphQLTypeAsync<S, Context = QueryT::Context>,
MutationT::TypeInfo: Sync,
SubscriptionT: GraphQLSubscriptionType<S, Context = QueryT::Context>,
Expand Down
2 changes: 0 additions & 2 deletions juniper/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,6 @@ pub async fn execute<'a, S, QueryT, MutationT, SubscriptionT>(
where
QueryT: GraphQLTypeAsync<S>,
QueryT::TypeInfo: Sync,
QueryT::Context: Sync,
MutationT: GraphQLTypeAsync<S, Context = QueryT::Context>,
MutationT::TypeInfo: Sync,
SubscriptionT: GraphQLType<S, Context = QueryT::Context> + Sync,
Expand Down Expand Up @@ -308,7 +307,6 @@ pub async fn resolve_into_stream<'a, S, QueryT, MutationT, SubscriptionT>(
where
QueryT: GraphQLTypeAsync<S>,
QueryT::TypeInfo: Sync,
QueryT::Context: Sync,
MutationT: GraphQLTypeAsync<S, Context = QueryT::Context>,
MutationT::TypeInfo: Sync,
SubscriptionT: GraphQLSubscriptionType<S, Context = QueryT::Context>,
Expand Down
4 changes: 2 additions & 2 deletions juniper/src/macros/tests/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{DefaultScalarValue, GraphQLType, GraphQLTypeAsync, RootNode, Value,
pub async fn run_query<Query, Mutation, Subscription>(query: &str) -> Value
where
Query: GraphQLTypeAsync<DefaultScalarValue, TypeInfo = ()> + Default,
Query::Context: Default + Sync,
Query::Context: Default,
Mutation:
GraphQLTypeAsync<DefaultScalarValue, TypeInfo = (), Context = Query::Context> + Default,
Subscription:
Expand Down Expand Up @@ -31,7 +31,7 @@ where
pub async fn run_info_query<Query, Mutation, Subscription>(type_name: &str) -> Value
where
Query: GraphQLTypeAsync<DefaultScalarValue, TypeInfo = ()> + Default,
Query::Context: Default + Sync,
Query::Context: Default,
Mutation:
GraphQLTypeAsync<DefaultScalarValue, TypeInfo = (), Context = Query::Context> + Default,
Subscription:
Expand Down
4 changes: 2 additions & 2 deletions juniper/src/schema/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ impl<'a, S, QueryT, MutationT, SubscriptionT> GraphQLValueAsync<S>
where
QueryT: GraphQLTypeAsync<S>,
QueryT::TypeInfo: Sync,
QueryT::Context: Sync + 'a,
QueryT::Context: 'a,
MutationT: GraphQLTypeAsync<S, Context = QueryT::Context>,
MutationT::TypeInfo: Sync,
SubscriptionT: GraphQLType<S, Context = QueryT::Context> + Sync,
Expand All @@ -115,7 +115,7 @@ where
field_name: &'b str,
arguments: &'b Arguments<S>,
executor: &'b Executor<Self::Context, S>,
) -> crate::BoxFuture<'b, ExecutionResult<S>> {
) -> crate::LocalBoxFuture<'b, ExecutionResult<S>> {
use futures::future::ready;
match field_name {
"__schema" | "__type" => {
Expand Down
15 changes: 5 additions & 10 deletions juniper/src/types/async_await.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
value::{DefaultScalarValue, Object, ScalarValue, Value},
};

use crate::BoxFuture;
use crate::LocalBoxFuture;

use super::base::{is_excluded, merge_key_into, Arguments, GraphQLType, GraphQLValue};

Expand All @@ -16,7 +16,6 @@ use super::base::{is_excluded, merge_key_into, Arguments, GraphQLType, GraphQLVa
pub trait GraphQLValueAsync<S = DefaultScalarValue>: GraphQLValue<S> + Sync
where
Self::TypeInfo: Sync,
Self::Context: Sync,
S: ScalarValue + Send + Sync,
{
/// Resolves the value of a single field on this [`GraphQLValueAsync`].
Expand All @@ -37,7 +36,7 @@ where
_field_name: &'a str,
_arguments: &'a Arguments<S>,
_executor: &'a Executor<Self::Context, S>,
) -> BoxFuture<'a, ExecutionResult<S>> {
) -> LocalBoxFuture<'a, ExecutionResult<S>> {
panic!(
"GraphQLValueAsync::resolve_field_async() must be implemented by objects and \
interfaces",
Expand All @@ -63,7 +62,7 @@ where
type_name: &str,
selection_set: Option<&'a [Selection<'a, S>]>,
executor: &'a Executor<'a, 'a, Self::Context, S>,
) -> BoxFuture<'a, ExecutionResult<S>> {
) -> LocalBoxFuture<'a, ExecutionResult<S>> {
if self.type_name(info).unwrap() == type_name {
self.resolve_async(info, selection_set, executor)
} else {
Expand Down Expand Up @@ -98,7 +97,7 @@ where
info: &'a Self::TypeInfo,
selection_set: Option<&'a [Selection<S>]>,
executor: &'a Executor<Self::Context, S>,
) -> BoxFuture<'a, ExecutionResult<S>> {
) -> LocalBoxFuture<'a, ExecutionResult<S>> {
if let Some(sel) = selection_set {
Box::pin(async move {
Ok(resolve_selection_set_into_async(self, info, sel, executor).await)
Expand All @@ -125,7 +124,6 @@ pub type DynGraphQLValueAsync<S, C, TI> =
/// doesn't require manual or code-generated implementation.
pub trait GraphQLTypeAsync<S = DefaultScalarValue>: GraphQLValueAsync<S> + GraphQLType<S>
where
Self::Context: Sync,
Self::TypeInfo: Sync,
S: ScalarValue + Send + Sync,
{
Expand All @@ -134,7 +132,6 @@ where
impl<S, T> GraphQLTypeAsync<S> for T
where
T: GraphQLValueAsync<S> + GraphQLType<S> + ?Sized,
T::Context: Sync,
T::TypeInfo: Sync,
S: ScalarValue + Send + Sync,
{
Expand All @@ -147,11 +144,10 @@ fn resolve_selection_set_into_async<'a, 'e, T, S>(
info: &'a T::TypeInfo,
selection_set: &'e [Selection<'e, S>],
executor: &'e Executor<'e, 'e, T::Context, S>,
) -> BoxFuture<'a, Value<S>>
) -> LocalBoxFuture<'a, Value<S>>
where
T: GraphQLValueAsync<S> + ?Sized,
T::TypeInfo: Sync,
T::Context: Sync,
S: ScalarValue + Send + Sync,
'e: 'a,
{
Expand Down Expand Up @@ -182,7 +178,6 @@ pub(crate) async fn resolve_selection_set_into_async_recursive<'a, T, S>(
where
T: GraphQLValueAsync<S> + ?Sized,
T::TypeInfo: Sync,
T::Context: Sync,
S: ScalarValue + Send + Sync,
{
use futures::stream::{FuturesOrdered, StreamExt as _};
Expand Down
10 changes: 3 additions & 7 deletions juniper/src/types/containers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,14 @@ impl<S, T> GraphQLValueAsync<S> for Option<T>
where
T: GraphQLValueAsync<S>,
T::TypeInfo: Sync,
T::Context: Sync,
S: ScalarValue + Send + Sync,
{
fn resolve_async<'a>(
&'a self,
info: &'a Self::TypeInfo,
_: Option<&'a [Selection<S>]>,
executor: &'a Executor<Self::Context, S>,
) -> crate::BoxFuture<'a, ExecutionResult<S>> {
) -> crate::LocalBoxFuture<'a, ExecutionResult<S>> {
let f = async move {
let value = match self {
Some(obj) => executor.resolve_into_value_async(info, obj).await,
Expand Down Expand Up @@ -144,15 +143,14 @@ impl<S, T> GraphQLValueAsync<S> for Vec<T>
where
T: GraphQLValueAsync<S>,
T::TypeInfo: Sync,
T::Context: Sync,
S: ScalarValue + Send + Sync,
{
fn resolve_async<'a>(
&'a self,
info: &'a Self::TypeInfo,
_: Option<&'a [Selection<S>]>,
executor: &'a Executor<Self::Context, S>,
) -> crate::BoxFuture<'a, ExecutionResult<S>> {
) -> crate::LocalBoxFuture<'a, ExecutionResult<S>> {
let f = resolve_into_list_async(executor, info, self.iter());
Box::pin(f)
}
Expand Down Expand Up @@ -233,15 +231,14 @@ impl<S, T> GraphQLValueAsync<S> for [T]
where
T: GraphQLValueAsync<S>,
T::TypeInfo: Sync,
T::Context: Sync,
S: ScalarValue + Send + Sync,
{
fn resolve_async<'a>(
&'a self,
info: &'a Self::TypeInfo,
_: Option<&'a [Selection<S>]>,
executor: &'a Executor<Self::Context, S>,
) -> crate::BoxFuture<'a, ExecutionResult<S>> {
) -> crate::LocalBoxFuture<'a, ExecutionResult<S>> {
let f = resolve_into_list_async(executor, info, self.iter());
Box::pin(f)
}
Expand Down Expand Up @@ -295,7 +292,6 @@ where
I: Iterator<Item = &'t T> + ExactSizeIterator,
T: GraphQLValueAsync<S> + ?Sized + 't,
T::TypeInfo: Sync,
T::Context: Sync,
S: ScalarValue + Send + Sync,
{
use futures::stream::{FuturesOrdered, StreamExt as _};
Expand Down
3 changes: 1 addition & 2 deletions juniper/src/types/nullable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,15 +258,14 @@ impl<S, T> GraphQLValueAsync<S> for Nullable<T>
where
T: GraphQLValueAsync<S>,
T::TypeInfo: Sync,
T::Context: Sync,
S: ScalarValue + Send + Sync,
{
fn resolve_async<'a>(
&'a self,
info: &'a Self::TypeInfo,
_: Option<&'a [Selection<S>]>,
executor: &'a Executor<Self::Context, S>,
) -> crate::BoxFuture<'a, ExecutionResult<S>> {
) -> crate::LocalBoxFuture<'a, ExecutionResult<S>> {
let f = async move {
let value = match self {
Self::Some(obj) => executor.resolve_into_value_async(info, obj).await,
Expand Down
Loading