-
Notifications
You must be signed in to change notification settings - Fork 233
feat: add defer support #2697
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft
devsergiy
wants to merge
23
commits into
main
Choose a base branch
from
feat/eng-7770-add-defer-support
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
feat: add defer support #2697
Changes from all commits
Commits
Show all changes
23 commits
Select commit
Hold shift + click to select a range
2a8bd82
rename flush writer
devsergiy 5a4af6a
add defer support
devsergiy cc2c003
add defer test suite
devsergiy 4c696ad
fix queries field count, update fixtures
devsergiy 57cbea4
update fixtures and fix products queries
devsergiy ed86425
add failing todo file
devsergiy 7707180
add reconstructed fixtures
devsergiy 65b3681
add all reconstructed fixtures
devsergiy 4c136a5
fix some fixtures
devsergiy 0647d58
update failing fixtures
devsergiy b6990ff
update fixtures after fixing rendering typename with defer
devsergiy 911c0cd
fix fixtures for same ds requires chain
devsergiy 9048150
fix fixtures for defer sorting fix, fix fragment type condition
devsergiy 826f031
fix incorrect fragment type condition
devsergiy 7eafeba
save full structure of defer test
devsergiy 5af29f4
TMP: always print query plan
devsergiy ce9b7f5
tmp: add plan wrapper to generate defer query plans
devsergiy 0daa866
chore: fix syntax error after rebase
devsergiy a723c00
update to latest defer engine version
devsergiy e3f0b84
move defer tests to proper location
devsergiy 8721a19
fix build, disable debug logs
devsergiy 9be5939
add forwarding headers for deferred queries
devsergiy 70cd5ba
update engine
devsergiy File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,305 @@ | ||
| package integration | ||
|
|
||
| import ( | ||
| "bytes" | ||
| "encoding/json" | ||
| "fmt" | ||
| "io" | ||
| "net/http" | ||
| "os" | ||
| "path/filepath" | ||
| "slices" | ||
| "strings" | ||
| "testing" | ||
|
|
||
| "github.com/sebdah/goldie/v2" | ||
| "github.com/stretchr/testify/assert" | ||
| "github.com/stretchr/testify/require" | ||
| "github.com/wundergraph/astjson" | ||
|
|
||
| "github.com/wundergraph/cosmo/router-tests/testenv" | ||
| "github.com/wundergraph/cosmo/router/pkg/config" | ||
| ) | ||
|
|
||
| func TestDeferTestDataQueries(t *testing.T) { | ||
| t.Parallel() | ||
|
|
||
| testDir := filepath.Join("testdata", "queries_defer") | ||
| entries, err := os.ReadDir(testDir) | ||
| require.NoError(t, err) | ||
|
|
||
| groupQueries := map[string][]string{} | ||
|
|
||
| for _, entry := range entries { | ||
| fileName := entry.Name() | ||
| ext := filepath.Ext(fileName) | ||
| name := strings.TrimSuffix(fileName, ext) | ||
|
|
||
| if ext != ".graphql" { | ||
| continue | ||
| } | ||
|
|
||
| // "full_defer_01_single_defer" → source = "full" | ||
| source, _, found := strings.Cut(name, "_defer_") | ||
| if !found { | ||
| continue | ||
| } | ||
|
|
||
| groupQueries[source] = append(groupQueries[source], name) | ||
| } | ||
|
|
||
| groups := make([]string, 0, len(groupQueries)) | ||
| for k, _ := range groupQueries { | ||
| groups = append(groups, k) | ||
| } | ||
| slices.Sort(groups) | ||
|
|
||
| for _, group := range groups { | ||
| t.Run(group, func(t *testing.T) { | ||
| for _, name := range groupQueries[group] { | ||
| t.Run(name, func(t *testing.T) { | ||
| t.Parallel() | ||
|
|
||
| gMultipart := goldie.New( | ||
| t, | ||
| goldie.WithFixtureDir("testdata/queries_defer"), | ||
| goldie.WithNameSuffix(".txt"), | ||
| goldie.WithDiffEngine(goldie.ClassicDiff), | ||
| ) | ||
| gFull := goldie.New( | ||
| t, | ||
| goldie.WithFixtureDir("testdata/queries_defer"), | ||
| goldie.WithNameSuffix(".json"), | ||
| goldie.WithDiffEngine(goldie.ClassicDiff), | ||
| ) | ||
|
|
||
| testenv.Run(t, &testenv.Config{ | ||
| NoRetryClient: true, | ||
| ModifyEngineExecutionConfiguration: func(cfg *config.EngineExecutionConfiguration) { | ||
| // cfg.Debug.PrintIntermediateQueryPlans = true | ||
| // cfg.Debug.PrintPlanningPaths = true | ||
| // cfg.Debug.PrintNodeSuggestions = true | ||
| // cfg.Debug.PrintOperationTransformations = true | ||
| }, | ||
| }, func(t *testing.T, xEnv *testenv.Environment) { | ||
| queryFilePath := filepath.Join(testDir, fmt.Sprintf("%s.graphql", name)) | ||
| t.Cleanup(func() { | ||
| if t.Failed() { | ||
| abs, _ := filepath.Abs(queryFilePath) | ||
| t.Logf("query file: %s", abs) | ||
| } | ||
| }) | ||
|
|
||
| queryData, err := os.ReadFile(queryFilePath) | ||
| require.NoError(t, err) | ||
|
|
||
| payload := map[string]any{"query": string(queryData)} | ||
| payloadData, err := json.Marshal(payload) | ||
| require.NoError(t, err) | ||
|
|
||
| req := xEnv.MakeGraphQLDeferRequest(http.MethodPost, bytes.NewReader(payloadData)) | ||
| res, err := xEnv.RouterClient.Do(req) | ||
| require.NoError(t, err) | ||
| defer func() { require.NoError(t, res.Body.Close()) }() | ||
|
|
||
| assert.Equal(t, http.StatusOK, res.StatusCode) | ||
|
|
||
| // defer could be fully discarded in case query has duplicate field which are not deffered | ||
| isMultipart := strings.HasPrefix(res.Header.Get("Content-Type"), "multipart/mixed") | ||
|
|
||
| body, err := io.ReadAll(res.Body) | ||
| require.NoError(t, err) | ||
|
|
||
| update := false | ||
|
|
||
| t.Run("raw multipart body", func(t *testing.T) { | ||
| if !update { | ||
| gMultipart.Assert(t, name, body) | ||
| } else { | ||
| gMultipart.Update(t, name, body) | ||
| } | ||
| }) | ||
|
|
||
| var actual []byte | ||
|
|
||
| if isMultipart { | ||
| // Reconstruct the full response from chunks | ||
| reconstructed, err := reconstructDeferResponse(body) | ||
| require.NoError(t, err) | ||
| actual = normalizeJSON(t, reconstructed) | ||
| } else { | ||
| actual = normalizeJSON(t, body) | ||
| } | ||
|
|
||
| t.Run("assert full response", func(t *testing.T) { | ||
| if !update { | ||
| gFull.Assert(t, name+"_reconstructed", actual) | ||
| } else { | ||
| gFull.Update(t, name+"_reconstructed", actual) | ||
| } | ||
| }) | ||
|
|
||
| t.Run("compare with response without defer", func(t *testing.T) { | ||
| expected, err := os.ReadFile(gFull.GoldenFileName(t, group+"_original")) | ||
| require.NoError(t, err) | ||
|
|
||
| expected = normalizeWithKeysSort(t, expected) | ||
| actual = normalizeWithKeysSort(t, actual) | ||
|
|
||
| // manually assert to never update the original when the update flag is specified | ||
| if diff := goldie.Diff(goldie.ClassicDiff, string(actual), string(expected)); diff != "" { | ||
| t.Fatal(diff) | ||
| } | ||
| }) | ||
| }) | ||
| }) | ||
| } | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| func normalizeWithKeysSort(tb testing.TB, data []byte) []byte { | ||
| var val map[string]interface{} | ||
| require.NoError(tb, json.Unmarshal(data, &val)) | ||
|
|
||
| out, err := json.MarshalIndent(val, "", " ") | ||
| require.NoError(tb, err) | ||
|
|
||
| return out | ||
| } | ||
|
|
||
| // reconstructDeferResponse parses a multipart/mixed defer body, merges all | ||
| // incremental patches onto the initial data using astjson, and returns | ||
| // the complete JSON response (without transport fields like hasNext). | ||
| func reconstructDeferResponse(body []byte) ([]byte, error) { | ||
| parts, err := parseMultipartParts(body) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| if len(parts) == 0 { | ||
| return nil, fmt.Errorf("no parts in multipart response") | ||
| } | ||
|
|
||
| var p astjson.Parser | ||
| result, err := p.ParseBytes(parts[0]) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("parse initial part: %w", err) | ||
| } | ||
|
|
||
| for _, part := range parts[1:] { | ||
| partVal, err := p.ParseBytes(part) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("parse part: %w", err) | ||
| } | ||
|
|
||
| for _, item := range partVal.GetArray("incremental") { | ||
| patchData := item.Get("data") | ||
| if patchData == nil { | ||
| continue | ||
| } | ||
|
|
||
| // Build path: prepend "data", then each segment from the path array. | ||
| pathKeys := []string{"data"} | ||
| for _, seg := range item.GetArray("path") { | ||
| switch seg.Type() { | ||
| case astjson.TypeNumber: | ||
| pathKeys = append(pathKeys, string(seg.MarshalTo(nil))) | ||
| default: | ||
| s, _ := seg.StringBytes() | ||
| pathKeys = append(pathKeys, string(s)) | ||
| } | ||
| } | ||
|
|
||
| if err := mergeAtPath(result, patchData, pathKeys); err != nil { | ||
| return nil, fmt.Errorf("merge at path %v: %w", pathKeys, err) | ||
| } | ||
|
|
||
| // Collect errors from incremental items into root errors. | ||
| patchErrors := item.Get("errors") | ||
| if patchErrors != nil && patchErrors.Type() == astjson.TypeArray { | ||
| existing := result.Get("errors") | ||
| if existing == nil || existing.Type() == astjson.TypeNull { | ||
| result.Set(nil, "errors", patchErrors) | ||
| } else { | ||
| merged := appendArrayValues(existing, patchErrors) | ||
| result.Set(nil, "errors", merged) | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // Remove transport-only field. | ||
| result.Del("hasNext") | ||
|
|
||
| return result.MarshalTo(nil), nil | ||
| } | ||
|
|
||
| // mergeAtPath navigates result to the node at pathKeys and deep-merges patch there. | ||
| func mergeAtPath(result, patch *astjson.Value, pathKeys []string) error { | ||
| if len(pathKeys) == 0 { | ||
| _, _, err := astjson.MergeValues(nil, result, patch) | ||
| return err | ||
| } | ||
|
|
||
| // Navigate to the parent of the target node. | ||
| current := result | ||
| for _, key := range pathKeys[:len(pathKeys)-1] { | ||
| next := current.Get(key) | ||
| if next == nil { | ||
| return nil | ||
| } | ||
| current = next | ||
| } | ||
|
|
||
| lastKey := pathKeys[len(pathKeys)-1] | ||
| target := current.Get(lastKey) | ||
| if target == nil { | ||
| current.Set(nil, lastKey, patch) | ||
| return nil | ||
| } | ||
|
|
||
| merged, _, err := astjson.MergeValues(nil, target, patch) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| current.Set(nil, lastKey, merged) | ||
| return nil | ||
| } | ||
|
|
||
| // appendArrayValues returns a new TypeArray containing all elements of a followed by all of b. | ||
| func appendArrayValues(a, b *astjson.Value) *astjson.Value { | ||
| out := astjson.ArrayValue(nil) | ||
| idx := 0 | ||
| for _, v := range a.GetArray() { | ||
| out.SetArrayItem(nil, idx, v) | ||
| idx++ | ||
| } | ||
| for _, v := range b.GetArray() { | ||
| out.SetArrayItem(nil, idx, v) | ||
| idx++ | ||
| } | ||
| return out | ||
| } | ||
|
|
||
| // parseMultipartParts splits a multipart/mixed body on the --graphql boundary | ||
| // and returns the raw JSON bytes of each part. | ||
// parseMultipartParts splits a multipart/mixed body on the --graphql boundary
// and returns the raw JSON bytes of each part.
func parseMultipartParts(body []byte) ([][]byte, error) {
	delim := []byte("\r\n--graphql")
	var out [][]byte
	for _, chunk := range bytes.Split(body, delim) {
		// The closing-delimiter chunk starts with "--"; skip it.
		if bytes.HasPrefix(chunk, []byte("--")) {
			continue
		}
		// Part headers and payload are separated by a blank line; a chunk
		// without one (e.g. the preamble) carries no payload.
		_, payload, ok := bytes.Cut(chunk, []byte("\r\n\r\n"))
		if !ok {
			continue
		}
		if payload = bytes.TrimSpace(payload); len(payload) > 0 {
			out = append(out, payload)
		}
	}
	return out, nil
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don’t drop incremental errors when `data` is absent. The early `continue` on the `patchData == nil` check means an incremental entry that carries only `errors` never reaches the error-merge step below it, so deferred error cases get reconstructed without those errors. Suggested fix: collect `errors` before checking `data`.
for _, item := range partVal.GetArray("incremental") { + patchErrors := item.Get("errors") + if patchErrors != nil && patchErrors.Type() == astjson.TypeArray { + existing := result.Get("errors") + if existing == nil || existing.Type() == astjson.TypeNull { + result.Set(nil, "errors", patchErrors) + } else { + merged := appendArrayValues(existing, patchErrors) + result.Set(nil, "errors", merged) + } + } + patchData := item.Get("data") if patchData == nil { continue } @@ - // Collect errors from incremental items into root errors. - patchErrors := item.Get("errors") - if patchErrors != nil && patchErrors.Type() == astjson.TypeArray { - existing := result.Get("errors") - if existing == nil || existing.Type() == astjson.TypeNull { - result.Set(nil, "errors", patchErrors) - } else { - merged := appendArrayValues(existing, patchErrors) - result.Set(nil, "errors", merged) - } - } }📝 Committable suggestion
🤖 Prompt for AI Agents