From 99fe6353280ecfa9016ee26eec909cfdb49935e5 Mon Sep 17 00:00:00 2001 From: endigma Date: Wed, 17 Sep 2025 12:52:42 +0100 Subject: [PATCH 1/4] fix: if blocked, defer async event insertion to prevent deadlocks --- v2/pkg/engine/resolve/resolve.go | 40 +++++++++++++++++++++++++------- 1 file changed, 32 insertions(+), 8 deletions(-) diff --git a/v2/pkg/engine/resolve/resolve.go b/v2/pkg/engine/resolve/resolve.go index 32e33c0ae1..d4fa987ff3 100644 --- a/v2/pkg/engine/resolve/resolve.go +++ b/v2/pkg/engine/resolve/resolve.go @@ -954,6 +954,18 @@ func (r *Resolver) AsyncUnsubscribeSubscription(id SubscriptionIdentifier) error id: id, kind: subscriptionEventKindRemoveSubscription, }: + default: + // In the event we cannot insert immediately, defer insertion a goroutine, this should prevent deadlocks, at the cost of goroutine creation. + go func() { + select { + case <-r.ctx.Done(): + return + case r.events <- subscriptionEvent{ + id: id, + kind: subscriptionEventKindRemoveSubscription, + }: + } + }() } return nil } @@ -968,6 +980,20 @@ func (r *Resolver) AsyncUnsubscribeClient(connectionID int64) error { }, kind: subscriptionEventKindRemoveClient, }: + default: + // In the event we cannot insert immediately, defer insertion a goroutine, this should prevent deadlocks, at the cost of goroutine creation. + go func() { + select { + case <-r.ctx.Done(): + return + case r.events <- subscriptionEvent{ + id: SubscriptionIdentifier{ + ConnectionID: connectionID, + }, + kind: subscriptionEventKindRemoveClient, + }: + } + }() } return nil } @@ -1126,7 +1152,11 @@ func (r *Resolver) AsyncResolveGraphQLSubscription(ctx *Context, subscription *G return writeFlushComplete(writer, msg) } - event := subscriptionEvent{ + select { + case <-r.ctx.Done(): + // Stop resolving if the resolver is shutting down + return r.ctx.Err() + case r.events <- subscriptionEvent{ triggerID: xxh.Sum64(), kind: subscriptionEventKindAddSubscription, addSubscription: &addSubscription{ @@ -1137,13 +1167,7 @@ func (r *Resolver) AsyncResolveGraphQLSubscription(ctx *Context, subscription *G id: id, completed: make(chan struct{}), }, - } - - select { - case <-r.ctx.Done(): - // Stop resolving if the resolver is shutting down - return r.ctx.Err() - case r.events <- event: + }: } return nil } From e4d64dcc2843111d8d5cc71fe940617ddc567f39 Mon Sep 17 00:00:00 2001 From: endigma Date: Wed, 17 Sep 2025 13:54:02 +0100 Subject: [PATCH 2/4] fix: abort AsyncResolveGraphQLSubscription if client context is cancelled --- v2/pkg/engine/resolve/resolve.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/v2/pkg/engine/resolve/resolve.go b/v2/pkg/engine/resolve/resolve.go index d4fa987ff3..9678e4c5f6 100644 --- a/v2/pkg/engine/resolve/resolve.go +++ b/v2/pkg/engine/resolve/resolve.go @@ -1156,6 +1156,9 @@ func (r *Resolver) AsyncResolveGraphQLSubscription(ctx *Context, subscription *G case <-r.ctx.Done(): // Stop resolving if the resolver is shutting down return r.ctx.Err() + case <-ctx.ctx.Done(): + // Stop resolving if the client is gone + return ctx.ctx.Err() case r.events <- subscriptionEvent{ triggerID: xxh.Sum64(), kind: subscriptionEventKindAddSubscription, From 02e215ffa825692d8d0a5cdd6b32d64ba99effc9 Mon Sep 17 00:00:00 2001 From: endigma Date: Fri, 19 Sep 2025 15:30:54 +0100 Subject: [PATCH 3/4] refactor worker error handling --- v2/pkg/engine/resolve/resolve.go | 75 +++++++++++++++++++------------- 1 file changed, 45 insertions(+), 30 deletions(-) diff --git a/v2/pkg/engine/resolve/resolve.go b/v2/pkg/engine/resolve/resolve.go index 9678e4c5f6..5a4eba4fb9 100644 --- a/v2/pkg/engine/resolve/resolve.go +++ b/v2/pkg/engine/resolve/resolve.go @@ -300,7 +300,7 @@ type trigger struct { // executed in the worker goroutine. fn will be executed, and if // final is true the worker will be stopped after fn is executed. type workItem struct { - fn func() + fn func() error final bool } @@ -338,18 +338,24 @@ func (s *sub) startWorkerWithHeartbeat() { select { case <-s.ctx.ctx.Done(): // Complete when the client request context is done for synchronous subscriptions - s.close(SubscriptionCloseKindGoingAway) + _ = s.close(SubscriptionCloseKindGoingAway) return case <-s.resolver.ctx.Done(): // Abort immediately if the resolver is shutting down - s.close(SubscriptionCloseKindGoingAway) + _ = s.close(SubscriptionCloseKindGoingAway) return case <-heartbeatTicker.C: - s.resolver.handleHeartbeat(s) + if err := s.resolver.handleHeartbeat(s); err != nil { + // If heartbeat fails (e.g. client disconnected), remove the subscription. + _ = s.resolver.AsyncUnsubscribeSubscription(s.id) + } case work := <-s.workChan: - work.fn() + if err := work.fn(); err != nil { + // If work fails (e.g. client disconnected), remove the subscription. + _ = s.resolver.AsyncUnsubscribeSubscription(s.id) + } if work.final { return @@ -366,16 +372,19 @@ func (s *sub) startWorkerWithoutHeartbeat() { select { case <-s.ctx.ctx.Done(): // Complete when the client request context is done for synchronous subscriptions - s.close(SubscriptionCloseKindGoingAway) + _ = s.close(SubscriptionCloseKindGoingAway) return case <-s.resolver.ctx.Done(): // Abort immediately if the resolver is shutting down - s.close(SubscriptionCloseKindGoingAway) + _ = s.close(SubscriptionCloseKindGoingAway) return case work := <-s.workChan: - work.fn() + if err := work.fn(); err != nil { + // If work fails (e.g. client disconnected), remove the subscription. + _ = s.resolver.AsyncUnsubscribeSubscription(s.id) + } if work.final { return @@ -385,26 +394,30 @@ func (s *sub) startWorkerWithoutHeartbeat() { } // Called when subgraph indicates a "complete" subscription -func (s *sub) complete() { +func (s *sub) complete() error { // The channel is used to communicate that the subscription is done // It is used only in the synchronous subscription case and to avoid sending events // to a subscription that is already done. defer close(s.completed) s.writer.Complete() + + return nil } // Called when subgraph becomes unreachable or closes the connection without a "complete" event -func (s *sub) close(kind SubscriptionCloseKind) { +func (s *sub) close(kind SubscriptionCloseKind) error { // The channel is used to communicate that the subscription is done // It is used only in the synchronous subscription case and to avoid sending events // to a subscription that is already done. defer close(s.completed) s.writer.Close(kind) + + return nil } -func (r *Resolver) executeSubscriptionUpdate(resolveCtx *Context, sub *sub, sharedInput []byte) { +func (r *Resolver) executeSubscriptionUpdate(resolveCtx *Context, sub *sub, sharedInput []byte) error { if r.options.Debug { fmt.Printf("resolver:trigger:subscription:update:%d\n", sub.id.SubscriptionID) } @@ -421,42 +434,38 @@ func (r *Resolver) executeSubscriptionUpdate(resolveCtx *Context, sub *sub, shar t := newTools(r.options, r.allowedErrorExtensionFields, r.allowedErrorFields) if err := t.resolvable.InitSubscription(resolveCtx, input, sub.resolve.Trigger.PostProcessing); err != nil { - r.asyncErrorWriter.WriteError(resolveCtx, err, sub.resolve.Response, sub.writer) if r.options.Debug { fmt.Printf("resolver:trigger:subscription:init:failed:%d\n", sub.id.SubscriptionID) } if r.reporter != nil { r.reporter.SubscriptionUpdateSent() } - return + return err } if err := t.loader.LoadGraphQLResponseData(resolveCtx, sub.resolve.Response, t.resolvable); err != nil { - r.asyncErrorWriter.WriteError(resolveCtx, err, sub.resolve.Response, sub.writer) if r.options.Debug { fmt.Printf("resolver:trigger:subscription:load:failed:%d\n", sub.id.SubscriptionID) } if r.reporter != nil { r.reporter.SubscriptionUpdateSent() } - return + return err } if err := t.resolvable.Resolve(resolveCtx.ctx, sub.resolve.Response.Data, sub.resolve.Response.Fetches, sub.writer); err != nil { - r.asyncErrorWriter.WriteError(resolveCtx, err, sub.resolve.Response, sub.writer) if r.options.Debug { fmt.Printf("resolver:trigger:subscription:resolve:failed:%d\n", sub.id.SubscriptionID) } if r.reporter != nil { r.reporter.SubscriptionUpdateSent() } - return + return err } if err := sub.writer.Flush(); err != nil { - // If flush fails (e.g. client disconnected), remove the subscription. _ = r.AsyncUnsubscribeSubscription(sub.id) - return + return err } if r.options.Debug { @@ -469,6 +478,8 @@ func (r *Resolver) executeSubscriptionUpdate(resolveCtx *Context, sub *sub, shar if t.resolvable.WroteErrorsWithoutData() && r.options.Debug { fmt.Printf("resolver:trigger:subscription:completing:errors_without_data:%d\n", sub.id.SubscriptionID) } + + return nil } // processEvents maintains the single threaded event loop that processes all events @@ -518,17 +529,17 @@ func (r *Resolver) handleEvent(event subscriptionEvent) { } // handleHeartbeat sends a heartbeat to the client. It needs to be executed on the same goroutine as the writer. -func (r *Resolver) handleHeartbeat(sub *sub) { +func (r *Resolver) handleHeartbeat(sub *sub) error { if r.options.Debug { fmt.Printf("resolver:heartbeat\n") } - if r.ctx.Err() != nil { - return + if err := r.ctx.Err(); err != nil { + return err } - if sub.ctx.Context().Err() != nil { - return + if err := sub.ctx.Context().Err(); err != nil { + return err } if r.options.Debug { @@ -536,9 +547,7 @@ func (r *Resolver) handleHeartbeat(sub *sub) { } if err := sub.writer.Heartbeat(); err != nil { - // If heartbeat fails (e.g. client disconnected), remove the subscription. - _ = r.AsyncUnsubscribeSubscription(sub.id) - return + return err } if r.options.Debug { @@ -548,6 +557,8 @@ func (r *Resolver) handleHeartbeat(sub *sub) { if r.reporter != nil { r.reporter.SubscriptionUpdateSent() } + + return nil } func (r *Resolver) handleTriggerClose(s subscriptionEvent) { @@ -789,8 +800,12 @@ func (r *Resolver) handleTriggerUpdate(id uint64, data []byte) { continue } - fn := func() { - r.executeSubscriptionUpdate(c, s, data) + fn := func() error { + if err := r.executeSubscriptionUpdate(c, s, data); err != nil { + r.asyncErrorWriter.WriteError(c, err, s.resolve.Response, s.writer) + return err // returning an error will trigger unsubscribe + } + return nil } select { @@ -897,7 +912,7 @@ func (r *Resolver) closeTriggerSubscriptions(id uint64, closeKind SubscriptionCl } // Send a work item to close the subscription - s.workChan <- workItem{func() { s.close(closeKind) }, true} + s.workChan <- workItem{func() error { return s.close(closeKind) }, true} // Because the event loop is single threaded, we can safely close the channel from this sender // The subscription worker will finish processing all events before the channel is closed. From 7c67341c4143a7969cc6bf8605001a18645de343 Mon Sep 17 00:00:00 2001 From: endigma Date: Tue, 23 Sep 2025 09:58:37 +0100 Subject: [PATCH 4/4] Revert "refactor worker error handling" This reverts commit 02e215ffa825692d8d0a5cdd6b32d64ba99effc9. --- v2/pkg/engine/resolve/resolve.go | 75 +++++++++++++------------------- 1 file changed, 30 insertions(+), 45 deletions(-) diff --git a/v2/pkg/engine/resolve/resolve.go b/v2/pkg/engine/resolve/resolve.go index 5a4eba4fb9..9678e4c5f6 100644 --- a/v2/pkg/engine/resolve/resolve.go +++ b/v2/pkg/engine/resolve/resolve.go @@ -300,7 +300,7 @@ type trigger struct { // executed in the worker goroutine. fn will be executed, and if // final is true the worker will be stopped after fn is executed. type workItem struct { - fn func() error + fn func() final bool } @@ -338,24 +338,18 @@ func (s *sub) startWorkerWithHeartbeat() { select { case <-s.ctx.ctx.Done(): // Complete when the client request context is done for synchronous subscriptions - _ = s.close(SubscriptionCloseKindGoingAway) + s.close(SubscriptionCloseKindGoingAway) return case <-s.resolver.ctx.Done(): // Abort immediately if the resolver is shutting down - _ = s.close(SubscriptionCloseKindGoingAway) + s.close(SubscriptionCloseKindGoingAway) return case <-heartbeatTicker.C: - if err := s.resolver.handleHeartbeat(s); err != nil { - // If heartbeat fails (e.g. client disconnected), remove the subscription. - _ = s.resolver.AsyncUnsubscribeSubscription(s.id) - } + s.resolver.handleHeartbeat(s) case work := <-s.workChan: - if err := work.fn(); err != nil { - // If work fails (e.g. client disconnected), remove the subscription. - _ = s.resolver.AsyncUnsubscribeSubscription(s.id) - } + work.fn() if work.final { return @@ -372,19 +366,16 @@ func (s *sub) startWorkerWithoutHeartbeat() { select { case <-s.ctx.ctx.Done(): // Complete when the client request context is done for synchronous subscriptions - _ = s.close(SubscriptionCloseKindGoingAway) + s.close(SubscriptionCloseKindGoingAway) return case <-s.resolver.ctx.Done(): // Abort immediately if the resolver is shutting down - _ = s.close(SubscriptionCloseKindGoingAway) + s.close(SubscriptionCloseKindGoingAway) return case work := <-s.workChan: - if err := work.fn(); err != nil { - // If work fails (e.g. client disconnected), remove the subscription. - _ = s.resolver.AsyncUnsubscribeSubscription(s.id) - } + work.fn() if work.final { return @@ -394,30 +385,26 @@ func (s *sub) startWorkerWithoutHeartbeat() { } // Called when subgraph indicates a "complete" subscription -func (s *sub) complete() error { +func (s *sub) complete() { // The channel is used to communicate that the subscription is done // It is used only in the synchronous subscription case and to avoid sending events // to a subscription that is already done. defer close(s.completed) s.writer.Complete() - - return nil } // Called when subgraph becomes unreachable or closes the connection without a "complete" event -func (s *sub) close(kind SubscriptionCloseKind) error { +func (s *sub) close(kind SubscriptionCloseKind) { // The channel is used to communicate that the subscription is done // It is used only in the synchronous subscription case and to avoid sending events // to a subscription that is already done. defer close(s.completed) s.writer.Close(kind) - - return nil } -func (r *Resolver) executeSubscriptionUpdate(resolveCtx *Context, sub *sub, sharedInput []byte) error { +func (r *Resolver) executeSubscriptionUpdate(resolveCtx *Context, sub *sub, sharedInput []byte) { if r.options.Debug { fmt.Printf("resolver:trigger:subscription:update:%d\n", sub.id.SubscriptionID) } @@ -434,38 +421,42 @@ func (r *Resolver) executeSubscriptionUpdate(resolveCtx *Context, sub *sub, shar t := newTools(r.options, r.allowedErrorExtensionFields, r.allowedErrorFields) if err := t.resolvable.InitSubscription(resolveCtx, input, sub.resolve.Trigger.PostProcessing); err != nil { + r.asyncErrorWriter.WriteError(resolveCtx, err, sub.resolve.Response, sub.writer) if r.options.Debug { fmt.Printf("resolver:trigger:subscription:init:failed:%d\n", sub.id.SubscriptionID) } if r.reporter != nil { r.reporter.SubscriptionUpdateSent() } - return err + return } if err := t.loader.LoadGraphQLResponseData(resolveCtx, sub.resolve.Response, t.resolvable); err != nil { + r.asyncErrorWriter.WriteError(resolveCtx, err, sub.resolve.Response, sub.writer) if r.options.Debug { fmt.Printf("resolver:trigger:subscription:load:failed:%d\n", sub.id.SubscriptionID) } if r.reporter != nil { r.reporter.SubscriptionUpdateSent() } - return err + return } if err := t.resolvable.Resolve(resolveCtx.ctx, sub.resolve.Response.Data, sub.resolve.Response.Fetches, sub.writer); err != nil { + r.asyncErrorWriter.WriteError(resolveCtx, err, sub.resolve.Response, sub.writer) if r.options.Debug { fmt.Printf("resolver:trigger:subscription:resolve:failed:%d\n", sub.id.SubscriptionID) } if r.reporter != nil { r.reporter.SubscriptionUpdateSent() } - return err + return } if err := sub.writer.Flush(); err != nil { + // If flush fails (e.g. client disconnected), remove the subscription. _ = r.AsyncUnsubscribeSubscription(sub.id) - return err + return } if r.options.Debug { @@ -478,8 +469,6 @@ func (r *Resolver) executeSubscriptionUpdate(resolveCtx *Context, sub *sub, shar if t.resolvable.WroteErrorsWithoutData() && r.options.Debug { fmt.Printf("resolver:trigger:subscription:completing:errors_without_data:%d\n", sub.id.SubscriptionID) } - - return nil } // processEvents maintains the single threaded event loop that processes all events @@ -529,17 +518,17 @@ func (r *Resolver) handleEvent(event subscriptionEvent) { } // handleHeartbeat sends a heartbeat to the client. It needs to be executed on the same goroutine as the writer. -func (r *Resolver) handleHeartbeat(sub *sub) error { +func (r *Resolver) handleHeartbeat(sub *sub) { if r.options.Debug { fmt.Printf("resolver:heartbeat\n") } - if err := r.ctx.Err(); err != nil { - return err + if r.ctx.Err() != nil { + return } - if err := sub.ctx.Context().Err(); err != nil { - return err + if sub.ctx.Context().Err() != nil { + return } if r.options.Debug { @@ -547,7 +536,9 @@ func (r *Resolver) handleHeartbeat(sub *sub) error { } if err := sub.writer.Heartbeat(); err != nil { - return err + // If heartbeat fails (e.g. client disconnected), remove the subscription. + _ = r.AsyncUnsubscribeSubscription(sub.id) + return } if r.options.Debug { @@ -557,8 +548,6 @@ func (r *Resolver) handleHeartbeat(sub *sub) error { if r.reporter != nil { r.reporter.SubscriptionUpdateSent() } - - return nil } func (r *Resolver) handleTriggerClose(s subscriptionEvent) { @@ -800,12 +789,8 @@ func (r *Resolver) handleTriggerUpdate(id uint64, data []byte) { continue } - fn := func() error { - if err := r.executeSubscriptionUpdate(c, s, data); err != nil { - r.asyncErrorWriter.WriteError(c, err, s.resolve.Response, s.writer) - return err // returning an error will trigger unsubscribe - } - return nil + fn := func() { + r.executeSubscriptionUpdate(c, s, data) } select { @@ -912,7 +897,7 @@ func (r *Resolver) closeTriggerSubscriptions(id uint64, closeKind SubscriptionCl } // Send a work item to close the subscription - s.workChan <- workItem{func() error { return s.close(closeKind) }, true} + s.workChan <- workItem{func() { s.close(closeKind) }, true} // Because the event loop is single threaded, we can safely close the channel from this sender // The subscription worker will finish processing all events before the channel is closed.