diff --git a/v2/pkg/engine/resolve/resolve.go b/v2/pkg/engine/resolve/resolve.go index 32e33c0ae1..9678e4c5f6 100644 --- a/v2/pkg/engine/resolve/resolve.go +++ b/v2/pkg/engine/resolve/resolve.go @@ -954,6 +954,18 @@ func (r *Resolver) AsyncUnsubscribeSubscription(id SubscriptionIdentifier) error id: id, kind: subscriptionEventKindRemoveSubscription, }: + default: + // In the event we cannot insert immediately, defer insertion a goroutine, this should prevent deadlocks, at the cost of goroutine creation. + go func() { + select { + case <-r.ctx.Done(): + return + case r.events <- subscriptionEvent{ + id: id, + kind: subscriptionEventKindRemoveSubscription, + }: + } + }() } return nil } @@ -968,6 +980,20 @@ func (r *Resolver) AsyncUnsubscribeClient(connectionID int64) error { }, kind: subscriptionEventKindRemoveClient, }: + default: + // In the event we cannot insert immediately, defer insertion a goroutine, this should prevent deadlocks, at the cost of goroutine creation. + go func() { + select { + case <-r.ctx.Done(): + return + case r.events <- subscriptionEvent{ + id: SubscriptionIdentifier{ + ConnectionID: connectionID, + }, + kind: subscriptionEventKindRemoveClient, + }: + } + }() } return nil } @@ -1126,7 +1152,14 @@ func (r *Resolver) AsyncResolveGraphQLSubscription(ctx *Context, subscription *G return writeFlushComplete(writer, msg) } - event := subscriptionEvent{ + select { + case <-r.ctx.Done(): + // Stop resolving if the resolver is shutting down + return r.ctx.Err() + case <-ctx.ctx.Done(): + // Stop resolving if the client is gone + return ctx.ctx.Err() + case r.events <- subscriptionEvent{ triggerID: xxh.Sum64(), kind: subscriptionEventKindAddSubscription, addSubscription: &addSubscription{ @@ -1137,13 +1170,7 @@ func (r *Resolver) AsyncResolveGraphQLSubscription(ctx *Context, subscription *G id: id, completed: make(chan struct{}), }, - } - - select { - case <-r.ctx.Done(): - // Stop resolving if the resolver is shutting down - return r.ctx.Err() - case r.events <- event: + }: } return nil }