Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion pkg/engine/datasource/graphql_datasource/graphql_sse_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,28 @@ func (h *gqlSSEConnectionHandler) StartBlocking(sub Subscription) {
}

func (h *gqlSSEConnectionHandler) subscribe(ctx context.Context, sub Subscription, dataCh, errCh chan []byte) {
resp, err := h.performSubscriptionRequest(ctx)
// if we used the downstream context, we got a panic if the downstream client disconnects immediately after the request was sent
// this happens, e.g. with React strict mode which renders the component twice
// to solve the issue, we use a separate context for the origin request
// with a goroutine that cancels the origin request if the downstream client disconnects
// in order to free resources after the initial handshake, we cancel the goroutine after we've received a response
originCtx, cancelOriginRequest := context.WithCancel(context.Background())
defer cancelOriginRequest()
waitForResponse, cancelWaitForResponse := context.WithCancel(context.Background())
go func() {
select {
case <-ctx.Done():
// cancel the origin request if the downstream client disconnected
cancelOriginRequest()
case <-waitForResponse.Done():
// end the goroutine to free resources
}
}()
resp, err := h.performSubscriptionRequest(originCtx)
// cancel the goroutine to free resources
// the originRequest will be canceled through defer cancelOriginRequest()
// as we check on every iteration (below) if the downstream ctx is done
cancelWaitForResponse()
if err != nil {
h.log.Error("failed to perform subscription request", log.Error(err))

Expand Down Expand Up @@ -117,6 +138,11 @@ func (h *gqlSSEConnectionHandler) subscribe(ctx context.Context, sub Subscriptio
continue
}

if ctx.Err() != nil {
// request context was canceled do not send an error as channel will be closed
return
}

dataCh <- data
case bytes.HasPrefix(line, headerEvent):
event := trim(line[len(headerEvent):])
Expand Down
7 changes: 5 additions & 2 deletions pkg/engine/datasource/httpclient/httpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,11 @@ func CtxSetUndefinedVariables(ctx context.Context, undefinedVariables []string)
}

func CtxGetUndefinedVariables(ctx context.Context) []string {
undefinedVariables, _ := ctx.Value(removeUndefinedVariables).([]string)
return undefinedVariables
undefinedVariables := ctx.Value(removeUndefinedVariables)
if undefinedVariables, ok := undefinedVariables.([]string); ok {
return undefinedVariables
}
return nil
}

func wrapQuotesIfString(b []byte) []byte {
Expand Down