Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ var (
}
)

// Planner creates the subgraph operation.
type Planner[T Configuration] struct {
id int
debug bool
Expand Down Expand Up @@ -79,6 +80,10 @@ type Planner[T Configuration] struct {
customScalarFieldRef int
parentTypeNodes []ast.Node

// propagatedOperationName is non-empty when the operation name is propagated
// to the downstream subgraph fetch.
propagatedOperationName string
Comment thread
devsergiy marked this conversation as resolved.

// federation

addedInlineFragments map[onTypeInlineFragment]struct{}
Expand Down Expand Up @@ -387,6 +392,7 @@ func (p *Planner[T]) ConfigureFetch() resolve.FetchConfiguration {
PostProcessing: postProcessing,
SetTemplateOutputToNullOnVariableNull: requiresEntityFetch || requiresEntityBatchFetch,
QueryPlan: p.queryPlan,
OperationName: p.propagatedOperationName,
}
}

Expand Down Expand Up @@ -524,6 +530,7 @@ func (p *Planner[T]) EnterOperationDefinition(ref int) {

if p.dataSourcePlannerConfig.Options.EnableOperationNamePropagation {
operation := p.buildUpstreamOperationName(ref)
p.propagatedOperationName = operation
if operation != "" {
p.upstreamOperation.OperationDefinitions[definition.Ref].Name = p.upstreamOperation.Input.AppendInputString(operation)
}
Expand Down Expand Up @@ -809,6 +816,7 @@ func (p *Planner[T]) EnterDocument(_, _ *ast.Document) {
p.variables = p.variables[:0]
p.hasFederationRoot = false
p.queryPlan = nil
p.propagatedOperationName = ""

// reset information about root type
p.rootTypeName = ""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2111,7 +2111,8 @@ func TestGraphQLDataSourceFederation(t *testing.T) {
},
DataSourceIdentifier: []byte("graphql_datasource.Source"),
FetchConfiguration: resolve.FetchConfiguration{
Input: `{"method":"POST","url":"http://service-a","body":{"query":"query CompositeKey__service_a {user {__typename account {id} id} otherUser {__typename account {id} id}}"}}`,
Input: `{"method":"POST","url":"http://service-a","body":{"query":"query CompositeKey__service_a__0 {user {__typename account {id} id} otherUser {__typename account {id} id}}"}}`,
OperationName: "CompositeKey__service_a",
DataSource: &Source{},
PostProcessing: DefaultPostProcessingConfiguration,
},
Expand All @@ -2123,7 +2124,8 @@ func TestGraphQLDataSourceFederation(t *testing.T) {
},
DataSourceIdentifier: []byte("graphql_datasource.Source"),
FetchConfiguration: resolve.FetchConfiguration{
Input: `{"method":"POST","url":"http://service-b","body":{"query":"query CompositeKey__service_b($representations: [_Any!]!){_entities(representations: $representations){... on User {__typename foo}}}","variables":{"representations":[$$0$$]}}}`,
Input: `{"method":"POST","url":"http://service-b","body":{"query":"query CompositeKey__service_b__1($representations: [_Any!]!){_entities(representations: $representations){... on User {__typename foo}}}","variables":{"representations":[$$0$$]}}}`,
OperationName: "CompositeKey__service_b",
Variables: resolve.NewVariables(
&resolve.ResolvableObjectVariable{
Renderer: resolve.NewGraphQLVariableResolveRenderer(&resolve.Object{
Expand Down Expand Up @@ -2175,7 +2177,8 @@ func TestGraphQLDataSourceFederation(t *testing.T) {
},
DataSourceIdentifier: []byte("graphql_datasource.Source"),
FetchConfiguration: resolve.FetchConfiguration{
Input: `{"method":"POST","url":"http://service-b","body":{"query":"query CompositeKey__service_b($representations: [_Any!]!){_entities(representations: $representations){... on User {__typename foo}}}","variables":{"representations":[$$0$$]}}}`,
Input: `{"method":"POST","url":"http://service-b","body":{"query":"query CompositeKey__service_b__2($representations: [_Any!]!){_entities(representations: $representations){... on User {__typename foo}}}","variables":{"representations":[$$0$$]}}}`,
OperationName: "CompositeKey__service_b",
Variables: resolve.NewVariables(
&resolve.ResolvableObjectVariable{
Renderer: resolve.NewGraphQLVariableResolveRenderer(&resolve.Object{
Expand Down Expand Up @@ -2891,7 +2894,8 @@ func TestGraphQLDataSourceFederation(t *testing.T) {
},
DataSourceIdentifier: []byte("graphql_datasource.Source"),
FetchConfiguration: resolve.FetchConfiguration{
Input: `{"method":"POST","url":"http://service-a","body":{"query":"query CompositeKey__s_rvic_e_a {user {__typename account {id} id} otherUser {__typename account {id} id}}"}}`,
Input: `{"method":"POST","url":"http://service-a","body":{"query":"query CompositeKey__s_rvic_e_a__0 {user {__typename account {id} id} otherUser {__typename account {id} id}}"}}`,
OperationName: "CompositeKey__s_rvic_e_a",
DataSource: &Source{},
PostProcessing: DefaultPostProcessingConfiguration,
},
Expand All @@ -2903,7 +2907,8 @@ func TestGraphQLDataSourceFederation(t *testing.T) {
},
DataSourceIdentifier: []byte("graphql_datasource.Source"),
FetchConfiguration: resolve.FetchConfiguration{
Input: `{"method":"POST","url":"http://service-b","body":{"query":"query CompositeKey__service_b($representations: [_Any!]!){_entities(representations: $representations){... on User {__typename foo}}}","variables":{"representations":[$$0$$]}}}`,
Input: `{"method":"POST","url":"http://service-b","body":{"query":"query CompositeKey__service_b__1($representations: [_Any!]!){_entities(representations: $representations){... on User {__typename foo}}}","variables":{"representations":[$$0$$]}}}`,
OperationName: "CompositeKey__service_b",
Variables: resolve.NewVariables(
&resolve.ResolvableObjectVariable{
Renderer: resolve.NewGraphQLVariableResolveRenderer(&resolve.Object{
Expand Down Expand Up @@ -2955,7 +2960,8 @@ func TestGraphQLDataSourceFederation(t *testing.T) {
},
DataSourceIdentifier: []byte("graphql_datasource.Source"),
FetchConfiguration: resolve.FetchConfiguration{
Input: `{"method":"POST","url":"http://service-b","body":{"query":"query CompositeKey__service_b($representations: [_Any!]!){_entities(representations: $representations){... on User {__typename foo}}}","variables":{"representations":[$$0$$]}}}`,
Input: `{"method":"POST","url":"http://service-b","body":{"query":"query CompositeKey__service_b__2($representations: [_Any!]!){_entities(representations: $representations){... on User {__typename foo}}}","variables":{"representations":[$$0$$]}}}`,
OperationName: "CompositeKey__service_b",
Variables: resolve.NewVariables(
&resolve.ResolvableObjectVariable{
Renderer: resolve.NewGraphQLVariableResolveRenderer(&resolve.Object{
Expand Down Expand Up @@ -12099,7 +12105,8 @@ func TestGraphQLDataSourceFederation(t *testing.T) {
Fetches: resolve.Sequence(
resolve.Single(&resolve.SingleFetch{
FetchConfiguration: resolve.FetchConfiguration{
Input: `{"method":"POST","url":"http://first.service","body":{"query":"query Accounts__first_service {accounts {__typename ... on User {__typename id} ... on Admin {__typename adminID} ... on Moderator {__typename moderatorID}}}"}}`,
Input: `{"method":"POST","url":"http://first.service","body":{"query":"query Accounts__first_service__0 {accounts {__typename ... on User {__typename id} ... on Admin {__typename adminID} ... on Moderator {__typename moderatorID}}}"}}`,
OperationName: "Accounts__first_service",
PostProcessing: DefaultPostProcessingConfiguration,
DataSource: &Source{},
},
Expand All @@ -12110,7 +12117,8 @@ func TestGraphQLDataSourceFederation(t *testing.T) {
FetchID: 1,
DependsOnFetchIDs: []int{0},
}, FetchConfiguration: resolve.FetchConfiguration{
Input: `{"method":"POST","url":"http://second.service","body":{"query":"query Accounts__second_service($representations: [_Any!]!){_entities(representations: $representations){... on User {__typename name} ... on Admin {__typename adminName}}}","variables":{"representations":[$$0$$]}}}`,
Input: `{"method":"POST","url":"http://second.service","body":{"query":"query Accounts__second_service__1($representations: [_Any!]!){_entities(representations: $representations){... on User {__typename name} ... on Admin {__typename adminName}}}","variables":{"representations":[$$0$$]}}}`,
OperationName: "Accounts__second_service",
DataSource: &Source{},
SetTemplateOutputToNullOnVariableNull: true,
RequiresEntityBatchFetch: true,
Expand Down Expand Up @@ -12161,7 +12169,8 @@ func TestGraphQLDataSourceFederation(t *testing.T) {
DependsOnFetchIDs: []int{0},
},
FetchConfiguration: resolve.FetchConfiguration{
Input: `{"method":"POST","url":"http://third.service","body":{"query":"query Accounts__third_service($representations: [_Any!]!){_entities(representations: $representations){... on Moderator {__typename subject address {__typename id}}}}","variables":{"representations":[$$0$$]}}}`,
Input: `{"method":"POST","url":"http://third.service","body":{"query":"query Accounts__third_service__2($representations: [_Any!]!){_entities(representations: $representations){... on Moderator {__typename subject address {__typename id}}}}","variables":{"representations":[$$0$$]}}}`,
OperationName: "Accounts__third_service",
DataSource: &Source{},
SetTemplateOutputToNullOnVariableNull: true,
RequiresEntityBatchFetch: true,
Expand Down Expand Up @@ -12198,7 +12207,8 @@ func TestGraphQLDataSourceFederation(t *testing.T) {
DependsOnFetchIDs: []int{2},
},
FetchConfiguration: resolve.FetchConfiguration{
Input: `{"method":"POST","url":"http://first.service","body":{"query":"query Accounts__first_service($representations: [_Any!]!){_entities(representations: $representations){... on Address {__typename zip}}}","variables":{"representations":[$$0$$]}}}`,
Input: `{"method":"POST","url":"http://first.service","body":{"query":"query Accounts__first_service__3($representations: [_Any!]!){_entities(representations: $representations){... on Address {__typename zip}}}","variables":{"representations":[$$0$$]}}}`,
OperationName: "Accounts__first_service",
DataSource: &Source{},
SetTemplateOutputToNullOnVariableNull: true,
RequiresEntityBatchFetch: true,
Expand Down
14 changes: 8 additions & 6 deletions v2/pkg/engine/plan/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,16 @@ type Planner struct {
prepareOperationWalker *astvisitor.Walker
}

// NewPlanner creates a new Planner from the Configuration
// NewPlanner creates a new Planner from the Configuration.
//
// NOTE: All stateful DataSources should be initiated with the same context.Context object provided to the PlannerFactory.
// The context.Context object is used to determine the lifecycle of stateful DataSources
// It's important to note that stateful DataSources must be closed when they are no longer being used
// The context.Context object is used to determine the lifecycle of stateful DataSources.
// It's important to note that stateful DataSources must be closed when they are no longer being used.
//
// Stateful DataSources could be those that initiate a WebSocket connection to an origin, a database client, a streaming client, etc...
// To ensure that there are no memory leaks, it's therefore important to add a cancel func or timeout to the context.Context
// At the time when the resolver and all operations should be garbage collected, ensure to first cancel or timeout the ctx object
// If you don't cancel the context.Context, the goroutines will run indefinitely and there's no reference left to stop them
// To ensure that there are no memory leaks, it's therefore important to add a cancel func or timeout to the context.Context.
// At the time when the resolver and all operations should be garbage collected, ensure to first cancel or timeout the ctx object.
// If you don't cancel the context.Context, the goroutines will run indefinitely and there's no reference left to stop them.
func NewPlanner(config Configuration) (*Planner, error) {
if config.Logger == nil {
config.Logger = abstractlogger.Noop{}
Expand Down
3 changes: 2 additions & 1 deletion v2/pkg/engine/plan/visitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type QueryPlanProvider interface {
IncludeQueryPlanInFetchConfiguration()
}

// Visitor creates the shape of resolve.GraphQLResponse.
type Visitor struct {
Operation, Definition *ast.Document
Walker *astvisitor.Walker
Expand Down Expand Up @@ -1286,7 +1287,7 @@ func (v *Visitor) configureSubscription(config *objectFetchConfiguration) {

func (v *Visitor) configureObjectFetch(config *objectFetchConfiguration) {
fetchConfig := config.planner.ConfigureFetch()
// If the datasource is missing, we can anticipate that configure fetch failed
// If the datasource is missing, we can expect that configure fetch failed
if fetchConfig.DataSource == nil {
return
}
Expand Down
47 changes: 47 additions & 0 deletions v2/pkg/engine/postprocess/fetch_id_appender.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package postprocess

import (
"fmt"
"strings"

"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve"
)

// fetchIDAppender is a processor to append fetchIDs to the operation names propagated downstream.
type fetchIDAppender struct {
disable bool
}

func (f *fetchIDAppender) ProcessFetchTree(root *resolve.FetchTreeNode) {
if f.disable {
return
}
f.traverseNode(root)
}

func (f *fetchIDAppender) traverseNode(node *resolve.FetchTreeNode) {
if node == nil {
return
}
switch node.Kind {
case resolve.FetchTreeNodeKindSingle:
if singleFetch, ok := node.Item.Fetch.(*resolve.SingleFetch); ok {
f.traverseSingleFetch(singleFetch)
}
case resolve.FetchTreeNodeKindParallel, resolve.FetchTreeNodeKindSequence:
for i := range node.ChildNodes {
f.traverseNode(node.ChildNodes[i])
}
}
}

func (f *fetchIDAppender) traverseSingleFetch(fetch *resolve.SingleFetch) {
if fetch.OperationName != "" {
expandedName := fmt.Sprintf("%s__%d", fetch.OperationName, fetch.FetchID)
fetch.Input = strings.Replace(fetch.Input, fetch.OperationName, expandedName, 1)
if fetch.QueryPlan != nil {
// Needed for debugging of query plans
fetch.QueryPlan.Query = strings.Replace(fetch.QueryPlan.Query, fetch.OperationName, expandedName, 1)
}
}
}
8 changes: 8 additions & 0 deletions v2/pkg/engine/postprocess/postprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type Processor struct {
disableExtractFetches bool
collectDataSourceInfo bool
resolveInputTemplates *resolveInputTemplates
appendFetchID *fetchIDAppender
dedupe *deduplicateSingleFetches
processResponseTree []ResponseTreeProcessor
processFetchTree []FetchTreeProcessor
Expand All @@ -32,6 +33,7 @@ type processorOptions struct {
disableCreateConcreteSingleFetchTypes bool
disableOrderSequenceByDependencies bool
disableMergeFields bool
disableRewriteOpNames bool
disableResolveInputTemplates bool
disableExtractFetches bool
disableCreateParallelNodes bool
Expand Down Expand Up @@ -101,6 +103,9 @@ func NewProcessor(options ...ProcessorOption) *Processor {
resolveInputTemplates: &resolveInputTemplates{
disable: opts.disableResolveInputTemplates,
},
appendFetchID: &fetchIDAppender{
disable: opts.disableRewriteOpNames,
},
dedupe: &deduplicateSingleFetches{
disable: opts.disableDeduplicateSingleFetches,
},
Expand Down Expand Up @@ -139,6 +144,8 @@ func (p *Processor) Process(pre plan.Plan) plan.Plan {
// NOTE: deduplication relies on the fact that the fetch tree
// have flat structure of child fetches
p.dedupe.ProcessFetchTree(t.Response.Fetches)
// Appending fetchIDs makes query content unique, thus it should happen after "dedupe".
p.appendFetchID.ProcessFetchTree(t.Response.Fetches)
p.resolveInputTemplates.ProcessFetchTree(t.Response.Fetches)
for i := range p.processFetchTree {
p.processFetchTree[i].ProcessFetchTree(t.Response.Fetches)
Expand All @@ -150,6 +157,7 @@ func (p *Processor) Process(pre plan.Plan) plan.Plan {
p.createFetchTree(t.Response.Response)
p.appendTriggerToFetchTree(t.Response)
p.dedupe.ProcessFetchTree(t.Response.Response.Fetches)
p.appendFetchID.ProcessFetchTree(t.Response.Response.Fetches)
p.resolveInputTemplates.ProcessFetchTree(t.Response.Response.Fetches)
p.resolveInputTemplates.ProcessTrigger(&t.Response.Trigger)
for i := range p.processFetchTree {
Expand Down
31 changes: 24 additions & 7 deletions v2/pkg/engine/resolve/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,23 +291,40 @@ type FetchConfiguration struct {
Input string
Variables Variables
DataSource DataSource
// RequiresParallelListItemFetch is used to indicate that the single fetches should be executed without batching
// When we have multiple fetches attached to the object - after post-processing of a plan we will get ParallelListItemFetch instead of ParallelFetch

// RequiresParallelListItemFetch indicates that the single fetches should be executed without batching.
// If we have multiple fetches attached to the object, then after post-processing of a plan
Comment thread
devsergiy marked this conversation as resolved.
// we will get ParallelListItemFetch instead of ParallelFetch.
// Happens only for objects under the array path and used only for the introspection.
RequiresParallelListItemFetch bool
// RequiresEntityFetch will be set to true if the fetch is an entity fetch on an object. After post-processing, we will get EntityFetch

// RequiresEntityFetch will be set to true if the fetch is an entity fetch on an object.
// After post-processing, we will get EntityFetch.
RequiresEntityFetch bool
// RequiresEntityBatchFetch indicates that entity fetches on array items could be batched. After post-processing, we will get EntityBatchFetch

// RequiresEntityBatchFetch indicates that entity fetches on array items should be batched.
// After post-processing, we will get EntityBatchFetch.
RequiresEntityBatchFetch bool
PostProcessing PostProcessingConfiguration

// PostProcessing specifies the data and error extraction path in the response along with
// the merge path where will insert the response.
PostProcessing PostProcessingConfiguration

// SetTemplateOutputToNullOnVariableNull will safely return "null" if one of the template variables renders to null
// This is the case, e.g. when using batching and one sibling is null, resulting in a null value for one batch item
// Returning null in this case tells the batch implementation to skip this item
SetTemplateOutputToNullOnVariableNull bool
QueryPlan *QueryPlan
// CoordinateDependencies contain a list of GraphCoordinates (typeName+fieldName) and which fields from other fetches they depend on

QueryPlan *QueryPlan

// CoordinateDependencies contain a list of GraphCoordinates (typeName+fieldName)
// and which fields from other fetches they depend on.
// This information is useful to understand why a fetch depends on other fetches,
// and how multiple dependencies lead to a chain of fetches
CoordinateDependencies []FetchDependency

// OperationName is non-empty when the operation name is propagated the downstream subgraph fetch.
OperationName string
}

func (fc *FetchConfiguration) Equals(other *FetchConfiguration) bool {
Expand Down
Loading